Messaging

General

This how-to guide walks you through the steps to use the Mosaic messaging approach in your applications. The example extends the Media Service ingest process to update a movie with details from The Movie DB.

Make sure your development environment is set up correctly, then start the following development scripts from your project root, each in a separate console:

  • yarn dev:libs
  • yarn dev:services
  • yarn dev:workflows

Define and register a message

First, create a new command message that triggers the movie update from The Movie DB. Create the file in the folder libs/media-messages/schemas/payloads/media/commands and name it update-data-from-movie-db-command.json.

The payload of the message contains the movie_id of the movie to update. It should look like this:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "update_data_from_movie_db_command",
  "description": "Update movie data from the movie DB schema.",
  "additionalProperties": false,
  "required": ["movie_id"],
  "properties": {
    "movie_id": {
      "type": "integer",
      "description": "Id of the movie to update."
    }
  }
}
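To make the constraints of this schema concrete, the following sketch shows a hand-written TypeScript type and a runtime check that mirror its intent. Both the interface body and the function isUpdateDataFromMovieDbCommand are illustrative assumptions; the actual types are generated by the dev:libs script and may look different. The check also assumes the payload is always a JSON object.

```typescript
// Hypothetical hand-written equivalent of the generated payload type.
interface UpdateDataFromMovieDbCommand {
  /** Id of the movie to update. */
  movie_id: number;
}

// Runtime check that mirrors the intent of the schema: `movie_id` is
// required, must be an integer, and no additional properties are allowed.
function isUpdateDataFromMovieDbCommand(
  value: unknown,
): value is UpdateDataFromMovieDbCommand {
  if (typeof value !== 'object' || value === null || Array.isArray(value)) {
    return false;
  }
  const record = value as Record<string, unknown>;
  const keys = Object.keys(record);
  const movieId = record['movie_id'];
  return (
    keys.length === 1 &&
    keys[0] === 'movie_id' &&
    typeof movieId === 'number' &&
    Number.isInteger(movieId)
  );
}
```

With this check, { movie_id: 1 } is accepted, while { movie_id: 1.5 }, {} and { movie_id: 1, title: 'x' } are rejected.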

Update the file libs/media-messages/schemas/payloads/media/media-service-asyncapi.yml to include the new message.

Add the following code to the channels section near the other commands:

'ingest.update_data_from_movie_db':
  bindings:
    amqp:
      queue:
        name: inbox
  publish:
    message:
      $ref: '#/components/messages/update-data-from-movie-db-command'

Then add the following code to the #/components/messages section:

update-data-from-movie-db-command:
  tags:
    - name: aggregate-type:ingest-item
  contentType: application/json
  payload:
    $ref: 'commands/update-data-from-movie-db-command.json'

The dev:libs script generates the corresponding TypeScript files from that definition.

Now update libs/media-messages/src/media/media-messaging-settings.ts to add the new command settings:

public static UpdateDataFromMovieDb = new MediaServiceMessagingSettings(
  'UpdateDataFromMovieDb',
  'inbox',
  'ingest.update_data_from_movie_db',
  'command',
  'ingest-item',
);
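Several of these arguments repeat values from the AsyncAPI document, so a typo in either place is easy to miss. The sketch below cross-checks the two definitions. The field names (messageType, queue, routingKey, action, aggregateType), the interpretation of each constructor argument, and the helper findMismatches are assumptions made for illustration; they are not part of the Mosaic libraries.

```typescript
// Hypothetical field names for illustration only; the real
// MediaServiceMessagingSettings class may name its properties differently.
interface MessagingSettingsSketch {
  messageType: string;
  queue: string;
  routingKey: string;
  action: string;
  aggregateType: string;
}

// The values this guide adds to media-service-asyncapi.yml.
interface AsyncApiEntrySketch {
  channel: string;
  queueName: string;
  tags: string[];
}

const asyncApiEntry: AsyncApiEntrySketch = {
  channel: 'ingest.update_data_from_movie_db',
  queueName: 'inbox',
  tags: ['aggregate-type:ingest-item'],
};

const updateDataFromMovieDbSketch: MessagingSettingsSketch = {
  messageType: 'UpdateDataFromMovieDb',
  queue: 'inbox',
  routingKey: 'ingest.update_data_from_movie_db',
  action: 'command',
  aggregateType: 'ingest-item',
};

// Returns a description of every value that differs between the two definitions.
function findMismatches(
  settings: MessagingSettingsSketch,
  entry: AsyncApiEntrySketch,
): string[] {
  const mismatches: string[] = [];
  if (settings.queue !== entry.queueName) {
    mismatches.push(`queue: ${settings.queue} != ${entry.queueName}`);
  }
  if (settings.routingKey !== entry.channel) {
    mismatches.push(`channel: ${settings.routingKey} != ${entry.channel}`);
  }
  if (entry.tags.indexOf(`aggregate-type:${settings.aggregateType}`) === -1) {
    mismatches.push(`missing tag aggregate-type:${settings.aggregateType}`);
  }
  return mismatches;
}
```

For the values used in this guide, findMismatches(updateDataFromMovieDbSketch, asyncApiEntry) returns an empty array.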

Add another RabbitMQ configuration builder to the ingestBuilders array in services/media/service/src/messaging/register-messaging.ts:

new RascalTransactionalConfigBuilder(
  MediaServiceMessagingSettings.UpdateDataFromMovieDb,
  config,
)
  .sendCommand()
  .subscribeForCommand(() => inboxWriter),

Message Handler

First, create a Movie DB account (https://www.themoviedb.org/signup). Then request an API key and add it to the .env file as MOVIE_DB_API_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.

In the config-definitions.ts file, add a new property: movieDbApiKey: () => env.get('MOVIE_DB_API_KEY').asString().
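If the key is missing from the .env file, the problem may only become visible later as a failed request to The Movie DB. The following sketch shows one way to fail early with a clear message. The function readMovieDbApiKey and the Env type are hypothetical helpers written for this illustration and are not part of the Mosaic config definitions.

```typescript
// A plain key/value view of the environment, e.g. process.env in Node.js.
type Env = Record<string, string | undefined>;

// Hypothetical helper: returns the trimmed API key or throws if it is
// missing or blank, so a misconfiguration is reported at startup.
function readMovieDbApiKey(env: Env): string {
  const value = env['MOVIE_DB_API_KEY'];
  if (value === undefined || value.trim() === '') {
    throw new Error('MOVIE_DB_API_KEY is not set. Add it to the .env file.');
  }
  return value.trim();
}
```

In a Node.js service this could be called once at startup as readMovieDbApiKey(process.env).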

Create a new file for the message handler: services/media/service/src/domains/movies/handlers/update-data-from-movie-db-handler.ts

The handler searches The Movie DB by movie title. If the search returns at least one match, it updates the original title, description, and release date of the movie with the values of the first result.

import { Logger } from '@axinom/mosaic-service-common';
import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox';
import {
  MediaServiceMessagingSettings,
  UpdateDataFromMovieDbCommand,
} from 'media-messages';
import { ClientBase } from 'pg';
import { selectExactlyOne, update } from 'zapatos/db';
import { Config } from '../../../common';
import { MediaGuardedTransactionalInboxMessageHandler } from '../../../messaging';

export class UpdateDataFromMovieDbHandler extends MediaGuardedTransactionalInboxMessageHandler<UpdateDataFromMovieDbCommand> {
  constructor(config: Config) {
    super(
      MediaServiceMessagingSettings.UpdateDataFromMovieDb,
      ['ADMIN', 'MOVIES_EDIT'],
      new Logger({
        config,
        context: UpdateDataFromMovieDbHandler.name,
      }),
      config,
    );
  }

  override async handleMessage(
    { payload }: TypedTransactionalMessage<UpdateDataFromMovieDbCommand>,
    loginClient: ClientBase,
  ): Promise<void> {
    const movieId = payload.movie_id;

    this.logger.debug({
      message: 'Update movie from The Movie DB',
      details: {
        movieId,
      },
    });

    // Load the movie to get the title that is used for the search.
    const movie = await selectExactlyOne('movies', { id: movieId }).run(
      loginClient,
    );

    // Encode the title so that spaces, '&', and other special characters
    // do not break the query string.
    const searchMovieUrl = `https://api.themoviedb.org/3/search/movie?api_key=${this.config.movieDbApiKey}&language=en-US&query=${encodeURIComponent(movie.title)}&page=1`;

    const searchMovieResponse = await fetch(searchMovieUrl);
    if (searchMovieResponse.ok) {
      const searchMovieData = await searchMovieResponse.json();
      // Only the first search result is used for the update.
      const firstMovie = searchMovieData['results'][0];
      if (firstMovie) {
        this.logger.debug(JSON.stringify(firstMovie));
        await update(
          'movies',
          {
            original_title: firstMovie.original_title,
            description: firstMovie.overview,
            released: firstMovie.release_date,
          },
          {
            id: movieId,
          },
        ).run(loginClient);
      }
    }
  }
}
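The URL construction and the mapping from a search result to database columns can be pulled into pure functions, which makes them testable without a database or network access. The following is a minimal sketch, not part of the Mosaic libraries: buildSearchMovieUrl, toMovieUpdate, and the MovieDbSearchResult interface are hypothetical names. It also assumes that a search result may carry an empty or missing release_date, which is mapped to null instead of being written to the database as an empty string.

```typescript
// Subset of a search result that the handler reads. The field names follow
// the handler in this guide; everything else in the response is ignored.
interface MovieDbSearchResult {
  original_title?: string;
  overview?: string;
  release_date?: string;
}

// Builds the search URL with every parameter percent-encoded.
function buildSearchMovieUrl(apiKey: string, title: string): string {
  const params: [string, string][] = [
    ['api_key', apiKey],
    ['language', 'en-US'],
    ['query', title],
    ['page', '1'],
  ];
  const query = params
    .map(([key, value]) => `${encodeURIComponent(key)}=${encodeURIComponent(value)}`)
    .join('&');
  return `https://api.themoviedb.org/3/search/movie?${query}`;
}

// Maps a search result to the column values used in the update.
// Assumption: empty or missing values are stored as null.
function toMovieUpdate(result: MovieDbSearchResult): {
  original_title: string | null;
  description: string | null;
  released: string | null;
} {
  return {
    original_title: result.original_title ?? null,
    description: result.overview ?? null,
    released: result.release_date ? result.release_date : null,
  };
}
```

For example, buildSearchMovieUrl('key', 'Fast & Furious') encodes the title as Fast%20%26%20Furious, so the ampersand is not read as a parameter separator.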

Now register the handler for the command in services/media/service/src/messaging/register-messaging.ts by adding new UpdateDataFromMovieDbHandler(config) to the ingestMessageHandlers array.

GraphQL

To start the message processing, expose a GraphQL API endpoint. Create a new plugin file: services/media/service/src/domains/movies/plugins/update-data-from-movie-db-plugin.ts.

Add the following contents to the file. It exposes the GraphQL mutation startMovieDbUpdate, which sends a message via the transactional outbox. The UpdateDataFromMovieDbHandler then handles the actual import and processing, including retries if the update fails.

import { MosaicError } from '@axinom/mosaic-service-common';
import { gql, makeExtendSchemaPlugin } from 'graphile-utils';
import {
  MediaServiceMessagingSettings,
  UpdateDataFromMovieDbCommand,
} from 'media-messages';
import { getLongLivedToken } from '../../../common';
import { v4 as uuid } from 'uuid';
import { ExtendedGraphQLContext } from '../../../graphql';

export const StartMovieDbUpdatePlugin = makeExtendSchemaPlugin((build) => {
  return {
    typeDefs: gql`
      input StartMovieDbUpdateInput {
        movieId: Int!
      }
      type StartMovieDbUpdatePayload {
        message: String!
        query: Query
      }
      extend type Mutation {
        startMovieDbUpdate(
          input: StartMovieDbUpdateInput!
        ): StartMovieDbUpdatePayload
      }
    `,
    resolvers: {
      Mutation: {
        startMovieDbUpdate: async (
          _query,
          args,
          context: ExtendedGraphQLContext,
        ) => {
          try {
            const movieId = args.input.movieId;
            if (!context.pgClient) {
              throw new MosaicError({
                message: 'The database client was not initialized.',
                code: 'DATABASE_CLIENT_NOT_INITIALIZED',
              });
            }

            const token = await getLongLivedToken(
              context.jwtToken ?? '',
              context.config,
            );

            // Sending only a database ID in a scenario of detached services is an anti-pattern
            // Ideally, the whole doc should have been sent and the message should be self-contained,
            // but because the document can be quite big we save it to the DB and pass only its ID.
            await context.storeOutboxMessage<UpdateDataFromMovieDbCommand>(
              movieId,
              MediaServiceMessagingSettings.UpdateDataFromMovieDb,
              { movie_id: movieId },
              context.pgClient,
              {
                envelopeOverrides: {
                  auth_token: token,
                  message_context: { correlation_id: uuid() },
                },
              },
            );

            return {
              message: 'The import from The Movie DB was started.',
              query: build.$$isQuery,
            };
          } catch (error) {
            if (error instanceof MosaicError) {
              throw error;
            }
            throw new MosaicError({
              error: error as Error,
              message: 'The attempt to start the movie DB import failed.',
              code: 'MOVIE_DB_UPDATE_FAILED',
            });
          }
        },
      },
    },
  };
});
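The mutation itself only stores the command; delivery and handling happen separately and can be retried. The following in-memory sketch illustrates that general outbox idea under simplifying assumptions. The class InMemoryOutbox and its methods are hypothetical and synchronous for brevity; the Mosaic transactional outbox is database-backed and its API differs.

```typescript
interface OutboxMessage<T> {
  id: number;
  payload: T;
  attempts: number;
  done: boolean;
}

// Hypothetical, simplified stand-in for a transactional outbox.
class InMemoryOutbox<T> {
  private messages: OutboxMessage<T>[] = [];
  private nextId = 1;

  // Stores the message only. In a database-backed outbox this insert would
  // be part of the same transaction as the triggering change.
  store(payload: T): number {
    const id = this.nextId++;
    this.messages.push({ id, payload, attempts: 0, done: false });
    return id;
  }

  // Tries to handle every pending message once. A message whose handler
  // throws stays pending until it has been attempted maxAttempts times.
  dispatch(handler: (payload: T) => void, maxAttempts = 3): void {
    for (const message of this.messages) {
      if (message.done || message.attempts >= maxAttempts) {
        continue;
      }
      message.attempts += 1;
      try {
        handler(message.payload);
        message.done = true;
      } catch (error) {
        // Keep the message pending so that a later dispatch retries it.
      }
    }
  }

  // Number of messages that have not been handled successfully yet.
  pendingCount(): number {
    return this.messages.filter((message) => !message.done).length;
  }
}
```

A handler that fails on the first dispatch and succeeds on the second leaves the message pending after the first call and marks it as done after the second.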

Add the plugin to AllMoviePlugins to include it in the GraphQL API. If you start the application now, the service console log shows that the startMovieDbUpdate endpoint is disabled. To enable it, add M.startMovieDbUpdate to the MoviesMutateOperations array in the file services/media/service/src/domains/movies/operation-groups.ts.

Now the API is available and you can start the movie DB import via:

mutation start {
  startMovieDbUpdate(input: { movieId: 1 }) {
    message
  }
}
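When the mutation is called from application code rather than typed into a GraphQL client, passing the movie ID as a variable avoids string concatenation. The following sketch builds such a request body. The function buildStartMovieDbUpdateRequest and the GraphQLRequestBody interface are hypothetical; the endpoint URL and the access token mentioned in the comment are assumptions that depend on your environment.

```typescript
interface GraphQLRequestBody {
  query: string;
  variables: { input: { movieId: number } };
}

// Hypothetical helper that builds the request body for the mutation.
function buildStartMovieDbUpdateRequest(movieId: number): GraphQLRequestBody {
  if (!Number.isInteger(movieId)) {
    throw new Error('movieId must be an integer');
  }
  return {
    query: `mutation StartMovieDbUpdate($input: StartMovieDbUpdateInput!) {
  startMovieDbUpdate(input: $input) {
    message
  }
}`,
    variables: { input: { movieId } },
  };
}

// Possible usage (graphqlEndpoint and token are placeholders):
//
// await fetch(graphqlEndpoint, {
//   method: 'POST',
//   headers: {
//     'Content-Type': 'application/json',
//     Authorization: `Bearer ${token}`,
//   },
//   body: JSON.stringify(buildStartMovieDbUpdateRequest(1)),
// });
```

The token is relevant because the message handler in this guide only accepts messages with the ADMIN or MOVIES_EDIT permission.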