0
0
mirror of https://github.com/yse/easy_profiler.git synced 2025-01-14 00:27:55 +08:00

Correct receiving

This commit is contained in:
Sergey Yagovtsev 2016-09-16 02:11:11 +03:00
parent f4676ced6f
commit 52a9862963
4 changed files with 123 additions and 64 deletions

View File

@ -68,7 +68,7 @@
#include <chrono>
#include <fstream>
#include "profiler/easy_socket.h"
#undef max
//////////////////////////////////////////////////////////////////////////
@ -122,6 +122,7 @@ EasyMainWindow::EasyMainWindow() : Parent(), m_treeWidget(nullptr), m_graphicsVi
m_hostString->setInputMask("000.000.000.000;");
m_hostString->setValidator(&regValidator);
m_hostString->setText("127.0.0.1");
m_hostString->setText("192.224.4.109");
fileToolBar->addWidget(m_hostString);
@ -150,7 +151,7 @@ EasyMainWindow::EasyMainWindow() : Parent(), m_treeWidget(nullptr), m_graphicsVi
m_receiver->start();
*/
m_thread = std::thread(&This::listen, this);
//m_thread = std::thread(&This::listen, this);
//connect(m_server, &QAbstractSocket::readyRead, m_receiver, &TcpReceiverThread::readTcpData);
@ -302,7 +303,8 @@ EasyMainWindow::EasyMainWindow() : Parent(), m_treeWidget(nullptr), m_graphicsVi
connect(graphicsView->view(), &EasyGraphicsView::intervalChanged, treeWidget, &EasyTreeWidget::setTreeBlocks);
connect(&m_readerTimer, &QTimer::timeout, this, &This::onFileReaderTimeout);
connect(&m_downloadedTimer, &QTimer::timeout, this, &This::onDownloadTimeout);
m_progress = new QProgressDialog("Loading file...", "Cancel", 0, 100, this);
m_progress->setFixedWidth(300);
@ -324,41 +326,29 @@ EasyMainWindow::EasyMainWindow() : Parent(), m_treeWidget(nullptr), m_graphicsVi
void EasyMainWindow::listen()
{
EasySocket socket;
profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE);
socket.setAddress("127.0.0.1",28077);
socket.connect();
//std::this_thread::sleep_for(std::chrono::seconds(2));
char* buffer = new char[8*2014];
const int buffer_size = 8 * 1024 * 1024;
char* buffer = new char[buffer_size];
int seek = 0;
bool isReceiveData = false;
int unreadedBytes = 0;
int bytes = 0;
static auto timeBegin = std::chrono::system_clock::now();
while (true)
{
auto bytes = seek;
if (seek==0)
bytes = socket.receive(buffer, 8 * 2014);
if ((bytes - seek) == 0){
bytes = m_easySocket.receive(buffer, buffer_size);
seek = 0;
}
char *buf = &buffer[seek];
if (seek==0)
unreadedBytes = bytes;
int readedBytes = 0;
if (bytes > 0)
{
profiler::net::Message* message = (profiler::net::Message*)buf;
if (!message->isEasyNetMessage()){
if (isReceiveData)
{
m_receivedProfileData.write(buf, unreadedBytes);
seek = 0;
continue;
}else
continue;
}
@ -368,22 +358,16 @@ void EasyMainWindow::listen()
{
qInfo() << "Receive MESSAGE_TYPE_ACCEPTED_CONNECTION";
socket.send(&request, sizeof(request));
//m_easySocket.send(&request, sizeof(request));
seek += sizeof(profiler::net::Message);
readedBytes = sizeof(profiler::net::Message);
unreadedBytes -= readedBytes;
}
break;
case profiler::net::MESSAGE_TYPE_REPLY_START_CAPTURING:
{
qInfo() << "Receive MESSAGE_TYPE_REPLY_START_CAPTURING";
request.type = profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE;
std::this_thread::sleep_for(std::chrono::seconds(2));
socket.send(&request, sizeof(request));
readedBytes = sizeof(profiler::net::Message);
unreadedBytes -= readedBytes;
seek += sizeof(profiler::net::Message);
}
break;
case profiler::net::MESSAGE_TYPE_REPLY_PREPARE_BLOCKS:
@ -391,18 +375,13 @@ void EasyMainWindow::listen()
qInfo() << "Receive MESSAGE_TYPE_REPLY_PREPARE_BLOCKS";
m_isClientPreparedBlocks = true;
readedBytes = sizeof(profiler::net::Message);
unreadedBytes -= readedBytes;
seek += sizeof(profiler::net::Message);
}
break;
case profiler::net::MESSAGE_TYPE_REPLY_END_SEND_BLOCKS:
{
qInfo() << "Receive MESSAGE_TYPE_REPLY_END_SEND_BLOCKS";
readedBytes = sizeof(profiler::net::Message);
unreadedBytes -= readedBytes;
isReceiveData = false;
seek += sizeof(profiler::net::Message);
auto timeEnd = std::chrono::system_clock::now();
auto dT = std::chrono::duration_cast<std::chrono::milliseconds>(timeEnd - timeBegin);
@ -419,20 +398,45 @@ void EasyMainWindow::listen()
m_receivedProfileData.str(std::string());
m_receivedProfileData.clear();
loadFile(QString(tempfilename.c_str()));
//loadFile(QString(tempfilename.c_str()));
m_recFrames = false;
return;
}
break;
case profiler::net::MESSAGE_TYPE_REPLY_BLOCKS:
{
qInfo() << "Receive MESSAGE_TYPE_REPLY_BLOCKS";
readedBytes = sizeof(profiler::net::DataMessage);
unreadedBytes -= readedBytes;
isReceiveData = true;
seek += sizeof(profiler::net::DataMessage);
profiler::net::DataMessage* dm = (profiler::net::DataMessage*)message;
timeBegin = std::chrono::system_clock::now();
int neededSize = dm->size;
buf = &buffer[seek];
m_receivedProfileData.write(buf, bytes - seek);
neededSize -= bytes - seek;
seek = 0;
bytes = 0;
int loaded = 0;
while (neededSize > 0)
{
bytes = m_easySocket.receive(buffer, buffer_size);
buf = &buffer[0];
int toWrite = std::min(bytes, neededSize);
m_receivedProfileData.write(buf, toWrite);
neededSize -= toWrite;
loaded += toWrite;
seek = toWrite;
m_downloadedBytes.store((loaded / (neededSize+1)) * 100);
}
int k = 0;
int z = k + 1;
} break;
default:
@ -441,10 +445,6 @@ void EasyMainWindow::listen()
}
if (unreadedBytes)
seek += readedBytes;
else
seek = 0;
}
}
}
@ -491,8 +491,8 @@ TcpReceiverThread::TcpReceiverThread(QObject *parent, EasyMainWindow* mw) :QThre
EasyMainWindow::~EasyMainWindow()
{
delete m_progress;
m_receiver->quit();
m_receiver->wait();
if (m_thread.joinable())
m_thread.join();
}
//////////////////////////////////////////////////////////////////////////
@ -624,11 +624,21 @@ void EasyMainWindow::onCollapseAllClicked(bool)
void EasyMainWindow::onConnectClicked(bool)
{
if (!m_isConnected)
int res = m_easySocket.setAddress(m_hostString->text().toStdString().c_str(), m_portString->text().toUShort());
res = m_easySocket.connect();
if (res == -1)
{
QMessageBox::warning(this, "Warning", "Cannot connect with application", QMessageBox::Close);
return;
}
qInfo() << "Connect with application successful";
m_isConnected = true;
/*if (!m_isConnected)
{
qInfo() << "Try connect to: " << m_hostString->text() << ":" << m_portString->text();
m_server->connectToHost(m_hostString->text(), m_portString->text().toUShort());
}
}*/
}
void EasyMainWindow::onCaptureClicked(bool)
@ -636,16 +646,28 @@ void EasyMainWindow::onCaptureClicked(bool)
if (!m_isConnected)
{
m_server->connectToHost(m_hostString->text(), m_portString->text().toUShort());
//m_server->connectToHost(m_hostString->text(), m_portString->text().toUShort());
QMessageBox::warning(this, "Warning", "No connection with profiling app", QMessageBox::Close);
return;
}
profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE);
m_easySocket.send(&request, sizeof(request));
m_downloadingProgress = new QProgressDialog("Loading file...", "Cancel", 0, 100, this);
m_downloadingProgress->setFixedWidth(300);
m_downloadingProgress->setWindowTitle("EasyProfiler");
m_downloadingProgress->setModal(true);
m_downloadingProgress->setValue(100);
m_thread = std::thread(&This::listen, this);
/**
m_server = m_receiver->m_server;
profiler::net::Message requestMessage(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE);
m_server->write((const char*)&requestMessage, sizeof(requestMessage));
/**
QDialog *dialog = new QDialog();
dialog->setWindowTitle(tr("Set network parameters"));
QFormLayout *layout = new QFormLayout;
@ -689,7 +711,20 @@ void EasyMainWindow::onCaptureClicked(bool)
QMessageBox::information(this,"Capturing frames..." ,"Close this window to stop capturing.",QMessageBox::Close);
m_isConnected = (m_server->state() == QTcpSocket::ConnectedState);
m_downloading = true;
request.type = profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE;
m_easySocket.send(&request, sizeof(request));
m_downloadedTimer.start(LOADER_TIMER_INTERVAL);
m_downloadingProgress->show();
m_thread.join();
m_downloading = false;
std::string tempfilename = "test_rec.prof";
loadFile(QString(tempfilename.c_str()));
/*m_isConnected = (m_server->state() == QTcpSocket::ConnectedState);
if (m_isConnected)
{
@ -700,6 +735,7 @@ void EasyMainWindow::onCaptureClicked(bool)
QMessageBox::warning(this,"Warning" ,"Application was disconnected",QMessageBox::Close);
return;
}
*/
}
void TcpReceiverThread::readTcpData()
@ -977,6 +1013,19 @@ void EasyMainWindow::saveSettingsAndGeometry()
//////////////////////////////////////////////////////////////////////////
void EasyMainWindow::onDownloadTimeout()
{
if (!m_downloading){
m_downloadingProgress->setValue(100);
//m_downloadingProgress->hide();
m_downloadedTimer.stop();
}
else{
m_downloadingProgress->setValue(m_downloadedBytes.load());
}
}
void EasyMainWindow::onFileReaderTimeout()
{
if (m_reader.done())

View File

@ -38,6 +38,9 @@
#include <QTcpServer>
#include <QTcpSocket>
#include <QThread>
#include "profiler/easy_socket.h"
#undef max
#undef min
#include "profiler/reader.h"
#include <sstream>
@ -100,7 +103,7 @@ signals:
class EasyMainWindow : public QMainWindow
{
Q_OBJECT
friend class TcpReceiverThread;
friend class TcpReceiverThread;
protected:
typedef EasyMainWindow This;
@ -110,7 +113,10 @@ protected:
QDockWidget* m_treeWidget;
QDockWidget* m_graphicsView;
class QProgressDialog* m_progress;
class QProgressDialog* m_downloadingProgress;
QTimer m_readerTimer;
QTimer m_downloadedTimer;
::profiler::SerializedData m_serializedBlocks;
::profiler::SerializedData m_serializedDescriptors;
EasyFileReader m_reader;
@ -127,6 +133,11 @@ protected:
TcpReceiverThread* m_receiver;
std::thread m_thread;
EasySocket m_easySocket;
bool m_downloading = false;
::std::atomic<int> m_downloadedBytes;
public:
EasyMainWindow();
@ -153,6 +164,7 @@ protected slots:
void onExpandAllClicked(bool);
void onCollapseAllClicked(bool);
void onFileReaderTimeout();
void onDownloadTimeout();
void onFileReaderCancel();
void onCaptureClicked(bool);

View File

@ -177,7 +177,6 @@ bool EasySocket::setAddress(const char *serv, uint16_t portno)
_itoa(portno, buffer, 10);
iResult = getaddrinfo(serv, buffer, &hints, &result);
if (iResult != 0) {
WSACleanup();
return false;
}
@ -210,8 +209,7 @@ int EasySocket::connect()
// Connect to server.
auto iResult = ::connect(m_socket, result->ai_addr, (int)result->ai_addrlen);
if (iResult == SOCKET_ERROR) {
closesocket(m_socket);
m_socket = INVALID_SOCKET;
return iResult;
}
/**/
m_replySocket = m_socket;

View File

@ -383,7 +383,7 @@ uint32_t ProfileManager::dumpBlocksToStream(profiler::OStream& _outputStream)
::profiler::setEnabled(false);
//TODO remove it
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
// This is to make sure that no new descriptors or new threads will be
// added until we finish sending data.
guard_lock_t lock1(m_storedSpin);