Fixes the wrong key setup in watcher (#201)

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-03-10 15:05:28 +08:00 committed by GitHub
parent 639c7e9f24
commit cad42fdf07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 32 deletions

View File

@ -18,10 +18,10 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(
watch_create_req.set_key(parameters.key);
} else {
if (parameters.key.empty()) {
watch_create_req.set_range_end(etcdv3::NUL);
watch_create_req.set_key(etcdv3::NUL);
watch_create_req.set_range_end(etcdv3::NUL);
} else {
watch_create_req.set_range_end(parameters.key);
watch_create_req.set_key(parameters.key);
watch_create_req.set_range_end(detail::string_plus_one(parameters.key));
}
}
@ -76,8 +76,6 @@ void etcdv3::AsyncWatchAction::waitForResponse()
if (got_tag == (void *)etcdv3::WATCH_FINISH) {
// shutdown
cq_.Shutdown();
// cancel on-the-fly calls
context.TryCancel();
break;
}
if(got_tag == (void*)this) // read tag
@ -159,8 +157,6 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
if (got_tag == (void *)etcdv3::WATCH_FINISH) {
// shutdown
cq_.Shutdown();
// cancel on-the-fly calls
context.TryCancel();
break;
}
if(got_tag == (void*)this) // read tag

View File

@ -35,6 +35,61 @@ void printResponse(etcd::Response const & resp)
++watcher_called;
}
TEST_CASE("create watcher")
{
etcd::SyncClient etcd(etcd_url);
etcd.rmdir("/test", true);
watcher_called = 0;
{
etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
std::this_thread::sleep_for(std::chrono::seconds(5));
etcd.set("/test/key", "42");
std::this_thread::sleep_for(std::chrono::seconds(5));
etcd.set("/test/key", "43");
std::this_thread::sleep_for(std::chrono::seconds(5));
}
CHECK(2 == watcher_called);
etcd.rmdir("/test", true);
}
TEST_CASE("watch with correct prefix")
{
etcd::SyncClient etcd(etcd_url);
etcd.rmdir("/test", true);
watcher_called = 0;
etcd::Watcher watcher(etcd_url, "/test/key_prefix", printResponse, true);
{
etcd.set("/test/key1", "42");
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(0 == watcher_called);
}
{
etcd.set("/test/key_prefix", "42");
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(1 == watcher_called);
}
{
etcd.set("/test/key_prefix1", "42");
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(2 == watcher_called);
}
{
etcd.set("/test/key_prefiy", "42");
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(2 == watcher_called);
}
{
etcd.set("/test/key1", "42");
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(2 == watcher_called);
}
etcd.rmdir("/test", true);
}
TEST_CASE("create watcher with cancel")
{
etcd::SyncClient etcd(etcd_url);
@ -42,17 +97,17 @@ TEST_CASE("create watcher with cancel")
watcher_called = 0;
etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(5));
etcd.set("/test/key", "42");
etcd.set("/test/key", "43");
etcd.rm("/test/key");
etcd.set("/test/key", "44");
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(4 == watcher_called);
watcher.Cancel();
etcd.set("/test/key", "50");
etcd.set("/test/key", "51");
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(4 == watcher_called);
etcd.rmdir("/test", true);
@ -65,50 +120,32 @@ TEST_CASE("create watcher on ranges with cancel")
watcher_called = 0;
etcd::Watcher watcher(etcd_url, "/test/key1", "/test/key5", printResponse);
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(5));
etcd.set("/test/key1", "42");
etcd.set("/test/key2", "43");
etcd.rm("/test/key1");
etcd.set("/test/key5", "44");
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(3 == watcher_called);
watcher.Cancel();
etcd.set("/test/key3", "50");
etcd.set("/test/key4", "51");
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(3 == watcher_called);
etcd.rmdir("/test", true);
}
TEST_CASE("create watcher")
{
etcd::SyncClient etcd(etcd_url);
etcd.rmdir("/test", true);
watcher_called = 0;
{
etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
std::this_thread::sleep_for(std::chrono::seconds(2));
etcd.set("/test/key", "42");
std::this_thread::sleep_for(std::chrono::seconds(2));
etcd.set("/test/key", "43");
std::this_thread::sleep_for(std::chrono::seconds(2));
}
CHECK(2 == watcher_called);
etcd.rmdir("/test", true).error_code();
}
TEST_CASE("watch should exit normally")
{
// cancal immediately after start watch.
// cancel immediately after start watch.
etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
watcher.Cancel();
}
TEST_CASE("watch should can be cancelled repeatedly")
{
// cancal immediately after start watch.
// cancel immediately after start watch.
etcd::Watcher watcher(etcd_url, "/test", printResponse, true);
std::vector<std::thread> threads(10);
for (size_t i = 0; i < 10; ++i) {