From 95fa7789c26179df030bb7bd477f871a71de81ce Mon Sep 17 00:00:00 2001 From: Tao He Date: Fri, 8 Apr 2022 19:40:59 +0800 Subject: [PATCH] Revisit the watcher's reconnect functionality. Address #73, #76, #117 and #118. Signed-off-by: Tao He --- README.md | 48 +++++++++++------ etcd/Watcher.hpp | 7 ++- etcd/v3/AsyncWatchAction.hpp | 1 - src/Watcher.cpp | 14 +++-- src/v3/AsyncWatchAction.cpp | 1 - tst/RewatchTest.cpp | 100 +++++++++++++++++++++++++++++++++++ tst/WatcherTest.cpp | 3 +- 7 files changed, 149 insertions(+), 25 deletions(-) create mode 100644 tst/RewatchTest.cpp diff --git a/README.md b/README.md index 6633cce..2e3a2f7 100644 --- a/README.md +++ b/README.md @@ -570,41 +570,55 @@ to decide if a watcher should re-connect to the etcd server. Here is an example how users can make a watcher re-connect to server after disconnected. ```c++ -void wait_for_connection(Client &client) { +// wait the client ready +void wait_for_connection(etcd::Client &client) { // wait until the client connects to etcd server - // `head` API is only available in version later than 0.2.1 while (!client.head().get().is_ok()) { sleep(1); } } -void initialize_watcher(const std::string &endpoints, - const std::string &prefix, - std::function callback, - std::shared_ptr &watcher) { - Client client(endpoints); - wait_for_connection(client); - watcher->reset(new etcd::Watcher(client, prefix, callback)); - watcher->Wait([endpoints, prefix, callback, - watcher_ref /* keep the shared_ptr alive */, &watcher](bool cancelled) { - if (cancelled) { - return; - } - initialize_watcher(endpoints, prefix, callback, watcher); - }); +// a loop for initialized a watcher with auto-restart capability +void initialize_watcher(const std::string& endpoints, + const std::string& prefix, + std::function callback, + std::shared_ptr& watcher) { + etcd::Client client(endpoints); + wait_for_connection(client); + + // Check if the failed one has been cancelled first + if (watcher && watcher->Cancelled()) { + std::cout << "watcher's reconnect loop been cancelled" << std::endl; + return; + } + watcher.reset(new etcd::Watcher(client, prefix, callback, true)); + + // Note that lambda requires `mutable`qualifier. + watcher->Wait([endpoints, prefix, callback, + /* By reference for renewing */ &watcher](bool cancelled) mutable { + if (cancelled) { + std::cout << "watcher's reconnect loop stopped as been cancelled" << std::endl; + return; + } + initialize_watcher(endpoints, prefix, callback, watcher); + }); } ``` +The functionalities can be used as + ```c++ std::string endpoints = "http://127.0.0.1:2379"; std::function callback = printResponse; const std::string prefix = "/test/key"; // the watcher initialized in this way will auto re-connect to etcd -std::unique_ptr watcher; +std::shared_ptr watcher; initialize_watcher(endpoints, prefix, callback, watcher); ``` +For a complete runnable example, see also [./tst/RewatchTest.cpp](./tst/RewatchTest.cpp). + ### Requesting for lease Users can request for lease which is governed by a time-to-live(TTL) value given by the user. diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 366f6ed..aa2a222 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -76,7 +76,12 @@ namespace etcd /** * Stop the watching action. */ - void Cancel(); + bool Cancel(); + + /** + * Whether the watcher has been cancelled. + */ + bool Cancelled() const; ~Watcher(); diff --git a/etcd/v3/AsyncWatchAction.hpp b/etcd/v3/AsyncWatchAction.hpp index 5f4abc7..cd18489 100644 --- a/etcd/v3/AsyncWatchAction.hpp +++ b/etcd/v3/AsyncWatchAction.hpp @@ -29,7 +29,6 @@ namespace etcdv3 WatchResponse reply; std::unique_ptr> stream; std::atomic_bool isCancelled; - std::mutex protect_is_cancalled; }; } diff --git a/src/Watcher.cpp b/src/Watcher.cpp index d0dbf54..9c6a8f0 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -118,10 +118,15 @@ void etcd::Watcher::Wait(std::function callback) } } -void etcd::Watcher::Cancel() +bool etcd::Watcher::Cancel() { stubs->call->CancelWatch(); - this->Wait(); + return this->Wait(); +} + +bool etcd::Watcher::Cancelled() const +{ + return cancelled.load() || stubs->call->Cancelled(); } void etcd::Watcher::doWatch(std::string const & key, @@ -144,7 +149,10 @@ void etcd::Watcher::doWatch(std::string const & key, task_ = std::thread([this, callback]() { stubs->call->waitForResponse(callback); if (wait_callback != nullptr) { - wait_callback(stubs->call->Cancelled()); + // issue the callback in another thread to avoid deadlock, is ok to detach the pplx::task + pplx::task([this]() -> void { + wait_callback(stubs->call->Cancelled()); + }); } }); cancelled.store(false); diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 35d8f73..78f9977 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -108,7 +108,6 @@ void etcdv3::AsyncWatchAction::waitForResponse() void etcdv3::AsyncWatchAction::CancelWatch() { - std::lock_guard scope_lock(this->protect_is_cancalled); if (!isCancelled.exchange(true)) { stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); diff --git a/tst/RewatchTest.cpp b/tst/RewatchTest.cpp new file mode 100644 index 0000000..c0ac949 --- /dev/null +++ b/tst/RewatchTest.cpp @@ -0,0 +1,100 @@ +#define CATCH_CONFIG_MAIN +#include + +#include +#include + +#include "etcd/Watcher.hpp" +#include "etcd/SyncClient.hpp" + +static std::string etcd_uri("http://127.0.0.1:2379"); +static int watcher_called = 0; + +void print_response(etcd::Response const & resp) +{ + ++watcher_called; + std::cout << "print response called" << std::endl; + if (resp.error_code()) { + std::cout << resp.error_code() << ": " << resp.error_message() << std::endl; + } + else { + std::cout << resp.action() << " " << resp.value().as_string() << std::endl; + std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl; + + std::cout << "Events size: " << resp.events().size() << std::endl; + for (auto const &ev: resp.events()) { + std::cout << "Value change in events: " << static_cast(ev.event_type()) + << ", prev kv = " << ev.prev_kv().key() << " -> " << ev.prev_kv().as_string() + << ", kv = " << ev.kv().key() << " -> " << ev.kv().as_string() + << std::endl; + } + } +} + +void wait_for_connection(etcd::Client &client) { + // wait until the client connects to etcd server + while (!client.head().get().is_ok()) { + sleep(1); + } +} + +void initialize_watcher(const std::string& endpoints, + const std::string& prefix, + std::function callback, + std::shared_ptr& watcher) { + etcd::Client client(endpoints); + wait_for_connection(client); + + // Check if the failed one has been cancelled first + if (watcher && watcher->Cancelled()) { + std::cout << "watcher's reconnect loop been cancelled" << std::endl; + return; + } + watcher.reset(new etcd::Watcher(client, prefix, callback, true)); + + // Note that lambda requires `mutable`qualifier. + watcher->Wait([endpoints, prefix, callback, + /* By reference for renewing */ &watcher](bool cancelled) mutable { + if (cancelled) { + std::cout << "watcher's reconnect loop stopped as been cancelled" << std::endl; + return; + } + initialize_watcher(endpoints, prefix, callback, watcher); + }); +} + +TEST_CASE("watch should can be re-established") +{ + const std::string my_prefix = "/test"; + + // the watcher initialized in this way will auto re-connect to etcd + std::shared_ptr watcher; + initialize_watcher(etcd_uri, my_prefix, print_response, watcher); + + // issue some changes to see if the watcher works + for (int round = 0; round < 10; ++round) { + try { + etcd::Client client(etcd_uri); + auto response = client.set(my_prefix + "/foo", "bar-" + std::to_string(round)).get(); + } catch (...) { + // pass + } + + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + + // cancel the worker + watcher->Cancel(); + + // the watcher has been cancelled and shouldn't work anymore + for (int round = 10; round < 20; ++round) { + try { + etcd::Client client(etcd_uri); + auto response = client.set(my_prefix + "/foo", "bar-" + std::to_string(round)).get(); + } catch (...) { + // pass + } + + std::this_thread::sleep_for(std::chrono::seconds(2)); + } +} diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index 6f74fd5..2ca823d 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -17,8 +17,7 @@ void printResponse(etcd::Response const & resp) if (resp.error_code()) { std::cout << resp.error_code() << ": " << resp.error_message() << std::endl; } - else - { + else { std::cout << resp.action() << " " << resp.value().as_string() << std::endl; std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl;