0
0
mirror of https://github.com/yse/easy_profiler.git synced 2024-12-26 08:01:51 +08:00

[ui] fixed SocketListener listen methods; moved SocketListener and FileReader into separate source files.

This commit is contained in:
Victor Zarubkin 2019-10-29 23:52:53 +03:00
parent 00ee2d0e18
commit 6ae08c3a70
9 changed files with 1468 additions and 1067 deletions

View File

@ -30,6 +30,8 @@ if (Qt5Widgets_FOUND)
descriptors_tree_widget.cpp
dialog.h
dialog.cpp
file_reader.h
file_reader.cpp
fps_widget.h
fps_widget.cpp
globals.h
@ -49,6 +51,8 @@ if (Qt5Widgets_FOUND)
main_window.cpp
round_progress_widget.h
round_progress_widget.cpp
socket_listener.h
socket_listener.cpp
text_highlighter.h
text_highlighter.cpp
timer.h

View File

@ -600,4 +600,17 @@ namespace profiler_gui {
return median;
}
//////////////////////////////////////////////////////////////////////////
void clear_stream(std::stringstream& _stream)
{
#if defined(__GNUC__) && __GNUC__ < 5
// gcc 4 has a known bug which has been solved in gcc 5:
// std::stringstream has no swap() method :(
_stream.str(std::string());
#else
std::stringstream().swap(_stream);
#endif
}
} // end of namespace profiler_gui.

View File

@ -55,6 +55,7 @@
#ifndef EASY_PROFILER_GUI_COMMON_FUNCTIONS_H
#define EASY_PROFILER_GUI_COMMON_FUNCTIONS_H
#include <sstream>
#include <stdlib.h>
#include <type_traits>
@ -254,6 +255,10 @@ void deleteTreeItem(QTreeWidgetItem* item);
profiler::timestamp_t calculateMedian(const DurationsCountMap& durations);
///////////////////////////////////////////////////////////////////////
void clear_stream(std::stringstream& _stream);
} // END of namespace profiler_gui.
//////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,241 @@
//
// Created by vzarubkin on 29.10.19.
//
#include <fstream>
#include <QFile>
#include <easy/writer.h>
#include "common_functions.h"
#include "globals.h"
#include "file_reader.h"
#ifdef min
#undef min
#endif
#ifdef max
#undef max
#endif
FileReader::FileReader()
{
}
FileReader::~FileReader()
{
interrupt();
}
const bool FileReader::isFile() const
{
return m_isFile;
}
const bool FileReader::isSaving() const
{
return m_jobType == JobType::Saving;
}
const bool FileReader::isLoading() const
{
return m_jobType == JobType::Loading;
}
const bool FileReader::isSnapshot() const
{
return m_isSnapshot;
}
bool FileReader::done() const
{
return m_bDone.load(std::memory_order_acquire);
}
int FileReader::progress() const
{
return m_progress.load(std::memory_order_acquire);
}
unsigned int FileReader::size() const
{
return m_size.load(std::memory_order_acquire);
}
const QString& FileReader::filename() const
{
return m_filename;
}
void FileReader::load(const QString& _filename)
{
interrupt();
m_jobType = JobType::Loading;
m_isFile = true;
m_isSnapshot = false;
m_filename = _filename;
m_thread = std::thread([this] (bool _enableStatistics)
{
const auto size = fillTreesFromFile(m_progress, m_filename.toStdString().c_str(), m_beginEndTime, m_serializedBlocks,
m_serializedDescriptors, m_descriptors, m_blocks, m_blocksTree,
m_bookmarks, m_descriptorsNumberInFile, m_version, m_pid,
_enableStatistics, m_errorMessage);
m_size.store(size, std::memory_order_release);
m_progress.store(100, std::memory_order_release);
m_bDone.store(true, std::memory_order_release);
}, EASY_GLOBALS.enable_statistics);
}
void FileReader::load(std::stringstream& _stream)
{
interrupt();
m_jobType = JobType::Loading;
m_isFile = false;
m_isSnapshot = false;
m_filename.clear();
#if defined(__GNUC__) && __GNUC__ < 5 && !defined(__llvm__)
// gcc 4 has a known bug which has been solved in gcc 5:
// std::stringstream has no swap() method :(
// have to copy all contents... Use gcc 5 or higher!
#pragma message "Warning: in gcc 4 and lower std::stringstream has no swap()! Memory consumption may increase! Better use gcc 5 or higher instead."
m_stream.str(_stream.str());
#else
m_stream.swap(_stream);
#endif
m_thread = std::thread([this] (bool _enableStatistics)
{
std::ofstream cache_file(NETWORK_CACHE_FILE, std::fstream::binary);
if (cache_file.is_open())
{
cache_file << m_stream.str();
cache_file.close();
}
const auto size = fillTreesFromStream(m_progress, m_stream, m_beginEndTime, m_serializedBlocks, m_serializedDescriptors,
m_descriptors, m_blocks, m_blocksTree, m_bookmarks, m_descriptorsNumberInFile,
m_version, m_pid, _enableStatistics, m_errorMessage);
m_size.store(size, std::memory_order_release);
m_progress.store(100, std::memory_order_release);
m_bDone.store(true, std::memory_order_release);
}, EASY_GLOBALS.enable_statistics);
}
void FileReader::save(const QString& _filename, profiler::timestamp_t _beginTime, profiler::timestamp_t _endTime,
const profiler::SerializedData& _serializedDescriptors,
const profiler::descriptors_list_t& _descriptors, profiler::block_id_t descriptors_count,
const profiler::thread_blocks_tree_t& _trees, const profiler::bookmarks_t& bookmarks,
profiler::block_getter_fn block_getter, profiler::processid_t _pid, bool snapshotMode)
{
interrupt();
m_jobType = JobType::Saving;
m_isFile = true;
m_isSnapshot = snapshotMode;
m_filename = _filename;
auto serializedDescriptors = std::ref(_serializedDescriptors);
auto descriptors = std::ref(_descriptors);
auto trees = std::ref(_trees);
auto bookmarksRef = std::ref(bookmarks);
m_thread = std::thread([=] (profiler::block_getter_fn getter)
{
const QString tmpFile = m_filename + ".tmp";
const auto result = writeTreesToFile(m_progress, tmpFile.toStdString().c_str(), serializedDescriptors,
descriptors, descriptors_count, trees, bookmarksRef, getter,
_beginTime, _endTime, _pid, m_errorMessage);
if (result == 0 || !m_errorMessage.str().empty())
{
// Remove temporary file in case of error
QFile::remove(tmpFile);
}
else
{
// Remove old file if exists
{
QFile out(m_filename);
if (out.exists())
out.remove();
}
QFile::rename(tmpFile, m_filename);
}
m_progress.store(100, std::memory_order_release);
m_bDone.store(true, std::memory_order_release);
}, std::move(block_getter));
}
void FileReader::interrupt()
{
join();
m_bDone.store(false, std::memory_order_release);
m_size.store(0, std::memory_order_release);
m_serializedBlocks.clear();
m_serializedDescriptors.clear();
m_descriptors.clear();
m_blocks.clear();
m_blocksTree.clear();
m_bookmarks.clear();
m_descriptorsNumberInFile = 0;
m_version = 0;
m_pid = 0;
m_jobType = JobType::Idle;
m_isSnapshot = false;
profiler_gui::clear_stream(m_stream);
profiler_gui::clear_stream(m_errorMessage);
}
void FileReader::get(profiler::SerializedData& _serializedBlocks, profiler::SerializedData& _serializedDescriptors,
profiler::descriptors_list_t& _descriptors, profiler::blocks_t& _blocks,
profiler::thread_blocks_tree_t& _trees, profiler::bookmarks_t& bookmarks,
profiler::BeginEndTime& beginEndTime, uint32_t& _descriptorsNumberInFile, uint32_t& _version,
profiler::processid_t& _pid, QString& _filename)
{
if (done())
{
m_serializedBlocks.swap(_serializedBlocks);
m_serializedDescriptors.swap(_serializedDescriptors);
profiler::descriptors_list_t(std::move(m_descriptors)).swap(_descriptors);
m_blocks.swap(_blocks);
m_blocksTree.swap(_trees);
m_bookmarks.swap(bookmarks);
m_filename.swap(_filename);
beginEndTime = m_beginEndTime;
_descriptorsNumberInFile = m_descriptorsNumberInFile;
_version = m_version;
_pid = m_pid;
}
}
void FileReader::join()
{
m_progress.store(-100, std::memory_order_release);
if (m_thread.joinable())
{
m_thread.join();
}
m_progress.store(0, std::memory_order_release);
}
QString FileReader::getError() const
{
return QString(m_errorMessage.str().c_str());
}

View File

