Use Case
Historical Ticker Plant
- Store historical daily and intraday asset quotes and trades data
- Use Polars and DuckBD to perform various analytics on the data
Tools Used
- Polars
- Duck DB
Dataflow Diagram

Session 1
Topics Discussed
- Rust Application Setup
- Interaction between NSQ and Rust Application (Demo)
Video explanation
Source Code
https://github.com/sravzpublic/training/tree/master/training-rust/sravz
Session 2
Dataflow Diagram

Topics Discussed
- Ticker plant internal components
- Main Module
- Router
- Services
- Demo of Leveraged Funds service usage
Video explanation
Source Code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crate::{models::Message, s3_module::S3Module}; | |
use log::info; | |
use polars::prelude::*; | |
use std::error::Error; | |
use std::io::Cursor; | |
pub struct LeveragedFunds { | |
s3_module: S3Module, | |
} | |
impl LeveragedFunds { | |
pub fn new() -> Self { | |
// TODO: Check proper dependency injection | |
let s3_module = S3Module::new(); | |
LeveragedFunds { s3_module } | |
} | |
pub async fn leverage_funds_nasdaq100( | |
&self, | |
message: Message, | |
) -> Result<Message, Box<dyn Error>> { | |
let bucket_name = "sravz-data"; | |
// let object_key = "historical/etf_us_qqq.json"; | |
// let object_keys = ["tqqq", "qld", "qqq"]; | |
let object_keys = message.p_i.args.clone(); | |
let mut dataframe_vector: Vec<DataFrame> = Vec::new(); | |
for sravzid in object_keys { | |
let downloaded_content = self | |
.s3_module | |
.download_object(bucket_name, &format!("historical/{}.json", sravzid)) | |
.await; | |
match self.s3_module.decompress_gzip(downloaded_content) { | |
Ok(decompressed_data) => { | |
let cursor: Cursor<Vec<u8>> = Cursor::new(decompressed_data); | |
let df = JsonReader::new(cursor).finish().unwrap(); | |
let mut df = df.unnest(["Date"]).unwrap(); | |
let df = df.rename("_isoformat", "Date").unwrap(); | |
let mut df = df | |
.clone() | |
.lazy() | |
.select([ | |
col("Date") | |
.str() | |
.to_datetime( | |
Some(TimeUnit::Microseconds), | |
None, | |
StrptimeOptions::default(), | |
lit("raise"), | |
) | |
.alias("DateTime"), | |
col("*"), | |
]) | |
.drop_columns(["Date"]) | |
.collect()?; | |
let old_cols: Vec<String> = df | |
.get_column_names() | |
.iter() | |
.map(|s| s.to_owned().to_owned()) | |
.collect(); | |
old_cols.iter().for_each(|old| { | |
if old != "DateTime" { | |
df.rename(old, &format!("{}_{}", sravzid, old)) | |
.expect(format!("cannot rename column {}", old).as_str()); | |
} | |
}); | |
// let df_with_constant = df.apply(|name| format!("{}{}", constant_string, name)); | |
dataframe_vector.push(df.clone()); | |
info!("Dateframe Head {}", df.head(Some(10))); | |
info!("Dateframe Tail {}", df.tail(Some(10))); | |
dbg!(df); | |
} | |
Err(err) => { | |
info!("Error during decompression: {:?}", err); | |
} | |
} | |
} | |
// Join DataFrames in the vector on the "id" column | |
let mut joined_df: Option<DataFrame> = None; | |
for df in dataframe_vector { | |
let cloned_joined_df = joined_df.clone(); | |
if let Some(joined) = cloned_joined_df { | |
let join_result = joined.inner_join(&df, ["DateTime"], ["DateTime"]); | |
match join_result { | |
Ok(joined_result) => { | |
joined_df = Some(joined_result); | |
} | |
Err(err) => { | |
eprintln!("Error during join: {:?}", err); | |
// Optionally return the error or handle it accordingly | |
} | |
} | |
} else { | |
joined_df = Some(df.clone()); | |
} | |
} | |
// Display the final joined DataFrame | |
if let Some(joined) = joined_df { | |
println!("{:?}", joined); | |
} | |
return Ok(message); | |
} | |
} | |
#[cfg(test)] | |
mod tests { | |
use crate::{ | |
leveraged_funds::LeveragedFunds, | |
models::{Kwargs, Message}, | |
}; | |
use chrono::Utc; | |
use log::{error, info}; | |
#[tokio::test] | |
async fn test_leverage_funds_nasdaq100() { | |
let leveraged_funds = LeveragedFunds::new(); | |
let leveraged_fund_result = leveraged_funds | |
.leverage_funds_nasdaq100(Message { | |
id: 1.0, | |
p_i: crate::models::PI { | |
args: vec![ | |
"etf_us_tqqq".to_string(), | |
"etf_us_qld".to_string(), | |
"etf_us_qqq".to_string(), | |
], | |
kwargs: Kwargs { | |
device: String::new(), | |
upload_to_aws: true, | |
}, | |
}, | |
t_o: String::new(), | |
cid: String::new(), | |
cache_message: true, | |
stopic: String::new(), | |
ts: 1.0, | |
fun_n: String::new(), | |
date: Utc::now(), | |
e: String::new(), | |
key: String::new(), | |
exception_message: String::new(), | |
}) | |
.await; | |
match leveraged_fund_result { | |
Ok(mut processed_message) => { | |
processed_message.date = Utc::now(); | |
let message_body_str = &serde_json::to_string(&processed_message) | |
.expect("Failed to convert message to JSON string"); | |
info!("Sending the processed message on NSQ {}", message_body_str); | |
} | |
Err(err) => { | |
error!("Router message processing error: {}", err); | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod helper; | |
mod leveraged_funds; | |
mod models; | |
mod mongo; | |
mod router; | |
mod s3_module; | |
use crate::{helper::sha256_hash, models::Message, router::Router}; | |
use chrono::{Duration, Utc}; | |
use env_logger::Env; | |
use gethostname::gethostname; | |
use log::{error, info}; | |
use mongodb::Client; | |
use rand::seq::SliceRandom; | |
use std::collections::HashSet; | |
use std::error::Error; | |
use tokio; | |
use tokio_nsq::{NSQChannel, NSQConsumerConfig, | |
NSQConsumerConfigSources, NSQConsumerLookupConfig, NSQProducerConfig, NSQTopic, | |
}; | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn Error>> { | |
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); | |
let consumer_topic_name = "training-rust"; | |
let producer_topic_name = "training-node"; | |
let consumer_topic = | |
NSQTopic::new(consumer_topic_name).expect("Failed to create consumer topic"); | |
let producer_topic = | |
NSQTopic::new(producer_topic_name).expect("Failed to create producer topic"); | |
let channel = NSQChannel::new(gethostname().to_string_lossy().to_string()) | |
.expect("Failed to create NSQ channel"); | |
let client = Client::with_uri_str("mongodb://sravz:sravz@mongo:27017/sravz") | |
.await | |
.expect("Unable to connect to MongoDB"); | |
let mut addresses = HashSet::new(); | |
// TODO: Convert this to config file | |
addresses.insert("http://nsqlookupd-1:4161".to_string()); | |
addresses.insert("http://nsqlookupd-2:4161".to_string()); | |
addresses.insert("http://nsqlookupd-3:4161".to_string()); | |
info!( | |
"Listening to nsq topic {} - channel {:?}", | |
consumer_topic_name, | |
gethostname() | |
); | |
let mut consumer = NSQConsumerConfig::new(consumer_topic, channel) | |
.set_max_in_flight(15) | |
.set_sources(NSQConsumerConfigSources::Lookup( | |
NSQConsumerLookupConfig::new().set_addresses(addresses), | |
)) | |
.build(); | |
let mut rng: rand::prelude::ThreadRng = rand::thread_rng(); | |
let nsqd_hosts = vec!["nsqd-1:4150", "nsqd-2:4150", "nsqd-3:4150"]; | |
let mut nsqd_host = ""; | |
if let Some(random_element) = nsqd_hosts.choose(&mut rng) { | |
nsqd_host = random_element; | |
info!("Connecting to nsqd host {}", nsqd_host) | |
} else { | |
println!("nsqd_hosts vector empty"); | |
} | |
let mut producer = NSQProducerConfig::new(nsqd_host).build(); | |
let router = Router::new(); | |
loop { | |
let message = consumer | |
.consume_filtered() | |
.await | |
.expect("Failed to consume NSQ message"); | |
let message_body_str = | |
std::str::from_utf8(&message.body).expect("Failed to get JSON string from NSQ Message"); | |
let hashed_string = &sha256_hash(message_body_str); | |
info!( | |
"Message received on NSQ = {} - SHA-256 = {}", | |
message_body_str, hashed_string | |
); | |
let messages = router | |
.mongo | |
.find_by_key(hashed_string.to_string(), &client) | |
.await | |
.expect("Document not found"); | |
/* If the message exists and inserted less than 24 hours back - resend the same message else reprocess the message */ | |
if messages.len() > 0 && messages[0].date + Duration::days(1) > Utc::now() { | |
let message = &serde_json::to_string(&messages[0]) | |
.expect("Failed to convert NSQ message to JSON string"); | |
info!("Sending the existing message in mongodb {}", message); | |
producer | |
.publish(&producer_topic, message.as_bytes().to_vec()) | |
.await | |
.expect("Failed to publish NSQ message"); | |
} else { | |
info!("Processing the message..."); | |
let result: Result<Message, serde_json::Error> = serde_json::from_str(message_body_str); | |
// Handle the result using pattern matching | |
match result { | |
Ok(mut _message) => { | |
let router_result = router.process_message(_message).await; | |
match router_result { | |
Ok(mut processed_message) => { | |
// let hashed_string: String = sha256_hash(message_body_str); | |
processed_message.date = Utc::now(); | |
processed_message.key = hashed_string.to_string(); | |
router | |
.mongo | |
.create(processed_message.clone(), &client) | |
.await; | |
let message_body_str = &serde_json::to_string(&processed_message) | |
.expect("Failed to convert message to JSON string"); | |
info!("Sending the processed message on NSQ {}", message_body_str); | |
producer | |
.publish(&producer_topic, message_body_str.as_bytes().to_vec()) | |
.await | |
.expect("Failed to publish NSQ message"); | |
continue; | |
} | |
Err(err) => { | |
error!("Router message processing error: {}", err); | |
//message_body_str = &serde_json::to_string(*err).expect("Failed to convert message to JSON string"); | |
} | |
} | |
} | |
Err(err) => { | |
error!("Deserialization failed: {}", err); | |
} | |
} | |
info!("Unable to process message {}", message_body_str); | |
producer | |
.publish(&producer_topic, message_body_str.as_bytes().to_vec()) | |
.await | |
.expect("Failed to publish NSQ message"); | |
} | |
// TODO: Wait until the message is acknowledged by NSQ | |
// assert_matches!(producer.consume().await.unwrap(), NSQEvent::Ok()); | |
// producer.consume().await.expect("Failed to consume NSQ message"); | |
// message.finish().await; | |
} | |
// Ok(()) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crate::{leveraged_funds::LeveragedFunds, models::Message, mongo::Mongo}; | |
use std::error::Error; | |
pub struct Router { | |
leveraged_funds: LeveragedFunds, | |
pub(crate) mongo: Mongo, | |
} | |
impl Router { | |
pub fn new() -> Self { | |
// TODO: Check proper dependency injection | |
let leveraged_funds = LeveragedFunds::new(); | |
let mongo = Mongo {}; | |
Router { | |
leveraged_funds, | |
mongo, | |
} | |
} | |
pub async fn process_message(&self, mut message: Message) -> Result<Message, Box<dyn Error>> { | |
match message.fun_n.as_str() { | |
"leverage_funds_nasdaq100" => { | |
self.leveraged_funds.leverage_funds_nasdaq100(message).await | |
} | |
// 2.0..=2.9 => self.leveraged_funds.leverage_funds_nasdaq100(message).await, | |
_ => { | |
message.exception_message = "Message ID not implemented".to_owned(); | |
Err(Box::new(message)) | |
} | |
} | |
} | |
} | |
impl Default for Router { | |
fn default() -> Self { | |
Self::new() | |
} | |
} |
Session 3
Dataflow Diagram

Topics Discussed
- Py03 integration
- Pass polars dataframe to Python
- Review matplotlib output
Video explanation
Source Code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import matplotlib.pyplot as plt | |
import matplotlib.pyplot as plt | |
import matplotlib.gridspec as gridspec | |
import matplotlib.colors as mcolors | |
from statsmodels.tsa.stattools import adfuller | |
def process(price_df: pd.DataFrame, function_name: str): | |
col = 'etf_us_qqq_AdjustedClose' | |
sravzid = 'LeveragedFunds' | |
price_df = price_df.reset_index() | |
price_df.set_index('DateTime', inplace=True) | |
vertical_sections = 9 | |
widthInch = 10 | |
heightInch = vertical_sections * 5 | |
fig = plt.figure(figsize=(widthInch, heightInch)) | |
gs = gridspec.GridSpec(vertical_sections, 4, wspace=1, hspace=0.5) | |
gs.update(left=0.1, right=0.9, top=0.965, bottom=0.03) | |
chart_index = 0 | |
ax_price_plot = plt.subplot(gs[chart_index, :]) | |
price_df[col].plot(label=col, ax=ax_price_plot) | |
ax_price_plot.set_title( | |
'{0} {1} Price'.format(sravzid, col)) | |
ax_price_plot.legend() | |
chart_index = chart_index + 1 | |
ax_describe = plt.subplot(gs[chart_index, :]) | |
ax_describe.axis('off') | |
describe_df = price_df[[x for x in price_df.columns.tolist() if 'AdjustedClose' in x]].describe().round(3) | |
font_size = 14 | |
bbox = [0, 0, 1, 1] | |
mpl_table = ax_describe.table( | |
cellText=describe_df.values, rowLabels=describe_df.index, bbox=bbox, colLabels=describe_df.columns) | |
mpl_table.auto_set_font_size(False) | |
mpl_table.set_fontsize(font_size) | |
ax_describe.set_title("{0} Summary Statistics".format(sravzid)) | |
chart_index = chart_index + 1 | |
ax_rolling_mean_plot = plt.subplot(gs[chart_index, :]) | |
#price_df[col].plot(label=col, ax=ax_rolling_mean_plot) | |
price_df[col].rolling(7).mean().plot( | |
label="7 days MA", ax=ax_rolling_mean_plot) | |
price_df[col].rolling(21).mean().plot( | |
label="21 days MA", ax=ax_rolling_mean_plot) | |
price_df[col].rolling(255).mean().plot( | |
label="255 days MA", ax=ax_rolling_mean_plot) | |
ax_rolling_mean_plot.set_title( | |
'{0} {1} Rolling Moving Average'.format(sravzid, col)) | |
ax_rolling_mean_plot.legend() | |
chart_index = chart_index + 1 | |
ax_rolling_std_plot = plt.subplot(gs[chart_index, :]) | |
#price_df[col].plot(label=col, ax=ax_rolling_std_plot) | |
price_df[col].rolling(7).std().plot( | |
label="7 days Std", ax=ax_rolling_std_plot) | |
price_df[col].rolling(21).std().plot( | |
label="21 days Std", ax=ax_rolling_std_plot) | |
price_df[col].rolling(255).std().plot( | |
label="255 days Std", ax=ax_rolling_std_plot) | |
ax_rolling_std_plot.set_title( | |
'{0} {1} Rolling Moving Std'.format(sravzid, col)) | |
ax_rolling_std_plot.legend() | |
chart_index = chart_index + 1 | |
ax_df = plt.subplot(gs[chart_index, 1:]) | |
series = price_df[col].dropna() | |
dftest = adfuller(series, autolag='AIC') | |
dfoutput = pd.Series(dftest[0:4], index=[ | |
'Test Statistic', 'p-value', '#Lags Used', 'Number of Observations Used']) | |
for key, value in list(dftest[4].items()): | |
dfoutput['Critical Value (%s)' % key] = value | |
font_size = 14 | |
bbox = [0, 0, 1, 1] | |
ax_df.axis('off') | |
df_test_df = dfoutput.to_frame() | |
mpl_table = ax_df.table( | |
cellText=df_test_df.values, rowLabels=df_test_df.index, bbox=bbox, colLabels=df_test_df.index) | |
mpl_table.auto_set_font_size(False) | |
mpl_table.set_fontsize(font_size) | |
ax_df.set_title("{0} {1} Price - Augmented Dickey Fuller Test - Complete History".format(sravzid, col), loc="left") | |
for years in [10,5,2,1]: | |
chart_index = chart_index + 1 | |
ax_df = plt.subplot(gs[chart_index, 1:]) | |
series = price_df[col].last(f"{years}Y").dropna() | |
dftest = adfuller(series, regression='ctt', autolag='AIC') | |
dfoutput = pd.Series(dftest[0:4], index=[ | |
'Test Statistic', 'p-value', '#Lags Used', 'Number of Observations Used']) | |
for key, value in list(dftest[4].items()): | |
dfoutput['Critical Value (%s)' % key] = value | |
font_size = 14 | |
bbox = [0, 0, 1, 1] | |
ax_df.axis('off') | |
df_test_df = dfoutput.to_frame() | |
mpl_table = ax_df.table( | |
cellText=df_test_df.values, rowLabels=df_test_df.index, bbox=bbox, colLabels=df_test_df.index) | |
mpl_table.auto_set_font_size(False) | |
mpl_table.set_fontsize(font_size) | |
ax_df.set_title("{0} {1} Price - Augmented Dickey Fuller Test - Last {2}Yrs".format(sravzid, col, years), loc="left") | |
plt.savefig(f"/tmp/{function_name}", bbox_inches='tight') | |
return f"Hello world from python!!! {price_df.head()}" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crate::py03_service::run_py_module; | |
use crate::{models::Message, s3_module::S3Module}; | |
use log::info; | |
use polars::prelude::*; | |
use std::error::Error; | |
use std::io::Cursor; | |
pub struct LeveragedFunds { | |
s3_module: S3Module, | |
} | |
impl LeveragedFunds { | |
pub fn new() -> Self { | |
// TODO: Check proper dependency injection | |
let s3_module = S3Module::new(); | |
LeveragedFunds { s3_module } | |
} | |
pub async fn leverage_funds(&self, message: Message) -> Result<Message, Box<dyn Error>> { | |
let bucket_name = "sravz-data"; | |
// let object_key = "historical/etf_us_qqq.json"; | |
// let object_keys = ["tqqq", "qld", "qqq"]; | |
let object_keys = message.p_i.args.clone(); | |
let mut dataframe_vector: Vec<DataFrame> = Vec::new(); | |
for sravzid in object_keys { | |
let downloaded_content = self | |
.s3_module | |
.download_object(bucket_name, &format!("historical/{}.json", sravzid), false) | |
.await; | |
match self.s3_module.decompress_gzip(downloaded_content) { | |
Ok(decompressed_data) => { | |
let cursor: Cursor<Vec<u8>> = Cursor::new(decompressed_data); | |
let df = JsonReader::new(cursor).finish().unwrap(); | |
let mut df = df.unnest(["Date"]).unwrap(); | |
let df = df.rename("_isoformat", "Date").unwrap(); | |
let mut df = df | |
.clone() | |
.lazy() | |
.select([ | |
col("Date") | |
.str() | |
.to_datetime( | |
Some(TimeUnit::Microseconds), | |
None, | |
StrptimeOptions::default(), | |
lit("raise"), | |
) | |
.alias("DateTime"), | |
col("*"), | |
]) | |
.drop_columns(["Date"]) | |
.collect()?; | |
let old_cols: Vec<String> = df | |
.get_column_names() | |
.iter() | |
.map(|s| s.to_owned().to_owned()) | |
.collect(); | |
old_cols.iter().for_each(|old| { | |
if old != "DateTime" { | |
df.rename(old, &format!("{}_{}", sravzid, old)) | |
.expect(format!("cannot rename column {}", old).as_str()); | |
} | |
}); | |
// let df_with_constant = df.apply(|name| format!("{}{}", constant_string, name)); | |
dataframe_vector.push(df.clone()); | |
info!("Dateframe Head {}", df.head(Some(10))); | |
info!("Dateframe Tail {}", df.tail(Some(10))); | |
dbg!(df); | |
} | |
Err(err) => { | |
info!("Error during decompression: {:?}", err); | |
} | |
} | |
} | |
// Join DataFrames in the vector on the "id" column | |
let mut joined_df: Option<DataFrame> = None; | |
for df in dataframe_vector { | |
let cloned_joined_df = joined_df.clone(); | |
if let Some(joined) = cloned_joined_df { | |
let join_result = joined.inner_join(&df, ["DateTime"], ["DateTime"]); | |
match join_result { | |
Ok(joined_result) => { | |
joined_df = Some(joined_result); | |
} | |
Err(err) => { | |
eprintln!("Error during join: {:?}", err); | |
// Optionally return the error or handle it accordingly | |
} | |
} | |
} else { | |
joined_df = Some(df.clone()); | |
} | |
} | |
// Display the final joined DataFrame | |
if let Some(joined) = joined_df { | |
println!("{:?}", joined); | |
match run_py_module(joined, message.fun_n.clone()) { | |
Ok(_) => { | |
println!("Python code executed successfully"); | |
self.s3_module | |
.upload_file("sravz", "rust-backend/leveraged_funds.png", "/tmp/leveraged_funds.png") | |
.await; | |
}, | |
Err(err) => eprintln!("Error executing Python code: {:?}", err), | |
} | |
} | |
// Convert the DataFrame to a JSON string | |
//let df_json = serde_json::to_string(joined_df).expect("Failed to serialize DataFrame to JSON"); | |
// Run python code | |
// Initialize the Python interpreter | |
// let gil = Python::acquire_gil(); | |
// let py = gil.python(); | |
return Ok(message); | |
} | |
} | |
#[cfg(test)] | |
mod tests { | |
use crate::{ | |
leveraged_funds::LeveragedFunds, | |
models::{Kwargs, Message}, | |
}; | |
use chrono::Utc; | |
use log::{error, info}; | |
#[tokio::test] | |
async fn test_leverage_funds() { | |
let leveraged_funds = LeveragedFunds::new(); | |
let leveraged_fund_result = leveraged_funds | |
.leverage_funds(Message { | |
id: 1.0, | |
p_i: crate::models::PI { | |
args: vec![ | |
"etf_us_tqqq".to_string(), | |
"etf_us_qld".to_string(), | |
"etf_us_qqq".to_string(), | |
], | |
kwargs: Kwargs { | |
device: String::new(), | |
upload_to_aws: true, | |
}, | |
}, | |
t_o: String::new(), | |
cid: String::new(), | |
cache_message: true, | |
stopic: String::new(), | |
ts: 1.0, | |
fun_n: "leveraged_funds".to_string(), | |
date: Utc::now(), | |
e: String::new(), | |
key: String::new(), | |
exception_message: String::new(), | |
}) | |
.await; | |
match leveraged_fund_result { | |
Ok(mut processed_message) => { | |
processed_message.date = Utc::now(); | |
let message_body_str = &serde_json::to_string(&processed_message) | |
.expect("Failed to convert message to JSON string"); | |
info!("Sending the processed message on NSQ {}", message_body_str); | |
} | |
Err(err) => { | |
error!("Router message processing error: {}", err); | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pandas as pd | |
from pathlib import Path | |
import leveraged_funds | |
def run(function_name, slope=0.01): | |
file_path = f"/tmp/{function_name}.parquet" | |
path = Path(file_path) | |
if path.is_file(): | |
df = pd.read_parquet(file_path) | |
leveraged_funds.process(df, function_name) | |
try: | |
os.remove(file_path) | |
except FileNotFoundError: | |
print(f'{file_path} does not exist. No need to delete.') | |
except Exception as e: | |
print(f'An error occurred: {e}') | |
return f"Hello world from python!!! {df.head()}" | |
else: | |
print(f'The file {file_path} does not exist.') | |
return None | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use polars::prelude::*; | |
use pyo3::prelude::*; | |
use pyo3::types::IntoPyDict; | |
use pyo3::types::PyTuple; | |
pub(crate) fn run_py_module(mut df: DataFrame, function_name: String) -> PyResult<()> { | |
let mut file = std::fs::File::create(format!("/tmp/{}.parquet", function_name)).unwrap(); | |
ParquetWriter::new(&mut file).finish(&mut df).unwrap(); | |
Python::with_gil(|py| { | |
let activators = PyModule::from_code( | |
py, | |
r#" | |
import sys | |
sys.path.append("/workspace/backend-rust/src/py") | |
def run(x, slope=0.01): | |
from main import run | |
return run(x, slope=slope) | |
"#, | |
"activators.py", | |
"activators", | |
)?; | |
// let relu_result: f64 = activators.getattr("relu")?.call1((-1.0,))?.extract()?; | |
// assert_eq!(relu_result, 0.0); | |
// println!("This is function name {}", function_name); | |
let args = PyTuple::new(py, &[function_name]); | |
let kwargs = [("slope", 0.2)].into_py_dict(py); | |
let py_result: String = activators | |
.getattr("run")? | |
.call(args, Some(kwargs))? | |
.extract()?; | |
// assert_eq!(py_result, -0.2); | |
println!("This is from python {}", py_result); | |
Ok(()) | |
}) | |
} |
Session 4
Dataflow Diagram

Topics Discussed
- Config struct to parse environment variables
- Ticker plant send the output on the NSQ backend-node topic
Video explanation
Session 5
Dataflow Diagram

Topics Discussed
- Demo:
- Angular UI sends a message to the backend-node over socket.io websockets
- Backend-node server receives the message on the socket.io websocket
- Backed-node sends the message to NSQ Server on the backend-rust topic
- Backend-rust receives the message on the NSQ Server backend-rust topic
- Backend-rust process the message
- Backend-rust send the response to NSQ Server on the backend-node topic
- Backend-node receives the response message on the backend-node topic
- Backend-node sends the response message on WebSocket Socket.IO to Angular UI
- Angular UI Debug Console shows the message received from the backend-rust
- Added config.toml support to pass extra config data to backend-rust
- Removed .expect and .unwrap usage which cause panics
- Use match pattern to match enun result pattern
- Error handling using
Box<dyn Error>