Revisit the watcher's reconnect functionality.

Address #73, #76, #117 and #118.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2022-04-08 19:40:59 +08:00
parent f70255c8b0
commit 56c7189f92
7 changed files with 149 additions and 25 deletions

View File

@ -570,24 +570,34 @@ to decide if a watcher should re-connect to the etcd server.
Here is an example how users can make a watcher re-connect to server after disconnected. Here is an example how users can make a watcher re-connect to server after disconnected.
```c++ ```c++
void wait_for_connection(Client &client) { // wait the client ready
void wait_for_connection(etcd::Client &client) {
// wait until the client connects to etcd server // wait until the client connects to etcd server
// `head` API is only available in version later than 0.2.1
while (!client.head().get().is_ok()) { while (!client.head().get().is_ok()) {
sleep(1); sleep(1);
} }
} }
// a loop for initialized a watcher with auto-restart capability
void initialize_watcher(const std::string& endpoints, void initialize_watcher(const std::string& endpoints,
const std::string& prefix, const std::string& prefix,
std::function<void(Response)> callback, std::function<void(etcd::Response)> callback,
std::shared_ptr<etcd::Watcher>& watcher) { std::shared_ptr<etcd::Watcher>& watcher) {
Client client(endpoints); etcd::Client client(endpoints);
wait_for_connection(client); wait_for_connection(client);
watcher->reset(new etcd::Watcher(client, prefix, callback));
// Check if the failed one has been cancelled first
if (watcher && watcher->Cancelled()) {
std::cout << "watcher's reconnect loop been cancelled" << std::endl;
return;
}
watcher.reset(new etcd::Watcher(client, prefix, callback, true));
// Note that lambda requires `mutable`qualifier.
watcher->Wait([endpoints, prefix, callback, watcher->Wait([endpoints, prefix, callback,
watcher_ref /* keep the shared_ptr alive */, &watcher](bool cancelled) { /* By reference for renewing */ &watcher](bool cancelled) mutable {
if (cancelled) { if (cancelled) {
std::cout << "watcher's reconnect loop stopped as been cancelled" << std::endl;
return; return;
} }
initialize_watcher(endpoints, prefix, callback, watcher); initialize_watcher(endpoints, prefix, callback, watcher);
@ -595,16 +605,20 @@ void initialize_watcher(const std::string &endpoints,
} }
``` ```
The functionalities can be used as
```c++ ```c++
std::string endpoints = "http://127.0.0.1:2379"; std::string endpoints = "http://127.0.0.1:2379";
std::function<void(Response)> callback = printResponse; std::function<void(Response)> callback = printResponse;
const std::string prefix = "/test/key"; const std::string prefix = "/test/key";
// the watcher initialized in this way will auto re-connect to etcd // the watcher initialized in this way will auto re-connect to etcd
std::unique_ptr<etcd::Watcher> watcher; std::shared_ptr<etcd::Watcher> watcher;
initialize_watcher(endpoints, prefix, callback, watcher); initialize_watcher(endpoints, prefix, callback, watcher);
``` ```
For a complete runnable example, see also [./tst/RewatchTest.cpp](./tst/RewatchTest.cpp).
### Requesting for lease ### Requesting for lease
Users can request for lease which is governed by a time-to-live(TTL) value given by the user. Users can request for lease which is governed by a time-to-live(TTL) value given by the user.

View File

@ -76,7 +76,12 @@ namespace etcd
/** /**
* Stop the watching action. * Stop the watching action.
*/ */
void Cancel(); bool Cancel();
/**
* Whether the watcher has been cancelled.
*/
bool Cancelled() const;
~Watcher(); ~Watcher();

View File

@ -29,7 +29,6 @@ namespace etcdv3
WatchResponse reply; WatchResponse reply;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream; std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
std::atomic_bool isCancelled; std::atomic_bool isCancelled;
std::mutex protect_is_cancalled;
}; };
} }

View File

@ -118,10 +118,15 @@ void etcd::Watcher::Wait(std::function<void(bool)> callback)
} }
} }
void etcd::Watcher::Cancel() bool etcd::Watcher::Cancel()
{ {
stubs->call->CancelWatch(); stubs->call->CancelWatch();
this->Wait(); return this->Wait();
}
bool etcd::Watcher::Cancelled() const
{
return cancelled.load() || stubs->call->Cancelled();
} }
void etcd::Watcher::doWatch(std::string const & key, void etcd::Watcher::doWatch(std::string const & key,
@ -144,7 +149,10 @@ void etcd::Watcher::doWatch(std::string const & key,
task_ = std::thread([this, callback]() { task_ = std::thread([this, callback]() {
stubs->call->waitForResponse(callback); stubs->call->waitForResponse(callback);
if (wait_callback != nullptr) { if (wait_callback != nullptr) {
// issue the callback in another thread to avoid deadlock, is ok to detach the pplx::task
pplx::task<void>([this]() -> void {
wait_callback(stubs->call->Cancelled()); wait_callback(stubs->call->Cancelled());
});
} }
}); });
cancelled.store(false); cancelled.store(false);

View File

@ -108,7 +108,6 @@ void etcdv3::AsyncWatchAction::waitForResponse()
void etcdv3::AsyncWatchAction::CancelWatch() void etcdv3::AsyncWatchAction::CancelWatch()
{ {
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
if (!isCancelled.exchange(true)) { if (!isCancelled.exchange(true)) {
stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE);

100
tst/RewatchTest.cpp Normal file
View File

@ -0,0 +1,100 @@
#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <chrono>
#include <thread>
#include "etcd/Watcher.hpp"
#include "etcd/SyncClient.hpp"
static std::string etcd_uri("http://127.0.0.1:2379");
static int watcher_called = 0;
void print_response(etcd::Response const & resp)
{
++watcher_called;
std::cout << "print response called" << std::endl;
if (resp.error_code()) {
std::cout << resp.error_code() << ": " << resp.error_message() << std::endl;
}
else {
std::cout << resp.action() << " " << resp.value().as_string() << std::endl;
std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl;
std::cout << "Events size: " << resp.events().size() << std::endl;
for (auto const &ev: resp.events()) {
std::cout << "Value change in events: " << static_cast<int>(ev.event_type())
<< ", prev kv = " << ev.prev_kv().key() << " -> " << ev.prev_kv().as_string()
<< ", kv = " << ev.kv().key() << " -> " << ev.kv().as_string()
<< std::endl;
}
}
}
void wait_for_connection(etcd::Client &client) {
// wait until the client connects to etcd server
while (!client.head().get().is_ok()) {
sleep(1);
}
}
void initialize_watcher(const std::string& endpoints,
const std::string& prefix,
std::function<void(etcd::Response)> callback,
std::shared_ptr<etcd::Watcher>& watcher) {
etcd::Client client(endpoints);
wait_for_connection(client);
// Check if the failed one has been cancelled first
if (watcher && watcher->Cancelled()) {
std::cout << "watcher's reconnect loop been cancelled" << std::endl;
return;
}
watcher.reset(new etcd::Watcher(client, prefix, callback, true));
// Note that lambda requires `mutable`qualifier.
watcher->Wait([endpoints, prefix, callback,
/* By reference for renewing */ &watcher](bool cancelled) mutable {
if (cancelled) {
std::cout << "watcher's reconnect loop stopped as been cancelled" << std::endl;
return;
}
initialize_watcher(endpoints, prefix, callback, watcher);
});
}
TEST_CASE("watch should can be re-established")
{
const std::string my_prefix = "/test";
// the watcher initialized in this way will auto re-connect to etcd
std::shared_ptr<etcd::Watcher> watcher;
initialize_watcher(etcd_uri, my_prefix, print_response, watcher);
// issue some changes to see if the watcher works
for (int round = 0; round < 10; ++round) {
try {
etcd::Client client(etcd_uri);
auto response = client.set(my_prefix + "/foo", "bar-" + std::to_string(round)).get();
} catch (...) {
// pass
}
std::this_thread::sleep_for(std::chrono::seconds(2));
}
// cancel the worker
watcher->Cancel();
// the watcher has been cancelled and shouldn't work anymore
for (int round = 10; round < 20; ++round) {
try {
etcd::Client client(etcd_uri);
auto response = client.set(my_prefix + "/foo", "bar-" + std::to_string(round)).get();
} catch (...) {
// pass
}
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}

View File

@ -17,8 +17,7 @@ void printResponse(etcd::Response const & resp)
if (resp.error_code()) { if (resp.error_code()) {
std::cout << resp.error_code() << ": " << resp.error_message() << std::endl; std::cout << resp.error_code() << ": " << resp.error_message() << std::endl;
} }
else else {
{
std::cout << resp.action() << " " << resp.value().as_string() << std::endl; std::cout << resp.action() << " " << resp.value().as_string() << std::endl;
std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl; std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl;