REST API Reference
Base Configuration
- Base URL: http://localhost:8080/api/v2
- Content-Type: application/json
- OpenAPI Version: 3.0.0
Authentication
The API supports three authentication methods:
- Basic Auth: Include the Authorization: Basic <base64(username:password)> header
- Bearer Token: Include the Authorization: Bearer <token> header
- No Authentication: When auth is disabled (default for local development)
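The header formats above can be produced with a few lines of client code. The following is a minimal sketch, not part of the API itself: the function names are hypothetical, and an empty header set is assumed to be what a client sends when authentication is disabled.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build the Basic Auth header: base64 of 'username:password'."""
    raw = f"{username}:{password}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}


def bearer_auth_header(token: str) -> dict:
    """Build the Bearer Token header."""
    return {"Authorization": f"Bearer {token}"}


def auth_headers(username=None, password=None, token=None) -> dict:
    """Pick a header set; an empty dict corresponds to disabled auth."""
    if token:
        return bearer_auth_header(token)
    if username is not None and password is not None:
        return basic_auth_header(username, password)
    return {}
```

The returned dictionary can be merged into the headers of any HTTP client request.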
System Endpoints
Health Check
Endpoint: GET /api/v2/health
Checks the health status of the Dagu server.
Response (200):
{
"status": "healthy",
"version": "1.14.5",
"uptime": 86400,
"timestamp": "2024-02-11T16:30:45.123456789Z"
}
Response when unhealthy (503):
{
"status": "unhealthy",
"version": "1.14.5",
"uptime": 300,
"timestamp": "2024-02-11T16:30:45.123456789Z",
"error": "Scheduler not responding"
}
Response Fields:
- status: Server health status ("healthy" or "unhealthy")
- version: Current server version
- uptime: Server uptime in seconds
- timestamp: Current server time
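A monitoring script might condense the health payload into a single line. The sketch below assumes only the fields shown in the sample responses; the function name and output format are illustrative.

```python
import json


def summarize_health(status_code: int, body: str) -> str:
    """Turn a health response into a one-line summary."""
    payload = json.loads(body)
    healthy = status_code == 200 and payload.get("status") == "healthy"
    # uptime is reported in seconds; show it as hours and minutes.
    hours, rem = divmod(int(payload.get("uptime", 0)), 3600)
    line = f"{payload.get('status')} v{payload.get('version')} up {hours}h{rem // 60:02d}m"
    if not healthy and payload.get("error"):
        line += f" ({payload['error']})"
    return line
```

Checking both the HTTP status and the status field guards against a proxy returning 200 with an unexpected body.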
DAG Management Endpoints
List DAGs
Endpoint: GET /api/v2/dags
Retrieves DAG definitions with optional filtering by name and tags.
Query Parameters:
Parameter | Type | Description | Default |
---|---|---|---|
page | integer | Page number (1-based) | 1 |
perPage | integer | Items per page (max 1000) | 50 |
name | string | Filter DAGs by name | - |
tag | string | Filter DAGs by tag | - |
remoteNode | string | Remote node name | "local" |
Response (200):
{
"dags": [
{
"fileName": "example.yaml",
"dag": {
"name": "example_dag",
"group": "default",
"schedule": [{"expression": "0 * * * *"}],
"description": "Example DAG",
"params": ["param1", "param2"],
"defaultParams": "{}",
"tags": ["example", "demo"]
},
"latestDAGRun": {
"dagRunId": "20240101_120000",
"name": "example_dag",
"status": 1,
"statusLabel": "running",
"startedAt": "2024-01-01T12:00:00Z",
"finishedAt": "",
"log": "/logs/example_dag.log"
},
"suspended": false,
"errors": []
}
],
"errors": [],
"pagination": {
"totalRecords": 45,
"currentPage": 1,
"totalPages": 5,
"nextPage": 2,
"prevPage": null
}
}
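To collect every DAG, a client can follow the pagination object page by page. This sketch assumes that nextPage is null on the last page (the sample only shows prevPage as null on the first page) and takes a hypothetical fetch_json callable that performs the GET and returns the decoded body.

```python
from typing import Callable, Iterator
from urllib.parse import urlencode

BASE_URL = "http://localhost:8080/api/v2"


def list_dags_url(page: int = 1, per_page: int = 50, **filters: str) -> str:
    """Build the List DAGs URL; filters may include name, tag, remoteNode."""
    query = {"page": page, "perPage": per_page, **filters}
    return f"{BASE_URL}/dags?{urlencode(query)}"


def iter_dags(fetch_json: Callable[[str], dict], per_page: int = 50,
              **filters: str) -> Iterator[dict]:
    """Yield each DAG entry, following pagination.nextPage until it is empty."""
    page = 1
    while page:
        body = fetch_json(list_dags_url(page, per_page, **filters))
        yield from body.get("dags", [])
        # Assumption: nextPage is null (None) once the last page is reached.
        page = body.get("pagination", {}).get("nextPage")
```

Passing the fetch function in keeps the paging logic independent of the HTTP library and of the authentication method in use.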
Create DAG
Endpoint: POST /api/v2/dags
Creates a new empty DAG file with the specified name.
Request Body:
{
"name": "my-new-dag"
}
Response (201):
{
"name": "my-new-dag"
}
Error Response (400):
{
"code": "bad_request",
"message": "Invalid DAG name format"
}
Error Response (409):
{
"code": "already_exists",
"message": "DAG with this name already exists"
}
Get DAG Details
Endpoint: GET /api/v2/dags/{fileName}
Fetches detailed information about a specific DAG.
Path Parameters:
Parameter | Type | Description | Pattern |
---|---|---|---|
fileName | string | DAG file name | ^[a-zA-Z0-9_-]+$ |
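A client can check the fileName pattern before sending a request, which avoids a round trip for names the server would reject. The sketch below is illustrative: the helper name is hypothetical, and it assumes the pattern applies to the name exactly as it appears in the URL path.

```python
import re

BASE_URL = "http://localhost:8080/api/v2"
FILE_NAME_PATTERN = re.compile(r"[a-zA-Z0-9_-]+")


def dag_url(file_name: str) -> str:
    """Return the DAG details URL, rejecting names outside the pattern."""
    # fullmatch anchors both ends, so a trailing newline is rejected too.
    if not FILE_NAME_PATTERN.fullmatch(file_name):
        raise ValueError(f"invalid DAG file name: {file_name!r}")
    return f"{BASE_URL}/dags/{file_name}"
```

Because the pattern excludes slashes and dots, a valid name can be placed in the path without URL encoding.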
Response (200):
{
"dag": {
"name": "data_processing_pipeline",
"group": "ETL",
"schedule": [
{"expression": "0 2 * * *"},
{"expression": "0 14 * * *"}
],
"description": "Daily data processing pipeline for warehouse ETL",
"env": [
"DATA_SOURCE=postgres://prod-db:5432/analytics",
"WAREHOUSE_URL=${WAREHOUSE_URL}"
],
"logDir": "/var/log/dagu/pipelines",
"handlerOn": {
"success": {
"name": "notify_success",
"command": "notify.sh 'Pipeline completed successfully'"
},
"failure": {
"name": "alert_on_failure",
"command": "alert.sh 'Pipeline failed' high"
},
"exit": {
"name": "cleanup",
"command": "cleanup_temp_files.sh"
}
},
"steps": [
{
"name": "extract_data",
"id": "extract",
"description": "Extract data from source database",
"dir": "/app/etl",
"command": "python",
"args": ["extract.py", "--date", "${date}"],
"stdout": "/logs/extract.out",
"stderr": "/logs/extract.err",
"output": "EXTRACTED_FILE",
"preconditions": [
{
"condition": "test -f /data/ready.flag",
"expected": ""
}
]
},
{
"name": "transform_data",
"id": "transform",
"description": "Apply transformations to extracted data",
"command": "python transform.py --input=${EXTRACTED_FILE}",
"depends": ["extract_data"],
"output": "TRANSFORMED_FILE",
"repeatPolicy": {
"repeat": false,
"interval": 0
},
"mailOnError": true
},
{
"name": "load_to_warehouse",
"id": "load",
"run": "warehouse-loader",
"params": "{\"file\": \"${TRANSFORMED_FILE}\", \"table\": \"fact_sales\"}",
"depends": ["transform_data"]
}
],
"delay": 30,
"histRetentionDays": 30,
"preconditions": [
{
"condition": "`date +%u`",
"expected": "re:[1-5]",
"error": "Pipeline only runs on weekdays"
}
],
"maxActiveRuns": 1,
"maxActiveSteps": 5,
"params": ["date", "env", "batch_size"],
"defaultParams": "{\"batch_size\": 1000, \"env\": \"dev\"}",
"tags": ["production", "etl", "daily"]
},
"localDags": [
{
"name": "warehouse-loader",
"dag": {
"name": "warehouse_loader_subdag",
"steps": [
{
"name": "validate_schema",
"command": "validate_schema.py"
},
{
"name": "load_data",
"command": "load_to_warehouse.py",
"depends": ["validate_schema"]
}
]
},
"errors": []
}
],
"latestDAGRun": {
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240211_140000_abc123",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_140000_abc123",
"name": "data_processing_pipeline",
"status": 4,
"statusLabel": "finished",
"queuedAt": "",
"startedAt": "2024-02-11T14:00:00Z",
"finishedAt": "2024-02-11T14:45:30Z",
"log": "/logs/data_processing_pipeline/20240211_140000_abc123.log",
"params": "{\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}"
},
"suspended": false,
"errors": []
}
Delete DAG
Endpoint: DELETE /api/v2/dags/{fileName}
Permanently removes a DAG definition from the system.
Response (204): No content
Error Response (404):
{
"code": "not_found",
"message": "DAG not found"
}
Error Response (403):
{
"code": "forbidden",
"message": "Permission denied to delete DAGs"
}
Get DAG Specification
Endpoint: GET /api/v2/dags/{fileName}/spec
Fetches the YAML specification of a DAG.
Response (200):
{
"dag": {
"name": "example_dag"
},
"spec": "name: example_dag\nsteps:\n - name: hello\n command: echo Hello",
"errors": []
}
Update DAG Specification
Endpoint: PUT /api/v2/dags/{fileName}/spec
Updates the YAML specification of a DAG.
Request Body:
{
"spec": "name: example_dag\nsteps:\n - name: hello\n command: echo Hello World"
}
Response (200):
{
"errors": []
}
Response with Validation Errors (200):
{
"errors": [
"Line 5: Invalid step configuration - missing command",
"Line 10: Circular dependency detected between step1 and step2"
]
}
Error Response (403):
{
"code": "forbidden",
"message": "Permission denied to edit DAGs"
}
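Because validation errors arrive with a 200 status, checking the status code alone is not enough to know whether a spec was accepted. A minimal sketch of a check that covers both cases, with a hypothetical helper name:

```python
import json


def spec_update_errors(status_code: int, body: str) -> list:
    """Return problems reported for a spec update; an empty list means accepted."""
    payload = json.loads(body) if body else {}
    if status_code == 200:
        # A 200 response may still carry validation errors.
        return list(payload.get("errors") or [])
    # Other statuses use the structured error shape with code and message.
    return [f"{payload.get('code', 'unknown')}: {payload.get('message', '')}"]
```

A caller would treat any non-empty result as a failed update and surface the messages to the user.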
Rename DAG
Endpoint: POST /api/v2/dags/{fileName}/rename
Renames the DAG file, changing the file name that identifies the DAG definition.
Request Body:
{
"newFileName": "new-dag-name"
}
Response (200): Success
Error Response (400):
{
"code": "bad_request",
"message": "Invalid new file name format"
}
Error Response (409):
{
"code": "already_exists",
"message": "A DAG with the new name already exists"
}
DAG Execution Endpoints
Start DAG
Endpoint: POST /api/v2/dags/{fileName}/start
Creates and starts a DAG run with optional parameters.
Request Body:
{
"params": "{\"env\": \"production\", \"version\": \"1.2.3\"}",
"dagRunId": "custom-run-id"
}
Request Fields:
Field | Type | Description | Required |
---|---|---|---|
params | string | JSON string of parameters | No |
dagRunId | string | Custom run ID | No |
Response (200):
{
"dagRunId": "20240101_120000_abc123"
}
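Since params is a JSON string rather than a nested object, the parameters need to be encoded twice: once on their own, and again as part of the request body. A short sketch, with a hypothetical function name:

```python
import json
from typing import Optional


def start_request_body(params: Optional[dict] = None,
                       dag_run_id: Optional[str] = None) -> str:
    """Serialize a Start DAG request body; both fields are optional."""
    body = {}
    if params is not None:
        # params must be a JSON *string*, so encode it before embedding.
        body["params"] = json.dumps(params)
    if dag_run_id is not None:
        body["dagRunId"] = dag_run_id
    return json.dumps(body)
```

Letting json.dumps handle both layers avoids hand-written backslash escaping like that in the sample body.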
Enqueue DAG
Endpoint: POST /api/v2/dags/{fileName}/enqueue
Adds a DAG run to the queue for later execution.
Request Body: Same as Start DAG
Response (200):
{
"dagRunId": "20240101_120000_abc123"
}
Toggle DAG Suspension
Endpoint: POST /api/v2/dags/{fileName}/suspend
Controls whether the scheduler creates runs from this DAG.
Request Body:
{
"suspend": true
}
Response (200): Success
Error Response (404):
{
"code": "not_found",
"message": "DAG not found"
}
DAG Run History Endpoints
Get DAG Run History
Endpoint: GET /api/v2/dags/{fileName}/dag-runs
Fetches execution history of a DAG.
Response (200):
{
"dagRuns": [
{
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240211_140000_abc123",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_140000_abc123",
"name": "data_processing_pipeline",
"status": 4,
"statusLabel": "finished",
"queuedAt": "",
"startedAt": "2024-02-11T14:00:00Z",
"finishedAt": "2024-02-11T14:45:30Z",
"log": "/logs/data_processing_pipeline/20240211_140000_abc123.log",
"params": "{\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}"
},
{
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240211_020000_def456",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_020000_def456",
"name": "data_processing_pipeline",
"status": 2,
"statusLabel": "failed",
"queuedAt": "",
"startedAt": "2024-02-11T02:00:00Z",
"finishedAt": "2024-02-11T02:15:45Z",
"log": "/logs/data_processing_pipeline/20240211_020000_def456.log",
"params": "{\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}"
},
{
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240210_140000_ghi789",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240210_140000_ghi789",
"name": "data_processing_pipeline",
"status": 4,
"statusLabel": "finished",
"queuedAt": "",
"startedAt": "2024-02-10T14:00:00Z",
"finishedAt": "2024-02-10T14:42:15Z",
"log": "/logs/data_processing_pipeline/20240210_140000_ghi789.log",
"params": "{\"date\": \"2024-02-10\", \"env\": \"production\", \"batch_size\": 5000}"
}
],
"gridData": [
{
"name": "extract_data",
"history": [4, 2, 4]
},
{
"name": "transform_data",
"history": [4, 2, 4]
},
{
"name": "load_to_warehouse",
"history": [4, 0, 4]
}
]
}
Get Specific DAG Run
Endpoint: GET /api/v2/dags/{fileName}/dag-runs/{dagRunId}
Gets detailed status of a specific DAG run.
Response (200):
{
"dagRun": {
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240211_140000_abc123",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_140000_abc123",
"name": "data_processing_pipeline",
"status": 4,
"statusLabel": "finished",
"queuedAt": "",
"startedAt": "2024-02-11T14:00:00Z",
"finishedAt": "2024-02-11T14:45:30Z",
"log": "/logs/data_processing_pipeline/20240211_140000_abc123.log",
"params": "{\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}",
"nodes": [
{
"step": {
"name": "extract_data",
"id": "extract",
"command": "python",
"args": ["extract.py", "--date", "2024-02-11"]
},
"stdout": "/logs/data_processing_pipeline/20240211_140000_abc123/extract_data.stdout",
"stderr": "/logs/data_processing_pipeline/20240211_140000_abc123/extract_data.stderr",
"startedAt": "2024-02-11T14:00:30Z",
"finishedAt": "2024-02-11T14:15:45Z",
"status": 4,
"statusLabel": "finished",
"retryCount": 0,
"doneCount": 1,
"children": [],
"childrenRepeated": [],
"error": ""
},
{
"step": {
"name": "transform_data",
"id": "transform",
"command": "python transform.py --input=/tmp/extracted_20240211.csv",
"depends": ["extract_data"]
},
"stdout": "/logs/data_processing_pipeline/20240211_140000_abc123/transform_data.stdout",
"stderr": "/logs/data_processing_pipeline/20240211_140000_abc123/transform_data.stderr",
"startedAt": "2024-02-11T14:15:45Z",
"finishedAt": "2024-02-11T14:30:20Z",
"status": 4,
"statusLabel": "finished",
"retryCount": 0,
"doneCount": 1,
"children": [],
"childrenRepeated": [],
"error": ""
},
{
"step": {
"name": "load_to_warehouse",
"id": "load",
"run": "warehouse-loader",
"params": "{\"file\": \"/tmp/transformed_20240211.csv\", \"table\": \"fact_sales\"}",
"depends": ["transform_data"]
},
"stdout": "/logs/data_processing_pipeline/20240211_140000_abc123/load_to_warehouse.stdout",
"stderr": "/logs/data_processing_pipeline/20240211_140000_abc123/load_to_warehouse.stderr",
"startedAt": "2024-02-11T14:30:20Z",
"finishedAt": "2024-02-11T14:45:30Z",
"status": 4,
"statusLabel": "finished",
"retryCount": 0,
"doneCount": 1,
"children": [
{
"dagRunId": "sub_20240211_143020_xyz456",
"params": "{\"file\": \"/tmp/transformed_20240211.csv\", \"table\": \"fact_sales\"}"
}
],
"childrenRepeated": [],
"error": ""
}
]
}
}
DAG Run Management Endpoints
List All DAG Runs
Endpoint: GET /api/v2/dag-runs
Retrieves all DAG runs with optional filtering.
Query Parameters:
Parameter | Type | Description | Default |
---|---|---|---|
name | string | Filter by DAG name | - |
status | integer | Filter by status (0-6) | - |
fromDate | integer | Unix timestamp start | - |
toDate | integer | Unix timestamp end | - |
dagRunId | string | Filter by run ID | - |
remoteNode | string | Remote node name | "local" |
Note: This endpoint does not support pagination. All matching results are returned.
Status Values:
- 0: Not started
- 1: Running
- 2: Failed
- 3: Cancelled
- 4: Success
- 5: Queued
- 6: Partial Success
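The status codes and date filters can be wrapped in a small helper so that callers do not pass bare integers. This is a sketch under two assumptions: the enum and function names are hypothetical, and fromDate and toDate are taken to be Unix timestamps in seconds.

```python
from datetime import datetime, timezone
from enum import IntEnum
from typing import Optional
from urllib.parse import urlencode


class DAGRunStatus(IntEnum):
    NOT_STARTED = 0
    RUNNING = 1
    FAILED = 2
    CANCELLED = 3
    SUCCESS = 4
    QUEUED = 5
    PARTIAL_SUCCESS = 6


def dag_runs_query(name: Optional[str] = None,
                   status: Optional[DAGRunStatus] = None,
                   from_date: Optional[datetime] = None,
                   to_date: Optional[datetime] = None) -> str:
    """Build the query string for the DAG run list, skipping unset filters."""
    query = {}
    if name:
        query["name"] = name
    if status is not None:
        query["status"] = int(status)
    if from_date is not None:
        query["fromDate"] = int(from_date.timestamp())  # assumed: seconds
    if to_date is not None:
        query["toDate"] = int(to_date.timestamp())
    return urlencode(query)
```

Using timezone-aware datetime values keeps the conversion independent of the local time zone of the machine running the client.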
Response (200):
{
"dagRuns": [
{
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240211_160000_current",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_160000_current",
"name": "data_processing_pipeline",
"status": 1,
"statusLabel": "running",
"queuedAt": "",
"startedAt": "2024-02-11T16:00:00Z",
"finishedAt": "",
"log": "/logs/data_processing_pipeline/20240211_160000_current.log",
"params": "{\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}"
},
{
"rootDAGRunName": "database_backup",
"rootDAGRunId": "20240211_150000_backup",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_150000_backup",
"name": "database_backup",
"status": 4,
"statusLabel": "finished",
"queuedAt": "",
"startedAt": "2024-02-11T15:00:00Z",
"finishedAt": "2024-02-11T15:45:30Z",
"log": "/logs/database_backup/20240211_150000_backup.log",
"params": "{\"target_db\": \"production\", \"retention_days\": 30}"
},
{
"rootDAGRunName": "ml_training_pipeline",
"rootDAGRunId": "20240211_143000_ml",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_143000_ml",
"name": "ml_training_pipeline",
"status": 5,
"statusLabel": "queued",
"queuedAt": "2024-02-11T14:30:00Z",
"startedAt": "",
"finishedAt": "",
"log": "/logs/ml_training_pipeline/20240211_143000_ml.log",
"params": "{\"model\": \"recommendation_v2\", \"dataset\": \"user_interactions\"}"
}
]
}
Get DAG Run Details
Endpoint: GET /api/v2/dag-runs/{name}/{dagRunId}
Fetches detailed status of a specific DAG run.
Path Parameters:
Parameter | Type | Description |
---|---|---|
name | string | DAG name |
dagRunId | string | DAG run ID or "latest" |
Response (200):
{
"dagRun": {
"dagRunId": "20240211_120000",
"name": "data-pipeline",
"status": 4,
"statusLabel": "finished",
"startedAt": "2024-02-11T12:00:00Z",
"finishedAt": "2024-02-11T12:15:00Z",
"params": "{\"date\": \"2024-02-11\", \"env\": \"prod\"}",
"nodes": [
{
"step": {
"name": "extract",
"command": "python extract.py"
},
"status": 4,
"statusLabel": "finished",
"startedAt": "2024-02-11T12:00:00Z",
"finishedAt": "2024-02-11T12:05:00Z",
"retryCount": 0,
"stdout": "/logs/data-pipeline/20240211_120000/extract.out",
"stderr": "/logs/data-pipeline/20240211_120000/extract.err"
},
{
"step": {
"name": "transform",
"command": "python transform.py",
"depends": ["extract"]
},
"status": 4,
"statusLabel": "finished",
"startedAt": "2024-02-11T12:05:00Z",
"finishedAt": "2024-02-11T12:10:00Z",
"retryCount": 0
},
{
"step": {
"name": "load",
"run": "sub-workflow",
"params": "TARGET=warehouse"
},
"status": 4,
"statusLabel": "finished",
"startedAt": "2024-02-11T12:10:00Z",
"finishedAt": "2024-02-11T12:15:00Z",
"children": [
{
"dagRunId": "sub_20240211_121000",
"name": "sub-workflow",
"status": 4,
"statusLabel": "finished"
}
]
}
]
}
}
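A common use of this endpoint is polling a run until it finishes. The sketch below assumes that statuses 0 (not started), 1 (running), and 5 (queued) mean the run is still in progress and that every other value is final. The fetch_run callable is hypothetical and is expected to return the inner run object from the response.

```python
import time
from typing import Callable

ACTIVE_STATUSES = {0, 1, 5}  # assumed: not started, running, queued


def wait_for_dag_run(fetch_run: Callable[[], dict], interval: float = 5.0,
                     timeout: float = 3600.0,
                     sleep: Callable[[float], None] = time.sleep) -> dict:
    """Poll until the run leaves the active statuses or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        run = fetch_run()
        if run["status"] not in ACTIVE_STATUSES:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError("DAG run did not finish before the timeout")
        sleep(interval)
```

The sleep function is injectable so the loop can be exercised in tests without real delays.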
Stop DAG Run
Endpoint: POST /api/v2/dag-runs/{name}/{dagRunId}/stop
Forcefully stops a running DAG run.
Response (200): Success
Error Response (404):
{
"code": "not_found",
"message": "DAG run not found"
}
Error Response (400):
{
"code": "not_running",
"message": "DAG is not currently running"
}
Retry DAG Run
Endpoint: POST /api/v2/dag-runs/{name}/{dagRunId}/retry
Creates a new DAG run based on a previous execution.
Request Body:
{
"dagRunId": "new-run-id"
}
Response (200): Success
Error Response (404):
{
"code": "not_found",
"message": "Original DAG run not found"
}
Error Response (400):
{
"code": "already_running",
"message": "Another instance of this DAG is already running"
}
Dequeue DAG Run
Endpoint: GET /api/v2/dag-runs/{name}/{dagRunId}/dequeue
Removes a queued DAG run from the queue.
Response (200): Success
Error Response (404):
{
"code": "not_found",
"message": "DAG run not found in queue"
}
Log Endpoints
Get DAG Run Log
Endpoint: GET /api/v2/dag-runs/{name}/{dagRunId}/log
Fetches the execution log for a DAG run.
Query Parameters:
Parameter | Type | Description | Default |
---|---|---|---|
tail | integer | Lines from end | - |
head | integer | Lines from start | - |
offset | integer | Start line (1-based) | - |
limit | integer | Max lines (max 10000) | - |
remoteNode | string | Remote node name | "local" |
Response (200):
{
"content": "2024-02-11 14:00:00 INFO DAG data_processing_pipeline started\n2024-02-11 14:00:00 INFO Run ID: 20240211_140000_abc123\n2024-02-11 14:00:00 INFO Parameters: {\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}\n2024-02-11 14:00:00 INFO Checking preconditions...\n2024-02-11 14:00:01 INFO Precondition passed: Weekday check (current day: 7)\n2024-02-11 14:00:01 INFO Starting step: extract_data\n2024-02-11 14:00:30 INFO [extract_data] Executing: python extract.py --date 2024-02-11\n2024-02-11 14:15:45 INFO [extract_data] Step completed successfully\n2024-02-11 14:15:45 INFO [extract_data] Output saved to variable: EXTRACTED_FILE = /tmp/extracted_20240211.csv\n2024-02-11 14:15:45 INFO Starting step: transform_data\n2024-02-11 14:15:45 INFO [transform_data] Executing: python transform.py --input=/tmp/extracted_20240211.csv\n2024-02-11 14:30:20 INFO [transform_data] Step completed successfully\n2024-02-11 14:30:20 INFO [transform_data] Output saved to variable: TRANSFORMED_FILE = /tmp/transformed_20240211.csv\n2024-02-11 14:30:20 INFO Starting step: load_to_warehouse\n2024-02-11 14:30:20 INFO [load_to_warehouse] Running child DAG: warehouse-loader\n2024-02-11 14:30:20 INFO [load_to_warehouse] Child DAG started with ID: sub_20240211_143020_xyz456\n2024-02-11 14:45:30 INFO [load_to_warehouse] Child DAG completed successfully\n2024-02-11 14:45:30 INFO Executing onSuccess handler: notify_success\n2024-02-11 14:45:32 INFO [notify_success] Handler completed\n2024-02-11 14:45:32 INFO Executing onExit handler: cleanup\n2024-02-11 14:45:35 INFO [cleanup] Handler completed\n2024-02-11 14:45:35 INFO DAG completed successfully\n",
"lineCount": 22,
"totalLines": 156,
"hasMore": true,
"isEstimate": false
}
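Long logs can be read in chunks by combining offset and limit with the hasMore flag. This sketch assumes the next chunk starts at offset plus lineCount. The fetch_log callable is hypothetical: it takes an offset and a limit and returns the decoded response body.

```python
from typing import Callable, Iterator


def iter_log_chunks(fetch_log: Callable[[int, int], dict],
                    limit: int = 1000) -> Iterator[str]:
    """Yield log content chunk by chunk until hasMore is false."""
    offset = 1  # offset is 1-based
    while True:
        page = fetch_log(offset, limit)
        yield page["content"]
        # Stop on the last chunk, or if no lines came back (avoids looping).
        if not page.get("hasMore") or page.get("lineCount", 0) <= 0:
            break
        offset += page["lineCount"]
```

The limit should stay at or below the documented maximum of 10000 lines per request.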
Get Step Log
Endpoint: GET /api/v2/dag-runs/{name}/{dagRunId}/steps/{stepName}/log
Fetches the log for a specific step.
Query Parameters:
Parameter | Type | Description | Default |
---|---|---|---|
stream | string | "stdout" or "stderr" | "stdout" |
tail | integer | Lines from end | - |
head | integer | Lines from start | - |
offset | integer | Start line (1-based) | - |
limit | integer | Max lines (max 10000) | - |
Response (200):
{
"content": "2024-02-11 12:05:00 INFO Starting data transformation...\n2024-02-11 12:05:01 INFO Processing 1000 records\n2024-02-11 12:05:05 INFO Transformation complete\n",
"lineCount": 3,
"totalLines": 3,
"hasMore": false,
"isEstimate": false
}
Response with stderr (200):
{
"content": "2024-02-11 12:05:02 WARNING Duplicate key found, skipping record ID: 123\n2024-02-11 12:05:03 WARNING Invalid date format in record ID: 456\n",
"lineCount": 2,
"totalLines": 2,
"hasMore": false,
"isEstimate": false
}
Step Management Endpoints
Update Step Status
Endpoint: PATCH /api/v2/dag-runs/{name}/{dagRunId}/steps/{stepName}/status
Manually updates a step's execution status.
Request Body:
{
"status": 4
}
Status Values:
- 0: Not started
- 1: Running
- 2: Failed
- 3: Cancelled
- 4: Success
- 5: Skipped
Response (200): Success
Error Response (404):
{
"code": "not_found",
"message": "Step not found in DAG run"
}
Error Response (400):
{
"code": "bad_request",
"message": "Invalid status value"
}
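A PATCH request for this endpoint can be assembled with the Python standard library. The following is a sketch only: the function name is hypothetical, the base URL is the local default, and the range check mirrors the step status values listed above.

```python
import json
from typing import Optional
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://localhost:8080/api/v2"


def step_status_request(name: str, dag_run_id: str, step_name: str,
                        status: int, token: Optional[str] = None) -> Request:
    """Build (but do not send) the PATCH request that sets a step's status."""
    if status not in range(0, 6):
        raise ValueError(f"status must be between 0 and 5, got {status}")
    # Percent-encode each path segment in case a name has reserved characters.
    url = (f"{BASE_URL}/dag-runs/{quote(name, safe='')}/{quote(dag_run_id, safe='')}"
           f"/steps/{quote(step_name, safe='')}/status")
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    data = json.dumps({"status": status}).encode("utf-8")
    return Request(url, data=data, headers=headers, method="PATCH")
```

The request can then be sent with urllib.request.urlopen, which raises HTTPError for 4xx and 5xx responses.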
Search Endpoints
Search DAGs
Endpoint: GET /api/v2/dags/search
Performs full-text search across DAG definitions.
Query Parameters:
Parameter | Type | Description | Required |
---|---|---|---|
q | string | Search query | Yes |
remoteNode | string | Remote node name | No |
Response (200):
{
"results": [
{
"name": "database_backup",
"dag": {
"name": "database_backup",
"group": "Operations",
"schedule": [{"expression": "0 0 * * 0"}],
"description": "Weekly database backup job",
"params": ["target_db", "retention_days"],
"defaultParams": "{\"retention_days\": 30}",
"tags": ["backup", "weekly", "critical"]
},
"matches": [
{
"line": " command: pg_dump ${target_db} | gzip > backup_$(date +%Y%m%d).sql.gz",
"lineNumber": 25,
"startLine": 20
},
{
"line": "description: Weekly database backup job",
"lineNumber": 3,
"startLine": 1
}
]
},
{
"name": "data_processing_pipeline",
"dag": {
"name": "data_processing_pipeline",
"group": "ETL",
"schedule": [
{"expression": "0 2 * * *"},
{"expression": "0 14 * * *"}
],
"description": "Daily data processing pipeline for warehouse ETL",
"params": ["date", "env", "batch_size"],
"defaultParams": "{\"batch_size\": 1000, \"env\": \"dev\"}",
"tags": ["production", "etl", "daily"]
},
"matches": [
{
"line": " command: psql -h ${DB_HOST} -d analytics -c \"COPY data TO STDOUT\"",
"lineNumber": 45,
"startLine": 42
}
]
}
],
"errors": []
}
Get All Tags
Endpoint: GET /api/v2/dags/tags
Retrieves all unique tags used across DAGs.
Query Parameters:
Parameter | Type | Description | Default |
---|---|---|---|
remoteNode | string | Remote node name | "local" |
Response (200):
{
"tags": [
"backup",
"critical",
"daily",
"data-quality",
"etl",
"experimental",
"hourly",
"maintenance",
"ml",
"monitoring",
"production",
"reporting",
"staging",
"testing",
"weekly"
],
"errors": []
}
Response with Errors (200):
{
"tags": [
"backup",
"critical",
"daily",
"etl",
"production"
],
"errors": [
"Error reading DAG file: malformed-etl.yaml - yaml: line 15: found unexpected end of stream",
"Error reading DAG file: invalid-syntax.yaml - yaml: unmarshal errors:\n line 8: field invalidField not found in type digraph.DAG"
]
}
Monitoring Endpoints
Prometheus Metrics
Endpoint: GET /api/v2/metrics
Returns Prometheus-compatible metrics.
Response (200) (text/plain):
# HELP dagu_info Dagu build information
# TYPE dagu_info gauge
dagu_info{version="1.14.0",build_date="2024-01-01T12:00:00Z",go_version="1.21"} 1
# HELP dagu_uptime_seconds Time since server start
# TYPE dagu_uptime_seconds gauge
dagu_uptime_seconds 3600
# HELP dagu_dag_runs_currently_running Number of currently running DAG runs
# TYPE dagu_dag_runs_currently_running gauge
dagu_dag_runs_currently_running 5
# HELP dagu_dag_runs_queued_total Total number of DAG runs in queue
# TYPE dagu_dag_runs_queued_total gauge
dagu_dag_runs_queued_total 8
# HELP dagu_dag_runs_total Total number of DAG runs by status
# TYPE dagu_dag_runs_total counter
dagu_dag_runs_total{status="success"} 2493
dagu_dag_runs_total{status="error"} 15
dagu_dag_runs_total{status="cancelled"} 7
# HELP dagu_dags_total Total number of DAGs
# TYPE dagu_dags_total gauge
dagu_dags_total 45
# HELP dagu_scheduler_running Whether the scheduler is running
# TYPE dagu_scheduler_running gauge
dagu_scheduler_running 1
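For quick checks without a Prometheus server, the text output can be read into a dictionary. This is a deliberately simple sketch, not a full parser for the exposition format: it assumes each sample line ends with its value and carries no trailing timestamp, as in the output above.

```python
def parse_metrics(text: str) -> dict:
    """Map each sample (name plus labels, as written) to its numeric value."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        # Split on the last space so label values containing spaces survive.
        key, _, value = line.rpartition(" ")
        samples[key] = float(value)
    return samples
```

With the sample output, the key dagu_dag_runs_queued_total maps to the queue depth and the labeled dagu_dag_runs_total keys map to per-status counts.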
Error Handling
All endpoints return structured error responses:
{
"code": "error_code",
"message": "Human readable error message",
"details": {
"additional": "error details"
}
}
Error Codes:
Code | Description |
---|---|
forbidden | Insufficient permissions |
bad_request | Invalid request parameters |
not_found | Resource doesn't exist |
internal_error | Server-side error |
unauthorized | Authentication failed |
bad_gateway | Upstream service error |
remote_node_error | Remote node connection failed |
already_running | DAG is already running |
not_running | DAG is not running |
already_exists | Resource already exists |
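Client code can turn the structured error body into an exception so that callers branch on the code field rather than on message text. The sketch below is one possible shape. The DaguAPIError class and the raise_for_error helper are hypothetical names, not part of any official client.

```python
import json


class DaguAPIError(Exception):
    """Carries the status code plus the code, message, and details fields."""

    def __init__(self, status_code, code, message, details=None):
        super().__init__(f"{status_code} {code}: {message}")
        self.status_code = status_code
        self.code = code
        self.message = message
        self.details = details or {}


def raise_for_error(status_code: int, body: str) -> None:
    """Raise DaguAPIError for 4xx/5xx responses; return None otherwise."""
    if status_code < 400:
        return None
    try:
        payload = json.loads(body)
    except ValueError:
        payload = {}  # body was empty or not JSON
    if not isinstance(payload, dict):
        payload = {}
    raise DaguAPIError(status_code, payload.get("code", "unknown"),
                       payload.get("message", ""), payload.get("details"))
```

A caller might catch the exception and compare its code attribute against values such as not_found or already_running.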
Child DAG Run Endpoints
Get Child DAG Run Details
Endpoint: GET /api/v2/dag-runs/{name}/{dagRunId}/children/{childDAGRunId}
Fetches detailed status of a child DAG run.
Response (200):
{
"dagRunDetails": {
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240211_140000_abc123",
"parentDAGRunName": "data_processing_pipeline",
"parentDAGRunId": "20240211_140000_abc123",
"dagRunId": "sub_20240211_143020_xyz456",
"name": "warehouse_loader_subdag",
"status": 4,
"statusLabel": "finished",
"queuedAt": "",
"startedAt": "2024-02-11T14:30:20Z",
"finishedAt": "2024-02-11T14:45:30Z",
"log": "/logs/warehouse_loader_subdag/sub_20240211_143020_xyz456.log",
"params": "{\"file\": \"/tmp/transformed_20240211.csv\", \"table\": \"fact_sales\"}",
"nodes": [
{
"step": {
"name": "validate_schema",
"command": "validate_schema.py",
"args": [],
"depends": []
},
"stdout": "/logs/warehouse_loader_subdag/sub_20240211_143020_xyz456/validate_schema.stdout",
"stderr": "/logs/warehouse_loader_subdag/sub_20240211_143020_xyz456/validate_schema.stderr",
"startedAt": "2024-02-11T14:30:20Z",
"finishedAt": "2024-02-11T14:30:35Z",
"status": 4,
"statusLabel": "finished",
"retryCount": 0,
"doneCount": 1,
"children": [],
"childrenRepeated": [],
"error": ""
},
{
"step": {
"name": "load_data",
"command": "load_to_warehouse.py",
"depends": ["validate_schema"]
},
"stdout": "/logs/warehouse_loader_subdag/sub_20240211_143020_xyz456/load_data.stdout",
"stderr": "/logs/warehouse_loader_subdag/sub_20240211_143020_xyz456/load_data.stderr",
"startedAt": "2024-02-11T14:30:35Z",
"finishedAt": "2024-02-11T14:45:30Z",
"status": 4,
"statusLabel": "finished",
"retryCount": 0,
"doneCount": 1,
"children": [],
"childrenRepeated": [],
"error": ""
}
],
"onExit": null,
"onSuccess": null,
"onFailure": null,
"onCancel": null,
"preconditions": []
}
}
Get Child DAG Run Log
Endpoint: GET /api/v2/dag-runs/{name}/{dagRunId}/children/{childDAGRunId}/log
Fetches the log for a child DAG run.
Response (200):
{
"content": "2024-02-11 14:30:20 INFO Starting child DAG: warehouse_loader_subdag\n2024-02-11 14:30:20 INFO Parameters: {\"file\": \"/tmp/transformed_20240211.csv\", \"table\": \"fact_sales\"}\n2024-02-11 14:30:20 INFO Parent DAG: data_processing_pipeline (20240211_140000_abc123)\n2024-02-11 14:30:20 INFO Step 'validate_schema' started\n2024-02-11 14:30:22 INFO Schema validation: Checking table structure for 'fact_sales'\n2024-02-11 14:30:35 INFO Step 'validate_schema' completed successfully\n2024-02-11 14:30:35 INFO Step 'load_data' started\n2024-02-11 14:30:36 INFO Opening file: /tmp/transformed_20240211.csv\n2024-02-11 14:30:37 INFO File contains 50000 records\n2024-02-11 14:30:38 INFO Beginning bulk insert to warehouse.fact_sales\n2024-02-11 14:35:00 INFO Progress: 25000/50000 records loaded (50%)\n2024-02-11 14:40:00 INFO Progress: 45000/50000 records loaded (90%)\n2024-02-11 14:45:28 INFO All 50000 records loaded successfully\n2024-02-11 14:45:29 INFO Committing transaction\n2024-02-11 14:45:30 INFO Step 'load_data' completed successfully\n2024-02-11 14:45:30 INFO Child DAG completed successfully\n",
"lineCount": 16,
"totalLines": 16,
"hasMore": false,
"isEstimate": false
}
Get Child Step Log
Endpoint: GET /api/v2/dag-runs/{name}/{dagRunId}/children/{childDAGRunId}/steps/{stepName}/log
Fetches the log for a step in a child DAG run.
Response (200):
{
"content": "2024-02-11 14:30:35 INFO Step 'load_data' started\n2024-02-11 14:30:36 INFO Opening file: /tmp/transformed_20240211.csv\n2024-02-11 14:30:37 INFO File contains 50000 records\n2024-02-11 14:30:38 INFO Beginning bulk insert to warehouse.fact_sales\n2024-02-11 14:30:39 INFO Using batch size: 5000\n2024-02-11 14:30:40 INFO Processing batch 1 of 10\n2024-02-11 14:32:00 INFO Batch 1 complete (5000 records)\n2024-02-11 14:33:20 INFO Processing batch 2 of 10\n2024-02-11 14:34:40 INFO Batch 2 complete (10000 records)\n2024-02-11 14:36:00 INFO Processing batch 3 of 10\n2024-02-11 14:37:20 INFO Batch 3 complete (15000 records)\n2024-02-11 14:38:40 INFO Processing batch 4 of 10\n2024-02-11 14:40:00 INFO Batch 4 complete (20000 records)\n2024-02-11 14:41:20 INFO Processing batch 5 of 10\n2024-02-11 14:42:40 INFO Batch 5 complete (25000 records)\n[... truncated for brevity ...]\n2024-02-11 14:45:28 INFO All 50000 records loaded successfully\n2024-02-11 14:45:29 INFO Committing transaction\n2024-02-11 14:45:30 INFO Step 'load_data' completed successfully\n",
"lineCount": 50,
"totalLines": 156,
"hasMore": true,
"isEstimate": false
}
Update Child Step Status
Endpoint: PATCH /api/v2/dag-runs/{name}/{dagRunId}/children/{childDAGRunId}/steps/{stepName}/status
Updates the status of a step in a child DAG run.
Request Body:
{
"status": 4
}
Response (200): Success
Error Response (404):
{
"code": "not_found",
"message": "Child DAG run or step not found"
}
Additional Endpoints
List DAG Runs by Name
Endpoint: GET /api/v2/dag-runs/{name}
Lists all DAG runs for a specific DAG name.
Path Parameters:
Parameter | Type | Description |
---|---|---|
name | string | DAG name |
Query Parameters:
Parameter | Type | Description | Default |
---|---|---|---|
status | integer | Filter by status (0-6) | - |
fromDate | integer | Unix timestamp start | - |
toDate | integer | Unix timestamp end | - |
dagRunId | string | Filter by run ID | - |
remoteNode | string | Remote node name | "local" |
Response (200):
{
"dagRuns": [
{
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240211_140000_abc123",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_140000_abc123",
"name": "data_processing_pipeline",
"status": 4,
"statusLabel": "finished",
"queuedAt": "",
"startedAt": "2024-02-11T14:00:00Z",
"finishedAt": "2024-02-11T14:45:30Z",
"log": "/logs/data_processing_pipeline/20240211_140000_abc123.log",
"params": "{\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}"
},
{
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240211_020000_def456",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_020000_def456",
"name": "data_processing_pipeline",
"status": 2,
"statusLabel": "failed",
"queuedAt": "",
"startedAt": "2024-02-11T02:00:00Z",
"finishedAt": "2024-02-11T02:15:45Z",
"log": "/logs/data_processing_pipeline/20240211_020000_def456.log",
"params": "{\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}"
}
]
}
Example Usage
Start a DAG with Parameters
curl -X POST "http://localhost:8080/api/v2/dags/data-processing-pipeline/start" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your-token" \
-d '{
"params": "{\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}",
"dagRunId": "manual_20240211_160000"
}'
Response:
{
"dagRunId": "manual_20240211_160000"
}
Check DAG Run Status
curl "http://localhost:8080/api/v2/dag-runs/data-processing-pipeline/latest" \
-H "Authorization: Bearer your-token"
Response:
{
"dagRunDetails": {
"rootDAGRunName": "data_processing_pipeline",
"rootDAGRunId": "20240211_160000_current",
"parentDAGRunName": "",
"parentDAGRunId": "",
"dagRunId": "20240211_160000_current",
"name": "data_processing_pipeline",
"status": 1,
"statusLabel": "running",
"queuedAt": "",
"startedAt": "2024-02-11T16:00:00Z",
"finishedAt": "",
"log": "/logs/data_processing_pipeline/20240211_160000_current.log",
"params": "{\"date\": \"2024-02-11\", \"env\": \"production\", \"batch_size\": 5000}",
"nodes": [
{
"step": {
"name": "extract_data",
"id": "extract"
},
"status": 4,
"statusLabel": "finished",
"startedAt": "2024-02-11T16:00:30Z",
"finishedAt": "2024-02-11T16:15:45Z",
"retryCount": 0,
"doneCount": 1
},
{
"step": {
"name": "transform_data",
"id": "transform"
},
"status": 1,
"statusLabel": "running",
"startedAt": "2024-02-11T16:15:45Z",
"finishedAt": "",
"retryCount": 0,
"doneCount": 0
},
{
"step": {
"name": "load_to_warehouse",
"id": "load"
},
"status": 0,
"statusLabel": "not started",
"startedAt": "",
"finishedAt": "",
"retryCount": 0,
"doneCount": 0
}
]
}
}
Search for DAGs
curl "http://localhost:8080/api/v2/dags/search?q=database" \
-H "Authorization: Bearer your-token"
Response:
{
"results": [
{
"name": "database_backup",
"dag": {
"name": "database_backup",
"group": "Operations",
"schedule": [{"expression": "0 0 * * 0"}],
"description": "Weekly database backup job",
"params": ["target_db", "retention_days"],
"defaultParams": "{\"retention_days\": 30}",
"tags": ["backup", "weekly", "critical"]
},
"matches": [
{
"line": " command: pg_dump ${target_db} | gzip > backup_$(date +%Y%m%d).sql.gz",
"lineNumber": 25,
"startLine": 20
}
]
}
],
"errors": []
}
Get Metrics for Monitoring
curl "http://localhost:8080/api/v2/metrics" | grep dagu_dag_runs_currently_running
Stop a Running DAG
curl -X POST "http://localhost:8080/api/v2/dag-runs/data-pipeline/20240211_120000/stop" \
-H "Authorization: Bearer your-token"
Update Step Status Manually
# Mark a failed step as successful
curl -X PATCH "http://localhost:8080/api/v2/dag-runs/data-processing-pipeline/20240211_020000_def456/steps/transform_data/status" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your-token" \
-d '{"status": 4}'
Response (200): Success (empty response body)
Enqueue a DAG Run
curl -X POST "http://localhost:8080/api/v2/dags/ml-training-pipeline/enqueue" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your-token" \
-d '{
"params": "{\"model\": \"recommendation_v3\", \"dataset\": \"user_interactions_2024\"}",
"dagRunId": "ml_train_20240211_170000"
}'
Response:
{
"dagRunId": "ml_train_20240211_170000"
}
Get Logs with Pagination
# Get last 100 lines of a DAG run log
curl "http://localhost:8080/api/v2/dag-runs/etl-pipeline/20240211_120000/log?tail=100"
# Get specific step's stderr output
curl "http://localhost:8080/api/v2/dag-runs/etl-pipeline/20240211_120000/steps/transform/log?stream=stderr"
# Get logs with offset and limit
curl "http://localhost:8080/api/v2/dag-runs/etl-pipeline/20240211_120000/log?offset=1000&limit=500"
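To read a long log in fixed-size windows, the offset/limit form above can be stepped in a loop. This sketch only builds the URLs. It assumes that windows are contiguous and that the first window starts at offset 0, neither of which is confirmed by this reference; log_url is a hypothetical helper name.

```shell
#!/bin/sh
# log_url BASE DAG RUN_ID OFFSET LIMIT: build a paginated log URL in the
# same shape as the offset/limit example above. (Hypothetical helper.)
log_url() {
  printf '%s/dag-runs/%s/%s/log?offset=%s&limit=%s\n' "$1" "$2" "$3" "$4" "$5"
}

base="http://localhost:8080/api/v2"
limit=500
offset=0
# Print the URLs for the first three windows. A real loop would fetch each
# URL with curl and stop once a window comes back empty (assumed stop rule).
while [ "$offset" -lt 1500 ]; do
  log_url "$base" "etl-pipeline" "20240211_120000" "$offset" "$limit"
  offset=$((offset + limit))
done
```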
Working with Child DAGs
# Get child DAG run details
curl "http://localhost:8080/api/v2/dag-runs/data-processing-pipeline/20240211_140000_abc123/children/sub_20240211_143020_xyz456" \
-H "Authorization: Bearer your-token"
# Get child DAG step log
curl "http://localhost:8080/api/v2/dag-runs/data-processing-pipeline/20240211_140000_abc123/children/sub_20240211_143020_xyz456/steps/load_data/log" \
-H "Authorization: Bearer your-token"
# Update child DAG step status
curl -X PATCH "http://localhost:8080/api/v2/dag-runs/data-processing-pipeline/20240211_140000_abc123/children/sub_20240211_143020_xyz456/steps/load_data/status" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your-token" \
-d '{"status": 4}'
Rename a DAG
curl -X POST "http://localhost:8080/api/v2/dags/old-pipeline-name/rename" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your-token" \
-d '{"newFileName": "new-pipeline-name"}'
Response (200): Success (empty response body)
Delete a DAG
curl -X DELETE "http://localhost:8080/api/v2/dags/deprecated-pipeline" \
-H "Authorization: Bearer your-token"
Response (204): No content (successful deletion)
Get DAG Specification YAML
curl "http://localhost:8080/api/v2/dags/data-processing-pipeline/spec" \
-H "Authorization: Bearer your-token"
Response:
{
"dag": {
"name": "data_processing_pipeline",
"group": "ETL"
},
"spec": "name: data_processing_pipeline\ngroup: ETL\nschedule:\n - \"0 2 * * *\"\n - \"0 14 * * *\"\ndescription: Daily data processing pipeline for warehouse ETL\nenv:\n - DATA_SOURCE=postgres://prod-db:5432/analytics\n - WAREHOUSE_URL=${WAREHOUSE_URL}\nlogDir: /var/log/dagu/pipelines\nhistRetentionDays: 30\nmaxActiveRuns: 1\nmaxActiveSteps: 5\nparams:\n - date\n - env\n - batch_size\ndefaultParams: |\n batch_size: 1000\n env: dev\ntags:\n - production\n - etl\n - daily\npreconditions:\n - condition: \"`date +%u`\"\n expected: \"re:[1-5]\"\n error: Pipeline only runs on weekdays\nsteps:\n - name: extract_data\n id: extract\n description: Extract data from source database\n dir: /app/etl\n command: python\n args:\n - extract.py\n - --date\n - ${date}\n stdout: /logs/extract.out\n stderr: /logs/extract.err\n output: EXTRACTED_FILE\n preconditions:\n - condition: test -f /data/ready.flag\n - name: transform_data\n id: transform\n description: Apply transformations to extracted data\n command: python transform.py --input=${EXTRACTED_FILE}\n depends:\n - extract_data\n output: TRANSFORMED_FILE\n mailOnError: true\n - name: load_to_warehouse\n id: load\n run: warehouse-loader\n params: |\n file: ${TRANSFORMED_FILE}\n table: fact_sales\n depends:\n - transform_data\nhandlerOn:\n success:\n command: notify.sh 'Pipeline completed successfully'\n failure:\n command: alert.sh 'Pipeline failed' high\n exit:\n command: cleanup_temp_files.sh\n",
"errors": []
}
Complex Filtering Examples
# Get all failed DAG runs in the last 24 hours
# (uses GNU date; on macOS/BSD, use $(date -v-24H +%s) instead)
curl "http://localhost:8080/api/v2/dag-runs?status=2&fromDate=$(date -d '24 hours ago' +%s)" \
-H "Authorization: Bearer your-token"
# Get DAG runs for a specific DAG with pagination
curl "http://localhost:8080/api/v2/dag-runs?name=data-processing-pipeline&page=2&perPage=20" \
-H "Authorization: Bearer your-token"
# Search for DAGs with specific tags
curl "http://localhost:8080/api/v2/dags?tag=production&page=1&perPage=50" \
-H "Authorization: Bearer your-token"
# Get running DAG runs
curl "http://localhost:8080/api/v2/dag-runs?status=1" \
-H "Authorization: Bearer your-token"
# Get queued DAG runs
curl "http://localhost:8080/api/v2/dag-runs?status=5" \
-H "Authorization: Bearer your-token"
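The status filters above take numeric codes. As a reading aid, this sketch maps the codes that appear in this section to labels. The labels for 0, 1, and 4 are copied from the statusLabel fields in the example responses; "failed" and "queued" are inferred from the filter comments. Any other code is reported as unknown rather than guessed. status_label is a hypothetical helper name.

```shell
#!/bin/sh
# status_label CODE: print a label for a numeric DAG run status.
# Only codes that appear in the examples of this section are mapped.
status_label() {
  case "$1" in
    0) echo "not started" ;;
    1) echo "running" ;;
    2) echo "failed" ;;     # inferred from the "failed DAG runs" filter
    4) echo "finished" ;;
    5) echo "queued" ;;     # inferred from the "queued DAG runs" filter
    *) echo "unknown" ;;
  esac
}

# Demonstration: print the mapping for the attested codes.
for code in 0 1 2 4 5; do
  printf '%s -> %s\n' "$code" "$(status_label "$code")"
done
```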
Suspend/Resume DAG Scheduling
# Suspend a DAG
curl -X POST "http://localhost:8080/api/v2/dags/data-processing-pipeline/suspend" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your-token" \
-d '{"suspend": true}'
# Resume a DAG
curl -X POST "http://localhost:8080/api/v2/dags/data-processing-pipeline/suspend" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer your-token" \
-d '{"suspend": false}'
Response (200): Success (empty response body)
API Response Status Codes Summary
Status Code | Description | Common Scenarios |
---|---|---|
200 | Success | Successful GET, POST, PUT, PATCH requests |
201 | Created | New DAG created successfully |
204 | No Content | Successful DELETE operation |
400 | Bad Request | Invalid parameters, malformed JSON, invalid DAG name format |
401 | Unauthorized | Missing or invalid authentication token |
403 | Forbidden | Insufficient permissions (e.g., no write access) |
404 | Not Found | DAG, DAG run, or resource doesn't exist |
409 | Conflict | Resource already exists (e.g., DAG name conflict) |
500 | Internal Error | Server-side processing error |
503 | Service Unavailable | Server unhealthy or scheduler not responding |
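Scripts that call the API often need only the status code. The sketch below groups the codes from the table and shows one way to capture the code with curl. Treating 500 and 503 as retryable is an assumption of this sketch, not documented API behavior; classify_status and http_code are hypothetical helper names.

```shell
#!/bin/sh
# classify_status CODE: group an HTTP status code from the table above.
# The "retryable" group is an assumption, not documented behavior.
classify_status() {
  case "$1" in
    200|201|204) echo "success" ;;
    400|401|403|404|409) echo "client_error" ;;
    500|503) echo "retryable" ;;
    *) echo "unexpected" ;;
  esac
}

# http_code URL: print only the status code of a GET request.
# -s hides progress output, -o discards the body, -w prints the code.
# Defined here for reference; it is not called in this sketch.
http_code() {
  curl -s -o /dev/null -w '%{http_code}' "$1"
}

# Demonstration on two codes from the table.
classify_status 204
classify_status 503
```

A typical call would be classify_status "$(http_code "http://localhost:8080/api/v2/health")".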
Workers Endpoints
List Workers
Endpoint: GET /api/v2/workers
Retrieves information about connected workers in the distributed execution system.
Query Parameters:
Parameter | Type | Description | Default |
---|---|---|---|
remoteNode | string | Remote node name | "local" |
Response (200):
{
"workers": [
{
"id": "worker-gpu-01",
"labels": {
"gpu": "true",
"cuda": "11.8",
"memory": "64G",
"region": "us-east-1"
},
"health_status": "WORKER_HEALTH_STATUS_HEALTHY",
"last_heartbeat": "2024-02-11T12:00:00Z",
"running_tasks": [
{
"dagName": "ml-training-pipeline",
"dagRunId": "20240211_120000_abc123",
"rootDagRunName": "ml-training-pipeline",
"rootDagRunId": "20240211_120000_abc123",
"parentDagRunName": "",
"parentDagRunId": "",
"startedAt": "2024-02-11T12:00:00Z"
}
]
},
{
"id": "worker-cpu-02",
"labels": {
"cpu-arch": "amd64",
"cpu-cores": "32",
"region": "us-east-1"
},
"health_status": "WORKER_HEALTH_STATUS_WARNING",
"last_heartbeat": "2024-02-11T11:59:50Z",
"running_tasks": []
},
{
"id": "worker-eu-01",
"labels": {
"region": "eu-west-1",
"compliance": "gdpr"
},
"health_status": "WORKER_HEALTH_STATUS_UNHEALTHY",
"last_heartbeat": "2024-02-11T11:59:30Z",
"running_tasks": [
{
"dagName": "data-processor",
"dagRunId": "20240211_113000_def456",
"rootDagRunName": "data-pipeline",
"rootDagRunId": "20240211_110000_xyz789",
"parentDagRunName": "data-pipeline",
"parentDagRunId": "20240211_110000_xyz789",
"startedAt": "2024-02-11T11:30:00Z"
}
]
}
]
}
Worker Health Status Values:
- WORKER_HEALTH_STATUS_HEALTHY: Last heartbeat < 5 seconds ago
- WORKER_HEALTH_STATUS_WARNING: Last heartbeat 5-15 seconds ago
- WORKER_HEALTH_STATUS_UNHEALTHY: Last heartbeat > 15 seconds ago
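The thresholds above can be written as a small function of heartbeat age in whole seconds. This is a sketch of the stated rule, not the server's implementation; worker_health is a hypothetical helper name, and the boundaries at exactly 5 and 15 seconds are read as belonging to the warning range.

```shell
#!/bin/sh
# worker_health AGE_SECONDS: map heartbeat age to a health status value,
# following the thresholds listed above (5 and 15 count as warning).
worker_health() {
  if [ "$1" -lt 5 ]; then
    echo "WORKER_HEALTH_STATUS_HEALTHY"
  elif [ "$1" -le 15 ]; then
    echo "WORKER_HEALTH_STATUS_WARNING"
  else
    echo "WORKER_HEALTH_STATUS_UNHEALTHY"
  fi
}

# Heartbeat ages of the three example workers, assuming the response was
# observed at 2024-02-11T12:00:00Z (the observation time is an assumption).
for age in 0 10 30; do
  printf '%ss -> %s\n' "$age" "$(worker_health "$age")"
done
```

Under that assumed observation time, the three ages reproduce the health_status values shown in the example response.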
Running Task Fields:
- dagName: Name of the DAG being executed
- dagRunId: ID of the current DAG run
- rootDagRunName: Name of the root DAG (for nested workflows)
- rootDagRunId: ID of the root DAG run
- parentDagRunName: Name of the immediate parent DAG (empty for root DAGs)
- parentDagRunId: ID of the immediate parent DAG run
- startedAt: When the task execution started
Error Response (503) (when coordinator is not running):
{
"code": "service_unavailable",
"message": "Coordinator service is not available"
}
API Versioning
- Current version: v2
- Legacy v1 endpoints are deprecated but still available
- Version is included in the URL path: /api/v2/
- Breaking changes will result in a new API version
Remote Node Support
Most endpoints support the remoteNode query parameter for multi-environment setups:
# Query a remote node
curl "http://localhost:8080/api/v2/dags?remoteNode=production" \
-H "Authorization: Bearer your-token"
Remote nodes are configured in the server configuration file and allow managing DAGs across multiple Dagu instances from a single interface.