Boost Beast Websockets

Use Case

Websocket (Session 1) -> Process Quotes (Session 2) -> Redis Timeseries (Session 3)

Use this programs to read stock quotes from WebSocket and store the timeseries in Redis Time Series Database

Libraries Used

  • C++ Boost ASIO
  • C++ Boost Beast
  • hiredis
  • Redis Plus Plus

Session 1

Video explanation of the code

C++-Boost-WebSocket-Client

Source Code - Reads from WebSocket

/**
* Sravz LLC
* TODO:
* Implement lock free datastructure instead of _buffer so strand can be removed on on_read
* Implement JSON parser for the message
* Upload the JSON quotes/trades to redis
**/
#include "root_certificates.hpp"
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <cstdio>
#include <boost/asio/signal_set.hpp>
#include <thread>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
typedef boost::asio::io_context::executor_type executor_type;
typedef boost::asio::strand<executor_type> strand;
//------------------------------------------------------------------------------
// Report a failure
void
fail(beast::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
// Sends a WebSocket message and prints the response
// enabled_shared_from_this provided function shared_from_this
// which is a handy function to create shared pointers from this
class session : public std::enable_shared_from_this<session>
{
private:
net::io_context& ioc_;
tcp::resolver resolver_;
websocket::stream<
beast::ssl_stream<beast::tcp_stream>> ws_;
beast::flat_buffer buffer_;
std::string host_;
std::string text_;
std::string endpoint_;
strand ws_strand_;
public:
// Resolver and socket require an io_context
explicit
session(net::io_context& ioc, ssl::context& ctx)
: resolver_(net::make_strand(ioc)) // Looks up the domain name
, ws_(net::make_strand(ioc), ctx) // Websocket constructor takes an IO context and ssl context
, ioc_(ioc) // reference to the io_context created in the main function
, ws_strand_(ioc.get_executor()) // Get a strand from the io_context
{
}
// Start the asynchronous operation
void
run(
char const* host,
char const* port,
char const* text,
char const* endpoint)
{
// Save these for later
host_ = host;
text_ = text;
endpoint_ = endpoint;
// Look up the domain name
resolver_.async_resolve(
host,
port,
// Binds parameters to the handler creating a new handler
beast::bind_front_handler(
&session::on_resolve,
shared_from_this()));
}
void
on_resolve(
beast::error_code ec,
tcp::resolver::results_type results)
{
if(ec)
return fail(ec, "resolve");
// Set a timeout on the operation
// Get lowest layer will get the underlying socket
//websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results,
// Binds the strand to an executor of the type the strand is based on, in this case io_context
// bind_front_handler creates a functor object which when execute calls on_connect function of this pointer i.e
// current instance of session.
// Once Async_connect completes it calls the functor object with error_code ec and endpoint_type ep
// which are required arguments to call on_connect function
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_connect, shared_from_this())));
// beast::bind_front_handler(
// &session::on_connect,
// shared_from_this()));
}
void
on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
{
if(ec)
return fail(ec, "connect");
// Gets the socket associated with this web socket and sets a timeout on the operation.
// ws_ is declared as websocket::stream<beast::ssl_stream<beast::tcp_stream>>
// get_lower_layer will return beast::tcp_stream
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Set SNI Hostname (many hosts need this to handshake successfully)
if(! SSL_set_tlsext_host_name(
ws_.next_layer().native_handle(),
host_.c_str()))
{
ec = beast::error_code(static_cast<int>(::ERR_get_error()),
net::error::get_ssl_category());
return fail(ec, "connect");
}
// Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake.
// See https://tools.ietf.org/html/rfc7230#section-5.4
host_ += ':' + std::to_string(ep.port());
// Perform the SSL handshake.
// ws_ is declared as websocket::stream<beast::ssl_stream<beast::tcp_stream>>
// next_layer will return beast::ssl_stream
ws_.next_layer().async_handshake(
ssl::stream_base::client,
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_ssl_handshake, shared_from_this())));
// beast::bind_front_handler(
// &session::on_ssl_handshake,
// shared_from_this()));
}
void
on_ssl_handshake(beast::error_code ec)
{
if(ec)
return fail(ec, "ssl_handshake");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-async-ssl");
}));
// Perform the websocket handshake
ws_.async_handshake(host_, endpoint_,
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_handshake, shared_from_this())));
// beast::bind_front_handler(
// &session::on_handshake,
// shared_from_this()));
}
void
on_handshake(beast::error_code ec)
{
if(ec)
return fail(ec, "handshake");
// Send the message
ws_.async_write(
net::buffer(text_),
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_write, shared_from_this())));
// beast::bind_front_handler(
// &session::on_write,
// shared_from_this()));
}
void
on_write(
beast::error_code ec,
std::size_t bytes_transferred)
{
// Supress unused variable compiler warnings
// bytes_transferred contains total bytes read/written to the websocket stream
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "write");
// Read single message
ws_.async_read(
buffer_,
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_read, shared_from_this())));
}
void
on_read(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "read");
// Print the messages
// make_printable interprets the bytes are characters and sends to output stream
std::cout << beast::make_printable(buffer_.data()) << " by thread ID:" << boost::this_thread::get_id() << std::endl;
// Clear the buffer
buffer_.consume(buffer_.size());
// Read single message
ws_.async_read(
buffer_,
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_read, shared_from_this())));
// beast::bind_front_handler(
// &session::on_read,
// shared_from_this()));
}
// Check how to close when a signal handler is triggered? does the websocket auto close?
void
on_close(beast::error_code ec)
{
if(ec)
return fail(ec, "close");
// If we get here then the connection is closed gracefully
// The make_printable() function helps print a ConstBufferSequence
std::cout << beast::make_printable(buffer_.data()) << std::endl;
}
};
//------------------------------------------------------------------------------
int main(int argc, char** argv)
{
// Check command line arguments.
if(argc != 6)
{
std::cerr <<
"Usage: websocket-client-async-ssl <host> <port> <text>\n" <<
"Example:\n" <<
" websocket-client-async-ssl echo.websocket.org 443 \"Hello, world!\" \"/url/to/connect/to\" threads\n";
return EXIT_FAILURE;
}
auto const host = argv[1];
auto const port = argv[2];
auto const text = argv[3];
auto const endpoint = argv[4];
auto const threads = std::max<int>(1, std::atoi(argv[5]));
// The io_context is required for all I/O
net::io_context ioc;
// Singal handler to exit cleanly
boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){
ioc.stop();
std::cout << "Signal received, stopped the process.";
});
// The SSL context is required, and holds certificates
ssl::context ctx{ssl::context::tlsv12_client};
// This holds the root certificate used for verification
load_root_certificates(ctx);
// Launch the asynchronous operation
// Create an instance of session and returns a shared pointer to session
std::make_shared<session>(ioc, ctx)->run(host, port, text, endpoint);
// Run the I/O service on the requested number of threads
std::vector<std::thread> v;
v.reserve(threads - 1);
for(auto i = threads - 1; i > 0; --i)
// Create an instance in place at the end of the vector
v.emplace_back(
[&ioc]
{
ioc.run();
});
ioc.run();
// (If we get here, it means we got a SIGINT or SIGTERM)
// Block until all the threads exit
for(auto& t : v)
t.join();
return EXIT_SUCCESS;
}

