Added watch functionality
This commit is contained in:
parent
87dc87abdb
commit
b9eb2633d1
|
|
@ -153,6 +153,8 @@ namespace etcd
|
|||
pplx::task<etcd::Response> send_asyncdelete(std::string const & key, bool recursive);
|
||||
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, std::string const &old_value);
|
||||
pplx::task<etcd::Response> send_asyncrm_if(std::string const &key, int old_index);
|
||||
pplx::task<etcd::Response> send_asyncwatch(std::string const & key, bool recursive);
|
||||
pplx::task<etcd::Response> send_asyncwatch(std::string const & key, int fromIndex, bool recursive);
|
||||
};
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -29,13 +29,9 @@ namespace etcd
|
|||
{
|
||||
return pplx::task<etcd::Response>([call]()
|
||||
{
|
||||
void* got_tag;
|
||||
bool ok = false;
|
||||
etcd::Response resp;
|
||||
|
||||
//blocking
|
||||
call->cq_.Next(&got_tag, &ok);
|
||||
GPR_ASSERT(got_tag == (void*)call.get());
|
||||
call->waitForResponse();
|
||||
|
||||
auto v3resp = call->ParseResponse();
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp)
|
||||
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp)
|
||||
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
|
||||
|
||||
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
#include "v3/include/AsyncRangeResponse.hpp"
|
||||
#include "v3/include/AsyncDelResponse.hpp"
|
||||
#include "v3/include/AsyncModifyResponse.hpp"
|
||||
#include "v3/include/AsyncWatchResponse.hpp"
|
||||
#include "v3/include/Utils.hpp"
|
||||
#include <iostream>
|
||||
|
||||
|
|
@ -32,6 +33,7 @@ etcd::Client::Client(std::string const & address)
|
|||
}
|
||||
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
|
||||
stub_= KV::NewStub(channel);
|
||||
watchServiceStub= Watch::NewStub(channel);
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri)
|
||||
|
|
@ -122,21 +124,12 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
|
|||
|
||||
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
|
||||
{
|
||||
web::http::uri_builder uri("/v2/keys" + key);
|
||||
uri.append_query("wait=true");
|
||||
if (recursive)
|
||||
uri.append_query("recursive=true");
|
||||
return send_get_request(uri);
|
||||
return send_asyncwatch(key,recursive);
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int fromIndex, bool recursive)
|
||||
{
|
||||
web::http::uri_builder uri("/v2/keys" + key);
|
||||
uri.append_query("wait=true");
|
||||
uri.append_query("waitIndex", fromIndex);
|
||||
if (recursive)
|
||||
uri.append_query("recursive=true");
|
||||
return send_get_request(uri);
|
||||
return send_asyncwatch(key, fromIndex, recursive);
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key, std::string const & value)
|
||||
|
|
@ -537,4 +530,55 @@ pplx::task<etcd::Response> etcd::Client::send_asyncrm_if(std::string const &key,
|
|||
return Response::create(call);
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::send_asyncwatch(std::string const & key, bool recursive)
|
||||
{
|
||||
std::shared_ptr<etcdv3::AsyncWatchResponse> call(new etcdv3::AsyncWatchResponse());
|
||||
call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get());
|
||||
|
||||
WatchRequest watch_req;
|
||||
WatchCreateRequest watch_create_req;
|
||||
watch_create_req.set_key(key);
|
||||
|
||||
std::string range_end(key);
|
||||
if(recursive)
|
||||
{
|
||||
int ascii = (int)range_end[range_end.length()-1];
|
||||
range_end.back() = ascii+1;
|
||||
watch_create_req.set_range_end(range_end);
|
||||
}
|
||||
|
||||
|
||||
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
|
||||
call->stream->Write(watch_req, (void*)call.get());
|
||||
call->stub_ = stub_.get();
|
||||
|
||||
return Response::create(call);
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::send_asyncwatch(std::string const & key, int fromIndex, bool recursive)
|
||||
{
|
||||
std::shared_ptr<etcdv3::AsyncWatchResponse> call(new etcdv3::AsyncWatchResponse());
|
||||
call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get());
|
||||
|
||||
WatchRequest watch_req;
|
||||
WatchCreateRequest watch_create_req;
|
||||
watch_create_req.set_key(key);
|
||||
watch_create_req.set_start_revision(fromIndex);
|
||||
|
||||
std::string range_end(key);
|
||||
if(recursive)
|
||||
{
|
||||
int ascii = (int)range_end[range_end.length()-1];
|
||||
range_end.back() = ascii+1;
|
||||
watch_create_req.set_range_end(range_end);
|
||||
}
|
||||
|
||||
|
||||
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
|
||||
call->stream->Write(watch_req, (void*)call.get());
|
||||
call->stub_ = stub_.get();
|
||||
|
||||
return Response::create(call);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -20,13 +20,14 @@ namespace etcdv3
|
|||
AsyncRangeResponse(){action = "get";};
|
||||
AsyncRangeResponse(const AsyncRangeResponse& other);
|
||||
AsyncRangeResponse& operator=(const AsyncRangeResponse& other);
|
||||
AsyncRangeResponse& ParseResponse();
|
||||
void waitForResponse();
|
||||
RangeResponse reply;
|
||||
etcdserverpb::PutResponse r;
|
||||
Status status;
|
||||
ClientContext context;
|
||||
CompletionQueue cq_;
|
||||
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
|
||||
AsyncRangeResponse& ParseResponse();
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,12 +20,13 @@ namespace etcdv3
|
|||
AsyncTxnResponse(const std::string act){action = act;};
|
||||
AsyncTxnResponse(const AsyncTxnResponse& other);
|
||||
AsyncTxnResponse& operator=(const AsyncTxnResponse& other);
|
||||
AsyncTxnResponse& ParseResponse();
|
||||
void waitForResponse();
|
||||
TxnResponse reply;
|
||||
Status status;
|
||||
ClientContext context;
|
||||
CompletionQueue cq_;
|
||||
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
|
||||
AsyncTxnResponse& ParseResponse();
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,15 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3::
|
|||
return *this;
|
||||
}
|
||||
|
||||
void etcdv3::AsyncRangeResponse::waitForResponse()
|
||||
{
|
||||
void* got_tag;
|
||||
bool ok = false;
|
||||
|
||||
cq_.Next(&got_tag, &ok);
|
||||
GPR_ASSERT(got_tag == (void*)this);
|
||||
}
|
||||
|
||||
etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse()
|
||||
{
|
||||
index = reply.header().revision();
|
||||
|
|
|
|||
|
|
@ -26,6 +26,15 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn
|
|||
return *this;
|
||||
}
|
||||
|
||||
void etcdv3::AsyncTxnResponse::waitForResponse()
|
||||
{
|
||||
void* got_tag;
|
||||
bool ok = false;
|
||||
|
||||
cq_.Next(&got_tag, &ok);
|
||||
GPR_ASSERT(got_tag == (void*)this);
|
||||
}
|
||||
|
||||
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
|
||||
{
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue