diff --git a/src/Client.cpp b/src/Client.cpp index 9b8cb70..be3772c 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -335,6 +335,7 @@ pplx::task etcd::Client::send_asyncwatch(std::string const & key watch_req.mutable_create_request()->CopyFrom(watch_create_req); call->stream->Write(watch_req, (void*)call.get()); call->stub_ = stub_.get(); + call->fromIndex = fromIndex; return Response::create(call); } diff --git a/src/Response.cpp b/src/Response.cpp index 09770b1..39ad78d 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -36,7 +36,9 @@ etcd::Response::Response(const etcdv3::V3Response& reply) } if(reply.prev_values.size() == 1) + { _prev_value = Value(reply.prev_values[0]); + } } diff --git a/v3/include/AsyncWatchResponse.hpp b/v3/include/AsyncWatchResponse.hpp index 5305345..7c07cd9 100644 --- a/v3/include/AsyncWatchResponse.hpp +++ b/v3/include/AsyncWatchResponse.hpp @@ -19,7 +19,7 @@ namespace etcdv3 class AsyncWatchResponse : public etcdv3::V3Response { public: - AsyncWatchResponse(){}; + AsyncWatchResponse(){fromIndex = -1;}; AsyncWatchResponse(const std::string act){action = act;}; AsyncWatchResponse(const AsyncWatchResponse& other); AsyncWatchResponse& operator=(const AsyncWatchResponse& other); @@ -31,6 +31,7 @@ namespace etcdv3 CompletionQueue cq_; std::unique_ptr> stream; KV::Stub* stub_; + int fromIndex; }; } diff --git a/v3/src/AsyncWatchResponse.cpp b/v3/src/AsyncWatchResponse.cpp index 2f33eef..9fc55b9 100644 --- a/v3/src/AsyncWatchResponse.cpp +++ b/v3/src/AsyncWatchResponse.cpp @@ -56,6 +56,7 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse() mvccpb::KeyValue kv; std::map mapValue; + std::map prev_mapValue; for(int cnt =0; cnt < reply.events_size(); cnt++) { @@ -73,7 +74,6 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse() { action = "set"; } - //values.push_back(kv); mapValue.emplace(kv.key(), kv); } } @@ -84,7 +84,7 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse() //get previous value index - 1 RangeRequest get_request; get_request.set_key(kv.key()); - get_request.set_revision(index - 1); + get_request.set_revision((fromIndex >=0)?fromIndex - 1:index-1); RangeResponse response; ClientContext ctx; @@ -94,10 +94,15 @@ etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse() { for(int cnt=0; cnt < response.kvs_size(); cnt++) { - prev_values.push_back(response.kvs(cnt)); - } + prev_mapValue.emplace(response.kvs(cnt).key(),response.kvs(cnt)); + } } } + for(auto x: prev_mapValue) + { + prev_values.push_back(x.second); + } + for(auto x: mapValue) { values.push_back(x.second);