Session 2

Video explanation of the code

C++-Boost-WebSocket-Client-Redis-Client

Source Code - Common Utils.h used across all files

#ifndef SRAVZ_BACKENDCPP_UTIL_H
#define SRAVZ_BACKENDCPP_UTIL_H
// Includes
#include <iostream>
#include <unordered_map>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_service.hpp>
#include <redis-plus-plus/test/src/sw/redis++/utils.h>
#include <redis-plus-plus/src/sw/redis++/errors.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/thread.hpp>
#include <boost/asio/write.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/log/trivial.hpp>
#include <cstdio>
#include <boost/asio.hpp>
#include <cstddef>
#include <string>
#include <functional>
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <cstdlib>
#include <memory>
#include <thread>
#include "root_certificates.hpp"
#include <boost/asio/spawn.hpp>
using RedisInstance = sw::redis::Redis;
// Gets env variable
bool getenv(const char *name, std::string &env)
{
const char *ret = getenv(name);
if (ret) {
env = std::string(ret);
} else {
std::cout << "Env variable: " << name << " not set!!!";
}
return !!ret;
}
// Returns redis connection options to be used in redis connection creation
sw::redis::ConnectionOptions getRedisConnectionOptions() {
std::string REDIS_HOST;
std::string REDIS_PORT;
std::string REDIS_PASSWORD;
sw::redis::ConnectionOptions opts;
if (getenv("REDIS_HOST", REDIS_HOST))
opts.host = REDIS_HOST;
if (getenv("REDIS_PORT", REDIS_PORT))
opts.port = std::stoi(REDIS_PORT);
if (getenv("REDIS_PASSWORD", REDIS_PASSWORD))
opts.password = REDIS_PASSWORD;
return opts;
}
// Namespaces
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
typedef boost::asio::io_context::executor_type executor_type;
typedef boost::asio::strand<executor_type> strand;
//------------------------------------------------------------------------------
class ThreadInfo
{
public:
beast::flat_buffer buffer;
std::shared_ptr<RedisInstance> redis;
ThreadInfo() {}
ThreadInfo(beast::flat_buffer buffer_, std::shared_ptr<RedisInstance> redis_) : buffer(buffer_), redis(redis_) {}
};
typedef std::map<boost::thread::id, ThreadInfo> ThreadsInfo;
#endif // end SRAVZ_BACKENDCPP_UTIL_H
view raw utils.h hosted with ❤ by GitHub

