Customers on a paid plan can enable direct access via Proxy.
This guide shows you how to write messages to your Streamkap Kafka topics using Python or command-line tools.

Creating Kafka Users

You can create and manage Kafka users through the Streamkap web interface at Kafka Access. To create a new Kafka user, click the “Create User” button. This will open the user creation dialog where you can configure the user’s permissions and access settings.
For detailed step-by-step instructions on creating and managing Kafka users through the UI, see the Kafka Access documentation.

User Configuration

When creating a Kafka user, you’ll need to configure:
  • Username: Enter a lowercase username for the Kafka user
  • Password: Set a secure password for authentication
  • Whitelisted IPs: Specify IP addresses or CIDR ranges that are allowed to connect
  • Kafka ACLs: Configure access control lists to define what the user can do

Access Control Lists (ACLs)

Kafka ACLs control what operations users can perform on specific resources. When creating a user, you can configure:
  • Resource Type:
    • TOPIC - Controls access to Kafka topics
    • GROUP - Controls access to consumer groups
  • Operation: The type of operation allowed (varies by resource type)
    For TOPIC resources:
    • ALL - All operations
    • WRITE - Write/produce messages
    • READ - Read/consume messages
    • ALTER - Modify the resource itself (for example, a topic’s partition count)
    • ALTER_CONFIGS - Modify resource configurations
    • CREATE - Create new resources
    • DELETE - Delete resources
    • DESCRIBE - View resource metadata
    • DESCRIBE_CONFIGS - View resource configurations
    For GROUP resources (consumers only):
    • READ - Join and consume from consumer group
    • DELETE - Delete consumer group
    • DESCRIBE - View consumer group metadata
  • Pattern Type: How the resource name is matched
    • LITERAL - Exact match of the resource name
    • PREFIXED - Match resources with the specified prefix
  • Name: The specific resource name or prefix to apply the ACL to
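To make the difference between LITERAL and PREFIXED matching concrete, here is a minimal sketch in Python. The `Acl` class and `acl_matches` function are hypothetical names invented for this illustration, not part of any Streamkap or Kafka API. The sketch also simplifies: it treats ALL as covering every operation and ignores wildcard names and any operations that Kafka may imply from others.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    """Hypothetical in-memory model of one ACL row from the user dialog."""
    resource_type: str  # "TOPIC" or "GROUP"
    operation: str      # e.g. "WRITE", "READ", "ALL"
    pattern_type: str   # "LITERAL" or "PREFIXED"
    name: str           # resource name or prefix


def acl_matches(acl: Acl, resource_type: str, operation: str, resource_name: str) -> bool:
    """Return True if this ACL row would cover the requested operation."""
    if acl.resource_type != resource_type:
        return False
    # Simplification: ALL is treated as covering every operation.
    if acl.operation not in (operation, "ALL"):
        return False
    if acl.pattern_type == "LITERAL":
        # LITERAL requires an exact match of the resource name.
        return resource_name == acl.name
    if acl.pattern_type == "PREFIXED":
        # PREFIXED matches any resource whose name starts with the prefix.
        return resource_name.startswith(acl.name)
    return False


orders_prefix = Acl("TOPIC", "WRITE", "PREFIXED", "orders.")
orders_exact = Acl("TOPIC", "WRITE", "LITERAL", "orders")
print(acl_matches(orders_prefix, "TOPIC", "WRITE", "orders.eu"))  # True
print(acl_matches(orders_exact, "TOPIC", "WRITE", "orders.eu"))   # False
```

A PREFIXED entry is convenient when a user should reach a family of topics that share a naming convention, while LITERAL keeps access limited to one named resource.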

Connection Details

Once a user is created, your endpoints are shown under “Proxy Endpoints”. These endpoints follow the naming pattern:
<service-name>-<kafka-username>.streamkap.net:PORT
Where:
  • <service-name> - Your Streamkap service/tenant name
  • <kafka-username> - The Kafka user’s username
  • PORT - One of the available ports: 32400, 32401, or 32402
Example proxy endpoints:
  • my-service-kafka-user.streamkap.net:32400
  • my-service-kafka-user.streamkap.net:32401
  • my-service-kafka-user.streamkap.net:32402
Connection settings:
  • Security protocol: SASL_SSL (recommended for secure connections)
  • SASL mechanism: PLAIN
  • Username/password: As configured for the user
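The naming pattern above can be turned into a bootstrap server list with a few lines of Python. This is a small sketch only: `bootstrap_servers` is a hypothetical helper name, and the values shown under “Proxy Endpoints” in the web interface remain the authoritative source.

```python
PROXY_PORTS = (32400, 32401, 32402)  # the three ports listed above


def bootstrap_servers(service_name: str, kafka_username: str) -> str:
    """Join the three proxy endpoints into one comma-separated string."""
    host = f"{service_name}-{kafka_username}.streamkap.net"
    return ",".join(f"{host}:{port}" for port in PROXY_PORTS)


print(bootstrap_servers("my-service", "kafka-user"))
# my-service-kafka-user.streamkap.net:32400,my-service-kafka-user.streamkap.net:32401,my-service-kafka-user.streamkap.net:32402
```

The resulting string is in the comma-separated form that Kafka clients generally accept for their bootstrap server setting.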

Required Permissions

To write to Kafka topics, your user needs these ACL permissions:
Essential permissions (always required):
  • Resource Type: TOPIC | Operation: WRITE | Pattern Type: LITERAL or PREFIXED | Name: Your topic name/prefix
  • Resource Type: TOPIC | Operation: DESCRIBE | Pattern Type: LITERAL or PREFIXED | Name: Your topic name/prefix
Additional permission (if topic doesn’t exist):
  • Resource Type: TOPIC | Operation: CREATE | Pattern Type: LITERAL or PREFIXED | Name: Your topic name/prefix
Both WRITE and DESCRIBE are required for successful message production. Add CREATE only if you need to create new topics.
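As a quick way to reason about the rule above, the following sketch computes which TOPIC operations are still missing for a producer. The function names are hypothetical, and `granted_ops` is assumed to be the set of TOPIC operations already granted for the topic’s name or prefix.

```python
def required_producer_ops(topic_exists: bool = True) -> set:
    """WRITE and DESCRIBE are always needed; CREATE only for new topics."""
    ops = {"WRITE", "DESCRIBE"}
    if not topic_exists:
        ops.add("CREATE")
    return ops


def missing_producer_ops(granted_ops, topic_exists: bool = True) -> list:
    """Return the operations that still need an ACL, sorted for stable output."""
    granted = set(granted_ops)
    if "ALL" in granted:
        # ALL covers every operation on the resource.
        return []
    return sorted(required_producer_ops(topic_exists) - granted)


print(missing_producer_ops({"WRITE"}))                                  # ['DESCRIBE']
print(missing_producer_ops({"WRITE", "DESCRIBE"}, topic_exists=False))  # ['CREATE']
```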

Code Examples

Prerequisites

Install the required packages:
pip install confluent-kafka certifi
from confluent_kafka import Producer
import socket
import certifi
import os

def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

conf = {
    'bootstrap.servers': '<service-name>-<kafka-username>.streamkap.net:32400,<service-name>-<kafka-username>.streamkap.net:32401,<service-name>-<kafka-username>.streamkap.net:32402',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '<your-username>',
    'sasl.password': '<your-password>',
    'client.id': socket.gethostname(),
    # Required to trust AWS root certificates
    'ssl.ca.location': certifi.where(),
}

producer = Producer(conf)

# Produce a message
producer.produce('<topic-name>', key='key1', value='Hello Streamkap!', callback=delivery_report)
producer.flush()

# Note: If the topic doesn't exist, you may need to create it first
# This requires CREATE permissions in addition to WRITE and DESCRIBE
Replace the following values in the examples above:
  • <service-name>-<kafka-username> - Your proxy endpoints
  • <your-username> - Your Kafka user username
  • <your-password> - Your Kafka user password
  • <topic-name> - The topic you want to write to
Your proxy endpoints are listed in the Streamkap web interface at Kafka Access under “Proxy Endpoints”. The format is <service-name>-<kafka-username>.streamkap.net:PORT.
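Rather than pasting the replaced values directly into source code, you may prefer to read them from the environment. The sketch below is one possible approach; the variable names STREAMKAP_BOOTSTRAP, STREAMKAP_USERNAME, and STREAMKAP_PASSWORD and the function `producer_conf_from_env` are hypothetical choices for this example, not names defined by Streamkap.

```python
import os
import socket

# Hypothetical environment variable names chosen for this sketch.
REQUIRED_VARS = ("STREAMKAP_BOOTSTRAP", "STREAMKAP_USERNAME", "STREAMKAP_PASSWORD")


def producer_conf_from_env(env=os.environ) -> dict:
    """Build the connection settings from environment variables."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "bootstrap.servers": env["STREAMKAP_BOOTSTRAP"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": env["STREAMKAP_USERNAME"],
        "sasl.password": env["STREAMKAP_PASSWORD"],
        "client.id": socket.gethostname(),
    }
```

Before passing the result to `Producer`, add `'ssl.ca.location': certifi.where()` as in the example above. Failing early on a missing variable tends to give a clearer error than an authentication failure later on.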

Troubleshooting

Before diving into complex debugging, verify basic network connectivity to your Streamkap Kafka cluster.
Test DNS Resolution:
# Check if hostname resolves
nslookup <service-name>-<kafka-username>.streamkap.net
Test Port Connectivity:
# Test with netcat (preferred - quick and clean)
nc -zv <service-name>-<kafka-username>.streamkap.net 32400

# Test all three ports
nc -zv <service-name>-<kafka-username>.streamkap.net 32400
nc -zv <service-name>-<kafka-username>.streamkap.net 32401
nc -zv <service-name>-<kafka-username>.streamkap.net 32402

# Alternative with telnet (press Ctrl+C to exit after connection success)
telnet <service-name>-<kafka-username>.streamkap.net 32400
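If nslookup, nc, or telnet are not available on the machine running your client, the same DNS and port checks can be approximated with Python’s standard library. This is a rough sketch; the function names are hypothetical and the 5-second timeout is an arbitrary choice.

```python
import socket


def resolves(hostname: str) -> bool:
    """Return True if the hostname resolves to at least one address."""
    try:
        socket.getaddrinfo(hostname, None)
        return True
    except socket.gaierror:
        return False


def port_open(hostname: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to hostname:port succeeds."""
    try:
        with socket.create_connection((hostname, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_endpoints(hostname: str, ports=(32400, 32401, 32402), timeout: float = 5.0) -> dict:
    """Check every proxy port and return a {port: reachable} mapping."""
    return {port: port_open(hostname, port, timeout) for port in ports}
```

Calling `check_endpoints("<service-name>-<kafka-username>.streamkap.net")` with your own hostname returns one boolean per port, which makes it easy to see whether only some ports are blocked.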
Test SSL/TLS Handshake:
# Test SSL handshake and certificate chain
openssl s_client -connect <service-name>-<kafka-username>.streamkap.net:32400 -servername <service-name>-<kafka-username>.streamkap.net

# Alternative that exits on its own (sending "Q" closes the session after the handshake)
echo "Q" | openssl s_client -connect <service-name>-<kafka-username>.streamkap.net:32400 -servername <service-name>-<kafka-username>.streamkap.net
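The handshake test can also be run from Python, which exercises the same TLS stack your Python process uses. The sketch below is an assumption-laden illustration: `tls_handshake` is a hypothetical helper, and it verifies against the system trust store through `ssl.create_default_context()` rather than the certifi bundle used in the producer example.

```python
import socket
import ssl


def tls_handshake(hostname: str, port: int = 32400, timeout: float = 10.0) -> dict:
    """Perform a verified TLS handshake and return basic session details.

    Raises OSError (including ssl.SSLError) if the connection or
    certificate verification fails.
    """
    # The default context verifies the certificate chain and the hostname.
    context = ssl.create_default_context()
    with socket.create_connection((hostname, port), timeout=timeout) as raw:
        with context.wrap_socket(raw, server_hostname=hostname) as tls:
            cert = tls.getpeercert()
            return {
                "tls_version": tls.version(),
                "subject": cert.get("subject"),
                "not_after": cert.get("notAfter"),
            }
```

A certificate verification error here, combined with a successful `openssl s_client` run, would suggest that the problem lies in the trust store available to Python rather than in the network path.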
Common Network Issues & Solutions:
  • VPN interference: Disconnect VPN and try again
  • Firewall blocking ports: Ensure ports 32400-32402 are accessible
  • Whitelisted IPs: Verify your public IP address is in the user’s whitelist
If basic connectivity fails, check your network configuration before proceeding with Kafka-specific troubleshooting.
Common Errors:
  • SSL connection closed by peer during message production
  • SSL certificate verification failures
  • SASL authentication failed or authentication errors
  • SSL handshake failures
Authentication Solutions:
  1. Verify username and password are correct
  2. Ensure sasl.mechanism is set to PLAIN and security.protocol is set to SASL_SSL
  3. Check that the user account is active and not disabled
  4. Confirm the user has basic connection permissions
SSL Solutions:
  1. For Python
    Ensure certificates are properly configured:
    pip install --upgrade certifi
    
    import certifi
    'ssl.ca.location': certifi.where()
    
  2. For CLI tools
    Try different certificate paths:
    -X ssl.ca.location=/etc/ssl/cert.pem
    # or
    -X ssl.ca.location=/etc/ssl/certs/ca-certificates.crt
    
  3. Disable hostname verification (temporary):
    'ssl.endpoint.identification.algorithm': 'none'
    
  4. Contact support if issues persist - may require infrastructure team resolution
Note: Metadata operations (listing topics) may work while data operations fail
Error: Topic authorization failed or TOPIC_AUTHORIZATION_FAILED
Cause: Missing TOPIC READ or TOPIC WRITE permissions
Solution: Add the appropriate ACL permissions:
  • Resource Type: TOPIC
  • Operation: READ (for consumers) or WRITE (for producers)
  • Pattern Type: LITERAL or PREFIXED
  • Name: Your topic name or prefix
Error: GROUP_AUTHORIZATION_FAILED or Group authorization failed
Cause: Missing GROUP READ permissions for your consumer group
Solution: Add the following ACL permission:
  • Resource Type: GROUP
  • Operation: READ
  • Pattern Type: LITERAL or PREFIXED
  • Name: Your consumer group ID (e.g., my-consumer-group)
Note: This only affects Python consumers and CLI tools using consumer groups
Issue: Consumer polls but receives no messages
Possible Causes:
  1. No messages in topic: Topic is empty or messages are at different offsets
  2. Consumer group offset: Group has already consumed available messages
  3. Partition assignment: Messages might be in different partitions
  4. Offset reset: Check auto.offset.reset setting
Solutions:
  1. Check topic contents: Use CLI to verify messages exist
  2. Use fresh consumer group: Try with a new group.id
  3. Reset offsets: Set auto.offset.reset to earliest
  4. Check all partitions: For CLI, try without specifying partition
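Solutions 2 and 3 can be combined in a small helper that derives a debugging configuration from your existing connection settings. This is a sketch under stated assumptions: `debug_consumer_conf` and the `debug` group prefix are hypothetical, and the generated group ID still has to be covered by a GROUP READ ACL (for example, a PREFIXED ACL on `debug-`).

```python
import uuid


def debug_consumer_conf(base_conf: dict, group_prefix: str = "debug") -> dict:
    """Copy base_conf, then add a fresh group.id and read from the earliest offset."""
    conf = dict(base_conf)  # leave the caller's settings untouched
    # A new group has no committed offsets, so auto.offset.reset applies.
    conf["group.id"] = f"{group_prefix}-{uuid.uuid4().hex[:8]}"
    conf["auto.offset.reset"] = "earliest"
    return conf
```

Passing the result to `confluent_kafka.Consumer` and calling `subscribe()` with the topic name, instead of assigning a single partition, should then read from every partition starting at the earliest available offset.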