diff --git a/3party/rpc_core/include/rpc_core/connection.hpp b/3party/rpc_core/include/rpc_core/connection.hpp index 3ff1377..e792767 100644 --- a/3party/rpc_core/include/rpc_core/connection.hpp +++ b/3party/rpc_core/include/rpc_core/connection.hpp @@ -21,41 +21,37 @@ namespace rpc_core { * 3. Provide the implementation of sending data, send_package_impl. */ struct connection : detail::noncopyable { - std::function send_package_impl; - std::function on_recv_package; + std::function send_package_impl; + std::function on_recv_package; }; /** * Default connection avoid crash */ struct default_connection : connection { - default_connection() { - send_package_impl = [](const std::string &payload) { - RPC_CORE_LOGE("need send_package_impl: %zu", payload.size()); - }; - on_recv_package = [](const std::string &payload) { - RPC_CORE_LOGE("need on_recv_package: %zu", payload.size()); - }; - } + default_connection() + { + send_package_impl + = [](const std::string &payload) { RPC_CORE_LOGE("need send_package_impl: %zu", payload.size()); }; + on_recv_package + = [](const std::string &payload) { RPC_CORE_LOGE("need on_recv_package: %zu", payload.size()); }; + } }; /** * Loopback connection for testing */ struct loopback_connection : public connection { - static std::pair, std::shared_ptr> create() { - auto c1 = std::make_shared(); - auto c1_weak = std::weak_ptr(c1); - auto c2 = std::make_shared(); - auto c2_weak = std::weak_ptr(c2); - c1->send_package_impl = [c2_weak](std::string package) { - c2_weak.lock()->on_recv_package(std::move(package)); - }; - c2->send_package_impl = [c1_weak](std::string package) { - c1_weak.lock()->on_recv_package(std::move(package)); - }; - return std::make_pair(c1, c2); - } + static std::pair, std::shared_ptr> create() + { + auto c1 = std::make_shared(); + auto c1_weak = std::weak_ptr(c1); + auto c2 = std::make_shared(); + auto c2_weak = std::weak_ptr(c2); + c1->send_package_impl = [c2_weak](std::string package) { c2_weak.lock()->on_recv_package(std::move(package)); }; + c2->send_package_impl = [c1_weak](std::string package) { c1_weak.lock()->on_recv_package(std::move(package)); }; + return std::make_pair(c1, c2); + } }; /** @@ -63,32 +59,27 @@ struct loopback_connection : public connection { * for bytes stream: tcp socket, serial port, etc. */ struct stream_connection : public connection { - explicit stream_connection(uint32_t max_body_size = UINT32_MAX) : data_packer_(max_body_size) { - send_package_impl = [this](const std::string &package) { - auto payload = data_packer_.pack(package); - send_bytes_impl(std::move(payload)); - }; - data_packer_.on_data = [this](std::string payload) { - on_recv_package(std::move(payload)); - }; - on_recv_bytes = [this](const void *data, size_t size) { - data_packer_.feed(data, size); - }; - } + explicit stream_connection(uint32_t max_body_size = UINT32_MAX) : data_packer_(max_body_size) + { + send_package_impl = [this](const std::string &package) { + auto payload = data_packer_.pack(package); + send_bytes_impl(std::move(payload)); + }; + data_packer_.on_data = [this](std::string payload) { on_recv_package(std::move(payload)); }; + on_recv_bytes = [this](const void *data, size_t size) { data_packer_.feed(data, size); }; + } - /** + /** * should call on connected or disconnected */ - void reset() { - data_packer_.reset(); - } + void reset() { data_packer_.reset(); } - public: - std::function send_bytes_impl; - std::function on_recv_bytes; +public: + std::function send_bytes_impl; + std::function on_recv_bytes; - private: - detail::data_packer data_packer_; +private: + detail::data_packer data_packer_; }; -} // namespace rpc_core +}// namespace rpc_core diff --git a/3party/rpc_core/include/rpc_core/rpc.hpp b/3party/rpc_core/include/rpc_core/rpc.hpp index 70a8dde..2ca001e 100644 --- a/3party/rpc_core/include/rpc_core/rpc.hpp +++ b/3party/rpc_core/include/rpc_core/rpc.hpp @@ -16,152 +16,145 @@ namespace rpc_core { class rpc : detail::noncopyable, public std::enable_shared_from_this, public request::rpc_proto { - public: - using timeout_cb = detail::msg_dispatcher::timeout_cb; +public: + using timeout_cb = detail::msg_dispatcher::timeout_cb; - public: - template - static std::shared_ptr create(Args&&... args) { - return std::shared_ptr(new rpc(std::forward(args)...), [](rpc* p) { - delete p; - }); - } - - private: - explicit rpc(std::shared_ptr conn = std::make_shared()) - : conn_(conn), dispatcher_(std::make_shared(std::move(conn))) { - dispatcher_->init(); - RPC_CORE_LOGD("rpc: %p", this); - } - - ~rpc() override { - RPC_CORE_LOGD("~rpc: %p", this); - }; - - public: - inline std::shared_ptr get_connection() const { - return conn_; - } - - inline void set_timer(detail::msg_dispatcher::timer_impl timer_impl) { - dispatcher_->set_timer_impl(std::move(timer_impl)); - } - - inline void set_ready(bool ready) { - is_ready_ = ready; - } - - public: - template - void subscribe(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle) { - constexpr bool F_ReturnIsEmpty = std::is_void::return_type>::value; - constexpr bool F_ParamIsEmpty = detail::callable_traits::argc == 0; - subscribe_helper()(cmd, std::move(handle), dispatcher_.get()); - } - - inline void unsubscribe(const cmd_type& cmd) { - dispatcher_->unsubscribe_cmd(cmd); - } - - public: - inline request_s create_request() { - return request::create(shared_from_this()); - } - - inline request_s cmd(cmd_type cmd) { - return create_request()->cmd(std::move(cmd)); - } - - inline request_s ping(std::string payload = {}) { // NOLINT - return create_request()->ping()->msg(std::move(payload)); - } - - public: - seq_type make_seq() override { - return seq_++; - } - - void send_request(request const* request) override { - if (request->need_rsp_) { - dispatcher_->subscribe_rsp(request->seq_, request->rsp_handle_, request->timeout_cb_, request->timeout_ms_); +public: + template + static std::shared_ptr create(Args &&...args) + { + return std::shared_ptr(new rpc(std::forward(args)...), [](rpc *p) { delete p; }); } - detail::msg_wrapper msg; - msg.type = static_cast(detail::msg_wrapper::command | (request->is_ping_ ? detail::msg_wrapper::ping : 0) | - (request->need_rsp_ ? detail::msg_wrapper::need_rsp : 0)); - msg.cmd = request->cmd_; - msg.seq = request->seq_; - msg.request_payload = &request->payload_; - RPC_CORE_LOGD("=> seq:%u type:%s %s", msg.seq, (msg.type & detail::msg_wrapper::msg_type::ping) ? "ping" : "cmd", msg.cmd.c_str()); - conn_->send_package_impl(detail::coder::serialize(msg)); - } - inline bool is_ready() const override { - return is_ready_; - } +private: + explicit rpc(std::shared_ptr conn = std::make_shared()) + : conn_(conn), + dispatcher_(std::make_shared(std::move(conn))) + { + dispatcher_->init(); + RPC_CORE_LOGD("rpc: %p", this); + } - private: - template - struct subscribe_helper; + ~rpc() override { RPC_CORE_LOGD("~rpc: %p", this); }; - template - struct subscribe_helper { - void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { - dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper& msg) { - using F_Param = detail::remove_cvref_t::template argument_type<0>>; - using F_Return = detail::remove_cvref_t::return_type>; +public: + inline std::shared_ptr get_connection() const { return conn_; } - auto r = msg.unpack_as(); - F_Return ret; - if (r.first) { - ret = handle(std::move(r.second)); + inline void set_timer(detail::msg_dispatcher::timer_impl timer_impl) + { + dispatcher_->set_timer_impl(std::move(timer_impl)); + } + + inline void set_ready(bool ready) { is_ready_ = ready; } + +public: + template + void subscribe(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle) + { + constexpr bool F_ReturnIsEmpty = std::is_void::return_type>::value; + constexpr bool F_ParamIsEmpty = detail::callable_traits::argc == 0; + subscribe_helper()(cmd, std::move(handle), dispatcher_.get()); + } + + inline void unsubscribe(const cmd_type &cmd) { dispatcher_->unsubscribe_cmd(cmd); } + +public: + inline request_s create_request() { return request::create(shared_from_this()); } + + inline request_s cmd(cmd_type cmd) { return create_request()->cmd(std::move(cmd)); } + + inline request_s ping(std::string payload = {}) + {// NOLINT + return create_request()->ping()->msg(std::move(payload)); + } + +public: + seq_type make_seq() override { return seq_++; } + + void send_request(request const *request) override + { + if (request->need_rsp_) { + dispatcher_->subscribe_rsp(request->seq_, request->rsp_handle_, request->timeout_cb_, request->timeout_ms_); } - return detail::msg_wrapper::make_rsp(msg.seq, &ret, r.first); - }); + detail::msg_wrapper msg; + msg.type = static_cast( + detail::msg_wrapper::command | (request->is_ping_ ? detail::msg_wrapper::ping : 0) + | (request->need_rsp_ ? detail::msg_wrapper::need_rsp : 0)); + msg.cmd = request->cmd_; + msg.seq = request->seq_; + msg.request_payload = &request->payload_; + RPC_CORE_LOGD("=> seq:%u type:%s %s", + msg.seq, + (msg.type & detail::msg_wrapper::msg_type::ping) ? "ping" : "cmd", + msg.cmd.c_str()); + conn_->send_package_impl(detail::coder::serialize(msg)); } - }; - template - struct subscribe_helper { - void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { - dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper& msg) { - using F_Param = detail::remove_cvref_t::template argument_type<0>>; + inline bool is_ready() const override { return is_ready_; } - auto r = msg.unpack_as(); - if (r.first) { - handle(std::move(r.second)); +private: + template + struct subscribe_helper; + + template + struct subscribe_helper { + void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) + { + dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { + using F_Param = detail::remove_cvref_t::template argument_type<0>>; + using F_Return = detail::remove_cvref_t::return_type>; + + auto r = msg.unpack_as(); + F_Return ret; + if (r.first) { ret = handle(std::move(r.second)); } + return detail::msg_wrapper::make_rsp(msg.seq, &ret, r.first); + }); } - return detail::msg_wrapper::make_rsp(msg.seq, nullptr, r.first); - }); - } - }; + }; - template - struct subscribe_helper { - void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { - dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper& msg) { - using F_Return = typename detail::callable_traits::return_type; + template + struct subscribe_helper { + void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) + { + dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { + using F_Param = detail::remove_cvref_t::template argument_type<0>>; - F_Return ret = handle(); - return detail::msg_wrapper::make_rsp(msg.seq, &ret, true); - }); - } - }; + auto r = msg.unpack_as(); + if (r.first) { handle(std::move(r.second)); } + return detail::msg_wrapper::make_rsp(msg.seq, nullptr, r.first); + }); + } + }; - template - struct subscribe_helper { - void operator()(const cmd_type& cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher* dispatcher) { - dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper& msg) { - handle(); - return detail::msg_wrapper::make_rsp(msg.seq, nullptr, true); - }); - } - }; + template + struct subscribe_helper { + void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) + { + dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { + using F_Return = typename detail::callable_traits::return_type; - private: - std::shared_ptr conn_; - std::shared_ptr dispatcher_; - seq_type seq_{0}; - bool is_ready_ = false; + F_Return ret = handle(); + return detail::msg_wrapper::make_rsp(msg.seq, &ret, true); + }); + } + }; + + template + struct subscribe_helper { + void operator()(const cmd_type &cmd, RPC_CORE_MOVE_PARAM(F) handle, detail::msg_dispatcher *dispatcher) + { + dispatcher->subscribe_cmd(cmd, [RPC_CORE_MOVE_LAMBDA(handle)](const detail::msg_wrapper &msg) { + handle(); + return detail::msg_wrapper::make_rsp(msg.seq, nullptr, true); + }); + } + }; + +private: + std::shared_ptr conn_; + std::shared_ptr dispatcher_; + seq_type seq_{0}; + bool is_ready_ = false; }; -} // namespace rpc_core +}// namespace rpc_core