Updates for Watcher

This commit is contained in:
arches 2016-07-05 05:22:11 -04:00
parent 174ea401b8
commit 7d64447e76
10 changed files with 311 additions and 15 deletions

View File

@ -1,7 +1,6 @@
#ifndef __ETCD_RESPONSE_HPP__
#define __ETCD_RESPONSE_HPP__
#include <cpprest/http_client.h>
#include <string>
#include <vector>
@ -9,9 +8,11 @@
#include <grpc++/grpc++.h>
#include "v3/include/V3Response.hpp"
#include <grpc++/grpc++.h>
#include <iostream>
namespace etcdv3 {
class AsyncWatchAction;
}
namespace etcd
{
@ -110,6 +111,7 @@ namespace etcd
Values _values;
Keys _keys;
friend class SyncClient;
friend class etcdv3::AsyncWatchAction;
};
}

34
etcd/Watcher.hpp Normal file
View File

@ -0,0 +1,34 @@
#ifndef __ETCD_WATCHER_HPP__
#define __ETCD_WATCHER_HPP__
#include <string>
#include "etcd/Response.hpp"
#include "v3/include/AsyncWatchAction.hpp"
#include <grpc++/grpc++.h>
using etcdserverpb::Watch;
using grpc::Channel;
namespace etcd
{
class Watcher
{
public:
Watcher(std::string const & etcd_url, std::string const & key, std::function<void(Response)> callback);
void Cancel();
void AddKey(std::string const & key);
~Watcher();
protected:
void doWatch(std::string const & key, std::function<void(Response)> callback);
int index;
std::function<void(Response)> callback;
pplx::task<void> currentTask;
std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<etcdv3::AsyncWatchAction> call;
};
}
#endif

View File

@ -1,4 +1,4 @@
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp)
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp ../v3/src/AsyncWatchResponse.cpp)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)

View File

@ -100,7 +100,7 @@ pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool recursive)
{
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(key,stub_.get(),true));;
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(key,stub_.get(),true));
return Response::create(call);
}

68
src/Watcher.cpp Normal file
View File

@ -0,0 +1,68 @@
#include "etcd/Watcher.hpp"
etcd::Watcher::Watcher(std::string const & address, std::string const & key, std::function<void(Response)> callback)
{
std::string stripped_address(address);
std::string substr("http://");
std::string::size_type i = stripped_address.find(substr);
if(i != std::string::npos)
{
stripped_address.erase(i,substr.length());
}
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
watchServiceStub= Watch::NewStub(channel);
doWatch(key, callback);
}
etcd::Watcher::~Watcher()
{
call->CancelWatch();
currentTask.wait();
}
void etcd::Watcher::Cancel()
{
call->CancelWatch();
currentTask.wait();
}
void etcd::Watcher::AddKey(std::string const & key)
{
call->WatchReq(key);
}
void etcd::Watcher::doWatch(std::string const & key, std::function<void(Response)> callback)
{
call.reset(new etcdv3::AsyncWatchAction(key,true,NULL,watchServiceStub.get()));
currentTask = pplx::task<void>([this, callback]()
{
return call->waitForResponse(callback);
});
//return Response::create(call);
/*currentTask = client.request(web::http::methods::GET, uri.to_string(), cancellation_source.get_token())
.then([this](pplx::task<web::http::http_response> response_task)
{
try
{
auto http_response = response_task.get();
auto json_task = http_response.extract_json();
auto json_value = json_task.get();
callback(etcd::Response(http_response, json_value));
}
catch (std::exception const & ex)
{
if (pplx::is_task_cancellation_requested() || (ex.what() == std::string("Operation canceled")))
return;
if(ex.what() != std::string("Retrieving message chunk header"))
throw;
}
doWatch();
});*/
}

View File

