From bcded542c8a33c8418e3b0d9e1bcb2d1c430d07d Mon Sep 17 00:00:00 2001 From: Tao He Date: Sun, 10 Jan 2021 23:31:00 +0800 Subject: [PATCH] Implements "KeepAlive" feature, and enhance "lock" with a lease. Signed-off-by: Tao He --- README.md | 75 +++++++++--- etcd/Client.hpp | 30 ++++- etcd/KeepAlive.hpp | 63 ++++++++++ etcd/SyncClient.hpp | 2 + etcd/Watcher.hpp | 3 + etcd/v3/AsyncLeaseAction.hpp | 80 +++++++++++++ etcd/v3/AsyncLeaseGrantAction.hpp | 25 ---- etcd/v3/AsyncLeaseGrantResponse.hpp | 21 ---- etcd/v3/AsyncLeaseResponse.hpp | 55 +++++++++ etcd/v3/Transaction.hpp | 3 - src/Client.cpp | 75 +++++++++++- src/KeepAlive.cpp | 78 +++++++++++++ src/SyncClient.cpp | 10 ++ src/v3/AsyncLeaseAction.cpp | 172 ++++++++++++++++++++++++++++ src/v3/AsyncLeaseGrantAction.cpp | 32 ------ src/v3/AsyncLeaseGrantResponse.cpp | 11 -- src/v3/AsyncLeaseResponse.cpp | 35 ++++++ src/v3/AsyncLockAction.cpp | 1 + src/v3/AsyncWatchAction.cpp | 9 +- src/v3/Transaction.cpp | 5 - tst/LockTest.cpp | 3 +- 21 files changed, 665 insertions(+), 123 deletions(-) create mode 100644 etcd/KeepAlive.hpp create mode 100644 etcd/v3/AsyncLeaseAction.hpp delete mode 100644 etcd/v3/AsyncLeaseGrantAction.hpp delete mode 100644 etcd/v3/AsyncLeaseGrantResponse.hpp create mode 100644 etcd/v3/AsyncLeaseResponse.hpp create mode 100644 src/KeepAlive.cpp create mode 100644 src/v3/AsyncLeaseAction.cpp delete mode 100644 src/v3/AsyncLeaseGrantAction.cpp delete mode 100644 src/v3/AsyncLeaseGrantResponse.cpp create mode 100644 src/v3/AsyncLeaseResponse.cpp diff --git a/README.md b/README.md index 69acd05..2c35df7 100644 --- a/README.md +++ b/README.md @@ -294,6 +294,25 @@ prefix will be deleted. All deleted keys will be placed in ```response.values()` However, if recursive parameter is false, functionality will be the same as just deleting a key. The key supplied will NOT be treated as a prefix and will be treated as a normal key name. +### Lock + +Etcd lock has been supported as follows: + +```c++ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd.lock("/test/lock"); +``` + +It will create a lease and a keep-alive job behind the screen, the lease will be revoked until +the lock is unlocked. + +Users can also feed their own lease directory for lock: + +```c++ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd.lock("/test/lock", lease_id); +``` + ### Watching for changes Watching for a change is possible with the ```watch()``` operation of the client. The watch method @@ -343,22 +362,7 @@ fact it just sends the asynchron request, sets up a callback for the response an callback is executed by some thread from the pplx library's thread pool and the callback (in this case a small lambda function actually) will call ```watch_for_changes``` again from there. - -### Requesting for lease - -Users can request for lease which is governed by a time-to-live(TTL) value given by the user. -Moreover, user can attached the lease to a key(s) by indicating the lease id in ```add()```, -```set()```, ```modify()``` and ```modify_if()```. Also the ttl will that was granted by etcd -server will be indicated in ```ttl()```. - -```c++ - etcd::Client etcd("http://127.0.0.1:4001"); - etcd::Response resp = etcd.leasegrant(60).get(); - etcd.set("/test/key2", "bar", resp.value().lease()); - std::cout <<"ttl" << resp.value().ttl(); -``` - -### Watcher Class +#### Watcher Class Users can watch a key indefinitely or until user cancels the watch. This can be done by instantiating a Watcher class. The supplied callback function in Watcher class will be @@ -375,11 +379,46 @@ either by user implicitly calling ```Cancel()``` or when watcher class is destro } ``` +### Requesting for lease + +Users can request for lease which is governed by a time-to-live(TTL) value given by the user. +Moreover, user can attached the lease to a key(s) by indicating the lease id in ```add()```, +```set()```, ```modify()``` and ```modify_if()```. Also the ttl will that was granted by etcd +server will be indicated in ```ttl()```. + +```c++ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Response resp = etcd.leasegrant(60).get(); + etcd.set("/test/key2", "bar", resp.value().lease()); + std::cout << "ttl" << resp.value().ttl(); +``` + +The lease can be revoked by + +```c++ + etcd.leaserevoke(resp.value().lease()); +``` + +The remaining time-to-live of a lease can be inspected by + +```c++ + etcd::Response resp2 = etcd.leasetimetolive(resp.value().lease()).get(); + std::cout << "ttl" << resp.value().ttl(); +``` + +#### Keep alive + +Keep alive for leases is implemented using a seperate class `KeepAlive`, which can be used as: + +```c++ + etcd::KeepAlive keepalive(etcd, lease_id, ttl); +``` + +It will perform a periodly keep-alive action before it is cancelled explicitly, or destructed implicitly. + ### TODO 1. Cancellation of asynchronous calls(except for watch) -2. LeaseKeepAlive -3. Authentication ## License diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 1245822..abed256 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -3,6 +3,7 @@ #include "etcd/Response.hpp" +#include #include #include @@ -21,6 +22,7 @@ namespace etcdv3 { namespace etcd { + class KeepAlive; class Watcher; /** @@ -215,16 +217,36 @@ namespace etcd pplx::task leasegrant(int ttl); /** - * Gains a lock at a key. + * Revoke a lease. + * @param lease_id is the id the lease + */ + pplx::task leaserevoke(int64_t lease_id); + + /** + * Get time-to-live of a lease. + * @param lease_id is the id the lease + */ + pplx::task leasetimetolive(int64_t lease_id); + + /** + * Gains a lock at a key, using a default created lease, using the default lease (60 seconds), with + * keeping alive has already been taken care of by the library. * @param key is the key to be used to request the lock. */ pplx::task lock(std::string const &key); + /** + * Gains a lock at a key, using a user-provided lease, the lifetime of the lease won't be taken care + * of by the library. + * @param key is the key to be used to request the lock. + */ + pplx::task lock(std::string const &key, int64_t lease_id); + /** * Releases a lock at a key. * @param key is the lock key to release. */ - pplx::task unlock(std::string const &key); + pplx::task unlock(std::string const &lock_key); /** * Execute a etcd transaction. @@ -240,6 +262,10 @@ namespace etcd std::unique_ptr leaseServiceStub; std::unique_ptr lockServiceStub; + std::map leases_for_locks; + std::map> keep_alive_for_locks; + + friend class KeepAlive; friend class Watcher; }; diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp new file mode 100644 index 0000000..9a90a5f --- /dev/null +++ b/etcd/KeepAlive.hpp @@ -0,0 +1,63 @@ +#ifndef __ETCD_KEEPALIVE_HPP__ +#define __ETCD_KEEPALIVE_HPP__ + +#include + +#include "etcd/Client.hpp" +#include "etcd/Response.hpp" + +#include +#include + +#include +#include "proto/rpc.grpc.pb.h" + +namespace etcdv3 { + class AsyncLeaseKeepAliveAction; +} + +using etcdserverpb::KV; +using etcdserverpb::Lease; +using grpc::Channel; + +namespace etcd +{ + /** + * If ID is set to 0, etcd will choose an ID. + */ + class KeepAlive + { + public: + KeepAlive(Client const &client, int ttl, int64_t lease_id=0); + KeepAlive(std::string const & address, int ttl, int64_t lease_id=0); + KeepAlive(std::string const & address, + std::string const & username, std::string const & password, + int ttl, int64_t lease_id=0); + + KeepAlive(KeepAlive const &) = delete; + KeepAlive(KeepAlive &&) = delete; + + /** + * Stop the keep alive action. + */ + void Cancel(); + + ~KeepAlive(); + + protected: + void refresh(); + + pplx::task currentTask; + std::unique_ptr leaseServiceStub; + std::unique_ptr call; + + private: + int ttl; + int64_t lease_id; + bool continue_next; + boost::asio::io_context context; + std::unique_ptr keepalive_timer_; + }; +} + +#endif diff --git a/etcd/SyncClient.hpp b/etcd/SyncClient.hpp index 087bff5..a90dc2e 100644 --- a/etcd/SyncClient.hpp +++ b/etcd/SyncClient.hpp @@ -58,6 +58,8 @@ namespace etcd Response mkdir(std::string const & key, int ttl = 0); Response rmdir(std::string const & key, bool recursive = false); Response leasegrant(int ttl); + Response leaserevoke(int64_t lease_id); + Response leasetimetolive(int64_t lease_id); /** * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and diff --git a/etcd/Watcher.hpp b/etcd/Watcher.hpp index 8941b83..87c32ff 100644 --- a/etcd/Watcher.hpp +++ b/etcd/Watcher.hpp @@ -39,6 +39,9 @@ namespace etcd std::string const & key, int fromIndex, std::function callback, bool recursive=false); + Watcher(Watcher const &) = delete; + Watcher(Watcher &&) = delete; + /** * Wait util the task has been stopped, actively or passively, e.g., the watcher * get cancelled or the server closes the connection. diff --git a/etcd/v3/AsyncLeaseAction.hpp b/etcd/v3/AsyncLeaseAction.hpp new file mode 100644 index 0000000..88c9943 --- /dev/null +++ b/etcd/v3/AsyncLeaseAction.hpp @@ -0,0 +1,80 @@ +#ifndef __ASYNC_LEASEACTION_HPP__ +#define __ASYNC_LEASEACTION_HPP__ + +#include + +#include +#include "proto/rpc.grpc.pb.h" +#include "etcd/v3/Action.hpp" +#include "etcd/v3/AsyncLeaseResponse.hpp" + +using grpc::ClientAsyncResponseReader; +using grpc::ClientAsyncReaderWriter; +using etcdserverpb::LeaseGrantResponse; +using etcdserverpb::LeaseRevokeResponse; +using etcdserverpb::LeaseCheckpoint; +using etcdserverpb::LeaseCheckpointResponse; +using etcdserverpb::LeaseKeepAliveRequest; +using etcdserverpb::LeaseKeepAliveResponse; +using etcdserverpb::LeaseTimeToLiveResponse; +using etcdserverpb::LeaseStatus; +using etcdserverpb::LeaseLeasesResponse; + +namespace etcdv3 +{ + class AsyncLeaseGrantAction : public etcdv3::Action { + public: + AsyncLeaseGrantAction(etcdv3::ActionParameters param); + AsyncLeaseGrantResponse ParseResponse(); + private: + LeaseGrantResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLeaseRevokeAction: public etcdv3::Action { + public: + AsyncLeaseRevokeAction(etcdv3::ActionParameters param); + AsyncLeaseRevokeResponse ParseResponse(); + private: + LeaseRevokeResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLeaseKeepAliveAction: public etcdv3::Action { + public: + AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param); + AsyncLeaseKeepAliveResponse ParseResponse(); + + AsyncLeaseKeepAliveResponse Refresh(); + void CancelKeepAlive(); + bool Cancelled() const; + + private: + LeaseKeepAliveResponse reply; + std::unique_ptr> stream; + + LeaseKeepAliveRequest req; + bool isCancelled; + std::mutex protect_is_cancelled; + }; + + class AsyncLeaseTimeToLiveAction: public etcdv3::Action { + public: + AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters param); + AsyncLeaseTimeToLiveResponse ParseResponse(); + private: + LeaseTimeToLiveResponse reply; + std::unique_ptr> response_reader; + }; + + class AsyncLeaseLeasesAction: public etcdv3::Action { + public: + AsyncLeaseLeasesAction(etcdv3::ActionParameters param); + AsyncLeaseLeasesResponse ParseResponse(); + private: + LeaseLeasesResponse reply; + std::unique_ptr> response_reader; + }; +} + +#endif diff --git a/etcd/v3/AsyncLeaseGrantAction.hpp b/etcd/v3/AsyncLeaseGrantAction.hpp deleted file mode 100644 index 8675521..0000000 --- a/etcd/v3/AsyncLeaseGrantAction.hpp +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef __ASYNC_LEASEGRANTACTION_HPP__ -#define __ASYNC_LEASEGRANTACTION_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncLeaseGrantResponse.hpp" - -using grpc::ClientAsyncResponseReader; -using etcdserverpb::LeaseGrantResponse; - -namespace etcdv3 -{ - class AsyncLeaseGrantAction : public etcdv3::Action - { - public: - AsyncLeaseGrantAction(etcdv3::ActionParameters param); - AsyncLeaseGrantResponse ParseResponse(); - private: - LeaseGrantResponse reply; - std::unique_ptr> response_reader; - }; -} - -#endif diff --git a/etcd/v3/AsyncLeaseGrantResponse.hpp b/etcd/v3/AsyncLeaseGrantResponse.hpp deleted file mode 100644 index e7bdded..0000000 --- a/etcd/v3/AsyncLeaseGrantResponse.hpp +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef __ASYNC_LEASEGRANTRESPONSE_HPP__ -#define __ASYNC_LEASEGRANTRESPONSE_HPP__ - -#include -#include "proto/rpc.grpc.pb.h" -#include "etcd/v3/V3Response.hpp" - - -using etcdserverpb::LeaseGrantResponse; - -namespace etcdv3 -{ - class AsyncLeaseGrantResponse : public etcdv3::V3Response - { - public: - AsyncLeaseGrantResponse(){}; - void ParseResponse(LeaseGrantResponse& resp); - }; -} - -#endif diff --git a/etcd/v3/AsyncLeaseResponse.hpp b/etcd/v3/AsyncLeaseResponse.hpp new file mode 100644 index 0000000..4800c9a --- /dev/null +++ b/etcd/v3/AsyncLeaseResponse.hpp @@ -0,0 +1,55 @@ +#ifndef __ASYNC_LEASERESPONSE_HPP__ +#define __ASYNC_LEASERESPONSE_HPP__ + +#include +#include "proto/rpc.grpc.pb.h" +#include "etcd/v3/V3Response.hpp" + +using etcdserverpb::LeaseGrantResponse; +using etcdserverpb::LeaseRevokeResponse; +using etcdserverpb::LeaseCheckpoint; +using etcdserverpb::LeaseCheckpointResponse; +using etcdserverpb::LeaseKeepAliveResponse; +using etcdserverpb::LeaseTimeToLiveResponse; +using etcdserverpb::LeaseStatus; +using etcdserverpb::LeaseLeasesResponse; + +namespace etcdv3 +{ + class AsyncLeaseGrantResponse : public etcdv3::V3Response + { + public: + AsyncLeaseGrantResponse(){}; + void ParseResponse(LeaseGrantResponse& resp); + }; + + class AsyncLeaseRevokeResponse : public etcdv3::V3Response + { + public: + AsyncLeaseRevokeResponse(){}; + void ParseResponse(LeaseRevokeResponse& resp); + }; + + class AsyncLeaseKeepAliveResponse : public etcdv3::V3Response + { + public: + AsyncLeaseKeepAliveResponse(){}; + void ParseResponse(LeaseKeepAliveResponse& resp); + }; + + class AsyncLeaseTimeToLiveResponse : public etcdv3::V3Response + { + public: + AsyncLeaseTimeToLiveResponse(){}; + void ParseResponse(LeaseTimeToLiveResponse& resp); + }; + + class AsyncLeaseLeasesResponse : public etcdv3::V3Response + { + public: + AsyncLeaseLeasesResponse(){}; + void ParseResponse(LeaseLeasesResponse& resp); + }; +} + +#endif diff --git a/etcd/v3/Transaction.hpp b/etcd/v3/Transaction.hpp index b9de935..fa5e158 100644 --- a/etcd/v3/Transaction.hpp +++ b/etcd/v3/Transaction.hpp @@ -24,15 +24,12 @@ public: void setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive); void setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive); void setup_compare_and_delete_operation(std::string const& key); - void setup_lease_grant_operation(int ttl); // update without `get` and no `prev_kv` returned void setup_put(std::string const &key, std::string const &value); void setup_delete(std::string const &key); etcdserverpb::TxnRequest txn_request; - etcdserverpb::LeaseGrantRequest leasegrant_request; - private: std::string key; }; diff --git a/src/Client.cpp b/src/Client.cpp index d33b98a..8245c0d 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -13,6 +13,7 @@ #include #include #include "etcd/Client.hpp" +#include "etcd/KeepAlive.hpp" #include "etcd/v3/action_constants.hpp" #include "etcd/v3/Action.hpp" #include "etcd/v3/AsyncTxnResponse.hpp" @@ -30,7 +31,7 @@ #include "etcd/v3/AsyncGetAction.hpp" #include "etcd/v3/AsyncDeleteAction.hpp" #include "etcd/v3/AsyncWatchAction.hpp" -#include "etcd/v3/AsyncLeaseGrantAction.hpp" +#include "etcd/v3/AsyncLeaseAction.hpp" #include "etcd/v3/AsyncLockAction.hpp" #include "etcd/v3/AsyncTxnAction.hpp" @@ -354,7 +355,7 @@ pplx::task etcd::Client::modify_if(std::string const & key, std: params.key.assign(key); params.value.assign(value); params.old_revision = old_index; - params.kv_stub = kvServiceStub .get(); + params.kv_stub = kvServiceStub.get(); if(ttl > 0) { auto res = leasegrant(ttl).get(); @@ -491,19 +492,85 @@ pplx::task etcd::Client::leasegrant(int ttl) return Response::create(call); } +pplx::task etcd::Client::leaserevoke(int64_t lease_id) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.lease_id = lease_id; + params.lease_stub = leaseServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncLeaseRevokeAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::leasetimetolive(int64_t lease_id) +{ + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.lease_id = lease_id; + params.lease_stub = leaseServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncLeaseTimeToLiveAction(params)); + return Response::create(call); +} + pplx::task etcd::Client::lock(std::string const &key) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); + + static const int DEFAULT_LEASE_TTL_FOR_LOCK = 10; + + // routines in lock usually will be fast, less than 10 seconds. + // + // (base on our experiences in vineyard and GraphScope). + auto resp = this->leasegrant(DEFAULT_LEASE_TTL_FOR_LOCK).get(); + int64_t lease_id = resp.value().lease(); + this->keep_alive_for_locks[lease_id].reset(new KeepAlive(*this, DEFAULT_LEASE_TTL_FOR_LOCK, lease_id)); params.key = key; + params.lease_id = lease_id; + params.lock_stub = lockServiceStub.get(); + std::shared_ptr call(new etcdv3::AsyncLockAction(params)); + return Response::create(call).then( + [this, lease_id](pplx::task const &resp_task) -> etcd::Response { + auto const& resp = resp_task.get(); + if (resp.is_ok()) { + this->leases_for_locks[resp.lock_key()] = lease_id; + } else { + this->keep_alive_for_locks.erase(lease_id); + } + return resp; + } + ); +} + +pplx::task etcd::Client::lock(std::string const &key, + int64_t lease_id) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.key = key; + params.lease_id = lease_id; params.lock_stub = lockServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncLockAction(params)); return Response::create(call); } -pplx::task etcd::Client::unlock(std::string const &key) { +pplx::task etcd::Client::unlock(std::string const &lock_key) { + std::cout << "begin unlock" << std::endl; + // cancel the KeepAlive first, it exists + auto p_leases = this->leases_for_locks.find(lock_key); + if (p_leases != this->leases_for_locks.end()) { + std::cout << "Unlock for " << lock_key << " and revoke lease " << std::hex << p_leases->second << std::dec << std::endl; + auto p_keeps_alive = this->keep_alive_for_locks.find(p_leases->second); + if (p_keeps_alive != this->keep_alive_for_locks.end()) { + this->keep_alive_for_locks.erase(p_keeps_alive); + } + this->leases_for_locks.erase(p_leases); + } else { + std::cout << "Unable to find lease_id for " << lock_key; + } + + // issue a "unlock" request etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); - params.key = key; + params.key = lock_key; params.lock_stub = lockServiceStub.get(); std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); return Response::create(call); diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp new file mode 100644 index 0000000..18541a5 --- /dev/null +++ b/src/KeepAlive.cpp @@ -0,0 +1,78 @@ +#include "etcd/KeepAlive.hpp" +#include "etcd/v3/AsyncLeaseAction.hpp" + +etcd::KeepAlive::KeepAlive(Client const &client, int ttl, int64_t lease_id): + ttl(ttl), lease_id(lease_id), continue_next(true) { + leaseServiceStub= Lease::NewStub(client.channel); + + etcdv3::ActionParameters params; + params.auth_token.assign(client.auth_token); + params.lease_id = lease_id; + params.lease_stub = leaseServiceStub.get(); + + call.reset(new etcdv3::AsyncLeaseKeepAliveAction(params)); + currentTask = pplx::task([this]() { + // start refresh + this->refresh(); + context.run(); + context.stop(); // clean up + }); +} + +etcd::KeepAlive::KeepAlive(std::string const & address, int ttl, int64_t lease_id): + KeepAlive(Client(address), ttl, lease_id) { +} + +etcd::KeepAlive::KeepAlive(std::string const & address, + std::string const & username, std::string const & password, + int ttl, int64_t lease_id): + KeepAlive(Client(address, username, password), ttl, lease_id) { +} + +etcd::KeepAlive::~KeepAlive() +{ + this->Cancel(); +} + +void etcd::KeepAlive::Cancel() +{ +#ifndef NDEBUG + { + std::ios::fmtflags os_flags (std::cout.flags()); + std::cout << "Cancel keepalive for " << std::hex << lease_id << std::endl; + std::cout.flags(os_flags); + } +#endif + call->CancelKeepAlive(); + if (keepalive_timer_) { + keepalive_timer_->cancel(); + } + currentTask.wait(); +} + +void etcd::KeepAlive::refresh() +{ + // minimal resolution: 1 second + int keepalive_ttl = std::max(ttl - 1, 1); +#ifndef NDEBUG + { + std::ios::fmtflags os_flags (std::cout.flags()); + std::cout << "Trigger the next keepalive round with ttl " << keepalive_ttl + << " for " << std::hex << lease_id << std::endl; + std::cout.flags(os_flags); + } +#endif + keepalive_timer_.reset(new boost::asio::steady_timer( + context, boost::asio::chrono::seconds(keepalive_ttl))); + keepalive_timer_->async_wait([this](const boost::system::error_code& error) { + if (error) { +#ifndef NDEBUG + std::cerr << "keepalive timer error: " << error << ", " << error.message() << std::endl; +#endif + } else { + this->call->Refresh(); + // trigger the next round; + this->refresh(); + } + }); +} diff --git a/src/SyncClient.cpp b/src/SyncClient.cpp index 1f37b87..4dc267f 100644 --- a/src/SyncClient.cpp +++ b/src/SyncClient.cpp @@ -109,6 +109,16 @@ etcd::Response etcd::SyncClient::leasegrant(int ttl) CHECK_EXCEPTIONS(client.leasegrant(ttl).get()); } +etcd::Response etcd::SyncClient::leaserevoke(int64_t lease_id) +{ + CHECK_EXCEPTIONS(client.leaserevoke(lease_id).get()); +} + +etcd::Response etcd::SyncClient::leasetimetolive(int64_t lease_id) +{ + CHECK_EXCEPTIONS(client.leasetimetolive(lease_id).get()); +} + etcd::Response etcd::SyncClient::watch(std::string const & key, bool recursive) { CHECK_EXCEPTIONS(client.watch(key, recursive).get()); diff --git a/src/v3/AsyncLeaseAction.cpp b/src/v3/AsyncLeaseAction.cpp new file mode 100644 index 0000000..d867e1a --- /dev/null +++ b/src/v3/AsyncLeaseAction.cpp @@ -0,0 +1,172 @@ +#include "etcd/v3/AsyncLeaseAction.hpp" +#include "etcd/v3/action_constants.hpp" +#include "etcd/v3/Transaction.hpp" + +using etcdserverpb::LeaseGrantRequest; +using etcdserverpb::LeaseRevokeRequest; +using etcdserverpb::LeaseCheckpointRequest; +using etcdserverpb::LeaseKeepAliveRequest; +using etcdserverpb::LeaseTimeToLiveRequest; +using etcdserverpb::LeaseLeasesRequest; + +etcdv3::AsyncLeaseGrantAction::AsyncLeaseGrantAction(etcdv3::ActionParameters param) + : etcdv3::Action(param) +{ + LeaseGrantRequest leasegrant_request; + leasegrant_request.set_ttl(parameters.ttl); + // If ID is set to 0, etcd will choose an ID. + leasegrant_request.set_id(parameters.lease_id); + + response_reader = parameters.lease_stub->AsyncLeaseGrant(&context, leasegrant_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLeaseGrantResponse etcdv3::AsyncLeaseGrantAction::ParseResponse() +{ + AsyncLeaseGrantResponse lease_resp; + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} + +etcdv3::AsyncLeaseRevokeAction::AsyncLeaseRevokeAction(etcdv3::ActionParameters param) + : etcdv3::Action(param) +{ + LeaseRevokeRequest leaserevoke_request; + leaserevoke_request.set_id(parameters.lease_id); + + response_reader = parameters.lease_stub->AsyncLeaseRevoke(&context, leaserevoke_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLeaseRevokeResponse etcdv3::AsyncLeaseRevokeAction::ParseResponse() +{ + AsyncLeaseRevokeResponse lease_resp; + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} + +etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param) + : etcdv3::Action(param) +{ + isCancelled = false; + stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)"keepalive create"); + + void *got_tag = nullptr; + bool ok = false; + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"keepalive create") { + // ok + } else { + throw std::runtime_error("Failed to create a lease keep-alive connection"); + } +} + +etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResponse() +{ + AsyncLeaseKeepAliveResponse lease_resp; + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} + +etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::Refresh() +{ + std::lock_guard scope_lock(this->protect_is_cancelled); + + if (isCancelled) { + return ParseResponse(); + } + + LeaseKeepAliveRequest leasekeepalive_request; + leasekeepalive_request.set_id(parameters.lease_id); + + void *got_tag = nullptr; + bool ok = false; + + stream->Write(leasekeepalive_request, (void *)"keepalive write"); + // wait write finish + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"keepalive write") { + stream->Read(&reply, (void*)"keepalive read"); + // wait read finish + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"keepalive read") { + return ParseResponse(); + } + } + throw std::runtime_error("Failed to create a lease keep-alive connection"); +} + +void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive() +{ + std::lock_guard scope_lock(this->protect_is_cancelled); + if(isCancelled == false) + { + isCancelled = true; + stream->WritesDone((void*)"keepalive done"); + grpc::Status status; + stream->Finish(&status, (void *)this); + cq_.Shutdown(); + } +} + +bool etcdv3::AsyncLeaseKeepAliveAction::Cancelled() const +{ + return isCancelled; +} + +etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters param) + : etcdv3::Action(param) +{ + LeaseTimeToLiveRequest leasetimetolive_request; + leasetimetolive_request.set_id(parameters.lease_id); + // FIXME: unsupported parameters: "keys" + // leasetimetolive_request.set_keys(parameters.keys); + + response_reader = parameters.lease_stub->AsyncLeaseTimeToLive(&context, leasetimetolive_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLeaseTimeToLiveResponse etcdv3::AsyncLeaseTimeToLiveAction::ParseResponse() +{ + AsyncLeaseTimeToLiveResponse lease_resp; + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} + +etcdv3::AsyncLeaseLeasesAction::AsyncLeaseLeasesAction(etcdv3::ActionParameters param) + : etcdv3::Action(param) +{ + LeaseLeasesRequest leaseleases_request; + + response_reader = parameters.lease_stub->AsyncLeaseLeases(&context, leaseleases_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLeaseLeasesResponse etcdv3::AsyncLeaseLeasesAction::ParseResponse() +{ + AsyncLeaseLeasesResponse lease_resp; + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} diff --git a/src/v3/AsyncLeaseGrantAction.cpp b/src/v3/AsyncLeaseGrantAction.cpp deleted file mode 100644 index 5b620b3..0000000 --- a/src/v3/AsyncLeaseGrantAction.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include "etcd/v3/AsyncLeaseGrantAction.hpp" -#include "etcd/v3/action_constants.hpp" -#include "etcd/v3/Transaction.hpp" - -using etcdserverpb::LeaseGrantRequest; - -etcdv3::AsyncLeaseGrantAction::AsyncLeaseGrantAction(etcdv3::ActionParameters param) - : etcdv3::Action(param) -{ - etcdv3::Transaction transaction; - transaction.setup_lease_grant_operation(parameters.ttl); - - response_reader = parameters.lease_stub->AsyncLeaseGrant(&context, transaction.leasegrant_request, &cq_); - response_reader->Finish(&reply, &status, (void*)this); - -} - - -etcdv3::AsyncLeaseGrantResponse etcdv3::AsyncLeaseGrantAction::ParseResponse() -{ - AsyncLeaseGrantResponse lease_resp; - if(!status.ok()) - { - lease_resp.set_error_code(status.error_code()); - lease_resp.set_error_message(status.error_message()); - } - else - { - lease_resp.ParseResponse(reply); - } - return lease_resp; -} diff --git a/src/v3/AsyncLeaseGrantResponse.cpp b/src/v3/AsyncLeaseGrantResponse.cpp deleted file mode 100644 index b5b6a1d..0000000 --- a/src/v3/AsyncLeaseGrantResponse.cpp +++ /dev/null @@ -1,11 +0,0 @@ -#include "etcd/v3/AsyncLeaseGrantResponse.hpp" -#include "etcd/v3/action_constants.hpp" - - -void etcdv3::AsyncLeaseGrantResponse::ParseResponse(LeaseGrantResponse& resp) -{ - index = resp.header().revision(); - value.kvs.set_lease(resp.id()); - value.set_ttl(resp.ttl()); - error_message = resp.error(); -} diff --git a/src/v3/AsyncLeaseResponse.cpp b/src/v3/AsyncLeaseResponse.cpp new file mode 100644 index 0000000..88436d3 --- /dev/null +++ b/src/v3/AsyncLeaseResponse.cpp @@ -0,0 +1,35 @@ +#include "etcd/v3/AsyncLeaseResponse.hpp" +#include "etcd/v3/action_constants.hpp" + + +void etcdv3::AsyncLeaseGrantResponse::ParseResponse(LeaseGrantResponse& resp) { + index = resp.header().revision(); + value.kvs.set_lease(resp.id()); + value.set_ttl(resp.ttl()); + error_message = resp.error(); +} + +void etcdv3::AsyncLeaseRevokeResponse::ParseResponse(LeaseRevokeResponse& resp) { + index = resp.header().revision(); +} + +void etcdv3::AsyncLeaseKeepAliveResponse::ParseResponse(LeaseKeepAliveResponse& resp) { + index = resp.header().revision(); + value.kvs.set_lease(resp.id()); + value.set_ttl(resp.ttl()); +} + +void etcdv3::AsyncLeaseTimeToLiveResponse::ParseResponse(LeaseTimeToLiveResponse& resp) { + index = resp.header().revision(); + value.kvs.set_lease(resp.id()); + value.set_ttl(resp.ttl()); + // FIXME: unsupported: fields "grantedTTL" and "keys" +} + +void etcdv3::AsyncLeaseLeasesResponse::ParseResponse(LeaseLeasesResponse& resp) { + index = resp.header().revision(); + // FIXME: only the first leases is recorded. + if (resp.leases_size() > 0) { + value.kvs.set_lease(resp.leases(0).id()); + } +} diff --git a/src/v3/AsyncLockAction.cpp b/src/v3/AsyncLockAction.cpp index 49d50b9..5d29604 100644 --- a/src/v3/AsyncLockAction.cpp +++ b/src/v3/AsyncLockAction.cpp @@ -9,6 +9,7 @@ etcdv3::AsyncLockAction::AsyncLockAction(ActionParameters param) { LockRequest lock_request; lock_request.set_name(parameters.key); + lock_request.set_lease(parameters.lease_id); response_reader = parameters.lock_stub->AsyncLock(&context, lock_request, &cq_); response_reader->Finish(&reply, &status, (void*)this); diff --git a/src/v3/AsyncWatchAction.cpp b/src/v3/AsyncWatchAction.cpp index bf5b51b..f4e2ae5 100644 --- a/src/v3/AsyncWatchAction.cpp +++ b/src/v3/AsyncWatchAction.cpp @@ -69,7 +69,11 @@ void etcdv3::AsyncWatchAction::waitForResponse() // // 1. watch for a future revision, return immediately with empty events set // 2. receive any effective events. + isCancelled = true; stream->WritesDone((void*)"writes done"); + grpc::Status status; + stream->Finish(&status, (void *)this); + cq_.Shutdown(); // leave a warning if the response is too large and been fragmented if (reply.fragment()) { @@ -90,9 +94,12 @@ void etcdv3::AsyncWatchAction::CancelWatch() std::lock_guard scope_lock(this->protect_is_cancalled); if(isCancelled == false) { + isCancelled = true; stream->WritesDone((void*)"writes done"); + grpc::Status status; + stream->Finish(&status, (void *)this); + cq_.Shutdown(); } - isCancelled = true; } bool etcdv3::AsyncWatchAction::Cancelled() const { diff --git a/src/v3/Transaction.cpp b/src/v3/Transaction.cpp index d1b32db..a32afd4 100644 --- a/src/v3/Transaction.cpp +++ b/src/v3/Transaction.cpp @@ -155,11 +155,6 @@ void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const& req_success->set_allocated_request_delete_range(del_request.release()); } -void etcdv3::Transaction::setup_lease_grant_operation(int ttl) -{ - leasegrant_request.set_ttl(ttl); -} - void etcdv3::Transaction::setup_put(std::string const &key, std::string const &value) { std::unique_ptr put_request(new PutRequest()); put_request->set_key(key); diff --git a/tst/LockTest.cpp b/tst/LockTest.cpp index e31c739..6060517 100644 --- a/tst/LockTest.cpp +++ b/tst/LockTest.cpp @@ -60,7 +60,8 @@ TEST_CASE("double lock will fail") // create a duration first_lock_release = true; - std::this_thread::sleep_for(std::chrono::seconds(1)); + // using a duration longer than default lease TTL for lock (see: DEFAULT_LEASE_TTL_FOR_LOCK) + std::this_thread::sleep_for(std::chrono::seconds(15)); // unlock the first lock etcd::Response resp4 = etcd.unlock(lock_key).get();