Thoughts on inter-service messaging

Anyone who knows me knows that message buses are something of a passion of mine. 신경 is effectively feature-complete at this point, as no other types of messaging make sense to use therewith, so I’ve been turning my attention to the rest of the message bus / queue / log ecosystem and starting to think about ways it can be improved.

This is, of course, a topic that’s near and dear to my heart, so this is probably gonna get long. Don’t say I didn’t warn you!

Now let’s discuss some common messaging patterns!

Event bus #

First and foremost is our dear friend, the event bus. Event buses are an incredibly common pattern when it comes to writing applications. If you’ve ever heard of an event loop, it’s more/less the same thing in practice time for someone to tell me how I’m wrong..

Using an event bus is as simple as:

public class MyConsumer implements Consumer {
    @Override
    public Event consume(Event event) {
        System.out.println("Got event: " + event.value());
        return event;
    }
}

public class Main {
    public static void main(String[] args) {
        EventBus.register(new MyConsumer());
        EventBus.fire(new Event("test"));
    }
}

This is, of course, a simplified version that omits implementation details. Note that some event buses may not allow you to mutate and return an event. That particular detail isn’t super-relevant to this.

Implementations of a particular event bus will almost always let you specify that your consumer only handles certain types of events, or it may be wrapped in an adapter class that switches on types for you, or… There’s no end to ways that this can be implemented. This method of consuming only some events is generally implemented by either passing a type to the event bus’ register function, or using a string to specify a “topic” to listen on.

One thing that is important to know is that an event bus may come with a concept of listener priority. That is, an event bus will pass events from one client to the next – often allowing for transformation of events as it passes to each client – and will do so in a specified order, which is one of:

If there’s no priority or other external ordering – which is the most common case, in my experience – messages will usually be delivered to clients in the order they were added, first-to-last.

Push-only #

Speaking very broadly, an event bus is push-only. This means that events will be sent to your consumers as fast as they arrive, and can result in overloading them. This, of course, can be seen as timeouts, out-of-memory errors, not enough database connections, or outright crashing. It all depends on how your code handles – or doesn’t handle – this particular failure case.

In some cases, this can be a desirable property. Many years ago, I wrote Minecraft mods that injected event-firing bytecode into the client’s main tick/render loops, and then could register listeners to do things at certain stages of the tick / render process. Pushing events as fast as possible is a desirable property, because these events have to be fired synchronously, and you want to avoid blocking tick/render loops for very long. This does imply that your event consumers have to be fast, but ideally that wouldn’t be an issue.

Pub/Sub #

Pubsub is effectively the same thing as an event bus in practice. Often times you’ll see this implemented as listening on a topic – via passing a string instead of a type, such as in Redis Pub/Sub or the Vert.x event bus – rather than a specific type. This happens because of the many-languages problem: How can you push a specific type 1) across the network, and 2) when the receiver may not know it exists? Protobufs and the like do solve this problem, and are used for it in practice, but that’s not exactly the same as passing types.

Aside: Message push in 신경 #

신경 pushes messages to clients based on server-stored metadata that they set. You can sidestep the backpressuring problem a bit by emulating it in metadata, ie. set a “current capacity” metadata value and then use a metadata query to only push to clients with sufficient capacity.

Message queuing #

An event bus’ fatal flaw ends up being backpressure – how do we prevent our consumers from being overloaded? Well, we could backpressure in the consumer, making sure it never overloads. But what about when a consumer crashes? What happens to its backpressure queue? Those events are just lost! So what do we do?

Message queues!

By queuing messages, and only dispatching them to a consumer when said consumer requests them, we sidestep the backpressuring problem quite neatly. This has the caveat that the server has to buffer messages, persist them or hope that cross-node replication is good enough, whatever your use-case needs.

However, message queuing has the same general problems as an event bus! Namely, the sorts of problems that come out of “just put in a string to subscribe to a topic.” While this is a decent way of doing it, you always run the risk of accidentally pushing bad data somehow.

This is honestly a very good way of handling it. As long as you can make sure that your types work right, you’ll likely never have issues.

Aside: Message queuing in 신경 #

신경’s message queuing is basically what was described above, with one main caveat: When a client requests a message dispatch, the queue will dispatch a message if and only if the next message’s metadata query matches the requesting client’s metadata. If there’s no match, no dispatch.

Message log #

Now this is the super-interesting one. You may have heard of Apache Kafka before1 – it’s the big player in this space.

The idea with a log is that it’s an immutable, persistent, in-order record of when events were dispatched. You can replay events from any point in the past, and even only partially-replay events to only look at a subset of events. This is stupidly powerful to have – it means that you can have things like trivial rollback by just playing the event stream in reverse until a certain point.

This combines with the power of generic message queuing to create something that’s useful for a very large number of circumstances. With smart disk access patterns and caching, a message log-based system can push out messages in <5ms! Kafka, for example, can reach latencies of 2 milliseconds, which is unbelievably low latency.

With a log-backed messaging system like this, it becomes trivial to implement event sourcing, potentially going as far as implementing CQRS thereupon.

For a real-world example of this, consider a Minecraft server. It’ll have multiple players online at any given time, all of them making changes to the world, with some of those changes cascading by causing other changes to happen. A player could place a water bucket, which would cause cascading events by water spreading, water spread breaking blocks, etc.

