Running Apache Airflow DAG with Docker
In this article, we are going to run a sample dynamic DAG using Docker. Before that, let's get a quick overview of Airflow and some of its terminology.
What is Airflow?
Airflow is a workflow engine that manages and schedules jobs and data pipelines. It ensures that jobs are ordered correctly based on their dependencies, and it also handles resource allocation and failures.
Before going forward, let's get familiar with the terms:
- Task or Operator: A defined unit of work.
- Task instance: An individual run of a single task. The states could be running, success, failed, skipped, and up for retry.
- DAG (Directed Acyclic Graph): A set of tasks with an execution order (see the minimal sketch after this list).
- DAG Run: An individual execution of a DAG.
- Web Server: The Airflow UI. It also allows us to manage users, roles, and various configurations for the Airflow setup.
- Scheduler: Schedules the jobs, or orchestrates the tasks. It uses the DAG definitions to decide which tasks need to run, when, and where.
- Executor: Executes the tasks. There are different types of executors:
- Sequential: Runs one task instance at a time.
- Local: Runs tasks by spawning processes in a controlled fashion in different modes.
- Celery: An asynchronous task/job queue based on distributed message passing. For the CeleryExecutor, one needs to set up a broker (Redis, RabbitMQ, or any other message broker supported by Celery) that all the running Celery workers poll for new tasks to run.
- Kubernetes: Provides a way to run Airflow tasks on Kubernetes, launching a new pod for each task.
- Metadata Database: Stores Airflow's state. Airflow uses SQLAlchemy, an object-relational mapping (ORM) library written in Python, to connect to the metadata database.
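To make the DAG, task, and operator terms concrete, here is a minimal sketch of a DAG with two dependent tasks. The dag_id and the echo commands are purely illustrative; the full example we will actually run with Docker comes later in the article.
# Minimal illustrative DAG: the "world" task runs only after the "hello" task.
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import dates

with DAG(
    dag_id="concept_demo",                  # illustrative name
    start_date=dates.days_ago(1),
    schedule_interval=timedelta(days=1),
) as dag:
    hello = BashOperator(task_id="hello", bash_command="echo hello")
    world = BashOperator(task_id="world", bash_command="echo world")
    hello >> world                          # execution order: hello, then world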
Now that we are familiar with the terms, let's get started.
Any time you run into a permission issue, execute:
chmod 777 <file>
Let's create a scripts folder and, inside it, a script called airflow-entrypoint.sh that upgrades the metadata database and starts the webserver (make sure the script is executable; see the chmod note above).
scripts/airflow-entrypoint.sh
#!/usr/bin/env bash
# Initialize/upgrade the Airflow metadata database, then start the webserver.
airflow upgradedb
airflow webserver
Let's now create the docker-compose file:
docker-compose.yml
version: "2.1"
services:
postgres:
image: postgres:12
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
ports:
- "5433:5432"
scheduler:
image: apache/airflow
restart: always
depends_on:
- postgres
- webserver
env_file:
- .env
ports:
- "8793:8793"
volumes:
- ./dags:/opt/airflow/dags
- ./airflow-logs:/opt/airflow/logs
command: scheduler
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
webserver:
image: apache/airflow
hostname: webserver
restart: always
depends_on:
- postgres
env_file:
- .env
volumes:
- ./dags:/opt/airflow/dags
- ./scripts:/opt/airflow/scripts
- ./airflow-logs:/opt/airflow/logs
ports:
- "8080:8080"
entrypoint: ./scripts/airflow-entrypoint.sh
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 32
Now, create a .env file for environment variables.
.env
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
AIRFLOW_CONN_METADATA_DB=postgres+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW_VAR__METADATA_DB_SCHEMA=airflow
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=10
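The Fernet key above is just a sample value; Airflow uses it to encrypt credentials such as connection passwords, so you may want to generate your own and paste it into .env. A quick sketch, assuming the cryptography package is available (it is a dependency of Airflow and ships with the image):
# Generate a fresh Fernet key for AIRFLOW__CORE__FERNET_KEY.
from cryptography.fernet import Fernet

print(Fernet.generate_key().decode())  # copy the printed value into .env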
Finally, create a folder called dags and, inside it, a file named hello-airflow.py.
dags/hello-airflow.py
import logging
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils import dates

logging.basicConfig(format="%(name)s-%(levelname)s-%(asctime)s-%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def create_dag(dag_id):
    """Build and return a DAG with two dependent PythonOperator tasks."""
    default_args = {
        "owner": "jyoti",
        "description": "DAG to explain airflow concepts",
        "depends_on_past": False,
        "start_date": dates.days_ago(1),
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "provide_context": True,
    }
    new_dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval=timedelta(minutes=5),
    )

    def task_1(**kwargs):
        # The returned value is pushed to XCom automatically.
        logger.info('=====Executing Task 1=============')
        return kwargs['message']

    def task_2(**kwargs):
        # Pull the value that Task_1 returned from XCom.
        logger.info('=====Executing Task 2=============')
        task_instance = kwargs['ti']
        result = task_instance.xcom_pull(key=None, task_ids='Task_1')
        logger.info('Extracted the value from task 1')
        logger.info(result)

    with new_dag:
        task1 = PythonOperator(task_id='Task_1',
                               python_callable=task_1,
                               op_kwargs={'message': 'hello airflow'},
                               provide_context=True)
        task2 = PythonOperator(task_id='Task_2',
                               python_callable=task_2,
                               op_kwargs=None,
                               provide_context=True)
        task2.set_upstream(task1)

    return new_dag


dag_id = "hello_airflow1"
globals()[dag_id] = create_dag(dag_id)
We have created two tasks (operators) here. Task 2 will execute only after task 1 completes, which is defined by task2.set_upstream(task1).
Task 1 pushes its data to task 2; for that, we just need to set provide_context=True. The data is then pulled in task 2 using the task instance and the task id:
task_instance.xcom_pull(key=None, task_ids='Task_1')
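The value returned by a PythonOperator callable is pushed to XCom automatically, and that is what the xcom_pull call above retrieves. If you prefer pushing and pulling under an explicit key instead, here is a minimal sketch of the two callables; the task id, key, and message are illustrative, and they would be wired into PythonOperators with provide_context=True exactly like Task_1 and Task_2 above.
# Illustrative callables using an explicit XCom key instead of the return value.
def push_message(**kwargs):
    kwargs['ti'].xcom_push(key='message', value='hello airflow')

def pull_message(**kwargs):
    # task_ids must match the task_id given to the pushing operator.
    message = kwargs['ti'].xcom_pull(key='message', task_ids='push_message_task')
    print(message)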
That is all. Now run docker-compose -f docker-compose.yml up and access the Airflow UI at http://0.0.0.0:8080.
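For reference, the project layout assumed by the compose file above looks roughly like this (airflow-logs is the folder backing the mounted log volume):
.
├── docker-compose.yml
├── .env
├── dags/
│   └── hello-airflow.py
├── scripts/
│   └── airflow-entrypoint.sh
└── airflow-logs/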
Thanks for reading!