Skip to main content
Version: 1.4.0

Group

Control Flow Elastic Compatible

Synopsis

Groups multiple processors together into a named unit that can be executed conditionally and organized hierarchically with parent-child relationships.

Schema

- group:
description: <text>
name: <string>
parent_group: <string>
processors: <processor[]>
if: <script>
ignore_failure: <boolean>
ignore_missing: <boolean>
on_failure: <processor[]>
on_success: <processor[]>
tag: <string>

Configuration

The following fields are used to define the processor:

FieldRequiredDefaultDescription
nameYUnique name for the processor group
processorsYArray of processors to execute within the group
parent_groupN-Name of parent group that must execute before this group
descriptionN-Explanatory notes
ifN-Condition to run
ignore_failureNfalseSee Handling Failures
ignore_missingNfalseContinue processing if referenced fields are missing
on_failureN-See Handling Failures
on_successN-See Handling Success
tagN-Identifier

Details

The group processor provides a way to organize and control the execution of multiple processors as a single unit. It offers several key features:

  • Sequential execution: All processors within the group are executed in the order they are defined
  • Conditional execution: The entire group can be conditionally executed based on the if condition
  • Hierarchical organization: Groups can be organized in parent-child relationships using parent_group
  • Named tracking: Each group has a unique name that is tracked during pipeline execution
  • Failure handling: Groups can have their own failure handling logic that applies to all contained processors

The processor maintains execution state through the config.PreviousGroup field, which tracks the last successfully executed group. This enables conditional group execution based on the execution history.

info

Groups are particularly useful for organizing complex pipelines where certain sets of processors should only run under specific conditions or after other groups have completed successfully.

note

When a group specifies a parent_group, it will only execute if the specified parent group was the last group to execute successfully. This creates a chain of dependent group execution.

tip

Use meaningful group names and organize related processors together. This makes pipeline debugging easier and allows for better conditional logic based on processing history.

Examples

Basic Processor Grouping

Group related processors together...

{
"user_id": "12345",
"action": "login"
}
- group:
name: user_processing
processors:
- set:
field: event_type
value: user_event
- set:
field: timestamp
value: "{{@timestamp}}"
- convert:
field: user_id
type: integer

for organized execution:

{
"user_id": 12345,
"action": "login",
"event_type": "user_event",
"timestamp": "2024-01-15T10:30:00Z"
}

Conditional Group Execution

Execute groups based on conditions...

{
"event_type": "error",
"severity": "high",
"message": "Database connection failed"
}
- group:
name: error_processing
if: "event_type == 'error'"
processors:
- set:
field: alert_required
value: true
- set:
field: escalation_level
value: "{{severity}}"
- append:
field: tags
value: critical

only when criteria are met:

{
"event_type": "error",
"severity": "high",
"message": "Database connection failed",
"alert_required": true,
"escalation_level": "high",
"tags": ["critical"]
}

Parent-Child Group Relationships

Create dependent group execution...

{
"raw_data": "user:john,action:login,ip:192.168.1.1"
}
- group:
name: data_parsing
processors:
- kv:
field: raw_data
field_split: ","
value_split: ":"
- group:
name: data_enrichment
parent_group: data_parsing
processors:
- geoip:
field: ip
- set:
field: processed_at
value: "{{@timestamp}}"

where enrichment only runs after parsing:

{
"raw_data": "user:john,action:login,ip:192.168.1.1",
"user": "john",
"action": "login",
"ip": "192.168.1.1",
"geo": {"city": "San Francisco"},
"processed_at": "2024-01-15T10:30:00Z"
}

Error Handling in Groups

Handle group-level failures...

{
"email": "invalid-email",
"phone": "555-1234"
}
- group:
name: validation
processors:
# Example processor that might fail
- validate_email:
field: email
on_failure:
- set:
field: validation_error
value: "Email validation failed"
- set:
field: validation_status
value: failed

with group-specific error handling:

{
"email": "invalid-email",
"phone": "555-1234",
"validation_error": "Email validation failed",
"validation_status": "failed"
}

Security Processing Pipeline

Organize security-related processing...

{
"source_ip": "192.168.1.100",
"user_agent": "Mozilla/5.0...",
"request_path": "/admin/login"
}
- group:
name: threat_detection
if: "request_path contains '/admin'"
processors:
- geoip:
field: source_ip
target_field: geo_info
- set:
field: risk_score
value: 5
- append:
field: security_tags
value: admin_access
on_success:
- set:
field: threat_analysis_complete
value: true

for admin access attempts:

{
"source_ip": "192.168.1.100",
"user_agent": "Mozilla/5.0...",
"request_path": "/admin/login",
"geo_info": {"country": "US"},
"risk_score": 5,
"security_tags": ["admin_access"],
"threat_analysis_complete": true
}

Multi-Stage Data Processing

Create multi-stage processing pipeline...

{
"log_line": "2024-01-15 10:30:00 ERROR Database connection timeout"
}
- group:
name: log_parsing
processors:
- grok:
field: log_line
patterns:
- "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
- group:
name: log_enrichment
parent_group: log_parsing
processors:
- date:
field: timestamp
target_field: "@timestamp"
- convert:
field: level
type: string
- group:
name: log_classification
parent_group: log_enrichment
if: "level == 'ERROR'"
processors:
- set:
field: alert_required
value: true
- set:
field: category
value: system_error

with dependent execution stages:

{
"log_line": "2024-01-15 10:30:00 ERROR Database connection timeout",
"timestamp": "2024-01-15 10:30:00",
"level": "ERROR",
"message": "Database connection timeout",
"@timestamp": "2024-01-15T10:30:00Z",
"alert_required": true,
"category": "system_error"
}

Conditional Branching

Create conditional processing branches...

{
"event_source": "web_server",
"status_code": 404,
"request_path": "/api/users"
}
- group:
name: web_server_processing
if: "event_source == 'web_server'"
processors:
- http_status:
field: status_code
- append:
field: tags
value: web_traffic
- group:
name: database_processing
if: "event_source == 'database'"
processors:
- set:
field: db_category
value: query_log
- append:
field: tags
value: database_activity

based on event characteristics:

{
"event_source": "web_server",
"status_code": "Not Found",
"request_path": "/api/users",
"tags": ["web_traffic"]
}

Complex Group Hierarchy

Build complex processing hierarchies...

{
"event_type": "user_activity",
"user_id": "12345",
"action": "file_access",
"file_path": "/sensitive/data.txt"
}
- group:
name: base_processing
processors:
- convert:
field: user_id
type: integer
- lowercase:
field: action
- group:
name: security_analysis
parent_group: base_processing
if: "file_path contains '/sensitive'"
processors:
- set:
field: security_risk
value: high
- append:
field: alerts
value: sensitive_file_access
- group:
name: compliance_logging
parent_group: security_analysis
processors:
- set:
field: compliance_logged
value: true
- set:
field: audit_required
value: true

with cascading execution:

{
"event_type": "user_activity",
"user_id": 12345,
"action": "file_access",
"file_path": "/sensitive/data.txt",
"security_risk": "high",
"alerts": ["sensitive_file_access"],
"compliance_logged": true,
"audit_required": true
}