List/delete/watch on exact range. (#51)

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-04-01 14:50:50 +08:00 committed by GitHub
parent 1f9f80b5ff
commit 1b24751b9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 314 additions and 36 deletions

View File

@ -224,6 +224,26 @@ namespace etcd
*/
pplx::task<Response> ls(std::string const & key, size_t const limit);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end).
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
*/
pplx::task<Response> ls(std::string const & key, std::string const &range_end);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end).
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
*/
pplx::task<Response> ls(std::string const & key, std::string const &range_end, size_t const limit);
/**
* Removes a directory node. Fails if the parent directory dos not exists or not a directory.
@ -232,6 +252,24 @@ namespace etcd
*/
pplx::task<Response> rmdir(std::string const & key, bool recursive = false);
/**
* Removes multiple keys between [key, range_end).
*
* This overload for `const char *` is to avoid const char * to bool implicit casting.
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> rmdir(std::string const & key, const char *range_end);
/**
* Removes multiple keys between [key, range_end).
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> rmdir(std::string const & key, std::string const &range_end);
/**
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and
* a new key is created, like "/testdir/newkey" then no change happened in the value of
@ -250,6 +288,35 @@ namespace etcd
*/
pplx::task<Response> watch(std::string const & key, int fromIndex, bool recursive = false);
/**
* Watches for changes of a range of keys inside [key, range_end).
*
* This overload for `const char *` is to avoid const char * to bool implicit casting.
*
* @param key is the value or directory to be watched
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> watch(std::string const & key, const char *range_end);
/**
* Watches for changes of a range of keys inside [key, range_end).
*
* @param key is the value or directory to be watched
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> watch(std::string const & key, std::string const &range_end);
/**
* Watches for changes of a range of keys inside [key, range_end) from a specific index. The index value
* can be in the "past".
*
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past".
* @param key is the value or directory to be watched
* @param range_end is the end of key range to be removed.
* @param fromIndex the first index we are interested in
*/
pplx::task<Response> watch(std::string const & key, std::string const &range_end, int fromIndex);
/**
* Grants a lease.
* @param ttl is the time to live of the lease

View File

@ -55,8 +55,13 @@ namespace etcd
Response rm_if(std::string const & key, std::string const & old_value);
Response rm_if(std::string const & key, int old_index);
Response ls(std::string const & key);
Response ls(std::string const & key, size_t const limit);
Response ls(std::string const & key, std::string const &range_end);
Response ls(std::string const & key, std::string const &range_end, size_t const limit);
Response mkdir(std::string const & key, int ttl = 0);
Response rmdir(std::string const & key, bool recursive = false);
Response rmdir(std::string const & key, const char *range_end);
Response rmdir(std::string const & key, std::string const &range_end);
Response leasegrant(int ttl);
Response leaserevoke(int64_t lease_id);
Response leasetimetolive(int64_t lease_id);
@ -70,6 +75,8 @@ namespace etcd
* @param recursive if true watch a whole subtree
*/
Response watch(std::string const & key, bool recursive = false);
Response watch(std::string const & key, const char *range_end);
Response watch(std::string const & key, std::string const &range_end);
/**
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past".
@ -78,6 +85,7 @@ namespace etcd
* @param recursive if true watch a whole subtree
*/
Response watch(std::string const & key, int fromIndex, bool recursive = false);
Response watch(std::string const & key, std::string const &range_end, int fromIndex);
protected:
Client client;

View File

@ -34,6 +34,7 @@ namespace etcdv3
int ttl;
int limit;
std::string key;
std::string range_end;
std::string value;
std::string old_value;
std::string auth_token;
@ -56,5 +57,9 @@ namespace etcdv3
etcdv3::ActionParameters parameters;
std::chrono::high_resolution_clock::time_point start_timepoint;
};
namespace detail {
std::string string_plus_one(std::string const &value);
}
}
#endif

View File

