Skip to main content
Customers on a paid plan can enable direct access via Proxy.
This guide shows you how to read messages from your Streamkap Kafka topics using Python or command-line tools.

Creating Kafka Users

You can create and manage Kafka users through the Streamkap web interface at Kafka Access. To create a new Kafka user, click the “Create User” button. This will open the user creation dialog where you can configure the user’s permissions and access settings.
For detailed step-by-step instructions on creating and managing Kafka users through the UI, see the Kafka Access documentation.

User Configuration

When creating a Kafka user, you’ll need to configure:
  • Username: Enter a lowercase username for the Kafka user
  • Password: Set a secure password for authentication
  • Safe listed IPs: Specify IP addresses or CIDR ranges that are allowed to connect
  • Kafka ACLs: Configure access control lists to define what the user can do

Access Control Lists (ACLs)

Kafka ACLs control what operations users can perform on specific resources. When creating a user, you can configure:
  • Resource Type:
    • TOPIC - Controls access to Kafka topics
    • GROUP - Controls access to consumer groups
  • Operation: The type of operation allowed (varies by resource type)
    For TOPIC resources:
    • ALL - All operations
    • WRITE - Write/produce messages
    • READ - Read/consume messages
    • ALTER - Modify resource configurations
    • ALTER_CONFIGS - Modify resource configurations
    • CREATE - Create new resources
    • DELETE - Delete resources
    • DESCRIBE - View resource metadata
    • DESCRIBE_CONFIGS - View resource configurations For GROUP resources (consumers only):
    • READ - Join and consume from consumer group
    • DELETE - Delete consumer group
    • DESCRIBE - View consumer group metadata
  • Pattern Type: How the resource name is matched
    • LITERAL - Exact match of the resource name
    • PREFIXED - Match resources with the specified prefix
  • Name: The specific resource name or prefix to apply the ACL to

Connection Details

Once a user is created, your endpoints are shown under “Proxy Endpoints”. These endpoints follow the naming pattern: <service-name>-<kafka-username>.streamkap.net:PORT Where:
  • <service-name> - Your Streamkap service/tenant name
  • <kafka-username> - The Kafka user’s username
  • PORT - One of the available ports: 32400, 32401, or 32402
Example proxy endpoints:
  • my-service-kafka-user.streamkap.net:32400
  • my-service-kafka-user.streamkap.net:32401
  • my-service-kafka-user.streamkap.net:32402
Connection settings:
  • Security protocol: SASL_SSL (recommended for secure connections)
  • SASL mechanism: PLAIN
  • Username/password: As configured for the user

Required Permissions

To read from Kafka topics, your user needs these ACL permissions: Essential permissions (always required):
  • Resource Type: TOPIC | Operation: READ | Pattern Type: LITERAL or PREFIXED | Name: Your topic name/prefix
Consumer group permissions (required for Python only):
  • Resource Type: GROUP | Operation: READ | Pattern Type: LITERAL or PREFIXED | Name: Your consumer group ID
Additional permissions (commonly required):
  • Resource Type: TOPIC | Operation: DESCRIBE | Pattern Type: LITERAL or PREFIXED | Name: Your topic name/prefix
TOPIC READ is always required. GROUP READ is only required for Python consumers (the confluent-kafka library always uses consumer groups). CLI tools like kcat work with just TOPIC READ permissions. Add DESCRIBE permissions if your client needs metadata access.

Code Examples

Prerequisites

Install the required packages:
pip install confluent-kafka certifi
from confluent_kafka import Consumer
import socket
import certifi

conf = {
    'bootstrap.servers': '<service-name>-<kafka-username>.streamkap.net:32400,<service-name>-<kafka-username>.streamkap.net:32401,<service-name>-<kafka-username>.streamkap.net:32402',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '<your-username>',
    'sasl.password': '<your-password>',
    'group.id': '<consumer-group-id>',
    'client.id': socket.gethostname(),
    'auto.offset.reset': 'earliest',
    # Required to trust AWS root certificates
    'ssl.ca.location': certifi.where(),
}

