## airflow and luigi
### airflow
* **[apache airflow](https://github.com/apache/airflow)** is a workflow management platform **[developed by airbnb in 2014 and later open-sourced](https://medium.com/airbnb-engineering/airflow-a-workflow-management-platform-46318b977fd8)**
* it is a platform to programmatically author, schedule, and monitor workflows. when workflows are defined as code, they become more maintainable, versionable, testable, and collaborative
* you can use airflow to author workflows as directed acyclic graphs (DAGs) of tasks: the airflow scheduler executes your tasks on an array of workers while following the specified dependencies.
* here is **[a very simple toy example of an airflow job](https://gist.github.com/robert8138/c6e492d00cd7b7e7626670ba2ed32e6a)**: every day, once the execution date is reached, it waits for one second to pass and then prints the date in bash:
```python
from datetime import datetime, timedelta

from airflow.models import DAG  # Import the DAG class
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import TimeDeltaSensor

default_args = {
    'owner': 'you',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
}

dag = DAG(
    dag_id='anatomy_of_a_dag',
    description="This describes my DAG",
    default_args=default_args,
    schedule_interval=timedelta(days=1))  # This is a daily DAG.

# t0 and t1 are examples of tasks created by instantiating operators
t0 = TimeDeltaSensor(
    task_id='wait_a_second',
    delta=timedelta(seconds=1),
    dag=dag)

t1 = BashOperator(
    task_id='print_date_in_bash',
    bash_command='date',
    dag=dag)

t1.set_upstream(t0)  # t0 must succeed before t1 runs
```
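* note that airflow also supports the bitshift operators `>>` and `<<` for declaring dependencies, which read more naturally than `set_upstream`/`set_downstream`; the snippet below is equivalent to the last line above:
```python
# "t0 must complete before t1" in bitshift form
t0 >> t1  # same as t1.set_upstream(t0)
```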
---
### luigi
- **[luigi data pipelining](https://github.com/spotify/luigi)** is spotify's python module that helps you build complex pipelines of batch jobs. it handles dependency resolution, workflow management, visualization, etc.
- the basic unit of luigi is the task class, which models an atomic ETL step in three parts: a `requires()` method pointing to the tasks that must run before this one, a `run()` method containing the data transformation logic, and an `output()` method declaring the target the task writes to. chains of tasks can ultimately feed a final output, e.g. a table on Redshift or a single file.
- here is **[an example of a simple workflow in luigi](https://towardsdatascience.com/data-pipelines-luigi-airflow-everything-you-need-to-know-18dc741449b7)**:
```python
import luigi

class WritePipelineTask(luigi.Task):
    """Writes the string "pipeline" to a local file."""

    def output(self):
        return luigi.LocalTarget("data/output_one.txt")

    def run(self):
        with self.output().open("w") as output_file:
            output_file.write("pipeline")

class AddMyTask(luigi.Task):
    """Reads the upstream output and prepends "My " to it."""

    def output(self):
        return luigi.LocalTarget("data/output_two.txt")

    def requires(self):
        # This task only runs once WritePipelineTask has produced its output
        return WritePipelineTask()

    def run(self):
        with self.input().open("r") as input_file:
            line = input_file.read()
        with self.output().open("w") as output_file:
            decorated_line = "My " + line
            output_file.write(decorated_line)
```
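- to try this locally, you can kick off the terminal task with luigi's in-process scheduler; a minimal sketch (in production you would typically run tasks against the central `luigid` scheduler instead):
```python
import luigi

# builds AddMyTask, first running WritePipelineTask via requires();
# local_scheduler=True avoids needing a running luigid daemon
luigi.build([AddMyTask()], local_scheduler=True)
```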
---
### airflow vs. luigi
|                                       | airflow            | luigi                           |
|---------------------------------------|--------------------|---------------------------------|
| web dashboard                         | very nice          | minimal                         |
| built-in scheduler                    | yes                | no                              |
| separates output data and task state  | yes                | no                              |
| calendar scheduling                   | yes                | no, use cron                    |
| parallelism                           | yes, via workers   | threads per worker              |
| finds newly deployed tasks            | yes                | no                              |
| persists state                        | yes, to a database | sort of (optional task history) |
| syncs tasks to workers                | yes                | no                              |
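* a note on the "calendar scheduling" row: an airflow DAG carries its own schedule, given either as a `timedelta` (as in the example above) or as a standard cron expression, while a luigi pipeline is typically triggered by an external cron job. a minimal sketch of a cron-scheduled airflow DAG (the `dag_id` here is made up for illustration):
```python
from datetime import datetime
from airflow.models import DAG

# airflow parses standard cron expressions for schedule_interval,
# so no external crontab entry is needed; "0 6 * * *" = daily at 06:00
dag = DAG(
    dag_id='cron_scheduled_dag',
    start_date=datetime(2018, 1, 8),
    schedule_interval='0 6 * * *')
```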
---
### cool resources
* **[apache incubator-airflow](https://github.com/apache/incubator-airflow)** (now redirects to apache/airflow)
* **[awesome airflow resources](https://github.com/jghoman/awesome-apache-airflow)**
* **[airflow in kubernetes](https://github.com/rolanddb/airflow-on-kubernetes)**
* **[astronomer: airflow as a service](https://github.com/astronomer/astronomer)**