@ -485,7 +485,6 @@ pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value)
{
etcdv3::ActionParameters params;
@ -520,6 +519,23 @@ pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool rec
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, const char *range_end)
{
return rmdir(key, std::string(range_end));
}
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, std::string const &range_end)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
{
etcdv3::ActionParameters params;
@ -544,6 +560,32 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t cons
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, std::string const &range_end)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.limit = 0; // default no limit.
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, std::string const &range_end, size_t const limit)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.limit = limit;
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
{
etcdv3::ActionParameters params;
@ -567,6 +609,36 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, const char *range_end)
{
return watch(key, std::string(range_end));
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, std::string const & range_end)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.watch_stub = stubs->watchServiceStub.get();
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, std::string const & range_end, int fromIndex)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.revision = fromIndex;
params.watch_stub = stubs->watchServiceStub.get();
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
{
etcdv3::ActionParameters params;

View File

@ -99,11 +99,36 @@ etcd::Response etcd::SyncClient::rmdir(std::string const & key, bool recursive)
CHECK_EXCEPTIONS(client.rmdir(key, recursive).get());
}
etcd::Response etcd::SyncClient::rmdir(std::string const & key, const char *range_end)
{
CHECK_EXCEPTIONS(client.rmdir(key, range_end).get());
}
etcd::Response etcd::SyncClient::rmdir(std::string const & key, std::string const &range_end)
{
CHECK_EXCEPTIONS(client.rmdir(key, range_end).get());
}
etcd::Response etcd::SyncClient::ls(std::string const & key)
{
CHECK_EXCEPTIONS(client.ls(key).get());
}
etcd::Response etcd::SyncClient::ls(std::string const & key, size_t const limit)
{
CHECK_EXCEPTIONS(client.ls(key, limit).get());
}
etcd::Response etcd::SyncClient::ls(std::string const & key, std::string const &range_end)
{
CHECK_EXCEPTIONS(client.ls(key, range_end).get());
}
etcd::Response etcd::SyncClient::ls(std::string const & key, std::string const &range_end, size_t limit)
{
CHECK_EXCEPTIONS(client.ls(key, range_end, limit).get());
}
etcd::Response etcd::SyncClient::leasegrant(int ttl)
{
CHECK_EXCEPTIONS(client.leasegrant(ttl).get());
@ -128,3 +153,18 @@ etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, b
{
CHECK_EXCEPTIONS(client.watch(key, fromIndex, recursive).get());
}
etcd::Response etcd::SyncClient::watch(std::string const & key, const char *range_end)
{
CHECK_EXCEPTIONS(client.watch(key, range_end).get());
}
etcd::Response etcd::SyncClient::watch(std::string const & key, std::string const &range_end)
{
CHECK_EXCEPTIONS(client.watch(key, range_end).get());
}
etcd::Response etcd::SyncClient::watch(std::string const & key, std::string const &range_end, int fromIndex)
{
CHECK_EXCEPTIONS(client.watch(key, range_end, fromIndex).get());
}

View File

@ -37,3 +37,19 @@ void etcdv3::Action::waitForResponse()
const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() {
return this->start_timepoint;
}
std::string etcdv3::detail::string_plus_one(std::string const &value) {
// referred from the Go implementation in etcd.
char *s = static_cast<char *>(calloc(value.size() + 1, sizeof(char)));
std::memcpy(s, value.c_str(), value.size());
for (int i = value.size() - 1; i >= 0; --i) {
if (static_cast<unsigned char>(s[i]) < 0xff) {
s[i] = s[i] + 1;
std::string ret = std::string(s, i + 1);
free(s);
return ret;
}
}
// see: noPrefixEnd in etcd
return {"\0"};
}

View File

@ -9,12 +9,16 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters param)
DeleteRangeRequest del_request;
del_request.set_key(parameters.key);
del_request.set_prev_kv(true);
std::string range_end(parameters.key);
if(parameters.withPrefix)
{
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
del_request.set_range_end(range_end);
if (parameters.key.empty()) {
del_request.set_range_end(detail::string_plus_one("\0"));
} else {
del_request.set_range_end(detail::string_plus_one(parameters.key));
}
}
if(!parameters.range_end.empty()) {
del_request.set_range_end(parameters.range_end);
}
response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_);
@ -32,7 +36,7 @@ etcdv3::AsyncDeleteRangeResponse etcdv3::AsyncDeleteAction::ParseResponse()
}
else
{
del_resp.ParseResponse(parameters.key, parameters.withPrefix, reply);
del_resp.ParseResponse(parameters.key, parameters.withPrefix || !parameters.range_end.empty(), reply);
}
return del_resp;

View File

@ -6,21 +6,6 @@
using etcdserverpb::RangeRequest;
static std::string string_plus_one(std::string const &value) {
char *s = static_cast<char *>(calloc(value.size() + 1, sizeof(char)));
std::memcpy(s, value.c_str(), value.size());
for (int i = value.size() - 1; i >= 0; --i) {
if (static_cast<unsigned char>(s[i]) < 0xff) {
s[i] = s[i] + 1;
std::string ret = std::string(s, i + 1);
free(s);
return ret;
}
}
// see: noPrefixEnd in etcd
return {"\0"};
}
etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param)
: etcdv3::Action(param)
{
@ -34,12 +19,16 @@ etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param)
if(parameters.withPrefix)
{
if (parameters.key.empty()) {
get_request.set_range_end(string_plus_one("\0"));
get_request.set_range_end(detail::string_plus_one("\0"));
} else {
get_request.set_range_end(string_plus_one(parameters.key));
get_request.set_range_end(detail::string_plus_one(parameters.key));
}
}
if(!parameters.range_end.empty()) {
get_request.set_range_end(parameters.range_end);
}
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE);
}
response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
@ -54,7 +43,7 @@ etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse()
}
else
{
range_resp.ParseResponse(reply, parameters.withPrefix);
range_resp.ParseResponse(reply, parameters.withPrefix || !parameters.range_end.empty());
}
return range_resp;
}

View File

