@axinom/mosaic-message-bus

The mosaic-message-bus package implements the Mosaic messaging pattern in a robust and reliable way. Please read up on the messaging concept to familiarize yourself with this approach. The library includes the functionality to set up the RabbitMQ connection, queues, bindings, etc. You use it to send messages, handle incoming messages, and have automatic retries and redeliveries.

The messages that you send and receive are defined in messaging packages. These packages also include message routing instructions, queue bindings, and other messaging settings.

Axinom provides the @axinom/mosaic-messages package, which contains all the events and commands that the Axinom Mosaic services send and receive. Each of your projects should have its own customizable messages package with your events, commands, and messaging settings. You can find more information on how to implement or extend your messaging in the messaging usage how-to guide.

Setup

The library offers one central entry point: the setupMessagingBroker function. It initializes the messaging broker and the bindings for all the message handlers.

You supply it with the configuration for connecting to the RabbitMQ server, the builders, and optional middleware. All message handlers and the messages that are sent are configured via the RascalConfigBuilder.

A basic usage example of the library:

// general setup (plus database pools and other setup-related work)
const app = express();
const config = getFullConfig();
const counter = initMessagingCounter(getOwnerPgPool(app));

// register all your builders to send or receive events/commands:
const builders = [
  new RascalConfigBuilder(MediaServiceMessagingSettings.StartIngest, config)
    .sendCommand()
    .subscribeForCommand<StartIngestCommand>(
      (broker: Broker) =>
        new IngestHandler(broker, getLoginPgPool(app), config),
    ),
  // ...
];

const broker = await setupMessagingBroker({
  app,
  config,
  builders,
  components: { counters: { postgresCounter: counter } },
  // ...
});

Input object definitions

Property | Description
app | A MessagingRegistry object to store and share objects by keys, e.g. the express app.
config | The MessagingConfig object with settings on how to connect to the RabbitMQ server, vhost, and other similar settings.
builders | A list of RascalConfigBuilder configuration builder objects. They are responsible for configuring the message bindings for sending and receiving events and commands in your code.
components | Extension possibility for the broker initialization. In the example above, a message counter is added to protect against "poisonous" messages.
onMessageMiddleware | An array of OnMessageMiddleware middleware objects. Each of them is called before a message is given to the corresponding message handler.
logger | A Mosaic logger instance used during setup and for error/debug logging during message processing.
shutdownActions | Used to gracefully shut down the message broker and RabbitMQ connections when your application is being shut down.
rascalConfigExportPath | A path, relative to your project root, to the folder where the generated config JSON is saved.

Sending Messages

Messages can be sent via the Broker instance from anywhere in your code. These can be either events or commands.

Publishing an event or command message:
await this.broker.publish(
  MediaServiceMessagingSettings.IngestStarted.messageType,
  {
    entity_id: 123,
    entity_type: 'movie',
  },
  {
    auth_token: my_auth_token,
  },
);

Publish method parameters

Property | Description
key | The RabbitMQ routing key. If you follow the Mosaic conventions this key is defined in your AsyncAPI document and exposed in your custom messaging settings.
payload | The business-relevant part of the message. This will be added as the payload property when sending the RabbitMQ message.
envelopeOverrides | Explicitly defined message envelope values, e.g. an auth token or message context information. (optional)
options | Extra message options, e.g. if additional header values should be passed in the RabbitMQ message.
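
To illustrate how the payload and envelopeOverrides parameters relate, here is a minimal sketch. It assumes a simplified envelope that only carries the payload and an optional auth_token; the names EnvelopeSketch and buildEnvelopeSketch are hypothetical and are not part of the library, whose real envelope contains more fields.

```typescript
// Hypothetical, simplified envelope shape used only for illustration.
interface EnvelopeSketch<T> {
  payload: T;
  auth_token?: string;
}

// Combine the business payload with explicitly supplied envelope values.
// The payload is always stored under the "payload" property.
function buildEnvelopeSketch<T>(
  payload: T,
  envelopeOverrides: Partial<Omit<EnvelopeSketch<T>, 'payload'>> = {},
): EnvelopeSketch<T> {
  return { ...envelopeOverrides, payload };
}

const envelopeExample = buildEnvelopeSketch(
  { entity_id: 123, entity_type: 'movie' },
  { auth_token: 'example-token' },
);
console.log(JSON.stringify(envelopeExample));
```

The business data stays isolated under payload, while values such as the auth token sit next to it on the envelope.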

Receiving Messages

Incoming messages are handled by a message handler. It is strongly advised to inherit from the GuardedMessageHandler that is part of the @axinom/mosaic-id-guard library. This adds support for secure permission checks.

Message Handler

Message Handler structure
export class StartIngestHandler extends GuardedMessageHandler<StartIngestCommand> {
  constructor(
    private readonly broker: Broker,
    private readonly loginPool: LoginPgPool,
    config: Config,
  ) {
    super(
      // the message type handled by this handler
      MediaServiceMessagingSettings.StartIngest.messageType,
      // the sender must hold at least one of these permissions
      [
        'ADMIN',
        'COLLECTIONS_EDIT',
        'MOVIES_EDIT',
        'SETTINGS_EDIT',
        'TVSHOWS_EDIT',
      ],
      config.serviceId,
      {
        tenantId: config.tenantId,
        environmentId: config.environmentId,
        authEndpoint: config.idServiceAuthBaseUrl,
      },
    );
  }

  public async onMessage(
    content: StartIngestCommand,
    message: MessageInfo,
  ): Promise<void> {
    // handler logic
  }

  public async onMessageFailure(
    content: StartIngestCommand,
    message: MessageInfo,
    error: Error,
  ): Promise<void> {
    // final error handling logic
  }
}

Initialize a Message Handler

Constructor parameters

Property | Description
messageType | The type of message that this message handler handles.
permissions | A set of permissions that allow the usage of this message handler. The message sender's authentication token must include at least one of them in order for the message to be processed.
serviceId | The ID of the current service. This is used to check the permissions of the message sender.
authEndpoint | The authorization endpoint URL from which the JWT signing public key is retrieved to verify the authentication tokens.
overrides | Custom adjustments to overwrite Rascal subscription configuration settings.
middleware | Middleware functions that should be executed for this specific message handler in addition to the global ones.
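
The "at least one of them" rule for permissions can be sketched as a simple set check. This is only an illustration of the rule, not the guard's actual implementation: the function name hasAnyRequiredPermission is hypothetical, and it assumes the sender's permissions have already been extracted from a verified token.

```typescript
// Returns true when the sender holds at least one of the permissions
// that the handler requires.
function hasAnyRequiredPermission(
  senderPermissions: readonly string[],
  requiredPermissions: readonly string[],
): boolean {
  const granted = new Set(senderPermissions);
  return requiredPermissions.some((permission) => granted.has(permission));
}

const requiredPermissions = [
  'ADMIN',
  'COLLECTIONS_EDIT',
  'MOVIES_EDIT',
  'SETTINGS_EDIT',
  'TVSHOWS_EDIT',
];

console.log(hasAnyRequiredPermission(['MOVIES_EDIT'], requiredPermissions)); // true
console.log(hasAnyRequiredPermission(['SOME_OTHER_PERMISSION'], requiredPermissions)); // false
```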

Handle a Message

The "onMessage" function is the core part of a message handler. You add the message processing logic to this function. By default, a message is acknowledged when the "onMessage" function does not throw an error. Otherwise, it is not acknowledged and is put back into the RabbitMQ queue.

onMessage parameters

Property | Description
payload | The business-relevant payload of the message.
message | The full message envelope with all related message metadata.
ackOrNack | Acknowledgment or rejection callback that can be called to forcefully complete the message processing.
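
The default acknowledgment behavior described above can be sketched as a small wrapper around the handler. This is an assumption-laden illustration, not the library's code: AckOrNackSketch is a simplified stand-in for the acknowledgment callback (no argument means acknowledge, an error means reject), and handleWithDefaultAck is a hypothetical name.

```typescript
// Simplified stand-in for an acknowledgment callback:
// called without an argument it acknowledges, with an error it rejects.
type AckOrNackSketch = (error?: Error) => void;

// Run the handler and acknowledge only if it did not throw.
async function handleWithDefaultAck<T>(
  onMessage: (payload: T) => Promise<void>,
  payload: T,
  ackOrNack: AckOrNackSketch,
): Promise<void> {
  try {
    await onMessage(payload);
    ackOrNack(); // handler finished without throwing: acknowledge
  } catch (err) {
    // handler threw: reject so that the message can be delivered again
    ackOrNack(err instanceof Error ? err : new Error(String(err)));
  }
}

// Usage: log which outcome the callback received.
const logOutcome: AckOrNackSketch = (error) =>
  console.log(error ? `nack: ${error.message}` : 'ack');

handleWithDefaultAck(async () => {}, { entity_id: 123 }, logOutcome);
handleWithDefaultAck(
  async () => {
    throw new Error('ingest failed');
  },
  { entity_id: 123 },
  logOutcome,
);
```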

Messaging Error Handling

The Mosaic Message Bus library takes care of redelivering messages if the handler fails to process them. It redelivers a message for up to ten attempts in case of a message handling failure. If the final attempt fails, the onMessageFailure function is called and the message is (by default) moved to the dead-letter queue.

onMessageFailure parameters

Property | Description
payload | The business-relevant payload of the message.
message | The full message envelope with all related message metadata.
error | The last error that was thrown when trying to execute the "onMessage" function.
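
The retry flow can be simulated in-process to make the order of calls explicit. This sketch rests on several assumptions: it treats "ten attempts" as ten total handler invocations, it replaces the real redelivery through RabbitMQ with a plain loop, and the names deliverWithRetries and DeliveryOutcome are hypothetical rather than part of the library.

```typescript
type DeliveryOutcome = 'acknowledged' | 'dead-lettered';

// Call onMessage up to maxAttempts times. After the final failure, call
// onMessageFailure with the last error and report the message as dead-lettered.
async function deliverWithRetries<T>(
  payload: T,
  onMessage: (payload: T) => Promise<void>,
  onMessageFailure: (payload: T, error: Error) => Promise<void>,
  maxAttempts = 10,
): Promise<{ outcome: DeliveryOutcome; attempts: number }> {
  let lastError = new Error('no attempt was made');
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await onMessage(payload);
      return { outcome: 'acknowledged', attempts: attempt };
    } catch (err) {
      lastError = err instanceof Error ? err : new Error(String(err));
    }
  }
  await onMessageFailure(payload, lastError);
  return { outcome: 'dead-lettered', attempts: maxAttempts };
}

// Usage: a handler that fails twice before succeeding.
let flakyCalls = 0;
const flakyHandler = async (): Promise<void> => {
  flakyCalls += 1;
  if (flakyCalls < 3) throw new Error(`transient failure ${flakyCalls}`);
};

deliverWithRetries({ entity_id: 123 }, flakyHandler, async () => {}).then(
  (result) => console.log(result), // { outcome: 'acknowledged', attempts: 3 }
);
```

A handler that keeps failing reaches onMessageFailure exactly once, after the last attempt, which is the place for final cleanup or alerting logic.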