Add a new API to `client.observe()` that accepts a callback.

Resolves #108.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-12-22 10:30:46 +08:00
parent 6a0b6696e5
commit 405383c0ba
5 changed files with 49 additions and 3 deletions

View File

@ -598,7 +598,16 @@ namespace etcd
* Observe the leader change. * Observe the leader change.
* *
* @param name is the names of election to watch. * @param name is the names of election to watch.
* @param callback is the names of election to watch. *
* @returns an observer that holds that action and will cancel the request when being destructed.
*/
std::unique_ptr<Observer> observe(std::string const &name,
const bool once = false);
/**
* Observe the leader change.
*
* @param name is the names of election to watch.
* *
* @returns an observer that holds that action and will cancel the request when being destructed. * @returns an observer that holds that action and will cancel the request when being destructed.
*/ */

View File

@ -41,6 +41,21 @@ namespace etcd
}); });
} }
template <typename T>
static pplx::task<etcd::Response> create(std::shared_ptr<T> call,
std::function<void(Response)> callback)
{
return pplx::task<etcd::Response>([call, callback]()
{
call->waitForResponse(callback);
auto v3resp = call->ParseResponse();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - call->startTimepoint());
return etcd::Response(v3resp, duration);
});
}
template <typename T> template <typename T>
static pplx::task<etcd::Response> create(std::function<std::shared_ptr<T>()> callfn) static pplx::task<etcd::Response> create(std::function<std::shared_ptr<T>()> callfn)
{ {

View File

@ -73,6 +73,8 @@ namespace etcd
Response proclaim(std::string const &name, int64_t lease_id, Response proclaim(std::string const &name, int64_t lease_id,
std::string const &key, int64_t revision, std::string const &value); std::string const &key, int64_t revision, std::string const &value);
Response leader(std::string const &name); Response leader(std::string const &name);
std::unique_ptr<Client::Observer> observe(std::string const &name,
const bool once = false);
std::unique_ptr<Client::Observer> observe(std::string const &name, std::unique_ptr<Client::Observer> observe(std::string const &name,
std::function<void(Response)> callback, std::function<void(Response)> callback,
const bool once = false); const bool once = false);

View File

@ -957,7 +957,7 @@ pplx::task<etcd::Response> etcd::Client::leader(std::string const &name) {
} }
std::unique_ptr<etcd::Client::Observer> etcd::Client::observe( std::unique_ptr<etcd::Client::Observer> etcd::Client::observe(
std::string const &name, std::function<void(Response)> callback, const bool once) { std::string const &name, const bool once) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
params.name.assign(name); params.name.assign(name);
@ -969,6 +969,19 @@ std::unique_ptr<etcd::Client::Observer> etcd::Client::observe(
return observer; return observer;
} }
std::unique_ptr<etcd::Client::Observer> etcd::Client::observe(
std::string const &name, std::function<void(Response)> callback, const bool once) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.name.assign(name);
params.election_stub = stubs->electionServiceStub.get();
std::shared_ptr<etcdv3::AsyncObserveAction> call(new etcdv3::AsyncObserveAction(params, once));
std::unique_ptr<Observer> observer(new Observer());
observer->action = call;
observer->resp = Response::create(call, callback);
return observer;
}
pplx::task<etcd::Response> etcd::Client::resign( pplx::task<etcd::Response> etcd::Client::resign(
std::string const &name, int64_t lease_id, std::string const &key, int64_t revision) { std::string const &name, int64_t lease_id, std::string const &key, int64_t revision) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;

View File

@ -173,7 +173,14 @@ etcd::Response etcd::SyncClient::leader(std::string const &name)
} }
std::unique_ptr<etcd::Client::Observer> etcd::SyncClient::observe( std::unique_ptr<etcd::Client::Observer> etcd::SyncClient::observe(
std::string const &name, std::function<void(etcd::Response)> callback, std::string const &name, const bool once)
{
return client.observe(name, once);
}
std::unique_ptr<etcd::Client::Observer> etcd::SyncClient::observe(
std::string const &name,
std::function<void(etcd::Response)> callback,
const bool once) const bool once)
{ {
return client.observe(name, callback, once); return client.observe(name, callback, once);