Optimize the implementation of error handling in keep alive. (#54)
Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
cee938fb0a
commit
3e30c4c61d
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
namespace etcdv3 {
|
||||
class AsyncWatchAction;
|
||||
class AsyncLeaseKeepAliveAction;
|
||||
class V3Response;
|
||||
}
|
||||
|
||||
|
|
@ -139,6 +140,7 @@ namespace etcd
|
|||
std::chrono::microseconds _duration; // execute duration (in microseconds), during the action created and response parsed
|
||||
friend class SyncClient;
|
||||
friend class etcdv3::AsyncWatchAction;
|
||||
friend class etcdv3::AsyncLeaseKeepAliveAction;
|
||||
friend class Client;
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@
|
|||
#include "proto/rpc.grpc.pb.h"
|
||||
#include "etcd/v3/Action.hpp"
|
||||
#include "etcd/v3/AsyncLeaseResponse.hpp"
|
||||
#include "etcd/Response.hpp"
|
||||
|
||||
using grpc::ClientAsyncResponseReader;
|
||||
using grpc::ClientAsyncReaderWriter;
|
||||
|
|
@ -45,7 +46,7 @@ namespace etcdv3
|
|||
AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param);
|
||||
AsyncLeaseKeepAliveResponse ParseResponse();
|
||||
|
||||
AsyncLeaseKeepAliveResponse Refresh();
|
||||
etcd::Response Refresh();
|
||||
void CancelKeepAlive();
|
||||
bool Cancelled() const;
|
||||
|
||||
|
|
|
|||
|
|
@ -152,9 +152,18 @@ void etcd::KeepAlive::refresh()
|
|||
std::cerr << "keepalive timer error: " << error << ", " << error.message() << std::endl;
|
||||
#endif
|
||||
} else {
|
||||
this->stubs->call->Refresh();
|
||||
if (this->continue_next) {
|
||||
auto resp = this->stubs->call->Refresh();
|
||||
if (!resp.is_ok()) {
|
||||
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
|
||||
", message: " + resp.error_message());
|
||||
}
|
||||
if (resp.value().ttl() == -1) {
|
||||
throw std::runtime_error("Failed to refresh lease due to expiration: the new TTL is -1.");
|
||||
}
|
||||
// trigger the next round;
|
||||
this->refresh();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -82,12 +82,16 @@ etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResp
|
|||
return lease_resp;
|
||||
}
|
||||
|
||||
etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::Refresh()
|
||||
etcd::Response etcdv3::AsyncLeaseKeepAliveAction::Refresh()
|
||||
{
|
||||
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancelled);
|
||||
|
||||
auto start_timepoint = std::chrono::high_resolution_clock::now();
|
||||
if (isCancelled) {
|
||||
return ParseResponse();
|
||||
auto resp = ParseResponse();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::high_resolution_clock::now() - start_timepoint);
|
||||
return etcd::Response(resp, duration);
|
||||
}
|
||||
|
||||
LeaseKeepAliveRequest leasekeepalive_request;
|
||||
|
|
@ -102,10 +106,13 @@ etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::Refresh()
|
|||
stream->Read(&reply, (void*)"keepalive read");
|
||||
// wait read finish
|
||||
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"keepalive read") {
|
||||
return ParseResponse();
|
||||
auto resp = ParseResponse();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::high_resolution_clock::now() - start_timepoint);
|
||||
return etcd::Response(resp, duration);
|
||||
}
|
||||
}
|
||||
throw std::runtime_error("Failed to create a lease keep-alive connection");
|
||||
return etcd::Response(grpc::StatusCode::ABORTED, "Failed to create a lease keep-alive connection");
|
||||
}
|
||||
|
||||
void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive()
|
||||
|
|
|
|||
|
|
@ -81,10 +81,10 @@ TEST_CASE("create watcher")
|
|||
etcd::Watcher watcher(etcd_uri, "/test", printResponse, true);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||||
etcd.set("/test/key", "42");
|
||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||||
etcd.set("/test/key", "43");
|
||||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
CHECK(2 == watcher_called);
|
||||
etcd.rmdir("/test", true).error_code();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue