# Kafka (Reading)

> Read directly from Kafka

<Info>
  Customers on a paid plan can enable direct access via Proxy.
</Info>

This guide shows you how to read messages from your Streamkap Kafka topics using Python or command-line tools.

## Creating Kafka Users

You can create and manage Kafka users through the Streamkap web interface at [Kafka Access](https://app.streamkap.com/kafka-access).

To create a new Kafka user, click the "Create User" button. This will open the user creation dialog where you can configure the user's permissions and access settings.

<Info>
  For detailed step-by-step instructions on creating and managing Kafka users through the UI, see the [Kafka Access](/kafka-access) documentation.
</Info>

### User Configuration

When creating a Kafka user, you'll need to configure:

* **Username**: Enter a lowercase username for the Kafka user
* **Password**: Set a secure password for authentication
* **Safe listed IPs**: Specify IP addresses or CIDR ranges that are allowed to connect
* **Kafka ACLs**: Configure access control lists to define what the user can do
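
Safe listed entries can be single addresses or CIDR ranges. If you are unsure whether a client address is covered, the following minimal sketch checks it with Python's standard `ipaddress` module. `is_ip_safe_listed` is a hypothetical helper, not part of any Streamkap API, and it assumes entries are matched with ordinary CIDR semantics.

```python
import ipaddress


def is_ip_safe_listed(client_ip: str, safe_list: list) -> bool:
    """Return True if client_ip falls inside any safe-listed address or CIDR range.

    Hypothetical helper: assumes plain CIDR matching, where a bare address
    such as "198.51.100.10" behaves like a single-host range (/32 or /128).
    """
    address = ipaddress.ip_address(client_ip.strip())
    for entry in safe_list:
        # strict=False tolerates entries with host bits set, e.g. "203.0.113.7/24"
        network = ipaddress.ip_network(entry.strip(), strict=False)
        # Membership is always False when IPv4 and IPv6 are mixed
        if address in network:
            return True
    return False


safe_list = ["203.0.113.0/24", "198.51.100.10"]
print(is_ip_safe_listed("203.0.113.42", safe_list))   # True
print(is_ip_safe_listed("198.51.100.11", safe_list))  # False
```

Run it with the public IP address your client connects from, not its private LAN address.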

### Access Control Lists (ACLs)

Kafka ACLs control what operations users can perform on specific resources. When creating a user, you can configure:

* **Resource Type**:
  * `TOPIC` - Controls access to Kafka topics
  * `GROUP` - Controls access to consumer groups

* **Operation**: The type of operation allowed (varies by resource type)<br />
  **For `TOPIC` resources:**

  * `ALL` - All operations
  * `WRITE` - Write/produce messages
  * `READ` - Read/consume messages
  * `ALTER` - Modify the topic itself (for example, add partitions)
  * `ALTER_CONFIGS` - Modify topic configuration settings
  * `CREATE` - Create new resources
  * `DELETE` - Delete resources
  * `DESCRIBE` - View resource metadata
  * `DESCRIBE_CONFIGS` - View resource configurations

  **For `GROUP` resources (consumers only):**

  * `READ` - Join the consumer group and commit or fetch its offsets
  * `DELETE` - Delete consumer group
  * `DESCRIBE` - View consumer group metadata

* **Pattern Type**: How the resource name is matched
  * `LITERAL` - Exact match of the resource name
  * `PREFIXED` - Match resources with the specified prefix

* **Name**: The specific resource name or prefix to apply the ACL to
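
To make the difference between `LITERAL` and `PREFIXED` concrete, here is a simplified Python model of how a single ACL entry is matched against a request. It is an illustration only: the `Acl` class and `acl_allows` function are hypothetical, the topic names are made up, and the model ignores Kafka details such as the `*` wildcard and implied permissions (for example, `READ` also implies `DESCRIBE`).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    """One ACL entry as configured in the UI (hypothetical model)."""
    resource_type: str  # "TOPIC" or "GROUP"
    operation: str      # e.g. "READ", "DESCRIBE", or "ALL"
    pattern_type: str   # "LITERAL" or "PREFIXED"
    name: str           # exact resource name, or a prefix


def acl_allows(acl: Acl, resource_type: str, resource_name: str, operation: str) -> bool:
    """Return True if this single entry grants `operation` on the given resource."""
    if acl.resource_type != resource_type:
        return False
    # "ALL" covers every operation; otherwise the operation must match exactly
    if acl.operation not in (operation, "ALL"):
        return False
    if acl.pattern_type == "LITERAL":
        return resource_name == acl.name
    if acl.pattern_type == "PREFIXED":
        return resource_name.startswith(acl.name)
    raise ValueError(f"unknown pattern type: {acl.pattern_type}")


literal = Acl("TOPIC", "READ", "LITERAL", "orders")
prefixed = Acl("TOPIC", "READ", "PREFIXED", "orders")

print(acl_allows(literal, "TOPIC", "orders_archive", "READ"))   # False: name must match exactly
print(acl_allows(prefixed, "TOPIC", "orders_archive", "READ"))  # True: name starts with "orders"
```

A `PREFIXED` entry is convenient when one user reads many topics that share a naming scheme, but choose the prefix carefully, since it also matches any future topic whose name starts with it.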

### Connection Details

Once a user is created, its endpoints are listed under "Proxy Endpoints". These endpoints follow the naming pattern:

`<service-name>-<kafka-username>.streamkap.net:PORT`

Where:

* `<service-name>` - Your Streamkap service/tenant name
* `<kafka-username>` - The Kafka user's username
* `PORT` - One of the available ports: 32400, 32401, or 32402

**Example proxy endpoints:**

* `my-service-kafka-user.streamkap.net:32400`
* `my-service-kafka-user.streamkap.net:32401`
* `my-service-kafka-user.streamkap.net:32402`

**Connection settings:**

* **Security protocol**: `SASL_SSL` (recommended for secure connections)
* **SASL mechanism**: `PLAIN`
* **Username/password**: As configured for the user
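
Because the three endpoints differ only by port, you can derive the bootstrap server list and the connection settings from the service name and Kafka username. The sketch below does that in plain Python. `build_bootstrap_servers` and `build_client_config` are hypothetical helpers; the dictionary keys are standard librdkafka/confluent-kafka settings, and the sketch assumes the SASL username is the same Kafka username that appears in the hostname.

```python
PROXY_PORTS = (32400, 32401, 32402)


def build_bootstrap_servers(service_name: str, kafka_username: str) -> str:
    """Join all proxy endpoints into a comma-separated bootstrap.servers value."""
    host = f"{service_name}-{kafka_username}.streamkap.net"
    return ",".join(f"{host}:{port}" for port in PROXY_PORTS)


def build_client_config(service_name: str, kafka_username: str, password: str) -> dict:
    """Return the connection settings listed above as a client config dict."""
    return {
        "bootstrap.servers": build_bootstrap_servers(service_name, kafka_username),
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": kafka_username,
        "sasl.password": password,
    }


print(build_bootstrap_servers("my-service", "kafka-user"))
# my-service-kafka-user.streamkap.net:32400,my-service-kafka-user.streamkap.net:32401,my-service-kafka-user.streamkap.net:32402
```

Listing all three ports gives the client more than one address to try when it first connects.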

### Required Permissions

To read from Kafka topics, your user needs these ACL permissions:

**Essential permissions (always required):**

* **Resource Type**: `TOPIC` | **Operation**: `READ` | **Pattern Type**: `LITERAL` or `PREFIXED` | **Name**: Your topic name/prefix

**Consumer group permissions (required when the client uses a consumer group, as the Python example does):**

* **Resource Type**: `GROUP` | **Operation**: `READ` | **Pattern Type**: `LITERAL` or `PREFIXED` | **Name**: Your consumer group ID

**Additional permissions (commonly required):**

* **Resource Type**: `TOPIC` | **Operation**: `DESCRIBE` | **Pattern Type**: `LITERAL` or `PREFIXED` | **Name**: Your topic name/prefix

<Info>
  `TOPIC READ` is always required. `GROUP READ` is only required when the client uses a consumer group, which includes Python consumers (the confluent-kafka library always uses consumer groups). CLI tools like kcat work with just `TOPIC READ` permissions when they are not run in consumer-group (`-G`) mode. Add `DESCRIBE` permissions if your client needs metadata access.
</Info>
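
The rules above can be condensed into a small checklist generator. `required_read_acls` is a hypothetical helper that simply restates this section as data; it assumes `LITERAL` patterns unless you pass `PREFIXED`, and the topic and group names in the usage lines are made up.

```python
def required_read_acls(topic, group_id=None, pattern_type="LITERAL", include_describe=False):
    """List the (resource type, operation, pattern type, name) ACLs a reader needs.

    - TOPIC READ is always required.
    - GROUP READ is required only when the client uses a consumer group.
    - TOPIC DESCRIBE is optional, for clients that need metadata access.
    """
    acls = [("TOPIC", "READ", pattern_type, topic)]
    if include_describe:
        acls.append(("TOPIC", "DESCRIBE", pattern_type, topic))
    if group_id is not None:
        acls.append(("GROUP", "READ", pattern_type, group_id))
    return acls


# Python consumer (always uses a consumer group)
print(required_read_acls("orders", group_id="orders-reader"))
# kcat reading a partition directly (no consumer group)
print(required_read_acls("orders"))
```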

## Code Examples

### Prerequisites

<Tabs>
  <Tab title="Python">
    Install the required packages:

    ```bash theme={null}
    pip install confluent-kafka certifi
    ```
  </Tab>

  <Tab title="CLI">
    Install kcat:

    ```bash theme={null}
    # macOS
    brew install kcat

    # Ubuntu/Debian
    sudo apt-get install kcat
    ```
  </Tab>
</Tabs>

<CodeGroup>
  ```python Python theme={null}
  from confluent_kafka import Consumer
  import socket
  import certifi

  conf = {
      'bootstrap.servers': '<service-name>-<kafka-username>.streamkap.net:32400,<service-name>-<kafka-username>.streamkap.net:32401,<service-name>-<kafka-username>.streamkap.net:32402',
      'security.protocol': 'SASL_SSL',
      'sasl.mechanism': 'PLAIN',
      'sasl.username': '<your-username>',
      'sasl.password': '<your-password>',
      'group.id': '<consumer-group-id>',
      'client.id': socket.gethostname(),
      'auto.offset.reset': 'earliest',
      # Required to trust AWS root certificates
      'ssl.ca.location': certifi.where(),
  }

  consumer = Consumer(conf)
  consumer.subscribe(['<topic-name>'])

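  try:
      while True:
          msg = consumer.poll(1.0)
          if msg is None:
              continue
          if msg.error():
              print(f"Consumer error: {msg.error()}")
              continue

          # The value can be null (e.g. a tombstone), so check before decoding
          value = msg.value()
          text = value.decode('utf-8') if value is not None else None
          print(f"Received message: {text}")
  except KeyboardInterrupt:
      pass  # stop cleanly on Ctrl+C
  finally:
      # Commits final offsets (auto-commit is on by default) and leaves the group
      consumer.close()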
  ```

  ```bash CLI (Latest Message) theme={null}
  # Read the latest message (-o -1), print it as key|value (-K), and exit after one message (-c1)
  kcat -b <service-name>-<kafka-username>.streamkap.net:32400 \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X sasl.username=<your-username> \
    -X sasl.password=<your-password> \
    -C -t <topic-name> -K \| -c1 -o -1
  ```

  ```bash CLI (All Messages) theme={null}
  # Consume all messages from the beginning; kcat then waits for new ones (add -e to exit at the end)
  kcat -b <service-name>-<kafka-username>.streamkap.net:32400 \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X sasl.username=<your-username> \
    -X sasl.password=<your-password> \
    -C -t <topic-name> -o beginning
  ```

  ```bash CLI (Direct Partition) theme={null}
  # Read from specific partition/offset (requires only TOPIC READ)
  kcat -b <service-name>-<kafka-username>.streamkap.net:32400 \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X sasl.username=<your-username> \
    -X sasl.password=<your-password> \
    -C -t <topic-name> -p 0 -o 0 -c1
  ```
</CodeGroup>
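
The CLI (Latest Message) example passes `-K` with `|` as the delimiter, so kcat prints each record as `key|value`. If you want similar output from the Python consumer, the hypothetical helper below formats a record's raw key and value. It also handles null keys and values (a null value is a tombstone), which would otherwise make `.decode()` fail.

```python
from typing import Optional


def format_record(key: Optional[bytes], value: Optional[bytes], delimiter: str = "|") -> str:
    """Render a Kafka record as key<delimiter>value, roughly like kcat's -K output.

    Null keys or values are rendered as empty strings, and bytes that are not
    valid UTF-8 are replaced rather than raising an error.
    """
    key_text = "" if key is None else key.decode("utf-8", errors="replace")
    value_text = "" if value is None else value.decode("utf-8", errors="replace")
    return f"{key_text}{delimiter}{value_text}"


print(format_record(b"42", b'{"status": "shipped"}'))  # 42|{"status": "shipped"}
print(format_record(b"42", None))                     # 42|
```

Inside the Python consumer loop, you would call it as `print(format_record(msg.key(), msg.value()))`.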

**Replace the following values in the examples above:**

* `<service-name>-<kafka-username>` - The host part of your proxy endpoints
* `<your-username>` - Your Kafka user's username
* `<your-password>` - Your Kafka user's password
* `<topic-name>` - The topic you want to read from
* `<consumer-group-id>` - Your consumer group ID (Python only)

<Info>
  Your proxy endpoints are listed in the Streamkap web interface at [Kafka Access](https://app.streamkap.com/kafka-access) under "Proxy Endpoints". The format is `<service-name>-<kafka-username>.streamkap.net:PORT`.
</Info>
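
Rather than pasting credentials into source code, you can read the replacement values from environment variables. In this minimal sketch the variable names (`STREAMKAP_BOOTSTRAP_SERVERS` and so on) and the `config_from_env` helper are hypothetical; rename them to suit your deployment.

```python
import os

# Hypothetical variable names mapped to the client settings they fill in
ENV_TO_SETTING = {
    "STREAMKAP_BOOTSTRAP_SERVERS": "bootstrap.servers",
    "STREAMKAP_KAFKA_USERNAME": "sasl.username",
    "STREAMKAP_KAFKA_PASSWORD": "sasl.password",
    "STREAMKAP_CONSUMER_GROUP": "group.id",
}


def config_from_env(env=None) -> dict:
    """Build a consumer config from environment variables, failing fast if any are unset."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_SETTING if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    conf = {setting: env[name] for name, setting in ENV_TO_SETTING.items()}
    # Fixed settings from the Connection Details section
    conf.update({
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "auto.offset.reset": "earliest",
    })
    return conf
```

`Consumer(config_from_env())` can then replace the hard-coded `conf` dictionary; add `ssl.ca.location` as in the Python example.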

## Integrations

Here are quick links to some integrations that support reading from Kafka:

* Materialize: [link to docs](https://materialize.com/docs/sql/create-source/kafka/)
* Tinybird: [link to docs](https://www.tinybird.co/docs/get-data-in/connectors/kafka)
* SingleStore: [link to docs](https://docs.singlestore.com/cloud/load-data/load-data-with-pipelines/how-to-load-data-using-pipelines/load-data-from-kafka/)
* Starburst: [link to docs](https://docs.starburst.io/starburst-galaxy/working-with-data/data-ingest/kafka-streaming-ingestion.html)
* StarTree: [link to docs](https://dev.startree.ai/docs/use-data-manager/kafka)
* Microsoft Fabric: [link to docs](https://learn.microsoft.com/en-us/fabric/real-time-intelligence/event-streams/add-source-apache-kafka)

## Troubleshooting

<AccordionGroup>
  <Accordion title="Connectivity Issues">
    Before diving into complex debugging, verify basic network connectivity to your Streamkap Kafka cluster.

    **Test DNS Resolution:**

    ```bash theme={null}
    # Check if hostname resolves
    nslookup <service-name>-<kafka-username>.streamkap.net
    ```

    **Test Port Connectivity:**

    ```bash theme={null}
    # Test each port with netcat (preferred: quick and clean)
    nc -zv <service-name>-<kafka-username>.streamkap.net 32400
    nc -zv <service-name>-<kafka-username>.streamkap.net 32401
    nc -zv <service-name>-<kafka-username>.streamkap.net 32402

    # Alternative with telnet (once connected, press Ctrl+] and type "quit" to exit)
    telnet <service-name>-<kafka-username>.streamkap.net 32400
    ```
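
    If `nc` or `telnet` is not available, the same two checks (DNS resolution and TCP port reachability) can be run with Python's standard `socket` module. This is a minimal sketch; `resolve` and `port_open` are hypothetical helper names, and a successful TCP connection only proves the port is reachable, not that TLS or authentication will succeed.

    ```python
    import socket


    def resolve(host: str) -> list:
        """Return the unique IP addresses that host resolves to (similar to nslookup)."""
        infos = socket.getaddrinfo(host, None, proto=socket.IPPROTO_TCP)
        return sorted({info[4][0] for info in infos})


    def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
        """Return True if a TCP connection to host:port succeeds (similar to nc -z)."""
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return True
        except OSError:
            # Covers DNS failures, refused connections, and timeouts
            return False
    ```

    For example, call `resolve()` on your endpoint host, then `port_open()` for each of ports 32400, 32401, and 32402.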

    **Test SSL/TLS Handshake:**

    ```bash theme={null}
    # Test SSL handshake and certificate chain (interactive; press Ctrl+C to exit)
    openssl s_client -connect <service-name>-<kafka-username>.streamkap.net:32400 -servername <service-name>-<kafka-username>.streamkap.net

    # Non-interactive variant: exits on its own after printing the handshake details
    echo "Q" | openssl s_client -connect <service-name>-<kafka-username>.streamkap.net:32400 -servername <service-name>-<kafka-username>.streamkap.net
    ```

    **Common Network Issues & Solutions:**

    * **VPN interference**: Disconnect VPN and try again
    * **Firewall blocking ports**: Ensure outbound TCP connections to ports 32400-32402 are allowed
    * **Safe listed IPs**: Verify your public IP address is in the user's safe list

    If basic connectivity fails, check your network configuration before proceeding with Kafka-specific troubleshooting.
  </Accordion>

  <Accordion title="SSL & Authentication Issues">
    **Common Errors**:

    * `SSL connection closed by peer` while consuming (or producing) messages
    * SSL certificate verification failures
    * `SASL authentication failed` or authentication errors
    * SSL handshake failures

    **Authentication Solutions**:

    1. Verify username and password are correct
    2. Ensure `sasl.mechanism` is set to `PLAIN` and `security.protocol` is set to `SASL_SSL`
    3. Check that the user account is active and not disabled
    4. Confirm the user has the ACLs listed under Required Permissions

    **SSL Solutions**:

    1. **For Python**<br />Ensure certificates are properly configured:
       ```bash theme={null}
       pip install --upgrade certifi
       ```
       ```python theme={null}
       import certifi

       conf['ssl.ca.location'] = certifi.where()  # conf is your consumer config dict
       ```
    2. **For CLI tools**<br />Try different certificate paths:
       ```bash theme={null}
       -X ssl.ca.location=/etc/ssl/cert.pem  # e.g. macOS
       # or
       -X ssl.ca.location=/etc/ssl/certs/ca-certificates.crt  # e.g. Debian/Ubuntu
       ```
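    3. **Disable hostname verification** (temporary, for debugging only; turn it back on once the issue is resolved):
       ```python theme={null}
       conf['ssl.endpoint.identification.algorithm'] = 'none'  # skips the certificate hostname check
       ```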
    4. **Contact support** if issues persist; the problem may need to be fixed on the Streamkap side

    **Note**: Metadata operations (such as listing topics) may succeed while data operations fail, so a successful topic listing does not prove that consuming will work.
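
    Which CA bundle works depends on the machine. The following minimal sketch picks one automatically: it prefers `certifi` when installed and otherwise falls back to common system locations. `find_ca_bundle` is a hypothetical helper, and the fallback list (macOS, Debian/Ubuntu, RHEL/Fedora paths) is an assumption that may not match your OS.

    ```python
    import os

    # Common system CA bundle locations (assumed); adjust for your OS
    SYSTEM_CA_PATHS = (
        "/etc/ssl/cert.pem",                   # macOS
        "/etc/ssl/certs/ca-certificates.crt",  # Debian/Ubuntu
        "/etc/pki/tls/certs/ca-bundle.crt",    # RHEL/Fedora
    )


    def find_ca_bundle():
        """Return a CA bundle path, or None if no candidate was found."""
        try:
            import certifi  # preferred: Mozilla's CA bundle, updated via pip
            return certifi.where()
        except ImportError:
            pass
        for path in SYSTEM_CA_PATHS:
            if os.path.isfile(path):
                return path
        return None
    ```

    Set `'ssl.ca.location'` to the returned path, and treat `None` as a configuration error rather than passing it to the client.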
  </Accordion>

  <Accordion title="Topic authorization failed">
    **Error**: `Topic authorization failed` or `TOPIC_AUTHORIZATION_FAILED`

    **Cause**: Missing `TOPIC READ` or `TOPIC WRITE` permissions

    **Solution**: Add the appropriate ACL permissions:

    * **Resource Type**: `TOPIC`
    * **Operation**: `READ` (for consumers) or `WRITE` (for producers)
    * **Pattern Type**: `LITERAL` or `PREFIXED`
    * **Name**: Your topic name or prefix
  </Accordion>

  <Accordion title="Group authorization failed (consumers only)">
    **Error**: `GROUP_AUTHORIZATION_FAILED` or `Group authorization failed`

    **Cause**: Missing `GROUP READ` permissions for your consumer group

    **Solution**: Add the following ACL permission:

    * **Resource Type**: `GROUP`
    * **Operation**: `READ`
    * **Pattern Type**: `LITERAL` or `PREFIXED`
    * **Name**: Your consumer group ID (e.g., `my-consumer-group`)

    **Note**: This only affects Python consumers and CLI tools that use consumer groups (for example, kcat in `-G` mode)
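
    When a Python consumer hits one of these authorization errors, `msg.error().name()` in confluent-kafka returns the error's name (for example `GROUP_AUTHORIZATION_FAILED`). A small lookup like the hypothetical helper below can turn that name into the ACL to request, assuming the `LITERAL` pattern type.

    ```python
    def acl_for_auth_error(error_name: str, topic: str, group_id: str):
        """Map an authorization error name to the ACL a consumer is likely missing.

        Returns a (resource type, operation, pattern type, name) tuple, or None
        if the error is not one of the two authorization failures handled here.
        """
        if error_name == "TOPIC_AUTHORIZATION_FAILED":
            return ("TOPIC", "READ", "LITERAL", topic)
        if error_name == "GROUP_AUTHORIZATION_FAILED":
            return ("GROUP", "READ", "LITERAL", group_id)
        return None


    print(acl_for_auth_error("GROUP_AUTHORIZATION_FAILED", "orders", "my-consumer-group"))
    # ('GROUP', 'READ', 'LITERAL', 'my-consumer-group')
    ```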
  </Accordion>

  <Accordion title="No messages received (consumers only)">
    **Issue**: Consumer polls but receives no messages

    **Possible Causes**:

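    1. **No messages in topic**: The topic is empty, or its messages are at offsets your consumer is not reading from
    2. **Consumer group offset**: The group has already committed offsets past the available messages
    3. **Partition assignment**: When reading a single partition (for example with kcat `-p`), the messages may be in other partitions
    4. **Offset reset**: With `auto.offset.reset` set to `latest`, a new group only receives messages produced after it joins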

    **Solutions**:

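    1. **Check topic contents**: Use kcat with `-o beginning` to verify that messages exist
    2. **Use fresh consumer group**: Try with a new `group.id`
    3. **Reset offsets**: Set `auto.offset.reset` to `earliest`; it only applies when the group has no committed offsets, so combine it with a fresh `group.id`
    4. **Check all partitions**: For CLI, omit `-p` so kcat reads from every partition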
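
    Solutions 2 and 3 can be combined by deriving a throwaway debugging configuration from your existing one. In this minimal sketch `fresh_debug_config` is a hypothetical helper; it assumes your user has a `PREFIXED` `GROUP READ` ACL covering the chosen prefix, because a brand-new `group.id` will not match a `LITERAL` group ACL.

    ```python
    import uuid


    def fresh_debug_config(base_conf: dict, group_prefix: str) -> dict:
        """Copy base_conf with a new, unique group.id that starts at the earliest offset."""
        conf = dict(base_conf)  # leave the caller's config untouched
        conf["group.id"] = f"{group_prefix}{uuid.uuid4().hex[:8]}"
        # Takes effect because the new group has no committed offsets yet
        conf["auto.offset.reset"] = "earliest"
        # Skip committing, so the throwaway group stores no offsets
        conf["enable.auto.commit"] = False
        return conf
    ```

    Pass the result to `Consumer(...)` in place of your usual config while investigating.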
  </Accordion>
</AccordionGroup>