Source Code - Reads from WebSocket and Publishes to Redis Topic

/**
* Sravz LLC
* TODO:
* Implement lock free datastructure instead of _buffer so strand can be removed on on_read
* Implement JSON parser for the message
* Upload the JSON quotes/trades to redis
**/
#include <util.hpp>
// Report a failure
void
fail(beast::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
// Sends a WebSocket message and prints the response
// enabled_shared_from_this provided function shared_from_this
// which is a handy function to create shared pointers from this
class session : public std::enable_shared_from_this<session>
{
private:
net::io_context& ioc_;
tcp::resolver resolver_;
websocket::stream<
beast::ssl_stream<beast::tcp_stream>> ws_;
beast::flat_buffer buffer_;
std::string host_;
std::string text_;
std::string endpoint_;
strand ws_strand_;
ThreadsInfo threadsInfo_;
public:
// Resolver and socket require an io_context
explicit
session(net::io_context& ioc, ssl::context& ctx)
: resolver_(net::make_strand(ioc)) // Looks up the domain name
, ws_(net::make_strand(ioc), ctx) // Websocket constructor takes an IO context and ssl context
, ioc_(ioc) // reference to the io_context created in the main function
, ws_strand_(ioc.get_executor()) // Get a strand from the io_context
{
// Register current thread
register_thread();
}
// Start the asynchronous operation
void
run(
char const* host,
char const* port,
char const* text,
char const* endpoint)
{
// Save these for later
host_ = host;
text_ = text;
endpoint_ = endpoint;
// Look up the domain name
resolver_.async_resolve(
host,
port,
// Binds parameters to the handler creating a new handler
beast::bind_front_handler(
&session::on_resolve,
shared_from_this()));
}
void
register_thread()
{
threadsInfo_.insert(std::make_pair(boost::this_thread::get_id(), ThreadInfo(beast::flat_buffer(), std::make_shared<RedisInstance>(getRedisConnectionOptions()))));
}
void
on_resolve(
beast::error_code ec,
tcp::resolver::results_type results)
{
if(ec)
return fail(ec, "resolve");
// Set a timeout on the operation
// Get lowest layer will get the underlying socket
//websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results,
// Binds the strand to an executor of the type the strand is based on, in this case io_context
// bind_front_handler creates a functor object which when execute calls on_connect function of this pointer i.e
// current instance of session.
// Once Async_connect completes it calls the functor object with error_code ec and endpoint_type ep
// which are required arguments to call on_connect function
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_connect, shared_from_this())));
// beast::bind_front_handler(
// &session::on_connect,
// shared_from_this()));
}
void
on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
{
if(ec)
return fail(ec, "connect");
// Gets the socket associated with this web socket and sets a timeout on the operation.
// ws_ is declared as websocket::stream<beast::ssl_stream<beast::tcp_stream>>
// get_lower_layer will return beast::tcp_stream
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Set SNI Hostname (many hosts need this to handshake successfully)
if(! SSL_set_tlsext_host_name(
ws_.next_layer().native_handle(),
host_.c_str()))
{
ec = beast::error_code(static_cast<int>(::ERR_get_error()),
net::error::get_ssl_category());
return fail(ec, "connect");
}
// Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake.
// See https://tools.ietf.org/html/rfc7230#section-5.4
host_ += ':' + std::to_string(ep.port());
// Perform the SSL handshake.
// ws_ is declared as websocket::stream<beast::ssl_stream<beast::tcp_stream>>
// next_layer will return beast::ssl_stream
ws_.next_layer().async_handshake(
ssl::stream_base::client,
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_ssl_handshake, shared_from_this())));
// beast::bind_front_handler(
// &session::on_ssl_handshake,
// shared_from_this()));
}
void
on_ssl_handshake(beast::error_code ec)
{
if(ec)
return fail(ec, "ssl_handshake");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-async-ssl");
}));
// Perform the websocket handshake
ws_.async_handshake(host_, endpoint_,
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_handshake, shared_from_this())));
// beast::bind_front_handler(
// &session::on_handshake,
// shared_from_this()));
}
void
on_handshake(beast::error_code ec)
{
if(ec)
return fail(ec, "handshake");
// Send the message
ws_.async_write(
net::buffer(text_),
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_write, shared_from_this())));
// beast::bind_front_handler(
// &session::on_write,
// shared_from_this()));
}
void
on_write(
beast::error_code ec,
std::size_t bytes_transferred)
{
// Supress unused variable compiler warnings
// bytes_transferred contains total bytes read/written to the websocket stream
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "write");
// Read single message
ws_.async_read(
threadsInfo_[boost::this_thread::get_id()].buffer,
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_read, shared_from_this())));
}
void
on_read(
beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec)
return fail(ec, "read");
// Print the messages
// make_printable interprets the bytes are characters and sends to output stream
// beast::make_printable(threadsInfo_[boost::this_thread::get_id()].buffer.data())
auto message = beast::buffers_to_string(threadsInfo_[boost::this_thread::get_id()].buffer.data());
// std::cout << message << " by thread ID: " << boost::this_thread::get_id() << " buffer ID: " << &threadsInfo_[boost::this_thread::get_id()].buffer << std::endl;
threadsInfo_[boost::this_thread::get_id()].redis->publish("ETH-USD", message);
// Clear the buffer
threadsInfo_[boost::this_thread::get_id()].buffer.consume(threadsInfo_[boost::this_thread::get_id()].buffer.size());
// Read single message
ws_.async_read(
threadsInfo_[boost::this_thread::get_id()].buffer,
boost::asio::bind_executor(ws_strand_, beast::bind_front_handler( &session::on_read, shared_from_this())));
// beast::bind_front_handler(
// &session::on_read,
// shared_from_this()));
}
// Check how to close when a signal handler is triggered? does the websocket auto close?
void
on_close(beast::error_code ec)
{
if(ec)
return fail(ec, "close");
// If we get here then the connection is closed gracefully
// The make_printable() function helps print a ConstBufferSequence
std::cout << beast::make_printable(buffer_.data()) << std::endl;
}
};
//------------------------------------------------------------------------------
int main(int argc, char** argv)
{
// Check command line arguments.
if(argc != 6)
{
std::cerr <<
"Usage: websocket-client-async-ssl <host> <port> <text>\n" <<
"Example:\n" <<
" websocket-client-async-ssl echo.websocket.org 443 \"Hello, world!\" \"/url/to/connect/to\" threads\n";
return EXIT_FAILURE;
}
auto const host = argv[1];
auto const port = argv[2];
auto const text = argv[3];
auto const endpoint = argv[4];
auto const threads = std::max<int>(1, std::atoi(argv[5]));
// The io_context is required for all I/O
net::io_context ioc;
// Singal handler to exit cleanly
boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){
ioc.stop();
std::cout << "Signal received, stopped the process.";
});
// The SSL context is required, and holds certificates
ssl::context ctx{ssl::context::tlsv12_client};
// This holds the root certificate used for verification
load_root_certificates(ctx);
// Launch the asynchronous operation
// Create an instance of session and returns a shared pointer to session
auto session_prt = std::make_shared<session>(ioc, ctx);
session_prt->run(host, port, text, endpoint);
// Run the I/O service on the requested number of threads
std::vector<std::thread> v;
v.reserve(threads - 1);
for(auto i = threads - 1; i > 0; --i)
// Create an instance in place at the end of the vector
v.emplace_back(
[&session_prt, &ioc]
{
session_prt->register_thread();
ioc.run();
});
ioc.run();
// (If we get here, it means we got a SIGINT or SIGTERM)
// Block until all the threads exit
for(auto& t : v)
t.join();
return EXIT_SUCCESS;
}

