From 273710aafe8e740ddbbe408e640c534cdc79ab43 Mon Sep 17 00:00:00 2001 From: arches Date: Mon, 6 Jun 2016 08:00:00 -0400 Subject: [PATCH] Updates for GET and SET --- etcd/Client.hpp | 25 ++------------ etcd/Response.hpp | 55 +++++++++++++++++++++++++++++- etcd/Value.hpp | 2 ++ src/Client.cpp | 86 +++++++++-------------------------------------- src/Response.cpp | 26 ++++++++++++++ src/Value.cpp | 10 ++++++ 6 files changed, 111 insertions(+), 93 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index a0c68b0..13abdfd 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -148,31 +148,12 @@ namespace etcd web::http::client::http_client client; std::unique_ptr stub_; - pplx::task send_put(const std::string& key, const std::string& value); - pplx::task send_get(std::string const & key); + pplx::task send_asyncput(const std::string& key, const std::string& value); + pplx::task send_asyncget(std::string const & key); }; - class AsyncPutResponse - { - public: - PutResponse reply; - Status status; - ClientContext context; - CompletionQueue cq_; - std::unique_ptr> response_reader; - Response ParseResponse(); - }; - class AsyncRangeResponse - { - public: - RangeResponse reply; - Status status; - ClientContext context; - CompletionQueue cq_; - std::unique_ptr> response_reader; - Response ParseResponse(); - }; + } #endif diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 818b5a4..3ff5e63 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -7,10 +7,32 @@ #include "etcd/Value.hpp" +#include +#include "proto/rpc.grpc.pb.h" +#include "v3/include/V3Response.hpp" + + +using grpc::ClientAsyncResponseReader; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; +using etcdserverpb::PutRequest; +using etcdserverpb::PutResponse; + namespace etcd { typedef std::vector Keys; + class AsyncPutResponse + { + public: + PutResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> response_reader; + }; + /** * The Reponse object received for the requests of etcd::Client */ @@ -19,6 +41,35 @@ namespace etcd public: static pplx::task create(pplx::task response_task); + templatestatic pplx::task create(T call) + { + return pplx::task([call]() + { + void* got_tag; + bool ok = false; + etcd::Response resp; + + //blocking + call->cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)call); + GPR_ASSERT(ok); + + T call = static_cast(got_tag); + if(call->status.ok()) + { + auto v3resp = call->ParseResponse(); + resp = etcd::Response(); + } + else + { + throw std::runtime_error(call->status.error_message()); + } + + delete call; //todo:make this a smart pointer + return resp; + }); + }; + Response(); /** @@ -76,8 +127,10 @@ namespace etcd */ std::string const & key(int index) const; - protected: + protected: Response(web::http::http_response http_response, web::json::value json_value); + Response(const etcdv3::V3Response& response); + Response(PutResponse reply); int _error_code; std::string _error_message; diff --git a/etcd/Value.hpp b/etcd/Value.hpp index ec28dce..3c2ec97 100644 --- a/etcd/Value.hpp +++ b/etcd/Value.hpp @@ -4,6 +4,7 @@ #include #include #include +#include "proto/kv.pb.h" namespace etcd { @@ -43,6 +44,7 @@ namespace etcd friend class Response; Value(); Value(web::json::value const & json_value); + Value(mvccpb::KeyValue const & kvs); std::string _key; bool dir; std::string value; diff --git a/src/Client.cpp b/src/Client.cpp index f0f6e90..161f6d9 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -1,4 +1,5 @@ #include "etcd/Client.hpp" +#include "v3/include/AsyncRangeResponse.hpp" etcd::Client::Client(std::string const & address) : client(address) @@ -33,12 +34,12 @@ pplx::task etcd::Client::send_put_request(web::http::uri_builder pplx::task etcd::Client::get(std::string const & key) { - return send_get(key); + return send_asyncget(key); } pplx::task etcd::Client::set(std::string const & key, std::string const & value) { - return send_put(key,value); + return send_asyncput(key,value); } pplx::task etcd::Client::add(std::string const & key, std::string const & value) @@ -46,6 +47,12 @@ pplx::task etcd::Client::add(std::string const & key, std::strin web::http::uri_builder uri("/v2/keys" + key); uri.append_query("prevExist=false"); return send_put_request(uri, "value", value); + + // since RequestUnion is still not fixed in rpc.proto skip checking if key already exist. + //check first if key exist, use rpc synchronous Range since there's still some problem + //in rpc Txn; + + //return send_put(key,value); } pplx::task etcd::Client::modify(std::string const & key, std::string const & value) @@ -134,65 +141,22 @@ pplx::task etcd::Client::watch(std::string const & key, int from } -etcd::Response etcd::AsyncPutResponse::ParseResponse() -{ - std::cout << reply.header().revision() << std::endl; - return etcd::Response(); -} - -etcd::Response etcd::AsyncRangeResponse::ParseResponse() -{ - mvccpb::KeyValue kvs; - if(reply.kvs_size()) - { - int index=0; - do - { - kvs = reply.kvs(index++); - std::cout< etcd::Client::send_get(std::string const & key) +pplx::task etcd::Client::send_asyncget(std::string const & key) { RangeRequest request; request.set_key(key); - etcd::AsyncRangeResponse* call= new etcd::AsyncRangeResponse(); + etcdv3::AsyncRangeResponse* call= new etcdv3::AsyncRangeResponse(); call->response_reader = stub_->AsyncRange(&call->context,request,&call->cq_); call->response_reader->Finish(&call->reply, &call->status, (void*)call); - - return pplx::task([call]() - { - void* got_tag; - bool ok = false; - etcd::Response resp; - //blocking - call->cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)call); - GPR_ASSERT(ok); - - etcd::AsyncRangeResponse* call = static_cast(got_tag); - if(call->status.ok()) - { - resp = call->ParseResponse(); - } - - delete call; - return resp; - }); + return Response::create(call); } -pplx::task etcd::Client::send_put(std::string const & key, std::string const & value) +pplx::task etcd::Client::send_asyncput(std::string const & key, std::string const & value) { PutRequest request; request.set_key(key); @@ -204,25 +168,7 @@ pplx::task etcd::Client::send_put(std::string const & key, std:: call->response_reader->Finish(&call->reply, &call->status, (void*)call); - - return pplx::task([call]() - { - void* got_tag; - bool ok = false; - etcd::Response resp; - - //blocking - call->cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)call); - GPR_ASSERT(ok); - - etcd::AsyncPutResponse* call = static_cast(got_tag); - - if(call->status.ok()) - { - resp = call->ParseResponse(); - } - delete call; - return resp; - }); + return Response::create(call); } + + diff --git a/src/Response.cpp b/src/Response.cpp index ae32089..8c05cbf 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -1,6 +1,7 @@ #include "etcd/Response.hpp" #include "json_constants.hpp" + pplx::task etcd::Response::create(pplx::task response_task) { return pplx::task ([response_task](){ @@ -9,6 +10,31 @@ pplx::task etcd::Response::create(pplx::task 1) + { + for(int x = 0; x < size; x++) + _values.push_back(Value(reply.values[x])); + } + else if(size == 1) + { + _value = Value(reply.values[0]); + } +} + +etcd::Response::Response(PutResponse reply) + :_error_code(0), + _index(0) +{ +} + + etcd::Response::Response() : _error_code(0), _index(0) diff --git a/src/Value.cpp b/src/Value.cpp index 2e5c2c9..618e5e1 100644 --- a/src/Value.cpp +++ b/src/Value.cpp @@ -1,5 +1,6 @@ #include "etcd/Value.hpp" #include "json_constants.hpp" +#include "proto/kv.pb.h" etcd::Value::Value() : dir(false), @@ -17,6 +18,15 @@ etcd::Value::Value(web::json::value const & json_value) { } +etcd::Value::Value(mvccpb::KeyValue const & kvs) +{ + dir=false; + _key=kvs.key(); + value=kvs.value(); + created=kvs.create_revision(); + modified=kvs.mod_revision(); +} + std::string const & etcd::Value::key() const { return _key;