diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 8f84051..7b91d22 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -1,7 +1,6 @@ #ifndef __ETCD_RESPONSE_HPP__ #define __ETCD_RESPONSE_HPP__ -#include #include #include @@ -9,9 +8,11 @@ #include #include "v3/include/V3Response.hpp" -#include #include +namespace etcdv3 { + class AsyncWatchAction; +} namespace etcd { @@ -110,6 +111,7 @@ namespace etcd Values _values; Keys _keys; friend class SyncClient; + friend class etcdv3::AsyncWatchAction; }; } diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp new file mode 100644 index 0000000..e21a28b --- /dev/null +++ b/etcd/Watcher.hpp @@ -0,0 +1,34 @@ +#ifndef __ETCD_WATCHER_HPP__ +#define __ETCD_WATCHER_HPP__ + +#include +#include "etcd/Response.hpp" +#include "v3/include/AsyncWatchAction.hpp" + +#include + +using etcdserverpb::Watch; +using grpc::Channel; + +namespace etcd +{ + class Watcher + { + public: + Watcher(std::string const & etcd_url, std::string const & key, std::function callback); + void Cancel(); + void AddKey(std::string const & key); + ~Watcher(); + + protected: + void doWatch(std::string const & key, std::function callback); + + int index; + std::function callback; + pplx::task currentTask; + std::unique_ptr watchServiceStub; + std::unique_ptr call; + }; +} + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index be6c7c4..f060a13 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp ../v3/src/AsyncWatchResponse.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Client.cpp b/src/Client.cpp index 836a8f3..4069958 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -100,7 +100,7 @@ pplx::task etcd::Client::rm_if(std::string const & key, int old_ pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) { - std::shared_ptr call(new etcdv3::AsyncDeleteAction(key,stub_.get(),true));; + std::shared_ptr call(new etcdv3::AsyncDeleteAction(key,stub_.get(),true)); return Response::create(call); } diff --git a/src/Watcher.cpp b/src/Watcher.cpp new file mode 100644 index 0000000..97bc5f2 --- /dev/null +++ b/src/Watcher.cpp @@ -0,0 +1,68 @@ +#include "etcd/Watcher.hpp" + +etcd::Watcher::Watcher(std::string const & address, std::string const & key, std::function callback) +{ + std::string stripped_address(address); + std::string substr("http://"); + std::string::size_type i = stripped_address.find(substr); + if(i != std::string::npos) + { + stripped_address.erase(i,substr.length()); + } + std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); + watchServiceStub= Watch::NewStub(channel); + + doWatch(key, callback); +} + +etcd::Watcher::~Watcher() +{ + call->CancelWatch(); + currentTask.wait(); +} + +void etcd::Watcher::Cancel() +{ + call->CancelWatch(); + currentTask.wait(); +} + +void etcd::Watcher::AddKey(std::string const & key) +{ + call->WatchReq(key); +} + +void etcd::Watcher::doWatch(std::string const & key, std::function callback) +{ + + call.reset(new etcdv3::AsyncWatchAction(key,true,NULL,watchServiceStub.get())); + + currentTask = pplx::task([this, callback]() + { + return call->waitForResponse(callback); + }); + + + //return Response::create(call); + + /*currentTask = client.request(web::http::methods::GET, uri.to_string(), cancellation_source.get_token()) + .then([this](pplx::task response_task) + { + try + { + auto http_response = response_task.get(); + auto json_task = http_response.extract_json(); + auto json_value = json_task.get(); + callback(etcd::Response(http_response, json_value)); + } + catch (std::exception const & ex) + { + if (pplx::is_task_cancellation_requested() || (ex.what() == std::string("Operation canceled"))) + return; + + if(ex.what() != std::string("Retrieving message chunk header")) + throw; + } + doWatch(); + });*/ +} diff --git a/tst/CMakeLists.txt b/tst/CMakeLists.txt index f304fe2..50083cc 100644 --- a/tst/CMakeLists.txt +++ b/tst/CMakeLists.txt @@ -1,4 +1,4 @@ -add_executable(etcd_test EtcdTest.cpp EtcdSyncTest.cpp) +add_executable(etcd_test EtcdTest.cpp EtcdSyncTest.cpp WatcherTest.cpp) set_property(TARGET etcd_test PROPERTY CXX_STANDARD 11) target_link_libraries(etcd_test etcd-cpp-api) diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp new file mode 100644 index 0000000..a34ca0e --- /dev/null +++ b/tst/WatcherTest.cpp @@ -0,0 +1,127 @@ +#include + +#include "etcd/Watcher.hpp" +#include "etcd/SyncClient.hpp" + +static std::string etcd_uri("http://127.0.0.1:2379"); +static int watcher_called = 0; + +void printResponse(etcd::Response const & resp) +{ + ++watcher_called; + std::cout << "print response called" << std::endl; + if (resp.error_code()) + std::cout << resp.error_code() << ": " << resp.error_message() << std::endl; + else + { + std::cout << resp.action() << " " << resp.value().as_string() << std::endl; + } +} + +TEST_CASE("create watcher") +{ + + etcd::SyncClient etcd(etcd_uri); + etcd.rmdir("/test", true); + + watcher_called = 0; + //{ + std::cout << "watch started" << std::endl; + etcd::Watcher watcher(etcd_uri, "/test", printResponse); + sleep(1); + etcd.set("/test/key", "42"); + std::cout << "first set finished" << std::endl; + etcd.set("/test/key", "43"); + std::cout << "second set finished" << std::endl; + //} + + sleep(1); + CHECK(2 == watcher_called); +// TEST_CASE("wait for a value change") +// { +// etcd::Client etcd(etcd_uri); +// etcd.set("/test/key1", "42").wait(); + +// pplx::task res = etcd.watch("/test/key1"); +// CHECK(!res.is_done()); + +// etcd.set("/test/key1", "43").get(); +// sleep(1); + +// REQUIRE(res.is_done()); +// REQUIRE("set" == res.get().action()); +// CHECK("43" == res.get().value().as_string()); +// } + +// TEST_CASE("wait for a directory change") +// { +// etcd::Client etcd(etcd_uri); + +// pplx::task res = etcd.watch("/test", true); + +// etcd.add("/test/key4", "44").wait(); +// REQUIRE(res.is_done()); +// CHECK("create" == res.get().action()); +// CHECK("44" == res.get().value().as_string()); + +// pplx::task res2 = etcd.watch("/test", true); + +// etcd.set("/test/key4", "45").wait(); +// sleep(1); +// REQUIRE(res2.is_done()); +// CHECK("set" == res2.get().action()); +// CHECK("45" == res2.get().value().as_string()); +// } + +// TEST_CASE("watch changes in the past") +// { +// etcd::Client etcd(etcd_uri); + +// int index = etcd.set("/test/key1", "42").get().index(); + +// etcd.set("/test/key1", "43").wait(); +// etcd.set("/test/key1", "44").wait(); +// etcd.set("/test/key1", "45").wait(); + +// etcd::Response res = etcd.watch("/test/key1", ++index).get(); +// CHECK("set" == res.action()); +// CHECK("43" == res.value().as_string()); + +// res = etcd.watch("/test/key1", ++index).get(); +// CHECK("set" == res.action()); +// CHECK("44" == res.value().as_string()); + +// res = etcd.watch("/test", ++index, true).get(); +// CHECK("set" == res.action()); +// CHECK("45" == res.value().as_string()); +// } + +// TEST_CASE("request cancellation") +// { +// etcd::Client etcd(etcd_uri); +// etcd.set("/test/key1", "42").wait(); + +// pplx::task res = etcd.watch("/test/key1"); +// CHECK(!res.is_done()); + +// etcd.cancel_operations(); + +// sleep(1); +// REQUIRE(res.is_done()); +// try +// { +// res.wait(); +// } +// catch(pplx::task_canceled const & ex) +// { +// std::cout << "pplx::task_canceled: " << ex.what() << "\n"; +// } +// catch(std::exception const & ex) +// { +// std::cout << "std::exception: " << ex.what() << "\n"; +// } +// } + std::cout << "start rmdir" << std::endl; + etcd.rmdir("/test", true).error_code(); + std::cout << "end rmdir" << std::endl; +} diff --git a/v3/include/AsyncWatchAction.hpp b/v3/include/AsyncWatchAction.hpp index b91c6ff..d929253 100644 --- a/v3/include/AsyncWatchAction.hpp +++ b/v3/include/AsyncWatchAction.hpp @@ -5,6 +5,7 @@ #include "proto/rpc.grpc.pb.h" #include "v3/include/Action.hpp" #include "v3/include/AsyncWatchResponse.hpp" +#include "etcd/Response.hpp" using grpc::ClientAsyncReaderWriter; @@ -22,9 +23,14 @@ namespace etcdv3 AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub); AsyncWatchResponse ParseResponse(); void waitForResponse(); + void waitForResponse(std::function callback); + void CancelWatch(); + void WatchReq(std::string const & key); WatchResponse reply; KV::Stub* stub_; std::unique_ptr> stream; + bool prefix; + }; } diff --git a/v3/src/AsyncTxnResponse.cpp b/v3/src/AsyncTxnResponse.cpp index d27490f..53170f5 100644 --- a/v3/src/AsyncTxnResponse.cpp +++ b/v3/src/AsyncTxnResponse.cpp @@ -52,6 +52,10 @@ void etcdv3::AsyncTxnResponse::ParseResponse() range_kvs = response.values; } } + else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case()) + { + std::cout << "number of deleted keys: " << resp.response_delete_range().deleted() <AsyncWatch(&context,&cq_,(void*)this); + std::cout << "AsyncWatchAction create start" << std::endl; + stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)"create"); WatchRequest watch_req; WatchCreateRequest watch_create_req; watch_create_req.set_key(key); std::string range_end(key); + prefix = recursive; if(recursive) { int ascii = (int)range_end[range_end.length()-1]; @@ -22,14 +25,16 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, bool recursi } watch_req.mutable_create_request()->CopyFrom(watch_create_req); - stream->Write(watch_req, (void*)this); + stream->Write(watch_req, (void*)"write"); + stream->Read(&reply, (void*)this); stub_ = stub_; + std::cout << "AsyncWatchAction create end" << std::endl; } etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub) { - stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)this); + stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)1); WatchRequest watch_req; WatchCreateRequest watch_create_req; @@ -44,36 +49,86 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, int fromInde watch_create_req.set_range_end(range_end); } - watch_req.mutable_create_request()->CopyFrom(watch_create_req); - stream->Write(watch_req, (void*)this); + stream->Write(watch_req, (void*)1); + stream->Read(&reply, (void*)this); stub_ = stub_; } +void etcdv3::AsyncWatchAction::WatchReq(std::string const & key) +{ + WatchRequest watch_req; + WatchCreateRequest watch_create_req; + watch_create_req.set_key(key); + + watch_req.mutable_create_request()->CopyFrom(watch_create_req); + stream->Write(watch_req, (void*)1); + stream->Read(&reply, (void*)this); +} + + void etcdv3::AsyncWatchAction::waitForResponse() { void* got_tag; bool ok = false; - stream->Read(&reply, (void*)3); while(cq_.Next(&got_tag, &ok)) { - if(got_tag == (void*)3) + if(got_tag == (void*)this) // read tag { if(reply.events_size()) { stream->WritesDone((void*)this); cq_.Next(&got_tag, &ok); break; - } + } else { - stream->Read(&reply, (void*)3); - } + stream->Read(&reply, (void*)this); + } } } } +void etcdv3::AsyncWatchAction::CancelWatch() +{ + std::cout << "cancel watch"<< std::endl; + stream->WritesDone((void*)"writes done"); +} + +void etcdv3::AsyncWatchAction::waitForResponse(std::function callback) +{ + std::cout << "waitForResponse start" << std::endl; + void* got_tag; + bool ok = false; + + while(cq_.Next(&got_tag, &ok)) + { + if(ok == false) + { + break; + } + std::cout << "ok status: " << ok << std::endl; + if(got_tag == (void*)"writes done") + { + std::cout << "writes done" << std::endl; + } + else if(got_tag == (void*)this) // read tag + { + std::cout << "read tag" << std::endl; + std::cout << "events size: "<< reply.events_size() << std::endl; + if(reply.events_size()) + { + auto resp = ParseResponse(); + callback(resp); + std::cout << "events received try to read again" << std::endl; + } + std::cout << " read again" << std::endl; + stream->Read(&reply, (void*)this); + } + } +} + etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse() {