Drop the boost dependency on the sync runtime
Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
5e2884f362
commit
4e2434a836
|
|
@ -88,7 +88,6 @@ macro(use_cxx target)
|
||||||
endif()
|
endif()
|
||||||
endmacro(use_cxx)
|
endmacro(use_cxx)
|
||||||
|
|
||||||
find_package(Boost REQUIRED COMPONENTS system thread random)
|
|
||||||
if(APPLE)
|
if(APPLE)
|
||||||
# If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's
|
# If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's
|
||||||
if(NOT OpenSSL_DIR)
|
if(NOT OpenSSL_DIR)
|
||||||
|
|
@ -166,8 +165,7 @@ else()
|
||||||
set(CPPREST_LIB)
|
set(CPPREST_LIB)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
include_directories(SYSTEM ${Boost_INCLUDE_DIR}
|
include_directories(SYSTEM ${CPPREST_INCLUDE_DIR}
|
||||||
${CPPREST_INCLUDE_DIR}
|
|
||||||
${PROTOBUF_INCLUDE_DIRS}
|
${PROTOBUF_INCLUDE_DIRS}
|
||||||
${GRPC_INCLUDE_DIR}
|
${GRPC_INCLUDE_DIR}
|
||||||
${OPENSSL_INCLUDE_DIR})
|
${OPENSSL_INCLUDE_DIR})
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ i.e., `ETCDCTL_API=3`.
|
||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|
||||||
1. boost and openssl
|
1. boost and openssl (**Note that boost is only required if you need the asynchronous runtime**)
|
||||||
|
|
||||||
+ On Ubuntu, above requirement could be installed as:
|
+ On Ubuntu, above requirement could be installed as:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <condition_variable>
|
||||||
#include <exception>
|
#include <exception>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
|
@ -12,14 +13,6 @@
|
||||||
#include "etcd/SyncClient.hpp"
|
#include "etcd/SyncClient.hpp"
|
||||||
#include "etcd/Response.hpp"
|
#include "etcd/Response.hpp"
|
||||||
|
|
||||||
#include <boost/config.hpp>
|
|
||||||
#if BOOST_VERSION >= 106600
|
|
||||||
#include <boost/asio/io_context.hpp>
|
|
||||||
#else
|
|
||||||
#include <boost/asio/io_service.hpp>
|
|
||||||
#endif
|
|
||||||
#include <boost/asio/steady_timer.hpp>
|
|
||||||
|
|
||||||
namespace etcd
|
namespace etcd
|
||||||
{
|
{
|
||||||
// forward declaration to avoid header/library dependency
|
// forward declaration to avoid header/library dependency
|
||||||
|
|
@ -117,24 +110,18 @@ namespace etcd
|
||||||
|
|
||||||
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
|
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
|
||||||
// to avoid any potential blocking, which may block the keepalive loop and evict the lease.
|
// to avoid any potential blocking, which may block the keepalive loop and evict the lease.
|
||||||
std::thread task_;
|
std::thread refresh_task_;
|
||||||
|
|
||||||
int ttl;
|
int ttl;
|
||||||
int64_t lease_id;
|
int64_t lease_id;
|
||||||
|
|
||||||
// protect the initializing status of `timer`.
|
// protect the initializing status of `timer`.
|
||||||
std::recursive_mutex mutex_for_refresh_;
|
std::mutex mutex_for_refresh_;
|
||||||
|
std::condition_variable cv_for_refresh_;
|
||||||
std::atomic_bool continue_next;
|
std::atomic_bool continue_next;
|
||||||
|
|
||||||
// grpc timeout in `refresh()`
|
// grpc timeout in `refresh()`
|
||||||
mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
|
mutable std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
|
||||||
|
|
||||||
#if BOOST_VERSION >= 106600
|
|
||||||
boost::asio::io_context context;
|
|
||||||
#else
|
|
||||||
boost::asio::io_service context;
|
|
||||||
#endif
|
|
||||||
std::unique_ptr<boost::asio::steady_timer> keepalive_timer_;
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ use_cxx(etcd-cpp-api-core-objects)
|
||||||
add_dependencies(etcd-cpp-api-core-objects protobuf_generates)
|
add_dependencies(etcd-cpp-api-core-objects protobuf_generates)
|
||||||
include_generated_protobuf_files(etcd-cpp-api-core-objects)
|
include_generated_protobuf_files(etcd-cpp-api-core-objects)
|
||||||
target_link_libraries(etcd-cpp-api-core-objects PUBLIC
|
target_link_libraries(etcd-cpp-api-core-objects PUBLIC
|
||||||
${Boost_LIBRARIES}
|
|
||||||
${PROTOBUF_LIBRARIES}
|
${PROTOBUF_LIBRARIES}
|
||||||
${OPENSSL_LIBRARIES}
|
${OPENSSL_LIBRARIES}
|
||||||
${GRPC_LIBRARIES}
|
${GRPC_LIBRARIES}
|
||||||
|
|
@ -33,7 +32,6 @@ if(BUILD_ETCD_CORE_ONLY)
|
||||||
add_library(etcd-cpp-api-core $<TARGET_OBJECTS:etcd-cpp-api-core-objects>)
|
add_library(etcd-cpp-api-core $<TARGET_OBJECTS:etcd-cpp-api-core-objects>)
|
||||||
use_cxx(etcd-cpp-api-core)
|
use_cxx(etcd-cpp-api-core)
|
||||||
target_link_libraries(etcd-cpp-api-core PUBLIC
|
target_link_libraries(etcd-cpp-api-core PUBLIC
|
||||||
${Boost_LIBRARIES}
|
|
||||||
${PROTOBUF_LIBRARIES}
|
${PROTOBUF_LIBRARIES}
|
||||||
${OPENSSL_LIBRARIES}
|
${OPENSSL_LIBRARIES}
|
||||||
${GRPC_LIBRARIES}
|
${GRPC_LIBRARIES}
|
||||||
|
|
@ -45,7 +43,6 @@ else()
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp")
|
"${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp")
|
||||||
use_cxx(etcd-cpp-api)
|
use_cxx(etcd-cpp-api)
|
||||||
target_link_libraries(etcd-cpp-api PUBLIC
|
target_link_libraries(etcd-cpp-api PUBLIC
|
||||||
${Boost_LIBRARIES}
|
|
||||||
${CPPREST_LIB} # n.b.: the asynchronous client requires pplx in cpprestsdk
|
${CPPREST_LIB} # n.b.: the asynchronous client requires pplx in cpprestsdk
|
||||||
${PROTOBUF_LIBRARIES}
|
${PROTOBUF_LIBRARIES}
|
||||||
${OPENSSL_LIBRARIES}
|
${OPENSSL_LIBRARIES}
|
||||||
|
|
|
||||||
|
|
@ -28,8 +28,6 @@
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
#include <boost/algorithm/string.hpp>
|
|
||||||
|
|
||||||
#include <grpc++/grpc++.h>
|
#include <grpc++/grpc++.h>
|
||||||
#include <grpc++/security/credentials.h>
|
#include <grpc++/security/credentials.h>
|
||||||
#include "proto/rpc.grpc.pb.h"
|
#include "proto/rpc.grpc.pb.h"
|
||||||
|
|
|
||||||
|
|
@ -37,15 +37,14 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client, int ttl, int64_t lease_id):
|
||||||
continue_next.store(true);
|
continue_next.store(true);
|
||||||
|
|
||||||
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
|
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
|
||||||
task_ = std::thread([this]() {
|
refresh_task_ = std::thread([this]() {
|
||||||
try {
|
try {
|
||||||
// start refresh
|
// start refresh
|
||||||
this->refresh();
|
this->refresh();
|
||||||
context.run();
|
} catch (const std::exception &e) {
|
||||||
} catch (...) {
|
// propagate the exception
|
||||||
eptr_ = std::current_exception();
|
eptr_ = std::current_exception();
|
||||||
}
|
}
|
||||||
context.stop(); // clean up
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -84,14 +83,11 @@ etcd::KeepAlive::KeepAlive(SyncClient const &client,
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
|
|
||||||
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
|
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
|
||||||
task_ = std::thread([this]() {
|
refresh_task_ = std::thread([this]() {
|
||||||
try {
|
try {
|
||||||
// start refresh
|
// start refresh
|
||||||
this->refresh();
|
this->refresh();
|
||||||
context.run();
|
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
// run canceller first
|
|
||||||
this->Cancel();
|
|
||||||
// propogate the exception
|
// propogate the exception
|
||||||
eptr_ = std::current_exception();
|
eptr_ = std::current_exception();
|
||||||
if (handler_) {
|
if (handler_) {
|
||||||
|
|
@ -117,23 +113,23 @@ etcd::KeepAlive::KeepAlive(std::string const & address,
|
||||||
etcd::KeepAlive::~KeepAlive()
|
etcd::KeepAlive::~KeepAlive()
|
||||||
{
|
{
|
||||||
this->Cancel();
|
this->Cancel();
|
||||||
// clean up
|
|
||||||
if (task_.joinable()) {
|
|
||||||
task_.join();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::KeepAlive::Cancel()
|
void etcd::KeepAlive::Cancel()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
|
|
||||||
if (!continue_next.exchange(false)) {
|
if (!continue_next.exchange(false)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
stubs->call->CancelKeepAlive();
|
|
||||||
if (keepalive_timer_) {
|
// stop the thread
|
||||||
keepalive_timer_->cancel();
|
cv_for_refresh_.notify_all();
|
||||||
|
refresh_task_.join();
|
||||||
|
|
||||||
|
// send a cancel request
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mutex_for_refresh_);
|
||||||
|
stubs->call->CancelKeepAlive();
|
||||||
}
|
}
|
||||||
context.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::KeepAlive::Check() {
|
void etcd::KeepAlive::Check() {
|
||||||
|
|
@ -147,7 +143,7 @@ void etcd::KeepAlive::Check() {
|
||||||
// run canceller first
|
// run canceller first
|
||||||
this->Cancel();
|
this->Cancel();
|
||||||
|
|
||||||
// propogate the exception, as we throw in `Check()`, the `handler` won't be touched
|
// propagate the exception, as we throw in `Check()`, the `handler` won't be touched
|
||||||
eptr_ = std::current_exception();
|
eptr_ = std::current_exception();
|
||||||
if (handler_) {
|
if (handler_) {
|
||||||
handler_(eptr_);
|
handler_(eptr_);
|
||||||
|
|
@ -160,32 +156,27 @@ void etcd::KeepAlive::Check() {
|
||||||
|
|
||||||
void etcd::KeepAlive::refresh()
|
void etcd::KeepAlive::refresh()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
|
while (true) {
|
||||||
if (!continue_next.load()) {
|
if (!continue_next.load()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// minimal resolution: 1 second
|
// minimal resolution: 1 second
|
||||||
int keepalive_ttl = std::max(ttl - 1, 1);
|
int keepalive_ttl = std::max(ttl - 1, 1);
|
||||||
keepalive_timer_.reset(new boost::asio::steady_timer(context, std::chrono::seconds(keepalive_ttl)));
|
{
|
||||||
keepalive_timer_->async_wait([this](const boost::system::error_code& error) {
|
std::unique_lock<std::mutex> lock(mutex_for_refresh_);
|
||||||
if (error) {
|
if (cv_for_refresh_.wait_for(lock, std::chrono::seconds(keepalive_ttl)) == std::cv_status::no_timeout) {
|
||||||
#ifndef NDEBUG
|
return;
|
||||||
std::cerr << "keepalive timer cancelled: " << error << ", " << error.message() << std::endl;
|
|
||||||
#endif
|
|
||||||
} else {
|
|
||||||
if (this->continue_next.load()) {
|
|
||||||
// execute refresh
|
|
||||||
this->refresh_once();
|
|
||||||
// trigger the next round;
|
|
||||||
this->refresh();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
// execute refresh
|
||||||
|
this->refresh_once();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::KeepAlive::refresh_once()
|
void etcd::KeepAlive::refresh_once()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::recursive_mutex> scope_lock(mutex_for_refresh_);
|
std::lock_guard<std::mutex> scope_lock(mutex_for_refresh_);
|
||||||
if (!continue_next.load()) {
|
if (!continue_next.load()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,8 +29,6 @@
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
#include <boost/algorithm/string.hpp>
|
|
||||||
|
|
||||||
#include <grpc++/grpc++.h>
|
#include <grpc++/grpc++.h>
|
||||||
#include <grpc++/security/credentials.h>
|
#include <grpc++/security/credentials.h>
|
||||||
#include <grpc++/support/status_code_enum.h>
|
#include <grpc++/support/status_code_enum.h>
|
||||||
|
|
@ -69,6 +67,32 @@
|
||||||
namespace etcd {
|
namespace etcd {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
|
static void string_split(std::vector<std::string> &dests, std::string const &src, std::string const &seps) {
|
||||||
|
dests.clear();
|
||||||
|
std::string::const_iterator start = src.begin();
|
||||||
|
std::string::const_iterator end = src.end();
|
||||||
|
std::string::const_iterator next = std::find_first_of(start, end, seps.begin(), seps.end());
|
||||||
|
while (next != end) {
|
||||||
|
dests.push_back(std::string(start, next));
|
||||||
|
start = next + 1;
|
||||||
|
next = std::find_first_of(start, end, seps.begin(), seps.end());
|
||||||
|
}
|
||||||
|
if (start != end) {
|
||||||
|
dests.push_back(std::string(start, end));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static std::string string_join(std::vector<std::string> const &srcs, std::string const sep) {
|
||||||
|
std::stringstream ss;
|
||||||
|
if (!srcs.empty()) {
|
||||||
|
ss << srcs[0];
|
||||||
|
for (size_t i = 1; i < srcs.size(); ++i) {
|
||||||
|
ss << sep << srcs[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ss.str();
|
||||||
|
}
|
||||||
|
|
||||||
static bool dns_resolve(std::string const &target, std::vector<std::string> &endpoints) {
|
static bool dns_resolve(std::string const &target, std::vector<std::string> &endpoints) {
|
||||||
struct addrinfo hints = {}, *addrs;
|
struct addrinfo hints = {}, *addrs;
|
||||||
hints.ai_family = AF_INET;
|
hints.ai_family = AF_INET;
|
||||||
|
|
@ -76,7 +100,7 @@ static bool dns_resolve(std::string const &target, std::vector<std::string> &end
|
||||||
hints.ai_protocol = IPPROTO_TCP;
|
hints.ai_protocol = IPPROTO_TCP;
|
||||||
|
|
||||||
std::vector<std::string> target_parts;
|
std::vector<std::string> target_parts;
|
||||||
boost::split(target_parts, target, boost::is_any_of(":"));
|
string_split(target_parts, target, ":");
|
||||||
if (target_parts.size() != 2) {
|
if (target_parts.size() != 2) {
|
||||||
std::cerr << "warn: invalid URL: " << target << std::endl;
|
std::cerr << "warn: invalid URL: " << target << std::endl;
|
||||||
return false;
|
return false;
|
||||||
|
|
@ -116,7 +140,7 @@ static bool dns_resolve(std::string const &target, std::vector<std::string> &end
|
||||||
|
|
||||||
const std::string strip_and_resolve_addresses(std::string const &address) {
|
const std::string strip_and_resolve_addresses(std::string const &address) {
|
||||||
std::vector<std::string> addresses;
|
std::vector<std::string> addresses;
|
||||||
boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;"));
|
string_split(addresses, address, ",;");
|
||||||
std::string stripped_address;
|
std::string stripped_address;
|
||||||
{
|
{
|
||||||
std::vector<std::string> stripped_addresses;
|
std::vector<std::string> stripped_addresses;
|
||||||
|
|
@ -126,7 +150,7 @@ const std::string strip_and_resolve_addresses(std::string const &address) {
|
||||||
std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length());
|
std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length());
|
||||||
etcd::detail::dns_resolve(target, stripped_addresses);
|
etcd::detail::dns_resolve(target, stripped_addresses);
|
||||||
}
|
}
|
||||||
stripped_address = boost::algorithm::join(stripped_addresses, ",");
|
stripped_address = string_join(stripped_addresses, ",");
|
||||||
}
|
}
|
||||||
return "ipv4:///" + stripped_address;
|
return "ipv4:///" + stripped_address;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -205,6 +205,7 @@ TEST_CASE("concurrent lock & unlock")
|
||||||
constexpr size_t trials = 192;
|
constexpr size_t trials = 192;
|
||||||
|
|
||||||
std::function<void(std::string const &, const size_t)> locker = [&etcd](std::string const &key, const size_t index) {
|
std::function<void(std::string const &, const size_t)> locker = [&etcd](std::string const &key, const size_t index) {
|
||||||
|
std::cout << "start lock for " << key << ", index is " << index << std::endl;
|
||||||
auto resp = etcd.lock(key).get();
|
auto resp = etcd.lock(key).get();
|
||||||
std::cout << "lock for " << index << " is ok, starts sleeping: ..." << resp.error_message() << std::endl << std::flush;
|
std::cout << "lock for " << index << " is ok, starts sleeping: ..." << resp.error_message() << std::endl << std::flush;
|
||||||
REQUIRE(resp.is_ok());
|
REQUIRE(resp.is_ok());
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue