List/delete/watch on exact range.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-04-01 14:34:34 +08:00
parent d50f570846
commit 425a9c8379
10 changed files with 314 additions and 36 deletions

View File

@ -224,6 +224,26 @@ namespace etcd
*/ */
pplx::task<Response> ls(std::string const & key, size_t const limit); pplx::task<Response> ls(std::string const & key, size_t const limit);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end).
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
*/
pplx::task<Response> ls(std::string const & key, std::string const &range_end);
/**
* Gets a directory listing of the directory identified by the key and range_end, i.e., get
* all keys in the range [key, range_end).
*
* @param key is the key to be listed
* @param range_end is the end of key range to be listed
* @param limit is the size limit of results to be listed, we don't use default parameters
* to ensure backwards binary compatibility.
*/
pplx::task<Response> ls(std::string const & key, std::string const &range_end, size_t const limit);
/** /**
* Removes a directory node. Fails if the parent directory dos not exists or not a directory. * Removes a directory node. Fails if the parent directory dos not exists or not a directory.
@ -232,6 +252,24 @@ namespace etcd
*/ */
pplx::task<Response> rmdir(std::string const & key, bool recursive = false); pplx::task<Response> rmdir(std::string const & key, bool recursive = false);
/**
* Removes multiple keys between [key, range_end).
*
* This overload for `const char *` is to avoid const char * to bool implicit casting.
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> rmdir(std::string const & key, const char *range_end);
/**
* Removes multiple keys between [key, range_end).
*
* @param key is the directory to be created to be listed
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> rmdir(std::string const & key, std::string const &range_end);
/** /**
* Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and
* a new key is created, like "/testdir/newkey" then no change happened in the value of * a new key is created, like "/testdir/newkey" then no change happened in the value of
@ -250,6 +288,35 @@ namespace etcd
*/ */
pplx::task<Response> watch(std::string const & key, int fromIndex, bool recursive = false); pplx::task<Response> watch(std::string const & key, int fromIndex, bool recursive = false);
/**
* Watches for changes of a range of keys inside [key, range_end).
*
* This overload for `const char *` is to avoid const char * to bool implicit casting.
*
* @param key is the value or directory to be watched
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> watch(std::string const & key, const char *range_end);
/**
* Watches for changes of a range of keys inside [key, range_end).
*
* @param key is the value or directory to be watched
* @param range_end is the end of key range to be removed.
*/
pplx::task<Response> watch(std::string const & key, std::string const &range_end);
/**
* Watches for changes of a range of keys inside [key, range_end) from a specific index. The index value
* can be in the "past".
*
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past".
* @param key is the value or directory to be watched
* @param range_end is the end of key range to be removed.
* @param fromIndex the first index we are interested in
*/
pplx::task<Response> watch(std::string const & key, std::string const &range_end, int fromIndex);
/** /**
* Grants a lease. * Grants a lease.
* @param ttl is the time to live of the lease * @param ttl is the time to live of the lease

View File

@ -55,8 +55,13 @@ namespace etcd
Response rm_if(std::string const & key, std::string const & old_value); Response rm_if(std::string const & key, std::string const & old_value);
Response rm_if(std::string const & key, int old_index); Response rm_if(std::string const & key, int old_index);
Response ls(std::string const & key); Response ls(std::string const & key);
Response ls(std::string const & key, size_t const limit);
Response ls(std::string const & key, std::string const &range_end);
Response ls(std::string const & key, std::string const &range_end, size_t const limit);
Response mkdir(std::string const & key, int ttl = 0); Response mkdir(std::string const & key, int ttl = 0);
Response rmdir(std::string const & key, bool recursive = false); Response rmdir(std::string const & key, bool recursive = false);
Response rmdir(std::string const & key, const char *range_end);
Response rmdir(std::string const & key, std::string const &range_end);
Response leasegrant(int ttl); Response leasegrant(int ttl);
Response leaserevoke(int64_t lease_id); Response leaserevoke(int64_t lease_id);
Response leasetimetolive(int64_t lease_id); Response leasetimetolive(int64_t lease_id);
@ -70,6 +75,8 @@ namespace etcd
* @param recursive if true watch a whole subtree * @param recursive if true watch a whole subtree
*/ */
Response watch(std::string const & key, bool recursive = false); Response watch(std::string const & key, bool recursive = false);
Response watch(std::string const & key, const char *range_end);
Response watch(std::string const & key, std::string const &range_end);
/** /**
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past".
@ -78,6 +85,7 @@ namespace etcd
* @param recursive if true watch a whole subtree * @param recursive if true watch a whole subtree
*/ */
Response watch(std::string const & key, int fromIndex, bool recursive = false); Response watch(std::string const & key, int fromIndex, bool recursive = false);
Response watch(std::string const & key, std::string const &range_end, int fromIndex);
protected: protected:
Client client; Client client;

View File

@ -34,6 +34,7 @@ namespace etcdv3
int ttl; int ttl;
int limit; int limit;
std::string key; std::string key;
std::string range_end;
std::string value; std::string value;
std::string old_value; std::string old_value;
std::string auth_token; std::string auth_token;
@ -56,5 +57,9 @@ namespace etcdv3
etcdv3::ActionParameters parameters; etcdv3::ActionParameters parameters;
std::chrono::high_resolution_clock::time_point start_timepoint; std::chrono::high_resolution_clock::time_point start_timepoint;
}; };
namespace detail {
std::string string_plus_one(std::string const &value);
}
} }
#endif #endif

View File

@ -485,7 +485,6 @@ pplx::task<etcd::Response> etcd::Client::rm(std::string const & key)
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value) pplx::task<etcd::Response> etcd::Client::rm_if(std::string const & key, std::string const & old_value)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
@ -520,6 +519,23 @@ pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, bool rec
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, const char *range_end)
{
return rmdir(key, std::string(range_end));
}
pplx::task<etcd::Response> etcd::Client::rmdir(std::string const & key, std::string const &range_end)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncDeleteAction> call(new etcdv3::AsyncDeleteAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key) pplx::task<etcd::Response> etcd::Client::ls(std::string const & key)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
@ -544,6 +560,32 @@ pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, size_t cons
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, std::string const &range_end)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.limit = 0; // default no limit.
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::ls(std::string const & key, std::string const &range_end, size_t const limit)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.limit = limit;
params.kv_stub = stubs->kvServiceStub.get();
std::shared_ptr<etcdv3::AsyncGetAction> call(new etcdv3::AsyncGetAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive) pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, bool recursive)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
@ -567,6 +609,36 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, int from
return Response::create(call); return Response::create(call);
} }
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, const char *range_end)
{
return watch(key, std::string(range_end));
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, std::string const & range_end)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.watch_stub = stubs->watchServiceStub.get();
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, std::string const & range_end, int fromIndex)
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key.assign(key);
params.range_end.assign(range_end);
params.withPrefix = false;
params.revision = fromIndex;
params.watch_stub = stubs->watchServiceStub.get();
std::shared_ptr<etcdv3::AsyncWatchAction> call(new etcdv3::AsyncWatchAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl) pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
{ {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;

View File

@ -99,11 +99,36 @@ etcd::Response etcd::SyncClient::rmdir(std::string const & key, bool recursive)
CHECK_EXCEPTIONS(client.rmdir(key, recursive).get()); CHECK_EXCEPTIONS(client.rmdir(key, recursive).get());
} }
etcd::Response etcd::SyncClient::rmdir(std::string const & key, const char *range_end)
{
CHECK_EXCEPTIONS(client.rmdir(key, range_end).get());
}
etcd::Response etcd::SyncClient::rmdir(std::string const & key, std::string const &range_end)
{
CHECK_EXCEPTIONS(client.rmdir(key, range_end).get());
}
etcd::Response etcd::SyncClient::ls(std::string const & key) etcd::Response etcd::SyncClient::ls(std::string const & key)
{ {
CHECK_EXCEPTIONS(client.ls(key).get()); CHECK_EXCEPTIONS(client.ls(key).get());
} }
etcd::Response etcd::SyncClient::ls(std::string const & key, size_t const limit)
{
CHECK_EXCEPTIONS(client.ls(key, limit).get());
}
etcd::Response etcd::SyncClient::ls(std::string const & key, std::string const &range_end)
{
CHECK_EXCEPTIONS(client.ls(key, range_end).get());
}
etcd::Response etcd::SyncClient::ls(std::string const & key, std::string const &range_end, size_t limit)
{
CHECK_EXCEPTIONS(client.ls(key, range_end, limit).get());
}
etcd::Response etcd::SyncClient::leasegrant(int ttl) etcd::Response etcd::SyncClient::leasegrant(int ttl)
{ {
CHECK_EXCEPTIONS(client.leasegrant(ttl).get()); CHECK_EXCEPTIONS(client.leasegrant(ttl).get());
@ -128,3 +153,18 @@ etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, b
{ {
CHECK_EXCEPTIONS(client.watch(key, fromIndex, recursive).get()); CHECK_EXCEPTIONS(client.watch(key, fromIndex, recursive).get());
} }
etcd::Response etcd::SyncClient::watch(std::string const & key, const char *range_end)
{
CHECK_EXCEPTIONS(client.watch(key, range_end).get());
}
etcd::Response etcd::SyncClient::watch(std::string const & key, std::string const &range_end)
{
CHECK_EXCEPTIONS(client.watch(key, range_end).get());
}
etcd::Response etcd::SyncClient::watch(std::string const & key, std::string const &range_end, int fromIndex)
{
CHECK_EXCEPTIONS(client.watch(key, range_end, fromIndex).get());
}

View File

@ -37,3 +37,19 @@ void etcdv3::Action::waitForResponse()
const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() { const std::chrono::high_resolution_clock::time_point etcdv3::Action::startTimepoint() {
return this->start_timepoint; return this->start_timepoint;
} }
std::string etcdv3::detail::string_plus_one(std::string const &value) {
// referred from the Go implementation in etcd.
char *s = static_cast<char *>(calloc(value.size() + 1, sizeof(char)));
std::memcpy(s, value.c_str(), value.size());
for (int i = value.size() - 1; i >= 0; --i) {
if (static_cast<unsigned char>(s[i]) < 0xff) {
s[i] = s[i] + 1;
std::string ret = std::string(s, i + 1);
free(s);
return ret;
}
}
// see: noPrefixEnd in etcd
return {"\0"};
}

View File

@ -9,12 +9,16 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction(ActionParameters param)
DeleteRangeRequest del_request; DeleteRangeRequest del_request;
del_request.set_key(parameters.key); del_request.set_key(parameters.key);
del_request.set_prev_kv(true); del_request.set_prev_kv(true);
std::string range_end(parameters.key);
if(parameters.withPrefix) if(parameters.withPrefix)
{ {
int ascii = (int)range_end[range_end.length()-1]; if (parameters.key.empty()) {
range_end.back() = ascii+1; del_request.set_range_end(detail::string_plus_one("\0"));
del_request.set_range_end(range_end); } else {
del_request.set_range_end(detail::string_plus_one(parameters.key));
}
}
if(!parameters.range_end.empty()) {
del_request.set_range_end(parameters.range_end);
} }
response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_); response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_);
@ -32,7 +36,7 @@ etcdv3::AsyncDeleteRangeResponse etcdv3::AsyncDeleteAction::ParseResponse()
} }
else else
{ {
del_resp.ParseResponse(parameters.key, parameters.withPrefix, reply); del_resp.ParseResponse(parameters.key, parameters.withPrefix || !parameters.range_end.empty(), reply);
} }
return del_resp; return del_resp;

