Include compact revision for cancelled watch response. (#91)

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-09-28 11:24:11 +08:00 committed by GitHub
parent a949dec288
commit b99dc2024e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 27 additions and 4 deletions

View File

@ -130,6 +130,12 @@ namespace etcd
*/ */
std::string const & key(int index) const; std::string const & key(int index) const;
/**
* Returns the compact_revision if the response is a watch-cancelled revision.
* `-1` means uninitialized (the response is not watch-cancelled)
*/
int compact_revision() const;
/** /**
* Returns the lock key. * Returns the lock key.
*/ */
@ -162,6 +168,7 @@ namespace etcd
Value _prev_value; Value _prev_value;
Values _values; Values _values;
Keys _keys; Keys _keys;
int _compact_revision = -1; // for watch
std::string _lock_key; // for lock std::string _lock_key; // for lock
std::string _name; // for campaign (in v3election) std::string _name; // for campaign (in v3election)
std::vector<Event> _events; // for watch std::vector<Event> _events; // for watch

View File

@ -25,6 +25,7 @@ namespace etcdv3
etcdv3::KeyValue const & get_value() const; etcdv3::KeyValue const & get_value() const;
etcdv3::KeyValue const & get_prev_value() const; etcdv3::KeyValue const & get_prev_value() const;
bool has_values() const; bool has_values() const;
int get_compact_revision() const;
void set_lock_key(std::string const &key); void set_lock_key(std::string const &key);
std::string const &get_lock_key() const; std::string const &get_lock_key() const;
void set_name(std::string const &name); void set_name(std::string const &name);
@ -39,6 +40,7 @@ namespace etcdv3
etcdv3::KeyValue prev_value; etcdv3::KeyValue prev_value;
std::vector<etcdv3::KeyValue> values; std::vector<etcdv3::KeyValue> values;
std::vector<etcdv3::KeyValue> prev_values; std::vector<etcdv3::KeyValue> prev_values;
int compact_revision = -1;
std::string lock_key; // for lock std::string lock_key; // for lock
std::string name; // for campaign (in v3election) std::string name; // for campaign (in v3election)
std::vector<mvccpb::Event> events; // for watch std::vector<mvccpb::Event> events; // for watch

View File

@ -24,6 +24,7 @@ etcd::Response::Response(const etcdv3::V3Response& reply, std::chrono::microseco
} }
_prev_value = Value(reply.get_prev_value()); _prev_value = Value(reply.get_prev_value());
_compact_revision = reply.get_compact_revision();
_lock_key = reply.get_lock_key(); _lock_key = reply.get_lock_key();
_name = reply.get_name(); _name = reply.get_name();
@ -109,6 +110,11 @@ std::string const & etcd::Response::key(int index) const
return _keys[index]; return _keys[index];
} }
int etcd::Response::compact_revision() const
{
return _compact_revision;
}
std::string const & etcd::Response::lock_key() const { std::string const & etcd::Response::lock_key() const {
return _lock_key; return _lock_key;
} }

View File

@ -149,8 +149,10 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
isCancelled.store(true); isCancelled.store(true);
cq_.Shutdown(); cq_.Shutdown();
if (reply.compact_revision() != 0) { if (reply.compact_revision() != 0) {
callback(etcd::Response(grpc::StatusCode::OUT_OF_RANGE /* error code */, auto resp = etcd::Response(grpc::StatusCode::OUT_OF_RANGE /* error code */,
"required revision has been compacted")); "required revision has been compacted");
resp._compact_revision = reply.compact_revision();
callback(resp);
} }
break; break;
} }

View File

@ -4,8 +4,9 @@
void etcdv3::AsyncWatchResponse::ParseResponse(WatchResponse& reply) void etcdv3::AsyncWatchResponse::ParseResponse(WatchResponse& reply)
{ {
if (reply.canceled() && reply.compact_revision() != 0) { if (reply.canceled() && reply.compact_revision() != 0) {
error_code=grpc::StatusCode::OUT_OF_RANGE; error_code = grpc::StatusCode::OUT_OF_RANGE;
error_message="required revision has been compacted"; error_message = "required revision has been compacted";
compact_revision = reply.compact_revision();
return; return;
} }
index = reply.header().revision(); index = reply.header().revision();

View File

@ -61,6 +61,11 @@ bool etcdv3::V3Response::has_values() const
return values.size() > 0; return values.size() > 0;
} }
int etcdv3::V3Response::get_compact_revision() const
{
return compact_revision;
}
void etcdv3::V3Response::set_lock_key(std::string const &key) { void etcdv3::V3Response::set_lock_key(std::string const &key) {
this->lock_key = key; this->lock_key = key;
} }