Examples
Quick reference for all Dagu features. Each example is minimal and copy-paste ready.
Basic Workflow Patterns
Basic Sequential Steps
steps:
  - name: first
    command: echo "Step 1"
  - name: second
    command: echo "Step 2"
Execute steps one after another.
Parallel Execution
steps:
  - name: process-items
    run: processor
    parallel:
      items: [A, B, C]
      maxConcurrent: 2
Process multiple items simultaneously.
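The `processor` referenced above is a child DAG defined elsewhere, either in its own file or as a second document in the same file after a `---` separator. A minimal, illustrative sketch of such a child is shown below; the name `processor` and the `ITEM` parameter are assumptions, and forwarding each item explicitly requires adding params: "ITEM=${ITEM}" to the parent step, as in the Parallel Distributed Tasks example near the end of this page.
name: processor
params:
  - ITEM: ""    # illustrative; receives one of A, B, C per parallel run
steps:
  - name: process
    command: echo "Processing ${ITEM}"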
Complex Dependencies
steps:
  - name: setup
    command: ./setup.sh
  - name: test-a
    command: ./test-a.sh
    depends: setup
  - name: test-b
    command: ./test-b.sh
    depends: setup
  - name: deploy
    command: ./deploy.sh
    depends:
      - test-a
      - test-b
Define complex dependency graphs.
Chain vs Graph Execution
# Chain type (default) - automatic dependencies
type: chain
steps:
  - name: download
    command: wget data.csv
  - name: process
    command: python process.py  # Depends on download
  - name: upload
    command: aws s3 cp output.csv s3://bucket/
---
# Graph type - explicit dependencies
type: graph
steps:
  - name: step1
    command: echo "First"
  - name: step2
    command: echo "Second"
    depends: step1  # Explicit dependency required
Choose between chain execution (implicit sequential dependencies) and graph execution (explicit dependencies).
Control Flow & Conditions
Conditional Execution
steps:
  - name: deploy
    command: ./deploy.sh
    preconditions:
      - condition: "${ENV}"
        expected: "production"
Run steps only when conditions are met.
Advanced Preconditions
steps:
  - name: conditional-task
    command: ./process.sh
    preconditions:
      - test -f /data/input.csv
      - test -s /data/input.csv  # File exists and is not empty
      - condition: "${ENVIRONMENT}"
        expected: "production"
      - condition: "`date '+%d'`"
        expected: "re:0[1-9]"  # First 9 days of month
      - condition: "`df -h /data | awk 'NR==2 {print $5}' | sed 's/%//'`"
        expected: "re:^[0-7][0-9]$"  # Less than 80% disk usage
Multiple condition types and regex patterns.
Repeat Until Condition
steps:
  - name: wait-for-service
    command: curl -f http://service/health
    repeatPolicy:
      repeat: true
      intervalSec: 10
      exitCode: [1]  # Repeat while exit code is 1
Repeat steps until success.
Repeat Until Success
steps:
  - name: wait-for-service
    command: curl -f http://service:8080/health
    repeatPolicy:
      repeat: until  # Repeat UNTIL service is healthy
      exitCode: [0]  # Exit code 0 means success
      intervalSec: 10
      limit: 30      # Maximum 5 minutes
  - name: monitor-job
    command: ./check_job_status.sh
    output: JOB_STATUS
    repeatPolicy:
      repeat: until  # Repeat UNTIL job completes
      condition: "${JOB_STATUS}"
      expected: "COMPLETED"
      intervalSec: 30
      limit: 120     # Maximum 1 hour
Wait for external dependencies and job completion with clear semantics.
Repeat Steps
steps:
  - name: keep-alive-task
    command: heartbeat.sh
    repeatPolicy:
      repeat: while  # Repeat indefinitely while successful
      intervalSec: 60
  - name: monitor-until-done
    command: check-status.sh
    repeatPolicy:
      repeat: until  # Repeat until exit code 0
      exitCode: [0]
      intervalSec: 30
      limit: 20      # Maximum 10 minutes
Execute steps with clear repeat semantics.
Error Handling & Reliability
Continue on Failure
steps:
  - name: optional-task
    command: ./nice-to-have.sh
    continueOn:
      failure: true
  - name: required-task
    command: ./must-succeed.sh
Handle non-critical failures gracefully.
Continue on Skipped
steps:
  - name: optional-feature
    command: ./enable-feature.sh
    preconditions:
      - condition: "${FEATURE_FLAG}"
        expected: "enabled"
    continueOn:
      skipped: true
  - name: main-process
    command: ./process.sh
Continue workflow when preconditions aren't met.
Retry on Failure
steps:
  - name: api-call
    command: curl https://api.example.com
    retryPolicy:
      limit: 3
      intervalSec: 30
Automatically retry failed steps.
Smart Retry Policies
steps:
  - name: api-with-smart-retry
    command: ./api_call.sh
    retryPolicy:
      limit: 5
      intervalSec: 30
      exitCodes: [429, 503, 504]  # Rate limit, service unavailable
Targeted retry policies for different error types.
Retry with Exponential Backoff
steps:
  - name: api-with-backoff
    command: curl https://api.example.com/data
    retryPolicy:
      limit: 5
      intervalSec: 2
      backoff: true       # 2x multiplier
      maxIntervalSec: 60  # Cap at 60s
      # Intervals: 2s, 4s, 8s, 16s, 32s → 60s
Avoid overwhelming failed services with exponential backoff.
Repeat with Backoff
steps:
  - name: wait-for-service
    command: nc -z localhost 8080
    repeatPolicy:
      repeat: while
      exitCode: [1]  # While connection fails
      intervalSec: 1
      backoff: 2.0
      maxIntervalSec: 30
      limit: 20
      # Check intervals: 1s, 2s, 4s, 8s, 16s, 30s...
Gradually increase polling intervals to reduce load.
Lifecycle Handlers
handlerOn:
  success:
    command: ./notify-success.sh
  failure:
    command: ./cleanup-on-fail.sh
  exit:
    command: ./always-cleanup.sh
steps:
  - name: main
    command: ./process.sh
Run handlers on workflow events.
Data & Variables
Environment Variables
env:
  - SOME_DIR: ${HOME}/batch
  - SOME_FILE: ${SOME_DIR}/some_file
  - LOG_LEVEL: debug
  - API_KEY: ${SECRET_API_KEY}
steps:
  - name: task
    dir: ${SOME_DIR}
    command: python main.py ${SOME_FILE}
Define variables accessible throughout the DAG.
Dotenv Files
# Specify single dotenv file
dotenv: .env

# Or specify multiple candidate files
dotenv:
  - .env
  - .env.local
  - configs/.env.prod

steps:
  - name: use-env-vars
    command: echo "Database: ${DATABASE_URL}"
Load environment variables from .env files.
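For context, a dotenv file is plain KEY=VALUE lines; the values below are purely illustrative.
# .env (illustrative contents)
DATABASE_URL=postgres://user:secret@localhost:5432/appdb
LOG_LEVEL=debug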
Positional Parameters
params: param1 param2  # Default values for $1 and $2
steps:
  - name: parameterized task
    command: python main.py $1 $2
Define default positional parameters.
Named Parameters
params:
  - FOO: 1           # Default value for ${FOO}
  - BAR: "`echo 2`"  # Command substitution in defaults
  - ENVIRONMENT: dev
steps:
  - name: named params task
    command: python main.py ${FOO} ${BAR} --env=${ENVIRONMENT}
Define named parameters with defaults.
Output Variables
steps:
  - name: get-version
    command: cat VERSION
    output: VERSION
  - name: use-version
    command: echo "Version is ${VERSION}"
    depends: get-version
Pass data between steps.
Output Size Limits
# Set maximum output size to 5MB for all steps
maxOutputSize: 5242880  # 5MB in bytes
steps:
  - name: large-output
    command: "cat large-file.txt"
    output: CONTENT  # Will fail if file exceeds 5MB
Control output size limits to prevent memory issues.
Redirect Output to Files
steps:
  - name: redirect stdout
    command: "echo hello"
    stdout: "/tmp/hello"
  - name: redirect stderr
    command: "echo error message >&2"
    stderr: "/tmp/error.txt"
Send output to files instead of capturing.
JSON Path References
steps:
  - name: child DAG
    run: sub_workflow
    output: SUB_RESULT
  - name: use nested output
    command: echo "Result: ${SUB_RESULT.outputs.finalValue}"
    depends: child DAG
Access nested JSON data with path references.
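The reference ${SUB_RESULT.outputs.finalValue} assumes the child DAG publishes an output variable named finalValue. A minimal, illustrative sketch of such a sub_workflow, reusing the output-variable mechanism shown above (the command and value are assumptions):
name: sub_workflow
steps:
  - name: compute result
    command: echo "42"   # illustrative; stdout becomes the output value
    output: finalValue   # surfaces as outputs.finalValue to the parent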
Step ID References
steps:
  - name: extract customer data
    id: extract
    command: python extract.py
    output: DATA
  - name: process if valid
    command: |
      echo "Exit code: ${extract.exitCode}"
      echo "Stdout path: ${extract.stdout}"
    depends: extract
Reference step properties using short IDs.
Command Substitution
env:
  TODAY: "`date '+%Y%m%d'`"
steps:
  - name: use date
    command: "echo hello, today is ${TODAY}"
Use command output in configurations.
Scripts & Code
Shell Scripts
steps:
  - name: script step
    script: |
      cd /tmp
      echo "hello world" > hello
      cat hello
      ls -la
Run shell script with default shell.
Python Scripts
steps:
  - name: python script
    command: python
    script: |
      import os
      import datetime
      print(f"Current directory: {os.getcwd()}")
      print(f"Current time: {datetime.datetime.now()}")
Execute script with specific interpreter.
Multi-Step Scripts
steps:
  - name: complex-task
    script: |
      #!/bin/bash
      set -e
      echo "Starting process..."
      ./prepare.sh
      echo "Running main task..."
      ./main-process.sh
      echo "Cleaning up..."
      ./cleanup.sh
Run multi-line scripts.
Working Directory
steps:
  - name: step in specific directory
    dir: /path/to/working/directory
    command: pwd && ls -la
Control where each step executes.
Shell Selection
steps:
  - name: bash specific
    command: echo hello world | xargs echo
    shell: bash
  - name: with pipes
    command: echo hello world | xargs echo
Specify shell or use pipes in commands.
Executors & Integrations
Docker Executor
steps:
  - name: build
    executor:
      type: docker
      config:
        image: node:18
    command: npm run build
Run commands in Docker containers.
SSH Remote Execution
steps:
  - name: remote
    executor:
      type: ssh
      config:
        host: server.example.com
        user: deploy
    command: ./deploy.sh
Execute commands on remote servers.
HTTP Requests
steps:
  - name: webhook
    executor:
      type: http
      config:
        url: https://api.example.com/webhook
        method: POST
        headers:
          Content-Type: application/json
        body: '{"status": "started"}'
Make HTTP API calls.
JSON Processing
steps:
  - name: get-data
    command: curl -s https://api.example.com/users
    output: API_RESPONSE
  - name: extract-emails
    executor: jq
    command: '.data[] | select(.active == true) | .email'
    script: ${API_RESPONSE}
    output: USER_EMAILS
Pipe an API response into a jq step for JSON processing.
Scheduling & Automation
Basic Scheduling
schedule: "5 4 * * *"  # Run at 04:05 daily
steps:
  - name: scheduled job
    command: job.sh
Use cron expressions to schedule DAGs.
Skip Redundant Runs
name: Daily Data Processing
schedule: "0 */4 * * *"  # Every 4 hours
skipIfSuccessful: true   # Skip if already succeeded
steps:
  - name: extract
    command: extract_data.sh
  - name: transform
    command: transform_data.sh
  - name: load
    command: load_data.sh
Prevent unnecessary executions when already successful.
Queue Management
name: batch-job
queue: "batch"     # Assign to named queue
maxActiveRuns: 2   # Max concurrent runs
steps:
  - name: process
    command: process_data.sh
Control concurrent DAG execution.
Global Queue Configuration
# Global queue config in ~/.config/dagu/config.yaml
queues:
  enabled: true
  config:
    - name: "critical"
      maxActiveRuns: 5
    - name: "batch"
      maxActiveRuns: 1

# DAG file
queue: "critical"
maxActiveRuns: 3
steps:
  - name: critical-task
    command: ./process.sh
Configure queues globally and per-DAG.
Email Notifications
mailOn:
  failure: true
  success: true
smtp:
  host: smtp.gmail.com
  port: "587"
  username: "${SMTP_USER}"
  password: "${SMTP_PASS}"
steps:
  - name: critical-job
    command: ./important.sh
    mailOnError: true
Send email alerts on events.
Operations & Production
History Retention
name: data-archiver
histRetentionDays: 30  # Keep 30 days of history
schedule: "0 0 * * *"  # Daily at midnight
steps:
  - name: archive-old-data
    command: ./archive.sh
  - name: cleanup-temp
    command: rm -rf /tmp/archive/*
Control how long execution history is retained.
Output Size Management
name: log-processor
maxOutputSize: 10485760  # 10MB max output per step
steps:
  - name: process-large-logs
    command: ./analyze-logs.sh
    stdout: /logs/analysis.out
  - name: summarize
    command: tail -n 1000 /logs/analysis.out
Prevent memory issues from large command outputs.
Custom Log Directory
name: etl-pipeline
logDir: /data/etl/logs/${DAG_NAME}
histRetentionDays: 90
steps:
  - name: extract
    command: ./extract.sh
    stdout: extract.log
    stderr: extract.err
  - name: transform
    command: ./transform.py
    stdout: transform.log
Organize logs in custom directories with retention.
Timeout & Cleanup
name: long-running-job
timeoutSec: 7200        # 2 hour timeout
maxCleanUpTimeSec: 600  # 10 min cleanup window
steps:
  - name: data-processing
    command: ./heavy-process.sh
    signalOnStop: SIGTERM
handlerOn:
  exit:
    command: ./cleanup-resources.sh
Ensure workflows don't run forever and clean up properly.
Production Monitoring
name: critical-service
histRetentionDays: 365  # Keep 1 year for compliance
maxOutputSize: 5242880  # 5MB output limit
maxActiveRuns: 1        # No overlapping runs
mailOn:
  failure: true
errorMail:
  from: [email protected]
  to: [email protected]
  prefix: "[CRITICAL]"
  attachLogs: true
infoMail:
  from: [email protected]
  to: [email protected]
  prefix: "[SUCCESS]"
handlerOn:
  failure:
    command: |
      curl -X POST https://metrics.company.com/alerts \
        -H "Content-Type: application/json" \
        -d '{"service": "critical-service", "status": "failed"}'
steps:
  - name: health-check
    command: ./health-check.sh
    retryPolicy:
      limit: 3
      intervalSec: 30
Production-ready configuration with monitoring and alerts.
Distributed Tracing
name: data-pipeline
otel:
  enabled: true
  endpoint: "otel-collector:4317"
  resource:
    service.name: "dagu-${DAG_NAME}"
    deployment.environment: "${ENV}"
steps:
  - name: fetch-data
    command: ./fetch.sh
  - name: process-data
    command: python process.py
    depends: fetch-data
  - name: run-sub-pipeline
    run: pipelines/transform
    depends: process-data
Enable OpenTelemetry tracing for observability.
Execution Control
name: batch-processor
maxActiveSteps: 5       # Max 5 parallel steps
maxActiveRuns: 2        # Max 2 concurrent DAG runs
delaySec: 10            # 10 second initial delay
skipIfSuccessful: true  # Skip if already succeeded
steps:
  - name: validate
    command: ./validate.sh
  - name: process-batch-1
    command: ./process.sh batch1
    depends: validate
  - name: process-batch-2
    command: ./process.sh batch2
    depends: validate
  - name: process-batch-3
    command: ./process.sh batch3
    depends: validate
Control concurrency and execution behavior.
Queue Assignment
name: heavy-computation
queue: compute-queue     # Assign to specific queue
histRetentionDays: 60    # Keep 60 days history
maxOutputSize: 20971520  # 20MB output limit
steps:
  - name: prepare-data
    command: ./prepare.sh
  - name: run-computation
    command: ./compute.sh --intensive
  - name: store-results
    command: ./store.sh
Use queues to manage workflow execution priority and concurrency.
Advanced Patterns
Nested Workflows
steps:
  - name: data-pipeline
    run: etl.yaml
    params: "ENV=prod DATE=today"
  - name: analyze
    run: analyze.yaml
    depends: data-pipeline
Compose workflows from reusable parts.
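A sketch of what the referenced etl.yaml could declare so that "ENV=prod DATE=today" overrides its defaults; the file contents, step name, and script path are illustrative, following the named-parameters pattern shown earlier.
# etl.yaml (illustrative)
params:
  - ENV: dev                   # overridden by the caller's params
  - DATE: "`date '+%Y-%m-%d'`" # default when the caller passes nothing
steps:
  - name: extract
    command: ./extract.sh --env=${ENV} --date=${DATE}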
Multiple DAGs in One File
name: main-workflow
steps:
  - name: process-data
    run: data-processor
    params: "TYPE=daily"
---
name: data-processor
params:
  - TYPE: "batch"
steps:
  - name: extract
    command: ./extract.sh ${TYPE}
  - name: transform
    command: ./transform.sh
Define multiple workflows in a single file.
Complete DAG Configuration
name: production-etl
description: Daily ETL pipeline for analytics
schedule: "0 2 * * *"
skipIfSuccessful: true
group: DataPipelines
tags: daily,critical
queue: etl-queue
maxActiveRuns: 1
maxOutputSize: 5242880  # 5MB
histRetentionDays: 90   # Keep history for 90 days
env:
  - LOG_LEVEL: info
  - DATA_DIR: /data/analytics
params:
  - DATE: "`date '+%Y-%m-%d'`"
  - ENVIRONMENT: production
mailOn:
  failure: true
smtp:
  host: smtp.company.com
  port: "587"
handlerOn:
  success:
    command: echo "ETL completed successfully"
  failure:
    command: ./cleanup_on_failure.sh
  exit:
    command: ./final_cleanup.sh
steps:
  - name: validate-environment
    command: ./validate_env.sh ${ENVIRONMENT}
Complete DAG with all configuration options.
Steps as Map vs Array
# Steps as array (recommended)
steps:
  - name: step1
    command: echo "hello"
  - name: step2
    command: echo "world"
    depends: step1
---
# Steps as map (alternative)
steps:
  step1:
    command: echo "hello"
  step2:
    command: echo "world"
    depends: step1
Two ways to define steps.
Distributed Execution
GPU Task Routing
name: ml-training-pipeline
steps:
  - name: prepare-data
    command: python prepare_dataset.py
  - name: train-model
    run: train-model
  - name: evaluate-model
    run: evaluate-model
---
name: train-model
workerSelector:
  gpu: "true"
  cuda: "11.8"
  memory: "64G"
steps:
  - name: train-model
    command: python train.py --gpu
---
name: evaluate-model
workerSelector:
  gpu: "true"
steps:
  - name: evaluate-model
    command: python evaluate.py
Route GPU tasks to GPU-enabled workers.
Mixed Local and Distributed
name: hybrid-workflow
steps:
  # Runs on any available worker (local or remote)
  - name: fetch-data
    command: wget https://data.example.com/dataset.tar.gz
  # Must run on specific worker type
  - name: process-on-gpu
    run: process-on-gpu
  # Runs locally (no selector)
  - name: notify-completion
    command: ./notify.sh "Processing complete"
    depends: process-on-gpu
---
name: process-on-gpu
workerSelector:
  gpu: "true"
  gpu-model: "nvidia-a100"
steps:
  - name: process-on-gpu
    command: python gpu_process.py
Combine local and distributed execution.
Parallel Distributed Tasks
name: distributed-batch-processing
steps:
  - name: split-data
    command: python split_data.py --chunks=10
    output: CHUNKS
  - name: process-chunks
    run: chunk-processor
    parallel:
      items: ${CHUNKS}
      maxConcurrent: 5
    params: "CHUNK=${ITEM}"
  - name: merge-results
    command: python merge_results.py
---
name: chunk-processor
workerSelector:
  memory: "16G"
  cpu-cores: "8"
params:
  - CHUNK: ""
steps:
  - name: process
    command: python process_chunk.py ${CHUNK}
Distribute parallel tasks across workers.
Learn More
- Writing Workflows Guide - Complete guide to building workflows
- Feature Documentation - Deep dive into all features
- Configuration Reference - Complete YAML specification
- Distributed Execution - Scale across multiple machines
- Worker Labels - Task routing with labels