Kafka

Synopsis

Creates a collector that connects to Kafka brokers and consumes messages from specified topics. Supports authentication, TLS encryption, and multiple workers.

Schema

    - id: <numeric>
      name: <string>
      description: <string>
      type: kafka
      tags: <string[]>
      pipelines: <pipeline[]>
      status: <boolean>
      properties:
        address: <string>
        port: <numeric>
        username: <string>
        password: <string>
        algorithm: <string>
        kerberos_keytab: <string>
        kerberos_principal: <string>
        kerberos_realm: <string>
        kerberos_kdc: <string>
        kerberos_config: <string>
        kerberos_service_name: <string>
        group: <string>
        topic: <string>
        balancer: <string>
        reuse: <boolean>
        workers: <numeric>
        tls:
          status: <boolean>
          cert_name: <string>
          key_name: <string>

Configuration

Device

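| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | - | Unique numeric identifier |
| name | Y | - | Device name |
| description | N | - | Optional description |
| type | Y | - | Must be kafka |
| tags | N | - | Optional tags |
| pipelines | N | - | Optional pre-processor pipelines |
| status | N | true | Enable/disable the device |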

Connection

| Field | Required | Default | Description |
|---|---|---|---|
| address | N | "0.0.0.0" | Kafka broker address |
| port | Y | - | Kafka broker port |
| username | N | - | SASL username |
| password | N | - | SASL password |
| algorithm | N | - | SASL mechanism: plain, scram-sha-256, scram-sha-512, or gssapi. Leave empty or set to none to disable authentication |
| kerberos_keytab | N* | - | Keytab file path, or base64-encoded keytab content. Resolvable via ${ENV_VAR} or $secret{...} |
| kerberos_principal | N | username | Client principal (user or user@REALM). Falls back to username when omitted |
| kerberos_realm | N | - | Kerberos realm. Overrides the realm derived from the principal or krb5.conf |
| kerberos_kdc | N* | - | Comma-separated KDC addresses (host[:port], default port 88) |
| kerberos_config | N* | - | krb5.conf file path, or inline krb5.conf content. Generated from kerberos_realm and kerberos_kdc when omitted |
| kerberos_service_name | N | "kafka" | Kafka broker service principal name (SPN) |
| group | N | "vmetric" | Consumer group ID |
| topic | Y | - | Topic to consume from |
| balancer | N | "roundrobin" | Partition balancing strategy |

* = Applies only when algorithm is gssapi. kerberos_keytab is required, along with either kerberos_kdc or kerberos_config.
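
As a sketch of the other combination the footnote allows, the device below supplies kerberos_config instead of kerberos_kdc. The environment variable name KAFKA_KEYTAB, the krb5.conf path, and the device id, name, and topic are illustrative assumptions, not values defined by this page. kerberos_service_name is omitted and so falls back to its default of "kafka".

```yaml
- id: 7                                      # hypothetical device id
  name: kerberos_krb5_kafka                  # hypothetical device name
  type: kafka
  properties:
    address: "kafka.example.com"
    port: 9093
    algorithm: "gssapi"                      # enables the kerberos_* fields
    kerberos_keytab: "${KAFKA_KEYTAB}"       # assumed env var holding a keytab path or base64 content
    kerberos_principal: "vmetric@EXAMPLE.COM"
    kerberos_config: "/etc/krb5.conf"        # assumed path; used in place of kerberos_kdc
    topic: "secured-logs"
```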

TLS

| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS encryption |
| tls.cert_name | Y* | - | TLS certificate file name |
| tls.key_name | Y* | - | TLS private key file name |

* = Required when tls.status is true.

Note: The TLS certificate and key files must be placed in the service root directory.

Performance

| Field | Required | Default | Description |
|---|---|---|---|
| reuse | N | true | Enable multi-worker mode |
| workers | N | 4 | Number of worker processes when reuse is enabled (capped at the number of available CPU cores) |
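
To illustrate the cap on workers, the sketch below requests more workers than a small host is likely to have cores. The device id, name, topic, and the 8-core host mentioned in the comment are assumptions chosen for illustration.

```yaml
- id: 8                          # hypothetical device id
  name: capped_workers_kafka     # hypothetical device name
  type: kafka
  properties:
    address: "kafka.example.com"
    port: 9092
    topic: "high-volume-logs"
    reuse: true                  # multi-worker mode (the default)
    workers: 16                  # on an assumed 8-core host, this would be capped at 8
```

Because the cap follows the host's core count, the same configuration may run with a different number of workers on different machines.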

Examples

Basic

Creating a simple Kafka consumer...

    - id: 1
      name: basic_kafka
      type: kafka
      properties:
        address: "kafka.example.com"
        port: 9092
        topic: "logs"

Secure

Connecting with SASL authentication and TLS encryption...

    - id: 2
      name: secure_kafka
      type: kafka
      properties:
        address: "kafka.example.com"
        port: 9093
        username: "consumer"
        password: "secret"
        algorithm: "scram-sha-256"
        topic: "secure-logs"
        tls:
          status: true
          cert_name: "kafka.crt"
          key_name: "kafka.key"

Kerberos

Authenticating with Kerberos (GSSAPI) using a keytab and KDC...

    - id: 6
      name: kerberos_kafka
      type: kafka
      properties:
        address: "kafka.example.com"
        port: 9093
        algorithm: "gssapi"
        kerberos_keytab: "$secret{id=12}"
        kerberos_principal: "vmetric@EXAMPLE.COM"
        kerberos_kdc: "kdc01.example.com,kdc02.example.com:88"
        kerberos_service_name: "kafka"
        topic: "secured-logs"
        tls:
          status: true
          cert_name: "kafka.crt"
          key_name: "kafka.key"

High-Volume

Optimizing for throughput with multi-worker mode...

    - id: 3
      name: performant_kafka
      type: kafka
      properties:
        address: "kafka.example.com"
        port: 9092
        topic: "high-volume-logs"
        group: "high-perf-group"
        reuse: true
        workers: 4

Consumer Groups

Configuring consumer group behavior...

    - id: 4
      name: group_kafka
      type: kafka
      properties:
        address: "kafka.example.com"
        port: 9092
        topic: "shared-logs"
        group: "processing-group"
        balancer: "roundrobin"
        reuse: true
        workers: 2

Warning: Consumers in the same group must use compatible configuration settings.
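
One way to read the compatibility requirement is sketched below: two collectors, for instance on separate hosts, that join the same group with identical topic and balancer values. Treating "compatible" as "same topic and balancer" is an assumption made for this sketch, as are the device ids and names.

```yaml
# Collector on the first host (hypothetical id and name)
- id: 9
  name: group_kafka_a
  type: kafka
  properties:
    address: "kafka.example.com"
    port: 9092
    topic: "shared-logs"
    group: "processing-group"    # same group ID on both collectors
    balancer: "roundrobin"       # same balancing strategy on both collectors

# Collector on the second host (hypothetical id and name)
- id: 10
  name: group_kafka_b
  type: kafka
  properties:
    address: "kafka.example.com"
    port: 9092
    topic: "shared-logs"
    group: "processing-group"
    balancer: "roundrobin"
```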

Pipelines

Applying custom processing to messages...

    - id: 5
      name: pipeline_kafka
      type: kafka
      pipelines:
        - json_parser
        - field_extractor
      properties:
        address: "kafka.example.com"
        port: 9092
        topic: "raw-logs"
        group: "processing-group"

Note: Pipelines are processed sequentially and can modify or drop messages before ingestion.