0
0
mirror of https://github.com/yse/easy_profiler.git synced 2024-12-27 00:21:11 +08:00

#75 [GUI] ui is not freezed now when you hit Cancel button while profiled blocks are prepared to be sent

This commit is contained in:
Victor Zarubkin 2017-11-02 22:43:37 +03:00
parent de6b6c82b5
commit 4bf796cb7a
4 changed files with 569 additions and 406 deletions

View File

@ -40,281 +40,342 @@ limitations under the License.
**/
#include <easy/easy_socket.h>
#include <string.h>
#include <thread>
#ifdef _WIN32
#pragma comment (lib, "Ws2_32.lib")
#pragma comment (lib, "Mswsock.lib")
#pragma comment (lib, "AdvApi32.lib")
#if defined(_WIN32)
# pragma comment (lib, "Ws2_32.lib")
# pragma comment (lib, "Mswsock.lib")
# pragma comment (lib, "AdvApi32.lib")
#else
#include <errno.h>
#include <sys/ioctl.h>
# include <errno.h>
# include <sys/ioctl.h>
#endif
bool EasySocket::checkSocket(socket_t s) const
{
return s > 0;
}
/////////////////////////////////////////////////////////////////
int EasySocket::_close(EasySocket::socket_t s)
#if defined(_WIN32)
const int SOCK_ABORTED = WSAECONNABORTED;
const int SOCK_RESET = WSAECONNRESET;
const int SOCK_IN_PROGRESS = WSAEINPROGRESS;
#else
const int SOCK_ABORTED = ECONNABORTED;
const int SOCK_RESET = ECONNRESET;
const int SOCK_IN_PROGRESS = EINPROGRESS;
const int SOCK_BROKEN_PIPE = EPIPE;
const int SOCK_ENOENT = ENOENT;
#endif
const int SEND_BUFFER_SIZE = 64 * 1024 * 1024;
/////////////////////////////////////////////////////////////////
static int closeSocket(EasySocket::socket_t s)
{
#ifdef _WIN32
#if defined(_WIN32)
return ::closesocket(s);
#else
return ::close(s);
#endif
}
/////////////////////////////////////////////////////////////////
bool EasySocket::checkSocket(socket_t s) const
{
return s > 0;
}
void EasySocket::setBlocking(EasySocket::socket_t s, bool blocking)
{
#ifdef _WIN32
#if defined(_WIN32)
u_long iMode = blocking ? 0 : 1;//0 - blocking, 1 - non blocking
ioctlsocket(s, FIONBIO, &iMode);
::ioctlsocket(s, FIONBIO, &iMode);
#else
const int iMode = blocking ? 0 : 1;//0 - blocking, 1 - non blocking
ioctl(s, FIONBIO, (char *)&iMode);
int iMode = blocking ? 0 : 1;//0 - blocking, 1 - non blocking
::ioctl(s, FIONBIO, (char*)&iMode);
#endif
}
int EasySocket::bind(uint16_t portno)
int EasySocket::bind(uint16_t port)
{
if (!checkSocket(m_socket)) return -1;
if (!checkSocket(m_socket))
return -1;
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(portno);
auto res = ::bind(m_socket, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_port = htons(port);
auto res = ::bind(m_socket, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
return res;
}
void EasySocket::flush()
{
if (m_socket){
_close(m_socket);
}
if (m_replySocket != m_socket){
_close(m_replySocket);
}
#ifdef _WIN32
if (m_socket)
closeSocket(m_socket);
if (m_replySocket != m_socket)
closeSocket(m_replySocket);
#if defined(_WIN32)
m_socket = 0;
m_replySocket = 0;
#else
wsaret = 0;
m_wsaret = 0;
#endif
}
void EasySocket::checkResult(int result)
{
//printf("Errno: %s\n", strerror(errno));
if(result >= 0){
m_state = CONNECTION_STATE_SUCCESS;
if (result >= 0)
{
m_state = ConnectionState::Connected;
return;
}else if(result == -1){
}
int error_code = 0;
#ifdef _WIN32
error_code = WSAGetLastError();
const int CONNECTION_ABORTED = WSAECONNABORTED;
const int CONNECTION_RESET = WSAECONNRESET;
const int CONNECTION_IN_PROGRESS = WSAEINPROGRESS;
if (result == -1) // is this check necessary?
{
#if defined(_WIN32)
const int error_code = WSAGetLastError();
#else
error_code = errno;
const int CONNECTION_ABORTED = ECONNABORTED;
const int CONNECTION_RESET = ECONNRESET;
const int CONNECTION_IN_PROGRESS = EINPROGRESS;
const int CONNECTION_BROKEN_PIPE = EPIPE;
const int CONNECTION_ENOENT = ENOENT;
const int error_code = errno;
#endif
switch(error_code)
switch (error_code)
{
case CONNECTION_ABORTED:
case CONNECTION_RESET:
#ifndef _WIN32
case CONNECTION_BROKEN_PIPE:
case CONNECTION_ENOENT:
case SOCK_ABORTED:
case SOCK_RESET:
#if !defined(_WIN32)
case SOCK_BROKEN_PIPE:
case SOCK_ENOENT:
#endif
m_state = CONNECTION_STATE_DISCONNECTED;
break;
case CONNECTION_IN_PROGRESS:
m_state = CONNECTION_STATE_IN_PROGRESS;
break;
default:
break;
m_state = ConnectionState::Disconnected;
break;
case SOCK_IN_PROGRESS:
m_state = ConnectionState::Connecting;
break;
default:
break;
}
}
}
void EasySocket::init()
{
if (wsaret == 0)
{
int protocol = 0;
#ifdef _WIN32
protocol = IPPROTO_TCP;
#endif
m_socket = socket(AF_INET, SOCK_STREAM, protocol);
if (!checkSocket(m_socket)) {
return;
}
}else
if (m_wsaret != 0)
return;
setBlocking(m_socket,true);
#ifndef _WIN32
wsaret = 1;
#if !defined(_WIN32)
const int protocol = 0;
#else
const int protocol = IPPROTO_TCP;
#endif
int opt = 1;
setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, (const char *)&opt, sizeof(opt));
m_socket = ::socket(AF_INET, SOCK_STREAM, protocol);
if (!checkSocket(m_socket))
return;
setBlocking(m_socket, true);
#if !defined(_WIN32)
m_wsaret = 1;
#endif
const int opt = 1;
::setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(int));
}
EasySocket::ConnectionState EasySocket::state() const
{
return m_state;
}
bool EasySocket::isDisconnected() const
{
return static_cast<int>(m_state) <= 0;
}
bool EasySocket::isConnected() const
{
return m_state == ConnectionState::Connected;
}
EasySocket::EasySocket()
{
#ifdef _WIN32
#if defined(_WIN32)
WSADATA wsaData;
wsaret = WSAStartup(0x101, &wsaData);
m_wsaret = WSAStartup(0x101, &wsaData);
#else
wsaret = 0;
m_wsaret = 0;
#endif
init();
#ifndef _WIN32
wsaret = 1;
#endif
}
EasySocket::~EasySocket()
{
flush();
#ifdef _WIN32
if (wsaret == 0)
#if defined(_WIN32)
if (m_wsaret == 0)
WSACleanup();
#endif
}
int EasySocket::send(const void *buf, size_t nbyte)
void EasySocket::setReceiveTimeout(int milliseconds)
{
if(!checkSocket(m_replySocket)) return -1;
int res = 0;
#if defined(_WIN32) || defined(__APPLE__)
res = ::send(m_replySocket, (const char*)buf, (int)nbyte, 0);
if (!isConnected() || !checkSocket(m_replySocket))
return;
if (milliseconds <= 0)
milliseconds = std::numeric_limits<int>::max() / 1000;
#if defined(_WIN32)
const DWORD timeout = static_cast<DWORD>(milliseconds);
::setsockopt(m_replySocket, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(DWORD));
#else
res = ::send(m_replySocket,buf,nbyte,MSG_NOSIGNAL);
struct timeval tv;
if (milliseconds >= 1000)
{
const int s = milliseconds / 1000;
const int us = (milliseconds - s * 1000) * 1000;
tv.tv_sec = s;
tv.tv_usec = us;
}
else
{
tv.tv_sec = 0;
tv.tv_usec = milliseconds * 1000;
}
::setsockopt(m_replySocket, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof (struct timeval));
#endif
}
int EasySocket::send(const void* buffer, size_t nbytes)
{
if (!checkSocket(m_replySocket))
return -1;
#if defined(_WIN32) || defined(__APPLE__)
const int res = ::send(m_replySocket, (const char*)buffer, (int)nbytes, 0);
#else
const int res = (int)::send(m_replySocket, buffer, nbytes, MSG_NOSIGNAL);
#endif
checkResult(res);
return res;
}
int EasySocket::receive(void *buf, size_t nbyte)
int EasySocket::receive(void* buffer, size_t nbytes)
{
if(!checkSocket(m_replySocket)) return -1;
int res = 0;
#ifdef _WIN32
res = ::recv(m_replySocket, (char*)buf, (int)nbyte, 0);
if (!checkSocket(m_replySocket))
return -1;
#if defined(_WIN32)
const int res = ::recv(m_replySocket, (char*)buffer, (int)nbytes, 0);
#else
res = ::read(m_replySocket,buf,nbyte);
const int res = (int)::read(m_replySocket, buffer, nbytes);
#endif
checkResult(res);
if (res == 0){
m_state = CONNECTION_STATE_DISCONNECTED;
}
if (res == 0)
m_state = ConnectionState::Disconnected;
return res;
}
int EasySocket::listen(int count)
{
if(!checkSocket(m_socket)) return -1;
int res = ::listen(m_socket,count);
if (!checkSocket(m_socket))
return -1;
const int res = ::listen(m_socket, count);
checkResult(res);
return res;
}
int EasySocket::accept()
{
if(!checkSocket(m_socket)) return -1;
if (!checkSocket(m_socket))
return -1;
fd_set fdread, fdwrite, fdexcl;
timeval tv = { 0 };
fd_set fdread;
FD_ZERO (&fdread);
FD_SET (m_socket, &fdread);
fdwrite = fdread;
fdexcl = fdread;
tv.tv_sec = 0; tv.tv_usec = 500;
int rc =select ((int)m_socket+1, &fdread, &fdwrite, &fdexcl, &tv);
fd_set fdwrite = fdread;
fd_set fdexcl = fdread;
if(rc <= 0){
//there is no connection for accept
return -1;
}
m_replySocket = ::accept(m_socket,nullptr,nullptr);
struct timeval tv { 0, 500 };
const int rc = ::select((int)m_socket + 1, &fdread, &fdwrite, &fdexcl, &tv);
if (rc <= 0)
return -1; // there is no connection for accept
m_replySocket = ::accept(m_socket, nullptr, nullptr);
checkResult((int)m_replySocket);
if(checkSocket(m_replySocket))
{
int send_buffer = 64*1024*1024;
int send_buffer_sizeof = sizeof(int);
setsockopt(m_replySocket, SOL_SOCKET, SO_SNDBUF, (char*)&send_buffer, send_buffer_sizeof);
//int flag = 1;
//int result = setsockopt(m_replySocket,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(int));
if (checkSocket(m_replySocket))
{
::setsockopt(m_replySocket, SOL_SOCKET, SO_SNDBUF, (char*)&SEND_BUFFER_SIZE, sizeof(int));
//const int flag = 1;
//const int result = setsockopt(m_replySocket,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(int));
#if defined(__APPLE__)
// Apple doesn't have MSG_NOSIGNAL, work around it
#ifdef __APPLE__
int value = 1;
setsockopt(m_replySocket, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value));
const int value = 1;
::setsockopt(m_replySocket, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value));
#endif
//setBlocking(m_replySocket,true);
}
return (int)m_replySocket;
}
bool EasySocket::setAddress(const char *serv, uint16_t portno)
bool EasySocket::setAddress(const char* address, uint16_t port)
{
server = gethostbyname(serv);
if (server == NULL) {
m_server = ::gethostbyname(address);
if (m_server == nullptr)
return false;
//fprintf(stderr,"ERROR, no such host\n");
}
memset((char *)&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
memcpy((char *)&serv_addr.sin_addr.s_addr, (char *)server->h_addr, server->h_length);
serv_addr.sin_port = htons(portno);
memset((char*)&m_serverAddress, 0, sizeof(m_serverAddress));
m_serverAddress.sin_family = AF_INET;
memcpy((char*)&m_serverAddress.sin_addr.s_addr, (char*)m_server->h_addr, (size_t)m_server->h_length);
m_serverAddress.sin_port = htons(port);
return true;
}
int EasySocket::connect()
{
if (server == NULL || m_socket <=0 ) {
if (m_server == nullptr || m_socket <= 0)
return -1;
//fprintf(stderr,"ERROR, no such host\n");
}
int res = 0;
//TODO: more intelligence
#ifndef _WIN32
setBlocking(m_socket,false);
#if !defined(_WIN32)
setBlocking(m_socket, false);
int counter = 0;
int sleepMs = 20;
int waitSec = 1;
int waitMs = waitSec*1000/sleepMs;
while(counter++ < waitMs)
const int sleepMs = 20;
const int waitSec = 1;
const int waitMs = waitSec * 1000 / sleepMs;
for (int counter = 0; counter++ < waitMs;)
{
res = ::connect(m_socket,(struct sockaddr *) &serv_addr,sizeof(serv_addr));
res = ::connect(m_socket, (struct sockaddr*)&m_serverAddress, sizeof(m_serverAddress));
#if defined(__APPLE__)
// on Apple, treat EISCONN error as success
#ifdef __APPLE__
if (res == -1 && errno == EISCONN)
{
res = 0;
@ -323,42 +384,44 @@ int EasySocket::connect()
#endif
checkResult(res);
if (res == 0)
{
break;
}
if (m_state == CONNECTION_STATE_IN_PROGRESS)
if (m_state == ConnectionState::Connecting)
{
std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
continue;
}
if(m_state != CONNECTION_STATE_IN_PROGRESS && m_state != CONNECTION_STATE_SUCCESS )
if (isDisconnected())
{
break;
}
}
setBlocking(m_socket,true);
setBlocking(m_socket, true);
#else
res = ::connect(m_socket, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
res = ::connect(m_socket, (struct sockaddr*)&m_serverAddress, sizeof(m_serverAddress));
checkResult(res);
#endif
if(res == 0){
if (res == 0)
{
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
::setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, (char*)&tv, sizeof(struct timeval));
setsockopt(m_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,sizeof(struct timeval));
#ifdef __APPLE__
#if defined(__APPLE__)
// Apple doesn't have MSG_NOSIGNAL, work around it
int value = 1;
const int value = 1;
setsockopt(m_socket, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value));
#endif
m_replySocket = m_socket;
}
return res;
}

View File

@ -39,8 +39,8 @@ The Apache License, Version 2.0 (the "License");
limitations under the License.
**/
#ifndef EASY________SOCKET_________H
#define EASY________SOCKET_________H
#ifndef EASY_PROFILER_SOCKET_H
#define EASY_PROFILER_SOCKET_H
#include <stdint.h>
#include <easy/profiler.h>
@ -72,40 +72,37 @@ public:
typedef int socket_t;
#endif
enum ConnectionState
enum class ConnectionState : int8_t
{
CONNECTION_STATE_UNKNOWN,
CONNECTION_STATE_SUCCESS,
CONNECTION_STATE_DISCONNECTED,
CONNECTION_STATE_IN_PROGRESS
Disconnected = -1,
Unknown,
Connected,
Connecting
};
private:
void checkResult(int result);
bool checkSocket(socket_t s) const;
static int _close(socket_t s);
void setBlocking(socket_t s, bool blocking);
socket_t m_socket = 0;
socket_t m_replySocket = 0;
int wsaret = -1;
int m_receiveTimeoutMs = 0;
int m_wsaret = -1;
struct hostent * server;
struct sockaddr_in serv_addr;
struct hostent* m_server = nullptr;
struct sockaddr_in m_serverAddress;
ConnectionState m_state = CONNECTION_STATE_UNKNOWN;
ConnectionState m_state = ConnectionState::Unknown;
public:
EasySocket();
~EasySocket();
int send(const void *buf, size_t nbyte);
int receive(void *buf, size_t nbyte);
int listen(int count=5);
void setReceiveTimeout(int milliseconds);
int send(const void* buf, size_t nbyte);
int receive(void* buf, size_t nbyte);
int listen(int count = 5);
int accept();
int bind(uint16_t portno);
@ -115,14 +112,16 @@ public:
void flush();
void init();
void setState(ConnectionState state){m_state=state;}
ConnectionState state() const{return m_state;}
ConnectionState state() const;
bool isDisconnected() const;
bool isConnected() const;
bool isDisconnected() const
{
return m_state == CONNECTION_STATE_UNKNOWN ||
m_state == CONNECTION_STATE_DISCONNECTED;
}
};
private:
#endif // EASY________SOCKET_________H
void checkResult(int result);
bool checkSocket(socket_t s) const;
void setBlocking(socket_t s, bool blocking);
}; // end of class EasySocket.
#endif // EASY_PROFILER_SOCKET_H

