etcd-cpp-apiv3/tst/LockTest.cpp

264 lines
7.5 KiB
C++

#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <thread>
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
static const std::string etcd_url =
etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
TEST_CASE("lock and unlock") {
etcd::Client etcd(etcd_url);
// lock
etcd::Response resp1 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
// unlock
etcd::Response resp2 = etcd.unlock(resp1.lock_key()).get();
CHECK("unlock" == resp2.action());
REQUIRE(resp2.is_ok());
REQUIRE(0 == resp2.error_code());
}
TEST_CASE("double lock will fail") {
etcd::Client etcd(etcd_url);
// lock
etcd::Response resp1 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
bool first_lock_release = false;
std::string lock_key = resp1.lock_key();
auto second_lock_thr = std::thread([&]() {
// lock again
etcd::Response resp2 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp2.action());
REQUIRE(resp2.is_ok());
REQUIRE(0 == resp2.error_code());
lock_key = resp2.lock_key();
// will success after first lock released.
REQUIRE(first_lock_release);
});
auto first_lock_thr = std::thread([&]() {
// check the lock key exists.
etcd::Response resp3 = etcd.get(resp1.lock_key()).get();
CHECK("get" == resp3.action());
REQUIRE(resp3.is_ok());
REQUIRE(0 == resp3.error_code());
// create a duration
// using a duration longer than default lease TTL for lock (see:
// DEFAULT_LEASE_TTL_FOR_LOCK)
std::this_thread::sleep_for(std::chrono::seconds(15));
// unlock the first lock
first_lock_release = true;
etcd::Response resp4 = etcd.unlock(lock_key).get();
CHECK("unlock" == resp4.action());
REQUIRE(resp4.is_ok());
REQUIRE(0 == resp4.error_code());
});
first_lock_thr.join();
second_lock_thr.join();
// cleanup: unlock the second lock
etcd::Response resp5 = etcd.unlock(lock_key).get();
CHECK("unlock" == resp5.action());
REQUIRE(resp5.is_ok());
REQUIRE(0 == resp5.error_code());
}
TEST_CASE("lock could be timeout") {
etcd::Client etcd(etcd_url);
// setup the timeout
etcd.set_grpc_timeout(std::chrono::seconds(5));
// lock
etcd::Response resp1 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
auto lock_in_another_thread = std::thread([&]() {
// lock again
etcd::Response resp2 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp2.action());
REQUIRE(resp2.is_grpc_timeout());
});
lock_in_another_thread.join();
// cleanup: unlock the second lock
etcd::Response resp5 = etcd.unlock(resp1.lock_key()).get();
CHECK("unlock" == resp5.action());
REQUIRE(resp5.is_ok());
REQUIRE(0 == resp5.error_code());
}
TEST_CASE("lock using lease") {
etcd::Client etcd(etcd_url);
bool failed = false;
std::function<void(std::exception_ptr)> handler =
[&failed](std::exception_ptr eptr) {
try {
if (eptr) {
std::rethrow_exception(eptr);
}
} catch (const std::exception& e) {
std::cerr << "Caught exception \"" << e.what() << "\"\n";
failed = true;
}
};
// with handler
{
// grant lease and keep it alive
int64_t lease_id = etcd.leasegrant(5).get().value().lease();
etcd::KeepAlive keepalive(etcd, handler, 3, lease_id);
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
// lock
etcd::Response resp1 = etcd.lock_with_lease("/test/abcd", lease_id).get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
std::this_thread::sleep_for(std::chrono::seconds(20));
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
// unlock
etcd::Response resp2 = etcd.unlock(resp1.lock_key()).get();
CHECK("unlock" == resp2.action());
REQUIRE(resp2.is_ok());
REQUIRE(0 == resp2.error_code());
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
}
// without handler
{
// grant lease and keep it alive
int64_t lease_id = etcd.leasegrant(5).get().value().lease();
etcd::KeepAlive keepalive(etcd, 3, lease_id);
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
// lock
etcd::Response resp1 = etcd.lock_with_lease("/test/abcd", lease_id).get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
std::this_thread::sleep_for(std::chrono::seconds(20));
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
// unlock
etcd::Response resp2 = etcd.unlock(resp1.lock_key()).get();
CHECK("unlock" == resp2.action());
REQUIRE(resp2.is_ok());
REQUIRE(0 == resp2.error_code());
REQUIRE(!failed);
keepalive.Check(); // shouldn't throw
}
}
TEST_CASE("concurrent lock & unlock") {
etcd::Client etcd(etcd_url);
std::string const lock_key = "/test/test_key";
constexpr size_t trials = 192;
std::function<void(std::string const&, const size_t)> locker =
[&etcd](std::string const& key, const size_t index) {
std::cout << "start lock for " << key << ", index is " << index
<< std::endl;
auto resp = etcd.lock(key).get();
std::cout << "lock for " << index << " is ok, starts sleeping: ..."
<< resp.error_message() << std::endl
<< std::flush;
REQUIRE(resp.is_ok());
std::srand(index);
size_t time_to_sleep = 1;
std::this_thread::sleep_for(std::chrono::seconds(time_to_sleep));
std::cout << "lock for " << index << " resumes from sleep: ..."
<< resp.error_message() << std::endl
<< std::flush;
REQUIRE(etcd.unlock(resp.lock_key()).get().is_ok());
std::cout << "thread " << index << " been unlocked" << std::endl
<< std::flush;
};
std::vector<std::thread> locks(trials);
for (size_t index = 0; index < trials; ++index) {
locks[index] = std::thread(locker, lock_key, index);
}
for (size_t index = 0; index < trials; ++index) {
locks[index].join();
}
}
TEST_CASE("concurrent lock & unlock with a put in between") {
etcd::Client etcd(etcd_url);
std::string const lock_key = "/test/test_key";
constexpr size_t trials = 128;
std::function<void(std::string const&, const size_t)> locker =
[&etcd](std::string const& key, const size_t index) {
std::cout << "start lock for " << index << std::endl;
auto resp = etcd.lock(key, true).get();
std::cout << "lock for " << index << " is ok, start put and unlock: ..."
<< resp.error_message() << std::endl;
REQUIRE(resp.is_ok());
auto put_resp =
etcd.put("/test/test_put", "hello" + std::to_string(index)).get();
REQUIRE(put_resp.is_ok());
REQUIRE(etcd.unlock(resp.lock_key()).get().is_ok());
std::cout << "thread " << index << " been unlocked" << std::endl;
};
std::vector<std::thread> locks(trials);
for (size_t index = 0; index < trials; ++index) {
locks[index] = std::thread(locker, lock_key, index);
}
for (size_t index = 0; index < trials; ++index) {
locks[index].join();
}
}