From b9eb2633d1417576fab1433e9f293058859f3076 Mon Sep 17 00:00:00 2001 From: arches Date: Tue, 21 Jun 2016 09:19:26 -0400 Subject: [PATCH 1/3] Added watch functionality --- etcd/Client.hpp | 2 + etcd/Response.hpp | 6 +-- src/CMakeLists.txt | 2 +- src/Client.cpp | 66 +++++++++++++++++++++++++------ v3/include/AsyncRangeResponse.hpp | 3 +- v3/include/AsyncTxnResponse.hpp | 3 +- v3/src/AsyncRangeResponse.cpp | 9 +++++ v3/src/AsyncTxnResponse.cpp | 9 +++++ 8 files changed, 81 insertions(+), 19 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index b74499b..e41dc7e 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -153,6 +153,8 @@ namespace etcd pplx::task send_asyncdelete(std::string const & key, bool recursive); pplx::task send_asyncrm_if(std::string const &key, std::string const &old_value); pplx::task send_asyncrm_if(std::string const &key, int old_index); + pplx::task send_asyncwatch(std::string const & key, bool recursive); + pplx::task send_asyncwatch(std::string const & key, int fromIndex, bool recursive); }; diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 4baba30..15c04c2 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -29,13 +29,9 @@ namespace etcd { return pplx::task([call]() { - void* got_tag; - bool ok = false; etcd::Response resp; - //blocking - call->cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)call.get()); + call->waitForResponse(); auto v3resp = call->ParseResponse(); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ce3bcf7..8c9920c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Client.cpp b/src/Client.cpp index 0bf935f..c3bf892 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -4,6 +4,7 @@ #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncDelResponse.hpp" #include "v3/include/AsyncModifyResponse.hpp" +#include "v3/include/AsyncWatchResponse.hpp" #include "v3/include/Utils.hpp" #include @@ -32,6 +33,7 @@ etcd::Client::Client(std::string const & address) } std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); stub_= KV::NewStub(channel); + watchServiceStub= Watch::NewStub(channel); } pplx::task etcd::Client::send_get_request(web::http::uri_builder & uri) @@ -122,21 +124,12 @@ pplx::task etcd::Client::ls(std::string const & key) pplx::task etcd::Client::watch(std::string const & key, bool recursive) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("wait=true"); - if (recursive) - uri.append_query("recursive=true"); - return send_get_request(uri); + return send_asyncwatch(key,recursive); } pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("wait=true"); - uri.append_query("waitIndex", fromIndex); - if (recursive) - uri.append_query("recursive=true"); - return send_get_request(uri); + return send_asyncwatch(key, fromIndex, recursive); } pplx::task etcd::Client::send_asyncadd(std::string const & key, std::string const & value) @@ -537,4 +530,55 @@ pplx::task etcd::Client::send_asyncrm_if(std::string const &key, return Response::create(call); } +pplx::task etcd::Client::send_asyncwatch(std::string const & key, bool recursive) +{ + std::shared_ptr call(new etcdv3::AsyncWatchResponse()); + call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get()); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + watch_create_req.set_range_end(range_end); + } + + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + call->stream->Write(watch_req, (void*)call.get()); + call->stub_ = stub_.get(); + + return Response::create(call); +} + +pplx::task etcd::Client::send_asyncwatch(std::string const & key, int fromIndex, bool recursive) +{ + std::shared_ptr call(new etcdv3::AsyncWatchResponse()); + call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get()); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + watch_create_req.set_start_revision(fromIndex); + + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + watch_create_req.set_range_end(range_end); + } + + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + call->stream->Write(watch_req, (void*)call.get()); + call->stub_ = stub_.get(); + + return Response::create(call); +} + diff --git a/v3/include/AsyncRangeResponse.hpp b/v3/include/AsyncRangeResponse.hpp index 9b83ded..66d6cae 100644 --- a/v3/include/AsyncRangeResponse.hpp +++ b/v3/include/AsyncRangeResponse.hpp @@ -20,13 +20,14 @@ namespace etcdv3 AsyncRangeResponse(){action = "get";}; AsyncRangeResponse(const AsyncRangeResponse& other); AsyncRangeResponse& operator=(const AsyncRangeResponse& other); + AsyncRangeResponse& ParseResponse(); + void waitForResponse(); RangeResponse reply; etcdserverpb::PutResponse r; Status status; ClientContext context; CompletionQueue cq_; std::unique_ptr> response_reader; - AsyncRangeResponse& ParseResponse(); }; } diff --git a/v3/include/AsyncTxnResponse.hpp b/v3/include/AsyncTxnResponse.hpp index 2fda271..e07607d 100644 --- a/v3/include/AsyncTxnResponse.hpp +++ b/v3/include/AsyncTxnResponse.hpp @@ -20,12 +20,13 @@ namespace etcdv3 AsyncTxnResponse(const std::string act){action = act;}; AsyncTxnResponse(const AsyncTxnResponse& other); AsyncTxnResponse& operator=(const AsyncTxnResponse& other); + AsyncTxnResponse& ParseResponse(); + void waitForResponse(); TxnResponse reply; Status status; ClientContext context; CompletionQueue cq_; std::unique_ptr> response_reader; - AsyncTxnResponse& ParseResponse(); }; } diff --git a/v3/src/AsyncRangeResponse.cpp b/v3/src/AsyncRangeResponse.cpp index 682c7a2..60d197c 100644 --- a/v3/src/AsyncRangeResponse.cpp +++ b/v3/src/AsyncRangeResponse.cpp @@ -22,6 +22,15 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3:: return *this; } +void etcdv3::AsyncRangeResponse::waitForResponse() +{ + void* got_tag; + bool ok = false; + + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)this); +} + etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse() { index = reply.header().revision(); diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index 1ec4014..86d813c 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -26,6 +26,15 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn return *this; } +void etcdv3::AsyncTxnResponse::waitForResponse() +{ + void* got_tag; + bool ok = false; + + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)this); +} + etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() { From c467a239582a605adf0eb34d0fdf55a5267dd381 Mon Sep 17 00:00:00 2001 From: arches Date: Tue, 21 Jun 2016 09:36:31 -0400 Subject: [PATCH 2/3] Added Watch implementation --- v3/include/AsyncWatchResponse.hpp | 37 +++++++++++ v3/src/AsyncWatchResponse.cpp | 107 ++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 v3/include/AsyncWatchResponse.hpp create mode 100644 v3/src/AsyncWatchResponse.cpp diff --git a/v3/include/AsyncWatchResponse.hpp b/v3/include/AsyncWatchResponse.hpp new file mode 100644 index 0000000..5305345 --- /dev/null +++ b/v3/include/AsyncWatchResponse.hpp @@ -0,0 +1,37 @@ +#ifndef __ASYNC_WATCH_HPP__ +#define __ASYNC_WATCH_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/V3Response.hpp" + + +using grpc::ClientAsyncReaderWriter; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; +using etcdserverpb::WatchRequest; +using etcdserverpb::WatchResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncWatchResponse : public etcdv3::V3Response + { + public: + AsyncWatchResponse(){}; + AsyncWatchResponse(const std::string act){action = act;}; + AsyncWatchResponse(const AsyncWatchResponse& other); + AsyncWatchResponse& operator=(const AsyncWatchResponse& other); + AsyncWatchResponse& ParseResponse(); + void waitForResponse(); + WatchResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> stream; + KV::Stub* stub_; + }; +} + +#endif diff --git a/v3/src/AsyncWatchResponse.cpp b/v3/src/AsyncWatchResponse.cpp new file mode 100644 index 0000000..2f33eef --- /dev/null +++ b/v3/src/AsyncWatchResponse.cpp @@ -0,0 +1,107 @@ +#include "v3/include/AsyncWatchResponse.hpp" + +using etcdserverpb::RangeRequest; +using etcdserverpb::RangeResponse; + + +etcdv3::AsyncWatchResponse::AsyncWatchResponse(const etcdv3::AsyncWatchResponse& other) +{ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_values = other.prev_values; + +} + +etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::operator=(const etcdv3::AsyncWatchResponse& other) +{ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_values = other.prev_values; + return *this; +} + +void etcdv3::AsyncWatchResponse::waitForResponse() +{ + void* got_tag; + bool ok = false; + + stream->Read(&reply, (void*)3); + while(cq_.Next(&got_tag, &ok)) + { + if(got_tag == (void*)3) + { + if(reply.events_size()) + { + stream->WritesDone((void*)this); + cq_.Next(&got_tag, &ok); + break; + } + else + { + stream->Read(&reply, (void*)3); + } + } + } +} + +etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse() +{ + index = reply.header().revision(); + + mvccpb::KeyValue kv; + std::map mapValue; + + for(int cnt =0; cnt < reply.events_size(); cnt++) + { + auto event = reply.events(cnt); + if(mvccpb::Event::EventType::Event_EventType_PUT == event.type()) + { + if(event.has_kv()) + { + kv = event.kv(); + if(kv.version() == 1) + { + action = "create"; + } + else + { + action = "set"; + } + //values.push_back(kv); + mapValue.emplace(kv.key(), kv); + } + } + else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type()) + { + action = "delete"; + } + //get previous value index - 1 + RangeRequest get_request; + get_request.set_key(kv.key()); + get_request.set_revision(index - 1); + + RangeResponse response; + ClientContext ctx; + + Status result = stub_->Range(&ctx, get_request, &response); + if (result.ok()) + { + for(int cnt=0; cnt < response.kvs_size(); cnt++) + { + prev_values.push_back(response.kvs(cnt)); + } + } + } + for(auto x: mapValue) + { + values.push_back(x.second); + } + + return *this; +} From 808ccd5e3b1b855f8aa471b57cfad0127d900e60 Mon Sep 17 00:00:00 2001 From: arches Date: Tue, 21 Jun 2016 09:48:55 -0400 Subject: [PATCH 3/3] Updated test cases --- tst/EtcdTest.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index cb2ecc7..96b0901 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -222,7 +222,7 @@ TEST_CASE("wait for a value change") CHECK(!res.is_done()); sleep(1); CHECK(!res.is_done()); - + etcd.set("/test/key1", "43").get(); sleep(1); REQUIRE(res.is_done()); @@ -232,7 +232,7 @@ TEST_CASE("wait for a value change") TEST_CASE("wait for a directory change") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); pplx::task res = etcd.watch("/test", true); CHECK(!res.is_done()); @@ -251,6 +251,7 @@ TEST_CASE("wait for a directory change") CHECK(!res2.is_done()); etcd.set("/test/key4", "45").wait(); + sleep(1); REQUIRE(res2.is_done()); CHECK("set" == res2.get().action()); CHECK("45" == res2.get().value().as_string()); @@ -258,7 +259,7 @@ TEST_CASE("wait for a directory change") TEST_CASE("watch changes in the past") { - etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Client etcd("http://127.0.0.1:2379"); int index = etcd.set("/test/key1", "42").get().index();