diff --git a/profiler_gui/main_window.cpp b/profiler_gui/main_window.cpp index 5681e5d..bcf7516 100644 --- a/profiler_gui/main_window.cpp +++ b/profiler_gui/main_window.cpp @@ -122,15 +122,30 @@ EasyMainWindow::EasyMainWindow() : Parent(), m_treeWidget(nullptr), m_graphicsVi - m_server = new QTcpSocket( this ); - m_server->setSocketOption(QAbstractSocket::LowDelayOption, 1); + /*m_server = new QTcpSocket( ); + m_server->setSocketOption(QAbstractSocket::LowDelayOption, 0); + m_server->setReadBufferSize(16 * 1024); - connect(m_server, SIGNAL(readyRead()), SLOT(readTcpData())); + //connect(m_server, SIGNAL(readyRead()), SLOT(readTcpData())); connect(m_server, SIGNAL(connected()), SLOT(onConnected())); connect(m_server, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onErrorConnection(QAbstractSocket::SocketError)), Qt::UniqueConnection); connect(m_server, SIGNAL(disconnected()), SLOT(onDisconnect()), Qt::UniqueConnection); + + m_receiver = new TcpReceiverThread(this, this); + + connect(m_receiver, &TcpReceiverThread::resultReady, this, &This::handleResults); + connect(m_receiver, &TcpReceiverThread::finished, m_receiver, &QObject::deleteLater); - m_server->connectToHost(m_hostString->text(), m_portString->text().toUShort()); + m_receiver->start(); + */ + + m_thread = std::thread(&This::listen, this); + + //connect(m_server, &QAbstractSocket::readyRead, m_receiver, &TcpReceiverThread::readTcpData); + + //m_receiver->run(); + + //m_server->connectToHost(m_hostString->text(), m_portString->text().toUShort()); //TODO: //connected //error @@ -271,11 +286,181 @@ EasyMainWindow::EasyMainWindow() : Parent(), m_treeWidget(nullptr), m_graphicsVi auto opened_filename = QCoreApplication::arguments().at(1); loadFile(opened_filename); } +} + +#include "../src/easy_socket.h" +#undef max +void EasyMainWindow::listen() +{ + EasySocket socket; + profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE); + + socket.setAddress("127.0.0.1",28077); + socket.connect(); + + //std::this_thread::sleep_for(std::chrono::seconds(2)); + char* buffer = new char[8*2014]; + int seek = 0; + bool isReceiveData = false; + int unreadedBytes = 0; + static auto timeBegin = std::chrono::system_clock::now(); + while (true) + { + + + auto bytes = seek; + if (seek==0) + bytes = socket.receive(buffer, 8 * 2014); + char *buf = &buffer[seek]; + + if (seek==0) + unreadedBytes = bytes; + int readedBytes = 0; + if (bytes > 0) + { + profiler::net::Message* message = (profiler::net::Message*)buf; + if (!message->isEasyNetMessage()){ + + if (isReceiveData) + { + m_receivedProfileData.write(buf, unreadedBytes); + seek = 0; + continue; + }else + continue; + } + + + switch (message->type) { + case profiler::net::MESSAGE_TYPE_ACCEPTED_CONNECTION: + { + qInfo() << "Receive MESSAGE_TYPE_ACCEPTED_CONNECTION"; + + socket.send(&request, sizeof(request)); + + readedBytes = sizeof(profiler::net::Message); + unreadedBytes -= readedBytes; + } + break; + case profiler::net::MESSAGE_TYPE_REPLY_START_CAPTURING: + { + qInfo() << "Receive MESSAGE_TYPE_REPLY_START_CAPTURING"; + + request.type = profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE; + std::this_thread::sleep_for(std::chrono::seconds(2)); + socket.send(&request, sizeof(request)); + + readedBytes = sizeof(profiler::net::Message); + unreadedBytes -= readedBytes; + } + break; + case profiler::net::MESSAGE_TYPE_REPLY_PREPARE_BLOCKS: + { + qInfo() << "Receive MESSAGE_TYPE_REPLY_PREPARE_BLOCKS"; + m_isClientPreparedBlocks = true; + + readedBytes = sizeof(profiler::net::Message); + unreadedBytes -= readedBytes; + } + break; + case profiler::net::MESSAGE_TYPE_REPLY_END_SEND_BLOCKS: + { + qInfo() << "Receive MESSAGE_TYPE_REPLY_END_SEND_BLOCKS"; + readedBytes = sizeof(profiler::net::Message); + unreadedBytes -= readedBytes; + + isReceiveData = false; + + + auto timeEnd = std::chrono::system_clock::now(); + auto dT = std::chrono::duration_cast(timeEnd - timeBegin); + auto dTsec = std::chrono::duration_cast(timeEnd - timeBegin); + qInfo() << "recieve" << m_receivedProfileData.str().size() << dT.count() << "ms" << double(m_receivedProfileData.str().size())*1000.0 / double(dT.count()) / 1024.0 << "kBytes/sec"; + m_recFrames = false; + + + qInfo() << "Write FILE"; + std::string tempfilename = "test_rec.prof"; + std::ofstream of(tempfilename, std::fstream::binary); + of << m_receivedProfileData.str(); + of.close(); + + m_receivedProfileData.str(std::string()); + m_receivedProfileData.clear(); + loadFile(QString(tempfilename.c_str())); + m_recFrames = false; + + } + break; + case profiler::net::MESSAGE_TYPE_REPLY_BLOCKS: + { + qInfo() << "Receive MESSAGE_TYPE_REPLY_BLOCKS"; + + readedBytes = sizeof(profiler::net::DataMessage); + unreadedBytes -= readedBytes; + isReceiveData = true; + timeBegin = std::chrono::system_clock::now(); + + } break; + + default: + //qInfo() << "Receive unknown " << message->type; + break; + + } + + if (unreadedBytes) + seek += readedBytes; + else + seek = 0; + } + } +} + +void TcpReceiverThread::run() +{ + QString result; + //mainwindow->m_server = new QTcpSocket(this); + + m_server = new QTcpSocket(); + //mainwindow->m_server = m_server; + //auto m_server = mainwindow->m_server; + m_server->setSocketOption(QAbstractSocket::LowDelayOption, 0); + m_server->setReadBufferSize(16 * 1024); + + m_server->connectToHost("127.0.0.1", 28077); + + //connect(m_server, SIGNAL(readyRead()), SLOT(readTcpData())); + connect(m_server, &QAbstractSocket::connected, this, &TcpReceiverThread::onConnected); + //connect(m_server, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onErrorConnection(QAbstractSocket::SocketError)), Qt::UniqueConnection); + connect(m_server, &QAbstractSocket::disconnected, mainwindow, &EasyMainWindow::onDisconnect); + + connect(m_server, &QAbstractSocket::readyRead, this, &TcpReceiverThread::readTcpData); + /**/ + + + + + + + + exec(); + /* ... here is the expensive or blocking operation ... */ + emit resultReady(result); +} + +TcpReceiverThread::TcpReceiverThread(QObject *parent, EasyMainWindow* mw) :QThread(parent), mainwindow(mw) +{ + + + } EasyMainWindow::~EasyMainWindow() { delete m_progress; + m_receiver->quit(); + m_receiver->wait(); } ////////////////////////////////////////////////////////////////////////// @@ -398,7 +583,7 @@ void EasyMainWindow::onCaptureClicked(bool) QMessageBox::warning(this, "Warning", "No connection with profiling app", QMessageBox::Close); return; } - + m_server = m_receiver->m_server; profiler::net::Message requestMessage(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE); m_server->write((const char*)&requestMessage, sizeof(requestMessage)); @@ -452,7 +637,7 @@ void EasyMainWindow::onCaptureClicked(bool) if (m_isConnected) { profiler::net::Message requestMessage(profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE); - m_client->write((const char*)&requestMessage, sizeof(requestMessage)); + m_server->write((const char*)&requestMessage, sizeof(requestMessage)); }else { QMessageBox::warning(this,"Warning" ,"Application was disconnected",QMessageBox::Close); @@ -460,12 +645,24 @@ void EasyMainWindow::onCaptureClicked(bool) } } +void TcpReceiverThread::readTcpData() +{ + mainwindow->readTcpData(); +} + +void EasyMainWindow::handleResults(const QString &s) +{ + +} + void EasyMainWindow::readTcpData() { QTcpSocket* pClientSocket = (QTcpSocket*)sender(); - m_client = pClientSocket; + m_server = m_receiver->m_server; + //qInfo() << "Rec: " << m_server->bytesAvailable() << "bytes max" << m_server->readBufferSize(); static qint64 necessarySize = 0; static qint64 loadedSize = 0; + static auto timeBegin = std::chrono::system_clock::now(); while(m_server->bytesAvailable()) { //QByteArray data = m_server->readAll(); @@ -487,16 +684,15 @@ void EasyMainWindow::readTcpData() return; } else if (m_recFrames){ - static bool first = true; - if (first) - m_server->waitForBytesWritten(5000); - first = false; - m_receivedProfileData.write(data.data(),data.size()); - loadedSize += data.size(); + if (m_receivedProfileData.str().size() == necessarySize) { m_recFrames = false; - qInfo() << "recieve all"; + + auto timeEnd = std::chrono::system_clock::now(); + auto dT = std::chrono::duration_cast(timeEnd - timeBegin); + auto dTsec = std::chrono::duration_cast(timeEnd - timeBegin); + qInfo() << "recieve" << m_receivedProfileData.str().size() << dT.count() << "ms" << double(m_receivedProfileData.str().size())*1000.0 / double(dT.count()) / 1024.0 << "kBytes/sec"; m_recFrames = false; @@ -510,15 +706,25 @@ void EasyMainWindow::readTcpData() m_receivedProfileData.clear(); loadFile(QString(tempfilename.c_str())); m_recFrames = false; + + } + + + if (m_receivedProfileData.str().size() > necessarySize) { qInfo() << "recieve more than necessary d=" << m_receivedProfileData.str().size() - necessarySize; } //qInfo() << necessarySize << " " << loadedSize << m_receivedProfileData.str().size() << m_receivedProfileData.str().length(); - qInfo() << necessarySize << " " << loadedSize; + //qInfo() << necessarySize << " " << loadedSize << QThread::currentThreadId(); if (m_recFrames) + { + m_receivedProfileData.write(data.data(), data.size()); + loadedSize += data.size(); continue; + } + } switch (message->type) { @@ -555,7 +761,9 @@ void EasyMainWindow::readTcpData() loadedSize = 0; m_receivedProfileData.write(data.data()+sizeof(profiler::net::DataMessage),data.size() - sizeof(profiler::net::DataMessage)); loadedSize += data.size() - sizeof(profiler::net::DataMessage); - + //std::this_thread::sleep_for(std::chrono::seconds(2)); + + timeBegin = std::chrono::system_clock::now(); } break; default: @@ -568,9 +776,25 @@ void EasyMainWindow::readTcpData() } +void TcpReceiverThread::onConnected() +{ + qInfo() << "onConnected()"; + + profiler::net::Message requestMessage(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE); + m_server->write((const char*)&requestMessage, sizeof(requestMessage)); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + + profiler::net::Message requestMessage2(profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE); + m_server->write((const char*)&requestMessage2, sizeof(requestMessage)); + +} + + void EasyMainWindow::onConnected() { qInfo() << "onConnected()"; + m_isConnected = true; } void EasyMainWindow::onErrorConnection(QAbstractSocket::SocketError socketError) @@ -588,15 +812,15 @@ void EasyMainWindow::onNewConnection() { //m_client = m_server->nextPendingConnection(); - qInfo() << "New connection!" << m_client; + //qInfo() << "New connection!" << m_client; - connect(m_client, SIGNAL(disconnected()), this, SLOT(onDisconnection())) ; - connect(m_client, SIGNAL(readyRead()), this, SLOT(readTcpData()) ); + //connect(m_client, SIGNAL(disconnected()), this, SLOT(onDisconnection())) ; + //connect(m_client, SIGNAL(readyRead()), this, SLOT(readTcpData()) ); } void EasyMainWindow::onDisconnection() { - m_client = nullptr; + //m_client = nullptr; } ////////////////////////////////////////////////////////////////////////// diff --git a/profiler_gui/main_window.h b/profiler_gui/main_window.h index 9277a64..ae8c39b 100644 --- a/profiler_gui/main_window.h +++ b/profiler_gui/main_window.h @@ -37,6 +37,7 @@ #include #include #include +#include #include "profiler/reader.h" #include @@ -78,11 +79,28 @@ public: }; // END of class EasyFileReader. ////////////////////////////////////////////////////////////////////////// +class EasyMainWindow; +class TcpReceiverThread : public QThread +{ + Q_OBJECT + EasyMainWindow* mainwindow; +public: + QTcpSocket * m_server; + explicit TcpReceiverThread(QObject *parent, EasyMainWindow* mw); + + void run() Q_DECL_OVERRIDE; +public slots: + + void readTcpData(); + void onConnected(); +signals: + void resultReady(const QString &s); +}; class EasyMainWindow : public QMainWindow { Q_OBJECT - + friend class TcpReceiverThread; protected: typedef EasyMainWindow This; @@ -98,13 +116,17 @@ protected: EasyFileReader m_reader; QTcpSocket* m_server = nullptr; - QTcpSocket* m_client = nullptr; + std::stringstream m_receivedProfileData; bool m_recFrames = false; QLineEdit* m_hostString = nullptr; QLineEdit* m_portString = nullptr; bool m_isConnected = false; + + TcpReceiverThread* m_receiver; + + std::thread m_thread; public: EasyMainWindow(); @@ -114,6 +136,8 @@ public: void closeEvent(QCloseEvent* close_event) override; + void listen(); + protected slots: void onOpenFileClicked(bool); @@ -139,6 +163,7 @@ protected slots: void onDisconnect(); void onConnectClicked(bool); + void handleResults(const QString &s); private: // Private non-virtual methods diff --git a/sample/main.cpp b/sample/main.cpp index f16ff7c..0052582 100644 --- a/sample/main.cpp +++ b/sample/main.cpp @@ -252,7 +252,7 @@ int main(int argc, char* argv[]) std::thread modelling = std::thread(modellingThread); - for(int i=0; i < 3; i++){ + for(int i=0; i < 0; i++){ threads.emplace_back(std::thread(loadingResourcesThread)); threads.emplace_back(std::thread(renderThread)); threads.emplace_back(std::thread(modellingThread)); diff --git a/src/easy_socket.cpp b/src/easy_socket.cpp index af60637..c52976e 100644 --- a/src/easy_socket.cpp +++ b/src/easy_socket.cpp @@ -189,7 +189,7 @@ int EasySocket::connect() if (!m_socket || !result){ return -1; } - + /** SOCKET ConnectSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol); if (ConnectSocket == INVALID_SOCKET) { @@ -206,7 +206,7 @@ int EasySocket::connect() return -1; } m_socket = ConnectSocket; - /** + /**/ // Connect to server. auto iResult = ::connect(m_socket, result->ai_addr, (int)result->ai_addrlen); if (iResult == SOCKET_ERROR) { @@ -214,6 +214,7 @@ int EasySocket::connect() m_socket = INVALID_SOCKET; } /**/ + m_replySocket = m_socket; return iResult; } @@ -233,8 +234,11 @@ int EasySocket::accept() int send_buffer_sizeof = sizeof(int); setsockopt(m_replySocket, SOL_SOCKET, SO_SNDBUF, (char*)&send_buffer, send_buffer_sizeof); - int flag = 1; - int result = setsockopt(m_replySocket,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(int)); + //int flag = 1; + //int result = setsockopt(m_replySocket,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(int)); + + u_long iMode = 0;//0 - blocking, 1 - non blocking + ioctlsocket(m_replySocket, FIONBIO, &iMode); return (int)m_replySocket; } diff --git a/src/easy_socket.h b/src/easy_socket.h index 9f9f2f3..ca5d5f4 100644 --- a/src/easy_socket.h +++ b/src/easy_socket.h @@ -19,7 +19,7 @@ along with this program.If not, see . #define EASY________SOCKET_________H #include - +#include "profiler/profiler.h" #ifndef _WIN32 #include #include @@ -37,7 +37,7 @@ along with this program.If not, see . #include #endif -class EasySocket +class PROFILER_API EasySocket { #ifdef _WIN32 typedef SOCKET socket_t; diff --git a/src/profile_manager.cpp b/src/profile_manager.cpp index 344b30d..676279e 100644 --- a/src/profile_manager.cpp +++ b/src/profile_manager.cpp @@ -518,6 +518,9 @@ void ProfileManager::startListen() dumpBlocksToStream(os); dm.size = (uint32_t)os.stream().str().length(); + + //dm.size = 8192*4; + int packet_size = int(sizeof(dm)) + int(dm.size); char *sendbuf = new char[packet_size]; @@ -537,7 +540,7 @@ void ProfileManager::startListen() delete[] sendbuf; //std::this_thread::sleep_for(std::chrono::seconds(2)); replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_END_SEND_BLOCKS; - //bytes = socket.send(&replyMessage, sizeof(replyMessage)); + bytes = socket.send(&replyMessage, sizeof(replyMessage)); //hasConnect = bytes > 0; } break;