diff --git a/easy_profiler_core/easy_socket.cpp b/easy_profiler_core/easy_socket.cpp index 64ee7e6..51a6ce3 100644 --- a/easy_profiler_core/easy_socket.cpp +++ b/easy_profiler_core/easy_socket.cpp @@ -40,281 +40,342 @@ limitations under the License. **/ #include - #include #include -#ifdef _WIN32 -#pragma comment (lib, "Ws2_32.lib") -#pragma comment (lib, "Mswsock.lib") -#pragma comment (lib, "AdvApi32.lib") +#if defined(_WIN32) +# pragma comment (lib, "Ws2_32.lib") +# pragma comment (lib, "Mswsock.lib") +# pragma comment (lib, "AdvApi32.lib") #else -#include -#include +# include +# include #endif -bool EasySocket::checkSocket(socket_t s) const -{ - return s > 0; -} +///////////////////////////////////////////////////////////////// -int EasySocket::_close(EasySocket::socket_t s) +#if defined(_WIN32) +const int SOCK_ABORTED = WSAECONNABORTED; +const int SOCK_RESET = WSAECONNRESET; +const int SOCK_IN_PROGRESS = WSAEINPROGRESS; +#else +const int SOCK_ABORTED = ECONNABORTED; +const int SOCK_RESET = ECONNRESET; +const int SOCK_IN_PROGRESS = EINPROGRESS; +const int SOCK_BROKEN_PIPE = EPIPE; +const int SOCK_ENOENT = ENOENT; +#endif + +const int SEND_BUFFER_SIZE = 64 * 1024 * 1024; + +///////////////////////////////////////////////////////////////// + +static int closeSocket(EasySocket::socket_t s) { -#ifdef _WIN32 +#if defined(_WIN32) return ::closesocket(s); #else return ::close(s); #endif } +///////////////////////////////////////////////////////////////// + +bool EasySocket::checkSocket(socket_t s) const +{ + return s > 0; +} + void EasySocket::setBlocking(EasySocket::socket_t s, bool blocking) { - -#ifdef _WIN32 +#if defined(_WIN32) u_long iMode = blocking ? 0 : 1;//0 - blocking, 1 - non blocking - ioctlsocket(s, FIONBIO, &iMode); + ::ioctlsocket(s, FIONBIO, &iMode); #else - const int iMode = blocking ? 0 : 1;//0 - blocking, 1 - non blocking - ioctl(s, FIONBIO, (char *)&iMode); + int iMode = blocking ? 0 : 1;//0 - blocking, 1 - non blocking + ::ioctl(s, FIONBIO, (char*)&iMode); #endif } -int EasySocket::bind(uint16_t portno) +int EasySocket::bind(uint16_t port) { - if (!checkSocket(m_socket)) return -1; + if (!checkSocket(m_socket)) + return -1; struct sockaddr_in serv_addr; memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = INADDR_ANY; - serv_addr.sin_port = htons(portno); - auto res = ::bind(m_socket, (struct sockaddr *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_port = htons(port); + auto res = ::bind(m_socket, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); + return res; } void EasySocket::flush() { - if (m_socket){ - _close(m_socket); - } - if (m_replySocket != m_socket){ - _close(m_replySocket); - } -#ifdef _WIN32 + if (m_socket) + closeSocket(m_socket); + + if (m_replySocket != m_socket) + closeSocket(m_replySocket); + +#if defined(_WIN32) m_socket = 0; m_replySocket = 0; #else - wsaret = 0; + m_wsaret = 0; #endif } void EasySocket::checkResult(int result) { - //printf("Errno: %s\n", strerror(errno)); - if(result >= 0){ - m_state = CONNECTION_STATE_SUCCESS; + if (result >= 0) + { + m_state = ConnectionState::Connected; return; - }else if(result == -1){ + } - - int error_code = 0; - -#ifdef _WIN32 - error_code = WSAGetLastError(); - const int CONNECTION_ABORTED = WSAECONNABORTED; - const int CONNECTION_RESET = WSAECONNRESET; - const int CONNECTION_IN_PROGRESS = WSAEINPROGRESS; + if (result == -1) // is this check necessary? + { +#if defined(_WIN32) + const int error_code = WSAGetLastError(); #else - error_code = errno; - const int CONNECTION_ABORTED = ECONNABORTED; - const int CONNECTION_RESET = ECONNRESET; - const int CONNECTION_IN_PROGRESS = EINPROGRESS; - const int CONNECTION_BROKEN_PIPE = EPIPE; - const int CONNECTION_ENOENT = ENOENT; + const int error_code = errno; #endif - switch(error_code) + switch (error_code) { - case CONNECTION_ABORTED: - case CONNECTION_RESET: -#ifndef _WIN32 - case CONNECTION_BROKEN_PIPE: - case CONNECTION_ENOENT: + case SOCK_ABORTED: + case SOCK_RESET: +#if !defined(_WIN32) + case SOCK_BROKEN_PIPE: + case SOCK_ENOENT: #endif - m_state = CONNECTION_STATE_DISCONNECTED; - break; - case CONNECTION_IN_PROGRESS: - m_state = CONNECTION_STATE_IN_PROGRESS; - break; - default: - break; + m_state = ConnectionState::Disconnected; + break; + + case SOCK_IN_PROGRESS: + m_state = ConnectionState::Connecting; + break; + + default: + break; } } } void EasySocket::init() { - if (wsaret == 0) - { - int protocol = 0; -#ifdef _WIN32 - protocol = IPPROTO_TCP; -#endif - m_socket = socket(AF_INET, SOCK_STREAM, protocol); - if (!checkSocket(m_socket)) { - return; - } - }else + if (m_wsaret != 0) return; - setBlocking(m_socket,true); -#ifndef _WIN32 - wsaret = 1; +#if !defined(_WIN32) + const int protocol = 0; +#else + const int protocol = IPPROTO_TCP; #endif - int opt = 1; - setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, (const char *)&opt, sizeof(opt)); + + m_socket = ::socket(AF_INET, SOCK_STREAM, protocol); + if (!checkSocket(m_socket)) + return; + + setBlocking(m_socket, true); + +#if !defined(_WIN32) + m_wsaret = 1; +#endif + + const int opt = 1; + ::setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(int)); +} + +EasySocket::ConnectionState EasySocket::state() const +{ + return m_state; +} + +bool EasySocket::isDisconnected() const +{ + return static_cast(m_state) <= 0; +} + +bool EasySocket::isConnected() const +{ + return m_state == ConnectionState::Connected; } EasySocket::EasySocket() { -#ifdef _WIN32 +#if defined(_WIN32) WSADATA wsaData; - wsaret = WSAStartup(0x101, &wsaData); + m_wsaret = WSAStartup(0x101, &wsaData); #else - wsaret = 0; + m_wsaret = 0; #endif + init(); -#ifndef _WIN32 - wsaret = 1; -#endif } EasySocket::~EasySocket() { flush(); -#ifdef _WIN32 - if (wsaret == 0) + +#if defined(_WIN32) + if (m_wsaret == 0) WSACleanup(); #endif } -int EasySocket::send(const void *buf, size_t nbyte) +void EasySocket::setReceiveTimeout(int milliseconds) { - if(!checkSocket(m_replySocket)) return -1; - int res = 0; -#if defined(_WIN32) || defined(__APPLE__) - res = ::send(m_replySocket, (const char*)buf, (int)nbyte, 0); + if (!isConnected() || !checkSocket(m_replySocket)) + return; + + if (milliseconds <= 0) + milliseconds = std::numeric_limits::max() / 1000; + +#if defined(_WIN32) + const DWORD timeout = static_cast(milliseconds); + ::setsockopt(m_replySocket, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(DWORD)); #else - res = ::send(m_replySocket,buf,nbyte,MSG_NOSIGNAL); + struct timeval tv; + if (milliseconds >= 1000) + { + const int s = milliseconds / 1000; + const int us = (milliseconds - s * 1000) * 1000; + tv.tv_sec = s; + tv.tv_usec = us; + } + else + { + tv.tv_sec = 0; + tv.tv_usec = milliseconds * 1000; + } + + ::setsockopt(m_replySocket, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof (struct timeval)); #endif +} + +int EasySocket::send(const void* buffer, size_t nbytes) +{ + if (!checkSocket(m_replySocket)) + return -1; + +#if defined(_WIN32) || defined(__APPLE__) + const int res = ::send(m_replySocket, (const char*)buffer, (int)nbytes, 0); +#else + const int res = (int)::send(m_replySocket, buffer, nbytes, MSG_NOSIGNAL); +#endif + checkResult(res); + return res; } -int EasySocket::receive(void *buf, size_t nbyte) +int EasySocket::receive(void* buffer, size_t nbytes) { - if(!checkSocket(m_replySocket)) return -1; - int res = 0; -#ifdef _WIN32 - res = ::recv(m_replySocket, (char*)buf, (int)nbyte, 0); + if (!checkSocket(m_replySocket)) + return -1; + +#if defined(_WIN32) + const int res = ::recv(m_replySocket, (char*)buffer, (int)nbytes, 0); #else - res = ::read(m_replySocket,buf,nbyte); + const int res = (int)::read(m_replySocket, buffer, nbytes); #endif checkResult(res); - if (res == 0){ - m_state = CONNECTION_STATE_DISCONNECTED; - } + if (res == 0) + m_state = ConnectionState::Disconnected; + return res; } int EasySocket::listen(int count) { - if(!checkSocket(m_socket)) return -1; - int res = ::listen(m_socket,count); + if (!checkSocket(m_socket)) + return -1; + + const int res = ::listen(m_socket, count); checkResult(res); + return res; } int EasySocket::accept() { - if(!checkSocket(m_socket)) return -1; + if (!checkSocket(m_socket)) + return -1; - fd_set fdread, fdwrite, fdexcl; - timeval tv = { 0 }; + fd_set fdread; FD_ZERO (&fdread); FD_SET (m_socket, &fdread); - fdwrite = fdread; - fdexcl = fdread; - tv.tv_sec = 0; tv.tv_usec = 500; - int rc =select ((int)m_socket+1, &fdread, &fdwrite, &fdexcl, &tv); + fd_set fdwrite = fdread; + fd_set fdexcl = fdread; - if(rc <= 0){ - //there is no connection for accept - return -1; - } - m_replySocket = ::accept(m_socket,nullptr,nullptr); + struct timeval tv { 0, 500 }; + const int rc = ::select((int)m_socket + 1, &fdread, &fdwrite, &fdexcl, &tv); + if (rc <= 0) + return -1; // there is no connection for accept + m_replySocket = ::accept(m_socket, nullptr, nullptr); checkResult((int)m_replySocket); - if(checkSocket(m_replySocket)) - { - int send_buffer = 64*1024*1024; - int send_buffer_sizeof = sizeof(int); - setsockopt(m_replySocket, SOL_SOCKET, SO_SNDBUF, (char*)&send_buffer, send_buffer_sizeof); - - //int flag = 1; - //int result = setsockopt(m_replySocket,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(int)); + if (checkSocket(m_replySocket)) + { + ::setsockopt(m_replySocket, SOL_SOCKET, SO_SNDBUF, (char*)&SEND_BUFFER_SIZE, sizeof(int)); + + //const int flag = 1; + //const int result = setsockopt(m_replySocket,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(int)); + +#if defined(__APPLE__) // Apple doesn't have MSG_NOSIGNAL, work around it -#ifdef __APPLE__ - int value = 1; - setsockopt(m_replySocket, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value)); + const int value = 1; + ::setsockopt(m_replySocket, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value)); #endif //setBlocking(m_replySocket,true); } + return (int)m_replySocket; } -bool EasySocket::setAddress(const char *serv, uint16_t portno) +bool EasySocket::setAddress(const char* address, uint16_t port) { - server = gethostbyname(serv); - if (server == NULL) { + m_server = ::gethostbyname(address); + if (m_server == nullptr) return false; - //fprintf(stderr,"ERROR, no such host\n"); - } - memset((char *)&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - memcpy((char *)&serv_addr.sin_addr.s_addr, (char *)server->h_addr, server->h_length); - serv_addr.sin_port = htons(portno); + memset((char*)&m_serverAddress, 0, sizeof(m_serverAddress)); + m_serverAddress.sin_family = AF_INET; + memcpy((char*)&m_serverAddress.sin_addr.s_addr, (char*)m_server->h_addr, (size_t)m_server->h_length); + + m_serverAddress.sin_port = htons(port); return true; } int EasySocket::connect() { - if (server == NULL || m_socket <=0 ) { + if (m_server == nullptr || m_socket <= 0) return -1; - //fprintf(stderr,"ERROR, no such host\n"); - } + int res = 0; //TODO: more intelligence -#ifndef _WIN32 - setBlocking(m_socket,false); +#if !defined(_WIN32) + setBlocking(m_socket, false); - int counter = 0; - int sleepMs = 20; - int waitSec = 1; - int waitMs = waitSec*1000/sleepMs; - - while(counter++ < waitMs) + const int sleepMs = 20; + const int waitSec = 1; + const int waitMs = waitSec * 1000 / sleepMs; + + for (int counter = 0; counter++ < waitMs;) { - res = ::connect(m_socket,(struct sockaddr *) &serv_addr,sizeof(serv_addr)); + res = ::connect(m_socket, (struct sockaddr*)&m_serverAddress, sizeof(m_serverAddress)); +#if defined(__APPLE__) // on Apple, treat EISCONN error as success -#ifdef __APPLE__ if (res == -1 && errno == EISCONN) { res = 0; @@ -323,42 +384,44 @@ int EasySocket::connect() #endif checkResult(res); - if (res == 0) + { break; + } - if (m_state == CONNECTION_STATE_IN_PROGRESS) + if (m_state == ConnectionState::Connecting) { std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs)); continue; } - - if(m_state != CONNECTION_STATE_IN_PROGRESS && m_state != CONNECTION_STATE_SUCCESS ) + if (isDisconnected()) + { break; + } } - setBlocking(m_socket,true); + setBlocking(m_socket, true); #else - res = ::connect(m_socket, (struct sockaddr *) &serv_addr, sizeof(serv_addr)); + res = ::connect(m_socket, (struct sockaddr*)&m_serverAddress, sizeof(m_serverAddress)); checkResult(res); #endif - if(res == 0){ + if (res == 0) + { struct timeval tv; - tv.tv_sec = 1; tv.tv_usec = 0; + ::setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, (char*)&tv, sizeof(struct timeval)); - setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,sizeof(struct timeval)); - -#ifdef __APPLE__ +#if defined(__APPLE__) // Apple doesn't have MSG_NOSIGNAL, work around it - int value = 1; + const int value = 1; setsockopt(m_socket, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value)); #endif m_replySocket = m_socket; } + return res; } diff --git a/easy_profiler_core/include/easy/easy_socket.h b/easy_profiler_core/include/easy/easy_socket.h index ed9a868..23eebf1 100644 --- a/easy_profiler_core/include/easy/easy_socket.h +++ b/easy_profiler_core/include/easy/easy_socket.h @@ -39,8 +39,8 @@ The Apache License, Version 2.0 (the "License"); limitations under the License. **/ -#ifndef EASY________SOCKET_________H -#define EASY________SOCKET_________H +#ifndef EASY_PROFILER_SOCKET_H +#define EASY_PROFILER_SOCKET_H #include #include @@ -72,40 +72,37 @@ public: typedef int socket_t; #endif - enum ConnectionState + enum class ConnectionState : int8_t { - CONNECTION_STATE_UNKNOWN, - CONNECTION_STATE_SUCCESS, - - CONNECTION_STATE_DISCONNECTED, - CONNECTION_STATE_IN_PROGRESS + Disconnected = -1, + Unknown, + Connected, + Connecting }; private: - - void checkResult(int result); - bool checkSocket(socket_t s) const; - static int _close(socket_t s); - void setBlocking(socket_t s, bool blocking); - + socket_t m_socket = 0; socket_t m_replySocket = 0; - int wsaret = -1; + int m_receiveTimeoutMs = 0; + int m_wsaret = -1; - struct hostent * server; - struct sockaddr_in serv_addr; + struct hostent* m_server = nullptr; + struct sockaddr_in m_serverAddress; - ConnectionState m_state = CONNECTION_STATE_UNKNOWN; + ConnectionState m_state = ConnectionState::Unknown; public: EasySocket(); ~EasySocket(); - int send(const void *buf, size_t nbyte); - int receive(void *buf, size_t nbyte); - int listen(int count=5); + void setReceiveTimeout(int milliseconds); + + int send(const void* buf, size_t nbyte); + int receive(void* buf, size_t nbyte); + int listen(int count = 5); int accept(); int bind(uint16_t portno); @@ -115,14 +112,16 @@ public: void flush(); void init(); - void setState(ConnectionState state){m_state=state;} - ConnectionState state() const{return m_state;} + ConnectionState state() const; + bool isDisconnected() const; + bool isConnected() const; - bool isDisconnected() const - { - return m_state == CONNECTION_STATE_UNKNOWN || - m_state == CONNECTION_STATE_DISCONNECTED; - } -}; +private: -#endif // EASY________SOCKET_________H + void checkResult(int result); + bool checkSocket(socket_t s) const; + void setBlocking(socket_t s, bool blocking); + +}; // end of class EasySocket. + +#endif // EASY_PROFILER_SOCKET_H diff --git a/easy_profiler_core/profile_manager.cpp b/easy_profiler_core/profile_manager.cpp index e1bbe55..e069bb0 100644 --- a/easy_profiler_core/profile_manager.cpp +++ b/easy_profiler_core/profile_manager.cpp @@ -50,7 +50,7 @@ #include #include -#include +#include #include "profile_manager.h" #include @@ -654,6 +654,7 @@ ProfileManager::ProfileManager() : m_profilerStatus = ATOMIC_VAR_INIT(EASY_PROF_DISABLED); m_isEventTracingEnabled = ATOMIC_VAR_INIT(EASY_OPTION_EVENT_TRACING_ENABLED); m_isAlreadyListening = ATOMIC_VAR_INIT(false); + m_stopDumping = ATOMIC_VAR_INIT(false); m_stopListen = ATOMIC_VAR_INIT(false); m_mainThreadId = ATOMIC_VAR_INIT(0); @@ -1258,7 +1259,7 @@ char ProfileManager::checkThreadExpired(ThreadStorage& _registeredThread) ////////////////////////////////////////////////////////////////////////// -uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bool _lockSpin) +uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bool _lockSpin, bool _async) { EASY_LOGMSG("dumpBlocksToStream(_lockSpin = " << _lockSpin << ")...\n"); @@ -1283,6 +1284,12 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo //m_spin.lock(); // This is the only place using both spins, so no dead-lock will occur + if (_async && m_stopDumping.load(std::memory_order_acquire)) + { + if (_lockSpin) + m_dumpSpin.unlock(); + return 0; + } // Wait for some time to be sure that all operations which began before setEnabled(false) will be finished. // This is much better than inserting spin-lock or atomic variable check into each store operation. @@ -1292,6 +1299,13 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo EASY_LOG_ONLY(bool logged = false); for (auto it = m_threads.begin(), end = m_threads.end(); it != end;) { + if (_async && m_stopDumping.load(std::memory_order_acquire)) + { + if (_lockSpin) + m_dumpSpin.unlock(); + return 0; + } + if (!it->second.profiledFrameOpened.load(std::memory_order_acquire)) { ++it; @@ -1331,6 +1345,15 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo { // Read thread context switch events from temporary file + if (_async && m_stopDumping.load(std::memory_order_acquire)) + { + m_spin.unlock(); + m_storedSpin.unlock(); + if (_lockSpin) + m_dumpSpin.unlock(); + return 0; + } + EASY_LOGMSG("Writing context switch events...\n"); uint64_t timestamp = 0; @@ -1344,6 +1367,15 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo pid_t process_to = 0; while (infile >> timestamp >> thread_from >> thread_to >> next_task_name >> process_to) { + if (_async && m_stopDumping.load(std::memory_order_acquire)) + { + m_spin.unlock(); + m_storedSpin.unlock(); + if (_lockSpin) + m_dumpSpin.unlock(); + return 0; + } + beginContextSwitch(thread_from, timestamp, thread_to, next_task_name.c_str(), false); endContextSwitch(thread_to, (processid_t)process_to, timestamp, false); EASY_LOG_ONLY(++num); @@ -1366,6 +1398,15 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo uint32_t blocks_number = 0; for (auto it = m_threads.begin(), end = m_threads.end(); it != end;) { + if (_async && m_stopDumping.load(std::memory_order_acquire)) + { + m_spin.unlock(); + m_storedSpin.unlock(); + if (_lockSpin) + m_dumpSpin.unlock(); + return 0; + } + auto& t = it->second; uint32_t num = static_cast(t.blocks.closedList.size()) + static_cast(t.sync.closedList.size()); const char expired = ProfileManager::checkThreadExpired(t); @@ -1448,6 +1489,15 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo // Write blocks and context switch events for each thread for (auto it = m_threads.begin(), end = m_threads.end(); it != end;) { + if (_async && m_stopDumping.load(std::memory_order_acquire)) + { + m_spin.unlock(); + m_storedSpin.unlock(); + if (_lockSpin) + m_dumpSpin.unlock(); + return 0; + } + auto& t = it->second; _outputStream.write(it->first); @@ -1512,7 +1562,7 @@ uint32_t ProfileManager::dumpBlocksToFile(const char* _filename) auto oldbuf = s.rdbuf(outputFile.rdbuf()); // Write data directly to file - const auto blocksNumber = dumpBlocksToStream(outputStream, true); + const auto blocksNumber = dumpBlocksToStream(outputStream, true, false); // Restore old outputStream buffer to avoid possible second memory free on stringstream destructor s.rdbuf(oldbuf); @@ -1628,12 +1678,31 @@ bool ProfileManager::isListening() const ////////////////////////////////////////////////////////////////////////// +template +inline void join(std::future& futureResult) +{ + if (futureResult.valid()) + futureResult.get(); +} + void ProfileManager::listen(uint16_t _port) { EASY_THREAD_SCOPE("EasyProfiler.Listen"); EASY_LOGMSG("Listening started\n"); + profiler::OStream os; + std::future dumpingResult; + bool dumping = false; + + const auto stopDumping = [this, &dumping, &dumpingResult, &os] + { + dumping = false; + m_stopDumping.store(true, std::memory_order_release); + join(dumpingResult); + os.clear(); + }; + EasySocket socket; profiler::net::Message replyMessage(profiler::net::MESSAGE_TYPE_REPLY_START_CAPTURING); @@ -1641,12 +1710,13 @@ void ProfileManager::listen(uint16_t _port) int bytes = 0; while (!m_stopListen.load(std::memory_order_acquire)) { - bool hasConnect = false; + if (dumping) + stopDumping(); socket.listen(); socket.accept(); - hasConnect = true; + bool hasConnect = true; // Send reply { @@ -1663,230 +1733,260 @@ void ProfileManager::listen(uint16_t _port) while (hasConnect && !m_stopListen.load(std::memory_order_acquire)) { - char buffer[256] = {}; + if (dumping) + { + if (!dumpingResult.valid()) + { + dumping = false; + socket.setReceiveTimeout(0); + os.clear(); + } + else if (dumpingResult.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) + { + dumping = false; + dumpingResult.get(); + const auto size = os.stream().tellp(); + static const decltype(size) badSize = -1; + if (size != badSize) + { + const profiler::net::DataMessage dm(static_cast(size), + profiler::net::MESSAGE_TYPE_REPLY_BLOCKS); + + const size_t packet_size = sizeof(dm) + dm.size; + std::string sendbuf; + sendbuf.reserve(packet_size + 1); + + if (sendbuf.capacity() >= packet_size) // check if there is enough memory + { + sendbuf.append((const char*) &dm, sizeof(dm)); + sendbuf += os.stream().str(); // TODO: Avoid double-coping data from stringstream! + os.clear(); + + bytes = socket.send(sendbuf.c_str(), packet_size); + hasConnect = bytes > 0; + if (!hasConnect) + break; + } + else + { + EASY_ERROR("Can not send blocks. Not enough memory for allocating " << packet_size + << " bytes"); + os.clear(); + } + } + else + { + EASY_ERROR("Can not send blocks. Bad std::stringstream.tellp() == -1"); + os.clear(); + } + + replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_END; + bytes = socket.send(&replyMessage, sizeof(replyMessage)); + hasConnect = bytes > 0; + if (!hasConnect) + break; + + socket.setReceiveTimeout(0); + } + } + + char buffer[256] = {}; bytes = socket.receive(buffer, 255); - hasConnect = bytes > 0; + hasConnect = socket.isConnected(); + if (!hasConnect || bytes < static_cast(sizeof(profiler::net::Message))) + continue; - char *buf = &buffer[0]; + auto message = (const profiler::net::Message*)buffer; + if (!message->isEasyNetMessage()) + continue; - if (bytes > 0) + switch (message->type) { - profiler::net::Message* message = (profiler::net::Message*)buf; - if (!message->isEasyNetMessage()){ - continue; - } - - switch (message->type) + case profiler::net::MESSAGE_TYPE_CHECK_CONNECTION: { - case profiler::net::MESSAGE_TYPE_CHECK_CONNECTION: - { - EASY_LOGMSG("receive MESSAGE_TYPE_CHECK_CONNECTION\n"); - break; - } - - case profiler::net::MESSAGE_TYPE_REQUEST_MAIN_FRAME_TIME_MAX_AVG_US: - { - profiler::timestamp_t maxDuration = maxFrameDuration(), avgDuration = avgFrameDuration(); - maxDuration = TICKS_TO_US(maxDuration); - avgDuration = TICKS_TO_US(avgDuration); - const profiler::net::TimestampMessage reply(profiler::net::MESSAGE_TYPE_REPLY_MAIN_FRAME_TIME_MAX_AVG_US, (uint32_t)maxDuration, (uint32_t)avgDuration); - bytes = socket.send(&reply, sizeof(profiler::net::TimestampMessage)); - hasConnect = bytes > 0; - break; - } - - case profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE: - { - EASY_LOGMSG("receive REQUEST_START_CAPTURE\n"); - - ::profiler::timestamp_t t = 0; - EASY_FORCE_EVENT(t, "StartCapture", EASY_COLOR_START, profiler::OFF); - - m_dumpSpin.lock(); - const auto prev = m_profilerStatus.exchange(EASY_PROF_ENABLED, std::memory_order_release); - if (prev != EASY_PROF_ENABLED) { - enableEventTracer(); - m_beginTime = t; - } - m_dumpSpin.unlock(); - - replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_START_CAPTURING; - bytes = socket.send(&replyMessage, sizeof(replyMessage)); - hasConnect = bytes > 0; - - break; - } - - case profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE: - { - EASY_LOGMSG("receive REQUEST_STOP_CAPTURE\n"); - - m_dumpSpin.lock(); - auto time = getCurrentTime(); - const auto prev = m_profilerStatus.exchange(EASY_PROF_DUMP, std::memory_order_release); - if (prev == EASY_PROF_ENABLED) { - disableEventTracer(); - m_endTime = time; - } - EASY_FORCE_EVENT2(m_endTime, "StopCapture", EASY_COLOR_END, profiler::OFF); - - //TODO - //if connection aborted - ignore this part - - profiler::OStream os; - dumpBlocksToStream(os, false); - m_dumpSpin.unlock(); - - const auto size = os.stream().tellp(); - static const decltype(size) badSize = -1; - if (size != badSize) - { - const profiler::net::DataMessage dm(static_cast(size), profiler::net::MESSAGE_TYPE_REPLY_BLOCKS); - - const size_t packet_size = sizeof(dm) + dm.size; - std::string sendbuf; - sendbuf.reserve(packet_size + 1); - - if (sendbuf.capacity() >= packet_size) // check if there is enough memory - { - sendbuf.append((const char*)&dm, sizeof(dm)); - sendbuf += os.stream().str(); // TODO: Avoid double-coping data from stringstream! - os.clear(); - - bytes = socket.send(sendbuf.c_str(), packet_size); - hasConnect = bytes > 0; - } - else - { - EASY_ERROR("Can not send blocks. Not enough memory for allocating " << packet_size << " bytes"); - } - } - else - { - EASY_ERROR("Can not send blocks. Bad std::stringstream.tellp() == -1"); - } - - replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_END; - bytes = socket.send(&replyMessage, sizeof(replyMessage)); - hasConnect = bytes > 0; - - break; - } - - case profiler::net::MESSAGE_TYPE_REQUEST_BLOCKS_DESCRIPTION: - { - EASY_LOGMSG("receive REQUEST_BLOCKS_DESCRIPTION\n"); - - profiler::OStream os; - - // Write profiler signature and version - os.write(PROFILER_SIGNATURE); - os.write(EASY_CURRENT_VERSION); - - // Write block descriptors - m_storedSpin.lock(); - os.write(static_cast(m_descriptors.size())); - os.write(m_usedMemorySize); - for (const auto descriptor : m_descriptors) - { - const auto name_size = descriptor->nameSize(); - const auto filename_size = descriptor->filenameSize(); - const auto size = static_cast(sizeof(profiler::SerializedBlockDescriptor) + name_size + filename_size); - - os.write(size); - os.write(*descriptor); - os.write(name_size); - os.write(descriptor->name(), name_size); - os.write(descriptor->filename(), filename_size); - } - m_storedSpin.unlock(); - // END of Write block descriptors. - - const auto size = os.stream().tellp(); - static const decltype(size) badSize = -1; - if (size != badSize) - { - const profiler::net::DataMessage dm(static_cast(size), profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_DESCRIPTION); - - const size_t packet_size = sizeof(dm) + dm.size; - std::string sendbuf; - sendbuf.reserve(packet_size + 1); - - if (sendbuf.capacity() >= packet_size) // check if there is enough memory - { - sendbuf.append((const char*)&dm, sizeof(dm)); - sendbuf += os.stream().str(); // TODO: Avoid double-coping data from stringstream! - os.clear(); - - bytes = socket.send(sendbuf.c_str(), packet_size); - hasConnect = bytes > 0; - } - else - { - EASY_ERROR("Can not send block descriptions. Not enough memory for allocating " << packet_size << " bytes"); - } - } - else - { - EASY_ERROR("Can not send block descriptions. Bad std::stringstream.tellp() == -1"); - } - - replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_DESCRIPTION_END; - bytes = socket.send(&replyMessage, sizeof(replyMessage)); - hasConnect = bytes > 0; - - break; - } - - case profiler::net::MESSAGE_TYPE_EDIT_BLOCK_STATUS: - { - auto data = reinterpret_cast(message); - - EASY_LOGMSG("receive EDIT_BLOCK_STATUS id=" << data->id << " status=" << data->status << std::endl); - - setBlockStatus(data->id, static_cast<::profiler::EasyBlockStatus>(data->status)); - - break; - } - - case profiler::net::MESSAGE_TYPE_EVENT_TRACING_STATUS: - { - auto data = reinterpret_cast(message); - - EASY_LOGMSG("receive EVENT_TRACING_STATUS on=" << data->flag << std::endl); - - m_isEventTracingEnabled.store(data->flag, std::memory_order_release); - break; - } - - case profiler::net::MESSAGE_TYPE_EVENT_TRACING_PRIORITY: - { -#if defined(_WIN32) || EASY_OPTION_LOG_ENABLED != 0 - auto data = reinterpret_cast(message); -#endif - - EASY_LOGMSG("receive EVENT_TRACING_PRIORITY low=" << data->flag << std::endl); - -#ifdef _WIN32 - EasyEventTracer::instance().setLowPriority(data->flag); -#endif - break; - } - - default: - break; + EASY_LOGMSG("receive MESSAGE_TYPE_CHECK_CONNECTION\n"); + break; } - //nn_freemsg (buf); + case profiler::net::MESSAGE_TYPE_REQUEST_MAIN_FRAME_TIME_MAX_AVG_US: + { + profiler::timestamp_t maxDuration = maxFrameDuration(), avgDuration = avgFrameDuration(); + maxDuration = TICKS_TO_US(maxDuration); + avgDuration = TICKS_TO_US(avgDuration); + const profiler::net::TimestampMessage reply(profiler::net::MESSAGE_TYPE_REPLY_MAIN_FRAME_TIME_MAX_AVG_US, (uint32_t)maxDuration, (uint32_t)avgDuration); + bytes = socket.send(&reply, sizeof(profiler::net::TimestampMessage)); + hasConnect = bytes > 0; + break; + } + + case profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE: + { + EASY_LOGMSG("receive REQUEST_START_CAPTURE\n"); + + ::profiler::timestamp_t t = 0; + EASY_FORCE_EVENT(t, "StartCapture", EASY_COLOR_START, profiler::OFF); + + m_dumpSpin.lock(); + const auto prev = m_profilerStatus.exchange(EASY_PROF_ENABLED, std::memory_order_release); + if (prev != EASY_PROF_ENABLED) { + enableEventTracer(); + m_beginTime = t; + } + m_dumpSpin.unlock(); + + replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_START_CAPTURING; + bytes = socket.send(&replyMessage, sizeof(replyMessage)); + hasConnect = bytes > 0; + + break; + } + + case profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE: + { + EASY_LOGMSG("receive REQUEST_STOP_CAPTURE\n"); + + if (dumping) + break; + + m_dumpSpin.lock(); + auto time = getCurrentTime(); + const auto prev = m_profilerStatus.exchange(EASY_PROF_DUMP, std::memory_order_release); + if (prev == EASY_PROF_ENABLED) { + disableEventTracer(); + m_endTime = time; + } + EASY_FORCE_EVENT2(m_endTime, "StopCapture", EASY_COLOR_END, profiler::OFF); + + dumping = true; + socket.setReceiveTimeout(500); // We have to check if dumping ready or not + + m_stopDumping.store(false, std::memory_order_release); + dumpingResult = std::async(std::launch::async, [this, &os] + { + auto result = dumpBlocksToStream(os, false, true); + m_dumpSpin.unlock(); + return result; + }); + + break; + } + + case profiler::net::MESSAGE_TYPE_REQUEST_BLOCKS_DESCRIPTION: + { + EASY_LOGMSG("receive REQUEST_BLOCKS_DESCRIPTION\n"); + + if (dumping) + stopDumping(); + + // Write profiler signature and version + os.write(PROFILER_SIGNATURE); + os.write(EASY_CURRENT_VERSION); + + // Write block descriptors + m_storedSpin.lock(); + os.write(static_cast(m_descriptors.size())); + os.write(m_usedMemorySize); + for (const auto descriptor : m_descriptors) + { + const auto name_size = descriptor->nameSize(); + const auto filename_size = descriptor->filenameSize(); + const auto size = static_cast(sizeof(profiler::SerializedBlockDescriptor) + + name_size + filename_size); + + os.write(size); + os.write(*descriptor); + os.write(name_size); + os.write(descriptor->name(), name_size); + os.write(descriptor->filename(), filename_size); + } + m_storedSpin.unlock(); + // END of Write block descriptors. + + const auto size = os.stream().tellp(); + static const decltype(size) badSize = -1; + if (size != badSize) + { + const profiler::net::DataMessage dm(static_cast(size), + profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_DESCRIPTION); + + const size_t packet_size = sizeof(dm) + dm.size; + std::string sendbuf; + sendbuf.reserve(packet_size + 1); + + if (sendbuf.capacity() >= packet_size) // check if there is enough memory + { + sendbuf.append((const char*)&dm, sizeof(dm)); + sendbuf += os.stream().str(); // TODO: Avoid double-coping data from stringstream! + os.clear(); + + bytes = socket.send(sendbuf.c_str(), packet_size); + //hasConnect = bytes > 0; + } + else + { + EASY_ERROR("Can not send block descriptions. Not enough memory for allocating " << packet_size << " bytes"); + } + } + else + { + EASY_ERROR("Can not send block descriptions. Bad std::stringstream.tellp() == -1"); + } + + replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_DESCRIPTION_END; + bytes = socket.send(&replyMessage, sizeof(replyMessage)); + hasConnect = bytes > 0; + + break; + } + + case profiler::net::MESSAGE_TYPE_EDIT_BLOCK_STATUS: + { + auto data = reinterpret_cast(message); + + EASY_LOGMSG("receive EDIT_BLOCK_STATUS id=" << data->id << " status=" << data->status << std::endl); + + setBlockStatus(data->id, static_cast<::profiler::EasyBlockStatus>(data->status)); + + break; + } + + case profiler::net::MESSAGE_TYPE_EVENT_TRACING_STATUS: + { + auto data = reinterpret_cast(message); + + EASY_LOGMSG("receive EVENT_TRACING_STATUS on=" << data->flag << std::endl); + + m_isEventTracingEnabled.store(data->flag, std::memory_order_release); + break; + } + + case profiler::net::MESSAGE_TYPE_EVENT_TRACING_PRIORITY: + { +#if defined(_WIN32) || EASY_OPTION_LOG_ENABLED != 0 + auto data = reinterpret_cast(message); +#endif + + EASY_LOGMSG("receive EVENT_TRACING_PRIORITY low=" << data->flag << std::endl); + +#if defined(_WIN32) + EasyEventTracer::instance().setLowPriority(data->flag); +#endif + break; + } + + default: + break; } } - - - } + if (dumping) + { + m_stopDumping.store(true, std::memory_order_release); + join(dumpingResult); + } } ////////////////////////////////////////////////////////////////////////// diff --git a/easy_profiler_core/profile_manager.h b/easy_profiler_core/profile_manager.h index c198790..6fad1fb 100644 --- a/easy_profiler_core/profile_manager.h +++ b/easy_profiler_core/profile_manager.h @@ -104,10 +104,11 @@ class ProfileManager std::atomic_bool m_isAlreadyListening; std::atomic_bool m_frameMaxReset; std::atomic_bool m_frameAvgReset; + std::atomic_bool m_stopDumping; std::string m_csInfoFilename = "/tmp/cs_profiling_info.log"; - uint32_t dumpBlocksToStream(profiler::OStream& _outputStream, bool _lockSpin); + uint32_t dumpBlocksToStream(profiler::OStream& _outputStream, bool _lockSpin, bool _async); void setBlockStatus(profiler::block_id_t _id, profiler::EasyBlockStatus _status); std::thread m_listenThread;