Overview

Use the Webhook Source to ingest data into Streamkap by accepting HTTP requests at a dynamically generated endpoint. This is useful for integrating third-party services, webhooks from external platforms, or custom applications that push data via HTTP. The connector automatically routes incoming requests to Kafka topics based on the URL path or a default topic.

Prerequisites

  • An external service or application capable of sending HTTP POST, PUT, or DELETE requests
  • Understanding of your data format (JSON or string)
  • Plan for topic naming and routing strategy
  • API key for secure access to your webhook endpoint

How It Works

  1. Webhook Generation: Once created, the connector generates a unique webhook URL and API key.
  2. Flexible Routing: Requests to https://webhook.endpoint/topic_name are routed to the topic_name topic. Requests to the base URL route to the default topic (see the sketch after this list).
  3. Automatic Topic Creation: If you POST to a new path (e.g., /orders), the connector can automatically create that topic, but you should pre-register it in the Schema section for proper configuration.
  4. Data Transformation: Incoming data is converted to JSON format and optionally enriched with metadata (e.g., HTTP method, deletion markers).
  5. Message Keys: The connector extracts Kafka message keys from HTTP headers, enabling downstream systems to perform upserts and deletes based on unique identifiers.
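
For example, a minimal sketch of the routing behavior in Python, using the placeholder endpoint and API key that appear throughout the examples below:

import requests

API_KEY = "sk-1234567890abcdef"  # placeholder; use the key generated for your source
BASE_URL = "https://wh-abc123xyz-tenant-internal.streamkap.net"  # placeholder endpoint

headers = {"X-API-Key": API_KEY, "Content-Type": "application/json"}

# No path: routed to the default topic
requests.post(BASE_URL, json={"event": "ping"}, headers=headers)

# Path "/orders": routed to the "orders" topic
requests.post(f"{BASE_URL}/orders", json={"order_id": 1}, headers=headers)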

Streamkap Setup (UI)

  1. Navigate to Sources and choose Webhook.
  2. Fill in the fields:
    1. Name – A memorable identifier for this webhook source (e.g., “External Service Webhook”).
    2. Webhook URL – Auto-generated and displayed as read-only after creation. This URL is unique to your webhook. Share this URL with external services. Example: https://wh-abc123xyz-tenant-internal.streamkap.net.
    3. API Key – Auto-generated and displayed as read-only after creation. External services must include this in requests using the X-API-Key header.
    4. Data Format – Choose json or string:
      • JSON – Expects JSON payloads; automatically parsed and converted to Kafka records.
      • String – Treats the raw HTTP body as a string value.
    5. Add Delete Field – Enable to add a delete: true field when DELETE HTTP requests are received. Useful for marking deletions in downstream systems.
    6. Camel Message Header Key – The name of the HTTP header containing the Kafka message key (default: key). This is critical for enabling upserts and deletes in downstream consumers.
      • The header value must be a valid JSON object representing the key fields.
      • The same key field(s) must also be present in the request body/payload.
      • Example: To upsert records by id, keep the default header name key, send a key header such as {"id": 123}, and include the same id field in the request body.
      • See Message Key Extraction section below for detailed examples.
  3. Schema Configuration:
    1. Define the topics where incoming data will be stored.
    2. Default Topic Name – The topic that receives all requests to the base webhook URL (default: default_topic). Must be added to the Schema section below.
    3. Topic Names – Additional topics to route requests to based on URL paths. Add topic names you plan to use. Examples:
      • orders – Requests to https://wh-abc123xyz-tenant-internal.streamkap.net/orders route here.
      • payments – Requests to https://wh-abc123xyz-tenant-internal.streamkap.net/payments route here.
      • events – Requests to https://wh-abc123xyz-tenant-internal.streamkap.net/events route here.
    4. Important: Always include default_topic in the list. If you expect dynamic topics, pre-register them here or wait for the automatic topic discovery service to detect them (which may take time).
  4. Save the Source.
  5. Attach to Pipelines and configure downstream Sinks as needed.

Authentication

All requests must include the API key using the X-API-Key header:
X-API-Key: <YOUR_API_KEY>
Requests without a valid API key will be rejected with a 401 Unauthorized response.

Example Request with API Key

curl -X POST https://wh-abc123xyz-tenant-internal.streamkap.net/orders \
  -H "Content-Type: application/json" \
  -H "X-API-Key: sk-1234567890abcdef" \
  -d '{"order_id": 12345, "customer": "John Doe", "amount": 99.99}'

Message Key Extraction

The connector supports extracting Kafka message keys from HTTP headers. This enables downstream systems to perform upserts (update or insert) and deletes based on unique identifiers.

How It Works

  1. Header Configuration: Set the Camel Message Header Key to the name of the HTTP header containing the key (default: key).
  2. Key Format: The header value must be a valid JSON object representing the key fields.
  3. Dual Presence: The key field(s) must appear in both the HTTP header AND the request body for consistency.
  4. Single Record Assumption: The connector assumes each HTTP request contains exactly one message with one key.

Key Extraction Examples

Example 1: Simple ID-Based Key

Setup:
  • Camel Message Header Key: key
  • Body includes id field
Request:
curl -X POST https://wh-abc123xyz-tenant-internal.streamkap.net/orders \
  -H "Content-Type: application/json" \
  -H "X-API-Key: sk-1234567890abcdef" \
  -H "key: {\"id\": 12345}" \
  -d '{
    "id": 12345,
    "customer": "John Doe",
    "amount": 99.99,
    "timestamp": "2024-01-15T10:30:00Z"
  }'
Resulting Kafka Record (orders topic):
  • Message Key: {"id": 12345}
  • Message Value:
{
  "id": 12345,
  "customer": "John Doe",
  "amount": 99.99,
  "timestamp": "2024-01-15T10:30:00Z"
}
Downstream Usage: A sink connector can now upsert records by the id field, replacing old records with the same id.

Example 2: Composite Key

Setup:
  • Camel Message Header Key: key
  • Body includes both customer_id and order_number fields
Request:
curl -X POST https://wh-abc123xyz-tenant-internal.streamkap.net/orders \
  -H "Content-Type: application/json" \
  -H "X-API-Key: sk-1234567890abcdef" \
  -H "key: {\"customer_id\": \"CUST-123\", \"order_number\": \"ORD-001\"}" \
  -d '{
    "customer_id": "CUST-123",
    "order_number": "ORD-001",
    "amount": 150.50,
    "status": "pending"
  }'
Resulting Kafka Record (orders topic):
  • Message Key: {"customer_id": "CUST-123", "order_number": "ORD-001"}
  • Message Value:
{
  "customer_id": "CUST-123",
  "order_number": "ORD-001",
  "amount": 150.50,
  "status": "pending"
}

Example 3: DELETE with Key

Setup:
  • Camel Message Header Key: key
  • Add Delete Field: Enabled
Request:
curl -X DELETE https://wh-abc123xyz-tenant-internal.streamkap.net/users \
  -H "Content-Type: application/json" \
  -H "X-API-Key: sk-1234567890abcdef" \
  -H "key: {\"user_id\": \"USER-456\"}" \
  -d '{
    "user_id": "USER-456"
  }'
Resulting Kafka Record (users topic):
  • Message Key: {"user_id": "USER-456"}
  • Message Value:
{
  "user_id": "USER-456",
  "delete": true
}
Downstream Usage: A sink connector recognizes the delete: true field and removes the record with key {"user_id": "USER-456"} from the target system.

Example 4: Custom Header Name

Setup:
  • Camel Message Header Key: msg_key (custom header name)
  • Body includes product_id field
Request:
curl -X POST https://wh-abc123xyz-tenant-internal.streamkap.net/products \
  -H "Content-Type: application/json" \
  -H "X-API-Key: sk-1234567890abcdef" \
  -H "msg_key: {\"product_id\": \"PROD-789\"}" \
  -d '{
    "product_id": "PROD-789",
    "name": "Laptop",
    "price": 999.99,
    "stock": 50
  }'
Resulting Kafka Record (products topic):
  • Message Key: {"product_id": "PROD-789"}
  • Message Value:
{
  "product_id": "PROD-789",
  "name": "Laptop",
  "price": 999.99,
  "stock": 50
}

Best Practices for Message Keys

  • Always Include Both: Ensure the key field(s) exist in both the HTTP header and the request body.
  • Use Immutable IDs: Use unique, immutable identifiers (e.g., order ID, user ID) as keys to avoid data inconsistencies.
  • Composite Keys: For datasets requiring uniqueness across multiple fields, use composite keys (e.g., customer_id + order_number).
  • Consistency Across Calls: Use the same key structure for all requests to the same topic.
  • Valid JSON: The header value must be valid JSON. Invalid JSON in the key header will cause request failures (see the sketch after this list).
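
As a sketch of these practices, the snippet below derives the key from the record itself, so the header and body can never drift apart, and serializes it with json.dumps, which always produces valid JSON (the endpoint and API key are placeholders):

import json
import requests

record = {"customer_id": "CUST-123", "order_number": "ORD-001", "amount": 150.50}

# Build the composite key from the record so header and body carry identical values
key_fields = {"customer_id": record["customer_id"], "order_number": record["order_number"]}

headers = {
    "X-API-Key": "sk-1234567890abcdef",  # placeholder API key
    "Content-Type": "application/json",
    "key": json.dumps(key_fields),  # json.dumps always emits valid JSON
}

requests.post(
    "https://wh-abc123xyz-tenant-internal.streamkap.net/orders",  # placeholder URL
    json=record,
    headers=headers,
)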

Using Your Webhook

Once the webhook is created, you’ll see:
  • Webhook URL: The endpoint external services should POST/PUT/DELETE to
  • API Key: Required for authentication via X-API-Key header

Routing Behavior

| Request URL | Resulting Topic | Notes |
| --- | --- | --- |
| POST https://wh-abc123xyz-tenant-internal.streamkap.net | default_topic | Base URL routes to the default topic |
| POST https://wh-abc123xyz-tenant-internal.streamkap.net/orders | orders | Path-based routing (must be pre-registered in Schema) |
| PUT https://wh-abc123xyz-tenant-internal.streamkap.net/customers | customers | PUT requests are also supported |
| DELETE https://wh-abc123xyz-tenant-internal.streamkap.net/users (with key header) | users | DELETE requests require the key header to identify the record to delete; the body must also contain the key field(s) |

Data Format & Transformation

JSON Format

Incoming JSON payloads are automatically parsed and sent as Kafka records:
{
  "order_id": 12345,
  "customer": "John Doe",
  "amount": 99.99
}
Becomes a Kafka record with the JSON structure preserved.

String Format

Raw string bodies are wrapped in a data field:
"raw text data"
Becomes:
{
  "data": "raw text data"
}

DELETE Method Handling

If Add Delete Field is enabled and a DELETE request is received:
{
  "order_id": 12345,
  "delete": true
}
This allows downstream systems to identify and process deletions.

Topic Discovery & Auto-Creation

The webhook connector supports flexible URL paths:
  • Pre-registered Topics – Topics explicitly added in the Schema section always accept requests.
  • Dynamic Topics – New paths automatically create topics, but:
    • The topic must be added to the Schema section in advance for guaranteed routing.
    • If not pre-registered, a background topic discovery service will detect and register new topics, but this may take time (typically a few minutes).
    • To avoid delays, register all expected topics upfront.

Example Workflow

  1. You plan to receive data for orders, payments, and events.
  2. Add all three topics to the Topic Names field in the Schema section.
  3. External services can now reliably POST to /orders, /payments, or /events.
  4. If a request arrives for /refunds (not pre-registered), the connector will create the topic, but there may be a delay before it appears in your pipeline (see the sketch below).
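
A sketch of this workflow in Python, using the placeholder endpoint and API key from earlier examples; the /refunds request is still accepted, but its topic may only appear after the discovery service runs:

import requests

API_KEY = "sk-1234567890abcdef"  # placeholder
BASE_URL = "https://wh-abc123xyz-tenant-internal.streamkap.net"  # placeholder

headers = {"X-API-Key": API_KEY, "Content-Type": "application/json"}

# Pre-registered topics: routed immediately
for topic in ("orders", "payments", "events"):
    requests.post(f"{BASE_URL}/{topic}", json={"source": "demo", "topic": topic}, headers=headers)

# Unregistered path: the topic is auto-created, but may take a few minutes to appear
requests.post(f"{BASE_URL}/refunds", json={"refund_id": 1}, headers=headers)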

Request Limits & Performance

The webhook connector supports large payloads with the following defaults:
| Setting | Default | Purpose |
| --- | --- | --- |
| Max Initial Line Length | 16 KB | HTTP request line size limit |
| Max Header Size | 64 KB | Total HTTP headers size limit |
| Max Chunk Size | 50 MB | Maximum payload size per request |
| Backlog Size | 200 | Queued connections before rejection |
Adjust these in advanced settings if needed, but ensure your external services respect these limits.
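
To avoid 413 responses, a client can check the encoded payload against the 50 MB default before sending. A minimal sketch, where the limit constant simply mirrors the table above:

import json
import requests

MAX_CHUNK_BYTES = 50 * 1024 * 1024  # mirrors the Max Chunk Size default above

def post_if_within_limit(url: str, payload: dict, headers: dict) -> requests.Response:
    # Reject oversized payloads client-side instead of waiting for a 413
    body = json.dumps(payload).encode("utf-8")
    if len(body) > MAX_CHUNK_BYTES:
        raise ValueError(f"payload is {len(body)} bytes, above the {MAX_CHUNK_BYTES}-byte limit")
    # headers should include X-API-Key and Content-Type: application/json
    return requests.post(url, data=body, headers=headers)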

Data Format Examples

JSON POST to Orders Topic with Key

Request:
curl -X POST https://wh-abc123xyz-tenant-internal.streamkap.net/orders \
  -H "Content-Type: application/json" \
  -H "X-API-Key: sk-1234567890abcdef" \
  -H "key: {\"order_id\": \"ORD-001\"}" \
  -d '{
    "order_id": "ORD-001",
    "customer_id": "CUST-123",
    "amount": 150.50,
    "timestamp": "2024-01-15T10:30:00Z"
  }'
Resulting Kafka Record (orders topic):
  • Key: {"order_id": "ORD-001"}
  • Value:
{
  "order_id": "ORD-001",
  "customer_id": "CUST-123",
  "amount": 150.50,
  "timestamp": "2024-01-15T10:30:00Z"
}

String POST to Events Topic

Request:
curl -X POST https://wh-abc123xyz-tenant-internal.streamkap.net/events \
  -H "Content-Type: text/plain" \
  -H "X-API-Key: sk-1234567890abcdef" \
  -d 'User logged in from IP 192.168.1.100'
Resulting Kafka Record (events topic):
{
  "data": "User logged in from IP 192.168.1.100"
}

DELETE Request with Delete Field and Key

Request:
curl -X DELETE https://wh-abc123xyz-tenant-internal.streamkap.net/users \
  -H "X-API-Key: sk-1234567890abcdef" \
  -H "key: {\"user_id\": \"USER-456\"}" \
  -d '{"user_id": "USER-456"}'
Resulting Kafka Record (users topic, with Add Delete Field enabled):
  • Key: {"user_id": "USER-456"}
  • Value:
{
  "user_id": "USER-456",
  "delete": true
}

Common Scenarios

Scenario 1: Multi-Topic Event Ingestion with Upserts

Setup:
  • Default Topic: default_topic
  • Topic Names: orders, payments, events, notifications
  • Data Format: JSON
  • Camel Message Header Key: key
Usage:
  • Third-party service POSTs orders to .../orders with a key header containing the order ID.
  • Billing system POSTs payments to .../payments with a key header containing the payment ID.
  • Downstream sink connectors upsert records based on these keys, updating existing records instead of creating duplicates.
  • Each goes to its respective topic, enabling independent processing.

Scenario 2: Generic Event Logging

Setup:
  • Default Topic: webhook_events
  • Topic Names: (empty or just default)
  • Data Format: String
Usage:
  • All requests regardless of path go to webhook_events.
  • Useful for centralized logging of third-party webhooks.

Scenario 3: Soft Deletes with Tombstones

Setup:
  • Default Topic: users
  • Add Delete Field: Enabled
  • Data Format: JSON
  • Camel Message Header Key: key
Usage:
  • POST {"user_id": 123, "name": "Alice"} with header key: {"user_id": 123} creates a record.
  • DELETE with {"user_id": 123} and header key: {"user_id": 123} adds "delete": true to mark deletion.
  • Downstream systems can process deletes accordingly, removing records by key (see the sketch below).
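
A sketch of this create-then-delete flow, assuming the placeholder endpoint and API key used throughout and users as the default topic:

import json
import requests

API_KEY = "sk-1234567890abcdef"  # placeholder
URL = "https://wh-abc123xyz-tenant-internal.streamkap.net"  # placeholder; default topic is users

def headers_for(key_fields: dict) -> dict:
    # The same key fields ride in the header (as JSON) and in the body
    return {
        "X-API-Key": API_KEY,
        "Content-Type": "application/json",
        "key": json.dumps(key_fields),
    }

# Create (or upsert) the record
requests.post(URL, json={"user_id": 123, "name": "Alice"}, headers=headers_for({"user_id": 123}))

# With Add Delete Field enabled, this produces a value containing "delete": true
requests.delete(URL, json={"user_id": 123}, headers=headers_for({"user_id": 123}))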

Integration Examples

Integrating Shopify Webhooks

Configure Shopify to send order events to your webhook:
Webhook URL: https://wh-abc123xyz-tenant-internal.streamkap.net/shopify_orders
Header X-API-Key: sk-1234567890abcdef
Shopify posts order data to your shopify_orders topic in real-time.

Integrating GitHub Webhooks

Configure GitHub to send push and pull request events:
Webhook URL: https://wh-abc123xyz-tenant-internal.streamkap.net/github_events
Header X-API-Key: sk-1234567890abcdef
GitHub events flow into your github_events topic.

Custom Application Integration with Keys

From your application, send data via HTTP with message keys:
import requests
import json

API_KEY = "sk-1234567890abcdef"
WEBHOOK_URL = "https://wh-abc123xyz-tenant-internal.streamkap.net/my_events"

# The key header is serialized with json.dumps so its value is always valid JSON
headers = {
    "X-API-Key": API_KEY,
    "Content-Type": "application/json",
    "key": json.dumps({"event_id": "EVT-001"})
}

data = {
    "event_id": "EVT-001",
    "event_type": "purchase",
    "user_id": "USER-001",
    "amount": 99.99
}

response = requests.post(WEBHOOK_URL, json=data, headers=headers)
print(response.status_code)  # 202 Accepted

Error Handling

| Status Code | Meaning | Action |
| --- | --- | --- |
| 202 Accepted | Request received and queued for processing | Success; data will appear in Kafka shortly |
| 400 Bad Request | Malformed request, invalid format, or invalid JSON in the key header | Check JSON syntax, the Content-Type header, and the key header format |
| 401 Unauthorized | Missing or invalid API key | Verify the API key in the X-API-Key header |
| 413 Payload Too Large | Request exceeds the max chunk size (50 MB) | Reduce the payload size or split it into multiple requests |
| 429 Too Many Requests | Rate limit exceeded | Implement backoff and retry logic |
| 500 Internal Server Error | Connector issue | Check connector logs; retry after a delay |
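
The 429 and 500 rows call for retry logic on the sending side. A minimal sketch of exponential backoff; the delays and attempt count are illustrative choices, not connector requirements:

import time
import requests

def post_with_backoff(url: str, payload: dict, headers: dict, max_attempts: int = 5) -> requests.Response:
    delay = 1.0
    for attempt in range(1, max_attempts + 1):
        response = requests.post(url, json=payload, headers=headers)
        # 429 and 5xx are worth retrying; other 4xx codes indicate a client error
        if response.status_code != 429 and response.status_code < 500:
            return response
        if attempt < max_attempts:
            time.sleep(delay)
            delay *= 2  # double the wait after each failed attempt
    return response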

Security Notes

  • API Key Protection: Treat the API key like a password. Do not commit it to version control or expose it in client-side code. Store it securely in environment variables or a secrets management system (see the sketch after this list).
  • HTTPS Only: Always use HTTPS for webhook requests in production. Plain HTTP is not recommended.
  • Key Header Security: The key header contains sensitive identifying information. Ensure HTTPS is used to prevent interception.
  • IP Whitelisting: If available, restrict webhook access to known IP ranges of external services.
  • Payload Validation: Implement server-side validation in downstream consumers to ensure data integrity and key consistency.
  • Rate Limiting: External services should implement exponential backoff for 429 responses.
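
For the first point, a minimal sketch of loading the key from the environment rather than hard-coding it (STREAMKAP_WEBHOOK_API_KEY is an illustrative variable name):

import os

# Fail fast if the key is absent rather than sending unauthenticated requests
API_KEY = os.environ["STREAMKAP_WEBHOOK_API_KEY"]  # illustrative variable name

headers = {"X-API-Key": API_KEY, "Content-Type": "application/json"}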

Limitations

  • No Built-in Deduplication: If external services retry requests, duplicate records may appear. Implement deduplication downstream using unique message keys or IDs.
  • Single Record per Request: The connector assumes each HTTP request contains exactly one Kafka message. Batch requests are not supported at the connector level.
  • Single-Instance Deployment: The connector runs as a single task (tasks.max=1). Horizontal scaling requires multiple connector instances.
  • No Request Buffering: Incoming requests are processed sequentially. High-volume ingestion may experience latency.
  • Topic Pre-Registration Recommended: While auto-creation works, pre-registering topics in the Schema section ensures reliable routing without delays.
  • Key Header Validation: Invalid JSON in the key header will cause the request to fail. Always validate key format before sending.

Monitoring & Troubleshooting

Checking Message Flow

Use your pipeline dashboard to monitor:
  • Events Written – Total messages ingested from the webhook
  • Volume – Data volume in bytes
  • Topic Distribution – Messages per topic

Common Issues

Issue: Requests to new topics fail or are delayed
Solution: Pre-register all expected topics in the Schema section.

Issue: 401 Unauthorized errors
Solution: Verify the API key is included in the X-API-Key header with the correct value.

Issue: 400 Bad Request with JSON format selected
Solution: Ensure the request body is valid JSON and the Content-Type: application/json header is set. Verify the key header contains valid JSON.

Issue: Data appears in wrong topic
Solution: Check the URL path matches a registered topic name. Typos or unregistered paths may route to the default topic.

Issue: Downstream upserts not working
Solution: Ensure the message key is correctly extracted. Verify the key header contains valid JSON and the same key fields exist in the request body.

Issue: Invalid JSON in key header
Solution: Use tools like jq or online JSON validators to ensure your key JSON is well-formed. Example: key: $(echo '{"id": 123}' | jq -c .).