# Weaviate

> Stream data from Kafka topics into Weaviate vector collections in real time

## Overview

Use the Weaviate Sink Destination to stream data from your Kafka topics into Weaviate collections. This connector is useful for building real-time vector databases, enabling semantic search, and maintaining synchronized vector embeddings. Whether you're vectorizing text data with Weaviate's built-in vectorizers or bringing pre-computed embeddings, this connector handles the entire ingestion pipeline.

## Prerequisites

* A Weaviate instance (self-hosted or Weaviate Cloud Service)
* Connection details: REST URL and gRPC URL
* Authentication credentials (if required by your Weaviate instance)
* Understanding of your ID and vectorization strategy (see below)
* Note: Collections do not need to be pre-created if **Schema Evolution** is enabled (default behavior)

## Key Concepts

### Collections & Topics

Collections in Weaviate are similar to tables in traditional databases. Each Kafka topic is mapped to a Weaviate collection. By default, a Kafka topic named `users` maps to a collection named `users`. You can customize this mapping using the **Collection Mapping** setting.
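
As a rough illustration of how a mapping pattern resolves, the sketch below substitutes a topic name into the `${topic}` placeholder using Python's `string.Template`. The `collection_for` helper is hypothetical and only mirrors the documented behavior; it is not the connector's implementation.

```python
from string import Template


def collection_for(topic: str, pattern: str = "${topic}") -> str:
    """Resolve a Collection Mapping pattern for one Kafka topic (illustrative helper)."""
    return Template(pattern).substitute(topic=topic)


if __name__ == "__main__":
    # Patterns taken from the Collection Mapping examples in this guide
    for pattern in ("${topic}", "weaviate_${topic}", "production_${topic}_v2"):
        print(pattern, "->", collection_for("users", pattern))
```

Running the patterns against your real topic names before saving the Destination is a cheap way to confirm that the resulting collection names are the ones you expect.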

### Document IDs & Upsert Operations

Weaviate uses UUIDs to uniquely identify objects. The connector supports multiple ID strategies:

* **NoIdStrategy** (default) – Generates a new UUID for each record, always creating new objects (INSERT semantics)
* **FieldIdStrategy** – Uses a field from your Kafka record as the document ID, enabling upserts
* **KafkaIdStrategy** – Uses the Kafka message key as the document ID

When using **FieldIdStrategy**, specify the field name (e.g., `id`, `user_id`) in the **Document ID Field** setting. If your Kafka record contains an `id` field and you want to preserve it in Weaviate, note that `id` is a reserved keyword in Weaviate. The connector automatically renames it to `__id` in the stored object.
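
The difference between the three strategies can be sketched as follows. This is a minimal illustration, assuming that a deterministic hash (`uuid.uuid5`) stands in for however the connector actually derives a UUID from a field value or message key; the `object_uuid` function and its parameters are hypothetical.

```python
import uuid
from typing import Any, Optional


def object_uuid(strategy: str, value: dict, key: Optional[Any] = None,
                id_field: str = "id") -> uuid.UUID:
    """Pick a Weaviate object UUID for one record (illustrative only).

    ASSUMPTION: uuid5 hashing is a stand-in for the connector's real
    derivation; only the determinism matters for this example.
    """
    if strategy == "NoIdStrategy":
        return uuid.uuid4()  # fresh UUID every time, so every record is an insert
    if strategy == "FieldIdStrategy":
        source = value[id_field]  # raises KeyError if the field is missing
    elif strategy == "KafkaIdStrategy":
        source = key
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return uuid.uuid5(uuid.NAMESPACE_OID, str(source))


if __name__ == "__main__":
    record = {"user_id": "user-123", "name": "Alice"}
    first = object_uuid("FieldIdStrategy", record, id_field="user_id")
    again = object_uuid("FieldIdStrategy", record, id_field="user_id")
    print(first == again)  # True: same field value, same UUID, so the write is an upsert
```

The point of the sketch is the contrast: a random UUID can never match an existing object, while an ID derived from stable record data addresses the same object on every delivery.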

### Vectorization & Bring Your Own Vectors (BYOV)

* **Default (NoVectorStrategy)** – Weaviate generates embeddings using the vectorizer specified in your collection configuration (e.g., OpenAI, Cohere, Hugging Face). For newly created collections via Schema Evolution, the **Default Weaviate Vectorizer** setting determines which vectorizer is used.
* **Bring Your Own Vectors (BYOV)** – If embeddings are pre-computed outside Weaviate, use **FieldVectorStrategy** and specify the field containing the embedding vector. **Note:** BYOV supports only one vector field per collection. If your Kafka records contain multiple vector fields, only the configured vector field will be used.
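
To make the BYOV record shape concrete, here is a small sketch that checks a record before it is produced to Kafka and separates the embedding from the remaining properties. The `split_vector` helper and the `embedding` field name are assumptions for illustration; whether the connector also removes the vector field from the stored properties is not stated in this guide.

```python
from numbers import Real
from typing import Any


def split_vector(record: dict[str, Any], vector_field: str = "embedding"):
    """Return (properties, vector) for a BYOV record, or raise ValueError."""
    vector = record.get(vector_field)
    is_numeric_list = (
        isinstance(vector, list)
        and len(vector) > 0
        # bool is a subclass of int, so exclude it explicitly
        and all(isinstance(x, Real) and not isinstance(x, bool) for x in vector)
    )
    if not is_numeric_list:
        raise ValueError(f"field {vector_field!r} must be a non-empty array of numbers")
    # ASSUMPTION: the embedding is kept out of the regular properties
    properties = {k: v for k, v in record.items() if k != vector_field}
    return properties, [float(x) for x in vector]


if __name__ == "__main__":
    props, vec = split_vector({"title": "Desk lamp", "embedding": [0.12, -0.4, 1]})
    print(props, len(vec))
```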

### Delete Operations

If **Delete Enabled** is set to true, records with null values are treated as deletes. For example, when a record is deleted in the source and a tombstone record (with null value) is sent to Kafka, the connector will delete the corresponding object from Weaviate.
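
The tombstone rule can be sketched as a simple decision on the record value. The `plan_operation` helper is hypothetical, and the `skip` outcome for tombstones when **Delete Enabled** is off is an assumption, since this guide does not say what happens in that case. Note also that a tombstone carries no value, so only the message key is available to identify the object; how the connector resolves the object ID for a delete is not specified here.

```python
from typing import Any, Optional


def plan_operation(value: Optional[dict[str, Any]], delete_enabled: bool) -> str:
    """Return 'delete', 'skip', or 'write' for one Kafka record value."""
    if value is None:  # tombstone: the source row was deleted
        # ASSUMPTION: tombstones are ignored when Delete Enabled is false
        return "delete" if delete_enabled else "skip"
    return "write"


if __name__ == "__main__":
    print(plan_operation(None, delete_enabled=True))
    print(plan_operation({"name": "Alice"}, delete_enabled=True))
```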

## Weaviate Preparation

Before configuring the connector, prepare your Weaviate instance:

### Gather Connection Details

#### For self-hosted Weaviate:

* REST URL: `http://<host>:<port>` (default: `http://localhost:8080`)
* gRPC URL: `<host>:<port>` (default: `localhost:50051`)

#### For Weaviate Cloud Service (WCS):

* REST URL: Available in your WCS dashboard (`https://<REST-Endpoint>`)
* gRPC URL: Available in your WCS dashboard
* Enable gRPC TLS: Set to true for WCS

### Create Collections (Optional with Schema Evolution)

By default, **Schema Evolution** is enabled, allowing the connector to automatically create collections from incoming Kafka records. However, you may want to pre-create collections if:

* You want to define custom vectorizers (e.g., specific OpenAI model, Cohere, Hugging Face) before data arrives
* You want to configure custom collection properties not present in the first Kafka record
* You prefer explicit schema control in production environments

Alternatively, when **Schema Evolution** is enabled and collections are created automatically, you can use the **Default Weaviate Vectorizer** setting to specify which vectorizer should be used for newly created collections. This allows you to control vectorization without pre-creating collections.

If you pre-create collections, use Weaviate's UI, API, or Python client:

```python theme={null}
# Example using the Weaviate Python client (v4)
import weaviate
from weaviate.classes.config import Configure, DataType, Property

# Connects to http://localhost:8080 (REST) and localhost:50051 (gRPC) by default
client = weaviate.connect_to_local()

try:
    # Create a collection with the OpenAI vectorizer
    client.collections.create(
        name="Products",
        properties=[
            Property(name="title", data_type=DataType.TEXT),
            Property(name="description", data_type=DataType.TEXT),
            Property(name="price", data_type=DataType.NUMBER),
        ],
        vectorizer_config=Configure.Vectorizer.text2vec_openai(
            model="text-embedding-3-small"
        ),
    )
finally:
    # Always release the client's connections
    client.close()
```

Note:

* Collection names should match your Kafka topic names (or configure via **Collection Mapping**)
* If a collection doesn't exist and Schema Evolution is enabled, the connector will create it automatically
* If a collection exists and new properties arrive in Kafka records, Weaviate will automatically add them to the collection schema

### Prepare Authentication (if needed)

#### API Key Authentication:

* Generate an API key from your Weaviate instance settings
* Have it ready to paste during setup

#### OIDC Authentication:

* Gather Client ID, Client Secret, and OIDC Token URL
* Ensure OIDC is enabled on your Weaviate instance

## Streamkap Setup (UI)

1. Navigate to [Destinations](https://app.streamkap.com/connectors/add?tab=Destinations) and choose **Weaviate**.

2. Fill in the fields:

   1. **Name** – A memorable identifier for this Destination.

   2. **Weaviate REST URL** – The REST endpoint. Examples:
      * Self-hosted: `http://localhost:8080`
      * WCS: `https://my-instance.gcp.weaviate.cloud`

   3. **Weaviate gRPC URL** – The gRPC endpoint. Examples:
      * Self-hosted: `localhost:50051`
      * WCS: `my-instance.gcp.weaviate.cloud`

   4. **gRPC TLS Secured** – Enable if your gRPC connection requires TLS encryption (required for WCS).

   5. **Authentication Type** – Select `None`, `API Key`, or `OIDC`:
      * **None** – No authentication (suitable for local development)
      * **API Key** – Use an API key generated by Weaviate
      * **OIDC** – Use OpenID Connect Client Credentials flow

   6. **API Key** (if API Key auth selected) – Paste your Weaviate API key.

   7. **OIDC Client ID** (if OIDC selected) – Your OIDC client identifier.

   8. **OIDC Client Secret** (if OIDC selected) – Your OIDC client secret.

   9. **OIDC Scopes** (if OIDC selected, optional) – Comma-separated scopes (e.g., `openid,profile,email`).

   10. **Custom Headers** – Optional headers to include in all requests (e.g., `X-OpenAI-Api-Key` for embedding provider keys).

   11. **Collection Mapping** – Pattern to map Kafka topics to Weaviate collections. Default: `${topic}`. Examples:
       * `${topic}` – Topic `users` → Collection `users`
       * `weaviate_${topic}` – Topic `users` → Collection `weaviate_users`
       * `production_${topic}_v2` – Topic `users` → Collection `production_users_v2`

   12. **Document ID Strategy** – Choose how to assign IDs to objects:
       * **No ID Strategy** – Generate new UUID for each record (always inserts)
       * **Field ID Strategy** – Use a field from the record as the UUID
       * **Kafka ID Strategy** – Use the Kafka message key as the UUID

   13. **Document ID Field** (if Field ID Strategy selected) – The field name containing the ID (e.g., `id`, `user_id`). This field must exist in your Kafka records.

   14. **Vector Strategy** – Choose how embeddings are handled:
       * **No Vector Strategy** – Let Weaviate vectorize using its configured vectorizer
       * **Field Vector Strategy** – Use a pre-computed embedding from a field

   15. **Vector Field Name** (if Field Vector Strategy selected) – The field containing the embedding vector (e.g., `embedding`, `vector`). Must be an array of numbers.

   16. **Default Weaviate Vectorizer** (Advanced) – Sets the default vectorizer for newly created collections when Schema Evolution is enabled. Options include:
       * `none` – No vectorization (use this if you're providing your own vectors via Field Vector Strategy)
       * `text2vec-weaviate` – Weaviate's built-in text vectorizer
       * `text2vec-openai` – OpenAI embeddings (requires API key in Custom Headers)
       * `text2vec-cohere` – Cohere embeddings (requires API key in Custom Headers)
       * `text2vec-jinaai` – Jina AI embeddings (requires API key in Custom Headers)
       * `text2vec-voyageai` – Voyage AI embeddings (requires API key in Custom Headers)

   17. **Delete Enabled** – If true, null-valued records are treated as deletes. Useful for delete propagation from source systems.

   18. **Schema Evolution** (default: `basic`) – Controls automatic collection and property management:
       * **basic (default)** – The connector automatically creates missing collections from incoming Kafka records. If new properties appear in later records, Weaviate automatically adds them to the collection schema. This is ideal for development and dynamic data pipelines.
       * **none** – The connector expects collections to exist in Weaviate and will fail if a collection is missing. Use this in production for strict schema control.

   19. **Batch Size** – Number of records to batch before sending to Weaviate (default: 100). Larger batches improve throughput; smaller batches reduce latency.

   20. **Pool Size** – Number of worker threads processing batches (default: 1). Increase for parallel processing if Weaviate can handle concurrent requests.

   21. **Await Termination (ms)** – Timeout in milliseconds for batch processing before forcing termination (default: 10000).

   22. **Max Retries** – Maximum retry attempts on connection/timeout errors (default: 3).

   23. **Retry Interval (ms)** – Delay between retries (default: 2000).

3. Click **Save**.

## How It Works

1. **Record Ingestion** – Records from Kafka are received by the connector.
2. **ID Generation** – Based on the ID strategy, a UUID is assigned or extracted.
3. **Vector Assignment** – If using Field Vector Strategy, the embedding is extracted; otherwise, it's left for Weaviate to compute.
4. **Reserved Field Handling** – Fields named `id` or `_id` are renamed to `__id` to avoid Weaviate keyword conflicts.
5. **Batching** – Records are batched according to **Batch Size**.
6. **Weaviate Storage** – The batch is sent to Weaviate via gRPC for ingestion.
7. **Upsert/Insert Semantics** – If using an ID strategy with existing UUIDs, objects are upserted (updated if they exist); otherwise, new objects are always inserted.
8. **Error Handling** – Failed batches are retried up to **Max Retries** times with exponential backoff.
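
The batching and retry steps above can be sketched in a few lines. This is an illustration under stated assumptions, not the connector's code: `batches` and `retry_delays_ms` are hypothetical helpers, and the doubling schedule that starts at **Retry Interval (ms)** is only one plausible reading of "exponential backoff".

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batches(records: Iterable[dict], batch_size: int = 100) -> Iterator[List[dict]]:
    """Group records into lists of at most batch_size items."""
    it = iter(records)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        yield chunk


def retry_delays_ms(max_retries: int = 3, retry_interval_ms: int = 2000) -> List[int]:
    """ASSUMPTION: the delay doubles on each attempt, starting at Retry Interval."""
    return [retry_interval_ms * 2 ** attempt for attempt in range(max_retries)]


if __name__ == "__main__":
    sizes = [len(b) for b in batches({"n": i} for i in range(250))]
    print(sizes)              # [100, 100, 50]
    print(retry_delays_ms())  # [2000, 4000, 8000]
```

Under these assumptions, the default settings would spend up to 14 seconds waiting on a failing batch before giving up, which is worth keeping in mind when sizing **Await Termination (ms)**.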

## Reserved Fields & Field Renaming

Weaviate has reserved keywords (`id`, `_id`). If your Kafka records contain fields named `id` or `_id`, the connector automatically renames them:

* `id` → `__id`
* `_id` → `__id`

This preserves your data while avoiding conflicts with Weaviate's internal ID field.

### Example

**Input from Kafka:**

```json theme={null}
{
  "id": "user-123",
  "name": "Alice",
  "email": "alice@example.com"
}
```

**After ingestion into Weaviate:**

```json theme={null}
{
  "__id": "user-123",
  "name": "Alice",
  "email": "alice@example.com",
  "_additional": {
    "id": "<generated-uuid>"
  }
}
```
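
If you want to predict the stored property names in a test or a downstream query, the renaming rule can be sketched as below. The `rename_reserved` helper is hypothetical. This guide does not say what happens when a record carries both `id` and `_id` (both would map to `__id`), so the sketch raises an error in that case as an explicit assumption.

```python
RESERVED = ("id", "_id")


def rename_reserved(record: dict) -> dict:
    """Rename reserved top-level fields to '__id', keeping all other fields."""
    renamed = {}
    for name, value in record.items():
        target = "__id" if name in RESERVED else name
        if target in renamed:
            # ASSUMPTION: collisions are treated as errors in this sketch
            raise ValueError(f"more than one field maps to {target!r}")
        renamed[target] = value
    return renamed


if __name__ == "__main__":
    print(rename_reserved({"id": "user-123", "name": "Alice", "email": "alice@example.com"}))
```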

## Limitations & Best Practices

### Limitations

* **Collections Require Pre-creation When Schema Evolution is Disabled** – If you disable Schema Evolution (`none` mode), collections must exist in Weaviate before streaming; the connector will fail if a collection is missing.
* **Schema Mismatch with Evolution Disabled** – When Schema Evolution is disabled and a Kafka record contains a field not defined in the Weaviate collection schema, that field is silently dropped. Ensure your Weaviate schema matches your Kafka payload structure if evolution is disabled.
* **Single Vector Field per Collection (BYOV)** – When using Field Vector Strategy (Bring Your Own Vectors), the connector supports only one vector field per collection. If your Kafka records contain multiple vector fields, only the configured vector field will be used for ingestion.
* **Batch Retries** – If a batch fails, the entire batch is retried. With No ID Strategy, every retried record receives a new UUID, so a retry can create duplicate objects. Field ID and Kafka ID strategies have upsert semantics and avoid this.
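
The retry limitation is easy to see in a small simulation. The sketch below uses a plain dictionary as a stand-in for a Weaviate collection and writes the same batch twice, once with random UUIDs and once with IDs derived from a record field. The `order_id` field and the `uuid5` derivation are assumptions for illustration only.

```python
import uuid


def write_batch(store: dict, batch: list, deterministic_ids: bool) -> None:
    """Write a batch into an in-memory stand-in for a collection."""
    for record in batch:
        if deterministic_ids:
            object_id = uuid.uuid5(uuid.NAMESPACE_OID, str(record["order_id"]))
        else:
            object_id = uuid.uuid4()
        store[object_id] = record  # an existing key is overwritten (upsert)


def objects_after_retry(deterministic_ids: bool) -> int:
    """Write one batch, retry it once, and count the stored objects."""
    store: dict = {}
    batch = [{"order_id": 1}, {"order_id": 2}]
    write_batch(store, batch, deterministic_ids)
    write_batch(store, batch, deterministic_ids)  # simulated retry of the same batch
    return len(store)


if __name__ == "__main__":
    print(objects_after_retry(deterministic_ids=False))  # 4: the retry duplicated both records
    print(objects_after_retry(deterministic_ids=True))   # 2: the retry overwrote in place
```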

### Best Practices

1. **Use Field ID Strategy for Idempotency** – If your source has unique identifiers (e.g., user IDs, order IDs), use them as document IDs to enable idempotent upserts.
2. **Pre-compute Embeddings for Custom Models** – If you compute embeddings outside Weaviate, use BYOV (Field Vector Strategy) so that Weaviate does not vectorize the data again. Remember that only one vector field per collection is supported.
3. **Configure Default Vectorizer for New Collections** – Use the **Default Weaviate Vectorizer** setting to specify which vectorizer should be used for collections created automatically by Schema Evolution. This is more efficient than pre-creating collections if you're using standard vectorizers.
4. **Design Weaviate Schema for Custom Vectorizers** – If you want specific vectorizer configurations (e.g., OpenAI with a particular model, specific Cohere settings), pre-create collections with those settings. Schema Evolution uses the Default Weaviate Vectorizer for automatic collection creation.
5. **Monitor Weaviate Capacity** – Ensure your Weaviate instance can handle the ingestion rate. Start with small batch sizes and increase gradually.
6. **Handle Reserved Fields** – Be aware that `id` and `_id` are renamed to `__id`; adjust downstream queries if needed.
7. **Enable Deletion if Using CDC** – If your source is a database with CDC, enable **Delete Enabled** to propagate deletes.
8. **Use Custom Headers for Embedding APIs** – If your vectorizer needs API keys (e.g., OpenAI, Cohere, Jina AI), provide them via **Custom Headers**.

## Troubleshooting

### Collection Not Found Error

**Problem:** Connector fails with "Collection \<name> not found"

**Solution:**

* This error typically occurs when **Schema Evolution** is disabled. Enable Schema Evolution if you want the connector to create collections automatically.
* If you prefer Schema Evolution disabled for strict control, pre-create the required collection in Weaviate
* Check that the **Collection Mapping** pattern correctly maps your topic name to the collection name
* If using custom mapping, verify the generated collection name matches what exists in Weaviate

### Authentication Failures

**Problem:** "Unauthorized" or "Forbidden" errors

**Solution:**

* Verify your API key or OIDC credentials are correct
* Ensure the credentials have permissions to write to collections
* For OIDC, check that the token URL is reachable and returns valid tokens

### gRPC Connection Issues

**Problem:** "Failed to connect to gRPC endpoint"

**Solution:**

* Verify the gRPC URL is correct (host and port)
* Ensure gRPC TLS is enabled if connecting to WCS
* Check network connectivity and firewall rules
* For self-hosted Weaviate, ensure gRPC is enabled on port 50051
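
A quick way to rule out basic network problems is a plain TCP check against the gRPC host and port, run from a machine on the same network path as the connector. The sketch below uses only the Python standard library; `can_connect` is a hypothetical helper, and the host and port shown are the self-hosted defaults from this guide. A successful TCP connection only shows that the port is reachable; it does not validate TLS or the gRPC service itself.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, DNS failure, unreachable network
        return False


if __name__ == "__main__":
    # Replace with your own gRPC host and port
    print(can_connect("localhost", 50051))
```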

### Field Not Found or Data Loss

**Problem:** Fields from Kafka records are missing in Weaviate

**Solution:**

* If **Schema Evolution** is enabled (default), new fields are automatically added to the collection schema by Weaviate. If you're not seeing a field, verify the field name is correct (case-sensitive).
* If **Schema Evolution** is disabled and a Kafka record contains a field not defined in the Weaviate collection schema, that field will be silently dropped. Check that your Weaviate collection schema includes all expected fields.
* Use Weaviate's schema inspection to see what properties are currently defined
* Verify field names match exactly (case-sensitive) between Kafka records and Weaviate collection schema
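
When comparing a Kafka payload against a collection schema by hand, a small helper can flag the two common causes named above: fields that are absent from the schema and fields that differ only in letter case. This is a sketch; `diagnose_fields` is hypothetical, and the list of schema property names is assumed to come from your own schema inspection. Remember that `id` and `_id` are stored as `__id`, so compare against the renamed field.

```python
from typing import Dict, Iterable


def diagnose_fields(record_fields: Iterable[str],
                    schema_properties: Iterable[str]) -> Dict[str, str]:
    """Map each record field that has no exact schema match to a reason."""
    schema = set(schema_properties)
    by_lower = {name.lower(): name for name in schema}
    report = {}
    for field in record_fields:
        if field in schema:
            continue  # exact, case-sensitive match
        near = by_lower.get(field.lower())
        report[field] = f"case mismatch with '{near}'" if near else "not in schema"
    return report


if __name__ == "__main__":
    print(diagnose_fields(["title", "Price", "sku"], ["title", "price"]))
```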

### ID Field Conflicts

**Problem:** `id` or `_id` fields are missing from ingested data

**Solution:**

* The connector renames both fields to `__id` to avoid conflicts with Weaviate's internal ID field
* Query your data and check for the `__id` property
* If needed, query the `_additional.id` property to get the Weaviate UUID

### High Latency or Backpressure

**Problem:** Connector is slow or not keeping up with Kafka

**Solution:**

* Increase **Batch Size** to reduce the number of requests to Weaviate
* Increase **Pool Size** to enable parallel processing
* Check Weaviate's CPU and memory; scale if needed
* Monitor network latency between Kafka Connect and Weaviate

## Security Notes

* **API Keys & Secrets** – Stored encrypted. Never expose them in logs or in configuration files that are shared publicly.
* **HTTPS/TLS** – Always use HTTPS for REST connections and enable gRPC TLS for production.
* **OIDC Tokens** – Tokens are refreshed automatically and not persisted in logs.
* **Custom Headers** – Be careful with sensitive headers (e.g., API keys); they should be managed securely by your deployment platform.

## Next Steps

1. Review Weaviate's [documentation](https://docs.weaviate.io/weaviate/guides) for collection design best practices
2. Test the connector with a small Kafka topic first
3. Monitor latency and throughput during initial deployment
4. Adjust batch size and pool size based on observed performance
5. Set up alerts for connector task failures