Source Code - Boost ASIO Threadpool - Redis Plus Plus Redis Subscriber

/**
* Sravz LLC
* TODO: Handle multiple topics
**/
#include <util.hpp>
using RedisInstance = sw::redis::Redis;
class RedisSubscriber: public std::enable_shared_from_this<RedisSubscriber>
{
public:
RedisSubscriber(std::string topic, std::shared_ptr<boost::asio::thread_pool>& io)
: topic_(topic)
,redis_(RedisInstance(getRedisConnectionOptions()))
,io_(io)
{
}
~RedisSubscriber()
{
std::cout << "Subscriber closed" << std::endl;
}
void subscribe()
{
// Create a Subscriber.
auto sub = redis_.subscriber();
// Set callback functions.
sub.on_message([](std::string channel, std::string msg) {
std::cout << msg << " thread ID: " << boost::this_thread::get_id() << std::endl;
});
// Subscribe to channels and patterns.
sub.subscribe(topic_);
// Consume messages in a loop.
while (true) {
sub.consume();
try {
sub.consume();
} catch (const sw::redis::Error &err) {
std::cout << err.what() << std::endl;
}
}
}
void run() {
boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::subscribe, shared_from_this()));
}
private:
std::string topic_;
RedisInstance redis_;
std::shared_ptr<boost::asio::thread_pool> io_;
};
int main(int argc, char** argv)
{
if(argc != 2)
{
std::cerr <<
"Usage: sravz_redis_async <topic>\n" <<
"Example:\n" <<
" sravz_redis_async ETH-USD\n";
return EXIT_FAILURE;
}
auto const topic = argv[1];
auto io = std::make_shared<boost::asio::thread_pool>(1);
std::make_shared<RedisSubscriber>(topic, io)->run();
io->join();
}

