merged branches for ease of update later on.

cleaned up rm, implemented rm_if methods!
updated tests

remaining todos:
rm and modif with indexes (find out where is X-ETCD-Index)
watch functionality
This commit is contained in:
lampayan 2016-06-09 11:21:42 +02:00
parent 2721e39e12
commit 008693a276
11 changed files with 239 additions and 151 deletions

View File

@ -54,12 +54,6 @@ namespace etcd
*/
pplx::task<Response> set(std::string const & key, std::string const & value);
/**
* FBDL temporary set and get items to etcd v3
*/
void setv3(std::string const&, std::string const&);
void getv3(std::string const&);
/**
* Creates a new key and sets it's value. Fails if the key already exists.
* @param key is the key to be created
@ -155,7 +149,6 @@ namespace etcd
pplx::task<Response> send_get_request(web::http::uri_builder & uri);
pplx::task<Response> send_del_request(web::http::uri_builder & uri);
pplx::task<Response> send_put_request(web::http::uri_builder & uri, std::string const & key, std::string const & value);
pplx::task<Response> removeEntry(std::string const &);
web::http::client::http_client client;
@ -172,7 +165,8 @@ namespace etcd
etcdv3::grpcClient grpcClient;
private:
void getEntryForPreviousValue(const std::string& entryKey, etcd::AsyncDeleteResponse* drp);
pplx::task<Response> removeEntryWithKey(std::string const &);
pplx::task<Response> removeEntryWithKeyAndValue(std::string const &, std::string const &);
};

View File

@ -11,6 +11,8 @@
#include "v3/include/V3Response.hpp"
#include <grpc++/grpc++.h>
#include <iostream>
namespace etcd
{
typedef std::vector<std::string> Keys;
@ -23,35 +25,6 @@ namespace etcd
public:
static pplx::task<Response> create(pplx::task<web::http::http_response> response_task);
template<typename T>static pplx::task<etcd::Response> createV2Response(T call)
{
return pplx::task<etcd::Response>([call]()
{
void* got_tag;
bool ok = false;
etcd::Response resp;
//blocking
call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call);
GPR_ASSERT(ok);
T call = static_cast<T>(got_tag);
if(call->status.ok())
{
// auto v3resp = call->ParseResponse();
resp = *call;// stripping off instead of creating a new response class object
}
else
{
throw std::runtime_error(call->status.error_message());
}
delete call; //todo:make this a smart pointer
return resp;
});
};
static pplx::task<Response> createResponse(const etcdv3::V3Response& response);
template<typename T>static pplx::task<etcd::Response> create(T call)

View File

@ -1,4 +1,4 @@
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp)
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)

View File

@ -1,6 +1,7 @@
#include "etcd/Client.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncPutResponse.hpp"
#include "v3/include/AsyncDelResponse.hpp"
#include "v3/include/Utils.hpp"
#include <iostream>
@ -38,25 +39,6 @@ pplx::task<etcd::Response> etcd::Client::set(std::string const & key, std::strin
return send_asyncput(key,value);
}
//TODO: a temporary set, until set version 3 is implemented
void etcd::Client::setv3(std::string const &key, std::string const &value)
{
etcdserverpb::PutRequest putRequest;
putRequest.set_key(key);
putRequest.set_value(value);
etcdserverpb::PutResponse putResponse;
grpc::ClientContext context;
grpc::Status status = stub_->Put(&context, putRequest, &putResponse);
if(status.ok()){
std::cout << "put OK" << std::endl;
}
else {
std::cout << "put NOK" << std::endl;
}
}
pplx::task<etcd::Response> etcd::Client::add(std::string const & key, std::string const & value)
{
return send_asyncadd(key,value);
@ -80,76 +62,73 @@ pplx::task<etcd::Response> etcd::Client::modify_if(std::string const & key, std:
return send_put_request(uri, "value", value);
}
void etcd::Client::getEntryForPreviousValue(const std::string& entryKey, etcd::AsyncDeleteResponse* drp)
//note: this one seems to not need the parseResponse() method
pplx::task<etcd::Response> etcd::Client::removeEntryWithKey(std::string const & entryKey) {
etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(entryKey, grpcClient);
if(!resp->reply.kvs_size())
{
std::cout << "get entry for previous value" << std::endl;
etcdserverpb::RangeRequest rangeRequest;
rangeRequest.set_key(entryKey);
etcdserverpb::RangeResponse rangeResponse;
grpc::ClientContext context;
grpc::Status status = grpcClient.stub_->Range(&context, rangeRequest, &rangeResponse);
if (status.ok()) {
std::cout << "get OK" << std::endl;
drp->fillUpV2ResponseValues(rangeResponse);
} else {
std::cout << "get NOK" << std::endl;
std::cout << "nothing to delete" << std::endl;
resp->error_code = 100;
resp->error_message = "Nothing to delete";
return Response::createResponse(*resp);
}
}
pplx::task<etcd::Response> etcd::Client::removeEntry(std::string const & entryKey) {
etcd::AsyncDeleteResponse *drp = new etcd::AsyncDeleteResponse();
getEntryForPreviousValue(entryKey, drp); //TODO: failure case scenario handling
etcdserverpb::DeleteRangeRequest deleteRangeRequest;
deleteRangeRequest.set_key(entryKey);
drp->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&drp->context, deleteRangeRequest, &drp->cq_);
drp->rpcInstance->Finish(&drp->deleteResponse, &drp->status, (void*)drp);
etcdv3::AsyncDelResponse* call = new etcdv3::AsyncDelResponse("delete");
return Response::createV2Response(drp);
}
//mano-mano
call->prev_value = resp->reply.kvs(0);
call->client = &grpcClient;
call->key = entryKey;
call->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&call->context, deleteRangeRequest, &call->cq_);
call->rpcInstance->Finish(&call->deleteResponse, &call->status, (void*)call);
void etcd::Client::getv3(std::string const & key) {
std::cout<<"blocking call for get rpc " << key << std::endl;
etcdserverpb::RangeRequest rangeRequest;
rangeRequest.set_key(key);
etcdserverpb::RangeResponse rangeResponse;
grpc::ClientContext context;
grpc::Status status = stub_->Range(&context, rangeRequest, &rangeResponse);
std::cout << "checking status" << std::endl;
if(status.ok()) {
std::cout << "get OK" << std::endl;
std::cout << "size: " << rangeResponse.kvs_size() << std::endl;
std::cout << "kvs 0 key: " << rangeResponse.kvs(0).key() << std::endl;
std::cout << "kvs 0 value: " << rangeResponse.kvs(0).value() << std::endl;
std::cout << "kvs.Get 0 value: " << rangeResponse.kvs().Get(0).value() << std::endl;
AsyncDeleteResponse drp;
drp.fillUpV2ResponseValues(rangeResponse);
}
else {
std::cout << "get NOK" << std::endl;
}
return Response::createResponse(*call);
}
pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
{
std::cout << "rm called" << std::endl;
return removeEntry(key);
return removeEntryWithKey(key);
}
pplx::task<etcd::Response> etcd::Client::removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue) {
etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient);
if(!searchResult->reply.kvs_size()) {
searchResult->error_code = 100;
searchResult->error_message = "Key not Found";
return Response::createResponse(*searchResult);
}
else if(searchResult->reply.kvs(0).value() != oldValue) {
searchResult->error_code = 101;
searchResult->error_message = "Compare failed";
return Response::createResponse(*searchResult);
}
etcdserverpb::DeleteRangeRequest deleteRangeRequest;
deleteRangeRequest.set_key(entryKey);
etcdv3::AsyncDelResponse *deleteResponseCall = new etcdv3::AsyncDelResponse("compareAndDelete");
deleteResponseCall->prev_value = searchResult->reply.kvs(0);
deleteResponseCall->client = &grpcClient;
deleteResponseCall->key = entryKey;
deleteResponseCall->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&deleteResponseCall->context, deleteRangeRequest, &deleteResponseCall->cq_);
deleteResponseCall->rpcInstance->Finish(&deleteResponseCall->deleteResponse, &deleteResponseCall->status, (void*)deleteResponseCall);
return Response::createResponse(*deleteResponseCall);
}
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value)
{
web::http::uri_builder uri("/v2/keys" + key);
uri.append_query("dir=false");
uri.append_query("prevValue", old_value);
return send_del_request(uri);
return removeEntryWithKeyAndValue(key, old_value);
}
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, int old_index)
@ -201,10 +180,6 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
return send_get_request(uri);
}
pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key, std::string const & value)
{

View File

@ -98,17 +98,63 @@ TEST_CASE("atomic compare-and-swap")
TEST_CASE("delete a value")
{
std::cout << "delete a value fbdl" << std::endl;
etcd::Client etcd("http://127.0.0.1:4001");
// CHECK(3 == etcd.ls("/test").get().keys().size()); // not supported in v3
etcd::Response resp = etcd.rm("/test/key1").get();
CHECK("43" == resp.prev_value().as_string());
CHECK("delete" == resp.action());
// CHECK(2 == etcd.ls("/test").get().keys().size()); // not supported in v3
}
TEST_CASE("atomic compare-and-delete based on prevValue")
{
etcd::Client etcd("http://127.0.0.1:4001");
etcd.set("/test/key1", "42").wait();
etcd::Response res = etcd.rm_if("/test/key1", "43").get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
res = etcd.rm_if("/test/key1", "42").get();
REQUIRE(res.is_ok());
CHECK("compareAndDelete" == res.action());
CHECK("42" == res.prev_value().as_string());
}
TEST_CASE("atomic compare-and-delete based on prevValue checking index")
{
etcd::Client etcd("http://127.0.0.1:4001");
std::cout << "index: " << etcd.set("/test/key1", "42").get().index() << std::endl;
etcd::Response res = etcd.rm_if("/test/key1", "43").get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
res = etcd.rm_if("/test/key1", "42").get();
REQUIRE(res.is_ok());
CHECK("compareAndDelete" == res.action());
CHECK("42" == res.prev_value().as_string());
}
#if 0
TEST_CASE("atomic compare-and-delete based on prevIndex")
{
etcd::Client etcd("http://127.0.0.1:4001");
int index = etcd.set("/test/key1", "42").get().index();
std::cout << "index of old: " << index << std::endl;
etcd::Response res = etcd.rm_if("/test/key1", index - 1).get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
res = etcd.rm_if("/test/key1", index).get();
REQUIRE(res.is_ok());
CHECK("compareAndDelete" == res.action());
CHECK("42" == res.prev_value().as_string());
}
TEST_CASE("create a directory")
{
etcd::Client etcd("http://127.0.0.1:4001");
@ -243,38 +289,6 @@ TEST_CASE("atomic compare-and-swap")
CHECK("Compare failed" == res.error_message());
}
TEST_CASE("atomic compare-and-delete based on prevValue")
{
etcd::Client etcd("http://127.0.0.1:4001");
etcd.set("/test/key1", "42").wait();
etcd::Response res = etcd.rm_if("/test/key1", "43").get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
res = etcd.rm_if("/test/key1", "42").get();
REQUIRE(res.is_ok());
CHECK("compareAndDelete" == res.action());
CHECK("42" == res.prev_value().as_string());
}
TEST_CASE("atomic compare-and-delete based on prevIndex")
{
etcd::Client etcd("http://127.0.0.1:4001");
int index = etcd.set("/test/key1", "42").get().index();
etcd::Response res = etcd.rm_if("/test/key1", index - 1).get();
CHECK(!res.is_ok());
CHECK(101 == res.error_code());
CHECK("Compare failed" == res.error_message());
res = etcd.rm_if("/test/key1", index).get();
REQUIRE(res.is_ok());
CHECK("compareAndDelete" == res.action());
CHECK("42" == res.prev_value().as_string());
}
TEST_CASE("cleanup")
{
etcd::Client etcd("http://127.0.0.1:4001");

View File

@ -0,0 +1,33 @@
/*
* AsyncDelResponse.h
*
* Created on: Jun 8, 2016
* Author: ubuntu
*/
#ifndef V3_SRC_ASYNCDELRESPONSE_HPP_
#define V3_SRC_ASYNCDELRESPONSE_HPP_
#include "v3/include/V3Response.hpp"
#include "v3/include/V3BaseResponse.hpp"
#include "v3/include/grpcClient.hpp"
namespace etcdv3 {
class AsyncDelResponse : public etcdv3::V3Response, public etcdv3::V3BaseResponse {
public:
AsyncDelResponse(){action="delete";};
AsyncDelResponse(std::string const &);
AsyncDelResponse(const AsyncDelResponse&);
AsyncDelResponse& operator=(const AsyncDelResponse&);
virtual ~AsyncDelResponse();
etcdserverpb::DeleteRangeResponse deleteResponse;
std::unique_ptr<grpc::ClientAsyncResponseReader<etcdserverpb::DeleteRangeResponse>> rpcInstance;
AsyncDelResponse& ParseResponse();
etcdv3::grpcClient* client;
std::string key;
};
} /* namespace etcdv3 */
#endif /* V3_SRC_ASYNCDELRESPONSE_HPP_ */

