From 221cdf39f23a6322e9a0ffabb3763e38dc691a9a Mon Sep 17 00:00:00 2001 From: arches Date: Wed, 1 Jun 2016 06:36:59 -0400 Subject: [PATCH 1/4] Removed go files dependencies --- proto/auth.proto | 8 -------- proto/kv.proto | 7 ------- 2 files changed, 15 deletions(-) diff --git a/proto/auth.proto b/proto/auth.proto index 9308a17..50f9c33 100644 --- a/proto/auth.proto +++ b/proto/auth.proto @@ -1,14 +1,6 @@ syntax = "proto3"; package authpb; -import "gogoproto/gogo.proto"; - -option (gogoproto.marshaler_all) = true; -option (gogoproto.sizer_all) = true; -option (gogoproto.unmarshaler_all) = true; -option (gogoproto.goproto_getters_all) = false; -option (gogoproto.goproto_enum_prefix_all) = false; - // User is a single entry in the bucket authUsers message User { bytes name = 1; diff --git a/proto/kv.proto b/proto/kv.proto index f0c82b5..60ab046 100644 --- a/proto/kv.proto +++ b/proto/kv.proto @@ -1,13 +1,6 @@ syntax = "proto3"; package mvccpb; -import "gogoproto/gogo.proto"; - -option (gogoproto.marshaler_all) = true; -option (gogoproto.sizer_all) = true; -option (gogoproto.unmarshaler_all) = true; -option (gogoproto.goproto_getters_all) = false; -option (gogoproto.goproto_enum_prefix_all) = false; message KeyValue { // key is the key in bytes. An empty key is not allowed. From c75548d9e3410001f530e174d8c5a546b392b263 Mon Sep 17 00:00:00 2001 From: arches Date: Wed, 1 Jun 2016 08:07:09 -0400 Subject: [PATCH 2/4] Updated client::set() function to use grpc. Parse response for PutRequest is still stubbed. --- etcd/Client.hpp | 18 ++++++++++++++ src/Client.cpp | 63 ++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 70 insertions(+), 11 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 8a6929a..3cd23e6 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -10,6 +10,12 @@ #include "proto/rpc.grpc.pb.h" using grpc::Channel; +using grpc::ClientAsyncResponseReader; +using grpc::ClientContext; +using grpc::CompletionQueue; +using grpc::Status; +using etcdserverpb::PutRequest; +using etcdserverpb::PutResponse; using etcdserverpb::KV; namespace etcd @@ -140,6 +146,18 @@ namespace etcd web::http::client::http_client client; std::unique_ptr stub_; + pplx::task send_put(const std::string& key, const std::string& value); + }; + + class AsyncPutResponse + { + public: + PutResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> response_reader; + Response ParseResponse(); }; } diff --git a/src/Client.cpp b/src/Client.cpp index 3c571ea..3036416 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -3,15 +3,15 @@ etcd::Client::Client(std::string const & address) : client(address) { - std::string stripped_address(address); - std::string substr("http://"); - std::string::size_type i = stripped_address.find(substr); - if(i != std::string::npos) - { - stripped_address.erase(i,substr.length()); - } - std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); - stub_= KV::NewStub(channel); + std::string stripped_address(address); + std::string substr("http://"); + std::string::size_type i = stripped_address.find(substr); + if(i != std::string::npos) + { + stripped_address.erase(i,substr.length()); + } + std::shared_ptr channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); + stub_= KV::NewStub(channel); } pplx::task etcd::Client::send_get_request(web::http::uri_builder & uri) @@ -39,8 +39,7 @@ pplx::task etcd::Client::get(std::string const & key) pplx::task etcd::Client::set(std::string const & key, std::string const & value) { - web::http::uri_builder uri("/v2/keys" + key); - return send_put_request(uri, "value", value); + return send_put(key,value); } pplx::task etcd::Client::add(std::string const & key, std::string const & value) @@ -134,3 +133,45 @@ pplx::task etcd::Client::watch(std::string const & key, int from uri.append_query("recursive=true"); return send_get_request(uri); } + + +etcd::Response etcd::AsyncPutResponse::ParseResponse() +{ + return etcd::Response(); +} + + +pplx::task etcd::Client::send_put(std::string const & key, std::string const & value) +{ + PutRequest request; + request.set_key(key); + request.set_value(value); + + etcd::AsyncPutResponse* call= new etcd::AsyncPutResponse(); + + call->response_reader = stub_->AsyncPut(&call->context,request,&call->cq_); + + call->response_reader->Finish(&call->reply, &call->status, (void*)call); + + + return pplx::task([call]() + { + void* got_tag; + bool ok = false; + etcd::Response resp; + + //blocking + call->cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)call); + GPR_ASSERT(ok); + + etcd::AsyncPutResponse* call = static_cast(got_tag); + + if(call->status.ok()) + { + resp = call->ParseResponse(); + } + delete call; + return resp; + }); +} From 6246968086575cec22dd7da757d0e0c12b32dcf6 Mon Sep 17 00:00:00 2001 From: arches Date: Wed, 1 Jun 2016 10:15:34 -0400 Subject: [PATCH 3/4] Update client::get to use grpc. ParseResponse for AsyncRangeResponse is still stubbed --- etcd/Client.hpp | 14 +++++++++++++ src/Client.cpp | 55 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 3cd23e6..a0c68b0 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -16,6 +16,8 @@ using grpc::CompletionQueue; using grpc::Status; using etcdserverpb::PutRequest; using etcdserverpb::PutResponse; +using etcdserverpb::RangeRequest; +using etcdserverpb::RangeResponse; using etcdserverpb::KV; namespace etcd @@ -147,6 +149,7 @@ namespace etcd std::unique_ptr stub_; pplx::task send_put(const std::string& key, const std::string& value); + pplx::task send_get(std::string const & key); }; class AsyncPutResponse @@ -159,6 +162,17 @@ namespace etcd std::unique_ptr> response_reader; Response ParseResponse(); }; + + class AsyncRangeResponse + { + public: + RangeResponse reply; + Status status; + ClientContext context; + CompletionQueue cq_; + std::unique_ptr> response_reader; + Response ParseResponse(); + }; } #endif diff --git a/src/Client.cpp b/src/Client.cpp index 3036416..f0f6e90 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -33,8 +33,7 @@ pplx::task etcd::Client::send_put_request(web::http::uri_builder pplx::task etcd::Client::get(std::string const & key) { - web::http::uri_builder uri("/v2/keys" + key); - return send_get_request(uri); + return send_get(key); } pplx::task etcd::Client::set(std::string const & key, std::string const & value) @@ -137,9 +136,61 @@ pplx::task etcd::Client::watch(std::string const & key, int from etcd::Response etcd::AsyncPutResponse::ParseResponse() { + std::cout << reply.header().revision() << std::endl; return etcd::Response(); } +etcd::Response etcd::AsyncRangeResponse::ParseResponse() +{ + mvccpb::KeyValue kvs; + if(reply.kvs_size()) + { + int index=0; + do + { + kvs = reply.kvs(index++); + std::cout< etcd::Client::send_get(std::string const & key) +{ + RangeRequest request; + request.set_key(key); + + etcd::AsyncRangeResponse* call= new etcd::AsyncRangeResponse(); + + call->response_reader = stub_->AsyncRange(&call->context,request,&call->cq_); + + call->response_reader->Finish(&call->reply, &call->status, (void*)call); + + return pplx::task([call]() + { + void* got_tag; + bool ok = false; + etcd::Response resp; + + //blocking + call->cq_.Next(&got_tag, &ok); + GPR_ASSERT(got_tag == (void*)call); + GPR_ASSERT(ok); + + etcd::AsyncRangeResponse* call = static_cast(got_tag); + if(call->status.ok()) + { + resp = call->ParseResponse(); + } + + delete call; + return resp; + }); +} + pplx::task etcd::Client::send_put(std::string const & key, std::string const & value) { From 9bb89135402a27ed3bc41cc82e8e6e4fe3381316 Mon Sep 17 00:00:00 2001 From: arches Date: Fri, 3 Jun 2016 06:01:51 -0400 Subject: [PATCH 4/4] Added Interface and Base class interfaces --- v3/include/IResponse.hpp | 9 +++++++++ v3/include/V3Response.hpp | 9 +++++++++ 2 files changed, 18 insertions(+) create mode 100644 v3/include/IResponse.hpp create mode 100644 v3/include/V3Response.hpp diff --git a/v3/include/IResponse.hpp b/v3/include/IResponse.hpp new file mode 100644 index 0000000..0514ff6 --- /dev/null +++ b/v3/include/IResponse.hpp @@ -0,0 +1,9 @@ +#ifndef __I_RESPONSE_HPP__ +#define __I_RESPONSE_HPP__ + +namespace etcdv3 +{ + class IResponse + { + }; +} diff --git a/v3/include/V3Response.hpp b/v3/include/V3Response.hpp new file mode 100644 index 0000000..665b1cf --- /dev/null +++ b/v3/include/V3Response.hpp @@ -0,0 +1,9 @@ +#ifndef __V3_RESPONSE_HPP__ +#define __V3_RESPONSE_HPP__ + +namespace etcdv3 +{ + class V3Response + { + }; +}