Session 3

Video explanation of the code

C++-Boost-WebSocket-Client-Redis-Client

Source Code - Common Utils.h used across all files

#ifndef SRAVZ_BACKENDCPP_UTIL_H
#define SRAVZ_BACKENDCPP_UTIL_H
// Includes
#include <iostream>
#include <unordered_map>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_service.hpp>
#include <redis-plus-plus/test/src/sw/redis++/utils.h>
#include <redis-plus-plus/src/sw/redis++/errors.h>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/thread.hpp>
#include <boost/asio/write.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/log/trivial.hpp>
#include <cstdio>
#include <boost/asio.hpp>
#include <cstddef>
#include <string>
#include <functional>
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <cstdlib>
#include <memory>
#include <thread>
#include "root_certificates.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/coroutine2/all.hpp>
#include <boost/json.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/foreach.hpp>
/* Using namespaces */
using RedisInstance = sw::redis::Redis;
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
// Namespaces
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
namespace json = boost::json;
typedef boost::asio::io_context::executor_type executor_type;
typedef boost::asio::strand<executor_type> strand;
// Classes
class ThreadInfo
{
public:
beast::flat_buffer buffer;
std::shared_ptr<RedisInstance> redis;
ThreadInfo() {}
ThreadInfo(beast::flat_buffer buffer_, std::shared_ptr<RedisInstance> redis_) : buffer(buffer_), redis(redis_) {}
};
typedef std::map<boost::thread::id, ThreadInfo> ThreadsInfo;
// Functions
// Gets env variable
bool getenv(const char *name, std::string &env);
// Returns redis connection options to be used in redis connection creation
sw::redis::ConnectionOptions getRedisConnectionOptions();
#endif // end SRAVZ_BACKENDCPP_UTIL_H
view raw utils.h hosted with ❤ by GitHub

