> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamkap.com/llms.txt
> Use this file to discover all available pages before exploring further.

# API Quickstart

> Get started with the Streamkap REST API: authentication, common workflows, and best practices.

The Streamkap REST API lets you programmatically manage sources, destinations, pipelines, and transforms. This guide covers authentication and common workflows to get you up and running quickly.

<Info>
  For full endpoint details, request/response schemas, and an interactive playground, see the [API Reference](/api-reference/authentication/access-token) tab.
</Info>

## Authentication

The Streamkap API uses bearer tokens for authentication. The flow is:

1. **Create API credentials** (Client ID and Secret) in the Streamkap UI -- see [API Tokens](/api-tokens) for instructions.
2. **Exchange credentials for an access token** by calling the `/auth/access-token` endpoint.
3. **Use the access token** as a Bearer token in the `Authorization` header for all subsequent requests.

### Get an Access Token

<CodeGroup>
  ```bash cURL theme={null}
  curl -X POST "https://api.streamkap.com/auth/access-token" \
    -H "Content-Type: application/json" \
    -d '{
      "client_id": "YOUR_CLIENT_ID",
      "secret": "YOUR_CLIENT_SECRET"
    }'
  ```

  ```python Python theme={null}
  import requests

  response = requests.post(
      "https://api.streamkap.com/auth/access-token",
      json={
          "client_id": "YOUR_CLIENT_ID",
          "secret": "YOUR_CLIENT_SECRET"
      }
  )

  data = response.json()
  access_token = data["accessToken"]
  refresh_token = data["refreshToken"]
  print(f"Access token expires: {data['expires']}")
  ```

  ```javascript JavaScript theme={null}
  const response = await fetch("https://api.streamkap.com/auth/access-token", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      client_id: "YOUR_CLIENT_ID",
      secret: "YOUR_CLIENT_SECRET",
    }),
  });

  const data = await response.json();
  const accessToken = data.accessToken;
  const refreshToken = data.refreshToken;
  ```

  ```go Go theme={null}
  package main

  import (
      "bytes"
      "encoding/json"
      "fmt"
      "net/http"
  )

  func main() {
      body, _ := json.Marshal(map[string]string{
          "client_id": "YOUR_CLIENT_ID",
          "secret":    "YOUR_CLIENT_SECRET",
      })

      resp, err := http.Post(
          "https://api.streamkap.com/auth/access-token",
          "application/json",
          bytes.NewBuffer(body),
      )
      if err != nil {
          panic(err)
      }
      defer resp.Body.Close()

      var data map[string]interface{}
      json.NewDecoder(resp.Body).Decode(&data)
      fmt.Println("Access token:", data["accessToken"])
  }
  ```
</CodeGroup>

The response includes:

| Field          | Description                              |
| -------------- | ---------------------------------------- |
| `accessToken`  | Bearer token to use in API requests      |
| `refreshToken` | Token to obtain a new access token       |
| `expires`      | Expiration timestamp of the access token |
| `expiresIn`    | Seconds until the access token expires   |

### Make Authenticated Requests

Include the access token in the `Authorization` header for all API calls:

<CodeGroup>
  ```bash cURL theme={null}
  curl -X GET "https://api.streamkap.com/sources" \
    -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
  ```

  ```python Python theme={null}
  import requests

  headers = {"Authorization": "Bearer YOUR_ACCESS_TOKEN"}
  response = requests.get("https://api.streamkap.com/sources", headers=headers)
  sources = response.json()
  ```

  ```javascript JavaScript theme={null}
  const response = await fetch("https://api.streamkap.com/sources", {
    headers: { Authorization: "Bearer YOUR_ACCESS_TOKEN" },
  });
  const sources = await response.json();
  ```

  ```go Go theme={null}
  req, _ := http.NewRequest("GET", "https://api.streamkap.com/sources", nil)
  req.Header.Set("Authorization", "Bearer YOUR_ACCESS_TOKEN")
  resp, err := http.DefaultClient.Do(req)
  ```
</CodeGroup>

### Refresh an Access Token

When your access token expires, use the refresh token to obtain a new one without re-authenticating with your client credentials:

```bash theme={null}
curl -X POST "https://api.streamkap.com/auth/access-token/refresh" \
  -H "Content-Type: application/json" \
  -d '{
    "refresh_token": "YOUR_REFRESH_TOKEN"
  }'
```

***

## Common Workflows

### 1. Create a Complete Pipeline

Build an end-to-end CDC pipeline by creating a source, destination, and then linking them in a pipeline.

<Steps>
  <Step title="Create a source connector">
    Create a source by specifying the connector type and its configuration. The `connector` field identifies the source type (e.g., `postgresql`, `mysql`, `mongodb`), and `config` contains connector-specific settings.

    ```bash theme={null}
    curl -X POST "https://api.streamkap.com/sources" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "my-postgresql-source",
        "connector": "postgresql",
        "config": {
          "database.hostname": "db.example.com",
          "database.port": "5432",
          "database.user": "cdc_user",
          "database.password": "YOUR_DB_PASSWORD",
          "database.dbname": "my_database",
          "table.include.list": "public.orders,public.customers"
        }
      }'
    ```

    Save the returned `id` value -- you will need it when creating the pipeline.
  </Step>

  <Step title="Create a destination connector">
    Create a destination where your CDC data will be delivered. As with sources, the `connector` field identifies the destination type and `config` holds its settings.

    ```bash theme={null}
    curl -X POST "https://api.streamkap.com/destinations" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "my-snowflake-destination",
        "connector": "snowflake",
        "config": {
          "snowflake.url.name": "your-account.snowflakecomputing.com",
          "snowflake.user.name": "STREAMKAP_USER",
          "snowflake.private.key": "YOUR_PRIVATE_KEY",
          "snowflake.database.name": "MY_DATABASE",
          "snowflake.schema.name": "PUBLIC"
        }
      }'
    ```

    Save the returned `id` value for the next step.
  </Step>

  <Step title="Create a pipeline">
    Link the source and destination together by referencing their IDs.

    ```bash theme={null}
    curl -X POST "https://api.streamkap.com/pipelines" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "my-cdc-pipeline",
        "source": {
          "id": "SOURCE_ID_HERE",
          "connector": "postgresql",
          "name": "my-postgresql-source"
        },
        "destination": {
          "id": "DESTINATION_ID_HERE",
          "connector": "snowflake",
          "name": "my-snowflake-destination"
        },
        "snapshot_new_tables": true,
        "tags": []
      }'
    ```

    The pipeline starts automatically after creation. The source will begin an initial snapshot of the configured tables.
  </Step>
</Steps>

<Note>
  Connector configurations vary by type. Refer to the source and destination documentation pages for the full list of configuration options for each connector.
</Note>

### 2. Monitor Pipeline Health

Check the status and performance of your running pipelines.

<Steps>
  <Step title="Get pipeline details">
    Retrieve the current state of a pipeline:

    ```bash theme={null}
    curl -X GET "https://api.streamkap.com/pipelines/PIPELINE_ID" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
    ```
  </Step>

  <Step title="Check pipeline metrics">
    Get latency, throughput, and lag metrics for a pipeline:

    ```bash theme={null}
    curl -X GET "https://api.streamkap.com/pipelines/PIPELINE_ID/metrics" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
    ```

    Key metrics to watch:

    * **latency** -- End-to-end delay from source change to destination delivery
    * **recordsLag** -- Number of records waiting to be processed
  </Step>

  <Step title="Monitor consumer group lag">
    List consumer groups and inspect their lag to identify bottlenecks:

    ```bash theme={null}
    # List all consumer groups
    curl -X GET "https://api.streamkap.com/consumer-groups" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

    # Get details for a specific consumer group
    curl -X GET "https://api.streamkap.com/consumer-groups/GROUP_ID" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
    ```
  </Step>

  <Step title="Check source metrics and snapshot status">
    Monitor source connector health and snapshot progress:

    ```bash theme={null}
    curl -X GET "https://api.streamkap.com/sources/SOURCE_ID/metrics" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
    ```

    The response includes snapshot-related fields such as `snapshotStatus`, `snapshotState`, `SnapshotCompleted`, and `SnapshotRunning`.
  </Step>
</Steps>

### 3. Trigger a Snapshot

Trigger an incremental snapshot to re-read data from specific tables without resetting the entire source connector.

<Steps>
  <Step title="Execute an incremental snapshot">
    Specify the source ID and optionally the topic names (tables) to snapshot. If `topic_names` is omitted, all tables configured on the source will be snapshotted.

    ```bash theme={null}
    curl -X POST "https://api.streamkap.com/sources/execute_incremental_snapshot" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "source_id": "SOURCE_ID_HERE",
        "topic_names": ["my_database.public.orders"]
      }'
    ```

    You can also include optional `additional_conditions` to filter which rows are snapshotted:

    ```bash theme={null}
    curl -X POST "https://api.streamkap.com/sources/execute_incremental_snapshot" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "source_id": "SOURCE_ID_HERE",
        "topic_names": ["my_database.public.orders"],
        "additional_conditions": [
          {
            "data_collection": "public.orders",
            "filter": "created_at > '\''2025-01-01'\''"
          }
        ]
      }'
    ```
  </Step>

  <Step title="Monitor snapshot progress">
    Check the source metrics to track the snapshot:

    ```bash theme={null}
    curl -X GET "https://api.streamkap.com/sources/SOURCE_ID/metrics" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
    ```

    Look for `SnapshotRunning` (number of tables currently being snapshotted) and `SnapshotCompleted` (number of tables that have finished).
  </Step>

  <Step title="Stop a snapshot (if needed)">
    To cancel a running incremental snapshot:

    ```bash theme={null}
    curl -X POST "https://api.streamkap.com/sources/stop_incremental_snapshot" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "source_id": "SOURCE_ID_HERE"
      }'
    ```
  </Step>
</Steps>

### 4. Add Tables to an Existing Source

Add new tables to a running source without modifying the original table list, using the `table.include.list.user.defined` configuration parameter.

