From 425a9c83796f32f8f46a81dd376f5fdb1aeabe9f Mon Sep 17 00:00:00 2001 From: Tao He Date: Thu, 1 Apr 2021 14:34:34 +0800 Subject: [PATCH] List/delete/watch on exact range. Signed-off-by: Tao He --- etcd/Client.hpp | 67 ++++++++++++++++++++++++++++ etcd/SyncClient.hpp | 8 ++++ etcd/v3/Action.hpp | 5 +++ src/Client.cpp | 74 ++++++++++++++++++++++++++++++- src/SyncClient.cpp | 40 +++++++++++++++++ src/v3/Action.cpp | 16 +++++++ src/v3/AsyncDeleteAction.cpp | 14 +++--- src/v3/AsyncGetAction.cpp | 29 ++++-------- src/v3/AsyncWatchAction.cpp | 12 +++-- tst/EtcdTest.cpp | 85 +++++++++++++++++++++++++++++++++--- 10 files changed, 314 insertions(+), 36 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 6f72144..bb5f5bc 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -224,6 +224,26 @@ namespace etcd */ pplx::task ls(std::string const & key, size_t const limit); + /** + * Gets a directory listing of the directory identified by the key and range_end, i.e., get + * all keys in the range [key, range_end). + * + * @param key is the key to be listed + * @param range_end is the end of key range to be listed + */ + pplx::task ls(std::string const & key, std::string const &range_end); + + + /** + * Gets a directory listing of the directory identified by the key and range_end, i.e., get + * all keys in the range [key, range_end). + * + * @param key is the key to be listed + * @param range_end is the end of key range to be listed + * @param limit is the size limit of results to be listed, we don't use default parameters + * to ensure backwards binary compatibility. + */ + pplx::task ls(std::string const & key, std::string const &range_end, size_t const limit); /** * Removes a directory node. Fails if the parent directory dos not exists or not a directory. @@ -232,6 +252,24 @@ namespace etcd */ pplx::task rmdir(std::string const & key, bool recursive = false); + /** + * Removes multiple keys between [key, range_end). + * + * This overload for `const char *` is to avoid const char * to bool implicit casting. + * + * @param key is the directory to be created to be listed + * @param range_end is the end of key range to be removed. + */ + pplx::task rmdir(std::string const & key, const char *range_end); + + /** + * Removes multiple keys between [key, range_end). + * + * @param key is the directory to be created to be listed + * @param range_end is the end of key range to be removed. + */ + pplx::task rmdir(std::string const & key, std::string const &range_end); + /** * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and * a new key is created, like "/testdir/newkey" then no change happened in the value of @@ -250,6 +288,35 @@ namespace etcd */ pplx::task watch(std::string const & key, int fromIndex, bool recursive = false); + /** + * Watches for changes of a range of keys inside [key, range_end). + * + * This overload for `const char *` is to avoid const char * to bool implicit casting. + * + * @param key is the value or directory to be watched + * @param range_end is the end of key range to be removed. + */ + pplx::task watch(std::string const & key, const char *range_end); + + /** + * Watches for changes of a range of keys inside [key, range_end). + * + * @param key is the value or directory to be watched + * @param range_end is the end of key range to be removed. + */ + pplx::task watch(std::string const & key, std::string const &range_end); + + /** + * Watches for changes of a range of keys inside [key, range_end) from a specific index. The index value + * can be in the "past". + * + * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". + * @param key is the value or directory to be watched + * @param range_end is the end of key range to be removed. + * @param fromIndex the first index we are interested in + */ + pplx::task watch(std::string const & key, std::string const &range_end, int fromIndex); + /** * Grants a lease. * @param ttl is the time to live of the lease diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index a90dc2e..8750755 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -55,8 +55,13 @@ namespace etcd Response rm_if(std::string const & key, std::string const & old_value); Response rm_if(std::string const & key, int old_index); Response ls(std::string const & key); + Response ls(std::string const & key, size_t const limit); + Response ls(std::string const & key, std::string const &range_end); + Response ls(std::string const & key, std::string const &range_end, size_t const limit); Response mkdir(std::string const & key, int ttl = 0); Response rmdir(std::string const & key, bool recursive = false); + Response rmdir(std::string const & key, const char *range_end); + Response rmdir(std::string const & key, std::string const &range_end); Response leasegrant(int ttl); Response leaserevoke(int64_t lease_id); Response leasetimetolive(int64_t lease_id); @@ -70,6 +75,8 @@ namespace etcd * @param recursive if true watch a whole subtree */ Response watch(std::string const & key, bool recursive = false); + Response watch(std::string const & key, const char *range_end); + Response watch(std::string const & key, std::string const &range_end); /** * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". @@ -78,6 +85,7 @@ namespace etcd * @param recursive if true watch a whole subtree */ Response watch(std::string const & key, int fromIndex, bool recursive = false); + Response watch(std::string const & key, std::string const &range_end, int fromIndex); protected: Client client; diff --git a/etcd/v3/Action.hpp b/etcd/v3/Action.hpp index 3c8f8f2..1550203 100644 --- a/etcd/v3/Action.hpp +++ b/etcd/v3/Action.hpp @@ -34,6 +34,7 @@ namespace etcdv3 int ttl; int limit; std::string key; + std::string range_end; std::string value; std::string old_value; std::string auth_token; @@ -56,5 +57,9 @@ namespace etcdv3 etcdv3::ActionParameters parameters; std::chrono::high_resolution_clock::time_point start_timepoint; }; + + namespace detail { + std::string string_plus_one(std::string const &value); + } } #endif diff --git a/src/Client.cpp b/src/Client.cpp index c62218b..76c86cb 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -485,7 +485,6 @@ pplx::task etcd::Client::rm(std::string const & key) return Response::create(call); } - pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) { etcdv3::ActionParameters params; @@ -520,6 +519,23 @@ pplx::task etcd::Client::rmdir(std::string const & key, bool rec return Response::create(call); } +pplx::task etcd::Client::rmdir(std::string const & key, const char *range_end) +{ + return rmdir(key, std::string(range_end)); +} + +pplx::task etcd::Client::rmdir(std::string const & key, std::string const &range_end) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.range_end.assign(range_end); + params.withPrefix = false; + params.kv_stub = stubs->kvServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); + return Response::create(call); +} + pplx::task etcd::Client::ls(std::string const & key) { etcdv3::ActionParameters params; @@ -544,6 +560,32 @@ pplx::task etcd::Client::ls(std::string const & key, size_t cons return Response::create(call); } +pplx::task etcd::Client::ls(std::string const & key, std::string const &range_end) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.range_end.assign(range_end); + params.withPrefix = false; + params.limit = 0; // default no limit. + params.kv_stub = stubs->kvServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncGetAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::ls(std::string const & key, std::string const &range_end, size_t const limit) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.range_end.assign(range_end); + params.withPrefix = false; + params.limit = limit; + params.kv_stub = stubs->kvServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncGetAction(params)); + return Response::create(call); +} + pplx::task etcd::Client::watch(std::string const & key, bool recursive) { etcdv3::ActionParameters params; @@ -567,6 +609,36 @@ pplx::task etcd::Client::watch(std::string const & key, int from return Response::create(call); } +pplx::task etcd::Client::watch(std::string const & key, const char *range_end) +{ + return watch(key, std::string(range_end)); +} + +pplx::task etcd::Client::watch(std::string const & key, std::string const & range_end) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.range_end.assign(range_end); + params.withPrefix = false; + params.watch_stub = stubs->watchServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::watch(std::string const & key, std::string const & range_end, int fromIndex) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key.assign(key); + params.range_end.assign(range_end); + params.withPrefix = false; + params.revision = fromIndex; + params.watch_stub = stubs->watchServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); + return Response::create(call); +} + pplx::task etcd::Client::leasegrant(int ttl) { etcdv3::ActionParameters params; diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index 4dc267f..4bf5594 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -99,11 +99,36 @@ etcd::Response etcd::SyncClient::rmdir(std::string const & key, bool recursive) CHECK_EXCEPTIONS(client.rmdir(key, recursive).get()); } +etcd::Response etcd::SyncClient::rmdir(std::string const & key, const char *range_end) +{ + CHECK_EXCEPTIONS(client.rmdir(key, range_end).get()); +} + +etcd::Response etcd::SyncClient::rmdir(std::string const & key, std::string const &range_end) +{ + CHECK_EXCEPTIONS(client.rmdir(key, range_end).get()); +} + etcd::Response etcd::SyncClient::ls(std::string const & key) { CHECK_EXCEPTIONS(client.ls(key).get()); } +etcd::Response etcd::SyncClient::ls(std::string const & key, size_t const limit) +{ + CHECK_EXCEPTIONS(client.ls(key, limit).get()); +} + +etcd::Response etcd::SyncClient::ls(std::string const & key, std::string const &range_end) +{ + CHECK_EXCEPTIONS(client.ls(key, range_end).get()); +} + +etcd::Response etcd::SyncClient::ls(std::string const & key, std::string const &range_end, size_t limit) +{ + CHECK_EXCEPTIONS(client.ls(key, range_end, limit).get()); +} + etcd::Response etcd::SyncClient::leasegrant(int ttl) { CHECK_EXCEPTIONS(client.leasegrant(ttl).get()); @@ -128,3 +153,18 @@ etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, b { CHECK_EXCEPTIONS(client.watch(key, fromIndex, recursive).get()); } + +etcd::Response etcd::SyncClient::watch(std::string const & key, const char *range_end) +{ + CHECK_EXCEPTIONS(client.watch(key, range_end).get()); +} + +etcd::Response etcd::SyncClient::watch(std::string const & key, std::string const &range_end) +{ + CHECK_EXCEPTIONS(client.watch(key, range_end).get()); +} + +etcd::Response etcd::SyncClient::watch(std::string const & key, std::string const &range_end, int fromIndex) +{ + CHECK_EXCEPTIONS(client.watch(key, range_end, fromIndex).get()); +} diff --git a/src/v3/Action.cpp b/src/v3/Action.cpp index 2211296..2def3ce 100644 --- a/src/v3/Action.cpp +++ b/src/v3/Action.cpp @@ -37,3 +37,19 @@ void etcdv3::Action::waitForResponse() const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() { return this->start_timepoint; } + +std::string etcdv3::detail::string_plus_one(std::string const &value) { + // referred from the Go implementation in etcd. + char *s = static_cast(calloc(value.size() + 1, sizeof(char))); + std::memcpy(s, value.c_str(), value.size()); + for (int i = value.size() - 1; i >= 0; --i) { + if (static_cast(s[i]) < 0xff) { + s[i] = s[i] + 1; + std::string ret = std::string(s, i + 1); + free(s); + return ret; + } + } + // see: noPrefixEnd in etcd + return {"\0"}; +} diff --git a/src/v3/AsyncDeleteAction.cpp b/src/v3/AsyncDeleteAction.cpp index 045321a..8bf0933 100644 --- a/src/v3/AsyncDeleteAction.cpp +++ b/src/v3/AsyncDeleteAction.cpp @@ -9,12 +9,16 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters param) DeleteRangeRequest del_request; del_request.set_key(parameters.key); del_request.set_prev_kv(true); - std::string range_end(parameters.key); if(parameters.withPrefix) { - int ascii = (int)range_end[range_end.length()-1]; - range_end.back() = ascii+1; - del_request.set_range_end(range_end); + if (parameters.key.empty()) { + del_request.set_range_end(detail::string_plus_one("\0")); + } else { + del_request.set_range_end(detail::string_plus_one(parameters.key)); + } + } + if(!parameters.range_end.empty()) { + del_request.set_range_end(parameters.range_end); } response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_); @@ -32,7 +36,7 @@ etcdv3::AsyncDeleteRangeResponse etcdv3::AsyncDeleteAction::ParseResponse() } else { - del_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); + del_resp.ParseResponse(parameters.key, parameters.withPrefix || !parameters.range_end.empty(), reply); } return del_resp; diff --git a/src/v3/AsyncGetAction.cpp b/src/v3/AsyncGetAction.cpp index 957d863..bed840e 100644 --- a/src/v3/AsyncGetAction.cpp +++ b/src/v3/AsyncGetAction.cpp @@ -6,21 +6,6 @@ using etcdserverpb::RangeRequest; -static std::string string_plus_one(std::string const &value) { - char *s = static_cast(calloc(value.size() + 1, sizeof(char))); - std::memcpy(s, value.c_str(), value.size()); - for (int i = value.size() - 1; i >= 0; --i) { - if (static_cast(s[i]) < 0xff) { - s[i] = s[i] + 1; - std::string ret = std::string(s, i + 1); - free(s); - return ret; - } - } - // see: noPrefixEnd in etcd - return {"\0"}; -} - etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param) : etcdv3::Action(param) { @@ -34,12 +19,16 @@ etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param) if(parameters.withPrefix) { if (parameters.key.empty()) { - get_request.set_range_end(string_plus_one("\0")); + get_request.set_range_end(detail::string_plus_one("\0")); } else { - get_request.set_range_end(string_plus_one(parameters.key)); + get_request.set_range_end(detail::string_plus_one(parameters.key)); } - get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE); - } + } + if(!parameters.range_end.empty()) { + get_request.set_range_end(parameters.range_end); + } + get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE); + response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_); response_reader->Finish(&reply, &status, (void*)this); } @@ -54,7 +43,7 @@ etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse() } else { - range_resp.ParseResponse(reply, parameters.withPrefix); + range_resp.ParseResponse(reply, parameters.withPrefix || !parameters.range_end.empty()); } return range_resp; } diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 8b1b9cd..81acaa2 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -20,10 +20,14 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param) if(parameters.withPrefix) { - std::string range_end(parameters.key); - int ascii = (int)range_end[range_end.length()-1]; - range_end.back() = ascii+1; - watch_create_req.set_range_end(range_end); + if (parameters.key.empty()) { + watch_create_req.set_range_end(detail::string_plus_one("\0")); + } else { + watch_create_req.set_range_end(detail::string_plus_one(parameters.key)); + } + } + if(!parameters.range_end.empty()) { + watch_create_req.set_range_end(parameters.range_end); } watch_req.mutable_create_request()->CopyFrom(watch_create_req); diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index f755b71..a580a39 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -222,16 +222,48 @@ TEST_CASE("list a directory") CHECK(resp.values()[2].is_dir() == 0); CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code()); + + CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok()); +} + +TEST_CASE("list by range") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + CHECK(0 == etcd.ls("/test/new_dir").get().keys().size()); + + etcd.set("/test/new_dir/key1", "value1").wait(); + etcd.set("/test/new_dir/key2", "value1").wait(); + etcd.set("/test/new_dir/key3", "value1").wait(); + etcd.set("/test/new_dir/key4", "value1").wait(); + + etcd::Response resp1 = etcd.ls("/test/new_dir/key1", "/test/new_dir/key3").get(); + REQUIRE(resp1.is_ok()); + CHECK("get" == resp1.action()); + REQUIRE(2 == resp1.keys().size()); + REQUIRE(2 == resp1.values().size()); + + etcd::Response resp2 = etcd.ls("/test/new_dir/key1", "/test/new_dir/key4").get(); + REQUIRE(resp2.is_ok()); + CHECK("get" == resp2.action()); + REQUIRE(3 == resp2.keys().size()); + REQUIRE(3 == resp2.values().size()); + + CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code()); + + CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok()); } TEST_CASE("delete a directory") { etcd::Client etcd("http://127.0.0.1:2379"); + etcd.set("/test/new_dir/key1", "value1").wait(); + etcd.set("/test/new_dir/key2", "value2").wait(); + etcd.set("/test/new_dir/key3", "value3").wait(); + CHECK(100 == etcd.rmdir("/test/new_dir").get().error_code()); // key not found etcd::Response resp = etcd.ls("/test/new_dir").get(); - resp = etcd.rmdir("/test/new_dir", true).get(); int index = resp.index(); CHECK("delete" == resp.action()); @@ -253,6 +285,27 @@ TEST_CASE("delete a directory") CHECK("Key not found" == resp.error_message()); } +TEST_CASE("delete by range") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + + CHECK(100 == etcd.rmdir("/test/new_dir").get().error_code()); // key not found + etcd::Response resp = etcd.ls("/test/new_dir").get(); + + etcd.set("/test/new_dir/key1", "value1").wait(); + etcd.set("/test/new_dir/key2", "value2").wait(); + etcd.set("/test/new_dir/key3", "value3").wait(); + etcd.set("/test/new_dir/key4", "value4").wait(); + + resp = etcd.rmdir("/test/new_dir/key1", "/test/new_dir/key3").get(); + CHECK("delete" == resp.action()); + REQUIRE(2 == resp.keys().size()); + CHECK("/test/new_dir/key1" == resp.key(0)); + CHECK("/test/new_dir/key2" == resp.key(1)); + CHECK("value1" == resp.value(0).as_string()); + CHECK("value2" == resp.value(1).as_string()); +} + TEST_CASE("wait for a value change") { etcd::Client etcd("http://127.0.0.1:2379"); @@ -322,11 +375,34 @@ TEST_CASE("watch changes in the past") CHECK("45" == res.value().as_string()); } +TEST_CASE("watch range changes in the past") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); + int index = etcd.set("/test/key1", "42").get().index(); + + etcd.set("/test/key1", "43").wait(); + etcd.set("/test/key2", "44").wait(); + etcd.set("/test/key3", "45").wait(); + etcd.set("/test/key4", "45").wait(); + + etcd::Response res; + + res = etcd.watch("/test/key1", "/test/key4", index).get(); + CHECK(4 == res.events().size()); + res = etcd.watch("/test/key1", "/test/key5", index).get(); + CHECK(5 == res.events().size()); + res = etcd.watch("/test/key1", "/test/key4", ++index).get(); + CHECK(3 == res.events().size()); + res = etcd.watch("/test/key1", "/test/key5", ++index).get(); + CHECK(3 == res.events().size()); +} + TEST_CASE("watch multiple keys and use promise") { etcd::Client etcd("http://127.0.0.1:2379"); - int start_index = etcd.add("/test/key1", "value1").get().index(); - etcd.add("/test/key2", "value2").get(); + int start_index = etcd.set("/test/key1", "value1").get().index(); + etcd.set("/test/key2", "value2").get(); pplx::task res = etcd.watch("/test", start_index, true) .then([](pplx::task const &resp_task) -> size_t { @@ -418,6 +494,3 @@ TEST_CASE("cleanup") etcd::Client etcd("http://127.0.0.1:2379"); REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); } - - -