diff --git a/etcd/Response.hpp b/etcd/Response.hpp index c9baff0..2b08c51 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -51,6 +51,11 @@ namespace etcd */ bool is_ok() const; + /** + * Returns true if the error is a network unavailable error. + */ + bool is_network_unavailable() const; + /** * Returns the error code received from the etcd server. In case of success the error code is 0. */ diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index a5cd29a..501ad7e 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -30,7 +30,27 @@ namespace etcd std::function callback, bool recursive=false); Watcher(std::string const & etcd_url, std::string const & key, int fromIndex, std::function callback, bool recursive=false); + + /** + * Wait util the task has been stopped, actively or passively, e.g., the watcher + * get cancelled or the server closes the connection. + * + * Returns true if the watcher is been normally cancalled, otherwise false. + */ + bool Wait(); + + /** + * An async wait, the callback will be called when the task has been stopped. + * + * The callback parameter would be true if the watch is been normally cancalled. + */ + void Wait(std::function callback); + + /** + * Stop the watching action. + */ void Cancel(); + ~Watcher(); protected: diff --git a/etcd/v3/AsyncWatchAction.hpp b/etcd/v3/AsyncWatchAction.hpp index 3453e87..8dcb189 100644 --- a/etcd/v3/AsyncWatchAction.hpp +++ b/etcd/v3/AsyncWatchAction.hpp @@ -24,6 +24,7 @@ namespace etcdv3 void waitForResponse(std::function callback); void CancelWatch(); void WatchReq(std::string const & key); + bool Cancelled() const; private: WatchResponse reply; std::unique_ptr> stream; diff --git a/src/Response.cpp b/src/Response.cpp index 6e01a4c..57ac8b8 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -72,6 +72,11 @@ bool etcd::Response::is_ok() const return error_code() == 0; } +bool etcd::Response::is_network_unavailable() const +{ + return error_code() == ::grpc::StatusCode::UNAVAILABLE; +} + etcd::Value const & etcd::Response::value() const { return _value; diff --git a/src/Watcher.cpp b/src/Watcher.cpp index 434c462..4f26def 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -43,10 +43,24 @@ etcd::Watcher::~Watcher() currentTask.wait(); } +bool etcd::Watcher::Wait() +{ + currentTask.wait(); + return call->Cancelled(); +} + +void etcd::Watcher::Wait(std::function callback) +{ + currentTask.then([this, callback](pplx::task const & resp_task) { + resp_task.wait(); + callback(this->call->Cancelled()); + }); +} + void etcd::Watcher::Cancel() { call->CancelWatch(); - currentTask.wait(); + this->Wait(); } void etcd::Watcher::doWatch(std::string const & key, std::function callback) diff --git a/src/v3/AsyncLockAction.cpp b/src/v3/AsyncLockAction.cpp index 55354a0..49d50b9 100644 --- a/src/v3/AsyncLockAction.cpp +++ b/src/v3/AsyncLockAction.cpp @@ -20,7 +20,6 @@ etcdv3::AsyncLockResponse etcdv3::AsyncLockAction::ParseResponse() if(!status.ok()) { - std::cout << "lock error message is: " << status.error_message() << std::endl; lock_resp.set_error_code(status.error_code()); lock_resp.set_error_message(status.error_message()); } @@ -49,7 +48,6 @@ etcdv3::AsyncUnlockResponse etcdv3::AsyncUnlockAction::ParseResponse() if(!status.ok()) { - std::cout << "unlock error message is: " << status.error_message() << std::endl; unlock_resp.set_error_code(status.error_code()); unlock_resp.set_error_message(status.error_message()); } diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index d494a80..d5fa792 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -93,6 +93,10 @@ void etcdv3::AsyncWatchAction::CancelWatch() } } +bool etcdv3::AsyncWatchAction::Cancelled() const { + return isCancelled; +} + void etcdv3::AsyncWatchAction::waitForResponse(std::function callback) { void* got_tag;