View File

@ -50,7 +50,7 @@
#include <algorithm>
#include <fstream>
#include <assert.h>
#include <future>
#include "profile_manager.h"
#include <easy/serialized_block.h>
@ -654,6 +654,7 @@ ProfileManager::ProfileManager() :
m_profilerStatus = ATOMIC_VAR_INIT(EASY_PROF_DISABLED);
m_isEventTracingEnabled = ATOMIC_VAR_INIT(EASY_OPTION_EVENT_TRACING_ENABLED);
m_isAlreadyListening = ATOMIC_VAR_INIT(false);
m_stopDumping = ATOMIC_VAR_INIT(false);
m_stopListen = ATOMIC_VAR_INIT(false);
m_mainThreadId = ATOMIC_VAR_INIT(0);
@ -1258,7 +1259,7 @@ char ProfileManager::checkThreadExpired(ThreadStorage& _registeredThread)
//////////////////////////////////////////////////////////////////////////
uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bool _lockSpin)
uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bool _lockSpin, bool _async)
{
EASY_LOGMSG("dumpBlocksToStream(_lockSpin = " << _lockSpin << ")...\n");
@ -1283,6 +1284,12 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo
//m_spin.lock();
// This is the only place using both spins, so no dead-lock will occur
if (_async && m_stopDumping.load(std::memory_order_acquire))
{
if (_lockSpin)
m_dumpSpin.unlock();
return 0;
}
// Wait for some time to be sure that all operations which began before setEnabled(false) will be finished.
// This is much better than inserting spin-lock or atomic variable check into each store operation.
@ -1292,6 +1299,13 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo
EASY_LOG_ONLY(bool logged = false);
for (auto it = m_threads.begin(), end = m_threads.end(); it != end;)
{
if (_async && m_stopDumping.load(std::memory_order_acquire))
{
if (_lockSpin)
m_dumpSpin.unlock();
return 0;
}
if (!it->second.profiledFrameOpened.load(std::memory_order_acquire))
{
++it;
@ -1331,6 +1345,15 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo
{
// Read thread context switch events from temporary file
if (_async && m_stopDumping.load(std::memory_order_acquire))
{
m_spin.unlock();
m_storedSpin.unlock();
if (_lockSpin)
m_dumpSpin.unlock();
return 0;
}
EASY_LOGMSG("Writing context switch events...\n");
uint64_t timestamp = 0;
@ -1344,6 +1367,15 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo
pid_t process_to = 0;
while (infile >> timestamp >> thread_from >> thread_to >> next_task_name >> process_to)
{
if (_async && m_stopDumping.load(std::memory_order_acquire))
{
m_spin.unlock();
m_storedSpin.unlock();
if (_lockSpin)
m_dumpSpin.unlock();
return 0;
}
beginContextSwitch(thread_from, timestamp, thread_to, next_task_name.c_str(), false);
endContextSwitch(thread_to, (processid_t)process_to, timestamp, false);
EASY_LOG_ONLY(++num);
@ -1366,6 +1398,15 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo
uint32_t blocks_number = 0;
for (auto it = m_threads.begin(), end = m_threads.end(); it != end;)
{
if (_async && m_stopDumping.load(std::memory_order_acquire))
{
m_spin.unlock();
m_storedSpin.unlock();
if (_lockSpin)
m_dumpSpin.unlock();
return 0;
}
auto& t = it->second;
uint32_t num = static_cast<uint32_t>(t.blocks.closedList.size()) + static_cast<uint32_t>(t.sync.closedList.size());
const char expired = ProfileManager::checkThreadExpired(t);
@ -1448,6 +1489,15 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream, bo
// Write blocks and context switch events for each thread
for (auto it = m_threads.begin(), end = m_threads.end(); it != end;)
{
if (_async && m_stopDumping.load(std::memory_order_acquire))
{
m_spin.unlock();
m_storedSpin.unlock();
if (_lockSpin)
m_dumpSpin.unlock();
return 0;
}
auto& t = it->second;
_outputStream.write(it->first);
@ -1512,7 +1562,7 @@ uint32_t ProfileManager::dumpBlocksToFile(const char* _filename)
auto oldbuf = s.rdbuf(outputFile.rdbuf());
// Write data directly to file
const auto blocksNumber = dumpBlocksToStream(outputStream, true);
const auto blocksNumber = dumpBlocksToStream(outputStream, true, false);
// Restore old outputStream buffer to avoid possible second memory free on stringstream destructor
s.rdbuf(oldbuf);
@ -1628,12 +1678,31 @@ bool ProfileManager::isListening() const
//////////////////////////////////////////////////////////////////////////
template <class T>
inline void join(std::future<T>& futureResult)
{
if (futureResult.valid())
futureResult.get();
}
void ProfileManager::listen(uint16_t _port)
{
EASY_THREAD_SCOPE("EasyProfiler.Listen");
EASY_LOGMSG("Listening started\n");
profiler::OStream os;
std::future<uint32_t> dumpingResult;
bool dumping = false;
const auto stopDumping = [this, &dumping, &dumpingResult, &os]
{
dumping = false;
m_stopDumping.store(true, std::memory_order_release);
join(dumpingResult);
os.clear();
};
EasySocket socket;
profiler::net::Message replyMessage(profiler::net::MESSAGE_TYPE_REPLY_START_CAPTURING);
@ -1641,12 +1710,13 @@ void ProfileManager::listen(uint16_t _port)
int bytes = 0;
while (!m_stopListen.load(std::memory_order_acquire))
{
bool hasConnect = false;
if (dumping)
stopDumping();
socket.listen();
socket.accept();
hasConnect = true;
bool hasConnect = true;
// Send reply
{
@ -1663,230 +1733,260 @@ void ProfileManager::listen(uint16_t _port)
while (hasConnect && !m_stopListen.load(std::memory_order_acquire))
{
char buffer[256] = {};
if (dumping)
{
if (!dumpingResult.valid())
{
dumping = false;
socket.setReceiveTimeout(0);
os.clear();
}
else if (dumpingResult.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready)
{
dumping = false;
dumpingResult.get();
const auto size = os.stream().tellp();
static const decltype(size) badSize = -1;
if (size != badSize)
{
const profiler::net::DataMessage dm(static_cast<uint32_t>(size),
profiler::net::MESSAGE_TYPE_REPLY_BLOCKS);
const size_t packet_size = sizeof(dm) + dm.size;
std::string sendbuf;
sendbuf.reserve(packet_size + 1);
if (sendbuf.capacity() >= packet_size) // check if there is enough memory
{
sendbuf.append((const char*) &dm, sizeof(dm));
sendbuf += os.stream().str(); // TODO: Avoid double-coping data from stringstream!
os.clear();
bytes = socket.send(sendbuf.c_str(), packet_size);
hasConnect = bytes > 0;
if (!hasConnect)
break;
}
else
{
EASY_ERROR("Can not send blocks. Not enough memory for allocating " << packet_size
<< " bytes");
os.clear();
}
}
else
{
EASY_ERROR("Can not send blocks. Bad std::stringstream.tellp() == -1");
os.clear();
}
replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_END;
bytes = socket.send(&replyMessage, sizeof(replyMessage));
hasConnect = bytes > 0;
if (!hasConnect)
break;
socket.setReceiveTimeout(0);
}
}
char buffer[256] = {};
bytes = socket.receive(buffer, 255);
hasConnect = bytes > 0;
hasConnect = socket.isConnected();
if (!hasConnect || bytes < static_cast<int>(sizeof(profiler::net::Message)))
continue;
char *buf = &buffer[0];
auto message = (const profiler::net::Message*)buffer;
if (!message->isEasyNetMessage())
continue;
if (bytes > 0)
switch (message->type)
{
profiler::net::Message* message = (profiler::net::Message*)buf;
if (!message->isEasyNetMessage()){
continue;
}
switch (message->type)
case profiler::net::MESSAGE_TYPE_CHECK_CONNECTION:
{
case profiler::net::MESSAGE_TYPE_CHECK_CONNECTION:
{
EASY_LOGMSG("receive MESSAGE_TYPE_CHECK_CONNECTION\n");
break;
}
case profiler::net::MESSAGE_TYPE_REQUEST_MAIN_FRAME_TIME_MAX_AVG_US:
{
profiler::timestamp_t maxDuration = maxFrameDuration(), avgDuration = avgFrameDuration();
maxDuration = TICKS_TO_US(maxDuration);
avgDuration = TICKS_TO_US(avgDuration);
const profiler::net::TimestampMessage reply(profiler::net::MESSAGE_TYPE_REPLY_MAIN_FRAME_TIME_MAX_AVG_US, (uint32_t)maxDuration, (uint32_t)avgDuration);
bytes = socket.send(&reply, sizeof(profiler::net::TimestampMessage));
hasConnect = bytes > 0;
break;
}
case profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE:
{
EASY_LOGMSG("receive REQUEST_START_CAPTURE\n");
::profiler::timestamp_t t = 0;
EASY_FORCE_EVENT(t, "StartCapture", EASY_COLOR_START, profiler::OFF);
m_dumpSpin.lock();
const auto prev = m_profilerStatus.exchange(EASY_PROF_ENABLED, std::memory_order_release);
if (prev != EASY_PROF_ENABLED) {
enableEventTracer();
m_beginTime = t;
}
m_dumpSpin.unlock();
replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_START_CAPTURING;
bytes = socket.send(&replyMessage, sizeof(replyMessage));
hasConnect = bytes > 0;
break;
}
case profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE:
{
EASY_LOGMSG("receive REQUEST_STOP_CAPTURE\n");
m_dumpSpin.lock();
auto time = getCurrentTime();
const auto prev = m_profilerStatus.exchange(EASY_PROF_DUMP, std::memory_order_release);
if (prev == EASY_PROF_ENABLED) {
disableEventTracer();
m_endTime = time;
}
EASY_FORCE_EVENT2(m_endTime, "StopCapture", EASY_COLOR_END, profiler::OFF);
//TODO
//if connection aborted - ignore this part
profiler::OStream os;
dumpBlocksToStream(os, false);
m_dumpSpin.unlock();
const auto size = os.stream().tellp();
static const decltype(size) badSize = -1;
if (size != badSize)
{
const profiler::net::DataMessage dm(static_cast<uint32_t>(size), profiler::net::MESSAGE_TYPE_REPLY_BLOCKS);
const size_t packet_size = sizeof(dm) + dm.size;
std::string sendbuf;
sendbuf.reserve(packet_size + 1);
if (sendbuf.capacity() >= packet_size) // check if there is enough memory
{
sendbuf.append((const char*)&dm, sizeof(dm));
sendbuf += os.stream().str(); // TODO: Avoid double-coping data from stringstream!
os.clear();
bytes = socket.send(sendbuf.c_str(), packet_size);
hasConnect = bytes > 0;
}
else
{
EASY_ERROR("Can not send blocks. Not enough memory for allocating " << packet_size << " bytes");
}
}
else
{
EASY_ERROR("Can not send blocks. Bad std::stringstream.tellp() == -1");
}
replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_END;
bytes = socket.send(&replyMessage, sizeof(replyMessage));
hasConnect = bytes > 0;
break;
}
case profiler::net::MESSAGE_TYPE_REQUEST_BLOCKS_DESCRIPTION:
{
EASY_LOGMSG("receive REQUEST_BLOCKS_DESCRIPTION\n");
profiler::OStream os;
// Write profiler signature and version
os.write(PROFILER_SIGNATURE);
os.write(EASY_CURRENT_VERSION);
// Write block descriptors
m_storedSpin.lock();
os.write(static_cast<uint32_t>(m_descriptors.size()));
os.write(m_usedMemorySize);
for (const auto descriptor : m_descriptors)
{
const auto name_size = descriptor->nameSize();
const auto filename_size = descriptor->filenameSize();
const auto size = static_cast<uint16_t>(sizeof(profiler::SerializedBlockDescriptor) + name_size + filename_size);
os.write(size);
os.write<profiler::BaseBlockDescriptor>(*descriptor);
os.write(name_size);
os.write(descriptor->name(), name_size);
os.write(descriptor->filename(), filename_size);
}
m_storedSpin.unlock();
// END of Write block descriptors.
const auto size = os.stream().tellp();
static const decltype(size) badSize = -1;
if (size != badSize)
{
const profiler::net::DataMessage dm(static_cast<uint32_t>(size), profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_DESCRIPTION);
const size_t packet_size = sizeof(dm) + dm.size;
std::string sendbuf;
sendbuf.reserve(packet_size + 1);
if (sendbuf.capacity() >= packet_size) // check if there is enough memory
{
sendbuf.append((const char*)&dm, sizeof(dm));
sendbuf += os.stream().str(); // TODO: Avoid double-coping data from stringstream!
os.clear();
bytes = socket.send(sendbuf.c_str(), packet_size);
hasConnect = bytes > 0;
}
else
{
EASY_ERROR("Can not send block descriptions. Not enough memory for allocating " << packet_size << " bytes");
}
}
else
{
EASY_ERROR("Can not send block descriptions. Bad std::stringstream.tellp() == -1");
}
replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_DESCRIPTION_END;
bytes = socket.send(&replyMessage, sizeof(replyMessage));
hasConnect = bytes > 0;
break;
}
case profiler::net::MESSAGE_TYPE_EDIT_BLOCK_STATUS:
{
auto data = reinterpret_cast<const profiler::net::BlockStatusMessage*>(message);
EASY_LOGMSG("receive EDIT_BLOCK_STATUS id=" << data->id << " status=" << data->status << std::endl);
setBlockStatus(data->id, static_cast<::profiler::EasyBlockStatus>(data->status));
break;
}
case profiler::net::MESSAGE_TYPE_EVENT_TRACING_STATUS:
{
auto data = reinterpret_cast<const profiler::net::BoolMessage*>(message);
EASY_LOGMSG("receive EVENT_TRACING_STATUS on=" << data->flag << std::endl);
m_isEventTracingEnabled.store(data->flag, std::memory_order_release);
break;
}
case profiler::net::MESSAGE_TYPE_EVENT_TRACING_PRIORITY:
{
#if defined(_WIN32) || EASY_OPTION_LOG_ENABLED != 0
auto data = reinterpret_cast<const profiler::net::BoolMessage*>(message);
#endif
EASY_LOGMSG("receive EVENT_TRACING_PRIORITY low=" << data->flag << std::endl);
#ifdef _WIN32
EasyEventTracer::instance().setLowPriority(data->flag);
#endif
break;
}
default:
break;
EASY_LOGMSG("receive MESSAGE_TYPE_CHECK_CONNECTION\n");
break;
}
//nn_freemsg (buf);
case profiler::net::MESSAGE_TYPE_REQUEST_MAIN_FRAME_TIME_MAX_AVG_US:
{
profiler::timestamp_t maxDuration = maxFrameDuration(), avgDuration = avgFrameDuration();
maxDuration = TICKS_TO_US(maxDuration);
avgDuration = TICKS_TO_US(avgDuration);
const profiler::net::TimestampMessage reply(profiler::net::MESSAGE_TYPE_REPLY_MAIN_FRAME_TIME_MAX_AVG_US, (uint32_t)maxDuration, (uint32_t)avgDuration);
bytes = socket.send(&reply, sizeof(profiler::net::TimestampMessage));
hasConnect = bytes > 0;
break;
}
case profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE:
{
EASY_LOGMSG("receive REQUEST_START_CAPTURE\n");
::profiler::timestamp_t t = 0;
EASY_FORCE_EVENT(t, "StartCapture", EASY_COLOR_START, profiler::OFF);
m_dumpSpin.lock();
const auto prev = m_profilerStatus.exchange(EASY_PROF_ENABLED, std::memory_order_release);
if (prev != EASY_PROF_ENABLED) {
enableEventTracer();
m_beginTime = t;
}
m_dumpSpin.unlock();
replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_START_CAPTURING;
bytes = socket.send(&replyMessage, sizeof(replyMessage));
hasConnect = bytes > 0;
break;
}
case profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE:
{
EASY_LOGMSG("receive REQUEST_STOP_CAPTURE\n");
if (dumping)
break;
m_dumpSpin.lock();
auto time = getCurrentTime();
const auto prev = m_profilerStatus.exchange(EASY_PROF_DUMP, std::memory_order_release);
if (prev == EASY_PROF_ENABLED) {
disableEventTracer();
m_endTime = time;
}
EASY_FORCE_EVENT2(m_endTime, "StopCapture", EASY_COLOR_END, profiler::OFF);
dumping = true;
socket.setReceiveTimeout(500); // We have to check if dumping ready or not
m_stopDumping.store(false, std::memory_order_release);
dumpingResult = std::async(std::launch::async, [this, &os]
{
auto result = dumpBlocksToStream(os, false, true);
m_dumpSpin.unlock();
return result;
});
break;
}
case profiler::net::MESSAGE_TYPE_REQUEST_BLOCKS_DESCRIPTION:
{
EASY_LOGMSG("receive REQUEST_BLOCKS_DESCRIPTION\n");
if (dumping)
stopDumping();
// Write profiler signature and version
os.write(PROFILER_SIGNATURE);
os.write(EASY_CURRENT_VERSION);
// Write block descriptors
m_storedSpin.lock();
os.write(static_cast<uint32_t>(m_descriptors.size()));
os.write(m_usedMemorySize);
for (const auto descriptor : m_descriptors)
{
const auto name_size = descriptor->nameSize();
const auto filename_size = descriptor->filenameSize();
const auto size = static_cast<uint16_t>(sizeof(profiler::SerializedBlockDescriptor)
+ name_size + filename_size);
os.write(size);
os.write<profiler::BaseBlockDescriptor>(*descriptor);
os.write(name_size);
os.write(descriptor->name(), name_size);
os.write(descriptor->filename(), filename_size);
}
m_storedSpin.unlock();
// END of Write block descriptors.
const auto size = os.stream().tellp();
static const decltype(size) badSize = -1;
if (size != badSize)
{
const profiler::net::DataMessage dm(static_cast<uint32_t>(size),
profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_DESCRIPTION);
const size_t packet_size = sizeof(dm) + dm.size;
std::string sendbuf;
sendbuf.reserve(packet_size + 1);
if (sendbuf.capacity() >= packet_size) // check if there is enough memory
{
sendbuf.append((const char*)&dm, sizeof(dm));
sendbuf += os.stream().str(); // TODO: Avoid double-coping data from stringstream!
os.clear();
bytes = socket.send(sendbuf.c_str(), packet_size);
//hasConnect = bytes > 0;
}
else
{
EASY_ERROR("Can not send block descriptions. Not enough memory for allocating " << packet_size << " bytes");
}
}
else
{
EASY_ERROR("Can not send block descriptions. Bad std::stringstream.tellp() == -1");
}
replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_BLOCKS_DESCRIPTION_END;
bytes = socket.send(&replyMessage, sizeof(replyMessage));
hasConnect = bytes > 0;
break;
}
case profiler::net::MESSAGE_TYPE_EDIT_BLOCK_STATUS:
{
auto data = reinterpret_cast<const profiler::net::BlockStatusMessage*>(message);
EASY_LOGMSG("receive EDIT_BLOCK_STATUS id=" << data->id << " status=" << data->status << std::endl);
setBlockStatus(data->id, static_cast<::profiler::EasyBlockStatus>(data->status));
break;
}
case profiler::net::MESSAGE_TYPE_EVENT_TRACING_STATUS:
{
auto data = reinterpret_cast<const profiler::net::BoolMessage*>(message);
EASY_LOGMSG("receive EVENT_TRACING_STATUS on=" << data->flag << std::endl);
m_isEventTracingEnabled.store(data->flag, std::memory_order_release);
break;
}
case profiler::net::MESSAGE_TYPE_EVENT_TRACING_PRIORITY:
{
#if defined(_WIN32) || EASY_OPTION_LOG_ENABLED != 0
auto data = reinterpret_cast<const profiler::net::BoolMessage*>(message);
#endif
EASY_LOGMSG("receive EVENT_TRACING_PRIORITY low=" << data->flag << std::endl);
#if defined(_WIN32)
EasyEventTracer::instance().setLowPriority(data->flag);
#endif
break;
}
default:
break;
}
}
}
if (dumping)
{
m_stopDumping.store(true, std::memory_order_release);
join(dumpingResult);
}
}
//////////////////////////////////////////////////////////////////////////

View File

@ -104,10 +104,11 @@ class ProfileManager
std::atomic_bool m_isAlreadyListening;
std::atomic_bool m_frameMaxReset;
std::atomic_bool m_frameAvgReset;
std::atomic_bool m_stopDumping;
std::string m_csInfoFilename = "/tmp/cs_profiling_info.log";
uint32_t dumpBlocksToStream(profiler::OStream& _outputStream, bool _lockSpin);
uint32_t dumpBlocksToStream(profiler::OStream& _outputStream, bool _lockSpin, bool _async);
void setBlockStatus(profiler::block_id_t _id, profiler::EasyBlockStatus _status);
std::thread m_listenThread;