0
0
mirror of https://github.com/yse/easy_profiler.git synced 2024-12-28 01:04:41 +08:00

Connect by non-blocking socket

This commit is contained in:
Sergey Yagovtsev 2016-09-27 23:00:49 +03:00
parent 4a05cafab4
commit fd9a172b9b
3 changed files with 136 additions and 97 deletions

View File

@ -52,7 +52,8 @@ public:
CONNECTION_STATE_UNKNOWN, CONNECTION_STATE_UNKNOWN,
CONNECTION_STATE_SUCCESS, CONNECTION_STATE_SUCCESS,
CONNECTION_STATE_DISCONNECTED CONNECTION_STATE_DISCONNECTED,
CONNECTION_STATE_IN_PROGRESS
}; };
private: private:

View File

@ -131,17 +131,17 @@ EasyMainWindow::EasyMainWindow() : Parent()
toolbar->addWidget(new QLabel(" IP:")); toolbar->addWidget(new QLabel(" IP:"));
m_ipEdit = new QLineEdit(); m_ipEdit = new QLineEdit();
m_ipEdit->setMaximumWidth(100);
QRegExp rx("[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}"); QRegExp rx("[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}");
m_ipEdit->setInputMask("000.000.000.000;"); m_ipEdit->setInputMask("000.000.000.000;");
m_ipEdit->setValidator(new QRegExpValidator(rx, m_ipEdit)); m_ipEdit->setValidator(new QRegExpValidator(rx, m_ipEdit));
m_ipEdit->setText("127.0.0.1"); m_ipEdit->setText("127.0.0.1");
m_ipEdit->setFixedWidth(m_ipEdit->fontMetrics().width(QString("255.255.255.255")) + 20);
toolbar->addWidget(m_ipEdit); toolbar->addWidget(m_ipEdit);
toolbar->addWidget(new QLabel(" Port:")); toolbar->addWidget(new QLabel(" Port:"));
m_portEdit = new QLineEdit(); m_portEdit = new QLineEdit();
m_portEdit->setMaximumWidth(80); m_portEdit->setMaximumWidth(80);
m_portEdit->setValidator(new QIntValidator(1024, 65536, m_portEdit)); m_portEdit->setValidator(new QIntValidator(1, 65535, m_portEdit));
m_portEdit->setText(QString::number(::profiler::DEFAULT_PORT)); m_portEdit->setText(QString::number(::profiler::DEFAULT_PORT));
toolbar->addWidget(m_portEdit); toolbar->addWidget(m_portEdit);
@ -642,7 +642,7 @@ void EasyMainWindow::onListenerDialogClose(int)
break; break;
} }
default: default:
return; return;
} }
@ -974,106 +974,106 @@ void EasyMainWindow::onBlockStatusChange(::profiler::block_id_t _id, ::profiler:
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
EasySocketListener::EasySocketListener() : m_receivedSize(0), m_regime(LISTENER_IDLE) EasySocketListener::EasySocketListener() : m_receivedSize(0), m_regime(LISTENER_IDLE)
{ {
m_bInterrupt = ATOMIC_VAR_INIT(false); m_bInterrupt = ATOMIC_VAR_INIT(false);
m_bConnected = ATOMIC_VAR_INIT(false); m_bConnected = ATOMIC_VAR_INIT(false);
} }
EasySocketListener::~EasySocketListener() EasySocketListener::~EasySocketListener()
{ {
m_bInterrupt.store(true, ::std::memory_order_release); m_bInterrupt.store(true, ::std::memory_order_release);
if (m_thread.joinable()) if (m_thread.joinable())
m_thread.join(); m_thread.join();
} }
bool EasySocketListener::connected() const bool EasySocketListener::connected() const
{ {
return m_bConnected.load(::std::memory_order_acquire); return m_bConnected.load(::std::memory_order_acquire);
} }
EasyListenerRegime EasySocketListener::regime() const EasyListenerRegime EasySocketListener::regime() const
{ {
return m_regime; return m_regime;
} }
uint64_t EasySocketListener::size() const uint64_t EasySocketListener::size() const
{ {
return m_receivedSize; return m_receivedSize;
} }
::std::stringstream& EasySocketListener::data() ::std::stringstream& EasySocketListener::data()
{ {
return m_receivedData; return m_receivedData;
} }
void EasySocketListener::clearData() void EasySocketListener::clearData()
{ {
decltype(m_receivedData) dummy; decltype(m_receivedData) dummy;
dummy.swap(m_receivedData); dummy.swap(m_receivedData);
m_receivedSize = 0; m_receivedSize = 0;
} }
bool EasySocketListener::connect(const char* _ipaddress, uint16_t _port) bool EasySocketListener::connect(const char* _ipaddress, uint16_t _port)
{ {
if (connected()) if (connected())
return true; return true;
m_easySocket.flush(); m_easySocket.flush();
m_easySocket.init(); m_easySocket.init();
int res = m_easySocket.setAddress(_ipaddress, _port); int res = m_easySocket.setAddress(_ipaddress, _port);
res = m_easySocket.connect(); res = m_easySocket.connect();
bool isConnected = res == 0; bool isConnected = res == 0;
m_bConnected.store(isConnected, ::std::memory_order_release); m_bConnected.store(isConnected, ::std::memory_order_release);
return isConnected; return isConnected;
} }
void EasySocketListener::startCapture() void EasySocketListener::startCapture()
{ {
clearData(); clearData();
profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE); profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE);
m_easySocket.send(&request, sizeof(request)); m_easySocket.send(&request, sizeof(request));
m_regime = LISTENER_CAPTURE; m_regime = LISTENER_CAPTURE;
m_thread = ::std::move(::std::thread(&EasySocketListener::listenCapture, this)); m_thread = ::std::move(::std::thread(&EasySocketListener::listenCapture, this));
} }
void EasySocketListener::stopCapture() void EasySocketListener::stopCapture()
{ {
if (!m_thread.joinable() || m_regime != LISTENER_CAPTURE) if (!m_thread.joinable() || m_regime != LISTENER_CAPTURE)
return; return;
profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE); profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE);
m_easySocket.send(&request, sizeof(request)); m_easySocket.send(&request, sizeof(request));
m_thread.join(); m_thread.join();
m_regime = LISTENER_IDLE; m_regime = LISTENER_IDLE;
} }
void EasySocketListener::requestBlocksDescription() void EasySocketListener::requestBlocksDescription()
{ {
clearData(); clearData();
profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_BLOCKS_DESCRIPTION); profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_BLOCKS_DESCRIPTION);
m_easySocket.send(&request, sizeof(request)); m_easySocket.send(&request, sizeof(request));
m_regime = LISTENER_DESCRIBE; m_regime = LISTENER_DESCRIBE;
listenDescription(); listenDescription();
m_regime = LISTENER_IDLE; m_regime = LISTENER_IDLE;
} }
void EasySocketListener::sendBlockStatus(::profiler::block_id_t _id, ::profiler::EasyBlockStatus _status) void EasySocketListener::sendBlockStatus(::profiler::block_id_t _id, ::profiler::EasyBlockStatus _status)
{ {
profiler::net::BlockStatusMessage message(_id, static_cast<uint8_t>(_status)); profiler::net::BlockStatusMessage message(_id, static_cast<uint8_t>(_status));
m_easySocket.send(&message, sizeof(message)); m_easySocket.send(&message, sizeof(message));
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
void EasySocketListener::listenCapture() void EasySocketListener::listenCapture()
{ {
// TODO: Merge functions listenCapture() and listenDescription() // TODO: Merge functions listenCapture() and listenDescription()

View File

@ -19,6 +19,7 @@ along with this program.If not, see <http://www.gnu.org/licenses/>.
#include "profiler/easy_socket.h" #include "profiler/easy_socket.h"
#include <string.h> #include <string.h>
#include <thread>
#ifdef _WIN32 #ifdef _WIN32
#pragma comment (lib, "Ws2_32.lib") #pragma comment (lib, "Ws2_32.lib")
@ -80,6 +81,8 @@ void EasySocket::flush()
#ifdef _WIN32 #ifdef _WIN32
m_socket = 0; m_socket = 0;
m_replySocket = 0; m_replySocket = 0;
#else
wsaret = 0;
#endif #endif
} }
@ -98,10 +101,12 @@ void EasySocket::checkResult(int result)
error_code = WSAGetLastError(); error_code = WSAGetLastError();
const int CONNECTION_ABORTED = WSAECONNABORTED; const int CONNECTION_ABORTED = WSAECONNABORTED;
const int CONNECTION_RESET = WSAECONNRESET; const int CONNECTION_RESET = WSAECONNRESET;
const int CONNECTION_IN_PROGRESS = WSAEINPROGRESS;
#else #else
error_code = errno; error_code = errno;
const int CONNECTION_ABORTED = ECONNABORTED; const int CONNECTION_ABORTED = ECONNABORTED;
const int CONNECTION_RESET = ECONNRESET; const int CONNECTION_RESET = ECONNRESET;
const int CONNECTION_IN_PROGRESS = EINPROGRESS;
#endif #endif
switch(error_code) switch(error_code)
@ -110,6 +115,10 @@ void EasySocket::checkResult(int result)
case CONNECTION_RESET: case CONNECTION_RESET:
m_state = CONNECTION_STATE_DISCONNECTED; m_state = CONNECTION_STATE_DISCONNECTED;
break; break;
case CONNECTION_IN_PROGRESS:
m_state = CONNECTION_STATE_IN_PROGRESS;
break;
default: default:
break; break;
} }
@ -132,7 +141,9 @@ void EasySocket::init()
return; return;
setBlocking(m_socket,true); setBlocking(m_socket,true);
#ifndef _WIN32
wsaret = 1;
#endif
int opt = 1; int opt = 1;
setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, (const char *)&opt, sizeof(opt)); setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, (const char *)&opt, sizeof(opt));
} }
@ -240,8 +251,35 @@ int EasySocket::connect()
return -1; return -1;
//fprintf(stderr,"ERROR, no such host\n"); //fprintf(stderr,"ERROR, no such host\n");
} }
int res = ::connect(m_socket,(struct sockaddr *) &serv_addr,sizeof(serv_addr)); setBlocking(m_socket,false);
checkResult(res);
int counter = 0;
int sleepMs = 20;
int waitSec = 1;
int waitMs = waitSec*1000/sleepMs;
int res = 0;
while(counter++ < waitMs)
{
res = ::connect(m_socket,(struct sockaddr *) &serv_addr,sizeof(serv_addr));
checkResult(res);
if (res == 0)
break;
if (m_state == CONNECTION_STATE_IN_PROGRESS)
{
std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
continue;
}
if(m_state != CONNECTION_STATE_IN_PROGRESS && m_state != CONNECTION_STATE_SUCCESS )
break;
}
setBlocking(m_socket,true);
if(res == 0){ if(res == 0){
struct timeval tv; struct timeval tv;