Handling Failures
The processors in a pipeline run sequentially. When a processor fails, DataStream first looks for an on_failure handler on that processor. If none is found, the error propagates to the pipeline level, then up through any parent pipelines, until an on_failure handler is found. If no handler exists at any level, a default error handler drops the entry without interrupting the pipeline's operation.
The on_failure field specifies a list of processors to run immediately after a processor fails, enabling controlled error recovery and logging without stopping the entire pipeline.
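The lookup order can be sketched with a single processor; the field names here are illustrative:

- rename:
    field: provider
    target_field: cloud.provider
    on_failure:        # 1. checked first: the processor's own handler
      - set:
          field: error.message
          value: "rename failed"
# 2. if the processor has no handler, the pipeline's on_failure is checked
# 3. then each parent pipeline's on_failure, innermost first
# 4. with no handler anywhere, the default handler drops the entry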
Recovery
To ignore a failure and keep running the remaining processors in the pipeline, set ignore_failure to true.
- rename:
    field: provider
    target_field: cloud.provider
    ignore_failure: true
Alternatively, the recover processor can be used inside an on_failure handler to signal that the error has been handled, allowing the parent pipeline to continue processing without treating the error as a failure.
- pipeline:
    name: risky_operation
    on_failure:
      - recover:
          description: "Recover from risky operation failure"
Unlike ignore_failure: true, which silently discards errors, recover lets you run error-handling logic (such as logging or setting error fields) before signaling that the error has been handled and the parent pipeline should continue.
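For example, a handler can record details of the failure before recovering. A sketch using the _ingest.on_failure_message metadata field; the error.message field name is illustrative:

- pipeline:
    name: risky_operation
    on_failure:
      - set:
          field: error.message
          value: "risky_operation failed: {{ _ingest.on_failure_message }}"
      - recover:
          description: "Error recorded; resume the parent pipeline"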
Continuation
To run one or more fallback processors when a processor fails, use the on_failure parameter.
- rename:
    field: foo
    target_field: bar
    on_failure:
      - set:
          field: error.message
          value: "Field 'foo' could not be renamed as 'bar'"
          override: false
Nesting
Because the processors in an on_failure handler can themselves fail, on_failure blocks can be nested to handle cascading errors.
- rename:
    field: foo
    target_field: bar
    on_failure:
      - set:
          field: error.message
          value: "Field 'foo' could not be renamed as 'bar'"
          override: false
          on_failure:
            - set:
                field: error.message.cascade
                value: "Multiple errors encountered"
                override: true
Pipelines
The on_failure option can be specified at the pipeline level. When a processor fails without its own on_failure handler, the error propagates to the pipeline's on_failure handler. If the pipeline is nested within another pipeline, and no on_failure handler is found, the error continues propagating up through parent pipelines until a handler is found.
pipelines:
  - name: routine_process
    on_failure:
      - set:
          field: _index
          value: failed-{{ _ingest.on_failure_processor_tag }}
          description: "Processor failed"
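When pipelines are nested, a handler on an outer pipeline catches errors from inner pipelines that have none. A hypothetical sketch, assuming parent_process invokes child_process via the pipeline processor shown earlier; the names are illustrative:

pipelines:
  - name: child_process
    # no on_failure here: errors propagate to the calling pipeline
  - name: parent_process
    on_failure:
      - set:
          field: error.message
          value: "{{ _ingest.on_failure_pipeline }} failed: {{ _ingest.on_failure_message }}"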
Metadata
Within an on_failure block, additional information about the failure is available in the metadata fields _ingest.on_failure_message, _ingest.on_failure_processor_type, _ingest.on_failure_processor_tag, and _ingest.on_failure_pipeline. These fields cannot be accessed outside an on_failure block.
pipelines:
  - name: routine_process
    on_failure:
      - set:
          field: error_info
          value: "{{ _ingest.on_failure_processor_type }} in {{ _ingest.on_failure_pipeline }} failed; error message: {{ _ingest.on_failure_message }}"
Default Error Handler
If no on_failure handler is found at any level—processor, pipeline, or parent pipelines—DataStream applies a default error handler that drops the failed entry. This prevents a single failure from interrupting the entire pipeline's operation.
# Default behavior applied automatically when no on_failure is specified
on_failure:
  - drop:
      description: "Drop entry on failure"
To customize error handling, specify an on_failure handler at any level. The error will be caught by the first handler encountered during propagation.
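For instance, a pipeline-level handler can keep failed entries instead of letting the default handler drop them; the error.reason field name is illustrative:

pipelines:
  - name: routine_process
    on_failure:
      - set:
          field: error.reason
          value: "{{ _ingest.on_failure_processor_type }} failed: {{ _ingest.on_failure_message }}"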