Add a sync variant of lock on async client. (#163)
Resolves #139. Lock is special, as it may cause the thread resources (in the pplx thread pool) to be exhausted. Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
parent
ed7ab08ef7
commit
39be27e021
|
|
@ -512,6 +512,21 @@ namespace etcd
|
||||||
*/
|
*/
|
||||||
pplx::task<Response> lock(std::string const &key);
|
pplx::task<Response> lock(std::string const &key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lock is special, as waiting for lock may cause the thread resources (in the pplx thread pool) to
|
||||||
|
* be exhausted. So we need to provide a way to let the user to issue a lock without taking the a
|
||||||
|
* shared thread.
|
||||||
|
*
|
||||||
|
* This works like what we have in the sync client, but we can issue such method from the async client
|
||||||
|
* directly.
|
||||||
|
*
|
||||||
|
* That would be useful when use already issue the lock from a controlled thread. See more discussion
|
||||||
|
* about the target scenario in https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3/issues/139
|
||||||
|
*
|
||||||
|
* @param key is the key to be used to request the lock.
|
||||||
|
*/
|
||||||
|
pplx::task<Response> lock(std::string const &key, const bool sync);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gains a lock at a key, using a default created lease, using the specified lease TTL (in seconds), with
|
* Gains a lock at a key, using a default created lease, using the specified lease TTL (in seconds), with
|
||||||
* keeping alive has already been taken care of by the library.
|
* keeping alive has already been taken care of by the library.
|
||||||
|
|
@ -520,6 +535,11 @@ namespace etcd
|
||||||
*/
|
*/
|
||||||
pplx::task<Response> lock(std::string const &key, int lease_ttl);
|
pplx::task<Response> lock(std::string const &key, int lease_ttl);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lock, but the sync version.
|
||||||
|
*/
|
||||||
|
pplx::task<Response> lock(std::string const &key, int lease_ttl, const bool sync);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gains a lock at a key, using a user-provided lease, the lifetime of the lease won't be taken care
|
* Gains a lock at a key, using a user-provided lease, the lifetime of the lease won't be taken care
|
||||||
* of by the library.
|
* of by the library.
|
||||||
|
|
@ -527,6 +547,11 @@ namespace etcd
|
||||||
*/
|
*/
|
||||||
pplx::task<Response> lock_with_lease(std::string const &key, int64_t lease_id);
|
pplx::task<Response> lock_with_lease(std::string const &key, int64_t lease_id);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lock, but the sync version.
|
||||||
|
*/
|
||||||
|
pplx::task<Response> lock_with_lease(std::string const &key, int64_t lease_id, const bool sync);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Releases a lock at a key.
|
* Releases a lock at a key.
|
||||||
* @param key is the lock key to release.
|
* @param key is the lock key to release.
|
||||||
|
|
|
||||||
|
|
@ -554,7 +554,14 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
|
||||||
return this->lock(key, DEFAULT_LEASE_TTL_FOR_LOCK);
|
return this->lock(key, DEFAULT_LEASE_TTL_FOR_LOCK);
|
||||||
}
|
}
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key, int lease_ttl) {
|
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
|
||||||
|
const bool sync) {
|
||||||
|
static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10; // see also etcd::SyncClient::lock
|
||||||
|
return this->lock(key, DEFAULT_LEASE_TTL_FOR_LOCK, sync);
|
||||||
|
}
|
||||||
|
|
||||||
|
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
|
||||||
|
int lease_ttl) {
|
||||||
// See Note [lease with TTL and issue the actual request]
|
// See Note [lease with TTL and issue the actual request]
|
||||||
// See also SyncClient::lock
|
// See also SyncClient::lock
|
||||||
//
|
//
|
||||||
|
|
@ -566,6 +573,18 @@ pplx::task<etcd::Response> etcd::Client::lock(std::string const &key, int lease_
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key,
|
||||||
|
int lease_ttl,
|
||||||
|
const bool sync) {
|
||||||
|
if (sync) {
|
||||||
|
pplx::task_completion_event<etcd::Response> event;
|
||||||
|
event.set(this->client->lock(key, lease_ttl));
|
||||||
|
return pplx::task<etcd::Response>(event);
|
||||||
|
} else {
|
||||||
|
return this->lock(key, lease_ttl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::lock_with_lease(std::string const &key,
|
pplx::task<etcd::Response> etcd::Client::lock_with_lease(std::string const &key,
|
||||||
int64_t lease_id) {
|
int64_t lease_id) {
|
||||||
return etcd::detail::asyncify(
|
return etcd::detail::asyncify(
|
||||||
|
|
@ -573,6 +592,18 @@ pplx::task<etcd::Response> etcd::Client::lock_with_lease(std::string const &key,
|
||||||
this->client->lock_with_lease_internal(key, lease_id));
|
this->client->lock_with_lease_internal(key, lease_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pplx::task<etcd::Response> etcd::Client::lock_with_lease(std::string const &key,
|
||||||
|
int64_t lease_id,
|
||||||
|
const bool sync) {
|
||||||
|
if (sync) {
|
||||||
|
pplx::task_completion_event<etcd::Response> event;
|
||||||
|
event.set(this->client->lock_with_lease(key, lease_id));
|
||||||
|
return pplx::task<etcd::Response>(event);
|
||||||
|
} else {
|
||||||
|
return this->lock_with_lease(key, lease_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
|
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &lock_key) {
|
||||||
return etcd::detail::asyncify(
|
return etcd::detail::asyncify(
|
||||||
static_cast<responser_t<etcdv3::AsyncUnlockAction>>(Response::create),
|
static_cast<responser_t<etcdv3::AsyncUnlockAction>>(Response::create),
|
||||||
|
|
|
||||||
|
|
@ -224,3 +224,30 @@ TEST_CASE("concurrent lock & unlock")
|
||||||
locks[index].join();
|
locks[index].join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("concurrent lock & unlock with a put in between") {
|
||||||
|
etcd::Client etcd("http://127.0.0.1:2379");
|
||||||
|
std::string const lock_key = "/test/test_key";
|
||||||
|
|
||||||
|
constexpr size_t trials = 128;
|
||||||
|
|
||||||
|
std::function<void(std::string const &, const size_t)> locker = [&etcd](std::string const &key, const size_t index) {
|
||||||
|
std::cout << "start lock for " << index << std::endl;
|
||||||
|
auto resp = etcd.lock(key, true).get();
|
||||||
|
std::cout << "lock for " << index << " is ok, start put and unlock: ..." << resp.error_message() << std::endl;
|
||||||
|
REQUIRE(resp.is_ok());
|
||||||
|
auto put_resp = etcd.put("/test/test_put", "hello" + std::to_string(index)).get();
|
||||||
|
REQUIRE(put_resp.is_ok());
|
||||||
|
REQUIRE(etcd.unlock(resp.lock_key()).get().is_ok());
|
||||||
|
std::cout << "thread " << index << " been unlocked" << std::endl;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<std::thread> locks(trials);
|
||||||
|
for (size_t index = 0; index < trials; ++index) {
|
||||||
|
locks[index] = std::thread(locker, lock_key, index);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t index = 0; index < trials; ++index) {
|
||||||
|
locks[index].join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue