0
0
mirror of https://github.com/yse/easy_profiler.git synced 2024-12-26 16:11:02 +08:00

Receive by EasySocket

This commit is contained in:
Sergey Yagovtsev 2016-09-14 22:04:15 +03:00
parent 6d6ad61a18
commit 80892e636e
6 changed files with 286 additions and 30 deletions

View File

@ -122,15 +122,30 @@ EasyMainWindow::EasyMainWindow() : Parent(), m_treeWidget(nullptr), m_graphicsVi
m_server = new QTcpSocket( this );
m_server->setSocketOption(QAbstractSocket::LowDelayOption, 1);
/*m_server = new QTcpSocket( );
m_server->setSocketOption(QAbstractSocket::LowDelayOption, 0);
m_server->setReadBufferSize(16 * 1024);
connect(m_server, SIGNAL(readyRead()), SLOT(readTcpData()));
//connect(m_server, SIGNAL(readyRead()), SLOT(readTcpData()));
connect(m_server, SIGNAL(connected()), SLOT(onConnected()));
connect(m_server, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onErrorConnection(QAbstractSocket::SocketError)), Qt::UniqueConnection);
connect(m_server, SIGNAL(disconnected()), SLOT(onDisconnect()), Qt::UniqueConnection);
m_receiver = new TcpReceiverThread(this, this);
connect(m_receiver, &TcpReceiverThread::resultReady, this, &This::handleResults);
connect(m_receiver, &TcpReceiverThread::finished, m_receiver, &QObject::deleteLater);
m_server->connectToHost(m_hostString->text(), m_portString->text().toUShort());
m_receiver->start();
*/
m_thread = std::thread(&This::listen, this);
//connect(m_server, &QAbstractSocket::readyRead, m_receiver, &TcpReceiverThread::readTcpData);
//m_receiver->run();
//m_server->connectToHost(m_hostString->text(), m_portString->text().toUShort());
//TODO:
//connected
//error
@ -271,11 +286,181 @@ EasyMainWindow::EasyMainWindow() : Parent(), m_treeWidget(nullptr), m_graphicsVi
auto opened_filename = QCoreApplication::arguments().at(1);
loadFile(opened_filename);
}
}
#include "../src/easy_socket.h"
#undef max
void EasyMainWindow::listen()
{
EasySocket socket;
profiler::net::Message request(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE);
socket.setAddress("127.0.0.1",28077);
socket.connect();
//std::this_thread::sleep_for(std::chrono::seconds(2));
char* buffer = new char[8*2014];
int seek = 0;
bool isReceiveData = false;
int unreadedBytes = 0;
static auto timeBegin = std::chrono::system_clock::now();
while (true)
{
auto bytes = seek;
if (seek==0)
bytes = socket.receive(buffer, 8 * 2014);
char *buf = &buffer[seek];
if (seek==0)
unreadedBytes = bytes;
int readedBytes = 0;
if (bytes > 0)
{
profiler::net::Message* message = (profiler::net::Message*)buf;
if (!message->isEasyNetMessage()){
if (isReceiveData)
{
m_receivedProfileData.write(buf, unreadedBytes);
seek = 0;
continue;
}else
continue;
}
switch (message->type) {
case profiler::net::MESSAGE_TYPE_ACCEPTED_CONNECTION:
{
qInfo() << "Receive MESSAGE_TYPE_ACCEPTED_CONNECTION";
socket.send(&request, sizeof(request));
readedBytes = sizeof(profiler::net::Message);
unreadedBytes -= readedBytes;
}
break;
case profiler::net::MESSAGE_TYPE_REPLY_START_CAPTURING:
{
qInfo() << "Receive MESSAGE_TYPE_REPLY_START_CAPTURING";
request.type = profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE;
std::this_thread::sleep_for(std::chrono::seconds(2));
socket.send(&request, sizeof(request));
readedBytes = sizeof(profiler::net::Message);
unreadedBytes -= readedBytes;
}
break;
case profiler::net::MESSAGE_TYPE_REPLY_PREPARE_BLOCKS:
{
qInfo() << "Receive MESSAGE_TYPE_REPLY_PREPARE_BLOCKS";
m_isClientPreparedBlocks = true;
readedBytes = sizeof(profiler::net::Message);
unreadedBytes -= readedBytes;
}
break;
case profiler::net::MESSAGE_TYPE_REPLY_END_SEND_BLOCKS:
{
qInfo() << "Receive MESSAGE_TYPE_REPLY_END_SEND_BLOCKS";
readedBytes = sizeof(profiler::net::Message);
unreadedBytes -= readedBytes;
isReceiveData = false;
auto timeEnd = std::chrono::system_clock::now();
auto dT = std::chrono::duration_cast<std::chrono::milliseconds>(timeEnd - timeBegin);
auto dTsec = std::chrono::duration_cast<std::chrono::seconds>(timeEnd - timeBegin);
qInfo() << "recieve" << m_receivedProfileData.str().size() << dT.count() << "ms" << double(m_receivedProfileData.str().size())*1000.0 / double(dT.count()) / 1024.0 << "kBytes/sec";
m_recFrames = false;
qInfo() << "Write FILE";
std::string tempfilename = "test_rec.prof";
std::ofstream of(tempfilename, std::fstream::binary);
of << m_receivedProfileData.str();
of.close();
m_receivedProfileData.str(std::string());
m_receivedProfileData.clear();
loadFile(QString(tempfilename.c_str()));
m_recFrames = false;
}
break;
case profiler::net::MESSAGE_TYPE_REPLY_BLOCKS:
{
qInfo() << "Receive MESSAGE_TYPE_REPLY_BLOCKS";
readedBytes = sizeof(profiler::net::DataMessage);
unreadedBytes -= readedBytes;
isReceiveData = true;
timeBegin = std::chrono::system_clock::now();
} break;
default:
//qInfo() << "Receive unknown " << message->type;
break;
}
if (unreadedBytes)
seek += readedBytes;
else
seek = 0;
}
}
}
void TcpReceiverThread::run()
{
QString result;
//mainwindow->m_server = new QTcpSocket(this);
m_server = new QTcpSocket();
//mainwindow->m_server = m_server;
//auto m_server = mainwindow->m_server;
m_server->setSocketOption(QAbstractSocket::LowDelayOption, 0);
m_server->setReadBufferSize(16 * 1024);
m_server->connectToHost("127.0.0.1", 28077);
//connect(m_server, SIGNAL(readyRead()), SLOT(readTcpData()));
connect(m_server, &QAbstractSocket::connected, this, &TcpReceiverThread::onConnected);
//connect(m_server, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onErrorConnection(QAbstractSocket::SocketError)), Qt::UniqueConnection);
connect(m_server, &QAbstractSocket::disconnected, mainwindow, &EasyMainWindow::onDisconnect);
connect(m_server, &QAbstractSocket::readyRead, this, &TcpReceiverThread::readTcpData);
/**/
exec();
/* ... here is the expensive or blocking operation ... */
emit resultReady(result);
}
TcpReceiverThread::TcpReceiverThread(QObject *parent, EasyMainWindow* mw) :QThread(parent), mainwindow(mw)
{
}
EasyMainWindow::~EasyMainWindow()
{
delete m_progress;
m_receiver->quit();
m_receiver->wait();
}
//////////////////////////////////////////////////////////////////////////
@ -398,7 +583,7 @@ void EasyMainWindow::onCaptureClicked(bool)
QMessageBox::warning(this, "Warning", "No connection with profiling app", QMessageBox::Close);
return;
}
m_server = m_receiver->m_server;
profiler::net::Message requestMessage(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE);
m_server->write((const char*)&requestMessage, sizeof(requestMessage));
@ -452,7 +637,7 @@ void EasyMainWindow::onCaptureClicked(bool)
if (m_isConnected)
{
profiler::net::Message requestMessage(profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE);
m_client->write((const char*)&requestMessage, sizeof(requestMessage));
m_server->write((const char*)&requestMessage, sizeof(requestMessage));
}else
{
QMessageBox::warning(this,"Warning" ,"Application was disconnected",QMessageBox::Close);
@ -460,12 +645,24 @@ void EasyMainWindow::onCaptureClicked(bool)
}
}
void TcpReceiverThread::readTcpData()
{
mainwindow->readTcpData();
}
void EasyMainWindow::handleResults(const QString &s)
{
}
void EasyMainWindow::readTcpData()
{
QTcpSocket* pClientSocket = (QTcpSocket*)sender();
m_client = pClientSocket;
m_server = m_receiver->m_server;
//qInfo() << "Rec: " << m_server->bytesAvailable() << "bytes max" << m_server->readBufferSize();
static qint64 necessarySize = 0;
static qint64 loadedSize = 0;
static auto timeBegin = std::chrono::system_clock::now();
while(m_server->bytesAvailable())
{
//QByteArray data = m_server->readAll();
@ -487,16 +684,15 @@ void EasyMainWindow::readTcpData()
return;
}
else if (m_recFrames){
static bool first = true;
if (first)
m_server->waitForBytesWritten(5000);
first = false;
m_receivedProfileData.write(data.data(),data.size());
loadedSize += data.size();
if (m_receivedProfileData.str().size() == necessarySize)
{
m_recFrames = false;
qInfo() << "recieve all";
auto timeEnd = std::chrono::system_clock::now();
auto dT = std::chrono::duration_cast<std::chrono::milliseconds>(timeEnd - timeBegin);
auto dTsec = std::chrono::duration_cast<std::chrono::seconds>(timeEnd - timeBegin);
qInfo() << "recieve" << m_receivedProfileData.str().size() << dT.count() << "ms" << double(m_receivedProfileData.str().size())*1000.0 / double(dT.count()) / 1024.0 << "kBytes/sec";
m_recFrames = false;
@ -510,15 +706,25 @@ void EasyMainWindow::readTcpData()
m_receivedProfileData.clear();
loadFile(QString(tempfilename.c_str()));
m_recFrames = false;
}
if (m_receivedProfileData.str().size() > necessarySize)
{
qInfo() << "recieve more than necessary d=" << m_receivedProfileData.str().size() - necessarySize;
}
//qInfo() << necessarySize << " " << loadedSize << m_receivedProfileData.str().size() << m_receivedProfileData.str().length();
qInfo() << necessarySize << " " << loadedSize;
//qInfo() << necessarySize << " " << loadedSize << QThread::currentThreadId();
if (m_recFrames)
{
m_receivedProfileData.write(data.data(), data.size());
loadedSize += data.size();
continue;
}
}
switch (message->type) {
@ -555,7 +761,9 @@ void EasyMainWindow::readTcpData()
loadedSize = 0;
m_receivedProfileData.write(data.data()+sizeof(profiler::net::DataMessage),data.size() - sizeof(profiler::net::DataMessage));
loadedSize += data.size() - sizeof(profiler::net::DataMessage);
//std::this_thread::sleep_for(std::chrono::seconds(2));
timeBegin = std::chrono::system_clock::now();
} break;
default:
@ -568,9 +776,25 @@ void EasyMainWindow::readTcpData()
}
void TcpReceiverThread::onConnected()
{
qInfo() << "onConnected()";
profiler::net::Message requestMessage(profiler::net::MESSAGE_TYPE_REQUEST_START_CAPTURE);
m_server->write((const char*)&requestMessage, sizeof(requestMessage));
std::this_thread::sleep_for(std::chrono::seconds(2));
profiler::net::Message requestMessage2(profiler::net::MESSAGE_TYPE_REQUEST_STOP_CAPTURE);
m_server->write((const char*)&requestMessage2, sizeof(requestMessage));
}
void EasyMainWindow::onConnected()
{
qInfo() << "onConnected()";
m_isConnected = true;
}
void EasyMainWindow::onErrorConnection(QAbstractSocket::SocketError socketError)
@ -588,15 +812,15 @@ void EasyMainWindow::onNewConnection()
{
//m_client = m_server->nextPendingConnection();
qInfo() << "New connection!" << m_client;
//qInfo() << "New connection!" << m_client;
connect(m_client, SIGNAL(disconnected()), this, SLOT(onDisconnection())) ;
connect(m_client, SIGNAL(readyRead()), this, SLOT(readTcpData()) );
//connect(m_client, SIGNAL(disconnected()), this, SLOT(onDisconnection())) ;
//connect(m_client, SIGNAL(readyRead()), this, SLOT(readTcpData()) );
}
void EasyMainWindow::onDisconnection()
{
m_client = nullptr;
//m_client = nullptr;
}
//////////////////////////////////////////////////////////////////////////

