This is the first part of the Distributed Designs series. For your convenience you can find other parts using the links below:
Part 1 — Outbox without idempotency nor synchronous commit
Part 2 — Transactional outbox pattern and multiple instances of relay
Part 3 — Taking lock in MVCC for transactional outbox pattern
Transactional outbox is a common pattern in distributed systems. It helps avoid both orphaned records (records for which no message was published) and messages pointing to non-existent records (messages for which no database entity exists). However, the pattern assumes that the consumer is idempotent, so that it can handle duplicates simply by re-running its logic. Even the linked page says so:
The Message Relay might publish a message more than once. It might, for example, crash after publishing a message but before recording the fact that it has done so. When it restarts, it will then publish the message again. As a result, a message consumer must be idempotent, perhaps by tracking the IDs of the messages that it has already processed. Fortunately, since Message Consumers usually need to be idempotent (because a message broker can deliver messages more than once) this is typically not a problem.
What can we do when the message consumer doesn’t support idempotency? One idea is to detect duplicates somehow and handle a given message only once. However, how can we do that when the data storage lacks a synchronous commit? I recently stumbled upon such a problem, and this is the solution I used.
Problem statement
Let’s say that we have n clusters of our application. We can communicate between them, but the changes we perform are not committed synchronously; they are only eventually consistent. All clusters are active: they can accept incoming transactions and modify the state. We have a database, a queue, and some synchronization mechanisms between the clusters, but we can’t build a lock or run a transaction across all of the clusters. To sum up, these are our requirements:
- We have n clusters. Each cluster runs a database, a queue, a message relay, and a message consumer
- The database is replicated across clusters, but we lack a synchronous commit. We do have eventual consistency
- We know there is some bounded timeout for synchronization, but we don’t know what it is. We can safely assume it’s below some reasonable value like 10 minutes
- Message consumer is not idempotent
If we implement the transactional outbox the regular way, we’ll end up with n messages published to the queue, because each cluster’s relay sees the replicated outbox row and publishes it. Since our message consumer is not idempotent, it will modify the state n times. However, we can’t easily track duplicates: we lack a synchronous commit in the database across all clusters, so we can’t reliably check whether a message was already processed.
Solution
We introduce the following environment variables:
- CLUSTERS_COUNT – indicates how many clusters are running
- CLUSTER_ID – identifier of the cluster a given component is in
- MAXIMUM_DELAY – some reasonable timeout for replication, like 10 minutes
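As an illustration, a component could read these variables at startup roughly as follows. This is only a sketch: the `RelayConfig` class and `load_config` function are hypothetical names, and I assume here that cluster identifiers run from 0 to CLUSTERS_COUNT - 1 and that MAXIMUM_DELAY is given in seconds (defaulting to 600). None of these details are prescribed by the design.

```python
import os
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class RelayConfig:
    clusters_count: int       # how many clusters are running
    cluster_id: int           # which cluster this component belongs to
    maximum_delay: timedelta  # assumed upper bound for replication


def load_config(env=os.environ) -> RelayConfig:
    """Build the configuration from environment variables.

    Assumptions (not part of the original design): IDs are 0-based and
    MAXIMUM_DELAY is expressed in seconds, defaulting to 10 minutes.
    """
    clusters_count = int(env["CLUSTERS_COUNT"])
    cluster_id = int(env["CLUSTER_ID"])
    maximum_delay = timedelta(seconds=int(env.get("MAXIMUM_DELAY", "600")))
    if not 0 <= cluster_id < clusters_count:
        raise ValueError("CLUSTER_ID must be in [0, CLUSTERS_COUNT)")
    return RelayConfig(clusters_count, cluster_id, maximum_delay)


# Example with an explicit mapping instead of the real environment.
example = load_config({"CLUSTERS_COUNT": "3", "CLUSTER_ID": "1"})
```

Passing the mapping explicitly keeps the function easy to test; in production it would simply be called as `load_config()`.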
Now, the trick: we extend our outbox table with an additional integer column indicating which cluster was the true source of the message, and we fill it with the value of the CLUSTER_ID variable. Next, we modify our message relay to extract messages in the following way:
    SELECT * FROM outbox
    WHERE cluster_id = $CLUSTER_ID
    OR creation_time < NOW() - $MAXIMUM_DELAY
This reads as: the message relay gets all the messages coming from its own cluster, plus all messages that are old enough. The former is the “happy path”: the message is created by the application code, and shortly afterwards the relay picks it up, puts it on the queue, and removes it from the outbox table (or marks it as processed). The latter is the “unhappy path”: if the originating cluster crashed for whatever reason after putting the message in the outbox, the message is delayed by MAXIMUM_DELAY and then picked up by a message relay from some other cluster. This shouldn’t happen often, and when it does, it’s an indicator of some “catastrophic outage”.
However, in the “happy path” we won’t get any duplicates posted to the queue. Obviously, we may still face regular crashes or connection resets, as in the standard transactional outbox, but those are not caused by the lack of synchronous commit.
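To make the selection rule concrete, here is a small self-contained sketch using an in-memory SQLite database in place of the real replicated store. The table layout, the `payload` column, the `select_for_relay` helper, and storing `creation_time` as Unix seconds are all assumptions made for the example; the current time is passed in explicitly so the result is deterministic.

```python
import sqlite3


def create_outbox(conn):
    conn.execute(
        """
        CREATE TABLE outbox (
            id INTEGER PRIMARY KEY,
            payload TEXT NOT NULL,
            cluster_id INTEGER NOT NULL,    -- cluster that wrote the message
            creation_time INTEGER NOT NULL  -- Unix seconds (assumption)
        )
        """
    )


def select_for_relay(conn, cluster_id, maximum_delay_s, now_s):
    """Return messages this cluster's relay may publish: its own messages,
    or messages older than the maximum replication delay."""
    rows = conn.execute(
        """
        SELECT id, payload FROM outbox
        WHERE cluster_id = :cluster_id
           OR creation_time < :now - :maximum_delay
        ORDER BY id
        """,
        {"cluster_id": cluster_id, "now": now_s, "maximum_delay": maximum_delay_s},
    )
    return rows.fetchall()


conn = sqlite3.connect(":memory:")
create_outbox(conn)
now = 10_000
conn.executemany(
    "INSERT INTO outbox (id, payload, cluster_id, creation_time) VALUES (?, ?, ?, ?)",
    [
        (1, "fresh, from cluster 0", 0, now - 5),
        (2, "fresh, from cluster 1", 1, now - 5),
        (3, "stale, from cluster 1", 1, now - 700),
    ],
)

picked_by_0 = select_for_relay(conn, cluster_id=0, maximum_delay_s=600, now_s=now)
picked_by_1 = select_for_relay(conn, cluster_id=1, maximum_delay_s=600, now_s=now)
print([row[0] for row in picked_by_0])  # [1, 3]
print([row[0] for row in picked_by_1])  # [2, 3]
```

The fresh messages are picked up only by their own cluster's relay, while the stale message 3 is visible to both relays, which is exactly the “unhappy path” described above.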
You may also notice that when the catastrophic outage happens, the message will be duplicated: it can be published up to n-1 times, once by the relay of each other cluster. We could come up with a more sophisticated query for extracting messages to effectively introduce an ordering between the clusters.
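One possible way to introduce such an ordering, sketched below, is to stagger the takeover: each cluster waits one more MAXIMUM_DELAY than the previous one before it is allowed to publish someone else's message. This is my illustration rather than a tested design. The `takeover_rank` and `may_publish` helpers are hypothetical, and I assume cluster identifiers are the integers 0 to CLUSTERS_COUNT - 1.

```python
def takeover_rank(relay_cluster_id, origin_cluster_id, clusters_count):
    """Position of a relay in the takeover order for a given message.

    Rank 0 is the originating cluster itself, rank 1 is the next cluster
    (wrapping around), and so on.
    """
    return (relay_cluster_id - origin_cluster_id) % clusters_count


def may_publish(relay_cluster_id, origin_cluster_id, clusters_count,
                age_s, maximum_delay_s):
    """Decide whether a relay may publish a message of the given age.

    The originating cluster may publish immediately; a cluster with rank r
    waits until the message is older than r * MAXIMUM_DELAY.
    """
    rank = takeover_rank(relay_cluster_id, origin_cluster_id, clusters_count)
    return rank == 0 or age_s > rank * maximum_delay_s


# Three clusters, message written by cluster 1, now 700 seconds old,
# with MAXIMUM_DELAY of 600 seconds.
eligible = [c for c in range(3) if may_publish(c, 1, 3, 700, 600)]
print(eligible)  # [1, 2]
```

In this example cluster 2 takes over first, and cluster 0 would only step in after 1200 seconds. This reduces duplicates only if the "processed" marker written by the earlier cluster replicates within MAXIMUM_DELAY, which is the same bound we already rely on.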