This is the multi-page printable view of this section.
Click here to print.
Return to the regular view of this page.
Event transformations
Transform the event schema, add or remove the data, enrich, or filter out events using complex rules.
During the replication, you might want to transform events using some complex rules. For example, some fields need to be removed, the JSON schema should change, or some data need to be merged, split, or even enriched with external data.
Transformations allow you:
- Move events to another stream
- Change the event type
- Manipulate the event data, like changing field names and values, or even the structure
- Same, but for metadata
- [WIP] Slit one event into multiple events
For this purpose, you can use the transformation function. Find out more about available transformation options on pages listed below.
1 - JavaScript transformation
Transform and filter events using in-proc JavaScript function
Introduction
When you need to perform simple changes in the event schema, change the stream name or event type based on the existing event details and data, you can use a JavaScript transform.
Warning:
JavaScript transforms only work with JSON payloads.
For this transform, you need to supply a code snippet, written in JavaScript, which does the transformation. The code snippet must be placed in a separate file, and it cannot have any external dependencies. There’s no limitation on how complex the code is. Replicator uses the V8 engine to execute JavaScript code. Therefore, this transform normally doesn’t create a lot of overhead for the replication.
Guidelines
You can configure Replicator to use a JavaScript transformation function using the following parameters:
replicator.transform.type
- must be set to js
replicator.transform.config
- name of the file, which contains the transformation function
For example:
replicator:
transform:
type: js
config: ./transform.js
The function itself must be named transform
. Replicator will call it with the following arguments:
Stream
- original stream name
EventType
- original event type
Data
- event payload as an object
Metadata
- event metadata as an object
The function must return an object, which contains Stream
, EventType
, Data
and Metadata
fields. Both Data
and Metadata
must be valid objects, the Metadata
field can be undefined
. If you haven’t changed the event data, you can pass Data
and Metadata
arguments, which the function receives as arguments.
Logging
You can log from JavaScript code directly to Replicator logs. Use the log
object with debug
, info
, warn
and error
. You can use string interpolation as usual, or pass templated strings in Serilog format. The first parameter is the template string, plus you can pass up to five additional values, which could be values or objects.
For example:
log.info(
"Transforming event {@Data} of type {Type}",
original.data, original.eventType
);
Remember that the default log level is Information
, so debug logs won’t be shown. Enable debug-level logging by setting the REPLICATOR_DEBUG
environment variable to true
.
Example
Here is an example of a transformation function, which changes the event data, stream name, and event type:
function transform(original) {
// Log the transform calls
log.info(
"Transforming event {Type} from {Stream}",
original.EventType, original.Stream
);
// Ignore some events
if (original.Stream.length > 7) return undefined;
// Create a new event version
const newEvent = {
// Copy original data
...original.Data,
// Change an existing property value
Data1: `new${original.Data.Data1}`,
// Add a new property
NewProp: `${original.Data.Id} - ${original.Data.Data2}`
};
// Return the new proposed event with modified stream and type
return {
Stream: `transformed${original.Stream}`,
EventType: `V2.${original.EventType}`,
Data: newEvent,
Meta: original.Meta
}
}
If the function returns undefined
, the event will not be replicated, so the JavaScript transform can also be used as an advanced filter. The same happens if the transform function returns an event, but either the stream name or event type is empty or undefined
.
2 - HTTP transformation
Transform and filter events using out-of-process HTTP server
Introduction
An HTTP transformation can be used for more complex changes in event schema or data, which is done in an external process. It allows you to use any language and stack, and also call external infrastructure to enrich events with additional data.
When configured accordingly, Replicator will call an external HTTP endpoint for each event it processes, and expect a converted event back. As event data is delivered as-is (as bytes) to the transformer, there’s no limitation of the event content type and serialisation format.
Note
Right now, events are sent to the transformer one by one. It guarantees the same order of events in the sink store, but it may be slow. We plan to enable event batching to speed up external transformations.
Guidelines
Before using the HTTP transformation, you need to build and deploy the transformation function, which is accessible using an HTTP(S) endpoint. The endpoint is built and controlled by you. It must return a response with a transformed event with 200
status code, or 204
status code with no payload. When the Replicator receives a 204
back, it will not propagate the event, so it also works as an advanced filter.
The transformation configuration has two parameters:
replicator.transform.type
- should be http
for HTTP transform
replicator.transform.config
- the HTTP(S) endpoint URL
For example:
replicator:
transform:
type: http
config: http://transform-func.myapp.svc.cluster.local
Replicator doesn’t support any authentication, so the endpoint must be open and accessible. You can host it at the same place as Replicator itself to avoid the network latency, or elsewhere. For example, your transformation service can be running in the same Kubernetes cluster, so you can provide the internal DNS name to its service. Alternatively, you can use a serverless function.
Replicator will call your endpoint using a POST
request with JSON body.
The request and response formats are the same:
{
"eventType": "string",
"streamName": "string",
"metadata": "string",
"payload": "string"
}
Your HTTP transformation can modify any of these four properties.
Both metadata
and payload
are UTF-8 encoded bytes as a string.
If you store your events in JSON format, payload
will be a JSON string that you can deserialize.
metadata
is always a JSON string.
If your endpoint returns HTTP status code 204
, the event will be ignored and wont be replicated to the sink.
Example
Here is an example of a serverless function in GCP, which transforms each part of the original event:
using System.Text.Json;
using Google.Cloud.Functions.Framework;
using Microsoft.AspNetCore.Http;
using System.Threading.Tasks;
namespace HelloWorld {
public class Function : IHttpFunction {
public async Task HandleAsync(HttpContext context) {
var original = await JsonSerializer
.DeserializeAsync<HttpEvent>(context.Request.Body);
var payload = JsonSerializer
.Deserialize<TestEvent>(original.Payload);
payload.EventProperty1 = $"Transformed {payload.EventProperty1}";
var metadata = JsonSerializer
.Deserialize<Metadata>(original.Metadata);
metadata.MetadataProperty1 = $"Transformed {metadata.MetadataProperty1}";
var proposed = new HttpEvent {
StreamName = $"transformed-{original.StreamName}",
EventType = $"V2.{original.EventType}",
Metadata = JsonSerializer.Serialize(metadata),
Payload = JsonSerializer.Serialize(payload)
};
await context.Response
.WriteAsync(JsonSerializer.Serialize(proposed));
}
class HttpEvent {
public string EventType { get; set; }
public string StreamName { get; set; }
public string Metadata { get; set; }
public string Payload { get; set; }
}
class Metadata {
public string MetadataProperty1 { get; set; }
public string MetadataProperty2 { get; set; }
}
class TestEvent {
public string EventProperty1 { get; set; }
public string EventProperty2 { get; set; }
}
}
}
The TestEvent
is the original event contract, which is kept the same. However, you are free to change the event schema too, if necessary.