Kafka Direct lets you consume messages from an external Kafka cluster and stream them through Streamkap pipelines to your destinations. Unlike CDC-based sources that capture changes from a database transaction log, Kafka Direct simply reads messages from the specified Kafka topics and forwards them downstream.
Kafka Direct is for pulling data into Streamkap from an external Kafka cluster. If you need to push data to Streamkap’s internal Kafka, see Kafka (Writing). If you need to replicate topics from Streamkap to another Kafka cluster, see Kafka (Push).

Prerequisites

  • An external Kafka cluster accessible from Streamkap
  • Kafka bootstrap server addresses (external listener addresses, not internal ones), e.g. broker-1.mycompany.com:9094,broker-2.mycompany.com:9094
  • The names of the Kafka topics you want to consume
  • One of the Connection Options configured so that Streamkap can reach your Kafka cluster

Supported Platforms

Kafka Direct works with any Kafka-compatible cluster, including:
  • Apache Kafka: Open-source Apache Kafka clusters (self-hosted or managed)
  • Confluent Cloud / Confluent Platform: Fully compatible; use SASL_SSL authentication with an API key and secret
  • Amazon MSK: Amazon Managed Streaming for Apache Kafka; supports IAM and SASL_SSL authentication
  • Azure Event Hubs (Kafka protocol): Azure Event Hubs exposes a Kafka-compatible endpoint; use the Event Hubs connection string as the SASL password
  • Redpanda: Kafka API-compatible streaming platform; works with standard Kafka authentication methods

Authentication

Kafka Direct supports several authentication mechanisms to connect securely to your external Kafka cluster. Configure authentication in the connector settings when creating or editing your Kafka Direct source.

SASL_PLAIN

Username and password authentication transmitted in plaintext. Suitable when the connection is already secured by a private network or VPN.
  • Security protocol: SASL_PLAINTEXT
  • SASL mechanism: PLAIN
  • Provide the username and password in the connector settings
SASL_PLAIN transmits credentials in cleartext. Only use this mechanism over trusted, private networks. For production environments, prefer SASL_SSL.
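For reference, the settings above map onto standard librdkafka-style client properties (as used by confluent-kafka clients). This is only an illustration of what the connector form collects; the broker address and credentials are placeholders:

```python
# Illustrative SASL_PLAIN (plaintext) client properties, librdkafka-style.
# Streamkap gathers these values through its connector form; the dict below
# simply shows the equivalent Kafka client configuration.
sasl_plain_config = {
    "bootstrap.servers": "broker-1.mycompany.com:9094",  # external listener
    "security.protocol": "SASL_PLAINTEXT",               # no TLS encryption
    "sasl.mechanism": "PLAIN",
    "sasl.username": "my-user",      # placeholder credential
    "sasl.password": "my-password",  # placeholder credential
}
```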

SASL_SSL

Username and password authentication over a TLS-encrypted connection. This is the recommended authentication method for most managed Kafka services (Confluent Cloud, Amazon MSK, etc.).
  • Security protocol: SASL_SSL
  • SASL mechanism: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 depending on your cluster configuration
  • Provide the username and password in the connector settings
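Expressed as librdkafka-style client properties, a SASL_SSL setup looks like the sketch below. The Confluent Cloud-style address and the API key/secret are placeholders; pick the SASL mechanism your cluster actually uses:

```python
# Illustrative SASL_SSL client properties (librdkafka-style). With Confluent
# Cloud, the API key and secret act as the username and password.
sasl_ssl_config = {
    "bootstrap.servers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",  # TLS-encrypted connection
    "sasl.mechanism": "PLAIN",        # or SCRAM-SHA-256 / SCRAM-SHA-512
    "sasl.username": "API_KEY",       # placeholder
    "sasl.password": "API_SECRET",    # placeholder
}
```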

SSL / mTLS (Mutual TLS)

Client certificate authentication where both the client and broker verify each other’s identity using TLS certificates. Use this when your Kafka cluster requires certificate-based authentication.
  • Security protocol: SSL
  • Provide the client certificate, client key, and CA certificate in the connector settings
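The equivalent librdkafka-style properties for mutual TLS are sketched below; the file paths are placeholders for the certificate material you provide in the connector settings:

```python
# Illustrative mutual-TLS client properties (librdkafka-style). The paths
# stand in for the CA certificate, client certificate, and client key
# uploaded to Streamkap.
mtls_config = {
    "bootstrap.servers": "broker-1.mycompany.com:9094",
    "security.protocol": "SSL",
    "ssl.ca.location": "/path/to/ca.pem",               # CA certificate
    "ssl.certificate.location": "/path/to/client.pem",  # client certificate
    "ssl.key.location": "/path/to/client.key",          # client private key
}
```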
For detailed information on network connectivity options (SSH tunnels, VPN, AWS PrivateLink, IP allowlisting), see Connection Options.

Key Differences from CDC Sources

Kafka Direct behaves differently from CDC-based sources (such as PostgreSQL, MySQL, or MongoDB):
  • No transaction log — Kafka Direct reads messages directly from Kafka topics rather than tailing a database transaction log.
  • No snapshots — There is no initial snapshot or backfill mechanism. Consumption starts based on the configured offset policy.
  • No heartbeats — Heartbeat monitoring is not applicable to Kafka Direct sources.
  • Topic naming — Kafka Direct uses the user-defined topic.prefix value (not the source_{id} prefix used by CDC sources).

Streamkap Setup

Follow these steps to configure your new connector:

1. Create the Source

2. Connection Settings

  • Name: Enter a unique and memorable name for your connector.
  • Kafka Bootstrap Servers: A comma-separated list of host:port pairs for the external Kafka brokers, such as broker1.kafka.company.com:9092,broker2.kafka.company.com:9092.
  • Topic Prefix: A prefix applied to topic names inside Streamkap. This helps organise and identify topics originating from this source within your Streamkap pipelines.
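The bootstrap server value is a plain comma-separated list of host:port pairs. A quick sanity check like the following (a standalone sketch, not part of Streamkap) can catch malformed entries before you save the connector:

```python
def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Split a comma-separated bootstrap string into (host, port) pairs,
    raising ValueError for any entry without a numeric port."""
    pairs = []
    for entry in value.split(","):
        entry = entry.strip()
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"malformed bootstrap server entry: {entry!r}")
        pairs.append((host, int(port)))
    return pairs

parse_bootstrap_servers(
    "broker1.kafka.company.com:9092, broker2.kafka.company.com:9092"
)
# → [("broker1.kafka.company.com", 9092), ("broker2.kafka.company.com", 9092)]
```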

3. Topic Selection

  • Topics to include: A comma-separated list of the external Kafka topic names you want to consume, such as orders,events,user-activity.

4. Format Settings

  • Format: The message format of the external Kafka topics.
    • string (default) — Messages are treated as plain strings.
    • json — Messages are parsed as JSON.
  • Schemas Enable (only when Format is set to json): Toggle this to enable schema support. When enabled, Streamkap uses its internal Schema Registry to validate and enforce message structure.
Enabling schema support is recommended when your JSON messages have a consistent structure. Streamkap’s internal Schema Registry handles schema management automatically — you do not need to provide an external Schema Registry URL.
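The two formats differ only in how the raw message bytes are interpreted. A minimal sketch of the distinction (placeholder logic, not Streamkap's actual deserializer):

```python
import json

def deserialize(raw: bytes, fmt: str = "string"):
    """Decode a Kafka message value as a plain string or as parsed JSON."""
    text = raw.decode("utf-8")
    if fmt == "json":
        return json.loads(text)  # dict/list/etc.; raises on malformed JSON
    return text                  # "string" format: passed through unchanged

deserialize(b'{"order_id": 1}', fmt="json")  # → {"order_id": 1}
deserialize(b'{"order_id": 1}')              # → '{"order_id": 1}'
```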
Click Save.

Consumer Group Configuration

Streamkap manages consumer groups automatically for Kafka Direct sources. Each Kafka Direct connector uses a dedicated consumer group to track its position (offset) in each topic partition.
The consumer group ID is managed by Streamkap and is tied to the connector instance. If you delete and recreate a Kafka Direct source, a new consumer group is created and consumption restarts according to the configured offset policy. If you need to reset offsets for an existing connector, contact Streamkap support.
Key considerations:
  • Offset policy: When the connector starts for the first time (or when the consumer group has no committed offsets), it uses the configured offset policy to determine where to begin reading. Typical options are earliest (read from the beginning of the topic) or latest (read only new messages).
  • Parallel consumption: Streamkap assigns consumer group members to topic partitions automatically. The level of parallelism is determined by the number of partitions in the source topic.
  • Rebalancing: If partitions are added to a source topic, the consumer group rebalances automatically to include the new partitions.
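In standard Kafka consumer terms, the offset policy corresponds to the auto.offset.reset setting. The properties below are purely illustrative, since Streamkap generates and manages the group ID itself:

```python
# Illustrative consumer-group properties (librdkafka-style). The group.id
# shown is a hypothetical placeholder; "earliest"/"latest" only take effect
# when the consumer group has no committed offsets yet.
consumer_config = {
    "group.id": "streamkap-kafka-direct-<connector-id>",  # managed by Streamkap
    "auto.offset.reset": "earliest",  # or "latest" to read only new messages
}
```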

Adding Topics Programmatically

You can add topics to an existing Kafka Direct source via the API or Terraform using the topic.include.list.user.defined parameter (Terraform: topic_include_list_user_defined). This lets you append topics to the source without modifying the original topic list configured through the UI.
See Terraform Resources for a full Kafka Direct Terraform example.

Troubleshooting

Symptoms: The connector fails to start or reports connection errors referencing broker addresses.
Possible Causes:
  • Firewall rules are blocking traffic from Streamkap to the Kafka brokers
  • The broker address or port is incorrect (e.g., using an internal DNS name instead of an external listener)
  • The Kafka broker port is not open or the broker is not running
Resolution:
  1. Verify that the bootstrap server addresses use externally reachable hostnames and the correct listener port
  2. Ensure your firewall or security group allows inbound connections from Streamkap’s IP addresses (see Connection Options)
  3. Test connectivity to the broker address and port from a machine on the same network as Streamkap
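One quick way to run the connectivity test in step 3 from any machine with Python is a plain TCP connect to the broker's listener port (this checks network reachability only, not Kafka-level authentication):

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # DNS failure, refusal, or timeout
        return False

# broker_reachable("broker-1.mycompany.com", 9094) returns True only when
# the external listener is reachable from the machine running the check.
```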

Symptoms: The connector fails with SASL authentication errors or SSL handshake failures.
Possible Causes:
  • Incorrect username or password
  • Wrong SASL mechanism selected (e.g., using PLAIN when the broker expects SCRAM-SHA-256)
  • Expired or invalid client certificates (for mTLS)
  • The security protocol does not match the broker’s listener configuration
Resolution:
  1. Verify that the username, password, and SASL mechanism match the broker configuration
  2. For mTLS, ensure the client certificate and key are valid and not expired
  3. Confirm the security protocol (SASL_PLAINTEXT, SASL_SSL, or SSL) matches the Kafka listener you are connecting to

Symptoms: The connector is running but no data appears in the pipeline or destination.
Possible Causes:
  • The topic name is incorrect or misspelled
  • The topic has no new messages and the offset policy is set to latest
  • The consumer group has already consumed all available messages
  • The topic has no partitions with data
Resolution:
  1. Verify the topic names in the connector configuration match the exact topic names on the Kafka cluster
  2. Check the offset policy — if set to latest, only new messages produced after the connector started will be consumed
  3. Confirm that the source Kafka cluster is actively producing messages to the configured topics
  4. Inspect the topic partitions on the source cluster to verify they contain data

Symptoms: Messages are routed to the DLQ or the connector reports deserialization errors.
Possible Causes:
  • The selected format (string or JSON) does not match the actual message format on the topic
  • Schema support is enabled but messages do not conform to a consistent JSON structure
  • Messages contain unsupported or malformed data
Resolution:
  1. Verify that the Format setting matches the actual format of messages on the source topic
  2. If using JSON with schema support enabled, ensure all messages on the topic share a consistent structure
  3. Inspect a sample of messages on the source topic to confirm the format before configuring the connector
  4. Check the DLQ for detailed error information on failed messages
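For steps 2 and 3 above, a small standalone script like this can check whether a sample of JSON messages shares a consistent set of top-level keys before you enable schema support:

```python
import json

def consistent_keys(samples: list[bytes]) -> bool:
    """Return True if every JSON sample is an object with the same
    top-level keys; False for non-objects or mismatched key sets."""
    key_sets = set()
    for raw in samples:
        doc = json.loads(raw)
        if not isinstance(doc, dict):
            return False
        key_sets.add(frozenset(doc))
    return len(key_sets) <= 1

consistent_keys([b'{"id": 1, "amount": 5}', b'{"id": 2, "amount": 9}'])  # → True
consistent_keys([b'{"id": 1}', b'{"order": 1}'])                         # → False
```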