View File

@ -37,6 +37,7 @@
#include <QTimer>
#include <QTcpServer>
#include <QTcpSocket>
#include <QThread>
#include "profiler/reader.h"
#include <sstream>
@ -78,11 +79,28 @@ public:
}; // END of class EasyFileReader.
//////////////////////////////////////////////////////////////////////////
class EasyMainWindow;
class TcpReceiverThread : public QThread
{
Q_OBJECT
EasyMainWindow* mainwindow;
public:
QTcpSocket * m_server;
explicit TcpReceiverThread(QObject *parent, EasyMainWindow* mw);
void run() Q_DECL_OVERRIDE;
public slots:
void readTcpData();
void onConnected();
signals:
void resultReady(const QString &s);
};
class EasyMainWindow : public QMainWindow
{
Q_OBJECT
friend class TcpReceiverThread;
protected:
typedef EasyMainWindow This;
@ -98,13 +116,17 @@ protected:
EasyFileReader m_reader;
QTcpSocket* m_server = nullptr;
QTcpSocket* m_client = nullptr;
std::stringstream m_receivedProfileData;
bool m_recFrames = false;
QLineEdit* m_hostString = nullptr;
QLineEdit* m_portString = nullptr;
bool m_isConnected = false;
TcpReceiverThread* m_receiver;
std::thread m_thread;
public:
EasyMainWindow();
@ -114,6 +136,8 @@ public:
void closeEvent(QCloseEvent* close_event) override;
void listen();
protected slots:
void onOpenFileClicked(bool);
@ -139,6 +163,7 @@ protected slots:
void onDisconnect();
void onConnectClicked(bool);
void handleResults(const QString &s);
private:
// Private non-virtual methods

View File

@ -252,7 +252,7 @@ int main(int argc, char* argv[])
std::thread modelling = std::thread(modellingThread);
for(int i=0; i < 3; i++){
for(int i=0; i < 0; i++){
threads.emplace_back(std::thread(loadingResourcesThread));
threads.emplace_back(std::thread(renderThread));
threads.emplace_back(std::thread(modellingThread));

View File

@ -189,7 +189,7 @@ int EasySocket::connect()
if (!m_socket || !result){
return -1;
}
/**
SOCKET ConnectSocket = socket(result->ai_family, result->ai_socktype,
result->ai_protocol);
if (ConnectSocket == INVALID_SOCKET) {
@ -206,7 +206,7 @@ int EasySocket::connect()
return -1;
}
m_socket = ConnectSocket;
/**
/**/
// Connect to server.
auto iResult = ::connect(m_socket, result->ai_addr, (int)result->ai_addrlen);
if (iResult == SOCKET_ERROR) {
@ -214,6 +214,7 @@ int EasySocket::connect()
m_socket = INVALID_SOCKET;
}
/**/
m_replySocket = m_socket;
return iResult;
}
@ -233,8 +234,11 @@ int EasySocket::accept()
int send_buffer_sizeof = sizeof(int);
setsockopt(m_replySocket, SOL_SOCKET, SO_SNDBUF, (char*)&send_buffer, send_buffer_sizeof);
int flag = 1;
int result = setsockopt(m_replySocket,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(int));
//int flag = 1;
//int result = setsockopt(m_replySocket,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(int));
u_long iMode = 0;//0 - blocking, 1 - non blocking
ioctlsocket(m_replySocket, FIONBIO, &iMode);
return (int)m_replySocket;
}

View File

@ -19,7 +19,7 @@ along with this program.If not, see <http://www.gnu.org/licenses/>.
#define EASY________SOCKET_________H
#include <stdint.h>
#include "profiler/profiler.h"
#ifndef _WIN32
#include <sys/types.h>
#include <sys/socket.h>
@ -37,7 +37,7 @@ along with this program.If not, see <http://www.gnu.org/licenses/>.
#include <stdlib.h>
#endif
class EasySocket
class PROFILER_API EasySocket
{
#ifdef _WIN32
typedef SOCKET socket_t;

View File

@ -518,6 +518,9 @@ void ProfileManager::startListen()
dumpBlocksToStream(os);
dm.size = (uint32_t)os.stream().str().length();
//dm.size = 8192*4;
int packet_size = int(sizeof(dm)) + int(dm.size);
char *sendbuf = new char[packet_size];
@ -537,7 +540,7 @@ void ProfileManager::startListen()
delete[] sendbuf;
//std::this_thread::sleep_for(std::chrono::seconds(2));
replyMessage.type = profiler::net::MESSAGE_TYPE_REPLY_END_SEND_BLOCKS;
//bytes = socket.send(&replyMessage, sizeof(replyMessage));
bytes = socket.send(&replyMessage, sizeof(replyMessage));
//hasConnect = bytes > 0;
}
break;