Skip to main content
Version: 1.3.0

Enforce Schema

Validate Enterprise

Synopsis

Validates log entries against predefined schemas and enforces data structure compliance using Avro or Parquet schema definitions.

Schema

- enforce_schema:
description: <text>
schema: <string>
schema_type: <string>
if: <script>
ignore_failure: <boolean>
on_failure: <processor[]>
on_success: <processor[]>
tag: <string>

Configuration

The following fields are used to define the processor:

FieldRequiredDefaultDescription
schemaYSchema definition string or reference
schema_typeN"parquet"Schema format type ("avro" or "parquet")
descriptionN-Explanatory notes
ifN-Condition to run
ignore_failureNfalseSee Handling Failures
on_failureN-See Handling Failures
on_successN-See Handling Success
tagN-Identifier

Details

The processor validates log entries against schema definitions to ensure data structure compliance. It supports two schema formats:

  • Avro schemas: JSON-based schema definitions that provide rich data type validation and evolution support
  • Parquet schemas: Column-oriented schema definitions optimized for analytics and big data processing

The processor caches compiled schemas using content hashing for performance optimization. When validation occurs, the log entry is automatically transformed to match the schema requirements, including:

  • Type coercion: Converting compatible types to match schema expectations
  • Field validation: Ensuring required fields are present
  • Structure enforcement: Organizing nested data according to schema hierarchy
  • Default value assignment: Adding missing fields with default values where defined
info

Schema enforcement is particularly useful in data pipelines where downstream systems require strict data contracts and consistent field types.

warning

Schema validation can modify the original log entry structure. Fields that don't match the schema may be transformed, removed, or have their types changed to ensure compliance.

tip

Use schema caching effectively by keeping schema definitions consistent. The processor uses content hashing to cache compiled schemas, so identical schema strings will reuse cached versions.

Examples

Basic Parquet Schema

Enforce a simple Parquet schema...

{
"user_id": "12345",
"timestamp": "2024-01-15T10:30:00Z",
"action": "login"
}
- enforce_schema:
schema: |
message user_event {
required int64 user_id;
required int64 timestamp;
optional binary action (UTF8);
}

with automatic type conversion:

{
"user_id": 12345,
"timestamp": 1705316200000,
"action": "login"
}

Avro Schema Validation

Use an Avro schema for rich validation...

{
"name": "John Doe",
"age": "30",
"email": "john@example.com"
}
- enforce_schema:
schema_type: avro
schema: |
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "email", "type": "string"}
]
}

ensuring proper data types:

{
"name": "John Doe",
"age": 30,
"email": "john@example.com"
}

Nested Structure Validation

Validate complex nested structures...

{
"user": {
"id": "123",
"profile": {
"name": "Jane",
"settings": {"theme": "dark"}
}
}
}
- enforce_schema:
schema_type: avro
schema: |
{
"type": "record",
"name": "UserEvent",
"fields": [
{
"name": "user",
"type": {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{
"name": "profile",
"type": {
"type": "record",
"name": "Profile",
"fields": [
{"name": "name", "type": "string"},
{"name": "settings", "type": {"type": "map", "values": "string"}}
]
}
}
]
}
}
]
}

with proper type enforcement:

{
"user": {
"id": 123,
"profile": {
"name": "Jane",
"settings": {"theme": "dark"}
}
}
}

Default Values

Schema with default values...

{
"event_type": "page_view",
"user_id": 456
}
- enforce_schema:
schema_type: avro
schema: |
{
"type": "record",
"name": "Event",
"fields": [
{"name": "event_type", "type": "string"},
{"name": "user_id", "type": "int"},
{"name": "timestamp", "type": "long", "default": 0},
{"name": "properties", "type": {"type": "map", "values": "string"}, "default": {}}
]
}

automatically adds missing fields:

{
"event_type": "page_view",
"user_id": 456,
"timestamp": 0,
"properties": {}
}

Array Validation

Validate arrays with typed elements...

{
"tags": ["urgent", "customer", "billing"],
"scores": ["95", "87", "92"]
}
- enforce_schema:
schema_type: avro
schema: |
{
"type": "record",
"name": "Document",
"fields": [
{"name": "tags", "type": {"type": "array", "items": "string"}},
{"name": "scores", "type": {"type": "array", "items": "int"}}
]
}

with automatic type conversion:

{
"tags": ["urgent", "customer", "billing"],
"scores": [95, 87, 92]
}

Union Types

Handle union types for flexible fields...

{
"id": "USER123",
"value": "42.5"
}
- enforce_schema:
schema_type: avro
schema: |
{
"type": "record",
"name": "FlexibleRecord",
"fields": [
{"name": "id", "type": ["int", "string"]},
{"name": "value", "type": ["null", "int", "double", "string"]}
]
}

choosing the best matching type:

{
"id": "USER123",
"value": 42.5
}

Error Handling

Handle validation errors gracefully...

{
"invalid_field": "not_in_schema",
"required_field": null
}
- enforce_schema:
schema_type: avro
schema: |
{
"type": "record",
"name": "StrictRecord",
"fields": [
{"name": "required_field", "type": "string"}
]
}
on_failure:
- set:
field: validation_error
value: "Schema validation failed"

by adding error information:

{
"invalid_field": "not_in_schema",
"required_field": null,
"validation_error": "Schema validation failed"
}

Conditional Schema Enforcement

Apply schemas conditionally...

{
"event_type": "user_action",
"data": {
"action": "click",
"element": "button"
}
}
- enforce_schema:
if: "event_type == 'user_action'"
schema_type: avro
schema: |
{
"type": "record",
"name": "UserAction",
"fields": [
{"name": "event_type", "type": "string"},
{
"name": "data",
"type": {
"type": "record",
"name": "ActionData",
"fields": [
{"name": "action", "type": "string"},
{"name": "element", "type": "string"}
]
}
}
]
}

based on event characteristics:

{
"event_type": "user_action",
"data": {
"action": "click",
"element": "button"
}
}