Event Hubs
Synopsis
Creates a collector that connects to Azure Event Hubs and consumes messages from specified event hubs. Supports multiple authentication methods, TLS encryption, and multiple workers for high-throughput scenarios.
Schema
- id: <numeric>
  name: <string>
  description: <string>
  type: eventhubs
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    client_connection_string: <string>
    tenant_id: <string>
    client_id: <string>
    client_secret: <string>
    namespace: <string>
    event_hub: <string>
    consumer_group: <string>
    container_connection_string: <string>
    container_url: <string>
    container_name: <string>
    reuse: <boolean>
    workers: <numeric>
    tls:
      status: <boolean>
      cert_name: <string>
      key_name: <string>
Configuration
The following fields are used to define the device:
Device
Field | Required | Default | Description |
---|---|---|---|
id | Y | - | Unique identifier |
name | Y | - | Device name |
description | N | - | Optional description |
type | Y | - | Must be eventhubs |
tags | N | - | Optional tags |
pipelines | N | - | Optional pre-processor pipelines |
status | N | true | Enable/disable the device |
Connection
Event Hubs supports two authentication methods:
Method 1: Connection String Authentication
Field | Required | Default | Description |
---|---|---|---|
client_connection_string | Y* | - | Event Hubs connection string (required if not using Method 2) |
event_hub | Y | - | Event hub name to consume from |
Method 2: Service Principal Authentication
Field | Required | Default | Description |
---|---|---|---|
tenant_id | Y* | - | Azure tenant ID (required if not using connection string) |
client_id | Y* | - | Azure service principal client ID |
client_secret | Y* | - | Azure service principal client secret |
namespace | Y* | - | Event Hubs namespace (required if not using connection string) |
event_hub | Y | - | Event hub name to consume from |
Consumer Configuration
Field | Required | Default | Description |
---|---|---|---|
consumer_group | N | "$Default" | Consumer group name |
Storage Configuration
Event Hubs requires checkpoint storage. Choose one of the following methods:
Method 1: Storage Account Connection String
Field | Required | Default | Description |
---|---|---|---|
container_connection_string | Y* | - | Azure Storage connection string |
container_name | Y* | - | Blob container name for checkpoints |
* = Conditionally required (depends on which authentication or storage method is used)
Method 2: Storage Account URL
Field | Required | Default | Description |
---|---|---|---|
container_url | Y* | - | Azure Storage container URL |
TLS
Field | Required | Default | Description |
---|---|---|---|
tls.status | N | false | Enable TLS encryption |
tls.cert_name | N* | - | TLS certificate file path (required if TLS enabled) |
tls.key_name | N* | - | TLS private key file path (required if TLS enabled) |
* = Conditionally required (only when tls.status: true)
TLS certificate and key files must be placed in the service root directory.
Advanced Configuration
The following settings tune throughput and message handling.
Performance
Field | Required | Default | Description |
---|---|---|---|
reuse | N | true | Enable multi-worker mode |
workers | N | 4 | Number of worker processes when reuse is enabled |
Key Features
Multiple Workers
When reuse is enabled, the collector uses multiple workers. Each worker maintains its own Event Hubs consumer and processes messages independently, automatically balancing message volumes. The worker count is capped at the number of available CPU cores.
Messages
The collector supports automatic checkpoint management, consumer group load balancing, multiple Event Hub subscriptions, TLS-encrypted connections, both connection string and service principal authentication, and custom message-processing pipelines.
Examples
The following are commonly used configuration types.
Basic with Connection String
The minimum required configuration using an Event Hubs connection string:
Creating a simple Event Hubs consumer with connection string...
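A minimal sketch of such a configuration, assuming the field layout shown in the Schema section and the first storage method for checkpoints. The device name, event hub name, and container name are hypothetical, and the angle-bracket parts of both connection strings are placeholders:

```yaml
# Hypothetical values throughout; replace the placeholders with your own.
- id: 1
  name: basic_eventhubs            # hypothetical device name
  type: eventhubs
  properties:
    # Authentication Method 1: Event Hubs connection string
    client_connection_string: "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
    event_hub: "logs"              # hypothetical event hub name
    # Storage Method 1: checkpoint storage via a storage account connection string
    container_connection_string: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net"
    container_name: "checkpoints"  # hypothetical container name
```

Fields that are left out, such as consumer_group, fall back to the defaults listed in the tables above.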
Service Principal Authentication
Using Azure service principal for authentication:
Connecting with service principal authentication...
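One possible configuration, assuming the field layout shown in the Schema section. The GUIDs, names, and URL parts are placeholders. Pairing the service principal with the container URL storage method is an assumption made for this sketch, and the source does not state whether namespace expects a short name or a fully qualified host name:

```yaml
# Hypothetical values throughout; replace the placeholders with your own.
- id: 2
  name: sp_eventhubs                 # hypothetical device name
  type: eventhubs
  properties:
    # Authentication Method 2: Azure service principal
    tenant_id: "00000000-0000-0000-0000-000000000000"
    client_id: "11111111-1111-1111-1111-111111111111"
    client_secret: "<client-secret>"
    namespace: "<event-hubs-namespace>"  # exact expected form is not stated in this document
    event_hub: "logs"                # hypothetical event hub name
    # Storage Method 2: checkpoint storage addressed by container URL (assumed pairing)
    container_url: "https://<account>.blob.core.windows.net/<container>"
```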
High-Volume Processing
Performance optimization for high message volumes:
Optimizing for throughput with multiple workers...
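A sketch of a throughput-oriented configuration, assuming the field layout shown in the Schema section. The worker count of 8 is an arbitrary illustrative value, not a recommendation from the source, and all names and connection strings are placeholders:

```yaml
# Hypothetical values throughout; replace the placeholders with your own.
- id: 3
  name: highvolume_eventhubs       # hypothetical device name
  type: eventhubs
  properties:
    client_connection_string: "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
    event_hub: "logs"              # hypothetical event hub name
    container_connection_string: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net"
    container_name: "checkpoints"  # hypothetical container name
    reuse: true                    # enable multi-worker mode (the default)
    workers: 8                     # illustrative value; the default is 4
```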
When reuse is enabled, the actual worker count will be capped at the number of available CPU cores.
Secure Connection
Using TLS encryption for enhanced security:
Secure Event Hubs connection with TLS...
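A sketch of a TLS-enabled configuration, assuming the field layout shown in the Schema section with tls nested under properties. The file names cert.pem and key.pem are hypothetical, and the remaining values are placeholders:

```yaml
# Hypothetical values throughout; replace the placeholders with your own.
- id: 4
  name: secure_eventhubs           # hypothetical device name
  type: eventhubs
  properties:
    client_connection_string: "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
    event_hub: "logs"              # hypothetical event hub name
    container_connection_string: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net"
    container_name: "checkpoints"  # hypothetical container name
    tls:
      status: true                 # both file fields below become required
      cert_name: "cert.pem"        # hypothetical file name, placed in the service root directory
      key_name: "key.pem"          # hypothetical file name, placed in the service root directory
```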
Pipeline Processing
Messages can be pre-processed using pipelines:
Applying custom processing to Event Hubs messages...
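A sketch of a device with a pre-processor pipeline attached, assuming the field layout shown in the Schema section. The pipeline name normalize_events is hypothetical, and writing pipelines as a list of pipeline names is an assumption based on the `<pipeline[]>` type in the schema:

```yaml
# Hypothetical values throughout; replace the placeholders with your own.
- id: 5
  name: pipeline_eventhubs         # hypothetical device name
  type: eventhubs
  pipelines:
    - normalize_events             # hypothetical pipeline, assumed to be defined elsewhere
  properties:
    client_connection_string: "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
    event_hub: "logs"              # hypothetical event hub name
    container_connection_string: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net"
    container_name: "checkpoints"  # hypothetical container name
```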
Pipelines are processed sequentially and can modify or drop messages before ingestion.
Multiple Consumer Groups
For load distribution across multiple DataStream instances:
Configuring consumer groups for load distribution...
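A sketch of how two DataStream instances might be configured, assuming the field layout shown in the Schema section. The consumer group names and container names are hypothetical, and the two YAML documents stand for the separate configuration of each instance:

```yaml
# Instance A (hypothetical values; replace the placeholders with your own)
- id: 6
  name: eventhubs_group_a
  type: eventhubs
  properties:
    client_connection_string: "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
    event_hub: "logs"
    consumer_group: "datastream-a"     # hypothetical consumer group
    container_connection_string: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net"
    container_name: "checkpoints-a"    # separate checkpoint container per consumer group
---
# Instance B (same event hub, different consumer group and checkpoint container)
- id: 7
  name: eventhubs_group_b
  type: eventhubs
  properties:
    client_connection_string: "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
    event_hub: "logs"
    consumer_group: "datastream-b"     # hypothetical consumer group
    container_connection_string: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net"
    container_name: "checkpoints-b"
```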
Each consumer group should use a separate checkpoint container to avoid conflicts.