From 6fb775218dea0c0d37e3c0fdd0969564bd068f1d Mon Sep 17 00:00:00 2001 From: lampayan Date: Thu, 9 Jun 2016 13:53:58 +0200 Subject: [PATCH] This is the complete implementation for delete functionalities. Index is now also supported notes/TODOs: 1) We should consider adding an algo for mod/update functions to consider BOTH creation index and modify index 2) Watch functionality --- etcd/Client.hpp | 6 ++-- etcd/Response.hpp | 1 + src/Client.cpp | 45 +++++++++++++++++++------ tst/EtcdTest.cpp | 85 +++++++++++++++++++---------------------------- 4 files changed, 74 insertions(+), 63 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 538ab18..1dd6fdb 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -165,8 +165,10 @@ namespace etcd etcdv3::grpcClient grpcClient; private: - pplx::task removeEntryWithKey(std::string const &); - pplx::task removeEntryWithKeyAndValue(std::string const &, std::string const &); + pplx::task removeEntryWithKey(std::string const &entryKey); + pplx::task removeEntryWithKeyAndValue(std::string const &entryKey, std::string const &oldValue); + pplx::task removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex); + }; diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 1415003..612d311 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -45,6 +45,7 @@ namespace etcd { auto v3resp = call->ParseResponse(); resp = etcd::Response(v3resp); + resp._index = call->reply.header().revision(); } else { diff --git a/src/Client.cpp b/src/Client.cpp index 61f815b..ccabc05 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -54,7 +54,6 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: return send_asyncmodify_if(key, value, old_value); } -//FBDL pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index) { web::http::uri_builder uri("/v2/keys" + key); @@ -131,12 +130,40 @@ pplx::task etcd::Client::rm_if(std::string const & key, std::str return removeEntryWithKeyAndValue(key, old_value); } +pplx::task etcd::Client::removeEntryWithKeyAndIndex(std::string const &entryKey, int oldIndex) { + + etcdv3::AsyncRangeResponse *searchResult = etcdv3::Utils::getKey(entryKey, grpcClient); + std::cout << "found revision in item is: " << searchResult->reply.kvs(0).create_revision() << std::endl; + + if(!searchResult->reply.kvs_size()) { + searchResult->error_code = 100; + searchResult->error_message = "Key not Found"; + return Response::createResponse(*searchResult); + } + else if(searchResult->reply.kvs(0).create_revision() != oldIndex) { + searchResult->error_code = 101; + searchResult->error_message = "Compare failed"; + return Response::createResponse(*searchResult); + } + + etcdserverpb::DeleteRangeRequest deleteRangeRequest; + deleteRangeRequest.set_key(entryKey); + + etcdv3::AsyncDelResponse *deleteResponseCall = new etcdv3::AsyncDelResponse("compareAndDelete"); + + deleteResponseCall->prev_value = searchResult->reply.kvs(0); + deleteResponseCall->client = &grpcClient; + deleteResponseCall->key = entryKey; + + deleteResponseCall->rpcInstance = grpcClient.stub_->AsyncDeleteRange(&deleteResponseCall->context, deleteRangeRequest, &deleteResponseCall->cq_); + deleteResponseCall->rpcInstance->Finish(&deleteResponseCall->deleteResponse, &deleteResponseCall->status, (void*)deleteResponseCall); + + return Response::createResponse(*deleteResponseCall); +} + pplx::task etcd::Client::rm_if(std::string const & key, int old_index) { - web::http::uri_builder uri("/v2/keys" + key); - uri.append_query("dir=false"); - uri.append_query("prevIndex", old_index); - return send_del_request(uri); + return removeEntryWithKeyAndIndex(key, old_index); } pplx::task etcd::Client::mkdir(std::string const & key) @@ -245,7 +272,7 @@ pplx::task etcd::Client::send_asyncmodify_if(std::string const & call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); call->response_reader->Finish(&call->reply, &call->status, (void*)call); - + return Response::create(call); } @@ -276,7 +303,7 @@ pplx::task etcd::Client::send_asyncmodify(std::string const & ke call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); call->response_reader->Finish(&call->reply, &call->status, (void*)call); - + return Response::create(call); } @@ -299,7 +326,6 @@ pplx::task etcd::Client::send_asyncget(std::string const & key) pplx::task etcd::Client::send_asyncput(std::string const & key, std::string const & value) { - PutRequest put_request; put_request.set_key(key); put_request.set_value(value); @@ -316,11 +342,10 @@ pplx::task etcd::Client::send_asyncput(std::string const & key, call->client = &grpcClient; call->key = key; - call->response_reader = grpcClient.stub_->AsyncPut(&call->context,put_request,&call->cq_); call->response_reader->Finish(&call->reply, &call->status, (void*)call); - + return Response::create(call); } diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index f4d8b3b..25c3fde 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -64,7 +64,6 @@ TEST_CASE("modify a key") } - TEST_CASE("set a key") { etcd::Client etcd("http://127.0.0.1:4001"); @@ -120,29 +119,10 @@ TEST_CASE("atomic compare-and-delete based on prevValue") CHECK("42" == res.prev_value().as_string()); } -TEST_CASE("atomic compare-and-delete based on prevValue checking index") -{ - etcd::Client etcd("http://127.0.0.1:4001"); - std::cout << "index: " << etcd.set("/test/key1", "42").get().index() << std::endl; - - etcd::Response res = etcd.rm_if("/test/key1", "43").get(); - CHECK(!res.is_ok()); - CHECK(101 == res.error_code()); - CHECK("Compare failed" == res.error_message()); - - res = etcd.rm_if("/test/key1", "42").get(); - REQUIRE(res.is_ok()); - CHECK("compareAndDelete" == res.action()); - CHECK("42" == res.prev_value().as_string()); -} - -#if 0 - TEST_CASE("atomic compare-and-delete based on prevIndex") { etcd::Client etcd("http://127.0.0.1:4001"); int index = etcd.set("/test/key1", "42").get().index(); - std::cout << "index of old: " << index << std::endl; etcd::Response res = etcd.rm_if("/test/key1", index - 1).get(); CHECK(!res.is_ok()); @@ -155,6 +135,40 @@ TEST_CASE("atomic compare-and-delete based on prevIndex") CHECK("42" == res.prev_value().as_string()); } +#if 0 + +TEST_CASE("deep atomic compare-and-swap") +{ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd.set("/test/key1", "42").wait(); + + // modify success + etcd::Response res = etcd.modify_if("/test/key1", "43", "42").get(); + int index = res.index(); + std::cout << "index to use: " << index << std::endl; + REQUIRE(res.is_ok()); + CHECK("compareAndSwap" == res.action()); + CHECK("43" == res.value().as_string()); + + // modify fails the second time + res = etcd.modify_if("/test/key1", "44", "42").get(); + CHECK(!res.is_ok()); + CHECK(101 == res.error_code()); + CHECK("Compare failed" == res.error_message()); + + // succes with the correct index + res = etcd.modify_if("/test/key1", "44", index).get(); + REQUIRE(res.is_ok()); + CHECK("compareAndSwap" == res.action()); + CHECK("44" == res.value().as_string()); + + // index changes so second modify fails + res = etcd.modify_if("/test/key1", "45", index).get(); + CHECK(!res.is_ok()); + CHECK(101 == res.error_code()); + CHECK("Compare failed" == res.error_message()); +} + TEST_CASE("create a directory") { etcd::Client etcd("http://127.0.0.1:4001"); @@ -258,37 +272,6 @@ TEST_CASE("watch changes in the past") CHECK("45" == res.value().as_string()); } -TEST_CASE("atomic compare-and-swap") -{ - etcd::Client etcd("http://127.0.0.1:4001"); - etcd.set("/test/key1", "42").wait(); - - // modify success - etcd::Response res = etcd.modify_if("/test/key1", "43", "42").get(); - int index = res.index(); - REQUIRE(res.is_ok()); - CHECK("compareAndSwap" == res.action()); - CHECK("43" == res.value().as_string()); - - // modify fails the second time - res = etcd.modify_if("/test/key1", "44", "42").get(); - CHECK(!res.is_ok()); - CHECK(101 == res.error_code()); - CHECK("Compare failed" == res.error_message()); - - // succes with the correct index - res = etcd.modify_if("/test/key1", "44", index).get(); - REQUIRE(res.is_ok()); - CHECK("compareAndSwap" == res.action()); - CHECK("44" == res.value().as_string()); - - // index changes so second modify fails - res = etcd.modify_if("/test/key1", "45", index).get(); - CHECK(!res.is_ok()); - CHECK(101 == res.error_code()); - CHECK("Compare failed" == res.error_message()); -} - TEST_CASE("cleanup") { etcd::Client etcd("http://127.0.0.1:4001");