Add a sync variant of lock on async client.
Resolves #139. Lock is special, as it may cause the thread resources (in the pplx thread pool) to be exhausted. Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
ed7ab08ef7
commit
6019dead4c
|
|
@ -512,6 +512,21 @@ namespace etcd
|
|||
*/
|
||||
pplx::task<Response> lock(std::string const &key);
|
||||
|
||||
/**
|
||||
* Lock is special, as waiting for lock may cause the thread resources (in the pplx thread pool) to
|
||||
* be exhausted. So we need to provide a way to let the user to issue a lock without taking the a
|
||||
* shared thread.
|
||||
*
|
||||
* This works like what we have in the sync client, but we can issue such method from the async client
|
||||
* directly.
|
||||
*
|
||||
* That would be useful when use already issue the lock from a controlled thread. See more discussion
|
||||
* about the target scenario in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/139
|
||||
*
|
||||
* @param key is the key to be used to request the lock.
|
||||
*/
|
||||
pplx::task<Response> lock(std::string const &key, const bool sync);
|
||||
|
||||
/**
|
||||
* Gains a lock at a key, using a default created lease, using the specified lease TTL (in seconds), with
|
||||
* keeping alive has already been taken care of by the library.
|
||||
|
|
@ -520,6 +535,11 @@ namespace etcd
|
|||
*/
|
||||
pplx::task<Response> lock(std::string const &key, int lease_ttl);
|
||||
|
||||
/**
|
||||
* Lock, but the sync version.
|
||||
*/
|
||||
pplx::task<Response> lock(std::string const &key, int lease_ttl, const bool sync);
|
||||
|
||||
/**
|
||||
* Gains a lock at a key, using a user-provided lease, the lifetime of the lease won't be taken care
|
||||
* of by the library.
|
||||
|
|
@ -527,6 +547,11 @@ namespace etcd
|
|||
*/
|
||||
pplx::task<Response> lock_with_lease(std::string const &key, int64_t lease_id);
|
||||
|
||||
/**
|
||||
* Lock, but the sync version.
|
||||
*/
|
||||
pplx::task<Response> lock_with_lease(std::string const &key, int64_t lease_id, const bool sync);
|
||||
|
||||
/**
|
||||
* Releases a lock at a key.
|
||||
* @param key is the lock key to release.
|
||||
|
|
|
|||
|
|
@ -554,7 +554,14 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
|
|||
return this->lock(key, DEFAULT_LEASE_TTL_FOR_LOCK);
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key, int lease_ttl) {
|
||||
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
|
||||
const bool sync) {
|
||||
static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10; // see also etcd::SyncClient::lock
|
||||
return this->lock(key, DEFAULT_LEASE_TTL_FOR_LOCK, sync);
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
|
||||
int lease_ttl) {
|
||||
// See Note [lease with TTL and issue the actual request]
|
||||
// See also SyncClient::lock
|
||||
//
|
||||
|
|
@ -566,6 +573,18 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key, int lease_
|
|||
});
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
|
||||
int lease_ttl,
|
||||
const bool sync) {
|
||||
if (sync) {
|
||||
pplx::task_completion_event<etcd::Response> event;
|
||||
event.set(this->client->lock(key, lease_ttl));
|
||||
return pplx::task<etcd::Response>(event);
|
||||
} else {
|
||||
return this->lock(key, lease_ttl);
|
||||
}
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::lock_with_lease(std::string const &key,
|
||||
int64_t lease_id) {
|
||||
return etcd::detail::asyncify(
|
||||
|
|
@ -573,6 +592,18 @@ pplx::task<etcd::Response> etcd::Client::lock_with_lease(std::string const &key,
|
|||
this->client->lock_with_lease_internal(key, lease_id));
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::lock_with_lease(std::string const &key,
|
||||
int64_t lease_id,
|
||||
const bool sync) {
|
||||
if (sync) {
|
||||
pplx::task_completion_event<etcd::Response> event;
|
||||
event.set(this->client->lock_with_lease(key, lease_id));
|
||||
return pplx::task<etcd::Response>(event);
|
||||
} else {
|
||||
return this->lock_with_lease(key, lease_id);
|
||||
}
|
||||
}
|
||||
|
||||
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
|
||||
return etcd::detail::asyncify(
|
||||
static_cast<responser_t<etcdv3::AsyncUnlockAction>>(Response::create),
|
||||
|
|
|
|||
|
|
@ -224,3 +224,30 @@ TEST_CASE("concurrent lock & unlock")
|
|||
locks[index].join();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("concurrent lock & unlock with a put in between") {
|
||||
etcd::Client etcd("http://127.0.0.1:2379");
|
||||
std::string const lock_key = "/test/test_key";
|
||||
|
||||
constexpr size_t trials = 128;
|
||||
|
||||
std::function<void(std::string const &, const size_t)> locker = [&etcd](std::string const &key, const size_t index) {
|
||||
std::cout << "start lock for " << index << std::endl;
|
||||
auto resp = etcd.lock(key, true).get();
|
||||
std::cout << "lock for " << index << " is ok, start put and unlock: ..." << resp.error_message() << std::endl;
|
||||
REQUIRE(resp.is_ok());
|
||||
auto put_resp = etcd.put("/test/test_put", "hello" + std::to_string(index)).get();
|
||||
REQUIRE(put_resp.is_ok());
|
||||
REQUIRE(etcd.unlock(resp.lock_key()).get().is_ok());
|
||||
std::cout << "thread " << index << " been unlocked" << std::endl;
|
||||
};
|
||||
|
||||
std::vector<std::thread> locks(trials);
|
||||
for (size_t index = 0; index < trials; ++index) {
|
||||
locks[index] = std::thread(locker, lock_key, index);
|
||||
}
|
||||
|
||||
for (size_t index = 0; index < trials; ++index) {
|
||||
locks[index].join();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue