boost::asio - the acceptor doesn't call the callback after the server is stopped and started again
I've created a simple wrapper for the boost::asio library. It consists of four main classes: NetServer (server), NetClient (client), NetSession (client/server session) and Network (a class that composes the other three and also holds all the callback methods). The first client/server connection works flawlessly, but when I stop the server, start it again and then try to connect the client, the server doesn't recognize the client. It seems the acceptor callback isn't called. The client does connect to the server: first, the connection completes without errors; second, when I close the server program, the client receives the WSAECONNRESET error.
I've created a test program which reproduces the procedure described above. It does the following:
1. Starts the server
2. Starts the client
3. Client successfully connects to the server
4. Stops the server
5. Client receives the error and disconnects itself
6. Starts the server again
7. Client again successfully connects to the server
8. BUT THE SERVER DOESN'T CALL THE ACCEPTOR CALLBACK ANYMORE
So in step 3 the acceptor successfully calls the callback function, but in step 7 it doesn't.
I think I'm doing something wrong in the server's stop()/start() methods, but I can't figure out exactly what.
The source of the NetServer class:
NetServer::NetServer(Network& netRef) : net{ netRef }
{
    acceptor = std::make_unique<boost::asio::ip::tcp::acceptor>(ioc);
}

NetServer::~NetServer(void)
{
    ioc.stop();
    if (threadStarted)
    {
        th.join();
        threadStarted = false;
    }
    if (active)
        stop();
}

int NetServer::start(void)
{
    assert(getAcceptHandler() != nullptr);
    assert(getHeaderHandler() != nullptr);
    assert(getDataHandler() != nullptr);
    assert(getErrorHandler() != nullptr);
    closeAll();
    try
    {
        ep = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), srvPort);
        acceptor->open(ep.protocol());
        acceptor->bind(ep);
        acceptor->listen();
        initAccept();
    }
    catch (system::system_error& e)
    {
        return e.code().value();
    }
    if (!threadStarted)
    {
        th = std::thread([this]()
        {
            ioc.run();
        });
        threadStarted = true;
    }
    active = true;
    return Network::NET_OK;
}

int NetServer::stop(void)
{
    ioc.post(boost::bind(&NetServer::_stop, this));
    return Network::NET_OK;
}

void NetServer::_stop(void)
{
    boost::system::error_code ec;
    acceptor->close(ec);
    for (auto& s : sessions)
        closeSession(s.get(), false);
    active = false;
}

void NetServer::initAccept(void)
{
    sock = std::make_shared<asio::ip::tcp::socket>(ioc);
    acceptor->async_accept(*sock.get(), [this](const boost::system::error_code& error)
    {
        onAccept(error, sock);
    });
}

void NetServer::onAccept(const boost::system::error_code& ec, SocketSharedPtr sock)
{
    if (ec.value() == 0)
    {
        if (accHandler())
        {
            addSession(sock);
            initAccept();
        }
    }
    else
        getErrorHandler()(nullptr, ec);
}

SessionPtr NetServer::addSession(SocketSharedPtr sock)
{
    std::lock_guard<std::mutex> guard(mtxSession);
    auto session = std::make_shared<NetSession>(sock, *this, true);
    sessions.insert(session);
    session->start();
    return session;
}

SessionPtr NetServer::findSession(const SessionPtr session)
{
    for (auto it = std::begin(sessions); it != std::end(sessions); it++)
        if (*it == session)
            return *it;
    return nullptr;
}

bool NetServer::closeSession(const void *session, bool erase /* = true */)
{
    std::lock_guard<std::mutex> guard(mtxSession);
    for (auto it = std::begin(sessions); it != std::end(sessions); it++)
        if (it->get() == session)
        {
            try
            {
                it->get()->getSocket()->cancel();
                it->get()->getSocket()->shutdown(asio::socket_base::shutdown_send);
                it->get()->getSocket()->close();
                it->get()->getSocket().reset();
            }
            catch (system::system_error& e)
            {
                UNREFERENCED_PARAMETER(e);
            }
            if (erase)
                sessions.erase(*it);
            return true;
        }
    return false;
}

void NetServer::closeAll(void)
{
    using namespace boost::placeholders;
    std::lock_guard<std::mutex> guard(mtxSession);
    std::for_each(sessions.begin(), sessions.end(), boost::bind(&NetSession::stop, _1));
    sessions.clear();
}

bool NetServer::write(const SessionPtr session, std::string msg)
{
    if (SessionPtr s = findSession(session); s)
    {
        s->addMessage(msg);
        if (s->canWrite())
            s->write();
        return true;
    }
    return false;
}
This is the output from the server:
Enter 0 - server, 1 - client: 0
1. Server started
3. Client connected to server
Stopping server....
4. Server stopped
Net error, server, acceptor: ERROR_OPERATION_ABORTED
Net error, server, ERROR_OPERATION_ABORTED
Client session deleted
6. Server started again
(HERE SHOULD BE "8. Client again connected to server", but the server didn't recognize the reconnected client!)
And from the client:
Enter 0 - server, 1 - client: 1
2. Client started and connected to server
Net error, client: ERROR_FILE_NOT_FOUND
5. Client disconnected from server
Waiting 3 sec before reconnect...
Connecting to server...
7. Client started and connected to server
(WHEN I CLOSE THE SERVER WINDOW, I RECEIVE HERE THE "Net error, client: WSAECONNRESET" MESSAGE - it means the client was connected to the server anyhow!)
If the code of NetClient, NetSession and Network is necessary, just let me know.
Thanks in advance
Solution 1:[1]
Wow. There's a lot to unpack. There is quite a lot of code smell that reminds me of some books on Asio programming that turned out to be... not excellent in my previous experience.
I couldn't give any real advice without grokking your code, which requires me to review in-depth and add missing bits. So let me just provide you with my reviewed/fixed code first, then we'll talk about some of the details.
A few areas where you seemed to have trouble making up your mind:
- whether to use a strand or to use mutex locking
- whether to use async or sync (e.g. closeSession is completely synchronous and blocking)
- whether to use shared-pointers for lifetime or not: on the one hand you have NetSession support shared_from_this, but on the other hand you are keeping them alive in a sessions collection
- whether to use smart pointers or raw pointers (sp.get() is a code smell)
- whether to use void* pointers or forward declared structs for opaque implementation
- whether to use exceptions or to use error codes. Specifically, this is a Very Bad Idea:

      return e.code().value();

  Just return error_code already. Or just propagate the exception.
- judging from the use, my best bet is that sessions is std::set<SessionPtr>. Then it's funny that you're doing linear searches. In fact, findSession could be:

      SessionPtr findSession(SessionPtr const& session) {
          std::lock_guard guard(mtxSessions);
          return sessions.contains(session) ? session : nullptr;
      }

  In fact, given some natural invariants, it could just be

      auto findSession(SessionPtr s) { return std::move(s); }

  Note as well, you had forgotten to lock the mutex in findSession.
- closeSession completely violates Law Of Demeter, 6*3 times over if you will. In my example I make it so SessHandle is a weak pointer to NetSession and you can just write:

      for (auto& handle : sessions)
          if (auto sess = handle.lock())
              sess->close();

  Of course, sess->close() should not block. Also, it should correctly synchronize on the session, e.g. using the session's strand:

      void close() {
          return post(sock_.get_executor(), [this, self = shared_from_this()] {
              error_code ec;
              if (!ec) sock_.cancel(ec);
              if (!ec) sock_.shutdown(tcp::socket::shutdown_send, ec);
              if (!ec) sock_.close(ec);
          });
      }

  If you insist, you can make it so the caller can still await the result and receive any exceptions:

      std::future<void> close() {
          return post(
              sock_.get_executor(),
              std::packaged_task<void()>{[this, self = shared_from_this()] {
                  sock_.cancel();
                  sock_.shutdown(tcp::socket::shutdown_send);
                  sock_.close();
              }});
      }

  Honestly, that seems overkill since you never look at the return value anyways.
- In general, I recommend leaving socket::close() to the destructor. It avoids a specific class of race-conditions on socket handles.
- Don't use boolean flags (isThreadActive is better replaced with th.joinable())
- apparently you had NetSession::stop which I imagine did largely the same as closeSession but in the right place? I replaced it with the new NetSession::close
- subtly, when accHandler returned false, you would exit the accept loop altogether. I doubt that was on purpose
- try to minimize time under locks. I will show you how to do without the lock entirely instead.
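To illustrate the "minimize time under locks" point for the case where a mutex is kept, here is a minimal sketch using only the standard library. The names Session, Registry and size() are hypothetical stand-ins, not part of the question's code; the idea is simply to construct the session before taking the lock, hold the mutex only while touching the shared container, and call start() after releasing it.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <set>

// Hypothetical stand-in for NetSession; only what the sketch needs.
struct Session {
    bool started = false;
    void start() { started = true; } // in real code this may be slow or re-entrant
};
using SessionPtr = std::shared_ptr<Session>;

// Hypothetical stand-in for the part of NetServer that owns the sessions.
class Registry {
  public:
    SessionPtr addSession() {
        auto session = std::make_shared<Session>(); // allocate outside the lock
        {
            std::lock_guard<std::mutex> guard(mtx_); // guard only the container
            sessions_.insert(session);
        } // mutex released here
        session->start(); // runs without holding the mutex
        return session;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> guard(mtx_);
        return sessions_.size();
    }

  private:
    mutable std::mutex mtx_;
    std::set<SessionPtr> sessions_;
};
```

Keeping start() outside the critical section matters most when it can call back into code that takes the same mutex, which would otherwise deadlock with a non-recursive std::mutex.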
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cassert>
#include <deque>
#include <functional>
#include <future>
#include <iomanip>
#include <iostream>
#include <memory>
#include <set>
#include <thread>

using namespace std::chrono_literals;
using namespace std::placeholders;
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;

static inline std::ostream debug(std::cerr.rdbuf());

struct Network {
    static constexpr error_code NET_OK{};
};

struct NetSession; // opaque forward reference
struct NetServer;
using SessHandle = std::weak_ptr<NetSession>; // opaque handle
using Sessions = std::set<SessHandle, std::owner_less<>>;

struct NetSession : std::enable_shared_from_this<NetSession> {
    NetSession(tcp::socket&& s, NetServer& srv, bool)
        : sock_(std::move(s))
        , srv_(srv) {
        debug << "New session from " << getPeer() << std::endl;
    }

    void start() {
        post(sock_.get_executor(),
             std::bind(&NetSession::do_read, shared_from_this()));
    }

    tcp::endpoint getPeer() const { return peer_; }

    void close() {
        return post(sock_.get_executor(), [this, self = shared_from_this()] {
            debug << "Closing " << getPeer() << std::endl;
            error_code ec;
            if (!ec) sock_.cancel(ec);
            if (!ec) sock_.shutdown(tcp::socket::shutdown_send, ec);
            // if (!ec) sock_.close(ec);
        });
    }

    void addMessage(std::string msg) {
        post(sock_.get_executor(),
             [this, msg = std::move(msg), self = shared_from_this()] {
                 outgoing_.push_back(std::move(msg));
                 if (canWrite())
                     write_loop();
             });
    }

  private:
    // assumed on (logical) strand
    bool canWrite() const { // FIXME misnomer: shouldStartWriteLoop()?
        return outgoing_.size() == 1;
    }

    void write_loop() {
        if (outgoing_.empty())
            return;
        async_write(sock_, asio::buffer(outgoing_.front()),
                    [this, self = shared_from_this()](error_code ec, size_t) {
                        if (!ec) {
                            outgoing_.pop_front();
                            write_loop();
                        }
                    });
    }

    void do_read() {
        incoming_.clear();
        async_read_until(
            sock_, asio::dynamic_buffer(incoming_), "\n",
            std::bind(&NetSession::on_read, shared_from_this(), _1, _2));
    }

    void on_read(error_code ec, size_t);

    tcp::socket             sock_;
    tcp::endpoint           peer_ = sock_.remote_endpoint();
    NetServer&              srv_;
    std::string             incoming_;
    std::deque<std::string> outgoing_;
};

using SessionPtr = std::shared_ptr<NetSession>;
using SocketSharedPtr = std::shared_ptr<tcp::socket>;

struct NetServer {
    NetServer(Network& netRef) : net{netRef} {}

    ~NetServer()
    {
        if (acceptor.is_open())
            acceptor.cancel(); // TODO seems pretty redundant
        stop();
        if (th.joinable())
            th.join();
    }

    std::function<bool()> accHandler;
    std::function<void(SocketSharedPtr, error_code)> errHandler;
    // TODO headerHandler
    std::function<void(SessionPtr, error_code, std::string)> dataHandler;

    error_code start() {
        assert(accHandler);
        assert(errHandler);
        assert(dataHandler);
        closeAll(sessions);
        error_code ec;
        if (!ec) acceptor.open(tcp::v4(), ec);
        if (!ec) acceptor.bind({{}, srvPort}, ec);
        if (!ec) acceptor.listen(tcp::socket::max_listen_connections, ec);
        if (!ec) {
            do_accept();
            if (!th.joinable()) {
                th = std::thread([this] { ioc.run(); }); // TODO exceptions!
            }
        }
        if (ec && acceptor.is_open())
            acceptor.close();
        return ec;
    }

    void stop() {
        post(ioc, std::bind(&NetServer::do_stop, this));
    }

    void closeSession(SessHandle handle, bool erase = true) {
        post(acceptor.get_executor(), [=, this] {
            if (auto s = handle.lock()) {
                s->close();
            }
            if (erase) {
                sessions.erase(handle);
            }
        });
    }

    void closeAll() {
        post(acceptor.get_executor(), [this] {
            closeAll(sessions);
            sessions.clear();
        });
    }

    // TODO FIXME is the return value worth it?
    bool write(SessionPtr const& session, std::string msg) {
        return post(acceptor.get_executor(),
                    std::packaged_task<bool()>{std::bind(
                        &NetServer::do_write, this, session, std::move(msg))})
            .get();
    }

    // compare
    void writeAll(std::string msg) {
        post(acceptor.get_executor(),
             std::bind(&NetServer::do_write_all, this, std::move(msg)));
    }

  private:
    Network&         net;
    asio::io_context ioc;
    tcp::acceptor    acceptor{ioc}; // active -> acceptor.is_open()
    std::thread      th;            // threadActive -> th.joinable()
    Sessions         sessions;
    std::uint16_t    srvPort = 8989;
    // std::mutex mtxSessions; // note naming; also replaced by logical strand

    // assumed on acceptor logical strand
    void do_accept() {
        acceptor.async_accept(
            make_strand(ioc), [this](error_code ec, tcp::socket sock) {
                if (ec.failed()) {
                    return errHandler(nullptr, ec);
                }
                if (accHandler()) {
                    auto s = std::make_shared<NetSession>(std::move(sock),
                                                          *this, true);
                    sessions.insert(s);
                    s->start();
                }
                do_accept();
            });
    }

    SessionPtr do_findSession(SessionPtr const& session) {
        return sessions.contains(session) ? session : nullptr;
    }

    bool do_write(SessionPtr session, std::string msg) {
        if (auto s = do_findSession(session)) {
            s->addMessage(std::move(msg));
            return true;
        }
        return false;
    }

    void do_write_all(std::string msg) {
        for (auto& handle : sessions)
            if (auto sess = handle.lock())
                do_write(sess, msg);
    }

    static void closeAll(Sessions const& sessions) {
        for (auto& handle : sessions)
            if (auto sess = handle.lock())
                sess->close();
    }

    void do_stop()
    {
        if (acceptor.is_open()) {
            error_code ec;
            acceptor.close(ec); // TODO error handling?
        }
        closeAll(sessions); // TODO FIXME why not clear sessions?
    }
};

// Implementation must be after NetServer definition:
void NetSession::on_read(error_code ec, size_t) {
    if (srv_.dataHandler)
        srv_.dataHandler(shared_from_this(), ec, std::move(incoming_));
    if (!ec)
        do_read();
}

int main() {
    Network net;
    NetServer srv{net};
    srv.accHandler = [] { return true; };
    srv.errHandler = [](SocketSharedPtr, error_code ec) {
        debug << "errHandler: " << ec.message() << std::endl;
    };
    srv.dataHandler = [](SessionPtr sess, error_code ec, std::string msg) {
        debug << "dataHandler: " << sess->getPeer() << " " << ec.message()
              << " " << std::quoted(msg) << std::endl;
    };
    srv.start();
    std::this_thread::sleep_for(10s);
    std::cout << "Shutdown started" << std::endl;
    srv.writeAll("We're going to shutdown, take care!\n");
    srv.stop();
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | sehe |

