diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 60ca4d3..bebfff2 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -10,6 +10,14 @@ #include "proto/rpc.grpc.pb.h" using grpc::Channel; +using grpc::ClientAsyncResponseReader; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; +using etcdserverpb::PutRequest; +using etcdserverpb::PutResponse; +using etcdserverpb::RangeRequest; +using etcdserverpb::RangeResponse; using etcdserverpb::KV; using etcdserverpb::Watch; @@ -149,6 +157,30 @@ namespace etcd std::unique_ptr stub_; std::unique_ptr watchServiceStub; + pplx::task send_put(const std::string& key, const std::string& value); + pplx::task send_get(std::string const & key); + }; + + class AsyncPutResponse + { + public: + PutResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> response_reader; + Response ParseResponse(); + }; + + class AsyncRangeResponse + { + public: + RangeResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> response_reader; + Response ParseResponse(); }; } diff --git a/src/Client.cpp b/src/Client.cpp index f151643..550bc8b 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -6,15 +6,15 @@ etcd::Client::Client(std::string const & address) : client(address) { - std::string stripped_address(address); - std::string substr("http://"); - std::string::size_type i = stripped_address.find(substr); - if(i != std::string::npos) - { - stripped_address.erase(i,substr.length()); - } - std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); - stub_= KV::NewStub(channel); + std::string stripped_address(address); + std::string substr("http://"); + std::string::size_type i = stripped_address.find(substr); + if(i != std::string::npos) + { + stripped_address.erase(i,substr.length()); + } + std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); + stub_= KV::NewStub(channel); watchServiceStub = Watch::NewStub(channel); } @@ -37,14 +37,12 @@ pplx::task etcd::Client::send_put_request(web::http::uri_builder pplx::task etcd::Client::get(std::string const & key) { - web::http::uri_builder uri("/v2/keys" + key); - return send_get_request(uri); + return send_get(key); } pplx::task etcd::Client::set(std::string const & key, std::string const & value) { - web::http::uri_builder uri("/v2/keys" + key); - return send_put_request(uri, "value", value); + return send_put(key,value); } void etcd::Client::setv3(std::string const &key, std::string const &value) @@ -246,3 +244,97 @@ pplx::task etcd::Client::watch(std::string const & key, int from } +etcd::Response etcd::AsyncPutResponse::ParseResponse() +{ + std::cout << reply.header().revision() << std::endl; + return etcd::Response(); +} + +etcd::Response etcd::AsyncRangeResponse::ParseResponse() +{ + mvccpb::KeyValue kvs; + if(reply.kvs_size()) + { + int index=0; + do + { + kvs = reply.kvs(index++); + std::cout< etcd::Client::send_get(std::string const & key) +{ + RangeRequest request; + request.set_key(key); + + etcd::AsyncRangeResponse* call= new etcd::AsyncRangeResponse(); + + call->response_reader = stub_->AsyncRange(&call->context,request,&call->cq_); + + call->response_reader->Finish(&call->reply, &call->status, (void*)call); + + return pplx::task([call]() + { + void* got_tag; + bool ok = false; + etcd::Response resp; + + //blocking + call->cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)call); + GPR_ASSERT(ok); + + etcd::AsyncRangeResponse* call = static_cast(got_tag); + if(call->status.ok()) + { + resp = call->ParseResponse(); + } + + delete call; + return resp; + }); +} + + +pplx::task etcd::Client::send_put(std::string const & key, std::string const & value) +{ + PutRequest request; + request.set_key(key); + request.set_value(value); + + etcd::AsyncPutResponse* call= new etcd::AsyncPutResponse(); + + call->response_reader = stub_->AsyncPut(&call->context,request,&call->cq_); + + call->response_reader->Finish(&call->reply, &call->status, (void*)call); + + + return pplx::task([call]() + { + void* got_tag; + bool ok = false; + etcd::Response resp; + + //blocking + call->cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)call); + GPR_ASSERT(ok); + + etcd::AsyncPutResponse* call = static_cast(got_tag); + + if(call->status.ok()) + { + resp = call->ParseResponse(); + } + delete call; + return resp; + }); +} + + diff --git a/v3/include/IResponse.hpp b/v3/include/IResponse.hpp new file mode 100644 index 0000000..0514ff6 --- /dev/null +++ b/v3/include/IResponse.hpp @@ -0,0 +1,9 @@ +#ifndef __I_RESPONSE_HPP__ +#define __I_RESPONSE_HPP__ + +namespace etcdv3 +{ + class IResponse + { + }; +} diff --git a/v3/include/V3Response.hpp b/v3/include/V3Response.hpp new file mode 100644 index 0000000..665b1cf --- /dev/null +++ b/v3/include/V3Response.hpp @@ -0,0 +1,9 @@ +#ifndef __V3_RESPONSE_HPP__ +#define __V3_RESPONSE_HPP__ + +namespace etcdv3 +{ + class V3Response + { + }; +}