@ -0,0 +1,88 @@
//
// Created by vzarubkin on 29.10.19.
//
#ifndef EASY_PROFILER_FILE_READER_H
#define EASY_PROFILER_FILE_READER_H
#include <atomic>
#include <sstream>
#include <thread>
#include <QObject>
#include <QString>
#include <easy/reader.h>
EASY_CONSTEXPR auto NETWORK_CACHE_FILE = "easy_profiler_stream.cache";
class FileReader Q_DECL_FINAL
{
enum class JobType : int8_t
{
Idle = 0,
Loading,
Saving,
};
profiler::SerializedData m_serializedBlocks; ///<
profiler::SerializedData m_serializedDescriptors; ///<
profiler::descriptors_list_t m_descriptors; ///<
profiler::blocks_t m_blocks; ///<
profiler::thread_blocks_tree_t m_blocksTree; ///<
profiler::bookmarks_t m_bookmarks; ///<
profiler::BeginEndTime m_beginEndTime; ///<
std::stringstream m_stream; ///<
std::stringstream m_errorMessage; ///<
QString m_filename; ///<
profiler::processid_t m_pid = 0; ///<
uint32_t m_descriptorsNumberInFile = 0; ///<
uint32_t m_version = 0; ///<
std::thread m_thread; ///<
std::atomic_bool m_bDone; ///<
std::atomic<int> m_progress; ///<
std::atomic<unsigned int> m_size; ///<
JobType m_jobType = JobType::Idle; ///<
bool m_isFile = false; ///<
bool m_isSnapshot = false; ///<
public:
FileReader();
~FileReader();
const bool isFile() const;
const bool isSaving() const;
const bool isLoading() const;
const bool isSnapshot() const;
bool done() const;
int progress() const;
unsigned int size() const;
const QString& filename() const;
void load(const QString& _filename);
void load(std::stringstream& _stream);
/** \brief Save data to file.
*/
void save(const QString& _filename, profiler::timestamp_t _beginTime, profiler::timestamp_t _endTime,
const profiler::SerializedData& _serializedDescriptors, const profiler::descriptors_list_t& _descriptors,
profiler::block_id_t descriptors_count, const profiler::thread_blocks_tree_t& _trees,
const profiler::bookmarks_t& bookmarks, profiler::block_getter_fn block_getter,
profiler::processid_t _pid, bool snapshotMode);
void interrupt();
void get(profiler::SerializedData& _serializedBlocks, profiler::SerializedData& _serializedDescriptors,
profiler::descriptors_list_t& _descriptors, profiler::blocks_t& _blocks, profiler::thread_blocks_tree_t& _trees,
profiler::bookmarks_t& bookmarks, profiler::BeginEndTime& beginEndTime, uint32_t& _descriptorsNumberInFile,
uint32_t& _version, profiler::processid_t& _pid, QString& _filename);
void join();
QString getError() const;
}; // END of class FileReader.
#endif //EASY_PROFILER_FILE_READER_H

File diff suppressed because it is too large Load Diff

View File

@ -55,6 +55,7 @@
#define EASY_PROFILER_GUI__MAIN_WINDOW__H
#include <atomic>
#include <chrono>
#include <sstream>
#include <string>
#include <thread>
@ -64,9 +65,8 @@
#include <QTimer>
#include <QStringList>
#include <easy/easy_socket.h>
#include <easy/reader.h>
#include "round_progress_widget.h"
#include "file_reader.h"
#include "socket_listener.h"
#ifdef max
#undef max
@ -80,148 +80,6 @@
#define EASY_GUI_USE_DESCRIPTORS_DOCK_WINDOW 0
namespace profiler { namespace net { struct EasyProfilerStatus; } }
//////////////////////////////////////////////////////////////////////////
class FileReader Q_DECL_FINAL
{
enum class JobType : int8_t
{
Idle=0,
Loading,
Saving,
};
profiler::SerializedData m_serializedBlocks; ///<
profiler::SerializedData m_serializedDescriptors; ///<
profiler::descriptors_list_t m_descriptors; ///<
profiler::blocks_t m_blocks; ///<
profiler::thread_blocks_tree_t m_blocksTree; ///<
profiler::bookmarks_t m_bookmarks; ///<
profiler::BeginEndTime m_beginEndTime; ///<
std::stringstream m_stream; ///<
std::stringstream m_errorMessage; ///<
QString m_filename; ///<
profiler::processid_t m_pid = 0; ///<
uint32_t m_descriptorsNumberInFile = 0; ///<
uint32_t m_version = 0; ///<
std::thread m_thread; ///<
std::atomic_bool m_bDone; ///<
std::atomic<int> m_progress; ///<
std::atomic<unsigned int> m_size; ///<
JobType m_jobType = JobType::Idle; ///<
bool m_isFile = false; ///<
bool m_isSnapshot = false; ///<
public:
FileReader();
~FileReader();
const bool isFile() const;
const bool isSaving() const;
const bool isLoading() const;
const bool isSnapshot() const;
bool done() const;
int progress() const;
unsigned int size() const;
const QString& filename() const;
void load(const QString& _filename);
void load(std::stringstream& _stream);
/** \brief Save data to file.
*/
void save(const QString& _filename, profiler::timestamp_t _beginTime, profiler::timestamp_t _endTime,
const profiler::SerializedData& _serializedDescriptors, const profiler::descriptors_list_t& _descriptors,
profiler::block_id_t descriptors_count, const profiler::thread_blocks_tree_t& _trees,
const profiler::bookmarks_t& bookmarks, profiler::block_getter_fn block_getter,
profiler::processid_t _pid, bool snapshotMode);
void interrupt();
void get(profiler::SerializedData& _serializedBlocks, profiler::SerializedData& _serializedDescriptors,
profiler::descriptors_list_t& _descriptors, profiler::blocks_t& _blocks, profiler::thread_blocks_tree_t& _trees,
profiler::bookmarks_t& bookmarks, profiler::BeginEndTime& beginEndTime, uint32_t& _descriptorsNumberInFile,
uint32_t& _version, profiler::processid_t& _pid, QString& _filename);
void join();
QString getError();
}; // END of class FileReader.
//////////////////////////////////////////////////////////////////////////
enum class ListenerRegime : uint8_t
{
Idle = 0,
Capture,
Capture_Receive,
Descriptors
};
class SocketListener Q_DECL_FINAL
{
EasySocket m_easySocket; ///<
std::string m_address; ///<
std::stringstream m_receivedData; ///<
std::thread m_thread; ///<
uint64_t m_receivedSize; ///<
uint16_t m_port; ///<
std::atomic<uint32_t> m_frameMax; ///<
std::atomic<uint32_t> m_frameAvg; ///<
std::atomic_bool m_bInterrupt; ///<
std::atomic_bool m_bConnected; ///<
std::atomic_bool m_bStopReceive; ///<
std::atomic_bool m_bCaptureReady; ///<
std::atomic_bool m_bFrameTimeReady; ///<
ListenerRegime m_regime; ///<
public:
SocketListener();
~SocketListener();
bool connected() const;
bool captured() const;
ListenerRegime regime() const;
uint64_t size() const;
const std::string& address() const;
uint16_t port() const;
std::stringstream& data();
void clearData();
void disconnect();
void closeSocket();
bool connect(const char* _ipaddress, uint16_t _port, profiler::net::EasyProfilerStatus& _reply, bool _disconnectFirst = false);
bool reconnect(const char* _ipaddress, uint16_t _port, profiler::net::EasyProfilerStatus& _reply);
bool startCapture();
void stopCapture();
void finalizeCapture();
void requestBlocksDescription();
bool frameTime(uint32_t& _maxTime, uint32_t& _avgTime);
bool requestFrameTime();
template <class T>
void send(const T& _message) {
m_easySocket.send(&_message, sizeof(T));
}
private:
void listenCapture();
void listenDescription();
void listenFrameTime();
}; // END of class SocketListener.
//////////////////////////////////////////////////////////////////////////
class DockWidget : public QDockWidget
{
Q_OBJECT
@ -285,6 +143,7 @@ protected:
profiler::BeginEndTime m_beginEndTime;
FileReader m_reader;
SocketListener m_listener;
std::chrono::system_clock::time_point m_listenStartTime;
class QLineEdit* m_addressEdit = nullptr;
class QLineEdit* m_portEdit = nullptr;

View File

@ -0,0 +1,991 @@
//
// Created by vzarubkin on 29.10.19.
//
#include <QDebug>
#include <easy/easy_net.h>
#include "common_functions.h"
#include "socket_listener.h"
#ifdef min
#undef min
#endif
#ifdef max
#undef max
#endif
SocketListener::SocketListener() : m_receivedSize(0), m_port(0), m_regime(ListenerRegime::Idle)
{
m_bInterrupt = false;
m_bConnected = false;
m_bStopReceive = false;
m_bFrameTimeReady = false;
m_bCaptureReady = false;
m_frameMax = 0;
m_frameAvg = 0;
}
SocketListener::~SocketListener()
{
m_bInterrupt.store(true, std::memory_order_release);
if (m_thread.joinable())
{
m_thread.join();
}
}
bool SocketListener::connected() const
{
return m_bConnected.load(std::memory_order_acquire);
}
bool SocketListener::captured() const
{
return m_bCaptureReady.load(std::memory_order_acquire);
}
ListenerRegime SocketListener::regime() const
{
return m_regime;
}
uint64_t SocketListener::size() const
{
return m_receivedSize;
}
std::stringstream& SocketListener::data()
{
return m_receivedData;
}
const std::string& SocketListener::address() const
{
return m_address;
}
uint16_t SocketListener::port() const
{
return m_port;
}
void SocketListener::clearData()
{
profiler_gui::clear_stream(m_receivedData);
m_receivedSize = 0;
}
void SocketListener::disconnect()
{
if (connected())
{
m_bInterrupt.store(true, std::memory_order_release);
if (m_thread.joinable())
{
m_thread.join();
}
m_bConnected.store(false, std::memory_order_release);
m_bInterrupt.store(false, std::memory_order_release);
m_bCaptureReady.store(false, std::memory_order_release);
m_bStopReceive.store(false, std::memory_order_release);
}
m_address.clear();
m_port = 0;
closeSocket();
}
void SocketListener::closeSocket()
{
m_easySocket.flush();
m_easySocket.init();
}
bool SocketListener::connect(
const char* _ipaddress,
uint16_t _port,
profiler::net::EasyProfilerStatus& _reply,
bool _disconnectFirst
) {
if (connected())
{
m_bInterrupt.store(true, std::memory_order_release);
if (m_thread.joinable())
{
m_thread.join();
}
m_bConnected.store(false, std::memory_order_release);
m_bInterrupt.store(false, std::memory_order_release);
m_bCaptureReady.store(false, std::memory_order_release);
m_bStopReceive.store(false, std::memory_order_release);
}
m_address.clear();
m_port = 0;
if (_disconnectFirst)
{
closeSocket();
}
int res = m_easySocket.setAddress(_ipaddress, _port);
res = m_easySocket.connect();
const bool isConnected = res == 0;
if (isConnected)
{
EASY_CONSTEXPR size_t buffer_size = sizeof(profiler::net::EasyProfilerStatus) << 1;
char buffer[buffer_size] = {};
int bytes = 0;
while (true)
{
bytes = m_easySocket.receive(buffer, buffer_size);
if (bytes == -1)
{
if (m_easySocket.isDisconnected())
{
return false;
}
bytes = 0;
continue;
}
break;
}
if (bytes == 0)
{
m_address = _ipaddress;
m_port = _port;
m_bConnected.store(isConnected, std::memory_order_release);
return isConnected;
}
size_t seek = bytes;
while (seek < sizeof(profiler::net::EasyProfilerStatus))
{
bytes = m_easySocket.receive(buffer + seek, buffer_size - seek);
if (bytes == -1)
{
if (m_easySocket.isDisconnected())
{
return false;
}
break;
}
seek += bytes;
}
auto message = reinterpret_cast<const profiler::net::EasyProfilerStatus*>(buffer);
if (message->isEasyNetMessage() && message->type == profiler::net::MessageType::Connection_Accepted)
{
_reply = *message;
}
m_address = _ipaddress;
m_port = _port;
}
m_bConnected.store(isConnected, std::memory_order_release);
return isConnected;
}
bool SocketListener::reconnect(const char* _ipaddress, uint16_t _port, profiler::net::EasyProfilerStatus& _reply)
{
return connect(_ipaddress, _port, _reply, true);
}
bool SocketListener::startCapture()
{
//if (m_thread.joinable())
//{
// m_bInterrupt.store(true, std::memory_order_release);
// m_thread.join();
// m_bInterrupt.store(false, std::memory_order_release);
//}
clearData();
profiler::net::Message request(profiler::net::MessageType::Request_Start_Capture);
m_easySocket.send(&request, sizeof(request));
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
return false;
}
m_regime = ListenerRegime::Capture;
m_bCaptureReady.store(false, std::memory_order_release);
//m_thread = std::thread(&SocketListener::listenCapture, this);
return true;
}
void SocketListener::stopCapture()
{
//if (!m_thread.joinable() || m_regime != ListenerRegime::Capture)
// return;
if (m_regime != ListenerRegime::Capture)
{
return;
}
//m_bStopReceive.store(true, std::memory_order_release);
profiler::net::Message request(profiler::net::MessageType::Request_Stop_Capture);
m_easySocket.send(&request, sizeof(request));
//m_thread.join();
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
m_bStopReceive.store(false, std::memory_order_release);
m_regime = ListenerRegime::Idle;
m_bCaptureReady.store(true, std::memory_order_release);
return;
}
m_regime = ListenerRegime::Capture_Receive;
if (m_thread.joinable())
{
m_bInterrupt.store(true, std::memory_order_release);
m_thread.join();
m_bInterrupt.store(false, std::memory_order_release);
}
m_thread = std::thread(&SocketListener::listenCapture, this);
//m_regime = ListenerRegime::Idle;
//m_bStopReceive.store(false, std::memory_order_release);
}
void SocketListener::finalizeCapture()
{
if (m_thread.joinable())
{
m_bInterrupt.store(true, std::memory_order_release);
m_thread.join();
m_bInterrupt.store(false, std::memory_order_release);
}
m_regime = ListenerRegime::Idle;
m_bCaptureReady.store(false, std::memory_order_release);
m_bStopReceive.store(false, std::memory_order_release);
}
void SocketListener::requestBlocksDescription()
{
if (m_thread.joinable())
{
m_bInterrupt.store(true, std::memory_order_release);
m_thread.join();
m_bInterrupt.store(false, std::memory_order_release);
}
clearData();
profiler::net::Message request(profiler::net::MessageType::Request_Blocks_Description);
m_easySocket.send(&request, sizeof(request));
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
}
m_regime = ListenerRegime::Descriptors;
listenDescription();
m_regime = ListenerRegime::Idle;
}
bool SocketListener::frameTime(uint32_t& _maxTime, uint32_t& _avgTime)
{
if (m_bFrameTimeReady.exchange(false, std::memory_order_acquire))
{
_maxTime = m_frameMax.load(std::memory_order_acquire);
_avgTime = m_frameAvg.load(std::memory_order_acquire);
return true;
}
return false;
}
bool SocketListener::requestFrameTime()
{
if (m_regime != ListenerRegime::Idle && m_regime != ListenerRegime::Capture)
{
return false;
}
if (m_thread.joinable())
{
m_bInterrupt.store(true, std::memory_order_release);
m_thread.join();
m_bInterrupt.store(false, std::memory_order_release);
}
profiler::net::Message request(profiler::net::MessageType::Request_MainThread_FPS);
m_easySocket.send(&request, sizeof(request));
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
return false;
}
m_bFrameTimeReady.store(false, std::memory_order_release);
m_thread = std::thread(&SocketListener::listenFrameTime, this);
return true;
}
//////////////////////////////////////////////////////////////////////////
void SocketListener::listenCapture()
{
EASY_CONSTEXPR int buffer_size = 8 * 1024 * 1024;
char* buffer = new char[buffer_size];
int seek = 0, bytes = 0;
auto timeBegin = std::chrono::system_clock::now();
bool isListen = true, disconnected = false;
while (isListen && !m_bInterrupt.load(std::memory_order_acquire))
{
if (m_bStopReceive.load(std::memory_order_acquire))
{
profiler::net::Message request(profiler::net::MessageType::Request_Stop_Capture);
m_easySocket.send(&request, sizeof(request));
m_bStopReceive.store(false, std::memory_order_release);
}
if (bytes < 1)
{
bytes = 0;
seek = 0;
}
else if (seek > 0)
{
if (bytes < seek)
{
memcpy(buffer, buffer + seek, static_cast<size_t>(bytes));
}
else
{
memcpy(buffer, buffer + seek, static_cast<size_t>(seek));
for (int i = seek; i < bytes; ++i)
{
buffer[i] = buffer[seek + i];
}
}
seek = 0;
}
while (bytes < sizeof(profiler::net::Message))
{
int receivedBytes = m_easySocket.receive(buffer + seek + bytes, buffer_size);
if (receivedBytes < 1)
{
bytes = receivedBytes;
break;
}
bytes += receivedBytes;
}
if (bytes == -1)
{
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
isListen = false;
disconnected = true;
}
bytes = 0;
seek = 0;
continue;
}
if (bytes == 0)
{
seek = 0;
isListen = false;
break;
}
auto message = reinterpret_cast<const profiler::net::Message*>(buffer + seek);
while (!message->isEasyNetMessage())
{
seek += sizeof(uint32_t);
bytes -= sizeof(uint32_t);
if (seek >= buffer_size || bytes <= 0)
{
seek = 0;
bytes = 0;
message = nullptr;
break;
}
if (bytes < sizeof(profiler::net::Message))
{
message = nullptr;
break;
}
message = reinterpret_cast<const profiler::net::Message*>(buffer + seek);
}
if (bytes < 1 || message == nullptr)
{
continue;
}
switch (message->type)
{
case profiler::net::MessageType::Connection_Accepted:
{
qInfo() << "Receive MessageType::Connection_Accepted";
//m_easySocket.send(&request, sizeof(request));
seek += sizeof(profiler::net::Message);
bytes -= sizeof(profiler::net::Message);
break;
}
case profiler::net::MessageType::Reply_Capturing_Started:
{
qInfo() << "Receive MessageType::Reply_Capturing_Started";
seek += sizeof(profiler::net::Message);
bytes -= sizeof(profiler::net::Message);
break;
}
case profiler::net::MessageType::Reply_Blocks_End:
{
qInfo() << "Receive MessageType::Reply_Blocks_End";
seek += sizeof(profiler::net::Message);
bytes -= sizeof(profiler::net::Message);
const auto dt = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - timeBegin);
const auto bytesNumber = m_receivedData.str().size();
qInfo() << "received " << bytesNumber << " bytes, " << dt.count() << " ms, average speed = "
<< double(bytesNumber) * 1e3 / double(dt.count()) / 1024. << " kBytes/sec";
isListen = false;
break;
}
case profiler::net::MessageType::Reply_Blocks:
{
qInfo() << "Receive MessageType::Reply_Blocks";
while (bytes < sizeof(profiler::net::DataMessage))
{
int receivedBytes = m_easySocket.receive(buffer + seek + bytes, buffer_size);
if (receivedBytes < 1)
{
bytes = receivedBytes;
break;
}
bytes += receivedBytes;
}
if (bytes == -1)
{
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
isListen = false;
disconnected = true;
}
bytes = 0;
seek = 0;
continue;
}
if (bytes == 0)
{
seek = 0;
isListen = false;
continue;
}
seek += sizeof(profiler::net::DataMessage);
bytes -= sizeof(profiler::net::DataMessage);
auto dm = reinterpret_cast<const profiler::net::DataMessage*>(message);
timeBegin = std::chrono::system_clock::now();
int neededSize = dm->size;
const int bytesNumber = std::min(neededSize, bytes);
if (bytesNumber > 0)
{
char* buf = buffer + seek;
m_receivedSize += bytesNumber;
m_receivedData.write(buf, bytesNumber);
neededSize -= bytesNumber;
bytes -= bytesNumber;
seek += bytesNumber;
}
if (bytes < 1)
{
// this is possible only when (neededSize - bytesNumber) >= 0
seek = 0;
}
while (neededSize > 0)
{
// We can get here only when neededSize > (bytes - seek). Therefore bytes == 0 and seek = 0 here.
bytes = m_easySocket.receive(buffer, buffer_size);
if (bytes == -1)
{
break;
}
const int toWrite = std::min(bytes, neededSize);
m_receivedSize += toWrite;
m_receivedData.write(buffer, toWrite);
neededSize -= toWrite;
bytes -= toWrite;
if (bytes < 1)
{
seek = 0;
}
else
{
seek = toWrite;
}
}
if (bytes == -1)
{
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
isListen = false;
disconnected = true;
}
bytes = 0;
seek = 0;
continue;
}
if (m_bStopReceive.load(std::memory_order_acquire))
{
profiler::net::Message request(profiler::net::MessageType::Request_Stop_Capture);
m_easySocket.send(&request, sizeof(request));
m_bStopReceive.store(false, std::memory_order_release);
}
break;
}
default:
{
//qInfo() << "Receive unknown " << message->type;
break;
}
}
}
if (disconnected)
{
clearData();
}
delete [] buffer;
m_bCaptureReady.store(true, std::memory_order_release);
}
void SocketListener::listenDescription()
{
EASY_CONSTEXPR int buffer_size = 8 * 1024 * 1024;
char* buffer = new char[buffer_size];
int seek = 0, bytes = 0;
bool isListen = true, disconnected = false;
while (isListen && !m_bInterrupt.load(std::memory_order_acquire))
{
if (bytes < 1)
{
bytes = 0;
seek = 0;
}
else if (seek > 0)
{
if (bytes < seek)
{
memcpy(buffer, buffer + seek, static_cast<size_t>(bytes));
}
else
{
memcpy(buffer, buffer + seek, static_cast<size_t>(seek));
for (int i = seek; i < bytes; ++i)
{
buffer[i] = buffer[seek + i];
}
}
seek = 0;
}
while (bytes < sizeof(profiler::net::Message))
{
int receivedBytes = m_easySocket.receive(buffer + seek + bytes, buffer_size);
if (receivedBytes < 1)
{
bytes = receivedBytes;
break;
}
bytes += receivedBytes;
}
if (bytes == -1)
{
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
isListen = false;
disconnected = true;
}
bytes = 0;
seek = 0;
continue;
}
if (bytes == 0)
{
seek = 0;
isListen = false;
break;
}
auto message = reinterpret_cast<const profiler::net::Message*>(buffer + seek);
while (!message->isEasyNetMessage())
{
seek += sizeof(uint32_t);
bytes -= sizeof(uint32_t);
if (seek >= buffer_size || bytes <= 0)
{
seek = 0;
bytes = 0;
message = nullptr;
break;
}
if (bytes < sizeof(profiler::net::Message))
{
message = nullptr;
break;
}
message = reinterpret_cast<const profiler::net::Message*>(buffer + seek);
}
if (bytes < 1 || message == nullptr)
{
continue;
}
switch (message->type)
{
case profiler::net::MessageType::Connection_Accepted:
{
qInfo() << "Receive MessageType::Connection_Accepted";
seek += sizeof(profiler::net::Message);
bytes -= sizeof(profiler::net::Message);
break;
}
case profiler::net::MessageType::Reply_Blocks_Description_End:
{
qInfo() << "Receive MessageType::Reply_Blocks_Description_End";
seek += sizeof(profiler::net::Message);
bytes -= sizeof(profiler::net::Message);
isListen = false;
break;
}
case profiler::net::MessageType::Reply_Blocks_Description:
{
qInfo() << "Receive MessageType::Reply_Blocks_Description";
while (bytes < sizeof(profiler::net::DataMessage))
{
int receivedBytes = m_easySocket.receive(buffer + seek + bytes, buffer_size);
if (receivedBytes < 1)
{
bytes = receivedBytes;
break;
}
bytes += receivedBytes;
}
if (bytes == -1)
{
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
isListen = false;
disconnected = true;
}
bytes = 0;
seek = 0;
continue;
}
if (bytes == 0)
{
seek = 0;
isListen = false;
continue;
}
seek += sizeof(profiler::net::DataMessage);
bytes -= sizeof(profiler::net::DataMessage);
auto dm = reinterpret_cast<const profiler::net::DataMessage*>(message);
int neededSize = dm->size;
const int bytesNumber = std::min(neededSize, bytes);
if (bytesNumber > 0)
{
char* buf = buffer + seek;
m_receivedSize += bytesNumber;
m_receivedData.write(buf, bytesNumber);
neededSize -= bytesNumber;
bytes -= bytesNumber;
seek += bytesNumber;
}
if (bytes < 1)
{
// this is possible only when (neededSize - bytesNumber) >= 0
seek = 0;
}
while (neededSize > 0)
{
// We can get here only when neededSize > (bytes - seek). Therefore bytes == 0 and seek = 0 here.
bytes = m_easySocket.receive(buffer, buffer_size);
if (bytes == -1)
{
break;
}
const int toWrite = std::min(bytes, neededSize);
m_receivedSize += toWrite;
m_receivedData.write(buffer, toWrite);
neededSize -= toWrite;
bytes -= toWrite;
if (bytes < 1)
{
seek = 0;
}
else
{
seek = toWrite;
}
}
if (bytes == -1)
{
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
isListen = false;
disconnected = true;
}
bytes = 0;
seek = 0;
continue;
}
break;
}
default:
{
break;
}
}
}
if (disconnected)
{
clearData();
}
delete[] buffer;
}
void SocketListener::listenFrameTime()
{
EASY_CONSTEXPR size_t buffer_size = sizeof(profiler::net::TimestampMessage) << 3;
char buffer[buffer_size] = {};
int seek = 0, bytes = 0;
bool isListen = true;
while (isListen && !m_bInterrupt.load(std::memory_order_acquire))
{
if (bytes < 1)
{
bytes = 0;
seek = 0;
}
else if (seek > 0)
{
if (bytes < seek)
{
memcpy(buffer, buffer + seek, static_cast<size_t>(bytes));
}
else
{
memcpy(buffer, buffer + seek, static_cast<size_t>(seek));
for (int i = seek; i < bytes; ++i)
{
buffer[i] = buffer[seek + i];
}
}
seek = 0;
}
while (bytes < sizeof(profiler::net::Message))
{
int receivedBytes = m_easySocket.receive(buffer + seek + bytes, buffer_size);
if (receivedBytes < 1)
{
bytes = receivedBytes;
break;
}
bytes += receivedBytes;
}
if (bytes == -1)
{
if (m_easySocket.isDisconnected())
{
m_bConnected.store(false, std::memory_order_release);
isListen = false;
}
bytes = 0;
seek = 0;
continue;
}
if (bytes == 0)
{
seek = 0;
isListen = false;
break;
}
auto message = reinterpret_cast<const profiler::net::Message*>(buffer + seek);
while (!message->isEasyNetMessage())
{
seek += sizeof(uint32_t);
bytes -= sizeof(uint32_t);
if (seek >= buffer_size || bytes <= 0)
{
seek = 0;
bytes = 0;
message = nullptr;
break;
}
if (bytes < sizeof(profiler::net::Message))
{
message = nullptr;
break;
}
message = reinterpret_cast<const profiler::net::Message*>(buffer + seek);
}
if (bytes < 1 || message == nullptr)
{
continue;
}
switch (message->type)
{
case profiler::net::MessageType::Connection_Accepted:
case profiler::net::MessageType::Reply_Capturing_Started:
{
seek += sizeof(profiler::net::Message);
bytes -= sizeof(profiler::net::Message);
break;
}
case profiler::net::MessageType::Reply_MainThread_FPS:
{
//qInfo() << "Receive MessageType::Reply_MainThread_FPS";
seek += sizeof(profiler::net::TimestampMessage);
bytes -= sizeof(profiler::net::TimestampMessage);
if (seek <= buffer_size)
{
auto timestampMessage = reinterpret_cast<const profiler::net::TimestampMessage*>(message);
m_frameMax.store(timestampMessage->maxValue, std::memory_order_release);
m_frameAvg.store(timestampMessage->avgValue, std::memory_order_release);
m_bFrameTimeReady.store(true, std::memory_order_release);
}
isListen = false;
break;
}
default:
{
break;
}
}
}
}