Source Code - Common Utils.cpp

#include <util.hpp>
// Gets env variable
bool getenv(const char *name, std::string &env)
{
const char *ret = getenv(name);
if (ret) {
env = std::string(ret);
} else {
std::cout << "Env variable: " << name << " not set!!!";
}
return !!ret;
}
// Returns redis connection options to be used in redis connection creation
sw::redis::ConnectionOptions getRedisConnectionOptions() {
std::string REDIS_HOST;
std::string REDIS_PORT;
std::string REDIS_PASSWORD;
sw::redis::ConnectionOptions opts;
if (getenv("REDIS_HOST", REDIS_HOST))
opts.host = REDIS_HOST;
if (getenv("REDIS_PORT", REDIS_PORT))
opts.port = std::stoi(REDIS_PORT);
if (getenv("REDIS_PASSWORD", REDIS_PASSWORD))
opts.password = REDIS_PASSWORD;
return opts;
}
view raw utils.cpp hosted with ❤ by GitHub

Source Code - Redis Subscriber Header - Reads from Redis Topic and Publishes to Redis Timeseries DB

/**
* Sravz LLC
**/
#ifndef SRAVZ_REDIS_SUBSCRIBER_H
#define SRAVZ_REDIS_SUBSCRIBER_H
#include "util.hpp"
using RedisInstance = sw::redis::Redis;
class RedisSubscriber: public std::enable_shared_from_this<RedisSubscriber>
{
public:
RedisSubscriber(std::string topics, std::shared_ptr<boost::asio::thread_pool>& io)
: topics_(topics)
,redis_consumer_(RedisInstance(getRedisConnectionOptions()))
,redis_publisher_(RedisInstance(getRedisConnectionOptions()))
,io_(io)
{
}
~RedisSubscriber()
{
std::cout << "Subscriber closed" << std::endl;
}
void subscribe();
void parse();
void publish();
void run();
private:
std::string topics_;
RedisInstance redis_consumer_;
RedisInstance redis_publisher_;
std::shared_ptr<boost::asio::thread_pool> io_;
boost::lockfree::spsc_queue<std::string, boost::lockfree::capacity<1024> > spsc_queue_parse_;
boost::lockfree::spsc_queue<boost::json::object, boost::lockfree::capacity<1024> > spsc_queue_publish_;
const int REDIS_BATCH_SIZE = 10;
const int EXPONENTIAL_BACKOFF_LIMIT = 5;
template<typename T1, typename T2>
void get_message_with_backoff_(boost::lockfree::spsc_queue<T1, boost::lockfree::capacity<1024> > &queue, T2 &msg);
};
#endif

Source Code - Redis Subscriber CPP- Reads from Redis Topic and Publishes to Redis Timeseries DB

