Pipeline Transforms (Flink)
Pipeline transforms run in the Apache Flink streaming layer and are defined by you. They execute in the order you specify. See Transform Ordering for sequencing guidance.

Filter
Use Transform / Filter Records to keep or discard records based on custom logic. In this example, only orders with status equal to "completed" pass through; orders with any other status are discarded.
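For illustration, a hypothetical order record (field values are assumed):

Before:

```json
{
  "order_id": 1001,
  "status": "completed",
  "amount": 249.99
}
```

This record passes the filter because status equals "completed". An otherwise identical record with "status": "pending" would be discarded and never reach the destination.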
Enrich
Use Enrich (Async) to call an external REST API and add data to each record. In this example, a product record is enriched with inventory data from a warehouse API. The new fields (warehouse_stock, warehouse_location, last_restocked) are returned by the external API and merged into the record.
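As a sketch (the product fields and API response values are assumed; the three enrichment field names come from the example):

Before:

```json
{
  "product_id": "SKU-123",
  "name": "Wireless Mouse"
}
```

After:

```json
{
  "product_id": "SKU-123",
  "name": "Wireless Mouse",
  "warehouse_stock": 140,
  "warehouse_location": "DC-EAST",
  "last_restocked": "2024-05-01T08:30:00Z"
}
```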
Fan Out
Use Fan Out to route a single input record to different output topics based on field values. In this example, event records are routed to separate topics by their event_type field. A record with "event_type": "purchase" is routed to the events.purchases output topic, while a record with "event_type": "page_view" would be routed to a different output topic (e.g., events.page_views) based on the transform's routing logic. The record data itself may remain unchanged; only the output topic changes.
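For illustration, a hypothetical event record (values are assumed):

```json
{
  "event_id": "evt-001",
  "event_type": "purchase",
  "user_id": 42
}
```

With "event_type": "purchase", this record is routed to the events.purchases topic; with "event_type": "page_view", it would be routed to events.page_views instead.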
Destination-Side Transforms (SMTs)
Destination-side transforms run automatically on the destination connector as Single Message Transforms (SMTs). They execute in a fixed system-managed order after all pipeline transforms. See Transform Ordering for details on execution sequence.

ToJsonJ / ToJsonbJ
Converts a structured field (nested object, map, or array) into a JSON string representation. This is useful when your destination column expects a flat JSON string rather than a nested structure. In this example, a record with a nested address object is flattened: after the transform, the address field is a single string containing the JSON representation of the original nested object.
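A sketch of the conversion (the address contents are assumed):

Before:

```json
{
  "customer_id": 55,
  "address": {
    "street": "123 Main St",
    "city": "Springfield",
    "zip": "62704"
  }
}
```

After:

```json
{
  "customer_id": 55,
  "address": "{\"street\":\"123 Main St\",\"city\":\"Springfield\",\"zip\":\"62704\"}"
}
```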
ToJsonJ and ToJsonbJ both convert structured fields to JSON strings. The ToJsonbJ variant produces PostgreSQL-compatible JSONB output. Exact behavior may vary by destination; choose the variant that matches your destination's expected column type.
ToIntJ / ToFloatJ / ToStringJ
Casts a field value from one type to another. Use these transforms when the source data type does not match the destination column type. ToIntJ (string to integer): the quantity field changes from the string "42" to the integer 42.
ToFloatJ (string to float): the total_price field changes from the string "149.95" to the float 149.95.
ToStringJ (integer to string): the zip_code field changes from the integer 62704 to the string "62704".
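As a combined illustration, a single hypothetical record carrying all three fields, before and after applying ToIntJ to quantity, ToFloatJ to total_price, and ToStringJ to zip_code:

Before:

```json
{
  "quantity": "42",
  "total_price": "149.95",
  "zip_code": 62704
}
```

After:

```json
{
  "quantity": 42,
  "total_price": 149.95,
  "zip_code": "62704"
}
```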
RenameFields
Renames one or more fields to match destination naming conventions or resolve naming conflicts. In this example, custId is renamed to customer_id, custName to customer_name, and custEmail to customer_email. The created field is not included in the rename configuration and remains unchanged.
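For illustration (field values are assumed; the field names come from the example):

Before:

```json
{
  "custId": 55,
  "custName": "Ada Lovelace",
  "custEmail": "ada@example.com",
  "created": "2024-05-01T10:00:00Z"
}
```

After:

```json
{
  "customer_id": 55,
  "customer_name": "Ada Lovelace",
  "customer_email": "ada@example.com",
  "created": "2024-05-01T10:00:00Z"
}
```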
DropFields
Removes specified fields from the record before it reaches the destination. Use this to exclude sensitive data, internal metadata, or fields that are not needed in the destination. In this example, internal metadata fields prefixed with __ are dropped from the record; the remaining fields are delivered to the destination unchanged.
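A sketch, assuming hypothetical __-prefixed internal metadata field names:

Before:

```json
{
  "order_id": 1001,
  "amount": 249.99,
  "__source_ts": 1714550400000,
  "__trace_id": "abc-123"
}
```

After:

```json
{
  "order_id": 1001,
  "amount": 249.99
}
```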
CopyField
Copies the value of an existing field into a new field; the original field is preserved. In this example, the value of email is copied into a new contact_email field, and the original email field remains intact. This is useful when a destination requires the same data under a different field name while the original must also be preserved.
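For illustration (the email value is assumed):

Before:

```json
{
  "customer_id": 55,
  "email": "ada@example.com"
}
```

After:

```json
{
  "customer_id": 55,
  "email": "ada@example.com",
  "contact_email": "ada@example.com"
}
```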
RenameKeyFields
Renames one or more fields in the message key (as opposed to the record value). This is useful when the destination expects specific key field names for primary key matching or partitioning, but the source key field names differ. In this example, the key field id is renamed to order_id. The record value is unchanged; only the message key is affected.
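A sketch of the message key (the key value is assumed):

Before (message key):

```json
{ "id": 1001 }
```

After (message key):

```json
{ "order_id": 1001 }
```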
AddStringSuffix
Appends a suffix string to the value of a specified field. This is useful for adding units, environment tags, or other fixed suffixes to field values before they reach the destination. In this example, the suffix "-prod" is appended to the region field value; all other fields remain unchanged.
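For illustration (field names other than region are assumed):

Before:

```json
{
  "service": "checkout",
  "region": "us-east-1"
}
```

After:

```json
{
  "service": "checkout",
  "region": "us-east-1-prod"
}
```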
StringReplace
Replaces occurrences of a pattern within string field values. Use this when you need to normalize or sanitize field values before they reach the destination. In this example, non-digit characters in the phone field are replaced with empty strings, leaving only digits. The transform uses regex-based pattern matching.
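A sketch, assuming a phone value with formatting characters:

Before:

```json
{ "phone": "(555) 123-4567" }
```

After:

```json
{ "phone": "5551234567" }
```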
HeaderToFieldCustom
Copies a Kafka message header value into a record field. This is useful when metadata is carried in Kafka headers (e.g., source connector name, event timestamp, or correlation IDs) and you need it available as a regular field in the destination. In this example, the source_connector header value is copied into the record value as a new field; the original header remains on the message.
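For illustration, assuming the message carries a source_connector header with the hypothetical value "postgres-cdc":

Before (record value):

```json
{
  "order_id": 1001,
  "amount": 249.99
}
```

After (record value):

```json
{
  "order_id": 1001,
  "amount": 249.99,
  "source_connector": "postgres-cdc"
}
```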
MarkColumnsAsOptional (ToOptional)
Marks specified columns as optional (nullable) in the schema. This is useful when your source schema defines fields as NOT NULL, but the destination schema requires them to be nullable: for example, when certain columns are only populated for some record types, or when schema evolution introduces new columns that existing rows do not have values for.
In this example, the schema initially marks middle_name as required (non-null). After the transform, the middle_name field's schema entry changes from "optional": false to "optional": true. The payload data is unchanged; only the schema metadata is modified. This prevents schema enforcement errors at the destination when a record has a null value for that field.
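A sketch of the relevant schema field entry, assuming a Kafka Connect-style struct schema:

Before:

```json
{ "field": "middle_name", "type": "string", "optional": false }
```

After:

```json
{ "field": "middle_name", "type": "string", "optional": true }
```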
Pipeline Transform Examples
Join
Use Join to combine records from two or more input topics into a single output record using SQL join logic. In this example, an orders topic is joined with a customers topic on a shared customer_id field: the join matches records with customer_id = 55 and merges the customer fields into the order record. The output field names (customer_name, customer_email, customer_tier) are defined in the join SQL logic.
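For illustration (field values are assumed; customer_id = 55 and the output field names come from the example):

Input, record from the orders topic:

```json
{
  "order_id": 1001,
  "customer_id": 55,
  "amount": 249.99
}
```

Input, record from the customers topic:

```json
{
  "customer_id": 55,
  "name": "Ada Lovelace",
  "email": "ada@example.com",
  "tier": "gold"
}
```

Output, joined record:

```json
{
  "order_id": 1001,
  "customer_id": 55,
  "amount": 249.99,
  "customer_name": "Ada Lovelace",
  "customer_email": "ada@example.com",
  "customer_tier": "gold"
}
```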
Chained Transforms
Pipeline transforms execute sequentially; each transform receives the output of the previous one. This example shows data flowing through three transforms in order: Filter, Enrich, and Transform / Filter Records (used as a map). Stage 0 is the original input record. Stage 1, after Filter, passes the record because its amount (249.99) exceeds the threshold; orders with amount <= 100 are dropped at this stage and never reach subsequent transforms.
Stage 2, after Enrich, adds a customer_tier field by looking up customer_id = 55 in the cached customers topic.
Stage 3, after Transform / Filter Records, computes a new priority field: orders from "gold" tier customers with amount > 200 are tagged "high". This field can only be computed because the enrichment step added customer_tier first.
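An illustrative walk-through (order_id is assumed; amount 249.99, customer_id 55, the "gold" tier, and the "high" priority tag come from the example):

Stage 0, original input record:

```json
{ "order_id": 1001, "customer_id": 55, "amount": 249.99 }
```

Stage 1, after Filter: the record is unchanged and passes because amount > 100.

Stage 2, after Enrich:

```json
{ "order_id": 1001, "customer_id": 55, "amount": 249.99, "customer_tier": "gold" }
```

Stage 3, after Transform / Filter Records:

```json
{ "order_id": 1001, "customer_id": 55, "amount": 249.99, "customer_tier": "gold", "priority": "high" }
```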
For guidance on ordering pipeline transforms, see Transform Ordering.
Real-World Patterns
These patterns show how to combine transforms to solve common data engineering challenges.

PII Masking
Use DropFields to remove sensitive fields before data reaches the destination, or use Transform / Filter Records to mask values while preserving the field structure. Approach 1: drop sensitive fields entirely with DropFields. Configure DropFields to remove fields such as ssn, credit_card_number, and date_of_birth. The fields are stripped from the record before it lands in the destination.
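An illustrative before/after for the DropFields approach (values are assumed):

Before:

```json
{
  "customer_id": 55,
  "name": "Ada Lovelace",
  "ssn": "123-45-6789",
  "credit_card_number": "4111111111111111",
  "date_of_birth": "1990-01-01"
}
```

After:

```json
{
  "customer_id": 55,
  "name": "Ada Lovelace"
}
```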
Approach 2: mask values in place with Transform / Filter Records, replacing sensitive values with fixed placeholders so the field structure is preserved while the raw values never reach the destination.
Date Formatting
Use Transform / Filter Records to convert timestamps between formats. This is common when the source produces ISO 8601 timestamps but the destination expects epoch milliseconds, or when you need to extract date parts.

Null Handling
Use Transform / Filter Records to replace null values with defaults, preventing null-related errors at the destination.

See Also
- Transform Types Overview — All available transform types
- Transform Ordering — How pipeline and destination-side transforms are sequenced
- Transform / Filter Records — Setup and implementation for filtering
- Enrich — SQL-based cached topic enrichment
- Enrich (Async) — REST API enrichment
- Join — Multi-topic joins
- Fan Out — Record routing to multiple output topics
- Pipelines — How transforms integrate into pipelines