Kafka
Synopsis
Creates a collector that connects to Kafka brokers and consumes messages from specified topics. Supports authentication, TLS encryption, and multiple workers.
Schema
- id: <numeric>
  name: <string>
  description: <string>
  type: kafka
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    username: <string>
    password: <string>
    group: <string>
    topic: <string>
    balancer: <string>
    reuse: <boolean>
    workers: <numeric>
    buffer_size: <numeric>
    stats_frequency: <numeric>
    tls:
      status: <boolean>
      cert_name: <string>
      key_name: <string>
Configuration
The following fields are used to define the device.
Device
Field | Required | Default | Description |
---|---|---|---|
id | Y | - | Unique identifier |
name | Y | - | Device name |
description | N | - | Optional description |
type | Y | - | Must be kafka |
tags | N | - | Optional tags |
pipelines | N | - | Optional pre-processor pipelines |
status | N | true | Enable/disable the device |
Connection
Field | Required | Default | Description |
---|---|---|---|
address | N | "0.0.0.0" | Kafka broker address |
port | Y | - | Kafka broker port |
username | N | - | SASL username |
password | N | - | SASL password |
group | N | "vmetric" | Consumer group ID |
topic | Y | - | Topic to consume from |
balancer | N | "roundrobin" | Partition balancing strategy |
TLS
Field | Required | Default | Description |
---|---|---|---|
tls.status | N | false | Enable TLS encryption |
tls.cert_name | Y | - | TLS certificate file path (required if TLS enabled) |
tls.key_name | Y | - | TLS private key file path (required if TLS enabled) |
The TLS certificate and key files must be placed in the service root directory.
Performance
Field | Required | Default | Description |
---|---|---|---|
reuse | N | true | Enable multi-worker mode |
workers | N | 4 | Number of workers when reuse is enabled |
buffer_size | N | 9000 | Read buffer size in bytes |
stats_frequency | N | 300 | Statistics collection interval in seconds |
Advanced Features
Multiple Workers
When reuse is enabled, the collector runs multiple workers. Each worker maintains its own Kafka consumer and processes its messages independently, so message volume is balanced across the workers automatically. The worker count is capped at the number of available CPU cores.
Messages
The collector supports message offset tracking and commits, automatic consumer group rebalancing, multiple topic subscriptions, TLS-encrypted connections and SASL authentication, and custom message-processing pipelines.
Examples
The following are commonly used configuration types.
Basic
The minimum required configuration creates the consumer:
Creating a simple Kafka consumer...
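A minimal sketch of such a device entry, using only the fields marked as required in the tables above. The id, name, port, and topic values are illustrative assumptions, and the entry is shown as a bare list item as in the schema, since the enclosing key of the configuration file is not given here.

```yaml
# Illustrative values only; adjust to your environment.
- id: 1                  # hypothetical unique identifier
  name: basic_kafka      # hypothetical device name
  type: kafka
  properties:
    port: 9092           # assumed broker port
    topic: "logs"        # hypothetical topic name
```

With the remaining fields omitted, the documented defaults apply: address "0.0.0.0", group "vmetric", and balancer "roundrobin".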
Secure
The consumer can connect to secure Kafka brokers:
Connecting with authentication and encryption...
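A sketch of a secured device entry that combines the SASL fields with the TLS fields. The host name, port, credentials, and file names are hypothetical, and nesting tls under properties is an assumption based on the schema ordering.

```yaml
# Illustrative values only; credentials and file names are placeholders.
- id: 2
  name: secure_kafka
  type: kafka
  properties:
    address: "kafka.example.com"   # hypothetical broker host
    port: 9093                     # assumed TLS listener port
    username: "consumer"           # SASL username (placeholder)
    password: "secret"             # SASL password (placeholder)
    topic: "secure_logs"           # hypothetical topic name
    tls:
      status: true
      cert_name: "kafka.crt"       # hypothetical file in the service root directory
      key_name: "kafka.key"        # hypothetical file in the service root directory
```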
High-Volume
Performance can be enhanced for high message volumes:
Optimizing for throughput...
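A sketch of a throughput-oriented entry that uses the fields from the Performance table. The specific numbers are assumptions chosen for illustration, not recommended values.

```yaml
# Illustrative values only; tune against your own workload.
- id: 3
  name: performance_kafka
  type: kafka
  properties:
    port: 9092
    topic: "high_volume_logs"   # hypothetical topic name
    reuse: true                 # enable multi-worker mode
    workers: 8                  # assumed value; default is 4
    buffer_size: 32768          # assumed value in bytes; default is 9000
    stats_frequency: 60         # assumed value in seconds; default is 300
```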
When reuse is enabled, the actual worker count is capped at the number of available CPU cores.
Consumer Groups
Message consumption can be coordinated:
Configuring consumer group behavior...
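A sketch of an entry that sets the consumer group explicitly. The group and topic names are hypothetical. The balancer is left at its documented default, "roundrobin", because no other strategy names are listed here.

```yaml
# Illustrative values only.
- id: 4
  name: group_kafka
  type: kafka
  properties:
    port: 9092
    topic: "events"                # hypothetical topic name
    group: "processing_group"      # hypothetical consumer group ID
    balancer: "roundrobin"         # documented default strategy
```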
Consumers in the same group must use compatible configuration settings.
Messages
Messages can be pre-processed:
Applying custom processing to messages...
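A sketch of an entry with pre-processor pipelines attached. It assumes that pipelines can be referenced by name in the pipelines list. The two pipeline names are hypothetical and would have to exist in your own configuration.

```yaml
# Illustrative values only; pipeline names are placeholders.
- id: 5
  name: pipeline_kafka
  type: kafka
  pipelines:
    - json_parser        # hypothetical pipeline, applied first
    - field_extractor    # hypothetical pipeline, applied second
  properties:
    port: 9092
    topic: "raw_logs"    # hypothetical topic name
```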
Pipelines run sequentially, in the order listed, and can modify or drop messages before ingestion.