From eb284103e0bc2c5943948f8d0fed327fd78908e7 Mon Sep 17 00:00:00 2001 From: Tao He Date: Tue, 6 Oct 2020 11:52:15 +0800 Subject: [PATCH] Auto and watch functionalities on SyncClient. Signed-off-by: Tao He --- etcd/Client.hpp | 4 +-- etcd/SyncClient.hpp | 27 ++++++++++++++---- src/SyncClient.cpp | 37 ++++++++++++------------ tst/EtcdSyncTest.cpp | 67 ++++++++++++++++++++++++-------------------- 4 files changed, 78 insertions(+), 57 deletions(-) diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 52970c6..6b108cd 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -33,7 +33,7 @@ namespace etcd /** * Constructs an etcd client object. * - * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001", + * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", * or multiple url, seperated by ',' or ';'. * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. */ @@ -43,7 +43,7 @@ namespace etcd /** * Constructs an etcd client object. * - * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001", + * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", * or multiple url, seperated by ',' or ';'. * @param username username of etcd auth * @param password password of etcd auth diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index 1bde9b9..087bff5 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -18,9 +18,27 @@ namespace etcd public: /** * Constructs an etcd sync client object. - * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001" + * + * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", + * or multiple url, seperated by ',' or ';'. + * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. */ - SyncClient(std::string const & etcd_url); + SyncClient(std::string const & etcd_url, + std::string const & load_balancer = "round_robin"); + + /** + * Constructs an etcd sync client object. + * + * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", + * or multiple url, seperated by ',' or ';'. + * @param username username of etcd auth + * @param password password of etcd auth + * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. + */ + SyncClient(std::string const & etcd_url, + std::string const & username, + std::string const & password, + std::string const & load_balancer = "round_robin"); Response get(std::string const & key); Response set(std::string const & key, std::string const & value, int ttl = 0); @@ -49,7 +67,7 @@ namespace etcd * @param key is the value or directory to be watched * @param recursive if true watch a whole subtree */ - // Response watch(std::string const & key, bool recursive = false); + Response watch(std::string const & key, bool recursive = false); /** * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". @@ -57,8 +75,7 @@ namespace etcd * @param fromIndex the first index we are interested in * @param recursive if true watch a whole subtree */ - // Response watch(std::string const & key, int fromIndex, bool recursive = false); - + Response watch(std::string const & key, int fromIndex, bool recursive = false); protected: Client client; diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index e277339..1f37b87 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -10,8 +10,16 @@ return etcd::Response(500, ex.what()); \ } -etcd::SyncClient::SyncClient(std::string const & address) - : client(address) +etcd::SyncClient::SyncClient(std::string const & address, std::string const & load_balancer) + : client(address, load_balancer) +{ +} + +etcd::SyncClient::SyncClient(std::string const & address, + std::string const & username, + std::string const & password, + std::string const & load_balancer) + : client(address, username, password, load_balancer) { } @@ -101,21 +109,12 @@ etcd::Response etcd::SyncClient::leasegrant(int ttl) CHECK_EXCEPTIONS(client.leasegrant(ttl).get()); } -// etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive) -// { -// web::http::uri_builder uri("/v2/keys" + key); -// uri.append_query("wait=true"); -// if (recursive) -// uri.append_query("recursive=true"); -// return send_get_request(uri); -// } +etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive) +{ + CHECK_EXCEPTIONS(client.watch(key, recursive).get()); +} -// etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, bool recursive) -// { -// web::http::uri_builder uri("/v2/keys" + key); -// uri.append_query("wait=true"); -// uri.append_query("waitIndex", fromIndex); -// if (recursive) -// uri.append_query("recursive=true"); -// return send_get_request(uri); -// } +etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, bool recursive) +{ + CHECK_EXCEPTIONS(client.watch(key, fromIndex, recursive).get()); +} diff --git a/tst/EtcdSyncTest.cpp b/tst/EtcdSyncTest.cpp index 295ab2f..16ebd83 100644 --- a/tst/EtcdSyncTest.cpp +++ b/tst/EtcdSyncTest.cpp @@ -1,5 +1,6 @@ #define CATCH_CONFIG_MAIN #include +#include #include "etcd/SyncClient.hpp" @@ -109,68 +110,72 @@ TEST_CASE("sync operations") TEST_CASE("wait for a value change") { - etcd::Client etcd(etcd_uri); - etcd.set("/test/key1", "42").wait(); + etcd::SyncClient etcd(etcd_uri); + etcd.set("/test/key1", "42"); - pplx::task res = etcd.watch("/test/key1"); - CHECK(!res.is_done()); + std::thread watch_thrd([&]() { + etcd::Response res = etcd.watch("/test/key1"); + REQUIRE("set" == res.action()); + CHECK("43" == res.value().as_string()); + }); - etcd.set("/test/key1", "43").get(); sleep(1); + etcd.set("/test/key1", "43"); + watch_thrd.join(); - REQUIRE(res.is_done()); - REQUIRE("set" == res.get().action()); - CHECK("43" == res.get().value().as_string()); - - REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); + REQUIRE(0 == etcd.rmdir("/test", true).error_code()); } TEST_CASE("wait for a directory change") { - etcd::Client etcd(etcd_uri); + etcd::SyncClient etcd(etcd_uri); - pplx::task res = etcd.watch("/test", true); + std::thread watch_thrd1([&]() { + etcd::Response res = etcd.watch("/test", true); + CHECK("create" == res.action()); + CHECK("44" == res.value().as_string()); + }); - etcd.add("/test/key4", "44").wait(); sleep(1); - REQUIRE(res.is_done()); - CHECK("create" == res.get().action()); - CHECK("44" == res.get().value().as_string()); + etcd.add("/test/key4", "44"); + watch_thrd1.join(); - pplx::task res2 = etcd.watch("/test", true); + std::thread watch_thrd2([&]() { + etcd::Response res2 = etcd.watch("/test", true); + CHECK("set" == res2.action()); + CHECK("45" == res2.value().as_string()); + }); - etcd.set("/test/key4", "45").wait(); sleep(1); - REQUIRE(res2.is_done()); - CHECK("set" == res2.get().action()); - CHECK("45" == res2.get().value().as_string()); + etcd.set("/test/key4", "45"); + watch_thrd2.join(); - REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); + REQUIRE(0 == etcd.rmdir("/test", true).error_code()); } TEST_CASE("watch changes in the past") { - etcd::Client etcd(etcd_uri); + etcd::SyncClient etcd(etcd_uri); - int index = etcd.set("/test/key1", "42").get().index(); + int index = etcd.set("/test/key1", "42").index(); - etcd.set("/test/key1", "43").wait(); - etcd.set("/test/key1", "44").wait(); - etcd.set("/test/key1", "45").wait(); + etcd.set("/test/key1", "43"); + etcd.set("/test/key1", "44"); + etcd.set("/test/key1", "45"); - etcd::Response res = etcd.watch("/test/key1", ++index).get(); + etcd::Response res = etcd.watch("/test/key1", ++index); CHECK("set" == res.action()); CHECK("43" == res.value().as_string()); - res = etcd.watch("/test/key1", ++index).get(); + res = etcd.watch("/test/key1", ++index); CHECK("set" == res.action()); CHECK("44" == res.value().as_string()); - res = etcd.watch("/test", ++index, true).get(); + res = etcd.watch("/test", ++index, true); CHECK("set" == res.action()); CHECK("45" == res.value().as_string()); - REQUIRE(0 == etcd.rmdir("/test", true).get().error_code()); + REQUIRE(0 == etcd.rmdir("/test", true).error_code()); } // TEST_CASE("request cancellation")