The Streamkap REST API lets you programmatically manage sources, destinations, pipelines, and transforms. This guide covers authentication and common workflows to get you up and running quickly.
For full endpoint details, request/response schemas, and an interactive playground, see the API Reference tab.

Authentication

The Streamkap API uses bearer tokens for authentication. The flow is:
  1. Create API credentials (Client ID and Secret) in the Streamkap UI — see API Tokens for instructions.
  2. Exchange credentials for an access token by calling the /auth/access-token endpoint.
  3. Use the access token as a Bearer token in the Authorization header for all subsequent requests.

Get an Access Token

curl -X POST "https://api.streamkap.com/auth/access-token" \
  -H "Content-Type: application/json" \
  -d '{
    "client_id": "YOUR_CLIENT_ID",
    "secret": "YOUR_CLIENT_SECRET"
  }'
The response includes:
  • accessToken: Bearer token to use in API requests
  • refreshToken: Token used to obtain a new access token
  • expires: Expiration timestamp of the access token
  • expiresIn: Seconds until the access token expires
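Because the response reports expiresIn, a client can decide when to refresh proactively instead of waiting for a 401. A minimal sketch (field names follow the response above; the 60-second safety margin is an arbitrary choice, not a Streamkap recommendation):

```python
import time
from typing import Optional

# Refresh this many seconds before the token actually expires (illustrative value)
REFRESH_MARGIN_SECONDS = 60

def token_expiry(issued_at: float, expires_in: int) -> float:
    """Absolute epoch time at which the access token expires."""
    return issued_at + expires_in

def needs_refresh(issued_at: float, expires_in: int, now: Optional[float] = None) -> bool:
    """True once the current time is within the safety margin of expiry."""
    current = time.time() if now is None else now
    return current >= token_expiry(issued_at, expires_in) - REFRESH_MARGIN_SECONDS
```

Record the time at which you exchanged credentials, then call needs_refresh before each request batch.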

Make Authenticated Requests

Include the access token in the Authorization header for all API calls:
curl -X GET "https://api.streamkap.com/sources" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

Refresh an Access Token

When your access token expires, use the refresh token to obtain a new one without re-authenticating with your client credentials:
curl -X POST "https://api.streamkap.com/auth/access-token/refresh" \
  -H "Content-Type: application/json" \
  -d '{
    "refresh_token": "YOUR_REFRESH_TOKEN"
  }'

Common Workflows

1. Create a Complete Pipeline

Build an end-to-end CDC pipeline by creating a source, destination, and then linking them in a pipeline.

Step 1: Create a source connector

Create a source by specifying the connector type and its configuration. The connector field identifies the source type (e.g., postgresql, mysql, mongodb), and config contains connector-specific settings.
curl -X POST "https://api.streamkap.com/sources" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-postgresql-source",
    "connector": "postgresql",
    "config": {
      "database.hostname": "db.example.com",
      "database.port": "5432",
      "database.user": "cdc_user",
      "database.password": "YOUR_DB_PASSWORD",
      "database.dbname": "my_database",
      "table.include.list": "public.orders,public.customers"
    }
  }'
Save the returned id value — you will need it when creating the pipeline.

Step 2: Create a destination connector

Create a destination where your CDC data will be delivered. As with sources, the connector field identifies the destination type and config holds its settings.
curl -X POST "https://api.streamkap.com/destinations" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-snowflake-destination",
    "connector": "snowflake",
    "config": {
      "snowflake.url.name": "your-account.snowflakecomputing.com",
      "snowflake.user.name": "STREAMKAP_USER",
      "snowflake.private.key": "YOUR_PRIVATE_KEY",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.schema.name": "PUBLIC"
    }
  }'
Save the returned id value for the next step.

Step 3: Create a pipeline

Link the source and destination together by referencing their IDs.
curl -X POST "https://api.streamkap.com/pipelines" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-cdc-pipeline",
    "source": {
      "id": "SOURCE_ID_HERE",
      "connector": "postgresql",
      "name": "my-postgresql-source"
    },
    "destination": {
      "id": "DESTINATION_ID_HERE",
      "connector": "snowflake",
      "name": "my-snowflake-destination"
    },
    "snapshot_new_tables": true,
    "tags": []
  }'
The pipeline starts automatically after creation. The source will begin an initial snapshot of the configured tables.
Connector configurations vary by type. Refer to the source and destination documentation pages for the full list of configuration options for each connector.
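The three steps above can be chained in a script. This sketch wires them together through an injected post callable (a thin wrapper around whatever HTTP client you use), so the only Streamkap-specific parts are the endpoint paths and payload shapes shown in the curl examples; the helper name and structure are illustrative, not part of the API:

```python
from typing import Any, Callable, Dict

def create_pipeline(
    post: Callable[[str, Dict[str, Any]], Dict[str, Any]],
    source_payload: Dict[str, Any],
    destination_payload: Dict[str, Any],
    pipeline_name: str,
) -> Dict[str, Any]:
    """Create a source, then a destination, then a pipeline linking the two.

    `post(path, body)` should POST `body` to the API with the bearer token
    set and return the parsed JSON response.
    """
    source = post("/sources", source_payload)
    destination = post("/destinations", destination_payload)
    # The pipeline references both resources by the IDs returned above
    return post("/pipelines", {
        "name": pipeline_name,
        "source": {
            "id": source["id"],
            "connector": source_payload["connector"],
            "name": source_payload["name"],
        },
        "destination": {
            "id": destination["id"],
            "connector": destination_payload["connector"],
            "name": destination_payload["name"],
        },
        "snapshot_new_tables": True,
        "tags": [],
    })
```

Injecting post also makes the flow easy to test with a stub before pointing it at the live API.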

2. Monitor Pipeline Health

Check the status and performance of your running pipelines.

Step 1: Get pipeline details

Retrieve the current state of a pipeline:
curl -X GET "https://api.streamkap.com/pipelines/PIPELINE_ID" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

Step 2: Check pipeline metrics

Get latency, throughput, and lag metrics for a pipeline:
curl -X GET "https://api.streamkap.com/pipelines/PIPELINE_ID/metrics" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
Key metrics to watch:
  • latency — End-to-end delay from source change to destination delivery
  • recordsLag — Number of records waiting to be processed
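A monitoring script can turn these two metrics into a simple health verdict. A sketch, assuming the metrics arrive as a JSON object keyed by the field names above (the threshold values are illustrative, not Streamkap recommendations):

```python
from typing import Any, Dict, List

def health_issues(
    metrics: Dict[str, Any],
    max_latency_ms: float = 60_000,
    max_records_lag: int = 100_000,
) -> List[str]:
    """Return a list of human-readable problems; empty means healthy."""
    issues: List[str] = []
    latency = metrics.get("latency")
    if latency is not None and latency > max_latency_ms:
        issues.append(f"latency {latency}ms exceeds {max_latency_ms}ms")
    lag = metrics.get("recordsLag")
    if lag is not None and lag > max_records_lag:
        issues.append(f"recordsLag {lag} exceeds {max_records_lag}")
    return issues
```

Tune the thresholds to your pipeline's normal steady-state values.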

Step 3: Monitor consumer group lag

List consumer groups and inspect their lag to identify bottlenecks:
# List all consumer groups
curl -X GET "https://api.streamkap.com/consumer-groups" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

# Get details for a specific consumer group
curl -X GET "https://api.streamkap.com/consumer-groups/GROUP_ID" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

Step 4: Check source metrics and snapshot status

Monitor source connector health and snapshot progress:
curl -X GET "https://api.streamkap.com/sources/SOURCE_ID/metrics" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
The response includes snapshot-related fields such as snapshotStatus, snapshotState, SnapshotCompleted, and SnapshotRunning.
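Those fields can be condensed into a one-line progress report. A sketch, assuming SnapshotRunning and SnapshotCompleted are table counts as described in the snapshot workflow below:

```python
from typing import Any, Dict

def snapshot_summary(metrics: Dict[str, Any]) -> str:
    """Summarize snapshot progress from a source metrics response.

    Assumes SnapshotRunning / SnapshotCompleted count tables currently
    being snapshotted and tables finished, respectively.
    """
    running = metrics.get("SnapshotRunning", 0)
    completed = metrics.get("SnapshotCompleted", 0)
    if running:
        return f"snapshot in progress: {completed} table(s) done, {running} running"
    return f"snapshot idle: {completed} table(s) completed"
```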

3. Trigger a Snapshot

Trigger an incremental snapshot to re-read data from specific tables without resetting the entire source connector.

Step 1: Execute an incremental snapshot

Specify the source ID and optionally the topic names (tables) to snapshot. If topic_names is omitted, all tables configured on the source will be snapshotted.
curl -X POST "https://api.streamkap.com/sources/execute_incremental_snapshot" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "SOURCE_ID_HERE",
    "topic_names": ["my_database.public.orders"]
  }'
You can also include optional additional_conditions to filter which rows are snapshotted:
curl -X POST "https://api.streamkap.com/sources/execute_incremental_snapshot" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "SOURCE_ID_HERE",
    "topic_names": ["my_database.public.orders"],
    "additional_conditions": [
      {
        "data_collection": "public.orders",
        "filter": "created_at > '\''2025-01-01'\''"
      }
    ]
  }'
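The two request bodies above differ only in the optional fields. A small builder that produces either shape (a sketch; the field names follow the curl examples):

```python
from typing import Any, Dict, List, Optional

def snapshot_request(
    source_id: str,
    topic_names: Optional[List[str]] = None,
    conditions: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, Any]:
    """Build the body for POST /sources/execute_incremental_snapshot.

    Omitting topic_names snapshots every table configured on the source.
    Each condition is {"data_collection": ..., "filter": ...}.
    """
    body: Dict[str, Any] = {"source_id": source_id}
    if topic_names is not None:
        body["topic_names"] = topic_names
    if conditions is not None:
        body["additional_conditions"] = conditions
    return body
```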

Step 2: Monitor snapshot progress

Check the source metrics to track the snapshot:
curl -X GET "https://api.streamkap.com/sources/SOURCE_ID/metrics" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
Look for SnapshotRunning (number of tables currently being snapshotted) and SnapshotCompleted (number of tables that have finished).

Step 3: Stop a snapshot (if needed)

To cancel a running incremental snapshot:
curl -X POST "https://api.streamkap.com/sources/stop_incremental_snapshot" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "SOURCE_ID_HERE"
  }'

4. Add Tables to an Existing Source

Add new tables to a running source without modifying the original table list, using the table.include.list.user.defined configuration parameter.

Step 1: Get the current source configuration

curl -X GET "https://api.streamkap.com/sources/SOURCE_ID" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
Note the current name, connector, and config values from the response.

Step 2: Update the source with new tables

Use PUT /sources/{source_id} to update the source configuration. Add the new tables via the table.include.list.user.defined parameter while keeping all existing configuration values.
curl -X PUT "https://api.streamkap.com/sources/SOURCE_ID" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-postgresql-source",
    "connector": "postgresql",
    "config": {
      "database.hostname": "db.example.com",
      "database.port": "5432",
      "database.user": "cdc_user",
      "database.password": "YOUR_DB_PASSWORD",
      "database.dbname": "my_database",
      "table.include.list": "public.orders,public.customers",
      "table.include.list.user.defined": "public.products,public.inventory"
    }
  }'
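Because PUT replaces the whole configuration, the safe pattern is read-modify-write. This sketch injects get and put callables (thin wrappers over your HTTP client) and merges new tables into table.include.list.user.defined without disturbing anything else; the helper name is illustrative:

```python
from typing import Any, Callable, Dict, List

def add_tables(
    get: Callable[[str], Dict[str, Any]],
    put: Callable[[str, Dict[str, Any]], Dict[str, Any]],
    source_id: str,
    new_tables: List[str],
) -> Dict[str, Any]:
    """Fetch a source, append to table.include.list.user.defined,
    and PUT the full object back, preserving all other settings."""
    source = get(f"/sources/{source_id}")
    config = dict(source["config"])  # copy before mutating
    existing = [t for t in config.get("table.include.list.user.defined", "").split(",") if t]
    merged = existing + [t for t in new_tables if t not in existing]
    config["table.include.list.user.defined"] = ",".join(merged)
    return put(f"/sources/{source_id}", {
        "name": source["name"],
        "connector": source["connector"],
        "config": config,
    })
```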
For a Terraform-based approach to adding tables, see Adding Tables to an Existing Source.

API Response Patterns

Standard Responses

All API responses return JSON. Successful responses typically include the resource object directly:
{
  "id": "abc123",
  "name": "my-postgresql-source",
  "connector": "postgresql",
  "config": { ... },
  "created_at": "2025-06-15T10:30:00Z"
}

Paginated Responses

List endpoints support pagination via page and page_size query parameters:
# Get page 2 with 20 items per page
curl -X GET "https://api.streamkap.com/sources?page=2&page_size=20" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
  • page (default: 1): Page number (1-indexed)
  • page_size (default: 10): Number of items per page
  • sort: Sort field (e.g., name, created_at)
  • sort_dir (default: asc): Sort direction: asc or desc
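To walk every page, loop until a page comes back short. A sketch with an injected fetch callable; note that the envelope key holding the items ("data" here) is an assumption, so check the actual list response shape in the API Reference:

```python
from typing import Any, Callable, Dict, Iterator

def iter_pages(
    fetch: Callable[[int, int], Dict[str, Any]],
    page_size: int = 10,
) -> Iterator[Any]:
    """Yield every item across pages.

    `fetch(page, page_size)` should call the list endpoint and return its
    parsed JSON. Iteration stops at the first short (or empty) page.
    The "data" envelope key is assumed, not confirmed by this guide.
    """
    page = 1
    while True:
        result = fetch(page, page_size)
        items = result.get("data", [])
        yield from items
        if len(items) < page_size:
            return
        page += 1
```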

Error Responses

Validation errors return a 422 status code with details about the issue:
{
  "detail": [
    {
      "loc": ["body", "name"],
      "msg": "field required",
      "type": "value_error.missing"
    }
  ]
}
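The detail array above is easy to flatten into log-friendly messages. A small helper, assuming the loc/msg structure shown in the example response:

```python
from typing import Any, Dict, List

def format_validation_errors(body: Dict[str, Any]) -> List[str]:
    """Flatten a 422 response body into readable messages,
    e.g. 'body.name: field required'."""
    return [
        ".".join(str(part) for part in err.get("loc", [])) + ": " + err.get("msg", "")
        for err in body.get("detail", [])
    ]
```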
Other common HTTP status codes:
  • 200: Success
  • 401: Unauthorized (invalid or expired token)
  • 404: Resource not found
  • 422: Validation error (check the detail field)
Detailed request/response schemas for every endpoint are available in the API Reference.

Rate Limits and Best Practices

API rate limits are managed by Streamkap and are designed to accommodate normal usage patterns. If you encounter 429 Too Many Requests responses or believe your use case requires higher limits, contact Streamkap support with details about your integration and expected request volume.
API versioning: The Streamkap API is currently unversioned. All endpoints use a single base URL (https://api.streamkap.com). If breaking changes are introduced in the future, they will be communicated in advance through release notes and direct customer notification, with a deprecation period to allow migration.
  • Use pagination for list endpoints. Avoid fetching all resources in a single request by using page and page_size parameters.
  • Cache responses where appropriate. Source and destination configurations change infrequently, so you can safely cache them for short periods.
  • Handle retries with exponential backoff. If a request fails with a 5xx status code, wait before retrying. Start with a 1-second delay and double it on each retry, up to a maximum of 30 seconds.
  • Refresh tokens proactively. Use the refresh token endpoint before your access token expires rather than waiting for a 401 error.
  • Minimize unnecessary polling. When monitoring pipeline health, use a reasonable interval (e.g., every 30 to 60 seconds) rather than polling continuously.
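The retry guidance above (1-second initial delay, doubling, 30-second cap) maps directly to a delay schedule. A sketch; the jitter variant is an extra refinement beyond the guidance above, useful when many clients might retry in lockstep:

```python
import random
from typing import List

def backoff_delays(attempts: int, base: float = 1.0, cap: float = 30.0) -> List[float]:
    """Exponential backoff schedule: 1s, 2s, 4s, ... capped at `cap`."""
    return [min(base * (2 ** i), cap) for i in range(attempts)]

def backoff_with_jitter(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full jitter: pick a random delay up to the capped exponential value."""
    return random.uniform(0, min(base * (2 ** attempt), cap))
```

Sleep for the scheduled delay after each 5xx response, and give up after the schedule is exhausted.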

SDKs and Tools

Terraform Provider

The Streamkap Terraform Provider enables infrastructure-as-code management of sources, destinations, pipelines, and transforms. See the full Terraform Configuration guide to get started.

OpenAPI Specification

The Streamkap API is documented with an OpenAPI 3.1 specification. You can use the spec to generate client libraries for any language using tools like OpenAPI Generator or Speakeasy. The spec is accessible in the API Reference tab and powers the interactive endpoint playground.

Next Steps