Fixes bugs in ls/rmdir/watch for processing range end. (#199)
Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
b12fc293b9
commit
9d3f8cec3d
|
|
@ -759,7 +759,9 @@ std::shared_ptr<etcd::Watcher> watcher;
|
||||||
initialize_watcher(endpoints, prefix, callback, watcher);
|
initialize_watcher(endpoints, prefix, callback, watcher);
|
||||||
```
|
```
|
||||||
|
|
||||||
For a complete runnable example, see also [./tst/RewatchTest.cpp](./tst/RewatchTest.cpp).
|
For a complete runnable example, see also [./tst/RewatchTest.cpp](./tst/RewatchTest.cpp). Note
|
||||||
|
that you shouldn't use the watcher itself inside the `Wait()` callback as the callback will be
|
||||||
|
invoked in a separate **detached** thread where the watcher may have been destroyed.
|
||||||
|
|
||||||
### Requesting for lease
|
### Requesting for lease
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -188,14 +188,18 @@ namespace etcd
|
||||||
* Wait util the task has been stopped, actively or passively, e.g., the watcher
|
* Wait util the task has been stopped, actively or passively, e.g., the watcher
|
||||||
* get cancelled or the server closes the connection.
|
* get cancelled or the server closes the connection.
|
||||||
*
|
*
|
||||||
* Returns true if the watcher is been normally cancalled, otherwise false.
|
* Returns true if the watcher is been normally cancelled, otherwise false.
|
||||||
*/
|
*/
|
||||||
bool Wait();
|
bool Wait();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An async wait, the callback will be called when the task has been stopped.
|
* An async wait, the callback will be called when the task has been stopped.
|
||||||
*
|
*
|
||||||
* The callback parameter would be true if the watch is been normally cancalled.
|
* The callback parameter would be true if the watch is been normally cancelled.
|
||||||
|
*
|
||||||
|
* Note that you shouldn't use the watcher itself inside the `Wait()` callback
|
||||||
|
* as the callback will be invoked in a separate **detached** thread where the
|
||||||
|
* watcher may have been destroyed.
|
||||||
*/
|
*/
|
||||||
void Wait(std::function<void(bool)> callback);
|
void Wait(std::function<void(bool)> callback);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
#ifndef __ETCD_ACTION_CONSTANTS_HPP__
|
#ifndef __ETCD_ACTION_CONSTANTS_HPP__
|
||||||
#define __ETCD_ACTION_CONSTANTS_HPP__
|
#define __ETCD_ACTION_CONSTANTS_HPP__
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
namespace etcdv3
|
namespace etcdv3
|
||||||
{
|
{
|
||||||
extern char const * CREATE_ACTION;
|
extern char const * CREATE_ACTION;
|
||||||
|
|
@ -28,7 +30,7 @@ namespace etcdv3
|
||||||
extern char const * OBSERVE_ACTION;
|
extern char const * OBSERVE_ACTION;
|
||||||
extern char const * RESIGN_ACTION;
|
extern char const * RESIGN_ACTION;
|
||||||
|
|
||||||
extern char const * NUL;
|
extern std::string const NUL;
|
||||||
|
|
||||||
extern char const * KEEPALIVE_CREATE;
|
extern char const * KEEPALIVE_CREATE;
|
||||||
extern char const * KEEPALIVE_WRITE;
|
extern char const * KEEPALIVE_WRITE;
|
||||||
|
|
|
||||||
|
|
@ -8,13 +8,15 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction(
|
||||||
: etcdv3::Action(std::move(params))
|
: etcdv3::Action(std::move(params))
|
||||||
{
|
{
|
||||||
DeleteRangeRequest del_request;
|
DeleteRangeRequest del_request;
|
||||||
|
if (!parameters.withPrefix) {
|
||||||
del_request.set_key(parameters.key);
|
del_request.set_key(parameters.key);
|
||||||
del_request.set_prev_kv(true);
|
|
||||||
if(parameters.withPrefix)
|
|
||||||
{
|
|
||||||
if (parameters.key.empty()) {
|
|
||||||
del_request.set_range_end(detail::string_plus_one(etcdv3::NUL));
|
|
||||||
} else {
|
} else {
|
||||||
|
if (parameters.key.empty()) {
|
||||||
|
// see: WithFromKey in etcdv3/client
|
||||||
|
del_request.set_key(etcdv3::NUL);
|
||||||
|
del_request.set_range_end(etcdv3::NUL);
|
||||||
|
} else {
|
||||||
|
del_request.set_key(parameters.key);
|
||||||
del_request.set_range_end(detail::string_plus_one(parameters.key));
|
del_request.set_range_end(detail::string_plus_one(parameters.key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -22,6 +24,8 @@ etcdv3::AsyncDeleteAction::AsyncDeleteAction(
|
||||||
del_request.set_range_end(parameters.range_end);
|
del_request.set_range_end(parameters.range_end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
del_request.set_prev_kv(true);
|
||||||
|
|
||||||
response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_);
|
response_reader = parameters.kv_stub->AsyncDeleteRange(&context, del_request, &cq_);
|
||||||
response_reader->Finish(&reply, &status, (void*)this);
|
response_reader->Finish(&reply, &status, (void*)this);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,23 +11,23 @@ etcdv3::AsyncRangeAction::AsyncRangeAction(
|
||||||
: etcdv3::Action(std::move(params))
|
: etcdv3::Action(std::move(params))
|
||||||
{
|
{
|
||||||
RangeRequest get_request;
|
RangeRequest get_request;
|
||||||
|
if (!parameters.withPrefix) {
|
||||||
|
get_request.set_key(parameters.key);
|
||||||
|
} else {
|
||||||
if (parameters.key.empty()) {
|
if (parameters.key.empty()) {
|
||||||
|
// see: WithFromKey in etcdv3/client
|
||||||
get_request.set_key(etcdv3::NUL);
|
get_request.set_key(etcdv3::NUL);
|
||||||
|
get_request.set_range_end(etcdv3::NUL);
|
||||||
} else {
|
} else {
|
||||||
get_request.set_key(parameters.key);
|
get_request.set_key(parameters.key);
|
||||||
}
|
|
||||||
get_request.set_limit(parameters.limit);
|
|
||||||
if(parameters.withPrefix)
|
|
||||||
{
|
|
||||||
if (parameters.key.empty()) {
|
|
||||||
get_request.set_range_end(detail::string_plus_one(etcdv3::NUL));
|
|
||||||
} else {
|
|
||||||
get_request.set_range_end(detail::string_plus_one(parameters.key));
|
get_request.set_range_end(detail::string_plus_one(parameters.key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(!parameters.range_end.empty()) {
|
if(!parameters.range_end.empty()) {
|
||||||
get_request.set_range_end(parameters.range_end);
|
get_request.set_range_end(parameters.range_end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get_request.set_limit(parameters.limit);
|
||||||
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE);
|
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_NONE);
|
||||||
|
|
||||||
// set keys_only and count_only
|
// set keys_only and count_only
|
||||||
|
|
|
||||||
|
|
@ -13,15 +13,15 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(
|
||||||
|
|
||||||
WatchRequest watch_req;
|
WatchRequest watch_req;
|
||||||
WatchCreateRequest watch_create_req;
|
WatchCreateRequest watch_create_req;
|
||||||
watch_create_req.set_key(parameters.key);
|
|
||||||
watch_create_req.set_prev_kv(true);
|
|
||||||
watch_create_req.set_start_revision(parameters.revision);
|
|
||||||
|
|
||||||
if(parameters.withPrefix)
|
if(!parameters.withPrefix) {
|
||||||
{
|
watch_create_req.set_key(parameters.key);
|
||||||
if (parameters.key.empty()) {
|
|
||||||
watch_create_req.set_range_end(detail::string_plus_one(etcdv3::NUL));
|
|
||||||
} else {
|
} else {
|
||||||
|
if (parameters.key.empty()) {
|
||||||
|
watch_create_req.set_range_end(etcdv3::NUL);
|
||||||
|
watch_create_req.set_range_end(etcdv3::NUL);
|
||||||
|
} else {
|
||||||
|
watch_create_req.set_range_end(parameters.key);
|
||||||
watch_create_req.set_range_end(detail::string_plus_one(parameters.key));
|
watch_create_req.set_range_end(detail::string_plus_one(parameters.key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -29,6 +29,9 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(
|
||||||
watch_create_req.set_range_end(parameters.range_end);
|
watch_create_req.set_range_end(parameters.range_end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
watch_create_req.set_prev_kv(true);
|
||||||
|
watch_create_req.set_start_revision(parameters.revision);
|
||||||
|
|
||||||
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
|
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
|
||||||
|
|
||||||
// wait "create" success (the stream becomes ready)
|
// wait "create" success (the stream becomes ready)
|
||||||
|
|
|
||||||
|
|
@ -25,9 +25,9 @@ char const * etcdv3::LEADER_ACTION = "leader";
|
||||||
char const * etcdv3::OBSERVE_ACTION = "obverse";
|
char const * etcdv3::OBSERVE_ACTION = "obverse";
|
||||||
char const * etcdv3::RESIGN_ACTION = "resign";
|
char const * etcdv3::RESIGN_ACTION = "resign";
|
||||||
|
|
||||||
// see: noPrefixEnd in etcd, however c++ doesn't allows '\0' inside a string, thus we use
|
// see: noPrefixEnd in etcd, however c++ doesn't allows naive '\0' inside
|
||||||
// the UTF-8 char U+0000 (i.e., "\xC0\x80").
|
// a string, thus we use std::string(1, '\x00') as the constructor.
|
||||||
char const * etcdv3::NUL = "\xC0\x80";
|
std::string const etcdv3::NUL = std::string(1, '\x00');
|
||||||
|
|
||||||
char const * etcdv3::KEEPALIVE_CREATE = "keepalive create";
|
char const * etcdv3::KEEPALIVE_CREATE = "keepalive create";
|
||||||
char const * etcdv3::KEEPALIVE_WRITE = "keepalive write";
|
char const * etcdv3::KEEPALIVE_WRITE = "keepalive write";
|
||||||
|
|
|
||||||
|
|
@ -223,7 +223,7 @@ TEST_CASE("using binary keys and values, raw char pointer doesn't work")
|
||||||
CHECK(std::string("42\0foo", 6) != resp.value().as_string());
|
CHECK(std::string("42\0foo", 6) != resp.value().as_string());
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
// should exist
|
// should exist, but different value
|
||||||
etcd::Response resp = etcd.get("/test/key1\0xyz").get();
|
etcd::Response resp = etcd.get("/test/key1\0xyz").get();
|
||||||
REQUIRE(resp.is_ok());
|
REQUIRE(resp.is_ok());
|
||||||
CHECK(std::string("42") == resp.value().as_string());
|
CHECK(std::string("42") == resp.value().as_string());
|
||||||
|
|
@ -283,6 +283,11 @@ TEST_CASE("list a directory")
|
||||||
|
|
||||||
CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
|
CHECK(0 == etcd.ls("/test/new_dir/key1").get().error_code());
|
||||||
|
|
||||||
|
// list with empty prefix
|
||||||
|
resp = etcd.ls("").get();
|
||||||
|
CHECK(0 == resp.error_code());
|
||||||
|
CHECK(0 < resp.keys().size());
|
||||||
|
|
||||||
CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok());
|
CHECK(etcd.rmdir("/test/new_dir", true).get().is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -385,6 +390,20 @@ TEST_CASE("delete a directory")
|
||||||
CHECK("etcd-cpp-apiv3: key not found" == resp.error_message());
|
CHECK("etcd-cpp-apiv3: key not found" == resp.error_message());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("delete all keys with rmdir(\"\", true)")
|
||||||
|
{
|
||||||
|
etcd::Client etcd(etcd_url);
|
||||||
|
etcd.rmdir("", true).wait();
|
||||||
|
|
||||||
|
etcd.set("/test/new_dir/key1", "value1").wait();
|
||||||
|
etcd.set("/test/new_dir/key2", "value2").wait();
|
||||||
|
etcd.set("/test/new_dir/key3", "value3").wait();
|
||||||
|
|
||||||
|
auto resp = etcd.rmdir("", true).get();
|
||||||
|
CHECK(resp.is_ok());
|
||||||
|
CHECK(resp.values().size() == 3);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_CASE("delete by range")
|
TEST_CASE("delete by range")
|
||||||
{
|
{
|
||||||
etcd::Client etcd(etcd_url);
|
etcd::Client etcd(etcd_url);
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,6 @@ static int watcher_called = 0;
|
||||||
|
|
||||||
void printResponse(etcd::Response const & resp)
|
void printResponse(etcd::Response const & resp)
|
||||||
{
|
{
|
||||||
++watcher_called;
|
|
||||||
std::cout << "print response called" << std::endl;
|
|
||||||
if (resp.error_code()) {
|
if (resp.error_code()) {
|
||||||
std::cout << resp.error_code() << ": " << resp.error_message() << std::endl;
|
std::cout << resp.error_code() << ": " << resp.error_message() << std::endl;
|
||||||
}
|
}
|
||||||
|
|
@ -24,33 +22,37 @@ void printResponse(etcd::Response const & resp)
|
||||||
|
|
||||||
std::cout << "Events size: " << resp.events().size() << std::endl;
|
std::cout << "Events size: " << resp.events().size() << std::endl;
|
||||||
for (auto const &ev: resp.events()) {
|
for (auto const &ev: resp.events()) {
|
||||||
|
if (ev.prev_kv().key().find("/leader") == 0 || ev.kv().key().find("/leader") == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
std::cout << "Value change in events: " << static_cast<int>(ev.event_type())
|
std::cout << "Value change in events: " << static_cast<int>(ev.event_type())
|
||||||
<< ", prev kv = " << ev.prev_kv().key() << " -> " << ev.prev_kv().as_string()
|
<< ", prev kv = " << ev.prev_kv().key() << " -> " << ev.prev_kv().as_string()
|
||||||
<< ", kv = " << ev.kv().key() << " -> " << ev.kv().as_string()
|
<< ", kv = " << ev.kv().key() << " -> " << ev.kv().as_string()
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
std::cout << "print response called" << std::endl;
|
||||||
|
++watcher_called;
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_CASE("create watcher with cancel")
|
TEST_CASE("create watcher with cancel")
|
||||||
{
|
{
|
||||||
|
|
||||||
etcd::SyncClient etcd(etcd_url);
|
etcd::SyncClient etcd(etcd_url);
|
||||||
etcd.rmdir("/test", true);
|
etcd.rmdir("/test", true);
|
||||||
|
|
||||||
watcher_called = 0;
|
watcher_called = 0;
|
||||||
etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
|
etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
etcd.set("/test/key", "42");
|
etcd.set("/test/key", "42");
|
||||||
etcd.set("/test/key", "43");
|
etcd.set("/test/key", "43");
|
||||||
etcd.rm("/test/key");
|
etcd.rm("/test/key");
|
||||||
etcd.set("/test/key", "44");
|
etcd.set("/test/key", "44");
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
CHECK(4 == watcher_called);
|
CHECK(4 == watcher_called);
|
||||||
watcher.Cancel();
|
watcher.Cancel();
|
||||||
etcd.set("/test/key", "50");
|
etcd.set("/test/key", "50");
|
||||||
etcd.set("/test/key", "51");
|
etcd.set("/test/key", "51");
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
CHECK(4 == watcher_called);
|
CHECK(4 == watcher_called);
|
||||||
|
|
||||||
etcd.rmdir("/test", true);
|
etcd.rmdir("/test", true);
|
||||||
|
|
@ -63,17 +65,17 @@ TEST_CASE("create watcher on ranges with cancel")
|
||||||
|
|
||||||
watcher_called = 0;
|
watcher_called = 0;
|
||||||
etcd::Watcher watcher(etcd_url, "/test/key1", "/test/key5", printResponse);
|
etcd::Watcher watcher(etcd_url, "/test/key1", "/test/key5", printResponse);
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
etcd.set("/test/key1", "42");
|
etcd.set("/test/key1", "42");
|
||||||
etcd.set("/test/key2", "43");
|
etcd.set("/test/key2", "43");
|
||||||
etcd.rm("/test/key1");
|
etcd.rm("/test/key1");
|
||||||
etcd.set("/test/key5", "44");
|
etcd.set("/test/key5", "44");
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
CHECK(3 == watcher_called);
|
CHECK(3 == watcher_called);
|
||||||
watcher.Cancel();
|
watcher.Cancel();
|
||||||
etcd.set("/test/key3", "50");
|
etcd.set("/test/key3", "50");
|
||||||
etcd.set("/test/key4", "51");
|
etcd.set("/test/key4", "51");
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
CHECK(3 == watcher_called);
|
CHECK(3 == watcher_called);
|
||||||
|
|
||||||
etcd.rmdir("/test", true);
|
etcd.rmdir("/test", true);
|
||||||
|
|
@ -87,11 +89,11 @@ TEST_CASE("create watcher")
|
||||||
watcher_called = 0;
|
watcher_called = 0;
|
||||||
{
|
{
|
||||||
etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
|
etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
etcd.set("/test/key", "42");
|
etcd.set("/test/key", "42");
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
etcd.set("/test/key", "43");
|
etcd.set("/test/key", "43");
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
}
|
}
|
||||||
CHECK(2 == watcher_called);
|
CHECK(2 == watcher_called);
|
||||||
etcd.rmdir("/test", true).error_code();
|
etcd.rmdir("/test", true).error_code();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue