Fixes memory leak in shutting down gRPC streams. (#88)

* Fixes memory leak in shutting down gRPC streams.

Resolve #86.

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2021-09-23 14:51:29 +08:00 committed by GitHub
parent d386bb96b0
commit 6eed82a766
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 132 additions and 5 deletions

View File

@ -160,6 +160,7 @@ jobs:
./build/bin/EtcdSyncTest ./build/bin/EtcdSyncTest
./build/bin/EtcdTest ./build/bin/EtcdTest
./build/bin/LockTest ./build/bin/LockTest
./build/bin/MemLeakTest
./build/bin/WatcherTest ./build/bin/WatcherTest
./build/bin/ElectionTest ./build/bin/ElectionTest

View File

@ -251,8 +251,6 @@ etcd::Client::Client(std::string const & address,
grpc_args.SetLoadBalancingPolicyName(load_balancer); grpc_args.SetLoadBalancingPolicyName(load_balancer);
this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args);
std::cout << "this->channel : " << this->channel;
// setup stubs // setup stubs
stubs.reset(new EtcdServerStubs{}); stubs.reset(new EtcdServerStubs{});
stubs->kvServiceStub = KV::NewStub(this->channel); stubs->kvServiceStub = KV::NewStub(this->channel);

View File

@ -190,9 +190,17 @@ void etcdv3::AsyncObserveAction::CancelObserve()
{ {
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled); std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
if (!isCancelled.exchange(true)) { if (!isCancelled.exchange(true)) {
void* got_tag;
bool ok = false;
response_reader->Finish(&status, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) {
// ok
} else {
std::cerr << "Failed to finish a election observing connection" << std::endl;
}
cq_.Shutdown(); cq_.Shutdown();
} }
response_reader->Finish(&status, (void *)this);
} }
bool etcdv3::AsyncObserveAction::Cancelled() const { bool etcdv3::AsyncObserveAction::Cancelled() const {

View File

@ -130,9 +130,25 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive()
if(isCancelled == false) if(isCancelled == false)
{ {
isCancelled = true; isCancelled = true;
void *got_tag = nullptr;
bool ok = false;
stream->WritesDone((void*)etcdv3::KEEPALIVE_DONE); stream->WritesDone((void*)etcdv3::KEEPALIVE_DONE);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_DONE) {
// ok
} else {
std::cerr << "Failed to mark a lease keep-alive connection as DONE" << std::endl;
}
grpc::Status status; grpc::Status status;
stream->Finish(&status, (void *)this); stream->Finish(&status, (void *)this);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) {
// ok
} else {
std::cerr << "Failed to finish a lease keep-alive connection" << std::endl;
}
cq_.Shutdown(); cq_.Shutdown();
} }
} }

View File

@ -61,6 +61,9 @@ void etcdv3::AsyncWatchAction::waitForResponse()
{ {
break; break;
} }
if(isCancelled.load()) {
break;
}
if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) { if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) {
isCancelled.store(true); isCancelled.store(true);
cq_.Shutdown(); cq_.Shutdown();
@ -71,6 +74,7 @@ void etcdv3::AsyncWatchAction::waitForResponse()
if (reply.canceled()) { if (reply.canceled()) {
isCancelled.store(true); isCancelled.store(true);
cq_.Shutdown(); cq_.Shutdown();
break;
} }
else if ((reply.created() && reply.header().revision() < parameters.revision) || else if ((reply.created() && reply.header().revision() < parameters.revision) ||
reply.events_size() > 0) { reply.events_size() > 0) {
@ -79,22 +83,26 @@ void etcdv3::AsyncWatchAction::waitForResponse()
// 1. watch for a future revision, return immediately with empty events set // 1. watch for a future revision, return immediately with empty events set
// 2. receive any effective events. // 2. receive any effective events.
isCancelled.store(true); isCancelled.store(true);
stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE);
grpc::Status status; grpc::Status status;
stream->Finish(&status, (void *)this); stream->Finish(&status, (void *)this);
cq_.Shutdown(); cq_.Shutdown();
// leave a warning if the response is too large and been fragmented // leave a warning if the response is too large and been fragmented
if (reply.fragment()) { if (reply.fragment()) {
std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl; std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl;
} }
break;
} }
else else
{ {
// otherwise, start next round read-reply // otherwise, start next round read-reply
stream->Read(&reply, (void*)this); stream->Read(&reply, (void*)this);
} }
} }
} }
} }
@ -103,8 +111,10 @@ void etcdv3::AsyncWatchAction::CancelWatch()
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled); std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
if (!isCancelled.exchange(true)) { if (!isCancelled.exchange(true)) {
stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE);
grpc::Status status; grpc::Status status;
stream->Finish(&status, (void *)this); stream->Finish(&status, (void *)this);
cq_.Shutdown(); cq_.Shutdown();
} }
} }
@ -124,6 +134,9 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
{ {
break; break;
} }
if(isCancelled.load()) {
break;
}
if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE)
{ {
isCancelled.store(true); isCancelled.store(true);

91
tst/MemLeakTest.cpp Normal file
View File

@ -0,0 +1,91 @@
// See also: https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/86
#include <future>
#include <memory>
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
class DistributedLock {
public:
DistributedLock(const std::string &lock_name,
uint timeout = 0);
~DistributedLock() noexcept;
inline bool lock_acquired() {
return _acquired;
}
private:
bool _acquired = false;
std::string _lock_key;
std::unique_ptr<::etcd::Client> _etcd_client;
};
DistributedLock::DistributedLock(const std::string &lock_name,
uint timeout) {
_etcd_client = std::unique_ptr<etcd::Client>(new etcd::Client("localhost:2379"));
try {
if (timeout == 0) {
etcd::Response resp = _etcd_client->lock(lock_name).get();
if (resp.is_ok()) {
_lock_key = resp.lock_key();
_acquired = true;
}
} else {
std::future<etcd::Response> future = std::async(std::launch::async, [&]() {
etcd::Response resp = _etcd_client->lock(lock_name).get();
return resp;
});
std::future_status status = future.wait_for(std::chrono::seconds(timeout));
if (status == std::future_status::ready) {
auto resp = future.get();
if (resp.is_ok()) {
_lock_key = resp.lock_key();
_acquired = true;
}
} else if (status == std::future_status::timeout) {
std::cerr << "failed to acquire distributed because of lock timeout" << std::endl;
} else {
std::cerr << "failed to acquire distributed lock" << std::endl;
}
}
} catch (std::exception &e) {
std::cerr << "failed to construct: " << e.what() << std::endl;
}
}
DistributedLock::~DistributedLock() noexcept {
if (!_acquired) {
return;
}
try {
auto resp = _etcd_client->unlock(_lock_key).get();
if (!resp.is_ok()) {
std::cout << resp.error_code() << std::endl;
}
} catch (std::exception &e) {
std::cerr << "failed to destruct: " << e.what() << std::endl;
}
}
int main() {
int i = 0, t = 0;
while(t < 10 /* update this value to make it run for longer */) {
{
DistributedLock lock(std::to_string(i), 0);
if(!lock.lock_acquired()) {
std::cerr << "failed to acquire lock" << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
++i;
++t;
if (i == 10) {
i = 0;
}
std::cout << "round: i = " << i << ", t = " << t << std::endl;
}
}