diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 1dd6fdb..2d8f7ed 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -168,7 +168,7 @@ private: pplx::task removeEntryWithKey(std::string const &entryKey); pplx::task removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue); pplx::task removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex); - + pplx::task modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index); }; diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 612d311..1415003 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -45,7 +45,6 @@ namespace etcd { auto v3resp = call->ParseResponse(); resp = etcd::Response(v3resp); - resp._index = call->reply.header().revision(); } else { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c060bd8..916ab07 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/Utils.cpp ../v3/src/grpcClient.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncPutResponse.cpp ../v3/src/V3BaseResponse.cpp ../v3/src/AsyncDelResponse.cpp ../v3/src/AsyncModifyResponse.cpp Client.cpp Response.cpp Value.cpp json_constants.cpp BaseResponse.cpp AsyncDeleteResponse.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Client.cpp b/src/Client.cpp index ccabc05..0b04cff 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -2,6 +2,7 @@ #include "v3/include/AsyncRangeResponse.hpp" #include "v3/include/AsyncPutResponse.hpp" #include "v3/include/AsyncDelResponse.hpp" +#include "v3/include/AsyncModifyResponse.hpp" #include "v3/include/Utils.hpp" #include @@ -54,11 +55,44 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: return send_asyncmodify_if(key, value, old_value); } +pplx::task etcd::Client::modifyEntryWithValueAndOldIndex(std::string const & key, std::string const & value, int old_index) { + + + etcdv3::AsyncRangeResponse* resp = etcdv3::Utils::getKey(key, grpcClient); + if(!resp->reply.kvs_size()) + { + resp->error_code=100; + resp->error_message="Key not found"; + return Response::createResponse(*resp); + } + else if(resp->reply.kvs(0).mod_revision() != old_index) + { + resp->error_code = 101; + resp->error_message = "Compare failed"; + return Response::createResponse(*resp); + } + + PutRequest put_request; + put_request.set_key(key); + put_request.set_value(value); + + etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("compareAndSwap"); + + //below 2 lines can be removed once we are able to use Txn + call->prev_value = resp->reply.kvs(0); + call->client = &grpcClient; + call->key = key; + + call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + + call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); + + return Response::create(call); +} + pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("prevIndex", old_index); - return send_put_request(uri, "value", value); + return modifyEntryWithValueAndOldIndex(key, value, old_index); } //note: this one seems to not need the parseResponse() method @@ -133,7 +167,6 @@ pplx::task etcd::Client::rm_if(std::string const & key, std::str pplx::task etcd::Client::removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex) { etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient); - std::cout << "found revision in item is: " << searchResult->reply.kvs(0).create_revision() << std::endl; if(!searchResult->reply.kvs_size()) { searchResult->error_code = 100; @@ -262,16 +295,16 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & put_request.set_key(key); put_request.set_value(value); - etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("compareAndSwap"); + etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("compareAndSwap"); //below 2 lines can be removed once we are able to use Txn call->prev_value = resp->reply.kvs(0); call->client = &grpcClient; call->key = key; - call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call); + call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); return Response::create(call); @@ -293,16 +326,16 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke put_request.set_key(key); put_request.set_value(value); - etcdv3::AsyncPutResponse* call= new etcdv3::AsyncPutResponse("update"); + etcdv3::AsyncModifyResponse* call= new etcdv3::AsyncModifyResponse("update"); //below 2 lines can be removed once we are able to use Txn call->prev_value = resp->reply.kvs(0); call->client = &grpcClient; call->key = key; - call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); + call->rpcInstance = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); - call->response_reader->Finish(&call->reply, &call->status, (void*)call); + call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); return Response::create(call); diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index 25c3fde..a6d1f7a 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -135,8 +135,6 @@ TEST_CASE("atomic compare-and-delete based on prevIndex") CHECK("42" == res.prev_value().as_string()); } -#if 0 - TEST_CASE("deep atomic compare-and-swap") { etcd::Client etcd("http://127.0.0.1:4001"); @@ -145,7 +143,6 @@ TEST_CASE("deep atomic compare-and-swap") // modify success etcd::Response res = etcd.modify_if("/test/key1", "43", "42").get(); int index = res.index(); - std::cout << "index to use: " << index << std::endl; REQUIRE(res.is_ok()); CHECK("compareAndSwap" == res.action()); CHECK("43" == res.value().as_string()); @@ -169,6 +166,8 @@ TEST_CASE("deep atomic compare-and-swap") CHECK("Compare failed" == res.error_message()); } +#if 0 + TEST_CASE("create a directory") { etcd::Client etcd("http://127.0.0.1:4001"); diff --git a/v3/include/AsyncModifyResponse.hpp b/v3/include/AsyncModifyResponse.hpp new file mode 100644 index 0000000..9cd30d1 --- /dev/null +++ b/v3/include/AsyncModifyResponse.hpp @@ -0,0 +1,33 @@ +/* + * AsyncModifyResponse.h + * + * Created on: Jun 9, 2016 + * Author: ubuntu + */ + +#ifndef V3_SRC_ASYNCMODIFYRESPONSE_HPP_ +#define V3_SRC_ASYNCMODIFYRESPONSE_HPP_ + +#include "v3/include/V3Response.hpp" +#include "v3/include/V3BaseResponse.hpp" +#include "v3/include/grpcClient.hpp" + +namespace etcdv3 { + +class AsyncModifyResponse : public etcdv3::V3Response, public etcdv3::V3BaseResponse { +public: + AsyncModifyResponse(){action="compareAndSwap";}; + AsyncModifyResponse(std::string const &); + AsyncModifyResponse(const AsyncModifyResponse&); + AsyncModifyResponse& operator=(const AsyncModifyResponse&); + virtual ~AsyncModifyResponse(); + + etcdserverpb::PutResponse putResponse; + std::unique_ptr> rpcInstance; + AsyncModifyResponse& ParseResponse(); + etcdv3::grpcClient* client; + std::string key; +}; +} /* namespace etcdv3 */ + +#endif /* V3_SRC_ASYNCMODIFYRESPONSE_HPP_ */ diff --git a/v3/src/AsyncModifyResponse.cpp b/v3/src/AsyncModifyResponse.cpp new file mode 100644 index 0000000..39fd61f --- /dev/null +++ b/v3/src/AsyncModifyResponse.cpp @@ -0,0 +1,60 @@ +/* + * AsyncModifyResponse.cpp + * + * Created on: Jun 9, 2016 + * Author: ubuntu + */ + +#include "v3/include/AsyncModifyResponse.hpp" +#include "v3/include/Utils.hpp" + +namespace etcdv3 { + +etcdv3::AsyncModifyResponse::AsyncModifyResponse(const etcdv3::AsyncModifyResponse& other) { + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_value.set_key(other.prev_value.key()); + prev_value.set_value(other.prev_value.value()); + prev_value.set_create_revision(other.prev_value.create_revision()); + prev_value.set_mod_revision(other.prev_value.mod_revision()); +} + +etcdv3::AsyncModifyResponse::AsyncModifyResponse(const std::string &input) { + action = input; +} + +etcdv3::AsyncModifyResponse& etcdv3::AsyncModifyResponse::operator=(const etcdv3::AsyncModifyResponse& other) { + error_code = other.error_code; + error_message = other.error_message; + index = other.index; + action = other.action; + values = other.values; + prev_value.set_key(other.prev_value.key()); + prev_value.set_value(other.prev_value.value()); + prev_value.set_create_revision(other.prev_value.create_revision()); + prev_value.set_mod_revision(other.prev_value.mod_revision()); + return *this; +} + +AsyncModifyResponse::~AsyncModifyResponse() { + // TODO Auto-generated destructor stub +} + +etcdv3::AsyncModifyResponse& etcdv3::AsyncModifyResponse::ParseResponse() { + etcdv3::AsyncRangeResponse* response = etcdv3::Utils::getKey(key, *client); + if(response->reply.kvs_size()) + { + values.push_back(response->reply.kvs(0)); + index = response->reply.kvs(0).mod_revision(); + } + else{ + index = response->reply.header().revision(); + } + + return *this; +} + +} /* namespace etcdv3 */ diff --git a/v3/src/AsyncPutResponse.cpp b/v3/src/AsyncPutResponse.cpp index 708ee1a..13e7612 100644 --- a/v3/src/AsyncPutResponse.cpp +++ b/v3/src/AsyncPutResponse.cpp @@ -1,6 +1,8 @@ #include "v3/include/AsyncPutResponse.hpp" #include "v3/include/Utils.hpp" +#include + using etcdserverpb::PutRequest; using etcdserverpb::PutRequest; @@ -38,7 +40,10 @@ etcdv3::AsyncPutResponse& etcdv3::AsyncPutResponse::ParseResponse() if(resp->reply.kvs_size()) { values.push_back(resp->reply.kvs(0)); + index = resp->reply.kvs(0).create_revision(); } - + else + index = resp->reply.header().revision(); + return *this; } diff --git a/v3/src/AsyncRangeResponse.cpp b/v3/src/AsyncRangeResponse.cpp index dfef4b4..3aeeb82 100644 --- a/v3/src/AsyncRangeResponse.cpp +++ b/v3/src/AsyncRangeResponse.cpp @@ -42,7 +42,7 @@ etcdv3::AsyncRangeResponse& etcdv3::AsyncRangeResponse::ParseResponse() error_code=100; error_message="Key not found"; } - + index = reply.header().revision(); return *this; }