From ae28a458b7350b122cd7ae6f7c56198d48f4f9bd Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Tue, 9 Sep 2025 19:15:40 +0800 Subject: [PATCH] feat: response +PONG\r\n --- CMakeLists.txt | 1 + src/main.cc | 16 +++++++++++++++- src/session.cc | 38 ++++++++++++++++++++++++++++++++++++++ src/session.h | 24 ++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 src/session.cc create mode 100644 src/session.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 326b41f..4d05f17 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,6 +41,7 @@ target_link_libraries(server PRIVATE Boost::format Boost::log Boost::log_setup + Boost::signals2 Boost::property_tree Boost::serialization Boost::filesystem diff --git a/src/main.cc b/src/main.cc index f126dba..f19c031 100644 --- a/src/main.cc +++ b/src/main.cc @@ -7,6 +7,8 @@ #include "boost/foreach.hpp" #include "boost/range.hpp" #include "boost/thread.hpp" +#include "session.h" +#include using namespace boost; @@ -50,7 +52,17 @@ private: if (!ec) { auto remote_ep = new_socket.remote_endpoint(); LOGI("Connected to Server, From {}:{}", remote_ep.address().to_string(), remote_ep.port()); - _io_context.stop(); + auto session = std::make_shared(std::move(new_socket)); + auto weak_session = std::weak_ptr(session); + session->OnClose.connect([this, weak_session] { + auto s = weak_session.lock(); + if (s) { + _sessions.erase(s); + _io_context.stop(); + } + }); + _sessions.insert(session); + session->startSession(); } else { LOGE("Accept failed. reason={}", ec.message()); } @@ -66,6 +78,8 @@ private: thread_group _io_threads; // for accept asio::ip::tcp::acceptor _acceptor; + + std::set> _sessions; }; int diff --git a/src/session.cc b/src/session.cc new file mode 100644 index 0000000..ea48e04 --- /dev/null +++ b/src/session.cc @@ -0,0 +1,38 @@ +#include "session.h" +#include "base/log.h" +#include "boost/system/detail/error_code.hpp" + +namespace redis { +RedisSession::RedisSession(asio::ip::tcp::socket &&socket) : _socket(std::forward(socket)) {} + +void +RedisSession::startSession() +{ + startReceive(); +} + +RedisSession::~RedisSession() +{ + LOGI("~RedisSession {}:{} ", _socket.remote_endpoint().address().to_string(), _socket.remote_endpoint().port()); +} + +void +RedisSession::startReceive() +{ + auto weak_self = std::weak_ptr(shared_from_this()); + _socket.async_receive( + asio::buffer(_recv_buf), [this, weak_self](const system::error_code &ec, size_t received) mutable { + auto self = weak_self.lock(); + if (!self) { return; } + if (!ec) { + auto sent = _socket.send(asio::buffer("+PONG\r\n")); + LOGI("sent to {}:{} 7 bytes", _socket.remote_endpoint().address().to_string(), + _socket.remote_endpoint().port()); + OnClose(); + } else { + startReceive(); + } + }); +} + +}// namespace redis diff --git a/src/session.h b/src/session.h new file mode 100644 index 0000000..9d291c2 --- /dev/null +++ b/src/session.h @@ -0,0 +1,24 @@ +#pragma once + +#include "boost/asio.hpp" +#include "boost/intrusive_ptr.hpp" +#include "boost/signals2.hpp" + +using namespace boost; + +namespace redis { +class RedisSession : public std::enable_shared_from_this { +public: + signals2::signal OnClose; + RedisSession(asio::ip::tcp::socket &&socket); + ~RedisSession(); + void startSession(); + +private: + void startReceive(); + +private: + asio::ip::tcp::socket _socket; + std::array _recv_buf; +}; +}// namespace redis