Fixes the noisy logs when meets invalid addresses. (#260)

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-12-20 00:32:00 +08:00 committed by GitHub
parent 84343ca9f0
commit 5aff57cce5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 105 additions and 41 deletions

View File

@ -166,6 +166,9 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/GenerateProtobufGRPC.cmake)
if(gRPC_VERSION VERSION_LESS "1.21" OR gRPC_VERSION VERSION_GREATER "1.31")
add_definitions(-DWITH_GRPC_CHANNEL_CLASS)
endif()
if(gRPC_VERSION VERSION_LESS "1.17")
add_definitions(-DWITH_GRPC_CREATE_CHANNEL_INTERNAL_UNIQUE_POINTER)
endif()
# will set `PROTOBUF_GENERATES`, indicates all generated .cc files, and a target `protobuf_generates`.
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/GenerateProtobuf.cmake)

View File

@ -162,8 +162,11 @@ class Watcher {
* Note that you shouldn't use the watcher itself inside the `Wait()` callback
* as the callback will be invoked in a separate **detached** thread where the
* watcher may have been destroyed.
*
* @return true if the callback has been set successfully (no existing
* callback).
*/
void Wait(std::function<void(bool)> callback);
bool Wait(std::function<void(bool)> callback);
/**
* Stop the watching action.

View File

@ -181,8 +181,12 @@ void etcd::KeepAlive::refresh() {
if (!continue_next.load()) {
return;
}
std::cerr << "Warn: awaked from condition_variable" +
" but continue_next is not set, maybe due to clock drift." << std::endl;
#ifndef NDEBUG
std::cerr
<< "[warn] awaked from condition_variable but continue_next is "
"not set, maybe due to clock drift."
<< std::endl;
#endif
}
}

View File

@ -37,6 +37,7 @@
#include <grpc++/grpc++.h>
#include <grpc++/security/credentials.h>
#include <grpc++/support/status_code_enum.h>
#include <grpc/grpc.h> // for grpc_lame_client_channel_create()
#include "proto/rpc.grpc.pb.h"
#include "proto/v3election.grpc.pb.h"
@ -49,6 +50,22 @@
#include "etcd/v3/Transaction.hpp"
#include "etcd/v3/action_constants.hpp"
namespace grpc {
// forward declaration for compatibility with older grpc versions
std::shared_ptr<Channel> CreateChannelInternal(
const std::string& host, grpc_channel* c_channel,
#if defined(WITH_GRPC_CREATE_CHANNEL_INTERNAL_UNIQUE_POINTER)
std::unique_ptr<std::vector<
std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>
interceptor_creators
#else
std::vector<
std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_creators
#endif
);
} // namespace grpc
namespace etcd {
namespace detail {
@ -70,7 +87,7 @@ static void string_split(std::vector<std::string>& dests,
}
static std::string string_join(std::vector<std::string> const& srcs,
std::string const sep) {
std::string const& sep) {
std::stringstream ss;
if (!srcs.empty()) {
ss << srcs[0];
@ -91,7 +108,9 @@ static bool dns_resolve(std::string const& target,
std::vector<std::string> target_parts;
string_split(target_parts, target, ":");
if (target_parts.size() != 2) {
std::cerr << "warn: invalid URL: " << target << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] invalid URL: " << target << std::endl;
#endif
return false;
}
@ -104,7 +123,10 @@ static bool dns_resolve(std::string const& target,
int err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0) {
// Tell the user that we could not find a usable Winsock DLL.
std::cerr << "WSAStartup failed with error: %d" << err << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] WSAStartup failed with error: %d" << err
<< std::endl;
#endif
return false;
}
}
@ -113,8 +135,10 @@ static bool dns_resolve(std::string const& target,
int r = getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints,
&addrs);
if (r != 0) {
std::cerr << "warn: getaddrinfo() failed for endpoint " << target
#ifndef NDEBUG
std::cerr << "[warn] getaddrinfo() failed for endpoint " << target
<< " with error: " << r << std::endl;
#endif
return false;
}
@ -175,10 +199,12 @@ static std::string read_from_file(std::string const& filename) {
file.close();
return ss.str();
} else {
std::cerr << "[ERROR] failed to load given file '" << filename << "', "
#ifndef NDEBUG
std::cerr << "[error] failed to load given file '" << filename << "', "
<< strerror(errno) << std::endl;
#endif
return std::string{};
}
return std::string{};
}
static grpc::SslCredentialsOptions make_ssl_credentials(
@ -195,6 +221,30 @@ std::unique_ptr<T> make_unique_ptr(Args&&... args) {
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
static std::shared_ptr<grpc::Channel> create_grpc_channel(
const std::string& address,
const std::shared_ptr<grpc::ChannelCredentials> creds,
const grpc::ChannelArguments& grpc_args) {
const std::string addresses =
etcd::detail::strip_and_resolve_addresses(address);
if (addresses.empty() || addresses == "ipv4:///") {
// bypass grpc initialization to avoid noisy logs from grpc
return grpc::CreateChannelInternal(
"",
grpc_lame_client_channel_create(addresses.c_str(), GRPC_STATUS_INTERNAL,
"the target uri is not valid"),
#if defined(WITH_GRPC_CREATE_CHANNEL_INTERNAL_UNIQUE_POINTER)
nullptr
#else
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>()
#endif
);
} else {
return grpc::CreateCustomChannel(addresses, creds, grpc_args);
}
}
} // namespace detail
} // namespace etcd
@ -271,15 +321,13 @@ void etcd::SyncClient::EtcdServerStubsDeleter::operator()(
etcd::SyncClient::SyncClient(std::string const& address,
std::string const& load_balancer) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());
// create stubs
@ -294,14 +342,12 @@ etcd::SyncClient::SyncClient(std::string const& address,
etcd::SyncClient::SyncClient(std::string const& address,
grpc::ChannelArguments const& arguments) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args = arguments;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());
// create stubs
@ -329,15 +375,13 @@ etcd::SyncClient::SyncClient(std::string const& address,
int const auth_token_ttl,
std::string const& load_balancer) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
// auth
this->token_authenticator.reset(new TokenAuthenticator(
@ -358,14 +402,12 @@ etcd::SyncClient::SyncClient(std::string const& address,
int const auth_token_ttl,
grpc::ChannelArguments const& arguments) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args = arguments;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
std::shared_ptr<grpc::ChannelCredentials> creds =
grpc::InsecureChannelCredentials();
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
// auth
this->token_authenticator.reset(new TokenAuthenticator(
@ -403,8 +445,6 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
std::string const& target_name_override,
std::string const& load_balancer) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
@ -415,7 +455,7 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
grpc_args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
target_name_override);
}
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());
// setup stubs
@ -433,8 +473,6 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
std::string const& target_name_override,
grpc::ChannelArguments const& arguments) {
// create channels
std::string const addresses =
etcd::detail::strip_and_resolve_addresses(address);
grpc::ChannelArguments grpc_args = arguments;
grpc_args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
grpc_args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
@ -444,7 +482,7 @@ etcd::SyncClient::SyncClient(std::string const& address, std::string const& ca,
grpc_args.SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
target_name_override);
}
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
this->channel = etcd::detail::create_grpc_channel(address, creds, grpc_args);
this->token_authenticator.reset(new TokenAuthenticator());
// setup stubs
@ -1005,15 +1043,15 @@ std::shared_ptr<etcdv3::AsyncUnlockAction> etcd::SyncClient::unlock_internal(
if (p_keeps_alive != this->keep_alive_for_locks.end()) {
this->keep_alive_for_locks.erase(p_keeps_alive);
} else {
#if !defined(NDEBUG)
std::cerr << "Keepalive for lease not found" << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] keepalive for lease not found" << std::endl;
#endif
}
lock_lease_id = p_leases->second;
this->leases_for_locks.erase(p_leases);
} else {
#if !defined(NDEBUG)
std::cerr << "Lease for lock not found" << std::endl;
#ifndef NDEBUG
std::cerr << "[warn] lease for lock not found" << std::endl;
#endif
}
if (lock_lease_id != 0) {

View File

@ -229,13 +229,18 @@ bool etcd::Watcher::Wait() {
return stubs->call->Cancelled();
}
void etcd::Watcher::Wait(std::function<void(bool)> callback) {
bool etcd::Watcher::Wait(std::function<void(bool)> callback) {
if (wait_callback == nullptr) {
wait_callback = callback;
return true;
} else {
std::cerr << "Failed to set a asynchronous wait callback since it has "
"already been set"
<< std::endl;
#ifndef NDEBUG
std::cerr
<< "[warn] failed to set a asynchronous wait callback since it has "
"already been set"
<< std::endl;
#endif
return false;
}
}

View File

@ -248,9 +248,12 @@ void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) {
}
// skip
std::cerr << "Not implemented error: unable to parse nested transaction "
#ifndef NDEBUG
std::cerr << "[debug] not implemented error: unable to parse nested "
"transaction "
"response"
<< std::endl;
#endif
}
}
if (!values.empty()) {
@ -629,17 +632,22 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() {
got_tag == (void*) etcdv3::KEEPALIVE_DONE) {
// ok
} else {
std::cerr << "Failed to mark a lease keep-alive connection as DONE: "
<< context.debug_error_string() << std::endl;
#ifndef NDEBUG
std::cerr
<< "[debug] failed to mark a lease keep-alive connection as DONE: "
<< context.debug_error_string() << std::endl;
#endif
}
stream->Finish(&status, (void*) KEEPALIVE_FINISH);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*) KEEPALIVE_FINISH) {
// ok
} else {
std::cerr << "Failed to finish a lease keep-alive connection: "
#ifndef NDEBUG
std::cerr << "[debug] failed to finish a lease keep-alive connection: "
<< status.error_message() << ", "
<< context.debug_error_string() << std::endl;
#endif
}
// cancel on-the-fly calls
@ -820,8 +828,10 @@ void etcdv3::AsyncObserveAction::CancelObserve() {
break;
case CompletionQueue::NextStatus::GOT_EVENT:
if (!ok || got_tag != (void*) ELECTION_OBSERVE_FINISH) {
std::cerr << "Failed to finish a election observing connection"
#ifndef NDEBUG
std::cerr << "[debug] failed to finish a election observing connection"
<< std::endl;
#endif
}
}
@ -1165,7 +1175,9 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
switch (cq_.AsyncNext(&got_tag, &ok, deadline)) {
case CompletionQueue::NextStatus::TIMEOUT:
case CompletionQueue::NextStatus::SHUTDOWN: {
#ifndef NDEBUG
std::cerr << "[warn] watcher does't exit normally" << std::endl;
#endif
// pretend to be received a "WATCH_FINISH" tag: shutdown
context.TryCancel();
cq_.Shutdown();
@ -1219,7 +1231,6 @@ void etcdv3::AsyncWatchAction::waitForResponse() {
<< std::endl;
}
std::cout << "issue a watch cancel" << std::endl;
// cancel the watcher after receiving the good response
this->CancelWatch();