Fixes lock, and the underlying keepalive implementation.

The previous implementation was buggy when many locks were requested at the same time,
because cpprestsdk's thread pool uses a fixed number of threads on POSIX platforms:

https://github.com/microsoft/cpprestsdk/blob/master/Release/src/pplx/threadpool.cpp#L198

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
lidongze0629 2021-05-02 17:36:29 +08:00 committed by Tao He
parent bb22ef4d7f
commit 84cdcb0438
9 changed files with 193 additions and 92 deletions

View File

@ -421,7 +421,7 @@ Users can also feed their own lease directory for lock:
```c++ ```c++
etcd::Client etcd("http://127.0.0.1:4001"); etcd::Client etcd("http://127.0.0.1:4001");
etcd.lock("/test/lock", lease_id); etcd.lock_with_lease("/test/lock", lease_id);
``` ```
### Watching for changes ### Watching for changes
@ -510,6 +510,13 @@ The lease can be revoked by
etcd.leaserevoke(resp.value().lease()); etcd.leaserevoke(resp.value().lease());
``` ```
A lease can also be created with a `KeepAlive` object attached to it from the start:
```c++
std::shared_ptr<etcd::KeepAlive> keepalive = etcd.leasekeepalive(60).get();
std::cout << "lease id: " << keepalive->Lease();
```
The remaining time-to-live of a lease can be inspected by The remaining time-to-live of a lease can be inspected by
```c++ ```c++

View File

@ -327,6 +327,12 @@ namespace etcd
*/ */
pplx::task<Response> leasegrant(int ttl); pplx::task<Response> leasegrant(int ttl);
/**
* Grants a lease and creates a KeepAlive object attached to the granted lease.
* @param ttl is the time to live of the lease
* @return a task that yields a shared pointer to the KeepAlive; the id of the
*         granted lease can be read from it with "Lease()".
*/
pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl);
/** /**
* Revoke a lease. * Revoke a lease.
* @param lease_id is the id the lease * @param lease_id is the id the lease
@ -359,7 +365,7 @@ namespace etcd
* of by the library. * of by the library.
* @param key is the key to be used to request the lock. * @param key is the key to be used to request the lock.
*/ */
pplx::task<Response> lock(std::string const &key, int64_t lease_id); pplx::task<Response> lock_with_lease(std::string const &key, int64_t lease_id);
/** /**
* Releases a lock at a key. * Releases a lock at a key.

View File

@ -1,9 +1,11 @@
#ifndef __ETCD_KEEPALIVE_HPP__ #ifndef __ETCD_KEEPALIVE_HPP__
#define __ETCD_KEEPALIVE_HPP__ #define __ETCD_KEEPALIVE_HPP__
#include <atomic>
#include <exception> #include <exception>
#include <functional> #include <functional>
#include <string> #include <string>
#include <thread>
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
@ -19,7 +21,7 @@
namespace etcd namespace etcd
{ {
/** /**
* If ID is set to 0, etcd will choose an ID. * If ID is set to 0, the library will choose an ID, and can be accessed from ".Lease()".
*/ */
class KeepAlive class KeepAlive
{ {
@ -43,10 +45,11 @@ namespace etcd
std::function<void (std::exception_ptr)> const &handler, std::function<void (std::exception_ptr)> const &handler,
int ttl, int64_t lease_id=0); int ttl, int64_t lease_id=0);
KeepAlive(KeepAlive const &) = delete; KeepAlive(KeepAlive const &) = delete;
KeepAlive(KeepAlive &&) = delete; KeepAlive(KeepAlive &&) = delete;
int64_t Lease() const { return lease_id; }
/** /**
* Stop the keep alive action. * Stop the keep alive action.
*/ */
@ -64,7 +67,6 @@ namespace etcd
protected: protected:
void refresh(); void refresh();
pplx::task<void> currentTask;
struct EtcdServerStubs; struct EtcdServerStubs;
struct EtcdServerStubsDeleter { struct EtcdServerStubsDeleter {
void operator()(EtcdServerStubs *stubs); void operator()(EtcdServerStubs *stubs);
@ -76,9 +78,13 @@ namespace etcd
std::exception_ptr eptr_; std::exception_ptr eptr_;
std::function<void (std::exception_ptr)> handler_; std::function<void (std::exception_ptr)> handler_;
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
// to avoid any potential blocking, which may block the keepalive loop and evict the lease.
std::thread task_;
int ttl; int ttl;
int64_t lease_id; int64_t lease_id;
bool continue_next; std::atomic_bool continue_next;
#if BOOST_VERSION >= 106600 #if BOOST_VERSION >= 106600
boost::asio::io_context context; boost::asio::io_context context;
#else #else

View File

@ -32,20 +32,42 @@ namespace etcd
{ {
return pplx::task<etcd::Response>([call]() return pplx::task<etcd::Response>([call]()
{ {
etcd::Response resp;
call->waitForResponse(); call->waitForResponse();
auto v3resp = call->ParseResponse(); auto v3resp = call->ParseResponse();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - call->startTimepoint()); std::chrono::high_resolution_clock::now() - call->startTimepoint());
resp = etcd::Response(v3resp, duration); return etcd::Response(v3resp, duration);
return resp;
}); });
} }
/**
 * Builds a task that constructs the call lazily and turns its result into a Response.
 *
 * Unlike the overload that takes an already constructed call, the call object is
 * produced by invoking callfn inside the task body, so the request is only created
 * once the task actually runs.
 *
 * @param callfn is a factory returning the call to wait on.
 * @return a task yielding the Response built from the parsed reply and the time
 *         elapsed (in microseconds) since the call's start timepoint.
 */
