Compare commits

...

12 Commits

Author SHA1 Message Date
holobay 5994b54c68 Bump up the version to v0.15.5 2025-04-01 23:16:01 +03:00
Tao He ba6216385f Bump up the version to v0.15.4
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
2023-12-20 18:28:15 +08:00
Tao He 5ccaccec43
Enable ipv6 endpoints support (#262)
Resolves #250

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
2023-12-20 17:56:18 +08:00
Tao He b82efea7a9
Enable -fno-exceptions support (#261)
Resolves #259

Signed-off-by: Tao He <sighingnow@gmail.com>
2023-12-20 09:18:10 +08:00
Tao He 5aff57cce5
Fixes the noisy logs when meets invalid addresses. (#260)
Signed-off-by: Tao He <sighingnow@gmail.com>
2023-12-20 00:32:00 +08:00
penfree 84343ca9f0
Fix: keepalive exit without any message due to clock drift (#258)
Fix:  #257

Co-authored-by: qiupengfei <qiupengfei@baidu.com>
2023-12-19 23:45:19 +08:00
Diskein 59635008c0
Fixes compiler errors (#254)
Co-authored-by: Denis Kalantaevsky <dkalantaevsky@gmail.com>
Co-authored-by: Tao He <sighingnow@gmail.com>
2023-10-05 22:59:06 -05:00
Clément Péron 47f0d9e032
cmake: fix when cross compiling (#252)
To compile protobuf, CMake needs to use the protoc and grpc-cpp-plugin
in the host architecture.

Unfortunately by default the protoc and grpc-cpp-plugin are the one for
the Target.
And since gRPC 1.52 they are explictly not exported when Cross Compiling
to avoid architecture mismatch.
See:
831d2a6855

Fix this by looking at the correct program

See example.
https://github.com/grpc/grpc/blob/master/examples/cpp/cmake/common.cmake#L54-L62

Signed-off-by: Clément Péron <peron.clem@gmail.com>
2023-09-25 09:49:24 +08:00
Tao He 6fc1f164c0
Fixes the extra-smi error in code generated by protobuf (#251)
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
2023-09-19 20:42:49 +08:00
Tao He e31ac4d4ca Bump up etcd-cpp-apiv3 to v0.15.3
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
2023-07-27 11:28:00 +08:00
JonLiu1993 e5dc903a5d
Fix error LNK1107 and undeclared identifier 'IPPROTO_TCP' (#244)
When I [Update](https://github.com/microsoft/vcpkg/pull/32747)
etcd-cpp-apiv3 version from 0.14.2 to 0.15.2, I get two build error:
````
fatal error LNK1107: invalid or corrupt file: cannot read at 0x330
````
````
error: use of undeclared identifier 'IPPROTO_TCP'
````

The first error was because the target `etcd-cpp-api-core-objects`
linked the wrong `libprotobufd.dll` file instead of the .lib file, I
used the usage provided by vcpkg to link the correct .lib file to fix
this error.

Another error was because `IPPROTO_TCP` was missing declaration
`"<netinet/in.h>"`, I added it to fix this error.

---------

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
Co-authored-by: Zhao Liu <v-zhli17@microsoft.com>
Co-authored-by: Tao He <linzhu.ht@alibaba-inc.com>
2023-07-27 11:27:19 +08:00
Tao He 0eee75b52e
KeepAlive: auto grant a new lease if 0 is given as lease id (#242)
Fixes #3037

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
2023-07-20 14:21:27 +08:00
35 changed files with 1153 additions and 146 deletions

View File

@ -16,7 +16,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12]
etcd: [v3.2.26, v3.3.11, v3.4.13, v3.5.7]
etcd: [v3.2.32, v3.3.27, v3.4.27, v3.5.9]
exclude:
- os: ubuntu-20.04
etcd: v3.2.26
@ -243,6 +243,10 @@ jobs:
# tests without auth
echo "Run the etcd resolver test ........................."
# there's no ipv6 on github CI runner
# ./build/bin/EtcdResolverTest
echo "Run the etcd sync test ........................."
./build/bin/EtcdSyncTest
@ -399,6 +403,26 @@ jobs:
killall -TERM etcd
sleep 5
- name: Etcd Member test
run: |
killall -TERM etcd || true
sleep 5
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib:/usr/local/lib/x86_64-linux-gnu
# use etcd v3 api
export ETCDCTL_API="3"
rm -rf default.etcd
/usr/local/bin/etcd &
sleep 5
./build/bin/EtcdMemberTest
killall -TERM etcd
sleep 5
- name: Check ccache
run: |
ccache --show-stats

View File

@ -5,7 +5,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(etcd-cpp-api_VERSION_MAJOR 0)
set(etcd-cpp-api_VERSION_MINOR 15)
set(etcd-cpp-api_VERSION_PATCH 2)
set(etcd-cpp-api_VERSION_PATCH 5)
set(etcd-cpp-api_VERSION ${etcd-cpp-api_VERSION_MAJOR}.${etcd-cpp-api_VERSION_MINOR}.${etcd-cpp-api_VERSION_PATCH})
set(CMAKE_PROJECT_HOMEPAGE_URL https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3)
@ -26,6 +26,7 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
)
endif()
option(BUILD_WITH_NO_EXCEPTIONS "Build etcd-cpp-apiv3 with disabling exception handling, i.e., -fno-exceptions" OFF)
option(BUILD_SHARED_LIBS "Build etcd-cpp-apiv3 shared libraries" ON)
option(BUILD_ETCD_CORE_ONLY "Build etcd-cpp-apiv3 core library (the synchronous runtime) only" OFF)
option(BUILD_ETCD_TESTS "Build etcd-cpp-apiv3 test cases" OFF)
@ -88,6 +89,21 @@ macro(use_cxx target)
endif()
endmacro(use_cxx)
macro(set_exceptions target)
if(BUILD_WITH_NO_EXCEPTIONS)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
target_compile_options(${target} PRIVATE "-fno-exceptions")
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
target_compile_options(${target} PRIVATE "-fno-exceptions")
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang")
target_compile_options(${target} PRIVATE "-fno-exceptions")
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
target_compile_options(${target} PRIVATE "/EHs-c-")
endif()
target_compile_definitions(${target} PUBLIC -D_ETCD_NO_EXCEPTIONS)
endif()
endmacro(set_exceptions)
if(APPLE)
# If we're on OS X check for Homebrew's copy of OpenSSL instead of Apple's
if(NOT OpenSSL_DIR)
@ -120,10 +136,34 @@ if(Protobuf_PROTOC_EXECUTABLE)
endif()
endif()
# When cross compiling we look for the native protoc compiler
# overwrite protobuf::protoc with the proper protoc
if(CMAKE_CROSSCOMPILING)
find_program(Protobuf_PROTOC_EXECUTABLE REQUIRED NAMES protoc)
if(NOT TARGET protobuf::protoc)
add_executable(protobuf::protoc IMPORTED)
endif()
set_target_properties(protobuf::protoc PROPERTIES
IMPORTED_LOCATION "${Protobuf_PROTOC_EXECUTABLE}")
endif()
find_package(gRPC QUIET)
if(gRPC_FOUND AND TARGET gRPC::grpc AND TARGET gRPC::grpc_cpp_plugin)
if(gRPC_FOUND AND TARGET gRPC::grpc)
# When cross compiling we look for the native grpc_cpp_plugin
if(CMAKE_CROSSCOMPILING)
find_program(GRPC_CPP_PLUGIN REQUIRED NAMES grpc_cpp_plugin)
if(NOT TARGET gRPC::grpc_cpp_plugin)
add_executable(gRPC::grpc_cpp_plugin IMPORTED)
endif()
set_target_properties(gRPC::grpc_cpp_plugin PROPERTIES
IMPORTED_LOCATION "${GRPC_CPP_PLUGIN}")
elseif(TARGET gRPC::grpc_cpp_plugin)
get_target_property(GRPC_CPP_PLUGIN gRPC::grpc_cpp_plugin LOCATION)
else()
message(FATAL_ERROR "Found gRPC but no gRPC CPP plugin defined")
endif()
set(GRPC_LIBRARIES gRPC::gpr gRPC::grpc gRPC::grpc++)
get_target_property(GRPC_CPP_PLUGIN gRPC::grpc_cpp_plugin LOCATION)
get_target_property(GRPC_INCLUDE_DIR gRPC::grpc INTERFACE_INCLUDE_DIRECTORIES)
else()
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/FindGRPC.cmake)
@ -142,6 +182,9 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/GenerateProtobufGRPC.cmake)
if(gRPC_VERSION VERSION_LESS "1.21" OR gRPC_VERSION VERSION_GREATER "1.31")
add_definitions(-DWITH_GRPC_CHANNEL_CLASS)
endif()
if(gRPC_VERSION VERSION_LESS "1.17")
add_definitions(-DWITH_GRPC_CREATE_CHANNEL_INTERNAL_UNIQUE_POINTER)
endif()
# will set `PROTOBUF_GENERATES`, indicates all generated .cc files, and a target `protobuf_generates`.
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/GenerateProtobuf.cmake)
@ -181,9 +224,13 @@ if(NOT CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
endif()
check_cxx_compiler_flag(-Wno-c++17-extensions W_NO_CPP17_EXTENSIONS)
check_cxx_compiler_flag(-Wno-extra-semi W_NO_EXTRA_SEMI)
if(W_NO_CPP17_EXTENSIONS)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-c++17-extensions")
endif()
if(W_NO_EXTRA_SEMI)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-extra-semi")
endif()
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/bin)
@ -205,6 +252,7 @@ if(NOT BUILD_ETCD_CORE_ONLY)
endif()
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/etcd/v3/action_constants.hpp
${CMAKE_CURRENT_SOURCE_DIR}/etcd/v3/Transaction.hpp
${CMAKE_CURRENT_SOURCE_DIR}/etcd/v3/Member.hpp
DESTINATION include/etcd/v3)
configure_file(etcd-cpp-api-config.in.cmake
@ -290,6 +338,7 @@ set(CPACK_DEBIAN_PACKAGE_INSTALL "/usr/lib/lib*.so*"
"/usr/include/etcd/*.hpp"
"/usr/include/etcd/v3/action_constants.hpp"
"/usr/include/etcd/v3/Transaction.hpp"
"/usr/include/etcd/v3/Member.hpp"
)
set(CPACK_DEBIAN_PACKAGE_BUILD_NUMBER_PREFIX "")

View File

@ -222,6 +222,14 @@ Connecting to multiple endpoints is supported:
etcd::Client etcd("http://a.com:2379;http://b.com:2379;http://c.com:2379");
```
### IPv6
Connecting to IPv6 endpoints is supported:
```c++
etcd::Client etcd("http://::1:2379");
```
Behind the screen, gRPC's load balancer is used and the round-robin strategy will
be used by default.
@ -526,10 +534,10 @@ Values can be deleted with the `rm` method passing the key to be deleted as a pa
should point to an existing value. There are conditional variations for deletion too.
* `rm(std::string const& key)` unconditionally deletes the given key
* `rm_if(key, value, old_value)` deletes an already existing value but only if the previous
* `rm_if(key, old_value)` deletes an already existing value but only if the previous
value equals with old_value. If the values does not match returns with "Compare failed" error
(code `ERROR_COMPARE_FAILED`)
* `rm_if(key, value, old_index)` deletes an already existing value but only if the index of
* `rm_if(key, old_index)` deletes an already existing value but only if the index of
the previous value equals with old_index. If the indices does not match returns with "Compare
failed" error (code `ERROR_COMPARE_FAILED`)
@ -863,6 +871,8 @@ Without handler, the internal state can be checked via `KeepAlive::Check()` and
the async exception when there are errors during keeping the lease alive.
Note that even with `handler`, the `KeepAlive::Check()` still rethrow if there's an async exception.
When the library is built with `-fno-exceptions`, the `handler` argument and the `Check()` method
will abort the program when there are errors during keeping the lease alive.
### Etcd transactions
@ -960,7 +970,15 @@ The observer stream will be canceled when been destructed.
for more details, please refer to [etcd/Client.hpp](./etcd/Client.hpp) and [tst/ElectionTest.cpp](./tst/ElectionTest.cpp).
### TODO
## `-fno-exceptions`
The _etcd-cpp-apiv3_ library supports to be built with `-fno-exceptions` flag, controlled by the
cmake option `BUILD_WITH_NO_EXCEPTIONS=ON/OFF` (defaults to `OFF`).
When building with `-fno-exceptions`, the library will abort the program under certain circumstances,
e.g., when calling `.Check()` method of `KeepAlive` and there are errors during keeping the lease alive,
## TODO
1. Cancellation of asynchronous calls(except for watch)

View File

@ -630,6 +630,25 @@ class Client {
*/
pplx::task<Response> leases();
/**
* Add an etcd member to the etcd cluster, equivalent to `etcdctl member add`.
* @param peer_urls is comma separated list of URLs for the new member.
* @param is_learner is true if the added member is a learner.
*/
pplx::task<Response> add_member(std::string const& peer_urls,
bool is_learner);
/**
* List all members, equivalent to `etcdctl member list`.
*/
pplx::task<Response> list_member();
/**
* Add an etcd member to the etcd cluster, equivalent to `etcdctl member add`.
* @param member_id is the ID of the member to be removed.
*/
pplx::task<Response> remove_member(const uint64_t member_id);
/**
* Gains a lock at a key, using a default created lease, using the default
* lease (10 seconds), with keeping alive has already been taken care of by

View File

@ -85,10 +85,10 @@ class KeepAlive {
~KeepAlive();
protected:
// automatically refresh loop
void refresh();
// refresh once immediately
void refresh_once();
// automatically refresh loop, returns the error message if failed
std::string refresh();
// refresh once immediately, returns the error message if failed
std::string refresh_once();
struct EtcdServerStubs;
struct EtcdServerStubsDeleter {

View File

@ -9,6 +9,7 @@
#include <vector>
#include "etcd/Value.hpp"
#include "etcd/v3/Member.hpp"
namespace etcdv3 {
class AsyncWatchAction;
@ -34,7 +35,7 @@ class KeepAlive;
class Watcher;
/**
* The Reponse object received for the requests of etcd::Client
* The Response object received for the requests of etcd::Client
*/
class Response {
public:
@ -208,6 +209,11 @@ class Response {
*/
std::vector<int64_t> const& leases() const;
/**
* Returns the member list.
*/
std::vector<etcdv3::Member> const& members() const;
protected:
Response(const etcdv3::V3Response& response,
std::chrono::microseconds const& duration);
@ -239,6 +245,9 @@ class Response {
// for lease list
std::vector<int64_t> _leases;
// for member list
std::vector<etcdv3::Member> _members;
friend class Client;
friend class SyncClient;
friend class KeepAlive;

View File

@ -29,6 +29,9 @@ class AsyncLeaseKeepAliveAction;
class AsyncLeaseTimeToLiveAction;
class AsyncLeaseLeasesAction;
class AsyncLockAction;
class AsyncAddMemberAction;
class AsyncListMemberAction;
class AsyncRemoveMemberAction;
class AsyncUnlockAction;
class AsyncPutAction;
class AsyncRangeAction;
@ -679,6 +682,24 @@ class SyncClient {
*/
Response leases();
/**
* Add an etcd member to the etcd cluster, equivalent to `etcdctl member add`.
* @param peer_urls is comma separated list of URLs for the new member.
* @param is_learner is true if the added member is a learner.
*/
Response add_member(std::string const& peer_urls, bool is_learner);
/**
* List all members, equivalent to `etcdctl member list`.
*/
Response list_member();
/**
* Remove a member of an etcd cluster, equivalent to `etcdctl member remove`.
* @param member_id is the id of the member to be removed.
*/
Response remove_member(uint64_t member_id);
/**
* Gains a lock at a key, using a default created lease, using the default
* lease (10 seconds), with keeping alive has already been taken care of by
@ -835,6 +856,11 @@ class SyncClient {
std::shared_ptr<etcdv3::AsyncLeaseTimeToLiveAction> leasetimetolive_internal(
int64_t lease_id);
std::shared_ptr<etcdv3::AsyncLeaseLeasesAction> leases_internal();
std::shared_ptr<etcdv3::AsyncAddMemberAction> add_member_internal(
std::string const& peer_urls, bool is_learner);
std::shared_ptr<etcdv3::AsyncListMemberAction> list_member_internal();
std::shared_ptr<etcdv3::AsyncRemoveMemberAction> remove_member_internal(
const uint64_t member_id);
Response lock_internal(std::string const& key,
std::shared_ptr<etcd::KeepAlive> const& keepalive);
std::shared_ptr<etcdv3::AsyncLockAction> lock_with_lease_internal(

View File

@ -91,7 +91,9 @@ class Value {
int64_t leaseId;
};
typedef std::vector<Value> Values;
using Values = std::vector<Value>;
std::ostream& operator<<(std::ostream& os, const Value& value);
class Event {
public:
@ -122,7 +124,12 @@ class Event {
bool _has_kv, _has_prev_kv;
};
typedef std::vector<Event> Events;
using Events = std::vector<Event>;
std::ostream& operator<<(std::ostream& os, const Event::EventType& value);
std::ostream& operator<<(std::ostream& os, const Event& event);
} // namespace etcd
#endif

View File

@ -162,8 +162,11 @@ class Watcher {
* Note that you shouldn't use the watcher itself inside the `Wait()` callback
* as the callback will be invoked in a separate **detached** thread where the
* watcher may have been destroyed.
*
* @return true if the callback has been set successfully (no existing
* callback).
*/
void Wait(std::function<void(bool)> callback);
bool Wait(std::function<void(bool)> callback);
/**
* Stop the watching action.

View File

@ -15,6 +15,7 @@ using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using etcdserverpb::Cluster;
using etcdserverpb::KV;
using etcdserverpb::Lease;
using etcdserverpb::Watch;
@ -44,9 +45,16 @@ struct ActionParameters {
std::string value;
std::string old_value;
std::string auth_token;
// for cluster management apis
std::vector<std::string> peer_urls;
bool is_learner;
uint64_t member_id;
std::chrono::microseconds grpc_timeout = std::chrono::microseconds::zero();
KV::Stub* kv_stub;
Watch::Stub* watch_stub;
Cluster::Stub* cluster_stub;
Lease::Stub* lease_stub;
Lock::Stub* lock_stub;
Election::Stub* election_stub;

View File

@ -37,6 +37,12 @@ using etcdserverpb::LeaseRevokeRequest;
using etcdserverpb::LeaseRevokeResponse;
using etcdserverpb::LeaseTimeToLiveRequest;
using etcdserverpb::LeaseTimeToLiveResponse;
using etcdserverpb::MemberAddRequest;
using etcdserverpb::MemberAddResponse;
using etcdserverpb::MemberListRequest;
using etcdserverpb::MemberListResponse;
using etcdserverpb::MemberRemoveRequest;
using etcdserverpb::MemberRemoveResponse;
using etcdserverpb::PutRequest;
using etcdserverpb::PutResponse;
using etcdserverpb::RangeRequest;
@ -103,6 +109,24 @@ class AsyncLeaseKeepAliveResponse : public etcdv3::V3Response {
void ParseResponse(LeaseKeepAliveResponse& resp);
};
class AsyncMemberAddResponse : public etcdv3::V3Response {
public:
AsyncMemberAddResponse(){};
void ParseResponse(MemberAddResponse& resp);
};
class AsyncMemberListResponse : public etcdv3::V3Response {
public:
AsyncMemberListResponse(){};
void ParseResponse(MemberListResponse& resp);
};
class AsyncMemberRemoveResponse : public etcdv3::V3Response {
public:
AsyncMemberRemoveResponse(){};
void ParseResponse(MemberRemoveResponse& resp);
};
class AsyncLeaseLeasesResponse : public etcdv3::V3Response {
public:
AsyncLeaseLeasesResponse(){};
@ -269,12 +293,44 @@ class AsyncLeaseKeepAliveAction : public etcdv3::Action {
stream;
LeaseKeepAliveRequest req;
bool isCancelled;
std::atomic_bool isCancelled;
std::recursive_mutex protect_is_cancelled;
friend class etcd::KeepAlive;
};
class AsyncAddMemberAction : public etcdv3::Action {
public:
AsyncAddMemberAction(etcdv3::ActionParameters&& params);
AsyncMemberAddResponse ParseResponse();
private:
MemberAddResponse reply;
std::unique_ptr<ClientAsyncResponseReader<MemberAddResponse>> response_reader;
};
class AsyncListMemberAction : public etcdv3::Action {
public:
AsyncListMemberAction(etcdv3::ActionParameters&& params);
AsyncMemberListResponse ParseResponse();
private:
MemberListResponse reply;
std::unique_ptr<ClientAsyncResponseReader<MemberListResponse>>
response_reader;
};
class AsyncRemoveMemberAction : public etcdv3::Action {
public:
AsyncRemoveMemberAction(etcdv3::ActionParameters&& params);
AsyncMemberRemoveResponse ParseResponse();
private:
MemberRemoveResponse reply;
std::unique_ptr<ClientAsyncResponseReader<MemberRemoveResponse>>
response_reader;
};
class AsyncLeaseLeasesAction : public etcdv3::Action {
public:
AsyncLeaseLeasesAction(etcdv3::ActionParameters&& params);
@ -330,7 +386,7 @@ class AsyncObserveAction : public etcdv3::Action {
LeaderResponse reply;
std::unique_ptr<ClientAsyncReader<LeaderResponse>> response_reader;
std::atomic_bool isCancelled;
std::mutex protect_is_cancalled;
std::mutex protect_is_cancelled;
};
class AsyncProclaimAction : public etcdv3::Action {

33
etcd/v3/Member.hpp Normal file
View File

@ -0,0 +1,33 @@
#ifndef __V3_ETCDV3MEMBERS_HPP__
#define __V3_ETCDV3MEMBERS_HPP__
#include <cstdint>
#include <string>
#include <vector>
using namespace std;
namespace etcdv3 {
class Member {
public:
Member();
void set_id(uint64_t const& id);
uint64_t const& get_id() const;
void set_name(std::string const& name);
std::string const& get_name() const;
void set_peerURLs(std::vector<std::string> const& peerURLs);
std::vector<std::string> const& get_peerURLs() const;
void set_clientURLs(std::vector<std::string> const& clientURLs);
std::vector<std::string> const& get_clientURLs() const;
void set_learner(bool isLearner);
bool get_learner() const;
private:
uint64_t id;
std::string name;
std::vector<std::string> peerURLs;
std::vector<std::string> clientURLs;
bool isLearner;
};
} // namespace etcdv3
#endif

View File

@ -6,6 +6,7 @@
#include "proto/v3election.pb.h"
#include "etcd/v3/KeyValue.hpp"
#include "etcd/v3/Member.hpp"
namespace etcdv3 {
class V3Response {
@ -36,6 +37,7 @@ class V3Response {
uint64_t get_member_id() const;
uint64_t get_raft_term() const;
std::vector<int64_t> const& get_leases() const;
std::vector<etcdv3::Member> const& get_members() const;
protected:
int error_code;
@ -59,6 +61,8 @@ class V3Response {
// for lease list
std::vector<int64_t> leases;
// for member list
std::vector<etcdv3::Member> members;
};
} // namespace etcdv3
#endif

View File

@ -23,6 +23,10 @@ extern char const* LEASEKEEPALIVE;
extern char const* LEASETIMETOLIVE;
extern char const* LEASELEASES;
extern char const* ADDMEMBER;
extern char const* LISTMEMBER;
extern char const* REMOVEMEMBER;
extern char const* CAMPAIGN_ACTION;
extern char const* PROCLAIM_ACTION;
extern char const* LEADER_ACTION;

View File

@ -19,35 +19,50 @@ file(GLOB_RECURSE CPP_CLIENT_CORE_SRC
add_library(etcd-cpp-api-core-objects OBJECT ${CPP_CLIENT_CORE_SRC} ${PROTOBUF_GENERATES})
use_cxx(etcd-cpp-api-core-objects)
set_exceptions(etcd-cpp-api-core-objects)
add_dependencies(etcd-cpp-api-core-objects protobuf_generates)
include_generated_protobuf_files(etcd-cpp-api-core-objects)
target_link_libraries(etcd-cpp-api-core-objects PUBLIC
${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES}
)
if(TARGET protobuf::libprotobuf)
target_link_libraries(etcd-cpp-api-core-objects PUBLIC protobuf::libprotobuf)
else()
target_link_libraries(etcd-cpp-api-core-objects PUBLIC ${PROTOBUF_LIBRARIES})
endif()
if(BUILD_ETCD_CORE_ONLY)
# add the core library, includes the sycnhronous client only
add_library(etcd-cpp-api-core $<TARGET_OBJECTS:etcd-cpp-api-core-objects>)
use_cxx(etcd-cpp-api-core)
set_exceptions(etcd-cpp-api-core)
target_link_libraries(etcd-cpp-api-core PUBLIC
${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES}
${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES}
)
if(TARGET protobuf::libprotobuf)
target_link_libraries(etcd-cpp-api-core PUBLIC protobuf::libprotobuf)
else()
target_link_libraries(etcd-cpp-api-core PUBLIC ${PROTOBUF_LIBRARIES})
endif()
include_generated_protobuf_files(etcd-cpp-api-core)
else()
# add the client with asynchronus client
add_library(etcd-cpp-api $<TARGET_OBJECTS:etcd-cpp-api-core-objects>
"${CMAKE_CURRENT_SOURCE_DIR}/Client.cpp")
use_cxx(etcd-cpp-api)
set_exceptions(etcd-cpp-api)
target_link_libraries(etcd-cpp-api PUBLIC
${CPPREST_LIB} # n.b.: the asynchronous client requires pplx in cpprestsdk
${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES}
)
if(TARGET protobuf::libprotobuf)
target_link_libraries(etcd-cpp-api PUBLIC protobuf::libprotobuf)
else()
target_link_libraries(etcd-cpp-api PUBLIC ${PROTOBUF_LIBRARIES})
endif()
include_generated_protobuf_files(etcd-cpp-api)
endif()

View File

@ -494,6 +494,27 @@ pplx::task<etcd::Response> etcd::Client::leases() {
this->client->leases_internal());
}
pplx::task<etcd::Response> etcd::Client::add_member(
std::string const& peer_urls, bool is_learner) {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncAddMemberAction>>(Response::create),
this->client->add_member_internal(peer_urls, is_learner));
}
pplx::task<etcd::Response> etcd::Client::list_member() {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncListMemberAction>>(Response::create),
this->client->list_member_internal());
}
pplx::task<etcd::Response> etcd::Client::remove_member(
const uint64_t member_id) {
return etcd::detail::asyncify(
static_cast<responser_t<etcdv3::AsyncRemoveMemberAction>>(
Response::create),
this->client->remove_member_internal(member_id));
}
pplx::task<etcd::Response> etcd::Client::lock(std::string const& key) {
static const int DEFAULT_LEASE_TTL_FOR_LOCK =
10; // see also etcd::SyncClient::lock

View File

@ -1,4 +1,5 @@
#include <chrono>
#include <iostream>
#include <ratio>
#include "etcd/KeepAlive.hpp"
@ -28,6 +29,10 @@ etcd::KeepAlive::KeepAlive(SyncClient const& client, int ttl, int64_t lease_id)
lease_id(lease_id),
continue_next(true),
grpc_timeout(client.get_grpc_timeout()) {
if (ttl > 0 && lease_id == 0) {
this->lease_id =
const_cast<SyncClient&>(client).leasegrant(ttl).value().lease();
}
stubs.reset(new EtcdServerStubs{});
stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel());
@ -41,6 +46,7 @@ etcd::KeepAlive::KeepAlive(SyncClient const& client, int ttl, int64_t lease_id)
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
refresh_task_ = std::thread([this]() {
#ifndef _ETCD_NO_EXCEPTIONS
try {
// start refresh
this->refresh();
@ -48,6 +54,12 @@ etcd::KeepAlive::KeepAlive(SyncClient const& client, int ttl, int64_t lease_id)
// propagate the exception
eptr_ = std::current_exception();
}
#else
const std::string err = this->refresh();
if (!err.empty()) {
eptr_ = std::make_exception_ptr(std::runtime_error(err));
}
#endif
});
}
@ -68,7 +80,7 @@ etcd::KeepAlive::KeepAlive(
std::function<void(std::exception_ptr)> const& handler, int ttl,
int64_t lease_id, std::string const& target_name_override)
: KeepAlive(SyncClient(address, ca, cert, privkey, target_name_override),
ttl, lease_id) {}
handler, ttl, lease_id) {}
etcd::KeepAlive::KeepAlive(
SyncClient const& client,
@ -79,6 +91,10 @@ etcd::KeepAlive::KeepAlive(
lease_id(lease_id),
continue_next(true),
grpc_timeout(client.get_grpc_timeout()) {
if (ttl > 0 && lease_id == 0) {
this->lease_id =
const_cast<SyncClient&>(client).leasegrant(ttl).value().lease();
}
stubs.reset(new EtcdServerStubs{});
stubs->leaseServiceStub = Lease::NewStub(client.grpc_channel());
@ -90,15 +106,22 @@ etcd::KeepAlive::KeepAlive(
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(std::move(params)));
refresh_task_ = std::thread([this]() {
#ifndef _ETCD_NO_EXCEPTIONS
try {
// start refresh
this->refresh();
} catch (...) {
// propogate the exception
// propagate the exception
eptr_ = std::current_exception();
if (handler_) {
handler_(eptr_);
}
}
#else
const std::string err = this->refresh();
if (!err.empty()) {
eptr_ = std::make_exception_ptr(std::runtime_error(err));
}
#endif
if (eptr_ && handler_) {
handler_(eptr_);
}
});
}
@ -140,6 +163,7 @@ void etcd::KeepAlive::Check() {
std::rethrow_exception(eptr_);
}
// issue an refresh to make sure it still alive
#ifndef _ETCD_NO_EXCEPTIONS
try {
this->refresh_once();
} catch (...) {
@ -149,19 +173,35 @@ void etcd::KeepAlive::Check() {
// propagate the exception, as we throw in `Check()`, the `handler` won't be
// touched
eptr_ = std::current_exception();
if (handler_) {
handler_(eptr_);
}
}
#else
const std::string err = this->refresh_once();
if (!err.empty()) {
// run canceller first
this->Cancel();
// propagate the exception, as we throw in `Check()`, the `handler` won't be
// touched
eptr_ = std::make_exception_ptr(std::runtime_error(err));
}
#endif
if (eptr_ && handler_) {
handler_(eptr_);
}
#ifndef _ETCD_NO_EXCEPTIONS
if (eptr_) {
// rethrow in `Check()` to keep the consistent semantics
std::rethrow_exception(eptr_);
}
#endif
return;
}
void etcd::KeepAlive::refresh() {
std::string etcd::KeepAlive::refresh() {
while (true) {
if (!continue_next.load()) {
return;
return std::string{};
}
// minimal resolution: 1 second
int keepalive_ttl = std::max(ttl - 1, 1);
@ -169,29 +209,50 @@ void etcd::KeepAlive::refresh() {
std::unique_lock<std::mutex> lock(mutex_for_refresh_);
if (cv_for_refresh_.wait_for(lock, std::chrono::seconds(keepalive_ttl)) ==
std::cv_status::no_timeout) {
return;
if (!continue_next.load()) {
return std::string{};
}
#ifndef NDEBUG
std::cerr
<< "[warn] awaked from condition_variable but continue_next is "
"not set, maybe due to clock drift."
<< std::endl;
#endif
}
}
// execute refresh
this->refresh_once();
const std::string err = this->refresh_once();
if (!err.empty()) {
return err;
}
}
return std::string{};
}
void etcd::KeepAlive::refresh_once() {
std::string etcd::KeepAlive::refresh_once() {
std::lock_guard<std::mutex> scope_lock(mutex_for_refresh_);
if (!continue_next.load()) {
return;
return std::string{};
}
this->stubs->call->mutable_parameters().grpc_timeout = this->grpc_timeout;
auto resp = this->stubs->call->Refresh();
if (!resp.is_ok()) {
throw std::runtime_error("Failed to refresh lease: error code: " +
std::to_string(resp.error_code()) +
", message: " + resp.error_message());
const std::string err = "Failed to refresh lease: error code: " +
std::to_string(resp.error_code()) +
", message: " + resp.error_message();
#ifndef _ETCD_NO_EXCEPTIONS
throw std::runtime_error(err);
#endif
return err;
}
if (resp.value().ttl() == 0) {
throw std::out_of_range(
"Failed to refresh lease due to expiration: the new TTL is 0.");
const std::string err =
"Failed to refresh lease due to expiration: the new TTL is 0.";
#ifndef _ETCD_NO_EXCEPTIONS
throw std::out_of_range(err);
#endif
return err;
}
return std::string{};
}

View File

@ -25,6 +25,7 @@ etcd::Response::Response(const etcd::Response& response) {
this->_raft_term = response._raft_term;
this->_leases = response._leases;
this->_members = response._members;
}
etcd::Response::Response(const etcdv3::V3Response& reply,
@ -64,6 +65,8 @@ etcd::Response::Response(const etcdv3::V3Response& reply,
// lease list
this->_leases = reply.get_leases();
// member list
this->_members = reply.get_members();
}
etcd::Response::Response(int error_code, std::string const& error_message)
@ -131,3 +134,7 @@ uint64_t etcd::Response::raft_term() const { return this->_raft_term; }
std::vector<int64_t> const& etcd::Response::leases() const {
return this->_leases;
}
std::vector<etcdv3::Member> const& etcd::Response::members() const {
return this->_members;
}

View File

@ -4,6 +4,11 @@
#define NOMINMAX
#endif
#ifdef __ANDROID__
#include <netinet/in.h>
#endif
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
@ -33,6 +38,7 @@
#include <grpc++/grpc++.h>
#include <grpc++/security/credentials.h>
#include <grpc++/support/status_code_enum.h>
#include <grpc/grpc.h> // for grpc_lame_client_channel_create()
#include "proto/rpc.grpc.pb.h"
#include "proto/v3election.grpc.pb.h"
@ -45,6 +51,22 @@
#include "etcd/v3/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"
namespace grpc {
// forward declaration for compatibility with older grpc versions
std::shared_ptr<Channel> CreateChannelInternal(
const std::string& host, grpc_channel* c_channel,
#if defined(WITH_GRPC_CREATE_CHANNEL_INTERNAL_UNIQUE_POINTER)
std::unique_ptr<std::vector<
std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
interceptor_creators
#else
std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_creators
#endif
);
} // namespace grpc
namespace etcd {
namespace detail {
@ -66,7 +88,7 @@ static void string_split(std::vector<std::string>& dests,
}
static std::string string_join(std::vector<std::string> const& srcs,
std::string const sep) {
std::string const& sep) {
std::stringstream ss;
if (!srcs.empty()) {
ss << srcs[0];
@ -78,17 +100,30 @@ static std::string string_join(std::vector<std::string> const& srcs,
}
static bool dns_resolve(std::string const& target,
std::vector<std::string>& endpoints) {
struct addrinfo hints = {}, *addrs;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
std::vector<std::string>& endpoints, bool ipv4 = true) {
std::vector<std::string> target_parts;
string_split(target_parts, target, ":");
if (target_parts.size() != 2) {
std::cerr << "warn: invalid URL: " << target << std::endl;
return false;
bool ipv6_url{false};
{
size_t rindex = target.rfind(':');
if (rindex == target.npos) {
#ifndef NDEBUG
std::cerr << "[warn] invalid URL: " << target << ", expects 'host:port'"
<< std::endl;
#endif
return false;
}
std::string host(target.substr(0, rindex));
// host format is [ipv6]
if (!ipv4 && !host.empty() && host[0] == '[' &&
host[host.size() - 1] == ']') {
host = target.substr(1, rindex - 2);
ipv6_url = true;
}
target_parts.push_back(host);
target_parts.push_back(target.substr(rindex + 1));
}
#if defined(_WIN32)
@ -100,26 +135,61 @@ static bool dns_resolve(std::string const& target,
int err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0) {
// Tell the user that we could not find a usable Winsock DLL.
std::cerr << "WSAStartup failed with error: %d" << err << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] WSAStartup failed with error: %d" << err
<< std::endl;
#endif
return false;
}
}
#endif
if (ipv6_url) {
// check valid ipv6
struct sockaddr_in6 sa6;
if (inet_pton(AF_INET6, target_parts[0].c_str(), &(sa6.sin6_addr)) == 1) {
endpoints.emplace_back(target);
return true;
}
return false;
}
struct addrinfo hints = {}, *addrs;
hints.ai_family = ipv4 ? AF_INET : AF_INET6;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
int r = getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints,
&addrs);
if (r != 0) {
std::cerr << "warn: getaddrinfo() failed for endpoint " << target
<< " with error: " << r << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] getaddrinfo() as " << (ipv4 ? "ipv4" : "ipv6")
<< " failed for endpoint " << target << " with error: " << r
<< ", " << strerror(errno) << std::endl;
#endif
return false;
}
char host[16] = {'\0'};
for (struct addrinfo* addr = addrs; addr != nullptr; addr = addr->ai_next) {
if (addr->ai_family != AF_INET && addr->ai_family != AF_INET6) {
continue;
}
memset(host, '\0', sizeof(host));
getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host), NULL, 0,
NI_NUMERICHOST);
endpoints.emplace_back(std::string(host) + ":" + target_parts[1]);
int r = getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host),
NULL, 0, NI_NUMERICHOST);
if (r != 0) {
#ifndef NDEBUG
std::cerr << "[warn] getnameinfo() failed for endpoint " << target
<< " with error: " << r << ", " << strerror(errno) << std::endl;
#endif
continue;
}
std::string host_string = host;
if (addr->ai_family == AF_INET6) {
host_string = "[" + host_string + "]";
}
endpoints.emplace_back(host_string + ":" + target_parts[1]);
}
freeaddrinfo(addrs);
return true;
@ -128,25 +198,33 @@ static bool dns_resolve(std::string const& target,
const std::string strip_and_resolve_addresses(std::string const& address) {
std::vector<std::string> addresses;
string_split(addresses, address, ",;");
std::string stripped_address;
std::string stripped_v4_address, stripped_v6_address;
{
std::vector<std::string> stripped_addresses;
std::vector<std::string> stripped_v4_addresses, stripped_v6_addresses;
std::string substr("://");
for (auto const& addr : addresses) {
std::string::size_type idx = addr.find(substr);
std::string target =
idx == std::string::npos ? addr : addr.substr(idx + substr.length());
etcd::detail::dns_resolve(target, stripped_addresses);
etcd::detail::dns_resolve(target, stripped_v4_addresses, true);
etcd::detail::dns_resolve(target, stripped_v6_addresses, false);
}
stripped_address = string_join(stripped_addresses, ",");
stripped_v4_address = string_join(stripped_v4_addresses, ",");
stripped_v6_address = string_join(stripped_v6_addresses, ",");
}
return "ipv4:///" + stripped_address;
// prefer resolved ipv4 addresses
if (!stripped_v4_address.empty()) {
return "ipv4:///" + stripped_v4_address;
}
if (!stripped_v6_address.empty()) {
return "ipv6:///" + stripped_v6_address;
}
return std::string{};
}
const bool authenticate(std::shared_ptr<grpc::Channel> const& channel,
std::string const& username,
std::string const& password,
std::string& token_or_message) {
bool authenticate(std::shared_ptr<grpc::Channel> const& channel,
std::string const& username, std::string const& password,
std::string& token_or_message) {
// run a round of auth
auto auth_stub = etcdserverpb::Auth::NewStub(channel);
ClientContext context;
@ -172,10 +250,12 @@ static std::string read_from_file(std::string const& filename) {
file.close();
return ss.str();
} else {
std::cerr << "[ERROR] failed to load given file '" << filename << "', "
#ifndef NDEBUG
std::cerr << "[error] failed to load given file '" << filename << "', "
<< strerror(errno) << std::endl;
#endif
return std::string{};
}
return std::string{};
}
static grpc::SslCredentialsOptions make_ssl_credentials(
@ -192,6 +272,33 @@ std::unique_ptr<T> make_unique_ptr(Args&&... args) {
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
static std::shared_ptr<grpc::Channel> create_grpc_channel(
const std::string& address,
const std::shared_ptr<grpc::ChannelCredentials> creds,
const grpc::ChannelArguments& grpc_args) {
const std::string addresses =
etcd::detail::strip_and_resolve_addresses(address);
#ifndef NDEBUG
std::cerr << "[debug] resolved addresses: " << addresses << std::endl;
#endif
if (addresses.empty() || addresses == "ipv4:///" || addresses == "ipv6:///") {
// bypass grpc initialization to avoid noisy logs from grpc
return grpc::CreateChannelInternal(
"",
grpc_lame_client_channel_create(addresses.c_str(), GRPC_STATUS_INTERNAL,
"the target uri is not valid"),
#if defined(WITH_GRPC_CREATE_CHANNEL_INTERNAL_UNIQUE_POINTER)
nullptr
#else
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>()
#endif
);
} else {
return grpc::CreateCustomChannel(addresses, creds, grpc_args);
}
}
} // namespace detail
} // namespace etcd
@ -235,7 +342,11 @@ class etcd::SyncClient::TokenAuthenticator {
// auth
if (!etcd::detail::authenticate(this->channel_, username_, password_,
token_)) {
throw std::invalid_argument("Etcd authentication failed: " + token_);
// n.b.: no throw here as the failure of auth will be propagated
// to client when it is asked to issue requests.
//
// throw std::invalid_argument("Etcd authentication failed: " +
// token_);
}
}
}
@ -253,6 +364,7 @@ void etcd::SyncClient::TokenAuthenticatorDeleter::operator()(
struct etcd::SyncClient::EtcdServerStubs {
std::unique_ptr<etcdserverpb::KV::Stub> kvServiceStub;
std::unique_ptr<etcdserverpb::Watch::Stub> watchServiceStub;
std::unique_ptr<etcdserverpb::Cluster::Stub> clusterServiceStub;
std::unique_ptr<etcdserverpb::Lease::Stub> leaseServiceStub;
std::unique_ptr<v3lockpb::Lock::Stub> lockServiceStub;
std::unique_ptr<v3electionpb::Election::Stub> electionServiceStub;
@ -268,21 +380,20 @@ void etcd::SyncClient::EtcdServerStubsDeleter::operator()(
etcd::SyncClient::SyncClient(std::string const& address,
std::string const& load_balancer) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());
// create stubs
stubs.reset(new EtcdServerStubs{});
stubs->kvServiceStub = KV::NewStub(this->channel);
stubs->watchServiceStub = Watch::NewStub(this->channel);
stubs->clusterServiceStub = Cluster::NewStub(this->channel);
stubs->leaseServiceStub = Lease::NewStub(this->channel);
stubs->lockServiceStub = Lock::NewStub(this->channel);
stubs->electionServiceStub = Election::NewStub(this->channel);
@ -291,14 +402,12 @@ etcd::SyncClient::SyncClient(std::string const& address,
etcd::SyncClient::SyncClient(std::string const& address,
grpc::ChannelArguments const& arguments) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args = arguments;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());
// create stubs
@ -326,15 +435,13 @@ etcd::SyncClient::SyncClient(std::string const& address,
int const auth_token_ttl,
std::string const& load_balancer) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
// auth
this->token_authenticator.reset(new TokenAuthenticator(
@ -355,14 +462,12 @@ etcd::SyncClient::SyncClient(std::string const& address,
int const auth_token_ttl,
grpc::ChannelArguments const& arguments) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args = arguments;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
// auth
this->token_authenticator.reset(new TokenAuthenticator(
@ -400,8 +505,6 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
std::string const& target_name_override,
std::string const& load_balancer) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
@ -412,7 +515,7 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
grpc_args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
target_name_override);
}
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());
// setup stubs
@ -430,8 +533,6 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
std::string const& target_name_override,
grpc::ChannelArguments const& arguments) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args = arguments;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
@ -441,7 +542,7 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
grpc_args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
target_name_override);
}
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());
// setup stubs
@ -918,6 +1019,60 @@ etcd::SyncClient::leases_internal() {
return std::make_shared<etcdv3::AsyncLeaseLeasesAction>(std::move(params));
}
etcd::Response etcd::SyncClient::add_member(std::string const& peer_urls,
bool is_learner) {
return Response::create(this->add_member_internal(peer_urls, is_learner));
}
std::shared_ptr<etcdv3::AsyncAddMemberAction>
etcd::SyncClient::add_member_internal(std::string const& peer_urls,
bool is_learner) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.cluster_stub = stubs->clusterServiceStub.get();
std::vector<std::string> peer_urls_vector;
std::istringstream iss(peer_urls);
std::string peer_url;
while (std::getline(iss, peer_url, ',')) {
peer_urls_vector.push_back(peer_url);
}
params.is_learner = is_learner;
params.peer_urls = peer_urls_vector;
return std::make_shared<etcdv3::AsyncAddMemberAction>(std::move(params));
}
etcd::Response etcd::SyncClient::list_member() {
return Response::create(this->list_member_internal());
}
std::shared_ptr<etcdv3::AsyncListMemberAction>
etcd::SyncClient::list_member_internal() {
etcdv3::ActionParameters params;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.cluster_stub = stubs->clusterServiceStub.get();
return std::make_shared<etcdv3::AsyncListMemberAction>(std::move(params));
}
etcd::Response etcd::SyncClient::remove_member(const uint64_t member_id) {
return Response::create(this->remove_member_internal(member_id));
}
std::shared_ptr<etcdv3::AsyncRemoveMemberAction>
etcd::SyncClient::remove_member_internal(const uint64_t member_id) {
etcdv3::ActionParameters params;
params.auth_token.assign(this->token_authenticator->renew_if_expired());
params.grpc_timeout = this->grpc_timeout;
params.cluster_stub = stubs->clusterServiceStub.get();
params.member_id = member_id;
return std::make_shared<etcdv3::AsyncRemoveMemberAction>(std::move(params));
}
etcd::Response etcd::SyncClient::lock(std::string const& key) {
// routines in lock usually will be fast, less than 10 seconds.
//
@ -1002,15 +1157,15 @@ std::shared_ptr<etcdv3::AsyncUnlockAction> etcd::SyncClient::unlock_internal(
if (p_keeps_alive != this->keep_alive_for_locks.end()) {
this->keep_alive_for_locks.erase(p_keeps_alive);
} else {
#if !defined(NDEBUG)
std::cerr << "Keepalive for lease not found" << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] keepalive for lease not found" << std::endl;
#endif
}
lock_lease_id = p_leases->second;
this->leases_for_locks.erase(p_leases);
} else {
#if !defined(NDEBUG)
std::cerr << "Lease for lock not found" << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] lease for lock not found" << std::endl;
#endif
}
if (lock_lease_id != 0) {

View File

@ -1,3 +1,4 @@
#include <cstdint>
#include <iomanip>
#include "etcd/Value.hpp"
@ -44,6 +45,19 @@ int etcd::Value::ttl() const { return _ttl; }
int64_t etcd::Value::lease() const { return leaseId; }
std::ostream& etcd::operator<<(std::ostream& os, const etcd::Value& value) {
os << "Event: {";
os << "Key: " << value.key() << ", ";
os << "Value: " << value.as_string() << ", ";
os << "Created: " << value.created_index() << ", ";
os << "Modified: " << value.modified_index() << ", ";
os << "Version: " << value.version() << ", ";
os << "TTL: " << value.ttl() << ", ";
os << "Lease: " << value.lease() << ", ";
os << "}";
return os;
}
etcd::Event::Event(mvccpb::Event const& event) {
_has_kv = event.has_kv();
_has_prev_kv = event.has_prev_kv();
@ -73,3 +87,30 @@ bool etcd::Event::has_prev_kv() const { return _has_prev_kv; }
const etcd::Value& etcd::Event::kv() const { return _kv; }
const etcd::Value& etcd::Event::prev_kv() const { return _prev_kv; }
std::ostream& etcd::operator<<(std::ostream& os,
const etcd::Event::EventType& value) {
switch (value) {
case etcd::Event::EventType::PUT:
os << "PUT";
break;
case etcd::Event::EventType::DELETE_:
os << "DELETE";
break;
case etcd::Event::EventType::INVALID:
os << "INVALID";
break;
}
return os;
}
std::ostream& etcd::operator<<(std::ostream& os, const etcd::Event& event) {
os << "Event type: " << event.event_type();
if (event.has_kv()) {
os << ", KV: " << event.kv();
}
if (event.has_prev_kv()) {
os << ", Prev KV: " << event.prev_kv();
}
return os;
}

View File

@ -229,13 +229,18 @@ bool etcd::Watcher::Wait() {
return stubs->call->Cancelled();
}
void etcd::Watcher::Wait(std::function<void(bool)> callback) {
bool etcd::Watcher::Wait(std::function<void(bool)> callback) {
if (wait_callback == nullptr) {
wait_callback = callback;
return true;
} else {
std::cerr << "Failed to set a asynchronous wait callback since it has "
"already been set"
<< std::endl;
#ifndef NDEBUG
std::cerr
<< "[warn] failed to set a asynchronous wait callback since it has "
"already been set"
<< std::endl;
#endif
return false;
}
}

View File

@ -2,6 +2,15 @@
#include <grpc/support/log.h>
#include <grpcpp/support/status.h>
#include "etcd/v3/action_constants.hpp"
#include <cstdlib>
#ifndef GPR_ASSERT
#define GPR_ASSERT(x) \
if (!(x)) { \
fprintf(stderr, "%s:%d assert failed\n", __FILE__, __LINE__); \
abort(); \
}
#endif
etcdv3::Action::Action(etcdv3::ActionParameters const& params) {
parameters = params;

View File

@ -95,6 +95,43 @@ void etcdv3::AsyncLeaseKeepAliveResponse::ParseResponse(
value.set_ttl(resp.ttl());
}
void etcdv3::AsyncMemberAddResponse::ParseResponse(MemberAddResponse& resp) {
index = resp.header().revision();
std::string member_type = "Member";
if (resp.member().islearner()) {
member_type = "Learner";
}
std::cout << "Member (" << resp.member().id() << ")"
<< " Added to the etcd cluster as " << member_type << std::endl;
}
void etcdv3::AsyncMemberListResponse::ParseResponse(MemberListResponse& resp) {
index = resp.header().revision();
for (auto member : resp.members()) {
etcdv3::Member m;
m.set_id(member.id());
m.set_name(member.name());
std::vector<std::string> clientUrlsVec, peerUrlsVec;
for (const auto& clientUrl : member.clienturls()) {
clientUrlsVec.push_back(clientUrl);
}
for (const auto& peerUrl : member.peerurls()) {
peerUrlsVec.push_back(peerUrl);
}
m.set_clientURLs(clientUrlsVec);
m.set_peerURLs(peerUrlsVec);
members.push_back(m);
}
}
void etcdv3::AsyncMemberRemoveResponse::ParseResponse(
MemberRemoveResponse& resp) {
index = resp.header().revision();
}
void etcdv3::AsyncLeaseLeasesResponse::ParseResponse(
LeaseLeasesResponse& resp) {
index = resp.header().revision();
@ -248,9 +285,12 @@ void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) {
}
// skip
std::cerr << "Not implemented error: unable to parse nested transaction "
#ifndef NDEBUG
std::cerr << "[debug] not implemented error: unable to parse nested "
"transaction "
"response"
<< std::endl;
#endif
}
}
if (!values.empty()) {
@ -509,7 +549,10 @@ etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction(
got_tag == (void*) etcdv3::KEEPALIVE_CREATE) {
// ok
} else {
throw std::runtime_error("Failed to create a lease keep-alive connection");
status = grpc::Status(grpc::StatusCode::CANCELLED,
"Failed to create a lease keep-alive connection");
// cannot continue for further refresh
isCancelled.store(true);
}
}
@ -531,7 +574,7 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() {
std::lock_guard<std::recursive_mutex> scope_lock(this->protect_is_cancelled);
auto start_timepoint = std::chrono::high_resolution_clock::now();
if (isCancelled) {
if (isCancelled.load()) {
status = grpc::Status::CANCELLED;
return etcd::Response(ParseResponse(),
etcd::detail::duration_till_now(start_timepoint));
@ -618,9 +661,7 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() {
void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() {
std::lock_guard<std::recursive_mutex> scope_lock(this->protect_is_cancelled);
if (isCancelled == false) {
isCancelled = true;
if (!isCancelled.exchange(true)) {
void* got_tag = nullptr;
bool ok = false;
@ -629,17 +670,22 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() {
got_tag == (void*) etcdv3::KEEPALIVE_DONE) {
// ok
} else {
std::cerr << "Failed to mark a lease keep-alive connection as DONE: "
<< context.debug_error_string() << std::endl;
#ifndef NDEBUG
std::cerr
<< "[debug] failed to mark a lease keep-alive connection as DONE: "
<< context.debug_error_string() << std::endl;
#endif
}
stream->Finish(&status, (void*) KEEPALIVE_FINISH);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*) KEEPALIVE_FINISH) {
// ok
} else {
std::cerr << "Failed to finish a lease keep-alive connection: "
#ifndef NDEBUG
std::cerr << "[debug] failed to finish a lease keep-alive connection: "
<< status.error_message() << ", "
<< context.debug_error_string() << std::endl;
#endif
}
// cancel on-the-fly calls
@ -650,7 +696,7 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() {
}
bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const {
return isCancelled;
return isCancelled.load();
}
etcdv3::ActionParameters&
@ -658,6 +704,81 @@ etcdv3::AsyncLeaseKeepAliveAction::mutable_parameters() {
return this->parameters;
}
etcdv3::AsyncAddMemberAction::AsyncAddMemberAction(
etcdv3::ActionParameters&& params)
: etcdv3::Action(std::move(params)) {
MemberAddRequest add_member_request;
for (const auto& url : parameters.peer_urls) {
add_member_request.add_peerurls(url);
}
add_member_request.set_islearner(parameters.is_learner);
response_reader = parameters.cluster_stub->AsyncMemberAdd(
&context, add_member_request, &cq_);
response_reader->Finish(&reply, &status, (void*) this);
}
etcdv3::AsyncMemberAddResponse etcdv3::AsyncAddMemberAction::ParseResponse() {
AsyncMemberAddResponse add_member_resp;
add_member_resp.set_action(etcdv3::ADDMEMBER);
if (!status.ok()) {
add_member_resp.set_error_code(status.error_code());
add_member_resp.set_error_message(status.error_message());
} else {
add_member_resp.ParseResponse(reply);
}
return add_member_resp;
}
etcdv3::AsyncListMemberAction::AsyncListMemberAction(
etcdv3::ActionParameters&& params)
: etcdv3::Action(std::move(params)) {
MemberListRequest member_list_request;
response_reader = parameters.cluster_stub->AsyncMemberList(
&context, member_list_request, &cq_);
response_reader->Finish(&reply, &status, (void*) this);
}
etcdv3::AsyncMemberListResponse etcdv3::AsyncListMemberAction::ParseResponse() {
AsyncMemberListResponse list_member_resp;
list_member_resp.set_action(etcdv3::LISTMEMBER);
if (!status.ok()) {
list_member_resp.set_error_code(status.error_code());
list_member_resp.set_error_message(status.error_message());
} else {
list_member_resp.ParseResponse(reply);
}
return list_member_resp;
}
etcdv3::AsyncRemoveMemberAction::AsyncRemoveMemberAction(
etcdv3::ActionParameters&& params)
: etcdv3::Action(std::move(params)) {
MemberRemoveRequest remove_member_request;
remove_member_request.set_id(parameters.member_id);
response_reader = parameters.cluster_stub->AsyncMemberRemove(
&context, remove_member_request, &cq_);
response_reader->Finish(&reply, &status, (void*) this);
}
etcdv3::AsyncMemberRemoveResponse
etcdv3::AsyncRemoveMemberAction::ParseResponse() {
AsyncMemberRemoveResponse remove_member_resp;
remove_member_resp.set_action(etcdv3::REMOVEMEMBER);
if (!status.ok()) {
remove_member_resp.set_error_code(status.error_code());
remove_member_resp.set_error_message(status.error_message());
} else {
remove_member_resp.ParseResponse(reply);
}
return remove_member_resp;
}
etcdv3::AsyncLeaseLeasesAction::AsyncLeaseLeasesAction(
etcdv3::ActionParameters&& params)
: etcdv3::Action(std::move(params)) {
@ -774,7 +895,10 @@ etcdv3::AsyncObserveAction::AsyncObserveAction(
got_tag == (void*) etcdv3::ELECTION_OBSERVE_CREATE) {
// n.b.: leave the issue of `Read` to the `waitForResponse`
} else {
throw std::runtime_error("failed to create a observe connection");
status = grpc::Status(grpc::StatusCode::CANCELLED,
"failed to create a observe connection");
// cannot continue for further observing
isCancelled.store(true);
}
}
@ -802,7 +926,7 @@ void etcdv3::AsyncObserveAction::waitForResponse() {
}
void etcdv3::AsyncObserveAction::CancelObserve() {
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancelled);
if (!isCancelled.exchange(true)) {
void* got_tag;
bool ok = false;
@ -820,8 +944,10 @@ void etcdv3::AsyncObserveAction::CancelObserve() {
break;
case CompletionQueue::NextStatus::GOT_EVENT:
if (!ok || got_tag != (void*) ELECTION_OBSERVE_FINISH) {
std::cerr << "Failed to finish a election observing connection"
#ifndef NDEBUG
std::cerr << "[debug] failed to finish a election observing connection"
<< std::endl;
#endif
}
}
@ -980,9 +1106,9 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters&& params,
// backwards compatibility
txn.add_success_range(parameters.key);
if (create) {
txn.add_failure_put(parameters.key, parameters.value, parameters.lease_id);
} else {
txn.add_failure_range(parameters.key);
} else {
txn.add_failure_put(parameters.key, parameters.value, parameters.lease_id);
}
response_reader =
parameters.kv_stub->AsyncTxn(&context, *txn.txn_request, &cq_);
@ -1120,7 +1246,14 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
got_tag == (void*) etcdv3::WATCH_CREATE) {
stream->Write(watch_req, (void*) etcdv3::WATCH_WRITE);
} else {
throw std::runtime_error("failed to create a watch connection");
status = grpc::Status(grpc::StatusCode::CANCELLED,
"failed to create a watch connection");
// cannot continue for further watching
isCancelled.store(true);
}
if (!status.ok()) {
return;
}
// wait "write" (WatchCreateRequest) success, and start to read the first
@ -1128,7 +1261,10 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters&& params)
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*) etcdv3::WATCH_WRITE) {
stream->Read(&reply, (void*) this);
} else {
throw std::runtime_error("failed to write WatchCreateRequest to server");
status = grpc::Status(grpc::StatusCode::CANCELLED,
"failed to write WatchCreateRequest to server");
// cannot continue for further watching
isCancelled.store(true);
}
}
@ -1154,6 +1290,11 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
bool ok = false;
bool the_final_round = false;
// failed to create the watcher
if (!status.ok()) {
return;
}
while (true) {
if (!the_final_round) {
if (!cq_.Next(&got_tag, &ok)) {
@ -1165,7 +1306,9 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
case CompletionQueue::NextStatus::TIMEOUT:
case CompletionQueue::NextStatus::SHUTDOWN: {
#ifndef NDEBUG
std::cerr << "[warn] watcher does't exit normally" << std::endl;
#endif
// pretend to be received a "WATCH_FINISH" tag: shutdown
context.TryCancel();
cq_.Shutdown();
@ -1219,7 +1362,6 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
<< std::endl;
}
std::cout << "issue a watch cancel" << std::endl;
// cancel the watcher after receiving the good response
this->CancelWatch();
@ -1245,6 +1387,14 @@ void etcdv3::AsyncWatchAction::waitForResponse(
bool ok = false;
bool the_final_round = false;
// failed to create the watcher
if (!status.ok()) {
auto resp = ParseResponse();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start_timepoint);
callback(etcd::Response(resp, duration));
}
while (true) {
if (!the_final_round) {
if (!cq_.Next(&got_tag, &ok)) {

40
src/v3/Member.cpp Normal file
View File

@ -0,0 +1,40 @@
#include "etcd/v3/Member.hpp"
etcdv3::Member::Member() {
id = 0;
name = "";
peerURLs = {};
clientURLs = {};
isLearner = false;
}
void etcdv3::Member::set_id(uint64_t const& id) { this->id = id; }
void etcdv3::Member::set_name(std::string const& name) { this->name = name; }
void etcdv3::Member::set_peerURLs(std::vector<std::string> const& peerURLs) {
this->peerURLs = peerURLs;
}
void etcdv3::Member::set_clientURLs(
std::vector<std::string> const& clientURLs) {
this->clientURLs = clientURLs;
}
void etcdv3::Member::set_learner(bool isLearner) {
this->isLearner = isLearner;
}
uint64_t const& etcdv3::Member::get_id() const { return id; }
std::string const& etcdv3::Member::get_name() const { return name; }
std::vector<std::string> const& etcdv3::Member::get_peerURLs() const {
return peerURLs;
}
std::vector<std::string> const& etcdv3::Member::get_clientURLs() const {
return clientURLs;
}
bool etcdv3::Member::get_learner() const { return isLearner; }

View File

@ -1,4 +1,5 @@
#include "etcd/v3/V3Response.hpp"
#include "etcd/v3/Member.hpp"
#include "etcd/v3/action_constants.hpp"
void etcdv3::V3Response::set_error_code(int code) { error_code = code; }
@ -79,3 +80,7 @@ uint64_t etcdv3::V3Response::get_raft_term() const { return this->raft_term; }
std::vector<int64_t> const& etcdv3::V3Response::get_leases() const {
return this->leases;
}
std::vector<etcdv3::Member> const& etcdv3::V3Response::get_members() const {
return this->members;
}

View File

@ -19,6 +19,10 @@ char const* etcdv3::LEASEKEEPALIVE = "leasekeepalive";
char const* etcdv3::LEASETIMETOLIVE = "leasetimetolive";
char const* etcdv3::LEASELEASES = "leaseleases";
char const* etcdv3::ADDMEMBER = "addmember";
char const* etcdv3::LISTMEMBER = "listmember";
char const* etcdv3::REMOVEMEMBER = "removemember";
char const* etcdv3::CAMPAIGN_ACTION = "campaign";
char const* etcdv3::PROCLAIM_ACTION = "preclaim";
char const* etcdv3::LEADER_ACTION = "leader";

View File

@ -19,6 +19,7 @@ foreach(testfile ${TEST_FILES})
add_executable(${test_name} EXCLUDE_FROM_ALL ${CMAKE_CURRENT_SOURCE_DIR}/${testfile})
endif()
use_cxx(${test_name})
set_exceptions(${test_name})
add_test(NAME ${test_name} COMMAND $<TARGET_FILE:${test_name}>)
target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen)

136
tst/EtcdMemberTest.cpp Normal file
View File

@ -0,0 +1,136 @@
#include <cmath>
#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <chrono>
#include <cstring>
#include <iostream>
#include <thread>
#include <vector>
#include "etcd/Client.hpp"
static const std::string etcd_url =
etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
pid_t new_etcd_pid = 0;
pid_t start_etcd_server(const std::string& etcd_path,
const std::vector<std::string>& args) {
pid_t pid = fork();
if (pid == -1) {
std::cout << "Failed to fork process" << std::endl;
exit(EXIT_FAILURE);
} else if (pid == 0) {
std::vector<char*> c_args;
c_args.push_back(const_cast<char*>(etcd_path.c_str()));
for (const auto& arg : args) {
c_args.push_back(const_cast<char*>(arg.c_str()));
}
c_args.push_back(nullptr);
if (execvp(etcd_path.c_str(), c_args.data()) == -1) {
std::cout << "Failed to exec etcd process: " << std::strerror(errno)
<< std::endl;
exit(EXIT_FAILURE);
}
}
return pid;
}
TEST_CASE("add member") {
etcd::Client etcd(etcd_url);
etcd::Response res = etcd.list_member().get();
REQUIRE(res.is_ok());
CHECK(1 == res.members().size());
std::string member_name = res.members()[0].get_name();
std::string prev_peer_urls = res.members()[0].get_peerURLs()[0];
// Add a new member
std::string peer_urls = "http://127.0.0.1:33691";
std::string client_urls = "http://127.0.0.1:33690";
bool is_learner = false;
res = etcd.add_member(peer_urls, is_learner).get();
REQUIRE(res.is_ok());
// Create the directory for the new etcd server
std::string cmd = "mkdir -p /tmp/new_etcd_member";
system(cmd.c_str());
// Start a new etcd server
std::vector<std::string> args = {
"--name",
"new_etcd_member",
"--initial-advertise-peer-urls",
peer_urls,
"--listen-peer-urls",
peer_urls,
"--initial-cluster",
member_name + "=" + prev_peer_urls + ",new_etcd_member=" + peer_urls,
"--initial-cluster-state",
"existing",
"--listen-client-urls",
client_urls,
"--advertise-client-urls",
client_urls,
"--data-dir",
"/tmp/new_etcd_member",
};
new_etcd_pid = start_etcd_server("/usr/local/bin/etcd", args);
std::this_thread::sleep_for(std::chrono::seconds(30));
// Check the member's number
{
etcd::Response res = etcd.list_member().get();
REQUIRE(res.is_ok());
CHECK(2 == res.members().size());
}
}
TEST_CASE("member remove") {
etcd::Client etcd(etcd_url);
etcd::Response res = etcd.list_member().get();
REQUIRE(res.is_ok());
CHECK(2 == res.members().size());
uint64_t member_id = 0;
for (const auto& member : res.members()) {
if (member.get_name() == "new_etcd_member") {
member_id = member.get_id();
break;
}
}
REQUIRE(member_id != 0);
// Remove the new member
res = etcd.remove_member(member_id).get();
std::this_thread::sleep_for(std::chrono::seconds(30));
// Check whether the new etcd server is quited
{
int status;
pid_t wpid = waitpid(new_etcd_pid, &status, WNOHANG);
REQUIRE(wpid == new_etcd_pid);
REQUIRE(WIFEXITED(status));
REQUIRE(WEXITSTATUS(status) == 0);
}
// Remove the directory for the new etcd server
std::string cmd = "rm -rf /tmp/new_etcd_member";
system(cmd.c_str());
// Check the member's number
{
etcd::Response res = etcd.list_member().get();
REQUIRE(res.is_ok());
CHECK(1 == res.members().size());
}
}

30
tst/EtcdResolverTest.cpp Normal file
View File

@ -0,0 +1,30 @@
#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <iostream>
#include "etcd/Client.hpp"
static const std::string etcd_v4_url =
etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
static const std::string etcd_v6_url =
etcdv3::detail::resolve_etcd_endpoints("http://::1:2379");
// http://[ipv6]:port url
static const std::string etcd_ipv6_url =
etcdv3::detail::resolve_etcd_endpoints("http://[::1]:2379");
TEST_CASE("test ipv4 connection") {
std::cout << "ipv4 endpoints: " << etcd_v4_url << std::endl;
etcd::Client etcd(etcd_v4_url);
REQUIRE(etcd.head().get().is_ok());
}
TEST_CASE("test ipv6 connection") {
std::cout << "ipv6 endpoints: " << etcd_v6_url << std::endl;
etcd::Client etcd(etcd_v6_url);
REQUIRE(etcd.head().get().is_ok());
std::cout << "ipv6 endpoints: " << etcd_ipv6_url << std::endl;
etcd::Client etcd1(etcd_ipv6_url);
REQUIRE(etcd1.head().get().is_ok());
}

View File

@ -18,7 +18,7 @@ TEST_CASE("sync operations") {
// add
CHECK(0 == etcd.add("/test/key1", "42").error_code());
CHECK(etcd::ERROR_KEY_ALREADY_EXISTS ==
etcd.add("/test/key1", "42").error_code()); // Key already exists
etcd.add("/test/key1", "41").error_code()); // Key already exists
CHECK("42" == etcd.get("/test/key1").value().as_string());
// modify

View File

@ -31,7 +31,9 @@ TEST_CASE("keepalive revoke and check if alive") {
std::this_thread::sleep_for(std::chrono::seconds(1));
// expect keep_alive->Check() to throw exception
#ifndef _ETCD_NO_EXCEPTIONS
REQUIRE_THROWS(keepalive->Check());
#endif
}
TEST_CASE("keepalive won't expire") {
@ -45,6 +47,7 @@ TEST_CASE("keepalive won't expire") {
auto lease_id = resp.value().lease();
etcd.add(key, meta_str, lease_id);
#ifndef _ETCD_NO_EXCEPTIONS
std::function<void(std::exception_ptr)> handler =
[](std::exception_ptr eptr) {
try {
@ -57,6 +60,22 @@ TEST_CASE("keepalive won't expire") {
std::cerr << "Lease expiry \"" << e.what() << "\"\n";
}
};
#else
std::function<void(std::exception_ptr)> handler;
#endif
etcd::KeepAlive keepalive(etcd, handler, ttl, lease_id);
std::this_thread::sleep_for(std::chrono::seconds(5));
}
TEST_CASE("keepalive auto-grant") {
etcd::Client etcd(etcd_uri);
// create a lease without pre-granted lease id
auto keepalive = std::make_shared<etcd::KeepAlive>(etcd, 10 /* ttl */);
auto lease_id = keepalive->Lease();
REQUIRE(lease_id != 0);
// sleep for a while, and cancel
std::this_thread::sleep_for(std::chrono::seconds(5));
keepalive->Cancel();
}

View File

@ -117,6 +117,7 @@ TEST_CASE("lock using lease") {
bool failed = false;
#ifndef _ETCD_NO_EXCEPTIONS
std::function<void(std::exception_ptr)> handler =
[&failed](std::exception_ptr eptr) {
try {
@ -128,6 +129,9 @@ TEST_CASE("lock using lease") {
failed = true;
}
};
#else
std::function<void(std::exception_ptr)> handler;
#endif
// with handler
{

View File

@ -24,37 +24,31 @@ class DistributedLock {
DistributedLock::DistributedLock(const std::string& lock_name, uint timeout) {
_etcd_client = std::unique_ptr<etcd::Client>(new etcd::Client(etcd_url));
try {
if (timeout == 0) {
if (timeout == 0) {
etcd::Response resp = _etcd_client->lock(lock_name).get();
if (resp.is_ok()) {
_lock_key = resp.lock_key();
_acquired = true;
}
} else {
std::future<etcd::Response> future = std::async(std::launch::async, [&]() {
etcd::Response resp = _etcd_client->lock(lock_name).get();
return resp;
});
std::future_status status = future.wait_for(std::chrono::seconds(timeout));
if (status == std::future_status::ready) {
auto resp = future.get();
if (resp.is_ok()) {
_lock_key = resp.lock_key();
_acquired = true;
}
} else if (status == std::future_status::timeout) {
std::cerr << "failed to acquire distributed because of lock timeout"
<< std::endl;
} else {
std::future<etcd::Response> future =
std::async(std::launch::async, [&]() {
etcd::Response resp = _etcd_client->lock(lock_name).get();
return resp;
});
std::future_status status =
future.wait_for(std::chrono::seconds(timeout));
if (status == std::future_status::ready) {
auto resp = future.get();
if (resp.is_ok()) {
_lock_key = resp.lock_key();
_acquired = true;
}
} else if (status == std::future_status::timeout) {
std::cerr << "failed to acquire distributed because of lock timeout"
<< std::endl;
} else {
std::cerr << "failed to acquire distributed lock" << std::endl;
}
std::cerr << "failed to acquire distributed lock" << std::endl;
}
} catch (std::exception& e) {
std::cerr << "failed to construct: " << e.what() << std::endl;
}
}
@ -63,13 +57,9 @@ DistributedLock::~DistributedLock() noexcept {
return;
}
try {
auto resp = _etcd_client->unlock(_lock_key).get();
if (!resp.is_ok()) {
std::cout << resp.error_code() << std::endl;
}
} catch (std::exception& e) {
std::cerr << "failed to destruct: " << e.what() << std::endl;
auto resp = _etcd_client->unlock(_lock_key).get();
if (!resp.is_ok()) {
std::cout << resp.error_code() << std::endl;
}
}

View File

@ -37,6 +37,7 @@ void print_response(etcd::Response const& resp) {
void wait_for_connection(std::string endpoints) {
// wait until the client connects to etcd server
while (true) {
#ifndef _ETCD_NO_EXCEPTIONS
try {
etcd::Client client(endpoints);
if (client.head().get().is_ok()) {
@ -45,6 +46,12 @@ void wait_for_connection(std::string endpoints) {
} catch (...) {
// pass
}
#else
etcd::Client client(endpoints);
if (client.head().get().is_ok()) {
break;
}
#endif
sleep(1);
}
}
@ -85,6 +92,7 @@ TEST_CASE("watch should can be re-established") {
// issue some changes to see if the watcher works
for (int round = 0; round < 100000; ++round) {
#ifndef _ETCD_NO_EXCEPTIONS
try {
etcd::Client client(etcd_url);
auto response =
@ -92,6 +100,11 @@ TEST_CASE("watch should can be re-established") {
} catch (...) {
// pass
}
#else
etcd::Client client(etcd_url);
auto response =
client.set(my_prefix + "/foo", "bar-" + std::to_string(round)).get();
#endif
std::this_thread::sleep_for(std::chrono::seconds(2));
}
@ -101,6 +114,7 @@ TEST_CASE("watch should can be re-established") {
// the watcher has been cancelled and shouldn't work anymore
for (int round = 10; round < 20; ++round) {
#ifndef _ETCD_NO_EXCEPTIONS
try {
etcd::Client client(etcd_url);
auto response =
@ -108,6 +122,11 @@ TEST_CASE("watch should can be re-established") {
} catch (...) {
// pass
}
#else
etcd::Client client(etcd_url);
auto response =
client.set(my_prefix + "/foo", "bar-" + std::to_string(round)).get();
#endif
std::this_thread::sleep_for(std::chrono::seconds(2));
}

View File

@ -191,6 +191,41 @@ TEST_CASE("create two watcher") {
std::this_thread::sleep_for(std::chrono::seconds(5));
}
TEST_CASE("using two watcher") {
etcd::SyncClient etcd(etcd_url);
int watched1 = 0;
int watched2 = 0;
etcd::Watcher w1(
etcd, "/test/def",
[&](etcd::Response const& resp) {
std::cout << "w1 called: " << resp.events().at(0).event_type() << " on "
<< resp.events().at(0).kv().key() << std::endl;
++watched1;
},
true);
etcd::Watcher w2(
etcd, "/test",
[&](etcd::Response const& resp) {
std::cout << "w2 called: " << resp.events().at(0).event_type() << " on "
<< resp.events().at(0).kv().key() << std::endl;
++watched2;
},
true);
std::this_thread::sleep_for(std::chrono::seconds(5));
etcd.put("/test/def/xxx", "42");
etcd.put("/test/abc", "42");
etcd.rm("/test/def/xxx");
etcd.rm("/test/abc");
std::this_thread::sleep_for(std::chrono::seconds(5));
CHECK(2 == watched1);
CHECK(4 == watched2);
}
// TEST_CASE("request cancellation")
// {
// etcd::Client etcd(etcd_url);