> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamkap.com/llms.txt
> Use this file to discover all available pages before exploring further.

# S3 Source

> Stream files from Amazon S3 into Streamkap

The S3 Source connector polls an S3 bucket for new files and streams their contents into Kafka topics. It supports JSON, CSV, and Avro files with configurable polling, file cleanup policies, and dynamic topic routing based on the S3 key path.

<Info>
  This is a **batch file source**, not a CDC connector. Files are scanned at a configurable interval and each file is processed once.
</Info>

## Prerequisites

* An S3 bucket containing the files to be ingested
* AWS credentials (Access Key ID and Secret Access Key) with the following permissions on the source bucket:
  * `s3:GetObject`
  * `s3:ListBucket`
  * `s3:DeleteObject` — only required when using the **Delete** cleanup policy

<CodeGroup>
  ```JSON Recommended IAM Policy theme={null}
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "s3:GetObject",
          "s3:ListBucket",
          "s3:DeleteObject"
        ],
        "Resource": [
          "arn:aws:s3:::<your-bucket-name>",
          "arn:aws:s3:::<your-bucket-name>/*"
        ]
      }
    ]
  }
  ```
</CodeGroup>

## Supported File Formats

| Format | File Extension | Notes                                           |
| ------ | -------------- | ----------------------------------------------- |
| JSON   | `.json`        | JSON Lines (one object per line) or JSON arrays |
| CSV    | `.csv`         | Comma-separated; headers optional               |
| Avro   | `.avro`        | Apache Avro binary with embedded schema         |

Only files matching the selected format's extension under the configured prefix are picked up.

***

## Streamkap Setup

Follow these steps to configure your new connector:

### 1. Create the Source

* Navigate to [Add Connectors](https://app.streamkap.com/connectors/add?tab=Sources).
* Choose **S3**.

### 2. Connection Settings

* **Name**: A name for your connector.

* **AWS Access Key**: The AWS Access Key ID with the permissions listed in [Prerequisites](#prerequisites).

* **AWS Secret Access Key**: The AWS Secret Access Key that pairs with the key above.

* **Region**: The AWS region where the S3 bucket lives.

Click **Next**.

### 3. Source Settings

* **Bucket Name** (required): The S3 bucket to scan.

* **File format** (required): The file format to process — `json`, `csv`, or `avro`. Default: `json`.

* **CSV Has Headers**: (Visible when format is CSV) Whether the first row contains column headers. When disabled, columns are auto-generated as `column1`, `column2`, etc. Default: `true`.

* **Object Prefix**: The S3 key prefix (folder path) to scan within the bucket. Only objects under this prefix are processed. Default: `file-pulse/`.

* **Topic Postfix**: The default topic name suffix. When Dynamic Topic Routing is disabled, all files are streamed to this single topic. When enabled, this is used as a fallback for files that do not match the routing rules. Default: `default`.

* **Dynamic Topic Routing**: Enable to derive the Kafka topic name per file from its S3 key path. When disabled, all files go to the single default topic. Default: `false`. See [Dynamic Topic Routing](#dynamic-topic-routing) below for details.

Click **Save**.

### 4. Advanced Settings

Available under **Advanced**:

* **Scan Interval (ms)**: How often the connector polls the bucket for new files. Range: 100 – 100,000. Default: `10000` (10 seconds).

* **Cleanup Policy**: What to do with a file after it has been processed:
  * **Log** (default): Mark the file as processed without deleting it. Tracked internally so it is not reprocessed.
  * **Delete**: Remove the file from S3 after successful processing. Requires `s3:DeleteObject`.

* **Tasks**: Maximum number of parallel tasks processing files concurrently. Range: 1 – 10. Default: `5`.

## Dynamic Topic Routing

By default, every file is sent to a single topic (using the **Topic Postfix** as the topic name). Dynamic topic routing derives the topic name per file from its S3 key path, so files under different folders land in different topics.

When **Dynamic Topic Routing** is enabled, you can choose between two modes:

### Simple Mode (Folder Skip / Folder Levels)

The default mode when Dynamic Topic Routing is enabled. Uses folder position to build the topic name.

* **Folder Skip**: Number of leading path segments to drop from the S3 key. Default: `0`.

* **Folder Levels**: Number of folder segments (after the skip) to include in the topic name, joined with dots. Default: `0`.

#### Example

Given the S3 key `data/region/us-east/customers/file.json`:

| Folder Skip | Folder Levels | Resulting Topic Suffix |
| ----------- | ------------- | ---------------------- |
| 0           | 1             | `data`                 |
| 1           | 1             | `region`               |
| 1           | 2             | `region.us-east`       |
| 2           | 2             | `us-east.customers`    |

<Info>
  If **Folder Skip** + **Folder Levels** exceeds the number of path segments in the S3 key, the missing segments are silently ignored — the topic name will be shorter than expected rather than causing an error. For example, if the key has only 3 folders but you set Folder Skip to 1 and Folder Levels to 4, only the 2 available folders after the skip will appear in the topic name.
</Info>

### Advanced Mode (ScEL Expression)

Enable **Use Advanced Expression** to switch to a custom ScEL expression for full control over the topic suffix. This replaces the Folder Skip / Folder Levels settings.

Available functions:

| Function                               | Description                           |
| -------------------------------------- | ------------------------------------- |
| `$metadata.name`                       | The full S3 key (folders + file name) |
| `split(str, delimiter)`                | Break a string into an array          |
| `extract_array(array, index)`          | Pick a segment by 0-based position    |
| `replace_all(str, regex, replacement)` | Strip or rewrite parts of a string    |
| `concat_ws(separator, ...parts)`       | Join multiple parts with a separator  |

#### Examples

**Combine two folders.** For the S3 key `data/region/us-east/file.json`, produce the suffix `region.us-east`:

```
concat_ws('.', extract_array(split($metadata.name, '/'), 1), extract_array(split($metadata.name, '/'), 2))
```

**Combine a folder and the file-name prefix.** For keys like `data/region/us-east/orders_0001.json` and `data/region/us-east/products_0001.json`, produce suffixes `us-east.orders` and `us-east.products`:

```
concat_ws('.', extract_array(split($metadata.name, '/'), 2), replace_all(extract_array(split($metadata.name, '/'), 3), '_.*', ''))
```

## Troubleshooting

<AccordionGroup>
  <Accordion title="Files are not being picked up">
    Verify the following:

    * The **Bucket Name** and **Object Prefix** are correct.
    * Files match the selected **File format** (e.g. `.json` files for JSON).
    * The AWS credentials have `s3:GetObject` and `s3:ListBucket` on the bucket.
    * New files are being placed under the configured prefix.
    * The **Scan Interval (ms)** is not set excessively high.
  </Accordion>

  <Accordion title="Access Denied errors">
    The IAM principal associated with the provided AWS credentials lacks sufficient permissions.

    **Resolution:**

    * Ensure the IAM policy grants `s3:GetObject` and `s3:ListBucket` for the bucket.
    * If using the **Delete** cleanup policy, also grant `s3:DeleteObject`.
    * Check that no bucket policy explicitly denies access.
    * Confirm the bucket is in the configured **Region**.
  </Accordion>

  <Accordion title="CSV columns are unnamed or misaligned">
    This typically means **CSV Has Headers** does not match the actual file content.

    **Resolution:**

    * If your CSV files have a header row, set **CSV Has Headers** to `true`.
    * If they do not, set it to `false` — columns are auto-named `column1`, `column2`, etc.
    * Verify the delimiter is a comma (other delimiters are not currently supported).
  </Accordion>

  <Accordion title="Files are being reprocessed">
    The connector tracks processed files in an internal status topic. Reprocessing can occur if:

    * The connector was deleted and recreated (file tracking state is lost).
    * The internal status topic was deleted.

    **Resolution:**

    * Avoid deleting and recreating connectors for the same bucket/prefix.
    * Contact [Streamkap support](mailto:support@streamkap.com) if you need to reset the processing state.
  </Accordion>
</AccordionGroup>