<Steps>
  <Step title="Get the current source configuration">
    ```bash theme={null}
    curl -X GET "https://api.streamkap.com/sources/SOURCE_ID" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
    ```

    Note the current `name`, `connector`, and `config` values from the response.
  </Step>

  <Step title="Update the source with new tables">
    Use `PUT /sources/{source_id}` to update the source configuration. Add the new tables via the `table.include.list.user.defined` parameter while keeping all existing configuration values.

    ```bash theme={null}
    curl -X PUT "https://api.streamkap.com/sources/SOURCE_ID" \
      -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "my-postgresql-source",
        "connector": "postgresql",
        "config": {
          "database.hostname": "db.example.com",
          "database.port": "5432",
          "database.user": "cdc_user",
          "database.password": "YOUR_DB_PASSWORD",
          "database.dbname": "my_database",
          "table.include.list": "public.orders,public.customers",
          "table.include.list.user.defined": "public.products,public.inventory"
        }
      }'
    ```
  </Step>
</Steps>

<Tip>
  For a Terraform-based approach to adding tables, see [Adding Tables to an Existing Source](/terraform-resources#adding-tables-to-an-existing-source).
</Tip>

***

## API Response Patterns

### Standard Responses

All API responses return JSON. Successful responses typically include the resource object directly:

```json theme={null}
{
  "id": "abc123",
  "name": "my-postgresql-source",
  "connector": "postgresql",
  "config": { ... },
  "created_at": "2025-06-15T10:30:00Z"
}
```

### Paginated Responses

List endpoints support pagination via `page` and `page_size` query parameters:

```bash theme={null}
# Get page 2 with 20 items per page
curl -X GET "https://api.streamkap.com/sources?page=2&page_size=20" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
```

| Parameter   | Default | Description                             |
| ----------- | ------- | --------------------------------------- |
| `page`      | `1`     | Page number (1-indexed)                 |
| `page_size` | `10`    | Number of items per page                |
| `sort`      | --      | Sort field (e.g., `name`, `created_at`) |
| `sort_dir`  | `asc`   | Sort direction: `asc` or `desc`         |

### Error Responses

Validation errors return a `422` status code with details about the issue:

```json theme={null}
{
  "detail": [
    {
      "loc": ["body", "name"],
      "msg": "field required",
      "type": "value_error.missing"
    }
  ]
}
```

Other common HTTP status codes:

| Status | Meaning                                      |
| ------ | -------------------------------------------- |
| `200`  | Success                                      |
| `401`  | Unauthorized -- invalid or expired token     |
| `404`  | Resource not found                           |
| `422`  | Validation error -- check the `detail` field |

<Info>
  Detailed request/response schemas for every endpoint are available in the [API Reference](/api-reference/authentication/access-token).
</Info>

***

## Rate Limits and Best Practices

<Note>
  API rate limits are managed by Streamkap and are designed to accommodate normal usage patterns. If you encounter `429 Too Many Requests` responses or believe your use case requires higher limits, contact [Streamkap support](mailto:support@streamkap.com) with details about your integration and expected request volume.
</Note>

<Info>
  **API versioning**

  The Streamkap API is currently unversioned. All endpoints use a single base URL (`https://api.streamkap.com`). If breaking changes are introduced in the future, they will be communicated in advance through release notes and direct customer notification, with a deprecation period to allow migration.
</Info>

* **Use pagination** for list endpoints. Avoid fetching all resources in a single request by using `page` and `page_size` parameters.
* **Cache responses** where appropriate. Source and destination configurations change infrequently, so you can safely cache them for short periods.
* **Handle retries with exponential backoff.** If a request fails with a `5xx` status code, wait before retrying. Start with a 1-second delay and double it on each retry, up to a maximum of 30 seconds.
* **Refresh tokens proactively.** Use the refresh token endpoint before your access token expires rather than waiting for a `401` error.
* **Minimize unnecessary polling.** When monitoring pipeline health, use a reasonable interval (e.g., every 30--60 seconds) rather than polling continuously.

***

## SDKs and Tools

### Terraform Provider

The [Streamkap Terraform Provider](/streamkap-provider-for-terraform) enables infrastructure-as-code management of sources, destinations, pipelines, and transforms. See the full [Terraform Configuration guide](/terraform-configuration) to get started.

### OpenAPI Specification

The Streamkap API is documented with an OpenAPI 3.1 specification. You can use the spec to generate client libraries for any language using tools like [OpenAPI Generator](https://openapi-generator.tech/) or [Speakeasy](https://www.speakeasy.com/).

The spec is accessible in the [API Reference](/api-reference/authentication/access-token) tab and powers the interactive endpoint playground.

***

## Next Steps

* [API Tokens](/api-tokens) -- Create and manage API credentials
* [API Reference](/api-reference/authentication/access-token) -- Full interactive endpoint documentation
* [Terraform Provider](/streamkap-provider-for-terraform) -- Manage infrastructure as code
* [Terraform Resources](/terraform-resources) -- Source, destination, and pipeline resource examples
* [Snapshots](/snapshots) -- Learn how initial and incremental snapshots work
