DMS Airflow: Professional Practices for Enterprise-Grade Data Workflow Orchestration

Article #127 of 2025

(Estimated reading time: 15 minutes)

---

Introduction

DMS Airflow is an enterprise-grade data workflow orchestration platform built on Apache Airflow. With deep integration into Alibaba Cloud DMS (Data Management Service), it offers:

  • Advanced task orchestration
  • Powerful scheduling & monitoring
  • Unified management features

In this guide, we'll explore:

  • Airflow's advanced orchestration
  • Special enterprise capabilities via DMS integration
  • Practical usage examples

---

01 — Advanced Orchestration with Airflow

1.1 Defining DAGs (Directed Acyclic Graphs)

At the core of Airflow is the DAG, defining task dependencies and execution order.

Highlights:

  • Python-defined: Version control + reviews
  • Dynamic generation: Config/data-based DAG creation
  • Templating: Jinja2 parameterization

Example:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'complex_etl_pipeline',
    default_args=default_args,
    description='Complex ETL Data Pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production']
)

extract_task = BashOperator(
    task_id='extract_data',
    bash_command='python /scripts/extract.py --date {{ ds }}',
    dag=dag
)

def transform_function(date, **kwargs):
    # Placeholder callable; replace with real transformation logic
    print(f"Transforming data for {date}")

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_function,
    op_kwargs={'date': '{{ ds }}'},
    dag=dag
)

load_task = BashOperator(
    task_id='load_data',
    bash_command='python /scripts/load.py --date {{ ds }}',
    dag=dag
)

extract_task >> transform_task >> load_task

---

1.2 Task Dependency Management

Dependency Operators:

  • `>>`, `<<` — sequential order
  • `.set_upstream()` / `.set_downstream()` — explicit control
  • `cross_downstream()` — batch downstream assignment
  • `chain()` — chained dependencies

Complex Example:

from airflow.operators.dummy import DummyOperator
from airflow.models.baseoperator import chain, cross_downstream

branch_task = DummyOperator(task_id='branch', dag=dag)
task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)
merge_task = DummyOperator(task_id='merge', dag=dag)

# Fan out to three parallel tasks, then fan back in
branch_task >> [task_a, task_b, task_c] >> merge_task

# cross_downstream: every task in the first list becomes upstream of every task in the second
cross_downstream([task_a, task_b], [task_c, merge_task])

transform_task_1 = DummyOperator(task_id='transform_1', dag=dag)
transform_task_2 = DummyOperator(task_id='transform_2', dag=dag)

# chain: extract -> both transforms -> load
chain(
    extract_task,
    [transform_task_1, transform_task_2],
    load_task
)

---

1.3 Scheduling & Time Triggers

Scheduling Options:

  • Cron expressions: `'0 0 * * *'`
  • Presets: `@daily`, `@hourly`
  • Time delta: `timedelta(hours=2)`
  • Manual: `None`

Template Variables:

`{{ ds }}`, `{{ ds_nodash }}`, `{{ ts }}`, `{{ yesterday_ds }}`, `{{ next_ds }}`
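
A minimal sketch of how a schedule and these template variables are used together (the DAG id and script path are illustrative):

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    'daily_report',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 2 * * *',  # every day at 02:00; '@daily', timedelta(days=1), or None also work
    catchup=False,
) as dag:
    report = BashOperator(
        task_id='build_report',
        # Jinja variables are rendered per run: {{ ds }} is the logical date, {{ yesterday_ds }} the day before
        bash_command='python /scripts/report.py --date {{ ds }} --prev {{ yesterday_ds }}',
    )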

---

1.4 Task State & Retry Management

States: None, Scheduled, Queued, Running, Success, Failed, Skipped, Upstream Failed, Up for Retry

Retry Example:

def unreliable_function():
    # Placeholder for a call that can fail transiently (e.g., a flaky upstream API)
    ...

task = PythonOperator(
    task_id='unreliable_task',
    python_callable=unreliable_function,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),
    dag=dag
)

---

1.5 Data-Aware Scheduling (Datasets)

Introduced in Airflow 2.4+, Datasets trigger DAGs when specific data updates.

Example:

from airflow import Dataset
raw_data = Dataset("s3://bucket/raw-data/")
processed_data = Dataset("s3://bucket/processed-data/")
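
A minimal sketch of how these Datasets drive scheduling: the producer declares them as outlets, and the consumer uses them as its schedule (DAG ids and script paths are illustrative):

from airflow import DAG, Dataset
from airflow.operators.bash import BashOperator
from datetime import datetime

raw_data = Dataset("s3://bucket/raw-data/")
processed_data = Dataset("s3://bucket/processed-data/")

# Producer: marks raw_data as updated whenever the ingest task succeeds
with DAG('ingest_raw', start_date=datetime(2024, 1, 1), schedule='@daily', catchup=False):
    BashOperator(
        task_id='ingest',
        bash_command='python /scripts/ingest.py --date {{ ds }}',
        outlets=[raw_data],
    )

# Consumer: runs whenever raw_data is updated and in turn updates processed_data
with DAG('process_raw', start_date=datetime(2024, 1, 1), schedule=[raw_data], catchup=False):
    BashOperator(
        task_id='process',
        bash_command='python /scripts/process.py',
        outlets=[processed_data],
    )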

---

1.6 Dynamic Task Generation

Generate tasks at runtime from configs or DB queries.

Example:

def process_table(table, database, **kwargs):
    # Placeholder; replace with real per-table processing logic
    print(f"Processing {database}.{table}")

def generate_tasks():
    configs = [{'table': 'users', 'database': 'db1'}]
    tasks = []
    for cfg in configs:
        tasks.append(PythonOperator(
            task_id=f"process_{cfg['table']}",
            python_callable=process_table,
            op_kwargs=cfg,
            dag=dag
        ))
    return tasks

dynamic_tasks = generate_tasks()

---

1.7 Task Groups & SubDAGs

TaskGroup:

from airflow.utils.task_group import TaskGroup

with TaskGroup('etl_group') as etl:
    extract = BashOperator(task_id='extract', ...)
    transform = PythonOperator(task_id='transform', ...)
    load = BashOperator(task_id='load', ...)
    extract >> transform >> load

---

1.8 Passing Data via XCom

Example:

def extract_function(**context):
    # The return value is automatically pushed to XCom under the key 'return_value'
    return {'records': 1000}

def transform_function(**context):
    # Pull the value pushed by the task whose task_id is 'extract'
    data = context['ti'].xcom_pull(task_ids='extract')
    return data['records'] * 2
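
A minimal sketch of wiring these callables into a DAG so the pull resolves; note that the producer's task_id must match the task_ids argument passed to xcom_pull (the DAG id is illustrative):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('xcom_demo', start_date=datetime(2024, 1, 1), schedule_interval=None, catchup=False):
    extract = PythonOperator(task_id='extract', python_callable=extract_function)
    transform = PythonOperator(task_id='transform', python_callable=transform_function)
    extract >> transform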

---

02 — Special DMS Integration Capabilities

2.1 Unified Auth & Role Mapping

  • SSO via DmsAuthManager
  • Unified permissions mapped to Airflow roles

2.2 Integrated DMS Services

  • Enterprise SQL API
  • AnalyticDB API
  • DTS API
  • Notebook API

---

2.3 Intelligent Resource Management

Auto Scaling

Dynamic worker scaling driven by load monitoring, sliding-window smoothing, and the Kubernetes API.

[scale]
worker_num_min = 2
worker_num_max = 20
polling_interval = 30

Resource Groups

  • Interactive
  • Batch
  • Warehouse

---

2.4 Dynamic DAG Refresh

Reload DAGs via API without restarting Airflow.

2.5 Log Optimization

Stack trace filtering for concise logs.

2.6 Instance → Cluster ID Mapping

Resolve AnalyticDB cluster ID from DMS instance names.

---

2.7 Monitoring

  • Task execution metrics
  • Resource use statistics
  • Custom business metrics (see the sketch below)
  • Central log analysis (SLS)
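
For custom business metrics, a minimal sketch using open-source Airflow's built-in StatsD client (metric names are illustrative, and this assumes StatsD is enabled via `[metrics] statsd_on = True` in airflow.cfg; DMS may additionally route metrics to its own monitoring stack):

from airflow.stats import Stats

def emit_business_metrics(rows_processed, duration_ms):
    # Counter for processed rows and a timing metric for the transform step
    Stats.incr('etl.rows_processed', count=rows_processed)
    Stats.timing('etl.transform_duration_ms', duration_ms)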

2.8 Security

  • POP signature auth
  • Auto token refresh
  • RBAC access control
  • Encrypted connections

---

03 — Usage Examples

3.1 SQL Execution

sql_task = DMSSqlOperator(
    task_id='execute_sql',
    instance='production_db',
    database='analytics',
    sql='SELECT COUNT(*) FROM user_log WHERE date = "{{ ds }}"',
    dag=dag
)

---

3.2 Spark Processing

spark_sql_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_sql_analysis',
    cluster_id='adb-cluster-001',
    resource_group='interactive-spark',
    sql='SELECT ...',
    schema='analytics',
    dag=dag
)

spark_job_task = DMSAnalyticDBSparkOperator(
    task_id='spark_batch_job',
    cluster_id='adb-cluster-001',
    resource_group='batch-job',
    sql='your_spark_sql_here',
    app_type='SQL',
    app_name='daily_etl_job',
    dag=dag
)

---

3.3 DTS Synchronization

dts_task = DTSLakeInjectionOperator(
    task_id='sync_to_data_lake',
    source_instance='source_rds',
    target_instance='target_oss',
    bucket_name='data-lake-bucket',
    dag=dag
)

---

3.4 Notebook Execution

notebook_task = DMSNotebookOperator(
    task_id='run_ml_training',
    file_path='notebooks/model_training.ipynb',
    runtime_name='python3.9',
    timeout=7200,
    dag=dag
)

---

3.5 Notifications

def notify_on_failure(context):
    SLSNotifier(...).notify(context)
    CloudMonitorNotifier(...).notify(context)

dag = DAG(..., on_failure_callback=notify_on_failure)

---

3.6 Complete ETL Workflow

Steps (a minimal sketch follows the list):

  • Sync data via DTS
  • Validate data via SQL
  • Transform via Spark
  • Generate report via SQL
  • Notify on failure
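
A minimal sketch that strings these steps together, reusing the operators from the sections above (imports for the DMS operators are omitted because they depend on the installed DMS Airflow package; instance, cluster, and table names are illustrative):

from airflow import DAG
from datetime import datetime

# DMS operator imports (DTSLakeInjectionOperator, DMSSqlOperator,
# DMSAnalyticDBSparkSqlOperator) come from the DMS Airflow package and are omitted here.

with DAG(
    'dms_daily_etl',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    on_failure_callback=notify_on_failure,  # callback from section 3.5
) as dag:
    sync = DTSLakeInjectionOperator(
        task_id='sync_to_data_lake',
        source_instance='source_rds',
        target_instance='target_oss',
        bucket_name='data-lake-bucket',
    )

    validate = DMSSqlOperator(
        task_id='validate_data',
        instance='production_db',
        database='analytics',
        sql='SELECT COUNT(*) FROM user_log WHERE date = "{{ ds }}"',
    )

    transform = DMSAnalyticDBSparkSqlOperator(
        task_id='spark_transform',
        cluster_id='adb-cluster-001',
        resource_group='batch-job',
        sql='SELECT ...',  # transformation SQL elided, as in section 3.2
        schema='analytics',
    )

    report = DMSSqlOperator(
        task_id='generate_report',
        instance='production_db',
        database='analytics',
        sql='SELECT * FROM daily_summary WHERE date = "{{ ds }}"',
    )

    sync >> validate >> transform >> report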

---

Summary

DMS Airflow Advantages:

  • Unified auth & integration with DMS
  • Rich SQL/Spark/DTS/Notebook support
  • Intelligent scaling & resource grouping
  • Enterprise monitoring & notifications
  • Strong security

Use Cases:

  • ETL workflows
  • Analytics & reporting
  • ML training
  • Data migration
  • Scheduled orchestration


---


By Honghao Wang