Merge remote-tracking branch 'origin/other_dev' into other_dev
# Conflicts: # etcd/Client.hpp # src/Client.cpp
This commit is contained in:
commit
618995c080
|
|
@ -155,6 +155,8 @@ namespace etcd
|
||||||
pplx::task<etcd::Response> send_asyncdelete(std::string const & key, bool recursive);
|
pplx::task<etcd::Response> send_asyncdelete(std::string const & key, bool recursive);
|
||||||
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, std::string const &old_value);
|
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, std::string const &old_value);
|
||||||
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, int old_index);
|
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, int old_index);
|
||||||
|
pplx::task<etcd::Response> send_asyncwatch(std::string const & key, bool recursive);
|
||||||
|
pplx::task<etcd::Response> send_asyncwatch(std::string const & key, int fromIndex, bool recursive);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<etcdv3::AsyncTxnResponse> initiate_transaction(const std::string &operation,
|
std::shared_ptr<etcdv3::AsyncTxnResponse> initiate_transaction(const std::string &operation,
|
||||||
|
|
|
||||||
|
|
@ -29,13 +29,9 @@ namespace etcd
|
||||||
{
|
{
|
||||||
return pplx::task<etcd::Response>([call]()
|
return pplx::task<etcd::Response>([call]()
|
||||||
{
|
{
|
||||||
void* got_tag;
|
|
||||||
bool ok = false;
|
|
||||||
etcd::Response resp;
|
etcd::Response resp;
|
||||||
|
|
||||||
//blocking
|
call->waitForResponse();
|
||||||
call->cq_.Next(&got_tag, &ok);
|
|
||||||
GPR_ASSERT(got_tag == (void*)call.get());
|
|
||||||
|
|
||||||
auto v3resp = call->ParseResponse();
|
auto v3resp = call->ParseResponse();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/Transaction.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp)
|
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp)
|
||||||
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
|
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
|
||||||
|
|
||||||
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)
|
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
#include "etcd/Client.hpp"
|
#include "etcd/Client.hpp"
|
||||||
#include "v3/include/AsyncTxnResponse.hpp"
|
#include "v3/include/AsyncTxnResponse.hpp"
|
||||||
#include "v3/include/AsyncRangeResponse.hpp"
|
#include "v3/include/AsyncRangeResponse.hpp"
|
||||||
|
#include "v3/include/AsyncWatchResponse.hpp"
|
||||||
#include "v3/include/Utils.hpp"
|
#include "v3/include/Utils.hpp"
|
||||||
#include "v3/include/Transaction.hpp"
|
#include "v3/include/Transaction.hpp"
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
@ -31,6 +32,7 @@ etcd::Client::Client(std::string const & address)
|
||||||
}
|
}
|
||||||
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
|
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
|
||||||
stub_= KV::NewStub(channel);
|
stub_= KV::NewStub(channel);
|
||||||
|
watchServiceStub= Watch::NewStub(channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri)
|
pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri)
|
||||||
|
|
@ -129,22 +131,13 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
|
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
|
||||||
{
|
{
|
||||||
web::http::uri_builder uri("/v2/keys" + key);
|
return send_asyncwatch(key,recursive);
|
||||||
uri.append_query("wait=true");
|
|
||||||
if (recursive)
|
|
||||||
uri.append_query("recursive=true");
|
|
||||||
return send_get_request(uri);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive)
|
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive)
|
||||||
{
|
{
|
||||||
web::http::uri_builder uri("/v2/keys" + key);
|
return send_asyncwatch(key, fromIndex, recursive);
|
||||||
uri.append_query("wait=true");
|
|
||||||
uri.append_query("waitIndex", fromIndex);
|
|
||||||
if (recursive)
|
|
||||||
uri.append_query("recursive=true");
|
|
||||||
return send_get_request(uri);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -293,3 +286,54 @@ pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key,
|
||||||
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndDelete", transaction);
|
std::shared_ptr<etcdv3::AsyncTxnResponse> call = initiate_transaction("compareAndDelete", transaction);
|
||||||
return Response::create(call);
|
return Response::create(call);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pplx::task<etcd::Response> etcd::Client::send_asyncwatch(std::string const & key, bool recursive)
|
||||||
|
{
|
||||||
|
std::shared_ptr<etcdv3::AsyncWatchResponse> call(new etcdv3::AsyncWatchResponse());
|
||||||
|
call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get());
|
||||||
|
|
||||||
|
WatchRequest watch_req;
|
||||||
|
WatchCreateRequest watch_create_req;
|
||||||
|
watch_create_req.set_key(key);
|
||||||
|
|
||||||
|
std::string range_end(key);
|
||||||
|
if(recursive)
|
||||||
|
{
|
||||||
|
int ascii = (int)range_end[range_end.length()-1];
|
||||||
|
range_end.back() = ascii+1;
|
||||||
|
watch_create_req.set_range_end(range_end);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
|
||||||
|
call->stream->Write(watch_req, (void*)call.get());
|
||||||
|
call->stub_ = stub_.get();
|
||||||
|
|
||||||
|
return Response::create(call);
|
||||||
|
}
|
||||||
|
|
||||||
|
pplx::task<etcd::Response> etcd::Client::send_asyncwatch(std::string const & key, int fromIndex, bool recursive)
|
||||||
|
{
|
||||||
|
std::shared_ptr<etcdv3::AsyncWatchResponse> call(new etcdv3::AsyncWatchResponse());
|
||||||
|
call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get());
|
||||||
|
|
||||||
|
WatchRequest watch_req;
|
||||||
|
WatchCreateRequest watch_create_req;
|
||||||
|
watch_create_req.set_key(key);
|
||||||
|
watch_create_req.set_start_revision(fromIndex);
|
||||||
|
|
||||||
|
std::string range_end(key);
|
||||||
|
if(recursive)
|
||||||
|
{
|
||||||
|
int ascii = (int)range_end[range_end.length()-1];
|
||||||
|
range_end.back() = ascii+1;
|
||||||
|
watch_create_req.set_range_end(range_end);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
|
||||||
|
call->stream->Write(watch_req, (void*)call.get());
|
||||||
|
call->stub_ = stub_.get();
|
||||||
|
|
||||||
|
return Response::create(call);
|
||||||
|
}
|
||||||
|
|
|
||||||
131
tst/EtcdTest.cpp
131
tst/EtcdTest.cpp
|
|
@ -213,71 +213,72 @@ TEST_CASE("delete a directory")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//TEST_CASE("wait for a value change")
|
TEST_CASE("wait for a value change")
|
||||||
//{
|
{
|
||||||
// etcd::Client etcd("http://127.0.0.1:2379");
|
etcd::Client etcd("http://127.0.0.1:2379");
|
||||||
// etcd.set("/test/key1", "42").wait();
|
etcd.set("/test/key1", "42").wait();
|
||||||
//
|
|
||||||
// pplx::task<etcd::Response> res = etcd.watch("/test/key1");
|
pplx::task<etcd::Response> res = etcd.watch("/test/key1");
|
||||||
// CHECK(!res.is_done());
|
CHECK(!res.is_done());
|
||||||
// sleep(1);
|
sleep(1);
|
||||||
// CHECK(!res.is_done());
|
CHECK(!res.is_done());
|
||||||
//
|
|
||||||
// etcd.set("/test/key1", "43").get();
|
etcd.set("/test/key1", "43").get();
|
||||||
// sleep(1);
|
sleep(1);
|
||||||
// REQUIRE(res.is_done());
|
REQUIRE(res.is_done());
|
||||||
// REQUIRE("set" == res.get().action());
|
REQUIRE("set" == res.get().action());
|
||||||
// CHECK("43" == res.get().value().as_string());
|
CHECK("43" == res.get().value().as_string());
|
||||||
//}
|
}
|
||||||
//
|
|
||||||
//TEST_CASE("wait for a directory change")
|
TEST_CASE("wait for a directory change")
|
||||||
//{
|
{
|
||||||
// etcd::Client etcd("http://127.0.0.1:4001");
|
etcd::Client etcd("http://127.0.0.1:2379");
|
||||||
//
|
|
||||||
// pplx::task<etcd::Response> res = etcd.watch("/test", true);
|
pplx::task<etcd::Response> res = etcd.watch("/test", true);
|
||||||
// CHECK(!res.is_done());
|
CHECK(!res.is_done());
|
||||||
// sleep(1);
|
sleep(1);
|
||||||
// CHECK(!res.is_done());
|
CHECK(!res.is_done());
|
||||||
//
|
|
||||||
// etcd.add("/test/key4", "44").wait();
|
etcd.add("/test/key4", "44").wait();
|
||||||
// sleep(1);
|
sleep(1);
|
||||||
// REQUIRE(res.is_done());
|
REQUIRE(res.is_done());
|
||||||
// CHECK("create" == res.get().action());
|
CHECK("create" == res.get().action());
|
||||||
// CHECK("44" == res.get().value().as_string());
|
CHECK("44" == res.get().value().as_string());
|
||||||
//
|
|
||||||
// pplx::task<etcd::Response> res2 = etcd.watch("/test", true);
|
pplx::task<etcd::Response> res2 = etcd.watch("/test", true);
|
||||||
// CHECK(!res2.is_done());
|
CHECK(!res2.is_done());
|
||||||
// sleep(1);
|
sleep(1);
|
||||||
// CHECK(!res2.is_done());
|
CHECK(!res2.is_done());
|
||||||
//
|
|
||||||
// etcd.set("/test/key4", "45").wait();
|
etcd.set("/test/key4", "45").wait();
|
||||||
// REQUIRE(res2.is_done());
|
sleep(1);
|
||||||
// CHECK("set" == res2.get().action());
|
REQUIRE(res2.is_done());
|
||||||
// CHECK("45" == res2.get().value().as_string());
|
CHECK("set" == res2.get().action());
|
||||||
//}
|
CHECK("45" == res2.get().value().as_string());
|
||||||
//
|
}
|
||||||
//TEST_CASE("watch changes in the past")
|
|
||||||
//{
|
TEST_CASE("watch changes in the past")
|
||||||
// etcd::Client etcd("http://127.0.0.1:4001");
|
{
|
||||||
//
|
etcd::Client etcd("http://127.0.0.1:2379");
|
||||||
// int index = etcd.set("/test/key1", "42").get().index();
|
|
||||||
//
|
int index = etcd.set("/test/key1", "42").get().index();
|
||||||
// etcd.set("/test/key1", "43").wait();
|
|
||||||
// etcd.set("/test/key1", "44").wait();
|
etcd.set("/test/key1", "43").wait();
|
||||||
// etcd.set("/test/key1", "45").wait();
|
etcd.set("/test/key1", "44").wait();
|
||||||
//
|
etcd.set("/test/key1", "45").wait();
|
||||||
// etcd::Response res = etcd.watch("/test/key1", ++index).get();
|
|
||||||
// CHECK("set" == res.action());
|
etcd::Response res = etcd.watch("/test/key1", ++index).get();
|
||||||
// CHECK("43" == res.value().as_string());
|
CHECK("set" == res.action());
|
||||||
//
|
CHECK("43" == res.value().as_string());
|
||||||
// res = etcd.watch("/test/key1", ++index).get();
|
|
||||||
// CHECK("set" == res.action());
|
res = etcd.watch("/test/key1", ++index).get();
|
||||||
// CHECK("44" == res.value().as_string());
|
CHECK("set" == res.action());
|
||||||
//
|
CHECK("44" == res.value().as_string());
|
||||||
// res = etcd.watch("/test", ++index, true).get();
|
|
||||||
// CHECK("set" == res.action());
|
res = etcd.watch("/test", ++index, true).get();
|
||||||
// CHECK("45" == res.value().as_string());
|
CHECK("set" == res.action());
|
||||||
//}
|
CHECK("45" == res.value().as_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
TEST_CASE("cleanup")
|
TEST_CASE("cleanup")
|
||||||
|
|
|
||||||
|
|
@ -20,13 +20,14 @@ namespace etcdv3
|
||||||
AsyncRangeResponse(){action = "get";};
|
AsyncRangeResponse(){action = "get";};
|
||||||
AsyncRangeResponse(const AsyncRangeResponse& other);
|
AsyncRangeResponse(const AsyncRangeResponse& other);
|
||||||
AsyncRangeResponse& operator=(const AsyncRangeResponse& other);
|
AsyncRangeResponse& operator=(const AsyncRangeResponse& other);
|
||||||
|
AsyncRangeResponse& ParseResponse();
|
||||||
|
void waitForResponse();
|
||||||
RangeResponse reply;
|
RangeResponse reply;
|
||||||
etcdserverpb::PutResponse r;
|
etcdserverpb::PutResponse r;
|
||||||
Status status;
|
Status status;
|
||||||
ClientContext context;
|
ClientContext context;
|
||||||
CompletionQueue cq_;
|
CompletionQueue cq_;
|
||||||
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
|
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
|
||||||
AsyncRangeResponse& ParseResponse();
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,12 +20,13 @@ namespace etcdv3
|
||||||
AsyncTxnResponse(const std::string act){action = act;};
|
AsyncTxnResponse(const std::string act){action = act;};
|
||||||
AsyncTxnResponse(const AsyncTxnResponse& other);
|
AsyncTxnResponse(const AsyncTxnResponse& other);
|
||||||
AsyncTxnResponse& operator=(const AsyncTxnResponse& other);
|
AsyncTxnResponse& operator=(const AsyncTxnResponse& other);
|
||||||
|
AsyncTxnResponse& ParseResponse();
|
||||||
|
void waitForResponse();
|
||||||
TxnResponse reply;
|
TxnResponse reply;
|
||||||
Status status;
|
Status status;
|
||||||
ClientContext context;
|
ClientContext context;
|
||||||
CompletionQueue cq_;
|
CompletionQueue cq_;
|
||||||
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
|
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
|
||||||
AsyncTxnResponse& ParseResponse();
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,37 @@
|
||||||
|
#ifndef __ASYNC_WATCH_HPP__
|
||||||
|
#define __ASYNC_WATCH_HPP__
|
||||||
|
|
||||||
|
#include <grpc++/grpc++.h>
|
||||||
|
#include "proto/rpc.grpc.pb.h"
|
||||||
|
#include "v3/include/V3Response.hpp"
|
||||||
|
|
||||||
|
|
||||||
|
using grpc::ClientAsyncReaderWriter;
|
||||||
|
using grpc::ClientContext;
|
||||||
|
using grpc::CompletionQueue;
|
||||||
|
using grpc::Status;
|
||||||
|
using etcdserverpb::WatchRequest;
|
||||||
|
using etcdserverpb::WatchResponse;
|
||||||
|
using etcdserverpb::KV;
|
||||||
|
|
||||||
|
namespace etcdv3
|
||||||
|
{
|
||||||
|
class AsyncWatchResponse : public etcdv3::V3Response
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
AsyncWatchResponse(){};
|
||||||
|
AsyncWatchResponse(const std::string act){action = act;};
|
||||||
|
AsyncWatchResponse(const AsyncWatchResponse& other);
|
||||||
|
AsyncWatchResponse& operator=(const AsyncWatchResponse& other);
|
||||||
|
AsyncWatchResponse& ParseResponse();
|
||||||
|
void waitForResponse();
|
||||||
|
WatchResponse reply;
|
||||||
|
Status status;
|
||||||
|
ClientContext context;
|
||||||
|
CompletionQueue cq_;
|
||||||
|
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
|
||||||
|
KV::Stub* stub_;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
@ -22,6 +22,15 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3::
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void etcdv3::AsyncRangeResponse::waitForResponse()
|
||||||
|
{
|
||||||
|
void* got_tag;
|
||||||
|
bool ok = false;
|
||||||
|
|
||||||
|
cq_.Next(&got_tag, &ok);
|
||||||
|
GPR_ASSERT(got_tag == (void*)this);
|
||||||
|
}
|
||||||
|
|
||||||
etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse()
|
etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse()
|
||||||
{
|
{
|
||||||
index = reply.header().revision();
|
index = reply.header().revision();
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,15 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void etcdv3::AsyncTxnResponse::waitForResponse()
|
||||||
|
{
|
||||||
|
void* got_tag;
|
||||||
|
bool ok = false;
|
||||||
|
|
||||||
|
cq_.Next(&got_tag, &ok);
|
||||||
|
GPR_ASSERT(got_tag == (void*)this);
|
||||||
|
}
|
||||||
|
|
||||||
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
|
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,107 @@
|
||||||
|
#include "v3/include/AsyncWatchResponse.hpp"
|
||||||
|
|
||||||
|
using etcdserverpb::RangeRequest;
|
||||||
|
using etcdserverpb::RangeResponse;
|
||||||
|
|
||||||
|
|
||||||
|
etcdv3::AsyncWatchResponse::AsyncWatchResponse(const etcdv3::AsyncWatchResponse& other)
|
||||||
|
{
|
||||||
|
error_code = other.error_code;
|
||||||
|
error_message = other.error_message;
|
||||||
|
index = other.index;
|
||||||
|
action = other.action;
|
||||||
|
values = other.values;
|
||||||
|
prev_values = other.prev_values;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::operator=(const etcdv3::AsyncWatchResponse& other)
|
||||||
|
{
|
||||||
|
error_code = other.error_code;
|
||||||
|
error_message = other.error_message;
|
||||||
|
index = other.index;
|
||||||
|
action = other.action;
|
||||||
|
values = other.values;
|
||||||
|
prev_values = other.prev_values;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
void etcdv3::AsyncWatchResponse::waitForResponse()
|
||||||
|
{
|
||||||
|
void* got_tag;
|
||||||
|
bool ok = false;
|
||||||
|
|
||||||
|
stream->Read(&reply, (void*)3);
|
||||||
|
while(cq_.Next(&got_tag, &ok))
|
||||||
|
{
|
||||||
|
if(got_tag == (void*)3)
|
||||||
|
{
|
||||||
|
if(reply.events_size())
|
||||||
|
{
|
||||||
|
stream->WritesDone((void*)this);
|
||||||
|
cq_.Next(&got_tag, &ok);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
stream->Read(&reply, (void*)3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse()
|
||||||
|
{
|
||||||
|
index = reply.header().revision();
|
||||||
|
|
||||||
|
mvccpb::KeyValue kv;
|
||||||
|
std::map<std::string, mvccpb::KeyValue> mapValue;
|
||||||
|
|
||||||
|
for(int cnt =0; cnt < reply.events_size(); cnt++)
|
||||||
|
{
|
||||||
|
auto event = reply.events(cnt);
|
||||||
|
if(mvccpb::Event::EventType::Event_EventType_PUT == event.type())
|
||||||
|
{
|
||||||
|
if(event.has_kv())
|
||||||
|
{
|
||||||
|
kv = event.kv();
|
||||||
|
if(kv.version() == 1)
|
||||||
|
{
|
||||||
|
action = "create";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
action = "set";
|
||||||
|
}
|
||||||
|
//values.push_back(kv);
|
||||||
|
mapValue.emplace(kv.key(), kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type())
|
||||||
|
{
|
||||||
|
action = "delete";
|
||||||
|
}
|
||||||
|
//get previous value index - 1
|
||||||
|
RangeRequest get_request;
|
||||||
|
get_request.set_key(kv.key());
|
||||||
|
get_request.set_revision(index - 1);
|
||||||
|
|
||||||
|
RangeResponse response;
|
||||||
|
ClientContext ctx;
|
||||||
|
|
||||||
|
Status result = stub_->Range(&ctx, get_request, &response);
|
||||||
|
if (result.ok())
|
||||||
|
{
|
||||||
|
for(int cnt=0; cnt < response.kvs_size(); cnt++)
|
||||||
|
{
|
||||||
|
prev_values.push_back(response.kvs(cnt));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for(auto x: mapValue)
|
||||||
|
{
|
||||||
|
values.push_back(x.second);
|
||||||
|
}
|
||||||
|
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue