Loading 3party/rpc_core/include/rpc_core/connection.hpp +36 −45 Original line number Diff line number Diff line Loading @@ -29,13 +29,12 @@ struct connection : detail::noncopyable { * Default connection avoid crash */ struct default_connection : connection { default_connection() { send_package_impl = [](const std::string &payload) { RPC_CORE_LOGE("need send_package_impl: %zu", payload.size()); }; on_recv_package = [](const std::string &payload) { RPC_CORE_LOGE("need on_recv_package: %zu", payload.size()); }; default_connection() { send_package_impl = [](const std::string &payload) { RPC_CORE_LOGE("need send_package_impl: %zu", payload.size()); }; on_recv_package = [](const std::string &payload) { RPC_CORE_LOGE("need on_recv_package: %zu", payload.size()); }; } }; Loading @@ -43,17 +42,14 @@ struct default_connection : connection { * Loopback connection for testing */ struct loopback_connection : public connection { static std::pair<std::shared_ptr<connection>, std::shared_ptr<connection>> create() { static std::pair<std::shared_ptr<connection>, std::shared_ptr<connection>> create() { auto c1 = std::make_shared<connection>(); auto c1_weak = std::weak_ptr<connection>(c1); auto c2 = std::make_shared<connection>(); auto c2_weak = std::weak_ptr<connection>(c2); c1->send_package_impl = [c2_weak](std::string package) { c2_weak.lock()->on_recv_package(std::move(package)); }; c2->send_package_impl = [c1_weak](std::string package) { c1_weak.lock()->on_recv_package(std::move(package)); }; c1->send_package_impl = [c2_weak](std::string package) { c2_weak.lock()->on_recv_package(std::move(package)); }; c2->send_package_impl = [c1_weak](std::string package) { c1_weak.lock()->on_recv_package(std::move(package)); }; return std::make_pair(c1, c2); } }; Loading @@ -63,25 +59,20 @@ struct loopback_connection : public connection { * for bytes stream: tcp socket, serial port, etc. */ struct stream_connection : public connection { explicit stream_connection(uint32_t max_body_size = UINT32_MAX) : data_packer_(max_body_size) { explicit stream_connection(uint32_t max_body_size = UINT32_MAX) : data_packer_(max_body_size) { send_package_impl = [this](const std::string &package) { auto payload = data_packer_.pack(package); send_bytes_impl(std::move(payload)); }; data_packer_.on_data = [this](std::string payload) { on_recv_package(std::move(payload)); }; on_recv_bytes = [this](const void *data, size_t size) { data_packer_.feed(data, size); }; data_packer_.on_data = [this](std::string payload) { on_recv_package(std::move(payload)); }; on_recv_bytes = [this](const void *data, size_t size) { data_packer_.feed(data, size); }; } /** * should call on connected or disconnected */ void reset() { data_packer_.reset(); } void reset() { data_packer_.reset(); } public: std::function<void(std::string)> send_bytes_impl; Loading 3party/rpc_core/include/rpc_core/rpc.hpp +130 −137 Original line number Diff line number Diff line Loading @@ -21,83 +21,76 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi public: template<typename... Args> static std::shared_ptr<rpc> create(Args&&... args) { return std::shared_ptr<rpc>(new rpc(std::forward<Args>(args)...), [](rpc* p) { delete p; }); static std::shared_ptr<rpc> create(Args &&...args) { return std::shared_ptr<rpc>(new rpc(std::forward<Args>(args)...), [](rpc *p) { delete p; }); } private: explicit rpc(std::shared_ptr<connection> conn = std::make_shared<default_connection>()) : conn_(conn), dispatcher_(std::make_shared<detail::msg_dispatcher>(std::move(conn))) { : conn_(conn), dispatcher_(std::make_shared<detail::msg_dispatcher>(std::move(conn))) { dispatcher_->init(); RPC_CORE_LOGD("rpc: %p", this); } ~rpc() override { RPC_CORE_LOGD("~rpc: %p", this); }; ~rpc() override { RPC_CORE_LOGD("~rpc: %p", this); }; public: inline std::shared_ptr<connection> get_connection() const { return conn_; } inline std::shared_ptr<connection> get_connection() const { return conn_; } inline void set_timer(detail::msg_dispatcher::timer_impl timer_impl) { inline void set_timer(detail::msg_dispatcher::timer_impl timer_impl) { dispatcher_->set_timer_impl(std::move(timer_impl)); } inline void set_ready(bool ready) { is_ready_ = ready; } inline void set_ready(bool ready) { is_ready_ = ready; } public: template<typename F> void subscribe(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle) { void subscribe(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle) { constexpr bool F_ReturnIsEmpty = std::is_void<typename detail::callable_traits<F>::return_type>::value; constexpr bool F_ParamIsEmpty = detail::callable_traits<F>::argc == 0; subscribe_helper<F, F_ReturnIsEmpty, F_ParamIsEmpty>()(cmd, std::move(handle), dispatcher_.get()); } inline void unsubscribe(const cmd_type& cmd) { dispatcher_->unsubscribe_cmd(cmd); } inline void unsubscribe(const cmd_type &cmd) { dispatcher_->unsubscribe_cmd(cmd); } public: inline request_s create_request() { return request::create(shared_from_this()); } inline request_s create_request() { return request::create(shared_from_this()); } inline request_s cmd(cmd_type cmd) { return create_request()->cmd(std::move(cmd)); } inline request_s cmd(cmd_type cmd) { return create_request()->cmd(std::move(cmd)); } inline request_s ping(std::string payload = {}) { // NOLINT inline request_s ping(std::string payload = {}) {// NOLINT return create_request()->ping()->msg(std::move(payload)); } public: seq_type make_seq() override { return seq_++; } seq_type make_seq() override { return seq_++; } void send_request(request const* request) override { void send_request(request const *request) override { if (request->need_rsp_) { dispatcher_->subscribe_rsp(request->seq_, request->rsp_handle_, request->timeout_cb_, request->timeout_ms_); } detail::msg_wrapper msg; msg.type = static_cast<detail::msg_wrapper::msg_type>(detail::msg_wrapper::command | (request->is_ping_ ? detail::msg_wrapper::ping : 0) | (request->need_rsp_ ? detail::msg_wrapper::need_rsp : 0)); msg.type = static_cast<detail::msg_wrapper::msg_type>( detail::msg_wrapper::command | (request->is_ping_ ? detail::msg_wrapper::ping : 0) | (request->need_rsp_ ? detail::msg_wrapper::need_rsp : 0)); msg.cmd = request->cmd_; msg.seq = request->seq_; msg.request_payload = &request->payload_; RPC_CORE_LOGD("=> seq:%u type:%s %s", msg.seq, (msg.type & detail::msg_wrapper::msg_type::ping) ? "ping" : "cmd", msg.cmd.c_str()); RPC_CORE_LOGD("=> seq:%u type:%s %s", msg.seq, (msg.type & detail::msg_wrapper::msg_type::ping) ? "ping" : "cmd", msg.cmd.c_str()); conn_->send_package_impl(detail::coder::serialize(msg)); } inline bool is_ready() const override { return is_ready_; } inline bool is_ready() const override { return is_ready_; } private: template<typename F, bool F_ReturnIsEmpty, bool F_ParamIsEmpty> Loading @@ -105,16 +98,15 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi template<typename F> struct subscribe_helper<F, false, false> { void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) { dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { using F_Param = detail::remove_cvref_t<typename detail::callable_traits<F>::template argument_type<0>>; using F_Return = detail::remove_cvref_t<typename detail::callable_traits<F>::return_type>; auto r = msg.unpack_as<F_Param>(); F_Return ret; if (r.first) { ret = handle(std::move(r.second)); } if (r.first) { ret = handle(std::move(r.second)); } return detail::msg_wrapper::make_rsp(msg.seq, &ret, r.first); }); } Loading @@ -122,14 +114,13 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi template<typename F> struct subscribe_helper<F, true, false> { void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) { dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { using F_Param = detail::remove_cvref_t<typename detail::callable_traits<F>::template argument_type<0>>; auto r = msg.unpack_as<F_Param>(); if (r.first) { handle(std::move(r.second)); } if (r.first) { handle(std::move(r.second)); } return detail::msg_wrapper::make_rsp<uint8_t>(msg.seq, nullptr, r.first); }); } Loading @@ -137,7 +128,8 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi template<typename F> struct subscribe_helper<F, false, true> { void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) { dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { using F_Return = typename detail::callable_traits<F>::return_type; Loading @@ -149,7 +141,8 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi template<typename F> struct subscribe_helper<F, true, true> { void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) { dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { handle(); return detail::msg_wrapper::make_rsp<uint8_t>(msg.seq, nullptr, true); Loading Loading
3party/rpc_core/include/rpc_core/connection.hpp +36 −45 Original line number Diff line number Diff line Loading @@ -29,13 +29,12 @@ struct connection : detail::noncopyable { * Default connection avoid crash */ struct default_connection : connection { default_connection() { send_package_impl = [](const std::string &payload) { RPC_CORE_LOGE("need send_package_impl: %zu", payload.size()); }; on_recv_package = [](const std::string &payload) { RPC_CORE_LOGE("need on_recv_package: %zu", payload.size()); }; default_connection() { send_package_impl = [](const std::string &payload) { RPC_CORE_LOGE("need send_package_impl: %zu", payload.size()); }; on_recv_package = [](const std::string &payload) { RPC_CORE_LOGE("need on_recv_package: %zu", payload.size()); }; } }; Loading @@ -43,17 +42,14 @@ struct default_connection : connection { * Loopback connection for testing */ struct loopback_connection : public connection { static std::pair<std::shared_ptr<connection>, std::shared_ptr<connection>> create() { static std::pair<std::shared_ptr<connection>, std::shared_ptr<connection>> create() { auto c1 = std::make_shared<connection>(); auto c1_weak = std::weak_ptr<connection>(c1); auto c2 = std::make_shared<connection>(); auto c2_weak = std::weak_ptr<connection>(c2); c1->send_package_impl = [c2_weak](std::string package) { c2_weak.lock()->on_recv_package(std::move(package)); }; c2->send_package_impl = [c1_weak](std::string package) { c1_weak.lock()->on_recv_package(std::move(package)); }; c1->send_package_impl = [c2_weak](std::string package) { c2_weak.lock()->on_recv_package(std::move(package)); }; c2->send_package_impl = [c1_weak](std::string package) { c1_weak.lock()->on_recv_package(std::move(package)); }; return std::make_pair(c1, c2); } }; Loading @@ -63,25 +59,20 @@ struct loopback_connection : public connection { * for bytes stream: tcp socket, serial port, etc. */ struct stream_connection : public connection { explicit stream_connection(uint32_t max_body_size = UINT32_MAX) : data_packer_(max_body_size) { explicit stream_connection(uint32_t max_body_size = UINT32_MAX) : data_packer_(max_body_size) { send_package_impl = [this](const std::string &package) { auto payload = data_packer_.pack(package); send_bytes_impl(std::move(payload)); }; data_packer_.on_data = [this](std::string payload) { on_recv_package(std::move(payload)); }; on_recv_bytes = [this](const void *data, size_t size) { data_packer_.feed(data, size); }; data_packer_.on_data = [this](std::string payload) { on_recv_package(std::move(payload)); }; on_recv_bytes = [this](const void *data, size_t size) { data_packer_.feed(data, size); }; } /** * should call on connected or disconnected */ void reset() { data_packer_.reset(); } void reset() { data_packer_.reset(); } public: std::function<void(std::string)> send_bytes_impl; Loading
3party/rpc_core/include/rpc_core/rpc.hpp +130 −137 Original line number Diff line number Diff line Loading @@ -21,83 +21,76 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi public: template<typename... Args> static std::shared_ptr<rpc> create(Args&&... args) { return std::shared_ptr<rpc>(new rpc(std::forward<Args>(args)...), [](rpc* p) { delete p; }); static std::shared_ptr<rpc> create(Args &&...args) { return std::shared_ptr<rpc>(new rpc(std::forward<Args>(args)...), [](rpc *p) { delete p; }); } private: explicit rpc(std::shared_ptr<connection> conn = std::make_shared<default_connection>()) : conn_(conn), dispatcher_(std::make_shared<detail::msg_dispatcher>(std::move(conn))) { : conn_(conn), dispatcher_(std::make_shared<detail::msg_dispatcher>(std::move(conn))) { dispatcher_->init(); RPC_CORE_LOGD("rpc: %p", this); } ~rpc() override { RPC_CORE_LOGD("~rpc: %p", this); }; ~rpc() override { RPC_CORE_LOGD("~rpc: %p", this); }; public: inline std::shared_ptr<connection> get_connection() const { return conn_; } inline std::shared_ptr<connection> get_connection() const { return conn_; } inline void set_timer(detail::msg_dispatcher::timer_impl timer_impl) { inline void set_timer(detail::msg_dispatcher::timer_impl timer_impl) { dispatcher_->set_timer_impl(std::move(timer_impl)); } inline void set_ready(bool ready) { is_ready_ = ready; } inline void set_ready(bool ready) { is_ready_ = ready; } public: template<typename F> void subscribe(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle) { void subscribe(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle) { constexpr bool F_ReturnIsEmpty = std::is_void<typename detail::callable_traits<F>::return_type>::value; constexpr bool F_ParamIsEmpty = detail::callable_traits<F>::argc == 0; subscribe_helper<F, F_ReturnIsEmpty, F_ParamIsEmpty>()(cmd, std::move(handle), dispatcher_.get()); } inline void unsubscribe(const cmd_type& cmd) { dispatcher_->unsubscribe_cmd(cmd); } inline void unsubscribe(const cmd_type &cmd) { dispatcher_->unsubscribe_cmd(cmd); } public: inline request_s create_request() { return request::create(shared_from_this()); } inline request_s create_request() { return request::create(shared_from_this()); } inline request_s cmd(cmd_type cmd) { return create_request()->cmd(std::move(cmd)); } inline request_s cmd(cmd_type cmd) { return create_request()->cmd(std::move(cmd)); } inline request_s ping(std::string payload = {}) { // NOLINT inline request_s ping(std::string payload = {}) {// NOLINT return create_request()->ping()->msg(std::move(payload)); } public: seq_type make_seq() override { return seq_++; } seq_type make_seq() override { return seq_++; } void send_request(request const* request) override { void send_request(request const *request) override { if (request->need_rsp_) { dispatcher_->subscribe_rsp(request->seq_, request->rsp_handle_, request->timeout_cb_, request->timeout_ms_); } detail::msg_wrapper msg; msg.type = static_cast<detail::msg_wrapper::msg_type>(detail::msg_wrapper::command | (request->is_ping_ ? detail::msg_wrapper::ping : 0) | (request->need_rsp_ ? detail::msg_wrapper::need_rsp : 0)); msg.type = static_cast<detail::msg_wrapper::msg_type>( detail::msg_wrapper::command | (request->is_ping_ ? detail::msg_wrapper::ping : 0) | (request->need_rsp_ ? detail::msg_wrapper::need_rsp : 0)); msg.cmd = request->cmd_; msg.seq = request->seq_; msg.request_payload = &request->payload_; RPC_CORE_LOGD("=> seq:%u type:%s %s", msg.seq, (msg.type & detail::msg_wrapper::msg_type::ping) ? "ping" : "cmd", msg.cmd.c_str()); RPC_CORE_LOGD("=> seq:%u type:%s %s", msg.seq, (msg.type & detail::msg_wrapper::msg_type::ping) ? "ping" : "cmd", msg.cmd.c_str()); conn_->send_package_impl(detail::coder::serialize(msg)); } inline bool is_ready() const override { return is_ready_; } inline bool is_ready() const override { return is_ready_; } private: template<typename F, bool F_ReturnIsEmpty, bool F_ParamIsEmpty> Loading @@ -105,16 +98,15 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi template<typename F> struct subscribe_helper<F, false, false> { void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) { dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { using F_Param = detail::remove_cvref_t<typename detail::callable_traits<F>::template argument_type<0>>; using F_Return = detail::remove_cvref_t<typename detail::callable_traits<F>::return_type>; auto r = msg.unpack_as<F_Param>(); F_Return ret; if (r.first) { ret = handle(std::move(r.second)); } if (r.first) { ret = handle(std::move(r.second)); } return detail::msg_wrapper::make_rsp(msg.seq, &ret, r.first); }); } Loading @@ -122,14 +114,13 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi template<typename F> struct subscribe_helper<F, true, false> { void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) { dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { using F_Param = detail::remove_cvref_t<typename detail::callable_traits<F>::template argument_type<0>>; auto r = msg.unpack_as<F_Param>(); if (r.first) { handle(std::move(r.second)); } if (r.first) { handle(std::move(r.second)); } return detail::msg_wrapper::make_rsp<uint8_t>(msg.seq, nullptr, r.first); }); } Loading @@ -137,7 +128,8 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi template<typename F> struct subscribe_helper<F, false, true> { void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) { dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { using F_Return = typename detail::callable_traits<F>::return_type; Loading @@ -149,7 +141,8 @@ class rpc : detail::noncopyable, public std::enable_shared_from_this<rpc>, publi template<typename F> struct subscribe_helper<F, true, true> { void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) { dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { handle(); return detail::msg_wrapper::make_rsp<uint8_t>(msg.seq, nullptr, true); Loading