diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index f51437f..a94ee45 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -6,6 +6,9 @@ concurrency: group: ${{ github.repository }}-${{ github.event.number || github.head_ref || github.sha }}-${{ github.workflow }} cancel-in-progress: true +env: + SEGFAULT_SIGNALS: all + jobs: build: runs-on: ${{ matrix.os }} @@ -129,6 +132,10 @@ jobs: libgrpc++-dev \ protobuf-compiler-grpc + # install libsegfault.so + sudo apt-get install libc6 || true + sudo apt-get install glibc-tools || true + - name: Install dependencies for Mac if: runner.os == 'macOS' run: | @@ -172,6 +179,10 @@ jobs: make -j`nproc` sudo make install + - name: Setup tmate session + if: false + uses: mxschmitt/action-tmate@v3 + - name: Build run: | export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib:/usr/local/lib/x86_64-linux-gnu @@ -211,12 +222,18 @@ jobs: echo "Run the etcd test ........................." ./build/bin/EtcdTest + echo "Run the etcd campaign test ........................." + ./build/bin/CampaignTest + echo "Run the etcd memory leak test ........................." ./build/bin/MemLeakTest echo "Run the etcd watcher test ........................." ./build/bin/WatcherTest + echo "Run the etcd memory leak in watcher test ........................." + ./build/bin/MemLeakWatcherTest + echo "Run the etcd keepalive test ........................." ./build/bin/KeepAliveTest diff --git a/CMakeLists.txt b/CMakeLists.txt index 00ffd2b..978e54f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,7 +39,7 @@ if(NOT "${ETCD_CMAKE_CXX_STANDARD}") set(ETCD_CMAKE_CXX_STANDARD ${CMAKE_CXX_STANDARD}) endif() endif() -message("Building etcd-cpp-apiv3 with C++${ETCD_CMAKE_CXX_STANDARD}") +message(STATUS "Building etcd-cpp-apiv3 with C++${ETCD_CMAKE_CXX_STANDARD}") # reference: https://gitlab.kitware.com/cmake/community/-/wikis/doc/cmake/RPATH-handling#always-full-rpath set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE) @@ -134,7 +134,7 @@ else() include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/FindGRPC.cmake) set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC_GRPC++_LIBRARY}) endif() -message("-- Found GRPC: ${GRPC_LIBRARIES} (found version: \"${gRPC_VERSION}\")") +message("-- Found GRPC: ${GRPC_LIBRARIES}, ${GRPC_CPP_PLUGIN} (found version: \"${gRPC_VERSION}\")") # avoid use the apt-get installed libgrpc-dev (version v1.13) on Ubuntu 18.04 if(gRPC_FOUND AND gRPC_VERSION VERSION_LESS "1.14") message(FATAL_ERROR "gRPC '${gRPC_VERSION}' is not supported, please install a newer gRPC library " diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index 9d2712a..9d61019 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -42,6 +42,7 @@ namespace etcdv3 { namespace detail { std::string string_plus_one(std::string const &value); + std::string resolve_etcd_endpoints(std::string const &default_endpoints); } } diff --git a/etcd/v3/Action.hpp b/etcd/v3/Action.hpp index b4b76a0..6f6d493 100644 --- a/etcd/v3/Action.hpp +++ b/etcd/v3/Action.hpp @@ -66,6 +66,8 @@ namespace etcdv3 public: Action(etcdv3::ActionParameters const ¶ms); Action(etcdv3::ActionParameters && params); + virtual ~Action(); + void waitForResponse(); const std::chrono::high_resolution_clock::time_point startTimepoint(); protected: @@ -83,6 +85,7 @@ namespace etcdv3 namespace detail { std::string string_plus_one(std::string const &value); + std::string resolve_etcd_endpoints(std::string const &default_endpoints); } } #endif diff --git a/etcd/v3/AsyncWatchAction.hpp b/etcd/v3/AsyncWatchAction.hpp index 3ab0ad2..9186bee 100644 --- a/etcd/v3/AsyncWatchAction.hpp +++ b/etcd/v3/AsyncWatchAction.hpp @@ -22,12 +22,13 @@ namespace etcdv3 AsyncWatchAction(etcdv3::ActionParameters && params); AsyncWatchResponse ParseResponse(); void waitForResponse(); - void waitForResponse(std::function callback); + void waitForResponse(std::function callback); void CancelWatch(); bool Cancelled() const; private: + int64_t watch_id = -1; WatchResponse reply; - std::unique_ptr> stream; + std::unique_ptr> stream; std::atomic_bool isCancelled; }; } diff --git a/etcd/v3/action_constants.hpp b/etcd/v3/action_constants.hpp index d1728a4..a5dcd19 100644 --- a/etcd/v3/action_constants.hpp +++ b/etcd/v3/action_constants.hpp @@ -34,10 +34,13 @@ namespace etcdv3 extern char const * KEEPALIVE_WRITE; extern char const * KEEPALIVE_READ; extern char const * KEEPALIVE_DONE; + extern char const * KEEPALIVE_FINISH; extern char const * WATCH_CREATE; extern char const * WATCH_WRITE; + extern char const * WATCH_WRITE_CANCEL; extern char const * WATCH_WRITES_DONE; + extern char const * WATCH_FINISH; extern char const * ELECTION_OBSERVE_CREATE; diff --git a/src/Watcher.cpp b/src/Watcher.cpp index 334be24..530d165 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -11,6 +11,12 @@ struct etcd::Watcher::EtcdServerStubs { void etcd::Watcher::EtcdServerStubsDeleter::operator()(etcd::Watcher::EtcdServerStubs *stubs) { if (stubs) { + if (stubs->watchServiceStub) { + stubs->watchServiceStub.reset(); + } + if (stubs->call) { + stubs->call.reset(); + } delete stubs; } } diff --git a/src/v3/Action.cpp b/src/v3/Action.cpp index aafb64d..0f3fedd 100644 --- a/src/v3/Action.cpp +++ b/src/v3/Action.cpp @@ -15,6 +15,13 @@ etcdv3::Action::Action(etcdv3::ActionParameters && params) this->InitAction(); } +etcdv3::Action::~Action() { + cq_.Shutdown(); + + // cancel on-the-fly calls + context.TryCancel(); +} + void etcdv3::Action::InitAction() { if (!parameters.auth_token.empty()) { // use `token` as the key, see: @@ -104,6 +111,10 @@ std::string etcdv3::detail::string_plus_one(std::string const &value) { return s; } } - return {etcdv3::NUL}; } + +std::string etcdv3::detail::resolve_etcd_endpoints(std::string const&default_endpoints) { + const char *ep = std::getenv("ETCD_ENDPOINTS"); + return ep ? ep : default_endpoints; +} diff --git a/src/v3/AsyncLeaseAction.cpp b/src/v3/AsyncLeaseAction.cpp index 8415b54..984d911 100644 --- a/src/v3/AsyncLeaseAction.cpp +++ b/src/v3/AsyncLeaseAction.cpp @@ -183,17 +183,23 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_DONE) { // ok } else { - std::cerr << "Failed to mark a lease keep-alive connection as DONE" << std::endl; + std::cerr << "Failed to mark a lease keep-alive connection as DONE: " + << context.debug_error_string() << std::endl; } grpc::Status status; - stream->Finish(&status, (void *)this); - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { + stream->Finish(&status, (void *)KEEPALIVE_FINISH); + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)KEEPALIVE_FINISH) { // ok } else { - std::cerr << "Failed to finish a lease keep-alive connection" << std::endl; + std::cerr << "Failed to finish a lease keep-alive connection: " + << status.error_message() + << ", " << context.debug_error_string() << std::endl; } + // cancel on-the-fly calls + context.TryCancel(); + cq_.Shutdown(); } } diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 597dc68..06e6ea2 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -2,13 +2,11 @@ #include "etcd/v3/action_constants.hpp" -using etcdserverpb::RangeRequest; -using etcdserverpb::RangeResponse; using etcdserverpb::WatchCreateRequest; etcdv3::AsyncWatchAction::AsyncWatchAction( etcdv3::ActionParameters && params) - : etcdv3::Action(std::move(params)) + : etcdv3::Action(std::move(params)) { isCancelled.store(false); stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)etcdv3::WATCH_CREATE); @@ -45,12 +43,13 @@ etcdv3::AsyncWatchAction::AsyncWatchAction( // wait "write" (WatchCreateRequest) success, and start to read the first reply if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::WATCH_WRITE) { stream->Read(&reply, (void*)this); + this->watch_id = reply.watch_id(); } else { throw std::runtime_error("failed to write WatchCreateRequest to server"); } } -void etcdv3::AsyncWatchAction::waitForResponse() +void etcdv3::AsyncWatchAction::waitForResponse() { void* got_tag; bool ok = false; @@ -61,47 +60,60 @@ void etcdv3::AsyncWatchAction::waitForResponse() { break; } - if(isCancelled.load()) { - break; + if(got_tag == (void *)etcdv3::WATCH_WRITE_CANCEL) { + stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); + continue; } - if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) { - isCancelled.store(true); + if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) + { + grpc::Status status; + stream->Finish(&status, (void *)etcdv3::WATCH_FINISH); + continue; + } + if (got_tag == (void *)etcdv3::WATCH_FINISH) { + // shutdown cq_.Shutdown(); + // cancel on-the-fly calls + context.TryCancel(); break; } if(got_tag == (void*)this) // read tag { if (reply.canceled()) { - isCancelled.store(true); - cq_.Shutdown(); - break; + // cancel on-the-fly calls, but don't shutdown the completion queue as there + // are still a inflight call to finish + context.TryCancel(); + // cq_.Shutdown(); + continue; } - else if ((reply.created() && reply.header().revision() < parameters.revision) || + + // we stop watch under two conditions: + // + // 1. watch for a future revision, return immediately with empty events set + // 2. receive any effective events. + if ((reply.created() && reply.header().revision() < parameters.revision) || reply.events_size() > 0) { - // we stop watch under two conditions: - // - // 1. watch for a future revision, return immediately with empty events set - // 2. receive any effective events. - isCancelled.store(true); - - stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); - - grpc::Status status; - stream->Finish(&status, (void *)this); - - cq_.Shutdown(); - // leave a warning if the response is too large and been fragmented if (reply.fragment()) { std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl; } - break; - } - else - { - // otherwise, start next round read-reply + + std::cout << "issue a watch cancel" << std::endl; + // cancel the watcher after receiving the good response + this->CancelWatch(); + + // start the next round to read finish messages, read into "&dummy" + // (use nullptr, as it won't be touched). + stream->Read(nullptr, (void*)etcdv3::WATCH_FINISH); + } else { + // start the next round to read reply, read into "&reply" stream->Read(&reply, (void*)this); } + continue; + } + if(isCancelled.load()) { + // invalid tag, and is cancelled + break; } } } @@ -109,12 +121,10 @@ void etcdv3::AsyncWatchAction::waitForResponse() void etcdv3::AsyncWatchAction::CancelWatch() { if (!isCancelled.exchange(true)) { - stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); - - grpc::Status status; - stream->Finish(&status, (void *)this); - - cq_.Shutdown(); + WatchRequest cancel_req; + cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id); + stream->Write(cancel_req, (void *)etcdv3::WATCH_WRITE_CANCEL); + isCancelled.store(true); } } @@ -122,10 +132,10 @@ bool etcdv3::AsyncWatchAction::Cancelled() const { return isCancelled.load(); } -void etcdv3::AsyncWatchAction::waitForResponse(std::function callback) +void etcdv3::AsyncWatchAction::waitForResponse(std::function callback) { void* got_tag; - bool ok = false; + bool ok = false; while(cq_.Next(&got_tag, &ok)) { @@ -133,39 +143,55 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::functionWritesDone((void*)etcdv3::WATCH_WRITES_DONE); + continue; } if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) { - isCancelled.store(true); + grpc::Status status; + stream->Finish(&status, (void *)etcdv3::WATCH_FINISH); + continue; + } + if (got_tag == (void *)etcdv3::WATCH_FINISH) { + // shutdown cq_.Shutdown(); + // cancel on-the-fly calls + context.TryCancel(); break; } - else if(got_tag == (void*)this) // read tag + if(got_tag == (void*)this) // read tag { if (reply.canceled()) { - isCancelled.store(true); - cq_.Shutdown(); if (reply.compact_revision() != 0) { - auto resp = etcd::Response(grpc::StatusCode::OUT_OF_RANGE /* error code */, - "required revision has been compacted"); - resp._compact_revision = reply.compact_revision(); - callback(resp); + auto resp = ParseResponse(); + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint); + callback(etcd::Response(resp, duration)); } - break; + // cancel on-the-fly calls, but don't shutdown the completion queue as there + // are still a inflight call to finish + context.TryCancel(); + // cq_.Shutdown(); + continue; } + + // for the callback case, we don't invoke callback immediately if watching + // for a future revision, we wait until there are some effective events. if(reply.events_size()) { - // for the callback case, we don't stop immediately if watching for a future revison, - // we wait until there are some expected events. auto resp = ParseResponse(); auto duration = std::chrono::duration_cast( std::chrono::high_resolution_clock::now() - start_timepoint); - callback(etcd::Response(resp, duration)); + callback(etcd::Response(resp, duration)); start_timepoint = std::chrono::high_resolution_clock::now(); } stream->Read(&reply, (void*)this); + continue; + } + if(isCancelled.load()) { + // invalid tag, and is cancelled + break; } } } @@ -181,7 +207,7 @@ etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() watch_resp.set_error_message(status.error_message()); } else - { + { watch_resp.ParseResponse(reply); } return watch_resp; diff --git a/src/v3/action_constants.cpp b/src/v3/action_constants.cpp index a212ffe..ff68cc4 100644 --- a/src/v3/action_constants.cpp +++ b/src/v3/action_constants.cpp @@ -33,10 +33,13 @@ char const * etcdv3::KEEPALIVE_CREATE = "keepalive create"; char const * etcdv3::KEEPALIVE_WRITE = "keepalive write"; char const * etcdv3::KEEPALIVE_READ = "keepalive read"; char const * etcdv3::KEEPALIVE_DONE = "keepalive done"; +char const * etcdv3::KEEPALIVE_FINISH = "keepalive finish"; char const * etcdv3::WATCH_CREATE = "watch create"; char const * etcdv3::WATCH_WRITE = "watch write"; +char const * etcdv3::WATCH_WRITE_CANCEL = "watch write cancel"; char const * etcdv3::WATCH_WRITES_DONE = "watch writes done"; +char const * etcdv3::WATCH_FINISH = "watch finish"; char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create"; diff --git a/tst/AuthTest.cpp b/tst/AuthTest.cpp index d7d964a..5a4b91d 100644 --- a/tst/AuthTest.cpp +++ b/tst/AuthTest.cpp @@ -5,7 +5,7 @@ #include "etcd/Client.hpp" -static const std::string etcd_url("http://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); TEST_CASE("setup with auth") { diff --git a/tst/CMakeLists.txt b/tst/CMakeLists.txt index 3cca3f0..a8cfe9f 100644 --- a/tst/CMakeLists.txt +++ b/tst/CMakeLists.txt @@ -23,7 +23,15 @@ foreach(testfile ${TEST_FILES}) target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen) target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen/proto) - target_link_libraries(${test_name} etcd-cpp-api) + target_link_libraries(${test_name} PRIVATE etcd-cpp-api) + + if(UNIX AND NOT APPLE) + if(CMAKE_VERSION VERSION_LESS 3.13) + target_link_libraries(${test_name} PRIVATE -Wl,--no-as-needed -lSegFault -Wl,--as-needed) + else() + target_link_options(${test_name} PRIVATE -Wl,--no-as-needed -lSegFault -Wl,--as-needed) + endif() + endif() add_dependencies(etcd_tests ${test_name}) endforeach() diff --git a/tst/CampaignTest.cpp b/tst/CampaignTest.cpp new file mode 100644 index 0000000..0781073 --- /dev/null +++ b/tst/CampaignTest.cpp @@ -0,0 +1,39 @@ +#define CATCH_CONFIG_MAIN +#include + +#include +#include + +#include "etcd/Client.hpp" +#include "etcd/KeepAlive.hpp" +#include "etcd/Response.hpp" +#include "etcd/SyncClient.hpp" +#include "etcd/Value.hpp" + +static std::string etcd_uri = etcdv3::detail::resolve_etcd_endpoints( + "http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579"); + +TEST_CASE("campaign and loadership using keepalive") { + etcd::Client etcd(etcd_uri); + auto keepalive = etcd.leasekeepalive(5).get(); + auto lease_id = keepalive->Lease(); + + std::cout << lease_id << std::endl; + std::string value = std::string("192.168.1.6:1880"); + auto resp1 = etcd.campaign("/leader", lease_id, value).get(); + if (0 == resp1.error_code()) { + std::cout << "became leader: " << resp1.index() << std::endl; + } else { + std::cout << "error code: " << resp1.error_code() + << "error message: " << resp1.error_message() << std::endl; + assert(false); + } + + std::cout << "finish campaign" << std::endl; + + auto resp2 = etcd.leader("/leader").get(); + std::cout << resp2.value().as_string() << std::endl; + std::cout << resp2.value().key() << std::endl; + + std::cout << "finish leader" << std::endl; +} diff --git a/tst/ElectionTest.cpp b/tst/ElectionTest.cpp index 5c91d84..e8e3915 100644 --- a/tst/ElectionTest.cpp +++ b/tst/ElectionTest.cpp @@ -8,7 +8,7 @@ #include "etcd/Client.hpp" #include "etcd/KeepAlive.hpp" -static const std::string etcd_url("http://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); TEST_CASE("setup") { diff --git a/tst/EtcdSyncTest.cpp b/tst/EtcdSyncTest.cpp index 0b7abe6..84118ef 100644 --- a/tst/EtcdSyncTest.cpp +++ b/tst/EtcdSyncTest.cpp @@ -5,7 +5,7 @@ #include "etcd/SyncClient.hpp" -static const std::string etcd_url("http://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); TEST_CASE("sync operations") { diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index d64c43f..ec4fbd6 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -7,7 +7,7 @@ #include "etcd/Client.hpp" -static const std::string etcd_url("http://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); TEST_CASE("setup") { diff --git a/tst/ForkTest.cpp b/tst/ForkTest.cpp index 1cf0dbe..2f01915 100644 --- a/tst/ForkTest.cpp +++ b/tst/ForkTest.cpp @@ -12,7 +12,7 @@ #include "etcd/Client.hpp" #include "etcd/KeepAlive.hpp" -static const std::string etcd_url("http://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); TEST_CASE("fork: set in child and get from self") { diff --git a/tst/KeepAliveTest.cpp b/tst/KeepAliveTest.cpp index 0330d61..e405134 100644 --- a/tst/KeepAliveTest.cpp +++ b/tst/KeepAliveTest.cpp @@ -10,7 +10,7 @@ #include "etcd/SyncClient.hpp" #include "etcd/Value.hpp" -static std::string etcd_uri( +static std::string etcd_uri = etcdv3::detail::resolve_etcd_endpoints( "http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579"); TEST_CASE("keepalive revoke and check if alive") { diff --git a/tst/LockTest.cpp b/tst/LockTest.cpp index cf3f550..5a834cc 100644 --- a/tst/LockTest.cpp +++ b/tst/LockTest.cpp @@ -10,7 +10,7 @@ #include "etcd/Client.hpp" #include "etcd/KeepAlive.hpp" -static const std::string etcd_url("http://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); TEST_CASE("lock and unlock") { diff --git a/tst/MemLeakTest.cpp b/tst/MemLeakTest.cpp index 0cf68d1..05a99dc 100644 --- a/tst/MemLeakTest.cpp +++ b/tst/MemLeakTest.cpp @@ -6,7 +6,7 @@ #include "etcd/Client.hpp" #include "etcd/KeepAlive.hpp" -static const std::string etcd_url("http://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); class DistributedLock { public: @@ -75,7 +75,7 @@ DistributedLock::~DistributedLock() noexcept { int main() { int i = 0, t = 0; - while(t < 10 /* update this value to make it run for longer */) { + while(t < 100 /* update this value to make it run for longer */) { { DistributedLock lock(std::to_string(i), 0); if(!lock.lock_acquired()) { @@ -88,6 +88,8 @@ int main() { if (i == 10) { i = 0; } - std::cout << "round: i = " << i << ", t = " << t << std::endl; + if (t % 10 == 0) { + std::cout << "round: i = " << i << ", t = " << t << std::endl; + } } } diff --git a/tst/MemLeakWatcherTest.cpp b/tst/MemLeakWatcherTest.cpp new file mode 100644 index 0000000..3f1f5a6 --- /dev/null +++ b/tst/MemLeakWatcherTest.cpp @@ -0,0 +1,60 @@ +#define CATCH_CONFIG_MAIN +#include + +#include +#include + +#include "etcd/Client.hpp" +#include "etcd/KeepAlive.hpp" +#include "etcd/Watcher.hpp" + +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); + +static std::atomic_int watcher_called; + +void print_response(etcd::Response const & resp) +{ + watcher_called.fetch_add(1); +} + +/** + * @brief emulate the behavior of creating watcher many times: + * + * 1. create a watcher + * 2. change a value + * 3. cancel the watcher + */ +void watch_once(etcd::Client & client, std::unique_ptr &watcher, const size_t round) { + const std::string my_prefix = "/test"; + const std::string my_key = my_prefix + "/foo"; + watcher.reset(new etcd::Watcher(client, my_prefix, print_response, true)); + + int k = watcher_called.load(); + client.set(my_key, "bar-" + std::to_string(round)).wait(); + while (true) { + if (watcher_called.load() > k) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + // cancel the watcher + watcher->Cancel(); +} + +TEST_CASE("watch shouldn't leak memory") +{ + watcher_called.store(0); + + // issue some changes to see if the watcher works + etcd::Client client(etcd_url); + std::unique_ptr watcher; + for (int round = 0; round < 10 /* update this value to make it run for longer */; ++round) { + if (round % 50 == 0) { + std::cout << "starting round " << round << std::endl; + } + watch_once(client, watcher, round); + } + + std::cout << "watcher been called for " << watcher_called.load() << " times" << std::endl; +} diff --git a/tst/RewatchTest.cpp b/tst/RewatchTest.cpp index 0ff7c92..a2a7ffb 100644 --- a/tst/RewatchTest.cpp +++ b/tst/RewatchTest.cpp @@ -8,7 +8,7 @@ #include "etcd/SyncClient.hpp" #include "etcd/Watcher.hpp" -static const std::string etcd_url("http://127.0.0.1:24799"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); static int watcher_called = 0; diff --git a/tst/SecurityChannelTest.cpp b/tst/SecurityChannelTest.cpp index b046b02..25d7045 100644 --- a/tst/SecurityChannelTest.cpp +++ b/tst/SecurityChannelTest.cpp @@ -9,7 +9,7 @@ static std::string ca = "security-config/certs/ca.crt"; static std::string cert = "security-config/certs/etcd0.example.com.crt"; static std::string key = "security-config/private/etcd0.example.com.key"; -static const std::string etcd_url("https://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("https://127.0.0.1:2379"); TEST_CASE("setup with auth") { diff --git a/tst/TransactionTest.cpp b/tst/TransactionTest.cpp index 911a073..4a1af0c 100644 --- a/tst/TransactionTest.cpp +++ b/tst/TransactionTest.cpp @@ -8,7 +8,7 @@ #include "etcd/Client.hpp" #include "etcd/v3/Transaction.hpp" -static const std::string etcd_url("http://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); TEST_CASE("setup") { diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index b4fd113..0096303 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -7,7 +7,7 @@ #include "etcd/Watcher.hpp" #include "etcd/SyncClient.hpp" -static const std::string etcd_url("http://127.0.0.1:2379"); +static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379"); static int watcher_called = 0;