Updated Watch so it can return previous values.

This commit is contained in:
arches 2016-06-21 10:45:43 -04:00
parent 808ccd5e3b
commit bdfb481f69
4 changed files with 14 additions and 5 deletions

View File

@ -335,6 +335,7 @@ pplx::task<etcd::Response> etcd::Client::send_asyncwatch(std::string const & key
watch_req.mutable_create_request()->CopyFrom(watch_create_req); watch_req.mutable_create_request()->CopyFrom(watch_create_req);
call->stream->Write(watch_req, (void*)call.get()); call->stream->Write(watch_req, (void*)call.get());
call->stub_ = stub_.get(); call->stub_ = stub_.get();
call->fromIndex = fromIndex;
return Response::create(call); return Response::create(call);
} }

View File

@ -36,8 +36,10 @@ etcd::Response::Response(const etcdv3::V3Response& reply)
} }
if(reply.prev_values.size() == 1) if(reply.prev_values.size() == 1)
{
_prev_value = Value(reply.prev_values[0]); _prev_value = Value(reply.prev_values[0]);
} }
}
etcd::Response::Response() etcd::Response::Response()

View File

@ -19,7 +19,7 @@ namespace etcdv3
class AsyncWatchResponse : public etcdv3::V3Response class AsyncWatchResponse : public etcdv3::V3Response
{ {
public: public:
AsyncWatchResponse(){}; AsyncWatchResponse(){fromIndex = -1;};
AsyncWatchResponse(const std::string act){action = act;}; AsyncWatchResponse(const std::string act){action = act;};
AsyncWatchResponse(const AsyncWatchResponse& other); AsyncWatchResponse(const AsyncWatchResponse& other);
AsyncWatchResponse& operator=(const AsyncWatchResponse& other); AsyncWatchResponse& operator=(const AsyncWatchResponse& other);
@ -31,6 +31,7 @@ namespace etcdv3
CompletionQueue cq_; CompletionQueue cq_;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream; std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
KV::Stub* stub_; KV::Stub* stub_;
int fromIndex;
}; };
} }

View File

@ -56,6 +56,7 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse()
mvccpb::KeyValue kv; mvccpb::KeyValue kv;
std::map<std::string, mvccpb::KeyValue> mapValue; std::map<std::string, mvccpb::KeyValue> mapValue;
std::map<std::string, mvccpb::KeyValue> prev_mapValue;
for(int cnt =0; cnt < reply.events_size(); cnt++) for(int cnt =0; cnt < reply.events_size(); cnt++)
{ {
@ -73,7 +74,6 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse()
{ {
action = "set"; action = "set";
} }
//values.push_back(kv);
mapValue.emplace(kv.key(), kv); mapValue.emplace(kv.key(), kv);
} }
} }
@ -84,7 +84,7 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse()
//get previous value index - 1 //get previous value index - 1
RangeRequest get_request; RangeRequest get_request;
get_request.set_key(kv.key()); get_request.set_key(kv.key());
get_request.set_revision(index - 1); get_request.set_revision((fromIndex >=0)?fromIndex - 1:index-1);
RangeResponse response; RangeResponse response;
ClientContext ctx; ClientContext ctx;
@ -94,10 +94,15 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse()
{ {
for(int cnt=0; cnt < response.kvs_size(); cnt++) for(int cnt=0; cnt < response.kvs_size(); cnt++)
{ {
prev_values.push_back(response.kvs(cnt)); prev_mapValue.emplace(response.kvs(cnt).key(),response.kvs(cnt));
} }
} }
} }
for(auto x: prev_mapValue)
{
prev_values.push_back(x.second);
}
for(auto x: mapValue) for(auto x: mapValue)
{ {
values.push_back(x.second); values.push_back(x.second);