Updates for GET and SET

This commit is contained in:
arches 2016-06-06 08:00:00 -04:00
parent bd54dffed7
commit 273710aafe
6 changed files with 111 additions and 93 deletions

View File

@ -148,31 +148,12 @@ namespace etcd
web::http::client::http_client client;
std::unique_ptr<KV::Stub> stub_;
pplx::task<etcd::Response> send_put(const std::string& key, const std::string& value);
pplx::task<etcd::Response> send_get(std::string const & key);
pplx::task<etcd::Response> send_asyncput(const std::string& key, const std::string& value);
pplx::task<etcd::Response> send_asyncget(std::string const & key);
};
class AsyncPutResponse
{
public:
PutResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<PutResponse>> response_reader;
Response ParseResponse();
};
class AsyncRangeResponse
{
public:
RangeResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<RangeResponse>> response_reader;
Response ParseResponse();
};
}
#endif

View File

@ -7,10 +7,32 @@
#include "etcd/Value.hpp"
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::PutRequest;
using etcdserverpb::PutResponse;
namespace etcd
{
typedef std::vector<std::string> Keys;
class AsyncPutResponse
{
public:
PutResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<PutResponse>> response_reader;
};
/**
* The Reponse object received for the requests of etcd::Client
*/
@ -19,6 +41,35 @@ namespace etcd
public:
static pplx::task<Response> create(pplx::task<web::http::http_response> response_task);
template<typename T>static pplx::task<etcd::Response> create(T call)
{
return pplx::task<etcd::Response>([call]()
{
void* got_tag;
bool ok = false;
etcd::Response resp;
//blocking
call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
T call = static_cast<T>(got_tag);
if(call->status.ok())
{
auto v3resp = call->ParseResponse();
resp = etcd::Response();
}
else
{
throw std::runtime_error(call->status.error_message());
}
delete call; //todo:make this a smart pointer
return resp;
});
};
Response();
/**
@ -76,8 +127,10 @@ namespace etcd
*/
std::string const & key(int index) const;
protected:
protected:
Response(web::http::http_response http_response, web::json::value json_value);
Response(const etcdv3::V3Response& response);
Response(PutResponse reply);
int _error_code;
std::string _error_message;

View File

@ -4,6 +4,7 @@
#include <cpprest/http_client.h>
#include <string>
#include <vector>
#include "proto/kv.pb.h"
namespace etcd
{
@ -43,6 +44,7 @@ namespace etcd
friend class Response;
Value();
Value(web::json::value const & json_value);
Value(mvccpb::KeyValue const & kvs);
std::string _key;
bool dir;
std::string value;

View File

@ -1,4 +1,5 @@
#include "etcd/Client.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
etcd::Client::Client(std::string const & address)
: client(address)
@ -33,12 +34,12 @@ pplx::task<etcd::Response> etcd::Client::send_put_request(web::http::uri_builder
pplx::task<etcd::Response> etcd::Client::get(std::string const & key)
{
return send_get(key);
return send_asyncget(key);
}
pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::string const & value)
{
return send_put(key,value);
return send_asyncput(key,value);
}
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value)
@ -46,6 +47,12 @@ pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::strin
web::http::uri_builder uri("/v2/keys" + key);
uri.append_query("prevExist=false");
return send_put_request(uri, "value", value);
// since RequestUnion is still not fixed in rpc.proto skip checking if key already exist.
//check first if key exist, use rpc synchronous Range since there's still some problem
//in rpc Txn;
//return send_put(key,value);
}
pplx::task<etcd::Response> etcd::Client::modify(std::string const & key, std::string const & value)
@ -134,65 +141,22 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
}
etcd::Response etcd::AsyncPutResponse::ParseResponse()
{
std::cout << reply.header().revision() << std::endl;
return etcd::Response();
}
etcd::Response etcd::AsyncRangeResponse::ParseResponse()
{
mvccpb::KeyValue kvs;
if(reply.kvs_size())
{
int index=0;
do
{
kvs = reply.kvs(index++);
std::cout<<reply.header().revision() << std::endl;
std::cout << kvs.create_revision() << std::endl;
std::cout << kvs.mod_revision() << std::endl;
std::cout << kvs.version() << std::endl;
}while(reply.more());
}
return etcd::Response();
}
pplx::task<etcd::Response> etcd::Client::send_get(std::string const & key)
pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key)
{
RangeRequest request;
request.set_key(key);
etcd::AsyncRangeResponse* call= new etcd::AsyncRangeResponse();
etcdv3::AsyncRangeResponse* call= new etcdv3::AsyncRangeResponse();
call->response_reader = stub_->AsyncRange(&call->context,request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return pplx::task<etcd::Response>([call]()
{
void* got_tag;
bool ok = false;
etcd::Response resp;
//blocking
call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
etcd::AsyncRangeResponse* call = static_cast<etcd::AsyncRangeResponse*>(got_tag);
if(call->status.ok())
{
resp = call->ParseResponse();
}
delete call;
return resp;
});
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::send_put(std::string const & key, std::string const & value)
pplx::task<etcd::Response> etcd::Client::send_asyncput(std::string const & key, std::string const & value)
{
PutRequest request;
request.set_key(key);
@ -204,25 +168,7 @@ pplx::task<etcd::Response> etcd::Client::send_put(std::string const & key, std::
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
return pplx::task<etcd::Response>([call]()
{
void* got_tag;
bool ok = false;
etcd::Response resp;
//blocking
call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
etcd::AsyncPutResponse* call = static_cast<etcd::AsyncPutResponse*>(got_tag);
if(call->status.ok())
{
resp = call->ParseResponse();
}
delete call;
return resp;
});
return Response::create(call);
}

View File

@ -1,6 +1,7 @@
#include "etcd/Response.hpp"
#include "json_constants.hpp"
pplx::task<etcd::Response> etcd::Response::create(pplx::task<web::http::http_response> response_task)
{
return pplx::task<etcd::Response> ([response_task](){
@ -9,6 +10,31 @@ pplx::task<etcd::Response> etcd::Response::create(pplx::task<web::http::http_res
});
}
etcd::Response::Response(const etcdv3::V3Response& reply)
{
_index = reply.index;
_error_code = reply.error_code;
_error_message = reply.error_message;
_action = reply.action;
int size = reply.values.size();
if(size > 1)
{
for(int x = 0; x < size; x++)
_values.push_back(Value(reply.values[x]));
}
else if(size == 1)
{
_value = Value(reply.values[0]);
}
}
etcd::Response::Response(PutResponse reply)
:_error_code(0),
_index(0)
{
}
etcd::Response::Response()
: _error_code(0),
_index(0)

View File

@ -1,5 +1,6 @@
#include "etcd/Value.hpp"
#include "json_constants.hpp"
#include "proto/kv.pb.h"
etcd::Value::Value()
: dir(false),
@ -17,6 +18,15 @@ etcd::Value::Value(web::json::value const & json_value)
{
}
etcd::Value::Value(mvccpb::KeyValue const & kvs)
{
dir=false;
_key=kvs.key();
value=kvs.value();
created=kvs.create_revision();
modified=kvs.mod_revision();
}
std::string const & etcd::Value::key() const
{
return _key;