Drop the boost dependency on the sync runtime (#216)

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-04-28 20:38:13 +08:00 committed by GitHub
parent 5e2884f362
commit e771d2f6da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 65 additions and 69 deletions

View File

@ -88,7 +88,6 @@ macro(use_cxx target)
endif() endif()
endmacro(use_cxx) endmacro(use_cxx)
find_package(Boost REQUIRED COMPONENTS system thread random)
if(APPLE) if(APPLE)
# If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's # If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's
if(NOT OpenSSL_DIR) if(NOT OpenSSL_DIR)
@ -166,8 +165,7 @@ else()
set(CPPREST_LIB) set(CPPREST_LIB)
endif() endif()
include_directories(SYSTEM ${Boost_INCLUDE_DIR} include_directories(SYSTEM ${CPPREST_INCLUDE_DIR}
${CPPREST_INCLUDE_DIR}
${PROTOBUF_INCLUDE_DIRS} ${PROTOBUF_INCLUDE_DIRS}
${GRPC_INCLUDE_DIR} ${GRPC_INCLUDE_DIR}
${OPENSSL_INCLUDE_DIR}) ${OPENSSL_INCLUDE_DIR})

View File

@ -28,7 +28,7 @@ i.e., `ETCDCTL_API=3`.
## Requirements ## Requirements
1. boost and openssl 1. boost and openssl (**Note that boost is only required if you need the asynchronous runtime**)
+ On Ubuntu, above requirement could be installed as: + On Ubuntu, above requirement could be installed as:

View File

@ -3,6 +3,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable>
#include <exception> #include <exception>
#include <functional> #include <functional>
#include <mutex> #include <mutex>
@ -12,14 +13,6 @@
#include "etcd/SyncClient.hpp" #include "etcd/SyncClient.hpp"
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
#include <boost/config.hpp>
#if BOOST_VERSION >= 106600
#include <boost/asio/io_context.hpp>
#else
#include <boost/asio/io_service.hpp>
#endif
#include <boost/asio/steady_timer.hpp>
namespace etcd namespace etcd
{ {
// forward declaration to avoid header/library dependency // forward declaration to avoid header/library dependency
@ -117,24 +110,18 @@ namespace etcd
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client // Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
// to avoid any potential blocking, which may block the keepalive loop and evict the lease. // to avoid any potential blocking, which may block the keepalive loop and evict the lease.
std::thread task_; std::thread refresh_task_;
int ttl; int ttl;
int64_t lease_id; int64_t lease_id;
// protect the initializing status of `timer`. // protect the initializing status of `timer`.
std::recursive_mutex mutex_for_refresh_; std::mutex mutex_for_refresh_;
std::condition_variable cv_for_refresh_;
std::atomic_bool continue_next; std::atomic_bool continue_next;
// grpc timeout in `refresh()` // grpc timeout in `refresh()`
mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero(); mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
#if BOOST_VERSION >= 106600
boost::asio::io_context context;
#else
boost::asio::io_service context;
#endif
std::unique_ptr<boost::asio::steady_timer> keepalive_timer_;
}; };
} }

View File

