How to Efficiently Manage Queues in SQL Databases

Written by timurnav | Published 2023/03/01
Tech Story Tags: java | sql | database | messaging | software-development | backend | programming | data

TLDRSQL-based queue could be usefull with exactly-once or at-least-once semantics, or as Transaction Outbox and Dead Letter Queue. Advantages: it's easy, messages can be queried, easy to enhance features. Disadvantages: low throughput, could be expensive in cloud infrastructurevia the TL;DR App

The concept of a queue is very simple: there is a sender and a receiver, the first sends messages, and the second receives them. But still, the semantics of delivery is crucial.

There are 3 semantic options

  • At-most-once;

  • At-least-once;

  • Exactly once.

At-most-once is also known as Fire-and-forget. Suitable for analytics systems or notifications sending - where the lost messages make no harm. But an advantage of having such a thing is to keep performance at a higher level because there is no need to track if a message is received and even sent.

At-least-once - here, in case of an error, the message is not marked as read and remains so until it is read successfully. This is the most common semantic, due to simplicity and high reliability (it goes with idempotent message handling).

Exactly-once  - when a message will be delivered (and processed) at least 1 time and no more than 1 time. It’s really hard to achieve and most message brokers don’t provide such an ability. It requires synchronization between the sender and recipient and atomic processing on the receiver’s side. For example, a common ACID transaction, is used by both: the message handler and an operation that marks the message as read.

A typical use case for the solution described below would be systems where Exactly-once and At-least-once are needed. OR it could be the Transactional Outbox pattern implementation. Another common thing is a Failover Job (aka Dead Letter Queue) — a process in which it is important not to lose messages, but not to stop processing in case of a failure.

The idea I am implementing below is not new and has already been discussed many times, but I’m sharing my hands-on experience and explaining variances and trade-offs. Any decision is a set of compromises and in each case, you should lean on what you’re used to. But if you decide to use the database as a queue, then the implementation in the code will most likely be the same as I suggest in this article.

Implementation

Message Sender

Sending to a queue in a SQL database is trivial as it can be done by a simple SQL-insert. In its simplest form, it might look like this:

private final ObjectMapper objectMapper;
private final SimpleJdbcInsert jdbcInsert;

public void send(String topic, T message) {
   try {
       MessageEntity entity = new MessageEntity();
       entity.setTopic(topic);
       entity.setPayload(objectMapper.writeValueAsString(message));
       entity.setCreated(Instant.now().toEpochMilli());
       jdbcInsert.execute(new BeanPropertySqlParameterSource(entity));
   } catch (Exception e) {
       log.error("Unable to send the message {}", message, e);
       throw new RuntimeException(e);
   }
}

To send a message we need the following:

  • Know where to send to - topic.

  • The message itself - can be serialized in any convenient way. Here I have chosen json as human-readable text, but perhaps a binary format can be used to save disk space.

  • The creation date message can be useful for issues investigations.

To use scheduled/delayed messages, you can use an additional field with the date after which the message can be processed. Using a common transaction along with business logic will apply ACID properties to the "sending" message.

Message Receiver

A common practice is to use multiple instances of the applications to avoid a single point of failure or to scale. Therefore, we need to avoid concurrent access to a particular message. The best way to do this is to use a lock.

Depending on the time required to handle the message, you can choose one of two options:

  1. To lock the database row using SELECT ... FOR UPDATE - works for short transactions (not longer than a couple of seconds), because this lock works only within an open transaction. So the message should be marked as completed or deleted in the same transaction.

  2. Lock based on the value of a status column in the database: update the row by setting it IN_PROGRESS. Start processing and after processing is complete, set the status to DONE or delete the row. This will require several transactions, and the intermediate state can be dropped due to a server restart or as a result of an error. Therefore, the lock should have an expiration date and we need to refresh the data during the handling of the message. Also not to forget about fencing token and other nuances of distributed locking.

Below, I implemented the first option. It’s a simple but effective way, that would work for the vast majority of use cases of the SQL-based queues

private static final String SELECT = 
        "SELECT * FROM messages WHERE topic =:topic FOR UPDATE SKIP LOCKED LIMIT 1";
private static final String DELETE = 
        "DELETE FROM messages WHERE id =:id";

private final DataClassRowMapper<MessageEntity> rowMapper = 
        new DataClassRowMapper<>(MessageEntity.class);

private final String topic;
private final Class<T> messageType;
private final MessageHandler<T> handler;

private final ObjectMapper objectMapper;
private final NamedParameterJdbcOperations jdbcOperations;

@Transactional
public void processMessage() {
   try {
       List<MessageEntity> messages = 
               jdbcOperations.query(SELECT, Map.of("topic", topic), rowMapper);
       if (messages.isEmpty()) {
           return;
       }
       MessageEntity messageEntity = messages.get(0);
       String payload = messageEntity.getPayload();
       T message = objectMapper.readValue(payload, messageType);
       handler.handle(message);
       jdbcOperations.update(DELETE, Map.of("id", messageEntity.getId()));
   } catch (Throwable e) {
       log.error("Unable to handle message of {}", topic, e);
   }
}

Let’s take a closer look at the fetching query.

SELECT * FROM messages WHERE topic =:topic FOR UPDATE SKIP LOCKED LIMIT 1;

As already told above, FOR UPDATE acquires the lock on a selected row within the current transaction. Construction SKIP LOCKED allows us to immediately return one of the unlocked rows or an empty set if all rows are locked or nothing is matching to the where clause.

The intention is to fetch only one message, so we limit the result set, therefore transaction blocks only one message as the maximum.

The database row contains serialized messages, so it should be deserialized and passed to the handler. After successful handling, the code above deletes the row from the database and commits the transaction. Deleting processed messages without a trace could create a hard time on investigation issues. Another option is to use flags and the handling timestamp.

The method processMessage should be looped, with all appropriate optimizations: like shrinking the number of handlers if the queue is empty or increasing the number to handle peak load.

Conclusion

I keep repeating that the use of non-standard solutions should not become a problem for a growing project. The key is to assess the loads and pick feasible solutions for it. Some pros and cons of the solution are listed below.

Advantages

  • The database provides an understandable ACID mechanism that makes writing and reading messages easy. You can not be afraid of failures and server restarts.
  • You can always query the table without interfering with the handling process.
  • If you use a queue in the database, you don't need to know other systems and the maintenance of separate components.
  • You can easily enhance your features, like delay message handling.

Disadvantages

  • Relatively low throughput. It is possible to scale the database with read-only replicas, but in the case of a queue, reading and writing will need to be done exactly in the master node. Thus, both the recipient and the sender, and in general all database users, will compete for resources, and memory usage and the number of connections must be very clearly controlled.
  • The load of the queue table - consists of frequent inserts and deletions (or soft deletions), and during the processing itself, the row is locked, which leads either to long transactions or complex locking mechanisms. When using cloud solutions, this can lead to very high costs for maintaining the database.


Written by timurnav | 7+ YOE in backend applications, architecture and design. Microservices, distributed system. Senior SDE at Booking.com
Published by HackerNoon on 2023/03/01