From 405383c0ba60b585a3444253ce77439ff53c0ea4 Mon Sep 17 00:00:00 2001 From: Tao He Date: Wed, 22 Dec 2021 10:30:46 +0800 Subject: [PATCH] Add a new API to `client.observe()` that accepts a callback. Resolves #108. Signed-off-by: Tao He --- etcd/Client.hpp | 11 ++++++++++- etcd/Response.hpp | 15 +++++++++++++++ etcd/SyncClient.hpp | 2 ++ src/Client.cpp | 15 ++++++++++++++- src/SyncClient.cpp | 9 ++++++++- 5 files changed, 49 insertions(+), 3 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 3cc0763..1ed4776 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -598,7 +598,16 @@ namespace etcd * Observe the leader change. * * @param name is the names of election to watch. - * @param callback is the names of election to watch. + * + * @returns an observer that holds that action and will cancel the request when being destructed. + */ + std::unique_ptr observe(std::string const &name, + const bool once = false); + + /** + * Observe the leader change. + * + * @param name is the names of election to watch. * * @returns an observer that holds that action and will cancel the request when being destructed. */ diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 656851b..692444c 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -41,6 +41,21 @@ namespace etcd }); } + template + static pplx::task create(std::shared_ptr call, + std::function callback) + { + return pplx::task([call, callback]() + { + call->waitForResponse(callback); + auto v3resp = call->ParseResponse(); + + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - call->startTimepoint()); + return etcd::Response(v3resp, duration); + }); + } + template static pplx::task create(std::function()> callfn) { diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index 50b3a1e..eb00787 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -73,6 +73,8 @@ namespace etcd Response proclaim(std::string const &name, int64_t lease_id, std::string const &key, int64_t revision, std::string const &value); Response leader(std::string const &name); + std::unique_ptr observe(std::string const &name, + const bool once = false); std::unique_ptr observe(std::string const &name, std::function callback, const bool once = false); diff --git a/src/Client.cpp b/src/Client.cpp index 94c4400..f1b0b07 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -957,7 +957,7 @@ pplx::task etcd::Client::leader(std::string const &name) { } std::unique_ptr etcd::Client::observe( - std::string const &name, std::function callback, const bool once) { + std::string const &name, const bool once) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.name.assign(name); @@ -969,6 +969,19 @@ std::unique_ptr etcd::Client::observe( return observer; } +std::unique_ptr etcd::Client::observe( + std::string const &name, std::function callback, const bool once) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.name.assign(name); + params.election_stub = stubs->electionServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncObserveAction(params, once)); + std::unique_ptr observer(new Observer()); + observer->action = call; + observer->resp = Response::create(call, callback); + return observer; +} + pplx::task etcd::Client::resign( std::string const &name, int64_t lease_id, std::string const &key, int64_t revision) { etcdv3::ActionParameters params; diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index cd068c3..cb28bdb 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -173,7 +173,14 @@ etcd::Response etcd::SyncClient::leader(std::string const &name) } std::unique_ptr etcd::SyncClient::observe( - std::string const &name, std::function callback, + std::string const &name, const bool once) +{ + return client.observe(name, once); +} + +std::unique_ptr etcd::SyncClient::observe( + std::string const &name, + std::function callback, const bool once) { return client.observe(name, callback, once);