From c75548d9e3410001f530e174d8c5a546b392b263 Mon Sep 17 00:00:00 2001 From: arches Date: Wed, 1 Jun 2016 08:07:09 -0400 Subject: [PATCH] Updated client::set() function to use grpc. Parse response for PutRequest is still stubbed. --- etcd/Client.hpp | 18 ++++++++++++++ src/Client.cpp | 63 ++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 70 insertions(+), 11 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 8a6929a..3cd23e6 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -10,6 +10,12 @@ #include "proto/rpc.grpc.pb.h" using grpc::Channel; +using grpc::ClientAsyncResponseReader; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; +using etcdserverpb::PutRequest; +using etcdserverpb::PutResponse; using etcdserverpb::KV; namespace etcd @@ -140,6 +146,18 @@ namespace etcd web::http::client::http_client client; std::unique_ptr stub_; + pplx::task send_put(const std::string& key, const std::string& value); + }; + + class AsyncPutResponse + { + public: + PutResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> response_reader; + Response ParseResponse(); }; } diff --git a/src/Client.cpp b/src/Client.cpp index 3c571ea..3036416 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -3,15 +3,15 @@ etcd::Client::Client(std::string const & address) : client(address) { - std::string stripped_address(address); - std::string substr("http://"); - std::string::size_type i = stripped_address.find(substr); - if(i != std::string::npos) - { - stripped_address.erase(i,substr.length()); - } - std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); - stub_= KV::NewStub(channel); + std::string stripped_address(address); + std::string substr("http://"); + std::string::size_type i = stripped_address.find(substr); + if(i != std::string::npos) + { + stripped_address.erase(i,substr.length()); + } + std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); + stub_= KV::NewStub(channel); } pplx::task etcd::Client::send_get_request(web::http::uri_builder & uri) @@ -39,8 +39,7 @@ pplx::task etcd::Client::get(std::string const & key) pplx::task etcd::Client::set(std::string const & key, std::string const & value) { - web::http::uri_builder uri("/v2/keys" + key); - return send_put_request(uri, "value", value); + return send_put(key,value); } pplx::task etcd::Client::add(std::string const & key, std::string const & value) @@ -134,3 +133,45 @@ pplx::task etcd::Client::watch(std::string const & key, int from uri.append_query("recursive=true"); return send_get_request(uri); } + + +etcd::Response etcd::AsyncPutResponse::ParseResponse() +{ + return etcd::Response(); +} + + +pplx::task etcd::Client::send_put(std::string const & key, std::string const & value) +{ + PutRequest request; + request.set_key(key); + request.set_value(value); + + etcd::AsyncPutResponse* call= new etcd::AsyncPutResponse(); + + call->response_reader = stub_->AsyncPut(&call->context,request,&call->cq_); + + call->response_reader->Finish(&call->reply, &call->status, (void*)call); + + + return pplx::task([call]() + { + void* got_tag; + bool ok = false; + etcd::Response resp; + + //blocking + call->cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)call); + GPR_ASSERT(ok); + + etcd::AsyncPutResponse* call = static_cast(got_tag); + + if(call->status.ok()) + { + resp = call->ParseResponse(); + } + delete call; + return resp; + }); +}