Enable -fno-exceptions support (#261)
Resolves #259 Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
5aff57cce5
commit
b82efea7a9
|
|
@ -26,6 +26,7 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
|
||||||
)
|
)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
option(BUILD_WITH_NO_EXCEPTIONS "Build etcd-cpp-apiv3 with disabling exception handling, i.e., -fno-exceptions" OFF)
|
||||||
option(BUILD_SHARED_LIBS "Build etcd-cpp-apiv3 shared libraries" ON)
|
option(BUILD_SHARED_LIBS "Build etcd-cpp-apiv3 shared libraries" ON)
|
||||||
option(BUILD_ETCD_CORE_ONLY "Build etcd-cpp-apiv3 core library (the synchronous runtime) only" OFF)
|
option(BUILD_ETCD_CORE_ONLY "Build etcd-cpp-apiv3 core library (the synchronous runtime) only" OFF)
|
||||||
option(BUILD_ETCD_TESTS "Build etcd-cpp-apiv3 test cases" OFF)
|
option(BUILD_ETCD_TESTS "Build etcd-cpp-apiv3 test cases" OFF)
|
||||||
|
|
@ -88,6 +89,21 @@ macro(use_cxx target)
|
||||||
endif()
|
endif()
|
||||||
endmacro(use_cxx)
|
endmacro(use_cxx)
|
||||||
|
|
||||||
|
macro(set_exceptions target)
|
||||||
|
if(BUILD_NO_EXCEPTIONS)
|
||||||
|
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
|
||||||
|
target_compile_options(${target} PRIVATE "-fno-exceptions")
|
||||||
|
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
|
||||||
|
target_compile_options(${target} PRIVATE "-fno-exceptions")
|
||||||
|
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang")
|
||||||
|
target_compile_options(${target} PRIVATE "-fno-exceptions")
|
||||||
|
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
|
||||||
|
target_compile_options(${target} PRIVATE "/EHs-c-")
|
||||||
|
endif()
|
||||||
|
target_compile_definitions(${target} PUBLIC -D_ETCD_NO_EXCEPTIONS)
|
||||||
|
endif()
|
||||||
|
endmacro(set_exceptions)
|
||||||
|
|
||||||
if(APPLE)
|
if(APPLE)
|
||||||
# If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's
|
# If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's
|
||||||
if(NOT OpenSSL_DIR)
|
if(NOT OpenSSL_DIR)
|
||||||
|
|
|
||||||
12
README.md
12
README.md
|
|
@ -863,6 +863,8 @@ Without handler, the internal state can be checked via `KeepAlive::Check()` and
|
||||||
the async exception when there are errors during keeping the lease alive.
|
the async exception when there are errors during keeping the lease alive.
|
||||||
|
|
||||||
Note that even with `handler`, the `KeepAlive::Check()` still rethrow if there's an async exception.
|
Note that even with `handler`, the `KeepAlive::Check()` still rethrow if there's an async exception.
|
||||||
|
When the library is built with `-fno-exceptions`, the `handler` argument and the `Check()` method
|
||||||
|
will abort the program when there are errors during keeping the lease alive.
|
||||||
|
|
||||||
### Etcd transactions
|
### Etcd transactions
|
||||||
|
|
||||||
|
|
@ -960,7 +962,15 @@ The observer stream will be canceled when been destructed.
|
||||||
|
|
||||||
for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp) and [tst/ElectionTest.cpp](./tst/ElectionTest.cpp).
|
for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp) and [tst/ElectionTest.cpp](./tst/ElectionTest.cpp).
|
||||||
|
|
||||||
### TODO
|
## `-fno-exceptions`
|
||||||
|
|
||||||
|
The _etcd-cpp-apiv3_ library supports to be built with `-fno-exceptions` flag, controlled by the
|
||||||
|
cmake option `BUILD_WITH_NO_EXCEPTIONS=ON/OFF` (defaults to `OFF`).
|
||||||
|
|
||||||
|
When building with `-fno-exceptions`, the library will abort the program under certain circumstances,
|
||||||
|
e.g., when calling `.Check()` method of `KeepAlive` and there are errors during keeping the lease alive,
|
||||||
|
|
||||||
|
## TODO
|
||||||
|
|
||||||
1. Cancellation of asynchronous calls(except for watch)
|
1. Cancellation of asynchronous calls(except for watch)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -85,10 +85,10 @@ class KeepAlive {
|
||||||
~KeepAlive();
|
~KeepAlive();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
// automatically refresh loop
|
// automatically refresh loop, returns the error message if failed
|
||||||
void refresh();
|
std::string refresh();
|
||||||
// refresh once immediately
|
// refresh once immediately, returns the error message if failed
|
||||||
void refresh_once();
|
std::string refresh_once();
|
||||||
|
|
||||||
struct EtcdServerStubs;
|
struct EtcdServerStubs;
|
||||||
struct EtcdServerStubsDeleter {
|
struct EtcdServerStubsDeleter {
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ class KeepAlive;
|
||||||
class Watcher;
|
class Watcher;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Reponse object received for the requests of etcd::Client
|
* The Response object received for the requests of etcd::Client
|
||||||
*/
|
*/
|
||||||
class Response {
|
class Response {
|
||||||
public:
|
public:
|
||||||
|
|
|
||||||
|
|
@ -269,7 +269,7 @@ class AsyncLeaseKeepAliveAction : public etcdv3::Action {
|
||||||
stream;
|
stream;
|
||||||
|
|
||||||
LeaseKeepAliveRequest req;
|
LeaseKeepAliveRequest req;
|
||||||
bool isCancelled;
|
std::atomic_bool isCancelled;
|
||||||
std::recursive_mutex protect_is_cancelled;
|
std::recursive_mutex protect_is_cancelled;
|
||||||
|
|
||||||
friend class etcd::KeepAlive;
|
friend class etcd::KeepAlive;
|
||||||
|
|
@ -330,7 +330,7 @@ class AsyncObserveAction : public etcdv3::Action {
|
||||||
LeaderResponse reply;
|
LeaderResponse reply;
|
||||||
std::unique_ptr<ClientAsyncReader<LeaderResponse>> response_reader;
|
std::unique_ptr<ClientAsyncReader<LeaderResponse>> response_reader;
|
||||||
std::atomic_bool isCancelled;
|
std::atomic_bool isCancelled;
|
||||||
std::mutex protect_is_cancalled;
|
std::mutex protect_is_cancelled;
|
||||||
};
|
};
|
||||||
|
|
||||||
class AsyncProclaimAction : public etcdv3::Action {
|
class AsyncProclaimAction : public etcdv3::Action {
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ file(GLOB_RECURSE CPP_CLIENT_CORE_SRC
|
||||||
|
|
||||||
add_library(etcd-cpp-api-core-objects OBJECT ${CPP_CLIENT_CORE_SRC} ${PROTOBUF_GENERATES})
|
add_library(etcd-cpp-api-core-objects OBJECT ${CPP_CLIENT_CORE_SRC} ${PROTOBUF_GENERATES})
|
||||||
use_cxx(etcd-cpp-api-core-objects)
|
use_cxx(etcd-cpp-api-core-objects)
|
||||||
|
set_exceptions(etcd-cpp-api-core-objects)
|
||||||
add_dependencies(etcd-cpp-api-core-objects protobuf_generates)
|
add_dependencies(etcd-cpp-api-core-objects protobuf_generates)
|
||||||
include_generated_protobuf_files(etcd-cpp-api-core-objects)
|
include_generated_protobuf_files(etcd-cpp-api-core-objects)
|
||||||
target_link_libraries(etcd-cpp-api-core-objects PUBLIC
|
target_link_libraries(etcd-cpp-api-core-objects PUBLIC
|
||||||
|
|
@ -35,6 +36,7 @@ if(BUILD_ETCD_CORE_ONLY)
|
||||||
# add the core library, includes the sycnhronous client only
|
# add the core library, includes the sycnhronous client only
|
||||||
add_library(etcd-cpp-api-core $<TARGET_OBJECTS:etcd-cpp-api-core-objects>)
|
add_library(etcd-cpp-api-core $<TARGET_OBJECTS:etcd-cpp-api-core-objects>)
|
||||||
use_cxx(etcd-cpp-api-core)
|
use_cxx(etcd-cpp-api-core)
|
||||||
|
set_exceptions(etcd-cpp-api-core)
|
||||||
target_link_libraries(etcd-cpp-api-core PUBLIC
|
target_link_libraries(etcd-cpp-api-core PUBLIC
|
||||||
${OPENSSL_LIBRARIES}
|
${OPENSSL_LIBRARIES}
|
||||||
${GRPC_LIBRARIES}
|
${GRPC_LIBRARIES}
|
||||||
|
|
@ -50,6 +52,7 @@ else()
|
||||||
add_library(etcd-cpp-api $<TARGET_OBJECTS:etcd-cpp-api-core-objects>
|
add_library(etcd-cpp-api $<TARGET_OBJECTS:etcd-cpp-api-core-objects>
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp")
|
"${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp")
|
||||||
use_cxx(etcd-cpp-api)
|
use_cxx(etcd-cpp-api)
|
||||||
|
set_exceptions(etcd-cpp-api)
|
||||||
target_link_libraries(etcd-cpp-api PUBLIC
|
target_link_libraries(etcd-cpp-api PUBLIC
|
||||||
${CPPREST_LIB} # n.b.: the asynchronous client requires pplx in cpprestsdk
|
${CPPREST_LIB} # n.b.: the asynchronous client requires pplx in cpprestsdk
|
||||||
${OPENSSL_LIBRARIES}
|
${OPENSSL_LIBRARIES}
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ etcd::KeepAlive::KeepAlive(SyncClient const& client, int ttl, int64_t lease_id)
|
||||||
|
|
||||||
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
|
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
|
||||||
refresh_task_ = std::thread([this]() {
|
refresh_task_ = std::thread([this]() {
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
try {
|
try {
|
||||||
// start refresh
|
// start refresh
|
||||||
this->refresh();
|
this->refresh();
|
||||||
|
|
@ -53,6 +54,12 @@ etcd::KeepAlive::KeepAlive(SyncClient const& client, int ttl, int64_t lease_id)
|
||||||
// propagate the exception
|
// propagate the exception
|
||||||
eptr_ = std::current_exception();
|
eptr_ = std::current_exception();
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
const std::string err = this->refresh();
|
||||||
|
if (!err.empty()) {
|
||||||
|
eptr_ = std::make_exception_ptr(std::runtime_error(err));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -99,15 +106,22 @@ etcd::KeepAlive::KeepAlive(
|
||||||
|
|
||||||
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
|
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
|
||||||
refresh_task_ = std::thread([this]() {
|
refresh_task_ = std::thread([this]() {
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
try {
|
try {
|
||||||
// start refresh
|
// start refresh
|
||||||
this->refresh();
|
this->refresh();
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
// propogate the exception
|
// propagate the exception
|
||||||
eptr_ = std::current_exception();
|
eptr_ = std::current_exception();
|
||||||
if (handler_) {
|
}
|
||||||
handler_(eptr_);
|
#else
|
||||||
}
|
const std::string err = this->refresh();
|
||||||
|
if (!err.empty()) {
|
||||||
|
eptr_ = std::make_exception_ptr(std::runtime_error(err));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if (eptr_ && handler_) {
|
||||||
|
handler_(eptr_);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -149,6 +163,7 @@ void etcd::KeepAlive::Check() {
|
||||||
std::rethrow_exception(eptr_);
|
std::rethrow_exception(eptr_);
|
||||||
}
|
}
|
||||||
// issue an refresh to make sure it still alive
|
// issue an refresh to make sure it still alive
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
try {
|
try {
|
||||||
this->refresh_once();
|
this->refresh_once();
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
|
|
@ -158,19 +173,35 @@ void etcd::KeepAlive::Check() {
|
||||||
// propagate the exception, as we throw in `Check()`, the `handler` won't be
|
// propagate the exception, as we throw in `Check()`, the `handler` won't be
|
||||||
// touched
|
// touched
|
||||||
eptr_ = std::current_exception();
|
eptr_ = std::current_exception();
|
||||||
if (handler_) {
|
}
|
||||||
handler_(eptr_);
|
#else
|
||||||
}
|
const std::string err = this->refresh_once();
|
||||||
|
if (!err.empty()) {
|
||||||
|
// run canceller first
|
||||||
|
this->Cancel();
|
||||||
|
|
||||||
|
// propagate the exception, as we throw in `Check()`, the `handler` won't be
|
||||||
|
// touched
|
||||||
|
eptr_ = std::make_exception_ptr(std::runtime_error(err));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if (eptr_ && handler_) {
|
||||||
|
handler_(eptr_);
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
|
if (eptr_) {
|
||||||
// rethrow in `Check()` to keep the consistent semantics
|
// rethrow in `Check()` to keep the consistent semantics
|
||||||
std::rethrow_exception(eptr_);
|
std::rethrow_exception(eptr_);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::KeepAlive::refresh() {
|
std::string etcd::KeepAlive::refresh() {
|
||||||
while (true) {
|
while (true) {
|
||||||
if (!continue_next.load()) {
|
if (!continue_next.load()) {
|
||||||
return;
|
return std::string{};
|
||||||
}
|
}
|
||||||
// minimal resolution: 1 second
|
// minimal resolution: 1 second
|
||||||
int keepalive_ttl = std::max(ttl - 1, 1);
|
int keepalive_ttl = std::max(ttl - 1, 1);
|
||||||
|
|
@ -179,7 +210,7 @@ void etcd::KeepAlive::refresh() {
|
||||||
if (cv_for_refresh_.wait_for(lock, std::chrono::seconds(keepalive_ttl)) ==
|
if (cv_for_refresh_.wait_for(lock, std::chrono::seconds(keepalive_ttl)) ==
|
||||||
std::cv_status::no_timeout) {
|
std::cv_status::no_timeout) {
|
||||||
if (!continue_next.load()) {
|
if (!continue_next.load()) {
|
||||||
return;
|
return std::string{};
|
||||||
}
|
}
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
std::cerr
|
std::cerr
|
||||||
|
|
@ -191,24 +222,37 @@ void etcd::KeepAlive::refresh() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// execute refresh
|
// execute refresh
|
||||||
this->refresh_once();
|
const std::string err = this->refresh_once();
|
||||||
|
if (!err.empty()) {
|
||||||
|
return err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return std::string{};
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::KeepAlive::refresh_once() {
|
std::string etcd::KeepAlive::refresh_once() {
|
||||||
std::lock_guard<std::mutex> scope_lock(mutex_for_refresh_);
|
std::lock_guard<std::mutex> scope_lock(mutex_for_refresh_);
|
||||||
if (!continue_next.load()) {
|
if (!continue_next.load()) {
|
||||||
return;
|
return std::string{};
|
||||||
}
|
}
|
||||||
this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout;
|
this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout;
|
||||||
auto resp = this->stubs->call->Refresh();
|
auto resp = this->stubs->call->Refresh();
|
||||||
if (!resp.is_ok()) {
|
if (!resp.is_ok()) {
|
||||||
throw std::runtime_error("Failed to refresh lease: error code: " +
|
const std::string err = "Failed to refresh lease: error code: " +
|
||||||
std::to_string(resp.error_code()) +
|
std::to_string(resp.error_code()) +
|
||||||
", message: " + resp.error_message());
|
", message: " + resp.error_message();
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
|
throw std::runtime_error(err);
|
||||||
|
#endif
|
||||||
|
return err;
|
||||||
}
|
}
|
||||||
if (resp.value().ttl() == 0) {
|
if (resp.value().ttl() == 0) {
|
||||||
throw std::out_of_range(
|
const std::string err =
|
||||||
"Failed to refresh lease due to expiration: the new TTL is 0.");
|
"Failed to refresh lease due to expiration: the new TTL is 0.";
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
|
throw std::out_of_range(err);
|
||||||
|
#endif
|
||||||
|
return err;
|
||||||
}
|
}
|
||||||
|
return std::string{};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -288,7 +288,11 @@ class etcd::SyncClient::TokenAuthenticator {
|
||||||
// auth
|
// auth
|
||||||
if (!etcd::detail::authenticate(this->channel_, username_, password_,
|
if (!etcd::detail::authenticate(this->channel_, username_, password_,
|
||||||
token_)) {
|
token_)) {
|
||||||
throw std::invalid_argument("Etcd authentication failed: " + token_);
|
// n.b.: no throw here as the failure of auth will be propagated
|
||||||
|
// to client when it is asked to issue requests.
|
||||||
|
//
|
||||||
|
// throw std::invalid_argument("Etcd authentication failed: " +
|
||||||
|
// token_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -512,7 +512,10 @@ etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction(
|
||||||
got_tag == (void*) etcdv3::KEEPALIVE_CREATE) {
|
got_tag == (void*) etcdv3::KEEPALIVE_CREATE) {
|
||||||
// ok
|
// ok
|
||||||
} else {
|
} else {
|
||||||
throw std::runtime_error("Failed to create a lease keep-alive connection");
|
status = grpc::Status(grpc::StatusCode::CANCELLED,
|
||||||
|
"Failed to create a lease keep-alive connection");
|
||||||
|
// cannot continue for further refresh
|
||||||
|
isCancelled.store(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -534,7 +537,7 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() {
|
||||||
std::lock_guard<std::recursive_mutex> scope_lock(this->protect_is_cancelled);
|
std::lock_guard<std::recursive_mutex> scope_lock(this->protect_is_cancelled);
|
||||||
|
|
||||||
auto start_timepoint = std::chrono::high_resolution_clock::now();
|
auto start_timepoint = std::chrono::high_resolution_clock::now();
|
||||||
if (isCancelled) {
|
if (isCancelled.load()) {
|
||||||
status = grpc::Status::CANCELLED;
|
status = grpc::Status::CANCELLED;
|
||||||
return etcd::Response(ParseResponse(),
|
return etcd::Response(ParseResponse(),
|
||||||
etcd::detail::duration_till_now(start_timepoint));
|
etcd::detail::duration_till_now(start_timepoint));
|
||||||
|
|
@ -621,9 +624,7 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() {
|
||||||
|
|
||||||
void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() {
|
void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() {
|
||||||
std::lock_guard<std::recursive_mutex> scope_lock(this->protect_is_cancelled);
|
std::lock_guard<std::recursive_mutex> scope_lock(this->protect_is_cancelled);
|
||||||
if (isCancelled == false) {
|
if (!isCancelled.exchange(true)) {
|
||||||
isCancelled = true;
|
|
||||||
|
|
||||||
void* got_tag = nullptr;
|
void* got_tag = nullptr;
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
|
|
||||||
|
|
@ -658,7 +659,7 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const {
|
bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const {
|
||||||
return isCancelled;
|
return isCancelled.load();
|
||||||
}
|
}
|
||||||
|
|
||||||
etcdv3::ActionParameters&
|
etcdv3::ActionParameters&
|
||||||
|
|
@ -782,7 +783,10 @@ etcdv3::AsyncObserveAction::AsyncObserveAction(
|
||||||
got_tag == (void*) etcdv3::ELECTION_OBSERVE_CREATE) {
|
got_tag == (void*) etcdv3::ELECTION_OBSERVE_CREATE) {
|
||||||
// n.b.: leave the issue of `Read` to the `waitForResponse`
|
// n.b.: leave the issue of `Read` to the `waitForResponse`
|
||||||
} else {
|
} else {
|
||||||
throw std::runtime_error("failed to create a observe connection");
|
status = grpc::Status(grpc::StatusCode::CANCELLED,
|
||||||
|
"failed to create a observe connection");
|
||||||
|
// cannot continue for further observing
|
||||||
|
isCancelled.store(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -810,7 +814,7 @@ void etcdv3::AsyncObserveAction::waitForResponse() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcdv3::AsyncObserveAction::CancelObserve() {
|
void etcdv3::AsyncObserveAction::CancelObserve() {
|
||||||
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
|
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancelled);
|
||||||
if (!isCancelled.exchange(true)) {
|
if (!isCancelled.exchange(true)) {
|
||||||
void* got_tag;
|
void* got_tag;
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
|
|
@ -1130,7 +1134,14 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
|
||||||
got_tag == (void*) etcdv3::WATCH_CREATE) {
|
got_tag == (void*) etcdv3::WATCH_CREATE) {
|
||||||
stream->Write(watch_req, (void*) etcdv3::WATCH_WRITE);
|
stream->Write(watch_req, (void*) etcdv3::WATCH_WRITE);
|
||||||
} else {
|
} else {
|
||||||
throw std::runtime_error("failed to create a watch connection");
|
status = grpc::Status(grpc::StatusCode::CANCELLED,
|
||||||
|
"failed to create a watch connection");
|
||||||
|
// cannot continue for further watching
|
||||||
|
isCancelled.store(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!status.ok()) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait "write" (WatchCreateRequest) success, and start to read the first
|
// wait "write" (WatchCreateRequest) success, and start to read the first
|
||||||
|
|
@ -1138,7 +1149,10 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
|
||||||
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*) etcdv3::WATCH_WRITE) {
|
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*) etcdv3::WATCH_WRITE) {
|
||||||
stream->Read(&reply, (void*) this);
|
stream->Read(&reply, (void*) this);
|
||||||
} else {
|
} else {
|
||||||
throw std::runtime_error("failed to write WatchCreateRequest to server");
|
status = grpc::Status(grpc::StatusCode::CANCELLED,
|
||||||
|
"failed to write WatchCreateRequest to server");
|
||||||
|
// cannot continue for further watching
|
||||||
|
isCancelled.store(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1164,6 +1178,11 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
bool the_final_round = false;
|
bool the_final_round = false;
|
||||||
|
|
||||||
|
// failed to create the watcher
|
||||||
|
if (!status.ok()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (!the_final_round) {
|
if (!the_final_round) {
|
||||||
if (!cq_.Next(&got_tag, &ok)) {
|
if (!cq_.Next(&got_tag, &ok)) {
|
||||||
|
|
@ -1256,6 +1275,14 @@ void etcdv3::AsyncWatchAction::waitForResponse(
|
||||||
bool ok = false;
|
bool ok = false;
|
||||||
bool the_final_round = false;
|
bool the_final_round = false;
|
||||||
|
|
||||||
|
// failed to create the watcher
|
||||||
|
if (!status.ok()) {
|
||||||
|
auto resp = ParseResponse();
|
||||||
|
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
|
std::chrono::high_resolution_clock::now() - start_timepoint);
|
||||||
|
callback(etcd::Response(resp, duration));
|
||||||
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (!the_final_round) {
|
if (!the_final_round) {
|
||||||
if (!cq_.Next(&got_tag, &ok)) {
|
if (!cq_.Next(&got_tag, &ok)) {
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ foreach(testfile ${TEST_FILES})
|
||||||
add_executable(${test_name} EXCLUDE_FROM_ALL ${CMAKE_CURRENT_SOURCE_DIR}/${testfile})
|
add_executable(${test_name} EXCLUDE_FROM_ALL ${CMAKE_CURRENT_SOURCE_DIR}/${testfile})
|
||||||
endif()
|
endif()
|
||||||
use_cxx(${test_name})
|
use_cxx(${test_name})
|
||||||
|
set_exceptions(${test_name})
|
||||||
add_test(NAME ${test_name} COMMAND $<TARGET_FILE:${test_name}>)
|
add_test(NAME ${test_name} COMMAND $<TARGET_FILE:${test_name}>)
|
||||||
|
|
||||||
target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen)
|
target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen)
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,9 @@ TEST_CASE("keepalive revoke and check if alive") {
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
|
|
||||||
// expect keep_alive->Check() to throw exception
|
// expect keep_alive->Check() to throw exception
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
REQUIRE_THROWS(keepalive->Check());
|
REQUIRE_THROWS(keepalive->Check());
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_CASE("keepalive won't expire") {
|
TEST_CASE("keepalive won't expire") {
|
||||||
|
|
@ -45,6 +47,7 @@ TEST_CASE("keepalive won't expire") {
|
||||||
auto lease_id = resp.value().lease();
|
auto lease_id = resp.value().lease();
|
||||||
etcd.add(key, meta_str, lease_id);
|
etcd.add(key, meta_str, lease_id);
|
||||||
|
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
std::function<void(std::exception_ptr)> handler =
|
std::function<void(std::exception_ptr)> handler =
|
||||||
[](std::exception_ptr eptr) {
|
[](std::exception_ptr eptr) {
|
||||||
try {
|
try {
|
||||||
|
|
@ -57,6 +60,9 @@ TEST_CASE("keepalive won't expire") {
|
||||||
std::cerr << "Lease expiry \"" << e.what() << "\"\n";
|
std::cerr << "Lease expiry \"" << e.what() << "\"\n";
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
#else
|
||||||
|
std::function<void(std::exception_ptr)> handler;
|
||||||
|
#endif
|
||||||
etcd::KeepAlive keepalive(etcd, handler, ttl, lease_id);
|
etcd::KeepAlive keepalive(etcd, handler, ttl, lease_id);
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -117,6 +117,7 @@ TEST_CASE("lock using lease") {
|
||||||
|
|
||||||
bool failed = false;
|
bool failed = false;
|
||||||
|
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
std::function<void(std::exception_ptr)> handler =
|
std::function<void(std::exception_ptr)> handler =
|
||||||
[&failed](std::exception_ptr eptr) {
|
[&failed](std::exception_ptr eptr) {
|
||||||
try {
|
try {
|
||||||
|
|
@ -128,6 +129,9 @@ TEST_CASE("lock using lease") {
|
||||||
failed = true;
|
failed = true;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
#else
|
||||||
|
std::function<void(std::exception_ptr)> handler;
|
||||||
|
#endif
|
||||||
|
|
||||||
// with handler
|
// with handler
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -24,37 +24,31 @@ class DistributedLock {
|
||||||
DistributedLock::DistributedLock(const std::string& lock_name, uint timeout) {
|
DistributedLock::DistributedLock(const std::string& lock_name, uint timeout) {
|
||||||
_etcd_client = std::unique_ptr<etcd::Client>(new etcd::Client(etcd_url));
|
_etcd_client = std::unique_ptr<etcd::Client>(new etcd::Client(etcd_url));
|
||||||
|
|
||||||
try {
|
if (timeout == 0) {
|
||||||
if (timeout == 0) {
|
etcd::Response resp = _etcd_client->lock(lock_name).get();
|
||||||
|
if (resp.is_ok()) {
|
||||||
|
_lock_key = resp.lock_key();
|
||||||
|
_acquired = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
std::future<etcd::Response> future = std::async(std::launch::async, [&]() {
|
||||||
etcd::Response resp = _etcd_client->lock(lock_name).get();
|
etcd::Response resp = _etcd_client->lock(lock_name).get();
|
||||||
|
return resp;
|
||||||
|
});
|
||||||
|
|
||||||
|
std::future_status status = future.wait_for(std::chrono::seconds(timeout));
|
||||||
|
if (status == std::future_status::ready) {
|
||||||
|
auto resp = future.get();
|
||||||
if (resp.is_ok()) {
|
if (resp.is_ok()) {
|
||||||
_lock_key = resp.lock_key();
|
_lock_key = resp.lock_key();
|
||||||
_acquired = true;
|
_acquired = true;
|
||||||
}
|
}
|
||||||
|
} else if (status == std::future_status::timeout) {
|
||||||
|
std::cerr << "failed to acquire distributed because of lock timeout"
|
||||||
|
<< std::endl;
|
||||||
} else {
|
} else {
|
||||||
std::future<etcd::Response> future =
|
std::cerr << "failed to acquire distributed lock" << std::endl;
|
||||||
std::async(std::launch::async, [&]() {
|
|
||||||
etcd::Response resp = _etcd_client->lock(lock_name).get();
|
|
||||||
return resp;
|
|
||||||
});
|
|
||||||
|
|
||||||
std::future_status status =
|
|
||||||
future.wait_for(std::chrono::seconds(timeout));
|
|
||||||
if (status == std::future_status::ready) {
|
|
||||||
auto resp = future.get();
|
|
||||||
if (resp.is_ok()) {
|
|
||||||
_lock_key = resp.lock_key();
|
|
||||||
_acquired = true;
|
|
||||||
}
|
|
||||||
} else if (status == std::future_status::timeout) {
|
|
||||||
std::cerr << "failed to acquire distributed because of lock timeout"
|
|
||||||
<< std::endl;
|
|
||||||
} else {
|
|
||||||
std::cerr << "failed to acquire distributed lock" << std::endl;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (std::exception& e) {
|
|
||||||
std::cerr << "failed to construct: " << e.what() << std::endl;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -63,13 +57,9 @@ DistributedLock::~DistributedLock() noexcept {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
auto resp = _etcd_client->unlock(_lock_key).get();
|
||||||
auto resp = _etcd_client->unlock(_lock_key).get();
|
if (!resp.is_ok()) {
|
||||||
if (!resp.is_ok()) {
|
std::cout << resp.error_code() << std::endl;
|
||||||
std::cout << resp.error_code() << std::endl;
|
|
||||||
}
|
|
||||||
} catch (std::exception& e) {
|
|
||||||
std::cerr << "failed to destruct: " << e.what() << std::endl;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,7 @@ void print_response(etcd::Response const& resp) {
|
||||||
void wait_for_connection(std::string endpoints) {
|
void wait_for_connection(std::string endpoints) {
|
||||||
// wait until the client connects to etcd server
|
// wait until the client connects to etcd server
|
||||||
while (true) {
|
while (true) {
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
try {
|
try {
|
||||||
etcd::Client client(endpoints);
|
etcd::Client client(endpoints);
|
||||||
if (client.head().get().is_ok()) {
|
if (client.head().get().is_ok()) {
|
||||||
|
|
@ -45,6 +46,12 @@ void wait_for_connection(std::string endpoints) {
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
// pass
|
// pass
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
etcd::Client client(endpoints);
|
||||||
|
if (client.head().get().is_ok()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
sleep(1);
|
sleep(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -85,6 +92,7 @@ TEST_CASE("watch should can be re-established") {
|
||||||
|
|
||||||
// issue some changes to see if the watcher works
|
// issue some changes to see if the watcher works
|
||||||
for (int round = 0; round < 100000; ++round) {
|
for (int round = 0; round < 100000; ++round) {
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
try {
|
try {
|
||||||
etcd::Client client(etcd_url);
|
etcd::Client client(etcd_url);
|
||||||
auto response =
|
auto response =
|
||||||
|
|
@ -92,6 +100,11 @@ TEST_CASE("watch should can be re-established") {
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
// pass
|
// pass
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
etcd::Client client(etcd_url);
|
||||||
|
auto response =
|
||||||
|
client.set(my_prefix + "/foo", "bar-" + std::to_string(round)).get();
|
||||||
|
#endif
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
}
|
}
|
||||||
|
|
@ -101,6 +114,7 @@ TEST_CASE("watch should can be re-established") {
|
||||||
|
|
||||||
// the watcher has been cancelled and shouldn't work anymore
|
// the watcher has been cancelled and shouldn't work anymore
|
||||||
for (int round = 10; round < 20; ++round) {
|
for (int round = 10; round < 20; ++round) {
|
||||||
|
#ifndef _ETCD_NO_EXCEPTIONS
|
||||||
try {
|
try {
|
||||||
etcd::Client client(etcd_url);
|
etcd::Client client(etcd_url);
|
||||||
auto response =
|
auto response =
|
||||||
|
|
@ -108,6 +122,11 @@ TEST_CASE("watch should can be re-established") {
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
// pass
|
// pass
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
etcd::Client client(etcd_url);
|
||||||
|
auto response =
|
||||||
|
client.set(my_prefix + "/foo", "bar-" + std::to_string(round)).get();
|
||||||
|
#endif
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(2));
|
std::this_thread::sleep_for(std::chrono::seconds(2));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue