Fixes a possible bug about watcher's id (#232)

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-07-01 18:41:33 +08:00 committed by GitHub
parent 32fae70113
commit fe9f17e61e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 2 deletions

View File

@ -89,6 +89,9 @@ void etcdv3::Action::waitForResponse()
break;
}
case CompletionQueue::NextStatus::GOT_EVENT: {
if (!ok) {
status = grpc::Status(grpc::StatusCode::ABORTED, "Failed to execute the action: not ok or invalid tag");
}
break;
}
}

View File

@ -1178,6 +1178,7 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(
{
isCancelled.store(false);
stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)etcdv3::WATCH_CREATE);
this->watch_id = std::chrono::steady_clock::now().time_since_epoch().count();
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
@ -1199,6 +1200,7 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(
watch_create_req.set_prev_kv(true);
watch_create_req.set_start_revision(parameters.revision);
watch_create_req.set_watch_id(this->watch_id);
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
@ -1254,6 +1256,9 @@ void etcdv3::AsyncWatchAction::waitForResponse()
continue;
}
// record the watcher id
this->watch_id = reply.watch_id();
// we stop watch under two conditions:
//
// 1. watch for a future revision, return immediately with empty events set
@ -1339,6 +1344,9 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
continue;
}
// record the watcher id
this->watch_id = reply.watch_id();
// for the callback case, we don't invoke callback immediately if watching
// for a future revision, we wait until there are some effective events.
if(reply.events_size())

View File

@ -14,10 +14,12 @@ static int watcher_called = 0;
void printResponse(etcd::Response const & resp)
{
if (resp.error_code()) {
std::cout << resp.error_code() << ": " << resp.error_message() << std::endl;
std::cout << "Watcher "<< resp.watch_id()
<< " fails with " << resp.error_code() << ": " << resp.error_message() << std::endl;
}
else {
std::cout << resp.action() << " " << resp.value().as_string() << std::endl;
std::cout << "Watcher " << resp.watch_id()
<< " responses with " << resp.action() << " " << resp.value().as_string() << std::endl;
std::cout << "Previous value: " << resp.prev_value().as_string() << std::endl;
std::cout << "Events size: " << resp.events().size() << std::endl;
@ -191,6 +193,14 @@ TEST_CASE("watch changes on the same key (#212)")
}
}
TEST_CASE("create two watcher")
{
etcd::Watcher w1(etcd_url, "/test", printResponse, true);
etcd::Watcher w2(etcd_url, "/test", printResponse, true);
std::this_thread::sleep_for(std::chrono::seconds(5));
}
// TEST_CASE("request cancellation")
// {
// etcd::Client etcd(etcd_url);