Auto and watch functionalities on SyncClient.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
Tao He 2020-10-06 11:52:15 +08:00
parent b33ce0d8de
commit 50d1a61983
4 changed files with 78 additions and 57 deletions

View File

@ -33,7 +33,7 @@ namespace etcd
/** /**
* Constructs an etcd client object. * Constructs an etcd client object.
* *
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001", * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'. * or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/ */
@ -43,7 +43,7 @@ namespace etcd
/** /**
* Constructs an etcd client object. * Constructs an etcd client object.
* *
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001", * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'. * or multiple url, seperated by ',' or ';'.
* @param username username of etcd auth * @param username username of etcd auth
* @param password password of etcd auth * @param password password of etcd auth

View File

@ -18,9 +18,27 @@ namespace etcd
public: public:
/** /**
* Constructs an etcd sync client object. * Constructs an etcd sync client object.
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001" *
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/ */
SyncClient(std::string const & etcd_url); SyncClient(std::string const & etcd_url,
std::string const & load_balancer = "round_robin");
/**
* Constructs an etcd sync client object.
*
* @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379",
* or multiple url, seperated by ',' or ';'.
* @param username username of etcd auth
* @param password password of etcd auth
* @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds.
*/
SyncClient(std::string const & etcd_url,
std::string const & username,
std::string const & password,
std::string const & load_balancer = "round_robin");
Response get(std::string const & key); Response get(std::string const & key);
Response set(std::string const & key, std::string const & value, int ttl = 0); Response set(std::string const & key, std::string const & value, int ttl = 0);
@ -49,7 +67,7 @@ namespace etcd
* @param key is the value or directory to be watched * @param key is the value or directory to be watched
* @param recursive if true watch a whole subtree * @param recursive if true watch a whole subtree
*/ */
// Response watch(std::string const & key, bool recursive = false); Response watch(std::string const & key, bool recursive = false);
/** /**
* Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past".
@ -57,8 +75,7 @@ namespace etcd
* @param fromIndex the first index we are interested in * @param fromIndex the first index we are interested in
* @param recursive if true watch a whole subtree * @param recursive if true watch a whole subtree
*/ */
// Response watch(std::string const & key, int fromIndex, bool recursive = false); Response watch(std::string const & key, int fromIndex, bool recursive = false);
protected: protected:
Client client; Client client;

View File

@ -10,8 +10,16 @@
return etcd::Response(500, ex.what()); \ return etcd::Response(500, ex.what()); \
} }
etcd::SyncClient::SyncClient(std::string const & address) etcd::SyncClient::SyncClient(std::string const & address, std::string const & load_balancer)
: client(address) : client(address, load_balancer)
{
}
etcd::SyncClient::SyncClient(std::string const & address,
std::string const & username,
std::string const & password,
std::string const & load_balancer)
: client(address, username, password, load_balancer)
{ {
} }
@ -101,21 +109,12 @@ etcd::Response etcd::SyncClient::leasegrant(int ttl)
CHECK_EXCEPTIONS(client.leasegrant(ttl).get()); CHECK_EXCEPTIONS(client.leasegrant(ttl).get());
} }
// etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive) etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive)
// { {
// web::http::uri_builder uri("/v2/keys" + key); CHECK_EXCEPTIONS(client.watch(key, recursive).get());
// uri.append_query("wait=true"); }
// if (recursive)
// uri.append_query("recursive=true");
// return send_get_request(uri);
// }
// etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, bool recursive) etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, bool recursive)
// { {
// web::http::uri_builder uri("/v2/keys" + key); CHECK_EXCEPTIONS(client.watch(key, fromIndex, recursive).get());
// uri.append_query("wait=true"); }
// uri.append_query("waitIndex", fromIndex);
// if (recursive)
// uri.append_query("recursive=true");
// return send_get_request(uri);
// }

View File

