Ticker Fundamentals Uploader

Use Case

Airflow -> AWS S3 (JSON/Parquet files)

Use this program to upload stock fundamentals data and stock quote summary statistics to AWS S3. An Airflow DAG submits 5,000+ tickers to Airflow Celery workers using task/task-group mapping. The mapped tasks/task groups perform HTTP GETs against the service provider and upload the results to AWS S3, as sketched below.
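The fan-out relies on Airflow dynamic task mapping: passing a list to .expand() makes the scheduler create one task instance per element at run time. A minimal sketch, assuming a throwaway DAG id and a placeholder task (not the project's real code):

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG('mapping_sketch', start_date=datetime(2021, 1, 1),
         schedule_interval=None, catchup=False):
    @task
    def fetch_and_upload(tickers: list):
        # Placeholder: the real task would HTTP GET fundamentals and write to S3
        return len(tickers)

    # Three chunks -> three parallel task instances on the Celery workers
    fetch_and_upload.expand(tickers=[['AAPL', 'MSFT'], ['GOOG'], ['AMZN']])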

Libraries/Tools Used

  • Airflow
  • AWS Data Wrangler
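AWS Data Wrangler (the awswrangler package) writes a pandas DataFrame straight to S3. A minimal sketch of the upload step; the bucket, key, and sample data are illustrative, not the project's actual layout:

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({'ticker': ['AAPL'], 'pe_ratio': [28.4]})
# Write the DataFrame to S3 as a single Parquet file
wr.s3.to_parquet(df=df, path='s3://example-bucket/fundamentals/aapl.parquet')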

Dataflow Diagram

Airflow Architecture (Source: Airflow Documentation)

Task Mapping Flow

Airflow Task Mapping

Task Group

Tasks in a group

Output File

Session 1

  • Environment setup
  • Airflow tasks and task groups
  • High-level comparison of Dask vs. Airflow task mapping (see the sketch after this list)
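For contrast with Airflow's .expand(), the equivalent fan-out in Dask distributes chunks with client.map. A minimal sketch; process_chunk is a placeholder for the per-chunk work (HTTP fetch plus S3 upload in this project):

from dask.distributed import Client

def process_chunk(tickers):
    # Placeholder for the per-chunk work
    return len(tickers)

client = Client()  # spins up a local cluster by default
futures = client.map(process_chunk, [['AAPL', 'MSFT'], ['GOOG']])
results = client.gather(futures)  # blocks until all chunks finish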

Video explanation of the code

Airflow-Task-Mapping-YouTube

Source Code

# DAG 1: fan out stock-fundamentals uploads across mapped task groups
from __future__ import annotations

import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task, task_group

# Make the project package importable on the Airflow workers
sys.path.append('/home/airflow/')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['contactus@sravz.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'sravz_upload_stock_fundamentals',
    default_args=default_args,
    description='Uploads Sravz Stock fundamentals to S3',
    schedule_interval='0 0 * * SAT',  # every Saturday at midnight
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['sravz'],
) as dag:
    @task
    def get_stock_fundamentals(tickers: list):
        # Imported inside the task so the dependency is resolved on the worker
        from src.services.stock import summary_stats
        summary_engine = summary_stats.engine()
        return summary_engine.get_fundamentals(tickers=tickers)

    @task_group
    def get_stock_fundamentals_group(tickers: list):
        return get_stock_fundamentals(tickers=tickers)

    from src.services.stock import summary_stats
    from src.util import settings

    summary_engine = summary_stats.engine()
    ticker_list = summary_engine.get_all_fundamental_tickers()
    # Chunk the ticker list so at most AIRFLOW_MAX_TASKS_IN_TASK_MAPPING
    # task-group instances are mapped; max(1, ...) guards against a zero
    # chunk size when the list is shorter than the mapping limit.
    n = max(1, len(ticker_list) // settings.constants.AIRFLOW_MAX_TASKS_IN_TASK_MAPPING)
    tickers = [ticker_list[i:i + n] for i in range(0, len(ticker_list), n)]
    _ = get_stock_fundamentals_group.expand(tickers=tickers)

# DAG 2: compute per-chunk quote summary stats, then upload the combined result
from __future__ import annotations

import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

sys.path.append('/home/airflow/')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['contactus@sravz.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'sravz_quotes_summary_stats',
    default_args=default_args,
    description='Calculate quotes summary stats and upload to S3',
    schedule_interval='0 1 * * *',  # daily at 01:00
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['sravz'],
) as dag:
    @task
    def get_single_ticker_stat(tickers: list):
        from src.services.stock import summary_stats
        summary_engine = summary_stats.engine()
        return summary_engine.get_summary_stats(tickers=tickers)

    @task
    def upload_ticker_stats(status):
        # Runs only after every mapped stats task finishes; `status`
        # carries the mapped tasks' results and enforces the dependency.
        from src.services.stock import summary_stats
        summary_engine = summary_stats.engine()
        summary_engine.upload_final_stats_df()

    from src.services.stock import summary_stats
    from src.util import settings

    summary_engine = summary_stats.engine()
    ticker_list = summary_engine.get_all_tickers()
    # Same chunking scheme as the fundamentals DAG above
    n = max(1, len(ticker_list) // settings.constants.AIRFLOW_MAX_TASKS_IN_TASK_MAPPING)
    tickers = [ticker_list[i:i + n] for i in range(0, len(ticker_list), n)]
    status = get_single_ticker_stat.expand(tickers=tickers)
    upload_ticker_stats(status)

References

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#mapping-over-a-task-group