# Transform Examples

> Practical before-and-after examples for pipeline transforms and destination-side transforms.

This page provides concrete data examples for each transform type. For setup instructions and configuration details, see the individual transform pages linked in each section.

## Pipeline Transforms (Flink)

Pipeline transforms run in the Apache Flink streaming layer. You define them, and they execute in the order you specify. See [Transform Ordering](/transform-ordering) for sequencing guidance.

### Filter

Use [Transform / Filter Records](/transform-filter-records) to keep or discard records based on custom logic. In this example, only orders with `status` equal to `"completed"` pass through.

**Before -- input record:**

```json theme={null}
{
  "key": { "id": 1001 },
  "value": {
    "id": 1001,
    "customer_id": 55,
    "status": "completed",
    "amount": 249.99,
    "created_at": "2025-11-02T14:30:00Z"
  }
}
```

**After -- record passes the filter (status is "completed"):**

```json theme={null}
{
  "key": { "id": 1001 },
  "value": {
    "id": 1001,
    "customer_id": 55,
    "status": "completed",
    "amount": 249.99,
    "created_at": "2025-11-02T14:30:00Z"
  }
}
```

A record that does **not** match the filter is dropped entirely and never reaches downstream transforms or the destination.

**Dropped -- record does not pass the filter (status is "pending"):**

```json theme={null}
{
  "key": { "id": 1002 },
  "value": {
    "id": 1002,
    "customer_id": 78,
    "status": "pending",
    "amount": 59.00,
    "created_at": "2025-11-02T15:10:00Z"
  }
}
```

This record is discarded. No output is produced for it.
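The rule in this example is a single predicate on `status`. Below is a minimal sketch in plain JavaScript of that rule applied to records shaped like the examples above. The function name `keepRecord` is hypothetical, and this sketch does not show how Transform / Filter Records expects a record to be dropped; see [Transform / Filter Records](/transform-filter-records) for the actual function contract.

```javascript
// Hypothetical predicate for the example rule: keep only completed orders.
// Returns true to keep the record and false to drop it.
function keepRecord(record) {
  return record.value != null && record.value.status === "completed";
}

// Sample records matching the before/after examples above.
const records = [
  { key: { id: 1001 }, value: { id: 1001, customer_id: 55, status: "completed", amount: 249.99 } },
  { key: { id: 1002 }, value: { id: 1002, customer_id: 78, status: "pending", amount: 59.0 } },
];

// Records that fail the predicate produce no output at all.
const passed = records.filter(keepRecord);
console.log(passed.map((r) => r.key.id)); // [ 1001 ]
```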

***

### Enrich

Use [Enrich (Async)](/transform-enrich-async) to call an external REST API and add data to each record. In this example, a product record is enriched with inventory data from a warehouse API.

**Before -- input record:**

```json theme={null}
{
  "key": { "product_id": "SKU-4421" },
  "value": {
    "product_id": "SKU-4421",
    "name": "Wireless Keyboard",
    "category": "peripherals",
    "price": 79.99
  }
}
```

**After -- enriched with external API data:**

```json theme={null}
{
  "key": { "product_id": "SKU-4421" },
  "value": {
    "product_id": "SKU-4421",
    "name": "Wireless Keyboard",
    "category": "peripherals",
    "price": 79.99,
    "warehouse_stock": 342,
    "warehouse_location": "US-WEST-2",
    "last_restocked": "2025-10-28T08:00:00Z"
  }
}
```

The three new fields (`warehouse_stock`, `warehouse_location`, `last_restocked`) were returned by the external API and merged into the record.
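The merge step can be sketched in plain JavaScript. This assumes the API response has already been fetched and is a flat JSON object. The helper name `mergeEnrichment` and the rule that existing record fields win on a name conflict are assumptions for illustration, not documented Enrich (Async) behavior.

```javascript
// Hypothetical helper: appends fields from an already-fetched API response
// to a copy of the record value. Assumption: on a name conflict, the
// record's existing field is kept.
function mergeEnrichment(record, apiResponse) {
  const value = { ...record.value };
  for (const [field, fieldValue] of Object.entries(apiResponse)) {
    if (!Object.prototype.hasOwnProperty.call(value, field)) {
      value[field] = fieldValue;
    }
  }
  return { key: record.key, value };
}

const input = {
  key: { product_id: "SKU-4421" },
  value: { product_id: "SKU-4421", name: "Wireless Keyboard", category: "peripherals", price: 79.99 },
};
// Stand-in for the warehouse API response in the example above.
const response = { warehouse_stock: 342, warehouse_location: "US-WEST-2", last_restocked: "2025-10-28T08:00:00Z" };
const enriched = mergeEnrichment(input, response);
console.log(enriched.value);
```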

<Tip>
  You can also use the SQL-based [Enrich](/transform-enrich) transform to look up values from a cached Kafka topic instead of calling an external API.
</Tip>

***

### Fan Out

Use [Fan Out](/transform-fanout) to route a single input record to different output topics based on field values. In this example, event records are routed to separate topics by their `event_type` field.

**Before -- input record:**

```json theme={null}
{
  "key": { "event_id": "evt-9001" },
  "value": {
    "event_id": "evt-9001",
    "user_id": 204,
    "event_type": "purchase",
    "item": "Annual Plan",
    "amount": 199.00,
    "timestamp": "2025-11-03T09:22:00Z"
  }
}
```

**After -- routed to the `events.purchases` output topic:**

```json theme={null}
{
  "key": { "event_id": "evt-9001" },
  "value": {
    "event_id": "evt-9001",
    "user_id": 204,
    "event_type": "purchase",
    "item": "Annual Plan",
    "amount": 199.00,
    "timestamp": "2025-11-03T09:22:00Z"
  }
}
```

A record with `"event_type": "page_view"` would instead be routed to a different output topic (for example, `events.page_views`), as determined by your topic transform logic. The record data itself can pass through unchanged; what changes is the **output topic**.
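Routing by field value amounts to mapping each record to a topic name. Below is a minimal sketch in plain JavaScript; the routing table, the fallback topic `events.other`, and the function name `outputTopicFor` are hypothetical, and the actual routing is whatever logic you configure in Fan Out.

```javascript
// Hypothetical routing table: event_type value -> output topic name.
const ROUTES = {
  purchase: "events.purchases",
  page_view: "events.page_views",
};
// Hypothetical fallback for event types without an explicit route.
const DEFAULT_TOPIC = "events.other";

// Picks the output topic; the record itself is not modified.
function outputTopicFor(record) {
  const eventType = record.value && record.value.event_type;
  return Object.prototype.hasOwnProperty.call(ROUTES, eventType)
    ? ROUTES[eventType]
    : DEFAULT_TOPIC;
}

const purchase = { key: { event_id: "evt-9001" }, value: { event_id: "evt-9001", event_type: "purchase" } };
const pageView = { key: { event_id: "evt-9002" }, value: { event_id: "evt-9002", event_type: "page_view" } };
console.log(outputTopicFor(purchase)); // events.purchases
console.log(outputTopicFor(pageView)); // events.page_views
```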

***

## Destination-Side Transforms (SMTs)

Destination-side transforms run automatically on the destination connector as Single Message Transforms (SMTs). They execute in a fixed system-managed order after all pipeline transforms. See [Transform Ordering](/transform-ordering) for details on execution sequence.

### ToJsonJ / ToJsonbJ

Converts a structured field (nested object, map, or array) into a JSON string representation. This is useful when your destination column expects a flat JSON string rather than a nested structure.

**Before -- record with a nested `address` object:**

```json theme={null}
{
  "customer_id": 55,
  "name": "Alice Martin",
  "address": {
    "street": "742 Evergreen Terrace",
    "city": "Springfield",
    "state": "IL",
    "zip": "62704"
  }
}
```

**After -- `address` field converted to a JSON string:**

```json theme={null}
{
  "customer_id": 55,
  "name": "Alice Martin",
  "address": "{\"street\":\"742 Evergreen Terrace\",\"city\":\"Springfield\",\"state\":\"IL\",\"zip\":\"62704\"}"
}
```

The `address` field is now a single string containing the JSON representation of the original nested object.
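Conceptually, the conversion is a JSON serialization of one field. Below is a minimal sketch in plain JavaScript, not the SMT's implementation; the helper name `fieldToJsonString` is hypothetical. For this input, `JSON.stringify` produces the same compact string shown above.

```javascript
// Hypothetical helper: returns a copy of `value` where one structured field
// (plain object, array, or Map) is replaced by its JSON string.
function fieldToJsonString(value, fieldName) {
  const out = { ...value };
  const fieldValue = out[fieldName];
  if (fieldValue instanceof Map) {
    // JSON.stringify does not serialize Map entries, so convert to an object first.
    out[fieldName] = JSON.stringify(Object.fromEntries(fieldValue));
  } else if (fieldValue !== null && typeof fieldValue === "object") {
    // Plain objects and arrays; keys keep their insertion order.
    out[fieldName] = JSON.stringify(fieldValue);
  }
  // Scalars and nulls are left as-is.
  return out;
}

const customer = {
  customer_id: 55,
  name: "Alice Martin",
  address: { street: "742 Evergreen Terrace", city: "Springfield", state: "IL", zip: "62704" },
};
console.log(fieldToJsonString(customer, "address").address);
```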

<Info>
  ToJsonJ and ToJsonbJ both convert structured fields to JSON strings. ToJsonbJ produces PostgreSQL-compatible JSONB output. Because exact behavior can vary by destination, choose the variant that matches your destination's expected column type.
</Info>

***

### ToIntJ / ToFloatJ / ToStringJ

Casts a field value from one type to another. Use these transforms when the source data type does not match the destination column type.

**ToIntJ -- string to integer:**

Before:

```json theme={null}
{
  "order_id": 3001,
  "quantity": "42",
  "product": "Widget"
}
```

After:

```json theme={null}
{
  "order_id": 3001,
  "quantity": 42,
  "product": "Widget"
}
```

The `quantity` field changed from the string `"42"` to the integer `42`.

**ToFloatJ -- string to float:**

Before:

```json theme={null}
{
  "order_id": 3001,
  "total_price": "149.95",
  "currency": "USD"
}
```

After:

```json theme={null}
{
  "order_id": 3001,
  "total_price": 149.95,
  "currency": "USD"
}
```

The `total_price` field changed from the string `"149.95"` to the float `149.95`.

**ToStringJ -- integer to string:**

Before:

```json theme={null}
{
  "order_id": 3001,
  "zip_code": 62704,
  "city": "Springfield"
}
```

After:

```json theme={null}
{
  "order_id": 3001,
  "zip_code": "62704",
  "city": "Springfield"
}
```

The `zip_code` field changed from the integer `62704` to the string `"62704"`.
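The three casts can be sketched in plain JavaScript as per-field conversions. The helper names are hypothetical, and rejecting values that cannot be converted (rather than coercing them to `0` or `NaN`) is an assumption; the real transforms may handle invalid input differently.

```javascript
// Shared numeric parsing. Assumption: empty strings and non-numeric values
// are rejected rather than silently coerced.
function toNumber(v, typeName) {
  const n = typeof v === "string" && v.trim() === "" ? NaN : Number(v);
  if (!Number.isFinite(n)) {
    throw new TypeError(`Cannot cast ${JSON.stringify(v)} to ${typeName}`);
  }
  return n;
}

// Hypothetical counterparts of ToIntJ, ToFloatJ, and ToStringJ for one value.
function toInt(v) {
  const n = toNumber(v, "integer");
  if (!Number.isInteger(n)) {
    throw new TypeError(`Cannot cast ${JSON.stringify(v)} to integer`);
  }
  return n;
}
const toFloat = (v) => toNumber(v, "float");
const toStr = (v) => String(v);

// Returns a copy of `value` with one field converted by `cast`.
function castField(value, fieldName, cast) {
  return { ...value, [fieldName]: cast(value[fieldName]) };
}

console.log(castField({ order_id: 3001, quantity: "42", product: "Widget" }, "quantity", toInt));
console.log(castField({ order_id: 3001, total_price: "149.95", currency: "USD" }, "total_price", toFloat));
console.log(castField({ order_id: 3001, zip_code: 62704, city: "Springfield" }, "zip_code", toStr));
```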

***

### RenameFields

Renames one or more fields to match destination naming conventions or resolve naming conflicts.

**Before -- original field names:**

```json theme={null}
{
  "custId": 55,
  "custName": "Alice Martin",
  "custEmail": "alice@example.com",
  "created": "2025-11-01T10:00:00Z"
}
```

**After -- fields renamed to match destination schema:**

```json theme={null}
{
  "customer_id": 55,
  "customer_name": "Alice Martin",
  "customer_email": "alice@example.com",
  "created": "2025-11-01T10:00:00Z"
}
```

Three fields were renamed: `custId` to `customer_id`, `custName` to `customer_name`, and `custEmail` to `customer_email`. The `created` field was not included in the rename configuration and remains unchanged.
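A rename is a mapping from old names to new names. Below is a minimal sketch in plain JavaScript; the helper `renameFields` and its `{ oldName: newName }` mapping format are hypothetical, not the SMT's configuration syntax.

```javascript
// Hypothetical helper: renames fields using a { oldName: newName } mapping.
// Unmapped fields keep their names, and field order is preserved.
function renameFields(value, mapping) {
  const out = {};
  for (const [field, fieldValue] of Object.entries(value)) {
    const target = Object.prototype.hasOwnProperty.call(mapping, field) ? mapping[field] : field;
    out[target] = fieldValue;
  }
  return out;
}

const renamed = renameFields(
  { custId: 55, custName: "Alice Martin", custEmail: "alice@example.com", created: "2025-11-01T10:00:00Z" },
  { custId: "customer_id", custName: "customer_name", custEmail: "customer_email" }
);
console.log(renamed);
```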

***

### DropFields

Removes specified fields from the record before it reaches the destination. Use this to exclude sensitive data, internal metadata, or fields that are not needed in the destination.

**Before -- record with internal metadata fields:**

```json theme={null}
{
  "order_id": 3001,
  "customer_id": 55,
  "amount": 249.99,
  "status": "completed",
  "__source_ts_ms": 1730500200000,
  "__deleted": false,
  "__op": "c"
}
```

**After -- internal metadata fields removed:**

```json theme={null}
{
  "order_id": 3001,
  "customer_id": 55,
  "amount": 249.99,
  "status": "completed"
}
```

The three `__`-prefixed metadata fields (`__source_ts_ms`, `__deleted`, and `__op`) were dropped from the record. The remaining fields are delivered to the destination unchanged.
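Dropping fields is a filter over field names. Below is a minimal sketch in plain JavaScript; the helper name `dropFields` is hypothetical, and the list of fields to drop is passed explicitly, mirroring the idea that DropFields removes only the fields you specify.

```javascript
// Hypothetical helper: returns a copy of `value` without the listed fields.
function dropFields(value, fieldsToDrop) {
  const drop = new Set(fieldsToDrop);
  return Object.fromEntries(Object.entries(value).filter(([field]) => !drop.has(field)));
}

const order = {
  order_id: 3001,
  customer_id: 55,
  amount: 249.99,
  status: "completed",
  __source_ts_ms: 1730500200000,
  __deleted: false,
  __op: "c",
};
console.log(dropFields(order, ["__source_ts_ms", "__deleted", "__op"]));
```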

***

### CopyField

Copies the value of an existing field into a new field. The original field is preserved.

**Before -- record with a single `email` field:**

```json theme={null}
{
  "user_id": 204,
  "email": "alice@example.com",
  "name": "Alice Martin",
  "role": "admin"
}
```

**After -- `email` copied to `contact_email`:**

```json theme={null}
{
  "user_id": 204,
  "email": "alice@example.com",
  "name": "Alice Martin",
  "role": "admin",
  "contact_email": "alice@example.com"
}
```

The original `email` field remains intact. A new `contact_email` field was added with the same value. This is useful when a destination requires the same data under a different field name while the original must also be preserved.
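Below is a minimal sketch of the copy in plain JavaScript. The helper name `copyField` is hypothetical, and two behaviors are assumptions rather than documented CopyField semantics: a missing source field leaves the record unchanged, and an existing target field is overwritten.

```javascript
// Hypothetical helper: copies `sourceField` into `targetField`, keeping the original.
// Assumptions: a missing source field leaves the record unchanged, and an
// existing target field is overwritten.
function copyField(value, sourceField, targetField) {
  if (!Object.prototype.hasOwnProperty.call(value, sourceField)) {
    return { ...value };
  }
  return { ...value, [targetField]: value[sourceField] };
}

const user = { user_id: 204, email: "alice@example.com", name: "Alice Martin", role: "admin" };
console.log(copyField(user, "email", "contact_email"));
```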

***

### RenameKeyFields

Renames one or more fields in the **message key** (as opposed to the record value). This is useful when the destination expects specific key field names for primary key matching or partitioning, but the source key field names differ.

**Before -- original key field names:**

```json theme={null}
{
  "key": { "id": 1001 },
  "value": {
    "id": 1001,
    "customer_id": 55,
    "status": "completed"
  }
}
```

**After -- key field renamed to match destination convention:**

```json theme={null}
{
  "key": { "order_id": 1001 },
  "value": {
    "id": 1001,
    "customer_id": 55,
    "status": "completed"
  }
}
```

The key field `id` was renamed to `order_id`. The record value is unchanged -- only the message key is affected.
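The only difference from RenameFields is which part of the message is rewritten. Below is a minimal sketch in plain JavaScript; the helper `renameKeyFields` and its `{ oldName: newName }` mapping format are hypothetical.

```javascript
// Hypothetical helper: renames fields in the message key only; the value
// object is passed through untouched.
function renameKeyFields(record, mapping) {
  const key = {};
  for (const [field, fieldValue] of Object.entries(record.key)) {
    const target = Object.prototype.hasOwnProperty.call(mapping, field) ? mapping[field] : field;
    key[target] = fieldValue;
  }
  return { key, value: record.value };
}

const record = {
  key: { id: 1001 },
  value: { id: 1001, customer_id: 55, status: "completed" },
};
const rekeyed = renameKeyFields(record, { id: "order_id" });
console.log(rekeyed.key); // { order_id: 1001 }
```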

***

### AddStringSuffix

Appends a suffix string to the value of a specified field. This is useful for adding units, environment tags, or other fixed suffixes to field values before they reach the destination.

**Before -- field without suffix:**

```json theme={null}
{
  "order_id": 3001,
  "region": "us-east",
  "status": "completed"
}
```

**After -- `region` field with `"-prod"` suffix appended:**

```json theme={null}
{
  "order_id": 3001,
  "region": "us-east-prod",
  "status": "completed"
}
```

The suffix `"-prod"` was appended to the `region` field value. All other fields remain unchanged.
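Below is a minimal sketch of the suffix step in plain JavaScript. The helper name `addStringSuffix` is hypothetical, and passing non-string or missing values through unchanged is an assumption.

```javascript
// Hypothetical helper: appends `suffix` to a string field.
// Assumption: non-string or missing values pass through unchanged.
function addStringSuffix(value, fieldName, suffix) {
  const current = value[fieldName];
  if (typeof current !== "string") {
    return { ...value };
  }
  return { ...value, [fieldName]: current + suffix };
}

console.log(addStringSuffix({ order_id: 3001, region: "us-east", status: "completed" }, "region", "-prod"));
```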

***

### StringReplace

Replaces occurrences of a pattern within string field values. Use this when you need to normalize or sanitize field values before they reach the destination.

**Before -- field with unwanted characters:**

```json theme={null}
{
  "user_id": 204,
  "phone": "+1 (555) 123-4567",
  "name": "Alice Martin"
}
```

**After -- `phone` field with non-digit characters replaced:**

```json theme={null}
{
  "user_id": 204,
  "phone": "15551234567",
  "name": "Alice Martin"
}
```

Every match of a non-digit pattern in the `phone` field was replaced with an empty string, leaving only digits. Patterns are matched as regular expressions.
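Below is a minimal sketch of the replacement in plain JavaScript, using `[^0-9]` as the non-digit pattern. The helper name `stringReplace` is hypothetical, and this uses JavaScript's regex dialect, which may differ from the one the SMT uses.

```javascript
// Hypothetical helper: replaces every regex match in one string field.
// Non-string values pass through unchanged (an assumption).
function stringReplace(value, fieldName, pattern, replacement) {
  const current = value[fieldName];
  if (typeof current !== "string") {
    return { ...value };
  }
  // The "g" flag replaces all matches, not just the first one.
  return { ...value, [fieldName]: current.replace(new RegExp(pattern, "g"), replacement) };
}

const cleaned = stringReplace(
  { user_id: 204, phone: "+1 (555) 123-4567", name: "Alice Martin" },
  "phone",
  "[^0-9]",
  ""
);
console.log(cleaned.phone); // 15551234567
```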

***

### HeaderToFieldCustom

Copies a Kafka message header value into a record field. This is useful when metadata is carried in Kafka headers (e.g., source connector name, event timestamp, or correlation IDs) and you need it available as a regular field in the destination.

**Before -- record without header data in fields:**

```json theme={null}
{
  "key": { "id": 1001 },
  "headers": {
    "source_connector": "mysql-prod-01",
    "event_time": "2025-11-03T09:22:00Z"
  },
  "value": {
    "id": 1001,
    "amount": 249.99,
    "status": "completed"
  }
}
```

**After -- header value copied into a record field:**

```json theme={null}
{
  "key": { "id": 1001 },
  "headers": {
    "source_connector": "mysql-prod-01",
    "event_time": "2025-11-03T09:22:00Z"
  },
  "value": {
    "id": 1001,
    "amount": 249.99,
    "status": "completed",
    "source_connector": "mysql-prod-01"
  }
}
```

The `source_connector` header value was copied into the record value as a new field. The original header remains on the message.
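Below is a minimal sketch of the copy in plain JavaScript. The helper name `headerToField` is hypothetical. Headers are modeled as a plain object, as in the example above; real Kafka headers are an ordered collection that can contain repeated keys, which this sketch ignores. Leaving the record unchanged when the header is missing is an assumption.

```javascript
// Hypothetical helper: copies one header value into a new field of the record value.
// Headers are modeled as a plain object, as in the example above.
function headerToField(record, headerName, fieldName) {
  const headers = record.headers || {};
  if (!Object.prototype.hasOwnProperty.call(headers, headerName)) {
    return record; // assumption: a missing header leaves the record unchanged
  }
  return {
    key: record.key,
    headers: record.headers, // the header stays on the message
    value: { ...record.value, [fieldName]: headers[headerName] },
  };
}

const message = {
  key: { id: 1001 },
  headers: { source_connector: "mysql-prod-01", event_time: "2025-11-03T09:22:00Z" },
  value: { id: 1001, amount: 249.99, status: "completed" },
};
const withHeader = headerToField(message, "source_connector", "source_connector");
console.log(withHeader.value);
```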

***

### MarkColumnsAsOptional (ToOptional)

Marks specified columns as optional (nullable) in the schema. This is useful when your source schema defines fields as `NOT NULL`, but the destination schema requires them to be nullable -- for example, when certain columns are only populated for some record types, or when schema evolution introduces new columns that existing rows do not have values for.

**Before -- schema marks `middle_name` as required (non-null):**

```json theme={null}
{
  "schema": {
    "fields": [
      { "field": "user_id", "type": "int32", "optional": false },
      { "field": "first_name", "type": "string", "optional": false },
      { "field": "middle_name", "type": "string", "optional": false },
      { "field": "last_name", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "user_id": 204,
    "first_name": "Alice",
    "middle_name": "Marie",
    "last_name": "Martin"
  }
}
```

**After -- `middle_name` marked as optional (nullable):**

```json theme={null}
{
  "schema": {
    "fields": [
      { "field": "user_id", "type": "int32", "optional": false },
      { "field": "first_name", "type": "string", "optional": false },
      { "field": "middle_name", "type": "string", "optional": true },
      { "field": "last_name", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "user_id": 204,
    "first_name": "Alice",
    "middle_name": "Marie",
    "last_name": "Martin"
  }
}
```

The `middle_name` field changed from `"optional": false` to `"optional": true`. The payload data is unchanged -- only the schema metadata is modified. This prevents schema enforcement errors at the destination when a record has a `null` value for that field.
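Because only schema metadata changes, the operation is a map over the schema's field list. Below is a minimal sketch in plain JavaScript, using the simplified schema shape from the example above; the helper name `markColumnsAsOptional` is hypothetical.

```javascript
// Hypothetical helper: returns a schema copy with the listed fields marked optional.
// Only schema metadata changes; payloads are not touched.
function markColumnsAsOptional(schema, columns) {
  const targets = new Set(columns);
  return {
    ...schema,
    fields: schema.fields.map((f) => (targets.has(f.field) ? { ...f, optional: true } : f)),
  };
}

const schema = {
  fields: [
    { field: "user_id", type: "int32", optional: false },
    { field: "first_name", type: "string", optional: false },
    { field: "middle_name", type: "string", optional: false },
    { field: "last_name", type: "string", optional: false },
  ],
};
const relaxed = markColumnsAsOptional(schema, ["middle_name"]);
console.log(relaxed.fields.map((f) => `${f.field}: optional=${f.optional}`));
```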

***

## More Pipeline Transforms (Flink)

### Join

Use [Join](/transform-join) to combine records from two or more input topics into a single output record using SQL join logic. In this example, an `orders` topic is joined with a `customers` topic on a shared `customer_id` field.

**Input -- record from `orders` topic:**

```json theme={null}
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z"
  }
}
```

**Input -- record from `customers` topic:**

```json theme={null}
{
  "key": { "customer_id": 55 },
  "value": {
    "customer_id": 55,
    "name": "Alice Martin",
    "email": "alice@example.com",
    "tier": "gold"
  }
}
```

**After -- joined output record:**

```json theme={null}
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z",
    "customer_name": "Alice Martin",
    "customer_email": "alice@example.com",
    "customer_tier": "gold"
  }
}
```

The join matched records from both topics on `customer_id = 55` and merged the customer fields into the order record. The output field names (`customer_name`, `customer_email`, `customer_tier`) are defined in the join SQL logic.
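Join is configured in SQL, but the output above corresponds to an equi-join on `customer_id`. The plain JavaScript sketch below shows those semantics over in-memory arrays. It is not how the streaming join is executed, because a streaming join must also keep state for records that arrive later. Treating the join as an inner join is an assumption, and the function name `joinOrdersWithCustomers` is hypothetical.

```javascript
// Illustrative inner equi-join of orders with customers on customer_id.
// In-memory only; a streaming join also has to buffer state over time.
function joinOrdersWithCustomers(orders, customers) {
  // Index customers by join key so each order lookup is a single Map access.
  const byId = new Map(customers.map((c) => [c.value.customer_id, c.value]));
  const joined = [];
  for (const order of orders) {
    const customer = byId.get(order.value.customer_id);
    if (customer === undefined) continue; // inner join: unmatched orders are dropped
    joined.push({
      key: order.key,
      value: {
        ...order.value,
        customer_name: customer.name,
        customer_email: customer.email,
        customer_tier: customer.tier,
      },
    });
  }
  return joined;
}

const orders = [
  { key: { order_id: 3001 }, value: { order_id: 3001, customer_id: 55, amount: 249.99, status: "completed", created_at: "2025-11-02T14:30:00Z" } },
];
const customers = [
  { key: { customer_id: 55 }, value: { customer_id: 55, name: "Alice Martin", email: "alice@example.com", tier: "gold" } },
];
console.log(joinOrdersWithCustomers(orders, customers));
```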

<Tip>
  Join transforms use SQL syntax to define the join condition and output fields. See [Join](/transform-join) for setup instructions and SQL examples.
</Tip>

***

### Chained Transforms

Pipeline transforms execute sequentially -- each transform receives the output of the previous one. This example shows data flowing through three transforms in order: **Filter**, **Enrich**, and **Transform / Filter Records** (used as a map).

**Stage 0 -- original input record:**

```json theme={null}
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z"
  }
}
```

**Stage 1 -- after Filter (keep only orders with amount > 100):**

```json theme={null}
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z"
  }
}
```

Record passes through because `amount` (249.99) exceeds the threshold. Orders with `amount <= 100` are dropped at this stage and never reach subsequent transforms.

**Stage 2 -- after Enrich (add customer tier from cached topic):**

```json theme={null}
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z",
    "customer_tier": "gold"
  }
}
```

The `customer_tier` field was added by looking up `customer_id = 55` in the cached customers topic.

**Stage 3 -- after Transform / Filter Records (compute a new field):**

```json theme={null}
{
  "key": { "order_id": 3001 },
  "value": {
    "order_id": 3001,
    "customer_id": 55,
    "amount": 249.99,
    "status": "completed",
    "created_at": "2025-11-02T14:30:00Z",
    "customer_tier": "gold",
    "priority": "high"
  }
}
```

The transform logic computed a `priority` field: orders from `"gold"` tier customers with `amount > 200` are tagged `"high"`. This field was only possible to compute because the enrichment step added `customer_tier` first.
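Sequential execution, where a dropped record stops the chain, can be sketched in plain JavaScript. Each stage takes a record and returns a record, or `null` to drop it. This convention, the stage names, the in-memory `customerTiers` map standing in for the cached customers topic, and the `"normal"` fallback priority are all hypothetical.

```javascript
// Stage 1 (hypothetical): keep only orders with amount > 100.
const filterStage = (r) => (r.value.amount > 100 ? r : null);

// Stage 2 (hypothetical): stand-in for the cached customers topic used by Enrich.
const customerTiers = new Map([[55, "gold"]]);
const enrichStage = (r) => ({
  key: r.key,
  value: { ...r.value, customer_tier: customerTiers.get(r.value.customer_id) ?? null },
});

// Stage 3 (hypothetical): "normal" is an assumed fallback; the example only defines "high".
const priorityStage = (r) => ({
  key: r.key,
  value: {
    ...r.value,
    priority: r.value.customer_tier === "gold" && r.value.amount > 200 ? "high" : "normal",
  },
});

// Runs stages in order; each receives the previous stage's output.
function runPipeline(record, stages) {
  let current = record;
  for (const stage of stages) {
    current = stage(current);
    if (current === null) return null; // dropped: later stages never see it
  }
  return current;
}

const stages = [filterStage, enrichStage, priorityStage];
const order = {
  key: { order_id: 3001 },
  value: { order_id: 3001, customer_id: 55, amount: 249.99, status: "completed", created_at: "2025-11-02T14:30:00Z" },
};
console.log(runPipeline(order, stages));
```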

<Info>
  For guidance on ordering pipeline transforms, see [Transform Ordering](/transform-ordering).
</Info>

***

## Real-World Patterns

These patterns show how to combine transforms to solve common data engineering challenges.

### PII Masking

Use **DropFields** to remove sensitive fields before data reaches the destination, or use **Transform / Filter Records** to mask values while preserving the field structure.

**Approach 1 -- Drop sensitive fields entirely with DropFields:**

Configure DropFields to remove fields such as `ssn`, `credit_card_number`, and `date_of_birth`. The fields are stripped from the record before it lands in the destination.

**Before:**

```json theme={null}
{
  "user_id": 204,
  "name": "Alice Martin",
  "email": "alice@example.com",
  "ssn": "123-45-6789",
  "credit_card_number": "4111111111111111"
}
```

**After (DropFields removes ssn and credit\_card\_number):**

```json theme={null}
{
  "user_id": 204,
  "name": "Alice Martin",
  "email": "alice@example.com"
}
```

**Approach 2 -- Mask field values with Transform / Filter Records:**

Use **Transform / Filter Records** with JavaScript logic to replace sensitive values with masked versions while keeping the fields present.

```javascript theme={null}
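// Transform / Filter Records logic
function transform(record) {
  if (record.value.email != null) {
    var email = String(record.value.email);
    var at = email.lastIndexOf("@");
    // Keep up to two characters of the local part plus the domain;
    // fully mask values that are not shaped like an email address
    record.value.email = at > 0
      ? email.substring(0, Math.min(2, at)) + "***" + email.substring(at)
      : "***";
  }
  if (record.value.ssn != null) {
    var ssn = String(record.value.ssn);
    // Keep only the last four characters; fully mask anything shorter
    record.value.ssn = ssn.length >= 4 ? "***-**-" + ssn.slice(-4) : "***";
  }
  return record;
}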
```

**Before:**

```json theme={null}
{
  "user_id": 204,
  "email": "alice@example.com",
  "ssn": "123-45-6789"
}
```

**After (values masked, fields preserved):**

```json theme={null}
{
  "user_id": 204,
  "email": "al***@example.com",
  "ssn": "***-**-6789"
}
```

<Warning>
  PII masking should be applied as early as possible in your pipeline to prevent sensitive data from reaching intermediate storage. Use pipeline transforms (Flink) rather than destination-side transforms when possible.
</Warning>

### Date Formatting

Use **Transform / Filter Records** to convert timestamps between formats. This is common when the source produces ISO 8601 timestamps but the destination expects epoch milliseconds, or when you need to extract date parts.

```javascript theme={null}
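// Transform / Filter Records logic -- convert ISO timestamp to epoch milliseconds
function transform(record) {
  if (record.value.created_at) {
    var epochMs = new Date(record.value.created_at).getTime();
    // getTime() returns NaN for unparseable strings; skip those rather than writing NaN
    if (!isNaN(epochMs)) {
      record.value.created_at_epoch = epochMs;
    }
  }
  return record;
}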
```

**Before:**

```json theme={null}
{
  "order_id": 3001,
  "created_at": "2025-11-02T14:30:00Z"
}
```

**After (new epoch field added):**

```json theme={null}
{
  "order_id": 3001,
  "created_at": "2025-11-02T14:30:00Z",
  "created_at_epoch": 1762093800000
}
```

You can also use **ToStringJ** to cast numeric timestamp fields to strings if your destination requires string-typed date columns.
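Extracting date parts, mentioned at the start of this section, follows the same pattern. Below is a sketch in the same `transform(record)` style as the examples above; the output field names `created_date`, `created_year`, and `created_month` are hypothetical. The UTC getters keep the result independent of the host time zone.

```javascript
// Transform / Filter Records-style logic -- extract date parts from an ISO timestamp
// (output field names are hypothetical)
function transform(record) {
  var ts = record.value.created_at;
  if (ts) {
    var d = new Date(ts);
    // Skip unparseable timestamps instead of writing NaN values
    if (!isNaN(d.getTime())) {
      record.value.created_date = d.toISOString().slice(0, 10); // "YYYY-MM-DD" in UTC
      record.value.created_year = d.getUTCFullYear();
      record.value.created_month = d.getUTCMonth() + 1; // getUTCMonth() is zero-based
    }
  }
  return record;
}

console.log(transform({ value: { order_id: 3001, created_at: "2025-11-02T14:30:00Z" } }).value);
```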

### Null Handling

Use **Transform / Filter Records** to replace null values with defaults, preventing null-related errors at the destination.

```javascript theme={null}
// Transform / Filter Records logic -- apply default values for null fields
function transform(record) {
  if (record.value.region === null || record.value.region === undefined) {
    record.value.region = "unknown";
  }
  if (record.value.amount === null || record.value.amount === undefined) {
    record.value.amount = 0.0;
  }
  return record;
}
```

**Before (region and amount are null):**

```json theme={null}
{
  "order_id": 3002,
  "region": null,
  "amount": null,
  "status": "pending"
}
```

**After (nulls replaced with defaults):**

```json theme={null}
{
  "order_id": 3002,
  "region": "unknown",
  "amount": 0.0,
  "status": "pending"
}
```

<Tip>
  If your destination supports nullable columns, you can also use **MarkColumnsAsOptional** to allow null values through without replacing them. Choose between default values and nullable columns based on your downstream query requirements.
</Tip>

***

## See Also

* [Transform Types Overview](/transforms) -- All available transform types
* [Transform Ordering](/transform-ordering) -- How pipeline and destination-side transforms are sequenced
* [Transform / Filter Records](/transform-filter-records) -- Setup and implementation for filtering
* [Enrich](/transform-enrich) -- SQL-based cached topic enrichment
* [Enrich (Async)](/transform-enrich-async) -- REST API enrichment
* [Join](/transform-join) -- Multi-topic joins
* [Fan Out](/transform-fanout) -- Record routing to multiple output topics
* [Pipelines](/pipelines) -- How transforms integrate into pipelines
