diff --git a/README.md b/README.md index d5d8baf..51db31b 100644 --- a/README.md +++ b/README.md @@ -421,7 +421,7 @@ Users can also feed their own lease directory for lock: ```c++ etcd::Client etcd("http://127.0.0.1:4001"); - etcd.lock("/test/lock", lease_id); + etcd.lock_with_lease("/test/lock", lease_id); ``` ### Watching for changes @@ -510,6 +510,13 @@ The lease can be revoked by etcd.leaserevoke(resp.value().lease()); ``` +A lease can also be attached with a `KeepAlive` object at the creation time, + +```c++ + std::shared_ptr keepalive = etcd.leasekeepalive(60).get(); + std::cout << "lease id: " << keepalive->Lease(); +``` + The remaining time-to-live of a lease can be inspected by ```c++ diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 9c1242f..4879ba7 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -327,6 +327,12 @@ namespace etcd */ pplx::task leasegrant(int ttl); + /** + * Grants a lease. + * @param ttl is the time to live of the lease + */ + pplx::task> leasekeepalive(int ttl); + /** * Revoke a lease. * @param lease_id is the id the lease @@ -359,7 +365,7 @@ namespace etcd * of by the library. * @param key is the key to be used to request the lock. */ - pplx::task lock(std::string const &key, int64_t lease_id); + pplx::task lock_with_lease(std::string const &key, int64_t lease_id); /** * Releases a lock at a key. diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp index 9e1ca11..65fab55 100644 --- a/etcd/KeepAlive.hpp +++ b/etcd/KeepAlive.hpp @@ -1,9 +1,11 @@ #ifndef __ETCD_KEEPALIVE_HPP__ #define __ETCD_KEEPALIVE_HPP__ +#include #include #include #include +#include #include "etcd/Client.hpp" #include "etcd/Response.hpp" @@ -19,7 +21,7 @@ namespace etcd { /** - * If ID is set to 0, etcd will choose an ID. + * If ID is set to 0, the library will choose an ID, and can be accessed from ".Lease()". */ class KeepAlive { @@ -43,10 +45,11 @@ namespace etcd std::function const &handler, int ttl, int64_t lease_id=0); - KeepAlive(KeepAlive const &) = delete; KeepAlive(KeepAlive &&) = delete; + int64_t Lease() const { return lease_id; } + /** * Stop the keep alive action. */ @@ -64,7 +67,6 @@ namespace etcd protected: void refresh(); - pplx::task currentTask; struct EtcdServerStubs; struct EtcdServerStubsDeleter { void operator()(EtcdServerStubs *stubs); @@ -76,9 +78,13 @@ namespace etcd std::exception_ptr eptr_; std::function handler_; + // Don't use `pplx::task` to avoid sharing thread pool with other actions on the client + // to avoid any potential blocking, which may block the keepalive loop and evict the lease. + std::thread task_; + int ttl; int64_t lease_id; - bool continue_next; + std::atomic_bool continue_next; #if BOOST_VERSION >= 106600 boost::asio::io_context context; #else diff --git a/etcd/Response.hpp b/etcd/Response.hpp index e7833f4..a18a3bd 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -32,20 +32,42 @@ namespace etcd { return pplx::task([call]() { - etcd::Response resp; - call->waitForResponse(); - auto v3resp = call->ParseResponse(); - + auto duration = std::chrono::duration_cast( std::chrono::high_resolution_clock::now() - call->startTimepoint()); - resp = etcd::Response(v3resp, duration); - - return resp; + return etcd::Response(v3resp, duration); }); } + template + static pplx::task create(std::function()> callfn) + { + return pplx::task([callfn]() + { + auto call = callfn(); + + call->waitForResponse(); + auto v3resp = call->ParseResponse(); + + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - call->startTimepoint()); + return etcd::Response(v3resp, duration); + }); + } + + template + static etcd::Response create_sync(std::shared_ptr call) + { + call->waitForResponse(); + auto v3resp = call->ParseResponse(); + + auto duration = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - call->startTimepoint()); + return etcd::Response(v3resp, duration); + } + Response(); /** diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 0ed32a7..1c5300f 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -1,7 +1,10 @@ #ifndef __ETCD_WATCHER_HPP__ #define __ETCD_WATCHER_HPP__ +#include +#include #include +#include #include "etcd/Client.hpp" #include "etcd/Response.hpp" @@ -81,7 +84,11 @@ namespace etcd int index; std::function callback; - pplx::task currentTask; + std::function wait_callback; + + // Don't use `pplx::task` to avoid sharing thread pool with other actions on the client + // to avoid any potential blocking, which may block the keepalive loop and evict the lease. + std::thread task_; struct EtcdServerStubs; struct EtcdServerStubsDeleter { @@ -92,6 +99,7 @@ namespace etcd private: int fromIndex; bool recursive; + std::atomic_bool cancelled; }; } diff --git a/etcd/v3/AsyncWatchAction.hpp b/etcd/v3/AsyncWatchAction.hpp index f356653..7fc4dea 100644 --- a/etcd/v3/AsyncWatchAction.hpp +++ b/etcd/v3/AsyncWatchAction.hpp @@ -1,6 +1,7 @@ #ifndef __ASYNC_WATCHACTION_HPP__ #define __ASYNC_WATCHACTION_HPP__ +#include #include #include @@ -28,7 +29,7 @@ namespace etcdv3 private: WatchResponse reply; std::unique_ptr> stream; - bool isCancelled; + std::atomic_bool isCancelled; std::mutex protect_is_cancalled; }; } diff --git a/src/Client.cpp b/src/Client.cpp index 76c86cb..d182f8a 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -10,11 +10,13 @@ #include #endif +#include #include #include -#include #include #include +#include +#include #include @@ -641,12 +643,30 @@ pplx::task etcd::Client::watch(std::string const & key, std::str pplx::task etcd::Client::leasegrant(int ttl) { - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.ttl = ttl; - params.lease_stub = stubs->leaseServiceStub.get(); - std::shared_ptr call(new etcdv3::AsyncLeaseGrantAction(params)); - return Response::create(call); + // lease grant is special, that we are expected the callback could be invoked + // immediately after the lease is granted by the server. + return Response::create([this, ttl]() { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.ttl = ttl; + params.lease_stub = stubs->leaseServiceStub.get(); + return std::make_shared(params); + }); +} + +pplx::task> etcd::Client::leasekeepalive(int ttl) { + return pplx::task>([this, ttl]() + { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.ttl = ttl; + params.lease_stub = stubs->leaseServiceStub.get(); + auto call = std::make_shared(params); + + call->waitForResponse(); + auto v3resp = call->ParseResponse(); + return std::make_shared(*this, ttl, v3resp.get_value().kvs.lease()); + }); } pplx::task etcd::Client::leaserevoke(int64_t lease_id) @@ -678,37 +698,37 @@ pplx::task etcd::Client::lock(std::string const &key) { } pplx::task etcd::Client::lock(std::string const &key, int lease_ttl) { - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); + return this->leasekeepalive(lease_ttl).then([this, key]( + pplx::task> const& resp_task) { + auto const &keepalive = resp_task.get(); - auto resp = this->leasegrant(lease_ttl).get(); - int64_t lease_id = resp.value().lease(); - { - std::lock_guard lexical_scope_lock(mutex_for_keepalives); - this->keep_alive_for_locks[lease_id].reset( - new KeepAlive(*this, lease_ttl, lease_id)); - } - params.key = key; - params.lease_id = lease_id; - params.lock_stub = stubs->lockServiceStub.get(); - std::shared_ptr call(new etcdv3::AsyncLockAction(params)); - return Response::create(call).then( - [this, lease_id](pplx::task const &resp_task) -> etcd::Response { - auto const& resp = resp_task.get(); - { - std::lock_guard lexical_scope_lock(mutex_for_keepalives); - if (resp.is_ok()) { - this->leases_for_locks[resp.lock_key()] = lease_id; - } else { - this->keep_alive_for_locks.erase(lease_id); - } - } - return resp; + int64_t lease_id = keepalive->Lease(); + { + std::lock_guard lexical_scope_lock(mutex_for_keepalives); + this->keep_alive_for_locks[lease_id] = keepalive; } - ); + + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key = key; + params.lease_id = lease_id; + params.lock_stub = stubs->lockServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncLockAction(params)); + + auto lock_resp = Response::create_sync(call); + { + std::lock_guard lexical_scope_lock(mutex_for_keepalives); + if (lock_resp.is_ok()) { + this->leases_for_locks[lock_resp.lock_key()] = lease_id; + } else { + this->keep_alive_for_locks.erase(lease_id); + } + } + return lock_resp; + }); } -pplx::task etcd::Client::lock(std::string const &key, +pplx::task etcd::Client::lock_with_lease(std::string const &key, int64_t lease_id) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); @@ -720,7 +740,14 @@ pplx::task etcd::Client::lock(std::string const &key, } pplx::task etcd::Client::unlock(std::string const &lock_key) { - // cancel the KeepAlive first, it exists + // issue a "unlock" request + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key = lock_key; + params.lock_stub = stubs->lockServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); + + // cancel the KeepAlive first, if it exists { std::lock_guard lexical_scope_lock(mutex_for_keepalives); auto p_leases = this->leases_for_locks.find(lock_key); @@ -728,17 +755,20 @@ pplx::task etcd::Client::unlock(std::string const &lock_key) { auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second); if (p_keeps_alive != this->keep_alive_for_locks.end()) { this->keep_alive_for_locks.erase(p_keeps_alive); + } else { +#if !defined(NDEBUG) + std::cerr << "Keepalive for lease not found" << std::endl; +#endif } this->leases_for_locks.erase(p_leases); + } else { +#if !defined(NDEBUG) + std::cerr << "Lease for lock not found" << std::endl; +#endif } } - // issue a "unlock" request - etcdv3::ActionParameters params; - params.auth_token.assign(this->auth_token); - params.key = lock_key; - params.lock_stub = stubs->lockServiceStub.get(); - std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); + // wait in the io_context loop. return Response::create(call); } diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index 2dd7560..a933edc 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -31,8 +31,10 @@ etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id): params.lease_id = this->lease_id; params.lease_stub = stubs->leaseServiceStub.get(); + continue_next.store(true); + stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); - currentTask = pplx::task([this]() { + task_ = std::thread([this]() { try { // start refresh this->refresh(); @@ -67,7 +69,7 @@ etcd::KeepAlive::KeepAlive(Client const &client, params.lease_stub = stubs->leaseServiceStub.get(); stubs->call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); - currentTask = pplx::task([this]() { + task_ = std::thread([this]() { try { // start refresh this->refresh(); @@ -78,8 +80,8 @@ etcd::KeepAlive::KeepAlive(Client const &client, } else { eptr_ = std::current_exception(); } + this->Cancel(); } - context.stop(); // clean up }); } @@ -103,23 +105,19 @@ etcd::KeepAlive::~KeepAlive() void etcd::KeepAlive::Cancel() { - if (!continue_next) { + if (!continue_next.exchange(false)) { return; } - continue_next = false; -#ifndef NDEBUG - { - std::ios::fmtflags os_flags (std::cout.flags()); - std::cout << "Cancel keepalive for " << lease_id - << "(" << std::hex << lease_id << ")" << std::endl; - std::cout.flags(os_flags); - } -#endif stubs->call->CancelKeepAlive(); if (keepalive_timer_) { keepalive_timer_->cancel(); } - currentTask.wait(); + + // clean up + context.stop(); + if (task_.joinable()) { + task_.join(); + } } void etcd::KeepAlive::Check() { @@ -130,29 +128,20 @@ void etcd::KeepAlive::Check() { void etcd::KeepAlive::refresh() { - if (!continue_next) { + if (!continue_next.load()) { return; } // minimal resolution: 1 second int keepalive_ttl = std::max(ttl - 1, 1); -#ifndef NDEBUG - { - std::ios::fmtflags os_flags (std::cout.flags()); - std::cout << "Trigger the next keepalive round with ttl " << keepalive_ttl - << " for " << lease_id - << "(" << std::hex << lease_id << ")" << std::endl; - std::cout.flags(os_flags); - } -#endif keepalive_timer_.reset(new boost::asio::steady_timer( context, std::chrono::seconds(keepalive_ttl))); keepalive_timer_->async_wait([this](const boost::system::error_code& error) { if (error) { #ifndef NDEBUG - std::cerr << "keepalive timer error: " << error << ", " << error.message() << std::endl; + std::cerr << "keepalive timer cancelled: " << error << ", " << error.message() << std::endl; #endif } else { - if (this->continue_next) { + if (this->continue_next.load()) { auto resp = this->stubs->call->Refresh(); if (!resp.is_ok()) { throw std::runtime_error("Failed to refresh lease: error code: " + std::to_string(resp.error_code()) + diff --git a/src/Watcher.cpp b/src/Watcher.cpp index 89b979c..50fd291 100644 --- a/src/Watcher.cpp +++ b/src/Watcher.cpp @@ -92,22 +92,26 @@ etcd::Watcher::Watcher(std::string const & address, etcd::Watcher::~Watcher() { - stubs->call->CancelWatch(); - currentTask.wait(); + this->Cancel(); } bool etcd::Watcher::Wait() { - currentTask.wait(); + if (!cancelled.exchange(true)) { + if (task_.joinable()) { + task_.join(); + } + } return stubs->call->Cancelled(); } void etcd::Watcher::Wait(std::function callback) { - currentTask.then([this, callback](pplx::task const & resp_task) { - resp_task.wait(); - callback(this->stubs->call->Cancelled()); - }); + if (wait_callback == nullptr) { + wait_callback = callback; + } else { + std::cerr << "Failed to set a asynchronous wait callback since it has already been set" << std::endl; + } } void etcd::Watcher::Cancel() @@ -133,8 +137,11 @@ void etcd::Watcher::doWatch(std::string const & key, stubs->call.reset(new etcdv3::AsyncWatchAction(params)); - currentTask = pplx::task([this, callback]() - { - return stubs->call->waitForResponse(callback); + task_ = std::thread([this, callback]() { + stubs->call->waitForResponse(callback); + if (wait_callback != nullptr) { + wait_callback(stubs->call->Cancelled()); + } }); + cancelled.store(false); } diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index 81acaa2..0c4a77e 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -9,7 +9,7 @@ using etcdserverpb::WatchCreateRequest; etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param) : etcdv3::Action(param) { - isCancelled = false; + isCancelled.store(false); stream = parameters.watch_stub->AsyncWatch(&context,&cq_,(void*)"create"); WatchRequest watch_req; @@ -61,14 +61,14 @@ void etcdv3::AsyncWatchAction::waitForResponse() break; } if(got_tag == (void*)"writes done") { - isCancelled = true; + isCancelled.store(true); cq_.Shutdown(); break; } if(got_tag == (void*)this) // read tag { if (reply.canceled()) { - isCancelled = true; + isCancelled.store(true); cq_.Shutdown(); } else if ((reply.created() && reply.header().revision() < parameters.revision) || @@ -77,7 +77,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() // // 1. watch for a future revision, return immediately with empty events set // 2. receive any effective events. - isCancelled = true; + isCancelled.store(true); stream->WritesDone((void*)"writes done"); grpc::Status status; stream->Finish(&status, (void *)this); @@ -100,9 +100,7 @@ void etcdv3::AsyncWatchAction::waitForResponse() void etcdv3::AsyncWatchAction::CancelWatch() { std::lock_guard scope_lock(this->protect_is_cancalled); - if(isCancelled == false) - { - isCancelled = true; + if (!isCancelled.exchange(true)) { stream->WritesDone((void*)"writes done"); grpc::Status status; stream->Finish(&status, (void *)this); @@ -111,7 +109,7 @@ void etcdv3::AsyncWatchAction::CancelWatch() } bool etcdv3::AsyncWatchAction::Cancelled() const { - return isCancelled; + return isCancelled.load(); } void etcdv3::AsyncWatchAction::waitForResponse(std::function callback) @@ -127,14 +125,14 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function #include +#include #include #include @@ -108,7 +109,7 @@ TEST_CASE("lock using lease") keepalive.Check(); // shouldn't throw // lock - etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get(); + etcd::Response resp1 = etcd.lock_with_lease("/test/abcd", lease_id).get(); CHECK("lock" == resp1.action()); REQUIRE(resp1.is_ok()); REQUIRE(0 == resp1.error_code()); @@ -141,7 +142,7 @@ TEST_CASE("lock using lease") keepalive.Check(); // shouldn't throw // lock - etcd::Response resp1 = etcd.lock("/test/abcd", lease_id).get(); + etcd::Response resp1 = etcd.lock_with_lease("/test/abcd", lease_id).get(); CHECK("lock" == resp1.action()); REQUIRE(resp1.is_ok()); REQUIRE(0 == resp1.error_code()); @@ -164,3 +165,32 @@ TEST_CASE("lock using lease") keepalive.Check(); // shouldn't throw } } + +TEST_CASE("concurrent lock & unlock") +{ + etcd::Client etcd("http://127.0.0.1:2379"); + std::string const lock_key = "/test/test_key"; + + constexpr size_t trials = 128; + + std::function locker = [&etcd](std::string const &key, const size_t index) { + std::cout << "start lock for " << index << std::endl; + auto resp = etcd.lock(key).get(); + std::cout << "lock for " << index << " is ok, start sleep: ..." << resp.error_message() << std::endl; + REQUIRE(resp.is_ok()); + std::srand(index); + size_t time_to_sleep = 1; + std::this_thread::sleep_for(std::chrono::seconds(time_to_sleep)); + REQUIRE(etcd.unlock(resp.lock_key()).get().is_ok()); + std::cout << "thread " << index << " been unlocked" << std::endl; + }; + + std::vector locks(trials); + for (size_t index = 0; index < trials; ++index) { + locks[index] = std::thread(locker, lock_key, index); + } + + for (size_t index = 0; index < trials; ++index) { + locks[index].join(); + } +}