DMS Airflow: Professional Practices for Enterprise-Grade Data Workflow Orchestration

Article #127 of 2025

(Estimated reading time: 15 minutes)

---

Introduction

DMS Airflow is an enterprise-grade data workflow orchestration platform built on Apache Airflow. With deep integration into Alibaba Cloud DMS (Data Management Service), it offers:

  • Advanced task orchestration
  • Powerful scheduling & monitoring
  • Unified management features

In this guide, we'll explore:

  • Airflow's advanced orchestration
  • Special enterprise capabilities via DMS integration
  • Practical usage examples

---

01 — Advanced Orchestration with Airflow

1.1 Defining DAGs (Directed Acyclic Graphs)

At the core of Airflow is the DAG, defining task dependencies and execution order.

Highlights:

  • Python-defined: Version control + reviews
  • Dynamic generation: Config/data-based DAG creation
  • Templating: Jinja2 parameterization

Example:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'complex_etl_pipeline',
    default_args=default_args,
    description='Complex ETL Data Pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production']
)

extract_task = BashOperator(
    task_id='extract_data',
    bash_command='python /scripts/extract.py --date {{ ds }}',
    dag=dag
)

def transform_function(date):
    # Placeholder transformation logic; replace with real processing code
    print(f"Transforming data for {date}")

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_function,
    op_kwargs={'date': '{{ ds }}'},
    dag=dag
)

load_task = BashOperator(
    task_id='load_data',
    bash_command='python /scripts/load.py --date {{ ds }}',
    dag=dag
)

extract_task >> transform_task >> load_task

---

1.2 Task Dependency Management

Dependency Operators:

  • `>>`, `<<` — sequential order
  • `.set_upstream() / .set_downstream()` — explicit control
  • `cross_downstream()` — batch downstream assignment (see the sketch after the example below)
  • `chain()` — chained dependencies

Complex Example:

from airflow.operators.dummy import DummyOperator
from airflow.models.baseoperator import chain

branch_task = DummyOperator(task_id='branch', dag=dag)
task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)
merge_task = DummyOperator(task_id='merge', dag=dag)

# Fan out to three parallel tasks, then fan back in
branch_task >> [task_a, task_b, task_c] >> merge_task

# chain() accepts a mix of single tasks and lists of parallel tasks
transform_task_1 = DummyOperator(task_id='transform_1', dag=dag)
transform_task_2 = DummyOperator(task_id='transform_2', dag=dag)

chain(
    extract_task,
    [transform_task_1, transform_task_2],
    load_task
)
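
cross_downstream() connects every task in one list to every task in another. A minimal sketch, using illustrative task names:

from airflow.models.baseoperator import cross_downstream

# Hypothetical parallel extract and load stages, for illustration only
extract_a = DummyOperator(task_id='extract_a', dag=dag)
extract_b = DummyOperator(task_id='extract_b', dag=dag)
load_a = DummyOperator(task_id='load_a', dag=dag)
load_b = DummyOperator(task_id='load_b', dag=dag)

# Equivalent to: extract_a >> load_a, extract_a >> load_b,
#                extract_b >> load_a, extract_b >> load_b
cross_downstream([extract_a, extract_b], [load_a, load_b])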

---

1.3 Scheduling & Time Triggers

Scheduling Options:

  • Cron expressions: `'0 0 * * *'`
  • Presets: `@daily`, `@hourly`
  • Time delta: `timedelta(hours=2)`
  • Manual: `None`

Template Variables:

`{{ ds }}`, `{{ ds_nodash }}`, `{{ ts }}`, `{{ yesterday_ds }}`, `{{ next_ds }}`
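
A minimal sketch of how these render at run time (the task and script path below are illustrative):

# For a DAG run with logical date 2024-06-01:
#   {{ ds }}        -> 2024-06-01
#   {{ ds_nodash }} -> 20240601
report_task = BashOperator(
    task_id='daily_report',
    bash_command='python /scripts/report.py --date {{ ds }} --run-ts {{ ts }}',
    dag=dag
)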

---

1.4 Task State & Retry Management

States: None, Scheduled, Queued, Running, Success, Failed, Skipped, Upstream Failed, Up for Retry

Retry Example:

task = PythonOperator(
    task_id='unreliable_task',
    python_callable=unreliable_function,   # assumed to be defined elsewhere
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,        # double the delay after each failed attempt
    max_retry_delay=timedelta(hours=1),    # cap on the backoff delay
    dag=dag
)

---

1.5 Data-Aware Scheduling (Datasets)

Introduced in Airflow 2.4, Datasets trigger DAGs when the data they represent is updated.

Example:

from airflow import Dataset
raw_data = Dataset("s3://bucket/raw-data/")
processed_data = Dataset("s3://bucket/processed-data/")
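
A minimal sketch of how these Datasets connect DAGs: a producer task declares them as outlets, and a consumer DAG uses them as its schedule (DAG and task names are illustrative):

# Producer: completing this task marks raw_data as updated
with DAG('produce_raw_data', start_date=datetime(2024, 1, 1),
         schedule='@daily', catchup=False):
    BashOperator(
        task_id='ingest',
        bash_command='python /scripts/ingest.py',
        outlets=[raw_data],
    )

# Consumer: runs whenever raw_data has been updated upstream
with DAG('process_raw_data', start_date=datetime(2024, 1, 1),
         schedule=[raw_data], catchup=False):
    BashOperator(
        task_id='process',
        bash_command='python /scripts/process.py',
    )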

---

1.6 Dynamic Task Generation

Generate tasks dynamically at DAG parse time from configuration files or database queries.

Example:

def process_table(table, database):
    # Placeholder processing logic for a single table
    print(f"Processing {database}.{table}")

def generate_tasks():
    # The config list could also come from a config file or database query
    configs = [{'table': 'users', 'database': 'db1'}]
    tasks = []
    for cfg in configs:
        tasks.append(PythonOperator(
            task_id=f"process_{cfg['table']}",
            python_callable=process_table,
            op_kwargs=cfg,
            dag=dag
        ))
    return tasks

dynamic_tasks = generate_tasks()

---

1.7 Task Groups & SubDAGs

TaskGroups visually group related tasks in the UI and are the recommended replacement for the older SubDAG pattern:

from airflow.utils.task_group import TaskGroup

with TaskGroup('etl_group') as etl:
    extract = BashOperator(task_id='extract', ...)
    transform = PythonOperator(task_id='transform', ...)
    load = BashOperator(task_id='load', ...)
    extract >> transform >> load

---

1.8 Passing Data via XCom

Example:

def extract_function(**context):
    # The return value is automatically pushed to XCom
    return {'records': 1000}

def transform_function(**context):
    # Pull the value pushed by the task with task_id 'extract'
    data = context['ti'].xcom_pull(task_ids='extract')
    return data['records'] * 2
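
Wired into a DAG, the task_ids must match what xcom_pull references (the names below are illustrative):

extract = PythonOperator(task_id='extract', python_callable=extract_function, dag=dag)
transform = PythonOperator(task_id='transform', python_callable=transform_function, dag=dag)
extract >> transform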

---

02 — Special DMS Integration Capabilities

2.1 Unified Auth & Role Mapping

  • SSO via DmsAuthManager
  • Unified permissions mapped to Airflow roles

2.2 Integrated DMS Services

  • Enterprise SQL API
  • AnalyticDB API
  • DTS API
  • Notebook API

---

2.3 Intelligent Resource Management

Auto Scaling

Workers are scaled dynamically based on load monitoring and sliding-window smoothing, using the Kubernetes API.

[scale]
worker_num_min = 2
worker_num_max = 20
polling_interval = 30
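
Conceptually, the scaling decision smooths recent load samples over a sliding window and clamps the target worker count to the configured bounds. The sketch below is illustrative only and is not the actual DMS implementation:

import math
from collections import deque

WORKER_NUM_MIN, WORKER_NUM_MAX = 2, 20   # mirror worker_num_min / worker_num_max
WINDOW_SIZE = 5                          # number of recent polls to smooth over

load_samples = deque(maxlen=WINDOW_SIZE)

def target_workers(queued_tasks, tasks_per_worker=10):
    # Average the raw queue depth over the window, then size the worker pool
    load_samples.append(queued_tasks)
    smoothed = sum(load_samples) / len(load_samples)
    desired = math.ceil(smoothed / tasks_per_worker)
    return max(WORKER_NUM_MIN, min(WORKER_NUM_MAX, desired))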

Resource Groups

  • Interactive
  • Batch
  • Warehouse

---

2.4 Dynamic DAG Refresh

Reload DAGs via API without restarting Airflow.

2.5 Log Optimization

Stack trace filtering for concise logs.

2.6 Instance → Cluster ID Mapping

Resolve AnalyticDB cluster ID from DMS instance names.

---

2.7 Monitoring

  • Task execution metrics
  • Resource use statistics
  • Custom business metrics
  • Central log analysis (SLS)

2.8 Security

  • POP signature auth
  • Auto token refresh
  • RBAC access control
  • Encrypted connections

---

03 — Usage Examples

3.1 SQL Execution

sql_task = DMSSqlOperator(
    task_id='execute_sql',
    instance='production_db',
    database='analytics',
    sql='SELECT COUNT(*) FROM user_log WHERE date = "{{ ds }}"',
    dag=dag
)

---

3.2 Spark Processing

spark_sql_task = DMSAnalyticDBSparkSqlOperator(
    task_id='spark_sql_analysis',
    cluster_id='adb-cluster-001',
    resource_group='interactive-spark',
    sql='SELECT ...',
    schema='analytics',
    dag=dag
)

spark_job_task = DMSAnalyticDBSparkOperator(
    task_id='spark_batch_job',
    cluster_id='adb-cluster-001',
    resource_group='batch-job',
    sql='your_spark_sql_here',
    app_type='SQL',
    app_name='daily_etl_job',
    dag=dag
)

---

3.3 DTS Synchronization

dts_task = DTSLakeInjectionOperator(
    task_id='sync_to_data_lake',
    source_instance='source_rds',
    target_instance='target_oss',
    bucket_name='data-lake-bucket',
    dag=dag
)

---

3.4 Notebook Execution

notebook_task = DMSNotebookOperator(
    task_id='run_ml_training',
    file_path='notebooks/model_training.ipynb',
    runtime_name='python3.9',
    timeout=7200,
    dag=dag
)

---

3.5 Notifications

def notify_on_failure(context):
    SLSNotifier(...).notify(context)
    CloudMonitorNotifier(...).notify(context)

dag = DAG(..., on_failure_callback=notify_on_failure)

---

3.6 Complete ETL Workflow

Steps (a sketch wiring these together follows the list):

  • Sync data via DTS
  • Validate data via SQL
  • Transform via Spark
  • Generate report via SQL
  • Notify on failure
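
Combining the operators from the previous sections, the flow could be wired roughly as below; the operator parameters are illustrative and mirror the earlier examples:

validate_task = DMSSqlOperator(
    task_id='validate_data',
    instance='production_db',
    database='analytics',
    sql='SELECT COUNT(*) FROM user_log WHERE date = "{{ ds }}"',
    dag=dag
)

report_task = DMSSqlOperator(
    task_id='generate_report',
    instance='production_db',
    database='analytics',
    sql='INSERT INTO daily_report SELECT ...',
    dag=dag
)

# DTS sync -> validation -> Spark transform -> report, reusing dts_task (3.3)
# and spark_sql_task (3.2); failures are reported via on_failure_callback (3.5)
dts_task >> validate_task >> spark_sql_task >> report_task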

---

Summary

DMS Airflow Advantages:

  • Unified auth & integration with DMS
  • Rich SQL/Spark/DTS/Notebook support
  • Intelligent scaling & resource grouping
  • Enterprise monitoring & notifications
  • Strong security

Use Cases:

  • ETL workflows
  • Analytics & reporting
  • ML training
  • Data migration
  • Scheduled orchestration
