Historical Ticker Plant

Use Case

Historical Ticker Plant

  • Store historical daily and intraday asset quotes and trades data
  • Use Polars and DuckDB to perform various analytics on the data (a small sketch follows the tools list below)

Tools Used

  • Polars
  • DuckDB
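
A hypothetical sketch of this kind of analytics, assuming the duckdb crate (whose API mirrors rusqlite); the table, file path, and column names are illustrative only, not taken from the Sravz codebase:

use duckdb::{Connection, Result};

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    // Load daily quotes exported by the ticker plant, then aggregate in SQL.
    conn.execute_batch(
        "CREATE TABLE quotes AS SELECT * FROM read_parquet('/tmp/quotes.parquet');",
    )?;
    let mut stmt =
        conn.prepare("SELECT ticker, avg(close) AS avg_close FROM quotes GROUP BY ticker")?;
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let ticker: String = row.get(0)?;
        let avg_close: f64 = row.get(1)?;
        println!("{}: {}", ticker, avg_close);
    }
    Ok(())
}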

Dataflow Diagram

Session 1

Topics Discussed

  • Rust Application Setup
  • Interaction between NSQ and the Rust application (demo; a minimal sketch follows this list)
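
As a taste of the demo, here is a minimal sketch of publishing a message to the topic the Rust application consumes; the tokio_nsq calls, the training-rust topic, and the nsqd-1:4150 host are taken from the full main.rs listed under Session 2:

use tokio_nsq::{NSQProducerConfig, NSQTopic};

#[tokio::main]
async fn main() {
    // Publish to the topic the Rust application listens on.
    let topic = NSQTopic::new("training-rust").expect("Failed to create topic");
    let mut producer = NSQProducerConfig::new("nsqd-1:4150").build();
    // The payload is a JSON-encoded Message; fun_n selects the service to run.
    producer
        .publish(&topic, br#"{"fun_n": "leverage_funds_nasdaq100"}"#.to_vec())
        .await
        .expect("Failed to publish NSQ message");
}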

Video explanation

Rust Session 1

Source Code

https://github.com/sravzpublic/training/tree/master/training-rust/sravz

Session 2

Dataflow Diagram

Topics Discussed

  • Ticker plant internal components
    • Main Module
    • Router
    • Services
  • Demo of Leveraged Funds service usage

Video explanation

Rust Session 2

Source Code

use crate::{models::Message, s3_module::S3Module};
use log::info;
use polars::prelude::*;
use std::error::Error;
use std::io::Cursor;

pub struct LeveragedFunds {
    s3_module: S3Module,
}

impl LeveragedFunds {
    pub fn new() -> Self {
        // TODO: Check proper dependency injection
        let s3_module = S3Module::new();
        LeveragedFunds { s3_module }
    }

    pub async fn leverage_funds_nasdaq100(
        &self,
        message: Message,
    ) -> Result<Message, Box<dyn Error>> {
        let bucket_name = "sravz-data";
        // let object_key = "historical/etf_us_qqq.json";
        // let object_keys = ["tqqq", "qld", "qqq"];
        let object_keys = message.p_i.args.clone();
        let mut dataframe_vector: Vec<DataFrame> = Vec::new();
        for sravzid in object_keys {
            let downloaded_content = self
                .s3_module
                .download_object(bucket_name, &format!("historical/{}.json", sravzid))
                .await;
            match self.s3_module.decompress_gzip(downloaded_content) {
                Ok(decompressed_data) => {
                    let cursor: Cursor<Vec<u8>> = Cursor::new(decompressed_data);
                    let df = JsonReader::new(cursor).finish().unwrap();
                    let mut df = df.unnest(["Date"]).unwrap();
                    let df = df.rename("_isoformat", "Date").unwrap();
                    let mut df = df
                        .clone()
                        .lazy()
                        .select([
                            col("Date")
                                .str()
                                .to_datetime(
                                    Some(TimeUnit::Microseconds),
                                    None,
                                    StrptimeOptions::default(),
                                    lit("raise"),
                                )
                                .alias("DateTime"),
                            col("*"),
                        ])
                        .drop_columns(["Date"])
                        .collect()?;
                    // Prefix every column (except the join key) with the asset id.
                    let old_cols: Vec<String> = df
                        .get_column_names()
                        .iter()
                        .map(|s| s.to_owned().to_owned())
                        .collect();
                    old_cols.iter().for_each(|old| {
                        if old != "DateTime" {
                            df.rename(old, &format!("{}_{}", sravzid, old))
                                .expect(format!("cannot rename column {}", old).as_str());
                        }
                    });
                    dataframe_vector.push(df.clone());
                    info!("Dataframe Head {}", df.head(Some(10)));
                    info!("Dataframe Tail {}", df.tail(Some(10)));
                    dbg!(df);
                }
                Err(err) => {
                    info!("Error during decompression: {:?}", err);
                }
            }
        }
        // Join the DataFrames in the vector on the "DateTime" column
        let mut joined_df: Option<DataFrame> = None;
        for df in dataframe_vector {
            let cloned_joined_df = joined_df.clone();
            if let Some(joined) = cloned_joined_df {
                let join_result = joined.inner_join(&df, ["DateTime"], ["DateTime"]);
                match join_result {
                    Ok(joined_result) => {
                        joined_df = Some(joined_result);
                    }
                    Err(err) => {
                        eprintln!("Error during join: {:?}", err);
                        // Optionally return the error or handle it accordingly
                    }
                }
            } else {
                joined_df = Some(df.clone());
            }
        }
        // Display the final joined DataFrame
        if let Some(joined) = joined_df {
            println!("{:?}", joined);
        }
        Ok(message)
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        leveraged_funds::LeveragedFunds,
        models::{Kwargs, Message},
    };
    use chrono::Utc;
    use log::{error, info};

    #[tokio::test]
    async fn test_leverage_funds_nasdaq100() {
        let leveraged_funds = LeveragedFunds::new();
        let leveraged_fund_result = leveraged_funds
            .leverage_funds_nasdaq100(Message {
                id: 1.0,
                p_i: crate::models::PI {
                    args: vec![
                        "etf_us_tqqq".to_string(),
                        "etf_us_qld".to_string(),
                        "etf_us_qqq".to_string(),
                    ],
                    kwargs: Kwargs {
                        device: String::new(),
                        upload_to_aws: true,
                    },
                },
                t_o: String::new(),
                cid: String::new(),
                cache_message: true,
                stopic: String::new(),
                ts: 1.0,
                fun_n: String::new(),
                date: Utc::now(),
                e: String::new(),
                key: String::new(),
                exception_message: String::new(),
            })
            .await;
        match leveraged_fund_result {
            Ok(mut processed_message) => {
                processed_message.date = Utc::now();
                let message_body_str = &serde_json::to_string(&processed_message)
                    .expect("Failed to convert message to JSON string");
                info!("Sending the processed message on NSQ {}", message_body_str);
            }
            Err(err) => {
                error!("Router message processing error: {}", err);
            }
        }
    }
}

leveraged_funds.rs

mod helper;
mod leveraged_funds;
mod models;
mod mongo;
mod router;
mod s3_module;

use crate::{helper::sha256_hash, models::Message, router::Router};
use chrono::{Duration, Utc};
use env_logger::Env;
use gethostname::gethostname;
use log::{error, info};
use mongodb::Client;
use rand::seq::SliceRandom;
use std::collections::HashSet;
use std::error::Error;
use tokio;
use tokio_nsq::{
    NSQChannel, NSQConsumerConfig, NSQConsumerConfigSources, NSQConsumerLookupConfig,
    NSQProducerConfig, NSQTopic,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
    let consumer_topic_name = "training-rust";
    let producer_topic_name = "training-node";
    let consumer_topic =
        NSQTopic::new(consumer_topic_name).expect("Failed to create consumer topic");
    let producer_topic =
        NSQTopic::new(producer_topic_name).expect("Failed to create producer topic");
    let channel = NSQChannel::new(gethostname().to_string_lossy().to_string())
        .expect("Failed to create NSQ channel");
    let client = Client::with_uri_str("mongodb://sravz:sravz@mongo:27017/sravz")
        .await
        .expect("Unable to connect to MongoDB");
    let mut addresses = HashSet::new();
    // TODO: Convert this to config file
    addresses.insert("http://nsqlookupd-1:4161".to_string());
    addresses.insert("http://nsqlookupd-2:4161".to_string());
    addresses.insert("http://nsqlookupd-3:4161".to_string());
    info!(
        "Listening to nsq topic {} - channel {:?}",
        consumer_topic_name,
        gethostname()
    );
    let mut consumer = NSQConsumerConfig::new(consumer_topic, channel)
        .set_max_in_flight(15)
        .set_sources(NSQConsumerConfigSources::Lookup(
            NSQConsumerLookupConfig::new().set_addresses(addresses),
        ))
        .build();
    // Pick a random nsqd host to publish to.
    let mut rng: rand::prelude::ThreadRng = rand::thread_rng();
    let nsqd_hosts = vec!["nsqd-1:4150", "nsqd-2:4150", "nsqd-3:4150"];
    let mut nsqd_host = "";
    if let Some(random_element) = nsqd_hosts.choose(&mut rng) {
        nsqd_host = random_element;
        info!("Connecting to nsqd host {}", nsqd_host)
    } else {
        println!("nsqd_hosts vector empty");
    }
    let mut producer = NSQProducerConfig::new(nsqd_host).build();
    let router = Router::new();
    loop {
        let message = consumer
            .consume_filtered()
            .await
            .expect("Failed to consume NSQ message");
        let message_body_str =
            std::str::from_utf8(&message.body).expect("Failed to get JSON string from NSQ Message");
        let hashed_string = &sha256_hash(message_body_str);
        info!(
            "Message received on NSQ = {} - SHA-256 = {}",
            message_body_str, hashed_string
        );
        let messages = router
            .mongo
            .find_by_key(hashed_string.to_string(), &client)
            .await
            .expect("Document not found");
        /* If the message exists and was inserted less than 24 hours ago, resend the same message; else reprocess the message */
        if messages.len() > 0 && messages[0].date + Duration::days(1) > Utc::now() {
            let message = &serde_json::to_string(&messages[0])
                .expect("Failed to convert NSQ message to JSON string");
            info!("Sending the existing message in mongodb {}", message);
            producer
                .publish(&producer_topic, message.as_bytes().to_vec())
                .await
                .expect("Failed to publish NSQ message");
        } else {
            info!("Processing the message...");
            let result: Result<Message, serde_json::Error> = serde_json::from_str(message_body_str);
            // Handle the result using pattern matching
            match result {
                Ok(mut _message) => {
                    let router_result = router.process_message(_message).await;
                    match router_result {
                        Ok(mut processed_message) => {
                            // let hashed_string: String = sha256_hash(message_body_str);
                            processed_message.date = Utc::now();
                            processed_message.key = hashed_string.to_string();
                            router
                                .mongo
                                .create(processed_message.clone(), &client)
                                .await;
                            let message_body_str = &serde_json::to_string(&processed_message)
                                .expect("Failed to convert message to JSON string");
                            info!("Sending the processed message on NSQ {}", message_body_str);
                            producer
                                .publish(&producer_topic, message_body_str.as_bytes().to_vec())
                                .await
                                .expect("Failed to publish NSQ message");
                            continue;
                        }
                        Err(err) => {
                            error!("Router message processing error: {}", err);
                        }
                    }
                }
                Err(err) => {
                    error!("Deserialization failed: {}", err);
                }
            }
            // Fall-through: echo the original message back on failure.
            info!("Unable to process message {}", message_body_str);
            producer
                .publish(&producer_topic, message_body_str.as_bytes().to_vec())
                .await
                .expect("Failed to publish NSQ message");
        }
        // TODO: Wait until the message is acknowledged by NSQ
        // producer.consume().await.expect("Failed to consume NSQ message");
        // message.finish().await;
    }
    // Ok(())
}
main.rs
use crate::{leveraged_funds::LeveragedFunds, models::Message, mongo::Mongo};
use std::error::Error;

pub struct Router {
    leveraged_funds: LeveragedFunds,
    pub(crate) mongo: Mongo,
}

impl Router {
    pub fn new() -> Self {
        // TODO: Check proper dependency injection
        let leveraged_funds = LeveragedFunds::new();
        let mongo = Mongo {};
        Router {
            leveraged_funds,
            mongo,
        }
    }

    // Dispatch the message to the service selected by its fun_n field.
    pub async fn process_message(&self, mut message: Message) -> Result<Message, Box<dyn Error>> {
        match message.fun_n.as_str() {
            "leverage_funds_nasdaq100" => {
                self.leveraged_funds.leverage_funds_nasdaq100(message).await
            }
            _ => {
                message.exception_message = "Message ID not implemented".to_owned();
                Err(Box::new(message))
            }
        }
    }
}

impl Default for Router {
    fn default() -> Self {
        Self::new()
    }
}
router.rs

Session 3

Dataflow Diagram

Topics Discussed

  • PyO3 integration
  • Pass a Polars DataFrame to Python
  • Review the matplotlib output

Video explanation

Rust Session 3

Source Code

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import matplotlib.colors as mcolors
from statsmodels.tsa.stattools import adfuller


def process(price_df: pd.DataFrame, function_name: str):
    col = 'etf_us_qqq_AdjustedClose'
    sravzid = 'LeveragedFunds'
    price_df = price_df.reset_index()
    price_df.set_index('DateTime', inplace=True)
    vertical_sections = 9
    widthInch = 10
    heightInch = vertical_sections * 5
    fig = plt.figure(figsize=(widthInch, heightInch))
    gs = gridspec.GridSpec(vertical_sections, 4, wspace=1, hspace=0.5)
    gs.update(left=0.1, right=0.9, top=0.965, bottom=0.03)
    chart_index = 0
    ax_price_plot = plt.subplot(gs[chart_index, :])
    price_df[col].plot(label=col, ax=ax_price_plot)
    ax_price_plot.set_title('{0} {1} Price'.format(sravzid, col))
    ax_price_plot.legend()
    chart_index = chart_index + 1
    ax_describe = plt.subplot(gs[chart_index, :])
    ax_describe.axis('off')
    describe_df = price_df[[x for x in price_df.columns.tolist() if 'AdjustedClose' in x]].describe().round(3)
    font_size = 14
    bbox = [0, 0, 1, 1]
    mpl_table = ax_describe.table(
        cellText=describe_df.values, rowLabels=describe_df.index, bbox=bbox, colLabels=describe_df.columns)
    mpl_table.auto_set_font_size(False)
    mpl_table.set_fontsize(font_size)
    ax_describe.set_title("{0} Summary Statistics".format(sravzid))
    chart_index = chart_index + 1
    ax_rolling_mean_plot = plt.subplot(gs[chart_index, :])
    # price_df[col].plot(label=col, ax=ax_rolling_mean_plot)
    price_df[col].rolling(7).mean().plot(label="7 days MA", ax=ax_rolling_mean_plot)
    price_df[col].rolling(21).mean().plot(label="21 days MA", ax=ax_rolling_mean_plot)
    price_df[col].rolling(255).mean().plot(label="255 days MA", ax=ax_rolling_mean_plot)
    ax_rolling_mean_plot.set_title('{0} {1} Rolling Moving Average'.format(sravzid, col))
    ax_rolling_mean_plot.legend()
    chart_index = chart_index + 1
    ax_rolling_std_plot = plt.subplot(gs[chart_index, :])
    # price_df[col].plot(label=col, ax=ax_rolling_std_plot)
    price_df[col].rolling(7).std().plot(label="7 days Std", ax=ax_rolling_std_plot)
    price_df[col].rolling(21).std().plot(label="21 days Std", ax=ax_rolling_std_plot)
    price_df[col].rolling(255).std().plot(label="255 days Std", ax=ax_rolling_std_plot)
    ax_rolling_std_plot.set_title('{0} {1} Rolling Moving Std'.format(sravzid, col))
    ax_rolling_std_plot.legend()
    chart_index = chart_index + 1
    ax_df = plt.subplot(gs[chart_index, 1:])
    series = price_df[col].dropna()
    dftest = adfuller(series, autolag='AIC')
    dfoutput = pd.Series(dftest[0:4], index=[
        'Test Statistic', 'p-value', '#Lags Used', 'Number of Observations Used'])
    for key, value in list(dftest[4].items()):
        dfoutput['Critical Value (%s)' % key] = value
    font_size = 14
    bbox = [0, 0, 1, 1]
    ax_df.axis('off')
    df_test_df = dfoutput.to_frame()
    mpl_table = ax_df.table(
        cellText=df_test_df.values, rowLabels=df_test_df.index, bbox=bbox, colLabels=df_test_df.index)
    mpl_table.auto_set_font_size(False)
    mpl_table.set_fontsize(font_size)
    ax_df.set_title("{0} {1} Price - Augmented Dickey Fuller Test - Complete History".format(sravzid, col), loc="left")
    for years in [10, 5, 2, 1]:
        chart_index = chart_index + 1
        ax_df = plt.subplot(gs[chart_index, 1:])
        series = price_df[col].last(f"{years}Y").dropna()
        dftest = adfuller(series, regression='ctt', autolag='AIC')
        dfoutput = pd.Series(dftest[0:4], index=[
            'Test Statistic', 'p-value', '#Lags Used', 'Number of Observations Used'])
        for key, value in list(dftest[4].items()):
            dfoutput['Critical Value (%s)' % key] = value
        font_size = 14
        bbox = [0, 0, 1, 1]
        ax_df.axis('off')
        df_test_df = dfoutput.to_frame()
        mpl_table = ax_df.table(
            cellText=df_test_df.values, rowLabels=df_test_df.index, bbox=bbox, colLabels=df_test_df.index)
        mpl_table.auto_set_font_size(False)
        mpl_table.set_fontsize(font_size)
        ax_df.set_title("{0} {1} Price - Augmented Dickey Fuller Test - Last {2}Yrs".format(sravzid, col, years), loc="left")
    plt.savefig(f"/tmp/{function_name}", bbox_inches='tight')
    return f"Hello world from python!!! {price_df.head()}"

leveraged_funds.py

use crate::py03_service::run_py_module;
use crate::{models::Message, s3_module::S3Module};
use log::info;
use polars::prelude::*;
use std::error::Error;
use std::io::Cursor;

pub struct LeveragedFunds {
    s3_module: S3Module,
}

impl LeveragedFunds {
    pub fn new() -> Self {
        // TODO: Check proper dependency injection
        let s3_module = S3Module::new();
        LeveragedFunds { s3_module }
    }

    pub async fn leverage_funds(&self, message: Message) -> Result<Message, Box<dyn Error>> {
        let bucket_name = "sravz-data";
        // let object_key = "historical/etf_us_qqq.json";
        // let object_keys = ["tqqq", "qld", "qqq"];
        let object_keys = message.p_i.args.clone();
        let mut dataframe_vector: Vec<DataFrame> = Vec::new();
        for sravzid in object_keys {
            let downloaded_content = self
                .s3_module
                .download_object(bucket_name, &format!("historical/{}.json", sravzid), false)
                .await;
            match self.s3_module.decompress_gzip(downloaded_content) {
                Ok(decompressed_data) => {
                    let cursor: Cursor<Vec<u8>> = Cursor::new(decompressed_data);
                    let df = JsonReader::new(cursor).finish().unwrap();
                    let mut df = df.unnest(["Date"]).unwrap();
                    let df = df.rename("_isoformat", "Date").unwrap();
                    let mut df = df
                        .clone()
                        .lazy()
                        .select([
                            col("Date")
                                .str()
                                .to_datetime(
                                    Some(TimeUnit::Microseconds),
                                    None,
                                    StrptimeOptions::default(),
                                    lit("raise"),
                                )
                                .alias("DateTime"),
                            col("*"),
                        ])
                        .drop_columns(["Date"])
                        .collect()?;
                    // Prefix every column (except the join key) with the asset id.
                    let old_cols: Vec<String> = df
                        .get_column_names()
                        .iter()
                        .map(|s| s.to_owned().to_owned())
                        .collect();
                    old_cols.iter().for_each(|old| {
                        if old != "DateTime" {
                            df.rename(old, &format!("{}_{}", sravzid, old))
                                .expect(format!("cannot rename column {}", old).as_str());
                        }
                    });
                    dataframe_vector.push(df.clone());
                    info!("Dataframe Head {}", df.head(Some(10)));
                    info!("Dataframe Tail {}", df.tail(Some(10)));
                    dbg!(df);
                }
                Err(err) => {
                    info!("Error during decompression: {:?}", err);
                }
            }
        }
        // Join the DataFrames in the vector on the "DateTime" column
        let mut joined_df: Option<DataFrame> = None;
        for df in dataframe_vector {
            let cloned_joined_df = joined_df.clone();
            if let Some(joined) = cloned_joined_df {
                let join_result = joined.inner_join(&df, ["DateTime"], ["DateTime"]);
                match join_result {
                    Ok(joined_result) => {
                        joined_df = Some(joined_result);
                    }
                    Err(err) => {
                        eprintln!("Error during join: {:?}", err);
                        // Optionally return the error or handle it accordingly
                    }
                }
            } else {
                joined_df = Some(df.clone());
            }
        }
        // Hand the joined DataFrame to Python, then upload the chart it produced
        if let Some(joined) = joined_df {
            println!("{:?}", joined);
            match run_py_module(joined, message.fun_n.clone()) {
                Ok(_) => {
                    println!("Python code executed successfully");
                    self.s3_module
                        .upload_file(
                            "sravz",
                            "rust-backend/leveraged_funds.png",
                            "/tmp/leveraged_funds.png",
                        )
                        .await;
                }
                Err(err) => eprintln!("Error executing Python code: {:?}", err),
            }
        }
        Ok(message)
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        leveraged_funds::LeveragedFunds,
        models::{Kwargs, Message},
    };
    use chrono::Utc;
    use log::{error, info};

    #[tokio::test]
    async fn test_leverage_funds() {
        let leveraged_funds = LeveragedFunds::new();
        let leveraged_fund_result = leveraged_funds
            .leverage_funds(Message {
                id: 1.0,
                p_i: crate::models::PI {
                    args: vec![
                        "etf_us_tqqq".to_string(),
                        "etf_us_qld".to_string(),
                        "etf_us_qqq".to_string(),
                    ],
                    kwargs: Kwargs {
                        device: String::new(),
                        upload_to_aws: true,
                    },
                },
                t_o: String::new(),
                cid: String::new(),
                cache_message: true,
                stopic: String::new(),
                ts: 1.0,
                fun_n: "leveraged_funds".to_string(),
                date: Utc::now(),
                e: String::new(),
                key: String::new(),
                exception_message: String::new(),
            })
            .await;
        match leveraged_fund_result {
            Ok(mut processed_message) => {
                processed_message.date = Utc::now();
                let message_body_str = &serde_json::to_string(&processed_message)
                    .expect("Failed to convert message to JSON string");
                info!("Sending the processed message on NSQ {}", message_body_str);
            }
            Err(err) => {
                error!("Router message processing error: {}", err);
            }
        }
    }
}

leveraged_funds.rs

import os
import pandas as pd
from pathlib import Path
import leveraged_funds


def run(function_name, slope=0.01):
    file_path = f"/tmp/{function_name}.parquet"
    path = Path(file_path)
    if path.is_file():
        df = pd.read_parquet(file_path)
        leveraged_funds.process(df, function_name)
        try:
            os.remove(file_path)
        except FileNotFoundError:
            print(f'{file_path} does not exist. No need to delete.')
        except Exception as e:
            print(f'An error occurred: {e}')
        return f"Hello world from python!!! {df.head()}"
    else:
        print(f'The file {file_path} does not exist.')
        return None
main.py
use polars::prelude::*;
use pyo3::prelude::*;
use pyo3::types::IntoPyDict;
use pyo3::types::PyTuple;

pub(crate) fn run_py_module(mut df: DataFrame, function_name: String) -> PyResult<()> {
    // Hand the DataFrame to Python via a Parquet file on disk.
    let mut file = std::fs::File::create(format!("/tmp/{}.parquet", function_name)).unwrap();
    ParquetWriter::new(&mut file).finish(&mut df).unwrap();
    Python::with_gil(|py| {
        let activators = PyModule::from_code(
            py,
            r#"
import sys
sys.path.append("/workspace/backend-rust/src/py")

def run(x, slope=0.01):
    from main import run
    return run(x, slope=slope)
"#,
            "activators.py",
            "activators",
        )?;
        let args = PyTuple::new(py, &[function_name]);
        let kwargs = [("slope", 0.2)].into_py_dict(py);
        let py_result: String = activators
            .getattr("run")?
            .call(args, Some(kwargs))?
            .extract()?;
        println!("This is from python {}", py_result);
        Ok(())
    })
}
py03_service.rs

Session 4

Dataflow Diagram

Topics Discussed

  • Config struct to parse environment variables (a minimal sketch follows this list)
  • Ticker plant sends its output on the NSQ backend-node topic
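
Session 4's source is not embedded in this post; the following is a minimal sketch of an environment-variable-backed Config struct using only std::env. The variable names are hypothetical; the fallback values come from main.rs above:

use std::env;

#[derive(Debug)]
pub struct Config {
    pub nsq_consumer_topic: String,
    pub nsq_producer_topic: String,
    pub mongo_uri: String,
}

impl Config {
    pub fn from_env() -> Self {
        // Fall back to the defaults hard-coded in main.rs when a variable is unset.
        Config {
            nsq_consumer_topic: env::var("NSQ_CONSUMER_TOPIC")
                .unwrap_or_else(|_| "training-rust".to_string()),
            nsq_producer_topic: env::var("NSQ_PRODUCER_TOPIC")
                .unwrap_or_else(|_| "training-node".to_string()),
            mongo_uri: env::var("MONGO_URI")
                .unwrap_or_else(|_| "mongodb://sravz:sravz@mongo:27017/sravz".to_string()),
        }
    }
}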

Video explanation

Rust Session 4

Session 5

Dataflow Diagram

Topics Discussed

  • Demo:
    • Angular UI sends a message to the backend-node over Socket.IO websockets
    • Backend-node receives the message on the Socket.IO websocket
    • Backend-node sends the message to the NSQ server on the backend-rust topic
    • Backend-rust receives the message on the NSQ server's backend-rust topic
    • Backend-rust processes the message
    • Backend-rust sends the response to the NSQ server on the backend-node topic
    • Backend-node receives the response message on the backend-node topic
    • Backend-node sends the response message over the Socket.IO websocket to the Angular UI
    • The Angular UI debug console shows the message received from backend-rust
  • Added config.toml support to pass extra config data to backend-rust
  • Removed .expect and .unwrap usage, which can panic
  • Used match expressions to handle enum Result variants (a minimal sketch follows this list)
  • Error handling using Box<dyn Error>
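
A minimal sketch of the last three refactors combined, assuming the toml and serde crates; the config.toml layout is hypothetical, with the nsqd host list lifted from the TODO in main.rs:

use serde::Deserialize;
use std::error::Error;
use std::fs;

#[derive(Debug, Deserialize)]
struct Config {
    nsqd_hosts: Vec<String>,
}

fn load_config(path: &str) -> Result<Config, Box<dyn Error>> {
    // `?` propagates errors as Box<dyn Error> instead of panicking via .expect.
    let contents = fs::read_to_string(path)?;
    let config: Config = toml::from_str(&contents)?;
    Ok(config)
}

fn main() {
    // Match on the Result instead of calling .unwrap()/.expect().
    match load_config("config.toml") {
        Ok(config) => println!("nsqd hosts: {:?}", config.nsqd_hosts),
        Err(err) => eprintln!("Failed to load config.toml: {}", err),
    }
}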

Video explanation

Rust Session 5

References

Polars
NSQ
Socket.IO
PyO3