@ -1,5 +1,6 @@
#define CATCH_CONFIG_MAIN #define CATCH_CONFIG_MAIN
#include <catch.hpp> #include <catch.hpp>
#include <thread>
#include "etcd/SyncClient.hpp" #include "etcd/SyncClient.hpp"
@ -109,68 +110,72 @@ TEST_CASE("sync operations")
TEST_CASE("wait for a value change") TEST_CASE("wait for a value change")
{ {
etcd::Client etcd(etcd_uri); etcd::SyncClient etcd(etcd_uri);
etcd.set("/test/key1", "42").wait(); etcd.set("/test/key1", "42");
pplx::task<etcd::Response> res = etcd.watch("/test/key1"); std::thread watch_thrd([&]() {
CHECK(!res.is_done()); etcd::Response res = etcd.watch("/test/key1");
REQUIRE("set" == res.action());
CHECK("43" == res.value().as_string());
});
etcd.set("/test/key1", "43").get();
sleep(1); sleep(1);
etcd.set("/test/key1", "43");
watch_thrd.join();
REQUIRE(res.is_done()); REQUIRE(0 == etcd.rmdir("/test", true).error_code());
REQUIRE("set" == res.get().action());
CHECK("43" == res.get().value().as_string());
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code());
} }
TEST_CASE("wait for a directory change") TEST_CASE("wait for a directory change")
{ {
etcd::Client etcd(etcd_uri); etcd::SyncClient etcd(etcd_uri);
pplx::task<etcd::Response> res = etcd.watch("/test", true); std::thread watch_thrd1([&]() {
etcd::Response res = etcd.watch("/test", true);
CHECK("create" == res.action());
CHECK("44" == res.value().as_string());
});
etcd.add("/test/key4", "44").wait();
sleep(1); sleep(1);
REQUIRE(res.is_done()); etcd.add("/test/key4", "44");
CHECK("create" == res.get().action()); watch_thrd1.join();
CHECK("44" == res.get().value().as_string());
pplx::task<etcd::Response> res2 = etcd.watch("/test", true); std::thread watch_thrd2([&]() {
etcd::Response res2 = etcd.watch("/test", true);
CHECK("set" == res2.action());
CHECK("45" == res2.value().as_string());
});
etcd.set("/test/key4", "45").wait();
sleep(1); sleep(1);
REQUIRE(res2.is_done()); etcd.set("/test/key4", "45");
CHECK("set" == res2.get().action()); watch_thrd2.join();
CHECK("45" == res2.get().value().as_string());
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); REQUIRE(0 == etcd.rmdir("/test", true).error_code());
} }
TEST_CASE("watch changes in the past") TEST_CASE("watch changes in the past")
{ {
etcd::Client etcd(etcd_uri); etcd::SyncClient etcd(etcd_uri);
int index = etcd.set("/test/key1", "42").get().index(); int index = etcd.set("/test/key1", "42").index();
etcd.set("/test/key1", "43").wait(); etcd.set("/test/key1", "43");
etcd.set("/test/key1", "44").wait(); etcd.set("/test/key1", "44");
etcd.set("/test/key1", "45").wait(); etcd.set("/test/key1", "45");
etcd::Response res = etcd.watch("/test/key1", ++index).get(); etcd::Response res = etcd.watch("/test/key1", ++index);
CHECK("set" == res.action()); CHECK("set" == res.action());
CHECK("43" == res.value().as_string()); CHECK("43" == res.value().as_string());
res = etcd.watch("/test/key1", ++index).get(); res = etcd.watch("/test/key1", ++index);
CHECK("set" == res.action()); CHECK("set" == res.action());
CHECK("44" == res.value().as_string()); CHECK("44" == res.value().as_string());
res = etcd.watch("/test", ++index, true).get(); res = etcd.watch("/test", ++index, true);
CHECK("set" == res.action()); CHECK("set" == res.action());
CHECK("45" == res.value().as_string()); CHECK("45" == res.value().as_string());
REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); REQUIRE(0 == etcd.rmdir("/test", true).error_code());
} }
// TEST_CASE("request cancellation") // TEST_CASE("request cancellation")