Version: 1.5.0

Apache Kafka Message Queue

Synopsis

Creates a target that writes log messages to Apache Kafka topics with support for batching, compression, and various authentication methods. The target handles message delivery efficiently with configurable batch limits based on size or event count. Apache Kafka is an open-source distributed event streaming platform for high-throughput, fault-tolerant data pipelines.

Schema

```yaml
- name: <string>
  description: <string>
  type: kafka
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    client_id: <string>
    topic: <string>
    algorithm: <string>
    username: <string>
    password: <string>
    compression: <string>
    compression_level: <string>
    acknowledgments: <string>
    allow_auto_topic_creation: <boolean>
    disable_idempotent_write: <boolean>
    max_bytes: <numeric>
    max_events: <numeric>
    field_format: <string>
    tls:
      status: <boolean>
      insecure_skip_verify: <boolean>
      min_tls_version: <string>
      max_tls_version: <string>
      cert_name: <string>
      key_name: <string>
      passphrase: <string>
    interval: <string|numeric>
    cron: <string>
```

Configuration

The following fields are used to define the target:

| Field | Required | Default | Description |
|---|---|---|---|
| name | Y | - | Target name |
| description | N | - | Optional description |
| type | Y | - | Must be kafka |
| pipelines | N | - | Optional post-processor pipelines |
| status | N | true | Enable/disable the target |

Kafka Connection

| Field | Required | Default | Description |
|---|---|---|---|
| address | Y | - | Kafka broker address (hostname or IP) |
| port | N | 9092 | Kafka broker port (1-65535) |
| client_id | N | - | Client identifier for connection tracking |
| topic | Y | - | Kafka topic name for message delivery |

Authentication

| Field | Required | Default | Description |
|---|---|---|---|
| algorithm | N | "none" | Authentication mechanism: none, plain, scram-sha-256, scram-sha-512 |
| username | N* | - | Username for SASL authentication |
| password | N* | - | Password for SASL authentication |

* = Conditionally required when algorithm is not none.

Producer Settings

| Field | Required | Default | Description |
|---|---|---|---|
| compression | N | "none" | Message compression: none, gzip, snappy, lz4, zstd |
| compression_level | N | - | Compression level (algorithm-specific, see details below) |
| acknowledgments | N | "leader" | Acknowledgment level: none, leader, all |
| allow_auto_topic_creation | N | false | Allow automatic topic creation if topic doesn't exist |
| disable_idempotent_write | N | false | Disable idempotent producer (not recommended) |

Batch Configuration

| Field | Required | Default | Description |
|---|---|---|---|
| max_bytes | N | 0 | Maximum batch size in bytes (0 = unlimited) |
| max_events | N | 1000 | Maximum number of events per batch |
| field_format | N | - | Data normalization format. See applicable Normalization section |
Note: Batches are sent when either the max_bytes or max_events limit is reached, whichever comes first.

TLS Configuration

| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS/SSL encryption |
| tls.insecure_skip_verify | N | false | Skip certificate verification (not recommended) |
| tls.min_tls_version | N | "tls1.2" | Minimum TLS version: tls1.0, tls1.1, tls1.2, tls1.3 |
| tls.max_tls_version | N | "tls1.2" | Maximum TLS version: tls1.0, tls1.1, tls1.2, tls1.3 |
| tls.cert_name | N | "cert.pem" | Client certificate file name for mTLS |
| tls.key_name | N | "key.pem" | Private key file name for mTLS |
| tls.passphrase | N | - | Passphrase for encrypted private key |

Scheduler

| Field | Required | Default | Description |
|---|---|---|---|
| interval | N | realtime | Execution frequency. See Interval for details |
| cron | N | - | Cron expression for scheduled execution. See Cron for details |

Details

Apache Kafka is an open-source distributed event streaming platform. This target allows you to send log messages to Kafka topics for real-time processing and analytics.

Authentication Methods

Kafka supports multiple SASL authentication mechanisms:

No Authentication

  • Set algorithm: "none"
  • No username or password required
  • Suitable for development or internal networks

SASL/PLAIN

  • Simple username/password authentication
  • Set algorithm: "plain"
  • Credentials sent over the connection (use TLS for security)

SASL/SCRAM-SHA-256

  • Secure challenge-response authentication
  • Set algorithm: "scram-sha-256"
  • More secure than PLAIN

SASL/SCRAM-SHA-512

  • Enhanced secure authentication with SHA-512
  • Set algorithm: "scram-sha-512"
  • Recommended for production use

Message Delivery Guarantees

The acknowledgments setting controls delivery guarantees:

| Level | Behavior | Use Case |
|---|---|---|
| none | No acknowledgment from broker | Maximum throughput, lowest durability |
| leader | Acknowledgment from partition leader only | Balanced throughput and durability |
| all | Acknowledgment from all in-sync replicas | Maximum durability, lower throughput |
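
As an illustrative fragment (the broker address and topic name below are placeholders), a durability-first target pairs acknowledgments: "all" with the idempotent producer left at its default:

```yaml
targets:
  - name: durable_kafka          # placeholder name
    type: kafka
    properties:
      address: "kafka.example.com"   # placeholder broker
      topic: "audit-logs"            # placeholder topic
      acknowledgments: "all"         # wait for all in-sync replicas before confirming
```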

Idempotent Producer

By default, the target uses idempotent writes to prevent message duplication. This ensures exactly-once semantics within a single producer session. The disable_idempotent_write option allows disabling this feature, but it is not recommended for production use.

Compression

Message compression reduces network bandwidth and storage requirements:

| Algorithm | Compression Ratio | CPU Usage | Speed |
|---|---|---|---|
| none | None | Minimal | Fastest |
| gzip | High | High | Slow |
| snappy | Medium | Low | Fast |
| lz4 | Medium | Low | Fast |
| zstd | High | Medium | Medium |

Compression Levels

Optional compression levels can be specified with the compression_level parameter:

LZ4 Compression Levels:

  • fast (default)
  • level1 through level9 (increasing compression ratio)

Gzip Compression Levels:

  • nocompression
  • bestspeed
  • defaultcompression (default)
  • bestcompression
  • huffmanonly

Zstd Compression Levels:

  • speedfastest
  • speeddefault (default)
  • speedbettercompression
  • speedbestcompression
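
The compression and compression_level settings work as a pair; a level is only meaningful for the algorithm it belongs to. For instance (the chosen values are illustrative):

```yaml
properties:
  compression: "gzip"
  compression_level: "bestspeed"   # gzip-specific level; favors speed over ratio
```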

Batch Processing

The target accumulates messages in memory and sends them in batches for optimal performance. Batches are sent when either the byte limit (max_bytes) or event count limit (max_events) is reached. Setting max_bytes to 0 allows unlimited batch sizes (limited only by max_events).
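
For example, to cap batches at 512 KiB or 2,000 events, whichever fills first (the numbers are illustrative):

```yaml
properties:
  max_bytes: 524288    # 512 * 1024 bytes; batch flushes at this size
  max_events: 2000     # or at this many events, whichever comes first
```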

Topic Management

Kafka topics must exist before sending messages unless allow_auto_topic_creation is enabled. When enabled, topics are automatically created with broker default settings. For production environments, create topics in advance with appropriate partition counts and replication factors.
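
For production, topics are typically created ahead of time with the stock Kafka CLI; a sketch, where the broker address, topic name, and partition/replication counts are illustrative:

```shell
# Create the topic before pointing the target at it
kafka-topics.sh --create \
  --bootstrap-server kafka.example.com:9092 \
  --topic application-logs \
  --partitions 6 \
  --replication-factor 3
```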

TLS/SSL Security

The target supports encrypted connections using TLS/SSL. For production environments:

  • Set tls.status: true to enable encryption
  • Set tls.insecure_skip_verify: false to validate certificates
  • Use TLS 1.2 or higher for security compliance
  • Configure mTLS with client certificates for mutual authentication

Examples

Basic Configuration

The minimum configuration for a Kafka target:

```yaml
targets:
  - name: basic_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "application-logs"
```

With SASL/PLAIN Authentication

Configuration using PLAIN authentication:

```yaml
targets:
  - name: authenticated_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9092
      topic: "secure-logs"
      algorithm: "plain"
      username: "log-producer"
      password: "secure-password"
```

With SASL/SCRAM-SHA-256

Configuration using SCRAM-SHA-256 authentication:

```yaml
targets:
  - name: scram_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9093
      topic: "authenticated-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "secure-password-here"
      tls:
        status: true
```

With SASL/SCRAM-SHA-512

Configuration using SCRAM-SHA-512 authentication:

```yaml
targets:
  - name: secure_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9093
      topic: "production-logs"
      algorithm: "scram-sha-512"
      username: "producer-user"
      password: "strong-password"
      tls:
        status: true
        insecure_skip_verify: false
```

With TLS Encryption

Configuration with TLS enabled:

```yaml
targets:
  - name: encrypted_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9093
      topic: "encrypted-logs"
      tls:
        status: true
        insecure_skip_verify: false
        min_tls_version: "tls1.2"
        max_tls_version: "tls1.3"
```

With mTLS Authentication

Configuration using mutual TLS authentication:

```yaml
targets:
  - name: mtls_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      port: 9093
      topic: "mtls-logs"
      tls:
        status: true
        insecure_skip_verify: false
        min_tls_version: "tls1.2"
        cert_name: "client-cert.pem"
        key_name: "client-key.pem"
        passphrase: "key-passphrase"
```

With Compression

Configuration with zstd compression:

```yaml
targets:
  - name: compressed_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "compressed-logs"
      compression: "zstd"
      compression_level: "speedbettercompression"
```

High Throughput

Configuration optimized for maximum throughput:

```yaml
targets:
  - name: high_throughput_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "high-volume-logs"
      compression: "lz4"
      compression_level: "fast"
      acknowledgments: "leader"
      max_bytes: 1048576
      max_events: 10000
```

High Reliability

Configuration optimized for maximum durability:

```yaml
targets:
  - name: reliable_kafka
    type: kafka
    pipelines:
      - checkpoint
    properties:
      address: "kafka.example.com"
      topic: "critical-logs"
      compression: "zstd"
      acknowledgments: "all"
      max_events: 100
      disable_idempotent_write: false
      tls:
        status: true
```

With Field Normalization

Using field normalization for standard format:

```yaml
targets:
  - name: normalized_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "normalized-logs"
      field_format: "cim"
      compression: "snappy"
```

Multi-Broker Cluster

Configuration for Kafka cluster with client ID:

```yaml
targets:
  - name: cluster_kafka
    type: kafka
    properties:
      address: "kafka-broker-1.example.com"
      port: 9092
      client_id: "datastream-producer-01"
      topic: "cluster-logs"
      algorithm: "scram-sha-512"
      username: "producer-user"
      password: "cluster-password"
      compression: "zstd"
      acknowledgments: "all"
```

Auto Topic Creation

Configuration with automatic topic creation:

```yaml
targets:
  - name: auto_topic_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "dynamic-logs"
      allow_auto_topic_creation: true
      compression: "snappy"
      max_events: 500
```

Scheduled Batching

Configuration with scheduled batch delivery:

```yaml
targets:
  - name: scheduled_kafka
    type: kafka
    properties:
      address: "kafka.example.com"
      topic: "scheduled-logs"
      max_events: 5000
      interval: "5m"
      compression: "gzip"
      compression_level: "bestcompression"
```

Development Environment

Configuration for local development without security:

```yaml
targets:
  - name: dev_kafka
    type: kafka
    properties:
      address: "localhost"
      port: 9092
      topic: "dev-logs"
      algorithm: "none"
      allow_auto_topic_creation: true
```

Production Environment

Configuration for production with full security:

```yaml
targets:
  - name: prod_kafka
    type: kafka
    pipelines:
      - checkpoint
    properties:
      address: "kafka-prod.example.com"
      port: 9093
      client_id: "datastream-prod-01"
      topic: "production-logs"
      algorithm: "scram-sha-512"
      username: "prod-kafka-user"
      password: "strong-production-password"
      compression: "zstd"
      compression_level: "speeddefault"
      acknowledgments: "all"
      max_events: 1000
      disable_idempotent_write: false
      tls:
        status: true
        insecure_skip_verify: false
        min_tls_version: "tls1.2"
        max_tls_version: "tls1.3"
```