Use Case
Websocket (Session 1) -> Process Quotes (Session 2) -> Redis Timeseries (Session 3)
Use these programs to read stock quotes from a WebSocket and store the time series in a Redis TimeSeries database.
Libraries Used
- C++ Boost ASIO
- C++ Boost Beast
- hiredis
- Redis Plus Plus
Session 1
Video explanation of the code
Source Code - Reads from WebSocket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Sravz LLC | |
* TODO: | |
* Implement lock free datastructure instead of _buffer so strand can be removed on on_read | |
* Implement JSON parser for the message | |
* Upload the JSON quotes/trades to redis | |
**/ | |
#include "root_certificates.hpp" | |
#include <boost/beast/core.hpp> | |
#include <boost/beast/ssl.hpp> | |
#include <boost/beast/websocket.hpp> | |
#include <boost/beast/websocket/ssl.hpp> | |
#include <boost/asio/strand.hpp> | |
#include <cstdlib> | |
#include <functional> | |
#include <iostream> | |
#include <memory> | |
#include <string> | |
#include <cstdio> | |
#include <boost/asio/signal_set.hpp> | |
#include <thread> | |
#include <boost/asio.hpp> | |
#include <boost/thread.hpp> | |
namespace beast = boost::beast; // from <boost/beast.hpp> | |
namespace http = beast::http; // from <boost/beast/http.hpp> | |
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp> | |
namespace net = boost::asio; // from <boost/asio.hpp> | |
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp> | |
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> | |
typedef boost::asio::io_context::executor_type executor_type; | |
typedef boost::asio::strand<executor_type> strand; | |
//------------------------------------------------------------------------------ | |
// Report a failure | |
void | |
fail(beast::error_code ec, char const* what) | |
{ | |
std::cerr << what << ": " << ec.message() << "\n"; | |
} | |
// Sends a WebSocket message and prints the response | |
// enabled_shared_from_this provided function shared_from_this | |
// which is a handy function to create shared pointers from this | |
class session : public std::enable_shared_from_this<session> | |
{ | |
private: | |
net::io_context& ioc_; | |
tcp::resolver resolver_; | |
websocket::stream< | |
beast::ssl_stream<beast::tcp_stream>> ws_; | |
beast::flat_buffer buffer_; | |
std::string host_; | |
std::string text_; | |
std::string endpoint_; | |
strand ws_strand_; | |
public: | |
// Resolver and socket require an io_context | |
explicit | |
session(net::io_context& ioc, ssl::context& ctx) | |
: resolver_(net::make_strand(ioc)) // Looks up the domain name | |
, ws_(net::make_strand(ioc), ctx) // Websocket constructor takes an IO context and ssl context | |
, ioc_(ioc) // reference to the io_context created in the main function | |
, ws_strand_(ioc.get_executor()) // Get a strand from the io_context | |
{ | |
} | |
// Start the asynchronous operation | |
void | |
run( | |
char const* host, | |
char const* port, | |
char const* text, | |
char const* endpoint) | |
{ | |
// Save these for later | |
host_ = host; | |
text_ = text; | |
endpoint_ = endpoint; | |
// Look up the domain name | |
resolver_.async_resolve( | |
host, | |
port, | |
// Binds parameters to the handler creating a new handler | |
beast::bind_front_handler( | |
&session::on_resolve, | |
shared_from_this())); | |
} | |
void | |
on_resolve( | |
beast::error_code ec, | |
tcp::resolver::results_type results) | |
{ | |
if(ec) | |
return fail(ec, "resolve"); | |
// Set a timeout on the operation | |
// Get lowest layer will get the underlying socket | |
//websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_; | |
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); | |
// Make the connection on the IP address we get from a lookup | |
beast::get_lowest_layer(ws_).async_connect( | |
results, | |
// Binds the strand to an executor of the type the strand is based on, in this case io_context | |
// bind_front_handler creates a functor object which when execute calls on_connect function of this pointer i.e | |
// current instance of session. | |
// Once Async_connect completes it calls the functor object with error_code ec and endpoint_type ep | |
// which are required arguments to call on_connect function | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_connect, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_connect, | |
// shared_from_this())); | |
} | |
void | |
on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) | |
{ | |
if(ec) | |
return fail(ec, "connect"); | |
// Gets the socket associated with this web socket and sets a timeout on the operation. | |
// ws_ is declared as websocket::stream<beast::ssl_stream<beast::tcp_stream>> | |
// get_lower_layer will return beast::tcp_stream | |
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); | |
// Set SNI Hostname (many hosts need this to handshake successfully) | |
if(! SSL_set_tlsext_host_name( | |
ws_.next_layer().native_handle(), | |
host_.c_str())) | |
{ | |
ec = beast::error_code(static_cast<int>(::ERR_get_error()), | |
net::error::get_ssl_category()); | |
return fail(ec, "connect"); | |
} | |
// Update the host_ string. This will provide the value of the | |
// Host HTTP header during the WebSocket handshake. | |
// See https://tools.ietf.org/html/rfc7230#section-5.4 | |
host_ += ':' + std::to_string(ep.port()); | |
// Perform the SSL handshake. | |
// ws_ is declared as websocket::stream<beast::ssl_stream<beast::tcp_stream>> | |
// next_layer will return beast::ssl_stream | |
ws_.next_layer().async_handshake( | |
ssl::stream_base::client, | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_ssl_handshake, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_ssl_handshake, | |
// shared_from_this())); | |
} | |
void | |
on_ssl_handshake(beast::error_code ec) | |
{ | |
if(ec) | |
return fail(ec, "ssl_handshake"); | |
// Turn off the timeout on the tcp_stream, because | |
// the websocket stream has its own timeout system. | |
beast::get_lowest_layer(ws_).expires_never(); | |
// Set suggested timeout settings for the websocket | |
ws_.set_option( | |
websocket::stream_base::timeout::suggested( | |
beast::role_type::client)); | |
// Set a decorator to change the User-Agent of the handshake | |
ws_.set_option(websocket::stream_base::decorator( | |
[](websocket::request_type& req) | |
{ | |
req.set(http::field::user_agent, | |
std::string(BOOST_BEAST_VERSION_STRING) + | |
" websocket-client-async-ssl"); | |
})); | |
// Perform the websocket handshake | |
ws_.async_handshake(host_, endpoint_, | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_handshake, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_handshake, | |
// shared_from_this())); | |
} | |
void | |
on_handshake(beast::error_code ec) | |
{ | |
if(ec) | |
return fail(ec, "handshake"); | |
// Send the message | |
ws_.async_write( | |
net::buffer(text_), | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_write, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_write, | |
// shared_from_this())); | |
} | |
void | |
on_write( | |
beast::error_code ec, | |
std::size_t bytes_transferred) | |
{ | |
// Supress unused variable compiler warnings | |
// bytes_transferred contains total bytes read/written to the websocket stream | |
boost::ignore_unused(bytes_transferred); | |
if(ec) | |
return fail(ec, "write"); | |
// Read single message | |
ws_.async_read( | |
buffer_, | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_read, shared_from_this()))); | |
} | |
void | |
on_read( | |
beast::error_code ec, | |
std::size_t bytes_transferred) | |
{ | |
boost::ignore_unused(bytes_transferred); | |
if(ec) | |
return fail(ec, "read"); | |
// Print the messages | |
// make_printable interprets the bytes are characters and sends to output stream | |
std::cout << beast::make_printable(buffer_.data()) << " by thread ID:" << boost::this_thread::get_id() << std::endl; | |
// Clear the buffer | |
buffer_.consume(buffer_.size()); | |
// Read single message | |
ws_.async_read( | |
buffer_, | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_read, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_read, | |
// shared_from_this())); | |
} | |
// Check how to close when a signal handler is triggered? does the websocket auto close? | |
void | |
on_close(beast::error_code ec) | |
{ | |
if(ec) | |
return fail(ec, "close"); | |
// If we get here then the connection is closed gracefully | |
// The make_printable() function helps print a ConstBufferSequence | |
std::cout << beast::make_printable(buffer_.data()) << std::endl; | |
} | |
}; | |
//------------------------------------------------------------------------------ | |
int main(int argc, char** argv) | |
{ | |
// Check command line arguments. | |
if(argc != 6) | |
{ | |
std::cerr << | |
"Usage: websocket-client-async-ssl <host> <port> <text>\n" << | |
"Example:\n" << | |
" websocket-client-async-ssl echo.websocket.org 443 \"Hello, world!\" \"/url/to/connect/to\" threads\n"; | |
return EXIT_FAILURE; | |
} | |
auto const host = argv[1]; | |
auto const port = argv[2]; | |
auto const text = argv[3]; | |
auto const endpoint = argv[4]; | |
auto const threads = std::max<int>(1, std::atoi(argv[5])); | |
// The io_context is required for all I/O | |
net::io_context ioc; | |
// Singal handler to exit cleanly | |
boost::asio::signal_set signals(ioc, SIGINT, SIGTERM); | |
signals.async_wait([&](auto, auto){ | |
ioc.stop(); | |
std::cout << "Signal received, stopped the process."; | |
}); | |
// The SSL context is required, and holds certificates | |
ssl::context ctx{ssl::context::tlsv12_client}; | |
// This holds the root certificate used for verification | |
load_root_certificates(ctx); | |
// Launch the asynchronous operation | |
// Create an instance of session and returns a shared pointer to session | |
std::make_shared<session>(ioc, ctx)->run(host, port, text, endpoint); | |
// Run the I/O service on the requested number of threads | |
std::vector<std::thread> v; | |
v.reserve(threads - 1); | |
for(auto i = threads - 1; i > 0; --i) | |
// Create an instance in place at the end of the vector | |
v.emplace_back( | |
[&ioc] | |
{ | |
ioc.run(); | |
}); | |
ioc.run(); | |
// (If we get here, it means we got a SIGINT or SIGTERM) | |
// Block until all the threads exit | |
for(auto& t : v) | |
t.join(); | |
return EXIT_SUCCESS; | |
} |
Session 2
Video explanation of the code
Source Code - Common Utils.h used across all files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef SRAVZ_BACKENDCPP_UTIL_H | |
#define SRAVZ_BACKENDCPP_UTIL_H | |
// Includes | |
#include <iostream> | |
#include <unordered_map> | |
#include <boost/asio/co_spawn.hpp> | |
#include <boost/asio/detached.hpp> | |
#include <boost/asio/io_context.hpp> | |
#include <boost/asio/io_service.hpp> | |
#include <redis-plus-plus/test/src/sw/redis++/utils.h> | |
#include <redis-plus-plus/src/sw/redis++/errors.h> | |
#include <boost/asio/ip/tcp.hpp> | |
#include <boost/asio/signal_set.hpp> | |
#include <boost/thread.hpp> | |
#include <boost/asio/write.hpp> | |
#include <boost/lexical_cast.hpp> | |
#include <boost/log/trivial.hpp> | |
#include <cstdio> | |
#include <boost/asio.hpp> | |
#include <cstddef> | |
#include <string> | |
#include <functional> | |
#include <boost/beast/core.hpp> | |
#include <boost/beast/ssl.hpp> | |
#include <boost/beast/websocket.hpp> | |
#include <boost/beast/websocket/ssl.hpp> | |
#include <boost/asio/strand.hpp> | |
#include <cstdlib> | |
#include <memory> | |
#include <thread> | |
#include "root_certificates.hpp" | |
#include <boost/asio/spawn.hpp> | |
using RedisInstance = sw::redis::Redis; | |
// Gets env variable | |
// Reads the environment variable `name`.
//   name - variable to look up
//   env  - receives the value when the variable is set; left untouched otherwise
// Returns true when the variable is set. When it is not, a diagnostic line is
// written to standard output.
//
// Declared inline because this definition lives in a header: without it,
// every translation unit including the header would emit its own definition
// and the link would fail with a multiple-definition error.
inline bool getenv(const char *name, std::string &env)
{
    // Qualified call so the C library function is used, not this overload.
    const char *value = std::getenv(name);
    if (value == nullptr) {
        std::cout << "Env variable: " << name << " not set!!!" << '\n';
        return false;
    }
    env = value;
    return true;
}
// Returns redis connection options to be used in redis connection creation | |
// Builds Redis connection options from the environment.
// REDIS_HOST, REDIS_PORT and REDIS_PASSWORD each override the corresponding
// field only when set; unset variables leave the default-constructed value.
// std::stoi throws if REDIS_PORT is not a valid integer.
//
// Declared inline because this definition lives in a header that is included
// by more than one source file.
inline sw::redis::ConnectionOptions getRedisConnectionOptions() {
    sw::redis::ConnectionOptions opts;
    // One scratch string is enough: getenv only writes it on success and the
    // value is consumed immediately.
    std::string value;
    if (getenv("REDIS_HOST", value))
        opts.host = value;
    if (getenv("REDIS_PORT", value))
        opts.port = std::stoi(value);
    if (getenv("REDIS_PASSWORD", value))
        opts.password = value;
    return opts;
}
// Namespaces | |
namespace beast = boost::beast; // from <boost/beast.hpp> | |
namespace http = beast::http; // from <boost/beast/http.hpp> | |
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp> | |
namespace net = boost::asio; // from <boost/asio.hpp> | |
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp> | |
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> | |
typedef boost::asio::io_context::executor_type executor_type; | |
typedef boost::asio::strand<executor_type> strand; | |
//------------------------------------------------------------------------------ | |
class ThreadInfo | |
{ | |
public: | |
beast::flat_buffer buffer; | |
std::shared_ptr<RedisInstance> redis; | |
ThreadInfo() {} | |
ThreadInfo(beast::flat_buffer buffer_, std::shared_ptr<RedisInstance> redis_) : buffer(buffer_), redis(redis_) {} | |
}; | |
typedef std::map<boost::thread::id, ThreadInfo> ThreadsInfo; | |
#endif // end SRAVZ_BACKENDCPP_UTIL_H |
Source Code - Reads from WebSocket and Publishes to Redis Topic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Sravz LLC | |
* TODO: | |
* Implement lock free datastructure instead of _buffer so strand can be removed on on_read | |
* Implement JSON parser for the message | |
* Upload the JSON quotes/trades to redis | |
**/ | |
#include <util.hpp> | |
// Report a failure | |
void | |
fail(beast::error_code ec, char const* what) | |
{ | |
std::cerr << what << ": " << ec.message() << "\n"; | |
} | |
// Sends a WebSocket message and prints the response | |
// enabled_shared_from_this provided function shared_from_this | |
// which is a handy function to create shared pointers from this | |
class session : public std::enable_shared_from_this<session> | |
{ | |
private: | |
net::io_context& ioc_; | |
tcp::resolver resolver_; | |
websocket::stream< | |
beast::ssl_stream<beast::tcp_stream>> ws_; | |
beast::flat_buffer buffer_; | |
std::string host_; | |
std::string text_; | |
std::string endpoint_; | |
strand ws_strand_; | |
ThreadsInfo threadsInfo_; | |
public: | |
// Resolver and socket require an io_context | |
explicit | |
session(net::io_context& ioc, ssl::context& ctx) | |
: resolver_(net::make_strand(ioc)) // Looks up the domain name | |
, ws_(net::make_strand(ioc), ctx) // Websocket constructor takes an IO context and ssl context | |
, ioc_(ioc) // reference to the io_context created in the main function | |
, ws_strand_(ioc.get_executor()) // Get a strand from the io_context | |
{ | |
// Register current thread | |
register_thread(); | |
} | |
// Start the asynchronous operation | |
void | |
run( | |
char const* host, | |
char const* port, | |
char const* text, | |
char const* endpoint) | |
{ | |
// Save these for later | |
host_ = host; | |
text_ = text; | |
endpoint_ = endpoint; | |
// Look up the domain name | |
resolver_.async_resolve( | |
host, | |
port, | |
// Binds parameters to the handler creating a new handler | |
beast::bind_front_handler( | |
&session::on_resolve, | |
shared_from_this())); | |
} | |
void | |
register_thread() | |
{ | |
threadsInfo_.insert(std::make_pair(boost::this_thread::get_id(), ThreadInfo(beast::flat_buffer(), std::make_shared<RedisInstance>(getRedisConnectionOptions())))); | |
} | |
void | |
on_resolve( | |
beast::error_code ec, | |
tcp::resolver::results_type results) | |
{ | |
if(ec) | |
return fail(ec, "resolve"); | |
// Set a timeout on the operation | |
// Get lowest layer will get the underlying socket | |
//websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_; | |
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); | |
// Make the connection on the IP address we get from a lookup | |
beast::get_lowest_layer(ws_).async_connect( | |
results, | |
// Binds the strand to an executor of the type the strand is based on, in this case io_context | |
// bind_front_handler creates a functor object which when execute calls on_connect function of this pointer i.e | |
// current instance of session. | |
// Once Async_connect completes it calls the functor object with error_code ec and endpoint_type ep | |
// which are required arguments to call on_connect function | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_connect, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_connect, | |
// shared_from_this())); | |
} | |
void | |
on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) | |
{ | |
if(ec) | |
return fail(ec, "connect"); | |
// Gets the socket associated with this web socket and sets a timeout on the operation. | |
// ws_ is declared as websocket::stream<beast::ssl_stream<beast::tcp_stream>> | |
// get_lower_layer will return beast::tcp_stream | |
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); | |
// Set SNI Hostname (many hosts need this to handshake successfully) | |
if(! SSL_set_tlsext_host_name( | |
ws_.next_layer().native_handle(), | |
host_.c_str())) | |
{ | |
ec = beast::error_code(static_cast<int>(::ERR_get_error()), | |
net::error::get_ssl_category()); | |
return fail(ec, "connect"); | |
} | |
// Update the host_ string. This will provide the value of the | |
// Host HTTP header during the WebSocket handshake. | |
// See https://tools.ietf.org/html/rfc7230#section-5.4 | |
host_ += ':' + std::to_string(ep.port()); | |
// Perform the SSL handshake. | |
// ws_ is declared as websocket::stream<beast::ssl_stream<beast::tcp_stream>> | |
// next_layer will return beast::ssl_stream | |
ws_.next_layer().async_handshake( | |
ssl::stream_base::client, | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_ssl_handshake, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_ssl_handshake, | |
// shared_from_this())); | |
} | |
void | |
on_ssl_handshake(beast::error_code ec) | |
{ | |
if(ec) | |
return fail(ec, "ssl_handshake"); | |
// Turn off the timeout on the tcp_stream, because | |
// the websocket stream has its own timeout system. | |
beast::get_lowest_layer(ws_).expires_never(); | |
// Set suggested timeout settings for the websocket | |
ws_.set_option( | |
websocket::stream_base::timeout::suggested( | |
beast::role_type::client)); | |
// Set a decorator to change the User-Agent of the handshake | |
ws_.set_option(websocket::stream_base::decorator( | |
[](websocket::request_type& req) | |
{ | |
req.set(http::field::user_agent, | |
std::string(BOOST_BEAST_VERSION_STRING) + | |
" websocket-client-async-ssl"); | |
})); | |
// Perform the websocket handshake | |
ws_.async_handshake(host_, endpoint_, | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_handshake, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_handshake, | |
// shared_from_this())); | |
} | |
void | |
on_handshake(beast::error_code ec) | |
{ | |
if(ec) | |
return fail(ec, "handshake"); | |
// Send the message | |
ws_.async_write( | |
net::buffer(text_), | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_write, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_write, | |
// shared_from_this())); | |
} | |
void | |
on_write( | |
beast::error_code ec, | |
std::size_t bytes_transferred) | |
{ | |
// Supress unused variable compiler warnings | |
// bytes_transferred contains total bytes read/written to the websocket stream | |
boost::ignore_unused(bytes_transferred); | |
if(ec) | |
return fail(ec, "write"); | |
// Read single message | |
ws_.async_read( | |
threadsInfo_[boost::this_thread::get_id()].buffer, | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_read, shared_from_this()))); | |
} | |
void | |
on_read( | |
beast::error_code ec, | |
std::size_t bytes_transferred) | |
{ | |
boost::ignore_unused(bytes_transferred); | |
if(ec) | |
return fail(ec, "read"); | |
// Print the messages | |
// make_printable interprets the bytes are characters and sends to output stream | |
// beast::make_printable(threadsInfo_[boost::this_thread::get_id()].buffer.data()) | |
auto message = beast::buffers_to_string(threadsInfo_[boost::this_thread::get_id()].buffer.data()); | |
// std::cout << message << " by thread ID: " << boost::this_thread::get_id() << " buffer ID: " << &threadsInfo_[boost::this_thread::get_id()].buffer << std::endl; | |
threadsInfo_[boost::this_thread::get_id()].redis->publish("ETH-USD", message); | |
// Clear the buffer | |
threadsInfo_[boost::this_thread::get_id()].buffer.consume(threadsInfo_[boost::this_thread::get_id()].buffer.size()); | |
// Read single message | |
ws_.async_read( | |
threadsInfo_[boost::this_thread::get_id()].buffer, | |
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_read, shared_from_this()))); | |
// beast::bind_front_handler( | |
// &session::on_read, | |
// shared_from_this())); | |
} | |
// Check how to close when a signal handler is triggered? does the websocket auto close? | |
void | |
on_close(beast::error_code ec) | |
{ | |
if(ec) | |
return fail(ec, "close"); | |
// If we get here then the connection is closed gracefully | |
// The make_printable() function helps print a ConstBufferSequence | |
std::cout << beast::make_printable(buffer_.data()) << std::endl; | |
} | |
}; | |
//------------------------------------------------------------------------------ | |
int main(int argc, char** argv) | |
{ | |
// Check command line arguments. | |
if(argc != 6) | |
{ | |
std::cerr << | |
"Usage: websocket-client-async-ssl <host> <port> <text>\n" << | |
"Example:\n" << | |
" websocket-client-async-ssl echo.websocket.org 443 \"Hello, world!\" \"/url/to/connect/to\" threads\n"; | |
return EXIT_FAILURE; | |
} | |
auto const host = argv[1]; | |
auto const port = argv[2]; | |
auto const text = argv[3]; | |
auto const endpoint = argv[4]; | |
auto const threads = std::max<int>(1, std::atoi(argv[5])); | |
// The io_context is required for all I/O | |
net::io_context ioc; | |
// Singal handler to exit cleanly | |
boost::asio::signal_set signals(ioc, SIGINT, SIGTERM); | |
signals.async_wait([&](auto, auto){ | |
ioc.stop(); | |
std::cout << "Signal received, stopped the process."; | |
}); | |
// The SSL context is required, and holds certificates | |
ssl::context ctx{ssl::context::tlsv12_client}; | |
// This holds the root certificate used for verification | |
load_root_certificates(ctx); | |
// Launch the asynchronous operation | |
// Create an instance of session and returns a shared pointer to session | |
auto session_prt = std::make_shared<session>(ioc, ctx); | |
session_prt->run(host, port, text, endpoint); | |
// Run the I/O service on the requested number of threads | |
std::vector<std::thread> v; | |
v.reserve(threads - 1); | |
for(auto i = threads - 1; i > 0; --i) | |
// Create an instance in place at the end of the vector | |
v.emplace_back( | |
[&session_prt, &ioc] | |
{ | |
session_prt->register_thread(); | |
ioc.run(); | |
}); | |
ioc.run(); | |
// (If we get here, it means we got a SIGINT or SIGTERM) | |
// Block until all the threads exit | |
for(auto& t : v) | |
t.join(); | |
return EXIT_SUCCESS; | |
} |
Source Code - Boost ASIO Threadpool - Redis Plus Plus Redis Subscriber
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Sravz LLC | |
* TODO: Handle multiple topics | |
**/ | |
#include <util.hpp> | |
using RedisInstance = sw::redis::Redis; | |
class RedisSubscriber: public std::enable_shared_from_this<RedisSubscriber> | |
{ | |
public: | |
RedisSubscriber(std::string topic, std::shared_ptr<boost::asio::thread_pool>& io) | |
: topic_(topic) | |
,redis_(RedisInstance(getRedisConnectionOptions())) | |
,io_(io) | |
{ | |
} | |
~RedisSubscriber() | |
{ | |
std::cout << "Subscriber closed" << std::endl; | |
} | |
void subscribe() | |
{ | |
// Create a Subscriber. | |
auto sub = redis_.subscriber(); | |
// Set callback functions. | |
sub.on_message([](std::string channel, std::string msg) { | |
std::cout << msg << " thread ID: " << boost::this_thread::get_id() << std::endl; | |
}); | |
// Subscribe to channels and patterns. | |
sub.subscribe(topic_); | |
// Consume messages in a loop. | |
while (true) { | |
sub.consume(); | |
try { | |
sub.consume(); | |
} catch (const sw::redis::Error &err) { | |
std::cout << err.what() << std::endl; | |
} | |
} | |
} | |
void run() { | |
boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::subscribe, shared_from_this())); | |
} | |
private: | |
std::string topic_; | |
RedisInstance redis_; | |
std::shared_ptr<boost::asio::thread_pool> io_; | |
}; | |
int main(int argc, char** argv) | |
{ | |
if(argc != 2) | |
{ | |
std::cerr << | |
"Usage: sravz_redis_async <topic>\n" << | |
"Example:\n" << | |
" sravz_redis_async ETH-USD\n"; | |
return EXIT_FAILURE; | |
} | |
auto const topic = argv[1]; | |
auto io = std::make_shared<boost::asio::thread_pool>(1); | |
std::make_shared<RedisSubscriber>(topic, io)->run(); | |
io->join(); | |
} |
Session 3
Video explanation of the code
Source Code - Common Utils.h used across all files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef SRAVZ_BACKENDCPP_UTIL_H | |
#define SRAVZ_BACKENDCPP_UTIL_H | |
// Includes | |
#include <iostream> | |
#include <unordered_map> | |
#include <boost/asio/co_spawn.hpp> | |
#include <boost/asio/detached.hpp> | |
#include <boost/asio/io_context.hpp> | |
#include <boost/asio/io_service.hpp> | |
#include <redis-plus-plus/test/src/sw/redis++/utils.h> | |
#include <redis-plus-plus/src/sw/redis++/errors.h> | |
#include <boost/asio/ip/tcp.hpp> | |
#include <boost/asio/signal_set.hpp> | |
#include <boost/thread.hpp> | |
#include <boost/asio/write.hpp> | |
#include <boost/lexical_cast.hpp> | |
#include <boost/log/trivial.hpp> | |
#include <cstdio> | |
#include <boost/asio.hpp> | |
#include <cstddef> | |
#include <string> | |
#include <functional> | |
#include <boost/beast/core.hpp> | |
#include <boost/beast/ssl.hpp> | |
#include <boost/beast/websocket.hpp> | |
#include <boost/beast/websocket/ssl.hpp> | |
#include <boost/asio/strand.hpp> | |
#include <cstdlib> | |
#include <memory> | |
#include <thread> | |
#include "root_certificates.hpp" | |
#include <boost/asio/spawn.hpp> | |
#include <boost/coroutine2/all.hpp> | |
#include <boost/json.hpp> | |
#include <boost/lockfree/spsc_queue.hpp> | |
#include <boost/algorithm/string.hpp> | |
#include <boost/foreach.hpp> | |
/* Using namespaces */ | |
using RedisInstance = sw::redis::Redis; | |
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> | |
// Namespaces | |
namespace beast = boost::beast; // from <boost/beast.hpp> | |
namespace http = beast::http; // from <boost/beast/http.hpp> | |
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp> | |
namespace net = boost::asio; // from <boost/asio.hpp> | |
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp> | |
namespace json = boost::json; | |
typedef boost::asio::io_context::executor_type executor_type; | |
typedef boost::asio::strand<executor_type> strand; | |
// Classes | |
class ThreadInfo | |
{ | |
public: | |
beast::flat_buffer buffer; | |
std::shared_ptr<RedisInstance> redis; | |
ThreadInfo() {} | |
ThreadInfo(beast::flat_buffer buffer_, std::shared_ptr<RedisInstance> redis_) : buffer(buffer_), redis(redis_) {} | |
}; | |
typedef std::map<boost::thread::id, ThreadInfo> ThreadsInfo; | |
// Functions | |
// Gets env variable | |
bool getenv(const char *name, std::string &env); | |
// Returns redis connection options to be used in redis connection creation | |
sw::redis::ConnectionOptions getRedisConnectionOptions(); | |
#endif // end SRAVZ_BACKENDCPP_UTIL_H |
Source Code - Common Utils.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <util.hpp> | |
// Gets env variable | |
// Reads the environment variable `name`.
//   name - variable to look up
//   env  - receives the value when the variable is set; left untouched otherwise
// Returns true when the variable is set. When it is not, a diagnostic line is
// written to standard output.
bool getenv(const char *name, std::string &env)
{
    // Qualified call so the C library function is used, not this overload.
    const char *value = std::getenv(name);
    if (value == nullptr) {
        // Newline-terminated so successive diagnostics do not run together.
        std::cout << "Env variable: " << name << " not set!!!" << '\n';
        return false;
    }
    env = value;
    return true;
}
// Returns redis connection options to be used in redis connection creation | |
// Builds Redis connection options from the environment.
// REDIS_HOST, REDIS_PORT and REDIS_PASSWORD each override the corresponding
// field only when set; unset variables leave the default-constructed value.
// std::stoi throws if REDIS_PORT is not a valid integer.
sw::redis::ConnectionOptions getRedisConnectionOptions() {
    sw::redis::ConnectionOptions options;

    std::string host;
    const bool has_host = getenv("REDIS_HOST", host);
    if (has_host) {
        options.host = host;
    }

    std::string port;
    const bool has_port = getenv("REDIS_PORT", port);
    if (has_port) {
        options.port = std::stoi(port);
    }

    std::string password;
    const bool has_password = getenv("REDIS_PASSWORD", password);
    if (has_password) {
        options.password = password;
    }

    return options;
}
Source Code - Redis Subscriber Header - Reads from Redis Topic and Publishes to Redis Timeseries DB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Sravz LLC | |
**/ | |
#ifndef SRAVZ_REDIS_SUBSCRIBER_H | |
#define SRAVZ_REDIS_SUBSCRIBER_H | |
#include "util.hpp" | |
using RedisInstance = sw::redis::Redis; | |
class RedisSubscriber: public std::enable_shared_from_this<RedisSubscriber> | |
{ | |
public: | |
RedisSubscriber(std::string topics, std::shared_ptr<boost::asio::thread_pool>& io) | |
: topics_(topics) | |
,redis_consumer_(RedisInstance(getRedisConnectionOptions())) | |
,redis_publisher_(RedisInstance(getRedisConnectionOptions())) | |
,io_(io) | |
{ | |
} | |
~RedisSubscriber() | |
{ | |
std::cout << "Subscriber closed" << std::endl; | |
} | |
void subscribe(); | |
void parse(); | |
void publish(); | |
void run(); | |
private: | |
std::string topics_; | |
RedisInstance redis_consumer_; | |
RedisInstance redis_publisher_; | |
std::shared_ptr<boost::asio::thread_pool> io_; | |
boost::lockfree::spsc_queue<std::string, boost::lockfree::capacity<1024> > spsc_queue_parse_; | |
boost::lockfree::spsc_queue<boost::json::object, boost::lockfree::capacity<1024> > spsc_queue_publish_; | |
const int REDIS_BATCH_SIZE = 10; | |
const int EXPONENTIAL_BACKOFF_LIMIT = 5; | |
template<typename T1, typename T2> | |
void get_message_with_backoff_(boost::lockfree::spsc_queue<T1, boost::lockfree::capacity<1024> > &queue, T2 &msg); | |
}; | |
#endif |
Source Code - Redis Subscriber CPP - Reads from Redis Topic and Publishes to Redis Timeseries DB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Sravz LLC | |
* TODO: Handle multiple topics | |
**/ | |
#include <redis_subscriber.hpp>
#include <string>
#include <unordered_map>
void | |
RedisSubscriber:: | |
subscribe() | |
{ | |
// Create a Subscriber. | |
auto sub = redis_consumer_.subscriber(); | |
// Set callback functions. | |
sub.on_message([=](std::string channel, std::string msg) { | |
spsc_queue_parse_.push(msg); | |
}); | |
// Subscribe to channels and patterns. | |
std::vector<std::string> topics; | |
boost::algorithm::split(topics, topics_, boost::is_any_of(",")); | |
BOOST_FOREACH( std::string topic, topics ) | |
{ | |
sub.subscribe(topic); | |
} | |
// Consume messages in a loop. | |
while (true) { | |
try { | |
sub.consume(); | |
} catch (const sw::redis::Error &err) { | |
std::cout << err.what() << std::endl; | |
} | |
} | |
} | |
void | |
RedisSubscriber:: | |
parse() | |
{ | |
std::string msg; | |
boost::json::parse_options opt {}; | |
opt.allow_comments = true; | |
opt.allow_trailing_commas = true; | |
long last_timestamp = 0; | |
while(true) { | |
get_message_with_backoff_(spsc_queue_parse_, msg); | |
try{ | |
boost::json::value val = boost::json::parse(msg, {}, opt); | |
auto obj = val.as_object(); | |
auto cur_last_timestamp = boost::json::value_to<long>(obj.at("t")); | |
if (last_timestamp > cur_last_timestamp) { | |
continue; | |
} | |
last_timestamp = cur_last_timestamp; | |
std::cout << "Parse: " << obj.at("s") << " " << obj.at("p") << " " << obj.at("t") << " thread ID: " << boost::this_thread::get_id() << std::endl; | |
spsc_queue_publish_.push(obj); | |
} catch(std::exception const& ex) | |
{ | |
std::cout << "Cannot parse datapoint: " << ex.what() << std::endl; | |
} | |
} | |
} | |
void | |
RedisSubscriber:: | |
publish() | |
{ | |
boost::json::object obj; | |
int count = 0; | |
std::vector<std::string> result; | |
result.push_back("TS.MADD"); | |
while(true) { | |
get_message_with_backoff_(spsc_queue_publish_, obj); | |
std::cout << "Publish: " << obj.at("s") << " " << obj.at("p") << " " << obj.at("t") << " thread ID: " << boost::this_thread::get_id() << std::endl; | |
try { | |
result.push_back(boost::json::value_to<std::string>(obj.at("s"))); | |
result.push_back(boost::lexical_cast<std::string>(boost::json::value_to<long>(obj.at("t")))); | |
result.push_back(boost::json::value_to<std::string>(obj.at("p"))); | |
if (count >= REDIS_BATCH_SIZE) { | |
redis_publisher_.command(result.begin(), result.end()); | |
count=0; | |
result.resize(1); | |
} else { | |
count++; | |
} | |
} catch(std::exception const& ex) | |
{ | |
std::cout << "Cannot push datapoint: " << ex.what() << std::endl; | |
} | |
} | |
} | |
void | |
RedisSubscriber:: | |
run() { | |
boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::subscribe, shared_from_this())); | |
boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::parse, shared_from_this())); | |
boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::publish, shared_from_this())); | |
} | |
template <typename T1, typename T2> | |
void | |
RedisSubscriber:: | |
get_message_with_backoff_(boost::lockfree::spsc_queue<T1, boost::lockfree::capacity<1024> > &queue, T2 &msg) { | |
int backoff = 0; | |
while(true) { | |
if (queue.pop(msg)) | |
{ | |
return; | |
} else { | |
sleep(1 << backoff++); | |
if (backoff > EXPONENTIAL_BACKOFF_LIMIT) { | |
backoff = 0; | |
} | |
} | |
} | |
} | |
Source Code - Main method, initializes the threadpool and calls Redis Subscriber
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Sravz LLC | |
**/ | |
#include <util.hpp> | |
#include <redis_subscriber.hpp> | |
int main(int argc, char** argv) | |
{ | |
if(argc != 2) | |
{ | |
std::cerr << | |
"Usage: sravz_redis_async <topic>\n" << | |
"Example:\n" << | |
" sravz_redis_async ETH-USD,BTC-USD\n"; | |
return EXIT_FAILURE; | |
} | |
auto const topic = argv[1]; | |
auto io = std::make_shared<boost::asio::thread_pool>(3); | |
// Singal handler to exit cleanly | |
boost::asio::signal_set signals(*io, SIGINT, SIGTERM); | |
signals.async_wait([&](auto, auto){ | |
io->stop(); | |
std::cout << "Signal received, stopped the process."; | |
}); | |
std::make_shared<RedisSubscriber>(topic, io)->run(); | |
io->join(); | |
} |