This commit is contained in:
Arches 2016-06-09 15:58:08 +02:00 committed by arches
parent b7500a17cb
commit 8825b43044
11 changed files with 218 additions and 246 deletions

View File

@ -8,18 +8,12 @@
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/grpcClient.hpp"
using grpc::Channel;
using grpc::ClientAsyncResponseReader; using grpc::ClientAsyncResponseReader;
using grpc::ClientContext; using grpc::ClientContext;
using grpc::CompletionQueue; using grpc::CompletionQueue;
using grpc::Status; using grpc::Status;
using etcdserverpb::PutRequest;
using etcdserverpb::PutResponse; using etcdserverpb::PutResponse;
using etcdserverpb::RangeRequest;
using etcdserverpb::RangeResponse; using etcdserverpb::RangeResponse;
using etcdserverpb::KV; using etcdserverpb::KV;
using etcdserverpb::Watch; using etcdserverpb::Watch;
@ -161,15 +155,12 @@ namespace etcd
pplx::task<etcd::Response> send_get(std::string const & key); pplx::task<etcd::Response> send_get(std::string const & key);
pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value); pplx::task<etcd::Response> send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value);
etcdv3::grpcClient grpcClient;
private: private:
pplx::task<Response> removeEntryWithKey(std::string const &entryKey); pplx::task<Response> removeEntryWithKey(std::string const &entryKey);
pplx::task<Response> removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue); pplx::task<Response> removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue);
pplx::task<Response> removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex); pplx::task<Response> removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex);
pplx::task<Response> modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index); pplx::task<Response> modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index);
}; };

View File

@ -1,4 +1,4 @@
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp) add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)

View File