View File

@ -21,6 +21,7 @@ namespace etcdv3
AsyncRangeResponse(const AsyncRangeResponse& other);
AsyncRangeResponse& operator=(const AsyncRangeResponse& other);
RangeResponse reply;
etcdserverpb::PutResponse r;
Status status;
ClientContext context;
CompletionQueue cq_;

View File

@ -0,0 +1,29 @@
/*
* V3BaseResponse.h
*
* Created on: Jun 8, 2016
* Author: ubuntu
*/
#ifndef V3_SRC_V3BASERESPONSE_H_
#define V3_SRC_V3BASERESPONSE_H_
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "proto/kv.pb.h"
//TODO: make into abstract class
namespace etcdv3 {
class V3BaseResponse {
public:
V3BaseResponse();
virtual ~V3BaseResponse();
grpc::Status status;
grpc::ClientContext context;
grpc::CompletionQueue cq_;
// type& parseResponse()=0; //a possible candidate to make this abstract
};
} /* namespace etcdv3 */
#endif /* V3_SRC_V3BASERESPONSE_H_ */

View File

@ -0,0 +1,48 @@
/*
* AsyncDelResponse.cpp
*
* Created on: Jun 8, 2016
* Author: ubuntu
*/
#include "v3/include/AsyncDelResponse.hpp"
#include "v3/include/Utils.hpp"
etcdv3::AsyncDelResponse::AsyncDelResponse(std::string const &inputAction) {
action = inputAction;
}
etcdv3::AsyncDelResponse::AsyncDelResponse(const etcdv3::AsyncDelResponse& other)
{
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
}
etcdv3::AsyncDelResponse& etcdv3::AsyncDelResponse::operator=(const etcdv3::AsyncDelResponse& other){
error_code = other.error_code;
error_message = other.error_message;
index = other.index;
action = other.action;
values = other.values;
return *this;
}
etcdv3::AsyncDelResponse::~AsyncDelResponse(){
}
//TODO: unused
etcdv3::AsyncDelResponse& etcdv3::AsyncDelResponse::ParseResponse(){
action = "delete";
// etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, *client);
// if(resp->reply.kvs_size())
// {
// values.push_back(resp->reply.kvs(0));
// }
return *this;
}

21
v3/src/V3BaseResponse.cpp Normal file
View File

@ -0,0 +1,21 @@
/*
* V3BaseResponse.cpp
*
* Created on: Jun 8, 2016
* Author: ubuntu
*/
#include "v3/include/V3BaseResponse.hpp"
namespace etcdv3 {
V3BaseResponse::V3BaseResponse() {
// TODO Auto-generated constructor stub
}
V3BaseResponse::~V3BaseResponse() {
// TODO Auto-generated destructor stub
}
} /* namespace etcdv3 */