fix bug: spawn: coro: stack size limit on windows

This commit is contained in:
jie 2021-11-22 19:29:05 +08:00
parent 06707c4446
commit 774efcec0d
4 changed files with 512 additions and 454 deletions

View File

@ -20,494 +20,504 @@
namespace bserv { namespace bserv {
std::string get_address(const tcp::socket& socket) { std::string get_address(const tcp::socket& socket) {
tcp::endpoint end_point = socket.remote_endpoint(); tcp::endpoint end_point = socket.remote_endpoint();
std::string addr = end_point.address().to_string() std::string addr = end_point.address().to_string()
+ ':' + std::to_string(end_point.port()); + ':' + std::to_string(end_point.port());
return addr; return addr;
} }
http::response<http::string_body> handle_request( http::response<http::string_body> handle_request(
http::request<http::string_body>& req, router& routes, http::request<http::string_body>& req, router& routes,
std::shared_ptr<websocket_session> ws_session, std::shared_ptr<websocket_session> ws_session,
asio::io_context& ioc, asio::yield_context& yield) { asio::io_context& ioc, asio::yield_context& yield) {
const auto bad_request = [&req](beast::string_view why) { const auto bad_request = [&req](beast::string_view why) {
http::response<http::string_body> res{ http::response<http::string_body> res{
http::status::bad_request, req.version() }; http::status::bad_request, req.version() };
res.set(http::field::server, NAME); res.set(http::field::server, NAME);
res.set(http::field::content_type, "text/html"); res.set(http::field::content_type, "text/html");
res.keep_alive(req.keep_alive()); res.keep_alive(req.keep_alive());
res.body() = std::string{ why }; res.body() = std::string{ why };
res.prepare_payload(); res.prepare_payload();
return res; return res;
}; };
const auto not_found = [&req](beast::string_view target) { const auto not_found = [&req](beast::string_view target) {
http::response<http::string_body> res{ http::response<http::string_body> res{
http::status::not_found, req.version() }; http::status::not_found, req.version() };
res.set(http::field::server, NAME); res.set(http::field::server, NAME);
res.set(http::field::content_type, "text/html"); res.set(http::field::content_type, "text/html");
res.keep_alive(req.keep_alive()); res.keep_alive(req.keep_alive());
res.body() = "The requested url '" res.body() = "The requested url '"
+ std::string{ target } + "' does not exist."; + std::string{ target } + "' does not exist.";
res.prepare_payload(); res.prepare_payload();
return res; return res;
}; };
const auto server_error = [&req](beast::string_view what) { const auto server_error = [&req](beast::string_view what) {
http::response<http::string_body> res{ http::response<http::string_body> res{
http::status::internal_server_error, req.version() }; http::status::internal_server_error, req.version() };
res.set(http::field::server, NAME); res.set(http::field::server, NAME);
res.set(http::field::content_type, "text/html"); res.set(http::field::content_type, "text/html");
res.keep_alive(req.keep_alive()); res.keep_alive(req.keep_alive());
res.body() = "Internal server error: " + std::string{ what }; res.body() = "Internal server error: " + std::string{ what };
res.prepare_payload(); res.prepare_payload();
return res; return res;
}; };
boost::string_view target = req.target(); boost::string_view target = req.target();
auto pos = target.find('?'); auto pos = target.find('?');
boost::string_view url; boost::string_view url;
if (pos == boost::string_view::npos) url = target; if (pos == boost::string_view::npos) url = target;
else url = target.substr(0, pos); else url = target.substr(0, pos);
http::response<http::string_body> res{ http::response<http::string_body> res{
http::status::ok, req.version() }; http::status::ok, req.version() };
res.set(http::field::server, NAME); res.set(http::field::server, NAME);
res.set(http::field::content_type, "application/json"); res.set(http::field::content_type, "application/json");
res.keep_alive(req.keep_alive()); res.keep_alive(req.keep_alive());
std::optional<boost::json::value> val; std::optional<boost::json::value> val;
try { try {
val = routes(ioc, yield, ws_session, std::string{ url }, req, res); val = routes(ioc, yield, ws_session, std::string{ url }, req, res);
} }
catch (const url_not_found_exception& /*e*/) { catch (const url_not_found_exception& /*e*/) {
return not_found(url); return not_found(url);
} }
catch (const bad_request_exception& /*e*/) { catch (const bad_request_exception& /*e*/) {
return bad_request("Request body is not a valid JSON string."); return bad_request("Request body is not a valid JSON string.");
} }
catch (const std::exception& e) { catch (const std::exception& e) {
return server_error(e.what()); return server_error(e.what());
} }
catch (...) { catch (...) {
return server_error("Unknown exception."); return server_error("Unknown exception.");
} }
if (val.has_value()) { if (val.has_value()) {
res.body() = json::serialize(val.value()); res.body() = json::serialize(val.value());
res.prepare_payload(); res.prepare_payload();
} }
return res; return res;
} }
class websocket_session_server; class websocket_session_server;
void handle_websocket_request( void handle_websocket_request(
std::shared_ptr<websocket_session_server>, std::shared_ptr<websocket_session_server>,
std::shared_ptr<websocket_session> session, std::shared_ptr<websocket_session> session,
http::request<http::string_body>& req, router& routes, http::request<http::string_body>& req, router& routes,
asio::io_context& ioc, asio::yield_context yield); asio::io_context& ioc, asio::yield_context yield);
class websocket_session_server class websocket_session_server
: public std::enable_shared_from_this<websocket_session_server> { : public std::enable_shared_from_this<websocket_session_server> {
private: private:
friend websocket_server; friend websocket_server;
std::string address_; std::string address_;
std::shared_ptr<websocket_session> session_; std::shared_ptr<websocket_session> session_;
http::request<http::string_body> req_; http::request<http::string_body> req_;
router& routes_; router& routes_;
void on_accept(beast::error_code ec) { void on_accept(beast::error_code ec) {
if (ec) { if (ec) {
fail(ec, "websocket_session_server accept"); fail(ec, "websocket_session_server accept");
return; return;
} }
// handles request here // handles request here
asio::spawn( asio::spawn(
session_->ioc_, session_->ioc_,
std::bind( std::bind(
&handle_websocket_request, &handle_websocket_request,
shared_from_this(), shared_from_this(),
session_, session_,
std::ref(req_), std::ref(req_),
std::ref(routes_), std::ref(routes_),
std::ref(session_->ioc_), std::ref(session_->ioc_),
std::placeholders::_1)); std::placeholders::_1)
} #ifdef _MSC_VER
public: , boost::coroutines::attributes{ STACK_SIZE }
explicit websocket_session_server( #endif
asio::io_context& ioc, );
tcp::socket&& socket, }
http::request<http::string_body>&& req, public:
router& routes) explicit websocket_session_server(
: address_{ get_address(socket) }, asio::io_context& ioc,
session_{ std::make_shared< tcp::socket&& socket,
websocket_session>(address_, ioc, std::move(socket)) }, http::request<http::string_body>&& req,
req_{ std::move(req) }, routes_{ routes } { router& routes)
lgtrace << "websocket_session_server opened: " << address_; : address_{ get_address(socket) },
} session_{ std::make_shared<
~websocket_session_server() { websocket_session>(address_, ioc, std::move(socket)) },
lgtrace << "websocket_session_server closed: " << address_; req_{ std::move(req) }, routes_{ routes } {
} lgtrace << "websocket_session_server opened: " << address_;
// starts the asynchronous accept operation }
void do_accept() { ~websocket_session_server() {
// sets suggested timeout settings for the websocket lgtrace << "websocket_session_server closed: " << address_;
session_->ws_.set_option( }
websocket::stream_base::timeout::suggested( // starts the asynchronous accept operation
beast::role_type::server)); void do_accept() {
// sets a decorator to change the Server of the handshake // sets suggested timeout settings for the websocket
session_->ws_.set_option( session_->ws_.set_option(
websocket::stream_base::decorator( websocket::stream_base::timeout::suggested(
[](websocket::response_type& res) { beast::role_type::server));
res.set( // sets a decorator to change the Server of the handshake
http::field::server, session_->ws_.set_option(
std::string{ BOOST_BEAST_VERSION_STRING } + " websocket-server"); websocket::stream_base::decorator(
})); [](websocket::response_type& res) {
// accepts the websocket handshake res.set(
session_->ws_.async_accept( http::field::server,
req_, std::string{ BOOST_BEAST_VERSION_STRING } + " websocket-server");
beast::bind_front_handler( }));
&websocket_session_server::on_accept, // accepts the websocket handshake
shared_from_this())); session_->ws_.async_accept(
} req_,
}; beast::bind_front_handler(
&websocket_session_server::on_accept,
shared_from_this()));
}
};
void handle_websocket_request( void handle_websocket_request(
std::shared_ptr<websocket_session_server>, std::shared_ptr<websocket_session_server>,
std::shared_ptr<websocket_session> session, std::shared_ptr<websocket_session> session,
http::request<http::string_body>& req, router& routes, http::request<http::string_body>& req, router& routes,
asio::io_context& ioc, asio::yield_context yield) { asio::io_context& ioc, asio::yield_context yield) {
handle_request(req, routes, session, ioc, yield); handle_request(req, routes, session, ioc, yield);
} }
std::string websocket_server::read() { std::string websocket_server::read() {
beast::error_code ec; beast::error_code ec;
beast::flat_buffer buffer; beast::flat_buffer buffer;
// reads a message into the buffer // reads a message into the buffer
session_.ws_.async_read(buffer, yield_[ec]); session_.ws_.async_read(buffer, yield_[ec]);
lgtrace << "websocket_server: read from " << session_.address_; lgtrace << "websocket_server: read from " << session_.address_;
// this indicates that the session was closed // this indicates that the session was closed
if (ec == websocket::error::closed) { if (ec == websocket::error::closed) {
throw websocket_closed{}; throw websocket_closed{};
} }
if (ec) { if (ec) {
fail(ec, "websocket_server read"); fail(ec, "websocket_server read");
throw websocket_io_exception{ "websocket_server read: " + ec.message() }; throw websocket_io_exception{ "websocket_server read: " + ec.message() };
} }
// lgtrace << "websocket_server: received text? " << ws_.got_text() << " from " << address_; // lgtrace << "websocket_server: received text? " << ws_.got_text() << " from " << address_;
return beast::buffers_to_string(buffer.data()); return beast::buffers_to_string(buffer.data());
} }
void websocket_server::write(const std::string& data) { void websocket_server::write(const std::string& data) {
beast::error_code ec; beast::error_code ec;
// ws_.text(ws_.got_text()); // ws_.text(ws_.got_text());
session_.ws_.async_write(asio::buffer(data), yield_[ec]); session_.ws_.async_write(asio::buffer(data), yield_[ec]);
lgtrace << "websocket_server: write to " << session_.address_; lgtrace << "websocket_server: write to " << session_.address_;
if (ec) { if (ec) {
fail(ec, "websocket_server write"); fail(ec, "websocket_server write");
throw websocket_io_exception{ "websocket_server write: " + ec.message() }; throw websocket_io_exception{ "websocket_server write: " + ec.message() };
} }
} }
class http_session; class http_session;
// this function produces an HTTP response for the given // this function produces an HTTP response for the given
// request. The type of the response object depends on the // request. The type of the response object depends on the
// contents of the request, so the interface requires the // contents of the request, so the interface requires the
// caller to pass a generic lambda for receiving the response. // caller to pass a generic lambda for receiving the response.
// NOTE: `send` should be called only once! // NOTE: `send` should be called only once!
template <class Send> template <class Send>
void handle_http_request( void handle_http_request(
std::shared_ptr<http_session>, std::shared_ptr<http_session>,
http::request<http::string_body> req, http::request<http::string_body> req,
Send& send, router& routes, asio::io_context& ioc, asio::yield_context yield) { Send& send, router& routes, asio::io_context& ioc, asio::yield_context yield) {
send(handle_request(req, routes, nullptr, ioc, yield)); send(handle_request(req, routes, nullptr, ioc, yield));
} }
// handles an HTTP server connection // handles an HTTP server connection
class http_session class http_session
: public std::enable_shared_from_this<http_session> { : public std::enable_shared_from_this<http_session> {
private: private:
// the function object is used to send an HTTP message. // the function object is used to send an HTTP message.
class send_lambda { class send_lambda {
private: private:
http_session& self_; http_session& self_;
public: public:
send_lambda(http_session& self) send_lambda(http_session& self)
: self_{ self } {} : self_{ self } {}
template <bool isRequest, class Body, class Fields> template <bool isRequest, class Body, class Fields>
void operator()( void operator()(
http::message<isRequest, Body, Fields>&& msg) const { http::message<isRequest, Body, Fields>&& msg) const {
// the lifetime of the message has to extend // the lifetime of the message has to extend
// for the duration of the async operation so // for the duration of the async operation so
// we use a shared_ptr to manage it. // we use a shared_ptr to manage it.
auto sp = std::make_shared< auto sp = std::make_shared<
http::message<isRequest, Body, Fields>>( http::message<isRequest, Body, Fields>>(
std::move(msg)); std::move(msg));
// stores a type-erased version of the shared // stores a type-erased version of the shared
// pointer in the class to keep it alive. // pointer in the class to keep it alive.
self_.res_ = sp; self_.res_ = sp;
// writes the response // writes the response
http::async_write( http::async_write(
self_.stream_, *sp, self_.stream_, *sp,
beast::bind_front_handler( beast::bind_front_handler(
&http_session::on_write, &http_session::on_write,
self_.shared_from_this(), self_.shared_from_this(),
sp->need_eof())); sp->need_eof()));
} }
} lambda_; } lambda_;
asio::io_context& ioc_; asio::io_context& ioc_;
beast::tcp_stream stream_; beast::tcp_stream stream_;
beast::flat_buffer buffer_; beast::flat_buffer buffer_;
boost::optional< boost::optional<
http::request_parser<http::string_body>> parser_; http::request_parser<http::string_body>> parser_;
std::shared_ptr<void> res_; std::shared_ptr<void> res_;
router& routes_; router& routes_;
router& ws_routes_; router& ws_routes_;
const std::string address_; const std::string address_;
void do_read() { void do_read() {
// constructs a new parser for each message // constructs a new parser for each message
parser_.emplace(); parser_.emplace();
// applies a reasonable limit to the allowed size // applies a reasonable limit to the allowed size
// of the body in bytes to prevent abuse. // of the body in bytes to prevent abuse.
parser_->body_limit(PAYLOAD_LIMIT); parser_->body_limit(PAYLOAD_LIMIT);
// sets the timeout. // sets the timeout.
stream_.expires_after(std::chrono::seconds(EXPIRY_TIME)); stream_.expires_after(std::chrono::seconds(EXPIRY_TIME));
// reads a request using the parser-oriented interface // reads a request using the parser-oriented interface
http::async_read( http::async_read(
stream_, buffer_, *parser_, stream_, buffer_, *parser_,
beast::bind_front_handler( beast::bind_front_handler(
&http_session::on_read, &http_session::on_read,
shared_from_this())); shared_from_this()));
} }
void on_read( void on_read(
beast::error_code ec, beast::error_code ec,
std::size_t bytes_transferred) { std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred); boost::ignore_unused(bytes_transferred);
lgtrace << "received " << bytes_transferred << " byte(s) from: " << address_; lgtrace << "received " << bytes_transferred << " byte(s) from: " << address_;
// this means they closed the connection // this means they closed the connection
if (ec == http::error::end_of_stream) { if (ec == http::error::end_of_stream) {
do_close(); do_close();
return; return;
} }
if (ec) { if (ec) {
fail(ec, "http_session async_read"); fail(ec, "http_session async_read");
return; return;
} }
// sees if it is a websocket upgrade // sees if it is a websocket upgrade
if (websocket::is_upgrade(parser_->get())) { if (websocket::is_upgrade(parser_->get())) {
// creates a websocket session, transferring ownership // creates a websocket session, transferring ownership
// of both the socket and the http request // of both the socket and the http request
std::make_shared<websocket_session_server>( std::make_shared<websocket_session_server>(
ioc_, ioc_,
stream_.release_socket(), stream_.release_socket(),
parser_->release(), parser_->release(),
ws_routes_ ws_routes_
)->do_accept(); )->do_accept();
return; return;
} }
// handles the request and sends the response // handles the request and sends the response
asio::spawn( asio::spawn(
ioc_, ioc_,
std::bind( std::bind(
&handle_http_request<send_lambda>, &handle_http_request<send_lambda>,
shared_from_this(), shared_from_this(),
parser_->release(), parser_->release(),
std::ref(lambda_), std::ref(lambda_),
std::ref(routes_), std::ref(routes_),
std::ref(ioc_), std::ref(ioc_),
std::placeholders::_1)); std::placeholders::_1)
// handle_request(parser_->release(), lambda_, routes_); #ifdef _MSC_VER
// currently, it is only identified on windows
// that the default stack size is too small
, boost::coroutines::attributes{ STACK_SIZE }
#endif
);
// handle_request(parser_->release(), lambda_, routes_);
// at this point the parser can be reset // at this point the parser can be reset
} }
void on_write( void on_write(
bool close, beast::error_code ec, bool close, beast::error_code ec,
std::size_t bytes_transferred) { std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred); boost::ignore_unused(bytes_transferred);
// we're done with the response so delete it // we're done with the response so delete it
res_.reset(); res_.reset();
if (ec) { if (ec) {
fail(ec, "http_session async_write"); fail(ec, "http_session async_write");
return; return;
} }
lgtrace << "sent " << bytes_transferred << " byte(s) to: " << address_; lgtrace << "sent " << bytes_transferred << " byte(s) to: " << address_;
if (close) { if (close) {
// this means we should close the connection, usually because // this means we should close the connection, usually because
// the response indicated the "Connection: close" semantic. // the response indicated the "Connection: close" semantic.
do_close(); do_close();
return; return;
} }
// reads another request // reads another request
do_read(); do_read();
} }
void do_close() { void do_close() {
// sends a TCP shutdown // sends a TCP shutdown
beast::error_code ec; beast::error_code ec;
stream_.socket().shutdown(tcp::socket::shutdown_send, ec); stream_.socket().shutdown(tcp::socket::shutdown_send, ec);
// at this point the connection is closed gracefully // at this point the connection is closed gracefully
lgtrace << "socket connection closed: " << address_; lgtrace << "socket connection closed: " << address_;
} }
public: public:
http_session( http_session(
asio::io_context& ioc, asio::io_context& ioc,
tcp::socket&& socket, tcp::socket&& socket,
router& routes, router& routes,
router& ws_routes) router& ws_routes)
: lambda_{ *this }, : lambda_{ *this },
ioc_{ ioc }, ioc_{ ioc },
stream_{ std::move(socket) }, stream_{ std::move(socket) },
routes_{ routes }, routes_{ routes },
ws_routes_{ ws_routes }, ws_routes_{ ws_routes },
address_{ get_address(stream_.socket()) } { address_{ get_address(stream_.socket()) } {
lgtrace << "http session opened: " << address_; lgtrace << "http session opened: " << address_;
} }
~http_session() { ~http_session() {
lgtrace << "http session closed: " << address_; lgtrace << "http session closed: " << address_;
} }
void run() { void run() {
asio::dispatch( asio::dispatch(
stream_.get_executor(), stream_.get_executor(),
beast::bind_front_handler( beast::bind_front_handler(
&http_session::do_read, &http_session::do_read,
shared_from_this())); shared_from_this()));
} }
}; };
// accepts incoming connections and launches the sessions // accepts incoming connections and launches the sessions
class listener class listener
: public std::enable_shared_from_this<listener> { : public std::enable_shared_from_this<listener> {
private: private:
asio::io_context& ioc_; asio::io_context& ioc_;
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
router& routes_; router& routes_;
router& ws_routes_; router& ws_routes_;
void do_accept() { void do_accept() {
acceptor_.async_accept( acceptor_.async_accept(
asio::make_strand(ioc_), asio::make_strand(ioc_),
beast::bind_front_handler( beast::bind_front_handler(
&listener::on_accept, &listener::on_accept,
shared_from_this())); shared_from_this()));
} }
void on_accept(beast::error_code ec, tcp::socket socket) { void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) { if (ec) {
fail(ec, "listener::acceptor async_accept"); fail(ec, "listener::acceptor async_accept");
} }
else { else {
lgtrace << "listener accepts: " << get_address(socket); lgtrace << "listener accepts: " << get_address(socket);
std::make_shared<http_session>( std::make_shared<http_session>(
ioc_, std::move(socket), routes_, ws_routes_)->run(); ioc_, std::move(socket), routes_, ws_routes_)->run();
} }
do_accept(); do_accept();
} }
public: public:
listener( listener(
asio::io_context& ioc, asio::io_context& ioc,
tcp::endpoint endpoint, tcp::endpoint endpoint,
router& routes, router& routes,
router& ws_routes) router& ws_routes)
: ioc_{ ioc }, : ioc_{ ioc },
acceptor_{ asio::make_strand(ioc) }, acceptor_{ asio::make_strand(ioc) },
routes_{ routes }, routes_{ routes },
ws_routes_{ ws_routes } { ws_routes_{ ws_routes } {
beast::error_code ec; beast::error_code ec;
acceptor_.open(endpoint.protocol(), ec); acceptor_.open(endpoint.protocol(), ec);
if (ec) { if (ec) {
fail(ec, "listener::acceptor open"); fail(ec, "listener::acceptor open");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
return; return;
} }
acceptor_.set_option( acceptor_.set_option(
asio::socket_base::reuse_address(true), ec); asio::socket_base::reuse_address(true), ec);
if (ec) { if (ec) {
fail(ec, "listener::acceptor set_option"); fail(ec, "listener::acceptor set_option");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
return; return;
} }
acceptor_.bind(endpoint, ec); acceptor_.bind(endpoint, ec);
if (ec) { if (ec) {
fail(ec, "listener::acceptor bind"); fail(ec, "listener::acceptor bind");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
return; return;
} }
acceptor_.listen( acceptor_.listen(
asio::socket_base::max_listen_connections, ec); asio::socket_base::max_listen_connections, ec);
if (ec) { if (ec) {
fail(ec, "listener::acceptor listen"); fail(ec, "listener::acceptor listen");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
return; return;
} }
} }
void run() { void run() {
asio::dispatch( asio::dispatch(
acceptor_.get_executor(), acceptor_.get_executor(),
beast::bind_front_handler( beast::bind_front_handler(
&listener::do_accept, &listener::do_accept,
shared_from_this())); shared_from_this()));
} }
}; };
server::server(const server_config& config, router&& routes, router&& ws_routes) server::server(const server_config& config, router&& routes, router&& ws_routes)
: ioc_{ config.get_num_threads() }, : ioc_{ config.get_num_threads() },
routes_{ std::move(routes) }, routes_{ std::move(routes) },
ws_routes_{ std::move(ws_routes) } { ws_routes_{ std::move(ws_routes) } {
init_logging(config); init_logging(config);
if (config.get_db_conn_str() != "") { if (config.get_db_conn_str() != "") {
// database connection // database connection
try { try {
db_conn_mgr_ = std::make_shared< db_conn_mgr_ = std::make_shared<
db_connection_manager>(config.get_db_conn_str(), config.get_num_db_conn()); db_connection_manager>(config.get_db_conn_str(), config.get_num_db_conn());
} }
catch (const std::exception& e) { catch (const std::exception& e) {
lgfatal << "db connection initialization failed: " << e.what() << std::endl; lgfatal << "db connection initialization failed: " << e.what() << std::endl;
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
session_mgr_ = std::make_shared<memory_session_manager>(); session_mgr_ = std::make_shared<memory_session_manager>();
std::shared_ptr<server_resources> resources_ptr = std::make_shared<server_resources>(); std::shared_ptr<server_resources> resources_ptr = std::make_shared<server_resources>();
resources_ptr->session_mgr = session_mgr_; resources_ptr->session_mgr = session_mgr_;
resources_ptr->db_conn_mgr = db_conn_mgr_; resources_ptr->db_conn_mgr = db_conn_mgr_;
routes_.set_resources(resources_ptr); routes_.set_resources(resources_ptr);
ws_routes_.set_resources(resources_ptr); ws_routes_.set_resources(resources_ptr);
// creates and launches a listening port // creates and launches a listening port
std::make_shared<listener>( std::make_shared<listener>(
ioc_, tcp::endpoint{ tcp::v4(), config.get_port() }, routes_, ws_routes_)->run(); ioc_, tcp::endpoint{ tcp::v4(), config.get_port() }, routes_, ws_routes_)->run();
// captures SIGINT and SIGTERM to perform a clean shutdown // captures SIGINT and SIGTERM to perform a clean shutdown
asio::signal_set signals{ ioc_, SIGINT, SIGTERM }; asio::signal_set signals{ ioc_, SIGINT, SIGTERM };
signals.async_wait( signals.async_wait(
[&](const boost::system::error_code&, int) { [&](const boost::system::error_code&, int) {
// stops the `io_context`. This will cause `run()` // stops the `io_context`. This will cause `run()`
// to return immediately, eventually destroying the // to return immediately, eventually destroying the
// `io_context` and all of the sockets in it. // `io_context` and all of the sockets in it.
ioc_.stop(); ioc_.stop();
}); });
lginfo << config.get_name() << " started"; lginfo << config.get_name() << " started";
// runs the I/O service on the requested number of threads // runs the I/O service on the requested number of threads
std::vector<std::thread> v; std::vector<std::thread> v;
v.reserve(config.get_num_threads() - 1); v.reserve(config.get_num_threads() - 1);
for (int i = 1; i < config.get_num_threads(); ++i) for (int i = 1; i < config.get_num_threads(); ++i)
v.emplace_back([&] { ioc_.run(); }); v.emplace_back([&] { ioc_.run(); });
ioc_.run(); ioc_.run();
// if we get here, it means we got a SIGINT or SIGTERM // if we get here, it means we got a SIGINT or SIGTERM
lginfo << "exiting " << config.get_name(); lginfo << "exiting " << config.get_name();
// blocks until all the threads exit // blocks until all the threads exit
for (auto& t : v) t.join(); for (auto& t : v) t.join();
} }
} // bserv } // bserv

View File

@ -26,6 +26,10 @@ namespace bserv {
//const std::string DB_CONN_STR = "dbname=bserv"; //const std::string DB_CONN_STR = "dbname=bserv";
const std::string DB_CONN_STR = ""; const std::string DB_CONN_STR = "";
#ifdef _MSC_VER
const std::size_t STACK_SIZE = 1024 * 1024;
#endif
#define decl_field(type, name, default_value) \ #define decl_field(type, name, default_value) \
private: \ private: \
std::optional<type> name##_; \ std::optional<type> name##_; \

View File

@ -0,0 +1,22 @@
import requests
import random
from multiprocessing import Process
def test():
if random.randint(0, 1) == 0:
resp = requests.get("http://localhost:8080/statics/js/bootstrap.bundle.min.js")
else:
resp = requests.get("http://localhost:8080/statics/css/bootstrap.min.css")
if resp.status_code != 200:
print(resp)
if __name__ == '__main__':
print('starting test')
processes = [Process(target=test) for _ in range(200)]
for p in processes:
p.start()
for p in processes:
p.join()
print('end of test')

View File

@ -0,0 +1,22 @@
import requests
import random
from multiprocessing import Process
def test():
if random.randint(0, 1) == 0:
resp = requests.get("http://localhost:8080")
else:
resp = requests.get("http://localhost:8080/users")
if resp.status_code != 200:
print(resp)
if __name__ == '__main__':
print('starting test')
processes = [Process(target=test) for _ in range(200)]
for p in processes:
p.start()
for p in processes:
p.join()
print('end of test')