@ -20,10 +20,14 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param)
if(parameters.withPrefix)
{
std::string range_end(parameters.key);
int ascii = (int)range_end[range_end.length()-1];
range_end.back() = ascii+1;
watch_create_req.set_range_end(range_end);
if (parameters.key.empty()) {
watch_create_req.set_range_end(detail::string_plus_one("\0"));
} else {
watch_create_req.set_range_end(detail::string_plus_one(parameters.key));
}
}
if(!parameters.range_end.empty()) {
watch_create_req.set_range_end(parameters.range_end);
}
watch_req.mutable_create_request()->CopyFrom(watch_create_req);

View File

@ -222,16 +222,48 @@ TEST_CASE("list a directory")
CHECK(resp.values()[2].is_dir() == 0);
CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok());
}
TEST_CASE("list by range")
{
etcd::Client etcd("http://127.0.0.1:2379");
CHECK(0 == etcd.ls("/test/new_dir").get().keys().size());
etcd.set("/test/new_dir/key1", "value1").wait();
etcd.set("/test/new_dir/key2", "value1").wait();
etcd.set("/test/new_dir/key3", "value1").wait();
etcd.set("/test/new_dir/key4", "value1").wait();
etcd::Response resp1 = etcd.ls("/test/new_dir/key1", "/test/new_dir/key3").get();
REQUIRE(resp1.is_ok());
CHECK("get" == resp1.action());
REQUIRE(2 == resp1.keys().size());
REQUIRE(2 == resp1.values().size());
etcd::Response resp2 = etcd.ls("/test/new_dir/key1", "/test/new_dir/key4").get();
REQUIRE(resp2.is_ok());
CHECK("get" == resp2.action());
REQUIRE(3 == resp2.keys().size());
REQUIRE(3 == resp2.values().size());
CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok());
}
TEST_CASE("delete a directory")
{
etcd::Client etcd("http://127.0.0.1:2379");
etcd.set("/test/new_dir/key1", "value1").wait();
etcd.set("/test/new_dir/key2", "value2").wait();
etcd.set("/test/new_dir/key3", "value3").wait();
CHECK(100 == etcd.rmdir("/test/new_dir").get().error_code()); // key not found
etcd::Response resp = etcd.ls("/test/new_dir").get();
resp = etcd.rmdir("/test/new_dir", true).get();
int index = resp.index();
CHECK("delete" == resp.action());
@ -253,6 +285,27 @@ TEST_CASE("delete a directory")
CHECK("Key not found" == resp.error_message());
}
TEST_CASE("delete by range")
{
etcd::Client etcd("http://127.0.0.1:2379");
CHECK(100 == etcd.rmdir("/test/new_dir").get().error_code()); // key not found
etcd::Response resp = etcd.ls("/test/new_dir").get();
etcd.set("/test/new_dir/key1", "value1").wait();
etcd.set("/test/new_dir/key2", "value2").wait();
etcd.set("/test/new_dir/key3", "value3").wait();
etcd.set("/test/new_dir/key4", "value4").wait();
resp = etcd.rmdir("/test/new_dir/key1", "/test/new_dir/key3").get();
CHECK("delete" == resp.action());
REQUIRE(2 == resp.keys().size());
CHECK("/test/new_dir/key1" == resp.key(0));
CHECK("/test/new_dir/key2" == resp.key(1));
CHECK("value1" == resp.value(0).as_string());
CHECK("value2" == resp.value(1).as_string());
}
TEST_CASE("wait for a value change")
{
etcd::Client etcd("http://127.0.0.1:2379");
@ -322,11 +375,34 @@ TEST_CASE("watch changes in the past")
CHECK("45" == res.value().as_string());
}
TEST_CASE("watch range changes in the past")
{
etcd::Client etcd("http://127.0.0.1:2379");
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
int index = etcd.set("/test/key1", "42").get().index();
etcd.set("/test/key1", "43").wait();
etcd.set("/test/key2", "44").wait();
etcd.set("/test/key3", "45").wait();
etcd.set("/test/key4", "45").wait();
etcd::Response res;
res = etcd.watch("/test/key1", "/test/key4", index).get();
CHECK(4 == res.events().size());
res = etcd.watch("/test/key1", "/test/key5", index).get();
CHECK(5 == res.events().size());
res = etcd.watch("/test/key1", "/test/key4", ++index).get();
CHECK(3 == res.events().size());
res = etcd.watch("/test/key1", "/test/key5", ++index).get();
CHECK(3 == res.events().size());
}
TEST_CASE("watch multiple keys and use promise") {
etcd::Client etcd("http://127.0.0.1:2379");
int start_index = etcd.add("/test/key1", "value1").get().index();
etcd.add("/test/key2", "value2").get();
int start_index = etcd.set("/test/key1", "value1").get().index();
etcd.set("/test/key2", "value2").get();
pplx::task<size_t> res = etcd.watch("/test", start_index, true)
.then([](pplx::task<etcd::Response> const &resp_task) -> size_t {
@ -418,6 +494,3 @@ TEST_CASE("cleanup")
etcd::Client etcd("http://127.0.0.1:2379");
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
}