Fixes bugs in ls/rmdir/watch for processing range end.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-03-07 21:26:33 +08:00
parent b12fc293b9
commit 4f25a5c5de
9 changed files with 79 additions and 43 deletions

View File

@ -46,7 +46,7 @@ i.e., `ETCDCTL_API=3`.
libgrpc++-dev \ libgrpc++-dev \
libprotobuf-dev \ libprotobuf-dev \
protobuf-compiler-grpc protobuf-compiler-grpc
+ On MacOS, above requirements related to protobuf and gRPC can be installed as: + On MacOS, above requirements related to protobuf and gRPC can be installed as:
brew install grpc protobuf brew install grpc protobuf
@ -714,7 +714,7 @@ Here is an example how users can make a watcher re-connect to server after disco
```c++ ```c++
// wait the client ready // wait the client ready
void wait_for_connection(etcd::Client &client) { void wait_for_connection(etcd::Client &client) {
// wait until the client connects to etcd server // wait until the client connects to etcd server
while (!client.head().get().is_ok()) { while (!client.head().get().is_ok()) {
sleep(1); sleep(1);
} }
@ -750,16 +750,18 @@ void initialize_watcher(const std::string& endpoints,
The functionalities can be used as The functionalities can be used as
```c++ ```c++
std::string endpoints = "http://127.0.0.1:2379"; std::string endpoints = "http://127.0.0.1:2379";
std::function<void(Response)> callback = printResponse; std::function<void(Response)> callback = printResponse;
const std::string prefix = "/test/key"; const std::string prefix = "/test/key";
// the watcher initialized in this way will auto re-connect to etcd // the watcher initialized in this way will auto re-connect to etcd
std::shared_ptr<etcd::Watcher> watcher; std::shared_ptr<etcd::Watcher> watcher;
initialize_watcher(endpoints, prefix, callback, watcher); initialize_watcher(endpoints, prefix, callback, watcher);
``` ```
For a complete runnable example, see also [./tst/RewatchTest.cpp](./tst/RewatchTest.cpp). For a complete runnable example, see also [./tst/RewatchTest.cpp](./tst/RewatchTest.cpp). Note
that you shouldn't use the watcher itself inside the `Wait()` callback as the callback will be
invoked in a separate **detached** thread where the watcher may have been destroyed.
### Requesting for lease ### Requesting for lease

View File

@ -188,14 +188,18 @@ namespace etcd
* Wait util the task has been stopped, actively or passively, e.g., the watcher * Wait util the task has been stopped, actively or passively, e.g., the watcher
* get cancelled or the server closes the connection. * get cancelled or the server closes the connection.
* *
* Returns true if the watcher is been normally cancalled, otherwise false. * Returns true if the watcher is been normally cancelled, otherwise false.
*/ */
bool Wait(); bool Wait();
/** /**
* An async wait, the callback will be called when the task has been stopped. * An async wait, the callback will be called when the task has been stopped.
* *
* The callback parameter would be true if the watch is been normally cancalled. * The callback parameter would be true if the watch is been normally cancelled.
*
* Note that you shouldn't use the watcher itself inside the `Wait()` callback
* as the callback will be invoked in a separate **detached** thread where the
* watcher may have been destroyed.
*/ */
void Wait(std::function<void(bool)> callback); void Wait(std::function<void(bool)> callback);

View File

@ -1,6 +1,8 @@
#ifndef __ETCD_ACTION_CONSTANTS_HPP__ #ifndef __ETCD_ACTION_CONSTANTS_HPP__
#define __ETCD_ACTION_CONSTANTS_HPP__ #define __ETCD_ACTION_CONSTANTS_HPP__
#include <string>
namespace etcdv3 namespace etcdv3
{ {
extern char const * CREATE_ACTION; extern char const * CREATE_ACTION;
@ -28,7 +30,7 @@ namespace etcdv3
extern char const * OBSERVE_ACTION; extern char const * OBSERVE_ACTION;
extern char const * RESIGN_ACTION; extern char const * RESIGN_ACTION;
extern char const * NUL; extern std::string const NUL;
extern char const * KEEPALIVE_CREATE; extern char const * KEEPALIVE_CREATE;
extern char const * KEEPALIVE_WRITE; extern char const * KEEPALIVE_WRITE;

View File

@ -8,13 +8,15 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction(
: etcdv3::Action(std::move(params)) : etcdv3::Action(std::move(params))
{ {
DeleteRangeRequest del_request; DeleteRangeRequest del_request;
del_request.set_key(parameters.key); if (!parameters.withPrefix) {
del_request.set_prev_kv(true); del_request.set_key(parameters.key);
if(parameters.withPrefix) } else {
{
if (parameters.key.empty()) { if (parameters.key.empty()) {
del_request.set_range_end(detail::string_plus_one(etcdv3::NUL)); // see: WithFromKey in etcdv3/client
del_request.set_key(etcdv3::NUL);
del_request.set_range_end(etcdv3::NUL);
} else { } else {
del_request.set_key(parameters.key);
del_request.set_range_end(detail::string_plus_one(parameters.key)); del_request.set_range_end(detail::string_plus_one(parameters.key));
} }
} }
@ -22,6 +24,8 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction(
del_request.set_range_end(parameters.range_end); del_request.set_range_end(parameters.range_end);
} }
del_request.set_prev_kv(true);
response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_); response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this); response_reader->Finish(&reply, &status, (void*)this);
} }

View File

@ -11,23 +11,23 @@ etcdv3::AsyncRangeAction::AsyncRangeAction(
: etcdv3::Action(std::move(params)) : etcdv3::Action(std::move(params))
{ {
RangeRequest get_request; RangeRequest get_request;
if (parameters.key.empty()) { if (!parameters.withPrefix) {
get_request.set_key(etcdv3::NUL);
} else {
get_request.set_key(parameters.key); get_request.set_key(parameters.key);
} } else {
get_request.set_limit(parameters.limit);
if(parameters.withPrefix)
{
if (parameters.key.empty()) { if (parameters.key.empty()) {
get_request.set_range_end(detail::string_plus_one(etcdv3::NUL)); // see: WithFromKey in etcdv3/client
get_request.set_key(etcdv3::NUL);
get_request.set_range_end(etcdv3::NUL);
} else { } else {
get_request.set_key(parameters.key);
get_request.set_range_end(detail::string_plus_one(parameters.key)); get_request.set_range_end(detail::string_plus_one(parameters.key));
} }
} }
if(!parameters.range_end.empty()) { if(!parameters.range_end.empty()) {
get_request.set_range_end(parameters.range_end); get_request.set_range_end(parameters.range_end);
} }
get_request.set_limit(parameters.limit);
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE); get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE);
// set keys_only and count_only // set keys_only and count_only

View File

@ -13,15 +13,15 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(
WatchRequest watch_req; WatchRequest watch_req;
WatchCreateRequest watch_create_req; WatchCreateRequest watch_create_req;
watch_create_req.set_key(parameters.key);
watch_create_req.set_prev_kv(true);
watch_create_req.set_start_revision(parameters.revision);
if(parameters.withPrefix) if(!parameters.withPrefix) {
{ watch_create_req.set_key(parameters.key);
} else {
if (parameters.key.empty()) { if (parameters.key.empty()) {
watch_create_req.set_range_end(detail::string_plus_one(etcdv3::NUL)); watch_create_req.set_range_end(etcdv3::NUL);
watch_create_req.set_range_end(etcdv3::NUL);
} else { } else {
watch_create_req.set_range_end(parameters.key);
watch_create_req.set_range_end(detail::string_plus_one(parameters.key)); watch_create_req.set_range_end(detail::string_plus_one(parameters.key));
} }
} }
@ -29,6 +29,9 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(
watch_create_req.set_range_end(parameters.range_end); watch_create_req.set_range_end(parameters.range_end);
} }
watch_create_req.set_prev_kv(true);
watch_create_req.set_start_revision(parameters.revision);
watch_req.mutable_create_request()->CopyFrom(watch_create_req); watch_req.mutable_create_request()->CopyFrom(watch_create_req);
// wait "create" success (the stream becomes ready) // wait "create" success (the stream becomes ready)

View File

@ -25,9 +25,9 @@ char const * etcdv3::LEADER_ACTION = "leader";
char const * etcdv3::OBSERVE_ACTION = "obverse"; char const * etcdv3::OBSERVE_ACTION = "obverse";
char const * etcdv3::RESIGN_ACTION = "resign"; char const * etcdv3::RESIGN_ACTION = "resign";
// see: noPrefixEnd in etcd, however c++ doesn't allows '\0' inside a string, thus we use // see: noPrefixEnd in etcd, however c++ doesn't allows naive '\0' inside
// the UTF-8 char U+0000 (i.e., "\xC0\x80"). // a string, thus we use std::string(1, '\x00') as the constructor.
char const * etcdv3::NUL = "\xC0\x80"; std::string const etcdv3::NUL = std::string(1, '\x00');
char const * etcdv3::KEEPALIVE_CREATE = "keepalive create"; char const * etcdv3::KEEPALIVE_CREATE = "keepalive create";
char const * etcdv3::KEEPALIVE_WRITE = "keepalive write"; char const * etcdv3::KEEPALIVE_WRITE = "keepalive write";

View File

@ -223,7 +223,7 @@ TEST_CASE("using binary keys and values, raw char pointer doesn't work")
CHECK(std::string("42\0foo", 6) != resp.value().as_string()); CHECK(std::string("42\0foo", 6) != resp.value().as_string());
} }
{ {
// should exist // should exist, but different value
etcd::Response resp = etcd.get("/test/key1\0xyz").get(); etcd::Response resp = etcd.get("/test/key1\0xyz").get();
REQUIRE(resp.is_ok()); REQUIRE(resp.is_ok());
CHECK(std::string("42") == resp.value().as_string()); CHECK(std::string("42") == resp.value().as_string());
@ -283,6 +283,11 @@ TEST_CASE("list a directory")
CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code()); CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
// list with empty prefix
resp = etcd.ls("").get();
CHECK(0 == resp.error_code());
CHECK(0 < resp.keys().size());
CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok()); CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok());
} }
@ -385,6 +390,20 @@ TEST_CASE("delete a directory")
CHECK("etcd-cpp-apiv3: key not found" == resp.error_message()); CHECK("etcd-cpp-apiv3: key not found" == resp.error_message());
} }
TEST_CASE("delete all keys with rmdir(\"\", true)")
{
etcd::Client etcd(etcd_url);
etcd.rmdir("", true).wait();
etcd.set("/test/new_dir/key1", "value1").wait();
etcd.set("/test/new_dir/key2", "value2").wait();
etcd.set("/test/new_dir/key3", "value3").wait();
auto resp = etcd.rmdir("", true).get();
CHECK(resp.is_ok());
CHECK(resp.values().size() == 3);
}
TEST_CASE("delete by range") TEST_CASE("delete by range")
{ {
etcd::Client etcd(etcd_url); etcd::Client etcd(etcd_url);

View File

@ -13,8 +13,6 @@ static int watcher_called = 0;
void printResponse(etcd::Response const & resp) void printResponse(etcd::Response const & resp)
{ {
++watcher_called;
std::cout << "print response called" << std::endl;
if (resp.error_code()) { if (resp.error_code()) {
std::cout << resp.error_code() << ": " << resp.error_message() << std::endl; std::cout << resp.error_code() << ": " << resp.error_message() << std::endl;
} }
@ -24,33 +22,37 @@ void printResponse(etcd::Response const & resp)
std::cout << "Events size: " << resp.events().size() << std::endl; std::cout << "Events size: " << resp.events().size() << std::endl;
for (auto const &ev: resp.events()) { for (auto const &ev: resp.events()) {
if (ev.prev_kv().key().find("/leader") == 0 || ev.kv().key().find("/leader") == 0) {
return;
}
std::cout << "Value change in events: " << static_cast<int>(ev.event_type()) std::cout << "Value change in events: " << static_cast<int>(ev.event_type())
<< ", prev kv = " << ev.prev_kv().key() << " -> " << ev.prev_kv().as_string() << ", prev kv = " << ev.prev_kv().key() << " -> " << ev.prev_kv().as_string()
<< ", kv = " << ev.kv().key() << " -> " << ev.kv().as_string() << ", kv = " << ev.kv().key() << " -> " << ev.kv().as_string()
<< std::endl; << std::endl;
} }
} }
std::cout << "print response called" << std::endl;
++watcher_called;
} }
TEST_CASE("create watcher with cancel") TEST_CASE("create watcher with cancel")
{ {
etcd::SyncClient etcd(etcd_url); etcd::SyncClient etcd(etcd_url);
etcd.rmdir("/test", true); etcd.rmdir("/test", true);
watcher_called = 0; watcher_called = 0;
etcd::Watcher watcher(etcd_url, "/test", printResponse, true); etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(2));
etcd.set("/test/key", "42"); etcd.set("/test/key", "42");
etcd.set("/test/key", "43"); etcd.set("/test/key", "43");
etcd.rm("/test/key"); etcd.rm("/test/key");
etcd.set("/test/key", "44"); etcd.set("/test/key", "44");
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(2));
CHECK(4 == watcher_called); CHECK(4 == watcher_called);
watcher.Cancel(); watcher.Cancel();
etcd.set("/test/key", "50"); etcd.set("/test/key", "50");
etcd.set("/test/key", "51"); etcd.set("/test/key", "51");
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(2));
CHECK(4 == watcher_called); CHECK(4 == watcher_called);
etcd.rmdir("/test", true); etcd.rmdir("/test", true);
@ -63,17 +65,17 @@ TEST_CASE("create watcher on ranges with cancel")
watcher_called = 0; watcher_called = 0;
etcd::Watcher watcher(etcd_url, "/test/key1", "/test/key5", printResponse); etcd::Watcher watcher(etcd_url, "/test/key1", "/test/key5", printResponse);
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(2));
etcd.set("/test/key1", "42"); etcd.set("/test/key1", "42");
etcd.set("/test/key2", "43"); etcd.set("/test/key2", "43");
etcd.rm("/test/key1"); etcd.rm("/test/key1");
etcd.set("/test/key5", "44"); etcd.set("/test/key5", "44");
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(2));
CHECK(3 == watcher_called); CHECK(3 == watcher_called);
watcher.Cancel(); watcher.Cancel();
etcd.set("/test/key3", "50"); etcd.set("/test/key3", "50");
etcd.set("/test/key4", "51"); etcd.set("/test/key4", "51");
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(2));
CHECK(3 == watcher_called); CHECK(3 == watcher_called);
etcd.rmdir("/test", true); etcd.rmdir("/test", true);
@ -87,11 +89,11 @@ TEST_CASE("create watcher")
watcher_called = 0; watcher_called = 0;
{ {
etcd::Watcher watcher(etcd_url, "/test", printResponse, true); etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(2));
etcd.set("/test/key", "42"); etcd.set("/test/key", "42");
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(2));
etcd.set("/test/key", "43"); etcd.set("/test/key", "43");
std::this_thread::sleep_for(std::chrono::seconds(3)); std::this_thread::sleep_for(std::chrono::seconds(2));
} }
CHECK(2 == watcher_called); CHECK(2 == watcher_called);
etcd.rmdir("/test", true).error_code(); etcd.rmdir("/test", true).error_code();