Version: 1.3.0

Event Hubs

Pull

Synopsis

Creates a collector that connects to Azure Event Hubs and consumes messages from specified event hubs. Supports multiple authentication methods, TLS encryption, and multiple workers for high-throughput scenarios.

Schema

- id: <numeric>
  name: <string>
  description: <string>
  type: eventhubs
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    client_connection_string: <string>
    tenant_id: <string>
    client_id: <string>
    client_secret: <string>
    namespace: <string>
    event_hub: <string>
    consumer_group: <string>
    container_connection_string: <string>
    container_url: <string>
    container_name: <string>
    reuse: <boolean>
    workers: <numeric>
    tls:
      status: <boolean>
      cert_name: <string>
      key_name: <string>

Configuration

The following fields are used to define the device:

Device

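| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | | Unique identifier |
| name | Y | | Device name |
| description | N | - | Optional description |
| type | Y | | Must be eventhubs |
| tags | N | - | Optional tags |
| pipelines | N | - | Optional pre-processor pipelines |
| status | N | true | Enable/disable the device |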
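
As an illustration of the optional device fields, the sketch below sets description, tags, and status on a device. The id, names, tag values, and event hub name are hypothetical; the properties block follows the connection-string form described under Connection and Storage Configuration.

```yaml
devices:
  - id: 10                     # hypothetical identifier
    name: audit_eventhubs      # hypothetical device name
    description: "Audit events from the shared namespace"
    type: eventhubs            # must be eventhubs
    tags:                      # hypothetical tag values
      - azure
      - audit
    status: false              # device stays defined but is disabled
    properties:
      client_connection_string: "${EVENTHUBS_CONNECTION_STRING}"
      event_hub: "audit-logs"
      container_connection_string: "${STORAGE_CONNECTION_STRING}"
      container_name: "checkpoints"
```

Setting status back to true, or removing the field so the default applies, would enable the device again.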

Connection

Event Hubs supports two authentication methods:

Method 1: Connection String Authentication

| Field | Required | Default | Description |
|---|---|---|---|
| client_connection_string | Y* | | Event Hubs connection string (required if not using method 2) |
| event_hub | Y | | Event hub name to consume from |

Method 2: Service Principal Authentication

| Field | Required | Default | Description |
|---|---|---|---|
| tenant_id | Y* | | Azure tenant ID (required if not using connection string) |
| client_id | Y* | | Azure service principal client ID |
| client_secret | Y* | | Azure service principal client secret |
| namespace | Y* | | Event Hubs namespace (required if not using connection string) |
| event_hub | Y | | Event hub name to consume from |

Consumer Configuration

| Field | Required | Default | Description |
|---|---|---|---|
| consumer_group | N | "$Default" | Consumer group name |
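
Because consumer_group defaults to "$Default", leaving the field out should behave the same as setting it explicitly. The fragment below is a minimal sketch of just the relevant part of a properties block; the event hub name is a placeholder, and the value is quoted so it is read as a plain string.

```yaml
properties:
  event_hub: "logs"              # placeholder event hub name
  # Explicit form of the default; omitting this line should be equivalent
  consumer_group: "$Default"
```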

Storage Configuration

Event Hubs requires checkpoint storage. Choose one method:

Method 1: Storage Account Connection String

| Field | Required | Default | Description |
|---|---|---|---|
| container_connection_string | Y* | | Azure Storage connection string |
| container_name | Y* | | Blob container name for checkpoints |

* = Conditionally required (applies only to the authentication or storage method you choose)

Method 2: Storage Account URL

| Field | Required | Default | Description |
|---|---|---|---|
| container_url | Y* | | Azure Storage container URL |

TLS

| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS encryption |
| tls.cert_name | N* | | TLS certificate file path (required if TLS enabled) |
| tls.key_name | N* | | TLS private key file path (required if TLS enabled) |

* = Conditionally required (only when tls.status: true)

Note: TLS certificate and key files must be placed in the service root directory.

Advanced Configuration

The following settings tune throughput and message handling.

Performance

| Field | Required | Default | Description |
|---|---|---|---|
| reuse | N | true | Enable multi-worker mode |
| workers | N | 4 | Number of worker processes when reuse is enabled |

Key Features

Multiple Workers

When reuse is enabled, the collector uses multiple workers. Each worker maintains its own Event Hubs consumer and processes messages independently, automatically balancing message volumes. The worker count is capped at the number of available CPU cores.
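
As a worked illustration of the cap, assume a host with 4 CPU cores (an assumption made only for this example). Requesting more workers than cores would then leave 4 effective workers. The id, device name, and event hub name are hypothetical.

```yaml
devices:
  - id: 11                     # hypothetical identifier
    name: capped_eventhubs     # hypothetical device name
    type: eventhubs
    properties:
      client_connection_string: "${EVENTHUBS_CONNECTION_STRING}"
      event_hub: "logs"
      container_connection_string: "${STORAGE_CONNECTION_STRING}"
      container_name: "checkpoints"
      reuse: true              # multi-worker mode on (this is also the default)
      workers: 16              # on the assumed 4-core host: min(16, 4) = 4 workers
```

On such a host, any workers value above 4 would give the same result, so the setting only matters up to the core count.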

Messages

The collector supports automatic checkpoint management, consumer group load balancing, multiple Event Hub subscriptions, TLS-encrypted connections, both connection string and service principal authentication, and custom message-processing pipelines.

Examples

The following examples cover common configurations.

Basic with Connection String

The minimum required configuration using an Event Hubs connection string:

Creating a simple EventHubs consumer with connection string...

devices:
  - id: 1
    name: basic_eventhubs
    type: eventhubs
    properties:
      client_connection_string: "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=mykey;SharedAccessKey=myvalue"
      event_hub: "logs"
      container_connection_string: "DefaultEndpointsProtocol=https;AccountName=mystorage;AccountKey=mykey"
      container_name: "checkpoints"

Service Principal Authentication

Using Azure service principal for authentication:

Connecting with service principal authentication...

devices:
  - id: 2
    name: sp_eventhubs
    type: eventhubs
    properties:
      tenant_id: "12345678-1234-1234-1234-123456789012"
      client_id: "87654321-4321-4321-4321-210987654321"
      client_secret: "${AZURE_CLIENT_SECRET}"
      namespace: "mynamespace"
      event_hub: "security-logs"
      consumer_group: "datastream-group"
      container_url: "https://mystorage.blob.core.windows.net/checkpoints"

High-Volume Processing

Performance optimization for high message volumes:

Optimizing for throughput with multiple workers...

devices:
  - id: 3
    name: performant_eventhubs
    type: eventhubs
    properties:
      client_connection_string: "${EVENTHUBS_CONNECTION_STRING}"
      event_hub: "high-volume-logs"
      consumer_group: "processing-group"
      container_connection_string: "${STORAGE_CONNECTION_STRING}"
      container_name: "checkpoints"
      reuse: true
      workers: 8

Note: When reuse is enabled, the actual worker count will be capped at the number of available CPU cores.

Secure Connection

Using TLS encryption for enhanced security:

Secure EventHubs connection with TLS...

devices:
  - id: 4
    name: secure_eventhubs
    type: eventhubs
    properties:
      tenant_id: "${AZURE_TENANT_ID}"
      client_id: "${AZURE_CLIENT_ID}"
      client_secret: "${AZURE_CLIENT_SECRET}"
      namespace: "secure-namespace"
      event_hub: "secure-logs"
      consumer_group: "secure-group"
      container_url: "${STORAGE_CONTAINER_URL}"
      tls:
        status: true
        cert_name: "eventhubs.crt"
        key_name: "eventhubs.key"

Pipeline Processing

Messages can be pre-processed using pipelines:

Applying custom processing to EventHubs messages...

devices:
  - id: 5
    name: pipeline_eventhubs
    type: eventhubs
    pipelines:
      - json_parser
      - field_extractor
      - normalize_timestamps
    properties:
      client_connection_string: "${EVENTHUBS_CONNECTION_STRING}"
      event_hub: "application-logs"
      consumer_group: "processing-group"
      container_connection_string: "${STORAGE_CONNECTION_STRING}"
      container_name: "checkpoints"

Note: Pipelines are processed sequentially and can modify or drop messages before ingestion.

Multiple Consumer Groups

For load distribution across multiple DataStream instances:

Configuring consumer groups for load distribution...

devices:
  - id: 6
    name: distributed_eventhubs
    type: eventhubs
    properties:
      client_connection_string: "${EVENTHUBS_CONNECTION_STRING}"
      event_hub: "distributed-logs"
      consumer_group: "instance-1"
      container_connection_string: "${STORAGE_CONNECTION_STRING}"
      container_name: "checkpoints-instance1"
      reuse: true
      workers: 4

Warning: Each consumer group should use a separate checkpoint container to avoid conflicts.
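
To illustrate the warning, a second DataStream instance reading the same event hub might be configured roughly as follows. The id, device name, consumer group instance-2, and container checkpoints-instance2 are hypothetical and only follow the naming pattern of the first instance; the consumer group is assumed to already exist on the event hub.

```yaml
devices:
  - id: 12                                 # hypothetical identifier
    name: distributed_eventhubs_2          # hypothetical device name
    type: eventhubs
    properties:
      client_connection_string: "${EVENTHUBS_CONNECTION_STRING}"
      event_hub: "distributed-logs"        # same event hub as instance 1
      consumer_group: "instance-2"         # different consumer group
      container_connection_string: "${STORAGE_CONNECTION_STRING}"
      container_name: "checkpoints-instance2"   # separate checkpoint container
      reuse: true
      workers: 4
```

Only the consumer group and the checkpoint container differ between the two instances, which keeps each group's checkpoints from overwriting the other's.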