consumer = Consumer(conf)
consumer.subscribe(['<topic-name>'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    
    print(f"Received message: {msg.value().decode('utf-8')}")

consumer.close()
Replace the following values in the examples above:
Your proxy endpoints are listed in the Streamkap web interface at Kafka Access under “Proxy Endpoints”. The format is <service-name>-<kafka-username>.streamkap.net:PORT.
  • <service-name>-<kafka-username> - Your proxy endpoints
  • <your-username> - Your Kafka user username
  • <your-password> - Your Kafka user password
  • <topic-name> - The topic you want to read from
  • <consumer-group-id> - Your consumer group ID (Python only)

Integrations

Here are quick links to some integrations that support reading from Kafka:

Troubleshooting

Before diving into complex debugging, verify basic network connectivity to your Streamkap Kafka cluster.Test DNS Resolution:
# Check if hostname resolves
nslookup <service-name>-<kafka-username>.streamkap.net
Test Port Connectivity:
# Test with netcat (preferred - quick and clean)
nc -zv <service-name>-<kafka-username>.streamkap.net 32400

# Test all three ports
nc -zv <service-name>-<kafka-username>.streamkap.net 32400
nc -zv <service-name>-<kafka-username>.streamkap.net 32401
nc -zv <service-name>-<kafka-username>.streamkap.net 32402

# Alternative with telnet (press Ctrl+C to exit after connection success)
telnet <service-name>-<kafka-username>.streamkap.net 32400
Test SSL/TLS Handshake:
# Test SSL handshake and certificate chain
openssl s_client -connect <service-name>-<kafka-username>.streamkap.net:32400 -servername <service-name>-<kafka-username>.streamkap.net

# Alternative with timeout (press Ctrl+C to exit)
echo "Q" | openssl s_client -connect <service-name>-<kafka-username>.streamkap.net:32400 -servername <service-name>-<kafka-username>.streamkap.net
Common Network Issues & Solutions:
  • VPN interference: Disconnect VPN and try again
  • Firewall blocking ports: Ensure ports 32400-32402 are accessible
  • Safe listed IPs: Verify your public IP address is in the user’s safe list
If basic connectivity fails, check your network configuration before proceeding with Kafka-specific troubleshooting.
Common Errors:
  • SSL connection closed by peer during message production
  • SSL certificate verification failures
  • SASL authentication failed or authentication errors
  • SSL handshake failures
Authentication Solutions:
  1. Verify username and password are correct
  2. Ensure sasl.mechanism is set to PLAIN and security.protocol is set to SASL_SSL
  3. Check that the user account is active and not disabled
  4. Confirm the user has basic connection permissions
SSL Solutions:
  1. For Python
    Ensure certificates are properly configured:
    pip install --upgrade certifi
    
    import certifi
    'ssl.ca.location': certifi.where()
    
  2. For CLI tools
    Try different certificate paths:
    -X ssl.ca.location=/etc/ssl/cert.pem
    # or
    -X ssl.ca.location=/etc/ssl/certs/ca-certificates.crt
    
  3. Disable hostname verification (temporary):
    'ssl.endpoint.identification.algorithm': 'none'
    
  4. Contact support if issues persist - may require infrastructure team resolution
Note: Metadata operations (listing topics) may work while data operations fail
Error: Topic authorization failed or TOPIC_AUTHORIZATION_FAILEDCause: Missing TOPIC READ or TOPIC WRITE permissionsSolution: Add the appropriate ACL permissions:
  • Resource Type: TOPIC
  • Operation: READ (for consumers) or WRITE (for producers)
  • Pattern Type: LITERAL or PREFIXED
  • Name: Your topic name or prefix
Error: GROUP_AUTHORIZATION_FAILED or Group authorization failedCause: Missing GROUP READ permissions for your consumer groupSolution: Add the following ACL permission:
  • Resource Type: GROUP
  • Operation: READ
  • Pattern Type: LITERAL or PREFIXED
  • Name: Your consumer group ID (e.g., my-consumer-group)
Note: This only affects Python consumers and CLI tools using consumer groups
Issue: Consumer polls but receives no messagesPossible Causes:
  1. No messages in topic: Topic is empty or messages are at different offsets
  2. Consumer group offset: Group has already consumed available messages
  3. Partition assignment: Messages might be in different partitions
  4. Offset reset: Check auto.offset.reset setting
Solutions:
  1. Check topic contents: Use CLI to verify messages exist
  2. Use fresh consumer group: Try with a new group.id
  3. Reset offsets: Set auto.offset.reset to earliest
  4. Check all partitions: For CLI, try without specifying partition