Merge remote-tracking branch 'origin/other_dev' into lampayan

# Conflicts:
#	etcd/Client.hpp
#	proto/kv.proto
#	src/Client.cpp
This commit is contained in:
lampayan 2016-06-07 15:41:12 +02:00
commit 98e2b57ed4
4 changed files with 155 additions and 13 deletions

View File

@ -10,6 +10,14 @@
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
using grpc::Channel; using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::PutRequest;
using etcdserverpb::PutResponse;
using etcdserverpb::RangeRequest;
using etcdserverpb::RangeResponse;
using etcdserverpb::KV; using etcdserverpb::KV;
using etcdserverpb::Watch; using etcdserverpb::Watch;
@ -149,6 +157,30 @@ namespace etcd
std::unique_ptr<KV::Stub> stub_; std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
pplx::task<etcd::Response> send_put(const std::string& key, const std::string& value);
pplx::task<etcd::Response> send_get(std::string const & key);
};
class AsyncPutResponse
{
public:
PutResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<PutResponse>> response_reader;
Response ParseResponse();
};
class AsyncRangeResponse
{
public:
RangeResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
Response ParseResponse();
}; };
} }

View File

@ -6,15 +6,15 @@
etcd::Client::Client(std::string const & address) etcd::Client::Client(std::string const & address)
: client(address) : client(address)
{ {
std::string stripped_address(address); std::string stripped_address(address);
std::string substr("http://"); std::string substr("http://");
std::string::size_type i = stripped_address.find(substr); std::string::size_type i = stripped_address.find(substr);
if(i != std::string::npos) if(i != std::string::npos)
{ {
stripped_address.erase(i,substr.length()); stripped_address.erase(i,substr.length());
} }
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
stub_= KV::NewStub(channel); stub_= KV::NewStub(channel);
watchServiceStub = Watch::NewStub(channel); watchServiceStub = Watch::NewStub(channel);
} }
@ -37,14 +37,12 @@ pplx::task<etcd::Response> etcd::Client::send_put_request(web::http::uri_builder
pplx::task<etcd::Response> etcd::Client::get(std::string const & key) pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
{ {
web::http::uri_builder uri("/v2/keys" + key); return send_get(key);
return send_get_request(uri);
} }
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value)
{ {
web::http::uri_builder uri("/v2/keys" + key); return send_put(key,value);
return send_put_request(uri, "value", value);
} }
void etcd::Client::setv3(std::string const &key, std::string const &value) void etcd::Client::setv3(std::string const &key, std::string const &value)
@ -246,3 +244,97 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
} }
etcd::Response etcd::AsyncPutResponse::ParseResponse()
{
std::cout << reply.header().revision() << std::endl;
return etcd::Response();
}
etcd::Response etcd::AsyncRangeResponse::ParseResponse()
{
mvccpb::KeyValue kvs;
if(reply.kvs_size())
{
int index=0;
do
{
kvs = reply.kvs(index++);
std::cout<<reply.header().revision() << std::endl;
std::cout << kvs.create_revision() << std::endl;
std::cout << kvs.mod_revision() << std::endl;
std::cout << kvs.version() << std::endl;
}while(reply.more());
}
return etcd::Response();
}
pplx::task<etcd::Response> etcd::Client::send_get(std::string const & key)
{
RangeRequest request;
request.set_key(key);
etcd::AsyncRangeResponse* call= new etcd::AsyncRangeResponse();
call->response_reader = stub_->AsyncRange(&call->context,request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return pplx::task<etcd::Response>([call]()
{
void* got_tag;
bool ok = false;
etcd::Response resp;
//blocking
call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
etcd::AsyncRangeResponse* call = static_cast<etcd::AsyncRangeResponse*>(got_tag);
if(call->status.ok())
{
resp = call->ParseResponse();
}
delete call;
return resp;
});
}
pplx::task<etcd::Response> etcd::Client::send_put(std::string const & key, std::string const & value)
{
PutRequest request;
request.set_key(key);
request.set_value(value);
etcd::AsyncPutResponse* call= new etcd::AsyncPutResponse();
call->response_reader = stub_->AsyncPut(&call->context,request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return pplx::task<etcd::Response>([call]()
{
void* got_tag;
bool ok = false;
etcd::Response resp;
//blocking
call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
etcd::AsyncPutResponse* call = static_cast<etcd::AsyncPutResponse*>(got_tag);
if(call->status.ok())
{
resp = call->ParseResponse();
}
delete call;
return resp;
});
}

9
v3/include/IResponse.hpp Normal file
View File

@ -0,0 +1,9 @@
#ifndef __I_RESPONSE_HPP__
#define __I_RESPONSE_HPP__
namespace etcdv3
{
class IResponse
{
};
}

View File

@ -0,0 +1,9 @@
#ifndef __V3_RESPONSE_HPP__
#define __V3_RESPONSE_HPP__
namespace etcdv3
{
class V3Response
{
};
}