> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamkap.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka (Writing)

> Write directly to Kafka

<Info>
  Customers on a paid plan can enable direct access via Proxy.
</Info>

This guide shows you how to write messages to your Streamkap Kafka topics using Python or command-line tools.

## Creating Kafka Users

You can create and manage Kafka users through the Streamkap web interface at [Kafka Access](https://app.streamkap.com/kafka-access).

To create a new Kafka user, click the "Create User" button. This will open the user creation dialog where you can configure the user's permissions and access settings.

<Info>
  For detailed step-by-step instructions on creating and managing Kafka users through the UI, see the [Kafka Access](/kafka-access) documentation.
</Info>

### User Configuration

When creating a Kafka user, you'll need to configure:

* **Username**: Enter a lowercase username for the Kafka user
* **Password**: Set a secure password for authentication
* **Safe listed IPs**: Specify IP addresses or CIDR ranges that are allowed to connect
* **Kafka ACLs**: Configure access control lists to define what the user can do

### Access Control Lists (ACLs)

Kafka ACLs control what operations users can perform on specific resources. When creating a user, you can configure:

* **Resource Type**:
  * `TOPIC` - Controls access to Kafka topics
  * `GROUP` - Controls access to consumer groups

* **Operation**: The type of operation allowed (varies by resource type)<br />
  **For `TOPIC` resources:**

  * `ALL` - All operations
  * `WRITE` - Write/produce messages
  * `READ` - Read/consume messages
  * `ALTER` - Modify resource configurations
  * `ALTER_CONFIGS` - Modify resource configurations
  * `CREATE` - Create new resources
  * `DELETE` - Delete resources
  * `DESCRIBE` - View resource metadata
  * `DESCRIBE_CONFIGS` - View resource configurations

  **For `GROUP` resources (consumers only):**

  * `READ` - Join and consume from consumer group
  * `DELETE` - Delete consumer group
  * `DESCRIBE` - View consumer group metadata

* **Pattern Type**: How the resource name is matched
  * `LITERAL` - Exact match of the resource name
  * `PREFIXED` - Match resources with the specified prefix

* **Name**: The specific resource name or prefix to apply the ACL to

### Connection Details

Once a user is created, your endpoints are shown under "Proxy Endpoints". These endpoints follow the naming pattern:

`<service-name>-<kafka-username>.streamkap.net:PORT`

Where:

* `<service-name>` - Your Streamkap service/tenant name
* `<kafka-username>` - The Kafka user's username
* `PORT` - One of the available ports: 32400, 32401, or 32402

**Example proxy endpoints:**

* `my-service-kafka-user.streamkap.net:32400`
* `my-service-kafka-user.streamkap.net:32401`
* `my-service-kafka-user.streamkap.net:32402`

**Connection settings:**

* **Security protocol**: `SASL_SSL` (recommended for secure connections)
* **SASL mechanism**: `PLAIN`
* **Username/password**: As configured for the user

### Required Permissions

To write to Kafka topics, your user needs these ACL permissions:

**Essential permissions (always required):**

* **Resource Type**: `TOPIC` | **Operation**: `WRITE` | **Pattern Type**: `LITERAL` or `PREFIXED` | **Name**: Your topic name/prefix
* **Resource Type**: `TOPIC` | **Operation**: `DESCRIBE` | **Pattern Type**: `LITERAL` or `PREFIXED` | **Name**: Your topic name/prefix

**Additional permission (if topic doesn't exist):**

* **Resource Type**: `TOPIC` | **Operation**: `CREATE` | **Pattern Type**: `LITERAL` or `PREFIXED` | **Name**: Your topic name/prefix

<Info>
  Both `WRITE` and `DESCRIBE` are required for successful message production. Add `CREATE` only if you need to create new topics.
</Info>

## Code Examples

### Prerequisites

<Tabs>
  <Tab title="Python">
    Install the required packages:

    ```bash theme={null}
    pip install confluent-kafka certifi
    ```
  </Tab>

  <Tab title="CLI">
    Install kcat:

    ```bash theme={null}
    # macOS
    brew install kcat

    # Ubuntu/Debian
    sudo apt-get install kcat
    ```
  </Tab>
</Tabs>

<CodeGroup>
  ```python Python theme={null}
  from confluent_kafka import Producer
  import socket
  import certifi
  import os

  def delivery_report(err, msg):
      """Called once for each message produced to indicate delivery result."""
      if err is not None:
          print(f'Message delivery failed: {err}')
      else:
          print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

  conf = {
      'bootstrap.servers': '<service-name>-<kafka-username>.streamkap.net:32400,<service-name>-<kafka-username>.streamkap.net:32401,<service-name>-<kafka-username>.streamkap.net:32402',
      'security.protocol': 'SASL_SSL',
      'sasl.mechanism': 'PLAIN',
      'sasl.username': '<your-username>',
      'sasl.password': '<your-password>',
      'client.id': socket.gethostname(),
      # Required to trust AWS root certificates
      'ssl.ca.location': certifi.where(),
  }

  producer = Producer(conf)

  # Produce a message
  producer.produce('<topic-name>', key='key1', value='Hello Streamkap!', callback=delivery_report)
  producer.flush()

  # Note: If the topic doesn't exist, you may need to create it first
  # This requires CREATE permissions in addition to WRITE and DESCRIBE
  ```

  ```bash CLI (Single Message) theme={null}
  # Single message
  echo "Hello Streamkap!" | kcat -P \
    -b <service-name>-<kafka-username>.streamkap.net:32400,<service-name>-<kafka-username>.streamkap.net:32401,<service-name>-<kafka-username>.streamkap.net:32402 \
    -t <topic-name> \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X sasl.username=<your-username> \
    -X sasl.password=<your-password>
  ```

  ```bash CLI (Batch Messages) theme={null}
  # Multiple messages from file
  kcat -P \
    -b <service-name>-<kafka-username>.streamkap.net:32400,<service-name>-<kafka-username>.streamkap.net:32401,<service-name>-<kafka-username>.streamkap.net:32402 \
    -t <topic-name> \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X sasl.username=<your-username> \
    -X sasl.password=<your-password> \
    -l messages.txt
  ```

  ```bash CLI (Key-Value) theme={null}
  # Produce messages with keys (key:value format)
  echo "user123:Hello from user 123" | kcat -P \
    -b <service-name>-<kafka-username>.streamkap.net:32400,<service-name>-<kafka-username>.streamkap.net:32401,<service-name>-<kafka-username>.streamkap.net:32402 \
    -t <topic-name> \
    -K: \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=PLAIN \
    -X sasl.username=<your-username> \
    -X sasl.password=<your-password>
  ```
</CodeGroup>

**Replace the following values in the examples above:**

<Info>
  Your proxy endpoints are listed in the Streamkap web interface at [Kafka Access](https://app.streamkap.com/kafka-access) under "Proxy Endpoints". The format is `<service-name>-<kafka-username>.streamkap.net:PORT`.
</Info>

* `<service-name>-<kafka-username>` - Your proxy endpoints
* `<your-username>` - Your Kafka user username
* `<your-password>` - Your Kafka user password
* `<topic-name>` - The topic you want to write to

## Troubleshooting

<AccordionGroup>
  <Accordion title="Connectivity Issues">
    Before diving into complex debugging, verify basic network connectivity to your Streamkap Kafka cluster.

    **Test DNS Resolution:**

    ```bash theme={null}
    # Check if hostname resolves
    nslookup <service-name>-<kafka-username>.streamkap.net
    ```

    **Test Port Connectivity:**

    ```bash theme={null}
    # Test with netcat (preferred - quick and clean)
    nc -zv <service-name>-<kafka-username>.streamkap.net 32400

    # Test all three ports
    nc -zv <service-name>-<kafka-username>.streamkap.net 32400
    nc -zv <service-name>-<kafka-username>.streamkap.net 32401
    nc -zv <service-name>-<kafka-username>.streamkap.net 32402

    # Alternative with telnet (press Ctrl+C to exit after connection success)
    telnet <service-name>-<kafka-username>.streamkap.net 32400
    ```

    **Test SSL/TLS Handshake:**

    ```bash theme={null}
    # Test SSL handshake and certificate chain
    openssl s_client -connect <service-name>-<kafka-username>.streamkap.net:32400 -servername <service-name>-<kafka-username>.streamkap.net

    # Alternative with timeout (press Ctrl+C to exit)
    echo "Q" | openssl s_client -connect <service-name>-<kafka-username>.streamkap.net:32400 -servername <service-name>-<kafka-username>.streamkap.net
    ```

    **Common Network Issues & Solutions:**

    * **VPN interference**: Disconnect VPN and try again
    * **Firewall blocking ports**: Ensure ports 32400-32402 are accessible
    * **Safe listed IPs**: Verify your public IP address is in the user's safe list

    If basic connectivity fails, check your network configuration before proceeding with Kafka-specific troubleshooting.
  </Accordion>

  <Accordion title="SSL & Authentication Issues">
    **Common Errors**:

    * `SSL connection closed by peer` during message production
    * SSL certificate verification failures
    * `SASL authentication failed` or authentication errors
    * SSL handshake failures

    **Authentication Solutions**:

    1. Verify username and password are correct
    2. Ensure `sasl.mechanism` is set to `PLAIN` and `security.protocol` is set to `SASL_SSL`
    3. Check that the user account is active and not disabled
    4. Confirm the user has basic connection permissions

    **SSL Solutions**:

    1. **For Python**<br />Ensure certificates are properly configured:
       ```bash theme={null}
       pip install --upgrade certifi
       ```
       ```python theme={null}
       import certifi
       'ssl.ca.location': certifi.where()
       ```
    2. **For CLI tools**<br />Try different certificate paths:
       ```bash theme={null}
       -X ssl.ca.location=/etc/ssl/cert.pem
       # or
       -X ssl.ca.location=/etc/ssl/certs/ca-certificates.crt
       ```
    3. **Disable hostname verification** (temporary):
       ```python theme={null}
       'ssl.endpoint.identification.algorithm': 'none'
       ```
    4. **Contact support** if issues persist - may require infrastructure team resolution

    **Note**: Metadata operations (listing topics) may work while data operations fail
  </Accordion>

  <Accordion title="Topic authorization failed">
    **Error**: `Topic authorization failed` or `TOPIC_AUTHORIZATION_FAILED`

    **Cause**: Missing `TOPIC READ` or `TOPIC WRITE` permissions

    **Solution**: Add the appropriate ACL permissions:

    * **Resource Type**: `TOPIC`
    * **Operation**: `READ` (for consumers) or `WRITE` (for producers)
    * **Pattern Type**: `LITERAL` or `PREFIXED`
    * **Name**: Your topic name or prefix
  </Accordion>

  <Accordion title="Group authorization failed (consumers only)">
    **Error**: `GROUP_AUTHORIZATION_FAILED` or `Group authorization failed`

    **Cause**: Missing `GROUP READ` permissions for your consumer group

    **Solution**: Add the following ACL permission:

    * **Resource Type**: `GROUP`
    * **Operation**: `READ`
    * **Pattern Type**: `LITERAL` or `PREFIXED`
    * **Name**: Your consumer group ID (e.g., `my-consumer-group`)

    **Note**: This only affects Python consumers and CLI tools using consumer groups
  </Accordion>

  <Accordion title="No messages received (consumers only)">
    **Issue**: Consumer polls but receives no messages

    **Possible Causes**:

    1. **No messages in topic**: Topic is empty or messages are at different offsets
    2. **Consumer group offset**: Group has already consumed available messages
    3. **Partition assignment**: Messages might be in different partitions
    4. **Offset reset**: Check `auto.offset.reset` setting

    **Solutions**:

    1. **Check topic contents**: Use CLI to verify messages exist
    2. **Use fresh consumer group**: Try with a new `group.id`
    3. **Reset offsets**: Set `auto.offset.reset` to `earliest`
    4. **Check all partitions**: For CLI, try without specifying partition
  </Accordion>
</AccordionGroup>
