Fixes memory leak in shutting down gRPC streams.

Resolve #86.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-09-23 12:47:29 +08:00
parent d386bb96b0
commit 9729858da4
6 changed files with 151 additions and 3 deletions

View File

@ -160,6 +160,7 @@ jobs:
./build/bin/EtcdSyncTest
./build/bin/EtcdTest
./build/bin/LockTest
./build/bin/MemLeakTest
./build/bin/WatcherTest
./build/bin/ElectionTest

View File

@ -251,8 +251,6 @@ etcd::Client::Client(std::string const & address,
grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
std::cout << "this->channel : " << this->channel;
// setup stubs
stubs.reset(new EtcdServerStubs{});
stubs->kvServiceStub = KV::NewStub(this->channel);

View File

@ -190,9 +190,17 @@ void etcdv3::AsyncObserveAction::CancelObserve()
{
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
if (!isCancelled.exchange(true)) {
void* got_tag;
bool ok = false;
response_reader->Finish(&status, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) {
// ok
} else {
std::cerr << "Failed to finish a election observing connection" << std::endl;
}
cq_.Shutdown();
}
response_reader->Finish(&status, (void *)this);
}
bool etcdv3::AsyncObserveAction::Cancelled() const {

View File

@ -130,9 +130,25 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive()
if(isCancelled == false)
{
isCancelled = true;
void *got_tag = nullptr;
bool ok = false;
stream->WritesDone((void*)etcdv3::KEEPALIVE_DONE);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_DONE) {
// ok
} else {
std::cerr << "Failed to mark a lease keep-alive connection as DONE" << std::endl;
}
grpc::Status status;
stream->Finish(&status, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) {
// ok
} else {
std::cerr << "Failed to finish a lease keep-alive connection" << std::endl;
}
cq_.Shutdown();
}
}

View File

@ -61,6 +61,9 @@ void etcdv3::AsyncWatchAction::waitForResponse()
{
break;
}
if(isCancelled.load()) {
break;
}
if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) {
isCancelled.store(true);
cq_.Shutdown();
@ -79,9 +82,22 @@ void etcdv3::AsyncWatchAction::waitForResponse()
// 1. watch for a future revision, return immediately with empty events set
// 2. receive any effective events.
isCancelled.store(true);
stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::WATCH_WRITES_DONE) {
// ok
} else {
std::cerr << "WARN: Failed to mark a watch connection as DONE" << std::endl;
}
grpc::Status status;
stream->Finish(&status, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) {
// ok
} else {
std::cerr << "WARN: Failed to finish a watch connection" << std::endl;
}
cq_.Shutdown();
// leave a warning if the response is too large and been fragmented
@ -102,9 +118,24 @@ void etcdv3::AsyncWatchAction::CancelWatch()
{
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
if (!isCancelled.exchange(true)) {
void* got_tag;
bool ok = false;
stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::WATCH_WRITES_DONE) {
// ok
} else {
std::cerr << "WARN: Failed to mark a watch connection as DONE" << std::endl;
}
grpc::Status status;
stream->Finish(&status, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) {
// ok
} else {
std::cerr << "WARN: Failed to finish a watch connection" << std::endl;
}
cq_.Shutdown();
}
}
@ -124,6 +155,9 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
{
break;
}
if(isCancelled.load()) {
break;
}
if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE)
{
isCancelled.store(true);

91
tst/MemLeakTest.cpp Normal file
View File

@ -0,0 +1,91 @@
// See also: https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/86
#include <future>
#include <memory>
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
class DistributedLock {
public:
DistributedLock(const std::string &lock_name,
uint timeout = 0);
~DistributedLock() noexcept;
inline bool lock_acquired() {
return _acquired;
}
private:
bool _acquired = false;
std::string _lock_key;
std::unique_ptr<::etcd::Client> _etcd_client;
};
DistributedLock::DistributedLock(const std::string &lock_name,
uint timeout) {
_etcd_client = std::unique_ptr<etcd::Client>(new etcd::Client("localhost:2379"));
try {
if (timeout == 0) {
etcd::Response resp = _etcd_client->lock(lock_name).get();
if (resp.is_ok()) {
_lock_key = resp.lock_key();
_acquired = true;
}
} else {
std::future<etcd::Response> future = std::async(std::launch::async, [&]() {
etcd::Response resp = _etcd_client->lock(lock_name).get();
return resp;
});
std::future_status status = future.wait_for(std::chrono::seconds(timeout));
if (status == std::future_status::ready) {
auto resp = future.get();
if (resp.is_ok()) {
_lock_key = resp.lock_key();
_acquired = true;
}
} else if (status == std::future_status::timeout) {
std::cerr << "failed to acquire distributed because of lock timeout" << std::endl;
} else {
std::cerr << "failed to acquire distributed lock" << std::endl;
}
}
} catch (std::exception &e) {
throw e;
}
}
DistributedLock::~DistributedLock() noexcept {
if (!_acquired) {
return;
}
try {
auto resp = _etcd_client->unlock(_lock_key).get();
if (!resp.is_ok()) {
std::cout << resp.error_code() << std::endl;
}
} catch (std::exception &e) {
throw e;
}
}
int main() {
int i = 0, t = 0;
while(t < 500) {
{
DistributedLock lock(std::to_string(i), 0);
if(!lock.lock_acquired()) {
std::cerr << "failed to acquire lock" << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
++i;
++t;
if (i == 10) {
i = 0;
}
std::cout << "round: i = " << i << ", t = " << t << std::endl;
}
}