From f77bea59b370a42679053861a98878d94aaea683 Mon Sep 17 00:00:00 2001 From: Tao He Date: Wed, 7 Apr 2021 21:09:14 +0800 Subject: [PATCH] Optimize the implementation of error handling in keep alive. Signed-off-by: Tao He --- etcd/Response.hpp | 2 ++ etcd/v3/AsyncLeaseAction.hpp | 3 ++- src/KeepAlive.cpp | 15 ++++++++++++--- src/v3/AsyncLeaseAction.cpp | 15 +++++++++++---- tst/WatcherTest.cpp | 2 +- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/etcd/Response.hpp b/etcd/Response.hpp index 8a51677..e7833f4 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -12,6 +12,7 @@ namespace etcdv3 { class AsyncWatchAction; + class AsyncLeaseKeepAliveAction; class V3Response; } @@ -139,6 +140,7 @@ namespace etcd std::chrono::microseconds _duration; // execute duration (in microseconds), during the action created and response parsed friend class SyncClient; friend class etcdv3::AsyncWatchAction; + friend class etcdv3::AsyncLeaseKeepAliveAction; friend class Client; }; } diff --git a/etcd/v3/AsyncLeaseAction.hpp b/etcd/v3/AsyncLeaseAction.hpp index 88c9943..8af599b 100644 --- a/etcd/v3/AsyncLeaseAction.hpp +++ b/etcd/v3/AsyncLeaseAction.hpp @@ -7,6 +7,7 @@ #include "proto/rpc.grpc.pb.h" #include "etcd/v3/Action.hpp" #include "etcd/v3/AsyncLeaseResponse.hpp" +#include "etcd/Response.hpp" using grpc::ClientAsyncResponseReader; using grpc::ClientAsyncReaderWriter; @@ -45,7 +46,7 @@ namespace etcdv3 AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param); AsyncLeaseKeepAliveResponse ParseResponse(); - AsyncLeaseKeepAliveResponse Refresh(); + etcd::Response Refresh(); void CancelKeepAlive(); bool Cancelled() const; diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index 14be257..6ef8830 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -152,9 +152,18 @@ void etcd::KeepAlive::refresh() std::cerr << "keepalive timer error: " << error << ", " << error.message() << std::endl; #endif } else { - this->stubs->call->Refresh(); - // trigger the next round; - this->refresh(); + if (this->continue_next) { + auto resp = this->stubs->call->Refresh(); + if (!resp.is_ok()) { + throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + + ", message: " + resp.error_message()); + } + if (resp.value().ttl() == -1) { + throw std::runtime_error("Failed to refresh lease due to expiration: the new TTL is -1."); + } + // trigger the next round; + this->refresh(); + } } }); } diff --git a/src/v3/AsyncLeaseAction.cpp b/src/v3/AsyncLeaseAction.cpp index d867e1a..2ef12aa 100644 --- a/src/v3/AsyncLeaseAction.cpp +++ b/src/v3/AsyncLeaseAction.cpp @@ -82,12 +82,16 @@ etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResp return lease_resp; } -etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::Refresh() +etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() { std::lock_guard scope_lock(this->protect_is_cancelled); + auto start_timepoint = std::chrono::high_resolution_clock::now(); if (isCancelled) { - return ParseResponse(); + auto resp = ParseResponse(); + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint); + return etcd::Response(resp, duration); } LeaseKeepAliveRequest leasekeepalive_request; @@ -102,10 +106,13 @@ etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::Refresh() stream->Read(&reply, (void*)"keepalive read"); // wait read finish if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"keepalive read") { - return ParseResponse(); + auto resp = ParseResponse(); + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - start_timepoint); + return etcd::Response(resp, duration); } } - throw std::runtime_error("Failed to create a lease keep-alive connection"); + return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection"); } void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() diff --git a/tst/WatcherTest.cpp b/tst/WatcherTest.cpp index ff541f6..c57d4b6 100644 --- a/tst/WatcherTest.cpp +++ b/tst/WatcherTest.cpp @@ -81,10 +81,10 @@ TEST_CASE("create watcher") etcd::Watcher watcher(etcd_uri, "/test", printResponse, true); std::this_thread::sleep_for(std::chrono::seconds(3)); etcd.set("/test/key", "42"); + std::this_thread::sleep_for(std::chrono::seconds(3)); etcd.set("/test/key", "43"); std::this_thread::sleep_for(std::chrono::seconds(3)); } - std::this_thread::sleep_for(std::chrono::seconds(10)); CHECK(2 == watcher_called); etcd.rmdir("/test", true).error_code(); }