diff --git a/CMakeLists.txt b/CMakeLists.txt index 84b4851..32eb232 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -88,7 +88,6 @@ macro(use_cxx target) endif() endmacro(use_cxx) -find_package(Boost REQUIRED COMPONENTS system thread random) if(APPLE) # If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's if(NOT OpenSSL_DIR) @@ -166,8 +165,7 @@ else() set(CPPREST_LIB) endif() -include_directories(SYSTEM ${Boost_INCLUDE_DIR} - ${CPPREST_INCLUDE_DIR} +include_directories(SYSTEM ${CPPREST_INCLUDE_DIR} ${PROTOBUF_INCLUDE_DIRS} ${GRPC_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR}) diff --git a/README.md b/README.md index 3b34ab6..7a5719e 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ i.e., `ETCDCTL_API=3`. ## Requirements -1. boost and openssl +1. boost and openssl (**Note that boost is only required if you need the asynchronous runtime**) + On Ubuntu, above requirement could be installed as: diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp index 71f86fc..4302b08 100644 --- a/etcd/KeepAlive.hpp +++ b/etcd/KeepAlive.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -12,14 +13,6 @@ #include "etcd/SyncClient.hpp" #include "etcd/Response.hpp" -#include -#if BOOST_VERSION >= 106600 -#include -#else -#include -#endif -#include - namespace etcd { // forward declaration to avoid header/library dependency @@ -117,24 +110,18 @@ namespace etcd // Don't use `pplx::task` to avoid sharing thread pool with other actions on the client // to avoid any potential blocking, which may block the keepalive loop and evict the lease. - std::thread task_; + std::thread refresh_task_; int ttl; int64_t lease_id; // protect the initializing status of `timer`. - std::recursive_mutex mutex_for_refresh_; + std::mutex mutex_for_refresh_; + std::condition_variable cv_for_refresh_; std::atomic_bool continue_next; // grpc timeout in `refresh()` mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero(); - -#if BOOST_VERSION >= 106600 - boost::asio::io_context context; -#else - boost::asio::io_service context; -#endif - std::unique_ptr keepalive_timer_; }; } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b7cb2ee..e39ce1f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -22,7 +22,6 @@ use_cxx(etcd-cpp-api-core-objects) add_dependencies(etcd-cpp-api-core-objects protobuf_generates) include_generated_protobuf_files(etcd-cpp-api-core-objects) target_link_libraries(etcd-cpp-api-core-objects PUBLIC - ${Boost_LIBRARIES} ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${GRPC_LIBRARIES} @@ -33,7 +32,6 @@ if(BUILD_ETCD_CORE_ONLY) add_library(etcd-cpp-api-core $) use_cxx(etcd-cpp-api-core) target_link_libraries(etcd-cpp-api-core PUBLIC - ${Boost_LIBRARIES} ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${GRPC_LIBRARIES} @@ -45,7 +43,6 @@ else() "${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp") use_cxx(etcd-cpp-api) target_link_libraries(etcd-cpp-api PUBLIC - ${Boost_LIBRARIES} ${CPPREST_LIB} # n.b.: the asynchronous client requires pplx in cpprestsdk ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} diff --git a/src/Client.cpp b/src/Client.cpp index 1fe7ee2..c176e0b 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -28,8 +28,6 @@ #include #include -#include - #include #include #include "proto/rpc.grpc.pb.h" diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index 1f4089a..1a936cb 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -37,15 +37,14 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client, int ttl, int64_t lease_id): continue_next.store(true); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params))); - task_ = std::thread([this]() { + refresh_task_ = std::thread([this]() { try { // start refresh this->refresh(); - context.run(); - } catch (...) { + } catch (const std::exception &e) { + // propagate the exception eptr_ = std::current_exception(); } - context.stop(); // clean up }); } @@ -84,14 +83,11 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client, params.lease_stub = stubs->leaseServiceStub.get(); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params))); - task_ = std::thread([this]() { + refresh_task_ = std::thread([this]() { try { // start refresh this->refresh(); - context.run(); } catch (...) { - // run canceller first - this->Cancel(); // propogate the exception eptr_ = std::current_exception(); if (handler_) { @@ -117,23 +113,23 @@ etcd::KeepAlive::KeepAlive(std::string const & address, etcd::KeepAlive::~KeepAlive() { this->Cancel(); - // clean up - if (task_.joinable()) { - task_.join(); - } } void etcd::KeepAlive::Cancel() { - std::lock_guard scope_lock(mutex_for_refresh_); if (!continue_next.exchange(false)) { return; } - stubs->call->CancelKeepAlive(); - if (keepalive_timer_) { - keepalive_timer_->cancel(); + + // stop the thread + cv_for_refresh_.notify_all(); + refresh_task_.join(); + + // send a cancel request + { + std::lock_guard lock(mutex_for_refresh_); + stubs->call->CancelKeepAlive(); } - context.stop(); } void etcd::KeepAlive::Check() { @@ -147,7 +143,7 @@ void etcd::KeepAlive::Check() { // run canceller first this->Cancel(); - // propogate the exception, as we throw in `Check()`, the `handler` won't be touched + // propagate the exception, as we throw in `Check()`, the `handler` won't be touched eptr_ = std::current_exception(); if (handler_) { handler_(eptr_); @@ -160,32 +156,27 @@ void etcd::KeepAlive::Check() { void etcd::KeepAlive::refresh() { - std::lock_guard scope_lock(mutex_for_refresh_); - if (!continue_next.load()) { - return; - } - // minimal resolution: 1 second - int keepalive_ttl = std::max(ttl - 1, 1); - keepalive_timer_.reset(new boost::asio::steady_timer(context, std::chrono::seconds(keepalive_ttl))); - keepalive_timer_->async_wait([this](const boost::system::error_code& error) { - if (error) { -#ifndef NDEBUG - std::cerr << "keepalive timer cancelled: " << error << ", " << error.message() << std::endl; -#endif - } else { - if (this->continue_next.load()) { - // execute refresh - this->refresh_once(); - // trigger the next round; - this->refresh(); + while (true) { + if (!continue_next.load()) { + return; + } + // minimal resolution: 1 second + int keepalive_ttl = std::max(ttl - 1, 1); + { + std::unique_lock lock(mutex_for_refresh_); + if (cv_for_refresh_.wait_for(lock, std::chrono::seconds(keepalive_ttl)) == std::cv_status::no_timeout) { + return; } } - }); + + // execute refresh + this->refresh_once(); + } } void etcd::KeepAlive::refresh_once() { - std::lock_guard scope_lock(mutex_for_refresh_); + std::lock_guard scope_lock(mutex_for_refresh_); if (!continue_next.load()) { return; } diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index 1b27103..ec93b44 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -29,8 +29,6 @@ #include #include -#include - #include #include #include @@ -69,6 +67,32 @@ namespace etcd { namespace detail { +static void string_split(std::vector &dests, std::string const &src, std::string const &seps) { + dests.clear(); + std::string::const_iterator start = src.begin(); + std::string::const_iterator end = src.end(); + std::string::const_iterator next = std::find_first_of(start, end, seps.begin(), seps.end()); + while (next != end) { + dests.push_back(std::string(start, next)); + start = next + 1; + next = std::find_first_of(start, end, seps.begin(), seps.end()); + } + if (start != end) { + dests.push_back(std::string(start, end)); + } +} + +static std::string string_join(std::vector const &srcs, std::string const sep) { + std::stringstream ss; + if (!srcs.empty()) { + ss << srcs[0]; + for (size_t i = 1; i < srcs.size(); ++i) { + ss << sep << srcs[i]; + } + } + return ss.str(); +} + static bool dns_resolve(std::string const &target, std::vector &endpoints) { struct addrinfo hints = {}, *addrs; hints.ai_family = AF_INET; @@ -76,7 +100,7 @@ static bool dns_resolve(std::string const &target, std::vector &end hints.ai_protocol = IPPROTO_TCP; std::vector target_parts; - boost::split(target_parts, target, boost::is_any_of(":")); + string_split(target_parts, target, ":"); if (target_parts.size() != 2) { std::cerr << "warn: invalid URL: " << target << std::endl; return false; @@ -116,7 +140,7 @@ static bool dns_resolve(std::string const &target, std::vector &end const std::string strip_and_resolve_addresses(std::string const &address) { std::vector addresses; - boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;")); + string_split(addresses, address, ",;"); std::string stripped_address; { std::vector stripped_addresses; @@ -126,7 +150,7 @@ const std::string strip_and_resolve_addresses(std::string const &address) { std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length()); etcd::detail::dns_resolve(target, stripped_addresses); } - stripped_address = boost::algorithm::join(stripped_addresses, ","); + stripped_address = string_join(stripped_addresses, ","); } return "ipv4:///" + stripped_address; } diff --git a/tst/LockTest.cpp b/tst/LockTest.cpp index 5a834cc..6111393 100644 --- a/tst/LockTest.cpp +++ b/tst/LockTest.cpp @@ -205,6 +205,7 @@ TEST_CASE("concurrent lock & unlock") constexpr size_t trials = 192; std::function locker = [&etcd](std::string const &key, const size_t index) { + std::cout << "start lock for " << key << ", index is " << index << std::endl; auto resp = etcd.lock(key).get(); std::cout << "lock for " << index << " is ok, starts sleeping: ..." << resp.error_message() << std::endl << std::flush; REQUIRE(resp.is_ok());