diff --git a/etcd/Client.hpp b/etcd/Client.hpp index b74499b..e41dc7e 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -153,6 +153,8 @@ namespace etcd pplx::task send_asyncdelete(std::string const & key, bool recursive); pplx::task send_asyncrm_if(std::string const &key, std::string const &old_value); pplx::task send_asyncrm_if(std::string const &key, int old_index); + pplx::task send_asyncwatch(std::string const & key, bool recursive); + pplx::task send_asyncwatch(std::string const & key, int fromIndex, bool recursive); }; diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 4baba30..15c04c2 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -29,13 +29,9 @@ namespace etcd { return pplx::task([call]() { - void* got_tag; - bool ok = false; etcd::Response resp; - //blocking - call->cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)call.get()); + call->waitForResponse(); auto v3resp = call->ParseResponse(); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ce3bcf7..8c9920c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Client.cpp b/src/Client.cpp index 0bf935f..c3bf892 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -4,6 +4,7 @@ #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncDelResponse.hpp" #include "v3/include/AsyncModifyResponse.hpp" +#include "v3/include/AsyncWatchResponse.hpp" #include "v3/include/Utils.hpp" #include @@ -32,6 +33,7 @@ etcd::Client::Client(std::string const & address) } std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); stub_= KV::NewStub(channel); + watchServiceStub= Watch::NewStub(channel); } pplx::task etcd::Client::send_get_request(web::http::uri_builder & uri) @@ -122,21 +124,12 @@ pplx::task etcd::Client::ls(std::string const & key) pplx::task etcd::Client::watch(std::string const & key, bool recursive) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("wait=true"); - if (recursive) - uri.append_query("recursive=true"); - return send_get_request(uri); + return send_asyncwatch(key,recursive); } pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("wait=true"); - uri.append_query("waitIndex", fromIndex); - if (recursive) - uri.append_query("recursive=true"); - return send_get_request(uri); + return send_asyncwatch(key, fromIndex, recursive); } pplx::task etcd::Client::send_asyncadd(std::string const & key, std::string const & value) @@ -537,4 +530,55 @@ pplx::task etcd::Client::send_asyncrm_if(std::string const &key, return Response::create(call); } +pplx::task etcd::Client::send_asyncwatch(std::string const & key, bool recursive) +{ + std::shared_ptr call(new etcdv3::AsyncWatchResponse()); + call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get()); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + watch_create_req.set_range_end(range_end); + } + + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + call->stream->Write(watch_req, (void*)call.get()); + call->stub_ = stub_.get(); + + return Response::create(call); +} + +pplx::task etcd::Client::send_asyncwatch(std::string const & key, int fromIndex, bool recursive) +{ + std::shared_ptr call(new etcdv3::AsyncWatchResponse()); + call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get()); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + watch_create_req.set_start_revision(fromIndex); + + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + watch_create_req.set_range_end(range_end); + } + + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + call->stream->Write(watch_req, (void*)call.get()); + call->stub_ = stub_.get(); + + return Response::create(call); +} + diff --git a/v3/include/AsyncRangeResponse.hpp b/v3/include/AsyncRangeResponse.hpp index 9b83ded..66d6cae 100644 --- a/v3/include/AsyncRangeResponse.hpp +++ b/v3/include/AsyncRangeResponse.hpp @@ -20,13 +20,14 @@ namespace etcdv3 AsyncRangeResponse(){action = "get";}; AsyncRangeResponse(const AsyncRangeResponse& other); AsyncRangeResponse& operator=(const AsyncRangeResponse& other); + AsyncRangeResponse& ParseResponse(); + void waitForResponse(); RangeResponse reply; etcdserverpb::PutResponse r; Status status; ClientContext context; CompletionQueue cq_; std::unique_ptr> response_reader; - AsyncRangeResponse& ParseResponse(); }; } diff --git a/v3/include/AsyncTxnResponse.hpp b/v3/include/AsyncTxnResponse.hpp index 2fda271..e07607d 100644 --- a/v3/include/AsyncTxnResponse.hpp +++ b/v3/include/AsyncTxnResponse.hpp @@ -20,12 +20,13 @@ namespace etcdv3 AsyncTxnResponse(const std::string act){action = act;}; AsyncTxnResponse(const AsyncTxnResponse& other); AsyncTxnResponse& operator=(const AsyncTxnResponse& other); + AsyncTxnResponse& ParseResponse(); + void waitForResponse(); TxnResponse reply; Status status; ClientContext context; CompletionQueue cq_; std::unique_ptr> response_reader; - AsyncTxnResponse& ParseResponse(); }; } diff --git a/v3/src/AsyncRangeResponse.cpp b/v3/src/AsyncRangeResponse.cpp index 682c7a2..60d197c 100644 --- a/v3/src/AsyncRangeResponse.cpp +++ b/v3/src/AsyncRangeResponse.cpp @@ -22,6 +22,15 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3:: return *this; } +void etcdv3::AsyncRangeResponse::waitForResponse() +{ + void* got_tag; + bool ok = false; + + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)this); +} + etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse() { index = reply.header().revision(); diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index 1ec4014..86d813c 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -26,6 +26,15 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn return *this; } +void etcdv3::AsyncTxnResponse::waitForResponse() +{ + void* got_tag; + bool ok = false; + + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)this); +} + etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() {