[#127] set timeout for keeplive grpc

This commit is contained in:
cheng.li 2022-05-02 17:56:41 +08:00
parent f21c45b362
commit 14d2450e11
3 changed files with 9 additions and 6 deletions

View File

@ -46,7 +46,7 @@ namespace etcdv3
AsyncLeaseKeepAliveAction(etcdv3::ActionParameters const &param); AsyncLeaseKeepAliveAction(etcdv3::ActionParameters const &param);
AsyncLeaseKeepAliveResponse ParseResponse(); AsyncLeaseKeepAliveResponse ParseResponse();
etcd::Response Refresh(); etcd::Response Refresh(int ttl);
void CancelKeepAlive(); void CancelKeepAlive();
bool Cancelled() const; bool Cancelled() const;

View File

@ -141,7 +141,7 @@ void etcd::KeepAlive::refresh()
#endif #endif
} else { } else {
if (this->continue_next.load()) { if (this->continue_next.load()) {
auto resp = this->stubs->call->Refresh(); auto resp = this->stubs->call->Refresh(ttl);
if (!resp.is_ok()) { if (!resp.is_ok()) {
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
", message: " + resp.error_message()); ", message: " + resp.error_message());

View File

@ -91,7 +91,7 @@ etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResp
return lease_resp; return lease_resp;
} }
etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh() etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh(int ttl)
{ {
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancelled); std::lock_guard<std::mutex> scope_lock(this->protect_is_cancelled);
@ -108,13 +108,16 @@ etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh()
void *got_tag = nullptr; void *got_tag = nullptr;
bool ok = false; bool ok = false;
auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(ttl);
stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE); stream->Write(leasekeepalive_request, (void *)etcdv3::KEEPALIVE_WRITE);
// wait write finish // wait write finish
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) { if (cq_.AsyncNext(&got_tag, &ok, deadline) == CompletionQueue::NextStatus::GOT_EVENT &&
ok && got_tag == (void *)etcdv3::KEEPALIVE_WRITE) {
stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ); stream->Read(&reply, (void*)etcdv3::KEEPALIVE_READ);
// wait read finish // wait read finish
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) { if (cq_.AsyncNext(&got_tag, &ok, deadline) == CompletionQueue::NextStatus::GOT_EVENT &&
ok && got_tag == (void *)etcdv3::KEEPALIVE_READ) {
auto resp = ParseResponse(); auto resp = ParseResponse();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start_timepoint); std::chrono::high_resolution_clock::now() - start_timepoint);