Fixes lock, and the underlying keepalive implmentation. (#60)
* Fixes lock, and the underlying keepalive implmentation. The previous implementation is buggy when a lot of locks happen at the same time, as the cpprestsdk's threadpool use a fixed number of thread for posix platform: https://github.com/microsoft/cpprestsdk/blob/master/Release/src/pplx/threadpool.cpp#L198 Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
bb22ef4d7f
commit
9e77fdb2ee
|
|
@ -421,7 +421,7 @@ Users can also feed their own lease directory for lock:
|
||||||
|
|
||||||
```c++
|
```c++
|
||||||
etcd::Client etcd("http://127.0.0.1:4001");
|
etcd::Client etcd("http://127.0.0.1:4001");
|
||||||
etcd.lock("/test/lock", lease_id);
|
etcd.lock_with_lease("/test/lock", lease_id);
|
||||||
```
|
```
|
||||||
|
|
||||||
### Watching for changes
|
### Watching for changes
|
||||||
|
|
@ -510,6 +510,13 @@ The lease can be revoked by
|
||||||
etcd.leaserevoke(resp.value().lease());
|
etcd.leaserevoke(resp.value().lease());
|
||||||
```
|
```
|
||||||
|
|
||||||
|
A lease can also be attached with a `KeepAlive` object at the creation time,
|
||||||
|
|
||||||
|
```c++
|
||||||
|
std::shared_ptr<etcd::KeepAlive> keepalive = etcd.leasekeepalive(60).get();
|
||||||
|
std::cout << "lease id: " << keepalive->Lease();
|
||||||
|
```
|
||||||
|
|
||||||
The remaining time-to-live of a lease can be inspected by
|
The remaining time-to-live of a lease can be inspected by
|
||||||
|
|
||||||
```c++
|
```c++
|
||||||
|
|
|
||||||
|
|
@ -327,6 +327,12 @@ namespace etcd
|
||||||
*/
|
*/
|
||||||
pplx::task<Response> leasegrant(int ttl);
|
pplx::task<Response> leasegrant(int ttl);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Grants a lease.
|
||||||
|
* @param ttl is the time to live of the lease
|
||||||
|
*/
|
||||||
|
pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Revoke a lease.
|
* Revoke a lease.
|
||||||
* @param lease_id is the id the lease
|
* @param lease_id is the id the lease
|
||||||
|
|
@ -359,7 +365,7 @@ namespace etcd
|
||||||
* of by the library.
|
* of by the library.
|
||||||
* @param key is the key to be used to request the lock.
|
* @param key is the key to be used to request the lock.
|
||||||
*/
|
*/
|
||||||
pplx::task<Response> lock(std::string const &key, int64_t lease_id);
|
pplx::task<Response> lock_with_lease(std::string const &key, int64_t lease_id);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Releases a lock at a key.
|
* Releases a lock at a key.
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,11 @@
|
||||||
#ifndef __ETCD_KEEPALIVE_HPP__
|
#ifndef __ETCD_KEEPALIVE_HPP__
|
||||||
#define __ETCD_KEEPALIVE_HPP__
|
#define __ETCD_KEEPALIVE_HPP__
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <exception>
|
#include <exception>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include "etcd/Client.hpp"
|
#include "etcd/Client.hpp"
|
||||||
#include "etcd/Response.hpp"
|
#include "etcd/Response.hpp"
|
||||||
|
|
@ -19,7 +21,7 @@
|
||||||
namespace etcd
|
namespace etcd
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* If ID is set to 0, etcd will choose an ID.
|
* If ID is set to 0, the library will choose an ID, and can be accessed from ".Lease()".
|
||||||
*/
|
*/
|
||||||
class KeepAlive
|
class KeepAlive
|
||||||
{
|
{
|
||||||
|
|
@ -43,10 +45,11 @@ namespace etcd
|
||||||
std::function<void (std::exception_ptr)> const &handler,
|
std::function<void (std::exception_ptr)> const &handler,
|
||||||
int ttl, int64_t lease_id=0);
|
int ttl, int64_t lease_id=0);
|
||||||
|
|
||||||
|
|
||||||
KeepAlive(KeepAlive const &) = delete;
|
KeepAlive(KeepAlive const &) = delete;
|
||||||
KeepAlive(KeepAlive &&) = delete;
|
KeepAlive(KeepAlive &&) = delete;
|
||||||
|
|
||||||
|
int64_t Lease() const { return lease_id; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop the keep alive action.
|
* Stop the keep alive action.
|
||||||
*/
|
*/
|
||||||
|
|
@ -64,7 +67,6 @@ namespace etcd
|
||||||
protected:
|
protected:
|
||||||
void refresh();
|
void refresh();
|
||||||
|
|
||||||
pplx::task<void> currentTask;
|
|
||||||
struct EtcdServerStubs;
|
struct EtcdServerStubs;
|
||||||
struct EtcdServerStubsDeleter {
|
struct EtcdServerStubsDeleter {
|
||||||
void operator()(EtcdServerStubs *stubs);
|
void operator()(EtcdServerStubs *stubs);
|
||||||
|
|
@ -76,9 +78,13 @@ namespace etcd
|
||||||
std::exception_ptr eptr_;
|
std::exception_ptr eptr_;
|
||||||
std::function<void (std::exception_ptr)> handler_;
|
std::function<void (std::exception_ptr)> handler_;
|
||||||
|
|
||||||
|
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
|
||||||
|
// to avoid any potential blocking, which may block the keepalive loop and evict the lease.
|
||||||
|
std::thread task_;
|
||||||
|
|
||||||
int ttl;
|
int ttl;
|
||||||
int64_t lease_id;
|
int64_t lease_id;
|
||||||
bool continue_next;
|
std::atomic_bool continue_next;
|
||||||
#if BOOST_VERSION >= 106600
|
#if BOOST_VERSION >= 106600
|
||||||
boost::asio::io_context context;
|
boost::asio::io_context context;
|
||||||
#else
|
#else
|
||||||
|
|
|
||||||
|
|
@ -32,20 +32,42 @@ namespace etcd
|
||||||
{
|
{
|
||||||
return pplx::task<etcd::Response>([call]()
|
return pplx::task<etcd::Response>([call]()
|
||||||
{
|
{
|
||||||
etcd::Response resp;
|
|
||||||
|
|
||||||
call->waitForResponse();
|
call->waitForResponse();
|
||||||
|
|
||||||
auto v3resp = call->ParseResponse();
|
auto v3resp = call->ParseResponse();
|
||||||
|
|
||||||
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
|
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
std::chrono::high_resolution_clock::now() - call->startTimepoint());
|
std::chrono::high_resolution_clock::now() - call->startTimepoint());
|
||||||
resp = etcd::Response(v3resp, duration);
|
return etcd::Response(v3resp, duration);
|
||||||
|
|
||||||
return resp;
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
static pplx::task<etcd::Response> create(std::function<std::shared_ptr<T>()> callfn)
|
||||||
|
{
|
||||||
|
return pplx::task<etcd::Response>([callfn]()
|
||||||
|
{
|
||||||
|
auto call = callfn();
|
||||||
|
|
||||||
|
call->waitForResponse();
|
||||||
|
auto v3resp = call->ParseResponse();
|
||||||
|
|
||||||
|
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
|
std::chrono::high_resolution_clock::now() - call->startTimepoint());
|
||||||
|
return etcd::Response(v3resp, duration);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
static etcd::Response create_sync(std::shared_ptr<T> call)
|
||||||
|
{
|
||||||
|
call->waitForResponse();
|
||||||
|
auto v3resp = call->ParseResponse();
|
||||||
|
|
||||||
|
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||||
|
std::chrono::high_resolution_clock::now() - call->startTimepoint());
|
||||||
|
return etcd::Response(v3resp, duration);
|
||||||
|
}
|
||||||
|
|
||||||
Response();
|
Response();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,10 @@
|
||||||
#ifndef __ETCD_WATCHER_HPP__
|
#ifndef __ETCD_WATCHER_HPP__
|
||||||
#define __ETCD_WATCHER_HPP__
|
#define __ETCD_WATCHER_HPP__
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <functional>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include "etcd/Client.hpp"
|
#include "etcd/Client.hpp"
|
||||||
#include "etcd/Response.hpp"
|
#include "etcd/Response.hpp"
|
||||||
|
|
@ -81,7 +84,11 @@ namespace etcd
|
||||||
|
|
||||||
int index;
|
int index;
|
||||||
std::function<void(Response)> callback;
|
std::function<void(Response)> callback;
|
||||||
pplx::task<void> currentTask;
|
std::function<void(bool)> wait_callback;
|
||||||
|
|
||||||
|
// Don't use `pplx::task` to avoid sharing thread pool with other actions on the client
|
||||||
|
// to avoid any potential blocking, which may block the keepalive loop and evict the lease.
|
||||||
|
std::thread task_;
|
||||||
|
|
||||||
struct EtcdServerStubs;
|
struct EtcdServerStubs;
|
||||||
struct EtcdServerStubsDeleter {
|
struct EtcdServerStubsDeleter {
|
||||||
|
|
@ -92,6 +99,7 @@ namespace etcd
|
||||||
private:
|
private:
|
||||||
int fromIndex;
|
int fromIndex;
|
||||||
bool recursive;
|
bool recursive;
|
||||||
|
std::atomic_bool cancelled;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
#ifndef __ASYNC_WATCHACTION_HPP__
|
#ifndef __ASYNC_WATCHACTION_HPP__
|
||||||
#define __ASYNC_WATCHACTION_HPP__
|
#define __ASYNC_WATCHACTION_HPP__
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
|
||||||
#include <grpc++/grpc++.h>
|
#include <grpc++/grpc++.h>
|
||||||
|
|
@ -28,7 +29,7 @@ namespace etcdv3
|
||||||
private:
|
private:
|
||||||
WatchResponse reply;
|
WatchResponse reply;
|
||||||
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
|
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
|
||||||
bool isCancelled;
|
std::atomic_bool isCancelled;
|
||||||
std::mutex protect_is_cancalled;
|
std::mutex protect_is_cancalled;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,11 +10,13 @@
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <sstream>
|
|
||||||
#include <limits>
|
#include <limits>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <sstream>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include <boost/algorithm/string.hpp>
|
#include <boost/algorithm/string.hpp>
|
||||||
|
|
||||||
|
|
@ -641,12 +643,30 @@ pplx::task<etcd::Response> etcd::Client::watch(std::string const & key, std::str
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
|
pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
|
||||||
{
|
{
|
||||||
|
// lease grant is special, that we are expected the callback could be invoked
|
||||||
|
// immediately after the lease is granted by the server.
|
||||||
|
return Response::create<etcdv3::AsyncLeaseGrantAction>([this, ttl]() {
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.auth_token.assign(this->auth_token);
|
params.auth_token.assign(this->auth_token);
|
||||||
params.ttl = ttl;
|
params.ttl = ttl;
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
std::shared_ptr<etcdv3::AsyncLeaseGrantAction> call(new etcdv3::AsyncLeaseGrantAction(params));
|
return std::make_shared<etcdv3::AsyncLeaseGrantAction>(params);
|
||||||
return Response::create(call);
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pplx::task<std::shared_ptr<etcd::KeepAlive>> etcd::Client::leasekeepalive(int ttl) {
|
||||||
|
return pplx::task<std::shared_ptr<etcd::KeepAlive>>([this, ttl]()
|
||||||
|
{
|
||||||
|
etcdv3::ActionParameters params;
|
||||||
|
params.auth_token.assign(this->auth_token);
|
||||||
|
params.ttl = ttl;
|
||||||
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
|
auto call = std::make_shared<etcdv3::AsyncLeaseGrantAction>(params);
|
||||||
|
|
||||||
|
call->waitForResponse();
|
||||||
|
auto v3resp = call->ParseResponse();
|
||||||
|
return std::make_shared<KeepAlive>(*this, ttl, v3resp.get_value().kvs.lease());
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::leaserevoke(int64_t lease_id)
|
pplx::task<etcd::Response> etcd::Client::leaserevoke(int64_t lease_id)
|
||||||
|
|
@ -678,37 +698,37 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key, int lease_ttl) {
|
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key, int lease_ttl) {
|
||||||
etcdv3::ActionParameters params;
|
return this->leasekeepalive(lease_ttl).then([this, key](
|
||||||
params.auth_token.assign(this->auth_token);
|
pplx::task<std::shared_ptr<etcd::KeepAlive>> const& resp_task) {
|
||||||
|
auto const &keepalive = resp_task.get();
|
||||||
|
|
||||||
auto resp = this->leasegrant(lease_ttl).get();
|
int64_t lease_id = keepalive->Lease();
|
||||||
int64_t lease_id = resp.value().lease();
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
|
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
|
||||||
this->keep_alive_for_locks[lease_id].reset(
|
this->keep_alive_for_locks[lease_id] = keepalive;
|
||||||
new KeepAlive(*this, lease_ttl, lease_id));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
etcdv3::ActionParameters params;
|
||||||
|
params.auth_token.assign(this->auth_token);
|
||||||
params.key = key;
|
params.key = key;
|
||||||
params.lease_id = lease_id;
|
params.lease_id = lease_id;
|
||||||
params.lock_stub = stubs->lockServiceStub.get();
|
params.lock_stub = stubs->lockServiceStub.get();
|
||||||
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
|
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
|
||||||
return Response::create(call).then(
|
|
||||||
[this, lease_id](pplx::task<etcd::Response> const &resp_task) -> etcd::Response {
|
auto lock_resp = Response::create_sync(call);
|
||||||
auto const& resp = resp_task.get();
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
|
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
|
||||||
if (resp.is_ok()) {
|
if (lock_resp.is_ok()) {
|
||||||
this->leases_for_locks[resp.lock_key()] = lease_id;
|
this->leases_for_locks[lock_resp.lock_key()] = lease_id;
|
||||||
} else {
|
} else {
|
||||||
this->keep_alive_for_locks.erase(lease_id);
|
this->keep_alive_for_locks.erase(lease_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return resp;
|
return lock_resp;
|
||||||
}
|
});
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
|
pplx::task<etcd::Response> etcd::Client::lock_with_lease(std::string const &key,
|
||||||
int64_t lease_id) {
|
int64_t lease_id) {
|
||||||
etcdv3::ActionParameters params;
|
etcdv3::ActionParameters params;
|
||||||
params.auth_token.assign(this->auth_token);
|
params.auth_token.assign(this->auth_token);
|
||||||
|
|
@ -720,7 +740,14 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
|
||||||
}
|
}
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
|
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
|
||||||
// cancel the KeepAlive first, it exists
|
// issue a "unlock" request
|
||||||
|
etcdv3::ActionParameters params;
|
||||||
|
params.auth_token.assign(this->auth_token);
|
||||||
|
params.key = lock_key;
|
||||||
|
params.lock_stub = stubs->lockServiceStub.get();
|
||||||
|
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
|
||||||
|
|
||||||
|
// cancel the KeepAlive first, if it exists
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
|
std::lock_guard<std::mutex> lexical_scope_lock(mutex_for_keepalives);
|
||||||
auto p_leases = this->leases_for_locks.find(lock_key);
|
auto p_leases = this->leases_for_locks.find(lock_key);
|
||||||
|
|
@ -728,17 +755,20 @@ pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
|
||||||
auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second);
|
auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second);
|
||||||
if (p_keeps_alive != this->keep_alive_for_locks.end()) {
|
if (p_keeps_alive != this->keep_alive_for_locks.end()) {
|
||||||
this->keep_alive_for_locks.erase(p_keeps_alive);
|
this->keep_alive_for_locks.erase(p_keeps_alive);
|
||||||
|
} else {
|
||||||
|
#if !defined(NDEBUG)
|
||||||
|
std::cerr << "Keepalive for lease not found" << std::endl;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
this->leases_for_locks.erase(p_leases);
|
this->leases_for_locks.erase(p_leases);
|
||||||
|
} else {
|
||||||
|
#if !defined(NDEBUG)
|
||||||
|
std::cerr << "Lease for lock not found" << std::endl;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// issue a "unlock" request
|
// wait in the io_context loop.
|
||||||
etcdv3::ActionParameters params;
|
|
||||||
params.auth_token.assign(this->auth_token);
|
|
||||||
params.key = lock_key;
|
|
||||||
params.lock_stub = stubs->lockServiceStub.get();
|
|
||||||
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
|
|
||||||
return Response::create(call);
|
return Response::create(call);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -31,8 +31,10 @@ etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id):
|
||||||
params.lease_id = this->lease_id;
|
params.lease_id = this->lease_id;
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
|
|
||||||
|
continue_next.store(true);
|
||||||
|
|
||||||
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
|
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
|
||||||
currentTask = pplx::task<void>([this]() {
|
task_ = std::thread([this]() {
|
||||||
try {
|
try {
|
||||||
// start refresh
|
// start refresh
|
||||||
this->refresh();
|
this->refresh();
|
||||||
|
|
@ -67,7 +69,7 @@ etcd::KeepAlive::KeepAlive(Client const &client,
|
||||||
params.lease_stub = stubs->leaseServiceStub.get();
|
params.lease_stub = stubs->leaseServiceStub.get();
|
||||||
|
|
||||||
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
|
stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params));
|
||||||
currentTask = pplx::task<void>([this]() {
|
task_ = std::thread([this]() {
|
||||||
try {
|
try {
|
||||||
// start refresh
|
// start refresh
|
||||||
this->refresh();
|
this->refresh();
|
||||||
|
|
@ -78,8 +80,8 @@ etcd::KeepAlive::KeepAlive(Client const &client,
|
||||||
} else {
|
} else {
|
||||||
eptr_ = std::current_exception();
|
eptr_ = std::current_exception();
|
||||||
}
|
}
|
||||||
|
this->Cancel();
|
||||||
}
|
}
|
||||||
context.stop(); // clean up
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -103,23 +105,19 @@ etcd::KeepAlive::~KeepAlive()
|
||||||
|
|
||||||
void etcd::KeepAlive::Cancel()
|
void etcd::KeepAlive::Cancel()
|
||||||
{
|
{
|
||||||
if (!continue_next) {
|
if (!continue_next.exchange(false)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
continue_next = false;
|
|
||||||
#ifndef NDEBUG
|
|
||||||
{
|
|
||||||
std::ios::fmtflags os_flags (std::cout.flags());
|
|
||||||
std::cout << "Cancel keepalive for " << lease_id
|
|
||||||
<< "(" << std::hex << lease_id << ")" << std::endl;
|
|
||||||
std::cout.flags(os_flags);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
stubs->call->CancelKeepAlive();
|
stubs->call->CancelKeepAlive();
|
||||||
if (keepalive_timer_) {
|
if (keepalive_timer_) {
|
||||||
keepalive_timer_->cancel();
|
keepalive_timer_->cancel();
|
||||||
}
|
}
|
||||||
currentTask.wait();
|
|
||||||
|
// clean up
|
||||||
|
context.stop();
|
||||||
|
if (task_.joinable()) {
|
||||||
|
task_.join();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::KeepAlive::Check() {
|
void etcd::KeepAlive::Check() {
|
||||||
|
|
@ -130,29 +128,20 @@ void etcd::KeepAlive::Check() {
|
||||||
|
|
||||||
void etcd::KeepAlive::refresh()
|
void etcd::KeepAlive::refresh()
|
||||||
{
|
{
|
||||||
if (!continue_next) {
|
if (!continue_next.load()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// minimal resolution: 1 second
|
// minimal resolution: 1 second
|
||||||
int keepalive_ttl = std::max(ttl - 1, 1);
|
int keepalive_ttl = std::max(ttl - 1, 1);
|
||||||
#ifndef NDEBUG
|
|
||||||
{
|
|
||||||
std::ios::fmtflags os_flags (std::cout.flags());
|
|
||||||
std::cout << "Trigger the next keepalive round with ttl " << keepalive_ttl
|
|
||||||
<< " for " << lease_id
|
|
||||||
<< "(" << std::hex << lease_id << ")" << std::endl;
|
|
||||||
std::cout.flags(os_flags);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
keepalive_timer_.reset(new boost::asio::steady_timer(
|
keepalive_timer_.reset(new boost::asio::steady_timer(
|
||||||
context, std::chrono::seconds(keepalive_ttl)));
|
context, std::chrono::seconds(keepalive_ttl)));
|
||||||
keepalive_timer_->async_wait([this](const boost::system::error_code& error) {
|
keepalive_timer_->async_wait([this](const boost::system::error_code& error) {
|
||||||
if (error) {
|
if (error) {
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
std::cerr << "keepalive timer error: " << error << ", " << error.message() << std::endl;
|
std::cerr << "keepalive timer cancelled: " << error << ", " << error.message() << std::endl;
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
if (this->continue_next) {
|
if (this->continue_next.load()) {
|
||||||
auto resp = this->stubs->call->Refresh();
|
auto resp = this->stubs->call->Refresh();
|
||||||
if (!resp.is_ok()) {
|
if (!resp.is_ok()) {
|
||||||
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
|
throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) +
|
||||||
|
|
|
||||||
|
|
@ -92,22 +92,26 @@ etcd::Watcher::Watcher(std::string const & address,
|
||||||
|
|
||||||
etcd::Watcher::~Watcher()
|
etcd::Watcher::~Watcher()
|
||||||
{
|
{
|
||||||
stubs->call->CancelWatch();
|
this->Cancel();
|
||||||
currentTask.wait();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool etcd::Watcher::Wait()
|
bool etcd::Watcher::Wait()
|
||||||
{
|
{
|
||||||
currentTask.wait();
|
if (!cancelled.exchange(true)) {
|
||||||
|
if (task_.joinable()) {
|
||||||
|
task_.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
return stubs->call->Cancelled();
|
return stubs->call->Cancelled();
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::Watcher::Wait(std::function<void(bool)> callback)
|
void etcd::Watcher::Wait(std::function<void(bool)> callback)
|
||||||
{
|
{
|
||||||
currentTask.then([this, callback](pplx::task<void> const & resp_task) {
|
if (wait_callback == nullptr) {
|
||||||
resp_task.wait();
|
wait_callback = callback;
|
||||||
callback(this->stubs->call->Cancelled());
|
} else {
|
||||||
});
|
std::cerr << "Failed to set a asynchronous wait callback since it has already been set" << std::endl;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcd::Watcher::Cancel()
|
void etcd::Watcher::Cancel()
|
||||||
|
|
@ -133,8 +137,11 @@ void etcd::Watcher::doWatch(std::string const & key,
|
||||||
|
|
||||||
stubs->call.reset(new etcdv3::AsyncWatchAction(params));
|
stubs->call.reset(new etcdv3::AsyncWatchAction(params));
|
||||||
|
|
||||||
currentTask = pplx::task<void>([this, callback]()
|
task_ = std::thread([this, callback]() {
|
||||||
{
|
stubs->call->waitForResponse(callback);
|
||||||
return stubs->call->waitForResponse(callback);
|
if (wait_callback != nullptr) {
|
||||||
});
|
wait_callback(stubs->call->Cancelled());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cancelled.store(false);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ using etcdserverpb::WatchCreateRequest;
|
||||||
etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param)
|
etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param)
|
||||||
: etcdv3::Action(param)
|
: etcdv3::Action(param)
|
||||||
{
|
{
|
||||||
isCancelled = false;
|
isCancelled.store(false);
|
||||||
stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)"create");
|
stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)"create");
|
||||||
|
|
||||||
WatchRequest watch_req;
|
WatchRequest watch_req;
|
||||||
|
|
@ -61,14 +61,14 @@ void etcdv3::AsyncWatchAction::waitForResponse()
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if(got_tag == (void*)"writes done") {
|
if(got_tag == (void*)"writes done") {
|
||||||
isCancelled = true;
|
isCancelled.store(true);
|
||||||
cq_.Shutdown();
|
cq_.Shutdown();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if(got_tag == (void*)this) // read tag
|
if(got_tag == (void*)this) // read tag
|
||||||
{
|
{
|
||||||
if (reply.canceled()) {
|
if (reply.canceled()) {
|
||||||
isCancelled = true;
|
isCancelled.store(true);
|
||||||
cq_.Shutdown();
|
cq_.Shutdown();
|
||||||
}
|
}
|
||||||
else if ((reply.created() && reply.header().revision() < parameters.revision) ||
|
else if ((reply.created() && reply.header().revision() < parameters.revision) ||
|
||||||
|
|
@ -77,7 +77,7 @@ void etcdv3::AsyncWatchAction::waitForResponse()
|
||||||
//
|
//
|
||||||
// 1. watch for a future revision, return immediately with empty events set
|
// 1. watch for a future revision, return immediately with empty events set
|
||||||
// 2. receive any effective events.
|
// 2. receive any effective events.
|
||||||
isCancelled = true;
|
isCancelled.store(true);
|
||||||
stream->WritesDone((void*)"writes done");
|
stream->WritesDone((void*)"writes done");
|
||||||
grpc::Status status;
|
grpc::Status status;
|
||||||
stream->Finish(&status, (void *)this);
|
stream->Finish(&status, (void *)this);
|
||||||
|
|
@ -100,9 +100,7 @@ void etcdv3::AsyncWatchAction::waitForResponse()
|
||||||
void etcdv3::AsyncWatchAction::CancelWatch()
|
void etcdv3::AsyncWatchAction::CancelWatch()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
|
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
|
||||||
if(isCancelled == false)
|
if (!isCancelled.exchange(true)) {
|
||||||
{
|
|
||||||
isCancelled = true;
|
|
||||||
stream->WritesDone((void*)"writes done");
|
stream->WritesDone((void*)"writes done");
|
||||||
grpc::Status status;
|
grpc::Status status;
|
||||||
stream->Finish(&status, (void *)this);
|
stream->Finish(&status, (void *)this);
|
||||||
|
|
@ -111,7 +109,7 @@ void etcdv3::AsyncWatchAction::CancelWatch()
|
||||||
}
|
}
|
||||||
|
|
||||||
bool etcdv3::AsyncWatchAction::Cancelled() const {
|
bool etcdv3::AsyncWatchAction::Cancelled() const {
|
||||||
return isCancelled;
|
return isCancelled.load();
|
||||||
}
|
}
|
||||||
|
|
||||||
void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response)> callback)
|
void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response)> callback)
|
||||||
|
|
@ -127,14 +125,14 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
|
||||||
}
|
}
|
||||||
if(got_tag == (void*)"writes done")
|
if(got_tag == (void*)"writes done")
|
||||||
{
|
{
|
||||||
isCancelled = true;
|
isCancelled.store(true);
|
||||||
cq_.Shutdown();
|
cq_.Shutdown();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else if(got_tag == (void*)this) // read tag
|
else if(got_tag == (void*)this) // read tag
|
||||||
{
|
{
|
||||||
if (reply.canceled()) {
|
if (reply.canceled()) {
|
||||||
isCancelled = true;
|
isCancelled.store(true);
|
||||||
cq_.Shutdown();
|
cq_.Shutdown();
|
||||||
if (reply.compact_revision() != 0) {
|
if (reply.compact_revision() != 0) {
|
||||||
callback(etcd::Response(grpc::StatusCode::OUT_OF_RANGE /* error code */,
|
callback(etcd::Response(grpc::StatusCode::OUT_OF_RANGE /* error code */,
|
||||||
|
|
@ -159,7 +157,6 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
|
||||||
|
|
||||||
etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse()
|
etcdv3::AsyncWatchResponse etcdv3::AsyncWatchAction::ParseResponse()
|
||||||
{
|
{
|
||||||
|
|
||||||
AsyncWatchResponse watch_resp;
|
AsyncWatchResponse watch_resp;
|
||||||
if(!status.ok())
|
if(!status.ok())
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <cstdlib>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
|
@ -108,7 +109,7 @@ TEST_CASE("lock using lease")
|
||||||
keepalive.Check(); // shouldn't throw
|
keepalive.Check(); // shouldn't throw
|
||||||
|
|
||||||
// lock
|
// lock
|
||||||
etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get();
|
etcd::Response resp1 = etcd.lock_with_lease("/test/abcd", lease_id).get();
|
||||||
CHECK("lock" == resp1.action());
|
CHECK("lock" == resp1.action());
|
||||||
REQUIRE(resp1.is_ok());
|
REQUIRE(resp1.is_ok());
|
||||||
REQUIRE(0 == resp1.error_code());
|
REQUIRE(0 == resp1.error_code());
|
||||||
|
|
@ -141,7 +142,7 @@ TEST_CASE("lock using lease")
|
||||||
keepalive.Check(); // shouldn't throw
|
keepalive.Check(); // shouldn't throw
|
||||||
|
|
||||||
// lock
|
// lock
|
||||||
etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get();
|
etcd::Response resp1 = etcd.lock_with_lease("/test/abcd", lease_id).get();
|
||||||
CHECK("lock" == resp1.action());
|
CHECK("lock" == resp1.action());
|
||||||
REQUIRE(resp1.is_ok());
|
REQUIRE(resp1.is_ok());
|
||||||
REQUIRE(0 == resp1.error_code());
|
REQUIRE(0 == resp1.error_code());
|
||||||
|
|
@ -164,3 +165,32 @@ TEST_CASE("lock using lease")
|
||||||
keepalive.Check(); // shouldn't throw
|
keepalive.Check(); // shouldn't throw
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("concurrent lock & unlock")
|
||||||
|
{
|
||||||
|
etcd::Client etcd("http://127.0.0.1:2379");
|
||||||
|
std::string const lock_key = "/test/test_key";
|
||||||
|
|
||||||
|
constexpr size_t trials = 128;
|
||||||
|
|
||||||
|
std::function<void(std::string const &, const size_t)> locker = [&etcd](std::string const &key, const size_t index) {
|
||||||
|
std::cout << "start lock for " << index << std::endl;
|
||||||
|
auto resp = etcd.lock(key).get();
|
||||||
|
std::cout << "lock for " << index << " is ok, start sleep: ..." << resp.error_message() << std::endl;
|
||||||
|
REQUIRE(resp.is_ok());
|
||||||
|
std::srand(index);
|
||||||
|
size_t time_to_sleep = 1;
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(time_to_sleep));
|
||||||
|
REQUIRE(etcd.unlock(resp.lock_key()).get().is_ok());
|
||||||
|
std::cout << "thread " << index << " been unlocked" << std::endl;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<std::thread> locks(trials);
|
||||||
|
for (size_t index = 0; index < trials; ++index) {
|
||||||
|
locks[index] = std::thread(locker, lock_key, index);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t index = 0; index < trials; ++index) {
|
||||||
|
locks[index].join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue