Auto and watch functionalities on SyncClient.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
Tao He 2020-10-06 11:52:15 +08:00 committed by Tao He
parent 0fb4f2887d
commit eb284103e0
4 changed files with 78 additions and 57 deletions

View File

@ -33,7 +33,7 @@ namespace etcd
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001",
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
@ -43,7 +43,7 @@ namespace etcd
/**
* Constructs an etcd client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001",
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param username username of etcd auth
* @param password password of etcd auth

View File

@ -18,9 +18,27 @@ namespace etcd
public:
/**
* Constructs an etcd sync client object.
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001"
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
SyncClient(std::string const & etcd_url);
SyncClient(std::string const & etcd_url,
std::string const & load_balancer = "round_robin");
/**
* Constructs an etcd sync client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param username username of etcd auth
* @param password password of etcd auth
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
SyncClient(std::string const & etcd_url,
std::string const & username,
std::string const & password,
std::string const & load_balancer = "round_robin");
Response get(std::string const & key);
Response set(std::string const & key, std::string const & value, int ttl = 0);
@ -49,7 +67,7 @@ namespace etcd
* @param key is the value or directory to be watched
* @param recursive if true watch a whole subtree
*/
// Response watch(std::string const & key, bool recursive = false);
Response watch(std::string const & key, bool recursive = false);
/**
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past".
@ -57,8 +75,7 @@ namespace etcd
* @param fromIndex the first index we are interested in
* @param recursive if true watch a whole subtree
*/
// Response watch(std::string const & key, int fromIndex, bool recursive = false);
Response watch(std::string const & key, int fromIndex, bool recursive = false);
protected:
Client client;

View File

@ -10,8 +10,16 @@
return etcd::Response(500, ex.what()); \
}
etcd::SyncClient::SyncClient(std::string const & address)
: client(address)
etcd::SyncClient::SyncClient(std::string const & address, std::string const & load_balancer)
: client(address, load_balancer)
{
}
etcd::SyncClient::SyncClient(std::string const & address,
std::string const & username,
std::string const & password,
std::string const & load_balancer)
: client(address, username, password, load_balancer)
{
}
@ -101,21 +109,12 @@ etcd::Response etcd::SyncClient::leasegrant(int ttl)
CHECK_EXCEPTIONS(client.leasegrant(ttl).get());
}
// etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive)
// {
// web::http::uri_builder uri("/v2/keys" + key);
// uri.append_query("wait=true");
// if (recursive)
// uri.append_query("recursive=true");
// return send_get_request(uri);
// }
etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive)
{
CHECK_EXCEPTIONS(client.watch(key, recursive).get());
}
// etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, bool recursive)
// {
// web::http::uri_builder uri("/v2/keys" + key);
// uri.append_query("wait=true");
// uri.append_query("waitIndex", fromIndex);
// if (recursive)
// uri.append_query("recursive=true");
// return send_get_request(uri);
// }
etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, bool recursive)
{
CHECK_EXCEPTIONS(client.watch(key, fromIndex, recursive).get());
}

View File

@ -1,5 +1,6 @@
#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <thread>
#include "etcd/SyncClient.hpp"
@ -109,68 +110,72 @@ TEST_CASE("sync operations")
TEST_CASE("wait for a value change")
{
etcd::Client etcd(etcd_uri);
etcd.set("/test/key1", "42").wait();
etcd::SyncClient etcd(etcd_uri);
etcd.set("/test/key1", "42");
pplx::task<etcd::Response> res = etcd.watch("/test/key1");
CHECK(!res.is_done());
std::thread watch_thrd([&]() {
etcd::Response res = etcd.watch("/test/key1");
REQUIRE("set" == res.action());
CHECK("43" == res.value().as_string());
});
etcd.set("/test/key1", "43").get();
sleep(1);
etcd.set("/test/key1", "43");
watch_thrd.join();
REQUIRE(res.is_done());
REQUIRE("set" == res.get().action());
CHECK("43" == res.get().value().as_string());
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
REQUIRE(0 == etcd.rmdir("/test", true).error_code());
}
TEST_CASE("wait for a directory change")
{
etcd::Client etcd(etcd_uri);
etcd::SyncClient etcd(etcd_uri);
pplx::task<etcd::Response> res = etcd.watch("/test", true);
std::thread watch_thrd1([&]() {
etcd::Response res = etcd.watch("/test", true);
CHECK("create" == res.action());
CHECK("44" == res.value().as_string());
});
etcd.add("/test/key4", "44").wait();
sleep(1);
REQUIRE(res.is_done());
CHECK("create" == res.get().action());
CHECK("44" == res.get().value().as_string());
etcd.add("/test/key4", "44");
watch_thrd1.join();
pplx::task<etcd::Response> res2 = etcd.watch("/test", true);
std::thread watch_thrd2([&]() {
etcd::Response res2 = etcd.watch("/test", true);
CHECK("set" == res2.action());
CHECK("45" == res2.value().as_string());
});
etcd.set("/test/key4", "45").wait();
sleep(1);
REQUIRE(res2.is_done());
CHECK("set" == res2.get().action());
CHECK("45" == res2.get().value().as_string());
etcd.set("/test/key4", "45");
watch_thrd2.join();
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
REQUIRE(0 == etcd.rmdir("/test", true).error_code());
}
TEST_CASE("watch changes in the past")
{
etcd::Client etcd(etcd_uri);
etcd::SyncClient etcd(etcd_uri);
int index = etcd.set("/test/key1", "42").get().index();
int index = etcd.set("/test/key1", "42").index();
etcd.set("/test/key1", "43").wait();
etcd.set("/test/key1", "44").wait();
etcd.set("/test/key1", "45").wait();
etcd.set("/test/key1", "43");
etcd.set("/test/key1", "44");
etcd.set("/test/key1", "45");
etcd::Response res = etcd.watch("/test/key1", ++index).get();
etcd::Response res = etcd.watch("/test/key1", ++index);
CHECK("set" == res.action());
CHECK("43" == res.value().as_string());
res = etcd.watch("/test/key1", ++index).get();
res = etcd.watch("/test/key1", ++index);
CHECK("set" == res.action());
CHECK("44" == res.value().as_string());
res = etcd.watch("/test", ++index, true).get();
res = etcd.watch("/test", ++index, true);
CHECK("set" == res.action());
CHECK("45" == res.value().as_string());
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
REQUIRE(0 == etcd.rmdir("/test", true).error_code());
}
// TEST_CASE("request cancellation")