From 4f25a5c5de49abe5a1dbd2216c3284bc797054a4 Mon Sep 17 00:00:00 2001 From: Tao He Date: Tue, 7 Mar 2023 21:26:33 +0800 Subject: [PATCH] Fixes bugs in ls/rmdir/watch for processing range end. Signed-off-by: Tao He --- README.md | 12 +++++++----- etcd/Watcher.hpp | 8 ++++++-- etcd/v3/action_constants.hpp | 4 +++- src/v3/AsyncDeleteAction.cpp | 14 +++++++++----- src/v3/AsyncRangeAction.cpp | 16 ++++++++-------- src/v3/AsyncWatchAction.cpp | 15 +++++++++------ src/v3/action_constants.cpp | 6 +++--- tst/EtcdTest.cpp | 21 ++++++++++++++++++++- tst/WatcherTest.cpp | 26 ++++++++++++++------------ 9 files changed, 79 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index e207c26..19410a2 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ i.e., `ETCDCTL_API=3`. libgrpc++-dev \ libprotobuf-dev \ protobuf-compiler-grpc - + + On MacOS, above requirements related to protobuf and gRPC can be installed as: brew install grpc protobuf @@ -714,7 +714,7 @@ Here is an example how users can make a watcher re-connect to server after disco ```c++ // wait the client ready void wait_for_connection(etcd::Client &client) { - // wait until the client connects to etcd server + // wait until the client connects to etcd server while (!client.head().get().is_ok()) { sleep(1); } @@ -750,16 +750,18 @@ void initialize_watcher(const std::string& endpoints, The functionalities can be used as ```c++ -std::string endpoints = "http://127.0.0.1:2379"; +std::string endpoints = "http://127.0.0.1:2379"; std::function callback = printResponse; -const std::string prefix = "/test/key"; +const std::string prefix = "/test/key"; // the watcher initialized in this way will auto re-connect to etcd std::shared_ptr watcher; initialize_watcher(endpoints, prefix, callback, watcher); ``` -For a complete runnable example, see also [./tst/RewatchTest.cpp](./tst/RewatchTest.cpp). +For a complete runnable example, see also [./tst/RewatchTest.cpp](./tst/RewatchTest.cpp). Note +that you shouldn't use the watcher itself inside the `Wait()` callback as the callback will be +invoked in a separate **detached** thread where the watcher may have been destroyed. ### Requesting for lease diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 370dee4..2f66099 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -188,14 +188,18 @@ namespace etcd * Wait util the task has been stopped, actively or passively, e.g., the watcher * get cancelled or the server closes the connection. * - * Returns true if the watcher is been normally cancalled, otherwise false. + * Returns true if the watcher is been normally cancelled, otherwise false. */ bool Wait(); /** * An async wait, the callback will be called when the task has been stopped. * - * The callback parameter would be true if the watch is been normally cancalled. + * The callback parameter would be true if the watch is been normally cancelled. + * + * Note that you shouldn't use the watcher itself inside the `Wait()` callback + * as the callback will be invoked in a separate **detached** thread where the + * watcher may have been destroyed. */ void Wait(std::function callback); diff --git a/etcd/v3/action_constants.hpp b/etcd/v3/action_constants.hpp index a5dcd19..7854945 100644 --- a/etcd/v3/action_constants.hpp +++ b/etcd/v3/action_constants.hpp @@ -1,6 +1,8 @@ #ifndef __ETCD_ACTION_CONSTANTS_HPP__ #define __ETCD_ACTION_CONSTANTS_HPP__ +#include + namespace etcdv3 { extern char const * CREATE_ACTION; @@ -28,7 +30,7 @@ namespace etcdv3 extern char const * OBSERVE_ACTION; extern char const * RESIGN_ACTION; - extern char const * NUL; + extern std::string const NUL; extern char const * KEEPALIVE_CREATE; extern char const * KEEPALIVE_WRITE; diff --git a/src/v3/AsyncDeleteAction.cpp b/src/v3/AsyncDeleteAction.cpp index a8b004c..7fad717 100644 --- a/src/v3/AsyncDeleteAction.cpp +++ b/src/v3/AsyncDeleteAction.cpp @@ -8,13 +8,15 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction( : etcdv3::Action(std::move(params)) { DeleteRangeRequest del_request; - del_request.set_key(parameters.key); - del_request.set_prev_kv(true); - if(parameters.withPrefix) - { + if (!parameters.withPrefix) { + del_request.set_key(parameters.key); + } else { if (parameters.key.empty()) { - del_request.set_range_end(detail::string_plus_one(etcdv3::NUL)); + // see: WithFromKey in etcdv3/client + del_request.set_key(etcdv3::NUL); + del_request.set_range_end(etcdv3::NUL); } else { + del_request.set_key(parameters.key); del_request.set_range_end(detail::string_plus_one(parameters.key)); } } @@ -22,6 +24,8 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction( del_request.set_range_end(parameters.range_end); } + del_request.set_prev_kv(true); + response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); } diff --git a/src/v3/AsyncRangeAction.cpp b/src/v3/AsyncRangeAction.cpp index 9b64c8f..d81d592 100644 --- a/src/v3/AsyncRangeAction.cpp +++ b/src/v3/AsyncRangeAction.cpp @@ -11,23 +11,23 @@ etcdv3::AsyncRangeAction::AsyncRangeAction( : etcdv3::Action(std::move(params)) { RangeRequest get_request; - if (parameters.key.empty()) { - get_request.set_key(etcdv3::NUL); - } else { + if (!parameters.withPrefix) { get_request.set_key(parameters.key); - } - get_request.set_limit(parameters.limit); - if(parameters.withPrefix) - { + } else { if (parameters.key.empty()) { - get_request.set_range_end(detail::string_plus_one(etcdv3::NUL)); + // see: WithFromKey in etcdv3/client + get_request.set_key(etcdv3::NUL); + get_request.set_range_end(etcdv3::NUL); } else { + get_request.set_key(parameters.key); get_request.set_range_end(detail::string_plus_one(parameters.key)); } } if(!parameters.range_end.empty()) { get_request.set_range_end(parameters.range_end); } + + get_request.set_limit(parameters.limit); get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE); // set keys_only and count_only diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 06e6ea2..3c6e062 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -13,15 +13,15 @@ etcdv3::AsyncWatchAction::AsyncWatchAction( WatchRequest watch_req; WatchCreateRequest watch_create_req; - watch_create_req.set_key(parameters.key); - watch_create_req.set_prev_kv(true); - watch_create_req.set_start_revision(parameters.revision); - if(parameters.withPrefix) - { + if(!parameters.withPrefix) { + watch_create_req.set_key(parameters.key); + } else { if (parameters.key.empty()) { - watch_create_req.set_range_end(detail::string_plus_one(etcdv3::NUL)); + watch_create_req.set_range_end(etcdv3::NUL); + watch_create_req.set_range_end(etcdv3::NUL); } else { + watch_create_req.set_range_end(parameters.key); watch_create_req.set_range_end(detail::string_plus_one(parameters.key)); } } @@ -29,6 +29,9 @@ etcdv3::AsyncWatchAction::AsyncWatchAction( watch_create_req.set_range_end(parameters.range_end); } + watch_create_req.set_prev_kv(true); + watch_create_req.set_start_revision(parameters.revision); + watch_req.mutable_create_request()->CopyFrom(watch_create_req); // wait "create" success (the stream becomes ready) diff --git a/src/v3/action_constants.cpp b/src/v3/action_constants.cpp index ff68cc4..043a536 100644 --- a/src/v3/action_constants.cpp +++ b/src/v3/action_constants.cpp @@ -25,9 +25,9 @@ char const * etcdv3::LEADER_ACTION = "leader"; char const * etcdv3::OBSERVE_ACTION = "obverse"; char const * etcdv3::RESIGN_ACTION = "resign"; -// see: noPrefixEnd in etcd, however c++ doesn't allows '\0' inside a string, thus we use -// the UTF-8 char U+0000 (i.e., "\xC0\x80"). -char const * etcdv3::NUL = "\xC0\x80"; +// see: noPrefixEnd in etcd, however c++ doesn't allows naive '\0' inside +// a string, thus we use std::string(1, '\x00') as the constructor. +std::string const etcdv3::NUL = std::string(1, '\x00'); char const * etcdv3::KEEPALIVE_CREATE = "keepalive create"; char const * etcdv3::KEEPALIVE_WRITE = "keepalive write"; diff --git a/tst/EtcdTest.cpp b/tst/EtcdTest.cpp index ec4fbd6..52140a7 100644 --- a/tst/EtcdTest.cpp +++ b/tst/EtcdTest.cpp @@ -223,7 +223,7 @@ TEST_CASE("using binary keys and values, raw char pointer doesn't work") CHECK(std::string("42\0foo", 6) != resp.value().as_string()); } { - // should exist + // should exist, but different value etcd::Response resp = etcd.get("/test/key1\0xyz").get(); REQUIRE(resp.is_ok()); CHECK(std::string("42") == resp.value().as_string()); @@ -283,6 +283,11 @@ TEST_CASE("list a directory") CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code()); + // list with empty prefix + resp = etcd.ls("").get(); + CHECK(0 == resp.error_code()); + CHECK(0 < resp.keys().size()); + CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok()); } @@ -385,6 +390,20 @@ TEST_CASE("delete a directory") CHECK("etcd-cpp-apiv3: key not found" == resp.error_message()); } +TEST_CASE("delete all keys with rmdir(\"\", true)") +{ + etcd::Client etcd(etcd_url); + etcd.rmdir("", true).wait(); + + etcd.set("/test/new_dir/key1", "value1").wait(); + etcd.set("/test/new_dir/key2", "value2").wait(); + etcd.set("/test/new_dir/key3", "value3").wait(); + + auto resp = etcd.rmdir("", true).get(); + CHECK(resp.is_ok()); + CHECK(resp.values().size() == 3); +} + TEST_CASE("delete by range") { etcd::Client etcd(etcd_url); diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index 0096303..dd20959 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -13,8 +13,6 @@ static int watcher_called = 0; void printResponse(etcd::Response const & resp) { - ++watcher_called; - std::cout << "print response called" << std::endl; if (resp.error_code()) { std::cout << resp.error_code() << ": " << resp.error_message() << std::endl; } @@ -24,33 +22,37 @@ void printResponse(etcd::Response const & resp) std::cout << "Events size: " << resp.events().size() << std::endl; for (auto const &ev: resp.events()) { + if (ev.prev_kv().key().find("/leader") == 0 || ev.kv().key().find("/leader") == 0) { + return; + } std::cout << "Value change in events: " << static_cast(ev.event_type()) << ", prev kv = " << ev.prev_kv().key() << " -> " << ev.prev_kv().as_string() << ", kv = " << ev.kv().key() << " -> " << ev.kv().as_string() << std::endl; } } + std::cout << "print response called" << std::endl; + ++watcher_called; } TEST_CASE("create watcher with cancel") { - etcd::SyncClient etcd(etcd_url); etcd.rmdir("/test", true); watcher_called = 0; etcd::Watcher watcher(etcd_url, "/test", printResponse, true); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(2)); etcd.set("/test/key", "42"); etcd.set("/test/key", "43"); etcd.rm("/test/key"); etcd.set("/test/key", "44"); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(2)); CHECK(4 == watcher_called); watcher.Cancel(); etcd.set("/test/key", "50"); etcd.set("/test/key", "51"); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(2)); CHECK(4 == watcher_called); etcd.rmdir("/test", true); @@ -63,17 +65,17 @@ TEST_CASE("create watcher on ranges with cancel") watcher_called = 0; etcd::Watcher watcher(etcd_url, "/test/key1", "/test/key5", printResponse); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(2)); etcd.set("/test/key1", "42"); etcd.set("/test/key2", "43"); etcd.rm("/test/key1"); etcd.set("/test/key5", "44"); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(2)); CHECK(3 == watcher_called); watcher.Cancel(); etcd.set("/test/key3", "50"); etcd.set("/test/key4", "51"); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(2)); CHECK(3 == watcher_called); etcd.rmdir("/test", true); @@ -87,11 +89,11 @@ TEST_CASE("create watcher") watcher_called = 0; { etcd::Watcher watcher(etcd_url, "/test", printResponse, true); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(2)); etcd.set("/test/key", "42"); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(2)); etcd.set("/test/key", "43"); - std::this_thread::sleep_for(std::chrono::seconds(3)); + std::this_thread::sleep_for(std::chrono::seconds(2)); } CHECK(2 == watcher_called); etcd.rmdir("/test", true).error_code();