View File

@ -6,21 +6,6 @@
using etcdserverpb::RangeRequest; using etcdserverpb::RangeRequest;
static std::string string_plus_one(std::string const &value) {
char *s = static_cast<char *>(calloc(value.size() + 1, sizeof(char)));
std::memcpy(s, value.c_str(), value.size());
for (int i = value.size() - 1; i >= 0; --i) {
if (static_cast<unsigned char>(s[i]) < 0xff) {
s[i] = s[i] + 1;
std::string ret = std::string(s, i + 1);
free(s);
return ret;
}
}
// see: noPrefixEnd in etcd
return {"\0"};
}
etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param) etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param)
: etcdv3::Action(param) : etcdv3::Action(param)
{ {
@ -34,12 +19,16 @@ etcdv3::AsyncGetAction::AsyncGetAction(etcdv3::ActionParameters param)
if(parameters.withPrefix) if(parameters.withPrefix)
{ {
if (parameters.key.empty()) { if (parameters.key.empty()) {
get_request.set_range_end(string_plus_one("\0")); get_request.set_range_end(detail::string_plus_one("\0"));
} else { } else {
get_request.set_range_end(string_plus_one(parameters.key)); get_request.set_range_end(detail::string_plus_one(parameters.key));
} }
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE);
} }
if(!parameters.range_end.empty()) {
get_request.set_range_end(parameters.range_end);
}
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE);
response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_); response_reader = parameters.kv_stub->AsyncRange(&context,get_request,&cq_);
response_reader->Finish(&reply, &status, (void*)this); response_reader->Finish(&reply, &status, (void*)this);
} }
@ -54,7 +43,7 @@ etcdv3::AsyncRangeResponse etcdv3::AsyncGetAction::ParseResponse()
} }
else else
{ {
range_resp.ParseResponse(reply, parameters.withPrefix); range_resp.ParseResponse(reply, parameters.withPrefix || !parameters.range_end.empty());
} }
return range_resp; return range_resp;
} }

