Ingest
The Ingest feature is used to import metadata into the system and orchestrate the import of assets. It takes care of inserting new data and updating existing data. For example, it updates movie details and inserts new tags for a movie in the Media Service. It also orchestrates the same for related services, like importing videos and images for movies and episodes. The goal of the ingest is to bring the metadata in your services into the desired state.
For the Media Template implementation, the ingest logic is included in the Media Service template. However, for more complex scenarios, the ingest could be extracted into a separate service.
This documentation focuses on the implementation aspect of the ingest. There is also an Ingest How-To Guide that describes different ingest use cases.
Glossary
The following table defines the ingest-specific terms as they are used in the ingest documentation.
Term | Description |
---|---|
Ingest (document) | The processing of a (JSON) document to insert or update entities into the database of the current system and to orchestrate ingests into other systems. |
Ingest item | One object in the ingest document that represents the details on how to create/update a movie, TV show, season, or episode. |
Ingest entity | The representation of the processing state of one ingest operation in the database. |
Ingest item entity | A database record of a single ingest item processing state. It stores the states for different processing steps across all systems (e.g. Metadata, Videos, Images, etc.). |
Main entity | The (database) entity that should be updated along with its associated data. This is either a movie, TV show, season, or episode. |
Ingest Document
The ingest document defines the data that should be ingested in the JSON format.
This JSON document must have a `name` field so that the ingest can be found again via the ingest explorer. It must also include an `items` array that holds all the data for the metadata that should be ingested. It can optionally contain a `document_created` field, detailing when the document was created (different from the ingest entity created date). The ingest items are defined within the `items` array.
Ingest Item Definition
Every item in the `items` array must contain:
- a `type` field (string enum) that defines the type of the item that should be ingested. Those enum values are project-specific. For the Media Template, the following values are available: `MOVIE`, `TVSHOW`, `SEASON`, `EPISODE`.
- an `external_id` field (string). The value in this field must uniquely identify the entity. This value must be provided from the outside and is not generated during the ingest.
- a `data` field (object). This field contains all the details of how the entity should look after the ingest is done. It can define specific fields, related data such as tags or genres, and ingest data that is handled in related services, e.g. images and videos.
```json
{
  "name": "July 2021 Ingest",
  "document_created": "2021-07-21T14:05:12Z",
  "items": [
    {
      "type": "project-specific-type-like-movie",
      "external_id": "defined-by-CSP-983",
      "data": {
        "this-section-is-project-specific": "some values"
      }
    },
    {
      "type": "project-specific-type-like-episode",
      "external_id": "defined-by-CSP-khel3",
      "data": {
        "this-section-is-also-project-specific": "other values"
      }
    }
  ]
}
```
What | Description | How | Example |
---|---|---|---|
simple field | Define a single field that should update a field in the database. | | |
array type field | Define an array of simple values (string or integer) that should be stored in PostgreSQL as an array column type. | | |
1:n relation of simple data | Use multiple values that should be stored in a separate table. This can be used for items that only have a name-like field. One example is tags, which have the tag name as their only meaningful field. The table also has other fields, but these are filled out automatically (ID, foreign key, create date, etc.). If the "n" table supports sorting, the sort order can be taken from the input array sort order. | Note: from the definition in the JSON document, this cannot be distinguished from the array type field. | |
m:n relations with lookup values | This is about creating a relation to some other entity that is not created/managed by the current entity. For example, genres or persons might be managed as their own entities. A unique value from the target entity must be provided for the mapping. For genres, this could be the genre title. For other entities, it could be the external ID or something else. If the "m:n" table supports sorting, the sort order can be taken from the input array sort order. | Note: the definition in the JSON document is the same as for the array type or 1:n fields. | |
1:n complex managed objects relations | This is about managing a related object that is more complex than having just a title property (more complex than e.g. tags). For example, licenses are a list of complex license entities. A license entity is not just a single string field. Instead, it has the license start and end date as well as a list of country codes to which the license applies. | | |
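To make these patterns concrete, here is a hypothetical `data` object for a movie that contains one field of each kind: `title` as a simple field, `studio_codes` as an array type field, `tags` as a 1:n relation of simple data, `genres` as an m:n relation with lookup values, and `licenses` as a 1:n complex managed objects relation. The actual field names are defined per project:

```json
{
  "type": "MOVIE",
  "external_id": "movie-example-1",
  "data": {
    "title": "Some Movie Title",
    "studio_codes": ["WB", "UNI"],
    "tags": ["action", "drama"],
    "genres": ["Action", "Drama"],
    "licenses": [
      {
        "start": "2021-01-01T00:00:00Z",
        "end": "2021-12-31T23:59:59Z",
        "countries": ["DE", "US"]
      }
    ]
  }
}
```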
JSON Schema
The Media Template provides a JSON schema to validate the ingest document when it is uploaded into the Media Service. It provides the definitions to validate the ingest document `name` and the ingest `items` along with their `type`, `external_id`, and `data`. All those properties are required, except for `document_created`. The structural validation of the `data` object for all the different types is provided per item type.
```json
{
  "$schema": "http://json-schema.org/draft-07/schema",
  "type": "object",
  "title": "The Media Template ingest schema",
  "required": ["name", "items"],
  "properties": {
    "name": {
      "type": "string",
      "description": "Defines a name for the ingest document."
    },
    "document_created": {
      "type": "string",
      "format": "date-time",
      "description": "Optional date of the document."
    },
    "items": {
      "type": "array",
      "minItems": 1,
      "description": "An array of ingest items of different types to be ingested.",
      "items": {
        "type": "object",
        "description": "Each item represents an entity that will be created or updated inside of the Media Service.",
        "required": ["type", "external_id", "data"],
        "properties": {
          "type": {
            "enum": ["MOVIE", "TVSHOW", "SEASON", "EPISODE"],
            "description": "Must be one of the supported type values that represent an entity type in the Media Service.",
            "examples": ["MOVIE"]
          },
          "external_id": {
            "$ref": "#/definitions/non-empty-string",
            "description": "A unique identifier of an ingest item.",
            "examples": ["avatar67A23"]
          },
          "data": {
            "type": "object",
            "description": "Object containing metadata of a specific media item."
          }
        }
      }
    }
  },
  "definitions": {
    "non-empty-string": {
      "type": "string",
      "minLength": 1
    }
  }
}
```
- The JSON schema document can be used to validate the ingest document even before uploading it with, for example, the JSON schema validator.
- In addition, there are graphical tools that help to create the (ingest) JSON document based on a JSON schema. E.g., the JSON editor.
- To create the initial JSON schema definition for your entity types, you can use https://jsonschema.net/. However, this should be used only as a starting point, as the generated schema is often not easy to read or maintain.
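As a sketch of how such a validation could be wired up in TypeScript, using the Ajv library (the schema import path is an assumption):

```typescript
import Ajv from "ajv";
import addFormats from "ajv-formats";
// Assumed path - point this at your project's ingest JSON schema file.
import ingestSchema from "./ingest-schema.json";

const ajv = new Ajv({ allErrors: true });
addFormats(ajv); // needed for the "date-time" format check

const validate = ajv.compile(ingestSchema);

// Returns an empty array for a valid document, otherwise a list of
// human-readable validation errors with their JSON paths.
export function validateIngestDocument(doc: unknown): string[] {
  if (validate(doc)) {
    return [];
  }
  return (validate.errors ?? []).map(
    (e) => `${e.instancePath || "/"}: ${e.message}`
  );
}
```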
Ingest Process
The ingest is a (potentially) long-running process that ingests many items in different steps. Every ingest operation for a single entity can potentially span multiple database tables and even different services.
High-level process:
- Upload the ingest document via GraphQL API.
- Validate the overall structural integrity of the ingest document. For JSON, this is done with a JSON schema as an overall validation (not based on the individual ingest item).
- Ensure that every main entity exists in the database (e.g. in the movies/episodes table). If one does not exist yet, a new entity is created with the minimum required information. This step must finish for all entities before the next step can start. Further steps can run independently of each other.
- Start the ingest for external systems. For the Media Template, this means ingesting videos and images. Wait for message responses and update the entities accordingly.
- Update the metadata of the main entity and all its related entities.
- Wait until all ingest items are finished and finalize the ingest.
The following state diagram shows the full ingest process starting from the GraphQL API which receives an ingest document:
A note on idempotency:
All ingest operations should work in an idempotent way: applying an operation more than once must give the same result as applying it once. For example, "add tag" would not be idempotent if it always added a new tag; calling that operation twice would add the tag twice. Instead, it should be designed as "set tags", which makes sure that exactly the given tags exist. Then it makes no difference whether it is called once or ten times: in the end, each desired tag is there exactly once (see the sketch below).
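A minimal sketch of such a "set tags" operation, assuming a hypothetical `query` helper that runs parameterized SQL inside the current transaction and a unique constraint on `(movie_id, name)`:

```typescript
// Hypothetical helper that runs a parameterized SQL statement
// inside the currently open transaction.
type Query = (sql: string, params: unknown[]) => Promise<void>;

// Idempotent "set tags": after this call, the movie has exactly
// the given tags, no matter how often the function runs.
export async function setMovieTags(
  query: Query,
  movieId: number,
  tags: string[]
): Promise<void> {
  // Remove every tag that is not part of the desired state.
  await query(
    `DELETE FROM movies_tags WHERE movie_id = $1 AND name <> ALL($2)`,
    [movieId, tags]
  );
  // Insert the desired tags; tags that already exist are skipped
  // (assumes a unique constraint on (movie_id, name)).
  for (const name of tags) {
    await query(
      `INSERT INTO movies_tags (movie_id, name)
       VALUES ($1, $2)
       ON CONFLICT (movie_id, name) DO NOTHING`,
      [movieId, name]
    );
  }
}
```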
Integrating other services should follow this approach as well. When another service is asked to ingest an entity, it should check if that exact entity (e.g. video or image) already exists in the system. If this is the case (same source image location or same video location), the existing database ID is returned instead of creating a new entity and processing the image/video. If it does not exist, the external service must first create a new DB entry for the entity (an image or a video), start the job for the image or video import, and immediately return the ID of that entity (potentially with other data) in the API response. The actual video transcoding job and the import of an image are created immediately but finish in the background. In both cases, the ingest operation will remember the returned database ID. With this logic implemented, it does not matter how often the external API is called. It only ever creates the entity once and uses this existing DB entity for each following call.
Idempotency is especially important for ingest operations. They are often done in an iterative way, where the ingest file is updated over time to fix and improve the metadata of the entities. If some operation fails, it must be retried. And the result of a second/third/... retry should not be any different from an ingest that succeeded on the first try.
Database Schema
The ingest process uses multiple database tables to store the ingest data and track the progress. The `ingest_documents` table contains the JSON ingest document and fields to track errors and the overall progress. The `ingest_items` table holds the data for a single ingest item, while the `ingest_item_steps` table captures all the orchestration steps for that ingest item.
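For illustration, here is a simplified TypeScript view of the state tracked in these tables; the column names and status values are assumptions, not the template's actual schema:

```typescript
// Simplified, assumed shapes - the real tables contain more columns.
interface IngestDocument {
  id: number;
  name: string;
  document: unknown; // the uploaded JSON ingest document
  itemsCount: number;
  errors: string[]; // document-level error messages
  status: "IN_PROGRESS" | "SUCCESS" | "ERROR";
}

interface IngestItem {
  id: number;
  ingestDocumentId: number; // FK to the ingest document
  externalId: string;
  type: "MOVIE" | "TVSHOW" | "SEASON" | "EPISODE";
  entityId: number; // database ID of the main entity
  item: unknown; // the "data" part from the ingest document
  errors: string[];
  status: "IN_PROGRESS" | "SUCCESS" | "ERROR";
}

interface IngestItemStep {
  id: string;
  ingestItemId: number; // FK to the ingest item
  type: "METADATA" | "VIDEO" | "IMAGE" | "LOCALIZATIONS";
  status: "IN_PROGRESS" | "SUCCESS" | "ERROR";
}
```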
Ingest Document Upload
The Media Service provides a GraphQL API that accepts the ingest document as part of the request (a JSON file as a stream). In the API, it is decoded as JSON, parsed, and pre-validated (via the corresponding JSON schema file or custom validation rules that do not rely on making database requests). If the pre-validation fails, the ingest is not started and a GraphQL error is returned as the API response, containing a list of validation errors. In the case of JSON schema validation, the path, line, and column values are also specified to easily locate invalid data.
If the basic validation succeeds, a new ingest entity is created in the `ingest_documents` table.
Ensure the Main Database Entities Exist
During the file upload, the ingest logic makes sure that all the main entities exist before any further work starts. A "main entity" refers to the main database table entry, for example, for a movie or episode, which all related tables reference. For a movie, the main table is `movies`, while related data like the tags and production countries are stored in the `movies_tags` and `movies_production_countries` tables.
Every ingest item contains the entity type (movie/tvshow/episode/etc.) and the external ID. The external ID is a unique identifier that the external data provider generates. It must be unique per entity type. The ingest checks, based on that external ID, whether all entities already exist in the database. If one of them does not exist yet, the entity is created in the most minimal way possible, with only the external ID and the fields required by the database schema. Only then are all the other tasks for adding relations and ingesting external data started. If the ingest items were processed without a guarantee that all the main entities exist, they would have to be processed sequentially, and it would be very hard (or impossible) to figure out the correct order in which the items need to be ingested.
As some entity types depend on others (e.g. episode depends on the season), the sort order to create those entities matters. For the Media Template implementation, the order is the following:
- Make sure all the TV shows exist. They are required for seasons to be created.
- Make sure all the seasons exist. They are required for episodes to be created.
- Continue with episodes, then movies (however, for those, the order does not really matter anymore).
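This ordering step could look like the following sketch, assuming a hypothetical `ensureExists` helper per entity type that creates a minimal entity if the external ID is not yet known:

```typescript
// Hypothetical helper per entity type: creates a minimal entity
// (external ID plus required fields only) if it does not exist yet,
// and returns the database ID in both cases.
type EnsureExists = (externalId: string) => Promise<number>;

interface IngestItem {
  type: "MOVIE" | "TVSHOW" | "SEASON" | "EPISODE";
  externalId: string;
}

// Creates all main entities in dependency order: TV shows before
// seasons, seasons before episodes. Movies have no dependencies.
export async function ensureMainEntities(
  items: IngestItem[],
  ensure: Record<IngestItem["type"], EnsureExists>
): Promise<Map<string, number>> {
  const entityIds = new Map<string, number>();
  for (const type of ["TVSHOW", "SEASON", "EPISODE", "MOVIE"] as const) {
    for (const item of items.filter((i) => i.type === type)) {
      const id = await ensure[type](item.externalId);
      entityIds.set(`${item.type}:${item.externalId}`, id);
    }
  }
  return entityIds;
}
```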
For each ingest item, an entity is created in the `ingest_items` table from the `data` part of that item in the JSON document. For data mapping purposes, it contains the external ID value from the JSON ingest item, the entity type (`MOVIE`/`TVSHOW`/`SEASON`/`EPISODE`), and the database ID of the main entity. It also contains the JSON data part from the ingest document that belongs to this entity ingest.
For every ingest item, a `StartIngestItemCommand` message is sent (through RabbitMQ) that triggers the background process for each item.
Ingest Item Handler
The `StartIngestItemHandler` processes every `StartIngestItemCommand`. It checks which entity type should be ingested and calls the corresponding processor. The processor analyzes the ingest item data and decides which steps are necessary. It then sends out commands to update the metadata, to ensure that the main and trailer videos exist, and to make sure that the referenced images also exist.
Each message handler for these commands is responsible for handling one specific part of the entity ingest process. This is based on the Mosaic message bus implementation. Each command carries the required fields that the handler needs, plus some contextual information that is sent along by the message handlers to later enable the mapping of response messages to the ingest item entities.
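As an illustration of this command fan-out, here is a sketch of a movie processor; the `Bus` interface and the payload shapes are simplified assumptions, not the actual Mosaic message bus API:

```typescript
// Hypothetical message bus interface - the real Mosaic API differs.
interface Bus {
  publish(
    command: string,
    payload: unknown,
    messageContext: unknown
  ): Promise<void>;
}

// Sketch of a processor sending the per-step commands for one movie
// ingest item. The message context carries the ingest item ID so that
// response events can be mapped back to the ingest item entity.
export async function processMovieItem(
  bus: Bus,
  ingestItemId: number,
  entityId: number,
  data: {
    title?: string;
    main_video?: { source: string };
    images?: { path: string; type: string }[];
  }
): Promise<void> {
  const context = { ingestItemId };
  await bus.publish("UpdateMetadataCommand", { entityId, data }, context);
  if (data.main_video) {
    await bus.publish(
      "EnsureVideoExistsStartCommand",
      { source: data.main_video.source },
      context
    );
  }
  for (const image of data.images ?? []) {
    await bus.publish("EnsureImageExistsStartCommand", image, context);
  }
}
```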
Metadata Update
The `UpdateMetadataCommand` triggers the handler that is responsible for bringing the entity into the desired state. As the data is stored in PostgreSQL (a relational database), it is likely that the main entity is stored in one table (the description for a movie is stored in the `movies` table), while other data is stored in related tables (e.g. movie tags or movie genre relations). This ingest task makes sure to run all these metadata updates in a single database transaction. All the metadata updates must succeed. Otherwise, no change is applied at all.
The following logic is used in the Media Template to match each metadata property (title, description, release year, etc.) with the system entities:
- If a property is entirely missing (undefined): ignore that property and do not apply it (see the sketch after this list).
- If a property has any value, it is applied. This includes null/empty/default values, such as the empty string, zero for a number, an empty array for an array property, or an empty object, if applicable.
- Array input types and related assignments are fully replaced. This approach is always used, both for PostgreSQL array type fields and for related tables, such as movie tags or movie cast. The logic is to bring the entity into the desired state. Therefore, every array element that was not mentioned in the ingest is removed and the missing ones are added.
- If an unknown property is provided in the ingest document item, it is ignored.
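A sketch of the undefined-vs-value rule for scalar fields, assuming a hypothetical `updates` map that is later turned into a single SQL UPDATE (full array replacement follows the "set tags" sketch shown earlier):

```typescript
// Field names are illustrative; the real mapping is per entity type.
const KNOWN_FIELDS = ["title", "description", "released"] as const;

export function collectFieldUpdates(
  data: Record<string, unknown>
): Record<string, unknown> {
  const updates: Record<string, unknown> = {};
  for (const field of KNOWN_FIELDS) {
    // Entirely missing (undefined) -> leave the stored value untouched.
    // Any present value, including null or "", is applied as-is.
    if (data[field] !== undefined) {
      updates[field] = data[field];
    }
  }
  // Unknown properties in `data` are simply never read, i.e. ignored.
  return updates;
}
```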
Considerations:
- Mandatory fields and validation rules are not handled in any specific way during the metadata updates. The processing logic creates all the needed insert, update, and delete commands and executes them. The validation rules defined in the database determine whether the data can be saved.
- The general vision of the Media Template is to use a rather relaxed approach to input validation. Mostly, it tries to save any data as long as the mandatory properties are available (e.g. the title or some season ID). The Media Template does not use many required fields or field length restrictions where they are not really needed. Instead, it relies on the publish validation logic to define whether an item can be published or not.
- The initial task already made sure that all the main entities that were mentioned in the ingest file exist and that all required fields have a value. For some items, we need to look up the target of the relation. For example, to assign a movie to a genre, we would need to find the genre by the genre title and relate it by its database ID. This is also required when (re-) assigning a season to a TV show. Errors are more likely to happen in that kind of assignment when dependencies are missing. If any such related item cannot be found, the full metadata update is not partially executed. Instead, it fails completely.
Image Ingest
Images are not managed as a part of the Media Service. They are kept and maintained in the Image Service. This service is responsible for downloading images from a source location and storing them in its storage.
For each image ingest, a separate `EnsureImageExistsStartCommand` is sent. If one command processing fails, the others can still proceed. If an ingest document has a movie entity that defines a cover and a teaser image, there would be two image-ingest tasks for that movie. The Image Service ingest handler handles the command in an idempotent way, as defined in the "Ingest Process" section. The message format and ingest logic are defined in more detail in the Image Service documentation.
The data in the ingest document must provide the following fields:
- the image relative path - from where the Image Service should download the image
- the image type - for the correct assignment to the movie image type (e.g. movie_cover).
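For example, the image-related part of a movie ingest item could look like the following; the property names and type values are illustrative and defined by the project's schema:

```json
{
  "images": [
    { "path": "movies/avatar/cover.jpg", "type": "COVER" },
    { "path": "movies/avatar/teaser.jpg", "type": "TEASER" }
  ]
}
```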
Actions for ingesting images:
- Send the `EnsureImageExistsStartCommand` with the data defined above.
- The Image Service checks if an image from that exact relative path was already ingested in the past.
  - If it was, it simply returns the existing image ID as the `EnsureImageExistsAlreadyExistedEvent`.
  - If it was ingested before, but the image was ingested under a different type, an error is sent as the `EnsureImageExistsFailedEvent`.
  - If the image does not exist, it is downloaded, verified for validity, and uploaded to the blob storage. The service then sends the `EnsureImageExistsImageCreatedEvent`, or the `EnsureImageExistsFailedEvent` if something failed.
- In the Media Service, the `ImageSucceededHandler` processes the two success event messages in the same way:
  - Loads the corresponding ingest item entity.
  - Updates the image relation, for example, for the movie cover using the image type from the received message context.
  - Marks that image as being handled in the ingest item entity.
- The error message event contains an error message text that is written into the errors array of the corresponding ingest item entity.
Video Ingest
Videos are managed in the Video Service. The service manages the video data and uses the encoder to bring the source videos into the desired output format.
The ingested entity types can have a single video or multiple videos. For example, movies and episodes can have one (single) "main video". Moreover, movies, TV shows, seasons, and episodes can have a list of trailers.
For every video, the ingest process sends one `EnsureVideoExistsStartCommand` to the Video Service. The service includes a message handler to handle this command. It follows the idempotent approach defined in the "Ingest Process" section.
The ingest item has separate properties for the main video (object) and for trailers (array of objects). The data that must be provided for each video object is:
- The source video folder - for the relative path.
- Optionally, the video transcoding profile which defines the transcoding settings to use. This profile defines the output format (HLS, DASH, DASH_HLS, or CMAF), if DRM should be applied, and many more settings.
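For example, the video-related part of a movie ingest item could look like the following; the property names are illustrative and defined by the project's schema:

```json
{
  "main_video": {
    "source": "movies/avatar/main",
    "profile": "DEFAULT"
  },
  "trailers": [
    { "source": "movies/avatar/trailer-1" },
    { "source": "movies/avatar/trailer-2" }
  ]
}
```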
Actions for ingesting videos:
- Send the `EnsureVideoExistsStartCommand` with the data defined above.
- The Video Service checks if a video from that exact relative path was already ingested in the past.
  - If it was, it simply returns the existing video ID as the `EnsureVideoExistsAlreadyExistedEvent`.
  - If the video does not exist, it starts the transcoding job that downloads, verifies, transcodes, packages, applies DRM protection to the video, and stores the video in the target location. The Video Service immediately sends the `EnsureVideoExistsCreationStartedEvent` without waiting for the transcoding job to finish.
  - If the transcoding fails, the `EnsureVideoExistsFailedEvent` is sent.
- In the Media Service, the `VideoSucceededHandler` processes the two success event messages in the same way:
  - Loads the corresponding ingest item entity.
  - Checks the received event to see whether the video is of type `main` or `trailer`.
  - If it is for the main video, it updates the video relation and marks the video as being handled in the ingest item entity.
  - If it is for a trailer video, it updates the video relation and marks the corresponding video as being handled in the ingest item entity. Only after all the trailer events are received does it update the movie trailers in the database. This may add new trailers or remove existing ones.
- The error message event contains an error message text that is written into the errors array of the corresponding ingest item entity.
Localizations Ingest
The Localization Service manages localizations for those fields of your entities that are localizable. Multiple locales can be defined and ingested.
Localizations are optional
If the ingest document does not specify localizations for an ingested entity, they will be skipped. This is the same behavior as for videos and images.
Localization can be completely disabled for the Media Service. In this case, the ingest will skip localization processing even if localizations are specified for an ingested entity.
Furthermore, if you specify localizations for multiple locales, only the locales that are defined in the Localization Service settings will be processed; the ones that are not defined are skipped.
Processing in a single step
The `LocalizeEntity` command of the Localization Service stores localizations for a single localizable entity (e.g. a movie), but it handles localizations for multiple locales at the same time. Because of this, there is only one `LOCALIZATIONS` step for each ingested item.
While it would be possible to send multiple `LocalizeEntity` commands, one for each locale, this would produce an unnecessary flood of messages and make the process less performant.
Processing of localizations is delayed
Before the `LocalizeEntity` command can be sent, the Localization Service must know what kind of entity we want to process, what localizable fields it has, what rules should be applied to those fields, etc. This information is synchronized with the Localization Service on Media Service startup in the form of Localizable Entity Definitions. This synchronization must be finished before the ingest or other code can use localizations.
Each time an entity is created or updated in the Media Service, the information of that particular entity is sent to the Localization Service using the `UpsertLocalizationSourceEntity` command. The entities in the originating service, with their field values, are called "source entities". The sent source entity metadata is associated in the Localization Service with the entity definition. It becomes the parent of further localizations that are later sent using the `LocalizeEntity` command.
Whenever a source entity is changed in the Media Service, dedicated database triggers detect the change and prepare the data that is needed for the `UpsertLocalizationSourceEntity` command. This data is then used to send the actual command using the transactional outbox mechanism.
The important point here is that the `UpsertLocalizationSourceEntity` command is able to change the localized states of individual fields. For example, let's say you have created a movie, filled all its localizable fields, went to the localization workflows, filled in localization values for all locales, and approved those localizations. If you now change the `description` of this movie, an `UpsertLocalizationSourceEntity` command is sent. It updates the stored source `description` value and changes the state of the `description` localizations for all locales from `Approved` to `Untranslated`. This is done because, when the source values change, it is very likely that the localized values also need to be changed, or at least be verified again.
Let's check what steps are taken when a new movie is ingested.
- When the initial entity is created, it only sets the minimum amount of fields. For movies, this is just the title. A database trigger will then start the process to send an `UpsertLocalizationSourceEntity` message with the value of the `title` field.
- The ingest continues and the `UpdateMetadata` step sets all other field values. This will again trigger the sending of a new `UpsertLocalizationSourceEntity` command including the values from the `description` and `synopsis` fields (as they were changed).
- In a separate ingest step, a cover image is assigned to the movie. This will also trigger the `UpsertLocalizationSourceEntity` command, which includes only the new `image_id` that will be shown in the localization UI.
The first two points perform source data updates and have the potential to change the localization states. This means that if we first sent the `LocalizeEntity` command for the entity and only then processed the `UpdateMetadata` step, the `UpdateMetadata` step would send `UpsertLocalizationSourceEntity` and update the `description` and `synopsis` states, basically invalidating the states set by the `LocalizeEntity` command.
Because of this particular case, the `LOCALIZATIONS` step is not executed as soon as possible. It is delayed until a response event is received for the `UpsertLocalizationSourceEntity` command that was sent by the `UpdateMetadata` step.
UpdateMetadata adjustments
To support the delayed processing of localizations during ingest, the responsibility to initiate that step is delegated to the latest step that could affect the proper processing of localizations: the `UpdateMetadata` ingest step.
Because `UpsertLocalizationSourceEntity` is not directly sent by the ingest process and works on its own using dedicated database triggers, we need a way to "plug" into it. To do so, a dedicated `ingest_correlation_id` column is added to the tables of all main ingestable entities.
When `UpdateMetadata` performs a relevant update of localizable fields, we also set the `ingest_correlation_id` to the value of the Ingest Item ID. The database triggers are adjusted to recognize this property and include it in the `message_context` that is sent along with the resulting `UpsertLocalizationSourceEntity` command. This way, we preserve the ingest context and associate the command with an ongoing ingest operation.
Usually, the `UpsertLocalizationSourceEntity` command handler of the Localization Service does not send response event messages. This is because, most of the time, no one will actually listen to the response events, this command is expected to execute a lot, and we want to avoid an unnecessary flood of event messages.
However, in cases like ingest, response events are actually important. So if the received command has at least one property in the `message_context`, we assume that a response event is expected, and the command handler sends it.
In this particular case, the Media Service will receive the `UpsertLocalizationSourceEntityFinished` event, retrieve the Ingest Item ID from the `message_context`, and initiate the `LocalizeEntity` command. At this point, all localizable source fields are guaranteed to be processed for this particular entity, so we can safely start the `LOCALIZATIONS` step without worrying about localization field states being overridden.
The value of `ingest_correlation_id` only matters for a single database update query within the whole `UpdateMetadata` database transaction. To avoid other updates being mistaken for being done in the context of an ingest, the `ingest_correlation_id` is set back to `null` right after the original update, but within the same database transaction. This means that when explicitly querying the database, `ingest_correlation_id` will always be `null`.
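A sketch of this set-and-reset pattern, assuming a hypothetical `query` helper that runs parameterized SQL within the open `UpdateMetadata` transaction:

```typescript
// Hypothetical helper that runs parameterized SQL in the currently
// open database transaction.
type Query = (sql: string, params: unknown[]) => Promise<void>;

// Sketch of the UpdateMetadata write path. The correlation ID is only
// visible to the database triggers during the metadata UPDATE itself
// and is cleared again before the transaction commits.
export async function updateMovieMetadata(
  query: Query,
  movieId: number,
  ingestItemId: number,
  title: string,
  description: string
): Promise<void> {
  // 1. Apply the (simplified) metadata update together with the
  //    correlation ID. The trigger picks up ingest_correlation_id and
  //    copies it into the message_context of the resulting
  //    UpsertLocalizationSourceEntity command.
  await query(
    `UPDATE movies
        SET title = $2, description = $3, ingest_correlation_id = $4
      WHERE id = $1`,
    [movieId, title, description, ingestItemId]
  );
  // 2. Reset the correlation ID within the same transaction, so that
  //    explicit queries always observe a null value. Assumed: the
  //    triggers only react to changes of localizable fields, so this
  //    reset does not send another command.
  await query(
    `UPDATE movies SET ingest_correlation_id = NULL WHERE id = $1`,
    [movieId]
  );
}
```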
Finalizing the localizations processing
When an image or video relation is ingested, the Media Service usually stores some sort of relational information about it, e.g. a unique identifier. When it comes to localization, the Localization Service itself stores the identifiers of a source entity in the form of entity ID, entity type, and originating service ID.
This means that the Media Service itself does not have to store any localization-related identifiers on its side and can just use the APIs and functionality of the Localization Service to work with localizations.
Summary
The ingest process sends both the source entity field values and the localization values to the Localization Service. The ingest process is split into multiple steps that can be executed in parallel. To preserve the integrity of the data sent to the Localization Service, localization-related functionality is executed in a deferred way, after the source data is processed.
Security
The ingest adheres to the same authentication rules as any other code. There are permissions in place that control whether somebody is allowed to use the ingest. And there are permissions in place to read or mutate specific entities, such as movies or episodes. The Ingest API, as well as every message handler, validates those as well.
The GraphQL ingest API validates the authentication token of the user and checks if the user has ingest rights. Moreover, for each entity type that is going to be ingested, it verifies whether the user has the permission to mutate this entity type.