Skip to main content
Version: 1.5.1

MQTT

MQTT Message Queue

Synopsis

Creates a target that publishes log messages to MQTT topics with support for batching, QoS levels, TLS encryption, and automatic retry logic.

Schema

- name: <string>
description: <string>
type: mqtt
pipelines: <pipeline[]>
status: <boolean>
properties:
url: <string>
topic: <string>
client_id: <string>
username: <string>
password: <string>
qos: <integer>
retained: <boolean>
clean_session: <boolean>
auto_reconnect: <boolean>
keep_alive: <integer>
timeout: <integer>
batch_size: <integer>
max_retries: <integer>
retry_delay: <integer>
field_format: <string>
tls:
status: <boolean>
verify: <boolean>
cert_name: <string>
key_name: <string>
min_tls_version: <string>
max_tls_version: <string>
interval: <string|numeric>
cron: <string>
debug:
status: <boolean>
dont_send_logs: <boolean>

Configuration

The following fields are used to define the target:

FieldRequiredDefaultDescription
nameYTarget name
descriptionN-Optional description
typeYMust be mqtt
pipelinesN-Optional post-processor pipelines
statusNtrueEnable/disable the target

Connection

FieldRequiredDefaultDescription
urlY-MQTT broker URL (e.g., tcp://localhost:1883, ssl://broker:8883, ws://broker:9001)
topicY-MQTT topic for message publishing
client_idNAuto-generatedMQTT client identifier (auto-generated if not specified)
usernameN-MQTT username for authentication
passwordN-MQTT password for authentication
timeoutN30Connection and write timeout in seconds
keep_aliveN60Keep-alive interval in seconds

MQTT Settings

FieldRequiredDefaultDescription
qosN1Quality of Service level: 0 (at most once), 1 (at least once), 2 (exactly once)
retainedNfalseRetain messages on broker for new subscribers
clean_sessionNtrueStart with clean session (discard previous session state)
auto_reconnectNtrueEnable automatic reconnection on connection loss

Batch Configuration

FieldRequiredDefaultDescription
batch_sizeN1000Number of messages to batch before publishing (minimum 1)
max_retriesN3Maximum retry attempts for failed publish operations
retry_delayN1Delay between retry attempts in seconds

Processing

FieldRequiredDefaultDescription
field_formatN-Data normalization format. See applicable Normalization section

TLS Configuration

FieldRequiredDefaultDescription
tls.statusNfalseEnable TLS encryption
tls.verifyNtrueVerify server TLS certificate
tls.cert_nameN*-Client certificate filename (PEM format)
tls.key_nameN*-Client private key filename (PEM format)
tls.min_tls_versionNtls1.2Minimum TLS version: tls1.0, tls1.1, tls1.2, tls1.3
tls.max_tls_versionNtls1.3Maximum TLS version: tls1.0, tls1.1, tls1.2, tls1.3

* = Conditionally required. Both cert_name and key_name must be provided together or omitted together.

Scheduler

FieldRequiredDefaultDescription
intervalNrealtimeExecution frequency. See Interval for details
cronN-Cron expression for scheduled execution. See Cron for details

Debug Options

FieldRequiredDefaultDescription
debug.statusNfalseEnable debug logging
debug.dont_send_logsNfalseProcess logs but don't send to target (testing)

Details

The MQTT target publishes log messages to MQTT topics using the Eclipse Paho MQTT client library. Messages are accumulated in batches and published when the batch size is reached. Each message is published with automatic retry logic and configurable Quality of Service levels.

The target maintains a persistent connection to the MQTT broker with automatic reconnection capabilities. Connection options include configurable timeouts, keep-alive intervals, and session management.

Quality of Service Levels

  • QoS 0 (At most once): Fire-and-forget, no acknowledgment, lowest overhead
  • QoS 1 (At least once): Acknowledged delivery, message may be duplicated
  • QoS 2 (Exactly once): Guaranteed single delivery, highest overhead

Prerequisites

  1. A running MQTT broker (e.g., Mosquitto, HiveMQ, AWS IoT Core)
  2. Network connectivity to the MQTT broker
  3. Valid authentication credentials if the broker requires authentication
  4. TLS certificates if using encrypted connections
note

The client_id is auto-generated if not specified. For persistent sessions (clean_session: false), use a consistent client ID across reconnections.

warning

Both tls.cert_name and tls.key_name must be provided together when using client certificate authentication. Providing only one will result in a configuration error.

note

The retained flag causes the broker to store the last message on the topic, delivering it to new subscribers immediately. Use with caution for high-frequency log data.

Examples

Basic

Minimum configuration for publishing to MQTT:

targets:
- name: basic_mqtt
type: mqtt
properties:
url: "tcp://localhost:1883"
topic: "logs/system"

With Authentication

Configuration with username/password authentication:

targets:
- name: authenticated_mqtt
type: mqtt
properties:
url: "tcp://mqtt.example.com:1883"
topic: "logs/application"
username: "logger"
password: "secure_password"

With TLS

Configuration with TLS encryption:

targets:
- name: secure_mqtt
type: mqtt
properties:
url: "ssl://mqtt.example.com:8883"
topic: "logs/secure"
username: "logger"
password: "secure_password"
tls:
status: true
verify: true
min_tls_version: "tls1.2"

With Client Certificate

Configuration using mutual TLS with client certificates:

targets:
- name: mtls_mqtt
type: mqtt
properties:
url: "ssl://mqtt.example.com:8883"
topic: "logs/secure"
tls:
status: true
verify: true
cert_name: "client.pem"
key_name: "client-key.pem"
min_tls_version: "tls1.2"

High Reliability

Configuration with QoS 2 for guaranteed delivery:

targets:
- name: reliable_mqtt
type: mqtt
properties:
url: "tcp://mqtt.example.com:1883"
topic: "logs/critical"
qos: 2
clean_session: false
client_id: "datastream-logger-001"

Retained Messages

Configuration with retained messages for topic state:

targets:
- name: retained_mqtt
type: mqtt
properties:
url: "tcp://mqtt.example.com:1883"
topic: "logs/status"
retained: true
qos: 1

Custom Batch Size

Configuration with custom batching and retry settings:

targets:
- name: batch_mqtt
type: mqtt
properties:
url: "tcp://mqtt.example.com:1883"
topic: "logs/highvolume"
batch_size: 5000
max_retries: 5
retry_delay: 2
timeout: 60

With Normalization

Configuration using field normalization:

targets:
- name: normalized_mqtt
type: mqtt
properties:
url: "tcp://mqtt.example.com:1883"
topic: "logs/ecs"
field_format: "ecs"

WebSocket Connection

Configuration using WebSocket transport:

targets:
- name: ws_mqtt
type: mqtt
properties:
url: "ws://mqtt.example.com:9001"
topic: "logs/websocket"
username: "logger"
password: "secure_password"

With Pipeline

Using a pipeline for additional log processing:

targets:
- name: pipeline_mqtt
type: mqtt
pipelines:
- enrich_logs
properties:
url: "tcp://mqtt.example.com:1883"
topic: "logs/enriched"