From 6eed82a76643384f4d11a59ce62fd4be4e18a575 Mon Sep 17 00:00:00 2001 From: Tao He Date: Thu, 23 Sep 2021 14:51:29 +0800 Subject: [PATCH] Fixes memory leak in shutting down gRPC streams. (#88) * Fixes memory leak in shutting down gRPC streams. Resolve #86. Signed-off-by: Tao He --- .github/workflows/build-test.yml | 1 + src/Client.cpp | 2 - src/v3/AsyncElectionAction.cpp | 10 +++- src/v3/AsyncLeaseAction.cpp | 16 ++++++ src/v3/AsyncWatchAction.cpp | 17 +++++- tst/MemLeakTest.cpp | 91 ++++++++++++++++++++++++++++++++ 6 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 tst/MemLeakTest.cpp diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 3b38887..65f09a8 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -160,6 +160,7 @@ jobs: ./build/bin/EtcdSyncTest ./build/bin/EtcdTest ./build/bin/LockTest + ./build/bin/MemLeakTest ./build/bin/WatcherTest ./build/bin/ElectionTest diff --git a/src/Client.cpp b/src/Client.cpp index 5e50617..deebb18 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -251,8 +251,6 @@ etcd::Client::Client(std::string const & address, grpc_args.SetLoadBalancingPolicyName(load_balancer); this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); - std::cout << "this->channel : " << this->channel; - // setup stubs stubs.reset(new EtcdServerStubs{}); stubs->kvServiceStub = KV::NewStub(this->channel); diff --git a/src/v3/AsyncElectionAction.cpp b/src/v3/AsyncElectionAction.cpp index d1387de..77dc624 100644 --- a/src/v3/AsyncElectionAction.cpp +++ b/src/v3/AsyncElectionAction.cpp @@ -190,9 +190,17 @@ void etcdv3::AsyncObserveAction::CancelObserve() { std::lock_guard scope_lock(this->protect_is_cancalled); if (!isCancelled.exchange(true)) { + void* got_tag; + bool ok = false; + response_reader->Finish(&status, (void *)this); + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { + // ok + } else { + std::cerr << "Failed to finish a election observing connection" << std::endl; + } + cq_.Shutdown(); } - response_reader->Finish(&status, (void *)this); } bool etcdv3::AsyncObserveAction::Cancelled() const { diff --git a/src/v3/AsyncLeaseAction.cpp b/src/v3/AsyncLeaseAction.cpp index 6881c22..4899777 100644 --- a/src/v3/AsyncLeaseAction.cpp +++ b/src/v3/AsyncLeaseAction.cpp @@ -130,9 +130,25 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() if(isCancelled == false) { isCancelled = true; + + void *got_tag = nullptr; + bool ok = false; + stream->WritesDone((void*)etcdv3::KEEPALIVE_DONE); + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_DONE) { + // ok + } else { + std::cerr << "Failed to mark a lease keep-alive connection as DONE" << std::endl; + } + grpc::Status status; stream->Finish(&status, (void *)this); + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { + // ok + } else { + std::cerr << "Failed to finish a lease keep-alive connection" << std::endl; + } + cq_.Shutdown(); } } diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 14ee80f..ef630de 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -61,6 +61,9 @@ void etcdv3::AsyncWatchAction::waitForResponse() { break; } + if(isCancelled.load()) { + break; + } if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) { isCancelled.store(true); cq_.Shutdown(); @@ -71,6 +74,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() if (reply.canceled()) { isCancelled.store(true); cq_.Shutdown(); + break; } else if ((reply.created() && reply.header().revision() < parameters.revision) || reply.events_size() > 0) { @@ -79,22 +83,26 @@ void etcdv3::AsyncWatchAction::waitForResponse() // 1. watch for a future revision, return immediately with empty events set // 2. receive any effective events. isCancelled.store(true); + stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); + grpc::Status status; stream->Finish(&status, (void *)this); + cq_.Shutdown(); // leave a warning if the response is too large and been fragmented if (reply.fragment()) { std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl; } + break; } else { // otherwise, start next round read-reply stream->Read(&reply, (void*)this); - } - } + } + } } } @@ -103,8 +111,10 @@ void etcdv3::AsyncWatchAction::CancelWatch() std::lock_guard scope_lock(this->protect_is_cancalled); if (!isCancelled.exchange(true)) { stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); + grpc::Status status; stream->Finish(&status, (void *)this); + cq_.Shutdown(); } } @@ -124,6 +134,9 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function +#include + +#include "etcd/Client.hpp" +#include "etcd/KeepAlive.hpp" + +class DistributedLock { +public: + DistributedLock(const std::string &lock_name, + uint timeout = 0); + ~DistributedLock() noexcept; + inline bool lock_acquired() { + return _acquired; + } + +private: + bool _acquired = false; + std::string _lock_key; + std::unique_ptr<::etcd::Client> _etcd_client; +}; + +DistributedLock::DistributedLock(const std::string &lock_name, + uint timeout) { + _etcd_client = std::unique_ptr(new etcd::Client("localhost:2379")); + + try { + if (timeout == 0) { + etcd::Response resp = _etcd_client->lock(lock_name).get(); + if (resp.is_ok()) { + _lock_key = resp.lock_key(); + _acquired = true; + } + } else { + std::future future = std::async(std::launch::async, [&]() { + etcd::Response resp = _etcd_client->lock(lock_name).get(); + return resp; + }); + + std::future_status status = future.wait_for(std::chrono::seconds(timeout)); + if (status == std::future_status::ready) { + auto resp = future.get(); + if (resp.is_ok()) { + _lock_key = resp.lock_key(); + _acquired = true; + } + } else if (status == std::future_status::timeout) { + std::cerr << "failed to acquire distributed because of lock timeout" << std::endl; + } else { + std::cerr << "failed to acquire distributed lock" << std::endl; + } + } + } catch (std::exception &e) { + std::cerr << "failed to construct: " << e.what() << std::endl; + } +} + +DistributedLock::~DistributedLock() noexcept { + if (!_acquired) { + return; + } + + try { + auto resp = _etcd_client->unlock(_lock_key).get(); + if (!resp.is_ok()) { + std::cout << resp.error_code() << std::endl; + } + } catch (std::exception &e) { + std::cerr << "failed to destruct: " << e.what() << std::endl; + } +} + +int main() { + int i = 0, t = 0; + while(t < 10 /* update this value to make it run for longer */) { + { + DistributedLock lock(std::to_string(i), 0); + if(!lock.lock_acquired()) { + std::cerr << "failed to acquire lock" << std::endl; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ++i; + ++t; + if (i == 10) { + i = 0; + } + std::cout << "round: i = " << i << ", t = " << t << std::endl; + } +}