diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 93c819c..c7e6754 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -155,6 +155,8 @@ namespace etcd pplx::task send_asyncdelete(std::string const & key, bool recursive); pplx::task send_asyncrm_if(std::string const &key, std::string const &old_value); pplx::task send_asyncrm_if(std::string const &key, int old_index); + pplx::task send_asyncwatch(std::string const & key, bool recursive); + pplx::task send_asyncwatch(std::string const & key, int fromIndex, bool recursive); private: std::shared_ptr initiate_transaction(const std::string &operation, diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 4baba30..15c04c2 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -29,13 +29,9 @@ namespace etcd { return pplx::task([call]() { - void* got_tag; - bool ok = false; etcd::Response resp; - //blocking - call->cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)call.get()); + call->waitForResponse(); auto v3resp = call->ParseResponse(); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 45966fd..ce09f41 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/Transaction.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Client.cpp b/src/Client.cpp index 621d8ef..b6e90b3 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -2,6 +2,7 @@ #include "etcd/Client.hpp" #include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncRangeResponse.hpp" +#include "v3/include/AsyncWatchResponse.hpp" #include "v3/include/Utils.hpp" #include "v3/include/Transaction.hpp" #include @@ -31,6 +32,7 @@ etcd::Client::Client(std::string const & address) } std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); stub_= KV::NewStub(channel); + watchServiceStub= Watch::NewStub(channel); } pplx::task etcd::Client::send_get_request(web::http::uri_builder & uri) @@ -129,22 +131,13 @@ pplx::task etcd::Client::ls(std::string const & key) pplx::task etcd::Client::watch(std::string const & key, bool recursive) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("wait=true"); - if (recursive) - uri.append_query("recursive=true"); - return send_get_request(uri); + return send_asyncwatch(key,recursive); } pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("wait=true"); - uri.append_query("waitIndex", fromIndex); - if (recursive) - uri.append_query("recursive=true"); - return send_get_request(uri); + return send_asyncwatch(key, fromIndex, recursive); } @@ -293,3 +286,54 @@ pplx::task etcd::Client::send_asyncrm_if(std::string const &key, std::shared_ptr call = initiate_transaction("compareAndDelete", transaction); return Response::create(call); } + +pplx::task etcd::Client::send_asyncwatch(std::string const & key, bool recursive) +{ + std::shared_ptr call(new etcdv3::AsyncWatchResponse()); + call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get()); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + watch_create_req.set_range_end(range_end); + } + + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + call->stream->Write(watch_req, (void*)call.get()); + call->stub_ = stub_.get(); + + return Response::create(call); +} + +pplx::task etcd::Client::send_asyncwatch(std::string const & key, int fromIndex, bool recursive) +{ + std::shared_ptr call(new etcdv3::AsyncWatchResponse()); + call->stream = watchServiceStub->AsyncWatch(&call->context,&call->cq_,(void*)call.get()); + + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + watch_create_req.set_start_revision(fromIndex); + + std::string range_end(key); + if(recursive) + { + int ascii = (int)range_end[range_end.length()-1]; + range_end.back() = ascii+1; + watch_create_req.set_range_end(range_end); + } + + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + call->stream->Write(watch_req, (void*)call.get()); + call->stub_ = stub_.get(); + + return Response::create(call); +} diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index 93d48b4..96b0901 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -213,71 +213,72 @@ TEST_CASE("delete a directory") } -//TEST_CASE("wait for a value change") -//{ -// etcd::Client etcd("http://127.0.0.1:2379"); -// etcd.set("/test/key1", "42").wait(); -// -// pplx::task res = etcd.watch("/test/key1"); -// CHECK(!res.is_done()); -// sleep(1); -// CHECK(!res.is_done()); -// -// etcd.set("/test/key1", "43").get(); -// sleep(1); -// REQUIRE(res.is_done()); -// REQUIRE("set" == res.get().action()); -// CHECK("43" == res.get().value().as_string()); -//} -// -//TEST_CASE("wait for a directory change") -//{ -// etcd::Client etcd("http://127.0.0.1:4001"); -// -// pplx::task res = etcd.watch("/test", true); -// CHECK(!res.is_done()); -// sleep(1); -// CHECK(!res.is_done()); -// -// etcd.add("/test/key4", "44").wait(); -// sleep(1); -// REQUIRE(res.is_done()); -// CHECK("create" == res.get().action()); -// CHECK("44" == res.get().value().as_string()); -// -// pplx::task res2 = etcd.watch("/test", true); -// CHECK(!res2.is_done()); -// sleep(1); -// CHECK(!res2.is_done()); -// -// etcd.set("/test/key4", "45").wait(); -// REQUIRE(res2.is_done()); -// CHECK("set" == res2.get().action()); -// CHECK("45" == res2.get().value().as_string()); -//} -// -//TEST_CASE("watch changes in the past") -//{ -// etcd::Client etcd("http://127.0.0.1:4001"); -// -// int index = etcd.set("/test/key1", "42").get().index(); -// -// etcd.set("/test/key1", "43").wait(); -// etcd.set("/test/key1", "44").wait(); -// etcd.set("/test/key1", "45").wait(); -// -// etcd::Response res = etcd.watch("/test/key1", ++index).get(); -// CHECK("set" == res.action()); -// CHECK("43" == res.value().as_string()); -// -// res = etcd.watch("/test/key1", ++index).get(); -// CHECK("set" == res.action()); -// CHECK("44" == res.value().as_string()); -// -// res = etcd.watch("/test", ++index, true).get(); -// CHECK("set" == res.action()); -// CHECK("45" == res.value().as_string()); -//} +TEST_CASE("wait for a value change") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + etcd.set("/test/key1", "42").wait(); + + pplx::task res = etcd.watch("/test/key1"); + CHECK(!res.is_done()); + sleep(1); + CHECK(!res.is_done()); + + etcd.set("/test/key1", "43").get(); + sleep(1); + REQUIRE(res.is_done()); + REQUIRE("set" == res.get().action()); + CHECK("43" == res.get().value().as_string()); +} + +TEST_CASE("wait for a directory change") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + + pplx::task res = etcd.watch("/test", true); + CHECK(!res.is_done()); + sleep(1); + CHECK(!res.is_done()); + + etcd.add("/test/key4", "44").wait(); + sleep(1); + REQUIRE(res.is_done()); + CHECK("create" == res.get().action()); + CHECK("44" == res.get().value().as_string()); + + pplx::task res2 = etcd.watch("/test", true); + CHECK(!res2.is_done()); + sleep(1); + CHECK(!res2.is_done()); + + etcd.set("/test/key4", "45").wait(); + sleep(1); + REQUIRE(res2.is_done()); + CHECK("set" == res2.get().action()); + CHECK("45" == res2.get().value().as_string()); +} + +TEST_CASE("watch changes in the past") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + + int index = etcd.set("/test/key1", "42").get().index(); + + etcd.set("/test/key1", "43").wait(); + etcd.set("/test/key1", "44").wait(); + etcd.set("/test/key1", "45").wait(); + + etcd::Response res = etcd.watch("/test/key1", ++index).get(); + CHECK("set" == res.action()); + CHECK("43" == res.value().as_string()); + + res = etcd.watch("/test/key1", ++index).get(); + CHECK("set" == res.action()); + CHECK("44" == res.value().as_string()); + + res = etcd.watch("/test", ++index, true).get(); + CHECK("set" == res.action()); + CHECK("45" == res.value().as_string()); +} TEST_CASE("cleanup") diff --git a/v3/include/AsyncRangeResponse.hpp b/v3/include/AsyncRangeResponse.hpp index 9b83ded..66d6cae 100644 --- a/v3/include/AsyncRangeResponse.hpp +++ b/v3/include/AsyncRangeResponse.hpp @@ -20,13 +20,14 @@ namespace etcdv3 AsyncRangeResponse(){action = "get";}; AsyncRangeResponse(const AsyncRangeResponse& other); AsyncRangeResponse& operator=(const AsyncRangeResponse& other); + AsyncRangeResponse& ParseResponse(); + void waitForResponse(); RangeResponse reply; etcdserverpb::PutResponse r; Status status; ClientContext context; CompletionQueue cq_; std::unique_ptr> response_reader; - AsyncRangeResponse& ParseResponse(); }; } diff --git a/v3/include/AsyncTxnResponse.hpp b/v3/include/AsyncTxnResponse.hpp index 2fda271..e07607d 100644 --- a/v3/include/AsyncTxnResponse.hpp +++ b/v3/include/AsyncTxnResponse.hpp @@ -20,12 +20,13 @@ namespace etcdv3 AsyncTxnResponse(const std::string act){action = act;}; AsyncTxnResponse(const AsyncTxnResponse& other); AsyncTxnResponse& operator=(const AsyncTxnResponse& other); + AsyncTxnResponse& ParseResponse(); + void waitForResponse(); TxnResponse reply; Status status; ClientContext context; CompletionQueue cq_; std::unique_ptr> response_reader; - AsyncTxnResponse& ParseResponse(); }; } diff --git a/v3/include/AsyncWatchResponse.hpp b/v3/include/AsyncWatchResponse.hpp new file mode 100644 index 0000000..5305345 --- /dev/null +++ b/v3/include/AsyncWatchResponse.hpp @@ -0,0 +1,37 @@ +#ifndef __ASYNC_WATCH_HPP__ +#define __ASYNC_WATCH_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/V3Response.hpp" + + +using grpc::ClientAsyncReaderWriter; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; +using etcdserverpb::WatchRequest; +using etcdserverpb::WatchResponse; +using etcdserverpb::KV; + +namespace etcdv3 +{ + class AsyncWatchResponse : public etcdv3::V3Response + { + public: + AsyncWatchResponse(){}; + AsyncWatchResponse(const std::string act){action = act;}; + AsyncWatchResponse(const AsyncWatchResponse& other); + AsyncWatchResponse& operator=(const AsyncWatchResponse& other); + AsyncWatchResponse& ParseResponse(); + void waitForResponse(); + WatchResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> stream; + KV::Stub* stub_; + }; +} + +#endif diff --git a/v3/src/AsyncRangeResponse.cpp b/v3/src/AsyncRangeResponse.cpp index 682c7a2..60d197c 100644 --- a/v3/src/AsyncRangeResponse.cpp +++ b/v3/src/AsyncRangeResponse.cpp @@ -22,6 +22,15 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3:: return *this; } +void etcdv3::AsyncRangeResponse::waitForResponse() +{ + void* got_tag; + bool ok = false; + + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)this); +} + etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse() { index = reply.header().revision(); diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index 1ec4014..86d813c 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -26,6 +26,15 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn return *this; } +void etcdv3::AsyncTxnResponse::waitForResponse() +{ + void* got_tag; + bool ok = false; + + cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)this); +} + etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() { diff --git a/v3/src/AsyncWatchResponse.cpp b/v3/src/AsyncWatchResponse.cpp new file mode 100644 index 0000000..2f33eef --- /dev/null +++ b/v3/src/AsyncWatchResponse.cpp @@ -0,0 +1,107 @@ +#include "v3/include/AsyncWatchResponse.hpp" + +using etcdserverpb::RangeRequest; +using etcdserverpb::RangeResponse; + + +etcdv3::AsyncWatchResponse::AsyncWatchResponse(const etcdv3::AsyncWatchResponse& other) +{ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_values = other.prev_values; + +} + +etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::operator=(const etcdv3::AsyncWatchResponse& other) +{ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_values = other.prev_values; + return *this; +} + +void etcdv3::AsyncWatchResponse::waitForResponse() +{ + void* got_tag; + bool ok = false; + + stream->Read(&reply, (void*)3); + while(cq_.Next(&got_tag, &ok)) + { + if(got_tag == (void*)3) + { + if(reply.events_size()) + { + stream->WritesDone((void*)this); + cq_.Next(&got_tag, &ok); + break; + } + else + { + stream->Read(&reply, (void*)3); + } + } + } +} + +etcdv3::AsyncWatchResponse& etcdv3::AsyncWatchResponse::ParseResponse() +{ + index = reply.header().revision(); + + mvccpb::KeyValue kv; + std::map mapValue; + + for(int cnt =0; cnt < reply.events_size(); cnt++) + { + auto event = reply.events(cnt); + if(mvccpb::Event::EventType::Event_EventType_PUT == event.type()) + { + if(event.has_kv()) + { + kv = event.kv(); + if(kv.version() == 1) + { + action = "create"; + } + else + { + action = "set"; + } + //values.push_back(kv); + mapValue.emplace(kv.key(), kv); + } + } + else if(mvccpb::Event::EventType::Event_EventType_DELETE == event.type()) + { + action = "delete"; + } + //get previous value index - 1 + RangeRequest get_request; + get_request.set_key(kv.key()); + get_request.set_revision(index - 1); + + RangeResponse response; + ClientContext ctx; + + Status result = stub_->Range(&ctx, get_request, &response); + if (result.ok()) + { + for(int cnt=0; cnt < response.kvs_size(); cnt++) + { + prev_values.push_back(response.kvs(cnt)); + } + } + } + for(auto x: mapValue) + { + values.push_back(x.second); + } + + return *this; +}