This page provides concrete data examples for each transform type. For setup instructions and configuration details, see the individual transform pages linked in each section. Pipeline transforms run in the Apache Flink streaming layer; they are user-defined and execute in the order you specify. See Transform Ordering for sequencing guidance.

Filter

Use Transform / Filter Records to keep or discard records based on custom logic. In this example, only orders with status equal to "completed" pass through. Before — input record:
{
  "key": { "id": 1001 },
  "value": {
    "id": 1001,
    "customer_id": 55,
    "status": "completed",
    "amount": 249.99,
    "created_at": "2025-11-02T14:30:00Z"
  }
}
After — record passes the filter (status is "completed"):
{
  "key": { "id": 1001 },
  "value": {
    "id": 1001,
    "customer_id": 55,
    "status": "completed",
    "amount": 249.99,
    "created_at": "2025-11-02T14:30:00Z"
  }
}
A record that does not match the filter is dropped entirely and never reaches downstream transforms or the destination. Dropped — record does not pass the filter (status is "pending"):
{
  "key": { "id": 1002 },
  "value": {
    "id": 1002,
    "customer_id": 78,
    "status": "pending",
    "amount": 59.00,
    "created_at": "2025-11-02T15:10:00Z"
  }
}
This record is discarded. No output is produced for it.
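The keep/discard decision above can be sketched in Transform / Filter Records-style JavaScript. The `transform(record)` signature and the return-`null`-to-drop convention are assumptions for illustration; see the Filter page for the exact API.

```javascript
// Sketch: return the record to keep it, or null to drop it.
function transform(record) {
  if (record.value.status === "completed") {
    return record; // record passes and continues downstream
  }
  return null; // record is dropped; no output is produced
}
```

Applied to the two records above, order 1001 is returned unchanged and order 1002 yields no output.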

Enrich

Use Enrich (Async) to call an external REST API and add data to each record. In this example, a product record is enriched with inventory data from a warehouse API. Before — input record:
{
  "key": { "product_id": "SKU-4421" },
  "value": {
    "product_id": "SKU-4421",
    "name": "Wireless Keyboard",
    "category": "peripherals",
    "price": 79.99
  }
}
After — enriched with external API data:
{
  "key": { "product_id": "SKU-4421" },
  "value": {
    "product_id": "SKU-4421",
    "name": "Wireless Keyboard",
    "category": "peripherals",
    "price": 79.99,
    "warehouse_stock": 342,
    "warehouse_location": "US-WEST-2",
    "last_restocked": "2025-10-28T08:00:00Z"
  }
}
The three new fields (warehouse_stock, warehouse_location, last_restocked) were returned by the external API and merged into the record.
You can also use the SQL-based Enrich transform to look up values from a cached Kafka topic instead of calling an external API.
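The merge step can be sketched as a shallow merge of the API response into the record value. The `mergeEnrichment` helper name is hypothetical; the Enrich (Async) transform performs the API call and merge for you.

```javascript
// Sketch of the merge only: fields returned by the warehouse API are
// shallow-merged into the record value, overwriting on name collisions.
function mergeEnrichment(record, apiResponse) {
  record.value = { ...record.value, ...apiResponse };
  return record;
}
```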

Fan Out

Use Fan Out to route a single input record to different output topics based on field values. In this example, event records are routed to separate topics by their event_type field. Before — input record:
{
  "key": { "event_id": "evt-9001" },
  "value": {
    "event_id": "evt-9001",
    "user_id": 204,
    "event_type": "purchase",
    "item": "Annual Plan",
    "amount": 199.00,
    "timestamp": "2025-11-03T09:22:00Z"
  }
}
After — routed to the events.purchases output topic:
{
  "key": { "event_id": "evt-9001" },
  "value": {
    "event_id": "evt-9001",
    "user_id": 204,
    "event_type": "purchase",
    "item": "Annual Plan",
    "amount": 199.00,
    "timestamp": "2025-11-03T09:22:00Z"
  }
}
A different record with "event_type": "page_view" would be routed to a different output topic (e.g., events.page_views) based on the transform's routing logic. The record data itself may remain unchanged; it is the output topic that changes.
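The routing decision can be sketched as a lookup from event_type to topic. The topic names follow the example above; the fallback topic name is an assumption, and in practice the routing is configured in the Fan Out transform rather than written as code.

```javascript
// Sketch: pick an output topic from the record's event_type field.
function outputTopic(record) {
  const routes = {
    purchase: "events.purchases",
    page_view: "events.page_views",
  };
  return routes[record.value.event_type] || "events.other"; // fallback is an assumption
}
```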

Destination-Side Transforms (SMTs)

Destination-side transforms run automatically on the destination connector as Single Message Transforms (SMTs). They execute in a fixed system-managed order after all pipeline transforms. See Transform Ordering for details on execution sequence.

ToJsonJ / ToJsonbJ

Converts a structured field (nested object, map, or array) into a JSON string representation. This is useful when your destination column expects a flat JSON string rather than a nested structure. Before — record with a nested address object:
{
  "customer_id": 55,
  "name": "Alice Martin",
  "address": {
    "street": "742 Evergreen Terrace",
    "city": "Springfield",
    "state": "IL",
    "zip": "62704"
  }
}
After — address field converted to a JSON string:
{
  "customer_id": 55,
  "name": "Alice Martin",
  "address": "{\"street\":\"742 Evergreen Terrace\",\"city\":\"Springfield\",\"state\":\"IL\",\"zip\":\"62704\"}"
}
The address field is now a single string containing the JSON representation of the original nested object.
ToJsonJ and ToJsonbJ both convert structured fields to JSON strings. The ToJsonbJ variant produces PostgreSQL-compatible JSONB output. Exact behavior can vary by destination; choose the variant that matches your destination's expected column type.
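The conversion is roughly equivalent to JSON-serializing the field in place. The `toJsonField` helper name is hypothetical, not the SMT's API.

```javascript
// Rough equivalent of ToJsonJ on a single field: the nested object is
// replaced by its JSON string representation.
function toJsonField(record, field) {
  record[field] = JSON.stringify(record[field]);
  return record;
}
```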

ToIntJ / ToFloatJ / ToStringJ

Casts a field value from one type to another. Use these transforms when the source data type does not match the destination column type. ToIntJ — string to integer: Before:
{
  "order_id": 3001,
  "quantity": "42",
  "product": "Widget"
}
After:
{
  "order_id": 3001,
  "quantity": 42,
  "product": "Widget"
}
The quantity field changed from the string "42" to the integer 42. ToFloatJ — string to float: Before:
{
  "order_id": 3001,
  "total_price": "149.95",
  "currency": "USD"
}
After:
{
  "order_id": 3001,
  "total_price": 149.95,
  "currency": "USD"
}
The total_price field changed from the string "149.95" to the float 149.95. ToStringJ — integer to string: Before:
{
  "order_id": 3001,
  "zip_code": 62704,
  "city": "Springfield"
}
After:
{
  "order_id": 3001,
  "zip_code": "62704",
  "city": "Springfield"
}
The zip_code field changed from the integer 62704 to the string "62704".
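The three casts correspond roughly to the following JavaScript conversions. This is a sketch of the semantics only, not the SMT implementation; real behavior on malformed input may differ.

```javascript
// Rough semantics of the three cast transforms.
const toInt = (v) => parseInt(v, 10); // ToIntJ:    "42"     -> 42
const toFloat = (v) => parseFloat(v); // ToFloatJ:  "149.95" -> 149.95
const toStr = (v) => String(v);       // ToStringJ: 62704    -> "62704"
```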

RenameFields

Renames one or more fields to match destination naming conventions or resolve naming conflicts. Before — original field names:
{
  "custId": 55,
  "custName": "Alice Martin",
  "custEmail": "alice@example.com",
  "created": "2025-11-01T10:00:00Z"
}
After — fields renamed to match destination schema:
{
  "customer_id": 55,
  "customer_name": "Alice Martin",
  "customer_email": "alice@example.com",
  "created": "2025-11-01T10:00:00Z"
}
Three fields were renamed: custId to customer_id, custName to customer_name, and custEmail to customer_email. The created field was not included in the rename configuration and remains unchanged.
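The rename semantics can be sketched as applying a from-to mapping, leaving unmapped fields untouched. The `renameFields` helper name is hypothetical; the real SMT takes the mapping as configuration.

```javascript
// Sketch: rename fields according to a { from: to } mapping.
function renameFields(record, mapping) {
  for (const [from, to] of Object.entries(mapping)) {
    if (from in record) {
      record[to] = record[from];
      delete record[from];
    }
  }
  return record;
}
```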

DropFields

Removes specified fields from the record before it reaches the destination. Use this to exclude sensitive data, internal metadata, or fields that are not needed in the destination. Before — record with internal metadata fields:
{
  "order_id": 3001,
  "customer_id": 55,
  "amount": 249.99,
  "status": "completed",
  "__source_ts_ms": 1730500200000,
  "__deleted": false,
  "__op": "c"
}
After — internal metadata fields removed:
{
  "order_id": 3001,
  "customer_id": 55,
  "amount": 249.99,
  "status": "completed"
}
The three fields prefixed with __ were dropped from the record. The remaining fields are delivered to the destination unchanged.
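The drop semantics can be sketched as deleting the configured field names. The `dropFields` helper name is hypothetical; the real SMT takes the field list as configuration.

```javascript
// Sketch: remove the listed fields; everything else passes through.
function dropFields(record, fields) {
  for (const f of fields) {
    delete record[f];
  }
  return record;
}
```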

CopyField

Copies the value of an existing field into a new field. The original field is preserved. Before — record with a single email field:
{
  "user_id": 204,
  "email": "alice@example.com",
  "name": "Alice Martin",
  "role": "admin"
}
After — email copied to contact_email:
{
  "user_id": 204,
  "email": "alice@example.com",
  "name": "Alice Martin",
  "role": "admin",
  "contact_email": "alice@example.com"
}
The original email field remains intact. A new contact_email field was added with the same value. This is useful when a destination requires the same data under a different field name while the original must also be preserved.

RenameKeyFields

Renames one or more fields in the message key (as opposed to the record value). This is useful when the destination expects specific key field names for primary key matching or partitioning, but the source key field names differ. Before — original key field names:
{
  "key": { "id": 1001 },
  "value": {
    "id": 1001,
    "customer_id": 55,
    "status": "completed"
  }
}
After — key field renamed to match destination convention:
{
  "key": { "order_id": 1001 },
  "value": {
    "id": 1001,
    "customer_id": 55,
    "status": "completed"
  }
}
The key field id was renamed to order_id. The record value is unchanged — only the message key is affected.

AddStringSuffix

Appends a suffix string to the value of a specified field. This is useful for adding units, environment tags, or other fixed suffixes to field values before they reach the destination. Before — field without suffix:
{
  "order_id": 3001,
  "region": "us-east",
  "status": "completed"
}
After — region field with "-prod" suffix appended:
{
  "order_id": 3001,
  "region": "us-east-prod",
  "status": "completed"
}
The suffix "-prod" was appended to the region field value. All other fields remain unchanged.

StringReplace

Replaces occurrences of a pattern within string field values. Use this when you need to normalize or sanitize field values before they reach the destination. Before — field with unwanted characters:
{
  "user_id": 204,
  "phone": "+1 (555) 123-4567",
  "name": "Alice Martin"
}
After — phone field with non-digit characters replaced:
{
  "user_id": 204,
  "phone": "15551234567",
  "name": "Alice Martin"
}
Every non-digit character in the phone field was replaced with an empty string, leaving only digits. The transform uses regex-based pattern matching.
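The phone example corresponds to a regex replacement like the following; the `\D` pattern (any non-digit) and empty replacement are the assumed configuration for this example.

```javascript
// Replace every non-digit character with an empty string.
const digitsOnly = (s) => s.replace(/\D/g, "");
```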

HeaderToFieldCustom

Copies a Kafka message header value into a record field. This is useful when metadata is carried in Kafka headers (e.g., source connector name, event timestamp, or correlation IDs) and you need it available as a regular field in the destination. Before — record without header data in fields:
{
  "key": { "id": 1001 },
  "headers": {
    "source_connector": "mysql-prod-01",
    "event_time": "2025-11-03T09:22:00Z"
  },
  "value": {
    "id": 1001,
    "amount": 249.99,
    "status": "completed"
  }
}
After — header value copied into a record field:
{
  "key": { "id": 1001 },
  "value": {
    "id": 1001,
    "amount": 249.99,
    "status": "completed",
    "source_connector": "mysql-prod-01"
  }
}
The source_connector header value was copied into the record value as a new field. The original header remains on the message.
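The copy can be sketched as reading one header and writing it into the value. The `headerToField` helper name is hypothetical; in the real transform the header name and target field are configuration.

```javascript
// Sketch: copy a named header into the record value under the same name.
function headerToField(record, headerName) {
  record.value[headerName] = record.headers[headerName];
  return record;
}
```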

MarkColumnsAsOptional (ToOptional)

Marks specified columns as optional (nullable) in the schema. This is useful when your source schema defines fields as NOT NULL, but the destination schema requires them to be nullable — for example, when certain columns are only populated for some record types, or when schema evolution introduces new columns that existing rows do not have values for. Before — schema marks middle_name as required (non-null):
{
  "schema": {
    "fields": [
      { "field": "user_id", "type": "int32", "optional": false },
      { "field": "first_name", "type": "string", "optional": false },
      { "field": "middle_name", "type": "string", "optional": false },
      { "field": "last_name", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "user_id": 204,
    "first_name": "Alice",
    "middle_name": "Marie",
    "last_name": "Martin"
  }
}
After — middle_name marked as optional (nullable):
{
  "schema": {
    "fields": [
      { "field": "user_id", "type": "int32", "optional": false },
      { "field": "first_name", "type": "string", "optional": false },
      { "field": "middle_name", "type": "string", "optional": true },
      { "field": "last_name", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "user_id": 204,
    "first_name": "Alice",
    "middle_name": "Marie",
    "last_name": "Martin"
  }
}
The middle_name field changed from "optional": false to "optional": true. The payload data is unchanged — only the schema metadata is modified. This prevents schema enforcement errors at the destination when a record has a null value for that field.
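The schema edit can be sketched as flipping the optional flag for the configured fields in a Connect-style schema object like the one shown above. The `markOptional` helper name is hypothetical.

```javascript
// Sketch: set "optional": true for the named fields; payload is untouched.
function markOptional(schema, fieldNames) {
  for (const f of schema.fields) {
    if (fieldNames.includes(f.field)) {
      f.optional = true;
    }
  }
  return schema;
}
```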

Pipeline Transform Examples

Join

Use Join to combine records from two or more input topics into a single output record using SQL join logic. In this example, an orders topic is joined with a customers topic on a shared customer_id field. Input — record from orders topic:
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z"
  }
}
Input — record from customers topic:
{
  "key": { "customer_id": 55 },
  "value": {
    "customer_id": 55,
    "name": "Alice Martin",
    "email": "alice@example.com",
    "tier": "gold"
  }
}
After — joined output record:
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z",
    "customer_name": "Alice Martin",
    "customer_email": "alice@example.com",
    "customer_tier": "gold"
  }
}
The join matched records from both topics on customer_id = 55 and merged the customer fields into the order record. The output field names (customer_name, customer_email, customer_tier) are defined in the join SQL logic.
Join transforms use SQL syntax to define the join condition and output fields. See Join for setup instructions and SQL examples.

Chained Transforms

Pipeline transforms execute sequentially — each transform receives the output of the previous one. This example shows data flowing through three transforms in order: Filter, Enrich, and Transform / Filter Records (used as a map). Stage 0 — original input record:
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z"
  }
}
Stage 1 — after Filter (keep only orders with amount > 100):
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z"
  }
}
Record passes through because amount (249.99) exceeds the threshold. Orders with amount <= 100 are dropped at this stage and never reach subsequent transforms. Stage 2 — after Enrich (add customer tier from cached topic):
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z",
    "customer_tier": "gold"
  }
}
The customer_tier field was added by looking up customer_id = 55 in the cached customers topic. Stage 3 — after Transform / Filter Records (compute a new field):
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z",
    "customer_tier": "gold",
    "priority": "high"
  }
}
The transform logic computed a priority field: orders from "gold" tier customers with amount > 200 are tagged "high". This field could only be computed because the enrichment step added customer_tier first.
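The Stage 3 logic can be sketched in Transform / Filter Records-style JavaScript; the tier value and the 200 threshold follow the example above, and the `transform(record)` signature is assumed for illustration.

```javascript
// Sketch: tag high-priority orders (gold-tier customers spending over 200).
function transform(record) {
  if (record.value.customer_tier === "gold" && record.value.amount > 200) {
    record.value.priority = "high";
  }
  return record;
}
```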
For guidance on ordering pipeline transforms, see Transform Ordering.

Real-World Patterns

These patterns show how to combine transforms to solve common data engineering challenges.

PII Masking

Use DropFields to remove sensitive fields before data reaches the destination, or use Transform / Filter Records to mask values while preserving the field structure. Approach 1 — Drop sensitive fields entirely with DropFields: Configure DropFields to remove fields such as ssn, credit_card_number, and date_of_birth. The fields are stripped from the record before it lands in the destination. Before:
{
  "user_id": 204,
  "name": "Alice Martin",
  "email": "alice@example.com",
  "ssn": "123-45-6789",
  "credit_card_number": "4111111111111111"
}
After (DropFields removes ssn and credit_card_number):
{
  "user_id": 204,
  "name": "Alice Martin",
  "email": "alice@example.com"
}
Approach 2 — Mask field values with Transform / Filter Records: Use JavaScript logic in Transform / Filter Records to replace sensitive values with masked versions while keeping the fields present.
// Transform / Filter Records logic
function transform(record) {
  if (record.value.email) {
    var parts = record.value.email.split("@");
    record.value.email = parts[0].substring(0, 2) + "***@" + parts[1];
  }
  if (record.value.ssn) {
    record.value.ssn = "***-**-" + record.value.ssn.slice(-4);
  }
  return record;
}
Before:
{
  "user_id": 204,
  "email": "alice@example.com",
  "ssn": "123-45-6789"
}
After (values masked, fields preserved):
{
  "user_id": 204,
  "email": "al***@example.com",
  "ssn": "***-**-6789"
}
PII masking should be applied as early as possible in your pipeline to prevent sensitive data from reaching intermediate storage. Use pipeline transforms (Flink) rather than destination-side transforms when possible.

Date Formatting

Use Transform / Filter Records to convert timestamps between formats. This is common when the source produces ISO 8601 timestamps but the destination expects epoch milliseconds, or when you need to extract date parts.
// Transform / Filter Records logic -- convert ISO timestamp to epoch milliseconds
function transform(record) {
  if (record.value.created_at) {
    record.value.created_at_epoch = new Date(record.value.created_at).getTime();
  }
  return record;
}
Before:
{
  "order_id": 3001,
  "created_at": "2025-11-02T14:30:00Z"
}
After (new epoch field added):
{
  "order_id": 3001,
  "created_at": "2025-11-02T14:30:00Z",
  "created_at_epoch": 1762093800000
}
You can also use ToStringJ to cast numeric timestamp fields to strings if your destination requires string-typed date columns.

Null Handling

Use Transform / Filter Records to replace null values with defaults, preventing null-related errors at the destination.
// Transform / Filter Records logic -- apply default values for null fields
function transform(record) {
  if (record.value.region === null || record.value.region === undefined) {
    record.value.region = "unknown";
  }
  if (record.value.amount === null || record.value.amount === undefined) {
    record.value.amount = 0.0;
  }
  return record;
}
Before (region is null):
{
  "order_id": 3002,
  "region": null,
  "amount": null,
  "status": "pending"
}
After (nulls replaced with defaults):
{
  "order_id": 3002,
  "region": "unknown",
  "amount": 0.0,
  "status": "pending"
}
If your destination supports nullable columns, you can also use MarkColumnsAsOptional to allow null values through without replacing them. Choose between default values and nullable columns based on your downstream query requirements.

See Also