> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamkap.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka Direct

> Consume messages from an external Kafka cluster and stream them through Streamkap pipelines.

Kafka Direct lets you consume messages from an external Kafka cluster and stream them through Streamkap pipelines to your destinations. Unlike CDC-based sources that capture changes from a database transaction log, Kafka Direct simply reads messages from the specified Kafka topics and forwards them downstream.

<Note>
  Kafka Direct is for **pulling data into** Streamkap from an external Kafka cluster. If you need to **push data to** Streamkap's internal Kafka, see [Kafka (Writing)](/write-to-kafka). If you need to **replicate topics from** Streamkap to another Kafka cluster, see [Kafka (Push)](/kafka-to-kafka).
</Note>

## Prerequisites

* An external Kafka cluster accessible from Streamkap
* Kafka bootstrap server URLs (external listener addresses, not internal) e.g. `broker-1.mycompany.com:9094,broker-2.mycompany.com:9094`
* The names of the Kafka topics you want to consume
* Configure one of the [Connection Options](/connection-options) to ensure Streamkap can reach your Kafka cluster

## Supported Platforms

Kafka Direct works with any Kafka-compatible cluster, including:

| Platform                                 | Notes                                                                                                           |
| ---------------------------------------- | --------------------------------------------------------------------------------------------------------------- |
| **Apache Kafka**                         | Open-source Apache Kafka clusters (self-hosted or managed)                                                      |
| **Confluent Cloud / Confluent Platform** | Fully compatible; use SASL\_SSL authentication with API key and secret                                          |
| **Amazon MSK**                           | Amazon Managed Streaming for Apache Kafka; supports IAM and SASL\_SSL authentication                            |
| **Azure Event Hubs (Kafka protocol)**    | Azure Event Hubs exposes a Kafka-compatible endpoint; use the Event Hubs connection string as the SASL password |
| **Redpanda**                             | Kafka API-compatible streaming platform; works with standard Kafka authentication methods                       |

## Authentication

Kafka Direct supports several authentication mechanisms to connect securely to your external Kafka cluster. Configure authentication in the connector settings when creating or editing your Kafka Direct source.

### SASL\_PLAIN

Username and password authentication transmitted in plaintext. Suitable when the connection is already secured by a private network or VPN.

* **Security protocol:** `SASL_PLAINTEXT`
* **SASL mechanism:** `PLAIN`
* Provide the **username** and **password** in the connector settings

<Warning>
  SASL\_PLAIN transmits credentials in cleartext. Only use this mechanism over trusted, private networks. For production environments, prefer SASL\_SSL.
</Warning>

### SASL\_SSL

Username and password authentication over a TLS-encrypted connection. This is the recommended authentication method for most managed Kafka services (Confluent Cloud, Amazon MSK, etc.).

* **Security protocol:** `SASL_SSL`
* **SASL mechanism:** `PLAIN`, `SCRAM-SHA-256`, or `SCRAM-SHA-512` depending on your cluster configuration
* Provide the **username** and **password** in the connector settings

### SSL / mTLS (Mutual TLS)

Client certificate authentication where both the client and broker verify each other's identity using TLS certificates. Use this when your Kafka cluster requires certificate-based authentication.

* **Security protocol:** `SSL`
* Provide the **client certificate**, **client key**, and **CA certificate** in the connector settings

<Info>
  For detailed information on network connectivity options (SSH tunnels, VPN, AWS PrivateLink, IP allowlisting), see [Connection Options](/connection-options).
</Info>

## Key Differences from CDC Sources

Kafka Direct behaves differently from CDC-based sources (such as PostgreSQL, MySQL, or MongoDB):

* **No transaction log** -- Kafka Direct reads messages directly from Kafka topics rather than tailing a database transaction log.
* **No snapshots** -- There is no initial snapshot or backfill mechanism. Consumption starts based on the configured offset policy.
* **No heartbeats** -- Heartbeat monitoring is not applicable to Kafka Direct sources.
* **Topic naming** -- Kafka Direct uses the user-defined `topic.prefix` value (not the `source_{id}` prefix used by CDC sources).

***

## Streamkap Setup

Follow these steps to configure your new connector:

### 1. Create the Source

