Distributed Designs Part 2 — Transactional outbox pattern and multiple instances of relay

This is the second part of the Distributed Designs series. For your convenience, you can find links to the other parts in the table of contents in Part 1 — Outbox without idempotency nor synchronous commit.

Last time we saw how to use the transactional outbox pattern with multiple databases and no synchronous commit. We learned how to synchronize things between datacenters. The solution used delays in the Relay component to read messages from the same datacenter only, which let us avoid taking a lock across both databases.

Now, let’s consider another problem: can we scale out Relay? In other words, can we have multiple instances of Relay running in the same datacenter? Let’s see.

Should we scale Relay?

First, let’s be explicit: we most likely don’t need to scale the Relay. One instance is enough from the performance perspective. Obviously, there are cases when we have so many messages to propagate that we could benefit from parallelizing that, but it’s not as straightforward as it may seem. Relay needs to meet the following requirements:

  • Ideally, Relay shouldn’t produce duplicates (on the happy path)
  • Relay should process messages in the proper order. We don’t want to reorder events if we can avoid it
  • Relay should be fast. We should avoid locks and delays if possible

That being said, it gets tricky to parallelize Relay. We need to send messages serially, so there is no easy way to make it faster.

However, we may need to scale out Relay because we run things in parallel by default and we can’t have just one instance. Without going into whether that is reasonable, we may be forced to run multiple instances of Relay. How do we do it?

Naïve scaling

When people describe the transactional outbox pattern, they typically focus on the idea of inserting the business entity and the message entity in the same transaction. However, there is another important problem to solve: how do we read the messages from the table without sending duplicates to the queue? If you just scale out Relay, then you’ll effectively get duplicates.

Typically, Relay works this way:
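A minimal sketch of that loop in SQL (assuming an outbox table like the dbo.t table created later in this post; the column names are illustrative, not taken from any particular implementation):

```sql
BEGIN TRANSACTION;

-- read the pending messages in order
SELECT id, payload FROM dbo.t WHERE is_sent = 0 ORDER BY id;

-- (application code) send each returned message to the queue here

-- mark the messages as sent and commit
UPDATE dbo.t SET is_sent = 1 WHERE id IN (/* ids read above */);
COMMIT;
```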

Let’s say there are two instances of Relay. Both of them get messages from the database, both of them send them to the queue, and then both of them try to mark the messages as sent and commit the changes to the database. However, one instance will succeed, and the other will simply fail. This is not a problem per se; the message is still delivered to the queue. However, with this approach we will have duplicates on the happy path.

To solve that, we need to block one Relay instance from reading the rows that are being processed by the other instance. To do that, we need to make sure that locks are properly taken on the database end. Specifically, we need the following:

  1. When reading a row, a lock is taken
  2. The lock prevents other transactions from modifying the row until the end of the current transaction
  3. The lock prevents other transactions from reading the row

Let’s examine these one by one.

When reading a row, a lock is taken

This is done automatically by the database engine. Whenever we touch the row, the locks are taken appropriately.

The lock prevents other transactions from modifying the row until the end of the current transaction

This is seemingly easy. We have various database isolation levels; however, they typically focus on how we read data that may be modified by other transactions. Just preventing modifications is not enough, though. Let’s see why.

First, let’s create the table:
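Something along these lines should do; the dbo.t name matches the table referenced later in this post, and the columns are illustrative:

```sql
CREATE TABLE dbo.t (
    id      INT           PRIMARY KEY,
    payload NVARCHAR(100) NOT NULL,
    is_sent BIT           NOT NULL DEFAULT 0  -- whether Relay already pushed the message to the queue
);

INSERT INTO dbo.t (id, payload) VALUES (1, 'message to deliver');
```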

Let’s now try running things in parallel with READ COMMITTED (the default) using MS SQL:
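A sketch of the interleaving (two sessions acting as two Relay instances, shown sequentially):

```sql
-- Session 1 (Relay instance 1)
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
BEGIN TRANSACTION;
SELECT * FROM dbo.t WHERE is_sent = 0;      -- shared lock taken and released right away
-- message sent to the queue here

-- Session 2 (Relay instance 2)
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
BEGIN TRANSACTION;
SELECT * FROM dbo.t WHERE is_sent = 0;      -- sees the same row
-- message sent to the queue again
UPDATE dbo.t SET is_sent = 1 WHERE id = 1;
COMMIT;

-- Session 1 (continued)
UPDATE dbo.t SET is_sent = 1 WHERE id = 1;  -- succeeds, no error reported
COMMIT;
```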

The first transaction reads the row, takes a lock, and immediately releases it. The second transaction reads the row, updates it, and commits. The first transaction then updates the row and commits. There is no error here, no exception; it just works. This way, we get duplicates in the queue.

Let’s change the isolation level to REPEATABLE READ:
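The same interleaving, this time with REPEATABLE READ:

```sql
-- Session 1 (Relay instance 1)
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN TRANSACTION;
SELECT * FROM dbo.t WHERE is_sent = 0;      -- shared lock held until the end of the transaction

-- Session 2 (Relay instance 2)
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN TRANSACTION;
SELECT * FROM dbo.t WHERE is_sent = 0;      -- shared locks are compatible, the read succeeds
UPDATE dbo.t SET is_sent = 1 WHERE id = 1;  -- blocks on session 1's shared lock

-- Session 1 (continued)
UPDATE dbo.t SET is_sent = 1 WHERE id = 1;  -- blocks on session 2's lock: deadlock
-- session 1 is rolled back as the deadlock victim; session 2 then completes its UPDATE and commits
```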

The first transaction reads the row. The second transaction reads the row just fine; however, when it tries to update it, it hangs. The first transaction then tries the update, hangs the same way, and gets killed as the deadlock victim. The second transaction finishes properly. As a result, we still get a duplicate in the queue. This time we do observe an error in the Relay, but it doesn’t help; it’s too late, the message has already been sent. We get the same behavior with SERIALIZABLE.

However, what if we try that with MySQL? Since MySQL with InnoDB uses snapshots for REPEATABLE READ, we get the following:
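A sketch of the same scenario in MySQL, assuming an equivalent table t:

```sql
-- Session 1 (REPEATABLE READ is InnoDB's default)
START TRANSACTION;
SELECT * FROM t WHERE is_sent = 0;          -- snapshot read, no locks taken

-- Session 2
START TRANSACTION;
SELECT * FROM t WHERE is_sent = 0;          -- snapshot read as well
UPDATE t SET is_sent = 1 WHERE id = 1;
COMMIT;

-- Session 1 (continued)
UPDATE t SET is_sent = 1 WHERE id = 1;      -- current read sees the committed row, the update succeeds
COMMIT;                                     -- no error, yet the message went to the queue twice
```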

This works with no issue. When we move to SERIALIZABLE, we get this:
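The same flow with SERIALIZABLE:

```sql
-- Session 1
SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;
START TRANSACTION;
SELECT * FROM t WHERE is_sent = 0;          -- a plain SELECT now takes shared locks

-- Session 2
SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;
START TRANSACTION;
SELECT * FROM t WHERE is_sent = 0;
UPDATE t SET is_sent = 1 WHERE id = 1;      -- blocks on session 1's shared lock

-- Session 1 (continued)
UPDATE t SET is_sent = 1 WHERE id = 1;
-- deadlock: one session fails with ERROR 1213 (40001) "Deadlock found when trying to get lock",
-- the other one completes and commits
```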

Therefore, SERIALIZABLE makes the issue visible. Similarly, for Oracle we would get ORA-08177: can't serialize access for this transaction.

From the theoretical point of view, we need to use the REPEATABLE READ isolation level. However, the problem is with the actual implementations. Typically, the database engine doesn’t try to predict the future, so it won’t stop a transaction from moving forward just because something wrong could happen. Instead, the engine decides to roll things back when issues arise. That’s why we get an error in the REPEATABLE READ and SERIALIZABLE examples above. This is not enough for us, though. We need to prevent others from reading the rows at all, so they can’t move forward. To see how to do that, let’s move on.

The lock prevents other transactions from reading the row

When we read about database isolation levels, the descriptions typically focus on preventing modifications to the row we want to read. However, they don’t cover how to prevent other transactions from reading the row at all. To do that, we can use the SELECT FOR UPDATE syntax. In practice, this is not a matter of the isolation level, but rather of taking update locks and of the database’s internal implementation. Let’s see how to do that in MS SQL:
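MS SQL doesn’t support FOR UPDATE on a regular SELECT; a common equivalent is the UPDLOCK table hint, which takes update locks during the read and holds them until the transaction ends. A sketch:

```sql
-- Session 1 (Relay instance 1)
BEGIN TRANSACTION;
SELECT * FROM dbo.t WITH (UPDLOCK, ROWLOCK) WHERE is_sent = 0;
-- send the returned messages to the queue here

-- Session 2 (Relay instance 2)
BEGIN TRANSACTION;
SELECT * FROM dbo.t WITH (UPDLOCK, ROWLOCK) WHERE is_sent = 0;
-- this SELECT blocks until session 1 commits, so it never picks up the rows being processed

-- Session 1 (continued)
UPDATE dbo.t SET is_sent = 1 WHERE id = 1;
COMMIT;
-- session 2 now unblocks, sees is_sent = 1, and finds nothing to send
```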

Similarly, we can do the same in MySQL or Oracle using SELECT * FROM t FOR UPDATE;.

This way we can block the row and make sure that no duplicates are sent to the queue. And notice that it works with READ COMMITTED. Read on to understand why.

What isolation level should I choose? And what about snapshots?

Let’s now consider what isolation level we should choose and why.

In theory, we need to go with REPEATABLE READ or above. That’s because READ COMMITTED guarantees only that we read committed data. However, committed doesn’t mean latest. We can read a row whose values have already been updated by another transaction; since those new values are not committed yet, we still read the old ones. What’s worse, in theory we can read data that has already been changed and committed to the database. If we take that to the extreme, we could even always read the same empty result, because at some point that’s what was committed to the database. This is crazy, but if our transaction doesn’t modify the rows at all, then it’s perfectly valid according to the definition (and completely unreasonable and unexpected).

However, REPEATABLE READ makes things much more predictable. That’s because the UPDATE statement needs to read the rows again. If the row was modified at some point and committed to the database, the UPDATE would fail because it wouldn’t find the same row, which is against the definition of REPEATABLE READ. Therefore, we need to go with REPEATABLE READ to make sure the result is correct at the very end.

Use REPEATABLE READ

However, we also need to prevent others from reading the rows. To guarantee that, we can either rely on the definitions or on the actual implementation. According to the definitions of the isolation levels, we must use at least REPEATABLE READ. But in order to block other readers, we need to understand whether the database uses pessimistic or optimistic locking. Typically, common SQL databases use pessimistic locking with optimizations, so they take locks as late as possible and escalate them as late as possible. We can enforce taking locks earlier by using the FOR UPDATE syntax, which makes the database take the locks eagerly. A side effect of that is that the protocol we defined above works with READ COMMITTED. That’s just a coincidence, not something we should rely on.

However, this approach won’t work for snapshot isolation and Multi-Version Concurrency Control (MVCC). That’s because MVCC effectively instructs the database to take locks as late as possible. The database will either fail on the FOR UPDATE syntax or simply ignore it and carry on with optimistic locking. We’ll see in the next post how to fix that.

In short: use REPEATABLE READ and take locks eagerly to avoid rollbacks. If you can’t take locks eagerly (because the database uses MVCC or forces optimistic locking), then you need a different protocol that I cover here.

What about the performance?

We can see that this approach will not give us any performance benefit on its own. How can we process things faster and still maintain the order? The solution would be to read messages in batches, send these batches to the queue, and commit them only afterwards. This would effectively create a transaction on the queue end. It would look like this:
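A sketch of that flow, assuming the queue supports some form of transactional or batched publishing (the batch size and placeholder comments are illustrative):

```sql
BEGIN TRANSACTION;

-- read a batch of pending messages in order, locking them eagerly
SELECT TOP (100) id, payload
FROM dbo.t WITH (UPDLOCK, ROWLOCK)
WHERE is_sent = 0
ORDER BY id;

-- (application code) open a transaction on the queue end,
-- publish the whole batch, then commit the queue transaction

-- only then mark the whole batch as sent
UPDATE dbo.t SET is_sent = 1 WHERE id IN (/* ids from the batch */);
COMMIT;
```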

This way we can improve the performance. However, implementing this solution is hard, and it requires a way to commit messages on the queue end, which may not be supported.

Summary

The transactional outbox pattern requires not only the transaction covering the business entity and the message, but also careful extraction of the messages. That extraction gets a little harder when we scale out Relay to run multiple instances.