diff --git a/CMakeLists.txt b/CMakeLists.txt index 80ebad2..6e93bf8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -166,6 +166,9 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/GenerateProtobufGRPC.cmake) if(gRPC_VERSION VERSION_LESS "1.21" OR gRPC_VERSION VERSION_GREATER "1.31") add_definitions(-DWITH_GRPC_CHANNEL_CLASS) endif() +if(gRPC_VERSION VERSION_LESS "1.17") + add_definitions(-DWITH_GRPC_CREATE_CHANNEL_INTERNAL_UNIQUE_POINTER) +endif() # will set `PROTOBUF_GENERATES`, indicates all generated .cc files, and a target `protobuf_generates`. include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/GenerateProtobuf.cmake) diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 95a943d..ed037d3 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -162,8 +162,11 @@ class Watcher { * Note that you shouldn't use the watcher itself inside the `Wait()` callback * as the callback will be invoked in a separate **detached** thread where the * watcher may have been destroyed. + * + * @return true if the callback has been set successfully (no existing + * callback). */ - void Wait(std::function callback); + bool Wait(std::function callback); /** * Stop the watching action. diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index cb5b9bc..6660d83 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -181,8 +181,12 @@ void etcd::KeepAlive::refresh() { if (!continue_next.load()) { return; } - std::cerr << "Warn: awaked from condition_variable" + - " but continue_next is not set, maybe due to clock drift." << std::endl; +#ifndef NDEBUG + std::cerr + << "[warn] awaked from condition_variable but continue_next is " + "not set, maybe due to clock drift." + << std::endl; +#endif } } diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index 7ab9d77..158d783 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -37,6 +37,7 @@ #include #include #include +#include // for grpc_lame_client_channel_create() #include "proto/rpc.grpc.pb.h" #include "proto/v3election.grpc.pb.h" @@ -49,6 +50,22 @@ #include "etcd/v3/Transaction.hpp" #include "etcd/v3/action_constants.hpp" +namespace grpc { +// forward declaration for compatibility with older grpc versions +std::shared_ptr CreateChannelInternal( + const std::string& host, grpc_channel* c_channel, +#if defined(WITH_GRPC_CREATE_CHANNEL_INTERNAL_UNIQUE_POINTER) + std::unique_ptr>> + interceptor_creators +#else + std::vector< + std::unique_ptr> + interceptor_creators +#endif +); +} // namespace grpc + namespace etcd { namespace detail { @@ -70,7 +87,7 @@ static void string_split(std::vector& dests, } static std::string string_join(std::vector const& srcs, - std::string const sep) { + std::string const& sep) { std::stringstream ss; if (!srcs.empty()) { ss << srcs[0]; @@ -91,7 +108,9 @@ static bool dns_resolve(std::string const& target, std::vector target_parts; string_split(target_parts, target, ":"); if (target_parts.size() != 2) { - std::cerr << "warn: invalid URL: " << target << std::endl; +#ifndef NDEBUG + std::cerr << "[warn] invalid URL: " << target << std::endl; +#endif return false; } @@ -104,7 +123,10 @@ static bool dns_resolve(std::string const& target, int err = WSAStartup(wVersionRequested, &wsaData); if (err != 0) { // Tell the user that we could not find a usable Winsock DLL. - std::cerr << "WSAStartup failed with error: %d" << err << std::endl; +#ifndef NDEBUG + std::cerr << "[warn] WSAStartup failed with error: %d" << err + << std::endl; +#endif return false; } } @@ -113,8 +135,10 @@ static bool dns_resolve(std::string const& target, int r = getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints, &addrs); if (r != 0) { - std::cerr << "warn: getaddrinfo() failed for endpoint " << target +#ifndef NDEBUG + std::cerr << "[warn] getaddrinfo() failed for endpoint " << target << " with error: " << r << std::endl; +#endif return false; } @@ -175,10 +199,12 @@ static std::string read_from_file(std::string const& filename) { file.close(); return ss.str(); } else { - std::cerr << "[ERROR] failed to load given file '" << filename << "', " +#ifndef NDEBUG + std::cerr << "[error] failed to load given file '" << filename << "', " << strerror(errno) << std::endl; +#endif + return std::string{}; } - return std::string{}; } static grpc::SslCredentialsOptions make_ssl_credentials( @@ -195,6 +221,30 @@ std::unique_ptr make_unique_ptr(Args&&... args) { return std::unique_ptr(new T(std::forward(args)...)); } +static std::shared_ptr create_grpc_channel( + const std::string& address, + const std::shared_ptr creds, + const grpc::ChannelArguments& grpc_args) { + const std::string addresses = + etcd::detail::strip_and_resolve_addresses(address); + if (addresses.empty() || addresses == "ipv4:///") { + // bypass grpc initialization to avoid noisy logs from grpc + return grpc::CreateChannelInternal( + "", + grpc_lame_client_channel_create(addresses.c_str(), GRPC_STATUS_INTERNAL, + "the target uri is not valid"), +#if defined(WITH_GRPC_CREATE_CHANNEL_INTERNAL_UNIQUE_POINTER) + nullptr +#else + std::vector>() +#endif + ); + } else { + return grpc::CreateCustomChannel(addresses, creds, grpc_args); + } +} + } // namespace detail } // namespace etcd @@ -271,15 +321,13 @@ void etcd::SyncClient::EtcdServerStubsDeleter::operator()( etcd::SyncClient::SyncClient(std::string const& address, std::string const& load_balancer) { // create channels - std::string const addresses = - etcd::detail::strip_and_resolve_addresses(address); grpc::ChannelArguments grpc_args; grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); std::shared_ptr creds = grpc::InsecureChannelCredentials(); grpc_args.SetLoadBalancingPolicyName(load_balancer); - this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); + this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args); this->token_authenticator.reset(new TokenAuthenticator()); // create stubs @@ -294,14 +342,12 @@ etcd::SyncClient::SyncClient(std::string const& address, etcd::SyncClient::SyncClient(std::string const& address, grpc::ChannelArguments const& arguments) { // create channels - std::string const addresses = - etcd::detail::strip_and_resolve_addresses(address); grpc::ChannelArguments grpc_args = arguments; grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); std::shared_ptr creds = grpc::InsecureChannelCredentials(); - this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); + this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args); this->token_authenticator.reset(new TokenAuthenticator()); // create stubs @@ -329,15 +375,13 @@ etcd::SyncClient::SyncClient(std::string const& address, int const auth_token_ttl, std::string const& load_balancer) { // create channels - std::string const addresses = - etcd::detail::strip_and_resolve_addresses(address); grpc::ChannelArguments grpc_args; grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); std::shared_ptr creds = grpc::InsecureChannelCredentials(); grpc_args.SetLoadBalancingPolicyName(load_balancer); - this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); + this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args); // auth this->token_authenticator.reset(new TokenAuthenticator( @@ -358,14 +402,12 @@ etcd::SyncClient::SyncClient(std::string const& address, int const auth_token_ttl, grpc::ChannelArguments const& arguments) { // create channels - std::string const addresses = - etcd::detail::strip_and_resolve_addresses(address); grpc::ChannelArguments grpc_args = arguments; grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); std::shared_ptr creds = grpc::InsecureChannelCredentials(); - this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); + this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args); // auth this->token_authenticator.reset(new TokenAuthenticator( @@ -403,8 +445,6 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca, std::string const& target_name_override, std::string const& load_balancer) { // create channels - std::string const addresses = - etcd::detail::strip_and_resolve_addresses(address); grpc::ChannelArguments grpc_args; grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); @@ -415,7 +455,7 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca, grpc_args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, target_name_override); } - this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); + this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args); this->token_authenticator.reset(new TokenAuthenticator()); // setup stubs @@ -433,8 +473,6 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca, std::string const& target_name_override, grpc::ChannelArguments const& arguments) { // create channels - std::string const addresses = - etcd::detail::strip_and_resolve_addresses(address); grpc::ChannelArguments grpc_args = arguments; grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); @@ -444,7 +482,7 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca, grpc_args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, target_name_override); } - this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); + this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args); this->token_authenticator.reset(new TokenAuthenticator()); // setup stubs @@ -1005,15 +1043,15 @@ std::shared_ptr etcd::SyncClient::unlock_internal( if (p_keeps_alive != this->keep_alive_for_locks.end()) { this->keep_alive_for_locks.erase(p_keeps_alive); } else { -#if !defined(NDEBUG) - std::cerr << "Keepalive for lease not found" << std::endl; +#ifndef NDEBUG + std::cerr << "[warn] keepalive for lease not found" << std::endl; #endif } lock_lease_id = p_leases->second; this->leases_for_locks.erase(p_leases); } else { -#if !defined(NDEBUG) - std::cerr << "Lease for lock not found" << std::endl; +#ifndef NDEBUG + std::cerr << "[warn] lease for lock not found" << std::endl; #endif } if (lock_lease_id != 0) { diff --git a/src/Watcher.cpp b/src/Watcher.cpp index eacaa75..eb79661 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -229,13 +229,18 @@ bool etcd::Watcher::Wait() { return stubs->call->Cancelled(); } -void etcd::Watcher::Wait(std::function callback) { +bool etcd::Watcher::Wait(std::function callback) { if (wait_callback == nullptr) { wait_callback = callback; + return true; } else { - std::cerr << "Failed to set a asynchronous wait callback since it has " - "already been set" - << std::endl; +#ifndef NDEBUG + std::cerr + << "[warn] failed to set a asynchronous wait callback since it has " + "already been set" + << std::endl; +#endif + return false; } } diff --git a/src/v3/AsyncGRPC.cpp b/src/v3/AsyncGRPC.cpp index 0a8a9d2..38eec26 100644 --- a/src/v3/AsyncGRPC.cpp +++ b/src/v3/AsyncGRPC.cpp @@ -248,9 +248,12 @@ void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) { } // skip - std::cerr << "Not implemented error: unable to parse nested transaction " +#ifndef NDEBUG + std::cerr << "[debug] not implemented error: unable to parse nested " + "transaction " "response" << std::endl; +#endif } } if (!values.empty()) { @@ -629,17 +632,22 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() { got_tag == (void*) etcdv3::KEEPALIVE_DONE) { // ok } else { - std::cerr << "Failed to mark a lease keep-alive connection as DONE: " - << context.debug_error_string() << std::endl; +#ifndef NDEBUG + std::cerr + << "[debug] failed to mark a lease keep-alive connection as DONE: " + << context.debug_error_string() << std::endl; +#endif } stream->Finish(&status, (void*) KEEPALIVE_FINISH); if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*) KEEPALIVE_FINISH) { // ok } else { - std::cerr << "Failed to finish a lease keep-alive connection: " +#ifndef NDEBUG + std::cerr << "[debug] failed to finish a lease keep-alive connection: " << status.error_message() << ", " << context.debug_error_string() << std::endl; +#endif } // cancel on-the-fly calls @@ -820,8 +828,10 @@ void etcdv3::AsyncObserveAction::CancelObserve() { break; case CompletionQueue::NextStatus::GOT_EVENT: if (!ok || got_tag != (void*) ELECTION_OBSERVE_FINISH) { - std::cerr << "Failed to finish a election observing connection" +#ifndef NDEBUG + std::cerr << "[debug] failed to finish a election observing connection" << std::endl; +#endif } } @@ -1165,7 +1175,9 @@ void etcdv3::AsyncWatchAction::waitForResponse() { switch (cq_.AsyncNext(&got_tag, &ok, deadline)) { case CompletionQueue::NextStatus::TIMEOUT: case CompletionQueue::NextStatus::SHUTDOWN: { +#ifndef NDEBUG std::cerr << "[warn] watcher does't exit normally" << std::endl; +#endif // pretend to be received a "WATCH_FINISH" tag: shutdown context.TryCancel(); cq_.Shutdown(); @@ -1219,7 +1231,6 @@ void etcdv3::AsyncWatchAction::waitForResponse() { << std::endl; } - std::cout << "issue a watch cancel" << std::endl; // cancel the watcher after receiving the good response this->CancelWatch();