This is a batch file source, not a CDC connector. Files are scanned at a configurable interval and each file is processed once.
Prerequisites
- An S3 bucket containing the files to be ingested
- AWS credentials (Access Key ID and Secret Access Key) with the following permissions on the source bucket:
  - `s3:GetObject`
  - `s3:ListBucket`
  - `s3:DeleteObject` (only required when using the Delete cleanup policy)
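The permissions above could be granted with an IAM policy along these lines (a minimal sketch; `my-source-bucket` is a placeholder, and the `s3:DeleteObject` action is only needed if you use the Delete cleanup policy):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ListSourceBucket",
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::my-source-bucket"
    },
    {
      "Sid": "ReadAndCleanupObjects",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::my-source-bucket/*"
    }
  ]
}
```

Note that `s3:ListBucket` applies to the bucket ARN itself, while `s3:GetObject` and `s3:DeleteObject` apply to the objects inside it (`/*`).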
Supported File Formats
| Format | File Extension | Notes |
|---|---|---|
| JSON | .json | JSON Lines (one object per line) or JSON arrays |
| CSV | .csv | Comma-separated; headers optional |
| Avro | .avro | Apache Avro binary with embedded schema |
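For illustration, both accepted JSON layouts (hypothetical records):

```
JSON Lines (one object per line):
{"id": 1, "name": "alice"}
{"id": 2, "name": "bob"}

JSON array:
[{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
```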
Streamkap Setup
Follow these steps to configure your new connector:

1. Create the Source
- Navigate to Add Connectors.
- Choose S3.
2. Connection Settings
- Name: A name for your connector.
- AWS Access Key: The AWS Access Key ID with the permissions listed in Prerequisites.
- AWS Secret Access Key: The AWS Secret Access Key that pairs with the key above.
- Region: The AWS region where the S3 bucket lives.
3. Source Settings
- Bucket Name (required): The S3 bucket to scan.
- File format (required): The file format to process: `json`, `csv`, or `avro`. Default: `json`.
- CSV Has Headers (visible when format is CSV): Whether the first row contains column headers. When disabled, columns are auto-generated as `column1`, `column2`, etc. Default: `true`.
- Object Prefix: The S3 key prefix (folder path) to scan within the bucket. Only objects under this prefix are processed. Default: `file-pulse/`.
- Topic Postfix: The default topic name suffix. When Dynamic Topic Routing is disabled, all files are streamed to this single topic. When enabled, this is used as a fallback for files that do not match the routing rules. Default: `default`.
- Dynamic Topic Routing: Enable to derive the Kafka topic name per file from its S3 key path. When disabled, all files go to the single default topic. Default: `false`. See Dynamic Topic Routing below for details.
4. Advanced Settings
Available under Advanced:
- Scan Interval (ms): How often the connector polls the bucket for new files. Range: 100 – 100,000. Default: `10000` (10 seconds).
- Cleanup Policy: What to do with a file after it has been processed:
  - Log (default): Mark the file as processed without deleting it. Tracked internally so it is not reprocessed.
  - Delete: Remove the file from S3 after successful processing. Requires `s3:DeleteObject`.
- Tasks: Maximum number of parallel tasks processing files concurrently. Range: 1 – 10. Default: `5`.
Dynamic Topic Routing
By default, every file is sent to a single topic (using the Topic Postfix as the topic name). Dynamic topic routing derives the topic name per file from its S3 key path, so files under different folders land in different topics. When Dynamic Topic Routing is enabled, you can choose between two modes:

Simple Mode (Folder Skip / Folder Levels)
The default mode when Dynamic Topic Routing is enabled. Uses folder position to build the topic name.
- Folder Skip: Number of leading path segments to drop from the S3 key. Default: `0`.
- Folder Levels: Number of folder segments (after the skip) to include in the topic name, joined with dots. Default: `0`.
Example
Given the S3 key `data/region/us-east/customers/file.json`:
| Folder Skip | Folder Levels | Resulting Topic Suffix |
|---|---|---|
| 0 | 1 | data |
| 1 | 1 | region |
| 1 | 2 | region.us-east |
| 2 | 2 | us-east.customers |
If Folder Skip + Folder Levels exceeds the number of path segments in the S3 key, the missing segments are silently ignored: the topic name is simply shorter than expected rather than causing an error. For example, if the key has only 3 folders and you set Folder Skip to 1 and Folder Levels to 4, only the 2 available folders after the skip appear in the topic name.
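The simple-mode behavior described above can be sketched in Python (an illustration of the documented rules, not the connector's actual code):

```python
def topic_suffix(s3_key: str, folder_skip: int, folder_levels: int) -> str:
    """Derive a topic suffix from an S3 key using Folder Skip / Folder Levels.

    Only folder segments count; the file name itself is ignored.
    Segments past the end of the key are silently dropped, so the
    result may be shorter than folder_levels suggests.
    """
    folders = s3_key.split("/")[:-1]  # drop the file name
    selected = folders[folder_skip:folder_skip + folder_levels]
    return ".".join(selected)

key = "data/region/us-east/customers/file.json"
print(topic_suffix(key, 1, 2))  # region.us-east
print(topic_suffix(key, 2, 2))  # us-east.customers
print(topic_suffix(key, 1, 4))  # region.us-east.customers (missing segment ignored)
```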
Advanced Mode (ScEL Expression)
Enable Use Advanced Expression to switch to a custom ScEL expression for full control over the topic suffix. This replaces the Folder Skip / Folder Levels settings. Available functions:

| Function | Description |
|---|---|
| `$metadata.name` | The full S3 key (folders + file name) |
| `split(str, delimiter)` | Break a string into an array |
| `extract_array(array, index)` | Pick a segment by 0-based position |
| `replace_all(str, regex, replacement)` | Strip or rewrite parts of a string |
| `concat_ws(separator, ...parts)` | Join multiple parts with a separator |
Examples
Combine two folders. For the S3 key `data/region/us-east/file.json`, produce the suffix `region.us-east`:
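One way to write this using only the functions from the table above (a sketch; the exact ScEL syntax and any required `{{ … }}` wrapping may vary by version, so verify against your deployment):

```
concat_ws('.',
  extract_array(split($metadata.name, '/'), 1),
  extract_array(split($metadata.name, '/'), 2))
```

Here `split` turns the key into `[data, region, us-east, file.json]`, the two `extract_array` calls pick positions 1 and 2, and `concat_ws` joins them with a dot.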
Combine a folder with the normalized file name. For the S3 keys `data/region/us-east/orders_0001.json` and `data/region/us-east/products_0001.json`, produce the suffixes `us-east.orders` and `us-east.products`:
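A possible expression (a sketch; the regex, index positions, and escaping are assumptions based on the example keys and should be verified against your ScEL version):

```
concat_ws('.',
  extract_array(split($metadata.name, '/'), 2),
  replace_all(extract_array(split($metadata.name, '/'), 3), '_[0-9]+\\.json$', ''))
```

Position 2 yields the folder (`us-east`); position 3 yields the file name, from which `replace_all` strips the numeric suffix and extension (`orders_0001.json` becomes `orders`).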
Troubleshooting
Files are not being picked up
Verify the following:
- The Bucket Name and Object Prefix are correct.
- Files match the selected File format (e.g. `.json` files for JSON).
- The AWS credentials have `s3:GetObject` and `s3:ListBucket` on the bucket.
- New files are being placed under the configured prefix.
- The Scan Interval (ms) is not set excessively high.
Access Denied errors
The IAM principal associated with the provided AWS credentials lacks sufficient permissions.

Resolution:
- Ensure the IAM policy grants `s3:GetObject` and `s3:ListBucket` for the bucket.
- If using the Delete cleanup policy, also grant `s3:DeleteObject`.
- Check that no bucket policy explicitly denies access.
- Confirm the bucket is in the configured Region.
CSV columns are unnamed or misaligned
This typically means CSV Has Headers does not match the actual file content.

Resolution:
- If your CSV files have a header row, set CSV Has Headers to `true`.
- If they do not, set it to `false`; columns are auto-named `column1`, `column2`, etc.
- Verify the delimiter is a comma (other delimiters are not currently supported).
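The auto-naming fallback can be illustrated in Python (a sketch of the documented behavior, not the connector's code; `read_csv_records` is a hypothetical helper):

```python
import csv
import io

def read_csv_records(text: str, has_headers: bool) -> list:
    """Parse CSV text into records, auto-naming columns when headers are absent."""
    rows = list(csv.reader(io.StringIO(text)))
    if not rows:
        return []
    if has_headers:
        header, data = rows[0], rows[1:]
    else:
        # Mirror the documented fallback: column1, column2, ...
        header = ["column%d" % (i + 1) for i in range(len(rows[0]))]
        data = rows
    return [dict(zip(header, row)) for row in data]

print(read_csv_records("1,alice\n2,bob\n", has_headers=False))
# [{'column1': '1', 'column2': 'alice'}, {'column1': '2', 'column2': 'bob'}]
```

With `has_headers=True` on the same data, the first row (`1,alice`) would be consumed as column names, which is exactly the misalignment this section describes.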
Files are being reprocessed
The connector tracks processed files in an internal status topic. Reprocessing can occur if:
- The connector was deleted and recreated (file tracking state is lost).
- The internal status topic was deleted.
Resolution:
- Avoid deleting and recreating connectors for the same bucket/prefix.
- Contact Streamkap support if you need to reset the processing state.