Advanced Patterns
Master complex workflow scenarios and optimization techniques.
Hierarchical Workflows
Dagu's most powerful feature is the ability to compose DAGs from other DAGs. The web UI allows you to drill down into nested workflows, making it easy to manage complex systems.
Basic Nested Workflow
yaml
# main.yaml
name: data-pipeline
steps:
  - name: extract
    run: workflows/extract.yaml
    params: "SOURCE=production"
  - name: transform
    run: workflows/transform.yaml
    params: "INPUT=${extract.output}"
    depends: extract
  - name: load
    run: workflows/load.yaml
    params: "DATA=${transform.output}"
    depends: transform
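A child workflow is just an ordinary DAG. For context, here is a minimal sketch of what `workflows/extract.yaml` might look like; the `extract.sh` script and the `EXTRACTED_FILE` output name are illustrative, not part of the example above:
yaml
# workflows/extract.yaml (hypothetical child)
name: extract
params:
  - SOURCE: ""  # overridden by the parent's params string
steps:
  - name: pull-data
    command: ./extract.sh "${SOURCE}"  # illustrative script
    output: EXTRACTED_FILE             # exposed to the parent as the child's output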
Multi-Level Composition
Build complex systems from simple components:
yaml
# deployment-orchestrator.yaml
name: multi-env-deployment
params:
  - VERSION: latest
  - ENVIRONMENTS: '["dev", "staging", "prod"]'
steps:
  - name: build
    run: ci/build
    params: "VERSION=${VERSION}"
    output: BUILD_ARTIFACT
  - name: deploy-environments
    run: deploy/environment
    parallel: ${ENVIRONMENTS}
    params: |
      ENV=${ITEM}
      ARTIFACT=${BUILD_ARTIFACT}
      VERSION=${VERSION}
    depends: build
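On the receiving side, each parallel run of `deploy/environment` sees one parameter set. A sketch of how the child could declare those parameters (the `deploy.sh` invocation is illustrative):
yaml
# deploy/environment.yaml (hypothetical child)
name: deploy-environment
params:
  - ENV: ""
  - ARTIFACT: ""
  - VERSION: ""
steps:
  - name: deploy
    command: ./deploy.sh --env="${ENV}" --artifact="${ARTIFACT}" --version="${VERSION}"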
Sharing Data Between Workflows
yaml
# parent.yaml
steps:
  - name: prepare-data
    run: child-workflow
    params: "DATASET=customers"
    output: PREPARED_DATA
  - name: process
    command: python process.py --input=${PREPARED_DATA.outputs.FILE_PATH}
    depends: prepare-data

# child-workflow.yaml
params:
  - DATASET: ""
steps:
  - name: extract
    command: extract.sh ${DATASET}
    output: FILE_PATH
Dynamic Workflow Generation
Conditional Sub-workflows
yaml
name: adaptive-pipeline
params:
  - MODE: "standard" # standard, fast, thorough
steps:
  - name: determine-workflow
    command: |
      case "${MODE}" in
        fast) echo "workflows/fast-process.yaml" ;;
        thorough) echo "workflows/thorough-process.yaml" ;;
        *) echo "workflows/standard-process.yaml" ;;
      esac
    output: WORKFLOW_PATH
  - name: execute
    run: ${WORKFLOW_PATH}
    depends: determine-workflow
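When the candidate workflows are few and known up front, the same branching can also be expressed statically: one `run` step per mode, each guarded by a precondition, so only the matching branch executes. A sketch using the same three workflow files:
yaml
name: adaptive-pipeline-static
params:
  - MODE: "standard"
steps:
  - name: run-fast
    run: workflows/fast-process.yaml
    preconditions:
      - condition: "${MODE}"
        expected: "fast"
  - name: run-thorough
    run: workflows/thorough-process.yaml
    preconditions:
      - condition: "${MODE}"
        expected: "thorough"
  - name: run-standard
    run: workflows/standard-process.yaml
    preconditions:
      - condition: "${MODE}"
        expected: "standard"
The dynamic `run: ${WORKFLOW_PATH}` form above scales better when the set of workflows is open-ended.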
Dynamic Step Generation
yaml
name: dynamic-processor
steps:
  - name: discover-tasks
    command: |
      # Discover files to process
      find /data -name "*.csv" -type f | jq -R -s -c 'split("\n")[:-1]'
    output: TASK_LIST
  - name: process-dynamic
    run: processors/csv-handler
    parallel: ${TASK_LIST}
    params: "FILE=${ITEM}"
    depends: discover-tasks
Parallel Processing Patterns
Map-Reduce Pattern
yaml
name: map-reduce-aggregation
steps:
  - name: split-data
    command: |
      split -n 10 large-file.txt chunk_
      ls chunk_* | jq -R -s -c 'split("\n")[:-1]'
    output: CHUNKS
  - name: map-phase
    run: mappers/process-chunk
    parallel: ${CHUNKS}
    params: "CHUNK=${ITEM}"
    output: MAP_RESULTS
    depends: split-data
  - name: reduce-phase
    command: |
      python reduce.py \
        --inputs='${MAP_RESULTS.outputs}' \
        --output=final-result.json
    depends: map-phase
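The reduce step consumes the collected child outputs via `${MAP_RESULTS.outputs}`. For context, a sketch of what the `mappers/process-chunk` child might look like; `process_chunk.py` and the `PARTIAL_RESULT` output name are illustrative:
yaml
# mappers/process-chunk.yaml (hypothetical child)
name: process-chunk
params:
  - CHUNK: ""
steps:
  - name: map
    command: python process_chunk.py --chunk="${CHUNK}"
    output: PARTIAL_RESULT  # collected into ${MAP_RESULTS.outputs} by the parent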
Fork-Join Pattern
yaml
name: fork-join-analysis
steps:
  - name: prepare
    command: ./prepare-dataset.sh
    output: DATASET

  # Fork: Parallel analysis
  - name: statistical-analysis
    command: python stats.py ${DATASET}
    depends: prepare
    output: STATS
  - name: ml-analysis
    command: python ml_model.py ${DATASET}
    depends: prepare
    output: ML_RESULTS
  - name: visualization
    command: python visualize.py ${DATASET}
    depends: prepare
    output: CHARTS

  # Join: Combine results
  - name: generate-report
    command: |
      python generate_report.py \
        --stats=${STATS} \
        --ml=${ML_RESULTS} \
        --charts=${CHARTS}
    depends:
      - statistical-analysis
      - ml-analysis
      - visualization
Scatter-Gather Pattern
yaml
name: scatter-gather
params:
  - REGIONS: '["us-east-1", "eu-west-1", "ap-south-1"]'
steps:
  - name: scatter-requests
    run: regional/data-collector
    parallel: ${REGIONS}
    params: "REGION=${ITEM}"
    output: REGIONAL_DATA
  - name: gather-results
    command: |
      python aggregate_regional.py \
        --data='${REGIONAL_DATA.outputs}' \
        --output=global-summary.json
    depends: scatter-requests
Resource Management
Concurrency Control
yaml
name: resource-aware-pipeline
maxActiveRuns: 1 # Only one instance of this DAG
maxActiveSteps: 5 # Max 5 steps running concurrently
steps:
  - name: cpu-intensive
    command: ./heavy-computation.sh
    env:
      - GOMAXPROCS: 4 # Limit CPU cores
  - name: memory-intensive
    command: ./process-large-data.sh
    env:
      - MEMORY_LIMIT: 8G
  - name: io-intensive
    parallel:
      items: ${FILE_LIST}
      maxConcurrent: 3 # Limit parallel I/O
    command: ./process-file.sh ${ITEM}
Queue-Based Resource Management
yaml
name: queue-managed-workflow
queue: heavy-jobs # Assign to specific queue
maxActiveRuns: 2 # Queue allows 2 concurrent
steps:
  - name: resource-heavy
    command: ./intensive-task.sh
  - name: light-task
    command: echo "Quick task"
    queue: light-jobs # Different queue for light tasks
Performance Optimization
Caching Pattern
yaml
name: cached-pipeline
steps:
  - name: check-cache
    command: |
      CACHE_KEY=$(echo "${INPUT_PARAMS}" | sha256sum | cut -d' ' -f1)
      if [ -f "/cache/${CACHE_KEY}" ]; then
        echo "CACHE_HIT"
        cat "/cache/${CACHE_KEY}"
      else
        echo "CACHE_MISS"
      fi
    output: CACHE_STATUS
  - name: compute
    command: |
      ./expensive-computation.sh > result.json
      CACHE_KEY=$(echo "${INPUT_PARAMS}" | sha256sum | cut -d' ' -f1)
      cp result.json "/cache/${CACHE_KEY}"
      cat result.json
    depends: check-cache
    preconditions:
      - condition: "${CACHE_STATUS}"
        expected: "CACHE_MISS"
    output: RESULT
  - name: use-cached
    command: echo "${CACHE_STATUS}" | tail -n +2
    depends: check-cache
    preconditions:
      - condition: "${CACHE_STATUS}"
        expected: "re:CACHE_HIT.*"
    output: RESULT
Lazy Evaluation
yaml
name: lazy-evaluation
steps:
  - name: check-prerequisites
    command: |
      # Quick checks before expensive operations
      test -f required-file.txt && echo "READY" || echo "NOT_READY"
    output: STATUS
  - name: expensive-operation
    command: ./long-running-process.sh
    depends: check-prerequisites
    preconditions:
      - condition: "${STATUS}"
        expected: "READY"
State Management
Checkpointing
yaml
name: resumable-pipeline
params:
  - CHECKPOINT_DIR: /tmp/checkpoints
steps:
  - name: stage-1
    command: |
      mkdir -p "${CHECKPOINT_DIR}"
      CHECKPOINT="${CHECKPOINT_DIR}/stage-1.done"
      if [ -f "$CHECKPOINT" ]; then
        echo "Stage 1 already completed"
      else
        ./stage-1-process.sh
        touch "$CHECKPOINT"
      fi
  - name: stage-2
    command: |
      CHECKPOINT="${CHECKPOINT_DIR}/stage-2.done"
      if [ -f "$CHECKPOINT" ]; then
        echo "Stage 2 already completed"
      else
        ./stage-2-process.sh
        touch "$CHECKPOINT"
      fi
    depends: stage-1
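Note that the checkpoint markers persist after a successful run, so a later run would skip both stages. If each run should start fresh, a final cleanup step can clear them once everything has completed:
yaml
  - name: clear-checkpoints
    command: rm -f "${CHECKPOINT_DIR}"/stage-*.done
    depends: stage-2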
Distributed Locking
yaml
name: distributed-job
steps:
  - name: acquire-lock
    command: |
      LOCK_DIR="/tmp/locks/job.lock"
      mkdir -p /tmp/locks
      LOCK_ACQUIRED=false
      for i in $(seq 1 60); do
        # mkdir is atomic, so only one process can create the lock directory
        if mkdir "$LOCK_DIR" 2>/dev/null; then
          LOCK_ACQUIRED=true
          echo $$ > "$LOCK_DIR/pid"
          break
        fi
        sleep 1
      done
      if [ "$LOCK_ACQUIRED" != "true" ]; then
        echo "Failed to acquire lock"
        exit 1
      fi
  - name: critical-section
    command: ./exclusive-operation.sh
    depends: acquire-lock
    continueOn:
      failure: true # Proceed to release-lock even if the critical section fails
  - name: release-lock
    command: rm -rf "/tmp/locks/job.lock"
    depends: critical-section
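If the only goal is to prevent two runs of the same DAG from overlapping, the built-in `maxActiveRuns` limit shown earlier is usually simpler than a hand-rolled lock; the directory-based lock is mainly useful when several different DAGs contend for one shared resource. A minimal sketch:
yaml
name: exclusive-job
maxActiveRuns: 1 # Dagu itself prevents overlapping runs of this DAG
steps:
  - name: critical-section
    command: ./exclusive-operation.sh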
Complex Control Flow
State Machine Pattern
yaml
name: state-machine
params:
  - STATE: "INIT"
steps:
  - name: init-state
    command: |
      echo "Initializing..." >&2
      echo "PROCESSING" # Each state step prints the next state on stdout
    preconditions:
      - condition: "${STATE}"
        expected: "INIT"
    output: NEXT_STATE
  - name: processing-state
    command: ./process.sh # Prints the next state (e.g. VALIDATION) on stdout
    preconditions:
      - condition: "${NEXT_STATE:-${STATE}}"
        expected: "PROCESSING"
    depends: init-state
    output: NEXT_STATE
  - name: validation-state
    command: ./validate.sh # Prints the next state (e.g. COMPLETE) on stdout
    preconditions:
      - condition: "${NEXT_STATE:-${STATE}}"
        expected: "VALIDATION"
    depends: processing-state
    output: NEXT_STATE
  - name: complete-state
    command: echo "Completed!"
    preconditions:
      - condition: "${NEXT_STATE:-${STATE}}"
        expected: "COMPLETE"
    depends: validation-state
Circuit Breaker
yaml
name: circuit-breaker-pattern
env:
  - FAILURE_THRESHOLD: 3
  - FAILURE_COUNT_FILE: /tmp/failure_count
steps:
  - name: check-circuit
    command: |
      COUNT=$(cat ${FAILURE_COUNT_FILE} 2>/dev/null || echo 0)
      if [ $COUNT -ge ${FAILURE_THRESHOLD} ]; then
        echo "OPEN"
      else
        echo "CLOSED"
      fi
    output: CIRCUIT_STATE
  - name: execute-if-closed
    command: |
      ./risky-operation.sh && echo 0 > ${FAILURE_COUNT_FILE}
    depends: check-circuit
    preconditions:
      - condition: "${CIRCUIT_STATE}"
        expected: "CLOSED"
    continueOn:
      failure: true
  - name: increment-failures
    command: |
      COUNT=$(cat ${FAILURE_COUNT_FILE} 2>/dev/null || echo 0)
      echo $((COUNT + 1)) > ${FAILURE_COUNT_FILE}
    depends: execute-if-closed
    preconditions:
      - condition: "${execute-if-closed.exitCode}"
        expected: "re:[1-9][0-9]*" # Non-zero exit code
Monitoring and Observability
Metrics Collection
yaml
name: monitored-pipeline
steps:
  - name: start-metrics
    command: |
      START_TIME=$(date +%s)
      echo "pipeline_start_time{workflow=\"${DAG_NAME}\"} $START_TIME" \
        >> /metrics/dagu.prom
  - name: process-with-metrics
    command: |
      START=$(date +%s)
      ./process.sh
      DURATION=$(($(date +%s) - START))
      echo "step_duration_seconds{workflow=\"${DAG_NAME}\",step=\"process\"} $DURATION" \
        >> /metrics/dagu.prom
    depends: start-metrics
  - name: export-metrics
    command: |
      curl -X POST http://prometheus-pushgateway:9091/metrics/job/dagu \
        --data-binary @/metrics/dagu.prom
    depends: process-with-metrics
Structured Logging
yaml
name: structured-logging
env:
  - LOG_FORMAT: json
steps:
  - name: log-context
    command: |
      jq -n \
        --arg workflow "${DAG_NAME}" \
        --arg run_id "${DAG_RUN_ID}" \
        --arg step "process" \
        --arg level "info" \
        --arg message "Starting processing" \
        '{timestamp: now|todate, workflow: $workflow, run_id: $run_id, step: $step, level: $level, message: $message}'
  - name: process-with-logging
    command: |
      ./process.sh 2>&1 | while read line; do
        jq -n \
          --arg workflow "${DAG_NAME}" \
          --arg run_id "${DAG_RUN_ID}" \
          --arg step "${DAG_RUN_STEP_NAME}" \
          --arg output "$line" \
          '{timestamp: now|todate, workflow: $workflow, run_id: $run_id, step: $step, output: $output}'
      done