@ -1,4 +1,4 @@
add_executable(etcd_test EtcdTest.cpp EtcdSyncTest.cpp)
add_executable(etcd_test EtcdTest.cpp EtcdSyncTest.cpp WatcherTest.cpp)
set_property(TARGET etcd_test PROPERTY CXX_STANDARD 11)
target_link_libraries(etcd_test etcd-cpp-api)

127
tst/WatcherTest.cpp Normal file
View File

@ -0,0 +1,127 @@
#include <catch.hpp>
#include "etcd/Watcher.hpp"
#include "etcd/SyncClient.hpp"
static std::string etcd_uri("http://127.0.0.1:2379");
static int watcher_called = 0;
void printResponse(etcd::Response const & resp)
{
++watcher_called;
std::cout << "print response called" << std::endl;
if (resp.error_code())
std::cout << resp.error_code() << ": " << resp.error_message() << std::endl;
else
{
std::cout << resp.action() << " " << resp.value().as_string() << std::endl;
}
}
TEST_CASE("create watcher")
{
etcd::SyncClient etcd(etcd_uri);
etcd.rmdir("/test", true);
watcher_called = 0;
//{
std::cout << "watch started" << std::endl;
etcd::Watcher watcher(etcd_uri, "/test", printResponse);
sleep(1);
etcd.set("/test/key", "42");
std::cout << "first set finished" << std::endl;
etcd.set("/test/key", "43");
std::cout << "second set finished" << std::endl;
//}
sleep(1);
CHECK(2 == watcher_called);
// TEST_CASE("wait for a value change")
// {
// etcd::Client etcd(etcd_uri);
// etcd.set("/test/key1", "42").wait();
// pplx::task<etcd::Response> res = etcd.watch("/test/key1");
// CHECK(!res.is_done());
// etcd.set("/test/key1", "43").get();
// sleep(1);
// REQUIRE(res.is_done());
// REQUIRE("set" == res.get().action());
// CHECK("43" == res.get().value().as_string());
// }
// TEST_CASE("wait for a directory change")
// {
// etcd::Client etcd(etcd_uri);
// pplx::task<etcd::Response> res = etcd.watch("/test", true);
// etcd.add("/test/key4", "44").wait();
// REQUIRE(res.is_done());
// CHECK("create" == res.get().action());
// CHECK("44" == res.get().value().as_string());
// pplx::task<etcd::Response> res2 = etcd.watch("/test", true);
// etcd.set("/test/key4", "45").wait();
// sleep(1);
// REQUIRE(res2.is_done());
// CHECK("set" == res2.get().action());
// CHECK("45" == res2.get().value().as_string());
// }
// TEST_CASE("watch changes in the past")
// {
// etcd::Client etcd(etcd_uri);
// int index = etcd.set("/test/key1", "42").get().index();
// etcd.set("/test/key1", "43").wait();
// etcd.set("/test/key1", "44").wait();
// etcd.set("/test/key1", "45").wait();
// etcd::Response res = etcd.watch("/test/key1", ++index).get();
// CHECK("set" == res.action());
// CHECK("43" == res.value().as_string());
// res = etcd.watch("/test/key1", ++index).get();
// CHECK("set" == res.action());
// CHECK("44" == res.value().as_string());
// res = etcd.watch("/test", ++index, true).get();
// CHECK("set" == res.action());
// CHECK("45" == res.value().as_string());
// }
// TEST_CASE("request cancellation")
// {
// etcd::Client etcd(etcd_uri);
// etcd.set("/test/key1", "42").wait();
// pplx::task<etcd::Response> res = etcd.watch("/test/key1");
// CHECK(!res.is_done());
// etcd.cancel_operations();
// sleep(1);
// REQUIRE(res.is_done());
// try
// {
// res.wait();
// }
// catch(pplx::task_canceled const & ex)
// {
// std::cout << "pplx::task_canceled: " << ex.what() << "\n";
// }
// catch(std::exception const & ex)
// {
// std::cout << "std::exception: " << ex.what() << "\n";
// }
// }
std::cout << "start rmdir" << std::endl;
etcd.rmdir("/test", true).error_code();
std::cout << "end rmdir" << std::endl;
}

View File

@ -5,6 +5,7 @@
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncWatchResponse.hpp"
#include "etcd/Response.hpp"
using grpc::ClientAsyncReaderWriter;
@ -22,9 +23,14 @@ namespace etcdv3
AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub);
AsyncWatchResponse ParseResponse();
void waitForResponse();
void waitForResponse(std::function<void(etcd::Response)> callback);
void CancelWatch();
void WatchReq(std::string const & key);
WatchResponse reply;
KV::Stub* stub_;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
bool prefix;
};
}

View File

@ -52,6 +52,10 @@ void etcdv3::AsyncTxnResponse::ParseResponse()
range_kvs = response.values;
}
}
else if(ResponseOp::ResponseCase::kResponseDeleteRange == resp.response_case())
{
std::cout << "number of deleted keys: " << resp.response_delete_range().deleted() <<std::endl;
}
}
prev_values = prev_range_kvs;
values = range_kvs;

View File

@ -1,19 +1,22 @@
#include "v3/include/AsyncWatchAction.hpp"
#include "v3/include/action_constants.hpp"
using etcdserverpb::RangeRequest;
using etcdserverpb::RangeResponse;
using etcdserverpb::WatchCreateRequest;
etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub)
{
stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)this);
std::cout << "AsyncWatchAction create start" << std::endl;
stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)"create");
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
std::string range_end(key);
prefix = recursive;
if(recursive)
{
int ascii = (int)range_end[range_end.length()-1];
@ -22,14 +25,16 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, bool recursi
}
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*)this);
stream->Write(watch_req, (void*)"write");
stream->Read(&reply, (void*)this);
stub_ = stub_;
std::cout << "AsyncWatchAction create end" << std::endl;
}
etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, int fromIndex, bool recursive, KV::Stub* stub_, Watch::Stub* watchServiceStub)
{
stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)this);
stream = watchServiceStub->AsyncWatch(&context,&cq_,(void*)1);
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
@ -44,36 +49,86 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(std::string const & key, int fromInde
watch_create_req.set_range_end(range_end);
}
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*)this);
stream->Write(watch_req, (void*)1);
stream->Read(&reply, (void*)this);
stub_ = stub_;
}
void etcdv3::AsyncWatchAction::WatchReq(std::string const & key)
{
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*)1);
stream->Read(&reply, (void*)this);
}
void etcdv3::AsyncWatchAction::waitForResponse()
{
void* got_tag;
bool ok = false;
stream->Read(&reply, (void*)3);
while(cq_.Next(&got_tag, &ok))
{
if(got_tag == (void*)3)
if(got_tag == (void*)this) // read tag
{
if(reply.events_size())
{
stream->WritesDone((void*)this);
cq_.Next(&got_tag, &ok);
break;
}
}
else
{
stream->Read(&reply, (void*)3);
}
stream->Read(&reply, (void*)this);
}
}
}
}
void etcdv3::AsyncWatchAction::CancelWatch()
{
std::cout << "cancel watch"<< std::endl;
stream->WritesDone((void*)"writes done");
}
void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response)> callback)
{
std::cout << "waitForResponse start" << std::endl;
void* got_tag;
bool ok = false;
while(cq_.Next(&got_tag, &ok))
{
if(ok == false)
{
break;
}
std::cout << "ok status: " << ok << std::endl;
if(got_tag == (void*)"writes done")
{
std::cout << "writes done" << std::endl;
}
else if(got_tag == (void*)this) // read tag
{
std::cout << "read tag" << std::endl;
std::cout << "events size: "<< reply.events_size() << std::endl;
if(reply.events_size())
{
auto resp = ParseResponse();
callback(resp);
std::cout << "events received try to read again" << std::endl;
}
std::cout << " read again" << std::endl;
stream->Read(&reply, (void*)this);
}
}
}
etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse()
{