@axinom/mosaic-message-bus
The mosaic-message-bus package implements the Mosaic messaging pattern in a robust and reliable way. Please read up on the messaging concept to familiarize yourself with this approach. The library includes the functionality to set up the RabbitMQ connection, queues, bindings, etc. You use it to send messages, handle incoming messages, and have automatic retries and redeliveries.
The messages that you are sending and receiving are defined in messaging packages. These packages include also message routing instructions, queue bindings, and other messaging settings.
Axinom provides the @axinom/mosaic-messages which
contains all the events and commands that the Axinom Mosaic services are sending
and receiving. Each of your projects should have its own customizable messages
package with your events, commands, and messaging settings. You can find more
information on how to implement or extend your messaging in the
messaging usage how-to guide.
Setup
The library offers one central entry point which is the setupMessagingBroker
function. It is used to initialize the messaging broker and the bindings for all
the message handlers.
You supply it with the required configuration parameters on how to connect to
the RabbitMQ server, builders, and optional middleware. All message handlers and
the messages that are sent are configured via the RascalConfigBuilder
.
A basic usage example of the library:
// general setup (plus database pools and other setup-related work)
const app = express();
const config = getFullConfig();
const counter = initMessagingCounter(getOwnerPgPool(app));
// register all your builders to send or receive events/commands:
const builders = [new RascalConfigBuilder(MediaServiceMessagingSettings.StartIngest, config)
.sendCommand()
.subscribeForCommand<StartIngestCommand>(
(broker: Broker) =>
new IngestHandler(broker, getLoginPgPool(app), config),
),
// ...
];
const broker = await setupMessagingBroker({
app,
config,
builders,
components: { counters: { postgresCounter: counter } },
// ...
});
Input object definitions
Property | Description |
---|---|
app | A MessagingRegistry object to store and share objects by keys e.g. the express app. |
config | The MessagingConfig object with settings on how to connect to the RabbitMQ server, vhost, and other similar settings. |
builders | A list of RascalConfigBuilder configuration builder objects. They are responsible to configure the message bindings for sending and receiving events and commands in your code. |
components | Extension possibility for the broker initialization. In this example, a message counter is added to protect from "poisonous" messages. |
onMessageMiddleware | An array of OnMessageMiddleware middleware objects. Each of them will be called before a message is given to the corresponding message handler. |
logger | A Mosaic logger instance used during setup and for error/debug logging during message processing. |
shutdownActions | Used to nicely shut down the message broker and RabbitMQ connections when your application is being shut down. |
rascalConfigExportPath | A relative path from your project root to a folder where the generated config JSON will be saved to. |
Sending Messages
Messages can be sent via the Broker
instance from anywhere in your code.
This can be either events or commands.
await this.broker.publish(
MediaServiceMessagingSettings.IngestStarted.messageType,
{
entity_id: 123,
entity_type: 'movie'
},
{
auth_token: my_auth_token,
},
);
Publish method parameters
Property | Description |
---|---|
key | The RabbitMQ routing key. If you follow the Mosaic conventions this key is defined in your AsyncAPI document and exposed in your custom messaging settings. |
payload | The business-relevant part of the message. This will be added as the payload property when sending the RabbitMQ message. |
envelopeOverrides | Explicitly defined message envelope values, e.g. an auth token or message context information. (optional) |
options | Extra message options, e.g. if additional header values should be passed in the RabbitMQ message. |
Receiving Messages
Incoming messages are handled by a message handler. It is strongly advised
to inherit from the GuardedMessageHandler
that is part of the
@axinom/mosaic-id-guard
library. This adds support for secure permission
checks.
Message Handler
export class StartIngestHandler extends GuardedMessageHandler<StartIngestCommand> {
constructor(
private readonly broker: Broker,
private readonly loginPool: LoginPgPool,
config: Config,
) {
super(
MediaServiceMessagingSettings.DeleteEntity.messageType,
[
'ADMIN',
'COLLECTIONS_EDIT',
'MOVIES_EDIT',
'SETTINGS_EDIT',
'TVSHOWS_EDIT',
],
config.serviceId,
{
tenantId: config.tenantId,
environmentId: config.environmentId,
authEndpoint: config.idServiceAuthBaseUrl,
},
);
}
public async onMessage(
content: StartIngestCommand,
message: MessageInfo,
): Promise<void> {
// handler logic
}
public async onMessageFailure(
content: StartIngestCommand,
message: MessageInfo,
error: Error,
): Promise<void> {
// final error handling logic
}
}
Initialize a Message Handler
Constructor parameters
Property | Description |
---|---|
messageType | The type of message that this message handler handles. |
permissions | A set of permissions that allow the usage of this message handler. The message sender authentication token must include at least one of them in order for the message to be processed. |
serviceId | The ID of the current service. This is used to check the permissions of the message sender. |
authEndpoint | The authorization endpoint URL to get the JWT signing public key from to verify the authentication tokens. |
overrides | Custom adjustments to overwrite Rascal subscription configuration settings. |
middleware | Middleware functions that should be executed for this specific message handler in addition to the global ones. |
Handle a Message
The "onMessage" function is the core part of a message handler. You add the message processing logic into this function. By default, a message will be acknowledged when the "onMessage" function does not throw an error. Otherwise it will not be acknowledged and be put back into the RabbitMQ queue.
onMessage parameters
Property | Description |
---|---|
payload | The business-relevant payload of the message |
message | The full message envelope with all related message metadata. |
ackOrNack | Acknowledgment or rejection callback that can be called to forcefully complete the message processing. |
Messaging Error Handling.
The Mosaic Message Bus library takes care of the redelivery of messages
if the handler fails to process them. It will redeliver the message for
up to ten attempts in case of a message handling failure. When the final
attempt should fail, the onMessageFailure
function is called and the
message is (by default) moved to the dead-letter queue.
onMessageFailure parameters
Property | Description |
---|---|
payload | The business-relevant payload of the message |
message | The full message envelope with all related message metadata. |
error | The last error that was thrown when trying to execute the "onMessage" function. |