Use Case
Boost Beast based WebSocket Server
Use this program to perform WebSocket calls.
Session 1:
- Updated boost beast multi chat example.
- Parses the HTTP request and sends a response text to the client.
Libraries Used
- C++ Boost ASIO
- C++ Boost Beast
- hiredis
- C++ JWT
- C++ Boost URL
References
Session 1
Thread Model

Video explanation of the code
Source Code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#ifndef BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_BEAST_HPP | |
#define BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_BEAST_HPP | |
#include <boost/beast.hpp> | |
namespace beast = boost::beast; // from <boost/beast.hpp> | |
namespace http = beast::http; // from <boost/beast/http.hpp> | |
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp> | |
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#include "http_session.hpp" | |
#include "websocket_session.hpp" | |
#include <boost/config.hpp> | |
#include <iostream> | |
//------------------------------------------------------------------------------ | |
// Map the extension of `path` to a MIME type.
//
// The extension is everything from the last '.' in `path` to the end and
// is compared case-insensitively. Paths without a '.' or with an
// unrecognised extension yield "application/text".
beast::string_view
mime_type(beast::string_view path)
{
    struct mapping
    {
        char const* extension;
        char const* type;
    };

    // Lookup order matches the original if-chain; extensions are unique.
    static constexpr mapping known_types[] = {
        {".htm",  "text/html"},
        {".html", "text/html"},
        {".php",  "text/html"},
        {".css",  "text/css"},
        {".txt",  "text/plain"},
        {".js",   "application/javascript"},
        {".json", "application/json"},
        {".xml",  "application/xml"},
        {".swf",  "application/x-shockwave-flash"},
        {".flv",  "video/x-flv"},
        {".png",  "image/png"},
        {".jpe",  "image/jpeg"},
        {".jpeg", "image/jpeg"},
        {".jpg",  "image/jpeg"},
        {".gif",  "image/gif"},
        {".bmp",  "image/bmp"},
        {".ico",  "image/vnd.microsoft.icon"},
        {".tiff", "image/tiff"},
        {".tif",  "image/tiff"},
        {".svg",  "image/svg+xml"},
        {".svgz", "image/svg+xml"},
    };

    // Isolate the extension (including the leading dot), if any.
    beast::string_view suffix;
    auto const dot = path.rfind(".");
    if(dot != beast::string_view::npos)
        suffix = path.substr(dot);

    for(auto const& entry : known_types)
    {
        if(beast::iequals(suffix, entry.extension))
            return entry.type;
    }
    return "application/text";
}
// Append an HTTP rel-path to a local filesystem path. | |
// The returned path is normalized for the platform. | |
std::string | |
path_cat( | |
beast::string_view base, | |
beast::string_view path) | |
{ | |
if(base.empty()) | |
return std::string(path); | |
std::string result(base); | |
#ifdef BOOST_MSVC | |
char constexpr path_separator = '\\'; | |
if(result.back() == path_separator) | |
result.resize(result.size() - 1); | |
result.append(path.data(), path.size()); | |
for(auto& c : result) | |
if(c == '/') | |
c = path_separator; | |
#else | |
char constexpr path_separator = '/'; | |
if(result.back() == path_separator) | |
result.resize(result.size() - 1); | |
result.append(path.data(), path.size()); | |
#endif | |
return result; | |
} | |
// Return a response for the given request. | |
// | |
// The concrete type of the response message (which depends on the | |
// request), is type-erased in message_generator. | |
template <class Body, class Allocator> | |
http::message_generator | |
handle_request( | |
beast::string_view doc_root, | |
http::request<Body, http::basic_fields<Allocator>>&& req) | |
{ | |
// Returns a bad request response | |
auto const bad_request = | |
[&req](beast::string_view why) | |
{ | |
http::response<http::string_body> res{http::status::bad_request, req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, "text/html"); | |
res.keep_alive(req.keep_alive()); | |
res.body() = std::string(why); | |
res.prepare_payload(); | |
return res; | |
}; | |
// Returns a not found response | |
auto const not_found = | |
[&req](beast::string_view target) | |
{ | |
http::response<http::string_body> res{http::status::not_found, req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, "text/html"); | |
res.keep_alive(req.keep_alive()); | |
res.body() = "The resource '" + std::string(target) + "' was not found."; | |
res.prepare_payload(); | |
return res; | |
}; | |
// Returns a server error response | |
auto const server_error = | |
[&req](beast::string_view what) | |
{ | |
http::response<http::string_body> res{http::status::internal_server_error, req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, "text/html"); | |
res.keep_alive(req.keep_alive()); | |
res.body() = "An error occurred: '" + std::string(what) + "'"; | |
res.prepare_payload(); | |
return res; | |
}; | |
// Make sure we can handle the method | |
if( req.method() != http::verb::get && | |
req.method() != http::verb::head) | |
return bad_request("Unknown HTTP-method"); | |
// Request path must be absolute and not contain "..". | |
if( req.target().empty() || | |
req.target()[0] != '/' || | |
req.target().find("..") != beast::string_view::npos) | |
return bad_request("Illegal request-target"); | |
// Build the path to the requested file | |
std::string path = path_cat(doc_root, req.target()); | |
if(req.target().back() == '/') | |
path.append("index.html"); | |
// Attempt to open the file | |
beast::error_code ec; | |
http::file_body::value_type body; | |
body.open(path.c_str(), beast::file_mode::scan, ec); | |
// Handle the case where the file doesn't exist | |
if(ec == boost::system::errc::no_such_file_or_directory) | |
return not_found(req.target()); | |
// Handle an unknown error | |
if(ec) | |
return server_error(ec.message()); | |
// Cache the size since we need it after the move | |
auto const size = body.size(); | |
// Respond to HEAD request | |
if(req.method() == http::verb::head) | |
{ | |
http::response<http::empty_body> res{http::status::ok, req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, mime_type(path)); | |
res.content_length(size); | |
res.keep_alive(req.keep_alive()); | |
return res; | |
} | |
// Respond to GET request | |
http::response<http::file_body> res{ | |
std::piecewise_construct, | |
std::make_tuple(std::move(body)), | |
std::make_tuple(http::status::ok, req.version())}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, mime_type(path)); | |
res.content_length(size); | |
res.keep_alive(req.keep_alive()); | |
return res; | |
} | |
//------------------------------------------------------------------------------ | |
// Take ownership of the accepted socket and keep a reference to the
// shared server state.
http_session::http_session(
    tcp::socket&& socket,
    boost::shared_ptr<shared_state> const& state)
    : stream_(std::move(socket))
    , state_(state)
{
}
void | |
http_session:: | |
run() | |
{ | |
do_read(); | |
} | |
// Report a failure | |
void | |
http_session:: | |
fail(beast::error_code ec, char const* what) | |
{ | |
// Don't report on canceled operations | |
if(ec == net::error::operation_aborted) | |
return; | |
std::cerr << what << ": " << ec.message() << "\n"; | |
} | |
void | |
http_session:: | |
do_read() | |
{ | |
// Construct a new parser for each message | |
parser_.emplace(); | |
// Apply a reasonable limit to the allowed size | |
// of the body in bytes to prevent abuse. | |
parser_->body_limit(10000); | |
// Set the timeout. | |
stream_.expires_after(std::chrono::seconds(30)); | |
// Read a request | |
http::async_read( | |
stream_, | |
buffer_, | |
parser_->get(), | |
beast::bind_front_handler( | |
&http_session::on_read, | |
shared_from_this())); | |
} | |
void | |
http_session:: | |
on_read(beast::error_code ec, std::size_t) | |
{ | |
// This means they closed the connection | |
if(ec == http::error::end_of_stream) | |
{ | |
stream_.socket().shutdown(tcp::socket::shutdown_send, ec); | |
return; | |
} | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "read"); | |
// See if it is a WebSocket Upgrade | |
if(websocket::is_upgrade(parser_->get())) | |
{ | |
// Create a websocket session, transferring ownership | |
// of both the socket and the HTTP request. | |
boost::make_shared<websocket_session>( | |
stream_.release_socket(), | |
state_)->run(parser_->release()); | |
return; | |
} | |
// Handle request | |
http::message_generator msg = | |
handle_request(state_->doc_root(), parser_->release()); | |
// Determine if we should close the connection | |
bool keep_alive = msg.keep_alive(); | |
auto self = shared_from_this(); | |
// Send the response | |
beast::async_write( | |
stream_, std::move(msg), | |
[self, keep_alive](beast::error_code ec, std::size_t bytes) | |
{ | |
self->on_write(ec, bytes, keep_alive); | |
}); | |
} | |
void | |
http_session:: | |
on_write(beast::error_code ec, std::size_t, bool keep_alive) | |
{ | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "write"); | |
if(! keep_alive) | |
{ | |
// This means we should close the connection, usually because | |
// the response indicated the "Connection: close" semantic. | |
stream_.socket().shutdown(tcp::socket::shutdown_send, ec); | |
return; | |
} | |
// Read another request | |
do_read(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#ifndef BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_HTTP_SESSION_HPP | |
#define BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_HTTP_SESSION_HPP | |
#include "net.hpp" | |
#include "beast.hpp" | |
#include "shared_state.hpp" | |
#include <boost/optional.hpp> | |
#include <boost/smart_ptr.hpp> | |
#include <cstdlib> | |
#include <memory> | |
/** Represents an established HTTP connection | |
*/ | |
class http_session : public boost::enable_shared_from_this<http_session> | |
{ | |
beast::tcp_stream stream_; | |
beast::flat_buffer buffer_; | |
boost::shared_ptr<shared_state> state_; | |
// The parser is stored in an optional container so we can | |
// construct it from scratch it at the beginning of each new message. | |
boost::optional<http::request_parser<http::string_body>> parser_; | |
struct send_lambda; | |
void fail(beast::error_code ec, char const* what); | |
void do_read(); | |
void on_read(beast::error_code ec, std::size_t); | |
void on_write(beast::error_code ec, std::size_t, bool close); | |
public: | |
http_session( | |
tcp::socket&& socket, | |
boost::shared_ptr<shared_state> const& state); | |
void run(); | |
}; | |
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#include "listener.hpp" | |
#include "http_session.hpp" | |
#include <iostream> | |
// Open, configure, bind and start listening on `endpoint`.
//
// Each step is checked; on the first failure the error is reported via
// fail() and the remaining steps are skipped.
listener::listener(
    net::io_context& ioc,
    tcp::endpoint endpoint,
    boost::shared_ptr<shared_state> const& state)
    : ioc_(ioc)
    , acceptor_(ioc)
    , state_(state)
{
    beast::error_code ec;

    // Reports `ec` under the given step name; true when the step failed.
    auto const step_failed =
        [this, &ec](char const* step)
        {
            if(! ec)
                return false;
            fail(ec, step);
            return true;
        };

    // Open the acceptor
    acceptor_.open(endpoint.protocol(), ec);
    if(step_failed("open"))
        return;

    // Allow address reuse
    acceptor_.set_option(net::socket_base::reuse_address(true), ec);
    if(step_failed("set_option"))
        return;

    // Bind to the server address
    acceptor_.bind(endpoint, ec);
    if(step_failed("bind"))
        return;

    // Start listening for connections
    acceptor_.listen(net::socket_base::max_listen_connections, ec);
    if(step_failed("listen"))
        return;
}
void | |
listener:: | |
run() | |
{ | |
// The new connection gets its own strand | |
acceptor_.async_accept( | |
net::make_strand(ioc_), | |
beast::bind_front_handler( | |
&listener::on_accept, | |
shared_from_this())); | |
// state_->run(); | |
} | |
// Report a failure | |
void | |
listener:: | |
fail(beast::error_code ec, char const* what) | |
{ | |
// Don't report on canceled operations | |
if(ec == net::error::operation_aborted) | |
return; | |
std::cerr << what << ": " << ec.message() << "\n"; | |
} | |
// Handle a connection | |
void | |
listener:: | |
on_accept(beast::error_code ec, tcp::socket socket) | |
{ | |
if(ec) | |
return fail(ec, "accept"); | |
else | |
// Launch a new session for this connection | |
boost::make_shared<http_session>( | |
std::move(socket), | |
state_)->run(); | |
// The new connection gets its own strand | |
acceptor_.async_accept( | |
net::make_strand(ioc_), | |
beast::bind_front_handler( | |
&listener::on_accept, | |
shared_from_this())); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#ifndef BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_LISTENER_HPP | |
#define BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_LISTENER_HPP | |
#include "beast.hpp" | |
#include "net.hpp" | |
#include <boost/smart_ptr.hpp> | |
#include <memory> | |
#include <string> | |
// Forward declaration | |
class shared_state; | |
// Owns the acceptor: accepts incoming TCP connections and launches an
// http_session for each one.
class listener : public boost::enable_shared_from_this<listener>
{
    net::io_context& ioc_;                   // context used for new strands
    tcp::acceptor acceptor_;                 // listening socket
    boost::shared_ptr<shared_state> state_;  // passed on to every session

    // Logs a failed operation.
    void fail(beast::error_code ec, char const* what);

    // Completion handler for async_accept.
    void on_accept(beast::error_code ec, tcp::socket socket);

public:
    // Opens, binds and listens on `endpoint`.
    listener(
        net::io_context& ioc,
        tcp::endpoint endpoint,
        boost::shared_ptr<shared_state> const& state);

    // Start accepting incoming connections
    void run();
};
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
//------------------------------------------------------------------------------ | |
/* | |
WebSocket chat server, multi-threaded | |
This implements a multi-user chat room using WebSocket. The | |
`io_context` runs on any number of threads, specified at | |
the command line. | |
*/ | |
//------------------------------------------------------------------------------ | |
#include "listener.hpp" | |
#include "shared_state.hpp" | |
#include "subscriber.hpp" | |
#include "publisher.hpp" | |
#include <boost/asio/signal_set.hpp> | |
#include <boost/smart_ptr.hpp> | |
#include <iostream> | |
#include <vector> | |
int | |
main(int argc, char* argv[]) | |
{ | |
// Check command line arguments. | |
if (argc != 6) | |
{ | |
std::cerr << | |
"Usage: websocket-chat-multi <address> <port> <doc_root> <threads> <topics>\n" << | |
"Example:\n" << | |
" websocket-chat-server 0.0.0.0 8080 . 5 ETH-USD,BTC-USD\n"; | |
return EXIT_FAILURE; | |
} | |
auto address = net::ip::make_address(argv[1]); | |
auto port = static_cast<unsigned short>(std::atoi(argv[2])); | |
auto doc_root = argv[3]; | |
auto const threads = std::max<int>(1, std::atoi(argv[4])); | |
auto topics_ = argv[5]; | |
// The io_context is required for all I/O | |
net::io_context ioc; | |
net::io_context ioc_subscriber; | |
net::io_context ioc_publisher; | |
// Create and launch a listening port | |
boost::shared_ptr<shared_state> shared_state_ = boost::make_shared<shared_state>(doc_root, topics_); | |
boost::shared_ptr<listener> listener_ = boost::make_shared<listener>(ioc, tcp::endpoint{address, port}, shared_state_); | |
boost::shared_ptr<subscriber> subscriber_ = boost::make_shared<subscriber>(ioc_subscriber, shared_state_); | |
boost::shared_ptr<publisher> publisher_ = boost::make_shared<publisher>(ioc_publisher, shared_state_); | |
// Start the sub-process | |
listener_->run(); | |
subscriber_->subscribe(); | |
publisher_->publish(); | |
// Capture SIGINT and SIGTERM to perform a clean shutdown | |
net::signal_set signals(ioc, SIGINT, SIGTERM); | |
signals.async_wait( | |
[&ioc, &ioc_subscriber, &ioc_publisher](boost::system::error_code const&, int) | |
{ | |
// Stop the io_context. This will cause run() | |
// to return immediately, eventually destroying the | |
// io_context and any remaining handlers in it. | |
ioc.stop(); | |
ioc_subscriber.stop(); | |
ioc_publisher.stop(); | |
}); | |
// Run the I/O service on the requested number of threads | |
std::vector<std::thread> v; | |
v.reserve(threads); | |
for(auto i = threads - 1; i > 0; --i) | |
v.emplace_back( | |
[&ioc] | |
{ | |
std::cout << "Starting ioc" << std::endl; | |
ioc.run(); | |
}); | |
// Run redis subscriber io_context | |
v.emplace_back( | |
[&ioc_subscriber] | |
{ | |
std::cout << "Starting ioc_subscriber" << std::endl; | |
ioc_subscriber.run(); | |
}); | |
// Run websocket publisher io_context | |
v.emplace_back( | |
[&ioc_publisher] | |
{ | |
std::cout << "Starting ioc_publisher" << std::endl; | |
ioc_publisher.run(); | |
}); | |
// Block until all the threads exit | |
for(auto& t : v) | |
t.join(); | |
return EXIT_SUCCESS; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#ifndef BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_NET_HPP | |
#define BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_NET_HPP | |
#include <boost/asio.hpp> | |
namespace net = boost::asio; // from <boost/asio.hpp> | |
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> | |
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#include "publisher.hpp" | |
// Store the io_context the publish loop is posted to and the shared
// server state it reads from.
publisher::publisher(
    net::io_context& ioc_publisher,
    boost::shared_ptr<shared_state> const& state)
    : ioc_publisher_(ioc_publisher)
    , state_(state)
{
}
void | |
publisher:: | |
publish() | |
{ | |
boost::asio::post(net::make_strand(ioc_publisher_), | |
[=]() | |
{ | |
std::string message; | |
while(true) { | |
if (state_->spsc_queue_.pop(message)) { | |
std::cout << "Message on the queue: " << message << std::endl; | |
// Put the message in a shared pointer so we can re-use it for each client | |
auto const ss = boost::make_shared<std::string const>(std::move(message)); | |
// Make a local list of all the weak pointers representing | |
// the sessions, so we can do the actual sending without | |
// holding the mutex: | |
std::vector<boost::weak_ptr<websocket_session>> v; | |
{ | |
std::cout << "Before mutex lock" << std::endl; | |
std::lock_guard<std::mutex> lock(state_->mutex_); | |
std::cout << "After mutex lock - Size:" << state_->sessions_.size() << std::endl; | |
v.reserve(state_->sessions_.size()); | |
for(auto p : state_->sessions_) | |
{ | |
std::cout << "Weak pointer emplace back" << std::endl; | |
v.emplace_back(p->weak_from_this()); | |
} | |
} | |
// For each session in our local list, try to acquire a strong | |
// pointer. If successful, then send the message on that session. | |
for(auto const& wp : v) { | |
std::cout << "Looping through weak pointers" << std::endl; | |
if(auto sp = wp.lock()) { | |
std::cout << "Weak pointer locked" << std::endl; | |
sp->send(ss); | |
} | |
} | |
} | |
} | |
}); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#ifndef SRAVZ_WEB_PUBLISHER_HPP | |
#define SRAVZ_WEB_PUBLISHER_HPP | |
#include <util.hpp> | |
#include "shared_state.hpp" | |
#include "websocket_session.hpp" | |
// Forward declaration | |
class symbol; | |
// Forwards messages queued in the shared state to the websocket sessions.
class publisher : public boost::enable_shared_from_this<publisher>
{
    net::io_context& ioc_publisher_;         // context the publish loop is posted to
    boost::shared_ptr<shared_state> state_;  // shared server state

public:
    explicit
    publisher(
        net::io_context& ioc_publisher,
        boost::shared_ptr<shared_state> const& state);

    // Posts the publish loop onto the publisher io_context.
    void publish();
};
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#include "subscriber.hpp" | |
#include "symbol.hpp" | |
// Store the subscriber io_context and shared state, and create the redis
// client from getRedisConnectionOptions().
//
// The initializers are listed in member declaration order
// (ioc_subscriber_, state_, redis_ptr_), which is the order they actually
// run in. The redis client is constructed in place rather than by moving
// a temporary RedisInstance.
subscriber::
subscriber(net::io_context& ioc_subscriber, boost::shared_ptr<shared_state> const& state)
    : ioc_subscriber_(ioc_subscriber)
    , state_(state)
    , redis_ptr_(boost::make_shared<RedisInstance>(getRedisConnectionOptions()))
{
}
void | |
subscriber:: | |
subscribe() | |
{ | |
boost::asio::post(net::make_strand(ioc_subscriber_), | |
[=]() | |
{ | |
if (true) { | |
// Create a Subscriber. | |
std::cout << "Before calling subscriber" << std::endl; | |
auto sub = redis_ptr_->subscriber(); | |
for(auto symbol_ : state_->symbols_) | |
{ | |
std::cout << "Subscribing to symbol: " << symbol_->code << std::endl; | |
sub.subscribe(symbol_->code); | |
} | |
// Set callback functions. | |
sub.on_message([=](std::string channel, std::string msg) { | |
try { | |
// if (sessions_.size()) { | |
// send(msg); | |
// std::cout << msg << std::endl; | |
state_->spsc_queue_.push(msg); | |
// } | |
} catch (const sw::redis::Error &err) { | |
std::cout << "could not send message" << err.what() << std::endl; | |
} | |
}); | |
// Consume messages in a loop. | |
std::cout << "Before while loop"; | |
while (true) { | |
try { | |
// if (sessions_.size()) { | |
sub.consume(); | |
// } | |
} catch (const sw::redis::Error &err) { | |
std::cout << "Cannot read from redis" << err.what() << std::endl; | |
} | |
} | |
} else { | |
} | |
}); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#ifndef SRAVZ_WEB_SUBSCRIBER_HPP | |
#define SRAVZ_WEB_SUBSCRIBER_HPP | |
#include <util.hpp> | |
#include "shared_state.hpp" | |
// Subscribes to redis channels and feeds the received messages into the
// shared server state.
class subscriber : public boost::enable_shared_from_this<subscriber>
{
    net::io_context& ioc_subscriber_;        // context the subscribe loop is posted to
    boost::shared_ptr<shared_state> state_;  // shared server state

    // Single redis subscriber TODO: Make it multiple
    boost::shared_ptr<RedisInstance> redis_ptr_;

public:
    explicit
    subscriber(
        net::io_context& ioc_subscriber,
        boost::shared_ptr<shared_state> const& state);

    // Posts the subscription loop onto the subscriber io_context.
    void subscribe();
};
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Sravz LLC | |
// | |
#include "symbol.hpp" | |
// Initialise a symbol with its code, latest quote and timestamp; the
// reference count starts at zero.
//
// `code` and `quote` are taken by value and moved into the members
// (`quote` was previously copied a second time).
symbol::
symbol(std::string code, std::string quote, std::chrono::milliseconds::rep time)
    : code(std::move(code))
    , quote(std::move(quote))
    , time(time)
    , refcount(0)
{
}
void | |
symbol:: | |
join(websocket_session* session) | |
{ | |
std::lock_guard<std::mutex> lock(mutex_); | |
std::cout << "Joining symbol session: " << code << std::endl; | |
sessions_.insert(session); | |
// boost::asio::post(ioc_.get_executor(), beast::bind_front_handler(&shared_state::publish, shared_from_this())); | |
} | |
void | |
symbol:: | |
leave(websocket_session* session) | |
{ | |
std::lock_guard<std::mutex> lock(mutex_); | |
std::cout << "Leaving symbol session: " << code << std::endl; | |
sessions_.erase(session); | |
} | |
// Broadcast a message to all websocket client sessions | |
void | |
symbol:: | |
send(std::string message) | |
{ | |
// Put the message in a shared pointer so we can re-use it for each client | |
auto const ss = boost::make_shared<std::string const>(std::move(message)); | |
// Make a local list of all the weak pointers representing | |
// the sessions, so we can do the actual sending without | |
// holding the mutex: | |
std::vector<boost::weak_ptr<websocket_session>> v; | |
{ | |
std::lock_guard<std::mutex> lock(mutex_); | |
v.reserve(sessions_.size()); | |
for(auto p : sessions_) | |
v.emplace_back(p->weak_from_this()); | |
} | |
// For each session in our local list, try to acquire a strong | |
// pointer. If successful, then send the message on that session. | |
for(auto const& wp : v) | |
if(auto sp = wp.lock()) | |
sp->send(ss); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#ifndef SRAVZ_SYMBOL_HPP | |
#define SRAVZ_SYMBOL_HPP | |
#include <util.hpp> | |
#include "websocket_session.hpp" | |
// State of a single symbol: its code, latest quote and timestamp, plus
// the websocket sessions registered with it.
class symbol : public boost::enable_shared_from_this<symbol>
{
public:
    std::string code;
    std::string quote;
    std::chrono::milliseconds::rep time;
    mutable boost::atomic<int> refcount;

    // Registered sessions; join(), leave() and send() lock mutex_
    // before touching this set.
    std::unordered_set<websocket_session*> sessions_;
    std::mutex mutex_;

    explicit
    symbol(
        std::string code,
        std::string quote,
        std::chrono::milliseconds::rep time);

    // Adds a session to sessions_.
    void join(websocket_session* session);

    // Removes a session from sessions_.
    void leave(websocket_session* session);

    // Broadcasts `message` to the registered sessions.
    void send(std::string message);
};
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef SRAVZ_BACKENDCPP_UTIL_H | |
#define SRAVZ_BACKENDCPP_UTIL_H | |
// Includes | |
#include "root_certificates.hpp"
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/write.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/coroutine2/all.hpp>
#include <boost/foreach.hpp>
#include <boost/json.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/log/trivial.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/thread.hpp>
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <redis-plus-plus/src/sw/redis++/errors.h>
#include <redis-plus-plus/test/src/sw/redis++/utils.h>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
/* Using namespaces */ | |
// TODO: Move this to the cpp files where the namespace is used. | |
using RedisInstance = sw::redis::Redis; | |
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> | |
// using namespace std::chrono; | |
// Namespaces | |
namespace beast = boost::beast; // from <boost/beast.hpp> | |
namespace http = beast::http; // from <boost/beast/http.hpp> | |
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp> | |
namespace net = boost::asio; // from <boost/asio.hpp> | |
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp> | |
namespace json = boost::json; | |
typedef boost::asio::io_context::executor_type executor_type; | |
typedef boost::asio::strand<executor_type> strand; | |
// Classes | |
class ThreadInfo | |
{ | |
public: | |
beast::flat_buffer buffer; | |
std::shared_ptr<RedisInstance> redis; | |
ThreadInfo() {} | |
ThreadInfo(beast::flat_buffer buffer_, std::shared_ptr<RedisInstance> redis_) : buffer(buffer_), redis(redis_) {} | |
}; | |
typedef std::map<boost::thread::id, ThreadInfo> ThreadsInfo; | |
// Functions | |
// Gets env variable | |
bool getenv(const char *name, std::string &env); | |
// Returns redis connection options to be used in redis connection creation | |
sw::redis::ConnectionOptions getRedisConnectionOptions(); | |
#endif // end SRAVZ_BACKENDCPP_UTIL_H |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#include "websocket_session.hpp" | |
#include <iostream> | |
// Adopt the connected socket for the websocket stream and remember the
// shared state this session belongs to.
websocket_session::websocket_session(
    tcp::socket&& socket,
    boost::shared_ptr<shared_state> const& state)
    : ws_(std::move(socket)), state_(state)
{
}
// On destruction, take this session out of the shared list of active sessions.
websocket_session::~websocket_session()
{
    state_->leave(this);
}
void | |
websocket_session:: | |
fail(beast::error_code ec, char const* what) | |
{ | |
// Don't report these | |
if( ec == net::error::operation_aborted || | |
ec == websocket::error::closed) | |
return; | |
std::cerr << what << ": " << ec.message() << "\n"; | |
} | |
void | |
websocket_session:: | |
on_accept(beast::error_code ec) | |
{ | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "accept"); | |
// Add this session to the list of active sessions | |
state_->join(this); | |
// Read a message | |
// Do not echo back the message | |
// Parse the message and set the subscription | |
// TODO: Read subscribe message an send specific stock quotes | |
ws_.async_read( | |
buffer_, | |
beast::bind_front_handler( | |
&websocket_session::on_read, | |
shared_from_this())); | |
} | |
void | |
websocket_session:: | |
on_read(beast::error_code ec, std::size_t) | |
{ | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "read"); | |
// Send to all connections | |
// state_->send(beast::buffers_to_string(buffer_.data())); | |
// Implement parser to parse message and subscribe to symbol | |
// Clear the buffer | |
buffer_.consume(buffer_.size()); | |
// Read another message | |
ws_.async_read( | |
buffer_, | |
beast::bind_front_handler( | |
&websocket_session::on_read, | |
shared_from_this())); | |
} | |
void | |
websocket_session:: | |
send(boost::shared_ptr<std::string const> const& ss) | |
{ | |
// Post our work to the strand, this ensures | |
// that the members of `this` will not be | |
// accessed concurrently. | |
net::post( | |
ws_.get_executor(), | |
beast::bind_front_handler( | |
&websocket_session::on_send, | |
shared_from_this(), | |
ss)); | |
} | |
void | |
websocket_session:: | |
on_send(boost::shared_ptr<std::string const> const& ss) | |
{ | |
// Always add to queue | |
queue_.push_back(ss); | |
// Are we already writing? | |
if(queue_.size() > 1) | |
return; | |
// We are not currently writing, so send this immediately | |
ws_.async_write( | |
net::buffer(*queue_.front()), | |
beast::bind_front_handler( | |
&websocket_session::on_write, | |
shared_from_this())); | |
} | |
void | |
websocket_session:: | |
on_write(beast::error_code ec, std::size_t) | |
{ | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "write"); | |
// Remove the string from the queue | |
queue_.erase(queue_.begin()); | |
// Send the next message if any | |
if(! queue_.empty()) | |
ws_.async_write( | |
net::buffer(*queue_.front()), | |
beast::bind_front_handler( | |
&websocket_session::on_write, | |
shared_from_this())); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#ifndef BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_WEBSOCKET_SESSION_HPP | |
#define BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_WEBSOCKET_SESSION_HPP | |
#include "net.hpp" | |
#include "beast.hpp" | |
#include "shared_state.hpp" | |
#include <cstdlib> | |
#include <memory> | |
#include <string> | |
#include <vector> | |
// Forward declaration | |
class shared_state; | |
/** Represents an active WebSocket connection to the server | |
*/ | |
class websocket_session : public boost::enable_shared_from_this<websocket_session> | |
{ | |
beast::flat_buffer buffer_; | |
websocket::stream<beast::tcp_stream> ws_; | |
boost::shared_ptr<shared_state> state_; | |
std::vector<boost::shared_ptr<std::string const>> queue_; | |
void fail(beast::error_code ec, char const* what); | |
void on_accept(beast::error_code ec); | |
void on_read(beast::error_code ec, std::size_t bytes_transferred); | |
void on_write(beast::error_code ec, std::size_t bytes_transferred); | |
public: | |
websocket_session( | |
tcp::socket&& socket, | |
boost::shared_ptr<shared_state> const& state); | |
~websocket_session(); | |
template<class Body, class Allocator> | |
void | |
run(http::request<Body, http::basic_fields<Allocator>> req); | |
// Send a message | |
void | |
send(boost::shared_ptr<std::string const> const& ss); | |
private: | |
void | |
on_send(boost::shared_ptr<std::string const> const& ss); | |
}; | |
template<class Body, class Allocator> | |
void | |
websocket_session:: | |
run(http::request<Body, http::basic_fields<Allocator>> req) | |
{ | |
// Set suggested timeout settings for the websocket | |
ws_.set_option( | |
websocket::stream_base::timeout::suggested( | |
beast::role_type::server)); | |
// Set a decorator to change the Server of the handshake | |
ws_.set_option(websocket::stream_base::decorator( | |
[](websocket::response_type& res) | |
{ | |
res.set(http::field::server, | |
std::string(BOOST_BEAST_VERSION_STRING) + | |
" websocket-chat-multi"); | |
})); | |
// Accept the websocket handshake | |
ws_.async_accept( | |
req, | |
beast::bind_front_handler( | |
&websocket_session::on_accept, | |
shared_from_this())); | |
} | |
#endif |
Session 2 & 3
Video explanation of the code
Source Code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#include "http_session.hpp" | |
#include "websocket_session.hpp" | |
#include <boost/config.hpp> | |
#include <iostream> | |
#include <util.hpp> | |
//------------------------------------------------------------------------------ | |
// Map the extension of `path` (everything from the last '.', compared without
// regard to case) to a mime type. Unknown or missing extensions yield
// "application/text".
beast::string_view
mime_type(beast::string_view path)
{
    struct mapping
    {
        char const* ext;
        char const* mime;
    };
    static constexpr mapping known[] = {
        {".htm",  "text/html"},
        {".html", "text/html"},
        {".php",  "text/html"},
        {".css",  "text/css"},
        {".txt",  "text/plain"},
        {".js",   "application/javascript"},
        {".json", "application/json"},
        {".xml",  "application/xml"},
        {".swf",  "application/x-shockwave-flash"},
        {".flv",  "video/x-flv"},
        {".png",  "image/png"},
        {".jpe",  "image/jpeg"},
        {".jpeg", "image/jpeg"},
        {".jpg",  "image/jpeg"},
        {".gif",  "image/gif"},
        {".bmp",  "image/bmp"},
        {".ico",  "image/vnd.microsoft.icon"},
        {".tiff", "image/tiff"},
        {".tif",  "image/tiff"},
        {".svg",  "image/svg+xml"},
        {".svgz", "image/svg+xml"},
    };

    auto const dot = path.rfind(".");
    beast::string_view const ext =
        dot == beast::string_view::npos ? beast::string_view{} : path.substr(dot);

    for(auto const& entry : known)
    {
        if(beast::iequals(ext, entry.ext))
            return entry.mime;
    }
    return "application/text";
}
// Append an HTTP rel-path to a local filesystem path. | |
// The returned path is normalized for the platform. | |
std::string | |
path_cat( | |
beast::string_view base, | |
beast::string_view path) | |
{ | |
if(base.empty()) | |
return std::string(path); | |
std::string result(base); | |
#ifdef BOOST_MSVC | |
char constexpr path_separator = '\\'; | |
if(result.back() == path_separator) | |
result.resize(result.size() - 1); | |
result.append(path.data(), path.size()); | |
for(auto& c : result) | |
if(c == '/') | |
c = path_separator; | |
#else | |
char constexpr path_separator = '/'; | |
if(result.back() == path_separator) | |
result.resize(result.size() - 1); | |
result.append(path.data(), path.size()); | |
#endif | |
return result; | |
} | |
template <class Body, class Allocator> | |
boost::optional<http::message_generator> | |
validate_jwt_token(http::request<Body, http::basic_fields<Allocator>>& req) | |
{ | |
// Returns a bad request response | |
auto const unauthorized = | |
[&req](beast::string_view why) | |
{ | |
http::response<http::string_body> res{http::status::bad_request, req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, "text/html"); | |
res.keep_alive(req.keep_alive()); | |
res.body() = std::string(why); | |
res.prepare_payload(); | |
return res; | |
}; | |
boost::urls::url_view uv(req.base().target()); | |
for( auto v : uv.params() ) { | |
if (v.key == "api_token") { | |
try { | |
auto dec_obj = jwt::decode(v.value, jwt::params::algorithms({"HS256"}), jwt::params::secret("secret")); | |
// std::cout << "User Authorized " << dec_obj.payload() << std::endl; | |
return boost::none; | |
} catch (const jwt::TokenExpiredError& e) { | |
return boost::make_optional<http::message_generator>(unauthorized("Unauthorized - api_token expired")); | |
} catch (const jwt::SignatureFormatError& e) { | |
return boost::make_optional<http::message_generator>(unauthorized("Unauthorized - api_token signature format error")); | |
} catch (const jwt::DecodeError& e) { | |
return boost::make_optional<http::message_generator>(unauthorized("Unauthorized - api_token decode error")); | |
} catch (const jwt::VerificationError& e) { | |
return boost::make_optional<http::message_generator>(unauthorized("Unauthorized - api_token verification error")); | |
} catch (...) { | |
return boost::make_optional<http::message_generator>(unauthorized("Unauthorized - api_token unknown error")); | |
} | |
} | |
} | |
return boost::make_optional<http::message_generator>(unauthorized("Unauthorized - api_token missing")); | |
//TODO: Implement HTTP header authorization | |
// std::string authorizationHeader; | |
// std::cout << "\nDebug: ALL HTTP Request Header Values:" << "\n\n"; | |
// for (auto& h : parser_->get().base()) | |
// { | |
// auto headerValue = std::string(h.value()); | |
// std::cout << "Header: " << h.name() << ", Header Name: " | |
// << h.name_string() << ", Value: " << headerValue << "\n\n"; | |
// if (h.name() == http::field::authorization) | |
// { | |
// authorizationHeader = headerValue; | |
// std::cout << "\nDebug: Detected HTTP Request Authorization Header." << "\n\n"; | |
// } | |
// } | |
} | |
// Return a response for the given request. | |
// | |
// The concrete type of the response message (which depends on the | |
// request), is type-erased in message_generator. | |
template <class Body, class Allocator> | |
http::message_generator | |
handle_request( | |
beast::string_view doc_root, | |
http::request<Body, http::basic_fields<Allocator>>&& req) | |
{ | |
// Returns a bad request response | |
auto const bad_request = | |
[&req](beast::string_view why) | |
{ | |
http::response<http::string_body> res{http::status::bad_request, req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, "text/html"); | |
res.keep_alive(req.keep_alive()); | |
res.body() = std::string(why); | |
res.prepare_payload(); | |
return res; | |
}; | |
// Returns a not found response | |
auto const not_found = | |
[&req](beast::string_view target) | |
{ | |
http::response<http::string_body> res{http::status::not_found, req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, "text/html"); | |
res.keep_alive(req.keep_alive()); | |
res.body() = "The resource '" + std::string(target) + "' was not found."; | |
res.prepare_payload(); | |
return res; | |
}; | |
// Returns a server error response | |
auto const server_error = | |
[&req](beast::string_view what) | |
{ | |
http::response<http::string_body> res{http::status::internal_server_error, req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, "text/html"); | |
res.keep_alive(req.keep_alive()); | |
res.body() = "An error occurred: '" + std::string(what) + "'"; | |
res.prepare_payload(); | |
return res; | |
}; | |
// Make sure we can handle the method | |
if( req.method() != http::verb::get && | |
req.method() != http::verb::head) | |
return bad_request("Unknown HTTP-method"); | |
// Request path must be absolute and not contain "..". | |
if( req.target().empty() || | |
req.target()[0] != '/' || | |
req.target().find("..") != beast::string_view::npos) | |
return bad_request("Illegal request-target"); | |
// Build the path to the requested file | |
std::string path = path_cat(doc_root, req.target()); | |
if(req.target().back() == '/') | |
path.append("index.html"); | |
// Attempt to open the file | |
beast::error_code ec; | |
http::file_body::value_type body; | |
body.open(path.c_str(), beast::file_mode::scan, ec); | |
// Handle the case where the file doesn't exist | |
if(ec == boost::system::errc::no_such_file_or_directory) | |
return not_found(req.target()); | |
// Handle an unknown error | |
if(ec) | |
return server_error(ec.message()); | |
// Cache the size since we need it after the move | |
auto const size = body.size(); | |
// Respond to HEAD request | |
if(req.method() == http::verb::head) | |
{ | |
http::response<http::empty_body> res{http::status::ok, req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, mime_type(path)); | |
res.content_length(size); | |
res.keep_alive(req.keep_alive()); | |
return res; | |
} | |
// Respond to GET request | |
http::response<http::file_body> res{ | |
std::piecewise_construct, | |
std::make_tuple(std::move(body)), | |
std::make_tuple(http::status::ok, req.version())}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, mime_type(path)); | |
res.content_length(size); | |
res.keep_alive(req.keep_alive()); | |
return res; | |
} | |
//------------------------------------------------------------------------------ | |
// Adopt the accepted socket as the session's stream and remember the shared
// server state.
http_session::http_session(
    tcp::socket&& socket,
    boost::shared_ptr<shared_state> const& state)
    : stream_(std::move(socket)), state_(state)
{
}
void | |
http_session:: | |
run() | |
{ | |
do_read(); | |
} | |
// Report a failure | |
void | |
http_session:: | |
fail(beast::error_code ec, char const* what) | |
{ | |
// Don't report on canceled operations | |
if(ec == net::error::operation_aborted) | |
return; | |
std::cerr << what << ": " << ec.message() << "\n"; | |
} | |
void | |
http_session:: | |
do_read() | |
{ | |
// Construct a new parser for each message | |
parser_.emplace(); | |
// Apply a reasonable limit to the allowed size | |
// of the body in bytes to prevent abuse. | |
parser_->body_limit(10000); | |
// Set the timeout. | |
stream_.expires_after(std::chrono::seconds(30)); | |
// Read a request | |
http::async_read( | |
stream_, | |
buffer_, | |
parser_->get(), | |
beast::bind_front_handler( | |
&http_session::on_read, | |
shared_from_this())); | |
} | |
void | |
http_session:: | |
on_read(beast::error_code ec, std::size_t) | |
{ | |
// This means they closed the connection | |
if(ec == http::error::end_of_stream) | |
{ | |
stream_.socket().shutdown(tcp::socket::shutdown_send, ec); | |
return; | |
} | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "read"); | |
bool keep_alive; | |
auto self = shared_from_this(); | |
//boost::optional<http::message_generator> unauthorized_res = validate_jwt_token(parser_->release()); | |
if(boost::optional<http::message_generator> unauthorized_res = validate_jwt_token(parser_->get())) { | |
// TODO: Check why boost::none still runs below code | |
std::cout << "Sending unauthorized response\n"; | |
keep_alive = unauthorized_res->keep_alive(); | |
// Send the response | |
beast::async_write( | |
stream_, std::move(*unauthorized_res), | |
[self, keep_alive](beast::error_code ec, std::size_t bytes) | |
{ | |
self->on_write(ec, bytes, keep_alive); | |
}); | |
return; | |
} else { | |
std::cout << "User Authorized\n"; | |
} | |
// See if it is a WebSocket Upgrade | |
if(websocket::is_upgrade(parser_->get())) | |
{ | |
// Create a websocket session, transferring ownership | |
// of both the socket and the HTTP request. | |
boost::make_shared<websocket_session>( | |
stream_.release_socket(), | |
state_)->run(parser_->release()); | |
return; | |
} | |
// Handle request | |
http::message_generator msg = | |
handle_request(state_->doc_root(), parser_->release()); | |
// Determine if we should close the connection | |
keep_alive = msg.keep_alive(); | |
// Send the response | |
beast::async_write( | |
stream_, std::move(msg), | |
[self, keep_alive](beast::error_code ec, std::size_t bytes) | |
{ | |
self->on_write(ec, bytes, keep_alive); | |
}); | |
} | |
void | |
http_session:: | |
on_write(beast::error_code ec, std::size_t, bool keep_alive) | |
{ | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "write"); | |
if(! keep_alive) | |
{ | |
// This means we should close the connection, usually because | |
// the response indicated the "Connection: close" semantic. | |
stream_.socket().shutdown(tcp::socket::shutdown_send, ec); | |
return; | |
} | |
// Read another request | |
do_read(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#ifndef BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_HTTP_SESSION_HPP | |
#define BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_HTTP_SESSION_HPP | |
#include "net.hpp" | |
#include "beast.hpp" | |
#include "shared_state.hpp" | |
#include <boost/optional.hpp> | |
#include <boost/smart_ptr.hpp> | |
#include <cstdlib> | |
#include <memory> | |
#include <jwt/jwt.hpp> | |
#include <boost/optional.hpp> | |
/** Represents an established HTTP connection | |
*/ | |
class http_session : public boost::enable_shared_from_this<http_session> | |
{ | |
beast::tcp_stream stream_; | |
beast::flat_buffer buffer_; | |
boost::shared_ptr<shared_state> state_; | |
// The parser is stored in an optional container so we can | |
// construct it from scratch it at the beginning of each new message. | |
boost::optional<http::request_parser<http::string_body>> parser_; | |
struct send_lambda; | |
void fail(beast::error_code ec, char const* what); | |
void do_read(); | |
void on_read(beast::error_code ec, std::size_t); | |
void on_write(beast::error_code ec, std::size_t, bool close); | |
public: | |
http_session( | |
tcp::socket&& socket, | |
boost::shared_ptr<shared_state> const& state); | |
void run(); | |
}; | |
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#include "parser.hpp" | |
// Configure the JSON parse options: comments and trailing commas are accepted.
parser::parser()
{
    opt_.allow_trailing_commas = true;
    opt_.allow_comments = true;
}
// Parse `msg` as JSON using the options set up in the constructor and return
// the document, which must have a JSON object at its root.
// Throws if `msg` is not valid JSON or if the root is not an object.
boost::json::value
parser::parse(const std::string& msg)
{
    boost::json::value val = boost::json::parse(msg, {}, opt_);
    // as_object() is called only for its check: it throws unless the root is
    // an object. Returning `val` itself avoids deep-copying the parsed object
    // into a second value.
    static_cast<void>(val.as_object());
    return val;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#ifndef SRAVZ_WEB_PARSER_HPP | |
#define SRAVZ_WEB_PARSER_HPP | |
#include <util.hpp> | |
// Parses incoming JSON messages.
class parser : public boost::enable_shared_from_this<parser>
{
public:
    // Kinds of message a client can send
    enum class MsgType {
        SUBSCRIBE = 1,
        UNSUBSCRIBE = 2
    };

    parser();

    // Parses `s` and returns the JSON document.
    boost::json::value parse (const std::string& s);

private:
    // Options handed to boost::json::parse
    boost::json::parse_options opt_ {};
};
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#include "publisher.hpp" | |
// Remember the io_context that runs the publish loop and the shared state
// that holds the message queue and the per-topic sessions.
publisher::publisher(
    net::io_context& ioc_publisher,
    boost::shared_ptr<shared_state> const& state)
    : ioc_publisher_(ioc_publisher), state_(state)
{
}
void | |
publisher:: | |
publish() | |
{ | |
boost::asio::post(net::make_strand(ioc_publisher_), | |
[=]() | |
{ | |
std::pair<std::string, std::string> msg_pair; | |
while(true) { | |
sravz::get_message_with_backoff(state_->spsc_queue_, msg_pair); | |
auto topic = std::get<0>(msg_pair); | |
auto message = std::get<1>(msg_pair); | |
// Put the message in a shared pointer so we can re-use it for each client | |
auto const ss = boost::make_shared<std::string const>(std::move(message)); | |
// Make a local list of all the weak pointers representing | |
// the sessions, so we can do the actual sending without | |
// holding the mutex: | |
std::vector<boost::weak_ptr<websocket_session>> v; | |
{ | |
std::lock_guard<std::mutex> lock_symbol(state_->symbols_[topic]->mutex_); | |
v.reserve(state_->symbols_[topic]->sessions_.size()); | |
for(auto p : state_->symbols_[topic]->sessions_) | |
{ | |
v.emplace_back(p->weak_from_this()); | |
} | |
} | |
// For each session in our local list, try to acquire a strong | |
// pointer. If successful, then send the message on that session. | |
for(auto const& wp : v) { | |
if(auto sp = wp.lock()) { | |
sp->send(ss); | |
} | |
} | |
} | |
}); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#ifndef SRAVZ_WEB_PUBLISHER_HPP | |
#define SRAVZ_WEB_PUBLISHER_HPP | |
#include <utility> | |
#include <util.hpp> | |
#include <sravz.hpp> | |
#include "shared_state.hpp" | |
#include "websocket_session.hpp" | |
#include "symbol.hpp" | |
// Forwards queued messages to the websocket sessions subscribed to a topic.
class publisher : public boost::enable_shared_from_this<publisher>
{
public:
    explicit
    publisher(net::io_context& ioc_publisher, boost::shared_ptr<shared_state> const& state);

    // Starts the publish loop on ioc_publisher.
    void publish ();

private:
    // RedisInstance redis_consumer_;
    net::io_context& ioc_publisher_;        // io_context the loop is posted to
    boost::shared_ptr<shared_state> state_; // shared server state
};
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "util.hpp" | |
namespace sravz | |
{ | |
const int EXPONENTIAL_BACKOFF_LIMIT = 5; | |
struct config | |
{ | |
int id; | |
std::string topics; | |
}; | |
template <typename T1, typename T2> | |
void | |
get_message_with_backoff(boost::lockfree::spsc_queue<T1, boost::lockfree::capacity<1024> > &queue, T2 &msg) { | |
int backoff = 0; | |
while(true) { | |
if (queue.pop(msg)) | |
{ | |
return; | |
} else { | |
sleep(1 << backoff++); | |
if (backoff > EXPONENTIAL_BACKOFF_LIMIT) { | |
backoff = 0; | |
} | |
} | |
} | |
} | |
} | |
// void tag_invoke(boost::json::value_from_tag, boost::json::value& jv, config const& c) | |
// { | |
// jv = | |
// { | |
// { "id" , c.id }, | |
// { "topics", c.topics } | |
// }; | |
// }; | |
// config tag_invoke(boost::json::value_to_tag< config >, boost::json::value const& jv ) | |
// { | |
// object const& obj = jv.as_object(); | |
// return config { | |
// value_to<int>( obj.at( "id" ) ), | |
// value_to<std::string>( obj.at( "topics" ) ) | |
// }; | |
// }; | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#include "subscriber.hpp" | |
#include "symbol.hpp" | |
#include <sravz.hpp> | |
// Remember the io_context for the subscribe loop, create the redis client
// from the environment-derived connection options, and keep the shared state.
subscriber::
subscriber(net::io_context& ioc_subscriber, boost::shared_ptr<shared_state> const& state)
    : ioc_subscriber_(ioc_subscriber)
    // Construct the client in place; no temporary RedisInstance is created
    // and then moved into the shared allocation.
    , redis_ptr_(boost::make_shared<RedisInstance>(getRedisConnectionOptions()))
    , state_(state)
{
}
void | |
subscriber:: | |
subscribe() | |
{ | |
boost::asio::post(net::make_strand(ioc_subscriber_), | |
[=]() | |
{ | |
// Create a Subscriber. | |
// std::cout << "Before calling subscriber" << std::endl; | |
auto topics_subscribed_count = 0; | |
auto sub = redis_ptr_->subscriber(); | |
// for(auto const& symbol_ : state_->symbols_) | |
// { | |
// std::cout << "Subscribing to symbol: " << symbol_.first << std::endl; | |
// sub.subscribe(symbol_.first); | |
// } | |
// Set callback functions. | |
sub.on_message([=](std::string channel, std::string msg) { | |
try { | |
// if (sessions_.size()) { | |
// send(msg); | |
// std::cout << msg << std::endl; | |
state_->spsc_queue_.push(std::make_pair(channel, msg)); | |
// } | |
} catch (const sw::redis::Error &err) { | |
std::cout << "could not send message" << err.what() << std::endl; | |
} | |
}); | |
// Consume messages in a loop. | |
// std::cout << "Before while loop"; | |
std::pair<parser::MsgType, std::string> msg_pair; | |
while (true) { | |
if (topics_subscribed_count) | |
{ | |
try { | |
sub.consume(); | |
} catch (const sw::redis::Error &err) { | |
// std::cout << "Cannot read from redis" << err.what() << std::endl; | |
//TODO: Handle subscriber failure and recreate subscribe as per redis++ recommendation | |
} | |
} | |
if (state_->spsc_queue_subscriber_.pop(msg_pair)) | |
{ | |
auto type = std::get<0>(msg_pair); | |
auto topic = std::get<1>(msg_pair); | |
// std::cout << "Message in subscribe :" << " " << topic << std::endl; | |
switch (type) | |
{ | |
case parser::MsgType::SUBSCRIBE: | |
{ | |
std::cout << "Subscribing to topic :" << topic << std::endl; | |
sub.subscribe(topic); | |
topics_subscribed_count++; | |
break; | |
} | |
case parser::MsgType::UNSUBSCRIBE: | |
{ | |
std::cout << "Unsubscribing to topic :" << topic << std::endl; | |
sub.unsubscribe(topic); | |
topics_subscribed_count--; | |
break; | |
} | |
default: | |
std::cout << "Unsupported message type" << std::endl; | |
break; | |
} | |
} | |
} | |
}); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Sravz LLC | |
// | |
#include "symbol.hpp" | |
// Build a symbol from its code, latest quote and quote time.
// Both strings are taken by value and moved into the members; the reference
// count starts at zero.
symbol::
symbol(std::string code, std::string quote, std::chrono::milliseconds::rep time)
    : code(std::move(code))
    , quote(std::move(quote))
    , time(time)
    , refcount(0)
{
}
void | |
symbol:: | |
join(websocket_session* session) | |
{ | |
sessions_.insert(session); | |
} | |
int | |
symbol:: | |
leave(websocket_session* session) | |
{ | |
return sessions_.erase(session); | |
} | |
// // Broadcast a message to all websocket client sessions | |
// void | |
// symbol:: | |
// send(std::string message) | |
// { | |
// // Put the message in a shared pointer so we can re-use it for each client | |
// auto const ss = boost::make_shared<std::string const>(std::move(message)); | |
// // Make a local list of all the weak pointers representing | |
// // the sessions, so we can do the actual sending without | |
// // holding the mutex: | |
// std::vector<boost::weak_ptr<websocket_session>> v; | |
// { | |
// std::lock_guard<std::mutex> lock(mutex_); | |
// v.reserve(sessions_.size()); | |
// for(auto p : sessions_) | |
// v.emplace_back(p->weak_from_this()); | |
// } | |
// // For each session in our local list, try to acquire a strong | |
// // pointer. If successful, then send the message on that session. | |
// for(auto const& wp : v) | |
// if(auto sp = wp.lock()) | |
// sp->send(ss); | |
// } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// SRAVZ LLC | |
// | |
#ifndef SRAVZ_SYMBOL_HPP | |
#define SRAVZ_SYMBOL_HPP | |
#include <util.hpp> | |
#include "websocket_session.hpp" | |
#include <boost/atomic.hpp> | |
// State kept for one symbol: its latest quote plus the websocket sessions
// registered for it.
class symbol : public boost::enable_shared_from_this<symbol>
{
public:
    explicit
    symbol(std::string code, std::string quote, std::chrono::milliseconds::rep time);

    // Adds a session to sessions_.
    void join (websocket_session* session);
    // Removes a session from sessions_; returns how many entries were erased.
    int leave (websocket_session* session);
    // void send (std::string message);

    std::string code;
    std::string quote;
    std::chrono::milliseconds::rep time;
    // TODO: Not used for now.
    mutable boost::atomic<int> refcount;
    // Sessions registered for this symbol (non-owning pointers)
    std::unordered_set<websocket_session*> sessions_;
    // join()/leave() do not take this lock themselves.
    // NOTE(review): presumably callers lock it around sessions_ access - confirm.
    std::mutex mutex_;
};
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <util.hpp> | |
// Look up environment variable `name`.
// On success copies its value into `env` and returns true. If the variable
// is not set (or `name` is null) `env` is left untouched, a note is written
// to stdout and false is returned.
bool getenv(const char *name, std::string &env)
{
    // std::getenv and operator<< both require a non-null C string
    if (name == nullptr) {
        std::cout << "Env variable: (null) not set!!!\n";
        return false;
    }

    const char *ret = std::getenv(name);
    if (ret == nullptr) {
        // Terminate the line so consecutive notes do not run together
        std::cout << "Env variable: " << name << " not set!!!\n";
        return false;
    }

    env = ret;
    return true;
}
// Builds the options used when creating a redis connection.
//
// Host, port and password are taken from the REDIS_HOST, REDIS_PORT and
// REDIS_PASSWORD environment variables when they are set; otherwise the
// corresponding field keeps its default-constructed value. The socket
// timeout is always set to 200 ms.
//
// Note: REDIS_PORT is converted with std::stoi, which throws when the value
// is not a valid integer.
sw::redis::ConnectionOptions getRedisConnectionOptions()
{
    sw::redis::ConnectionOptions options;

    // Scratch storage; only read right after a successful lookup.
    std::string value;

    if (getenv("REDIS_HOST", value)) {
        options.host = value;
    }
    if (getenv("REDIS_PORT", value)) {
        options.port = std::stoi(value);
    }
    if (getenv("REDIS_PASSWORD", value)) {
        options.password = value;
    }

    options.socket_timeout = std::chrono::milliseconds(200);
    return options;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef SRAVZ_BACKENDCPP_UTIL_H | |
#define SRAVZ_BACKENDCPP_UTIL_H | |
// Includes | |
#include "root_certificates.hpp" | |
#include <boost/algorithm/string.hpp> | |
#include <boost/asio.hpp> | |
#include <boost/asio/co_spawn.hpp> | |
#include <boost/asio/detached.hpp> | |
#include <boost/asio/io_context.hpp> | |
#include <boost/asio/io_service.hpp> | |
#include <boost/asio/ip/tcp.hpp> | |
#include <boost/asio/signal_set.hpp> | |
#include <boost/asio/spawn.hpp> | |
#include <boost/asio/strand.hpp> | |
#include <boost/asio/write.hpp> | |
#include <boost/beast/core.hpp> | |
#include <boost/beast/ssl.hpp> | |
#include <boost/beast/websocket.hpp> | |
#include <boost/beast/websocket/ssl.hpp> | |
#include <boost/coroutine2/all.hpp> | |
#include <boost/foreach.hpp> | |
#include <boost/json.hpp> | |
#include <boost/lexical_cast.hpp> | |
#include <boost/lockfree/spsc_queue.hpp> | |
#include <boost/log/trivial.hpp> | |
#include <boost/smart_ptr.hpp> | |
#include <boost/thread.hpp> | |
#include <chrono> | |
#include <cstddef> | |
#include <cstdio> | |
#include <cstdlib> | |
#include <functional> | |
#include <iostream> | |
#include <memory> | |
#include <mutex> | |
#include <redis-plus-plus/src/sw/redis++/errors.h> | |
#include <redis-plus-plus/test/src/sw/redis++/utils.h> | |
#include <string> | |
#include <thread> | |
#include <unordered_map> | |
#include <unordered_set> | |
#include <boost/url.hpp> | |
/* Using namespaces */ | |
// TODO: Move this to the cpp files where the namespace is used. | |
using RedisInstance = sw::redis::Redis; | |
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> | |
// using namespace std::chrono; | |
// Namespaces | |
namespace beast = boost::beast; // from <boost/beast.hpp> | |
namespace http = beast::http; // from <boost/beast/http.hpp> | |
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp> | |
namespace net = boost::asio; // from <boost/asio.hpp> | |
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp> | |
namespace json = boost::json; | |
typedef boost::asio::io_context::executor_type executor_type; | |
typedef boost::asio::strand<executor_type> strand; | |
// Classes | |
class ThreadInfo | |
{ | |
public: | |
beast::flat_buffer buffer; | |
std::shared_ptr<RedisInstance> redis; | |
ThreadInfo() {} | |
ThreadInfo(beast::flat_buffer buffer_, std::shared_ptr<RedisInstance> redis_) : buffer(buffer_), redis(redis_) {} | |
}; | |
typedef std::map<boost::thread::id, ThreadInfo> ThreadsInfo; | |
// Functions | |
// Gets env variable | |
bool getenv(const char *name, std::string &env); | |
// Returns redis connection options to be used in redis connection creation | |
sw::redis::ConnectionOptions getRedisConnectionOptions(); | |
#endif // end SRAVZ_BACKENDCPP_UTIL_H |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#include "websocket_session.hpp" | |
#include <iostream> | |
// Builds the websocket stream from the given socket (moved in) and stores
// a shared pointer to the shared state.
websocket_session::websocket_session(
    tcp::socket&& socket,
    boost::shared_ptr<shared_state> const& state)
    : ws_(std::move(socket))
    , state_(state)
{
}
// On destruction, ask the shared state to drop this session from its
// list of active sessions.
websocket_session::~websocket_session()
{
    state_->leave(this);
}
void | |
websocket_session:: | |
fail(beast::error_code ec, char const* what) | |
{ | |
// Don't report these | |
if( ec == net::error::operation_aborted || | |
ec == websocket::error::closed) | |
return; | |
std::cerr << what << ": " << ec.message() << "\n"; | |
} | |
void | |
websocket_session:: | |
on_accept(beast::error_code ec) | |
{ | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "accept"); | |
// Add this session to the list of active session | |
state_->join(this); | |
// Read a message | |
// Do not echo back the message | |
// Parse the message and set the subscription | |
// TODO: Read subscribe message an send specific stock quotes | |
ws_.async_read( | |
buffer_, | |
beast::bind_front_handler( | |
&websocket_session::on_read, | |
shared_from_this())); | |
} | |
void | |
websocket_session:: | |
on_read(beast::error_code ec, std::size_t) | |
{ | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "read"); | |
// Send to all connections | |
state_->parse(beast::buffers_to_string(buffer_.data()), this); | |
// state_->send(beast::buffers_to_string(buffer_.data())); | |
// Implement parser to parse message and subscribe to symbol | |
// Clear the buffer | |
buffer_.consume(buffer_.size()); | |
// Read another message | |
ws_.async_read( | |
buffer_, | |
beast::bind_front_handler( | |
&websocket_session::on_read, | |
shared_from_this())); | |
} | |
void | |
websocket_session:: | |
send(boost::shared_ptr<std::string const> const& ss) | |
{ | |
// Post our work to the strand, this ensures | |
// that the members of `this` will not be | |
// accessed concurrently. | |
net::post( | |
ws_.get_executor(), | |
beast::bind_front_handler( | |
&websocket_session::on_send, | |
shared_from_this(), | |
ss)); | |
} | |
void | |
websocket_session:: | |
on_send(boost::shared_ptr<std::string const> const& ss) | |
{ | |
// Always add to queue | |
queue_.push_back(ss); | |
// Are we already writing? | |
if(queue_.size() > 1) | |
return; | |
// We are not currently writing, so send this immediately | |
ws_.async_write( | |
net::buffer(*queue_.front()), | |
beast::bind_front_handler( | |
&websocket_session::on_write, | |
shared_from_this())); | |
} | |
void | |
websocket_session:: | |
on_write(beast::error_code ec, std::size_t) | |
{ | |
// Handle the error, if any | |
if(ec) | |
return fail(ec, "write"); | |
// Remove the string from the queue | |
queue_.erase(queue_.begin()); | |
// Send the next message if any | |
if(! queue_.empty()) | |
ws_.async_write( | |
net::buffer(*queue_.front()), | |
beast::bind_front_handler( | |
&websocket_session::on_write, | |
shared_from_this())); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
// | |
// Official repository: https://github.com/vinniefalco/CppCon2018 | |
// | |
#ifndef BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_WEBSOCKET_SESSION_HPP | |
#define BOOST_BEAST_EXAMPLE_WEBSOCKET_CHAT_MULTI_WEBSOCKET_SESSION_HPP | |
#include "net.hpp" | |
#include "beast.hpp" | |
#include "shared_state.hpp" | |
#include <cstdlib> | |
#include <memory> | |
#include <string> | |
#include <vector> | |
#include <set> | |
// Forward declaration | |
class shared_state; | |
/** Represents an active WebSocket connection to the server | |
*/ | |
class websocket_session : public boost::enable_shared_from_this<websocket_session> | |
{ | |
beast::flat_buffer buffer_; | |
websocket::stream<beast::tcp_stream> ws_; | |
boost::shared_ptr<shared_state> state_; | |
std::vector<boost::shared_ptr<std::string const>> queue_; | |
void fail(beast::error_code ec, char const* what); | |
void on_accept(beast::error_code ec); | |
void on_read(beast::error_code ec, std::size_t bytes_transferred); | |
void on_write(beast::error_code ec, std::size_t bytes_transferred); | |
public: | |
std::set<std::string> topics; | |
websocket_session( | |
tcp::socket&& socket, | |
boost::shared_ptr<shared_state> const& state); | |
~websocket_session(); | |
template<class Body, class Allocator> | |
void | |
run(http::request<Body, http::basic_fields<Allocator>> req); | |
// Send a message | |
void | |
send(boost::shared_ptr<std::string const> const& ss); | |
private: | |
void | |
on_send(boost::shared_ptr<std::string const> const& ss); | |
}; | |
template<class Body, class Allocator> | |
void | |
websocket_session:: | |
run(http::request<Body, http::basic_fields<Allocator>> req) | |
{ | |
// Set suggested timeout settings for the websocket | |
ws_.set_option( | |
websocket::stream_base::timeout::suggested( | |
beast::role_type::server)); | |
// Set a decorator to change the Server of the handshake | |
ws_.set_option(websocket::stream_base::decorator( | |
[](websocket::response_type& res) | |
{ | |
res.set(http::field::server, | |
std::string(BOOST_BEAST_VERSION_STRING) + | |
" websocket-chat-multi"); | |
})); | |
// Accept the websocket handshake | |
ws_.async_accept( | |
req, | |
beast::bind_front_handler( | |
&websocket_session::on_accept, | |
shared_from_this())); | |
} | |
#endif |