From fd9a172b9b4758f673b991477e82d16f359a458f Mon Sep 17 00:00:00 2001 From: Sergey Yagovtsev Date: Tue, 27 Sep 2016 23:00:49 +0300 Subject: [PATCH] Connect by non-blocking socket --- include/profiler/easy_socket.h | 3 +- profiler_gui/main_window.cpp | 186 ++++++++++++++++----------------- src/easy_socket.cpp | 44 +++++++- 3 files changed, 136 insertions(+), 97 deletions(-) diff --git a/include/profiler/easy_socket.h b/include/profiler/easy_socket.h index 5d72119..865f969 100644 --- a/include/profiler/easy_socket.h +++ b/include/profiler/easy_socket.h @@ -52,7 +52,8 @@ public: CONNECTION_STATE_UNKNOWN, CONNECTION_STATE_SUCCESS, - CONNECTION_STATE_DISCONNECTED + CONNECTION_STATE_DISCONNECTED, + CONNECTION_STATE_IN_PROGRESS }; private: diff --git a/profiler_gui/main_window.cpp b/profiler_gui/main_window.cpp index c539b2e..9512e6e 100644 --- a/profiler_gui/main_window.cpp +++ b/profiler_gui/main_window.cpp @@ -131,17 +131,17 @@ EasyMainWindow::EasyMainWindow() : Parent() toolbar->addWidget(new QLabel(" IP:")); m_ipEdit = new QLineEdit(); - m_ipEdit->setMaximumWidth(100); QRegExp rx("[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}"); m_ipEdit->setInputMask("000.000.000.000;"); m_ipEdit->setValidator(new QRegExpValidator(rx, m_ipEdit)); m_ipEdit->setText("127.0.0.1"); + m_ipEdit->setFixedWidth(m_ipEdit->fontMetrics().width(QString("255.255.255.255")) + 20); toolbar->addWidget(m_ipEdit); toolbar->addWidget(new QLabel(" Port:")); m_portEdit = new QLineEdit(); m_portEdit->setMaximumWidth(80); - m_portEdit->setValidator(new QIntValidator(1024, 65536, m_portEdit)); + m_portEdit->setValidator(new QIntValidator(1, 65535, m_portEdit)); m_portEdit->setText(QString::number(::profiler::DEFAULT_PORT)); toolbar->addWidget(m_portEdit); @@ -642,7 +642,7 @@ void EasyMainWindow::onListenerDialogClose(int) break; } - default: + default: return; } @@ -974,106 +974,106 @@ void EasyMainWindow::onBlockStatusChange(::profiler::block_id_t _id, ::profiler: ////////////////////////////////////////////////////////////////////////// -EasySocketListener::EasySocketListener() : m_receivedSize(0), m_regime(LISTENER_IDLE) -{ - m_bInterrupt = ATOMIC_VAR_INIT(false); - m_bConnected = ATOMIC_VAR_INIT(false); -} - -EasySocketListener::~EasySocketListener() -{ - m_bInterrupt.store(true, ::std::memory_order_release); - if (m_thread.joinable()) - m_thread.join(); -} - -bool EasySocketListener::connected() const -{ - return m_bConnected.load(::std::memory_order_acquire); -} - -EasyListenerRegime EasySocketListener::regime() const -{ - return m_regime; -} - -uint64_t EasySocketListener::size() const -{ - return m_receivedSize; -} - -::std::stringstream& EasySocketListener::data() -{ - return m_receivedData; -} - -void EasySocketListener::clearData() -{ +EasySocketListener::EasySocketListener() : m_receivedSize(0), m_regime(LISTENER_IDLE) +{ + m_bInterrupt = ATOMIC_VAR_INIT(false); + m_bConnected = ATOMIC_VAR_INIT(false); +} + +EasySocketListener::~EasySocketListener() +{ + m_bInterrupt.store(true, ::std::memory_order_release); + if (m_thread.joinable()) + m_thread.join(); +} + +bool EasySocketListener::connected() const +{ + return m_bConnected.load(::std::memory_order_acquire); +} + +EasyListenerRegime EasySocketListener::regime() const +{ + return m_regime; +} + +uint64_t EasySocketListener::size() const +{ + return m_receivedSize; +} + +::std::stringstream& EasySocketListener::data() +{ + return m_receivedData; +} + +void EasySocketListener::clearData() +{ decltype(m_receivedData) dummy; - dummy.swap(m_receivedData); - m_receivedSize = 0; -} - -bool EasySocketListener::connect(const char* _ipaddress, uint16_t _port) -{ - if (connected()) - return true; - + dummy.swap(m_receivedData); + m_receivedSize = 0; +} + +bool EasySocketListener::connect(const char* _ipaddress, uint16_t _port) +{ + if (connected()) + return true; + m_easySocket.flush(); m_easySocket.init(); int res = m_easySocket.setAddress(_ipaddress, _port); - res = m_easySocket.connect(); - - bool isConnected = res == 0; - m_bConnected.store(isConnected, ::std::memory_order_release); - - return isConnected; -} - -void EasySocketListener::startCapture() -{ - clearData(); - + res = m_easySocket.connect(); + + bool isConnected = res == 0; + m_bConnected.store(isConnected, ::std::memory_order_release); + + return isConnected; +} + +void EasySocketListener::startCapture() +{ + clearData(); + profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE); - m_easySocket.send(&request, sizeof(request)); - - m_regime = LISTENER_CAPTURE; + m_easySocket.send(&request, sizeof(request)); + + m_regime = LISTENER_CAPTURE; m_thread = ::std::move(::std::thread(&EasySocketListener::listenCapture, this)); -} - -void EasySocketListener::stopCapture() -{ - if (!m_thread.joinable() || m_regime != LISTENER_CAPTURE) - return; - +} + +void EasySocketListener::stopCapture() +{ + if (!m_thread.joinable() || m_regime != LISTENER_CAPTURE) + return; + profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE); m_easySocket.send(&request, sizeof(request)); - m_thread.join(); - - m_regime = LISTENER_IDLE; -} - -void EasySocketListener::requestBlocksDescription() -{ - clearData(); - + m_thread.join(); + + m_regime = LISTENER_IDLE; +} + +void EasySocketListener::requestBlocksDescription() +{ + clearData(); + profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_BLOCKS_DESCRIPTION); - m_easySocket.send(&request, sizeof(request)); - - m_regime = LISTENER_DESCRIBE; - listenDescription(); - m_regime = LISTENER_IDLE; -} - -void EasySocketListener::sendBlockStatus(::profiler::block_id_t _id, ::profiler::EasyBlockStatus _status) -{ - profiler::net::BlockStatusMessage message(_id, static_cast(_status)); - m_easySocket.send(&message, sizeof(message)); -} - -////////////////////////////////////////////////////////////////////////// - + m_easySocket.send(&request, sizeof(request)); + + m_regime = LISTENER_DESCRIBE; + listenDescription(); + m_regime = LISTENER_IDLE; +} + +void EasySocketListener::sendBlockStatus(::profiler::block_id_t _id, ::profiler::EasyBlockStatus _status) +{ + profiler::net::BlockStatusMessage message(_id, static_cast(_status)); + m_easySocket.send(&message, sizeof(message)); +} + +////////////////////////////////////////////////////////////////////////// + void EasySocketListener::listenCapture() { // TODO: Merge functions listenCapture() and listenDescription() diff --git a/src/easy_socket.cpp b/src/easy_socket.cpp index faed22d..4550200 100644 --- a/src/easy_socket.cpp +++ b/src/easy_socket.cpp @@ -19,6 +19,7 @@ along with this program.If not, see . #include "profiler/easy_socket.h" #include +#include #ifdef _WIN32 #pragma comment (lib, "Ws2_32.lib") @@ -80,6 +81,8 @@ void EasySocket::flush() #ifdef _WIN32 m_socket = 0; m_replySocket = 0; +#else + wsaret = 0; #endif } @@ -98,10 +101,12 @@ void EasySocket::checkResult(int result) error_code = WSAGetLastError(); const int CONNECTION_ABORTED = WSAECONNABORTED; const int CONNECTION_RESET = WSAECONNRESET; + const int CONNECTION_IN_PROGRESS = WSAEINPROGRESS; #else error_code = errno; const int CONNECTION_ABORTED = ECONNABORTED; const int CONNECTION_RESET = ECONNRESET; + const int CONNECTION_IN_PROGRESS = EINPROGRESS; #endif switch(error_code) @@ -110,6 +115,10 @@ void EasySocket::checkResult(int result) case CONNECTION_RESET: m_state = CONNECTION_STATE_DISCONNECTED; break; + case CONNECTION_IN_PROGRESS: + m_state = CONNECTION_STATE_IN_PROGRESS; + break; + default: break; } @@ -132,7 +141,9 @@ void EasySocket::init() return; setBlocking(m_socket,true); - +#ifndef _WIN32 + wsaret = 1; +#endif int opt = 1; setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, (const char *)&opt, sizeof(opt)); } @@ -240,8 +251,35 @@ int EasySocket::connect() return -1; //fprintf(stderr,"ERROR, no such host\n"); } - int res = ::connect(m_socket,(struct sockaddr *) &serv_addr,sizeof(serv_addr)); - checkResult(res); + setBlocking(m_socket,false); + + int counter = 0; + int sleepMs = 20; + int waitSec = 1; + int waitMs = waitSec*1000/sleepMs; + int res = 0; + while(counter++ < waitMs) + { + res = ::connect(m_socket,(struct sockaddr *) &serv_addr,sizeof(serv_addr)); + + checkResult(res); + + if (res == 0) + break; + + if (m_state == CONNECTION_STATE_IN_PROGRESS) + { + std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs)); + continue; + } + + + if(m_state != CONNECTION_STATE_IN_PROGRESS && m_state != CONNECTION_STATE_SUCCESS ) + break; + } + + + setBlocking(m_socket,true); if(res == 0){ struct timeval tv;