View File

@ -20,10 +20,14 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param)
if(parameters.withPrefix) if(parameters.withPrefix)
{ {
std::string range_end(parameters.key); if (parameters.key.empty()) {
int ascii = (int)range_end[range_end.length()-1]; watch_create_req.set_range_end(detail::string_plus_one("\0"));
range_end.back() = ascii+1; } else {
watch_create_req.set_range_end(range_end); watch_create_req.set_range_end(detail::string_plus_one(parameters.key));
}
}
if(!parameters.range_end.empty()) {
watch_create_req.set_range_end(parameters.range_end);
} }
watch_req.mutable_create_request()->CopyFrom(watch_create_req); watch_req.mutable_create_request()->CopyFrom(watch_create_req);

View File

@ -222,16 +222,48 @@ TEST_CASE("list a directory")
CHECK(resp.values()[2].is_dir() == 0); CHECK(resp.values()[2].is_dir() == 0);
CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code()); CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok());
}
TEST_CASE("list by range")
{
etcd::Client etcd("http://127.0.0.1:2379");
CHECK(0 == etcd.ls("/test/new_dir").get().keys().size());
etcd.set("/test/new_dir/key1", "value1").wait();
etcd.set("/test/new_dir/key2", "value1").wait();
etcd.set("/test/new_dir/key3", "value1").wait();
etcd.set("/test/new_dir/key4", "value1").wait();
etcd::Response resp1 = etcd.ls("/test/new_dir/key1", "/test/new_dir/key3").get();
REQUIRE(resp1.is_ok());
CHECK("get" == resp1.action());
REQUIRE(2 == resp1.keys().size());
REQUIRE(2 == resp1.values().size());
etcd::Response resp2 = etcd.ls("/test/new_dir/key1", "/test/new_dir/key4").get();
REQUIRE(resp2.is_ok());
CHECK("get" == resp2.action());
REQUIRE(3 == resp2.keys().size());
REQUIRE(3 == resp2.values().size());
CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok());
} }
TEST_CASE("delete a directory") TEST_CASE("delete a directory")
{ {
etcd::Client etcd("http://127.0.0.1:2379"); etcd::Client etcd("http://127.0.0.1:2379");
etcd.set("/test/new_dir/key1", "value1").wait();
etcd.set("/test/new_dir/key2", "value2").wait();
etcd.set("/test/new_dir/key3", "value3").wait();
CHECK(100 == etcd.rmdir("/test/new_dir").get().error_code()); // key not found CHECK(100 == etcd.rmdir("/test/new_dir").get().error_code()); // key not found
etcd::Response resp = etcd.ls("/test/new_dir").get(); etcd::Response resp = etcd.ls("/test/new_dir").get();
resp = etcd.rmdir("/test/new_dir", true).get(); resp = etcd.rmdir("/test/new_dir", true).get();
int index = resp.index(); int index = resp.index();
CHECK("delete" == resp.action()); CHECK("delete" == resp.action());
@ -253,6 +285,27 @@ TEST_CASE("delete a directory")
CHECK("Key not found" == resp.error_message()); CHECK("Key not found" == resp.error_message());
} }
TEST_CASE("delete by range")
{
etcd::Client etcd("http://127.0.0.1:2379");
CHECK(100 == etcd.rmdir("/test/new_dir").get().error_code()); // key not found
etcd::Response resp = etcd.ls("/test/new_dir").get();
etcd.set("/test/new_dir/key1", "value1").wait();
etcd.set("/test/new_dir/key2", "value2").wait();
etcd.set("/test/new_dir/key3", "value3").wait();
etcd.set("/test/new_dir/key4", "value4").wait();
resp = etcd.rmdir("/test/new_dir/key1", "/test/new_dir/key3").get();
CHECK("delete" == resp.action());
REQUIRE(2 == resp.keys().size());
CHECK("/test/new_dir/key1" == resp.key(0));
CHECK("/test/new_dir/key2" == resp.key(1));
CHECK("value1" == resp.value(0).as_string());
CHECK("value2" == resp.value(1).as_string());
}
TEST_CASE("wait for a value change") TEST_CASE("wait for a value change")
{ {
etcd::Client etcd("http://127.0.0.1:2379"); etcd::Client etcd("http://127.0.0.1:2379");
@ -322,11 +375,34 @@ TEST_CASE("watch changes in the past")
CHECK("45" == res.value().as_string()); CHECK("45" == res.value().as_string());
} }
TEST_CASE("watch range changes in the past")
{
etcd::Client etcd("http://127.0.0.1:2379");
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
int index = etcd.set("/test/key1", "42").get().index();
etcd.set("/test/key1", "43").wait();
etcd.set("/test/key2", "44").wait();
etcd.set("/test/key3", "45").wait();
etcd.set("/test/key4", "45").wait();
etcd::Response res;
res = etcd.watch("/test/key1", "/test/key4", index).get();
CHECK(4 == res.events().size());
res = etcd.watch("/test/key1", "/test/key5", index).get();
CHECK(5 == res.events().size());
res = etcd.watch("/test/key1", "/test/key4", ++index).get();
CHECK(3 == res.events().size());
res = etcd.watch("/test/key1", "/test/key5", ++index).get();
CHECK(3 == res.events().size());
}
TEST_CASE("watch multiple keys and use promise") { TEST_CASE("watch multiple keys and use promise") {
etcd::Client etcd("http://127.0.0.1:2379"); etcd::Client etcd("http://127.0.0.1:2379");
int start_index = etcd.add("/test/key1", "value1").get().index(); int start_index = etcd.set("/test/key1", "value1").get().index();
etcd.add("/test/key2", "value2").get(); etcd.set("/test/key2", "value2").get();
pplx::task<size_t> res = etcd.watch("/test", start_index, true) pplx::task<size_t> res = etcd.watch("/test", start_index, true)
.then([](pplx::task<etcd::Response> const &resp_task) -> size_t { .then([](pplx::task<etcd::Response> const &resp_task) -> size_t {
@ -418,6 +494,3 @@ TEST_CASE("cleanup")
etcd::Client etcd("http://127.0.0.1:2379"); etcd::Client etcd("http://127.0.0.1:2379");
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
} }