The DAG feature enables you to define complex task execution flows with parallel processing, branching, and merging capabilities. This allows for more sophisticated pipeline architectures beyond simple linear task chains.
The DAG feature was introduced in v2.1.0-experimental-dag and provides a declarative way to specify task execution dependencies and parallel processing patterns.
DAG expressions use a simple, intuitive syntax:
- `>>`: sequential execution (task dependency)
- `[task1, task2]`: parallel execution (tasks run concurrently)
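Here is one way to read the two operators. A `>>` connects every task on its left to every task on its right, and a bracketed group contributes all of its members at once. The sketch below models a flat expression as a list of stages and derives the dependency edges that result. `edgesFromStages` is a hypothetical helper, not the parser in `dag.go`. It does not cover groups that contain their own `>>` chains.

```go
package main

import "fmt"

// edgesFromStages connects every task in one stage to every task in the
// next stage. A one-task stage models a plain `>>` step; a multi-task
// stage models a bracketed parallel group.
func edgesFromStages(stages [][]string) [][2]string {
	var edges [][2]string
	for i := 0; i+1 < len(stages); i++ {
		for _, from := range stages[i] {
			for _, to := range stages[i+1] {
				edges = append(edges, [2]string{from, to})
			}
		}
	}
	return edges
}

func main() {
	// task1 >> [task2, task3] >> task4
	stages := [][]string{{"task1"}, {"task2", "task3"}, {"task4"}}
	for _, e := range edgesFromStages(stages) {
		fmt.Printf("%s -> %s\n", e[0], e[1])
	}
}
```

For `task1 >> [task2, task3] >> task4` this yields `task1 -> task2`, `task1 -> task3`, `task2 -> task4`, and `task3 -> task4`, which is the fan-out/fan-in shape.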
Common patterns:
- Sequential chain: `dag: task1 >> task2 >> task3`
- Fully parallel: `dag: [task1, task2, task3]`
- Fan-out: `dag: task1 >> [task2, task3, task4]`
- Fan-in: `dag: [task1, task2, task3] >> task4`
- Fan-out then fan-in: `dag: task1 >> [task2, task3] >> task4`
- Parallel branches with their own sequential steps: `dag: source >> [branch1 >> transform1, branch2 >> transform2] >> sink`
- Multi-stage parallel processing: `dag: ingest >> [clean, validate] >> [transform, enrich] >> [aggregate, export]`

A complete configuration defines the tasks and then wires them together with `dag`:

```yaml
tasks:
  - name: read_csv_file
    type: file
    path: data/input.csv
  - name: split_to_lines
    type: split
  - name: convert_from_csv
    type: converter
    format: csv
    skip_first: true
    columns:
      - name: name
      - name: age
        is_numeric: true
  - name: echo
    type: echo
    only_data: true
  - name: echo2
    type: echo
    only_data: true

# DAG definition
dag: read_csv_file >> [split_to_lines, echo] >> convert_from_csv >> echo2
```

Tasks within brackets, such as `[task1, task2]`, execute concurrently, which can improve pipeline throughput.
The pipeline automatically creates and manages channels between tasks based on the DAG structure.
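The channel wiring described above can be pictured with plain Go channels. This is a minimal sketch built on two assumptions: each task reads from an input channel and writes to a shared output channel, and a parallel group copies every upstream message to each branch. `stage`, `runParallel`, and `bufSize` are hypothetical names. The actual channel manager in `pipeline.go` may route data differently.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// stage is a hypothetical task body: it reads from in, writes to out,
// and must not close out (the caller owns closing it).
type stage func(in <-chan string, out chan<- string)

// runParallel fans each upstream message out to every branch, runs the
// branches concurrently, and merges their outputs into one channel.
func runParallel(in <-chan string, branches []stage, bufSize int) <-chan string {
	merged := make(chan string, bufSize)
	inputs := make([]chan string, len(branches))
	var wg sync.WaitGroup
	for i, fn := range branches {
		inputs[i] = make(chan string, bufSize)
		wg.Add(1)
		go func(fn stage, ch <-chan string) {
			defer wg.Done()
			fn(ch, merged)
		}(fn, inputs[i])
	}
	// Fan-out: copy every message to each branch, then close branch inputs.
	go func() {
		for msg := range in {
			for _, ch := range inputs {
				ch <- msg
			}
		}
		for _, ch := range inputs {
			close(ch)
		}
	}()
	// Fan-in: close the merged channel once every branch has returned.
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

func main() {
	src := make(chan string, 2)
	src <- "a"
	src <- "b"
	close(src)

	upper := func(in <-chan string, out chan<- string) {
		for s := range in {
			out <- strings.ToUpper(s)
		}
	}
	suffix := func(in <-chan string, out chan<- string) {
		for s := range in {
			out <- s + "!"
		}
	}

	var got []string
	for s := range runParallel(src, []stage{upper, suffix}, 4) {
		got = append(got, s)
	}
	sort.Strings(got) // branch outputs interleave nondeterministically
	fmt.Println(got)
}
```

Branch outputs arrive in no fixed order, which is why `main` sorts them before printing.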
- Individual task failures can be configured with `fail_on_error`.
- The pipeline continues executing non-failing tasks when possible.
The DAG parser includes comprehensive validation:
- Syntax checking (balanced brackets, valid operators)
- Invalid character detection
- Structure validation (empty groups, malformed expressions)
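Bracket balancing and invalid-character detection need only a single left-to-right scan with a depth counter. This sketch assumes "letters" means ASCII letters. `checkSyntax` and `isAllowed` are illustrative names. The error messages are modeled loosely on the `unmatched closing brace` error and are not the parser's exact output.

```go
package main

import "fmt"

// isAllowed reports whether r is in the permitted set: ASCII letters and
// digits (assumed), '_', '-', '[', ']', ',', '>', and whitespace.
func isAllowed(r rune) bool {
	switch {
	case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9':
		return true
	}
	switch r {
	case '_', '-', '[', ']', ',', '>', ' ', '\t', '\n', '\r':
		return true
	}
	return false
}

// checkSyntax rejects invalid characters and unbalanced brackets,
// reporting the byte index of the first problem found.
func checkSyntax(expr string) error {
	depth := 0
	for i, r := range expr {
		if !isAllowed(r) {
			return fmt.Errorf("invalid character %q at index %d", r, i)
		}
		switch r {
		case '[':
			depth++
		case ']':
			if depth == 0 {
				return fmt.Errorf("error at index %d, unmatched closing brace ']' found", i)
			}
			depth--
		}
	}
	if depth != 0 {
		return fmt.Errorf("%d unclosed '[' bracket(s)", depth)
	}
	return nil
}

func main() {
	for _, e := range []string{
		"task1 >> [task2, task3]",
		"task1 >> task2]",
		"[task1, task2",
		"task1 >> @task2",
	} {
		fmt.Printf("%q: %v\n", e, checkSyntax(e))
	}
}
```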
The DAG parser enforces several rules:
- No empty expressions: `""` is invalid.
- Balanced brackets: every `[` must have a matching `]`.
- No empty groups: `[]` is invalid.
- No single-item groups: `[task1]` is invalid (use `task1` directly).
- Valid characters only: letters, numbers, `_`, `-`, `[`, `]`, `,`, `>`, and whitespace.
- Proper arrow usage: only `>>` is allowed; a single `>` or a run of three or more (`>>>`) is invalid.
- No leading arrows: `>>task1` is invalid.
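The structural rules above can be checked with one scan for arrow runs, followed by a stack of open groups that counts non-empty items. This sketch is an illustration, not the parser in `dag.go`. `checkRules` is a hypothetical function, and error indexes refer to the trimmed expression. Rejecting a comma outside brackets is an assumption that the rule list does not state.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// group tracks one open '[' while scanning.
type group struct {
	open    int  // byte index of the '['
	items   int  // completed, non-empty items
	content bool // the current item has seen a non-space character
}

// checkRules enforces: no empty expression, no leading arrow, arrows of
// exactly ">>", and groups with at least two non-empty items.
func checkRules(expr string) error {
	s := strings.TrimSpace(expr)
	if s == "" {
		return errors.New("empty expression")
	}
	if strings.HasPrefix(s, ">") {
		return errors.New("expression must not start with an arrow")
	}
	// Every run of '>' must be exactly ">>".
	for i := 0; i < len(s); {
		if s[i] != '>' {
			i++
			continue
		}
		j := i
		for j < len(s) && s[j] == '>' {
			j++
		}
		if j-i != 2 {
			return fmt.Errorf("invalid arrow %q at index %d", s[i:j], i)
		}
		i = j
	}
	var stack []group
	for i := 0; i < len(s); i++ {
		switch s[i] {
		case ' ', '\t', '\n', '\r':
			// whitespace is not item content
		case '[':
			if n := len(stack); n > 0 {
				stack[n-1].content = true // a nested group is content of its parent
			}
			stack = append(stack, group{open: i})
		case ',':
			if len(stack) == 0 { // assumed rule: commas only separate group items
				return fmt.Errorf("comma outside brackets at index %d", i)
			}
			g := &stack[len(stack)-1]
			if !g.content {
				return fmt.Errorf("empty item at index %d", i)
			}
			g.items++
			g.content = false
		case ']':
			if len(stack) == 0 {
				return fmt.Errorf("unmatched ']' at index %d", i)
			}
			g := stack[len(stack)-1]
			stack = stack[:len(stack)-1]
			if g.content {
				g.items++
			} else if g.items > 0 {
				return fmt.Errorf("empty item before ']' at index %d", i)
			}
			if g.items == 0 {
				return fmt.Errorf("empty group at index %d", g.open)
			}
			if g.items == 1 {
				return fmt.Errorf("single-item group at index %d", g.open)
			}
		default:
			if n := len(stack); n > 0 {
				stack[n-1].content = true
			}
		}
	}
	if len(stack) > 0 {
		return fmt.Errorf("unclosed '[' at index %d", stack[len(stack)-1].open)
	}
	return nil
}

func main() {
	for _, expr := range []string{"task1 >> [task2, task3] >> task4", "[task1]", ">>task1", "task1 > task2"} {
		fmt.Printf("%q: %v\n", expr, checkRules(expr))
	}
}
```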
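Without a `dag` field, tasks run sequentially in the order they are defined:

```yaml
tasks:
  - name: task1
    type: file
  - name: task2
    type: split
  - name: task3
    type: echo
# Implicit linear execution: task1 >> task2 >> task3
```

An explicit `dag` can reproduce the same behavior or introduce parallelism:

```yaml
tasks:
  - name: task1
    type: file
  - name: task2
    type: split
  - name: task3
    type: echo
# Explicit DAG definition (same behavior)
dag: task1 >> task2 >> task3
# Or, with parallel processing, use this instead:
# dag: task1 >> [task2, task3]
```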
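When no `dag` field is given, falling back to implicit linear execution amounts to joining the task names with `>>` in declaration order. The sketch below shows this. `taskConfig` and `effectiveDAG` are hypothetical names, and how the real pipeline detects a missing `dag` field is an assumption here.

```go
package main

import (
	"fmt"
	"strings"
)

// taskConfig is a hypothetical, trimmed-down view of one entry under `tasks:`.
type taskConfig struct {
	Name string
	Type string
}

// effectiveDAG returns the configured expression, or chains the tasks with
// `>>` in declaration order when no `dag` field was given.
func effectiveDAG(dag string, tasks []taskConfig) string {
	if strings.TrimSpace(dag) != "" {
		return dag
	}
	names := make([]string, len(tasks))
	for i, t := range tasks {
		names[i] = t.Name
	}
	return strings.Join(names, " >> ")
}

func main() {
	tasks := []taskConfig{{"task1", "file"}, {"task2", "split"}, {"task3", "echo"}}
	fmt.Println(effectiveDAG("", tasks))                        // task1 >> task2 >> task3
	fmt.Println(effectiveDAG("task1 >> [task2, task3]", tasks)) // task1 >> [task2, task3]
}
```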
Troubleshooting common issues:
- Syntax Errors
  - Error: `invalid DAG groups: error at index X, unmatched closing brace ']' found`
  - Check bracket balancing.
  - Ensure proper comma placement.
- Invalid Characters
  - Error: `invalid DAG groups: invalid characters found`
  - Use only letters, numbers, `_`, `-`, brackets, commas, arrows, and whitespace.
  - Remove special characters such as `()`, `{}`, `@`, and `$`.
- Performance Issues
  - Increase `channel_size` for high-throughput pipelines.
  - Monitor task concurrency settings.
  - Check for bottlenecks in slow tasks.
Best practices:
- Use Meaningful Task Names
  - Use descriptive names that reflect each task's purpose.
  - Follow consistent naming conventions (snake_case recommended).
- Optimize Parallel Sections
  - Group tasks of similar duration together.
  - Avoid mixing fast and slow tasks in the same parallel group.
  - Consider task dependencies when designing parallel sections.
- Channel Sizing
  - Set an appropriate `channel_size` based on data volume.
  - Monitor memory usage in production.
  - Use larger buffers for batch processing.
- Error Handling
  - Configure `fail_on_error` appropriately for each task.
  - Design graceful degradation paths.
  - Log errors for debugging.
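The `channel_size` advice comes down to buffer capacity. A producer can run ahead of its consumer by roughly that many messages before it blocks, and every buffered message costs memory. This sketch assumes that `channel_size` maps directly to the capacity of a Go channel between two tasks, which this page does not state. `fillWithoutBlocking` is a hypothetical helper that uses a non-blocking `select` to count how many sends fit.

```go
package main

import "fmt"

// fillWithoutBlocking sends up to n items into ch and reports how many
// were accepted before the buffer was full (no receiver is running).
func fillWithoutBlocking(ch chan int, n int) int {
	sent := 0
	for i := 0; i < n; i++ {
		select {
		case ch <- i:
			sent++
		default:
			return sent // buffer full: a real producer would block here
		}
	}
	return sent
}

func main() {
	for _, size := range []int{0, 4, 64} {
		ch := make(chan int, size) // assumed mapping of channel_size to capacity
		fmt.Printf("channel_size=%d: %d sends before blocking\n", size, fillWithoutBlocking(ch, 100))
	}
}
```

An unbuffered channel (size 0) accepts nothing until a receiver is ready, so each message forces a handoff between tasks.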
The DAG feature is implemented through:
- DAG Parser: Converts string expressions to internal graph structure
- Validation Engine: Ensures syntactic and semantic correctness
- Channel Manager: Creates and manages inter-task communication
- Execution Engine: Orchestrates parallel and sequential execution
- Resource Optimizer: Optimizes memory usage and channel allocation
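For the execution engine, one standard way to derive an order from a DAG is Kahn's algorithm, which repeatedly peels off the tasks whose dependencies are all satisfied. This sketch groups tasks into waves whose members could run concurrently. `layers` is a hypothetical function, and the edge-list input is an assumption. In a streaming pipeline, tasks connected by channels may all run at once, so read the waves as a dependency ordering rather than as the engine's actual schedule.

```go
package main

import (
	"fmt"
	"sort"
)

// layers groups tasks into waves with Kahn's algorithm: every task in a wave
// depends only on tasks in earlier waves. It returns an error on a cycle.
func layers(edges [][2]string) ([][]string, error) {
	indeg := map[string]int{}
	next := map[string][]string{}
	for _, e := range edges {
		from, to := e[0], e[1]
		if _, ok := indeg[from]; !ok {
			indeg[from] = 0
		}
		indeg[to]++
		next[from] = append(next[from], to)
	}
	var wave []string
	for n, d := range indeg {
		if d == 0 {
			wave = append(wave, n)
		}
	}
	var out [][]string
	seen := 0
	for len(wave) > 0 {
		sort.Strings(wave) // map iteration order is random; sort for stable output
		out = append(out, wave)
		seen += len(wave)
		var nextWave []string
		for _, n := range wave {
			for _, m := range next[n] {
				indeg[m]--
				if indeg[m] == 0 {
					nextWave = append(nextWave, m)
				}
			}
		}
		wave = nextWave
	}
	if seen != len(indeg) {
		return nil, fmt.Errorf("cycle detected among %d task(s)", len(indeg)-seen)
	}
	return out, nil
}

func main() {
	// ingest >> [clean, validate] >> [transform, enrich] >> [aggregate, export]
	edges := [][2]string{
		{"ingest", "clean"}, {"ingest", "validate"},
		{"clean", "transform"}, {"clean", "enrich"},
		{"validate", "transform"}, {"validate", "enrich"},
		{"transform", "aggregate"}, {"transform", "export"},
		{"enrich", "aggregate"}, {"enrich", "export"},
	}
	waves, err := layers(edges)
	if err != nil {
		fmt.Println(err)
		return
	}
	for i, w := range waves {
		fmt.Println(i, w)
	}
}
```

A task with no edges at all, as in a single-task DAG, does not appear in the edge list and would need separate handling.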
For more technical details, see the source code in:
- `internal/pkg/pipeline/dag.go`
- `internal/pkg/pipeline/dag_test.go`
- `internal/pkg/pipeline/pipeline.go`