/**
* Sravz LLC
* TODO: Handle multiple topics
**/
#include <redis_subscriber.hpp>
void
RedisSubscriber::
subscribe()
{
// Create a Subscriber.
auto sub = redis_consumer_.subscriber();
// Set callback functions.
sub.on_message([=](std::string channel, std::string msg) {
spsc_queue_parse_.push(msg);
});
// Subscribe to channels and patterns.
std::vector<std::string> topics;
boost::algorithm::split(topics, topics_, boost::is_any_of(","));
BOOST_FOREACH( std::string topic, topics )
{
sub.subscribe(topic);
}
// Consume messages in a loop.
while (true) {
try {
sub.consume();
} catch (const sw::redis::Error &err) {
std::cout << err.what() << std::endl;
}
}
}
void
RedisSubscriber::
parse()
{
std::string msg;
boost::json::parse_options opt {};
opt.allow_comments = true;
opt.allow_trailing_commas = true;
long last_timestamp = 0;
while(true) {
get_message_with_backoff_(spsc_queue_parse_, msg);
try{
boost::json::value val = boost::json::parse(msg, {}, opt);
auto obj = val.as_object();
auto cur_last_timestamp = boost::json::value_to<long>(obj.at("t"));
if (last_timestamp > cur_last_timestamp) {
continue;
}
last_timestamp = cur_last_timestamp;
std::cout << "Parse: " << obj.at("s") << " " << obj.at("p") << " " << obj.at("t") << " thread ID: " << boost::this_thread::get_id() << std::endl;
spsc_queue_publish_.push(obj);
} catch(std::exception const& ex)
{
std::cout << "Cannot parse datapoint: " << ex.what() << std::endl;
}
}
}
void
RedisSubscriber::
publish()
{
boost::json::object obj;
int count = 0;
std::vector<std::string> result;
result.push_back("TS.MADD");
while(true) {
get_message_with_backoff_(spsc_queue_publish_, obj);
std::cout << "Publish: " << obj.at("s") << " " << obj.at("p") << " " << obj.at("t") << " thread ID: " << boost::this_thread::get_id() << std::endl;
try {
result.push_back(boost::json::value_to<std::string>(obj.at("s")));
result.push_back(boost::lexical_cast<std::string>(boost::json::value_to<long>(obj.at("t"))));
result.push_back(boost::json::value_to<std::string>(obj.at("p")));
if (count >= REDIS_BATCH_SIZE) {
redis_publisher_.command(result.begin(), result.end());
count=0;
result.resize(1);
} else {
count++;
}
} catch(std::exception const& ex)
{
std::cout << "Cannot push datapoint: " << ex.what() << std::endl;
}
}
}
void
RedisSubscriber::
run() {
boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::subscribe, shared_from_this()));
boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::parse, shared_from_this()));
boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::publish, shared_from_this()));
}
template <typename T1, typename T2>
void
RedisSubscriber::
get_message_with_backoff_(boost::lockfree::spsc_queue<T1, boost::lockfree::capacity<1024> > &queue, T2 &msg) {
int backoff = 0;
while(true) {
if (queue.pop(msg))
{
return;
} else {
sleep(1 << backoff++);
if (backoff > EXPONENTIAL_BACKOFF_LIMIT) {
backoff = 0;
}
}
}
}

Source Code - Main method, initializes the threadpool and calls Redis Subscriber

/**
* Sravz LLC
**/
#include <util.hpp>
#include <redis_subscriber.hpp>
int main(int argc, char** argv)
{
if(argc != 2)
{
std::cerr <<
"Usage: sravz_redis_async <topic>\n" <<
"Example:\n" <<
" sravz_redis_async ETH-USD,BTC-USD\n";
return EXIT_FAILURE;
}
auto const topic = argv[1];
auto io = std::make_shared<boost::asio::thread_pool>(3);
// Singal handler to exit cleanly
boost::asio::signal_set signals(*io, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){
io->stop();
std::cout << "Signal received, stopped the process.";
});
std::make_shared<RedisSubscriber>(topic, io)->run();
io->join();
}