Now, if a player were to grief another by destroying their house, all these events would be recorded in the log, and could be rolled back by doing so. You can, of course, do this by just recording events to MySQL or Postgres or ClickHouse or any other database, but if you were doing something wild like trying to keep many servers in sync, an immutable and soft-real-time event log, some conflict resolution, and the possibility of forking event streams could actually be very useful for this.

Aside: Message log in 신경 #

This does not exist. Probably for good reason.

So why did we cover all of that, exactly? #

Now that we know what all the types of messaging are, we can talk about what this rambling thing was supposed to be about – thoughts about messaging and what we can do to improve it. In a previous blog post, I talked a lot about the idea of specifics vs. semantics when it comes to messaging. This is going to be at a lower level than that post was, since here we’ll be diving into thoughts, speculation, and theorycrafting around actually building out this sort of stuff.

With that out of the way, let’s talk messaging!

About a year ago, I wrote crush, a time-traveling key-value store. I didn’t realise it at the time, but crush is basically a bootleg log – past revisions are immutable, although it only stores up to N revisions (currently 8), past versions of an object are stored as binary diffs, and you can get back patched or unpatched revisions of an object as well as its current state.

Lately, I’ve been thinking about adding persistence to crush. Currently it’s just a replicated DeltaCRDT that stores all the data, patches and all. I’m thinking it might be smart to store key-value pairs in sled via some NIFs. That allows for evicting most data from memory via an LRU cache, removal of DeltaCRDTs entirely, removing the revision cap, and finding missing entries by querying other nodes in the cluster.

And once there’s persistence, we have a real log with unlimited revisions! We’re basically halfway to replacing Kafka at that point.

But I want to take this idea further.

There’s been a lot of cool things coming up in the world of (D)VCS, like Pijul and Fossil. It makes me wonder if I can apply the same concept to crush. Forking data off at a certain revision, being able to “commit” to both versions, a solid merge strategy, and more. It almost sounds like a dream for this sort of thing – as long as the forks/merges/etc. are persisted in their own log, so that you can understand how your system got to a certain point, but that’s less-relevant right now.

At this point of development, it would be a very powerful tool. This does, of course, ignore the obvious improvements like full or partial database indexing, but getting into that is getting too far off of this topic.

But we can go even further into this idea.

What if I want to know when something happens? What if I want to receive events when something happens in the crush database? I could even emit those events as 신경 messages!

And suddenly, we have a full, real message log!

Every time you change an entity, you automatically get a diff of it stored in the log, and this would persist across forks/merges thereof. When you fork or merge an entity, you can automatically know about things like merge conflicts, fork limits, and so on. Clients can either subscribe to push events, or read from the log, optionally following data forks.

Granted, this means it’s no longer quite just a log; it’s turned into more of a database. And while Kafka is technically a database, “Kafka is not a database” is a common refrain. Things like ACID compliance will bite you, very hard, if you aren’t careful.

Why not just build this into 신경? #

While it is tempting, I don’t think that 신경 makes sense to have this integrated therein. Sure, it could be cool to see things like a client’s metadata history, but there’s no real point. It’s pretty far outside of 신경’s focus.

신경 also tries to be very elastic and tolerate nodes disappearing randomly with minimal visible impact. Building a replicated database like this is technically what a DVCS is for, but that’s not the same as it being easy to implement in a sane way, unless every read and/or write blocked long-enough to make sure it was fully replicated. So that’s an issue for another day.

As a separate project, though, it starts feeling more viable. It can be done as just a single node, and maybe from there it can build out to deal with replication once the core functionality is more solid.

It could be incredibly useful to have. It means that, for the most part, you can sidestep dealing with separate messaging and databases, you can minimise the number of things that you need to host, and more. Of course, there are downsides that have to be accounted for – such as maybe not wanting to store every revision of a multi-GB blob – but for the most part those can be worked around at an API level, I think.

A system like this also complements 신경 very well. You can subscribe to data store events by setting some metadata about your client, and the data store can trivially emit those. Doing that would also help to keep reasonable separation of concerns – an event store doesn’t really need to know how to route messages to clients. This does lead to the current weak-point of 신경, namely that metadata is only barely typed, in that only top-level types are validated, and anything can be stored inside those. This starts getting into the ideas laid out in the previous blog post mentioned earlier, so you should definitely go read that if you made it this far.

By baking this idea of data versioning in at the event store level, you get lovely benefits like being able to sorta-kinda fake transactions by forking the data and then merging it back into the main branch of the data; in theory, this would even allow for real handling of transactions happening at the same time! Take this with a grain of salt, of course, as ACID compliance is an area where my knowledge starts to hit a wall, and is the sort of thing that everyone struggles with unless they’ve already done it.

VCS is, of course, a very hard problem to solve in this case. Pijul seems to support binary patches, so it’s not entirely unlikely that this turns out to be doable. Pijul is actually really cool since it’s based on a sound theory of patches which is a little bit over my head but I can get the gist of it. It just means that this little thought experiment is just a bit more viable now!

This is, of course, a very complicated project that won’t get fully built any time soon. But it’s the small steps that start these things out; I actually convinced myself that this is a good idea while writing this, and so I’ll likely be building it out at some point soon. It does sound like a very fun project to work on.


All of this being said?

I’d like to live in a world where this thing real.

 
0
Kudos
 
0
Kudos

Now read this

Building app infrastructure in Elixir: Time-traveling state

In the previous post, I discussed making crush into a more-viable data store for mahou. Since then, I’ve implemented forking and joining of key-value pairs in crush, and this post will be discussing the implementation thereof. Fork-join... Continue →