A lightweight, robust, async-friendly Python SDK for interacting with the Apache Airflow v2 Stable REST API.
The SDK follows a layered architecture with a facade entry point, and provides both synchronous and asynchronous clients for managing your Airflow environments programmatically.
- Features
- Installation
- Requirements
- Airflow Configuration
- Quick Start
- SDK Architecture
- Available Services & API Reference
- Request Models
- Response Models
- Error Handling
- Pagination
- License
- Full API Coverage: supports all Airflow v2 REST API endpoints, including DAGs, DAG Runs, Tasks, Task Instances, Variables, Connections, Pools, Assets, Backfills, and more.
- Dual Clients: provides both a synchronous `AirflowSDK` and an asynchronous `AsyncAirflowSDK`.
- Layered Architecture: clean separation between the Facade, Domain Services, and Transport layers.
- Built-In Resilience: automatic exponential-backoff retries for common transient errors (HTTP 429 rate limits and 502/503/504 server errors).
- Intuitive Exceptions: a clear hierarchy of custom exceptions (`AirflowAuthError`, `AirflowRateLimitError`, `AirflowNotFoundError`, etc.).
- Typed Response Models: all responses are returned as Pydantic models for full IDE autocompletion and type safety.
- Context Manager Support: both clients support `with` / `async with` for automatic resource cleanup.
Clone the repository and install it in editable mode:

```bash
git clone <your-repo>/AirflowSDK
cd AirflowSDK
pip install -e .
```

| Dependency | Version |
|---|---|
| Python | 3.8+ |
| `httpx` | ≥ 0.24.0 |
| `anyio` | ≥ 3.0.0 |
| `pydantic` | ≥ 2.0.0 |
Install dependencies manually if needed:

```bash
pip install httpx anyio pydantic
```

Make sure your target Airflow environment has API authentication enabled. In `airflow.cfg`:

```ini
[api]
auth_backends = airflow.api.auth.backend.basic_auth
```

Restart the Airflow webserver and scheduler after making changes. The Airflow CLI has no `restart` subcommand, so stop the running processes and start them again:

```bash
# Run each in its own terminal, or add -D to run it as a daemon
airflow webserver
airflow scheduler
```

Tip: For Airflow running via Docker Compose, set the environment variable `AIRFLOW__CORE__AUTH_MANAGER` to `airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager`.
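With basic authentication enabled, every API request carries an HTTP Basic `Authorization` header built from the username and password. The sketch below builds that header with the standard library and prepares (but does not send) a request, which can help when checking credentials independently of the SDK. The helper names are hypothetical, and the `/api/v1/dags` path is an assumption (Airflow 2's stable REST API prefix); adjust it if your server exposes the API under a different prefix.

```python
import base64
import urllib.request


def basic_auth_header(username: str, password: str) -> str:
    """Return the value of an HTTP Basic Authorization header (hypothetical helper)."""
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def build_list_dags_request(base_url: str, username: str, password: str) -> urllib.request.Request:
    """Prepare (but do not send) an authenticated GET request for the DAG list."""
    # Assumption: Airflow 2's stable REST API prefix; adjust for your deployment.
    url = base_url.rstrip("/") + "/api/v1/dags"
    return urllib.request.Request(
        url,
        headers={
            "Authorization": basic_auth_header(username, password),
            "Accept": "application/json",
        },
        method="GET",
    )


req = build_list_dags_request("http://localhost:8080", "admin", "admin")
print(req.full_url)                     # → http://localhost:8080/api/v1/dags
print(req.get_header("Authorization"))  # → Basic YWRtaW46YWRtaW4=
# To actually send it: urllib.request.urlopen(req) against a running Airflow server.
```

If sending this request returns HTTP 401, the problem lies with the credentials or the auth backend configuration rather than with the SDK.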
Use `AirflowSDK` as a context manager to ensure connections are properly closed:

```python
from src.sdk import AirflowSDK

with AirflowSDK(
    base_url="http://localhost:8080",
    username="admin",
    password="admin",
    retries=3,  # optional, default is 3
) as client:
    # List all DAGs
    dag_collection = client.dags.list()
    print(f"Total DAGs: {dag_collection.total_entries}")
    for dag in dag_collection.dags:
        print(dag.dag_id, "| paused:", dag.is_paused)

    # Trigger a DAG run
    run = client.dag_runs.trigger(
        dag_id="example_bash_operator",
        conf={"key": "value"},
    )
    print(f"Triggered run: {run.dag_run_id} | state: {run.state}")
```

You can also use the SDK without a context manager; just call `.close()` when done:
```python
client = AirflowSDK("http://localhost:8080", "admin", "admin")
try:
    dags = client.dags.list()
finally:
    client.close()  # release HTTP connections even if a call raises
```

For concurrent applications (FastAPI servers, async workers, etc.), use `AsyncAirflowSDK`:
```python
import asyncio

from src.async_sdk import AsyncAirflowSDK


async def main():
    async with AsyncAirflowSDK(
        base_url="http://localhost:8080",
        username="admin",
        password="admin",
    ) as client:
        # Fetch DAGs and config concurrently
        dags, config = await asyncio.gather(
            client.dags.list(),
            client.config.get_config(),
        )
        print(f"Found {dags.total_entries} DAGs")
        print(f"Config sections: {len(config.sections)}")

        # Trigger a run asynchronously
        run = await client.dag_runs.trigger("my_dag", conf={"date": "2024-01-01"})
        print(f"State: {run.state}")


asyncio.run(main())
```

Note: Every method on the `AsyncAirflowSDK` services is a coroutine and must be awaited.
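`asyncio.gather` starts every coroutine at once, so fanning out over hundreds of DAGs can flood the API server and push requests into the 429 rate-limit path. A common remedy is to cap in-flight requests with `asyncio.Semaphore`. The sketch below is generic: `gather_limited` is a hypothetical helper (not part of the SDK), and the `fake_get` coroutine stands in for a call such as `client.dags.get(dag_id)` so the example runs offline.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_limited(
    func: Callable[[T], Awaitable[R]],
    items: Iterable[T],
    max_concurrency: int = 10,
) -> List[R]:
    """Await func(item) for every item, with at most max_concurrency calls in flight.

    Results come back in the same order as items (hypothetical helper, not part of the SDK).
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: T) -> R:
        async with semaphore:  # wait for a free slot before starting the call
            return await func(item)

    return list(await asyncio.gather(*(run_one(item) for item in items)))


async def demo() -> Tuple[List[str], int]:
    in_flight = 0
    peak = 0

    async def fake_get(dag_id: str) -> str:
        # Stand-in for an SDK coroutine such as client.dags.get(dag_id)
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return dag_id.upper()

    dag_ids = [f"dag_{i}" for i in range(20)]
    results = await gather_limited(fake_get, dag_ids, max_concurrency=5)
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(demo())
    print(results[:3], "peak in flight:", peak)
```

With the real async client, you would pass a bound coroutine method such as `client.dags.get` as `func` inside an `async with AsyncAirflowSDK(...)` block.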
```text
AirflowSDK / AsyncAirflowSDK    ← Facade Layer (entry point)
│
├── client.dags           ─┐
├── client.dag_runs        │
├── client.tasks           │  Service Layer (src/services/)
├── client.task_instances  │  Domain-scoped API modules
├── client.variables       │  returning typed Pydantic models
├── client.connections     │
├── client.pools           │
├── client.config          │
├── client.assets          │
├── client.backfills      ─┘
│
└── APIClient / AsyncAPIClient  ← Core Layer (src/core/)
    ├── httpx transport
    ├── Basic Auth
    └── Exponential backoff retry
```
Layers:
- Facade Layer (`src/sdk.py`, `src/async_sdk.py`): the primary entry point. Exposes all domain services as attributes (e.g., `client.dags`, `client.connections`).
- Service Layer (`src/services/`): domain-specific modules that implement the REST API calls for each resource and return typed Pydantic models.
- Core Layer (`src/core/`): handles all HTTP transport, session state, authentication, and retry logic using `httpx`.
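To make the layering concrete, here is a heavily simplified, self-contained sketch of how a facade can share one transport among domain services. All class names (`FakeTransport`, `DagsService`, `MiniSDK`) are hypothetical stand-ins; the real `src/` modules use `httpx` and Pydantic and will differ in detail. The fake transport returns canned JSON instead of making HTTP calls.

```python
from typing import Any, Dict, List, Optional


class FakeTransport:
    """Core-layer stand-in (hypothetical): owns the session and performs requests."""

    def __init__(self, base_url: str) -> None:
        self.base_url = base_url.rstrip("/")
        self.calls: List[str] = []
        self.closed = False

    def request(self, method: str, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        self.calls.append(f"{method} {path}")
        # A real core layer would send an HTTP request with auth and retries here.
        return {"dags": [{"dag_id": "example"}], "total_entries": 1}

    def close(self) -> None:
        self.closed = True


class DagsService:
    """Service-layer stand-in (hypothetical): knows the endpoint paths for one domain."""

    def __init__(self, transport: FakeTransport) -> None:
        self._transport = transport

    def list(self, limit: int = 100, offset: int = 0) -> Dict[str, Any]:
        return self._transport.request("GET", "/dags", params={"limit": limit, "offset": offset})


class MiniSDK:
    """Facade stand-in (hypothetical): builds one transport and exposes services as attributes."""

    def __init__(self, base_url: str) -> None:
        self._transport = FakeTransport(base_url)
        self.dags = DagsService(self._transport)

    def close(self) -> None:
        self._transport.close()

    def __enter__(self) -> "MiniSDK":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()


with MiniSDK("http://localhost:8080") as sdk:
    print(sdk.dags.list()["total_entries"])  # → 1
```

Because every service receives the same transport, authentication, retries, and connection pooling are configured in one place, and closing the facade releases everything.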
All methods below apply equally to both `AirflowSDK` (sync) and `AsyncAirflowSDK` (async: prefix calls with `await`).
`client.dags`: manage your Airflow DAG definitions.

| Method | Description |
|---|---|
| `list(limit, offset, tags)` | List all DAGs. Optionally filter by tags. |
| `get(dag_id)` | Get a specific DAG by ID. |
| `get_details(dag_id)` | Get detailed information about a DAG. |
| `patch(dag_id, update_mask, **kwargs)` | Update fields on a DAG. |
| `delete(dag_id)` | Delete a DAG. |
| `pause(dag_id, is_paused=True)` | Pause or unpause a DAG. |
Examples:

```python
# List the first 50 DAGs
result = client.dags.list(limit=50, offset=0)
for dag in result.dags:
    print(dag.dag_id, dag.owners, dag.tags)

# Get a specific DAG
dag = client.dags.get("my_etl_dag")
print(dag.fileloc, dag.is_active)

# Pause a DAG
client.dags.pause("my_etl_dag")

# Unpause a DAG
client.dags.pause("my_etl_dag", is_paused=False)

# Delete a DAG
client.dags.delete("old_dag")

# Filter by tags
result = client.dags.list(tags=["production", "finance"])
```

`client.dag_runs`: manage and monitor DAG run executions.
| Method | Description |
|---|---|
| `list(dag_id, limit, offset)` | List all runs for a DAG. |
| `get(dag_id, dag_run_id)` | Get a specific DAG run. |
| `trigger(dag_id, conf, logical_date)` | Trigger a new DAG run. |
| `delete(dag_id, dag_run_id)` | Delete a DAG run. |
| `clear(dag_id, dag_run_id, dry_run)` | Clear (re-run) a DAG run. |
Examples:

```python
# Trigger a DAG run with configuration
run = client.dag_runs.trigger(
    dag_id="my_dag",
    conf={"environment": "prod", "date": "2024-06-01"},
    logical_date="2024-06-01T00:00:00Z",  # optional
)
print(run.dag_run_id, run.state)  # e.g. "manual__2024-06-01" "queued"

# List up to 20 runs for a DAG
runs = client.dag_runs.list("my_dag", limit=20)
for run in runs.dag_runs:
    print(run.dag_run_id, run.state, run.start_date)

# Get a specific run
run = client.dag_runs.get("my_dag", "manual__2024-06-01")

# Clear (retry) a failed run
client.dag_runs.clear("my_dag", "manual__2024-06-01", dry_run=False)

# Delete a run
client.dag_runs.delete("my_dag", "manual__2024-06-01")
```

`client.tasks`: inspect task definitions within a DAG.
| Method | Description |
|---|---|
| `list(dag_id)` | List all tasks in a DAG. |
| `get(dag_id, task_id)` | Get a specific task definition. |
Examples:

```python
# List tasks in a DAG
tasks = client.tasks.list("my_dag")
for task in tasks.tasks:
    print(task.task_id, task.operator, task.downstream_task_ids)

# Get a specific task
task = client.tasks.get("my_dag", "process_data")
print(task.retries, task.pool, task.priority_weight)
```

`client.task_instances`: inspect individual task execution instances.
| Method | Description |
|---|---|
| `list(dag_id, dag_run_id, limit, offset)` | List task instances for a run. |
| `get(dag_id, dag_run_id, task_id)` | Get a specific task instance. |
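For a run with many tasks, tallying the `state` of each task instance gives a quick health summary. The sketch below assumes each item in a `list()` result's `task_instances` has `task_id` and `state` attributes, as in the examples for this service; `summarize_states` and `failed_task_ids` are hypothetical helpers. A missing state is reported as `"no_status"`, on the assumption that the API returns `None` for task instances that have not been scheduled yet.

```python
from collections import Counter
from types import SimpleNamespace
from typing import Any, Dict, List


def summarize_states(task_instances: List[Any]) -> Dict[str, int]:
    """Count task instances per state (hypothetical helper); None becomes 'no_status'."""
    return dict(Counter(ti.state or "no_status" for ti in task_instances))


def failed_task_ids(task_instances: List[Any]) -> List[str]:
    """Return the IDs of task instances whose state is 'failed' (hypothetical helper)."""
    return [ti.task_id for ti in task_instances if ti.state == "failed"]


# Offline sample data shaped like client.task_instances.list(...).task_instances
sample = [
    SimpleNamespace(task_id="extract", state="success"),
    SimpleNamespace(task_id="transform", state="failed"),
    SimpleNamespace(task_id="load", state=None),
]
print(summarize_states(sample))  # → {'success': 1, 'failed': 1, 'no_status': 1}
print(failed_task_ids(sample))   # → ['transform']
```

With the SDK, you would pass `client.task_instances.list("my_dag", "manual__2024-06-01").task_instances` instead of the sample list.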
Examples:

```python
# List all task instances in a run
instances = client.task_instances.list("my_dag", "manual__2024-06-01")
for ti in instances.task_instances:
    print(ti.task_id, ti.state, ti.duration)

# Get a specific task instance
ti = client.task_instances.get("my_dag", "manual__2024-06-01", "process_data")
print(ti.state, ti.start_date, ti.end_date, ti.try_number)
```

`client.dag_sources`: retrieve the raw Python source code of a DAG file.
| Method | Description |
|---|---|
| `get(dag_id)` | Get the source code for a DAG. |
Examples:

```python
source = client.dag_sources.get("my_dag")
print(source.content)  # Raw Python source of the DAG file
```

`client.dag_stats`: get run-state statistics per DAG.
| Method | Description |
|---|---|
| `get(dag_ids)` | Get stats for one or more DAGs. |
Examples:

```python
stats = client.dag_stats.get(dag_ids=["my_dag", "other_dag"])
for dag_stat in stats.dag_stats:
    print(dag_stat.dag_id)
    for s in dag_stat.stats:
        print(f"  {s.state}: {s.count}")
```

`client.dag_versions`: inspect the version history of your DAGs.
| Method | Description |
|---|---|
| `list(dag_id, limit, offset)` | List all versions of a DAG. |
| `get(dag_id, version_number)` | Get a specific version. |
Examples:

```python
# List all versions of a DAG
versions = client.dag_versions.list("my_dag")
for v in versions.dag_versions:
    print(v.version_number, v.created_at, v.bundle_name)

# Get a specific version
v = client.dag_versions.get("my_dag", 3)
```

`client.dag_warnings`: retrieve import or configuration warnings for DAGs.
| Method | Description |
|---|---|
| `list(limit, offset)` | List all DAG warnings. |
Examples:

```python
result = client.dag_warnings.list()
for w in result.dag_warnings:
    print(w.dag_id, w.warning_type, w.message)
```

`client.variables`: manage Airflow Variables (a key-value configuration store).
| Method | Description |
|---|---|
| `list(limit, offset)` | List all variables. |
| `get(variable_key)` | Get a variable by key. |
| `create(request)` | Create a new variable. |
| `patch(variable_key, update_mask, **kwargs)` | Update an existing variable. |
| `delete(variable_key)` | Delete a variable. |
Examples:

```python
from src.services.admin.admin_models import VariableCreateRequest

# Create a variable
client.variables.create(VariableCreateRequest(
    key="api_url",
    value="https://myapi.example.com",
    description="External API endpoint",
))

# Read a variable
var = client.variables.get("api_url")
print(var.key, var.value, var.description)

# Update a variable's value
client.variables.patch("api_url", value="https://newapi.example.com")

# List variables (up to 50)
all_vars = client.variables.list(limit=50)
for v in all_vars.variables:
    print(v.key, v.value)

# Delete a variable
client.variables.delete("api_url")
```

`client.connections`: manage Airflow Connections (credentials for external services).
| Method | Description |
|---|---|
| `list(limit, offset)` | List all connections. |
| `get(connection_id)` | Get a connection by ID. |
| `create(request)` | Create a new connection. |
| `patch(connection_id, update_mask, **kwargs)` | Update a connection. |
| `delete(connection_id)` | Delete a connection. |
Examples:

```python
from src.services.admin.admin_models import ConnectionCreateRequest

# Create a Postgres connection
client.connections.create(ConnectionCreateRequest(
    connection_id="my_postgres",
    conn_type="postgres",
    host="db.example.com",
    login="admin",
    password="secret",
    port=5432,
    schema="mydb",
))

# Get a connection (the response model exposes the schema as `schema_`)
conn = client.connections.get("my_postgres")
print(conn.host, conn.port, conn.schema_)

# List all connections
conns = client.connections.list()
for c in conns.connections:
    print(c.connection_id, c.conn_type)

# Update the host
client.connections.patch("my_postgres", host="newdb.example.com")

# Delete the connection
client.connections.delete("my_postgres")
```

`client.pools`: manage Airflow Pools, which limit how many tasks can execute concurrently.
| Method | Description |
|---|---|
| `list(limit, offset)` | List all pools. |
| `get(pool_name)` | Get a specific pool. |
| `create(request)` | Create a new pool. |
| `patch(pool_name, update_mask, **kwargs)` | Update a pool. |
| `delete(pool_name)` | Delete a pool. |
Examples:

```python
from src.services.admin.admin_models import PoolCreateRequest

# Create a pool
pool = client.pools.create(PoolCreateRequest(
    name="etl_pool",
    slots=5,
    description="Pool for ETL pipeline tasks",
))
print(pool.name, pool.slots, pool.open_slots)

# Check pool usage
pool = client.pools.get("etl_pool")
print(f"Running: {pool.running_slots} | Queued: {pool.queued_slots} | Open: {pool.open_slots}")

# Update the slot count
client.pools.patch("etl_pool", slots=10)

# Delete the pool
client.pools.delete("etl_pool")
```

`client.config`: read Airflow server configuration values.
| Method | Description |
|---|---|
| `get_config()` | Get the full Airflow config. |
| `get_value(section, option)` | Get a single config option. |
Examples:

```python
# Get the full config
config = client.config.get_config()
for section in config.sections:
    print(f"[{section.name}]")
    for opt in section.options:
        print(f"  {opt.key} = {opt.value}")

# Get a single value
val = client.config.get_value("core", "dags_folder")
print(val)
```

`client.event_logs`: retrieve the Airflow audit event log.
| Method | Description |
|---|---|
| `list(limit, offset)` | List event log entries. |
| `get(event_log_id)` | Get a specific event log entry. |
Examples:

```python
logs = client.event_logs.list(limit=20)
for log in logs.get("event_logs", []):
    print(log)

# Get a specific log entry
entry = client.event_logs.get("12345")
```

`client.import_errors`: inspect DAG import errors from the Airflow scheduler.
| Method | Description |
|---|---|
| `list(limit, offset)` | List all import errors. |
| `get(import_error_id)` | Get a specific import error. |
Examples:

```python
errors = client.import_errors.list()
for err in errors.get("import_errors", []):
    print(err)
```

`client.jobs`: list active Airflow scheduler/webserver jobs.
| Method | Description |
|---|---|
| `list(limit, offset)` | List all active jobs. |
Examples:

```python
jobs = client.jobs.list()
print(jobs)
```

`client.plugins`: list installed Airflow plugins.
| Method | Description |
|---|---|
| `list(limit, offset)` | List all plugins. |
Examples:

```python
plugins = client.plugins.list()
for plugin in plugins.plugins:
    print(plugin.name, plugin.source)
```

`client.providers`: list installed Airflow provider packages.
| Method | Description |
|---|---|
| `list(limit, offset)` | List all providers. |
Examples:

```python
providers = client.providers.list()
for p in providers.providers:
    print(p.package_name, p.version)
```

`client.assets`: work with Airflow Data Assets (dataset-driven scheduling).
| Method | Description |
|---|---|
| `list(limit, offset)` | List all assets. |
| `get(asset_id)` | Get a specific asset. |
| `events(limit, offset)` | List asset events. |
Examples:

```python
# List all assets
assets = client.assets.list()
for asset in assets.assets:
    print(asset.uri, asset.extra)

# Get a specific asset
asset = client.assets.get(42)

# List asset events
events = client.assets.events()
```

`client.backfills`: create and manage historical DAG backfill runs.
| Method | Description |
|---|---|
| `list(limit, offset)` | List all backfills. |
| `get(backfill_id)` | Get a specific backfill. |
| `create(dag_id, from_date, to_date, **kwargs)` | Create a new backfill. |
| `pause(backfill_id)` | Pause a running backfill. |
| `unpause(backfill_id)` | Resume a paused backfill. |
| `cancel(backfill_id)` | Cancel a backfill. |
Examples:

```python
# Create a backfill for a date range
backfill = client.backfills.create(
    dag_id="my_dag",
    from_date="2024-01-01T00:00:00Z",
    to_date="2024-03-31T23:59:59Z",
)
print(backfill.backfill_id, backfill.state)

# Pause a running backfill
client.backfills.pause(backfill.backfill_id)

# Resume it
client.backfills.unpause(backfill.backfill_id)

# Cancel it entirely
client.backfills.cancel(backfill.backfill_id)
```

Several creation methods accept Pydantic request models for structured, validated input:
```python
from src.services.admin.admin_models import (
    VariableCreateRequest,
    ConnectionCreateRequest,
    PoolCreateRequest,
)

# Variable
req = VariableCreateRequest(key="my_key", value="my_value", description="optional")

# Connection
req = ConnectionCreateRequest(
    connection_id="my_s3",
    conn_type="aws",
    host="s3.amazonaws.com",
    login="access_key_id",
    password="secret_access_key",
)

# Pool
req = PoolCreateRequest(name="my_pool", slots=10, description="optional")
```

All SDK methods return typed Pydantic models, which provide full attribute access and IDE autocompletion.
| Model | Key Fields |
|---|---|
| `DagModel` | `dag_id`, `is_paused`, `is_active`, `fileloc`, `owners`, `tags` |
| `DagCollection` | `dags: List[DagModel]`, `total_entries: int` |
| `DagRunModel` | `dag_run_id`, `dag_id`, `state`, `run_type`, `conf`, `logical_date` |
| `DagRunCollection` | `dag_runs: List[DagRunModel]`, `total_entries: int` |
| `TaskModel` | `task_id`, `retries`, `pool`, `queue`, `downstream_task_ids` |
| `TaskCollection` | `tasks: List[TaskModel]`, `total_entries: int` |
| `TaskInstanceModel` | `task_id`, `dag_id`, `state`, `duration`, `try_number`, `operator` |
| `TaskInstanceCollection` | `task_instances: List[TaskInstanceModel]`, `total_entries: int` |
| `DagVersionModel` | `version_number`, `dag_id`, `bundle_name`, `created_at` |
| `DagWarningModel` | `dag_id`, `warning_type`, `message`, `timestamp` |
| Model | Key Fields |
|---|---|
| `VariableModel` | `key`, `value`, `description` |
| `ConnectionModel` | `connection_id`, `conn_type`, `host`, `login`, `port`, `schema_` |
| `PoolModel` | `name`, `slots`, `occupied_slots`, `running_slots`, `queued_slots`, `open_slots` |
| `ConfigModel` | `sections: List[ConfigSectionModel]` |
| `ProviderModel` | `package_name`, `description`, `version` |
| `PluginModel` | `name`, `source` |
The SDK raises specific exceptions defined in `src/exceptions.py`. Catch the most specific exception first so that each failure mode gets its own handling:
```text
AirflowAPIError                    ← Base exception
├── AirflowAuthError               ← 401 Unauthorized / 403 Forbidden
└── AirflowRequestError            ← Any HTTP error (has .status_code, .response_data)
    ├── AirflowRateLimitError      ← 429 Too Many Requests (auto-retried)
    └── AirflowNotFoundError       ← 404 Not Found
```
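The hierarchy above implies a mapping from HTTP status codes to exception types. The sketch below shows one way to write that mapping; it defines local stand-in classes with the same names plus a hypothetical `raise_for_status` function, so it illustrates the idea rather than reproducing `src/exceptions.py`. Because `AirflowAuthError` sits outside `AirflowRequestError` in the diagram, the sketch gives it no `status_code`.

```python
from typing import Any, Optional


# Hypothetical stand-ins mirroring the documented hierarchy (not the SDK's own classes).
class AirflowAPIError(Exception):
    """Base exception for all SDK errors."""


class AirflowAuthError(AirflowAPIError):
    """401 Unauthorized / 403 Forbidden."""


class AirflowRequestError(AirflowAPIError):
    """Any other HTTP error; carries the status code and decoded body."""

    def __init__(self, message: str, status_code: int, response_data: Optional[Any] = None) -> None:
        super().__init__(message)
        self.status_code = status_code
        self.response_data = response_data


class AirflowRateLimitError(AirflowRequestError):
    """429 Too Many Requests."""


class AirflowNotFoundError(AirflowRequestError):
    """404 Not Found."""


def raise_for_status(status_code: int, response_data: Optional[Any] = None) -> None:
    """Raise the most specific exception for an error status; do nothing below 400."""
    if status_code < 400:
        return
    if status_code in (401, 403):
        raise AirflowAuthError(f"Authentication failed (HTTP {status_code})")
    if status_code == 404:
        raise AirflowNotFoundError("Resource not found", status_code, response_data)
    if status_code == 429:
        raise AirflowRateLimitError("Rate limit exceeded", status_code, response_data)
    raise AirflowRequestError(f"HTTP error {status_code}", status_code, response_data)


try:
    raise_for_status(404, {"detail": "DAG not found"})
except AirflowRequestError as exc:  # also catches the 404 and 429 subclasses
    print(type(exc).__name__, exc.status_code)  # → AirflowNotFoundError 404
```

Checking the specific codes before falling through to the generic `AirflowRequestError` is what lets callers order their `except` clauses from most to least specific.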
```python
from src.sdk import AirflowSDK
from src.exceptions import (
    AirflowAPIError,
    AirflowAuthError,
    AirflowNotFoundError,
    AirflowRateLimitError,
    AirflowRequestError,
)

with AirflowSDK("http://localhost:8080", "admin", "admin") as client:
    try:
        dag = client.dags.get("non_existent_dag")
    except AirflowNotFoundError as e:
        print(f"DAG not found: {e}")
        print(f"HTTP status: {e.status_code}")
    except AirflowAuthError:
        print("Authentication failed. Check username/password.")
    except AirflowRateLimitError:
        # 429s are retried automatically, so reaching this branch means
        # the configured retries were already used up.
        print("Rate limit still exceeded after automatic retries.")
    except AirflowRequestError as e:
        print(f"HTTP error {e.status_code}: {e.response_data}")
    except AirflowAPIError as e:
        print(f"SDK error: {e}")
```

Tip: `AirflowRateLimitError` (429) and server errors (502, 503, 504) are automatically retried with exponential backoff. The number of retries is controlled by the `retries` parameter of the SDK constructor (default: 3).
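A minimal sketch of exponential backoff follows, assuming a doubling delay with "full jitter" (a random wait between zero and the current bound). The SDK's actual base delay, cap, and jitter policy are not documented here, so `retry_with_backoff`, `RETRYABLE_STATUS`, and `TransientHTTPError` are hypothetical names. The `sleep` parameter is injectable so the sketch can be exercised without waiting.

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")

# Status codes treated as transient in this sketch (matching the tip above)
RETRYABLE_STATUS = {429, 502, 503, 504}


class TransientHTTPError(Exception):
    """Hypothetical error carrying the HTTP status that caused it."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def retry_with_backoff(
    call: Callable[[], T],
    retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call()`, retrying retryable failures up to `retries` extra times.

    The wait before retry n (0-based) is a random value between 0 and
    min(max_delay, base_delay * 2**n).
    """
    for attempt in range(retries + 1):
        try:
            return call()
        except TransientHTTPError as exc:
            if exc.status_code not in RETRYABLE_STATUS or attempt == retries:
                raise  # non-retryable, or retries exhausted: surface the error
            bound = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, bound))
    raise AssertionError("unreachable")


# Usage: a call that fails twice with 503, then succeeds.
outcomes: List[object] = [TransientHTTPError(503), TransientHTTPError(503), "ok"]
delays: List[float] = []


def flaky() -> str:
    result = outcomes.pop(0)
    if isinstance(result, Exception):
        raise result
    return str(result)


print(retry_with_backoff(flaky, retries=3, sleep=delays.append), len(delays))  # → ok 2
```

In a scheme like this, an error that reaches the caller means either the status was not retryable or every retry was used. Jitter spreads out retries from many concurrent clients so they do not hit the server in lockstep.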
All collection endpoints support `limit` and `offset` for pagination. Responses include `total_entries`, so you can calculate the total number of pages.
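Given `total_entries` and a page size, the page count is a ceiling division, and the offset loop can be wrapped in a generator that works for any collection endpoint. The sketch below assumes each `list()` call accepts `limit`/`offset` keywords and returns an object with `total_entries` and a list attribute such as `dags`; `iter_all` and `page_count` are hypothetical helpers, and `fake_list` stands in for `client.dags.list` so the sketch runs offline.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterator


def page_count(total_entries: int, page_size: int) -> int:
    """Number of pages needed to cover total_entries items (ceiling division)."""
    return -(-total_entries // page_size)


def iter_all(list_fn: Callable[..., Any], items_attr: str, page_size: int = 100, **filters: Any) -> Iterator[Any]:
    """Yield every item from a limit/offset endpoint, one page at a time (hypothetical helper)."""
    offset = 0
    while True:
        page = list_fn(limit=page_size, offset=offset, **filters)
        items = getattr(page, items_attr)
        yield from items
        # Advance by what was actually returned, in case the server caps the page size.
        offset += len(items)
        if not items or offset >= page.total_entries:
            break


# Offline stand-in for client.dags.list (hypothetical)
ALL_DAGS = [SimpleNamespace(dag_id=f"dag_{i}") for i in range(120)]


def fake_list(limit: int, offset: int) -> SimpleNamespace:
    return SimpleNamespace(dags=ALL_DAGS[offset:offset + limit], total_entries=len(ALL_DAGS))


print(page_count(120, 50))  # → 3
print(sum(1 for _ in iter_all(fake_list, "dags", page_size=50)))  # → 120
```

With the real client this would read `for dag in iter_all(client.dags.list, "dags", page_size=50): ...`, and extra keyword filters such as `tags=[...]` pass straight through to `list()`.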
```python
PAGE_SIZE = 50

# Get the first page
page1 = client.dags.list(limit=PAGE_SIZE, offset=0)
total = page1.total_entries
print(f"Page 1: got {len(page1.dags)} of {total} total DAGs")

# Get the second page
page2 = client.dags.list(limit=PAGE_SIZE, offset=PAGE_SIZE)

# Iterate through all pages
all_dags = []
offset = 0
while True:
    result = client.dags.list(limit=PAGE_SIZE, offset=offset)
    all_dags.extend(result.dags)
    offset += PAGE_SIZE
    # Stop past the last page, or early if a page comes back empty
    if not result.dags or offset >= result.total_entries:
        break
print(f"Fetched all {len(all_dags)} DAGs")
```

The async client is ideal for running many API calls in parallel:
```python
import asyncio

from src.async_sdk import AsyncAirflowSDK


async def monitor_dags(dag_ids: list):
    async with AsyncAirflowSDK("http://localhost:8080", "admin", "admin") as client:
        # Fetch details for multiple DAGs concurrently
        coros = [client.dags.get(dag_id) for dag_id in dag_ids]
        results = await asyncio.gather(*coros, return_exceptions=True)
        for dag_id, result in zip(dag_ids, results):
            if isinstance(result, Exception):
                print(f"{dag_id}: ERROR: {result}")
            else:
                print(f"{dag_id}: paused={result.is_paused}, active={result.is_active}")


asyncio.run(monitor_dags(["dag_1", "dag_2", "dag_3"]))
```

MIT License