From 2b2a6101b1741a39e34ca4d6194cd7b5c6673b03 Mon Sep 17 00:00:00 2001 From: tqcq Date: Tue, 2 Apr 2024 04:41:18 +0000 Subject: [PATCH] feat add event_bus (#1) Co-authored-by: tqcq <99722391+tqcq@users.noreply.github.com> Reviewed-on: https://code.uocat.com/tqcq/sled/pulls/1 --- .gitea/workflows/linux-aarch64-gcc.yml | 140 +---------------- .gitea/workflows/linux-arm-gcc.yml | 162 +++++--------------- .gitea/workflows/linux-mips64-gcc.yml | 40 +---- 3party/asyncplusplus/CMakeLists.txt | 204 +++++++++++++------------ CMakeLists.txt | 22 ++- src/sled/event_bus/event_bus.h | 155 +++++++++++++++++++ src/sled/event_bus/event_bus_bench.cc | 78 ++++++++++ src/sled/event_bus/event_bus_test.cc | 120 +++++++++++++++ src/sled/random_bench.cc | 1 + src/sled/sigslot.cc | 18 ++- src/sled/sigslot.h | 165 +++++++++----------- src/sled/sigslot_test.cc | 16 ++ src/sled/sled.h | 3 + src/sled/strings/base64_bench.cc | 2 +- src/sled/synchronization/mutex.h | 124 ++++++++++----- src/sled/system/thread_pool_bench.cc | 1 + src/sled/system_time_bench.cc | 1 + src/sled/testing/doctest.h | 16 ++ src/sled/timer/task_queue_timeout.cc | 2 +- src/sled/uri_bench.cc | 1 + 20 files changed, 734 insertions(+), 537 deletions(-) create mode 100644 src/sled/event_bus/event_bus.h create mode 100644 src/sled/event_bus/event_bus_bench.cc create mode 100644 src/sled/event_bus/event_bus_test.cc create mode 100644 src/sled/sigslot_test.cc diff --git a/.gitea/workflows/linux-aarch64-gcc.yml b/.gitea/workflows/linux-aarch64-gcc.yml index 4be40a0..a44fe88 100644 --- a/.gitea/workflows/linux-aarch64-gcc.yml +++ b/.gitea/workflows/linux-aarch64-gcc.yml @@ -27,150 +27,22 @@ concurrency: permissions: contents: read jobs: - linux-gcc: + linux-gcc-aarch64: runs-on: ubuntu-20.04 steps: - uses: actions/checkout@v4 - - name: cache-qemu - id: cache-qemu - uses: actions/cache@v4 - with: - path: qemu-install - key: qemu-aarch64-install-20220502-ubuntu-2004-2 - - name: install-qemu-build-deps - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - sudo apt-get update -y - sudo apt-get install -y autoconf automake autotools-dev ninja-build - - name: checkout-qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - mkdir qemu && cd qemu - git init - git remote add origin https://gitlab.com/qemu-project/qemu.git - git fetch --depth=1 origin f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - git checkout f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - # uses: actions/checkout@v4 - # with: - # repository: qemu/qemu - # path: qemu - # ref: f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - - name: qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - cd qemu - ./configure --prefix=$GITHUB_WORKSPACE/qemu-install --target-list=aarch64-linux-user --disable-system - make -j$(nproc) - make install - name: aarch64-gnu-toolchain run: | sudo apt-get update -y - sudo apt-get install -y cmake make g++-aarch64-linux-gnu + sudo apt-get install -y cmake make g++-aarch64-linux-gnu qemu-user-binfmt - name: build run: | mkdir build && cd build - cmake -DCMAKE_TOOLCHAIN_FILE=../toolchains/aarch64-linux-gnu.toolchain.cmake -DSLED_BUILD_TESTS=ON -DSLED_BUILD_BENCHMARK=ON - cmake --build . -j $(nproc) - - name: test - run: | - export PATH=$GITHUB_WORKSPACE/qemu-install/bin:$PATH - cd build - TESTS_EXECUTABLE_LOADER=qemu-aarch64 TESTS_EXECUTABLE_LOADER_ARGUMENTS="-L;/usr/aarch64-linux-gnu" ctest --output-on-failure -j $(nproc) - linux-gcc-arm82: - runs-on: ubuntu-20.04 - steps: - - uses: actions/checkout@v4 - - name: cache-qemu - id: cache-qemu - uses: actions/cache@v4 - with: - path: qemu-install - key: qemu-aarch64-install-20220502-ubuntu-2004-2 - - name: install-qemu-build-deps - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - sudo apt-get update -y - sudo apt-get install -y autoconf automake autotools-dev ninja-build - - name: checkout-qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - mkdir qemu && cd qemu - git init - git remote add origin https://gitlab.com/qemu-project/qemu.git - git fetch --depth=1 origin f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - git checkout f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - # uses: actions/checkout@v4 - # with: - # repository: qemu/qemu - # path: qemu - # ref: f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - - name: qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - cd qemu - ./configure --prefix=$GITHUB_WORKSPACE/qemu-install --target-list=aarch64-linux-user --disable-system - make -j$(nproc) - make install - - name: aarch64-gnu-toolchain - run: | - sudo apt-get update -y - sudo apt-get install -y cmake make g++-aarch64-linux-gnu - - name: build - run: | - mkdir build && cd build - cmake -DCMAKE_TOOLCHAIN_FILE=../toolchains/aarch64-linux-gnu.toolchain.cmake -DSLED_BUILD_TESTS=ON -DSLED_BUILD_BENCHMARK=ON - cmake --build . -j $(nproc) - - name: test - run: | - export PATH=$GITHUB_WORKSPACE/qemu-install/bin:$PATH - cd build - TESTS_EXECUTABLE_LOADER=qemu-aarch64 TESTS_EXECUTABLE_LOADER_ARGUMENTS="-L;/usr/aarch64-linux-gnu" ctest --output-on-failure -j $(nproc) - linux-gcc-arm86: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v4 - - name: cache-qemu - id: cache-qemu - uses: actions/cache@v4 - with: - path: qemu-install - key: qemu-aarch64-install-20230717 - - name: install-qemu-build-deps - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - sudo apt-get update -y - sudo apt-get install -y autoconf automake autotools-dev ninja-build - - name: checkout-qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - mkdir qemu && cd qemu - git init - git remote add origin https://gitlab.com/qemu-project/qemu.git - git fetch --depth=1 origin f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - git checkout f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - # uses: actions/checkout@v4 - # with: - # repository: qemu/qemu - # path: qemu - # ref: ed8ad9728a9c0eec34db9dff61dfa2f1dd625637 - - name: qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - cd qemu - ./configure --prefix=$GITHUB_WORKSPACE/qemu-install --target-list=aarch64-linux-user --disable-system - make -j$(nproc) - make install - - name: aarch64-gnu-toolchain - run: | - sudo apt-get update -y - sudo apt-get install -y cmake make g++-aarch64-linux-gnu - - name: build - run: | - mkdir build && cd build - cmake -DCMAKE_TOOLCHAIN_FILE=../toolchains/aarch64-linux-gnu.toolchain.cmake -DSLED_BUILD_TESTS=ON -DSLED_BUILD_BENCHMARK=ON + cmake -DCMAKE_TOOLCHAIN_FILE=../toolchains/aarch64-linux-gnu.toolchain.cmake -DSLED_BUILD_TESTS=ON -DSLED_BUILD_BENCHMARK=ON .. cmake --build . -j $(nproc) - name: test run: |- - export PATH=$GITHUB_WORKSPACE/qemu-install/bin:$PATH cd build - TESTS_EXECUTABLE_LOADER=qemu-aarch64 TESTS_EXECUTABLE_LOADER_ARGUMENTS="-L;/usr/aarch64-linux-gnu" ctest --output-on-failure -j $(nproc) + ln -sf /usr/aarch64-linux-gnu/lib/ld-linux-aarch64.so.1 /lib/ld-linux-aarch64.so.1 + export LD_LIBRARY_PATH=/usr/aarch64-linux-gnu/lib + ctest --output-on-failure -j$(nproc) diff --git a/.gitea/workflows/linux-arm-gcc.yml b/.gitea/workflows/linux-arm-gcc.yml index 8009a94..fa65f83 100644 --- a/.gitea/workflows/linux-arm-gcc.yml +++ b/.gitea/workflows/linux-arm-gcc.yml @@ -1,12 +1,11 @@ - --- name: linux-arm-gcc on: push: paths: - .gitea/workflows/linux-arm-gcc.yml - - 'toolchains/arm-linux-gnueabi.toolchain.cmake' - - 'toolchains/arm-linux-gnueabihf.toolchain.cmake' + - toolchains/arm-linux-gnueabi.toolchain.cmake + - toolchains/arm-linux-gnueabihf.toolchain.cmake - 3party/** - include/** - src/** @@ -16,8 +15,8 @@ on: pull_request: paths: - .gitea/workflows/linux-arm-gcc.yml - - 'toolchains/arm-linux-gnueabi.toolchain.cmake' - - 'toolchains/arm-linux-gnueabihf.toolchain.cmake' + - toolchains/arm-linux-gnueabi.toolchain.cmake + - toolchains/arm-linux-gnueabihf.toolchain.cmake - 3party/** - include/** - src/** @@ -27,125 +26,42 @@ on: concurrency: group: linux-arm-gcc-${{ github.ref }} cancel-in-progress: true - jobs: - linux-gcc-arm: - runs-on: ubuntu-20.04 - steps: - - uses: actions/checkout@v4 - - - name: cache-qemu - id: cache-qemu - uses: actions/cache@v4 - with: - path: qemu-install - key: qemu-arm-install-20220502-2 - - name: install-qemu-build-deps - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - sudo apt-get update -y - sudo apt-get install -y autoconf automake autotools-dev ninja-build - - name: checkout-qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - mkdir qemu && cd qemu - git init - git remote add origin https://gitlab.com/qemu-project/qemu.git - git fetch --depth=1 origin f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - git checkout f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - # uses: actions/checkout@v4 - # with: - # repository: qemu/qemu - # path: qemu - # ref: f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - - name: qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - cd qemu - ./configure --prefix=$GITHUB_WORKSPACE/qemu-install --target-list=arm-linux-user --disable-system - make -j$(nproc) - make install - - - name: set-qemu-cache - uses: actions/cache/save@v3 - if: steps.cache-qemu.outputs.cache-hit != 'true' - with: - key: qemu-arm-install-20220502-2 - path: qemu-install - - - name: arm-gnu-toolchain - run: | - sudo apt-get update -y - sudo apt-get install -y cmake make g++-arm-linux-gnueabi - - - name: build - run: | - mkdir build && cd build - cmake -DCMAKE_TOOLCHAIN_FILE=../toolchains/arm-linux-gnueabi.toolchain.cmake -DSLED_BUILD_TESTS=ON -DSLED_BUILD_TESTS=ON .. - cmake --build . -j $(nproc) - - name: test - run: | - export PATH=$GITHUB_WORKSPACE/qemu-install/bin:$PATH - cd build - TESTS_EXECUTABLE_LOADER=qemu-arm TESTS_EXECUTABLE_LOADER_ARGUMENTS="-L;/usr/arm-linux-gnueabi" ctest --output-on-failure -j $(nproc) - + # linux-gcc-arm: + # runs-on: ubuntu-20.04 + # steps: + # - uses: actions/checkout@v4 + # - name: arm-gnu-toolchain + # run: | + # sudo apt-get update -y + # sudo apt-get install -y cmake make g++-arm-linux-gnueabi qemu-user-binfmt + # - name: build + # run: | + # mkdir build && cd build + # cmake -DCMAKE_TOOLCHAIN_FILE=../toolchains/arm-linux-gnueabi.toolchain.cmake -DSLED_BUILD_TESTS=ON -DSLED_BUILD_TESTS=ON .. + # cmake --build . -j $(nproc) + # - name: test + # run: | + # cd build + # ln -sf /usr/arm-linux-gnueabi/lib/ld-linux.so.3 + # export LD_LIBRARY_PATH=/usr/arm-linux-gnueabi/lib + # qemu-arm ctest --output-on-failure -j$(nproc) linux-gcc-armhf: runs-on: ubuntu-20.04 steps: - - uses: actions/checkout@v4 - - - name: cache-qemu - id: cache-qemu - uses: actions/cache@v4 - with: - path: qemu-install - key: qemu-arm-install-20220502-2 - - name: install-qemu-build-deps - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - sudo apt-get update -y - sudo apt-get install -y autoconf automake autotools-dev ninja-build - - name: checkout-qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - mkdir qemu && cd qemu - git init - git remote add origin https://gitlab.com/qemu-project/qemu.git - git fetch --depth=1 origin f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - git checkout f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - # uses: actions/checkout@v4 - # with: - # repository: qemu/qemu - # path: qemu - # ref: f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - - name: qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - cd qemu - ./configure --prefix=$GITHUB_WORKSPACE/qemu-install --target-list=arm-linux-user --disable-system - make -j$(nproc) - make install - - - name: set-qemu-cache - uses: actions/cache/save@v3 - if: steps.cache-qemu.outputs.cache-hit != 'true' - with: - key: qemu-arm-install-20220502-2 - path: qemu-install - - - name: arm-gnu-toolchain - run: | - sudo apt-get update -y - sudo apt-get install -y cmake make g++-arm-linux-gnueabihf - - - name: build - run: | - mkdir build && cd build - cmake -DCMAKE_TOOLCHAIN_FILE=../toolchains/arm-linux-gnueabihf.toolchain.cmake -DSLED_BUILD_TESTS=ON .. - cmake --build . -j $(nproc) - - name: test - run: | - export PATH=$GITHUB_WORKSPACE/qemu-install/bin:$PATH - cd build - TESTS_EXECUTABLE_LOADER=qemu-arm TESTS_EXECUTABLE_LOADER_ARGUMENTS="-L;/usr/arm-linux-gnueabihf" ctest --output-on-failure -j $(nproc) - + - uses: actions/checkout@v4 + - name: arm-gnu-toolchain + run: | + sudo apt-get update -y + sudo apt-get install -y cmake make g++-arm-linux-gnueabihf qemu-user-binfmt + - name: build + run: | + mkdir build && cd build + cmake -DCMAKE_TOOLCHAIN_FILE=../toolchains/arm-linux-gnueabihf.toolchain.cmake -DSLED_BUILD_TESTS=ON .. + cmake --build . -j $(nproc) + - name: test + run: |- + cd build + ln -sf /usr/arm-linux-gnueabihf/lib/ld-linux-armhf.so.3 /lib/ld-linux-armhf.so.3 + export LD_LIBRARY_PATH=/usr/arm-linux-gnueabihf/lib/ + ctest --output-on-failure -j$(nproc) diff --git a/.gitea/workflows/linux-mips64-gcc.yml b/.gitea/workflows/linux-mips64-gcc.yml index 52cd879..84175e8 100644 --- a/.gitea/workflows/linux-mips64-gcc.yml +++ b/.gitea/workflows/linux-mips64-gcc.yml @@ -35,51 +35,19 @@ jobs: - uses: actions/checkout@v4 with: submodules: true - - name: cache-qemu - id: cache-qemu - uses: actions/cache@v3 - with: - path: qemu-install - key: qemu-mips64el-install-20220502-2 - - name: checkout-qemu - # uses: actions/checkout@v4 - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - mkdir qemu && cd qemu - git init - git remote add origin https://gitlab.com/qemu-project/qemu.git - git fetch --depth=1 origin f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - git checkout f5643914a9e8f79c606a76e6a9d7ea82a3fc3e65 - - name: install-qemu-build-deps - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - sudo apt-get update -y - sudo apt-get install -y autoconf automake autotools-dev ninja-build - - name: qemu - if: steps.cache-qemu.outputs.cache-hit != 'true' - run: | - cd $GITHUB_WORKSPACE/qemu - ./configure --target-list=mips64el-linux-user --prefix=$GITHUB_WORKSPACE/qemu-install - make -j `nproc` - make install - - name: set-qemu-cache - uses: actions/cache/save@v3 - if: steps.cache-qemu.outputs.cache-hit != 'true' - with: - key: qemu-mips64el-install-20220502-2 - path: qemu-install - name: mips64el-gnuabi64-toolchain run: | sudo apt-get update -y - sudo apt-get install -y cmake make g++-mips64el-linux-gnuabi64 + sudo apt-get install -y cmake make g++-mips64el-linux-gnuabi64 qemu-user-binfmt - name: configure run: | mkdir build && cd build - cmake .. -DCMAKE_BUILD_TYPE=${{matrix.build_type}} -DSLED_BUILD_TESTS=ON -DCMAKE_TOOLCHAIN_FILE=../toolchains/mips64el-linux-gnuabi64.toolchain.cmake -DCMAKE_CROSSCOMPILING_EMULATOR="qemu-mips64el;-L;/usr/mips64el-linux-gnuabi64" + cmake .. -DCMAKE_BUILD_TYPE=${{matrix.build_type}} -DSLED_BUILD_TESTS=ON -DCMAKE_TOOLCHAIN_FILE=../toolchains/mips64el-linux-gnuabi64.toolchain.cmake #-DCMAKE_CROSSCOMPILING_EMULATOR="qemu-mips64el;-L;/usr/mips64el-linux-gnuabi64" - name: build run: cmake --build build --target all -j `nproc` - name: test run: |- - export PATH=$GITHUB_WORKSPACE/qemu-install/bin:$PATH cd build + ln -sf /usr/mips64el-linux-gnuabi64/lib64/ld.so.1 /lib64/ld.so.1 + export LD_LIBRARY_PATH=/usr/mips64el-linux-gnuabi64/lib ctest --output-on-failure -j`nproc` diff --git a/3party/asyncplusplus/CMakeLists.txt b/3party/asyncplusplus/CMakeLists.txt index bb2de4d..4d6d603 100644 --- a/3party/asyncplusplus/CMakeLists.txt +++ b/3party/asyncplusplus/CMakeLists.txt @@ -7,71 +7,71 @@ # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. cmake_minimum_required(VERSION 3.1) project(Async++ C CXX) -option(BUILD_SHARED_LIBS "Build Async++ as a shared library" ON) +option(BUILD_SHARED_LIBS "Build Async++ as a shared library" OFF) option(USE_CXX_EXCEPTIONS "Enable C++ exception support" ON) -if (APPLE) - option(BUILD_FRAMEWORK "Build a Mac OS X framework instead of a library" OFF) - if (BUILD_FRAMEWORK AND NOT BUILD_SHARED_LIBS) - message(FATAL_ERROR "Can't build a framework with static libraries") - endif() +if(APPLE) + option(BUILD_FRAMEWORK "Build a Mac OS X framework instead of a library" OFF) + if(BUILD_FRAMEWORK AND NOT BUILD_SHARED_LIBS) + message(FATAL_ERROR "Can't build a framework with static libraries") + endif() endif() set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # Add all source and header files so IDEs can see them set(ASYNCXX_INCLUDE - ${PROJECT_SOURCE_DIR}/include/async++/aligned_alloc.h - ${PROJECT_SOURCE_DIR}/include/async++/cancel.h - ${PROJECT_SOURCE_DIR}/include/async++/continuation_vector.h - ${PROJECT_SOURCE_DIR}/include/async++/parallel_for.h - ${PROJECT_SOURCE_DIR}/include/async++/parallel_invoke.h - ${PROJECT_SOURCE_DIR}/include/async++/parallel_reduce.h - ${PROJECT_SOURCE_DIR}/include/async++/partitioner.h - ${PROJECT_SOURCE_DIR}/include/async++/range.h - ${PROJECT_SOURCE_DIR}/include/async++/ref_count.h - ${PROJECT_SOURCE_DIR}/include/async++/scheduler.h - ${PROJECT_SOURCE_DIR}/include/async++/scheduler_fwd.h - ${PROJECT_SOURCE_DIR}/include/async++/task.h - ${PROJECT_SOURCE_DIR}/include/async++/task_base.h - ${PROJECT_SOURCE_DIR}/include/async++/traits.h - ${PROJECT_SOURCE_DIR}/include/async++/when_all_any.h -) + ${PROJECT_SOURCE_DIR}/include/async++/aligned_alloc.h + ${PROJECT_SOURCE_DIR}/include/async++/cancel.h + ${PROJECT_SOURCE_DIR}/include/async++/continuation_vector.h + ${PROJECT_SOURCE_DIR}/include/async++/parallel_for.h + ${PROJECT_SOURCE_DIR}/include/async++/parallel_invoke.h + ${PROJECT_SOURCE_DIR}/include/async++/parallel_reduce.h + ${PROJECT_SOURCE_DIR}/include/async++/partitioner.h + ${PROJECT_SOURCE_DIR}/include/async++/range.h + ${PROJECT_SOURCE_DIR}/include/async++/ref_count.h + ${PROJECT_SOURCE_DIR}/include/async++/scheduler.h + ${PROJECT_SOURCE_DIR}/include/async++/scheduler_fwd.h + ${PROJECT_SOURCE_DIR}/include/async++/task.h + ${PROJECT_SOURCE_DIR}/include/async++/task_base.h + ${PROJECT_SOURCE_DIR}/include/async++/traits.h + ${PROJECT_SOURCE_DIR}/include/async++/when_all_any.h) set(ASYNCXX_SRC - ${PROJECT_SOURCE_DIR}/src/internal.h - ${PROJECT_SOURCE_DIR}/src/fifo_queue.h - ${PROJECT_SOURCE_DIR}/src/scheduler.cpp - ${PROJECT_SOURCE_DIR}/src/singleton.h - ${PROJECT_SOURCE_DIR}/src/task_wait_event.h - ${PROJECT_SOURCE_DIR}/src/threadpool_scheduler.cpp - ${PROJECT_SOURCE_DIR}/src/work_steal_queue.h -) -source_group(include FILES ${PROJECT_SOURCE_DIR}/include/async++.h ${ASYNCXX_INCLUDE}) + ${PROJECT_SOURCE_DIR}/src/internal.h + ${PROJECT_SOURCE_DIR}/src/fifo_queue.h + ${PROJECT_SOURCE_DIR}/src/scheduler.cpp + ${PROJECT_SOURCE_DIR}/src/singleton.h + ${PROJECT_SOURCE_DIR}/src/task_wait_event.h + ${PROJECT_SOURCE_DIR}/src/threadpool_scheduler.cpp + ${PROJECT_SOURCE_DIR}/src/work_steal_queue.h) +source_group(include FILES ${PROJECT_SOURCE_DIR}/include/async++.h + ${ASYNCXX_INCLUDE}) source_group(src FILES ${ASYNCXX_SRC}) -add_library(Async++ ${PROJECT_SOURCE_DIR}/include/async++.h ${ASYNCXX_INCLUDE} ${ASYNCXX_SRC}) +add_library(Async++ ${PROJECT_SOURCE_DIR}/include/async++.h ${ASYNCXX_INCLUDE} + ${ASYNCXX_SRC}) # Async++ only depends on the C++11 standard libraries, but some implementations # require the -pthread compiler flag to enable threading functionality. -if (NOT MSVC) - target_compile_options(Async++ PRIVATE -std=c++11) +if(NOT MSVC) + target_compile_options(Async++ PRIVATE -std=c++11) endif() -if (APPLE) - # Use libc++ on Mac because the shipped libstdc++ version is ancient - target_compile_options(Async++ PRIVATE -stdlib=libc++) - set_target_properties(Async++ PROPERTIES LINK_FLAGS -stdlib=libc++) +if(APPLE) + # Use libc++ on Mac because the shipped libstdc++ version is ancient + target_compile_options(Async++ PRIVATE -stdlib=libc++) + set_target_properties(Async++ PROPERTIES LINK_FLAGS -stdlib=libc++) endif() set(THREADS_PREFER_PTHREAD_FLAG ON) find_package(Threads REQUIRED) @@ -80,86 +80,90 @@ target_link_libraries(Async++ PUBLIC Threads::Threads) # Set up preprocessor definitions target_include_directories(Async++ PRIVATE ${PROJECT_SOURCE_DIR}/include) set_target_properties(Async++ PROPERTIES DEFINE_SYMBOL LIBASYNC_BUILD) -if (BUILD_SHARED_LIBS) - # Minimize the set of symbols exported by libraries - set_target_properties(Async++ PROPERTIES CXX_VISIBILITY_PRESET hidden VISIBILITY_INLINES_HIDDEN ON) +if(BUILD_SHARED_LIBS) + # Minimize the set of symbols exported by libraries + set_target_properties(Async++ PROPERTIES CXX_VISIBILITY_PRESET hidden + VISIBILITY_INLINES_HIDDEN ON) else() - target_compile_definitions(Async++ PUBLIC LIBASYNC_STATIC) + target_compile_definitions(Async++ PUBLIC LIBASYNC_STATIC) endif() # Enable warnings for strict C++ standard conformance -if (NOT MSVC) - target_compile_options(Async++ PRIVATE -Wall -Wextra -pedantic) +if(NOT MSVC) + target_compile_options(Async++ PRIVATE -Wall -Wextra -pedantic) endif() -# Async++ doesn't make use of RTTI information, so don't generate it. -# There are issues on Apple platforms with exceptions and -fno-rtti, so keep it -# enabled there. -# See https://stackoverflow.com/questions/21737201/problems-throwing-and-catching-exceptions-on-os-x-with-fno-rtti -if (MSVC) - target_compile_options(Async++ PRIVATE /GR-) +# Async++ doesn't make use of RTTI information, so don't generate it. There are +# issues on Apple platforms with exceptions and -fno-rtti, so keep it enabled +# there. See +# https://stackoverflow.com/questions/21737201/problems-throwing-and-catching-exceptions-on-os-x-with-fno-rtti +if(MSVC) + target_compile_options(Async++ PRIVATE /GR-) elseif(NOT APPLE) - target_compile_options(Async++ PRIVATE -fno-rtti) + target_compile_options(Async++ PRIVATE -fno-rtti) endif() # Allow disabling exceptions, but warn the user about the consequences -if (NOT USE_CXX_EXCEPTIONS) - message(WARNING "Exceptions have been disabled. Any operation that would " - "throw an exception will result in a call to std::abort() instead.") - target_compile_definitions(Async++ PUBLIC LIBASYNC_NO_EXCEPTIONS) - if (MSVC) - target_compile_options(Async++ PUBLIC /EHs-c-) - else() - target_compile_options(Async++ PUBLIC -fno-exceptions) - endif() +if(NOT USE_CXX_EXCEPTIONS) + message( + WARNING "Exceptions have been disabled. Any operation that would " + "throw an exception will result in a call to std::abort() instead.") + target_compile_definitions(Async++ PUBLIC LIBASYNC_NO_EXCEPTIONS) + if(MSVC) + target_compile_options(Async++ PUBLIC /EHs-c-) + else() + target_compile_options(Async++ PUBLIC -fno-exceptions) + endif() endif() -# /Zc:__cplusplus is required to make __cplusplus accurate -# /Zc:__cplusplus is available starting with Visual Studio 2017 version 15.7 -# (according to https://docs.microsoft.com/en-us/cpp/build/reference/zc-cplusplus) -# That version is equivalent to _MSC_VER==1914 -# (according to https://docs.microsoft.com/en-us/cpp/preprocessor/predefined-macros?view=vs-2019) -# CMake's ${MSVC_VERSION} is equivalent to _MSC_VER -# (according to https://cmake.org/cmake/help/latest/variable/MSVC_VERSION.html#variable:MSVC_VERSION) -# GREATER and EQUAL are used because GREATER_EQUAL is available starting with CMake 3.7 -# (according to https://cmake.org/cmake/help/v3.7/release/3.7.html#commands) -if ((MSVC) AND ((MSVC_VERSION GREATER 1914) OR (MSVC_VERSION EQUAL 1914))) - target_compile_options(Async++ PUBLIC /Zc:__cplusplus) +# /Zc:__cplusplus is required to make __cplusplus accurate /Zc:__cplusplus is +# available starting with Visual Studio 2017 version 15.7 (according to +# https://docs.microsoft.com/en-us/cpp/build/reference/zc-cplusplus) That +# version is equivalent to _MSC_VER==1914 (according to +# https://docs.microsoft.com/en-us/cpp/preprocessor/predefined-macros?view=vs-2019) +# CMake's ${MSVC_VERSION} is equivalent to _MSC_VER (according to +# https://cmake.org/cmake/help/latest/variable/MSVC_VERSION.html#variable:MSVC_VERSION) +# GREATER and EQUAL are used because GREATER_EQUAL is available starting with +# CMake 3.7 (according to +# https://cmake.org/cmake/help/v3.7/release/3.7.html#commands) +if((MSVC) AND ((MSVC_VERSION GREATER 1914) OR (MSVC_VERSION EQUAL 1914))) + target_compile_options(Async++ PUBLIC /Zc:__cplusplus) endif() include(CMakePackageConfigHelpers) -configure_package_config_file("${CMAKE_CURRENT_LIST_DIR}/Async++Config.cmake.in" - "${PROJECT_BINARY_DIR}/Async++Config.cmake" - INSTALL_DESTINATION cmake -) +configure_package_config_file( + "${CMAKE_CURRENT_LIST_DIR}/Async++Config.cmake.in" + "${PROJECT_BINARY_DIR}/Async++Config.cmake" INSTALL_DESTINATION cmake) -install(FILES "${PROJECT_BINARY_DIR}/Async++Config.cmake" - DESTINATION cmake -) +install(FILES "${PROJECT_BINARY_DIR}/Async++Config.cmake" DESTINATION cmake) # Install the library and produce a CMake export script include(GNUInstallDirs) -install(TARGETS Async++ - EXPORT Async++ - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} - FRAMEWORK DESTINATION Frameworks -) +install( + TARGETS Async++ + EXPORT Async++ + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + FRAMEWORK DESTINATION Frameworks) export(EXPORT Async++) install(EXPORT Async++ DESTINATION cmake) -if (APPLE AND BUILD_FRAMEWORK) - set_target_properties(Async++ PROPERTIES OUTPUT_NAME Async++ FRAMEWORK ON) - set_source_files_properties(${ASYNCXX_INCLUDE} PROPERTIES MACOSX_PACKAGE_LOCATION Headers/async++) - set_source_files_properties(${PROJECT_SOURCE_DIR}/include/async++.h PROPERTIES MACOSX_PACKAGE_LOCATION Headers) +if(APPLE AND BUILD_FRAMEWORK) + set_target_properties(Async++ PROPERTIES OUTPUT_NAME Async++ FRAMEWORK ON) + set_source_files_properties( + ${ASYNCXX_INCLUDE} PROPERTIES MACOSX_PACKAGE_LOCATION Headers/async++) + set_source_files_properties(${PROJECT_SOURCE_DIR}/include/async++.h + PROPERTIES MACOSX_PACKAGE_LOCATION Headers) else() - set_target_properties(Async++ PROPERTIES OUTPUT_NAME async++) - target_include_directories(Async++ INTERFACE $ $) - install(FILES ${PROJECT_SOURCE_DIR}/include/async++.h DESTINATION include) - install(FILES ${ASYNCXX_INCLUDE} DESTINATION include/async++) + set_target_properties(Async++ PROPERTIES OUTPUT_NAME async++) + target_include_directories( + Async++ INTERFACE $ + $) + install(FILES ${PROJECT_SOURCE_DIR}/include/async++.h DESTINATION include) + install(FILES ${ASYNCXX_INCLUDE} DESTINATION include/async++) endif() -SET(CPACK_GENERATOR "DEB") -SET(CPACK_DEBIAN_PACKAGE_MAINTAINER "none") #required +set(CPACK_GENERATOR "DEB") +set(CPACK_DEBIAN_PACKAGE_MAINTAINER "none") # required -INCLUDE(CPack) +include(CPack) diff --git a/CMakeLists.txt b/CMakeLists.txt index 85d3790..951ba70 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,7 +25,7 @@ target_include_directories(benchmark_main PUBLIC src/) add_library(sled STATIC "") add_subdirectory(3party/minilua EXCLUDE_FROM_ALL) -add_subdirectory(3party/gperftools EXCLUDE_FROM_ALL) +# add_subdirectory(3party/gperftools EXCLUDE_FROM_ALL) add_subdirectory(3party/asyncplusplus EXCLUDE_FROM_ALL) # add_subdirectory(3party/cppuprofile EXCLUDE_FROM_ALL) # add_subdirectory(3party/protobuf-3.21.12 EXCLUDE_FROM_ALL) @@ -111,6 +111,7 @@ if(SLED_BUILD_BENCHMARK) add_executable( sled_benchmark + src/sled/event_bus/event_bus_bench.cc src/sled/random_bench.cc src/sled/strings/base64_bench.cc # src/sled/system/fiber/fiber_bench.cc @@ -134,8 +135,9 @@ function(sled_add_test) add_executable(${SLED_TEST_NAME} ${SLED_TEST_SRCS}) if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - set(EXTRA_FLAGS -Wthread-safety -g -fsanitize=address - -fno-omit-frame-pointer -fno-optimize-sibling-calls) + set(EXTRA_FLAGS # -Wthread-safety + -g -fsanitize=address -fno-omit-frame-pointer + -fno-optimize-sibling-calls) target_compile_options(${SLED_TEST_NAME} PRIVATE ${EXTRA_FLAGS}) target_link_options(${SLED_TEST_NAME} PRIVATE ${EXTRA_FLAGS}) @@ -165,7 +167,6 @@ if(SLED_BUILD_TESTS) SRCS src/sled/debugging/demangle_test.cc src/sled/async/async_test.cc - src/sled/any_test.cc src/sled/filesystem/path_test.cc src/sled/log/fmt_test.cc src/sled/synchronization/sequence_checker_test.cc @@ -179,11 +180,20 @@ if(SLED_BUILD_TESTS) src/sled/rx_test.cc src/sled/uri_test.cc) - sled_add_test(NAME sled_symbolize_test SRCS - src/sled/debugging/symbolize_test.cc NO_MAIN) + if(NOT ${CMAKE_SYSTEM_PROCESSOR} STREQUAL "arm") + sled_add_test(NAME sled_async_test SRCS src/sled/async/async_test.cc) + sled_add_test(NAME sled_thread_pool_test SRCS + src/sled/system/thread_pool_test.cc) + endif() + + sled_add_test(NAME sled_event_bus_test SRCS + src/sled/event_bus/event_bus_test.cc) sled_add_test(NAME sled_lua_test SRCS tests/lua_test.cc) sled_add_test(NAME sled_move_on_copy_test SRCS src/sled/utility/move_on_copy_test.cc) + sled_add_test(NAME sled_symbolize_test SRCS + src/sled/debugging/symbolize_test.cc NO_MAIN) + sled_add_test(NAME sled_sigslot_test SRCS src/sled/sigslot_test.cc) endif(SLED_BUILD_TESTS) if(SLED_BUILD_FUZZ) diff --git a/src/sled/event_bus/event_bus.h b/src/sled/event_bus/event_bus.h new file mode 100644 index 0000000..27f67de --- /dev/null +++ b/src/sled/event_bus/event_bus.h @@ -0,0 +1,155 @@ +#ifndef SLED_EVENT_BUS_EVENT_BUS_H +#define SLED_EVENT_BUS_EVENT_BUS_H + +#include "sled/sigslot.h" +#include "sled/synchronization/mutex.h" +#include + +namespace sled { + +class EventBus; +class Subscriber; + +namespace { + +template +class EventRegistry { +public: + using Dispatcher = sigslot::signal1; + using SubscriberTable = std::unordered_map; + + static EventRegistry &Instance() + { + static EventRegistry instance_; + return instance_; + } + + static std::function &GetCleanupHandler() + { + static std::function cleanup_handler + = std::bind(&EventRegistry::OnBusDestroyed, &Instance(), std::placeholders::_1); + return cleanup_handler; + } + + void Post(EventBus *bus, Event event) + { + sled::SharedMutexReadLock lock(&shared_mutex_); + if (signals_.empty()) { return; } + auto iter = signals_.find(bus); + if (iter != signals_.end()) { iter->second(event); } + } + + template + void Subscribe(EventBus *bus, C *instance, void (C::*method)(Event)) + { + sled::SharedMutexWriteLock lock(&shared_mutex_); + auto iter = signals_.find(bus); + if (iter == signals_.end()) { + signals_.emplace(bus, Dispatcher()); + iter = signals_.find(bus); + } + auto &dispatcher = iter->second; + dispatcher.connect(instance, method); + } + + template + void Unsubscribe(EventBus *bus, C *instance) + { + sled::SharedMutexWriteLock lock(&shared_mutex_); + auto iter = signals_.find(bus); + if (iter == signals_.end()) { return; } + auto &dispatcher = iter->second; + dispatcher.disconnect(instance); + if (dispatcher.is_empty()) { signals_.erase(iter); } + } + + bool IsEmpty(EventBus *bus) const + { + sled::SharedMutexReadLock lock(&shared_mutex_); + auto iter = signals_.find(bus); + if (iter == signals_.end()) { return true; } + return iter->second.is_empty(); + } + + void OnBusDestroyed(EventBus *bus) + { + sled::SharedMutexWriteLock lock(&shared_mutex_); + signals_.erase(bus); + } + + template + void OnSubscriberDestroyed(C *instance) + { + sled::SharedMutexWriteLock lock(&shared_mutex_); + for (auto &entry : signals_) { + auto &dispatcher = entry.second; + dispatcher.disconnect(instance); + } + } + +private: + mutable sled::SharedMutex shared_mutex_; + + SubscriberTable signals_; +}; + +}// namespace + +class EventBus { +public: + class Subscriber : public sigslot::has_slots<> { + public: + virtual ~Subscriber() = default; + }; + + EventBus() = default; + + ~EventBus() + { + for (const auto &handler : cleanup_handlers_) { handler.second(this); } + } + + EventBus(const EventBus &) = delete; + EventBus &operator=(const EventBus &) = delete; + + template + void Post(const Event &event) + { + EventRegistry::Instance().Post(this, event); + } + + // On ([](const Event1 &){}) + template + typename std::enable_if::value>::type + Subscribe(C *instance, void (C::*method)(Event)) + { + { + sled::MutexLock lock(&mutex_); + cleanup_handlers_[std::type_index(typeid(Event))] = EventRegistry::GetCleanupHandler(); + } + + EventRegistry::Instance().Subscribe(this, instance, method); + } + + template + typename std::enable_if::value>::type Unsubscribe(C *instance) + { + EventRegistry::Instance().Unsubscribe(this, instance); + { + sled::MutexLock lock(&mutex_); + if (EventRegistry::Instance().IsEmpty(this)) { + auto iter = cleanup_handlers_.find(std::type_index(typeid(Event))); + if (iter != cleanup_handlers_.end()) { + iter->second(this); + cleanup_handlers_.erase(iter); + } + } + } + } + +private: + sled::Mutex mutex_; + std::unordered_map> cleanup_handlers_ GUARDED_BY(mutex_); +}; +}// namespace sled +#endif// SLED_EVENT_BUS_EVENT_BUS_H diff --git a/src/sled/event_bus/event_bus_bench.cc b/src/sled/event_bus/event_bus_bench.cc new file mode 100644 index 0000000..67662fa --- /dev/null +++ b/src/sled/event_bus/event_bus_bench.cc @@ -0,0 +1,78 @@ +#include +#include +#include +#include + +struct Event { + std::shared_ptr data = std::make_shared(0); +}; + +struct AtomicEvent { + AtomicEvent(std::atomic &v) : data(v) {} + + std::atomic &data; +}; + +struct Subscriber : public sled::EventBus::Subscriber { + void OnEvent(Event event) { (*event.data)++; } + + void OnAtomicnEvent(AtomicEvent event) { event.data.fetch_add(1); } +}; + +void +BMEventBusPost_1_to_1(picobench::state &s) +{ + sled::EventBus event_bus; + Subscriber subscriber; + event_bus.Subscribe(&subscriber, &Subscriber::OnEvent); + for (auto _ : s) { event_bus.Post(Event{}); } +} + +void +BMEventBusPost_1_to_1k(picobench::state &s) +{ + sled::EventBus event_bus; + std::vector subscribers(1000); + for (auto &subscriber : subscribers) { event_bus.Subscribe(&subscriber, &Subscriber::OnEvent); } + + for (auto _ : s) { + Event event; + event_bus.Post(event); + SLED_ASSERT(*event.data == 1000, ""); + } +} + +void +BMEventBusPost_10_to_1k(picobench::state &s) +{ + constexpr int kPublishCount = 10; + constexpr int kSubscriberCount = 1000; + + sled::EventBus event_bus; + std::vector subscribers(kSubscriberCount); + for (auto &subscriber : subscribers) { event_bus.Subscribe(&subscriber, &Subscriber::OnAtomicnEvent); } + sled::ThreadPool pool(kPublishCount); + + for (auto _ : s) { + std::atomic value(0); + AtomicEvent atomic_event(value); + sled::WaitGroup wg(kPublishCount); + for (int i = 0; i < kPublishCount; i++) { + pool.PostTask([atomic_event, wg, &event_bus]() { + event_bus.Post(atomic_event); + wg.Done(); + }); + } + wg.Wait(); + SLED_ASSERT(value.load() == kPublishCount * kSubscriberCount, + "{} != {}", + value.load(), + kPublishCount * kSubscriberCount); + } +} + +PICOBENCH_SUITE("EventBus"); + +PICOBENCH(BMEventBusPost_1_to_1); +PICOBENCH(BMEventBusPost_1_to_1k); +PICOBENCH(BMEventBusPost_10_to_1k); diff --git a/src/sled/event_bus/event_bus_test.cc b/src/sled/event_bus/event_bus_test.cc new file mode 100644 index 0000000..7e7ca35 --- /dev/null +++ b/src/sled/event_bus/event_bus_test.cc @@ -0,0 +1,120 @@ +#include +#include +#include +#include + +using namespace fakeit; + +template +void * +GetPtr(R (T::*p)(Args...)) +{ + union { + R (T::*ptr)(Args...); + void *void_ptr; + } _; + + _.ptr = p; + return _.void_ptr; +} + +struct Event1 { + int a; +}; + +struct Event2 { + std::string str; +}; + +struct Subscriber : public sled::EventBus::Subscriber { + void OnEvent1(Event1 event) { a += event.a; } + + void OnEvent2(Event2 event) { str += event.str; } + + int a = 0; + std::string str = ""; +}; + +TEST_SUITE("EventBus") +{ + TEST_CASE("single thread") + { + sled::EventBus bus; + bus.Post(Event1{1}); + bus.Post(Event2{"1"}); + + Subscriber subscriber; + bus.Subscribe(&subscriber, &Subscriber::OnEvent1); + bus.Subscribe(&subscriber, &Subscriber::OnEvent2); + + bus.Post(Event1{1}); + bus.Post(Event2{"1"}); + + CHECK_EQ(subscriber.a, 1); + CHECK_EQ(subscriber.str, "1"); + + bus.Post(Event1{1}); + bus.Post(Event2{"1"}); + + CHECK_EQ(subscriber.a, 2); + CHECK_EQ(subscriber.str, "11"); + } + + TEST_CASE("multi thread") + { + auto thread = sled::Thread::Create(); + thread->Start(); + + sled::EventBus bus; + Subscriber subscriber; + + bus.Subscribe(&subscriber, &Subscriber::OnEvent1); + bus.Subscribe(&subscriber, &Subscriber::OnEvent2); + + thread->BlockingCall([&] { + bus.Post(Event1{1}); + bus.Post(Event2{"1"}); + }); + + CHECK_EQ(subscriber.a, 1); + CHECK_EQ(subscriber.str, "1"); + } + + TEST_CASE("thread_pool") + { + constexpr int kPublishCount = 10; + constexpr int kSubscriberCount = 1000; + + struct AtomicEvent { + std::atomic &data; + }; + + struct AotmicEventSubscriber : public sled::EventBus::Subscriber { + virtual ~AotmicEventSubscriber() = default; + + void OnEvent(AtomicEvent event) { event.data.fetch_add(1); } + }; + + std::atomic value(0); + AtomicEvent atomic_event{value}; + + sled::WaitGroup wg(kPublishCount); + sled::ThreadPool pool(kPublishCount); + sled::EventBus bus; + std::vector subscribers(kSubscriberCount); + + for (auto &sub : subscribers) { bus.Subscribe(&sub, &AotmicEventSubscriber::OnEvent); } + std::atomic invoke_count(0); + for (int i = 0; i < kPublishCount; i++) { + pool.PostTask([wg, atomic_event, &bus, &invoke_count]() { + bus.Post(atomic_event); + invoke_count.fetch_add(1); + wg.Done(); + }); + } + wg.Wait(); + + CHECK_EQ(invoke_count.load(), kPublishCount); + CHECK_EQ(value.load(), kPublishCount * kSubscriberCount); + } +} diff --git a/src/sled/random_bench.cc b/src/sled/random_bench.cc index 91e3c98..99746fb 100644 --- a/src/sled/random_bench.cc +++ b/src/sled/random_bench.cc @@ -1,5 +1,6 @@ #include #include +PICOBENCH_SUITE("Random"); PICOBENCH([](picobench::state &s) { sled::Random rand(s.user_data()); diff --git a/src/sled/sigslot.cc b/src/sled/sigslot.cc index 4fa6b11..183b599 100644 --- a/src/sled/sigslot.cc +++ b/src/sled/sigslot.cc @@ -4,12 +4,26 @@ namespace sigslot { #ifdef _SIGSLOT_HAS_POSIX_THREADS -pthread_mutex_t * +sled::RecursiveMutex * multi_threaded_global::get_mutex() { - static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; + static sled::RecursiveMutex g_mutex; return &g_mutex; } +// sled::Mutex * +// multi_threaded_global::get_mutex() +// { +// static sled::Mutex g_mutex; +// return &g_mutex; +// } + +// pthread_mutex_t * +// multi_threaded_global::get_mutex() +// { +// static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; +// return &g_mutex; +// } + #endif// _SIGSLOT_HAS_POSIX_THREADS }// namespace sigslot diff --git a/src/sled/sigslot.h b/src/sled/sigslot.h index 6efb1d2..a4dbe4a 100644 --- a/src/sled/sigslot.h +++ b/src/sled/sigslot.h @@ -94,20 +94,19 @@ // If signalx is single threaded the user must ensure that disconnect, connect // or signal is not happening concurrently or data race may occur. -#pragma once #ifndef SLED_SIGSLOT_H #define SLED_SIGSLOT_H +#pragma once +#include "sled/synchronization/mutex.h" #include #include #include // On our copy of sigslot.h, we set single threading as default. -#define SIGSLOT_DEFAULT_MT_POLICY single_threaded +#define SIGSLOT_DEFAULT_MT_POLICY multi_threaded_local -#if defined(SIGSLOT_PURE_ISO) \ - || (!defined(WEBRTC_WIN) && !defined(__GNUG__) \ - && !defined(SIGSLOT_USE_POSIX_THREADS)) +#if defined(SIGSLOT_PURE_ISO) || (!defined(WEBRTC_WIN) && !defined(__GNUG__) && !defined(SIGSLOT_USE_POSIX_THREADS)) #define _SIGSLOT_SINGLE_THREADED #elif defined(WEBRTC_WIN) #define _SIGSLOT_HAS_WIN32_THREADS @@ -167,10 +166,7 @@ class multi_threaded_local { public: multi_threaded_local() { InitializeCriticalSection(&m_critsec); } - multi_threaded_local(const multi_threaded_local &) - { - InitializeCriticalSection(&m_critsec); - } + multi_threaded_local(const multi_threaded_local &) { InitializeCriticalSection(&m_critsec); } ~multi_threaded_local() { DeleteCriticalSection(&m_critsec); } @@ -187,31 +183,51 @@ private: // The multi threading policies only get compiled in if they are enabled. class multi_threaded_global { public: - void lock() { pthread_mutex_lock(get_mutex()); } + void lock() + { + get_mutex()->Lock(); + // pthread_mutex_lock(get_mutex()); + } - void unlock() { pthread_mutex_unlock(get_mutex()); } + void unlock() + { + get_mutex()->Unlock(); + // pthread_mutex_unlock(get_mutex()); + } private: - static pthread_mutex_t *get_mutex(); + static sled::RecursiveMutex *get_mutex(); + // static sled::Mutex *get_mutex(); + // static pthread_mutex_t *get_mutex(); }; class multi_threaded_local { public: - multi_threaded_local() { pthread_mutex_init(&m_mutex, nullptr); } + // multi_threaded_local() { pthread_mutex_init(&m_mutex, nullptr); } + // + // multi_threaded_local(const multi_threaded_local &) + // { + // pthread_mutex_init(&m_mutex, nullptr); + // } + // + // ~multi_threaded_local() { pthread_mutex_destroy(&m_mutex); } - multi_threaded_local(const multi_threaded_local &) + void lock() { - pthread_mutex_init(&m_mutex, nullptr); + mutex_.Lock(); + // pthread_mutex_lock(&m_mutex); } - ~multi_threaded_local() { pthread_mutex_destroy(&m_mutex); } - - void lock() { pthread_mutex_lock(&m_mutex); } - - void unlock() { pthread_mutex_unlock(&m_mutex); } + void unlock() + { + mutex_.Unlock(); + // pthread_mutex_unlock(&m_mutex); + } private: - pthread_mutex_t m_mutex; + sled::RecursiveMutex mutex_; + // sled::Mutex mutex_; + // pthread_mutex_t m_mutex; }; #endif// _SIGSLOT_HAS_POSIX_THREADS @@ -229,10 +245,8 @@ class _signal_base_interface; class has_slots_interface { private: - typedef void (*signal_connect_t)(has_slots_interface *self, - _signal_base_interface *sender); - typedef void (*signal_disconnect_t)(has_slots_interface *self, - _signal_base_interface *sender); + typedef void (*signal_connect_t)(has_slots_interface *self, _signal_base_interface *sender); + typedef void (*signal_disconnect_t)(has_slots_interface *self, _signal_base_interface *sender); typedef void (*disconnect_all_t)(has_slots_interface *self); const signal_connect_t m_signal_connect; @@ -240,9 +254,7 @@ private: const disconnect_all_t m_disconnect_all; protected: - has_slots_interface(signal_connect_t conn, - signal_disconnect_t disc, - disconnect_all_t disc_all) + has_slots_interface(signal_connect_t conn, signal_disconnect_t disc, disconnect_all_t disc_all) : m_signal_connect(conn), m_signal_disconnect(disc), m_disconnect_all(disc_all) @@ -253,23 +265,16 @@ protected: virtual ~has_slots_interface() {} public: - void signal_connect(_signal_base_interface *sender) - { - m_signal_connect(this, sender); - } + void signal_connect(_signal_base_interface *sender) { m_signal_connect(this, sender); } - void signal_disconnect(_signal_base_interface *sender) - { - m_signal_disconnect(this, sender); - } + void signal_disconnect(_signal_base_interface *sender) { m_signal_disconnect(this, sender); } void disconnect_all() { m_disconnect_all(this); } }; class _signal_base_interface { private: - typedef void (*slot_disconnect_t)(_signal_base_interface *self, - has_slots_interface *pslot); + typedef void (*slot_disconnect_t)(_signal_base_interface *self, has_slots_interface *pslot); typedef void (*slot_duplicate_t)(_signal_base_interface *self, const has_slots_interface *poldslot, has_slots_interface *pnewslot); @@ -286,13 +291,9 @@ protected: ~_signal_base_interface() {} public: - void slot_disconnect(has_slots_interface *pslot) - { - m_slot_disconnect(this, pslot); - } + void slot_disconnect(has_slots_interface *pslot) { m_slot_disconnect(this, pslot); } - void slot_duplicate(const has_slots_interface *poldslot, - has_slots_interface *pnewslot) + void slot_duplicate(const has_slots_interface *poldslot, has_slots_interface *pnewslot) { m_slot_duplicate(this, poldslot, pnewslot); } @@ -323,15 +324,14 @@ public: _opaque_connection(DestT *pd, void (DestT::*pm)(Args...)) : pdest(pd) { typedef void (DestT::*pm_t)(Args...); - static_assert(sizeof(pm_t) <= sizeof(pmethod), - "Size of slot function pointer too large."); + static_assert(sizeof(pm_t) <= sizeof(pmethod), "Size of slot function pointer too large."); std::memcpy(pmethod, &pm, sizeof(pm_t)); typedef void (*em_t)(const _opaque_connection *self, Args...); union_caster caster2; caster2.from = &_opaque_connection::emitter; - pemit = caster2.to; + pemit = caster2.to; } has_slots_interface *getdest() const { return pdest; } @@ -339,7 +339,7 @@ public: _opaque_connection duplicate(has_slots_interface *newtarget) const { _opaque_connection res = *this; - res.pdest = newtarget; + res.pdest = newtarget; return res; } @@ -360,8 +360,7 @@ private: { typedef void (DestT::*pm_t)(Args...); pm_t pm; - static_assert(sizeof(pm_t) <= sizeof(pmethod), - "Size of slot function pointer too large."); + static_assert(sizeof(pm_t) <= sizeof(pmethod), "Size of slot function pointer too large."); std::memcpy(&pm, self->pmethod, sizeof(pm_t)); (static_cast(self->pdest)->*(pm))(args...); } @@ -373,8 +372,7 @@ protected: typedef std::list<_opaque_connection> connections_list; _signal_base() - : _signal_base_interface(&_signal_base::do_slot_disconnect, - &_signal_base::do_slot_duplicate), + : _signal_base_interface(&_signal_base::do_slot_disconnect, &_signal_base::do_slot_duplicate), m_current_iterator(m_connected_slots.end()) {} @@ -385,8 +383,7 @@ private: public: _signal_base(const _signal_base &o) - : _signal_base_interface(&_signal_base::do_slot_disconnect, - &_signal_base::do_slot_duplicate), + : _signal_base_interface(&_signal_base::do_slot_disconnect, &_signal_base::do_slot_duplicate), m_current_iterator(m_connected_slots.end()) { lock_block lock(this); @@ -409,8 +406,7 @@ public: while (!m_connected_slots.empty()) { has_slots_interface *pdest = m_connected_slots.front().getdest(); m_connected_slots.pop_front(); - pdest->signal_disconnect( - static_cast<_signal_base_interface *>(this)); + pdest->signal_disconnect(static_cast<_signal_base_interface *>(this)); } // If disconnect_all is called while the signal is firing, advance the // current slot iterator to the end to avoid an invalidated iterator from @@ -422,7 +418,7 @@ public: bool connected(has_slots_interface *pclass) { lock_block lock(this); - connections_list::const_iterator it = m_connected_slots.begin(); + connections_list::const_iterator it = m_connected_slots.begin(); connections_list::const_iterator itEnd = m_connected_slots.end(); while (it != itEnd) { if (it->getdest() == pclass) return true; @@ -435,7 +431,7 @@ public: void disconnect(has_slots_interface *pclass) { lock_block lock(this); - connections_list::iterator it = m_connected_slots.begin(); + connections_list::iterator it = m_connected_slots.begin(); connections_list::iterator itEnd = m_connected_slots.end(); while (it != itEnd) { @@ -447,8 +443,7 @@ public: } else { m_connected_slots.erase(it); } - pclass->signal_disconnect( - static_cast<_signal_base_interface *>(this)); + pclass->signal_disconnect(static_cast<_signal_base_interface *>(this)); return; } ++it; @@ -456,12 +451,11 @@ public: } private: - static void do_slot_disconnect(_signal_base_interface *p, - has_slots_interface *pslot) + static void do_slot_disconnect(_signal_base_interface *p, has_slots_interface *pslot) { _signal_base *const self = static_cast<_signal_base *>(p); lock_block lock(self); - connections_list::iterator it = self->m_connected_slots.begin(); + connections_list::iterator it = self->m_connected_slots.begin(); connections_list::iterator itEnd = self->m_connected_slots.end(); while (it != itEnd) { @@ -472,8 +466,7 @@ private: // If we're currently using this iterator because the signal is firing, // advance it to avoid it being invalidated. if (self->m_current_iterator == it) { - self->m_current_iterator = - self->m_connected_slots.erase(it); + self->m_current_iterator = self->m_connected_slots.erase(it); } else { self->m_connected_slots.erase(it); } @@ -483,19 +476,16 @@ private: } } - static void do_slot_duplicate(_signal_base_interface *p, - const has_slots_interface *oldtarget, - has_slots_interface *newtarget) + static void + do_slot_duplicate(_signal_base_interface *p, const has_slots_interface *oldtarget, has_slots_interface *newtarget) { _signal_base *const self = static_cast<_signal_base *>(p); lock_block lock(self); - connections_list::iterator it = self->m_connected_slots.begin(); + connections_list::iterator it = self->m_connected_slots.begin(); connections_list::iterator itEnd = self->m_connected_slots.end(); while (it != itEnd) { - if (it->getdest() == oldtarget) { - self->m_connected_slots.push_back(it->duplicate(newtarget)); - } + if (it->getdest() == oldtarget) { self->m_connected_slots.push_back(it->duplicate(newtarget)); } ++it; } @@ -540,16 +530,14 @@ public: private: has_slots &operator=(has_slots const &); - static void do_signal_connect(has_slots_interface *p, - _signal_base_interface *sender) + static void do_signal_connect(has_slots_interface *p, _signal_base_interface *sender) { has_slots *const self = static_cast(p); lock_block lock(self); self->m_senders.insert(sender); } - static void do_signal_disconnect(has_slots_interface *p, - _signal_base_interface *sender) + static void do_signal_disconnect(has_slots_interface *p, _signal_base_interface *sender) { has_slots *const self = static_cast(p); lock_block lock(self); @@ -563,7 +551,7 @@ private: while (!self->m_senders.empty()) { std::set<_signal_base_interface *> senders; senders.swap(self->m_senders); - const_iterator it = senders.begin(); + const_iterator it = senders.begin(); const_iterator itEnd = senders.end(); while (it != itEnd) { @@ -627,22 +615,13 @@ using signal0 = signal_with_thread_policy; template using signal1 = signal_with_thread_policy; -template +template using signal2 = signal_with_thread_policy; -template +template using signal3 = signal_with_thread_policy; -template +template using signal4 = signal_with_thread_policy; template -using signal7 = - signal_with_thread_policy; +using signal7 = signal_with_thread_policy; template -using signal8 = - signal_with_thread_policy; +using signal8 = signal_with_thread_policy; }// namespace sigslot -#endif // SLED_SIGSLOT_H +#endif// SLED_SIGSLOT_H diff --git a/src/sled/sigslot_test.cc b/src/sled/sigslot_test.cc new file mode 100644 index 0000000..8efa501 --- /dev/null +++ b/src/sled/sigslot_test.cc @@ -0,0 +1,16 @@ +#include + +struct DeleteSelf : public sigslot::has_slots<> { + void DeleteThis() { + delete this; + } +}; + +TEST_SUITE("sigslot") { + TEST_CASE("delete this") { + DeleteSelf* d = new DeleteSelf(); + sigslot::signal0<> sig; + sig.connect(d, &DeleteSelf::DeleteThis); + sig.emit(); + } +} diff --git a/src/sled/sled.h b/src/sled/sled.h index b7f67ce..515a236 100644 --- a/src/sled/sled.h +++ b/src/sled/sled.h @@ -9,6 +9,9 @@ namespace async {} #include "inja.hpp" #include "rx.h" +// event_bus +#include "sled/event_bus/event_bus.h" + // filesystem #include "sled/filesystem/path.h" #include "sled/filesystem/temporary_file.h" diff --git a/src/sled/strings/base64_bench.cc b/src/sled/strings/base64_bench.cc index 27654c6..d273c95 100644 --- a/src/sled/strings/base64_bench.cc +++ b/src/sled/strings/base64_bench.cc @@ -38,6 +38,6 @@ Base64Decode(picobench::state &state) (void) sled::Base64::Decode(base64_input); } } - +PICOBENCH_SUITE("Base64"); PICOBENCH(Base64Decode); PICOBENCH(Base64Encode); diff --git a/src/sled/synchronization/mutex.h b/src/sled/synchronization/mutex.h index 1b029a6..c9e1a06 100644 --- a/src/sled/synchronization/mutex.h +++ b/src/sled/synchronization/mutex.h @@ -146,46 +146,90 @@ private: marl::ConditionVariable cv_; }; -// class ConditionVariable final { -// public: -// static constexpr TimeDelta kForever = TimeDelta::PlusInfinity(); -// ConditionVariable() = default; -// ConditionVariable(const ConditionVariable &) = delete; -// ConditionVariable &operator=(const ConditionVariable &) = delete; -// -// template -// inline bool Wait(LockGuard &guard, Predicate pred) -// { -// std::unique_lock lock(guard.mutex_->impl_, std::adopt_lock); -// cv_.wait(lock, pred); -// return true; -// } -// -// template -// inline bool -// WaitFor(LockGuard &guard, TimeDelta timeout, Predicate pred) -// { -// std::unique_lock lock(guard.mutex_->impl_, std::adopt_lock); -// if (timeout == kForever) { -// cv_.wait(lock, pred); -// return true; -// } else { -// return cv_.wait_for(lock, std::chrono::milliseconds(timeout.ms()), -// pred); -// } -// } -// -// // template -// // bool WaitUntil(Mutex *mutex, TimeDelta timeout, Predicate pred) -// // {} -// -// inline void NotifyOne() { cv_.notify_one(); } -// -// inline void NotifyAll() { cv_.notify_all(); } -// -// private: -// std::condition_variable cv_; -// }; +class SCOPED_CAPABILITY SharedMutex final { +public: + enum class Mode { + kReaderPriority, + kWriterPriority, + }; + + inline SharedMutex(Mode mode = SharedMutex::Mode::kWriterPriority) : mode_(mode) {} + + inline void Lock() SLED_EXCLUSIVE_LOCK_FUNCTION() + { + wait_w_count_.fetch_add(1); + + sled::MutexLock lock(&mutex_); + if (Mode::kReaderPriority == mode_) { + // 读取优先,必须在没有任何读取的消费者的情况下才能持有锁 + cv_.Wait(lock, [this] { return r_count_ == 0 && w_count_ == 0 && wait_r_count_.load() == 0; }); + w_count_++; + } else { + // 写入优先,只要没有持有读锁的消费者,就可以加锁 + cv_.Wait(lock, [this] { return r_count_ == 0 && w_count_ == 0; }); + w_count_++; + cv_.Wait(lock, [this] { return r_count_ == 0; }); + } + wait_w_count_.fetch_sub(1); + } + + inline void Unlock() SLED_UNLOCK_FUNCTION() + { + sled::MutexLock lock(&mutex_); + w_count_--; + if (w_count_ == 0) { cv_.NotifyAll(); } + } + + inline void LockShared() SLED_SHARED_LOCK_FUNCTION() + { + wait_r_count_.fetch_add(1); + sled::MutexLock lock(&mutex_); + if (Mode::kReaderPriority == mode_) { + cv_.Wait(lock, [this] { return w_count_ == 0; }); + r_count_++; + } else { + cv_.Wait(lock, [this] { return w_count_ == 0 && wait_w_count_.load() == 0; }); + r_count_++; + } + wait_r_count_.fetch_sub(1); + } + + inline void UnlockShared() SLED_UNLOCK_FUNCTION() + { + sled::MutexLock lock(&mutex_); + r_count_--; + if (r_count_ == 0) { cv_.NotifyAll(); } + } + +private: + const Mode mode_; + sled::Mutex mutex_; + sled::ConditionVariable cv_; + int r_count_{0}; + int w_count_{0}; + std::atomic wait_r_count_{0}; + std::atomic wait_w_count_{0}; +}; + +class SharedMutexReadLock final { +public: + explicit SharedMutexReadLock(SharedMutex *mutex) : mutex_(mutex) { mutex_->LockShared(); } + + ~SharedMutexReadLock() { mutex_->UnlockShared(); } + +private: + SharedMutex *mutex_; +}; + +class SharedMutexWriteLock final { +public: + explicit SharedMutexWriteLock(SharedMutex *mutex) : mutex_(mutex) { mutex_->Lock(); } + + ~SharedMutexWriteLock() { mutex_->Unlock(); } + +private: + SharedMutex *mutex_; +}; }// namespace sled diff --git a/src/sled/system/thread_pool_bench.cc b/src/sled/system/thread_pool_bench.cc index 0bd6697..1f769af 100644 --- a/src/sled/system/thread_pool_bench.cc +++ b/src/sled/system/thread_pool_bench.cc @@ -13,4 +13,5 @@ ThreadPoolBench(picobench::state &state) } // BENCHMARK(ThreadPoolBench)->RangeMultiplier(10)->Range(10, 10000); +PICOBENCH_SUITE("TheadPool"); PICOBENCH(ThreadPoolBench); diff --git a/src/sled/system_time_bench.cc b/src/sled/system_time_bench.cc index 245dd91..d8388bd 100644 --- a/src/sled/system_time_bench.cc +++ b/src/sled/system_time_bench.cc @@ -7,4 +7,5 @@ SystemTimeNanos(picobench::state &state) for (auto _ : state) { (void) sled::SystemTimeNanos(); } } +PICOBENCH_SUITE("SystemTime"); PICOBENCH(SystemTimeNanos); diff --git a/src/sled/testing/doctest.h b/src/sled/testing/doctest.h index a31c431..95a4118 100644 --- a/src/sled/testing/doctest.h +++ b/src/sled/testing/doctest.h @@ -57,6 +57,22 @@ #define DOCTEST_TOSTR_IMPL(x) #x #define DOCTEST_TOSTR(x) DOCTEST_TOSTR_IMPL(x) +#ifdef REQUIRE +#undef REQUIRE +#endif + +#ifdef CHECK +#undef CHECK +#endif + +#ifdef CHECK_EQ +#undef CHECK_EQ +#endif + +#ifdef CHECK_NE +#undef CHECK_NE +#endif + #define DOCTEST_VERSION_STR \ DOCTEST_TOSTR(DOCTEST_VERSION_MAJOR) "." \ DOCTEST_TOSTR(DOCTEST_VERSION_MINOR) "." \ diff --git a/src/sled/timer/task_queue_timeout.cc b/src/sled/timer/task_queue_timeout.cc index 869b88f..4ddeb32 100644 --- a/src/sled/timer/task_queue_timeout.cc +++ b/src/sled/timer/task_queue_timeout.cc @@ -41,7 +41,7 @@ TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, Timeout precision_, SafeTask(safety_flag_, [timeout_id, this]() { - LOGV("timer", "Timeout expired: {}", timeout_id); + LOGV("timer", "Timeout expired id={}", timeout_id); SLED_DCHECK_RUN_ON(&parent_.thread_checker_); SLED_DCHECK(posted_task_expiration_ != std::numeric_limits::max(), ""); posted_task_expiration_ = std::numeric_limits::max(); diff --git a/src/sled/uri_bench.cc b/src/sled/uri_bench.cc index dd15ebf..172d406 100644 --- a/src/sled/uri_bench.cc +++ b/src/sled/uri_bench.cc @@ -34,4 +34,5 @@ ParseURI(picobench::state &s) } } +PICOBENCH_SUITE("URI"); PICOBENCH(ParseURI);