Protect implicit keepalive maps using a lexical scoped lock.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
Tao He 2021-01-12 01:11:27 +08:00
parent cfb5cb89d3
commit e5c804416c
2 changed files with 24 additions and 14 deletions

View File

@ -4,6 +4,7 @@
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
#include <map> #include <map>
#include <mutex>
#include <string> #include <string>
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
@ -262,6 +263,7 @@ namespace etcd
std::unique_ptr<Lease::Stub> leaseServiceStub; std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<Lock::Stub> lockServiceStub; std::unique_ptr<Lock::Stub> lockServiceStub;
std::mutex mutex_for_keepalives;
std::map<std::string, int64_t> leases_for_locks; std::map<std::string, int64_t> leases_for_locks;
std::map<int64_t, std::shared_ptr<KeepAlive>> keep_alive_for_locks; std::map<int64_t, std::shared_ptr<KeepAlive>> keep_alive_for_locks;

View File

@ -523,7 +523,11 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
// (base on our experiences in vineyard and GraphScope). // (base on our experiences in vineyard and GraphScope).
auto resp = this->leasegrant(DEFAULT_LEASE_TTL_FOR_LOCK).get(); auto resp = this->leasegrant(DEFAULT_LEASE_TTL_FOR_LOCK).get();
int64_t lease_id = resp.value().lease(); int64_t lease_id = resp.value().lease();
this->keep_alive_for_locks[lease_id].reset(new KeepAlive(*this, DEFAULT_LEASE_TTL_FOR_LOCK, lease_id)); {
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
this->keep_alive_for_locks[lease_id].reset(
new KeepAlive(*this, DEFAULT_LEASE_TTL_FOR_LOCK, lease_id));
}
params.key = key; params.key = key;
params.lease_id = lease_id; params.lease_id = lease_id;
params.lock_stub = lockServiceStub.get(); params.lock_stub = lockServiceStub.get();
@ -531,10 +535,13 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
return Response::create(call).then( return Response::create(call).then(
[this, lease_id](pplx::task<etcd::Response> const &resp_task) -> etcd::Response { [this, lease_id](pplx::task<etcd::Response> const &resp_task) -> etcd::Response {
auto const& resp = resp_task.get(); auto const& resp = resp_task.get();
if (resp.is_ok()) { {
this->leases_for_locks[resp.lock_key()] = lease_id; std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
} else { if (resp.is_ok()) {
this->keep_alive_for_locks.erase(lease_id); this->leases_for_locks[resp.lock_key()] = lease_id;
} else {
this->keep_alive_for_locks.erase(lease_id);
}
} }
return resp; return resp;
} }
@ -555,16 +562,17 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) { pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
std::cout << "begin unlock" << std::endl; std::cout << "begin unlock" << std::endl;
// cancel the KeepAlive first, it exists // cancel the KeepAlive first, it exists
auto p_leases = this->leases_for_locks.find(lock_key); {
if (p_leases != this->leases_for_locks.end()) { std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
std::cout << "Unlock for " << lock_key << " and revoke lease " << std::hex << p_leases->second << std::dec << std::endl; auto p_leases = this->leases_for_locks.find(lock_key);
auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second); if (p_leases != this->leases_for_locks.end()) {
if (p_keeps_alive != this->keep_alive_for_locks.end()) { std::cout << "Unlock for " << lock_key << " and revoke lease " << std::hex << p_leases->second << std::dec << std::endl;
this->keep_alive_for_locks.erase(p_keeps_alive); auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second);
if (p_keeps_alive != this->keep_alive_for_locks.end()) {
this->keep_alive_for_locks.erase(p_keeps_alive);
}
this->leases_for_locks.erase(p_leases);
} }
this->leases_for_locks.erase(p_leases);
} else {
std::cout << "Unable to find lease_id for " << lock_key;
} }
// issue a "unlock" request // issue a "unlock" request