From 174ea401b85348020e23d70f080b22f719c8ccc4 Mon Sep 17 00:00:00 2001 From: arches Date: Mon, 4 Jul 2016 06:54:07 -0400 Subject: [PATCH] Added SyncClient class --- etcd/Response.hpp | 2 + etcd/SyncClient.hpp | 62 ++++++++++++++++++ src/CMakeLists.txt | 2 +- src/Response.cpp | 7 ++ src/SyncClient.cpp | 91 ++++++++++++++++++++++++++ tst/CMakeLists.txt | 2 +- tst/EtcdSyncTest.cpp | 151 +++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 315 insertions(+), 2 deletions(-) create mode 100644 etcd/SyncClient.hpp create mode 100644 src/SyncClient.cpp create mode 100644 tst/EtcdSyncTest.cpp diff --git a/etcd/Response.hpp b/etcd/Response.hpp index c90ce3c..8f84051 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -99,6 +99,7 @@ namespace etcd protected: Response(const etcdv3::V3Response& response); + Response(int error_code, char const * error_message); int _error_code; std::string _error_message; @@ -108,6 +109,7 @@ namespace etcd Value _prev_value; Values _values; Keys _keys; + friend class SyncClient; }; } diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp new file mode 100644 index 0000000..6b612b6 --- /dev/null +++ b/etcd/SyncClient.hpp @@ -0,0 +1,62 @@ +#ifndef __ETCD_SYNC_CLIENT_HPP__ +#define __ETCD_SYNC_CLIENT_HPP__ + +#include "etcd/Client.hpp" + +namespace etcd +{ + /** + * SyncClient is a wrapper around etcd::Client and provides a simplified sync interface with blocking operations. + * + * In case of any exceptions occur in the backend the Response value's error_message will contain the + * text of the exception and the error code will be 600 + * + * Use this class only if performance does not matter. + */ + class SyncClient + { + public: + /** + * Constructs an etcd sync client object. + * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:4001" + */ + SyncClient(std::string const & etcd_url); + + Response get(std::string const & key); + Response set(std::string const & key, std::string const & value, int ttl = 0); + Response add(std::string const & key, std::string const & value, int ttl = 0); + Response modify(std::string const & key, std::string const & value, int ttl = 0); + Response modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0); + Response modify_if(std::string const & key, std::string const & value, int old_index, int ttl = 0); + Response rm(std::string const & key); + Response rm_if(std::string const & key, std::string const & old_value); + Response rm_if(std::string const & key, int old_index); + Response ls(std::string const & key); + Response mkdir(std::string const & key, int ttl = 0); + Response rmdir(std::string const & key, bool recursive = false); + + /** + * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and + * a new key is created, like "/testdir/newkey" then no change happened in the value of + * "/testdir" so your watch will not detect this. If you want to detect addition and deletion of + * directory entries then you have to do a recursive watch. + * @param key is the value or directory to be watched + * @param recursive if true watch a whole subtree + */ + // Response watch(std::string const & key, bool recursive = false); + + /** + * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". + * @param key is the value or directory to be watched + * @param fromIndex the first index we are interested in + * @param recursive if true watch a whole subtree + */ + // Response watch(std::string const & key, int fromIndex, bool recursive = false); + + + protected: + Client client; + }; +} + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index da1a009..be6c7c4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp) +add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp) set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11) target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++) diff --git a/src/Response.cpp b/src/Response.cpp index 3b13c85..f5d074b 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -36,6 +36,13 @@ etcd::Response::Response() { } +etcd::Response::Response(int error_code, char const * error_message) + : _error_code(error_code), + _error_message(error_message), + _index(0) +{ +} + int etcd::Response::error_code() const { return _error_code; diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp new file mode 100644 index 0000000..24dbf13 --- /dev/null +++ b/src/SyncClient.cpp @@ -0,0 +1,91 @@ +#include "etcd/SyncClient.hpp" + +#define CHECK_EXCEPTIONS(cmd) \ + try \ + { \ + return cmd; \ + } \ + catch (std::exception const & ex) \ + { \ + return etcd::Response(500, ex.what()); \ + } + +etcd::SyncClient::SyncClient(std::string const & address) + : client(address) +{ +} + +etcd::Response etcd::SyncClient::get(std::string const & key) +{ + CHECK_EXCEPTIONS(client.get(key).get()); +} + +etcd::Response etcd::SyncClient::set(std::string const & key, std::string const & value, int ttl) +{ + CHECK_EXCEPTIONS(client.set(key, value).get()); +} + +etcd::Response etcd::SyncClient::add(std::string const & key, std::string const & value, int ttl) +{ + CHECK_EXCEPTIONS(client.add(key, value).get()); +} + +etcd::Response etcd::SyncClient::modify(std::string const & key, std::string const & value, int ttl) +{ + CHECK_EXCEPTIONS(client.modify(key, value).get()); +} + +etcd::Response etcd::SyncClient::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl) +{ + CHECK_EXCEPTIONS(client.modify_if(key, value, old_value).get()); +} + +etcd::Response etcd::SyncClient::modify_if(std::string const & key, std::string const & value, int old_index, int ttl) +{ + CHECK_EXCEPTIONS(client.modify_if(key, value, old_index).get()); +} + +etcd::Response etcd::SyncClient::rm(std::string const & key) +{ + CHECK_EXCEPTIONS(client.rm(key).get()); +} + +etcd::Response etcd::SyncClient::rm_if(std::string const & key, std::string const & old_value) +{ + CHECK_EXCEPTIONS(client.rm_if(key, old_value).get()); +} + +etcd::Response etcd::SyncClient::rm_if(std::string const & key, int old_index) +{ + CHECK_EXCEPTIONS(client.rm_if(key, old_index).get()); +} + + +etcd::Response etcd::SyncClient::rmdir(std::string const & key, bool recursive) +{ + CHECK_EXCEPTIONS(client.rmdir(key, recursive).get()); +} + +etcd::Response etcd::SyncClient::ls(std::string const & key) +{ + CHECK_EXCEPTIONS(client.ls(key).get()); +} + +// etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive) +// { +// web::http::uri_builder uri("/v2/keys" + key); +// uri.append_query("wait=true"); +// if (recursive) +// uri.append_query("recursive=true"); +// return send_get_request(uri); +// } + +// etcd::Response etcd::SyncClient::watch(std::string const & key, int fromIndex, bool recursive) +// { +// web::http::uri_builder uri("/v2/keys" + key); +// uri.append_query("wait=true"); +// uri.append_query("waitIndex", fromIndex); +// if (recursive) +// uri.append_query("recursive=true"); +// return send_get_request(uri); +// } diff --git a/tst/CMakeLists.txt b/tst/CMakeLists.txt index 6c44391..f304fe2 100644 --- a/tst/CMakeLists.txt +++ b/tst/CMakeLists.txt @@ -1,4 +1,4 @@ -add_executable(etcd_test EtcdTest.cpp) +add_executable(etcd_test EtcdTest.cpp EtcdSyncTest.cpp) set_property(TARGET etcd_test PROPERTY CXX_STANDARD 11) target_link_libraries(etcd_test etcd-cpp-api) diff --git a/tst/EtcdSyncTest.cpp b/tst/EtcdSyncTest.cpp new file mode 100644 index 0000000..7949b65 --- /dev/null +++ b/tst/EtcdSyncTest.cpp @@ -0,0 +1,151 @@ +#include + +#include "etcd/SyncClient.hpp" + +static std::string etcd_uri("http://127.0.0.1:2379"); + +TEST_CASE("sync operations") +{ + etcd::SyncClient etcd(etcd_uri); + etcd.rmdir("/test", true); + + // add + CHECK(0 == etcd.add("/test/key1", "42").error_code()); + CHECK(105 == etcd.add("/test/key1", "42").error_code()); // Key already exists + CHECK("42" == etcd.get("/test/key1").value().as_string()); + + // modify + CHECK(0 == etcd.modify("/test/key1", "43").error_code()); + CHECK(100 == etcd.modify("/test/key2", "43").error_code()); // Key not found + CHECK("43" == etcd.modify("/test/key1", "42").prev_value().as_string()); + + // set + CHECK(0 == etcd.set("/test/key1", "43").error_code()); // overwrite + CHECK(0 == etcd.set("/test/key2", "43").error_code()); // create new + CHECK("43" == etcd.set("/test/key2", "44").prev_value().as_string()); + CHECK("" == etcd.set("/test/key3", "44").prev_value().as_string()); + //CHECK(102 == etcd.set("/test", "42").error_code()); // Not a file + + // rm + CHECK(3 == etcd.ls("/test").keys().size()); + CHECK(0 == etcd.rm("/test/key1").error_code()); + CHECK(2 == etcd.ls("/test").keys().size()); + + // mkdir + //CHECK(etcd.mkdir("/test/new_dir").value().is_dir()); + + // ls + CHECK(0 == etcd.ls("/test/new_dir").keys().size()); + etcd.set("/test/new_dir/key1", "value1"); + etcd.set("/test/new_dir/key2", "value2"); + //etcd.mkdir("/test/new_dir/sub_dir"); + CHECK(2 == etcd.ls("/test/new_dir").keys().size()); + + // rmdir + //CHECK(108 == etcd.rmdir("/test/new_dir").error_code()); // Directory not empty + CHECK(0 == etcd.rmdir("/test/new_dir", true).error_code()); + + // compare and swap + etcd.set("/test/key1", "42"); + int index = etcd.modify_if("/test/key1", "43", "42").index(); + CHECK(101 == etcd.modify_if("/test/key1", "44", "42").error_code()); + REQUIRE(etcd.modify_if("/test/key1", "44", index).is_ok()); + CHECK(101 == etcd.modify_if("/test/key1", "45", index).error_code()); + + // atomic compare-and-delete based on prevValue + etcd.set("/test/key1", "42"); + CHECK(101 == etcd.rm_if("/test/key1", "43").error_code()); + CHECK(0 == etcd.rm_if("/test/key1", "42").error_code()); + + // atomic compare-and-delete based on prevIndex + index = etcd.set("/test/key1", "42").index(); + CHECK(101 == etcd.rm_if("/test/key1", index - 1).error_code()); + CHECK(0 == etcd.rm_if("/test/key1", index).error_code()); + +// TEST_CASE("wait for a value change") +// { +// etcd::Client etcd(etcd_uri); +// etcd.set("/test/key1", "42").wait(); + +// pplx::task res = etcd.watch("/test/key1"); +// CHECK(!res.is_done()); + +// etcd.set("/test/key1", "43").get(); +// sleep(1); + +// REQUIRE(res.is_done()); +// REQUIRE("set" == res.get().action()); +// CHECK("43" == res.get().value().as_string()); +// } + +// TEST_CASE("wait for a directory change") +// { +// etcd::Client etcd(etcd_uri); + +// pplx::task res = etcd.watch("/test", true); + +// etcd.add("/test/key4", "44").wait(); +// REQUIRE(res.is_done()); +// CHECK("create" == res.get().action()); +// CHECK("44" == res.get().value().as_string()); + +// pplx::task res2 = etcd.watch("/test", true); + +// etcd.set("/test/key4", "45").wait(); +// sleep(1); +// REQUIRE(res2.is_done()); +// CHECK("set" == res2.get().action()); +// CHECK("45" == res2.get().value().as_string()); +// } + +// TEST_CASE("watch changes in the past") +// { +// etcd::Client etcd(etcd_uri); + +// int index = etcd.set("/test/key1", "42").get().index(); + +// etcd.set("/test/key1", "43").wait(); +// etcd.set("/test/key1", "44").wait(); +// etcd.set("/test/key1", "45").wait(); + +// etcd::Response res = etcd.watch("/test/key1", ++index).get(); +// CHECK("set" == res.action()); +// CHECK("43" == res.value().as_string()); + +// res = etcd.watch("/test/key1", ++index).get(); +// CHECK("set" == res.action()); +// CHECK("44" == res.value().as_string()); + +// res = etcd.watch("/test", ++index, true).get(); +// CHECK("set" == res.action()); +// CHECK("45" == res.value().as_string()); +// } + +// TEST_CASE("request cancellation") +// { +// etcd::Client etcd(etcd_uri); +// etcd.set("/test/key1", "42").wait(); + +// pplx::task res = etcd.watch("/test/key1"); +// CHECK(!res.is_done()); + +// etcd.cancel_operations(); + +// sleep(1); +// REQUIRE(res.is_done()); +// try +// { +// res.wait(); +// } +// catch(pplx::task_canceled const & ex) +// { +// std::cout << "pplx::task_canceled: " << ex.what() << "\n"; +// } +// catch(std::exception const & ex) +// { +// std::cout << "std::exception: " << ex.what() << "\n"; +// } +// } + + REQUIRE(0 == etcd.rmdir("/test", true).error_code()); +}