@ -1,22 +1,29 @@
#include <memory>
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncPutResponse.hpp"
#include "v3/include/AsyncTxnResponse.hpp" #include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncDelResponse.hpp" #include "v3/include/AsyncDelResponse.hpp"
#include "v3/include/AsyncModifyResponse.hpp" #include "v3/include/AsyncModifyResponse.hpp"
#include "v3/include/Utils.hpp"
#include <iostream> #include <iostream>
#include <memory>
using grpc::Channel;
using etcdserverpb::PutRequest;
using etcdserverpb::RangeRequest;
using etcdserverpb::TxnRequest; using etcdserverpb::TxnRequest;
using etcdserverpb::Compare; using etcdserverpb::Compare;
using etcdserverpb::RequestOp; using etcdserverpb::RequestOp;
etcd::Client::Client(std::string const & address) etcd::Client::Client(std::string const & address)
: client(address), grpcClient(address) : client(address)
{ {
std::string stripped_address(address);
std::string substr("http://");
std::string::size_type i = stripped_address.find(substr);
if(i != std::string::npos)
{
stripped_address.erase(i,substr.length());
}
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
stub_= KV::NewStub(channel);
} }
pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri) pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri)
@ -259,30 +266,30 @@ pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key,
compare->set_version(0); compare->set_version(0);
//get key whether success or failure //get key on failure
RangeRequest get_request1 = new RangeRequest(); std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request1->set_key(key); get_request->set_key(key);
RequestOp* req_failure = txn_request.add_failure(); RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request1); req_failure->set_allocated_request_range(get_request.release());
//if success, add key and then get new value of key //if success, add key and then get new value of key
PutRequest* put_request = new PutRequest(); std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key); put_request->set_key(key);
put_request->set_value(value); put_request->set_value(value);
RequestOp* req_success2 = txn_request.add_success();
req_success2->set_allocated_request_put(put_request);
RangeRequest* get_request2 = new RangeRequest(); RequestOp* req_success = txn_request.add_success();
get_request2->set_key(key); req_success->set_allocated_request_put(put_request.release());
RequestOp* req_success3 = txn_request.add_success();
req_success3->set_allocated_request_range(get_request2); get_request.reset(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
etcdv3::AsyncTxnResponse* call(new etcdv3::AsyncTxnResponse("create"));
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("create"); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader = grpcClient.stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call); call->response_reader->Finish(&call->reply, &call->status, (void*)call);
@ -293,36 +300,50 @@ pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key,
pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const & key, std::string const & value, std::string const & old_value)
{ {
//check current key is equal to old_value //check key exist
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); TxnRequest txn_request;
if(!resp->reply.kvs_size()) Compare* compare = txn_request.add_compare();
{ compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER);
resp->error_code=100; compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION);
resp->error_message="Key not found"; compare->set_key(key);
return Response::createResponse(*resp); compare->set_version(0);
}
else
{
if(resp->reply.kvs(0).value() != old_value)
{
resp->error_code=101;
resp->error_message="Compare failed";
return Response::createResponse(*resp);
}
}
PutRequest put_request; //check key value is equal to old_value
put_request.set_key(key); compare = txn_request.add_compare();
put_request.set_value(value); compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL);
compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VALUE);
compare->set_key(key);
compare->set_value(old_value);
etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("compareAndSwap"); etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("compareAndSwap");
//get key on failure
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release());
//below 2 lines can be removed once we are able to use Txn //on success get key value then modify and get new value
call->prev_value = resp->reply.kvs(0); get_request.reset(new RangeRequest());
call->client = &grpcClient; get_request->set_key(key);
call->key = key; RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
get_request.reset(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("compareAndSwap");
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call);
@ -333,27 +354,44 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const &
pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & key, std::string const & value)
{ {
//check if key already exist //check if key is present
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); TxnRequest txn_request;
if(!resp->reply.kvs_size()) Compare* compare = txn_request.add_compare();
{ compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER);
resp->error_code=100; compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION);
resp->error_message="Key not found"; compare->set_key(key);
return Response::createResponse(*resp); compare->set_version(0);
}
PutRequest put_request; //success or failure
put_request.set_key(key); //get key value before modification
put_request.set_value(value); std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release());
etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("update"); get_request.reset(new RangeRequest());
get_request->set_key(key);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
//below 2 lines can be removed once we are able to use Txn
call->prev_value = resp->reply.kvs(0);
call->client = &grpcClient;
call->key = key;
call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); //if success, modify key and then get new value of key
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
get_request.reset(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("update");
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call);
@ -364,12 +402,32 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & ke
pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key) pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key)
{ {
RangeRequest request; //check key exist
request.set_key(key); TxnRequest txn_request;
Compare* compare = txn_request.add_compare();
compare->set_result(Compare::CompareResult::Compare_CompareResult_GREATER);
compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION);
compare->set_key(key);
compare->set_version(0);
etcdv3::AsyncRangeResponse* call= new etcdv3::AsyncRangeResponse(); //get key on failure or success
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
call->response_reader = grpcClient.stub_->AsyncRange(&call->context,request,&call->cq_); RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
get_request.reset(new RangeRequest());
get_request->set_key(key);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release());
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("get");
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call); call->response_reader->Finish(&call->reply, &call->status, (void*)call);
@ -379,23 +437,47 @@ pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key)
pplx::task<etcd::Response> etcd::Client::send_asyncput(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::send_asyncput(std::string const & key, std::string const & value)
{ {
PutRequest put_request; //check if key is not present
put_request.set_key(key); TxnRequest txn_request;
put_request.set_value(value); Compare* compare = txn_request.add_compare();
compare->set_result(Compare::CompareResult::Compare_CompareResult_EQUAL);
compare->set_target(Compare::CompareTarget::Compare_CompareTarget_VERSION);
compare->set_key(key);
compare->set_version(0);
etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("set");
//get current value //get key on failure, get key before put, modify and then get updated key
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); std::unique_ptr<RangeRequest> get_request(new RangeRequest());
if(resp->reply.kvs_size()) get_request->set_key(key);
{ RequestOp* req_failure = txn_request.add_failure();
call->prev_value = resp->reply.kvs(0); req_failure->set_allocated_request_range(get_request.release());
}
call->client = &grpcClient; std::unique_ptr<PutRequest> put_request(new PutRequest());
call->key = key; put_request->set_key(key);
put_request->set_value(value);
req_failure = txn_request.add_failure();
req_failure->set_allocated_request_put(put_request.release());
call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); get_request.reset(new RangeRequest());
get_request->set_key(key);
req_failure = txn_request.add_failure();
req_failure->set_allocated_request_range(get_request.release());
//if success, put key and then get new value of key
put_request.reset(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
get_request.reset(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("set");
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call); call->response_reader->Finish(&call->reply, &call->status, (void*)call);

View File

@ -7,14 +7,15 @@
TEST_CASE("setup") TEST_CASE("setup")
{ {
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:2379");
//etcd.rmdir("/test", true).wait(); //etcd.rmdir("/test", true).wait();
} }
TEST_CASE("add a new key") TEST_CASE("add a new key")
{ {
etcd::Client etcd("http://127.0.0.1:4001");
etcd::Client etcd("http://127.0.0.1:2379");
etcd::Response resp = etcd.add("/test/key1", "42").get(); etcd::Response resp = etcd.add("/test/key1", "42").get();
REQUIRE(0 == resp.error_code()); REQUIRE(0 == resp.error_code());
CHECK("create" == resp.action()); CHECK("create" == resp.action());
@ -33,7 +34,7 @@ TEST_CASE("add a new key")
TEST_CASE("read a value from etcd") TEST_CASE("read a value from etcd")
{ {
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://192.168.99.100:2379");
etcd::Response resp = etcd.get("/test/key1").get(); etcd::Response resp = etcd.get("/test/key1").get();
CHECK("get" == resp.action()); CHECK("get" == resp.action());
REQUIRE(resp.is_ok()); REQUIRE(resp.is_ok());
@ -46,7 +47,7 @@ TEST_CASE("read a value from etcd")
TEST_CASE("simplified read") TEST_CASE("simplified read")
{ {
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:2379");
CHECK("42" == etcd.get("/test/key1").get().value().as_string()); CHECK("42" == etcd.get("/test/key1").get().value().as_string());
CHECK(100 == etcd.get("/test/key2").get().error_code()); // Key not found CHECK(100 == etcd.get("/test/key2").get().error_code()); // Key not found
} }
@ -55,7 +56,7 @@ TEST_CASE("simplified read")
TEST_CASE("modify a key") TEST_CASE("modify a key")
{ {
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:2379");
etcd::Response resp = etcd.modify("/test/key1", "43").get(); etcd::Response resp = etcd.modify("/test/key1", "43").get();
REQUIRE(0 == resp.error_code()); // overwrite REQUIRE(0 == resp.error_code()); // overwrite
CHECK("update" == resp.action()); CHECK("update" == resp.action());
@ -66,7 +67,7 @@ TEST_CASE("modify a key")
TEST_CASE("set a key") TEST_CASE("set a key")
{ {
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:2379");
etcd::Response resp = etcd.set("/test/key1", "43").get(); etcd::Response resp = etcd.set("/test/key1", "43").get();
REQUIRE(0 == resp.error_code()); // overwrite REQUIRE(0 == resp.error_code()); // overwrite
CHECK("set" == resp.action()); CHECK("set" == resp.action());
@ -78,7 +79,7 @@ TEST_CASE("set a key")
TEST_CASE("atomic compare-and-swap") TEST_CASE("atomic compare-and-swap")
{ {
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:2379");
etcd.set("/test/key1", "42").wait(); etcd.set("/test/key1", "42").wait();
// modify success // modify success
@ -95,6 +96,7 @@ TEST_CASE("atomic compare-and-swap")
CHECK("Compare failed" == res.error_message()); CHECK("Compare failed" == res.error_message());
} }
TEST_CASE("delete a value") TEST_CASE("delete a value")
{ {
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:4001");
@ -278,3 +280,4 @@ TEST_CASE("cleanup")
} }
#endif #endif

View File

@ -1,36 +0,0 @@
#ifndef __ASYNC_PUTRESPONSE_HPP__
#define __ASYNC_PUTRESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp"
#include "v3/include/grpcClient.hpp"
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::PutResponse;
namespace etcdv3
{
class AsyncPutResponse : public etcdv3::V3Response
{
public:
AsyncPutResponse(){};
AsyncPutResponse(const std::string act){action = act;};
AsyncPutResponse(const AsyncPutResponse& other);
AsyncPutResponse& operator=(const AsyncPutResponse& other);
PutResponse reply;
Status status;
ClientContext context;
CompletionQueue cq_;
std::unique_ptr<ClientAsyncResponseReader<PutResponse>> response_reader;
AsyncPutResponse& ParseResponse();
etcdv3::grpcClient* client;
std::string key;
};
}
#endif

View File

@ -4,9 +4,6 @@
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"
#include "v3/include/V3Response.hpp" #include "v3/include/V3Response.hpp"
#include "v3/include/grpcClient.hpp"
using grpc::ClientAsyncResponseReader; using grpc::ClientAsyncResponseReader;
using grpc::ClientContext; using grpc::ClientContext;

View File

@ -1,15 +0,0 @@
#ifndef __UTILS_HPP__
#define __UTILS_HPP__
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/grpcClient.hpp"
namespace etcdv3
{
namespace Utils
{
etcdv3::AsyncRangeResponse* getKey(std::string const & key, etcdv3::grpcClient& client);
}
}
#endif

View File

@ -1,25 +0,0 @@
#ifndef __GRPC_CLIENT_HPP__
#define __GRPC_CLIENT_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/AsyncRangeResponse.hpp"
using grpc::Channel;
using etcdserverpb::PutRequest;
using etcdserverpb::RangeRequest;
using etcdserverpb::KV;
namespace etcdv3
{
class grpcClient
{
public:
grpcClient(std::string const & address);
std::unique_ptr<KV::Stub> stub_;
};
}
#endif

View File

@ -33,45 +33,58 @@ etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::operator=(const etcdv3::Asyn
etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse() etcdv3::AsyncTxnResponse& etcdv3::AsyncTxnResponse::ParseResponse()
{ {
if(action == "create") for(int index=0; index < reply.responses_size(); index++)
{ {
if(reply.succeeded()) auto resp = reply.responses(index);
if(ResponseOp::ResponseCase::kResponseRange == resp.response_case())
{ {
for(int index=0; index < reply.responses_size(); index++) if(resp.response_range().kvs_size())
{ {
auto resp = reply.responses(index); if(!resp.response_range().more())
if(ResponseOp::ResponseCase::kResponseRange == resp.response_case())
{ {
if(resp.response_range().kvs_size()) if(!values.empty())
{ {
if(!values.empty()) prev_value = values[0];
{
prev_value = values[0];
}
values.push_back(resp.response_range().kvs(0));
} }
else values.clear();
{ values.push_back(resp.response_range().kvs(0));
error_code=100;
error_message="Key not found";
}
}
else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case())
{
//do nothing for now.
}
else
{
//do nothing for now.
} }
} }
else
{
error_code=100;
error_message="Key not found";
}
}
else if(ResponseOp::ResponseCase::kResponsePut == resp.response_case())
{
//do nothing for now.
} }
else else
{
//do nothing for now.
}
}
if(action == "create")
{
if(!reply.succeeded())
{ {
error_code=105; error_code=105;
error_message="Key already exists"; error_message="Key already exists";
} }
} }
else if(action == "compareAndSwap")
{
if(!reply.succeeded())
{
error_code=101;
error_message="Compare failed";
}
}
return *this; return *this;
} }

View File

@ -1,24 +0,0 @@
#include "v3/include/AsyncRangeResponse.hpp"
#include "proto/rpc.grpc.pb.h"
using etcdserverpb::RangeRequest;
#include "v3/include/Utils.hpp"
etcdv3::AsyncRangeResponse* etcdv3::Utils::getKey(std::string const & key, etcdv3::grpcClient& client)
{
RangeRequest get_request;
get_request.set_key(key);
etcdv3::AsyncRangeResponse* resp= new etcdv3::AsyncRangeResponse();
resp->status = client.stub_->Range(&resp->context, get_request, &resp->reply);
if(resp->status.ok())
{
return resp;
}
else
{
throw std::runtime_error(resp->status.error_message());
}
}

View File

@ -1,14 +0,0 @@
#include "v3/include/grpcClient.hpp"
etcdv3::grpcClient::grpcClient(std::string const & address)
{
std::string stripped_address(address);
std::string substr("http://");
std::string::size_type i = stripped_address.find(substr);
if(i != std::string::npos)
{
stripped_address.erase(i,substr.length());
}
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
stub_= KV::NewStub(channel);
}