DMS Airflow: Professional Practices for Enterprise-Grade Data Workflow Orchestration


Article #127 of 2025
(Estimated reading time: 15 minutes)
---
Introduction
DMS Airflow is an enterprise-grade data workflow orchestration platform built on Apache Airflow. With deep integration into Alibaba Cloud DMS (Data Management Service), it offers:
- Advanced task orchestration
- Powerful scheduling & monitoring
- Unified management features
In this guide, we'll explore:
- Airflow's advanced orchestration
- Special enterprise capabilities via DMS integration
- Practical usage examples
---
01 — Advanced Orchestration with Airflow
1.1 Defining DAGs (Directed Acyclic Graphs)
At the core of Airflow is the DAG, defining task dependencies and execution order.
Highlights:
- Python-defined: DAGs are plain Python, so they can be version-controlled and code-reviewed
- Dynamic generation: DAGs can be generated from configuration or data
- Templating: Jinja2 parameterization of task arguments
Example:
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def transform_function(date):
    """Placeholder transform step; replace with real transformation logic."""
    print(f"Transforming data for {date}")


default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'complex_etl_pipeline',
    default_args=default_args,
    description='Complex ETL Data Pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production']
)

extract_task = BashOperator(
    task_id='extract_data',
    bash_command='python /scripts/extract.py --date {{ ds }}',
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_function,
    op_kwargs={'date': '{{ ds }}'},
    dag=dag
)

load_task = BashOperator(
    task_id='load_data',
    bash_command='python /scripts/load.py --date {{ ds }}',
    dag=dag
)

extract_task >> transform_task >> load_task
```
---
1.2 Task Dependency Management
Dependency Operators:
- `>>`, `<<` — set downstream/upstream dependencies inline (bitshift syntax)
- `.set_upstream()` / `.set_downstream()` — explicit method calls for the same relationships
- `cross_downstream()` — connect every task in one list to every task in another (see the sketch below)
- `chain()` — chain a linear sequence of tasks or lists of tasks
Complex Example:
```python
from airflow.operators.dummy import DummyOperator
from airflow.models.baseoperator import chain

branch_task = DummyOperator(task_id='branch', dag=dag)
task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)
merge_task = DummyOperator(task_id='merge', dag=dag)

# Fan out to three parallel tasks, then fan back in
branch_task >> [task_a, task_b, task_c] >> merge_task

# Two parallel transform tasks between the extract and load tasks from 1.1
transform_task_1 = DummyOperator(task_id='transform_1', dag=dag)
transform_task_2 = DummyOperator(task_id='transform_2', dag=dag)

chain(
    extract_task,
    [transform_task_1, transform_task_2],
    load_task
)
```
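`cross_downstream()` from the list above is not covered by this example; a minimal sketch, assuming two illustrative upstream extracts feeding two downstream checks:

```python
from airflow.models.baseoperator import cross_downstream
from airflow.operators.dummy import DummyOperator

# Illustrative tasks only
extract_a = DummyOperator(task_id='extract_a', dag=dag)
extract_b = DummyOperator(task_id='extract_b', dag=dag)
clean = DummyOperator(task_id='clean', dag=dag)
validate = DummyOperator(task_id='validate', dag=dag)

# Every task in the first list becomes upstream of every task in the second:
# extract_a >> clean, extract_a >> validate, extract_b >> clean, extract_b >> validate
cross_downstream([extract_a, extract_b], [clean, validate])
```
---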
1.3 Scheduling & Time Triggers
Scheduling Options:
- Cron expressions: `'0 0 * * *'` (daily at midnight)
- Presets: `@daily`, `@hourly`
- Time delta: `timedelta(hours=2)`
- Manual: `None`
Template Variables:
`{{ ds }}`, `{{ ds_nodash }}`, `{{ ts }}`, `{{ yesterday_ds }}`, `{{ next_ds }}`
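A minimal sketch of how these variables are used inside a templated operator argument (the report script and flags are illustrative; `dag` is the object defined in 1.1):

```python
from airflow.operators.bash import BashOperator

# {{ ds }} renders as YYYY-MM-DD, {{ ds_nodash }} as YYYYMMDD,
# {{ yesterday_ds }} as the previous day's date
report_task = BashOperator(
    task_id='daily_report',
    bash_command=(
        'python /scripts/report.py '
        '--date {{ ds }} --compact {{ ds_nodash }} --prev {{ yesterday_ds }}'
    ),
    dag=dag
)
```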
---
1.4 Task State & Retry Management
States: `none`, `scheduled`, `queued`, `running`, `success`, `failed`, `upstream_failed`, `skipped`, `up_for_retry`, `up_for_reschedule`
Retry Example:
```python
# unreliable_function is assumed to be defined elsewhere in the DAG file
task = PythonOperator(
    task_id='unreliable_task',
    python_callable=unreliable_function,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),
    dag=dag
)
```
---
1.5 Data-Aware Scheduling (Datasets)
Introduced in Airflow 2.4+, Datasets trigger DAGs when specific data updates.
Example:
```python
from airflow import Dataset

raw_data = Dataset("s3://bucket/raw-data/")
processed_data = Dataset("s3://bucket/processed-data/")
```
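A minimal sketch of how such Dataset objects are wired up, assuming an illustrative producer task that writes the raw data and a downstream DAG scheduled on it:

```python
from airflow import DAG, Dataset
from airflow.operators.bash import BashOperator
from datetime import datetime

raw_data = Dataset("s3://bucket/raw-data/")

# Producer: listing the dataset in `outlets` marks it as updated when the task succeeds
with DAG('produce_raw_data', start_date=datetime(2024, 1, 1),
         schedule='@daily', catchup=False) as producer_dag:
    BashOperator(
        task_id='write_raw_data',
        bash_command='python /scripts/extract.py --date {{ ds }}',
        outlets=[raw_data]
    )

# Consumer: scheduled on the dataset instead of a time interval
with DAG('consume_raw_data', start_date=datetime(2024, 1, 1),
         schedule=[raw_data], catchup=False) as consumer_dag:
    BashOperator(
        task_id='process_raw_data',
        bash_command='python /scripts/transform.py'
    )
```
---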
1.6 Dynamic Task Generation
Generate tasks programmatically at DAG-parse time from configuration or database queries.
Example:
```python
# process_table is assumed to be defined elsewhere; configs could also come
# from a config file or a database query
def generate_tasks():
    configs = [{'table': 'users', 'database': 'db1'}]
    tasks = []
    for cfg in configs:
        tasks.append(PythonOperator(
            task_id=f"process_{cfg['table']}",
            python_callable=process_table,
            op_kwargs=cfg,
            dag=dag
        ))
    return tasks


dynamic_tasks = generate_tasks()
```
---
1.7 Task Groups & SubDAGs
TaskGroup (preferred over the deprecated SubDAG mechanism):
```python
from airflow.utils.task_group import TaskGroup

with TaskGroup('etl_group') as etl:
    extract = BashOperator(task_id='extract', ...)
    transform = PythonOperator(task_id='transform', ...)
    load = BashOperator(task_id='load', ...)

    extract >> transform >> load
```
---
1.8 Passing Data via XCom
Example:
```python
def extract_function(**context):
    # The return value is pushed to XCom automatically (key: 'return_value')
    return {'records': 1000}


def transform_function(**context):
    # Pull the extract task's return value from XCom
    data = context['ti'].xcom_pull(task_ids='extract')
    return data['records'] * 2
```
---
02 — Special DMS Integration Capabilities
2.1 Unified Auth & Role Mapping
- SSO via DmsAuthManager
- Unified permissions mapped to Airflow roles
2.2 Integrated DMS Services
- Enterprise SQL API
- AnalyticDB API
- DTS API
- Notebook API
---
2.3 Intelligent Resource Management
Auto Scaling
Dynamic worker scaling via load monitoring, sliding window smoothing, and Kubernetes API.
```ini
[scale]
worker_num_min = 2
worker_num_max = 20
polling_interval = 30
```
Resource Groups
- Interactive
- Batch
- Warehouse
---
2.4 Dynamic DAG Refresh
Reload DAGs via API without restarting Airflow.
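As a purely hypothetical illustration (the host, endpoint path, and token below are placeholders, not the documented DMS API), a refresh call might look like:

```python
import requests

# Hypothetical values -- replace with your DMS Airflow endpoint and credentials
DMS_AIRFLOW_API = "https://<your-dms-airflow-host>/api/v1"
ACCESS_TOKEN = "<access-token>"

# Ask the service to re-parse DAG files without restarting the scheduler or webserver
response = requests.post(
    f"{DMS_AIRFLOW_API}/dags/refresh",  # placeholder path
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
    timeout=30,
)
response.raise_for_status()
```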
2.5 Log Optimization
Stack trace filtering for concise logs.
2.6 Instance → Cluster ID Mapping
Resolve AnalyticDB cluster ID from DMS instance names.
---
2.7 Monitoring
- Task execution metrics
- Resource use statistics
- Custom business metrics
- Central log analysis (SLS)
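For the custom business metrics item, one simple pattern (a sketch using only standard Airflow callback hooks and the `dag` object from 1.1; the metric format and task are illustrative) is to emit structured log lines from a success callback so that central log analysis in SLS can aggregate them:

```python
import logging

from airflow.operators.bash import BashOperator


def record_business_metrics(context):
    """Success callback: write a structured metric line for central log collection."""
    ti = context['ti']
    logging.info(
        "business_metric dag_id=%s task_id=%s logical_date=%s",
        ti.dag_id, ti.task_id, context['ds'],
    )


report_metrics_task = BashOperator(
    task_id='daily_report_with_metrics',
    bash_command='python /scripts/report.py --date {{ ds }}',
    on_success_callback=record_business_metrics,
    dag=dag
)
```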
2.8 Security
- POP signature auth
- Auto token refresh
- RBAC access control
- Encrypted connections
---
03 — Usage Examples
3.1 SQL Execution
```python
sql_task = DMSSqlOperator(
    task_id='execute_sql',
    instance='production_db',
    database='analytics',
    sql="SELECT COUNT(*) FROM user_log WHERE date = '{{ ds }}'",
    dag=dag
)
```
---
3.2 Spark Processing
```python
spark_sql_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_sql_analysis',
    cluster_id='adb-cluster-001',
    resource_group='interactive-spark',
    sql='SELECT ...',
    schema='analytics',
    dag=dag
)

spark_job_task = DMSAnalyticDBSparkOperator(
    task_id='spark_batch_job',
    cluster_id='adb-cluster-001',
    resource_group='batch-job',
    sql='your_spark_sql_here',
    app_type='SQL',
    app_name='daily_etl_job',
    dag=dag
)
```
---
3.3 DTS Synchronization
```python
dts_task = DTSLakeInjectionOperator(
    task_id='sync_to_data_lake',
    source_instance='source_rds',
    target_instance='target_oss',
    bucket_name='data-lake-bucket',
    dag=dag
)
```
---
3.4 Notebook Execution
```python
notebook_task = DMSNotebookOperator(
    task_id='run_ml_training',
    file_path='notebooks/model_training.ipynb',
    runtime_name='python3.9',
    timeout=7200,
    dag=dag
)
```
---
3.5 Notifications
```python
def notify_on_failure(context):
    SLSNotifier(...).notify(context)
    CloudMonitorNotifier(...).notify(context)


dag = DAG(..., on_failure_callback=notify_on_failure)
```
---
3.6 Complete ETL Workflow
Steps (assembled into a sketch after this list):
- Sync data via DTS
- Validate data via SQL
- Transform via Spark
- Generate report via SQL
- Notify on failure
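A hedged end-to-end sketch assembling these steps from the operators shown in 3.1-3.5 (operator import paths are omitted because they are DMS-specific; instance names, cluster IDs, and SQL are illustrative, and `notify_on_failure` is the callback from 3.5):

```python
from airflow import DAG
from datetime import datetime, timedelta

with DAG(
    'dms_complete_etl',
    default_args={'owner': 'data-team', 'retries': 2,
                  'retry_delay': timedelta(minutes=5)},
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=notify_on_failure,  # see 3.5
    tags=['etl', 'dms'],
) as etl_dag:

    # 1. Sync data into the lake via DTS
    sync = DTSLakeInjectionOperator(
        task_id='sync_to_data_lake',
        source_instance='source_rds',
        target_instance='target_oss',
        bucket_name='data-lake-bucket',
    )

    # 2. Validate the freshly synced data via SQL
    validate = DMSSqlOperator(
        task_id='validate_data',
        instance='production_db',
        database='analytics',
        sql="SELECT COUNT(*) FROM user_log WHERE date = '{{ ds }}'",
    )

    # 3. Transform via Spark on AnalyticDB
    transform = DMSAnalyticDBSparkSqlOperator(
        task_id='spark_transform',
        cluster_id='adb-cluster-001',
        resource_group='batch-job',
        sql='INSERT OVERWRITE ...',
        schema='analytics',
    )

    # 4. Generate the daily report via SQL
    report = DMSSqlOperator(
        task_id='generate_report',
        instance='production_db',
        database='analytics',
        sql='SELECT ...',
    )

    sync >> validate >> transform >> report
```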
---
Summary
DMS Airflow Advantages:
- Unified auth & integration with DMS
- Rich SQL/Spark/DTS/Notebook support
- Intelligent scaling & resource grouping
- Enterprise monitoring & notifications
- Strong security
Use Cases:
- ETL workflows
- Analytics & reporting
- ML training
- Data migration
- Scheduled orchestration