airflow and luigi
airflow
-
apache airflow is a tool developed by airbnb in 2014 and later open-sourced
-
it is a platform to programmatically author, schedule, and monitor workflows. when workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.
-
you can use airflow to author workflows as directed acyclic graphs (DAGs) of tasks: the airflow scheduler executes your tasks on an array of workers while following the specified dependencies.
-
here is a toy example of an airflow DAG that, once the execution date is reached, waits for one second and then prints the date in bash, once per day:
from datetime import datetime, timedelta

from airflow.models import DAG  # Import the DAG class
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import TimeDeltaSensor

default_args = {
    'owner': 'you',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
}

dag = DAG(
    dag_id='anatomy_of_a_dag',
    description="This describes my DAG",
    default_args=default_args,
    schedule_interval=timedelta(days=1))  # This is a daily DAG.

# t0 and t1 are examples of tasks created by instantiating operators
t0 = TimeDeltaSensor(
    task_id='wait_a_second',
    delta=timedelta(seconds=1),
    dag=dag)

t1 = BashOperator(
    task_id='print_date_in_bash',
    bash_command='date',
    dag=dag)

t1.set_upstream(t0)  # t0 must succeed before t1 runs
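since the DAG above is plain python, it can be sanity-checked without running the scheduler. here is a minimal sketch, assuming the code above is saved as anatomy_of_a_dag.py (the module name is an assumption):

# smoke-test the DAG wiring without the scheduler; assumes the definition
# above lives in a module named anatomy_of_a_dag (hypothetical filename)
from anatomy_of_a_dag import dag

# the DAG should contain exactly the two tasks, with the sensor upstream
assert {t.task_id for t in dag.tasks} == {'wait_a_second', 'print_date_in_bash'}
assert set(dag.get_task('print_date_in_bash').upstream_task_ids) == {'wait_a_second'}

you can also execute a single task for a given execution date with the airflow 1.x CLI, e.g. `airflow test anatomy_of_a_dag print_date_in_bash 2018-01-08`, which runs the task in isolation without recording state.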
luigi
-
luigi is spotify's python module for building complex pipelines of batch jobs. it handles dependency resolution, workflow management, visualization, etc.
-
the basic unit of luigi is the task class, which models an atomic ETL step in three parts: a requires() part that points to other tasks that must run before this one, a run() part with the data transformation, and an output() part describing where the result lands. tasks can all feed into a final target, e.g. a table on Redshift or a single file.
import luigi

class WritePipelineTask(luigi.Task):
    # the task counts as done once this target exists
    def output(self):
        return luigi.LocalTarget("data/output_one.txt")

    def run(self):
        with self.output().open("w") as output_file:
            output_file.write("pipeline")

class AddMyTask(luigi.Task):
    # luigi runs WritePipelineTask first and exposes its output as self.input()
    def requires(self):
        return WritePipelineTask()

    def output(self):
        return luigi.LocalTarget("data/output_two.txt")

    def run(self):
        with self.input().open("r") as input_file:
            line = input_file.read()
        with self.output().open("w") as output_file:
            decorated_line = "My " + line
            output_file.write(decorated_line)
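to execute the pipeline you only ask for the final task; luigi walks requires() backwards and runs whatever is missing. a minimal sketch using luigi.build (assuming the two task classes above are in scope):

# run the whole pipeline by requesting its last task; luigi resolves the
# dependency graph and skips any task whose output already exists
import luigi

if __name__ == "__main__":
    # local_scheduler=True avoids needing the central luigid daemon
    luigi.build([AddMyTask()], local_scheduler=True, workers=1)

the equivalent command line would be `luigi --module <your_module> AddMyTask --local-scheduler`.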
airflow vs. luigi
| | airflow | luigi |
| --- | --- | --- |
| web dashboard | very nice | minimal |
| built-in scheduler | yes | no |
| separates output data and task state | yes | no |
| calendar scheduling | yes | no, use cron |
| parallelism | yes, via workers | yes, via threads per worker |
| finds newly deployed tasks | yes | no |
| persists state | yes, to a database | sort of |
| syncs tasks to workers | yes | no |
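a note on the "separates output data and task state" row: luigi keeps no separate state store; a task counts as complete exactly when its output targets exist. the hypothetical helper below mirrors luigi's documented default for Task.complete(), which is why deleting data/output_two.txt is enough to make AddMyTask run again:

# sketch of luigi's default completeness check: a task is "done" iff all of
# its output targets exist, so task state lives in the data itself
import luigi

def is_complete(task):
    return all(t.exists() for t in luigi.task.flatten(task.output()))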