diff --git a/etcd/v3/AsyncWatchAction.hpp b/etcd/v3/AsyncWatchAction.hpp index 8dcb189..f356653 100644 --- a/etcd/v3/AsyncWatchAction.hpp +++ b/etcd/v3/AsyncWatchAction.hpp @@ -1,18 +1,18 @@ #ifndef __ASYNC_WATCHACTION_HPP__ #define __ASYNC_WATCHACTION_HPP__ +#include + #include #include "proto/rpc.grpc.pb.h" #include "etcd/v3/Action.hpp" #include "etcd/v3/AsyncWatchResponse.hpp" #include "etcd/Response.hpp" - using grpc::ClientAsyncReaderWriter; using etcdserverpb::WatchRequest; using etcdserverpb::WatchResponse; - namespace etcdv3 { class AsyncWatchAction : public etcdv3::Action @@ -29,6 +29,7 @@ namespace etcdv3 WatchResponse reply; std::unique_ptr> stream; bool isCancelled; + std::mutex protect_is_cancalled; }; } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 04839e5..cc6bfa6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,7 +4,7 @@ file(GLOB_RECURSE CPP_CLIENT_SRC "${CMAKE_CURRENT_SOURCE_DIR}/**/*.cpp") set_source_files_properties(${PROTOBUF_GENERATES} PROPERTIES GENERATED TRUE) -add_library(etcd-cpp-api SHARED ${CPP_CLIENT_SRC} ${PROTOBUF_GENERATES}) +add_library(etcd-cpp-api ${CPP_CLIENT_SRC} ${PROTOBUF_GENERATES}) add_dependencies(etcd-cpp-api protobuf_generates) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 15ee5d1..bf5b51b 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -87,10 +87,12 @@ void etcdv3::AsyncWatchAction::waitForResponse() void etcdv3::AsyncWatchAction::CancelWatch() { + std::lock_guard scope_lock(this->protect_is_cancalled); if(isCancelled == false) { stream->WritesDone((void*)"writes done"); } + isCancelled = true; } bool etcdv3::AsyncWatchAction::Cancelled() const { diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index 4e6dea1..69d84ec 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -1,6 +1,8 @@ #define CATCH_CONFIG_MAIN #include +#include + #include "etcd/Watcher.hpp" #include "etcd/SyncClient.hpp" @@ -70,6 +72,21 @@ TEST_CASE("watch should exit normally") watcher.Cancel(); } +TEST_CASE("watch should can be cancelled repeatedly") +{ + // cancal immediately after start watch. + etcd::Watcher watcher(etcd_uri, "/test", printResponse, true); + std::vector threads(10); + for (size_t i = 0; i < 10; ++i) { + threads[i] = std::thread([&]() { + watcher.Cancel(); + }); + } + for (size_t i = 0; i < 10; ++i) { + threads[i].join(); + } +} + // TEST_CASE("request cancellation") // { // etcd::Client etcd(etcd_uri);