Messaging
Overview
Messaging is the concept of different services (or parts of a service) communicating with each other asynchronously. The Mosaic platform uses RabbitMQ as the message broker, which provides all the features required to manage and distribute messages.
This document explains the messaging concepts and what is going on "under the hood". In your daily work, most of this is encapsulated by the Mosaic message bus, so you don’t have to worry about it.
For asynchronous message processing, we use the Mosaic message bus which is based on the Rascal pub/sub wrapper around the amqplib library. The Mosaic message bus adds the notions of events and commands and provides the functionality to easily integrate into the Mosaic Managed Services in a secure way.
The main objective of the messaging folder in services' source code is the setup and registration of the message handlers and middleware to consume messages and the publishers to send out messages. In addition, there is a media message handler that protects message handlers by checking the message for specific permissions.
Decoupling and Scalability
By embracing messaging patterns, Mosaic achieves loose coupling between its microservices. Each service can operate independently, communicating through messages without the need for direct service-to-service calls. This decoupling enhances scalability, as new services can be added or scaled up without affecting existing components.
Fault Tolerance and Reliability
The transactional nature of message processing ensures fault tolerance and reliability. Messages are processed atomically, with built-in retry and error-handling mechanisms. This ensures that critical operations, such as order processing, payment updates, or inventory management, are executed reliably and consistently.
Asynchronous Processing
Asynchronous messaging allows Mosaic services to handle tasks and events in a non-blocking manner. Long-running operations, such as file processing, notifications, or complex calculations, can be offloaded to background workers, improving system responsiveness and user experience.
Messaging life-cycle
At the heart of RabbitMQ are exchanges and queues. An exchange is the endpoint that messages are sent to. A queue is a place where a message is stored; from there, consumers can read messages. Exchanges route messages to the inbox queues of the target services. The routing defines the logic of which messages go to which queue(s). This can be zero queues if nobody is interested in such messages, but it could also be one queue or many queues.
In Mosaic services, we use a single "inbox" queue per service. Services use the transactional inbox pattern (described further down) to receive and store inbox messages in the service database.
The term publisher is used in your application for the functionality that reads a message from the transactional outbox and sends the message to an exchange in RabbitMQ. On the other side, there are the so-called "receivers" that receive messages from their RabbitMQ inbox queue. They use a push model where RabbitMQ actively sends them new messages. The receivers store the message in the transactional inbox database table.
When a message is then retrieved from the inbox table, it is processed by a consumer, which is called a message handler. The consumer processes the message and marks it as "processed" in the inbox table.
To be able to read or send a message, an application must first connect to RabbitMQ. The RabbitMQ instance is called the Broker. It has virtual hosts (vhosts) that offer a logical grouping of exchanges and queues and include a permission concept. This is similar to a database server like PostgreSQL (~Broker) having different databases (~vhosts), which in turn have tables (~queues). A connection needs to be established to such a vhost.
RabbitMQ supports different connection protocols. The most common one, also used by Mosaic, is AMQP 0.9.1, which is based on a long-lived TCP connection. This is the "physical" connection from an application to a RabbitMQ vhost. On top of it, there are channels, which are lightweight "virtual" connections multiplexed over the "physical" connection. A best practice is to use one connection for publishers and another one for consumers.
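To make the broker/vhost analogy concrete, the following sketch builds an AMQP 0.9.1 connection URI in which the vhost is the percent-encoded path segment (so the default vhost "/" becomes "%2F"); a URL of this form is what amqplib's connect() accepts. The helper name buildAmqpUrl and its option names are illustrative assumptions, not part of the Mosaic message bus.

```typescript
// Hypothetical helper (not part of the Mosaic message bus): builds an AMQP URI
// for one vhost. The vhost is the URI path and must be percent-encoded.
interface BrokerOptions {
  host: string;
  port?: number; // 5672 is the default AMQP 0.9.1 port
  user: string;
  password: string;
  vhost: string;
}

function buildAmqpUrl(o: BrokerOptions): string {
  const user = encodeURIComponent(o.user);
  const password = encodeURIComponent(o.password);
  const vhost = encodeURIComponent(o.vhost); // "/" -> "%2F"
  return `amqp://${user}:${password}@${o.host}:${o.port ?? 5672}/${vhost}`;
}

// Following the best practice above, an application would open two separate
// connections to the same vhost, for example with amqplib:
//   const publisherConnection = await amqp.connect(url);
//   const consumerConnection = await amqp.connect(url);
// and then create channels on each connection as needed.
```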
Message
A message is the general term for data sent to RabbitMQ that ends up in a queue. In Mosaic, messages are further differentiated into commands and events.
Commands are messages that are sent to a known recipient with the intention that this recipient performs some action. Commands should be named in the imperative form, for example "StartIngest" or "CreateUser". StartIngest tells the consumer of this command to start the ingest process based on the details given in the message; CreateUser tells it to create a new user. The owner of a command is the consumer: there is only one consumer of this command. A command is "sent", never "published".
Events are messages that inform about something that has happened. The sender does not know (or at least does not need to know) who is interested in this message: it could be nobody, a single subscriber, or many subscribers. Events are named in the past tense (something has been performed), for example IngestFinished and UserCreated. The owner of an event is the producer: there is only one producer of this event. An event is "published", never "sent".
Exchanges
The Mosaic message bus uses two exchanges to send messages: the command exchange and the event exchange, to which the corresponding messages are sent or published. Both of them are topic exchanges that allow for flexible message routing. The inbox queues bind to these exchanges to subscribe to commands and events.
In addition, there are delay and retry exchanges that are used for delayed retries if a message fails to be processed. Finally, there is the dead_letter exchange, which receives messages that failed too many times. The dead_letter queues of the services bind to that exchange.
Every Mosaic service must make sure that those exchanges exist when the application starts.
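A minimal sketch of asserting these exchanges at startup. assertExchange(name, type, options) is the amqplib channel method; here it is accessed through a small interface so the sketch also runs against a fake channel. The exchange names come from this section; the exchange types of delay, retry, and dead_letter are not stated above and are assumed to be topic as well.

```typescript
// Minimal subset of an amqplib Channel. Declaring an exchange that already
// exists with the same properties is a no-op, so this is safe on every start.
interface ExchangeChannel {
  assertExchange(exchange: string, type: string, options?: { durable?: boolean }): Promise<unknown>;
}

// Names from the Mosaic docs; the types of delay/retry/dead_letter are an assumption.
const MOSAIC_EXCHANGES: Array<[string, string]> = [
  ["command", "topic"],
  ["event", "topic"],
  ["delay", "topic"],
  ["retry", "topic"],
  ["dead_letter", "topic"],
];

async function ensureExchanges(channel: ExchangeChannel): Promise<void> {
  for (const [name, type] of MOSAIC_EXCHANGES) {
    // durable: true so that the exchanges survive a broker restart
    await channel.assertExchange(name, type, { durable: true });
  }
}
```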
Queues and Bindings
Queues are used to receive commands and events. When consumers want to receive a specific message (command or event), they create the inbox queue in which the messages should end up. The queue is then bound to one of the exchanges to receive the desired messages.
As mentioned above, a command is always handled by a single consumer. A logical consumer (with potentially multiple scaled application instances) has one inbox queue. From there, it reads all the messages and stores them in the service inbox database table. Command messages have a routing key that follows the naming convention consumer.entity.command and are sent to the command exchange. That exchange forwards the message to the bound inbox queue.
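A minimal sketch of the command routing-key convention consumer.entity.command. RabbitMQ topic routing keys are dot-separated words, so each part is checked to be non-empty and free of dots. The function names and the example values (ingest_service, video, start_ingest) are hypothetical; the Mosaic message bus may build keys differently.

```typescript
// Each part must be a single topic "word": non-empty and without dots.
function routingKeyPart(part: string): string {
  if (part.length === 0 || part.includes(".")) {
    throw new Error(`Invalid routing key part: "${part}"`);
  }
  return part;
}

// Commands are addressed to their single consumer, so the key starts with it.
function commandRoutingKey(consumer: string, entity: string, command: string): string {
  return [consumer, entity, command].map(routingKeyPart).join(".");
}

// Hypothetical example: ask an ingest service to start ingesting a video.
const startIngestKey = commandRoutingKey("ingest_service", "video", "start_ingest");
console.log(startIngestKey); // ingest_service.video.start_ingest
```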
Events are owned by a single publisher. The routing key of an event message follows the naming convention producer.entity.event. Unlike for commands, the routing key for events is formed around the producer. Every interested consumer can subscribe to this routing key. An example routing key would be media_service.movie.published. The Media Template Catalog Service subscribes to such a message, but there could be other interested consumers (a reporting service or a monetization service) that are also subscribed to this event. Each consumer has its own message queue, to which the exchange adds all such event messages.
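The fan-out described above relies on topic-exchange matching: routing keys and binding keys are dot-separated words, and in a binding key `*` matches exactly one word while `#` matches zero or more words. The sketch below simulates that matching in memory to show how one event lands in every subscribed queue. The queue identifiers and the wildcard bindings of the reporting and monetization services are hypothetical.

```typescript
// Events are addressed by their single producer: producer.entity.event.
function eventRoutingKey(producer: string, entity: string, event: string): string {
  return [producer, entity, event].join(".");
}

// In-memory simulation of topic-exchange matching ("*" = one word, "#" = zero or more).
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");
  const match = (i: number, j: number): boolean => {
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // "#" may swallow any number of words: try every split point.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    return (p[i] === "*" || p[i] === k[j]) && match(i + 1, j + 1);
  };
  return match(0, 0);
}

// Hypothetical bindings: each interested consumer binds its own inbox queue.
const eventBindings: Array<{ queue: string; pattern: string }> = [
  { queue: "template_catalog_inbox", pattern: "media_service.movie.published" },
  { queue: "reporting_inbox", pattern: "media_service.movie.*" },
  { queue: "monetization_inbox", pattern: "media_service.#" },
];

// Returns every queue that receives a copy of a message with this routing key.
function route(routingKey: string): string[] {
  return eventBindings.filter((b) => topicMatches(b.pattern, routingKey)).map((b) => b.queue);
}
```

Every matching binding gets its own copy, so adding another subscriber only means adding another binding; the producer is unaffected.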
As the publisher of an event does not know the consumers, it is the responsibility of the consumer to create the queue and bind it to the event exchange.
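A minimal sketch of that consumer-side responsibility. assertQueue(queue, options) and bindQueue(queue, source, pattern) are amqplib channel methods; they are accessed through a small interface so the sketch runs against a fake channel. The function name subscribeToEvents is hypothetical; the queue name inbox and the routing key follow the examples in this document.

```typescript
// Minimal subset of an amqplib Channel used by the consumer at startup.
interface QueueChannel {
  assertQueue(queue: string, options?: { durable?: boolean }): Promise<unknown>;
  bindQueue(queue: string, source: string, pattern: string): Promise<unknown>;
}

// The consumer (not the producer) creates its inbox queue and binds it to the
// "event" exchange once per event routing key it is interested in.
async function subscribeToEvents(channel: QueueChannel, inboxQueue: string, eventKeys: string[]): Promise<void> {
  await channel.assertQueue(inboxQueue, { durable: true });
  for (const key of eventKeys) {
    await channel.bindQueue(inboxQueue, "event", key);
  }
}
```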
Transactional Outbox
In Mosaic, the transactional outbox pattern is used for reliably sending messages from a service to itself or to other external systems. When a service needs to publish an event or send a command, it first writes the message details to its outbox table within the same database transaction as the business logic. This makes storing the message atomic and durable together with the associated business data. The message publisher of that service then polls this outbox table and sends the messages via the RabbitMQ message broker.
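A minimal in-memory sketch of the outbox pattern, assuming a FakeDb class as a stand-in for the service database: its transaction() works on copies and commits the business table and the outbox table together only if the callback does not throw. The entity, the event name ExampleCreated, and the routing key example_service.example.created are hypothetical.

```typescript
interface ExampleRow { id: string; title: string; count: number }
interface OutboxRow { messageId: string; routingKey: string; body: string; sent: boolean }
interface Tables { examples: ExampleRow[]; outbox: OutboxRow[] }

// Hypothetical stand-in for the service database.
class FakeDb {
  tables: Tables = { examples: [], outbox: [] };
  transaction(fn: (tx: Tables) => void): void {
    const tx: Tables = {
      examples: this.tables.examples.map((r) => ({ ...r })),
      outbox: this.tables.outbox.map((r) => ({ ...r })),
    };
    fn(tx); // if fn throws, nothing below runs and nothing is committed
    this.tables = tx; // commit business data and outbox row atomically
  }
}

// Business logic: the entity and the event describing it are written together.
function createExample(db: FakeDb, row: ExampleRow, messageId: string): void {
  db.transaction((tx) => {
    if (tx.examples.some((e) => e.id === row.id)) throw new Error("duplicate id");
    tx.examples.push(row);
    tx.outbox.push({
      messageId,
      routingKey: "example_service.example.created", // hypothetical key
      body: JSON.stringify({ payload: row, message_id: messageId, message_type: "ExampleCreated" }),
      sent: false,
    });
  });
}

// Publisher: polls unsent outbox rows and hands them to the broker.
// If send() throws, the row stays unsent and is picked up by a later poll.
function publishOutbox(db: FakeDb, send: (routingKey: string, body: string) => void): number {
  let published = 0;
  for (const row of db.tables.outbox.filter((r) => !r.sent)) {
    send(row.routingKey, row.body);
    db.transaction((tx) => {
      const stored = tx.outbox.find((r) => r.messageId === row.messageId);
      if (stored) stored.sent = true;
    });
    published++;
  }
  return published;
}
```

If the process crashes between send() and marking the row as sent, the message is published again on the next poll, so delivery is at-least-once; the transactional inbox on the receiving side is what filters such duplicates.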
Transactional Inbox
On the receiving end, services in Mosaic employ the transactional inbox pattern to process incoming messages reliably and efficiently. Messages are retrieved from the RabbitMQ queue and stored exactly once in the inbox database table. The associated business logic is then executed within the same database transaction that marks the inbox message as processed. This guarantees that the message is processed exactly once, even in the event of failures or system restarts.
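A minimal in-memory sketch of the inbox pattern, assuming an InboxDb class as a stand-in for the service database. The receiver stores each message at most once, keyed by message_id; the handler's business changes and the "processed" flag are committed in the same transaction, so a failing handler leaves both untouched. All names are hypothetical.

```typescript
interface InboxRow { messageId: string; body: string; processed: boolean }
interface InboxState { inbox: InboxRow[]; counts: Map<string, number> }

// Hypothetical stand-in for the inbox table plus some business state (counts).
// transaction() commits all changes only if fn does not throw.
class InboxDb {
  inbox: InboxRow[] = [];
  counts = new Map<string, number>();
  transaction(fn: (tx: InboxState) => void): void {
    const tx: InboxState = { inbox: this.inbox.map((r) => ({ ...r })), counts: new Map(this.counts) };
    fn(tx);
    this.inbox = tx.inbox;
    this.counts = tx.counts;
  }
}

// Receiver: stores a broker message once; redeliveries of the same id are ignored.
function receive(db: InboxDb, messageId: string, body: string): boolean {
  let stored = false;
  db.transaction((tx) => {
    // a real inbox table would enforce this with a unique constraint on message_id
    if (tx.inbox.some((r) => r.messageId === messageId)) return;
    tx.inbox.push({ messageId, body, processed: false });
    stored = true;
  });
  return stored;
}

// Message handler loop: business logic and the "processed" flag commit together.
function processPending(db: InboxDb, handle: (body: string, counts: Map<string, number>) => void): void {
  for (const row of db.inbox.filter((r) => !r.processed)) {
    try {
      db.transaction((tx) => {
        handle(row.body, tx.counts);
        const stored = tx.inbox.find((r) => r.messageId === row.messageId);
        if (stored) stored.processed = true;
      });
    } catch {
      // nothing was committed; the row stays pending and is retried later
    }
  }
}
```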
Retries
If an inbox message is delivered to a consumer, the consumer tries to handle it. If the consumer is not able to handle it correctly (the database is currently down, some data is missing, or any other exception occurs), the message should be retried. Retrying means that the application notices that this message could not be handled and does not mark the inbox message as "processed". The message is then retried after a configurable delay. By then, the database may be up again, the required data may exist, or some other issue may be resolved.
Mosaic uses retries both on the RabbitMQ level and on the transactional inbox and outbox level. On the RabbitMQ side, a message that produced an error is kept in a delay queue. The first few times a message fails, it is put into the 10-second delay queue. If the message fails a few more times, it is put into the 30-second delay queue. When all attempts fail, the message is forwarded to the dead_letter queue. As the receiver only stores messages in the transactional inbox, which is a small operation, these retries should rarely happen unless the infrastructure is down.
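The escalation from the 10-second to the 30-second delay queue and finally to dead_letter can be expressed as a small decision function. The documentation only says "a few times", so the thresholds below (3 attempts per delay level) and the function name nextRetryTarget are assumptions for illustration.

```typescript
// Hypothetical retry policy mirroring the description above.
type RetryTarget =
  | { kind: "delay"; delayMs: number }
  | { kind: "dead_letter" };

// Assumed thresholds; the real values are not stated in this document.
const TEN_SECOND_ATTEMPTS = 3;
const THIRTY_SECOND_ATTEMPTS = 3;

// failedAttempts: how many times the message has failed so far (1 = first failure).
function nextRetryTarget(failedAttempts: number): RetryTarget {
  if (failedAttempts <= TEN_SECOND_ATTEMPTS) return { kind: "delay", delayMs: 10_000 };
  if (failedAttempts <= TEN_SECOND_ATTEMPTS + THIRTY_SECOND_ATTEMPTS) return { kind: "delay", delayMs: 30_000 };
  return { kind: "dead_letter" };
}
```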
On the transactional inbox side, messages are polled from the inbox database table. To ensure exactly-once message processing, the message is locked on the database level and, in addition, a "locked-until" date is set. In case of an error, the message is retried after that locked-until time has passed.
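The locked-until mechanism can be sketched as a poll that only claims unprocessed rows whose lock has expired, and sets a new lock on the rows it claims. This in-memory version is an assumption for illustration (pollInbox and its parameters are hypothetical). In a real database, the select and the lock update would run in one transaction with row-level locking; one common way in PostgreSQL is SELECT ... FOR UPDATE SKIP LOCKED, though this document does not say which mechanism Mosaic uses.

```typescript
interface PendingInboxRow {
  messageId: string;
  processed: boolean;
  lockedUntil: number; // epoch milliseconds; 0 = never locked
}

// Claims up to `limit` unprocessed rows whose lock has expired and locks them
// until now + lockMs. A failed attempt simply leaves the row unprocessed, so it
// becomes visible to the next poll once lockedUntil has passed.
function pollInbox(rows: PendingInboxRow[], now: number, lockMs: number, limit: number): PendingInboxRow[] {
  const claimed = rows.filter((r) => !r.processed && r.lockedUntil <= now).slice(0, limit);
  for (const r of claimed) r.lockedUntil = now + lockMs;
  return claimed;
}
```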
Poisonous Message Handling
Poisonous message handling is related to retries. On the RabbitMQ level, the relevant mechanism is called "redelivery": it is needed if a message was delivered to a consumer but the consumer crashed. That could be caused by a development bug, incorrect error handling, an out-of-memory error, or any other reason that made the application crash after the message was delivered but before it was acknowledged.
RabbitMQ notices that it delivered the message but the channel closed and no ack/nack answer was received. It puts the message back at the front of the queue, and the next consumer takes the message. If the error persists, the consuming application crashes again. The message is thus a "poisonous message" that brings down the system until it is removed. The same applies to the transactional inbox message handling: if a specific inbox message crashes the service, it would be polled, processed, and would then crash the service over and over again.
RabbitMQ marks a message that was already delivered but put back with the redelivered flag. The problem is that this is just a flag, not a counter. This means we would only have the option to process every message once and move flagged messages directly to the dead_letter queue. Another option would be to find some other way to know how often this message was already consumed.
To solve this, we need to count in our application the number of times a message was already delivered. If that count exceeds a maximum, we move the message to the dead_letter queue. This information cannot be stored with the message: the service crashes before we can handle it. We cannot store it in memory either, as it would be lost when the application crashes. Therefore, we need to store it in some external service like PostgreSQL. The Mosaic libraries include the functionality to create such a redeliveries counter.
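A sketch of such a redeliveries counter, assuming a minimal RedeliveryCounter interface (hypothetical, not the Mosaic library API). The count is incremented before the message is handled, so it is already persisted if the handler crashes. In amqplib, the flag is available as msg.fields.redelivered. The in-memory implementation is only suitable for this sketch: as explained above, a real counter must live in an external store such as PostgreSQL.

```typescript
// Minimal counter store; a production implementation would be backed by an
// external store (e.g. PostgreSQL) so that counts survive a crash.
interface RedeliveryCounter {
  increment(messageId: string): number; // returns the new count
  clear(messageId: string): void;        // call after the message was acked
}

// In-memory implementation for this sketch only: it would lose its state on a crash.
class InMemoryRedeliveryCounter implements RedeliveryCounter {
  private counts = new Map<string, number>();
  increment(messageId: string): number {
    const next = (this.counts.get(messageId) ?? 0) + 1;
    this.counts.set(messageId, next);
    return next;
  }
  clear(messageId: string): void {
    this.counts.delete(messageId);
  }
}

type RedeliveryDecision = "process" | "dead_letter";

// Called BEFORE handling a delivery. Only redelivered messages are counted,
// because the redelivered flag is the only hint RabbitMQ provides.
function checkRedelivery(counter: RedeliveryCounter, messageId: string, redelivered: boolean, maxRedeliveries: number): RedeliveryDecision {
  if (!redelivered) return "process";
  return counter.increment(messageId) > maxRedeliveries ? "dead_letter" : "process";
}
```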
Similar to the normal retries: as incoming RabbitMQ messages are only inserted into the corresponding inbox database table, the probability of a poisonous message at this stage is rather low.
The actual message processing is done in the message handlers based on a transactional inbox message. This pattern has built-in poisonous message handling that tracks the number of times a message was polled for processing and compares it with the number of times it was actually marked as finished. If the difference between those numbers exceeds a configurable threshold, the message is assumed to be poisonous and is abandoned.
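The comparison described above can be sketched as follows. The InboxStats fields and the function names are hypothetical; the started count is incremented when a message is claimed, before the (possibly crashing) handler runs, so crashes show up as a growing gap between started and finished.

```typescript
interface InboxStats {
  startedCount: number;  // incremented each time the row is polled for processing
  finishedCount: number; // incremented each time processing is marked as finished
}

// Poisonous if the message was started noticeably more often than it finished.
function isPoisonous(stats: InboxStats, threshold: number): boolean {
  return stats.startedCount - stats.finishedCount > threshold;
}

// Poll step: abandon poisonous messages, otherwise record the attempt first.
function claimForProcessing(stats: InboxStats, threshold: number): "process" | "abandon" {
  if (isPoisonous(stats, threshold)) return "abandon";
  stats.startedCount++;
  return "process";
}
```

With a threshold of 2, a message whose handler keeps crashing (so finishedCount never increases) is processed three times and abandoned on the fourth poll.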
Mosaic Messaging
Contracts
Mosaic services use a design-first approach for messaging definitions. Messaging is described in the form of contracts. JSON Schema files define each command and event message payload. The AsyncAPI specification is used to document for every payload which RabbitMQ routing key and target queue to use.
Messaging contracts not only act as a single source of truth but also:
- Provide a transparent process for managing messages.
- Serve as documentation.
- Act as a source for code generation.
- Are extendable: define once, generate code for multiple technologies and e2e tests.
- Can be exchanged with third parties for integration.
AsyncAPI Specification
The AsyncAPI specification, an adaptation of the OpenAPI specification, allows describing messaging endpoints in a simple and readable format. The specification can be provided in JSON or YAML format.
AsyncAPI Specification: 'example-asyncapi.yml'
```yaml
asyncapi: 2.0.0
info:
  title: Example Service
  version: '1.0.0'
  description: |
    Example Management
# Mosaic extension for the AsyncAPI specification, used to define the service id. If not defined, info.title in lower-kebab-case is used as the service id
x-service-id: ax-service-example
channels:
  # command
  'example.create': # RabbitMQ routing key
    bindings:
      amqp:
        queue:
          name: inbox # RabbitMQ queue name should always be "inbox"
    publish:
      message:
        # reference to message definition under `components/messages` section of document
        $ref: '#/components/messages/example-create-command'
components:
  messages:
    example-create-command:
      tags:
        - name: aggregate-type:example # defines the aggregate type "example"
      contentType: application/json
      payload:
        # reference to the location of the file with JSON schema for the message payload
        $ref: 'commands/create-example-command.json'
```
Message
RabbitMQ allows sending messages with textual or binary message bodies. Based on the content-type message property, the consumer can determine how to deserialize the message.
Mosaic-based messages use JSON as the message format. All messages share a common message structure that defines the message payload (custom defined) and also the metadata for exchanging messages. The metadata allows for a common way to log/trace messages, define the message type, carry the authentication token, share context, etc. The business-relevant data is contained in the payload field.
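A minimal consumer-side sketch of choosing the deserializer from the content type. In amqplib, the property is available as msg.properties.contentType and the body as a Buffer in msg.content. The function name deserialize is hypothetical, and rejecting every non-JSON content type is an assumption that follows from Mosaic messages being JSON.

```typescript
// Hypothetical helper: picks the deserializer from the content-type property.
// Parameters such as "; charset=utf-8" are ignored when comparing the media type.
function deserialize(contentType: string | undefined, body: Buffer): unknown {
  const mediaType = (contentType ?? "").split(";")[0].trim().toLowerCase();
  if (mediaType === "application/json") return JSON.parse(body.toString("utf8"));
  throw new Error(`Unsupported content type: ${contentType ?? "<none>"}`);
}
```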
```json
{
  "payload": {
    "title": "Example Title 1",
    "count": 3
  },
  "message_context": {
    "example_correlation_id": "9b9824d7-5af1-45c2-9f5d-32423f0d6d39"
  },
  "message_id": "3eb8257f-1341-49fb-bf77-027549cd4761",
  "timestamp": "2024-03-02T08:56:22.555Z",
  "message_type": "ExampleCreateCommand",
  "aggregate_id": "3",
  "aggregate_type": "example",
  "message_version": "1.1",
  "auth_token": "eyJhbGcixxx.eyJ0ZW5hbnRJZCxxx.VYqnlcLWxxx"
}
```
Message field definitions
Property | Description |
---|---|
payload | The actual message payload/content. |
message_context | An object that provides contextual information. This object can be attached to messages. When the initial message contains a message context, it should be included in every event that is published when handling that message. This is especially useful when sending a command where the resulting event should be mapped back to the starting entity. |
message_id | The unique message identifier as a UUID. |
timestamp | Date and time (UTC) when the message was first published. |
message_type | The message type, e.g. EnsureVideoExistsStart. |
aggregate_id | The unique identifier of the aggregate/entity this message is based on. |
aggregate_type | The name of the aggregate of this message, e.g. video. |
message_version | A version string of the message format. It can be increased when a new message format is introduced, so that both the old and the new format can be handled correctly. |
auth_token | A JWT of the user/actor who triggered the message publication. |
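The envelope fields in the table above map naturally onto a TypeScript type. The sketch below also shows the message_context rule: an event published while handling a message copies the incoming context unchanged. The helper name, treating message_context as the only optional field, and the message_version value "1.0" are assumptions for illustration, not the Mosaic message bus API.

```typescript
import { randomUUID } from "node:crypto";

// Envelope shape following the field table; message_context is optional.
interface MessageEnvelope<TPayload> {
  payload: TPayload;
  message_context?: Record<string, unknown>;
  message_id: string;
  timestamp: string; // ISO 8601 in UTC
  message_type: string;
  aggregate_id: string;
  aggregate_type: string;
  message_version: string;
  auth_token: string;
}

// Hypothetical helper: builds an event published while handling `incoming`.
function createEventFromHandledMessage<TIn, TOut>(
  incoming: MessageEnvelope<TIn>,
  event: { message_type: string; aggregate_id: string; aggregate_type: string; payload: TOut },
  authToken: string,
): MessageEnvelope<TOut> {
  return {
    payload: event.payload,
    message_context: incoming.message_context, // copied unchanged (may be undefined)
    message_id: randomUUID(),
    timestamp: new Date().toISOString(), // toISOString() always uses UTC ("Z")
    message_type: event.message_type,
    aggregate_id: event.aggregate_id,
    aggregate_type: event.aggregate_type,
    message_version: "1.0", // hypothetical version of the event format
    auth_token: authToken,
  };
}
```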
The message payload is defined using a JSON schema because of its powerful expressive features and mature (and abundant) tooling.
The following is an example of how a command message payload can be defined.
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "title": "create_example_command",
  "description": "Create example command schema.",
  "additionalProperties": false,
  "required": ["title", "count"],
  "properties": {
    "title": {
      "type": "string",
      "description": "The title of the example entity."
    },
    "count": {
      "type": "integer",
      "description": "Some example count."
    }
  }
}
```
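Since the contracts also serve as a source for code generation, the schema above could translate into a type plus a runtime check like the following. This is a hand-written sketch, not the output of the Mosaic generators; in practice a JSON Schema validator or generated code would be used. The names CreateExampleCommand and isCreateExampleCommand are hypothetical.

```typescript
// Type mirroring commands/create-example-command.json.
interface CreateExampleCommand {
  title: string;
  count: number; // JSON Schema "integer"
}

// Checks the same constraints as the schema: object type, required title and
// count, integer count, and additionalProperties: false.
function isCreateExampleCommand(value: unknown): value is CreateExampleCommand {
  if (typeof value !== "object" || value === null || Array.isArray(value)) return false;
  const allowed = new Set(["title", "count"]);
  if (Object.keys(value).some((k) => !allowed.has(k))) return false;
  const v = value as Record<string, unknown>;
  return typeof v["title"] === "string" && typeof v["count"] === "number" && Number.isInteger(v["count"]);
}
```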