Handle watching on compacted revisions cases.

Fixes #43.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
Tao He 2021-02-15 00:13:09 +08:00
parent 4f31000cf4
commit 38366fc5c3
2 changed files with 19 additions and 1 deletions

View File

@ -63,7 +63,11 @@ void etcdv3::AsyncWatchAction::waitForResponse()
}
if(got_tag == (void*)this) // read tag
{
if ((reply.created() && reply.header().revision() < parameters.revision) ||
if (reply.canceled()) {
isCancelled = true;
cq_.Shutdown();
}
else if ((reply.created() && reply.header().revision() < parameters.revision) ||
reply.events_size() > 0) {
// we stop watch under two conditions:
//
@ -125,6 +129,15 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
}
else if(got_tag == (void*)this) // read tag
{
if (reply.canceled()) {
isCancelled = true;
cq_.Shutdown();
if (reply.compact_revision() != 0) {
callback(etcd::Response(grpc::StatusCode::OUT_OF_RANGE /* error code */,
"required revision has been compacted"));
}
break;
}
if(reply.events_size())
{
// for the callback case, we don't stop immediately if watching for a future revison,

View File

@ -3,6 +3,11 @@
void etcdv3::AsyncWatchResponse::ParseResponse(WatchResponse& reply)
{
if (reply.canceled() && reply.compact_revision() != 0) {
error_code=grpc::StatusCode::OUT_OF_RANGE;
error_message="required revision has been compacted";
return;
}
index = reply.header().revision();
for (auto const &e: reply.events()) {
events.emplace_back(e);