From 14d2450e11e323f4397e026284ed0943c225bda7 Mon Sep 17 00:00:00 2001 From: "cheng.li" Date: Mon, 2 May 2022 17:56:41 +0800 Subject: [PATCH] [#127] set timeout for keeplive grpc --- etcd/v3/AsyncLeaseAction.hpp | 2 +- src/KeepAlive.cpp | 2 +- src/v3/AsyncLeaseAction.cpp | 11 +++++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/etcd/v3/AsyncLeaseAction.hpp b/etcd/v3/AsyncLeaseAction.hpp index 2ae8466..489ade0 100644 --- a/etcd/v3/AsyncLeaseAction.hpp +++ b/etcd/v3/AsyncLeaseAction.hpp @@ -46,7 +46,7 @@ namespace etcdv3 AsyncLeaseKeepAliveAction(etcdv3::ActionParameters const ¶m); AsyncLeaseKeepAliveResponse ParseResponse(); - etcd::Response Refresh(); + etcd::Response Refresh(int ttl); void CancelKeepAlive(); bool Cancelled() const; diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index c298913..8649b50 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -141,7 +141,7 @@ void etcd::KeepAlive::refresh() #endif } else { if (this->continue_next.load()) { - auto resp = this->stubs->call->Refresh(); + auto resp = this->stubs->call->Refresh(ttl); if (!resp.is_ok()) { throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + ", message: " + resp.error_message()); diff --git a/src/v3/AsyncLeaseAction.cpp b/src/v3/AsyncLeaseAction.cpp index 5c9925f..2ac604c 100644 --- a/src/v3/AsyncLeaseAction.cpp +++ b/src/v3/AsyncLeaseAction.cpp @@ -91,7 +91,7 @@ etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResp return lease_resp; } -etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() +etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh(int ttl) { std::lock_guard scope_lock(this->protect_is_cancelled); @@ -108,16 +108,19 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() void *got_tag = nullptr; bool ok = false; + auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(ttl); stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); // wait write finish - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) { + if (cq_.AsyncNext(&got_tag, &ok, deadline) == CompletionQueue::NextStatus::GOT_EVENT && + ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) { stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); // wait read finish - if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { + if (cq_.AsyncNext(&got_tag, &ok, deadline) == CompletionQueue::NextStatus::GOT_EVENT && + ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { auto resp = ParseResponse(); auto duration = std::chrono::duration_cast( - std::chrono::high_resolution_clock::now() - start_timepoint); + std::chrono::high_resolution_clock::now() - start_timepoint); return etcd::Response(resp, duration); } }