* Navigate to [Add Connectors](https://app.streamkap.com/connectors/add?tab=Sources).
* Choose **Kafka Direct**.

### 2. Connection Settings

* **Name**: Enter a unique and memorable name for your connector.

* **Kafka Bootstrap Servers**: A comma-separated list of `host:port` pairs for the external Kafka brokers, such as `broker1.kafka.company.com:9092,broker2.kafka.company.com:9092`.

* **Topic Prefix**: A prefix applied to topic names inside Streamkap. This helps organise and identify topics originating from this source within your Streamkap pipelines.

### 3. Topic Selection

* **Topics to include**: A comma-separated list of the external Kafka topic names you want to consume, such as `orders,events,user-activity`.

### 4. Format Settings

* **Format**: The message format of the external Kafka topics.
  * **string** (default) -- Messages are treated as plain strings.
  * **json** -- Messages are parsed as JSON.

* **Schemas Enable** (only when Format is set to **json**): Toggle this to enable schema support. When enabled, Streamkap uses its internal Schema Registry to validate and enforce message structure.

<Info>
  Enabling schema support is recommended when your JSON messages have a consistent structure. Streamkap's internal Schema Registry handles schema management automatically — you do not need to provide an external Schema Registry URL.
</Info>

Click **Save**.

## Consumer Group Configuration

Streamkap manages consumer groups automatically for Kafka Direct sources. Each Kafka Direct connector uses a dedicated consumer group to track its position (offset) in each topic partition.

<Note>
  The consumer group ID is managed by Streamkap and is tied to the connector instance. If you delete and recreate a Kafka Direct source, a new consumer group is created and consumption restarts according to the configured offset policy. If you need to reset offsets for an existing connector, contact Streamkap support.
</Note>

Key considerations:

* **Offset policy**: When the connector starts for the first time (or when the consumer group has no committed offsets), it uses the configured offset policy to determine where to begin reading. Typical options are `earliest` (read from the beginning of the topic) or `latest` (read only new messages).
* **Parallel consumption**: Streamkap assigns consumer group members to topic partitions automatically. The level of parallelism is determined by the number of partitions in the source topic.
* **Rebalancing**: If partitions are added to a source topic, the consumer group rebalances automatically to include the new partitions.

## Adding Topics Programmatically

You can add topics to an existing Kafka Direct source via the API or Terraform using the `topic.include.list.user.defined` parameter (Terraform: `topic_include_list_user_defined`).

This lets you append topics to the source without modifying the original topic list configured through the UI.

<Tip>
  See [Terraform Resources](/terraform-resources) for a full Kafka Direct Terraform example.
</Tip>

## Troubleshooting

<AccordionGroup>
  <Accordion title="Connection refused or timeout">
    **Symptoms:** The connector fails to start or reports connection errors referencing broker addresses.

    **Possible Causes:**

    * Firewall rules are blocking traffic from Streamkap to the Kafka brokers
    * The broker address or port is incorrect (e.g., using an internal DNS name instead of an external listener)
    * The Kafka broker port is not open or the broker is not running

    **Resolution:**

    1. Verify that the bootstrap server addresses use externally reachable hostnames and the correct listener port
    2. Ensure your firewall or security group allows inbound connections from Streamkap's IP addresses (see [Connection Options](/connection-options))
    3. Test connectivity to the broker address and port from a machine on the same network as Streamkap
  </Accordion>

  <Accordion title="Authentication failure">
    **Symptoms:** The connector fails with SASL authentication errors or SSL handshake failures.

    **Possible Causes:**

    * Incorrect username or password
    * Wrong SASL mechanism selected (e.g., using `PLAIN` when the broker expects `SCRAM-SHA-256`)
    * Expired or invalid client certificates (for mTLS)
    * The security protocol does not match the broker's listener configuration

    **Resolution:**

    1. Verify that the username, password, and SASL mechanism match the broker configuration
    2. For mTLS, ensure the client certificate and key are valid and not expired
    3. Confirm the security protocol (`SASL_PLAINTEXT`, `SASL_SSL`, or `SSL`) matches the Kafka listener you are connecting to
  </Accordion>

  <Accordion title="No messages received">
    **Symptoms:** The connector is running but no data appears in the pipeline or destination.

    **Possible Causes:**

    * The topic name is incorrect or misspelled
    * The topic has no new messages and the offset policy is set to `latest`
    * The consumer group has already consumed all available messages
    * The topic has no partitions with data

    **Resolution:**

    1. Verify the topic names in the connector configuration match the exact topic names on the Kafka cluster
    2. Check the offset policy -- if set to `latest`, only new messages produced after the connector started will be consumed
    3. Confirm that the source Kafka cluster is actively producing messages to the configured topics
    4. Inspect the topic partitions on the source cluster to verify they contain data
  </Accordion>

  <Accordion title="Schema or format errors">
    **Symptoms:** Messages are routed to the DLQ or the connector reports deserialization errors.

    **Possible Causes:**

    * The selected format (string or JSON) does not match the actual message format on the topic
    * Schema support is enabled but messages do not conform to a consistent JSON structure
    * Messages contain unsupported or malformed data

    **Resolution:**

    1. Verify that the **Format** setting matches the actual format of messages on the source topic
    2. If using JSON with schema support enabled, ensure all messages on the topic share a consistent structure
    3. Inspect a sample of messages on the source topic to confirm the format before configuring the connector
    4. Check the [DLQ](/dlq-operations) for detailed error information on failed messages
  </Accordion>
</AccordionGroup>