View File

@ -0,0 +1,87 @@
//
// Created by vzarubkin on 29.10.19.
//
#ifndef EASY_PROFILER_SOCKET_LISTENER_H
#define EASY_PROFILER_SOCKET_LISTENER_H
#include <atomic>
#include <sstream>
#include <string>
#include <thread>
#include <QObject>
#include <easy/easy_socket.h>
namespace profiler { namespace net { struct EasyProfilerStatus; } }
//////////////////////////////////////////////////////////////////////////
enum class ListenerRegime : uint8_t
{
Idle = 0,
Capture,
Capture_Receive,
Descriptors
};
class SocketListener Q_DECL_FINAL
{
EasySocket m_easySocket; ///<
std::string m_address; ///<
std::stringstream m_receivedData; ///<
std::thread m_thread; ///<
uint64_t m_receivedSize; ///<
uint16_t m_port; ///<
std::atomic<uint32_t> m_frameMax; ///<
std::atomic<uint32_t> m_frameAvg; ///<
std::atomic_bool m_bInterrupt; ///<
std::atomic_bool m_bConnected; ///<
std::atomic_bool m_bStopReceive; ///<
std::atomic_bool m_bCaptureReady; ///<
std::atomic_bool m_bFrameTimeReady; ///<
ListenerRegime m_regime; ///<
public:
SocketListener();
~SocketListener();
bool connected() const;
bool captured() const;
ListenerRegime regime() const;
uint64_t size() const;
const std::string& address() const;
uint16_t port() const;
std::stringstream& data();
void clearData();
void disconnect();
void closeSocket();
bool connect(const char* _ipaddress, uint16_t _port, profiler::net::EasyProfilerStatus& _reply, bool _disconnectFirst = false);
bool reconnect(const char* _ipaddress, uint16_t _port, profiler::net::EasyProfilerStatus& _reply);
bool startCapture();
void stopCapture();
void finalizeCapture();
void requestBlocksDescription();
bool frameTime(uint32_t& _maxTime, uint32_t& _avgTime);
bool requestFrameTime();
template <class T>
void send(const T& _message) {
m_easySocket.send(&_message, sizeof(T));
}
private:
void listenCapture();
void listenDescription();
void listenFrameTime();
}; // END of class SocketListener.
#endif //EASY_PROFILER_SOCKET_LISTENER_H