Improve the etcd watcher to detect error when the connection lost.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
Tao He 2020-09-22 10:48:50 +08:00
parent 8344886283
commit b317e29ec7
7 changed files with 50 additions and 3 deletions

View File

@ -51,6 +51,11 @@ namespace etcd
*/ */
bool is_ok() const; bool is_ok() const;
/**
* Returns true if the error is a network unavailable error.
*/
bool is_network_unavailable() const;
/** /**
* Returns the error code received from the etcd server. In case of success the error code is 0. * Returns the error code received from the etcd server. In case of success the error code is 0.
*/ */

View File

@ -30,7 +30,27 @@ namespace etcd
std::function<void(Response)> callback, bool recursive=false); std::function<void(Response)> callback, bool recursive=false);
Watcher(std::string const & etcd_url, std::string const & key, int fromIndex, Watcher(std::string const & etcd_url, std::string const & key, int fromIndex,
std::function<void(Response)> callback, bool recursive=false); std::function<void(Response)> callback, bool recursive=false);
/**
* Wait util the task has been stopped, actively or passively, e.g., the watcher
* get cancelled or the server closes the connection.
*
* Returns true if the watcher is been normally cancalled, otherwise false.
*/
bool Wait();
/**
* An async wait, the callback will be called when the task has been stopped.
*
* The callback parameter would be true if the watch is been normally cancalled.
*/
void Wait(std::function<void(bool)> callback);
/**
* Stop the watching action.
*/
void Cancel(); void Cancel();
~Watcher(); ~Watcher();
protected: protected:

View File

@ -24,6 +24,7 @@ namespace etcdv3
void waitForResponse(std::function<void(etcd::Response)> callback); void waitForResponse(std::function<void(etcd::Response)> callback);
void CancelWatch(); void CancelWatch();
void WatchReq(std::string const & key); void WatchReq(std::string const & key);
bool Cancelled() const;
private: private:
WatchResponse reply; WatchResponse reply;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream; std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;

View File

@ -72,6 +72,11 @@ bool etcd::Response::is_ok() const
return error_code() == 0; return error_code() == 0;
} }
bool etcd::Response::is_network_unavailable() const
{
return error_code() == ::grpc::StatusCode::UNAVAILABLE;
}
etcd::Value const & etcd::Response::value() const etcd::Value const & etcd::Response::value() const
{ {
return _value; return _value;

View File

@ -43,10 +43,24 @@ etcd::Watcher::~Watcher()
currentTask.wait(); currentTask.wait();
} }
bool etcd::Watcher::Wait()
{
currentTask.wait();
return call->Cancelled();
}
void etcd::Watcher::Wait(std::function<void(bool)> callback)
{
currentTask.then([this, callback](pplx::task<void> const & resp_task) {
resp_task.wait();
callback(this->call->Cancelled());
});
}
void etcd::Watcher::Cancel() void etcd::Watcher::Cancel()
{ {
call->CancelWatch(); call->CancelWatch();
currentTask.wait(); this->Wait();
} }
void etcd::Watcher::doWatch(std::string const & key, std::function<void(Response)> callback) void etcd::Watcher::doWatch(std::string const & key, std::function<void(Response)> callback)

View File

@ -20,7 +20,6 @@ etcdv3::AsyncLockResponse etcdv3::AsyncLockAction::ParseResponse()
if(!status.ok()) if(!status.ok())
{ {
std::cout << "lock error message is: " << status.error_message() << std::endl;
lock_resp.set_error_code(status.error_code()); lock_resp.set_error_code(status.error_code());
lock_resp.set_error_message(status.error_message()); lock_resp.set_error_message(status.error_message());
} }
@ -49,7 +48,6 @@ etcdv3::AsyncUnlockResponse etcdv3::AsyncUnlockAction::ParseResponse()
if(!status.ok()) if(!status.ok())
{ {
std::cout << "unlock error message is: " << status.error_message() << std::endl;
unlock_resp.set_error_code(status.error_code()); unlock_resp.set_error_code(status.error_code());
unlock_resp.set_error_message(status.error_message()); unlock_resp.set_error_message(status.error_message());
} }

View File

@ -93,6 +93,10 @@ void etcdv3::AsyncWatchAction::CancelWatch()
} }
} }
bool etcdv3::AsyncWatchAction::Cancelled() const {
return isCancelled;
}
void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response)> callback) void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response)> callback)
{ {
void* got_tag; void* got_tag;