Fixes race condition (repeatly cancel) in watch.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-05-18 14:08:46 +08:00
parent f57218d8d9
commit ad4215511f
5 changed files with 21 additions and 19 deletions

View File

@ -1,6 +1,7 @@
#ifndef __ETCD_WATCHER_HPP__
#define __ETCD_WATCHER_HPP__
#include <atomic>
#include <functional>
#include <string>
#include <thread>
@ -98,6 +99,7 @@ namespace etcd
private:
int fromIndex;
bool recursive;
std::atomic_bool cancelled;
};
}

View File

@ -1,6 +1,7 @@
#ifndef __ASYNC_WATCHACTION_HPP__
#define __ASYNC_WATCHACTION_HPP__
#include <atomic>
#include <mutex>
#include <grpc++/grpc++.h>
@ -28,7 +29,7 @@ namespace etcdv3
private:
WatchResponse reply;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
bool isCancelled;
std::atomic_bool isCancelled;
std::mutex protect_is_cancalled;
};
}

View File

@ -115,7 +115,9 @@ void etcd::KeepAlive::Cancel()
// clean up
context.stop();
task_.join();
if (task_.joinable()) {
task_.join();
}
}
void etcd::KeepAlive::Check() {

View File

@ -92,16 +92,15 @@ etcd::Watcher::Watcher(std::string const & address,
etcd::Watcher::~Watcher()
{
stubs->call->CancelWatch();
if (task_.joinable()) {
task_.join();
}
this->Cancel();
}
bool etcd::Watcher::Wait()
{
if (task_.joinable()) {
task_.join();
if (cancelled.exchange(true)) {
if (task_.joinable()) {
task_.join();
}
}
return stubs->call->Cancelled();
}
@ -144,4 +143,5 @@ void etcd::Watcher::doWatch(std::string const & key,
wait_callback(stubs->call->Cancelled());
}
});
cancelled.store(false);
}

View File

@ -9,7 +9,7 @@ using etcdserverpb::WatchCreateRequest;
etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
isCancelled = false;
isCancelled.store(false);
stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)"create");
WatchRequest watch_req;
@ -61,14 +61,14 @@ void etcdv3::AsyncWatchAction::waitForResponse()
break;
}
if(got_tag == (void*)"writes done") {
isCancelled = true;
isCancelled.store(true);
cq_.Shutdown();
break;
}
if(got_tag == (void*)this) // read tag
{
if (reply.canceled()) {
isCancelled = true;
isCancelled.store(true);
cq_.Shutdown();
}
else if ((reply.created() && reply.header().revision() < parameters.revision) ||
@ -77,7 +77,7 @@ void etcdv3::AsyncWatchAction::waitForResponse()
//
// 1. watch for a future revision, return immediately with empty events set
// 2. receive any effective events.
isCancelled = true;
isCancelled.store(true);
stream->WritesDone((void*)"writes done");
grpc::Status status;
stream->Finish(&status, (void *)this);
@ -100,9 +100,7 @@ void etcdv3::AsyncWatchAction::waitForResponse()
void etcdv3::AsyncWatchAction::CancelWatch()
{
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
if(isCancelled == false)
{
isCancelled = true;
if (!isCancelled.exchange(true)) {
stream->WritesDone((void*)"writes done");
grpc::Status status;
stream->Finish(&status, (void *)this);
@ -111,7 +109,7 @@ void etcdv3::AsyncWatchAction::CancelWatch()
}
bool etcdv3::AsyncWatchAction::Cancelled() const {
return isCancelled;
return isCancelled.load();
}
void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response)> callback)
@ -127,14 +125,14 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
}
if(got_tag == (void*)"writes done")
{
isCancelled = true;
isCancelled.store(true);
cq_.Shutdown();
break;
}
else if(got_tag == (void*)this) // read tag
{
if (reply.canceled()) {
isCancelled = true;
isCancelled.store(true);
cq_.Shutdown();
if (reply.compact_revision() != 0) {
callback(etcd::Response(grpc::StatusCode::OUT_OF_RANGE /* error code */,
@ -159,7 +157,6 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse()
{
AsyncWatchResponse watch_resp;
if(!status.ok())
{