Use Case
Airflow -> AWS S3 (JSON/Parquet File)
Use this program to upload stock fundamentals data and stock quote statistics to AWS S3. An Airflow DAG submits 5000+ tickers to Airflow Celery workers using task/task-group mapping. The mapped task groups/tasks perform an HTTP GET against the data provider and upload the results to AWS S3.
Libraries/Tools Used
- Airflow
- AWS Data Wrangler
Dataflow Diagram
Airflow Architecture
Source: Airflow Documentation
Task Mapping Flow
Task Group


Output File

Session 1
- Discuss environment setup
- Airflow Task and Task Group
- High-level discussion of Dask vs. Airflow task mapping
Video explanation of the code
Source Code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from datetime import datetime | |
from airflow import DAG | |
from airflow.decorators import task_group, task | |
from datetime import datetime, timedelta | |
from airflow import DAG | |
import sys | |
sys.path.append('/home/airflow/') | |
default_args = { | |
'owner': 'airflow', | |
'depends_on_past': False, | |
'email': ['contactus@sravz.com'], | |
'email_on_failure': True, | |
'email_on_retry': True, | |
'retries': 1, | |
'retry_delay': timedelta(minutes=5), | |
} | |
with DAG( | |
'sravz_upload_stock_fundamentals', | |
default_args=default_args, | |
description='Uploads Sravz Stock fundamentals to S3', | |
schedule_interval='0 0 * * SAT', | |
start_date=datetime(2021, 1, 1), | |
catchup=False, | |
tags=['sravz'], | |
) as dag: | |
@task | |
def get_stock_fundamentals(tickers: list): | |
from src.services.stock import summary_stats | |
summary_engine = summary_stats.engine() | |
return summary_engine.get_fundamentals(tickers=tickers) | |
@task_group | |
def get_stock_fundamentals_group(tickers: list): | |
return get_stock_fundamentals(tickers=tickers) | |
from src.services.stock import summary_stats | |
from src.util import settings | |
summary_engine = summary_stats.engine() | |
ticker_list = summary_engine.get_all_fundamental_tickers() | |
n = int(len(ticker_list)/settings.constants.AIRFLOW_MAX_TASKS_IN_TASK_MAPPING) | |
tickers = [ticker_list[i:i + n] for i in range(0, len(ticker_list), n)] | |
_ = get_stock_fundamentals_group.expand(tickers=tickers) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from datetime import datetime | |
from airflow import DAG | |
from airflow.decorators import task | |
from datetime import datetime, timedelta | |
from airflow import DAG | |
import sys | |
sys.path.append('/home/airflow/') | |
default_args = { | |
'owner': 'airflow', | |
'depends_on_past': False, | |
'email': ['contactus@sravz.com'], | |
'email_on_failure': True, | |
'email_on_retry': True, | |
'retries': 1, | |
'retry_delay': timedelta(minutes=5), | |
} | |
with DAG( | |
'sravz_quotes_summary_stats', | |
default_args=default_args, | |
description='Calculate quotes summary stats and upload to S3', | |
schedule_interval='0 1 * * *', | |
start_date=datetime(2021, 1, 1), | |
catchup=False, | |
tags=['sravz'], | |
) as dag: | |
@task | |
def get_single_ticker_stat(tickers: list): | |
from src.services.stock import summary_stats | |
summary_engine = summary_stats.engine() | |
return summary_engine.get_summary_stats(tickers=tickers) | |
@task | |
def upload_ticker_stats(status): | |
from src.services.stock import summary_stats | |
summary_engine = summary_stats.engine() | |
summary_engine.upload_final_stats_df() | |
from src.services.stock import summary_stats | |
from src.util import settings | |
summary_engine = summary_stats.engine() | |
ticker_list = summary_engine.get_all_tickers() | |
n = int(len(ticker_list)/settings.constants.AIRFLOW_MAX_TASKS_IN_TASK_MAPPING) | |
tickers = [ticker_list[i:i + n] for i in range(0, len(ticker_list), n)] | |
status = get_single_ticker_stat.expand(tickers=tickers) | |
upload_ticker_stats(status) |