template <typename T>
static pplx::task<etcd::Response> create(std::function<std::shared_ptr<T>()> callfn)
{
  auto body = [callfn]() {
    using clock = std::chrono::high_resolution_clock;

    std::shared_ptr<T> action = callfn();
    action->waitForResponse();
    auto parsed = action->ParseResponse();

    // measure only after the response has been parsed, as seen from the call's start
    auto finished = clock::now();
    auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
        finished - action->startTimepoint());
    return etcd::Response(parsed, elapsed);
  };
  return pplx::task<etcd::Response>(body);
}
/**
 * Waits for the given call on the calling thread and turns its result into a Response.
 *
 * No task is created here: the function returns only after the call has received
 * and parsed its response.
 *
 * @param call is the call to wait on.
 * @return the Response built from the parsed reply and the time elapsed
 *         (in microseconds) since the call's start timepoint.
 */
template <typename T>
static etcd::Response create_sync(std::shared_ptr<T> call)
{
  using clock = std::chrono::high_resolution_clock;

  call->waitForResponse();
  auto parsed = call->ParseResponse();

  auto finished = clock::now();
  auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
      finished - call->startTimepoint());
  return etcd::Response(parsed, elapsed);
}
Response(); Response();
/** /**

View File

@ -1,7 +1,9 @@
#ifndef __ETCD_WATCHER_HPP__ #ifndef __ETCD_WATCHER_HPP__
#define __ETCD_WATCHER_HPP__ #define __ETCD_WATCHER_HPP__
#include <functional>
#include <string> #include <string>
#include <thread>
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "etcd/Response.hpp" #include "etcd/Response.hpp"
@ -81,7 +83,11 @@ namespace etcd
int index; int index;
std::function<void(Response)> callback; std::function<void(Response)> callback;
pplx::task<void> currentTask; std::function<void(bool)> wait_callback;
// Don't use `pplx::task` here, so that the watcher does not share a thread pool with other
// actions on the client and cannot be blocked by them.
std::thread task_;
struct EtcdServerStubs; struct EtcdServerStubs;
struct EtcdServerStubsDeleter { struct EtcdServerStubsDeleter {

View File

@ -10,11 +10,13 @@
#include <sys/socket.h> #include <sys/socket.h>
#endif #endif
#include <chrono>
#include <iostream> #include <iostream>
#include <fstream> #include <fstream>
#include <sstream>
#include <limits> #include <limits>
#include <memory> #include <memory>
#include <sstream>
#include <thread>
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
@ -641,12 +643,30 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, std::str
pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl) pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
{ {
etcdv3::ActionParameters params; // lease grant is special, that we are expected the callback could be invoked
params.auth_token.assign(this->auth_token); // immediately after the lease is granted by the server.
params.ttl = ttl; return Response::create<etcdv3::AsyncLeaseGrantAction>([this, ttl]() {
params.lease_stub = stubs->leaseServiceStub.get(); etcdv3::ActionParameters params;
std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params)); params.auth_token.assign(this->auth_token);
return Response::create(call); params.ttl = ttl;
params.lease_stub = stubs->leaseServiceStub.get();
return std::make_shared<etcdv3::AsyncLeaseGrantAction>(params);
});
}
pplx::task<std::shared_ptr<etcd::KeepAlive>> etcd::Client::leasekeepalive(int ttl) {
return pplx::task<std::shared_ptr<etcd::KeepAlive>>([this, ttl]()
{
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.ttl = ttl;
params.lease_stub = stubs->leaseServiceStub.get();
auto call = std::make_shared<etcdv3::AsyncLeaseGrantAction>(params);
call->waitForResponse();
auto v3resp = call->ParseResponse();
return std::make_shared<KeepAlive>(*this, ttl, v3resp.get_value().kvs.lease());
});
} }
pplx::task<etcd::Response> etcd::Client::leaserevoke(int64_t lease_id) pplx::task<etcd::Response> etcd::Client::leaserevoke(int64_t lease_id)
@ -678,37 +698,37 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
} }
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key, int lease_ttl) { pplx::task<etcd::Response> etcd::Client::lock(std::string const &key, int lease_ttl) {
etcdv3::ActionParameters params; return this->leasekeepalive(lease_ttl).then([this, key](
params.auth_token.assign(this->auth_token); pplx::task<std::shared_ptr<etcd::KeepAlive>> const& resp_task) {
auto const &keepalive = resp_task.get();
auto resp = this->leasegrant(lease_ttl).get(); int64_t lease_id = keepalive->Lease();
int64_t lease_id = resp.value().lease(); {
{ std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives); this->keep_alive_for_locks[lease_id] = keepalive;
this->keep_alive_for_locks[lease_id].reset(
new KeepAlive(*this, lease_ttl, lease_id));
}
params.key = key;
params.lease_id = lease_id;
params.lock_stub = stubs->lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
return Response::create(call).then(
[this, lease_id](pplx::task<etcd::Response> const &resp_task) -> etcd::Response {
auto const& resp = resp_task.get();
{
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
if (resp.is_ok()) {
this->leases_for_locks[resp.lock_key()] = lease_id;
} else {
this->keep_alive_for_locks.erase(lease_id);
}
}
return resp;
} }
);
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = key;
params.lease_id = lease_id;
params.lock_stub = stubs->lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
auto lock_resp = Response::create_sync(call);
{
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
if (lock_resp.is_ok()) {
this->leases_for_locks[lock_resp.lock_key()] = lease_id;
} else {
this->keep_alive_for_locks.erase(lease_id);
}
}
return lock_resp;
});
} }
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key, pplx::task<etcd::Response> etcd::Client::lock_with_lease(std::string const &key,
int64_t lease_id) { int64_t lease_id) {
etcdv3::ActionParameters params; etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token); params.auth_token.assign(this->auth_token);
@ -720,7 +740,14 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
} }
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) { pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
// cancel the KeepAlive first, it exists // issue a "unlock" request
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = lock_key;
params.lock_stub = stubs->lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
// cancel the KeepAlive first, if it exists
{ {
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives); std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
auto p_leases = this->leases_for_locks.find(lock_key); auto p_leases = this->leases_for_locks.find(lock_key);
@ -728,17 +755,20 @@ pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second); auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second);
if (p_keeps_alive != this->keep_alive_for_locks.end()) { if (p_keeps_alive != this->keep_alive_for_locks.end()) {
this->keep_alive_for_locks.erase(p_keeps_alive); this->keep_alive_for_locks.erase(p_keeps_alive);
} else {
#if !defined(NDEBUG)
std::cerr << "Keepalive for lease not found" << std::endl;
#endif
} }
this->leases_for_locks.erase(p_leases); this->leases_for_locks.erase(p_leases);
} else {
#if !defined(NDEBUG)
std::cerr << "Lease for lock not found" << std::endl;
#endif
} }
} }
// issue a "unlock" request // wait in the io_context loop.
etcdv3::ActionParameters params;
params.auth_token.assign(this->auth_token);
params.key = lock_key;
params.lock_stub = stubs->lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
return Response::create(call); return Response::create(call);
} }

View File

@ -31,8 +31,10 @@ etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id):
params.lease_id = this->lease_id; params.lease_id = this->lease_id;
params.lease_stub = stubs->leaseServiceStub.get(); params.lease_stub = stubs->leaseServiceStub.get();
continue_next.store(true);
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
currentTask = pplx::task<void>([this]() { task_ = std::thread([this]() {
try { try {
// start refresh // start refresh
this->refresh(); this->refresh();
@ -67,7 +69,7 @@ etcd::KeepAlive::KeepAlive(Client const &client,
params.lease_stub = stubs->leaseServiceStub.get(); params.lease_stub = stubs->leaseServiceStub.get();
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
currentTask = pplx::task<void>([this]() { task_ = std::thread([this]() {
try { try {
// start refresh // start refresh
this->refresh(); this->refresh();
@ -78,8 +80,8 @@ etcd::KeepAlive::KeepAlive(Client const &client,
} else { } else {
eptr_ = std::current_exception(); eptr_ = std::current_exception();
} }
this->Cancel();
} }
context.stop(); // clean up
}); });
} }
@ -103,23 +105,17 @@ etcd::KeepAlive::~KeepAlive()
void etcd::KeepAlive::Cancel() void etcd::KeepAlive::Cancel()
{ {
if (!continue_next) { if (!continue_next.exchange(false)) {
return; return;
} }
continue_next = false;
#ifndef NDEBUG
{
std::ios::fmtflags os_flags (std::cout.flags());
std::cout << "Cancel keepalive for " << lease_id
<< "(" << std::hex << lease_id << ")" << std::endl;
std::cout.flags(os_flags);
}
#endif
stubs->call->CancelKeepAlive(); stubs->call->CancelKeepAlive();
if (keepalive_timer_) { if (keepalive_timer_) {
keepalive_timer_->cancel(); keepalive_timer_->cancel();
} }
currentTask.wait();
// clean up
context.stop();
task_.join();
} }
void etcd::KeepAlive::Check() { void etcd::KeepAlive::Check() {
@ -130,29 +126,20 @@ void etcd::KeepAlive::Check() {
void etcd::KeepAlive::refresh() void etcd::KeepAlive::refresh()
{ {
if (!continue_next) { if (!continue_next.load()) {
return; return;
} }
// minimal resolution: 1 second // minimal resolution: 1 second
int keepalive_ttl = std::max(ttl - 1, 1); int keepalive_ttl = std::max(ttl - 1, 1);
#ifndef NDEBUG
{
std::ios::fmtflags os_flags (std::cout.flags());
std::cout << "Trigger the next keepalive round with ttl " << keepalive_ttl
<< " for " << lease_id
<< "(" << std::hex << lease_id << ")" << std::endl;
std::cout.flags(os_flags);
}
#endif
keepalive_timer_.reset(new boost::asio::steady_timer( keepalive_timer_.reset(new boost::asio::steady_timer(
context, std::chrono::seconds(keepalive_ttl))); context, std::chrono::seconds(keepalive_ttl)));
keepalive_timer_->async_wait([this](const boost::system::error_code& error) { keepalive_timer_->async_wait([this](const boost::system::error_code& error) {
if (error) { if (error) {
#ifndef NDEBUG #ifndef NDEBUG
std::cerr << "keepalive timer error: " << error << ", " << error.message() << std::endl; std::cerr << "keepalive timer cancelled: " << error << ", " << error.message() << std::endl;
#endif #endif
} else { } else {
if (this->continue_next) { if (this->continue_next.load()) {
auto resp = this->stubs->call->Refresh(); auto resp = this->stubs->call->Refresh();
if (!resp.is_ok()) { if (!resp.is_ok()) {
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +

View File

@ -93,21 +93,26 @@ etcd::Watcher::Watcher(std::string const & address,
etcd::Watcher::~Watcher() etcd::Watcher::~Watcher()
{ {
stubs->call->CancelWatch(); stubs->call->CancelWatch();
currentTask.wait(); if (task_.joinable()) {
task_.join();
}
} }
bool etcd::Watcher::Wait() bool etcd::Watcher::Wait()
{ {
currentTask.wait(); if (task_.joinable()) {
task_.join();
}
return stubs->call->Cancelled(); return stubs->call->Cancelled();
} }
void etcd::Watcher::Wait(std::function<void(bool)> callback) void etcd::Watcher::Wait(std::function<void(bool)> callback)
{ {
currentTask.then([this, callback](pplx::task<void> const & resp_task) { if (wait_callback == nullptr) {
resp_task.wait(); wait_callback = callback;
callback(this->stubs->call->Cancelled()); } else {
}); std::cerr << "Failed to set a asynchronous wait callback since it has already been set" << std::endl;
}
} }
void etcd::Watcher::Cancel() void etcd::Watcher::Cancel()
@ -133,8 +138,10 @@ void etcd::Watcher::doWatch(std::string const & key,
stubs->call.reset(new etcdv3::AsyncWatchAction(params)); stubs->call.reset(new etcdv3::AsyncWatchAction(params));
currentTask = pplx::task<void>([this, callback]() task_ = std::thread([this, callback]() {
{ stubs->call->waitForResponse(callback);
return stubs->call->waitForResponse(callback); if (wait_callback != nullptr) {
wait_callback(stubs->call->Cancelled());
}
}); });
} }

View File

@ -3,6 +3,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <cstdlib>
#include <iostream> #include <iostream>
#include <thread> #include <thread>
@ -108,7 +109,7 @@ TEST_CASE("lock using lease")
keepalive.Check(); // shouldn't throw keepalive.Check(); // shouldn't throw
// lock // lock
etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get(); etcd::Response resp1 = etcd.lock_with_lease("/test/abcd", lease_id).get();
CHECK("lock" == resp1.action()); CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok()); REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code()); REQUIRE(0 == resp1.error_code());
@ -141,7 +142,7 @@ TEST_CASE("lock using lease")
keepalive.Check(); // shouldn't throw keepalive.Check(); // shouldn't throw
// lock // lock
etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get(); etcd::Response resp1 = etcd.lock_with_lease("/test/abcd", lease_id).get();
CHECK("lock" == resp1.action()); CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok()); REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code()); REQUIRE(0 == resp1.error_code());
@ -164,3 +165,32 @@ TEST_CASE("lock using lease")
keepalive.Check(); // shouldn't throw keepalive.Check(); // shouldn't throw
} }
} }
// Stress test: many threads contend for the same lock key through a single client.
// Every thread must eventually acquire the lock, hold it for one second, and release it.
TEST_CASE("concurrent lock & unlock")
{
  etcd::Client etcd("http://127.0.0.1:2379");
  std::string const lock_key = "/test/test_key";

  constexpr size_t trials = 128;

  // Catch2 assertion macros are not thread-safe, and a failing REQUIRE throws, which
  // would terminate the process if it escaped a worker thread. The workers therefore
  // only count their successes; the assertions run on the main thread after joining.
  std::atomic<size_t> locked{0};
  std::atomic<size_t> unlocked{0};

  std::function<void(std::string const &, const size_t)> locker =
      [&etcd, &locked, &unlocked](std::string const &key, const size_t index) {
    std::cout << "start lock for " << index << std::endl;
    auto resp = etcd.lock(key).get();
    std::cout << "lock for " << index << " is ok, start sleep: ..." << resp.error_message() << std::endl;
    if (!resp.is_ok()) {
      // nothing was acquired, so there is nothing to release
      return;
    }
    ++locked;

    // hold the lock for a while so that the other threads really have to wait
    std::this_thread::sleep_for(std::chrono::seconds(1));

    if (etcd.unlock(resp.lock_key()).get().is_ok()) {
      ++unlocked;
      std::cout << "thread " << index << " been unlocked" << std::endl;
    }
  };

  std::vector<std::thread> locks(trials);
  for (size_t index = 0; index < trials; ++index) {
    locks[index] = std::thread(locker, lock_key, index);
  }
  for (auto &worker : locks) {
    worker.join();
  }

  REQUIRE(trials == locked.load());
  REQUIRE(trials == unlocked.load());
}