feat update rpc_core

This commit is contained in:
tqcq
2024-04-08 22:29:06 +08:00
parent b412e7e11c
commit a3ba7c6045
19 changed files with 221 additions and 153 deletions

View File

@ -43,10 +43,18 @@ struct default_connection : connection {
* Loopback connection for testing
*/
struct loopback_connection : public connection {
loopback_connection() {
send_package_impl = [this](std::string payload) {
on_recv_package(std::move(payload));
static std::pair<std::shared_ptr<connection>, std::shared_ptr<connection>> create() {
auto c1 = std::make_shared<connection>();
auto c1_weak = std::weak_ptr<connection>(c1);
auto c2 = std::make_shared<connection>();
auto c2_weak = std::weak_ptr<connection>(c2);
c1->send_package_impl = [c2_weak](std::string package) {
c2_weak.lock()->on_recv_package(std::move(package));
};
c2->send_package_impl = [c1_weak](std::string package) {
c1_weak.lock()->on_recv_package(std::move(package));
};
return std::make_pair(c1, c2);
}
};

View File

@ -12,7 +12,7 @@
namespace rpc_core {
namespace detail {
class msg_dispatcher : noncopyable {
class msg_dispatcher : public std::enable_shared_from_this<msg_dispatcher>, noncopyable {
public:
using cmd_handle = std::function<std::pair<bool, msg_wrapper>(msg_wrapper)>;
using rsp_handle = std::function<bool(msg_wrapper)>;
@ -21,17 +21,20 @@ class msg_dispatcher : noncopyable {
using timer_impl = std::function<void(uint32_t ms, timeout_cb)>;
public:
explicit msg_dispatcher(std::shared_ptr<connection> conn) : conn_(std::move(conn)) {
auto alive = std::weak_ptr<void>(is_alive_);
conn_->on_recv_package = ([this, RPC_CORE_MOVE_LAMBDA(alive)](const std::string& payload) {
if (alive.expired()) {
explicit msg_dispatcher(std::shared_ptr<connection> conn) : conn_(std::move(conn)) {}
void init() {
auto self = std::weak_ptr<msg_dispatcher>(shared_from_this());
conn_->on_recv_package = ([RPC_CORE_MOVE_LAMBDA(self)](const std::string& payload) {
auto self_lock = self.lock();
if (!self_lock) {
RPC_CORE_LOGD("msg_dispatcher expired");
return;
}
bool success;
auto msg = coder::deserialize(payload, success);
if (success) {
this->dispatch(std::move(msg));
self_lock->dispatch(std::move(msg));
} else {
RPC_CORE_LOGE("payload deserialize error");
}
@ -70,21 +73,18 @@ class msg_dispatcher : noncopyable {
}
const auto& fn = it->second;
const bool need_rsp = msg.type & msg_wrapper::need_rsp;
auto resp = fn(msg);
auto resp = fn(std::move(msg));
if (need_rsp && resp.first) {
RPC_CORE_LOGD("=> seq:%u type:rsp", msg.seq);
RPC_CORE_LOGD("=> seq:%u type:rsp", resp.second.seq);
conn_->send_package_impl(coder::serialize(resp.second));
}
} break;
case msg_wrapper::response: {
// pong or response
const bool isPong = msg.type & msg_wrapper::pong;
const auto handleMap = isPong ? &pong_handle_map_ : &rsp_handle_map_;
RPC_CORE_LOGD("<= seq:%u type:%s", msg.seq, (msg.type & detail::msg_wrapper::msg_type::pong) ? "pong" : "rsp");
auto it = handleMap->find(msg.seq);
if (it == handleMap->cend()) {
auto it = rsp_handle_map_.find(msg.seq);
if (it == rsp_handle_map_.cend()) {
RPC_CORE_LOGD("no rsp for seq:%u", msg.seq);
break;
}
@ -94,11 +94,11 @@ class msg_dispatcher : noncopyable {
return;
}
if (cb(std::move(msg))) {
handleMap->erase(it);
RPC_CORE_LOGV("handleMap->size=%zu", handleMap->size());
RPC_CORE_LOGV("rsp_handle_map_.size=%zu", rsp_handle_map_.size());
} else {
RPC_CORE_LOGE("may deserialize error");
}
rsp_handle_map_.erase(it);
} break;
default:
@ -122,31 +122,30 @@ class msg_dispatcher : noncopyable {
}
}
void subscribe_rsp(seq_type seq, rsp_handle handle, RPC_CORE_MOVE_PARAM(timeout_cb) timeout_cb, uint32_t timeout_ms, bool is_ping) {
void subscribe_rsp(seq_type seq, rsp_handle handle, RPC_CORE_MOVE_PARAM(timeout_cb) timeout_cb, uint32_t timeout_ms) {
RPC_CORE_LOGD("subscribe_rsp seq:%u", seq);
if (handle == nullptr) return;
const auto handleMap = is_ping ? &pong_handle_map_ : &rsp_handle_map_;
(*handleMap)[seq] = std::move(handle);
if (timer_impl_ == nullptr) {
RPC_CORE_LOGW("no timeout will cause memory leak!");
return;
}
auto alive = std::weak_ptr<void>(is_alive_);
timer_impl_(timeout_ms, [handleMap, seq, RPC_CORE_MOVE_LAMBDA(timeout_cb), RPC_CORE_MOVE_LAMBDA(alive)] {
if (alive.expired()) {
rsp_handle_map_[seq] = std::move(handle);
auto self = std::weak_ptr<msg_dispatcher>(shared_from_this());
timer_impl_(timeout_ms, [RPC_CORE_MOVE_LAMBDA(self), seq, RPC_CORE_MOVE_LAMBDA(timeout_cb)] {
auto self_lock = self.lock();
if (!self_lock) {
RPC_CORE_LOGD("seq:%u timeout after destroy", seq);
return;
}
auto it = handleMap->find(seq);
if (it != handleMap->cend()) {
auto it = self_lock->rsp_handle_map_.find(seq);
if (it != self_lock->rsp_handle_map_.cend()) {
if (timeout_cb) {
timeout_cb();
}
handleMap->erase(seq);
RPC_CORE_LOGV("Timeout seq=%d, handleMap.size=%zu", seq, handleMap->size());
self_lock->rsp_handle_map_.erase(seq);
RPC_CORE_LOGV("Timeout seq=%d, rsp_handle_map_.size=%zu", seq, this->rsp_handle_map_.size());
}
});
}
@ -159,9 +158,7 @@ class msg_dispatcher : noncopyable {
std::shared_ptr<connection> conn_;
std::map<cmd_type, cmd_handle> cmd_handle_map_;
std::map<seq_type, rsp_handle> rsp_handle_map_;
std::map<seq_type, rsp_handle> pong_handle_map_;
timer_impl timer_impl_;
std::shared_ptr<void> is_alive_ = std::make_shared<uint8_t>();
};
} // namespace detail

View File

@ -25,7 +25,7 @@ struct msg_wrapper : copyable { // NOLINT
cmd_type cmd;
msg_type type;
std::string data;
std::string* request_payload = nullptr;
std::string const* request_payload = nullptr;
std::string dump() const {
char tmp[100];

View File

@ -29,7 +29,7 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
struct rpc_proto {
virtual ~rpc_proto() = default;
virtual seq_type make_seq() = 0;
virtual void send_request(const request_s&) = 0;
virtual void send_request(request const*) = 0;
virtual bool is_ready() const = 0;
};
using send_proto_s = std::shared_ptr<rpc_proto>;
@ -80,7 +80,7 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
auto r = std::shared_ptr<request>(new request(std::forward<Args>(args)...), [](request* p) {
delete p;
});
r->init();
r->timeout(nullptr);
return r;
}
@ -189,7 +189,7 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
return;
}
seq_ = r->make_seq();
r->send_request(self_keeper_);
r->send_request(this);
if (!need_rsp_) {
on_finish(finally_t::no_need_rsp);
}
@ -268,7 +268,7 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
return rpc_;
}
bool canceled() const {
bool is_canceled() const {
return canceled_;
}
@ -301,10 +301,6 @@ class request : detail::noncopyable, public std::enable_shared_from_this<request
}
private:
void init() {
timeout(nullptr);
}
void on_finish(finally_t type) {
if (!waiting_rsp_) return;
waiting_rsp_ = false;

View File

@ -28,7 +28,9 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi
}
private:
explicit rpc(std::shared_ptr<connection> conn = std::make_shared<default_connection>()) : conn_(conn), dispatcher_(std::move(conn)) {
explicit rpc(std::shared_ptr<connection> conn = std::make_shared<default_connection>())
: conn_(conn), dispatcher_(std::make_shared<detail::msg_dispatcher>(std::move(conn))) {
dispatcher_->init();
RPC_CORE_LOGD("rpc: %p", this);
}
@ -42,7 +44,7 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi
}
inline void set_timer(detail::msg_dispatcher::timer_impl timer_impl) {
dispatcher_.set_timer_impl(std::move(timer_impl));
dispatcher_->set_timer_impl(std::move(timer_impl));
}
inline void set_ready(bool ready) {
@ -54,11 +56,11 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi
void subscribe(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle) {
constexpr bool F_ReturnIsEmpty = std::is_void<typename detail::callable_traits<F>::return_type>::value;
constexpr bool F_ParamIsEmpty = detail::callable_traits<F>::argc == 0;
subscribe_helper<F, F_ReturnIsEmpty, F_ParamIsEmpty>()(cmd, std::move(handle), &dispatcher_);
subscribe_helper<F, F_ReturnIsEmpty, F_ParamIsEmpty>()(cmd, std::move(handle), dispatcher_.get());
}
inline void unsubscribe(const cmd_type& cmd) {
dispatcher_.unsubscribe_cmd(cmd);
dispatcher_->unsubscribe_cmd(cmd);
}
public:
@ -79,9 +81,9 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi
return seq_++;
}
void send_request(const request_s& request) override {
void send_request(request const* request) override {
if (request->need_rsp_) {
dispatcher_.subscribe_rsp(request->seq_, request->rsp_handle_, request->timeout_cb_, request->timeout_ms_, request->is_ping_);
dispatcher_->subscribe_rsp(request->seq_, request->rsp_handle_, request->timeout_cb_, request->timeout_ms_);
}
detail::msg_wrapper msg;
msg.type = static_cast<detail::msg_wrapper::msg_type>(detail::msg_wrapper::command | (request->is_ping_ ? detail::msg_wrapper::ping : 0) |
@ -157,7 +159,7 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi
private:
std::shared_ptr<connection> conn_;
detail::msg_dispatcher dispatcher_;
std::shared_ptr<detail::msg_dispatcher> dispatcher_;
seq_type seq_{0};
bool is_ready_ = false;
};

View File

@ -3,6 +3,16 @@
// config
#include "config.hpp"
#if defined(RPC_CORE_SERIALIZE_USE_CUSTOM)
#include RPC_CORE_SERIALIZE_USE_CUSTOM
#elif defined(RPC_CORE_SERIALIZE_USE_NLOHMANN_JSON)
#include "serialize_nlohmann_json.hpp"
#else
// type traits
#include "detail/type_traits.hpp"
@ -32,3 +42,5 @@
#include "serialize/type_ptr.hpp"
#include "serialize/type_struct.hpp"
#include "serialize/type_void.hpp"
#endif

View File

@ -0,0 +1,25 @@
#pragma once
#include "detail/log.h"
#include "detail/string_view.hpp"
#include "nlohmann/json.hpp"
namespace rpc_core {
template <typename T>
inline std::string serialize(T&& t) {
return nlohmann::json(t).dump(-1);
}
template <typename T>
inline bool deserialize(const detail::string_view& data, T& t) {
try {
t = nlohmann::json::parse(data.data(), data.data() + data.size()).get<T>();
return true;
} catch (std::exception& e) {
RPC_CORE_LOGE("deserialize: %s", e.what());
return false;
}
}
} // namespace rpc_core

View File

@ -2,7 +2,7 @@
#define RPC_CORE_VER_MAJOR 2
#define RPC_CORE_VER_MINOR 0
#define RPC_CORE_VER_PATCH 1
#define RPC_CORE_VER_PATCH 2
#define RPC_CORE_TO_VERSION(major, minor, patch) (major * 10000 + minor * 100 + patch)
#define RPC_CORE_VERSION RPC_CORE_TO_VERSION(RPC_CORE_VER_MAJOR, RPC_CORE_VER_MINOR, RPC_CORE_VER_PATCH)