Messaging
General
This how-to guide walks you through using the Mosaic messaging approach in your applications. The example extends the Media Service ingest process to update a movie with details from The Movie DB.
Make sure your development environment is set up correctly, then start the following development scripts from your project root, each in a separate console:
yarn dev:libs
yarn dev:services
yarn dev:workflows
Define and register a message
First, create a new command message that triggers the movie update from The Movie DB. Create the file update-data-from-movie-db-command.json in the folder libs/media-messages/schemas/payloads/media/commands.
The message payload contains the movie_id of the movie to update. It should look like this:
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "update_data_from_movie_db_command",
  "description": "Update movie data from the movie DB schema.",
  "additionalProperties": false,
  "required": ["movie_id"],
  "properties": {
    "movie_id": {
      "type": "integer",
      "description": "Id of the movie to update."
    }
  }
}
Update the file libs/media-messages/schemas/payloads/media/media-service-asyncapi.yml to include the new message.
Add the following code to the channels section, near the other commands:
'ingest.update_data_from_movie_db':
  bindings:
    amqp:
      queue:
        name: inbox
  publish:
    message:
      $ref: '#/components/messages/update-data-from-movie-db-command'
Then add the following code to the #/components/messages section:
update-data-from-movie-db-command:
  tags:
    - name: aggregate-type:ingest-item
  contentType: application/json
  payload:
    $ref: 'commands/update-data-from-movie-db-command.json'
The dev:libs script generates the corresponding TypeScript files from that definition.
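To make the effect of the schema more concrete, here is a minimal sketch of the payload type and a runtime check that follows the intent of the schema rules. The interface shape is an assumption about what the generated code looks like, and isUpdateDataFromMovieDbCommand is a hypothetical helper that is not part of the Mosaic libraries.

```typescript
// Assumed shape of the generated payload type; the real generated file may differ.
interface UpdateDataFromMovieDbCommand {
  /** Id of the movie to update. */
  movie_id: number;
}

// Hypothetical guard that follows the schema rules: "movie_id" is required,
// must be an integer, and no additional properties are allowed.
function isUpdateDataFromMovieDbCommand(
  value: unknown,
): value is UpdateDataFromMovieDbCommand {
  if (typeof value !== 'object' || value === null || Array.isArray(value)) {
    return false;
  }
  const keys = Object.keys(value);
  const movieId = (value as Record<string, unknown>).movie_id;
  return (
    keys.length === 1 &&
    keys[0] === 'movie_id' &&
    typeof movieId === 'number' &&
    Number.isInteger(movieId)
  );
}
```

A payload such as { movie_id: 1 } passes this check, while { movie_id: 1.5 } or a payload with extra properties does not.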
Now update libs/media-messages/src/media/media-messaging-settings.ts to add the new command settings:
  public static UpdateDataFromMovieDb = new MediaServiceMessagingSettings(
    'UpdateDataFromMovieDb',
    'inbox',
    'ingest.update_data_from_movie_db',
    'command',
    'ingest-item',
  );
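The five positional arguments line up with values from the AsyncAPI definition: the queue name, the channel key, and the aggregate-type tag. The sketch below names them explicitly. It is a hypothetical stand-in, not the real MediaServiceMessagingSettings class, and the field names are assumptions inferred from the values.

```typescript
// Hypothetical stand-in for illustration only; field names are assumptions.
interface SketchMessagingSettings {
  messageType: string; // name of the message
  queue: string; // AMQP queue from the channel bindings
  routingKey: string; // channel key in the AsyncAPI file
  action: 'command' | 'event';
  aggregateType: string; // value of the aggregate-type tag
}

// Builds a settings object from the same positional values used above.
function sketchSettings(
  messageType: string,
  queue: string,
  routingKey: string,
  action: 'command' | 'event',
  aggregateType: string,
): SketchMessagingSettings {
  return { messageType, queue, routingKey, action, aggregateType };
}

const updateDataFromMovieDb = sketchSettings(
  'UpdateDataFromMovieDb',
  'inbox',
  'ingest.update_data_from_movie_db',
  'command',
  'ingest-item',
);
```

If the queue, channel key, or tag in the AsyncAPI file changes, the matching argument in the settings entry should change with it.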
Add another RabbitMQ configuration builder to the ingestBuilders array in services/media/service/src/messaging/register-messaging.ts:
new RascalTransactionalConfigBuilder(
  MediaServiceMessagingSettings.UpdateDataFromMovieDb,
  config,
)
  .sendCommand()
  .subscribeForCommand(() => inboxWriter),
Message Handler
First, create a Movie DB account (https://www.themoviedb.org/signup). Then get an API key and add it to the .env file as MOVIE_DB_API_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.
Adjust the config-definitions.ts file and add a new property: movieDbApiKey: () => env.get('MOVIE_DB_API_KEY').asString()
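Because the handler cannot query The Movie DB without the key, it can help to fail fast when the variable is missing. The following is a minimal sketch of that idea in plain TypeScript. readRequiredSetting is a hypothetical helper and not part of the Mosaic configuration code; your project may already enforce required values in another way.

```typescript
// Hypothetical helper: reads a required setting from an environment-like
// object and throws a descriptive error when it is missing or blank.
function readRequiredSetting(
  source: Record<string, string | undefined>,
  name: string,
): string {
  const value = source[name]?.trim();
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}
```

In a Node.js service, process.env could be passed as the source, for example readRequiredSetting(process.env, 'MOVIE_DB_API_KEY').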
Create a new file for the message handler: services/media/service/src/domains/movies/handlers/update-data-from-movie-db-handler.ts
This handler requests data from The Movie DB based on the movie title and updates the movie's original title, description, and release date.
import { Logger } from '@axinom/mosaic-service-common';
import { TypedTransactionalMessage } from '@axinom/mosaic-transactional-inbox-outbox';
import {
  MediaServiceMessagingSettings,
  UpdateDataFromMovieDbCommand,
} from 'media-messages';
import { ClientBase } from 'pg';
import { selectExactlyOne, update } from 'zapatos/db';
import { Config } from '../../../common';
import { MediaGuardedTransactionalInboxMessageHandler } from '../../../messaging';

export class UpdateDataFromMovieDbHandler extends MediaGuardedTransactionalInboxMessageHandler<UpdateDataFromMovieDbCommand> {
  constructor(config: Config) {
    super(
      MediaServiceMessagingSettings.UpdateDataFromMovieDb,
      ['ADMIN', 'MOVIES_EDIT'],
      new Logger({
        config,
        context: UpdateDataFromMovieDbHandler.name,
      }),
      config,
    );
  }

  override async handleMessage(
    { payload }: TypedTransactionalMessage<UpdateDataFromMovieDbCommand>,
    loginClient: ClientBase,
  ): Promise<void> {
    const movieId = payload.movie_id;
    this.logger.debug({
      message: 'Update movie from The Movie DB',
      details: {
        movieId,
      },
    });
    // Load the movie to get the title used for the search.
    const movie = await selectExactlyOne('movies', { id: movieId }).run(
      loginClient,
    );
    // Encode the title so spaces and characters like "&" do not break the query string.
    const searchMovieUrl = `https://api.themoviedb.org/3/search/movie?api_key=${this.config.movieDbApiKey}&language=en-US&query=${encodeURIComponent(movie.title)}&page=1`;
    const searchMovieResponse = await fetch(searchMovieUrl);
    if (searchMovieResponse.ok) {
      const searchMovieData = await searchMovieResponse.json();
      // Use the first search result, if there is one.
      const firstMovie = searchMovieData['results'][0];
      if (firstMovie) {
        this.logger.debug(JSON.stringify(firstMovie));
        await update(
          'movies',
          {
            original_title: firstMovie.original_title,
            description: firstMovie.overview,
            released: firstMovie.release_date,
          },
          {
            id: movieId,
          },
        ).run(loginClient);
      }
    }
  }
}
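Building the search URL and mapping the search result can also be isolated into small pure functions, which makes them easy to test without a database or network access. The sketch below is one possible way to do that. MovieDbSearchResult, buildSearchMovieUrl, and toMovieUpdate are hypothetical names, the result type lists only the fields the handler reads, and writing null for missing values assumes the target columns are nullable.

```typescript
// Assumed subset of a search result from The Movie DB; only the fields
// that the handler reads are listed here.
interface MovieDbSearchResult {
  original_title?: string;
  overview?: string;
  release_date?: string;
}

// Builds the search URL and encodes every dynamic value.
function buildSearchMovieUrl(apiKey: string, title: string): string {
  const query = [
    `api_key=${encodeURIComponent(apiKey)}`,
    'language=en-US',
    `query=${encodeURIComponent(title)}`,
    'page=1',
  ].join('&');
  return `https://api.themoviedb.org/3/search/movie?${query}`;
}

// Maps a search result to the columns updated by the handler.
// Missing or empty values become null (assumes nullable columns).
function toMovieUpdate(result: MovieDbSearchResult): {
  original_title: string | null;
  description: string | null;
  released: string | null;
} {
  return {
    original_title: result.original_title ?? null,
    description: result.overview ?? null,
    released: result.release_date ? result.release_date : null,
  };
}
```

For example, buildSearchMovieUrl('k', 'Tom & Jerry') produces a URL whose query parameter is Tom%20%26%20Jerry, so the ampersand in the title is not mistaken for a parameter separator.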
Now register the handler in services/media/service/src/messaging/register-messaging.ts by adding new UpdateDataFromMovieDbHandler(config) to the ingestMessageHandlers array.
GraphQL
To start the message processing, expose a GraphQL API endpoint. Create a new plugin file at services/media/service/src/domains/movies/plugins/update-data-from-movie-db-plugin.ts.
Add the following contents to the file. They expose the GraphQL mutation startMovieDbUpdate, which sends a message via the transactional outbox. The UpdateDataFromMovieDbHandler then handles the actual import and processing, including retries if there are update errors.
import { MosaicError } from '@axinom/mosaic-service-common';
import { gql, makeExtendSchemaPlugin } from 'graphile-utils';
import {
  MediaServiceMessagingSettings,
  UpdateDataFromMovieDbCommand,
} from 'media-messages';
import { getLongLivedToken } from '../../../common';
import { v4 as uuid } from 'uuid';
import { ExtendedGraphQLContext } from '../../../graphql';

export const StartMovieDbUpdatePlugin = makeExtendSchemaPlugin((build) => {
  return {
    typeDefs: gql`
      input StartMovieDbUpdateInput {
        movieId: Int!
      }
      type StartMovieDbUpdatePayload {
        message: String!
        query: Query
      }
      extend type Mutation {
        startMovieDbUpdate(
          input: StartMovieDbUpdateInput!
        ): StartMovieDbUpdatePayload
      }
    `,
    resolvers: {
      Mutation: {
        startMovieDbUpdate: async (
          _query,
          args,
          context: ExtendedGraphQLContext,
        ) => {
          try {
            const movieId = args.input.movieId;
            if (!context.pgClient) {
              throw new MosaicError({
                message: 'The database client was not initialized.',
                code: 'DATABASE_CLIENT_NOT_INITIALIZED',
              });
            }
            const token = await getLongLivedToken(
              context.jwtToken ?? '',
              context.config,
            );
            // Sending only a database ID in a scenario of detached services is an anti-pattern
            // Ideally, the whole doc should have been sent and the message should be self-contained,
            // but because the document can be quite big we save it to the DB and pass only its ID.
            await context.storeOutboxMessage<UpdateDataFromMovieDbCommand>(
              movieId,
              MediaServiceMessagingSettings.UpdateDataFromMovieDb,
              { movie_id: movieId },
              context.pgClient,
              {
                envelopeOverrides: {
                  auth_token: token,
                  message_context: { correlation_id: uuid() },
                },
              },
            );
            return {
              message: 'The import from The Movie DB was started.',
              query: build.$$isQuery,
            };
          } catch (error) {
            // Pass Mosaic errors through unchanged and wrap everything else.
            if (error instanceof MosaicError) {
              throw error;
            }
            throw new MosaicError({
              error: error as Error,
              message: 'The attempt to start the movie DB import failed.',
              code: 'MOVIE_DB_UPDATE_FAILED',
            });
          }
        },
      },
    },
  };
});
Add the plugin to AllMoviePlugins to include it in the GraphQL API. If you start the application now, the service console log shows that the startMovieDbUpdate endpoint is disabled. To enable it, add M.startMovieDbUpdate to the MoviesMutateOperations array in the file services/media/service/src/domains/movies/operation-groups.ts.
Now the API is available and you can start the movie DB import via:
mutation start {
  startMovieDbUpdate(input: { movieId: 1 }) {
    message
  }
}
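When the mutation is called from code rather than from a GraphQL playground, the movie ID is usually passed as a variable. The following sketch builds such a request body. buildStartMovieDbUpdateRequest is a hypothetical helper, and how the body is sent (endpoint URL, authorization header) depends on your environment and is not shown here.

```typescript
// Same mutation as above, with the input passed as a GraphQL variable.
const START_MOVIE_DB_UPDATE = `
  mutation StartMovieDbUpdate($input: StartMovieDbUpdateInput!) {
    startMovieDbUpdate(input: $input) {
      message
    }
  }
`;

// Hypothetical helper: returns the JSON body for a GraphQL POST request.
// The schema declares movieId as Int!, so non-integers are rejected early.
function buildStartMovieDbUpdateRequest(movieId: number): string {
  if (!Number.isInteger(movieId)) {
    throw new Error(`movieId must be an integer, got: ${movieId}`);
  }
  return JSON.stringify({
    query: START_MOVIE_DB_UPDATE,
    variables: { input: { movieId } },
  });
}
```

The resulting string can be sent as the body of a POST request with the content type application/json. Because the handler requires the ADMIN or MOVIES_EDIT permission, the request needs a token that carries one of them.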