@ -22,7 +22,6 @@ use_cxx(etcd-cpp-api-core-objects)
add_dependencies(etcd-cpp-api-core-objects protobuf_generates) add_dependencies(etcd-cpp-api-core-objects protobuf_generates)
include_generated_protobuf_files(etcd-cpp-api-core-objects) include_generated_protobuf_files(etcd-cpp-api-core-objects)
target_link_libraries(etcd-cpp-api-core-objects PUBLIC target_link_libraries(etcd-cpp-api-core-objects PUBLIC
${Boost_LIBRARIES}
${PROTOBUF_LIBRARIES} ${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES} ${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES} ${GRPC_LIBRARIES}
@ -33,7 +32,6 @@ if(BUILD_ETCD_CORE_ONLY)
add_library(etcd-cpp-api-core $<TARGET_OBJECTS:etcd-cpp-api-core-objects>) add_library(etcd-cpp-api-core $<TARGET_OBJECTS:etcd-cpp-api-core-objects>)
use_cxx(etcd-cpp-api-core) use_cxx(etcd-cpp-api-core)
target_link_libraries(etcd-cpp-api-core PUBLIC target_link_libraries(etcd-cpp-api-core PUBLIC
${Boost_LIBRARIES}
${PROTOBUF_LIBRARIES} ${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES} ${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES} ${GRPC_LIBRARIES}
@ -45,7 +43,6 @@ else()
"${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp") "${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp")
use_cxx(etcd-cpp-api) use_cxx(etcd-cpp-api)
target_link_libraries(etcd-cpp-api PUBLIC target_link_libraries(etcd-cpp-api PUBLIC
${Boost_LIBRARIES}
${CPPREST_LIB} # n.b.: the asynchronous client requires pplx in cpprestsdk ${CPPREST_LIB} # n.b.: the asynchronous client requires pplx in cpprestsdk
${PROTOBUF_LIBRARIES} ${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES} ${OPENSSL_LIBRARIES}

View File

@ -28,8 +28,6 @@
#include <thread> #include <thread>
#include <utility> #include <utility>
#include <boost/algorithm/string.hpp>
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include <grpc++/security/credentials.h> #include <grpc++/security/credentials.h>
#include "proto/rpc.grpc.pb.h" #include "proto/rpc.grpc.pb.h"

View File

@ -37,15 +37,14 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client, int ttl, int64_t lease_id):
continue_next.store(true); continue_next.store(true);
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params))); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
task_ = std::thread([this]() { refresh_task_ = std::thread([this]() {
try { try {
// start refresh // start refresh
this->refresh(); this->refresh();
context.run(); } catch (const std::exception &e) {
} catch (...) { // propagate the exception
eptr_ = std::current_exception(); eptr_ = std::current_exception();
} }
context.stop(); // clean up
}); });
} }
@ -84,14 +83,11 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client,
params.lease_stub = stubs->leaseServiceStub.get(); params.lease_stub = stubs->leaseServiceStub.get();
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params))); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
task_ = std::thread([this]() { refresh_task_ = std::thread([this]() {
try { try {
// start refresh // start refresh
this->refresh(); this->refresh();
context.run();
} catch (...) { } catch (...) {
// run canceller first
this->Cancel();
// propogate the exception // propogate the exception
eptr_ = std::current_exception(); eptr_ = std::current_exception();
if (handler_) { if (handler_) {
@ -117,23 +113,23 @@ etcd::KeepAlive::KeepAlive(std::string const & address,
etcd::KeepAlive::~KeepAlive() etcd::KeepAlive::~KeepAlive()
{ {
this->Cancel(); this->Cancel();
// clean up
if (task_.joinable()) {
task_.join();
}
} }
void etcd::KeepAlive::Cancel() void etcd::KeepAlive::Cancel()
{ {
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
if (!continue_next.exchange(false)) { if (!continue_next.exchange(false)) {
return; return;
} }
stubs->call->CancelKeepAlive();
if (keepalive_timer_) { // stop the thread
keepalive_timer_->cancel(); cv_for_refresh_.notify_all();
refresh_task_.join();
// send a cancel request
{
std::lock_guard<std::mutex> lock(mutex_for_refresh_);
stubs->call->CancelKeepAlive();
} }
context.stop();
} }
void etcd::KeepAlive::Check() { void etcd::KeepAlive::Check() {
@ -147,7 +143,7 @@ void etcd::KeepAlive::Check() {
// run canceller first // run canceller first
this->Cancel(); this->Cancel();
// propogate the exception, as we throw in `Check()`, the `handler` won't be touched // propagate the exception, as we throw in `Check()`, the `handler` won't be touched
eptr_ = std::current_exception(); eptr_ = std::current_exception();
if (handler_) { if (handler_) {
handler_(eptr_); handler_(eptr_);
@ -160,32 +156,27 @@ void etcd::KeepAlive::Check() {
void etcd::KeepAlive::refresh() void etcd::KeepAlive::refresh()
{ {
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_); while (true) {
if (!continue_next.load()) { if (!continue_next.load()) {
return; return;
} }
// minimal resolution: 1 second // minimal resolution: 1 second
int keepalive_ttl = std::max(ttl - 1, 1); int keepalive_ttl = std::max(ttl - 1, 1);
keepalive_timer_.reset(new boost::asio::steady_timer(context, std::chrono::seconds(keepalive_ttl))); {
keepalive_timer_->async_wait([this](const boost::system::error_code& error) { std::unique_lock<std::mutex> lock(mutex_for_refresh_);
if (error) { if (cv_for_refresh_.wait_for(lock, std::chrono::seconds(keepalive_ttl)) == std::cv_status::no_timeout) {
#ifndef NDEBUG return;
std::cerr << "keepalive timer cancelled: " << error << ", " << error.message() << std::endl;
#endif
} else {
if (this->continue_next.load()) {
// execute refresh
this->refresh_once();
// trigger the next round;
this->refresh();
} }
} }
});
// execute refresh
this->refresh_once();
}
} }
void etcd::KeepAlive::refresh_once() void etcd::KeepAlive::refresh_once()
{ {
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_); std::lock_guard<std::mutex> scope_lock(mutex_for_refresh_);
if (!continue_next.load()) { if (!continue_next.load()) {
return; return;
} }

View File

@ -29,8 +29,6 @@
#include <thread> #include <thread>
#include <utility> #include <utility>
#include <boost/algorithm/string.hpp>
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include <grpc++/security/credentials.h> #include <grpc++/security/credentials.h>
#include <grpc++/support/status_code_enum.h> #include <grpc++/support/status_code_enum.h>
@ -69,6 +67,32 @@
namespace etcd { namespace etcd {
namespace detail { namespace detail {
static void string_split(std::vector<std::string> &dests, std::string const &src, std::string const &seps) {
dests.clear();
std::string::const_iterator start = src.begin();
std::string::const_iterator end = src.end();
std::string::const_iterator next = std::find_first_of(start, end, seps.begin(), seps.end());
while (next != end) {
dests.push_back(std::string(start, next));
start = next + 1;
next = std::find_first_of(start, end, seps.begin(), seps.end());
}
if (start != end) {
dests.push_back(std::string(start, end));
}
}
static std::string string_join(std::vector<std::string> const &srcs, std::string const sep) {
std::stringstream ss;
if (!srcs.empty()) {
ss << srcs[0];
for (size_t i = 1; i < srcs.size(); ++i) {
ss << sep << srcs[i];
}
}
return ss.str();
}
static bool dns_resolve(std::string const &target, std::vector<std::string> &endpoints) { static bool dns_resolve(std::string const &target, std::vector<std::string> &endpoints) {
struct addrinfo hints = {}, *addrs; struct addrinfo hints = {}, *addrs;
hints.ai_family = AF_INET; hints.ai_family = AF_INET;
@ -76,7 +100,7 @@ static bool dns_resolve(std::string const &target, std::vector<std::string> &end
hints.ai_protocol = IPPROTO_TCP; hints.ai_protocol = IPPROTO_TCP;
std::vector<std::string> target_parts; std::vector<std::string> target_parts;
boost::split(target_parts, target, boost::is_any_of(":")); string_split(target_parts, target, ":");
if (target_parts.size() != 2) { if (target_parts.size() != 2) {
std::cerr << "warn: invalid URL: " << target << std::endl; std::cerr << "warn: invalid URL: " << target << std::endl;
return false; return false;
@ -116,7 +140,7 @@ static bool dns_resolve(std::string const &target, std::vector<std::string> &end
const std::string strip_and_resolve_addresses(std::string const &address) { const std::string strip_and_resolve_addresses(std::string const &address) {
std::vector<std::string> addresses; std::vector<std::string> addresses;
boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;")); string_split(addresses, address, ",;");
std::string stripped_address; std::string stripped_address;
{ {
std::vector<std::string> stripped_addresses; std::vector<std::string> stripped_addresses;
@ -126,7 +150,7 @@ const std::string strip_and_resolve_addresses(std::string const &address) {
std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length()); std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length());
etcd::detail::dns_resolve(target, stripped_addresses); etcd::detail::dns_resolve(target, stripped_addresses);
} }
stripped_address = boost::algorithm::join(stripped_addresses, ","); stripped_address = string_join(stripped_addresses, ",");
} }
return "ipv4:///" + stripped_address; return "ipv4:///" + stripped_address;
} }

View File

@ -205,6 +205,7 @@ TEST_CASE("concurrent lock & unlock")
constexpr size_t trials = 192; constexpr size_t trials = 192;
std::function<void(std::string const &, const size_t)> locker = [&etcd](std::string const &key, const size_t index) { std::function<void(std::string const &, const size_t)> locker = [&etcd](std::string const &key, const size_t index) {
std::cout << "start lock for " << key << ", index is " << index << std::endl;
auto resp = etcd.lock(key).get(); auto resp = etcd.lock(key).get();
std::cout << "lock for " << index << " is ok, starts sleeping: ..." << resp.error_message() << std::endl << std::flush; std::cout << "lock for " << index << " is ok, starts sleeping: ..." << resp.error_message() << std::endl << std::flush;
REQUIRE(resp.is_ok()); REQUIRE(resp.is_ok());