From c28d955b22bc35edf9b492ee996d5a11b5e2960e Mon Sep 17 00:00:00 2001 From: lampayan Date: Thu, 9 Jun 2016 16:57:25 +0200 Subject: [PATCH] Complete implementation for modify with index and delete with index functionalities. AsyncModifyResponse is needed because handling of indeces are different between create and mod and delete, as per requirements. remaining TODOs: 1) Watch functionality --- etcd/Client.hpp | 2 +- etcd/Response.hpp | 1 - src/CMakeLists.txt | 2 +- src/Client.cpp | 53 +++++++++++++++++++++----- tst/EtcdTest.cpp | 5 +-- v3/include/AsyncModifyResponse.hpp | 33 ++++++++++++++++ v3/src/AsyncModifyResponse.cpp | 60 ++++++++++++++++++++++++++++++ v3/src/AsyncPutResponse.cpp | 7 +++- v3/src/AsyncRangeResponse.cpp | 2 +- 9 files changed, 147 insertions(+), 18 deletions(-) create mode 100644 v3/include/AsyncModifyResponse.hpp create mode 100644 v3/src/AsyncModifyResponse.cpp diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 1dd6fdb..2d8f7ed 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -168,7 +168,7 @@ private: pplx::task removeEntryWithKey(std::string const &entryKey); pplx::task removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue); pplx::task removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex); - + pplx::task modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index); }; diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 612d311..1415003 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -45,7 +45,6 @@ namespace etcd { auto v3resp = call->ParseResponse(); resp = etcd::Response(v3resp); - resp._index = call->reply.header().revision(); } else { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c060bd8..916ab07 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Client.cpp b/src/Client.cpp index ccabc05..0b04cff 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -2,6 +2,7 @@ #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncPutResponse.hpp" #include "v3/include/AsyncDelResponse.hpp" +#include "v3/include/AsyncModifyResponse.hpp" #include "v3/include/Utils.hpp" #include @@ -54,11 +55,44 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: return send_asyncmodify_if(key, value, old_value); } +pplx::task etcd::Client::modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index) { + + + etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); + if(!resp->reply.kvs_size()) + { + resp->error_code=100; + resp->error_message="Key not found"; + return Response::createResponse(*resp); + } + else if(resp->reply.kvs(0).mod_revision() != old_index) + { + resp->error_code = 101; + resp->error_message = "Compare failed"; + return Response::createResponse(*resp); + } + + PutRequest put_request; + put_request.set_key(key); + put_request.set_value(value); + + etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("compareAndSwap"); + + //below 2 lines can be removed once we are able to use Txn + call->prev_value = resp->reply.kvs(0); + call->client = &grpcClient; + call->key = key; + + call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + + call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); + + return Response::create(call); +} + pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("prevIndex", old_index); - return send_put_request(uri, "value", value); + return modifyEntryWithValueAndOldIndex(key, value, old_index); } //note: this one seems to not need the parseResponse() method @@ -133,7 +167,6 @@ pplx::task etcd::Client::rm_if(std::string const & key, std::str pplx::task etcd::Client::removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex) { etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient); - std::cout << "found revision in item is: " << searchResult->reply.kvs(0).create_revision() << std::endl; if(!searchResult->reply.kvs_size()) { searchResult->error_code = 100; @@ -262,16 +295,16 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & put_request.set_key(key); put_request.set_value(value); - etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("compareAndSwap"); + etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("compareAndSwap"); //below 2 lines can be removed once we are able to use Txn call->prev_value = resp->reply.kvs(0); call->client = &grpcClient; call->key = key; - call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call); + call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); return Response::create(call); @@ -293,16 +326,16 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke put_request.set_key(key); put_request.set_value(value); - etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("update"); + etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("update"); //below 2 lines can be removed once we are able to use Txn call->prev_value = resp->reply.kvs(0); call->client = &grpcClient; call->key = key; - call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call); + call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); return Response::create(call); diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index 25c3fde..a6d1f7a 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -135,8 +135,6 @@ TEST_CASE("atomic compare-and-delete based on prevIndex") CHECK("42" == res.prev_value().as_string()); } -#if 0 - TEST_CASE("deep atomic compare-and-swap") { etcd::Client etcd("http://127.0.0.1:4001"); @@ -145,7 +143,6 @@ TEST_CASE("deep atomic compare-and-swap") // modify success etcd::Response res = etcd.modify_if("/test/key1", "43", "42").get(); int index = res.index(); - std::cout << "index to use: " << index << std::endl; REQUIRE(res.is_ok()); CHECK("compareAndSwap" == res.action()); CHECK("43" == res.value().as_string()); @@ -169,6 +166,8 @@ TEST_CASE("deep atomic compare-and-swap") CHECK("Compare failed" == res.error_message()); } +#if 0 + TEST_CASE("create a directory") { etcd::Client etcd("http://127.0.0.1:4001"); diff --git a/v3/include/AsyncModifyResponse.hpp b/v3/include/AsyncModifyResponse.hpp new file mode 100644 index 0000000..9cd30d1 --- /dev/null +++ b/v3/include/AsyncModifyResponse.hpp @@ -0,0 +1,33 @@ +/* + * AsyncModifyResponse.h + * + * Created on: Jun 9, 2016 + * Author: ubuntu + */ + +#ifndef V3_SRC_ASYNCMODIFYRESPONSE_HPP_ +#define V3_SRC_ASYNCMODIFYRESPONSE_HPP_ + +#include "v3/include/V3Response.hpp" +#include "v3/include/V3BaseResponse.hpp" +#include "v3/include/grpcClient.hpp" + +namespace etcdv3 { + +class AsyncModifyResponse : public etcdv3::V3Response, public etcdv3::V3BaseResponse { +public: + AsyncModifyResponse(){action="compareAndSwap";}; + AsyncModifyResponse(std::string const &); + AsyncModifyResponse(const AsyncModifyResponse&); + AsyncModifyResponse& operator=(const AsyncModifyResponse&); + virtual ~AsyncModifyResponse(); + + etcdserverpb::PutResponse putResponse; + std::unique_ptr> rpcInstance; + AsyncModifyResponse& ParseResponse(); + etcdv3::grpcClient* client; + std::string key; +}; +} /* namespace etcdv3 */ + +#endif /* V3_SRC_ASYNCMODIFYRESPONSE_HPP_ */ diff --git a/v3/src/AsyncModifyResponse.cpp b/v3/src/AsyncModifyResponse.cpp new file mode 100644 index 0000000..39fd61f --- /dev/null +++ b/v3/src/AsyncModifyResponse.cpp @@ -0,0 +1,60 @@ +/* + * AsyncModifyResponse.cpp + * + * Created on: Jun 9, 2016 + * Author: ubuntu + */ + +#include "v3/include/AsyncModifyResponse.hpp" +#include "v3/include/Utils.hpp" + +namespace etcdv3 { + +etcdv3::AsyncModifyResponse::AsyncModifyResponse(const etcdv3::AsyncModifyResponse& other) { + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_value.set_key(other.prev_value.key()); + prev_value.set_value(other.prev_value.value()); + prev_value.set_create_revision(other.prev_value.create_revision()); + prev_value.set_mod_revision(other.prev_value.mod_revision()); +} + +etcdv3::AsyncModifyResponse::AsyncModifyResponse(const std::string &input) { + action = input; +} + +etcdv3::AsyncModifyResponse& etcdv3::AsyncModifyResponse::operator=(const etcdv3::AsyncModifyResponse& other) { + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_value.set_key(other.prev_value.key()); + prev_value.set_value(other.prev_value.value()); + prev_value.set_create_revision(other.prev_value.create_revision()); + prev_value.set_mod_revision(other.prev_value.mod_revision()); + return *this; +} + +AsyncModifyResponse::~AsyncModifyResponse() { + // TODO Auto-generated destructor stub +} + +etcdv3::AsyncModifyResponse& etcdv3::AsyncModifyResponse::ParseResponse() { + etcdv3::AsyncRangeResponse* response = etcdv3::Utils::getKey(key, *client); + if(response->reply.kvs_size()) + { + values.push_back(response->reply.kvs(0)); + index = response->reply.kvs(0).mod_revision(); + } + else{ + index = response->reply.header().revision(); + } + + return *this; +} + +} /* namespace etcdv3 */ diff --git a/v3/src/AsyncPutResponse.cpp b/v3/src/AsyncPutResponse.cpp index 708ee1a..13e7612 100644 --- a/v3/src/AsyncPutResponse.cpp +++ b/v3/src/AsyncPutResponse.cpp @@ -1,6 +1,8 @@ #include "v3/include/AsyncPutResponse.hpp" #include "v3/include/Utils.hpp" +#include + using etcdserverpb::PutRequest; using etcdserverpb::PutRequest; @@ -38,7 +40,10 @@ etcdv3::AsyncPutResponse& etcdv3::AsyncPutResponse::ParseResponse() if(resp->reply.kvs_size()) { values.push_back(resp->reply.kvs(0)); + index = resp->reply.kvs(0).create_revision(); } - + else + index = resp->reply.header().revision(); + return *this; } diff --git a/v3/src/AsyncRangeResponse.cpp b/v3/src/AsyncRangeResponse.cpp index dfef4b4..3aeeb82 100644 --- a/v3/src/AsyncRangeResponse.cpp +++ b/v3/src/AsyncRangeResponse.cpp @@ -42,7 +42,7 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse() error_code=100; error_message="Key not found"; } - + index = reply.header().revision(); return *this; }