Overview
Use the Weaviate Sink Destination to stream data from your Kafka topics into Weaviate collections. This connector is useful for building real-time vector databases, enabling semantic search, and maintaining synchronized vector embeddings. Whether you’re vectorizing text data with Weaviate’s built-in vectorizers or bringing pre-computed embeddings, this connector handles the entire ingestion pipeline.
Prerequisites
- A Weaviate instance (self-hosted or Weaviate Cloud Service)
- Connection details: REST URL and gRPC URL
- Authentication credentials (if required by your Weaviate instance)
- Understanding of your ID and vectorization strategy (see below)
- Note: Collections do not need to be pre-created if Schema Evolution is enabled (default behavior)
Key Concepts
Collections & Topics
Collections in Weaviate are similar to tables in traditional databases. Each Kafka topic is mapped to a Weaviate collection. By default, a Kafka topic named users maps to a collection named users. You can customize this mapping using the Collection Mapping setting.
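The Collection Mapping pattern can be pictured as simple template substitution. The sketch below is a minimal illustration, assuming `${topic}` is the only placeholder and that it is replaced verbatim; `map_topic_to_collection` is a hypothetical helper, not part of the connector.

```python
def map_topic_to_collection(topic: str, pattern: str = "${topic}") -> str:
    """Expand a Collection Mapping pattern for one Kafka topic.

    Hypothetical helper: assumes ${topic} is the only supported placeholder.
    """
    return pattern.replace("${topic}", topic)


for pattern in ("${topic}", "weaviate_${topic}", "production_${topic}_v2"):
    print(pattern, "->", map_topic_to_collection("users", pattern))
```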
Document IDs & Upsert Operations
Weaviate uses UUIDs to uniquely identify objects. The connector supports multiple ID strategies:
- NoIdStrategy (default) – Generates a new UUID for each record, always creating new objects (INSERT semantics)
- FieldIdStrategy – Uses a field from your Kafka record as the document ID, enabling upserts
- KafkaIdStrategy – Uses the Kafka message key as the document ID, also enabling upserts
For FieldIdStrategy, specify the field name (e.g., id, user_id) in the Document ID Field setting. If your Kafka record contains an id field and you want to preserve it in Weaviate, note that id is a reserved keyword in Weaviate: the connector automatically renames it to __id in the stored object.
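The three strategies differ only in where the UUID comes from. The following is a minimal sketch, assuming that a field value or message key which is not already a UUID is turned into a deterministic name-based UUID (UUIDv5 here); the connector's actual derivation may differ, and `assign_id`, `to_uuid`, and `ID_NAMESPACE` are hypothetical names.

```python
import uuid
from typing import Any, Optional

# Assumption: non-UUID IDs are mapped to a name-based UUIDv5 in this namespace.
# The connector's real derivation (if any) may differ.
ID_NAMESPACE = uuid.NAMESPACE_URL


def to_uuid(value: Any) -> str:
    """Keep a value that already is a UUID; otherwise derive a deterministic UUIDv5."""
    try:
        return str(uuid.UUID(str(value)))
    except ValueError:
        return str(uuid.uuid5(ID_NAMESPACE, str(value)))


def assign_id(strategy: str, record: dict, key: Any = None,
              id_field: Optional[str] = None) -> str:
    """Return the object UUID for one record under the given ID strategy."""
    if strategy == "NoIdStrategy":
        return str(uuid.uuid4())            # random: every record becomes a new object
    if strategy == "FieldIdStrategy":
        return to_uuid(record[id_field])    # same field value -> same UUID -> upsert
    if strategy == "KafkaIdStrategy":
        if key is None:
            raise ValueError("KafkaIdStrategy needs a non-null message key")
        return to_uuid(key)                 # same message key -> same UUID -> upsert
    raise ValueError(f"unknown ID strategy: {strategy}")
```

Because Field ID and Kafka ID strategies map the same source identifier to the same UUID, a replayed record overwrites its earlier copy instead of creating a second object.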
Vectorization & Bring Your Own Vectors (BYOV)
- Default (NoVectorStrategy) – Weaviate generates embeddings using the vectorizer specified in your collection configuration (e.g., OpenAI, Cohere, Hugging Face). For collections created automatically by Schema Evolution, the Default Weaviate Vectorizer setting determines which vectorizer is used.
- Bring Your Own Vectors (BYOV) – If embeddings are pre-computed outside Weaviate, use FieldVectorStrategy and specify the field containing the embedding vector. Note: BYOV supports only one vector field per collection. If your Kafka records contain multiple vector fields, only the configured vector field will be used.
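With Field Vector Strategy, the embedding travels inside the record. This minimal sketch shows how one record might split into stored properties plus a single vector, including the "array of numbers" check. The helper `split_vector` is hypothetical, and removing the vector field from the stored properties is an assumption; the connector may keep it.

```python
from numbers import Real
from typing import Optional


def split_vector(record: dict, vector_field: Optional[str]) -> tuple[dict, Optional[list[float]]]:
    """Return (properties, vector) for one record.

    vector_field=None mimics No Vector Strategy: Weaviate's vectorizer computes
    the embedding. Assumption: with Field Vector Strategy the vector field is
    not also stored as a regular property.
    """
    if vector_field is None:
        return dict(record), None
    properties = dict(record)
    vector = properties.pop(vector_field)
    # Only one configured field is read; any other vector-like fields stay as properties.
    if not isinstance(vector, list) or not all(
        isinstance(x, Real) and not isinstance(x, bool) for x in vector
    ):
        raise ValueError(f"{vector_field!r} must be an array of numbers")
    return properties, [float(x) for x in vector]
```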
Delete Operations
If Delete Enabled is set to true, records with null values are treated as deletes. For example, when a record is deleted in the source and a tombstone record (with a null value) is sent to Kafka, the connector will delete the corresponding object from Weaviate.
Weaviate Preparation
Before configuring the connector, prepare your Weaviate instance:
Gather Connection Details
For self-hosted Weaviate:
- REST URL: http://<host>:<port> (default: http://localhost:8080)
- gRPC URL: <host>:<port> (default: localhost:50051)
For Weaviate Cloud Service (WCS):
- REST URL: Available in your WCS dashboard (https://<REST-Endpoint>)
- gRPC URL: Available in your WCS dashboard
- Enable gRPC TLS: Set to true for WCS
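Before entering these values in Streamkap, it can help to confirm that the REST endpoint answers. This is a minimal sketch using only the Python standard library; it relies on Weaviate's readiness endpoint `/v1/.well-known/ready`, the helper names are hypothetical, and it does not test gRPC, which needs a separate check.

```python
import urllib.request


def readiness_url(rest_url: str) -> str:
    """Build the readiness URL from a REST URL such as http://localhost:8080."""
    return rest_url.rstrip("/") + "/v1/.well-known/ready"


def is_ready(rest_url: str, timeout: float = 5.0) -> bool:
    """True if Weaviate reports ready (HTTP 200); False on errors or non-200 replies."""
    try:
        with urllib.request.urlopen(readiness_url(rest_url), timeout=timeout) as response:
            return response.status == 200
    except OSError:  # URLError, HTTPError (e.g. 503), refused connections, timeouts
        return False
```

For a local instance, `is_ready("http://localhost:8080")` should return True once Weaviate has started.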
Create Collections (Optional with Schema Evolution)
By default, Schema Evolution is enabled, allowing the connector to automatically create collections from incoming Kafka records. However, you may want to pre-create collections if:
- You want to define custom vectorizers (e.g., a specific OpenAI model, Cohere, or Hugging Face) before data arrives
- You want to configure custom collection properties not present in the first Kafka record
- You prefer explicit schema control in production environments
In either case:
- Collection names should match your Kafka topic names (or configure via Collection Mapping)
- If a collection doesn’t exist and Schema Evolution is enabled, the connector will create it automatically
- If a collection exists and new properties arrive in Kafka records, Weaviate will automatically add them to the collection schema
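If you pre-create a collection, Weaviate's REST schema endpoint (`POST /v1/schema`) accepts a class definition. The sketch below builds one for a hypothetical `Users` collection; the property names, the `text2vec-openai` vectorizer, and the model name are illustrative assumptions, and the request is only sent when you call `create_collection`.

```python
import json
import urllib.request
from typing import Optional


def collection_definition(name: str, vectorizer: str, model: Optional[str] = None) -> dict:
    """Build a Weaviate class definition (illustrative properties only)."""
    definition = {
        "class": name,
        "vectorizer": vectorizer,
        "properties": [
            {"name": "name", "dataType": ["text"]},
            {"name": "bio", "dataType": ["text"]},
        ],
    }
    if model:
        # Vectorizer-specific settings live under moduleConfig, keyed by vectorizer name.
        definition["moduleConfig"] = {vectorizer: {"model": model}}
    return definition


def create_collection(rest_url: str, definition: dict, api_key: Optional[str] = None) -> int:
    """POST the definition to /v1/schema and return the HTTP status code."""
    request = urllib.request.Request(
        rest_url.rstrip("/") + "/v1/schema",
        data=json.dumps(definition).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    if api_key:
        request.add_header("Authorization", f"Bearer {api_key}")
    with urllib.request.urlopen(request) as response:
        return response.status
```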
Prepare Authentication (if needed)
API Key Authentication:
- Generate an API key from your Weaviate instance settings
- Have it ready to paste during setup
OIDC Authentication:
- Gather Client ID, Client Secret, and OIDC Token URL
- Ensure OIDC is enabled on your Weaviate instance
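Both methods end up as HTTP credentials on Weaviate requests. A minimal sketch, assuming a Bearer `Authorization` header for the API key and a standard OAuth 2.0 client-credentials request to the OIDC Token URL; the helper names are hypothetical and the token request is built but not sent.

```python
import urllib.parse
import urllib.request
from typing import Optional


def api_key_headers(api_key: str, custom_headers: Optional[dict] = None) -> dict:
    """Headers for API key auth, merged with optional custom headers."""
    headers = dict(custom_headers or {})
    headers["Authorization"] = f"Bearer {api_key}"
    return headers


def client_credentials_request(token_url: str, client_id: str, client_secret: str,
                               scopes: Optional[list] = None) -> urllib.request.Request:
    """Build (but do not send) an OAuth 2.0 client-credentials token request."""
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    if scopes:
        form["scope"] = " ".join(scopes)  # OAuth 2.0 scopes are space-delimited on the wire
    return urllib.request.Request(
        token_url,
        data=urllib.parse.urlencode(form).encode("ascii"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
```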
Streamkap Setup (UI)
- Navigate to Destinations and choose Weaviate.
- Fill in the fields:
- Name – A memorable identifier for this Destination.
- Weaviate REST URL – The REST endpoint. Examples:
- Self-hosted: http://localhost:8080
- WCS: https://my-instance.gcp.weaviate.cloud
- Weaviate gRPC URL – The gRPC endpoint. Examples:
- Self-hosted: localhost:50051
- WCS: my-instance.gcp.weaviate.cloud
- gRPC TLS Secured – Enable if your gRPC connection requires TLS encryption (required for WCS).
- Authentication Type – Select None, API Key, or OIDC:
- None – No authentication (suitable for local development)
- API Key – Use an API key generated by Weaviate
- OIDC – Use OpenID Connect Client Credentials flow
- API Key (if API Key auth selected) – Paste your Weaviate API key.
- OIDC Client ID (if OIDC selected) – Your OIDC client identifier.
- OIDC Client Secret (if OIDC selected) – Your OIDC client secret.
- OIDC Scopes (if OIDC selected, optional) – Comma-separated scopes (e.g., openid,profile,email).
- Custom Headers – Optional headers to include in all requests (e.g., X-OpenAI-Api-Key for embedding provider keys).
- Collection Mapping – Pattern to map Kafka topics to Weaviate collections. Default: ${topic}. Examples:
- ${topic} – Topic users → Collection users
- weaviate_${topic} – Topic users → Collection weaviate_users
- production_${topic}_v2 – Topic users → Collection production_users_v2
- Document ID Strategy – Choose how to assign IDs to objects:
- No ID Strategy – Generate new UUID for each record (always inserts)
- Field ID Strategy – Use a field from the record as the UUID
- Kafka ID Strategy – Use the Kafka message key as the UUID
- Document ID Field (if Field ID Strategy selected) – The field name containing the ID (e.g., id, user_id). This field must exist in your Kafka records.
- Vector Strategy – Choose how embeddings are handled:
- No Vector Strategy – Let Weaviate vectorize using its configured vectorizer
- Field Vector Strategy – Use a pre-computed embedding from a field
- Vector Field Name (if Field Vector Strategy selected) – The field containing the embedding vector (e.g., embedding, vector). Must be an array of numbers.
- Default Weaviate Vectorizer (Advanced) – Sets the default vectorizer for newly created collections when Schema Evolution is enabled. Options include:
- none – No vectorization (use this if you’re providing your own vectors via Field Vector Strategy)
- text2vec-weaviate – Weaviate’s built-in text vectorizer
- text2vec-openai – OpenAI embeddings (requires API key in Custom Headers)
- text2vec-cohere – Cohere embeddings (requires API key in Custom Headers)
- text2vec-jinaai – Jina AI embeddings (requires API key in Custom Headers)
- text2vec-voyageai – Voyage AI embeddings (requires API key in Custom Headers)
- Delete Enabled – If true, null-valued records are treated as deletes. Useful for delete propagation from source systems.
- Schema Evolution (default: basic) – Controls automatic collection and property management:
- basic (default) – The connector automatically creates missing collections from incoming Kafka records. If new properties appear in later records, Weaviate automatically adds them to the collection schema. This is ideal for development and dynamic data pipelines.
- none – The connector expects collections to exist in Weaviate and will fail if a collection is missing. Use this in production for strict schema control.
- Batch Size – Number of records to batch before sending to Weaviate (default: 100). Larger batches improve throughput; smaller batches reduce latency.
- Pool Size – Number of worker threads processing batches (default: 1). Increase for parallel processing if Weaviate can handle concurrent requests.
- Await Termination (ms) – Timeout in milliseconds for batch processing before forcing termination (default: 10000).
- Max Retries – Maximum retry attempts on connection/timeout errors (default: 3).
- Retry Interval (ms) – Delay between retries (default: 2000).
- Click Save.
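Several of the fields above only matter when another field has a particular value. The sketch below models the form as a Python dataclass to make those dependencies and the documented defaults explicit. The class and attribute names are hypothetical (they are not Streamkap or connector property names), settings whose defaults are not stated above are omitted, and only the rules described above are checked.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class WeaviateDestination:
    # Hypothetical model of the form; defaults follow the values documented above.
    rest_url: str
    grpc_url: str
    grpc_tls: bool = False
    auth_type: str = "None"                  # "None", "API Key" or "OIDC"
    api_key: Optional[str] = None
    oidc_client_id: Optional[str] = None
    oidc_client_secret: Optional[str] = None
    custom_headers: dict = field(default_factory=dict)
    collection_mapping: str = "${topic}"
    id_strategy: str = "No ID Strategy"
    id_field: Optional[str] = None
    vector_strategy: str = "No Vector Strategy"
    vector_field: Optional[str] = None
    schema_evolution: str = "basic"
    batch_size: int = 100
    pool_size: int = 1
    await_termination_ms: int = 10000
    max_retries: int = 3
    retry_interval_ms: int = 2000

    def problems(self) -> list:
        """Return human-readable problems; an empty list means the form looks complete."""
        issues = []
        if self.auth_type == "API Key" and not self.api_key:
            issues.append("API Key is required for API Key authentication")
        if self.auth_type == "OIDC" and not (self.oidc_client_id and self.oidc_client_secret):
            issues.append("OIDC Client ID and Client Secret are required for OIDC")
        if self.id_strategy == "Field ID Strategy" and not self.id_field:
            issues.append("Document ID Field is required for Field ID Strategy")
        if self.vector_strategy == "Field Vector Strategy" and not self.vector_field:
            issues.append("Vector Field Name is required for Field Vector Strategy")
        if self.schema_evolution not in ("basic", "none"):
            issues.append("Schema Evolution must be 'basic' or 'none'")
        if self.batch_size < 1 or self.pool_size < 1:
            issues.append("Batch Size and Pool Size must be at least 1")
        return issues
```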
How It Works
- Record Ingestion – Records from Kafka are received by the connector.
- ID Generation – Based on the ID strategy, a UUID is assigned or extracted.
- Vector Assignment – If using Field Vector Strategy, the embedding is extracted; otherwise, it’s left for Weaviate to compute.
- Reserved Field Handling – Fields named id or _id are renamed to __id to avoid Weaviate keyword conflicts.
- Batching – Records are batched according to Batch Size.
- Weaviate Storage – The batch is sent to Weaviate via gRPC for ingestion.
- Upsert/Insert Semantics – With Field ID or Kafka ID Strategy, the same source ID always yields the same UUID, so an existing object with that UUID is updated (upserted); with No ID Strategy, a new object is always inserted.
- Error Handling – Failed batches are retried up to Max Retries times with exponential backoff.
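The batching and retry steps can be sketched as follows. Because the connector's internals are not documented here, two things are assumptions: records are split into consecutive chunks of Batch Size, and the retry delay starts at Retry Interval and doubles after each failed attempt (one reading of "exponential backoff"). `send_batch` is a hypothetical stand-in for the gRPC batch call.

```python
import time
from typing import Callable, Iterable, Iterator, List


def batches(records: Iterable[dict], batch_size: int = 100) -> Iterator[List[dict]]:
    """Yield consecutive chunks of at most batch_size records."""
    batch: List[dict] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def send_with_retries(send_batch: Callable[[List[dict]], None], batch: List[dict],
                      max_retries: int = 3, retry_interval_ms: int = 2000,
                      sleep: Callable[[float], None] = time.sleep) -> int:
    """Send one batch, resending the whole batch on connection/timeout errors.

    Returns the number of attempts used; re-raises once max_retries retries fail.
    """
    delay = retry_interval_ms / 1000
    attempt = 0
    while True:
        attempt += 1
        try:
            send_batch(batch)
            return attempt
        except (ConnectionError, TimeoutError):
            if attempt > max_retries:
                raise
            sleep(delay)
            delay *= 2  # assumed doubling backoff
```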
Reserved Fields & Field Renaming
Weaviate has reserved keywords (id, _id). If your Kafka records contain fields named id or _id, the connector automatically renames them:
- id → __id
- _id → __id
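For example, a Kafka record with an `id` field keeps that value under `__id` in Weaviate. This minimal sketch implements the renaming rule as described above; `rename_reserved` is a hypothetical helper, and because the handling of a record containing both `id` and `_id` is not documented, the sketch simply raises in that case.

```python
RESERVED_FIELDS = ("id", "_id")


def rename_reserved(record: dict) -> dict:
    """Return a copy of record with id/_id stored under __id."""
    present = [name for name in RESERVED_FIELDS if name in record]
    if len(present) > 1:
        # Both would map to __id; the connector's behavior here is undocumented.
        raise ValueError("record contains both 'id' and '_id'")
    renamed = {k: v for k, v in record.items() if k not in RESERVED_FIELDS}
    if present:
        renamed["__id"] = record[present[0]]
    return renamed


print(rename_reserved({"id": 42, "name": "Alice"}))  # {'name': 'Alice', '__id': 42}
```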
Limitations & Best Practices
Limitations
- Collections Require Pre-creation When Schema Evolution is Disabled – If you disable Schema Evolution (none mode), collections must exist in Weaviate before streaming; the connector will fail if a collection is missing.
- Schema Mismatch with Evolution Disabled – When Schema Evolution is disabled and a Kafka record contains a field not defined in the Weaviate collection schema, that field is silently dropped. Ensure your Weaviate schema matches your Kafka payload structure if evolution is disabled.
- Single Vector Field per Collection (BYOV) – When using Field Vector Strategy (Bring Your Own Vectors), the connector supports only one vector field per collection. If your Kafka records contain multiple vector fields, only the configured vector field will be used for ingestion.
- Batch Retries – If a batch fails, the entire batch is retried. With No ID Strategy this can create duplicate objects; Field ID and Kafka ID strategies avoid duplicates because retried records upsert the same objects.
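The two Schema Evolution modes differ in how they treat a missing collection and unknown fields. This minimal sketch models the behavior listed above, with the schema held in a plain dictionary; in basic mode the sketch adds new properties itself, whereas in practice Weaviate adds them. `apply_schema_evolution` is a hypothetical name.

```python
def apply_schema_evolution(mode: str, schema: dict, collection: str, record: dict) -> dict:
    """Return the properties that would be written for record.

    schema maps collection name -> set of property names (a stand-in for Weaviate).
    """
    if collection not in schema:
        if mode == "none":
            raise LookupError(f"Collection {collection} not found")
        schema[collection] = set()              # basic: create the missing collection
    known = schema[collection]
    if mode == "basic":
        known.update(record)                    # basic: new properties are added
        return dict(record)
    # none: fields missing from the schema are silently dropped
    return {k: v for k, v in record.items() if k in known}
```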
Best Practices
- Use Field ID Strategy for Idempotency – If your source has unique identifiers (e.g., user IDs, order IDs), use them as document IDs to enable idempotent upserts.
- Pre-compute Embeddings for Custom Models – If you’re using embeddings outside Weaviate, use BYOV (Field Vector Strategy) to avoid re-vectorization. Remember that only one vector field per collection is supported.
- Configure Default Vectorizer for New Collections – Use the Default Weaviate Vectorizer setting to specify which vectorizer should be used for collections created automatically by Schema Evolution. If a standard vectorizer meets your needs, this is simpler than pre-creating collections.
- Design Weaviate Schema for Custom Vectorizers – If you want specific vectorizer configurations (e.g., OpenAI with a particular model, specific Cohere settings), pre-create collections with those settings. Schema Evolution uses the Default Weaviate Vectorizer for automatic collection creation.
- Monitor Weaviate Capacity – Ensure your Weaviate instance can handle the ingestion rate. Start with small batch sizes and increase gradually.
- Handle Reserved Fields – Be aware that id and _id are renamed to __id; adjust downstream queries if needed.
- Enable Deletion if Using CDC – If your source is a database with CDC, enable Delete Enabled to propagate deletes.
- Use Custom Headers for Embedding APIs – If your vectorizer needs API keys (e.g., OpenAI, Cohere, Jina AI), provide them via Custom Headers.
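With Delete Enabled, a CDC tombstone becomes a delete of the matching Weaviate object. This minimal sketch classifies one Kafka record. It assumes the object UUID is derived from the message key (as with Kafka ID Strategy), since a tombstone's value is null, and it assumes tombstones are skipped when Delete Enabled is false; neither detail is documented here. `operation_for` and `key_to_uuid` are hypothetical names.

```python
import uuid
from typing import Any, Optional


def key_to_uuid(key: Any) -> str:
    """Hypothetical key-to-UUID rule: keep real UUIDs, otherwise derive a UUIDv5."""
    try:
        return str(uuid.UUID(str(key)))
    except ValueError:
        return str(uuid.uuid5(uuid.NAMESPACE_URL, str(key)))


def operation_for(key: Any, value: Optional[dict], delete_enabled: bool) -> tuple:
    """Classify one Kafka record as ("write" | "delete" | "skip", object UUID or None)."""
    if value is None:                         # tombstone: the source row was deleted
        if delete_enabled and key is not None:
            return "delete", key_to_uuid(key)
        return "skip", None                   # assumption: ignored when deletes are off
    object_id = key_to_uuid(key) if key is not None else None
    return "write", object_id
```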
Troubleshooting
Collection Not Found Error
Problem: Connector fails with “Collection <name> not found”
Solution:
- This error typically occurs when Schema Evolution is disabled. Enable Schema Evolution if you want the connector to create collections automatically.
- If you prefer Schema Evolution disabled for strict control, pre-create the required collection in Weaviate
- Check that the Collection Mapping pattern correctly maps your topic name to the collection name
- If using custom mapping, verify the generated collection name matches what exists in Weaviate
Authentication Failures
Problem: “Unauthorized” or “Forbidden” errors
Solution:
- Verify your API key or OIDC credentials are correct
- Ensure the credentials have permissions to write to collections
- For OIDC, check that the token URL is reachable and returns valid tokens
gRPC Connection Issues
Problem: “Failed to connect to gRPC endpoint”
Solution:
- Verify the gRPC URL is correct (host and port)
- Ensure gRPC TLS is enabled if connecting to WCS
- Check network connectivity and firewall rules
- For self-hosted Weaviate, ensure gRPC is enabled on port 50051
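A plain TCP connection attempt is a quick way to separate network or firewall problems from TLS or authentication problems. This minimal sketch uses Python's standard `socket` module; it only shows that something is listening on the gRPC host and port, not that gRPC or TLS is configured correctly, and the function names are hypothetical. The 50051 fallback is the self-hosted default noted above; pass your provider's port otherwise.

```python
import socket


def split_host_port(grpc_url: str, default_port: int = 50051) -> tuple:
    """Split 'host:port' (as entered in the gRPC URL field) into (host, port)."""
    host, sep, port = grpc_url.rpartition(":")
    if not sep:
        return grpc_url, default_port
    return host, int(port)


def can_reach(grpc_url: str, default_port: int = 50051, timeout: float = 3.0) -> bool:
    """True if a TCP connection to the gRPC host and port succeeds."""
    host, port = split_host_port(grpc_url, default_port)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, DNS failure, or timeout
        return False
```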
Field Not Found or Data Loss
Problem: Fields from Kafka records are missing in Weaviate
Solution:
- If Schema Evolution is enabled (default), new fields are automatically added to the collection schema by Weaviate. If you’re not seeing a field, verify the field name is correct (case-sensitive).
- If Schema Evolution is disabled and a Kafka record contains a field not defined in the Weaviate collection schema, that field will be silently dropped. Check that your Weaviate collection schema includes all expected fields.
- Use Weaviate’s schema inspection to see what properties are currently defined
- Verify field names match exactly (case-sensitive) between Kafka records and Weaviate collection schema
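Weaviate's REST API returns a collection's definition, including its `properties`, from `GET /v1/schema/{className}`. This minimal sketch fetches that definition and reports record fields with no exact, case-sensitive property match. The helper names are hypothetical, and you would need to add an `Authorization` header if your instance requires authentication.

```python
import json
import urllib.request


def fetch_property_names(rest_url: str, collection: str) -> set:
    """Return the property names currently defined on a Weaviate collection."""
    url = f"{rest_url.rstrip('/')}/v1/schema/{collection}"
    with urllib.request.urlopen(url) as response:
        definition = json.load(response)
    return {prop["name"] for prop in definition.get("properties", [])}


def missing_fields(record: dict, property_names: set) -> list:
    """Record fields that have no exact (case-sensitive) property match."""
    return sorted(name for name in record if name not in property_names)
```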
ID Field Conflicts
Problem: id or _id fields are missing from ingested data
Solution:
- The connector renames these to __id to avoid conflicts with Weaviate’s internal ID field
- Query your data and check for the renamed fields
- If needed, query the _additional.id property to get the Weaviate UUID
High Latency or Backpressure
Problem: Connector is slow or not keeping up with Kafka
Solution:
- Increase Batch Size to reduce the number of requests to Weaviate
- Increase Pool Size to enable parallel processing
- Check Weaviate’s CPU and memory; scale if needed
- Monitor network latency between Kafka Connect and Weaviate
Security Notes
- API Keys & Secrets – Stored encrypted. Never expose them in logs or in publicly shared config files.
- HTTPS/TLS – Always use HTTPS for REST connections and enable gRPC TLS for production.
- OIDC Tokens – Tokens are refreshed automatically and not persisted in logs.
- Custom Headers – Be careful with sensitive headers (e.g., API keys); they should be managed securely by your deployment platform.
Next Steps
- Review Weaviate’s documentation for collection design best practices
- Test the connector with a small Kafka topic first
- Monitor latency and throughput during initial deployment
- Adjust batch size and pool size based on observed performance
- Set up alerts for connector task failures