Apache Kafka
Synopsis
Creates a target that writes log messages to Apache Kafka topics with support for batching, compression, and various authentication methods. The target handles message delivery efficiently with configurable batch limits based on size or event count. Apache Kafka is an open-source distributed event streaming platform for high-throughput, fault-tolerant data pipelines.
Schema
```yaml
- name: <string>
  description: <string>
  type: kafka
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    client_id: <string>
    topic: <string>
    algorithm: <string>
    username: <string>
    password: <string>
    compression: <string>
    compression_level: <string>
    acknowledgments: <string>
    allow_auto_topic_creation: <boolean>
    disable_idempotent_write: <boolean>
    max_bytes: <numeric>
    max_events: <numeric>
    field_format: <string>
    tls:
      status: <boolean>
      insecure_skip_verify: <boolean>
      min_tls_version: <string>
      max_tls_version: <string>
      cert_name: <string>
      key_name: <string>
      passphrase: <string>
    interval: <string|numeric>
    cron: <string>
```
Configuration
The following fields are used to define the target:
| Field | Required | Default | Description |
|---|---|---|---|
| name | Y | - | Target name |
| description | N | - | Optional description |
| type | Y | - | Must be `kafka` |
| pipelines | N | - | Optional post-processor pipelines |
| status | N | `true` | Enable/disable the target |
Kafka Connection
| Field | Required | Default | Description |
|---|---|---|---|
| address | Y | - | Kafka broker address (hostname or IP) |
| port | N | `9092` | Kafka broker port (1-65535) |
| client_id | N | - | Client identifier for connection tracking |
| topic | Y | - | Kafka topic name for message delivery |
Authentication
| Field | Required | Default | Description |
|---|---|---|---|
| algorithm | N | `"none"` | Authentication mechanism: `none`, `plain`, `scram-sha-256`, `scram-sha-512` |
| username | N* | - | Username for SASL authentication |
| password | N* | - | Password for SASL authentication |

\* Required when `algorithm` is set to a value other than `none`.
Producer Settings
| Field | Required | Default | Description |
|---|---|---|---|
| compression | N | `"none"` | Message compression: `none`, `gzip`, `snappy`, `lz4`, `zstd` |
| compression_level | N | - | Compression level (algorithm-specific, see details below) |
| acknowledgments | N | `"leader"` | Acknowledgment level: `none`, `leader`, `all` |
| allow_auto_topic_creation | N | `false` | Allow automatic topic creation if the topic doesn't exist |
| disable_idempotent_write | N | `false` | Disable idempotent producer (not recommended) |
Batch Configuration
| Field | Required | Default | Description |
|---|---|---|---|
| max_bytes | N | `0` | Maximum batch size in bytes (`0` = unlimited) |
| max_events | N | `1000` | Maximum number of events per batch |
| field_format | N | - | Data normalization format. See the applicable Normalization section |
Batches are sent when either max_bytes or max_events limit is reached, whichever comes first.
TLS Configuration
| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | `false` | Enable TLS/SSL encryption |
| tls.insecure_skip_verify | N | `false` | Skip certificate verification (not recommended) |
| tls.min_tls_version | N | `"tls1.2"` | Minimum TLS version: `tls1.0`, `tls1.1`, `tls1.2`, `tls1.3` |
| tls.max_tls_version | N | `"tls1.2"` | Maximum TLS version: `tls1.0`, `tls1.1`, `tls1.2`, `tls1.3` |
| tls.cert_name | N | `"cert.pem"` | Client certificate file name for mTLS |
| tls.key_name | N | `"key.pem"` | Private key file name for mTLS |
| tls.passphrase | N | - | Passphrase for encrypted private key |
Scheduler
| Field | Required | Default | Description |
|---|---|---|---|
| interval | N | realtime | Execution frequency. See Interval for details |
| cron | N | - | Cron expression for scheduled execution. See Cron for details |
Details
Apache Kafka is an open-source distributed event streaming platform. This target allows you to send log messages to Kafka topics for real-time processing and analytics.
Authentication Methods
Kafka supports multiple SASL authentication mechanisms:
No Authentication
- Set `algorithm: "none"` - no username or password required
- Suitable for development or internal networks

SASL/PLAIN
- Set `algorithm: "plain"` for simple username/password authentication
- Credentials are sent over the connection, so use TLS for security

SASL/SCRAM-SHA-256
- Set `algorithm: "scram-sha-256"` for secure challenge-response authentication
- More secure than PLAIN

SASL/SCRAM-SHA-512
- Set `algorithm: "scram-sha-512"` for enhanced secure authentication with SHA-512
- Recommended for production use
Message Delivery Guarantees
The acknowledgments setting controls delivery guarantees:
| Level | Behavior | Use Case |
|---|---|---|
| none | No acknowledgment from broker | Maximum throughput, lowest durability |
| leader | Acknowledgment from partition leader only | Balanced throughput and durability |
| all | Acknowledgment from all in-sync replicas | Maximum durability, lower throughput |
Idempotent Producer
By default, the target uses idempotent writes to prevent message duplication. This ensures exactly-once semantics within a single producer session. The disable_idempotent_write option allows disabling this feature, but it is not recommended for production use.
Compression
Message compression reduces network bandwidth and storage requirements:
| Algorithm | Compression Ratio | CPU Usage | Speed |
|---|---|---|---|
| none | None | Minimal | Fastest |
| gzip | High | High | Slow |
| snappy | Medium | Low | Fast |
| lz4 | Medium | Low | Fast |
| zstd | High | Medium | Medium |
Compression Levels
Optional compression levels can be specified with the compression_level parameter:
LZ4 Compression Levels:
- `fast` (default)
- `level1` through `level9` (increasing compression ratio)

Gzip Compression Levels:
- `nocompression`
- `bestspeed`
- `defaultcompression` (default)
- `bestcompression`
- `huffmanonly`

Zstd Compression Levels:
- `speedfastest`
- `speeddefault` (default)
- `speedbettercompression`
- `speedbestcompression`
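For instance, to trade CPU time for a higher LZ4 compression ratio on an archival topic, the level can be raised from the default. The target name and topic below are illustrative:

```yaml
targets:
  - name: lz4_max_ratio        # hypothetical example name
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "archived-logs"
      compression: "lz4"
      compression_level: "level9"   # highest LZ4 ratio, slowest encode
```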
Batch Processing
The target accumulates messages in memory and sends them in batches for optimal performance. Batches are sent when either the byte limit (max_bytes) or event count limit (max_events) is reached. Setting max_bytes to 0 allows unlimited batch sizes (limited only by max_events).
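The whichever-comes-first flush rule can be sketched as follows. This is an illustrative model of the documented behavior, not the target's actual implementation:

```python
# Sketch of the batch flush rule: a batch is sent when either the byte
# limit or the event-count limit is reached, whichever comes first.
# Defaults mirror the documented ones (max_bytes=0 means unlimited).

def should_flush(batch_bytes: int, batch_events: int,
                 max_bytes: int = 0, max_events: int = 1000) -> bool:
    """Return True when the accumulated batch must be sent."""
    if max_bytes > 0 and batch_bytes >= max_bytes:
        return True  # byte limit reached
    return batch_events >= max_events  # event-count limit reached
```

With `max_bytes: 0`, only `max_events` bounds the batch, so large events can produce large batches.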
Topic Management
Kafka topics must exist before sending messages unless allow_auto_topic_creation is enabled. When enabled, topics are automatically created with broker default settings. For production environments, create topics in advance with appropriate partition counts and replication factors.
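Pre-creating a topic can be done with the standard Kafka CLI shipped with the broker; the partition count and replication factor below are placeholder values to adjust for your cluster:

```shell
# Create the topic before the target first writes to it (example values).
kafka-topics.sh --create \
  --bootstrap-server kafka.example.com:9092 \
  --topic production-logs \
  --partitions 6 \
  --replication-factor 3
```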
TLS/SSL Security
The target supports encrypted connections using TLS/SSL. For production environments:
- Set `tls.status: true` to enable encryption
- Set `tls.insecure_skip_verify: false` to validate certificates
- Use TLS 1.2 or higher for security compliance
- Configure mTLS with client certificates for mutual authentication
Examples
Basic Configuration
The minimum configuration for a Kafka target:
```yaml
targets:
  - name: basic_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "application-logs"
```
With SASL/PLAIN Authentication
Configuration using PLAIN authentication:
```yaml
targets:
  - name: authenticated_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9092
      topic: "secure-logs"
      algorithm: "plain"
      username: "log-producer"
      password: "secure-password"
```
With SASL/SCRAM-SHA-256
Configuration using SCRAM-SHA-256 authentication:
```yaml
targets:
  - name: scram_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9093
      topic: "authenticated-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "secure-password-here"
      tls:
        status: true
```
With SASL/SCRAM-SHA-512
Configuration using SCRAM-SHA-512 authentication:
```yaml
targets:
  - name: secure_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9093
      topic: "production-logs"
      algorithm: "scram-sha-512"
      username: "producer-user"
      password: "strong-password"
      tls:
        status: true
        insecure_skip_verify: false
```
With TLS Encryption
Configuration with TLS enabled:
```yaml
targets:
  - name: encrypted_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9093
      topic: "encrypted-logs"
      tls:
        status: true
        insecure_skip_verify: false
        min_tls_version: "tls1.2"
        max_tls_version: "tls1.3"
```
With mTLS Authentication
Configuration using mutual TLS authentication:
```yaml
targets:
  - name: mtls_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9093
      topic: "mtls-logs"
      tls:
        status: true
        insecure_skip_verify: false
        min_tls_version: "tls1.2"
        cert_name: "client-cert.pem"
        key_name: "client-key.pem"
        passphrase: "key-passphrase"
```
With Compression
Configuration with zstd compression:
```yaml
targets:
  - name: compressed_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "compressed-logs"
      compression: "zstd"
      compression_level: "speedbettercompression"
```
High Throughput
Configuration optimized for maximum throughput:
```yaml
targets:
  - name: high_throughput_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "high-volume-logs"
      compression: "lz4"
      compression_level: "fast"
      acknowledgments: "leader"
      max_bytes: 1048576
      max_events: 10000
```
High Reliability
Configuration optimized for maximum durability:
```yaml
targets:
  - name: reliable_kafka
    type: kafka
    pipelines:
      - checkpoint
    properties:
      address: "kafka.example.com"
      topic: "critical-logs"
      compression: "zstd"
      acknowledgments: "all"
      max_events: 100
      disable_idempotent_write: false
      tls:
        status: true
```
With Field Normalization
Using field normalization for standard format:
```yaml
targets:
  - name: normalized_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "normalized-logs"
      field_format: "cim"
      compression: "snappy"
```
Multi-Broker Cluster
Configuration for Kafka cluster with client ID:
```yaml
targets:
  - name: cluster_kafka
    type: kafka
    properties:
      address: "kafka-broker-1.example.com"
      port: 9092
      client_id: "datastream-producer-01"
      topic: "cluster-logs"
      algorithm: "scram-sha-512"
      username: "producer-user"
      password: "cluster-password"
      compression: "zstd"
      acknowledgments: "all"
```
Auto Topic Creation
Configuration with automatic topic creation:
```yaml
targets:
  - name: auto_topic_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "dynamic-logs"
      allow_auto_topic_creation: true
      compression: "snappy"
      max_events: 500
```
Scheduled Batching
Configuration with scheduled batch delivery:
```yaml
targets:
  - name: scheduled_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "scheduled-logs"
      max_events: 5000
      interval: "5m"
      compression: "gzip"
      compression_level: "bestcompression"
```
Development Environment
Configuration for local development without security:
```yaml
targets:
  - name: dev_kafka
    type: kafka
    properties:
      address: "localhost"
      port: 9092
      topic: "dev-logs"
      algorithm: "none"
      allow_auto_topic_creation: true
```
Production Environment
Configuration for production with full security:
```yaml
targets:
  - name: prod_kafka
    type: kafka
    pipelines:
      - checkpoint
    properties:
      address: "kafka-prod.example.com"
      port: 9093
      client_id: "datastream-prod-01"
      topic: "production-logs"
      algorithm: "scram-sha-512"
      username: "prod-kafka-user"
      password: "strong-production-password"
      compression: "zstd"
      compression_level: "speeddefault"
      acknowledgments: "all"
      max_events: 1000
      disable_idempotent_write: false
      tls:
        status: true
        insecure_skip_verify: false
        min_tls_version: "tls1.2"
        max_tls_version: "tls1.3"
```