diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 468a903..249137f 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -74,6 +74,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() if (reply.canceled()) { isCancelled.store(true); cq_.Shutdown(); + break; } else if ((reply.created() && reply.header().revision() < parameters.revision) || reply.events_size() > 0) { @@ -92,11 +93,14 @@ void etcdv3::AsyncWatchAction::waitForResponse() grpc::Status status; stream->Finish(&status, (void *)this); - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { - // ok - } else { - std::cerr << "WARN: Failed to finish a watch connection" << std::endl; - } + + // n.b., don't wait, as there might be another extra "Read" action on the fly. + // + // if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { + // // ok + // } else { + // std::cerr << "WARN: Failed to finish a watch connection" << std::endl; + // } cq_.Shutdown(); @@ -104,13 +108,14 @@ void etcdv3::AsyncWatchAction::waitForResponse() if (reply.fragment()) { std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl; } + break; } else { // otherwise, start next round read-reply stream->Read(&reply, (void*)this); - } - } + } + } } } @@ -130,11 +135,13 @@ void etcdv3::AsyncWatchAction::CancelWatch() grpc::Status status; stream->Finish(&status, (void *)this); - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { - // ok - } else { - std::cerr << "WARN: Failed to finish a watch connection" << std::endl; - } + // n.b., don't wait, as there might be another extra "Read" action on the fly. + // + // if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { + // // ok + // } else { + // std::cerr << "WARN: Failed to finish a watch connection" << std::endl; + // } cq_.Shutdown(); }