From 008693a276d0a86e28cf1adc49509920ba7a5eff Mon Sep 17 00:00:00 2001 From: lampayan Date: Thu, 9 Jun 2016 11:21:42 +0200 Subject: [PATCH] merged branches for ease of update later on. cleaned up rm, implemented rm_if methods! updated tests remaining todos: rm and modif with indexes (find out where is X-ETCD-Index) watch functionality --- etcd/Client.hpp | 10 +-- etcd/Response.hpp | 31 +------ src/CMakeLists.txt | 2 +- src/Client.cpp | 129 ++++++++++++------------------ tst/EtcdTest.cpp | 84 +++++++++++-------- v3/include/AsyncDelResponse.hpp | 33 ++++++++ v3/include/AsyncRangeResponse.hpp | 1 + v3/include/V3BaseResponse.hpp | 29 +++++++ v3/src/AsyncDelResponse.cpp | 48 +++++++++++ v3/src/AsyncRangeResponse.cpp | 2 +- v3/src/V3BaseResponse.cpp | 21 +++++ 11 files changed, 239 insertions(+), 151 deletions(-) create mode 100644 v3/include/AsyncDelResponse.hpp create mode 100644 v3/include/V3BaseResponse.hpp create mode 100644 v3/src/AsyncDelResponse.cpp create mode 100644 v3/src/V3BaseResponse.cpp diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 7d3aa05..538ab18 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -54,12 +54,6 @@ namespace etcd */ pplx::task set(std::string const & key, std::string const & value); - /** - * FBDL temporary set and get items to etcd v3 - */ - void setv3(std::string const&, std::string const&); - void getv3(std::string const&); - /** * Creates a new key and sets it's value. Fails if the key already exists. * @param key is the key to be created @@ -155,7 +149,6 @@ namespace etcd pplx::task send_get_request(web::http::uri_builder & uri); pplx::task send_del_request(web::http::uri_builder & uri); pplx::task send_put_request(web::http::uri_builder & uri, std::string const & key, std::string const & value); - pplx::task removeEntry(std::string const &); web::http::client::http_client client; @@ -172,7 +165,8 @@ namespace etcd etcdv3::grpcClient grpcClient; private: - void getEntryForPreviousValue(const std::string& entryKey, etcd::AsyncDeleteResponse* drp); + pplx::task removeEntryWithKey(std::string const &); + pplx::task removeEntryWithKeyAndValue(std::string const &, std::string const &); }; diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 968893b..1415003 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -11,6 +11,8 @@ #include "v3/include/V3Response.hpp" #include +#include + namespace etcd { typedef std::vector Keys; @@ -23,35 +25,6 @@ namespace etcd public: static pplx::task create(pplx::task response_task); - templatestatic pplx::task createV2Response(T call) - { - return pplx::task([call]() - { - void* got_tag; - bool ok = false; - etcd::Response resp; - - //blocking - call->cq_.Next(&got_tag, &ok); - GPR_ASSERT(got_tag == (void*)call); - GPR_ASSERT(ok); - - T call = static_cast(got_tag); - if(call->status.ok()) - { -// auto v3resp = call->ParseResponse(); - resp = *call;// stripping off instead of creating a new response class object - } - else - { - throw std::runtime_error(call->status.error_message()); - } - - delete call; //todo:make this a smart pointer - return resp; - }); - }; - static pplx::task createResponse(const etcdv3::V3Response& response); templatestatic pplx::task create(T call) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9397c84..c060bd8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Client.cpp b/src/Client.cpp index e6e86a3..61f815b 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -1,6 +1,7 @@ #include "etcd/Client.hpp" #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncPutResponse.hpp" +#include "v3/include/AsyncDelResponse.hpp" #include "v3/include/Utils.hpp" #include @@ -38,25 +39,6 @@ pplx::task etcd::Client::set(std::string const & key, std::strin return send_asyncput(key,value); } -//TODO: a temporary set, until set version 3 is implemented -void etcd::Client::setv3(std::string const &key, std::string const &value) -{ - etcdserverpb::PutRequest putRequest; - putRequest.set_key(key); - putRequest.set_value(value); - - etcdserverpb::PutResponse putResponse; - grpc::ClientContext context; - grpc::Status status = stub_->Put(&context, putRequest, &putResponse); - - if(status.ok()){ - std::cout << "put OK" << std::endl; - } - else { - std::cout << "put NOK" << std::endl; - } -} - pplx::task etcd::Client::add(std::string const & key, std::string const & value) { return send_asyncadd(key,value); @@ -80,76 +62,73 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: return send_put_request(uri, "value", value); } -void etcd::Client::getEntryForPreviousValue(const std::string& entryKey, etcd::AsyncDeleteResponse* drp) -{ - std::cout << "get entry for previous value" << std::endl; - etcdserverpb::RangeRequest rangeRequest; - rangeRequest.set_key(entryKey); - etcdserverpb::RangeResponse rangeResponse; - grpc::ClientContext context; - grpc::Status status = grpcClient.stub_->Range(&context, rangeRequest, &rangeResponse); - if (status.ok()) { - std::cout << "get OK" << std::endl; - drp->fillUpV2ResponseValues(rangeResponse); - } else { - std::cout << "get NOK" << std::endl; +//note: this one seems to not need the parseResponse() method +pplx::task etcd::Client::removeEntryWithKey(std::string const & entryKey) { + + etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(entryKey, grpcClient); + + if(!resp->reply.kvs_size()) + { + std::cout << "nothing to delete" << std::endl; + resp->error_code = 100; + resp->error_message = "Nothing to delete"; + return Response::createResponse(*resp); } -} - -pplx::task etcd::Client::removeEntry(std::string const & entryKey) { - - etcd::AsyncDeleteResponse *drp = new etcd::AsyncDeleteResponse(); - getEntryForPreviousValue(entryKey, drp); //TODO: failure case scenario handling etcdserverpb::DeleteRangeRequest deleteRangeRequest; deleteRangeRequest.set_key(entryKey); - drp->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&drp->context, deleteRangeRequest, &drp->cq_); - drp->rpcInstance->Finish(&drp->deleteResponse, &drp->status, (void*)drp); + etcdv3::AsyncDelResponse* call = new etcdv3::AsyncDelResponse("delete"); - return Response::createV2Response(drp); -} + //mano-mano + call->prev_value = resp->reply.kvs(0); + call->client = &grpcClient; + call->key = entryKey; + call->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&call->context, deleteRangeRequest, &call->cq_); + call->rpcInstance->Finish(&call->deleteResponse, &call->status, (void*)call); - -void etcd::Client::getv3(std::string const & key) { - std::cout<<"blocking call for get rpc " << key << std::endl; - etcdserverpb::RangeRequest rangeRequest; - rangeRequest.set_key(key); - - etcdserverpb::RangeResponse rangeResponse; - grpc::ClientContext context; - - grpc::Status status = stub_->Range(&context, rangeRequest, &rangeResponse); - - std::cout << "checking status" << std::endl; - if(status.ok()) { - std::cout << "get OK" << std::endl; - std::cout << "size: " << rangeResponse.kvs_size() << std::endl; - std::cout << "kvs 0 key: " << rangeResponse.kvs(0).key() << std::endl; - std::cout << "kvs 0 value: " << rangeResponse.kvs(0).value() << std::endl; - std::cout << "kvs.Get 0 value: " << rangeResponse.kvs().Get(0).value() << std::endl; - - AsyncDeleteResponse drp; - drp.fillUpV2ResponseValues(rangeResponse); - } - else { - std::cout << "get NOK" << std::endl; - } + return Response::createResponse(*call); } pplx::task etcd::Client::rm(std::string const & key) { - std::cout << "rm called" << std::endl; - return removeEntry(key); + return removeEntryWithKey(key); +} + +pplx::task etcd::Client::removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue) { + + etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient); + + if(!searchResult->reply.kvs_size()) { + searchResult->error_code = 100; + searchResult->error_message = "Key not Found"; + return Response::createResponse(*searchResult); + } + else if(searchResult->reply.kvs(0).value() != oldValue) { + searchResult->error_code = 101; + searchResult->error_message = "Compare failed"; + return Response::createResponse(*searchResult); + } + + etcdserverpb::DeleteRangeRequest deleteRangeRequest; + deleteRangeRequest.set_key(entryKey); + + etcdv3::AsyncDelResponse *deleteResponseCall = new etcdv3::AsyncDelResponse("compareAndDelete"); + + deleteResponseCall->prev_value = searchResult->reply.kvs(0); + deleteResponseCall->client = &grpcClient; + deleteResponseCall->key = entryKey; + + deleteResponseCall->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&deleteResponseCall->context, deleteRangeRequest, &deleteResponseCall->cq_); + deleteResponseCall->rpcInstance->Finish(&deleteResponseCall->deleteResponse, &deleteResponseCall->status, (void*)deleteResponseCall); + + return Response::createResponse(*deleteResponseCall); } pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("dir=false"); - uri.append_query("prevValue", old_value); - return send_del_request(uri); + return removeEntryWithKeyAndValue(key, old_value); } pplx::task etcd::Client::rm_if(std::string const & key, int old_index) @@ -201,10 +180,6 @@ pplx::task etcd::Client::watch(std::string const & key, int from return send_get_request(uri); } - - - - pplx::task etcd::Client::send_asyncadd(std::string const & key, std::string const & value) { diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index 69f59fd..f4d8b3b 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -98,17 +98,63 @@ TEST_CASE("atomic compare-and-swap") TEST_CASE("delete a value") { - std::cout << "delete a value fbdl" << std::endl; etcd::Client etcd("http://127.0.0.1:4001"); -// CHECK(3 == etcd.ls("/test").get().keys().size()); // not supported in v3 etcd::Response resp = etcd.rm("/test/key1").get(); CHECK("43" == resp.prev_value().as_string()); CHECK("delete" == resp.action()); -// CHECK(2 == etcd.ls("/test").get().keys().size()); // not supported in v3 +} + +TEST_CASE("atomic compare-and-delete based on prevValue") +{ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd.set("/test/key1", "42").wait(); + + etcd::Response res = etcd.rm_if("/test/key1", "43").get(); + CHECK(!res.is_ok()); + CHECK(101 == res.error_code()); + CHECK("Compare failed" == res.error_message()); + + res = etcd.rm_if("/test/key1", "42").get(); + REQUIRE(res.is_ok()); + CHECK("compareAndDelete" == res.action()); + CHECK("42" == res.prev_value().as_string()); +} + +TEST_CASE("atomic compare-and-delete based on prevValue checking index") +{ + etcd::Client etcd("http://127.0.0.1:4001"); + std::cout << "index: " << etcd.set("/test/key1", "42").get().index() << std::endl; + + etcd::Response res = etcd.rm_if("/test/key1", "43").get(); + CHECK(!res.is_ok()); + CHECK(101 == res.error_code()); + CHECK("Compare failed" == res.error_message()); + + res = etcd.rm_if("/test/key1", "42").get(); + REQUIRE(res.is_ok()); + CHECK("compareAndDelete" == res.action()); + CHECK("42" == res.prev_value().as_string()); } #if 0 +TEST_CASE("atomic compare-and-delete based on prevIndex") +{ + etcd::Client etcd("http://127.0.0.1:4001"); + int index = etcd.set("/test/key1", "42").get().index(); + std::cout << "index of old: " << index << std::endl; + + etcd::Response res = etcd.rm_if("/test/key1", index - 1).get(); + CHECK(!res.is_ok()); + CHECK(101 == res.error_code()); + CHECK("Compare failed" == res.error_message()); + + res = etcd.rm_if("/test/key1", index).get(); + REQUIRE(res.is_ok()); + CHECK("compareAndDelete" == res.action()); + CHECK("42" == res.prev_value().as_string()); +} + TEST_CASE("create a directory") { etcd::Client etcd("http://127.0.0.1:4001"); @@ -243,38 +289,6 @@ TEST_CASE("atomic compare-and-swap") CHECK("Compare failed" == res.error_message()); } -TEST_CASE("atomic compare-and-delete based on prevValue") -{ - etcd::Client etcd("http://127.0.0.1:4001"); - etcd.set("/test/key1", "42").wait(); - - etcd::Response res = etcd.rm_if("/test/key1", "43").get(); - CHECK(!res.is_ok()); - CHECK(101 == res.error_code()); - CHECK("Compare failed" == res.error_message()); - - res = etcd.rm_if("/test/key1", "42").get(); - REQUIRE(res.is_ok()); - CHECK("compareAndDelete" == res.action()); - CHECK("42" == res.prev_value().as_string()); -} - -TEST_CASE("atomic compare-and-delete based on prevIndex") -{ - etcd::Client etcd("http://127.0.0.1:4001"); - int index = etcd.set("/test/key1", "42").get().index(); - - etcd::Response res = etcd.rm_if("/test/key1", index - 1).get(); - CHECK(!res.is_ok()); - CHECK(101 == res.error_code()); - CHECK("Compare failed" == res.error_message()); - - res = etcd.rm_if("/test/key1", index).get(); - REQUIRE(res.is_ok()); - CHECK("compareAndDelete" == res.action()); - CHECK("42" == res.prev_value().as_string()); -} - TEST_CASE("cleanup") { etcd::Client etcd("http://127.0.0.1:4001"); diff --git a/v3/include/AsyncDelResponse.hpp b/v3/include/AsyncDelResponse.hpp new file mode 100644 index 0000000..efaf2ad --- /dev/null +++ b/v3/include/AsyncDelResponse.hpp @@ -0,0 +1,33 @@ +/* + * AsyncDelResponse.h + * + * Created on: Jun 8, 2016 + * Author: ubuntu + */ + +#ifndef V3_SRC_ASYNCDELRESPONSE_HPP_ +#define V3_SRC_ASYNCDELRESPONSE_HPP_ + +#include "v3/include/V3Response.hpp" +#include "v3/include/V3BaseResponse.hpp" +#include "v3/include/grpcClient.hpp" + +namespace etcdv3 { + +class AsyncDelResponse : public etcdv3::V3Response, public etcdv3::V3BaseResponse { +public: + AsyncDelResponse(){action="delete";}; + AsyncDelResponse(std::string const &); + AsyncDelResponse(const AsyncDelResponse&); + AsyncDelResponse& operator=(const AsyncDelResponse&); + virtual ~AsyncDelResponse(); + + etcdserverpb::DeleteRangeResponse deleteResponse; + std::unique_ptr> rpcInstance; + AsyncDelResponse& ParseResponse(); + etcdv3::grpcClient* client; + std::string key; +}; +} /* namespace etcdv3 */ + +#endif /* V3_SRC_ASYNCDELRESPONSE_HPP_ */ diff --git a/v3/include/AsyncRangeResponse.hpp b/v3/include/AsyncRangeResponse.hpp index a0fec36..f225909 100644 --- a/v3/include/AsyncRangeResponse.hpp +++ b/v3/include/AsyncRangeResponse.hpp @@ -21,6 +21,7 @@ namespace etcdv3 AsyncRangeResponse(const AsyncRangeResponse& other); AsyncRangeResponse& operator=(const AsyncRangeResponse& other); RangeResponse reply; + etcdserverpb::PutResponse r; Status status; ClientContext context; CompletionQueue cq_; diff --git a/v3/include/V3BaseResponse.hpp b/v3/include/V3BaseResponse.hpp new file mode 100644 index 0000000..a2d9a8a --- /dev/null +++ b/v3/include/V3BaseResponse.hpp @@ -0,0 +1,29 @@ +/* + * V3BaseResponse.h + * + * Created on: Jun 8, 2016 + * Author: ubuntu + */ + +#ifndef V3_SRC_V3BASERESPONSE_H_ +#define V3_SRC_V3BASERESPONSE_H_ + +#include +#include "proto/rpc.grpc.pb.h" +#include "proto/kv.pb.h" + +//TODO: make into abstract class +namespace etcdv3 { +class V3BaseResponse { +public: + V3BaseResponse(); + virtual ~V3BaseResponse(); + + grpc::Status status; + grpc::ClientContext context; + grpc::CompletionQueue cq_; + // type& parseResponse()=0; //a possible candidate to make this abstract +}; +} /* namespace etcdv3 */ + +#endif /* V3_SRC_V3BASERESPONSE_H_ */ diff --git a/v3/src/AsyncDelResponse.cpp b/v3/src/AsyncDelResponse.cpp new file mode 100644 index 0000000..1cb86c3 --- /dev/null +++ b/v3/src/AsyncDelResponse.cpp @@ -0,0 +1,48 @@ +/* + * AsyncDelResponse.cpp + * + * Created on: Jun 8, 2016 + * Author: ubuntu + */ + +#include "v3/include/AsyncDelResponse.hpp" +#include "v3/include/Utils.hpp" + +etcdv3::AsyncDelResponse::AsyncDelResponse(std::string const &inputAction) { + action = inputAction; +} + +etcdv3::AsyncDelResponse::AsyncDelResponse(const etcdv3::AsyncDelResponse& other) +{ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; +} + +etcdv3::AsyncDelResponse& etcdv3::AsyncDelResponse::operator=(const etcdv3::AsyncDelResponse& other){ + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + return *this; +} + +etcdv3::AsyncDelResponse::~AsyncDelResponse(){ +} + + +//TODO: unused +etcdv3::AsyncDelResponse& etcdv3::AsyncDelResponse::ParseResponse(){ + action = "delete"; + +// etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, *client); +// if(resp->reply.kvs_size()) +// { +// values.push_back(resp->reply.kvs(0)); +// } + + return *this; +} diff --git a/v3/src/AsyncRangeResponse.cpp b/v3/src/AsyncRangeResponse.cpp index daf644a..dfef4b4 100644 --- a/v3/src/AsyncRangeResponse.cpp +++ b/v3/src/AsyncRangeResponse.cpp @@ -22,7 +22,7 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::operator=(const etcdv3:: etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse() { action = "get"; - + if(reply.kvs_size()) { if(reply.more()) diff --git a/v3/src/V3BaseResponse.cpp b/v3/src/V3BaseResponse.cpp new file mode 100644 index 0000000..32a0c1c --- /dev/null +++ b/v3/src/V3BaseResponse.cpp @@ -0,0 +1,21 @@ +/* + * V3BaseResponse.cpp + * + * Created on: Jun 8, 2016 + * Author: ubuntu + */ + +#include "v3/include/V3BaseResponse.hpp" + +namespace etcdv3 { + +V3BaseResponse::V3BaseResponse() { + // TODO Auto-generated constructor stub + +} + +V3BaseResponse::~V3BaseResponse() { + // TODO Auto-generated destructor stub +} + +} /* namespace etcdv3 */