The S3 Source connector polls an S3 bucket for new files and streams their contents into Kafka topics. It supports JSON, CSV, and Avro files with configurable polling, file cleanup policies, and dynamic topic routing based on the S3 key path.
This is a batch file source, not a CDC connector. Files are scanned at a configurable interval and each file is processed once.

Prerequisites

  • An S3 bucket containing the files to be ingested
  • AWS credentials (Access Key ID and Secret Access Key) with the following permissions on the source bucket:
    • s3:GetObject
    • s3:ListBucket
    • s3:DeleteObject — only required when using the Delete cleanup policy

For example, the following IAM policy grants all three permissions (replace <your-bucket-name> with your bucket's name):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::<your-bucket-name>",
        "arn:aws:s3:::<your-bucket-name>/*"
      ]
    }
  ]
}

Supported File Formats

| Format | File Extension | Notes |
| --- | --- | --- |
| JSON | .json | JSON Lines (one object per line) or JSON arrays |
| CSV | .csv | Comma-separated; headers optional |
| Avro | .avro | Apache Avro binary with embedded schema |
Only files matching the selected format’s extension under the configured prefix are picked up.
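For JSON, both accepted layouts decode to the same records. A minimal illustration of the two shapes (this is not the connector's actual parser, just a sketch of the equivalence):

```python
import json

# Hypothetical sample payloads illustrating the two accepted JSON layouts.
json_lines = '{"id": 1}\n{"id": 2}\n'
json_array = '[{"id": 1}, {"id": 2}]'

def parse_json_file(text: str) -> list:
    """Parse either JSON Lines (one object per line) or a single JSON array."""
    stripped = text.lstrip()
    if stripped.startswith("["):
        # JSON array: one top-level list of objects
        return json.loads(stripped)
    # JSON Lines: decode each non-empty line independently
    return [json.loads(line) for line in text.splitlines() if line.strip()]

print(parse_json_file(json_lines))  # [{'id': 1}, {'id': 2}]
print(parse_json_file(json_array))  # [{'id': 1}, {'id': 2}]
```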

Streamkap Setup

Follow these steps to configure your new connector:

1. Create the Source

2. Connection Settings

  • Name: A name for your connector.
  • AWS Access Key: The AWS Access Key ID with the permissions listed in Prerequisites.
  • AWS Secret Access Key: The AWS Secret Access Key that pairs with the key above.
  • Region: The AWS region where the S3 bucket lives.
Click Next.

3. Source Settings

  • Bucket Name (required): The S3 bucket to scan.
  • File format (required): The file format to process — json, csv, or avro. Default: json.
  • CSV Has Headers: (Visible when format is CSV) Whether the first row contains column headers. When disabled, columns are auto-generated as column1, column2, etc. Default: true.
  • Object Prefix: The S3 key prefix (folder path) to scan within the bucket. Only objects under this prefix are processed. Default: file-pulse/.
  • Topic Postfix: The default topic name suffix. When Dynamic Topic Routing is disabled, all files are streamed to this single topic. When enabled, this is used as a fallback for files that do not match the routing rules. Default: default.
  • Dynamic Topic Routing: Enable to derive the Kafka topic name per file from its S3 key path. When disabled, all files go to the single default topic. Default: false. See Dynamic Topic Routing below for details.
Click Save.
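The CSV Has Headers setting above determines how each row becomes a record. A minimal sketch of the documented behavior, using Python's standard csv module (illustrative only, not the connector's implementation):

```python
import csv
import io

def read_csv(text: str, has_headers: bool) -> list:
    """Mimic the documented CSV Has Headers behavior: use the first row as
    field names when enabled, otherwise auto-generate column1, column2, ..."""
    rows = list(csv.reader(io.StringIO(text)))
    if has_headers:
        header, data = rows[0], rows[1:]
    else:
        data = rows
        header = [f"column{i + 1}" for i in range(len(rows[0]))]
    return [dict(zip(header, row)) for row in data]

print(read_csv("name,age\nalice,30\n", has_headers=True))
# [{'name': 'alice', 'age': '30'}]
print(read_csv("alice,30\nbob,25\n", has_headers=False))
# [{'column1': 'alice', 'column2': '30'}, {'column1': 'bob', 'column2': '25'}]
```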

4. Advanced Settings

Available under Advanced:
  • Scan Interval (ms): How often the connector polls the bucket for new files. Range: 100 – 100,000. Default: 10000 (10 seconds).
  • Cleanup Policy: What to do with a file after it has been processed:
    • Log (default): Mark the file as processed without deleting it. Tracked internally so it is not reprocessed.
    • Delete: Remove the file from S3 after successful processing. Requires s3:DeleteObject.
  • Tasks: Maximum number of parallel tasks processing files concurrently. Range: 1 – 10. Default: 5.
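Conceptually, each scan pass lists the bucket and keeps only the unprocessed objects that sit under the configured prefix and match the format's extension. A minimal sketch of that filtering step (function names and the in-memory `processed` set are illustrative; the connector actually tracks state in an internal status topic):

```python
def select_new_files(keys, processed, prefix, extension):
    """Illustrative sketch of one scan pass: keep objects under the prefix
    with the right extension that have not already been processed."""
    return [
        k for k in keys
        if k.startswith(prefix) and k.endswith(extension) and k not in processed
    ]

listing = [
    "file-pulse/orders.json",
    "file-pulse/notes.txt",       # wrong extension: skipped
    "other/orders.json",          # outside the prefix: skipped
    "file-pulse/customers.json",
]
done = {"file-pulse/orders.json"}  # already tracked as processed (Log policy)
print(select_new_files(listing, done, "file-pulse/", ".json"))
# ['file-pulse/customers.json']
```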

Dynamic Topic Routing

By default, every file is sent to a single topic (using the Topic Postfix as the topic name). Dynamic topic routing derives the topic name per file from its S3 key path, so files under different folders land in different topics. When Dynamic Topic Routing is enabled, you can choose between two modes:

Simple Mode (Folder Skip / Folder Levels)

The default mode when Dynamic Topic Routing is enabled. Uses folder position to build the topic name.
  • Folder Skip: Number of leading path segments to drop from the S3 key. Default: 0.
  • Folder Levels: Number of folder segments (after the skip) to include in the topic name, joined with dots. Default: 0.

Example

Given the S3 key data/region/us-east/customers/file.json:
| Folder Skip | Folder Levels | Resulting Topic Suffix |
| --- | --- | --- |
| 0 | 1 | data |
| 1 | 1 | region |
| 1 | 2 | region.us-east |
| 2 | 2 | us-east.customers |
If Folder Skip + Folder Levels exceeds the number of path segments in the S3 key, the missing segments are silently ignored — the topic name will be shorter than expected rather than causing an error. For example, if the key has only 3 folders but you set Folder Skip to 1 and Folder Levels to 4, only the 2 available folders after the skip will appear in the topic name.
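The Folder Skip / Folder Levels derivation can be sketched as follows (a hypothetical helper, not the connector's code; it reproduces the example table and the silent-truncation behavior described above):

```python
def topic_suffix(key: str, folder_skip: int, folder_levels: int) -> str:
    """Simple Mode semantics: drop folder_skip leading folders, keep the
    next folder_levels folders (the file name itself is never included),
    and join them with dots. Missing segments are silently ignored."""
    folders = key.split("/")[:-1]  # drop the file name
    picked = folders[folder_skip:folder_skip + folder_levels]
    return ".".join(picked)

key = "data/region/us-east/customers/file.json"
print(topic_suffix(key, 0, 1))  # data
print(topic_suffix(key, 1, 2))  # region.us-east
print(topic_suffix(key, 2, 2))  # us-east.customers
print(topic_suffix(key, 1, 4))  # region.us-east.customers (only 3 folders remain after the skip)
```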

Advanced Mode (ScEL Expression)

Enable Use Advanced Expression to switch to a custom ScEL expression for full control over the topic suffix. This replaces the Folder Skip / Folder Levels settings. Available functions:
| Function | Description |
| --- | --- |
| $metadata.name | The full S3 key (folders + file name) |
| split(str, delimiter) | Break a string into an array |
| extract_array(array, index) | Pick a segment by 0-based position |
| replace_all(str, regex, replacement) | Strip or rewrite parts of a string |
| concat_ws(separator, ...parts) | Join multiple parts with a separator |

Examples

Combine two folders. For the S3 key data/region/us-east/file.json, produce the suffix region.us-east:
concat_ws('.', extract_array(split($metadata.name, '/'), 1), extract_array(split($metadata.name, '/'), 2))
Combine a folder and the file-name prefix. For keys like data/region/us-east/orders_0001.json and data/region/us-east/products_0001.json, produce suffixes us-east.orders and us-east.products:
concat_ws('.', extract_array(split($metadata.name, '/'), 2), replace_all(extract_array(split($metadata.name, '/'), 3), '_.*', ''))
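These expressions can be checked locally by emulating the ScEL functions in Python. The stand-ins below are illustrative and may differ from the real ScEL engine in edge cases:

```python
import re

# Python stand-ins for the ScEL functions used above (illustrative only).
def split(s, delim):
    return s.split(delim)

def extract_array(arr, idx):
    return arr[idx]

def replace_all(s, pattern, repl):
    return re.sub(pattern, repl, s)

def concat_ws(sep, *parts):
    return sep.join(parts)

# First example: combine two folders.
name = "data/region/us-east/file.json"
print(concat_ws(".", extract_array(split(name, "/"), 1),
                extract_array(split(name, "/"), 2)))
# region.us-east

# Second example: combine a folder and the file-name prefix.
name = "data/region/us-east/orders_0001.json"
print(concat_ws(".", extract_array(split(name, "/"), 2),
                replace_all(extract_array(split(name, "/"), 3), "_.*", "")))
# us-east.orders
```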

Troubleshooting

If no files are being ingested, verify the following:
  • The Bucket Name and Object Prefix are correct.
  • Files match the selected File format (e.g. .json files for JSON).
  • The AWS credentials have s3:GetObject and s3:ListBucket on the bucket.
  • New files are being placed under the configured prefix.
  • The Scan Interval (ms) is not set excessively high.
If the connector reports permission errors, the IAM principal associated with the provided AWS credentials lacks sufficient permissions.

Resolution:
  • Ensure the IAM policy grants s3:GetObject and s3:ListBucket for the bucket.
  • If using the Delete cleanup policy, also grant s3:DeleteObject.
  • Check that no bucket policy explicitly denies access.
  • Confirm the bucket is in the configured Region.
If CSV rows are parsed with the wrong column names, this typically means CSV Has Headers does not match the actual file content.

Resolution:
  • If your CSV files have a header row, set CSV Has Headers to true.
  • If they do not, set it to false — columns are auto-named column1, column2, etc.
  • Verify the delimiter is a comma (other delimiters are not currently supported).
The connector tracks processed files in an internal status topic. Reprocessing can occur if:
  • The connector was deleted and recreated (file tracking state is lost).
  • The internal status topic was deleted.
Resolution:
  • Avoid deleting and recreating connectors for the same bucket/prefix.
  • Contact Streamkap support if you need to reset the processing state.