From 39d021d381ba42fce9e0a033e68b7c14bedfc7a5 Mon Sep 17 00:00:00 2001 From: Eric Musgrave Date: Thu, 3 Dec 2020 09:15:45 -0500 Subject: [PATCH] Add all missing Lease functionality --- README.md | 104 +++++- etcd/Client.hpp | 434 +++++++++++++---------- etcd/KeepAlive.hpp | 110 ++++++ etcd/Response.hpp | 15 + etcd/Value.hpp | 3 + etcd/v3/Action.hpp | 1 + etcd/v3/AsyncLeaseKeepAliveAction.hpp | 32 ++ etcd/v3/AsyncLeaseKeepAliveResponse.hpp | 23 ++ etcd/v3/AsyncLeaseLeasesAction.hpp | 26 ++ etcd/v3/AsyncLeaseLeasesResponse.hpp | 19 + etcd/v3/AsyncLeaseRevokeAction.hpp | 27 ++ etcd/v3/AsyncLeaseRevokeResponse.hpp | 20 ++ etcd/v3/AsyncLeaseTimeToLiveAction.hpp | 27 ++ etcd/v3/AsyncLeaseTimeToLiveResponse.hpp | 20 ++ etcd/v3/LeaseInfo.hpp | 27 ++ etcd/v3/Transaction.hpp | 8 + etcd/v3/V3Response.hpp | 6 + src/Client.cpp | 417 ++++++++++++---------- src/KeepAlive.cpp | 127 +++++++ src/Response.cpp | 7 + src/v3/AsyncLeaseKeepAliveAction.cpp | 69 ++++ src/v3/AsyncLeaseKeepAliveResponse.cpp | 13 + src/v3/AsyncLeaseLeasesAction.cpp | 26 ++ src/v3/AsyncLeaseLeasesResponse.cpp | 11 + src/v3/AsyncLeaseRevokeAction.cpp | 28 ++ src/v3/AsyncLeaseRevokeResponse.cpp | 7 + src/v3/AsyncLeaseTimeToLiveAction.cpp | 29 ++ src/v3/AsyncLeaseTimeToLiveResponse.cpp | 15 + src/v3/LeaseInfo.cpp | 18 + src/v3/Transaction.cpp | 18 + src/v3/V3Response.cpp | 12 + 31 files changed, 1307 insertions(+), 392 deletions(-) create mode 100644 etcd/KeepAlive.hpp create mode 100644 etcd/v3/AsyncLeaseKeepAliveAction.hpp create mode 100644 etcd/v3/AsyncLeaseKeepAliveResponse.hpp create mode 100644 etcd/v3/AsyncLeaseLeasesAction.hpp create mode 100644 etcd/v3/AsyncLeaseLeasesResponse.hpp create mode 100644 etcd/v3/AsyncLeaseRevokeAction.hpp create mode 100644 etcd/v3/AsyncLeaseRevokeResponse.hpp create mode 100644 etcd/v3/AsyncLeaseTimeToLiveAction.hpp create mode 100644 etcd/v3/AsyncLeaseTimeToLiveResponse.hpp create mode 100644 etcd/v3/LeaseInfo.hpp create mode 100644 src/KeepAlive.cpp create mode 100644 src/v3/AsyncLeaseKeepAliveAction.cpp create mode 100644 src/v3/AsyncLeaseKeepAliveResponse.cpp create mode 100644 src/v3/AsyncLeaseLeasesAction.cpp create mode 100644 src/v3/AsyncLeaseLeasesResponse.cpp create mode 100644 src/v3/AsyncLeaseRevokeAction.cpp create mode 100644 src/v3/AsyncLeaseRevokeResponse.cpp create mode 100644 src/v3/AsyncLeaseTimeToLiveAction.cpp create mode 100644 src/v3/AsyncLeaseTimeToLiveResponse.cpp create mode 100644 src/v3/LeaseInfo.cpp diff --git a/README.md b/README.md index 69acd05..2a8a45e 100644 --- a/README.md +++ b/README.md @@ -343,21 +343,6 @@ fact it just sends the asynchron request, sets up a callback for the response an callback is executed by some thread from the pplx library's thread pool and the callback (in this case a small lambda function actually) will call ```watch_for_changes``` again from there. - -### Requesting for lease - -Users can request for lease which is governed by a time-to-live(TTL) value given by the user. -Moreover, user can attached the lease to a key(s) by indicating the lease id in ```add()```, -```set()```, ```modify()``` and ```modify_if()```. Also the ttl will that was granted by etcd -server will be indicated in ```ttl()```. - -```c++ - etcd::Client etcd("http://127.0.0.1:4001"); - etcd::Response resp = etcd.leasegrant(60).get(); - etcd.set("/test/key2", "bar", resp.value().lease()); - std::cout <<"ttl" << resp.value().ttl(); -``` - ### Watcher Class Users can watch a key indefinitely or until user cancels the watch. This can be done by @@ -375,11 +360,96 @@ either by user implicitly calling ```Cancel()``` or when watcher class is destro } ``` + +### Leases +#### Create a lease +Users can request a lease which is governed by a time-to-live(TTL) value given by the user. +Moreover, user can attached the lease to a key(s) by indicating the lease id in ```add()```, +```set()```, ```modify()``` and ```modify_if()```. Also the ttl will that was granted by etcd +server will be indicated in ```ttl()```. + +```c++ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Response resp = etcd.leasegrant(60).get(); + etcd.set("/test/key2", "bar", resp.value().lease()); + std::cout <<"ttl" << resp.value().ttl(); +``` + +When the lease is revoked or expires, any key attached to the lease will be removed from +the etcd database. +#### Revoke a lease +To revoke a lease, call `leaserevoke(lease_id)`. +```c++ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Response resp = etcd.leasegrant(60).get(); + etcd.set("/test/key2", "bar", resp.value().lease()); + std::cout <<"ttl" << resp.value().ttl(); + etcd.leaserevoke(resp.value().lease()); // revokes the lease and deletes '/test/key2' +``` +#### Lease TTL +To get information on an existing lease, you can call `leasettl(lease_id, keys)`. +The `keys` parameter specifies whether or not to return a list of the keys that the +lease is attached to. +```c++ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Response resp = etcd.leasegrant(60).get(); + etcd.set("/test/key2", "bar", resp.value().lease()); + std::cout <<"ttl" << resp.value().ttl() << std::endl; + etcd::Response leaseinfo_resp = etcd.leasettl(resp.value().lease(), true); + std::cout << "Lease Info: " << std::endl; + std::cout << " Granted TTL: " << leaseinfo_resp.leaseinfo().get_grantedttl() << std::endl; + std::cout << " Current TTL: " << leaseinfo_resp.leaseinfo().get_ttl() << std::endl; + std::cout << " Keys: " << std::endl; + for (auto k : leaseinfo_resp.leaseinfo().get_keys()) { + LOG(INFO) << " " << k << std::endl; + } +``` +#### List all Leases +To get a list of all existing leases, call `listleases()`. +```c++ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd::Response resp = etcd.leasegrant(60).get(); + etcd.set("/test/key2", "bar", resp.value().lease()); + std::cout <<"ttl" << resp.value().ttl() << std::endl; + etcd::Response leases_resp = etcd.listleases().get(); + for (auto l : leases_resp.leases()) { + LOG(INFO) << "Lease: " << std::hex << l; + } +``` + +#### Lease KeepAlive +To keep a lease alive so that it does not expire and delete the attached keys, you must +send a periodic refresh to the server. This can be done using a one time command +called `leasekeepalive(lease_id)`. However, that is inefficient since it creates and +destroys a bidirectional stream on each call. Instead, it is recommend to use the +`etcd::KeepAlive` service, which is a long running task that will periodically +refresh the leases that you specify. You can create multiple `etcd::KeepAlive` instances +if you need to have different refresh intervals for different leases. + +```c++ + etcd::Client etcd("http://127.0.0.1:4001"); + etcd::KeepAlive keepalive{etcd}; // Creates the KeepAlive service, but does not start it + keepalive.start(2000); // Starts the service with a 2s refresh interval. + + etcd::Response resp = etcd.leasegrant(5).get(); // create a lease + int64_t lease_id = resp.value().lease(); + etcd.set("/test/key2", "bar", lease_id); // attach lease to the key + std::cout << "ttl" << resp.value().ttl() << std::endl; + + // Tell the KeepAlive service to refresh this lease every 2 seconds + keepalive.add(resp.value().lease()); + + // ... + + // Remove the lease from the KeepAlive, and also immediately revoke it + keepalive.remove(lease_id, true); + // '/test/key2' is now deleted +``` + ### TODO 1. Cancellation of asynchronous calls(except for watch) -2. LeaseKeepAlive -3. Authentication +2. Authentication ## License diff --git a/etcd/Client.hpp b/etcd/Client.hpp index 1245822..2aa8816 100644 --- a/etcd/Client.hpp +++ b/etcd/Client.hpp @@ -5,246 +5,288 @@ #include -#include #include "proto/rpc.grpc.pb.h" #include "proto/v3lock.grpc.pb.h" +#include using etcdserverpb::Auth; using etcdserverpb::KV; -using etcdserverpb::Watch; using etcdserverpb::Lease; +using etcdserverpb::Watch; using v3lockpb::Lock; namespace etcdv3 { - class Transaction; +class Transaction; } -namespace etcd -{ - class Watcher; +namespace etcd { +class Watcher; + +/** + * Client is responsible for maintaining a connection towards an etcd server. + * Etcd operations can be reached via the methods of the client. + */ +class Client { +public: + /** + * Constructs an etcd client object. + * + * @param etcd_url is the url of the etcd server to connect to, like + * "http://127.0.0.1:2379", or multiple url, seperated by ',' or ';'. + * @param load_balancer is the load balance strategy, can be one of + * round_robin/pick_first/grpclb/xds. + */ + Client(std::string const &etcd_url, + std::string const &load_balancer = "round_robin"); /** - * Client is responsible for maintaining a connection towards an etcd server. - * Etcd operations can be reached via the methods of the client. + * Constructs an etcd client object. + * + * @param etcd_url is the url of the etcd server to connect to, like + * "http://127.0.0.1:2379", or multiple url, seperated by ',' or ';'. + * @param username username of etcd auth + * @param password password of etcd auth + * @param load_balancer is the load balance strategy, can be one of + * round_robin/pick_first/grpclb/xds. */ - class Client - { - public: - /** - * Constructs an etcd client object. - * - * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", - * or multiple url, seperated by ',' or ';'. - * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. - */ - Client(std::string const & etcd_url, - std::string const & load_balancer = "round_robin"); + Client(std::string const &etcd_url, std::string const &username, + std::string const &password, + std::string const &load_balancer = "round_robin"); - /** - * Constructs an etcd client object. - * - * @param etcd_url is the url of the etcd server to connect to, like "http://127.0.0.1:2379", - * or multiple url, seperated by ',' or ';'. - * @param username username of etcd auth - * @param password password of etcd auth - * @param load_balancer is the load balance strategy, can be one of round_robin/pick_first/grpclb/xds. - */ - Client(std::string const & etcd_url, - std::string const & username, - std::string const & password, - std::string const & load_balancer = "round_robin"); + /** + * Sends a get request to the etcd server + * @param key is the key to be read + */ + pplx::task get(std::string const &key); - /** - * Sends a get request to the etcd server - * @param key is the key to be read - */ - pplx::task get(std::string const & key); + /** + * Sets the value of a key. The key will be modified if already exists or + * created if it does not exists. + * @param key is the key to be created or modified + * @param value is the new value to be set + */ + pplx::task set(std::string const &key, std::string const &value, + int ttl = 0); - /** - * Sets the value of a key. The key will be modified if already exists or created - * if it does not exists. - * @param key is the key to be created or modified - * @param value is the new value to be set - */ - pplx::task set(std::string const & key, std::string const & value, int ttl = 0); + /** + * Sets the value of a key. The key will be modified if already exists or + * created if it does not exists. + * @param key is the key to be created or modified + * @param value is the new value to be set + * @param leaseId is the lease attached to the key + */ + pplx::task set(std::string const &key, std::string const &value, + int64_t leaseId); - /** - * Sets the value of a key. The key will be modified if already exists or created - * if it does not exists. - * @param key is the key to be created or modified - * @param value is the new value to be set - * @param leaseId is the lease attached to the key - */ - pplx::task set(std::string const & key, std::string const & value, int64_t leaseId); + /** + * Creates a new key and sets it's value. Fails if the key already exists. + * @param key is the key to be created + * @param value is the value to be set + */ + pplx::task add(std::string const &key, std::string const &value, + int ttl = 0); + /** + * Creates a new key and sets it's value. Fails if the key already exists. + * @param key is the key to be created + * @param value is the value to be set + * @param leaseId is the lease attached to the key + */ + pplx::task add(std::string const &key, std::string const &value, + int64_t leaseId); - /** - * Creates a new key and sets it's value. Fails if the key already exists. - * @param key is the key to be created - * @param value is the value to be set - */ - pplx::task add(std::string const & key, std::string const & value, int ttl = 0); + /** + * Modifies an existing key. Fails if the key does not exists. + * @param key is the key to be modified + * @param value is the new value to be set + */ + pplx::task modify(std::string const &key, std::string const &value, + int ttl = 0); - /** - * Creates a new key and sets it's value. Fails if the key already exists. - * @param key is the key to be created - * @param value is the value to be set - * @param leaseId is the lease attached to the key - */ - pplx::task add(std::string const & key, std::string const & value, int64_t leaseId); + /** + * Modifies an existing key. Fails if the key does not exists. + * @param key is the key to be modified + * @param value is the new value to be set + * @param leaseId is the lease attached to the key + */ + pplx::task modify(std::string const &key, std::string const &value, + int64_t leaseId); - /** - * Modifies an existing key. Fails if the key does not exists. - * @param key is the key to be modified - * @param value is the new value to be set - */ - pplx::task modify(std::string const & key, std::string const & value, int ttl = 0); + /** + * Modifies an existing key only if it has a specific value. Fails if the key + * does not exists or the original value differs from the expected one. + * @param key is the key to be modified + * @param value is the new value to be set + * @param old_value is the value to be replaced + */ + pplx::task modify_if(std::string const &key, + std::string const &value, + std::string const &old_value, int ttl = 0); - /** - * Modifies an existing key. Fails if the key does not exists. - * @param key is the key to be modified - * @param value is the new value to be set - * @param leaseId is the lease attached to the key - */ - pplx::task modify(std::string const & key, std::string const & value, int64_t leaseId); + /** + * Modifies an existing key only if it has a specific value. Fails if the key + * does not exists or the original value differs from the expected one. + * @param key is the key to be modified + * @param value is the new value to be set + * @param old_value is the value to be replaced + * @param leaseId is the lease attached to the key + */ + pplx::task modify_if(std::string const &key, + std::string const &value, + std::string const &old_value, int64_t leaseId); - /** - * Modifies an existing key only if it has a specific value. Fails if the key does not exists - * or the original value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_value is the value to be replaced - */ - pplx::task modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl = 0); + /** + * Modifies an existing key only if it has a specific modification index + * value. Fails if the key does not exists or the modification index of the + * previous value differs from the expected one. + * @param key is the key to be modified + * @param value is the new value to be set + * @param old_index is the expected index of the original value + */ + pplx::task modify_if(std::string const &key, + std::string const &value, int old_index, + int ttl = 0); - /** - * Modifies an existing key only if it has a specific value. Fails if the key does not exists - * or the original value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_value is the value to be replaced - * @param leaseId is the lease attached to the key - */ - pplx::task modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseId); + /** + * Modifies an existing key only if it has a specific modification index + * value. Fails if the key does not exists or the modification index of the + * previous value differs from the expected one. + * @param key is the key to be modified + * @param value is the new value to be set + * @param old_index is the expected index of the original value + * @param leaseId is the lease attached to the key + */ + pplx::task modify_if(std::string const &key, + std::string const &value, int old_index, + int64_t leaseId); - /** - * Modifies an existing key only if it has a specific modification index value. Fails if the key - * does not exists or the modification index of the previous value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_index is the expected index of the original value - */ - pplx::task modify_if(std::string const & key, std::string const & value, int old_index, int ttl = 0); + /** + * Removes a single key. The key has to point to a plain, non directory entry. + * @param key is the key to be deleted + */ + pplx::task rm(std::string const &key); - /** - * Modifies an existing key only if it has a specific modification index value. Fails if the key - * does not exists or the modification index of the previous value differs from the expected one. - * @param key is the key to be modified - * @param value is the new value to be set - * @param old_index is the expected index of the original value - * @param leaseId is the lease attached to the key - */ - pplx::task modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseId); + /** + * Removes a single key but only if it has a specific value. Fails if the key + * does not exists or the its value differs from the expected one. + * @param key is the key to be deleted + */ + pplx::task rm_if(std::string const &key, + std::string const &old_value); - /** - * Removes a single key. The key has to point to a plain, non directory entry. - * @param key is the key to be deleted - */ - pplx::task rm(std::string const & key); + /** + * Removes an existing key only if it has a specific modification index value. + * Fails if the key does not exists or the modification index of it differs + * from the expected one. + * @param key is the key to be deleted + * @param old_index is the expected index of the existing value + */ + pplx::task rm_if(std::string const &key, int old_index); - /** - * Removes a single key but only if it has a specific value. Fails if the key does not exists - * or the its value differs from the expected one. - * @param key is the key to be deleted - */ - pplx::task rm_if(std::string const & key, std::string const & old_value); + /** + * Gets a directory listing of the directory identified by the key. + * @param key is the key to be listed + */ + pplx::task ls(std::string const &key); - /** - * Removes an existing key only if it has a specific modification index value. Fails if the key - * does not exists or the modification index of it differs from the expected one. - * @param key is the key to be deleted - * @param old_index is the expected index of the existing value - */ - pplx::task rm_if(std::string const & key, int old_index); + /** + * Gets a directory listing of the directory identified by the key. + * @param key is the key to be listed + * @param limit is the size limit of results to be listed, we don't use + * default parameters to ensure backwards binary compatibility. + */ + pplx::task ls(std::string const &key, size_t const limit); - /** - * Gets a directory listing of the directory identified by the key. - * @param key is the key to be listed - */ - pplx::task ls(std::string const & key); + /** + * Removes a directory node. Fails if the parent directory dos not exists or + * not a directory. + * @param key is the directory to be created to be listed + * @param recursive if true then delete a whole subtree, otherwise deletes + * only an empty directory. + */ + pplx::task rmdir(std::string const &key, bool recursive = false); + /** + * Watches for changes of a key or a subtree. Please note that if you watch + * e.g. "/testdir" and a new key is created, like "/testdir/newkey" then no + * change happened in the value of + * "/testdir" so your watch will not detect this. If you want to detect + * addition and deletion of directory entries then you have to do a recursive + * watch. + * @param key is the value or directory to be watched + * @param recursive if true watch a whole subtree + */ + pplx::task watch(std::string const &key, bool recursive = false); - /** - * Gets a directory listing of the directory identified by the key. - * @param key is the key to be listed - * @param limit is the size limit of results to be listed, we don't use default parameters - * to ensure backwards binary compatibility. - */ - pplx::task ls(std::string const & key, size_t const limit); + /** + * Watches for changes of a key or a subtree from a specific index. The index + * value can be in the "past". + * @param key is the value or directory to be watched + * @param fromIndex the first index we are interested in + * @param recursive if true watch a whole subtree + */ + pplx::task watch(std::string const &key, int fromIndex, + bool recursive = false); + /** + * Grants a lease. + * @param ttl is the time to live of the lease (in seconds) + */ + pplx::task leasegrant(int ttl); - /** - * Removes a directory node. Fails if the parent directory dos not exists or not a directory. - * @param key is the directory to be created to be listed - * @param recursive if true then delete a whole subtree, otherwise deletes only an empty directory. - */ - pplx::task rmdir(std::string const & key, bool recursive = false); + /** + * Gets the list of existing leases + */ + pplx::task listleases(); - /** - * Watches for changes of a key or a subtree. Please note that if you watch e.g. "/testdir" and - * a new key is created, like "/testdir/newkey" then no change happened in the value of - * "/testdir" so your watch will not detect this. If you want to detect addition and deletion of - * directory entries then you have to do a recursive watch. - * @param key is the value or directory to be watched - * @param recursive if true watch a whole subtree - */ - pplx::task watch(std::string const & key, bool recursive = false); + /** + * Renews a lease. + * @param lease_id is the id of of the lease + */ + pplx::task leasekeepalive(int64_t lease_id); - /** - * Watches for changes of a key or a subtree from a specific index. The index value can be in the "past". - * @param key is the value or directory to be watched - * @param fromIndex the first index we are interested in - * @param recursive if true watch a whole subtree - */ - pplx::task watch(std::string const & key, int fromIndex, bool recursive = false); + /** + * Revokes a lease. + * @param lease_id is the id of of the lease + */ + pplx::task leaserevoke(int64_t lease_id); - /** - * Grants a lease. - * @param ttl is the time to live of the lease - */ - pplx::task leasegrant(int ttl); + /** + * Gets the TTL of an existing lease + * @param lease_id is the id of of the lease + */ + pplx::task leasettl(int64_t lease_id, bool keys); - /** - * Gains a lock at a key. - * @param key is the key to be used to request the lock. - */ - pplx::task lock(std::string const &key); + /** + * Gains a lock at a key. + * @param key is the key to be used to request the lock. + */ + pplx::task lock(std::string const &key); - /** - * Releases a lock at a key. - * @param key is the lock key to release. - */ - pplx::task unlock(std::string const &key); + /** + * Releases a lock at a key. + * @param key is the lock key to release. + */ + pplx::task unlock(std::string const &key); - /** - * Execute a etcd transaction. - * @param txn is the transaction object to be executed. - */ - pplx::task txn(etcdv3::Transaction const &txn); + /** + * Execute a etcd transaction. + * @param txn is the transaction object to be executed. + */ + pplx::task txn(etcdv3::Transaction const &txn); - private: - std::shared_ptr channel; - std::string auth_token; - std::unique_ptr kvServiceStub; - std::unique_ptr watchServiceStub; - std::unique_ptr leaseServiceStub; - std::unique_ptr lockServiceStub; +private: + std::shared_ptr channel; + std::string auth_token; + std::unique_ptr kvServiceStub; + std::unique_ptr watchServiceStub; + std::unique_ptr leaseServiceStub; + std::unique_ptr lockServiceStub; - friend class Watcher; + friend class Watcher; + friend class KeepAlive; }; - - -} +} // namespace etcd #endif diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp new file mode 100644 index 0000000..e3daf5f --- /dev/null +++ b/etcd/KeepAlive.hpp @@ -0,0 +1,110 @@ +#pragma once + +#include +#include +#include + +#include + +#include "etcd/Client.hpp" +#include "etcd/Response.hpp" +#include "proto/rpc.grpc.pb.h" + +namespace etcdv3 { +class AsyncKeepAliveAction; +} + +using etcdserverpb::KV; +using grpc::Channel; + +namespace etcd { +class KeepAlive { + enum class Type { READ = 1, WRITE = 2, CONNECT = 3, WRITES_DONE = 4, FINISH = 5 }; + + public: + /** + * Create an instance of the KeepAlive service + * Call start() to being the timer that automatically sends keepalives + * Call add(leaseid) to add a leaseid to be kept alive by this service + * @param client the client etcd connection to use + */ + KeepAlive(Client& client); + + /** + * Start processing keepalive's + * The refresh time must be less than the granted TTL of your + * leases, otherwise they will expire. Create multiple KeepAlive + * services with different refreshes if you have many leases with varied + * TTL's + * @param refresh_in_ms the time between refreshes (default: 5000ms) + */ + pplx::task start(int refresh_in_ms = 5000); + + /** + * Add a lease to be kept alive by this service + * @param leaseid the id of the lease + * @param ttl the ttl of the lease (in seconds) + */ + void add(int64_t leaseid, int ttl = 5); + + /** + * Remove a lease from being kept alive, and optionally revoke it immediately + * @param leaseid the id of the lease + * @param revoke true to immediatley revoke the lease (defaults to false) + */ + void remove(int64_t leaseid, bool revoke = false); + + ~KeepAlive(); + + private: + /** + * Sends a keep alive for the next lease in the queue + */ + void sendNextKeepAlive(); + /** + * + */ + void readNextMessage(); + + // The timer that is triggering refreshes + std::unique_ptr> timer_; + + // The map of leases that have been registered with this service to be kept alive + pplx::concurrent_unordered_map leases_; + + // The current queue of leases that still need to be refreshed on this pass of the timer + pplx::concurrent_queue> leaseQueue_; + + // The long running task for this service + pplx::task currentTask_; + + // The client that we are attached to. + Client& client_; + + // Context for the client. It could be used to convey extra information to + // the server and/or tweak certain RPC behaviors. + grpc::ClientContext context_; + + // The producer-consumer queue we use to communicate asynchronously with the + // gRPC runtime. + grpc::CompletionQueue cq_; + + // Out of the passed in Channel comes the stub, stored here, our view of the + // server's exposed services. + std::unique_ptr stub_; + + // The bidirectional, asynchronous stream for sending/receiving messages. + std::unique_ptr< + grpc::ClientAsyncReaderWriter> + stream_; + + // Allocated protobuf that holds the response. In real clients and servers, + // the memory management would a bit more complex as the thread that fills + // in the response should take care of concurrency as well as memory + // management. + etcdserverpb::LeaseKeepAliveResponse response_; + + // Finish status when the client is done with the stream. + grpc::Status finish_status_ = grpc::Status::OK; +}; +} // namespace etcd diff --git a/etcd/Response.hpp b/etcd/Response.hpp index bc25623..2d4bde1 100644 --- a/etcd/Response.hpp +++ b/etcd/Response.hpp @@ -5,6 +5,7 @@ #include #include "etcd/Value.hpp" +#include "etcd/v3/LeaseInfo.hpp" #include #include "proto/kv.pb.h" @@ -12,6 +13,7 @@ namespace etcdv3 { class AsyncWatchAction; + class AsyncLeaseKeepAliveAction; class V3Response; } @@ -122,6 +124,16 @@ namespace etcd */ std::chrono::microseconds const & duration() const; + /** + * Returns the list of leases + */ + std::vector const& leases() const; + + /** + * Returns detailed info for the lease + */ + etcdv3::LeaseInfo const& leaseinfo() const; + protected: Response(const etcdv3::V3Response& response, std::chrono::microseconds const& duration); Response(int error_code, char const * error_message); @@ -133,12 +145,15 @@ namespace etcd Value _value; Value _prev_value; Values _values; + std::vector _leases; + etcdv3::LeaseInfo _leaseinfo; Keys _keys; std::string _lock_key; // for lock std::vector _events; // for watch std::chrono::microseconds _duration; // execute duration (in microseconds), during the action created and response parsed friend class SyncClient; friend class etcdv3::AsyncWatchAction; + friend class etcdv3::AsyncLeaseKeepAliveAction; friend class Client; }; } diff --git a/etcd/Value.hpp b/etcd/Value.hpp index 0d0eacc..1f3deb1 100644 --- a/etcd/Value.hpp +++ b/etcd/Value.hpp @@ -48,6 +48,9 @@ namespace etcd */ int ttl() const; + /** + * Returns the id of the lease + */ int64_t lease() const; protected: diff --git a/etcd/v3/Action.hpp b/etcd/v3/Action.hpp index 3c8f8f2..7f0c8fe 100644 --- a/etcd/v3/Action.hpp +++ b/etcd/v3/Action.hpp @@ -31,6 +31,7 @@ namespace etcdv3 int revision; int old_revision; int64_t lease_id; + bool keys; int ttl; int limit; std::string key; diff --git a/etcd/v3/AsyncLeaseKeepAliveAction.hpp b/etcd/v3/AsyncLeaseKeepAliveAction.hpp new file mode 100644 index 0000000..5e2ebfe --- /dev/null +++ b/etcd/v3/AsyncLeaseKeepAliveAction.hpp @@ -0,0 +1,32 @@ +#ifndef __ASYNC_LEASEKEEPALIVEACTION_HPP__ +#define __ASYNC_LEASEKEEPALIVEACTION_HPP__ + +#include + +#include "etcd/Response.hpp" +#include "etcd/v3/Action.hpp" +#include "etcd/v3/AsyncleaseKeepAliveResponse.hpp" +#include "proto/rpc.grpc.pb.h" + +using etcd::Response; +using etcdserverpb::LeaseKeepAliveRequest; +using etcdserverpb::LeaseKeepAliveResponse; +using grpc::ClientAsyncReaderWriter; +using grpc::ClientAsyncResponseReader; + +namespace etcdv3 { +class AsyncLeaseKeepAliveAction : public etcdv3::Action { + public: + AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param); + AsyncLeaseKeepAliveResponse ParseResponse(); + void waitForResponse(); + + private: + LeaseKeepAliveResponse reply; + std::unique_ptr> stream; + + char* doneTag = "writes done"; +}; +} // namespace etcdv3 + +#endif diff --git a/etcd/v3/AsyncLeaseKeepAliveResponse.hpp b/etcd/v3/AsyncLeaseKeepAliveResponse.hpp new file mode 100644 index 0000000..544b910 --- /dev/null +++ b/etcd/v3/AsyncLeaseKeepAliveResponse.hpp @@ -0,0 +1,23 @@ +#ifndef __ASYNC_LEASEKEEPALIVERESPONSE_HPP__ +#define __ASYNC_LEASEKEEPALIVERESPONSE_HPP__ + +#include + +#include "etcd/v3/V3Response.hpp" +#include "proto/rpc.grpc.pb.h" +#include "proto/rpc.pb.h" + + +using etcdserverpb::KV; +using etcdserverpb::LeaseKeepAliveRequest; +using etcdserverpb::LeaseKeepAliveResponse; + +namespace etcdv3 { +class AsyncLeaseKeepAliveResponse : public etcdv3::V3Response { + public: + AsyncLeaseKeepAliveResponse(){}; + void ParseResponse(LeaseKeepAliveResponse& resp); +}; +} // namespace etcdv3 + +#endif diff --git a/etcd/v3/AsyncLeaseLeasesAction.hpp b/etcd/v3/AsyncLeaseLeasesAction.hpp new file mode 100644 index 0000000..d82770d --- /dev/null +++ b/etcd/v3/AsyncLeaseLeasesAction.hpp @@ -0,0 +1,26 @@ +#ifndef __ASYNC_LEASELEASESACTION_HPP__ +#define __ASYNC_LEASELEASESACTION_HPP__ + +#include + +#include "etcd/v3/Action.hpp" +#include "etcd/v3/AsyncLeaseLeasesResponse.hpp" +#include "proto/rpc.grpc.pb.h" + +using etcdserverpb::LeaseLeasesRequest; +using etcdserverpb::LeaseLeasesResponse; +using grpc::ClientAsyncResponseReader; + +namespace etcdv3 { +class AsyncLeaseLeasesAction : public etcdv3::Action { + public: + AsyncLeaseLeasesAction(etcdv3::ActionParameters param); + AsyncLeaseLeasesResponse ParseResponse(); + + private: + LeaseLeasesResponse reply; + std::unique_ptr> response_reader; +}; +} // namespace etcdv3 + +#endif diff --git a/etcd/v3/AsyncLeaseLeasesResponse.hpp b/etcd/v3/AsyncLeaseLeasesResponse.hpp new file mode 100644 index 0000000..c6e34b6 --- /dev/null +++ b/etcd/v3/AsyncLeaseLeasesResponse.hpp @@ -0,0 +1,19 @@ +#ifndef __ASYNC_LEASELEASESRESPONSE_HPP__ +#define __ASYNC_LEASELEASESRESPONSE_HPP__ + +#include + +#include "etcd/v3/V3Response.hpp" +#include "proto/rpc.grpc.pb.h" + +using etcdserverpb::LeaseLeasesResponse; + +namespace etcdv3 { +class AsyncLeaseLeasesResponse : public etcdv3::V3Response { + public: + AsyncLeaseLeasesResponse(){}; + void ParseResponse(LeaseLeasesResponse& resp); +}; +} // namespace etcdv3 + +#endif diff --git a/etcd/v3/AsyncLeaseRevokeAction.hpp b/etcd/v3/AsyncLeaseRevokeAction.hpp new file mode 100644 index 0000000..48c8840 --- /dev/null +++ b/etcd/v3/AsyncLeaseRevokeAction.hpp @@ -0,0 +1,27 @@ +#ifndef __ASYNC_LEASEREVOKEACTION_HPP__ +#define __ASYNC_LEASEREVOKEACTION_HPP__ + +#include + +#include "etcd/v3/Action.hpp" +#include "etcd/v3/AsyncLeaseRevokeResponse.hpp" +#include "proto/rpc.grpc.pb.h" + + +using etcdserverpb::LeaseRevokeRequest; +using etcdserverpb::LeaseRevokeResponse; +using grpc::ClientAsyncResponseReader; + +namespace etcdv3 { +class AsyncLeaseRevokeAction : public etcdv3::Action { + public: + AsyncLeaseRevokeAction(etcdv3::ActionParameters param); + AsyncLeaseRevokeResponse ParseResponse(); + + private: + LeaseRevokeResponse reply; + std::unique_ptr> response_reader; +}; +} // namespace etcdv3 + +#endif diff --git a/etcd/v3/AsyncLeaseRevokeResponse.hpp b/etcd/v3/AsyncLeaseRevokeResponse.hpp new file mode 100644 index 0000000..9eeecbd --- /dev/null +++ b/etcd/v3/AsyncLeaseRevokeResponse.hpp @@ -0,0 +1,20 @@ +#ifndef __ASYNC_LEASEREVOKERESPONSE_HPP__ +#define __ASYNC_LEASEREVOKERESPONSE_HPP__ + +#include + +#include "etcd/v3/V3Response.hpp" +#include "proto/rpc.grpc.pb.h" + + +using etcdserverpb::LeaseRevokeResponse; + +namespace etcdv3 { +class AsyncLeaseRevokeResponse : public etcdv3::V3Response { + public: + AsyncLeaseRevokeResponse(){}; + void ParseResponse(LeaseRevokeResponse& resp); +}; +} // namespace etcdv3 + +#endif diff --git a/etcd/v3/AsyncLeaseTimeToLiveAction.hpp b/etcd/v3/AsyncLeaseTimeToLiveAction.hpp new file mode 100644 index 0000000..2122929 --- /dev/null +++ b/etcd/v3/AsyncLeaseTimeToLiveAction.hpp @@ -0,0 +1,27 @@ +#ifndef __ASYNC_LEASETIMETOLIVEACTION_HPP__ +#define __ASYNC_LEASETIMETOLIVEACTION_HPP__ + +#include + +#include "etcd/v3/Action.hpp" +#include "etcd/v3/AsyncLeaseTimeToLiveResponse.hpp" +#include "proto/rpc.grpc.pb.h" + + +using etcdserverpb::LeaseTimeToLiveRequest; +using etcdserverpb::LeaseTimeToLiveResponse; +using grpc::ClientAsyncResponseReader; + +namespace etcdv3 { +class AsyncLeaseTimeToLiveAction : public etcdv3::Action { + public: + AsyncLeaseTimeToLiveAction(etcdv3::ActionParameters param); + AsyncLeaseTimeToLiveResponse ParseResponse(); + + private: + LeaseTimeToLiveResponse reply; + std::unique_ptr> response_reader; +}; +} // namespace etcdv3 + +#endif diff --git a/etcd/v3/AsyncLeaseTimeToLiveResponse.hpp b/etcd/v3/AsyncLeaseTimeToLiveResponse.hpp new file mode 100644 index 0000000..f86c068 --- /dev/null +++ b/etcd/v3/AsyncLeaseTimeToLiveResponse.hpp @@ -0,0 +1,20 @@ +#ifndef __ASYNC_LEASETIMETOLIVERESPONSE_HPP__ +#define __ASYNC_LEASETIMETOLIVERESPONSE_HPP__ + +#include + +#include "etcd/v3/V3Response.hpp" +#include "proto/rpc.grpc.pb.h" + + +using etcdserverpb::LeaseTimeToLiveResponse; + +namespace etcdv3 { +class AsyncLeaseTimeToLiveResponse : public etcdv3::V3Response { + public: + AsyncLeaseTimeToLiveResponse(){}; + void ParseResponse(LeaseTimeToLiveResponse& resp); +}; +} // namespace etcdv3 + +#endif diff --git a/etcd/v3/LeaseInfo.hpp b/etcd/v3/LeaseInfo.hpp new file mode 100644 index 0000000..16b4f9e --- /dev/null +++ b/etcd/v3/LeaseInfo.hpp @@ -0,0 +1,27 @@ +#ifndef __V3_ETCDV3LEASEINFO_HPP__ +#define __V3_ETCDV3LEASEINFO_HPP__ + +#include "proto/kv.pb.h" + +namespace etcdv3 { +class LeaseInfo { + public: + LeaseInfo(); + // mvccpb::KeyValue kvs; + void set_lease(int64_t leaseid); + void set_ttl(int ttl); + void set_grantedttl(int ttl); + void add_key(std::string key); + int64_t get_lease() const; + int get_ttl() const; + int get_grantedttl() const; + std::vector get_keys() const; + + private: + int64_t leaseid_; + int ttl_; + int grantedttl_; + std::vector keys_; +}; +} // namespace etcdv3 +#endif diff --git a/etcd/v3/Transaction.hpp b/etcd/v3/Transaction.hpp index b9de935..89b43f7 100644 --- a/etcd/v3/Transaction.hpp +++ b/etcd/v3/Transaction.hpp @@ -25,6 +25,10 @@ public: void setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive); void setup_compare_and_delete_operation(std::string const& key); void setup_lease_grant_operation(int ttl); + void setup_lease_leases_operation(); + void setup_lease_keepalive_operation(int64_t lease_id); + void setup_lease_revoke_operation(int64_t lease_id); + void setup_lease_timetolive_operation(int64_t lease_id, bool keys); // update without `get` and no `prev_kv` returned void setup_put(std::string const &key, std::string const &value); @@ -32,6 +36,10 @@ public: etcdserverpb::TxnRequest txn_request; etcdserverpb::LeaseGrantRequest leasegrant_request; + etcdserverpb::LeaseLeasesRequest leaseleases_request; + etcdserverpb::LeaseKeepAliveRequest leasekeepalive_request; + etcdserverpb::LeaseRevokeRequest leaserevoke_request; + etcdserverpb::LeaseTimeToLiveRequest leasetimetolive_request; private: std::string key; diff --git a/etcd/v3/V3Response.hpp b/etcd/v3/V3Response.hpp index 0c97789..831459d 100644 --- a/etcd/v3/V3Response.hpp +++ b/etcd/v3/V3Response.hpp @@ -5,6 +5,7 @@ #include "proto/kv.pb.h" #include "etcd/v3/KeyValue.hpp" +#include "etcd/v3/LeaseInfo.hpp" namespace etcdv3 { @@ -21,9 +22,12 @@ namespace etcdv3 std::string const & get_action() const; std::vector const & get_values() const; std::vector const & get_prev_values() const; + std::vector const& get_leases() const; etcdv3::KeyValue const & get_value() const; etcdv3::KeyValue const & get_prev_value() const; + etcdv3::LeaseInfo const& get_leaseinfo() const; bool has_values() const; + bool has_leases() const; void set_lock_key(std::string const &key); std::string const &get_lock_key() const; std::vector const & get_events() const; @@ -38,6 +42,8 @@ namespace etcdv3 std::vector prev_values; std::string lock_key; // for lock std::vector events; // for watch + std::vector leases; // for list leases + LeaseInfo leaseinfo; // for lease ttl information }; } #endif diff --git a/src/Client.cpp b/src/Client.cpp index aa1cb3a..821f184 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -1,32 +1,36 @@ +#include #include #include -#include #include -#include +#include -#include -#include #include "etcd/Client.hpp" -#include "etcd/v3/action_constants.hpp" #include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncTxnResponse.hpp" -#include "etcd/v3/AsyncRangeResponse.hpp" -#include "etcd/v3/AsyncWatchResponse.hpp" #include "etcd/v3/AsyncDeleteRangeResponse.hpp" #include "etcd/v3/AsyncLockResponse.hpp" +#include "etcd/v3/AsyncRangeResponse.hpp" +#include "etcd/v3/AsyncTxnResponse.hpp" +#include "etcd/v3/AsyncWatchResponse.hpp" #include "etcd/v3/Transaction.hpp" +#include "etcd/v3/action_constants.hpp" #include +#include +#include -#include "etcd/v3/AsyncSetAction.hpp" -#include "etcd/v3/AsyncCompareAndSwapAction.hpp" #include "etcd/v3/AsyncCompareAndDeleteAction.hpp" -#include "etcd/v3/AsyncUpdateAction.hpp" -#include "etcd/v3/AsyncGetAction.hpp" +#include "etcd/v3/AsyncCompareAndSwapAction.hpp" #include "etcd/v3/AsyncDeleteAction.hpp" -#include "etcd/v3/AsyncWatchAction.hpp" +#include "etcd/v3/AsyncGetAction.hpp" #include "etcd/v3/AsyncLeaseGrantAction.hpp" +#include "etcd/v3/AsyncLeaseKeepAliveAction.hpp" +#include "etcd/v3/AsyncLeaseLeasesAction.hpp" +#include "etcd/v3/AsyncLeaseRevokeAction.hpp" +#include "etcd/v3/AsyncLeaseTimeToLiveAction.hpp" #include "etcd/v3/AsyncLockAction.hpp" +#include "etcd/v3/AsyncSetAction.hpp" #include "etcd/v3/AsyncTxnAction.hpp" +#include "etcd/v3/AsyncUpdateAction.hpp" +#include "etcd/v3/AsyncWatchAction.hpp" #include @@ -35,7 +39,8 @@ using grpc::Channel; namespace etcd { namespace detail { -static bool dns_resolve(std::string const &target, std::vector &endpoints) { +static bool dns_resolve(std::string const &target, + std::vector &endpoints) { struct addrinfo hints = {}, *addrs; hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; @@ -47,15 +52,18 @@ static bool dns_resolve(std::string const &target, std::vector &end std::cerr << "warn: invalid URL: " << target << std::endl; return false; } - if (getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints, &addrs) != 0) { - std::cerr << "warn: getaddrinfo() failed for endpoint " << target << std::endl; + if (getaddrinfo(target_parts[0].c_str(), target_parts[1].c_str(), &hints, + &addrs) != 0) { + std::cerr << "warn: getaddrinfo() failed for endpoint " << target + << std::endl; return false; } char host[16] = {'\0'}; - for (struct addrinfo* addr = addrs; addr != nullptr; addr = addr->ai_next) { + for (struct addrinfo *addr = addrs; addr != nullptr; addr = addr->ai_next) { memset(host, '\0', sizeof(host)); - getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host), NULL, 0, NI_NUMERICHOST); + getnameinfo(addr->ai_addr, addr->ai_addrlen, host, sizeof(host), NULL, 0, + NI_NUMERICHOST); endpoints.emplace_back(std::string(host) + ":" + target_parts[1]); } freeaddrinfo(addrs); @@ -64,14 +72,16 @@ static bool dns_resolve(std::string const &target, std::vector &end const std::string strip_and_resolve_addresses(std::string const &address) { std::vector addresses; - boost::algorithm::split(addresses, address, boost::algorithm::is_any_of(",;")); + boost::algorithm::split(addresses, address, + boost::algorithm::is_any_of(",;")); std::string stripped_address; { std::vector stripped_addresses; std::string substr("://"); - for (auto const &addr: addresses) { + for (auto const &addr : addresses) { std::string::size_type idx = addr.find(substr); - std::string target = idx == std::string::npos ? addr : addr.substr(idx + substr.length()); + std::string target = + idx == std::string::npos ? addr : addr.substr(idx + substr.length()); etcd::detail::dns_resolve(target, stripped_addresses); } stripped_address = boost::algorithm::join(stripped_addresses, ","); @@ -100,370 +110,416 @@ const bool authenticate(std::shared_ptr const &channel, } } -} -} +} // namespace detail +} // namespace etcd -etcd::Client::Client(std::string const & address, - std::string const & load_balancer) -{ +etcd::Client::Client(std::string const &address, + std::string const &load_balancer) { // create channels - std::string const addresses = etcd::detail::strip_and_resolve_addresses(address); + std::string const addresses = + etcd::detail::strip_and_resolve_addresses(address); grpc::ChannelArguments grpc_args; grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); - std::shared_ptr creds = grpc::InsecureChannelCredentials(); + std::shared_ptr creds = + grpc::InsecureChannelCredentials(); grpc_args.SetLoadBalancingPolicyName(load_balancer); this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); // create stubs kvServiceStub = KV::NewStub(this->channel); - watchServiceStub= Watch::NewStub(this->channel); - leaseServiceStub= Lease::NewStub(this->channel); + watchServiceStub = Watch::NewStub(this->channel); + leaseServiceStub = Lease::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel); } -etcd::Client::Client(std::string const & address, - std::string const & username, - std::string const & password, - std::string const & load_balancer) -{ +etcd::Client::Client(std::string const &address, std::string const &username, + std::string const &password, + std::string const &load_balancer) { // create channels - std::string const addresses = etcd::detail::strip_and_resolve_addresses(address); + std::string const addresses = + etcd::detail::strip_and_resolve_addresses(address); grpc::ChannelArguments grpc_args; grpc_args.SetMaxSendMessageSize(std::numeric_limits::max()); grpc_args.SetMaxReceiveMessageSize(std::numeric_limits::max()); - std::shared_ptr creds = grpc::InsecureChannelCredentials(); + std::shared_ptr creds = + grpc::InsecureChannelCredentials(); grpc_args.SetLoadBalancingPolicyName(load_balancer); this->channel = grpc::CreateCustomChannel(addresses, creds, grpc_args); // auth std::string token_or_message; - if (!etcd::detail::authenticate(this->channel, username, password, token_or_message)) { - throw std::invalid_argument("Etcd authentication failed: " + token_or_message); + if (!etcd::detail::authenticate(this->channel, username, password, + token_or_message)) { + throw std::invalid_argument("Etcd authentication failed: " + + token_or_message); } this->auth_token = token_or_message; // setup stubs kvServiceStub = KV::NewStub(this->channel); - watchServiceStub= Watch::NewStub(this->channel); - leaseServiceStub= Lease::NewStub(this->channel); + watchServiceStub = Watch::NewStub(this->channel); + leaseServiceStub = Lease::NewStub(this->channel); lockServiceStub = Lock::NewStub(this->channel); } -pplx::task etcd::Client::get(std::string const & key) -{ +pplx::task etcd::Client::get(std::string const &key) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = false; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncGetAction(params)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncGetAction(params)); return Response::create(call); } -pplx::task etcd::Client::set(std::string const & key, std::string const & value, int ttl) -{ +pplx::task +etcd::Client::set(std::string const &key, std::string const &value, int ttl) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = kvServiceStub.get(); - if(ttl > 0) - { + if (ttl > 0) { auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); + if (!res.is_ok()) { + return pplx::task([res]() { + return etcd::Response(res.error_code(), res.error_message().c_str()); }); - } - else - { + } else { params.lease_id = res.value().lease(); } } - std::shared_ptr call(new etcdv3::AsyncSetAction(params)); + std::shared_ptr call( + new etcdv3::AsyncSetAction(params)); return Response::create(call); } -pplx::task etcd::Client::set(std::string const & key, std::string const & value, int64_t leaseid) -{ +pplx::task etcd::Client::set(std::string const &key, + std::string const &value, + int64_t leaseid) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncSetAction(params)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncSetAction(params)); return Response::create(call); } - -pplx::task etcd::Client::add(std::string const & key, std::string const & value, int ttl) -{ +pplx::task +etcd::Client::add(std::string const &key, std::string const &value, int ttl) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = kvServiceStub.get(); - if(ttl > 0) - { + if (ttl > 0) { auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); + if (!res.is_ok()) { + return pplx::task([res]() { + return etcd::Response(res.error_code(), res.error_message().c_str()); }); - } - else - { + } else { params.lease_id = res.value().lease(); } } - std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); + std::shared_ptr call( + new etcdv3::AsyncSetAction(params, true)); return Response::create(call); } -pplx::task etcd::Client::add(std::string const & key, std::string const & value, int64_t leaseid) -{ +pplx::task etcd::Client::add(std::string const &key, + std::string const &value, + int64_t leaseid) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncSetAction(params,true)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncSetAction(params, true)); return Response::create(call); } - -pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int ttl) -{ +pplx::task etcd::Client::modify(std::string const &key, + std::string const &value, + int ttl) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = kvServiceStub.get(); - if(ttl > 0) - { + if (ttl > 0) { auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); + if (!res.is_ok()) { + return pplx::task([res]() { + return etcd::Response(res.error_code(), res.error_message().c_str()); }); - } - else - { + } else { params.lease_id = res.value().lease(); } } - std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); + std::shared_ptr call( + new etcdv3::AsyncUpdateAction(params)); return Response::create(call); } -pplx::task etcd::Client::modify(std::string const & key, std::string const & value, int64_t leaseid) -{ +pplx::task etcd::Client::modify(std::string const &key, + std::string const &value, + int64_t leaseid) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncUpdateAction(params)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncUpdateAction(params)); return Response::create(call); } - -pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int ttl) -{ +pplx::task etcd::Client::modify_if(std::string const &key, + std::string const &value, + std::string const &old_value, + int ttl) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.old_value.assign(old_value); - params.kv_stub = kvServiceStub .get(); + params.kv_stub = kvServiceStub.get(); - if(ttl > 0) - { + if (ttl > 0) { auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); + if (!res.is_ok()) { + return pplx::task([res]() { + return etcd::Response(res.error_code(), res.error_message().c_str()); }); - } - else - { + } else { params.lease_id = res.value().lease(); } } - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); + std::shared_ptr call( + new etcdv3::AsyncCompareAndSwapAction( + params, etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } -pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, std::string const & old_value, int64_t leaseid) -{ +pplx::task etcd::Client::modify_if(std::string const &key, + std::string const &value, + std::string const &old_value, + int64_t leaseid) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.old_value.assign(old_value); params.lease_id = leaseid; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncCompareAndSwapAction( + params, etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } -pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int ttl) -{ +pplx::task etcd::Client::modify_if(std::string const &key, + std::string const &value, + int old_index, int ttl) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.old_revision = old_index; - params.kv_stub = kvServiceStub .get(); - if(ttl > 0) - { + params.kv_stub = kvServiceStub.get(); + if (ttl > 0) { auto res = leasegrant(ttl).get(); - if(!res.is_ok()) - { - return pplx::task([res]() - { - return etcd::Response(res.error_code(), res.error_message().c_str()); + if (!res.is_ok()) { + return pplx::task([res]() { + return etcd::Response(res.error_code(), res.error_message().c_str()); }); - } - else - { + } else { params.lease_id = res.value().lease(); } } - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); + std::shared_ptr call( + new etcdv3::AsyncCompareAndSwapAction( + params, etcdv3::Atomicity_Type::PREV_INDEX)); return Response::create(call); } -pplx::task etcd::Client::modify_if(std::string const & key, std::string const & value, int old_index, int64_t leaseid) -{ +pplx::task etcd::Client::modify_if(std::string const &key, + std::string const &value, + int old_index, + int64_t leaseid) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.value.assign(value); params.lease_id = leaseid; params.old_revision = old_index; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncCompareAndSwapAction(params,etcdv3::Atomicity_Type::PREV_INDEX)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncCompareAndSwapAction( + params, etcdv3::Atomicity_Type::PREV_INDEX)); return Response::create(call); } - -pplx::task etcd::Client::rm(std::string const & key) -{ +pplx::task etcd::Client::rm(std::string const &key) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = false; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } - -pplx::task etcd::Client::rm_if(std::string const & key, std::string const & old_value) -{ +pplx::task etcd::Client::rm_if(std::string const &key, + std::string const &old_value) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.old_value.assign(old_value); - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params,etcdv3::Atomicity_Type::PREV_VALUE)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncCompareAndDeleteAction( + params, etcdv3::Atomicity_Type::PREV_VALUE)); return Response::create(call); } -pplx::task etcd::Client::rm_if(std::string const & key, int old_index) -{ +pplx::task etcd::Client::rm_if(std::string const &key, + int old_index) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.old_revision = old_index; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncCompareAndDeleteAction(params, etcdv3::Atomicity_Type::PREV_INDEX));; + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncCompareAndDeleteAction( + params, etcdv3::Atomicity_Type::PREV_INDEX)); + ; return Response::create(call); - } -pplx::task etcd::Client::rmdir(std::string const & key, bool recursive) -{ +pplx::task etcd::Client::rmdir(std::string const &key, + bool recursive) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = recursive; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncDeleteAction(params)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncDeleteAction(params)); return Response::create(call); } -pplx::task etcd::Client::ls(std::string const & key) -{ +pplx::task etcd::Client::ls(std::string const &key) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = true; - params.limit = 0; // default no limit. - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncGetAction(params)); + params.limit = 0; // default no limit. + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncGetAction(params)); return Response::create(call); } -pplx::task etcd::Client::ls(std::string const & key, size_t const limit) -{ +pplx::task etcd::Client::ls(std::string const &key, + size_t const limit) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = true; params.limit = limit; - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncGetAction(params)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncGetAction(params)); return Response::create(call); } -pplx::task etcd::Client::watch(std::string const & key, bool recursive) -{ +pplx::task etcd::Client::watch(std::string const &key, + bool recursive) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = recursive; params.watch_stub = watchServiceStub.get(); - std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); + std::shared_ptr call( + new etcdv3::AsyncWatchAction(params)); return Response::create(call); } -pplx::task etcd::Client::watch(std::string const & key, int fromIndex, bool recursive) -{ +pplx::task etcd::Client::watch(std::string const &key, + int fromIndex, bool recursive) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.key.assign(key); params.withPrefix = recursive; params.revision = fromIndex; params.watch_stub = watchServiceStub.get(); - std::shared_ptr call(new etcdv3::AsyncWatchAction(params)); + std::shared_ptr call( + new etcdv3::AsyncWatchAction(params)); return Response::create(call); } -pplx::task etcd::Client::leasegrant(int ttl) -{ +pplx::task etcd::Client::leasegrant(int ttl) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); params.ttl = ttl; params.lease_stub = leaseServiceStub.get(); - std::shared_ptr call(new etcdv3::AsyncLeaseGrantAction(params)); + std::shared_ptr call( + new etcdv3::AsyncLeaseGrantAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::listleases() { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.lease_stub = leaseServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncLeaseLeasesAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::leasekeepalive(int64_t lease_id) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.lease_id = lease_id; + params.lease_stub = leaseServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncLeaseKeepAliveAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::leaserevoke(int64_t lease_id) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.lease_id = lease_id; + params.lease_stub = leaseServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncLeaseRevokeAction(params)); + return Response::create(call); +} + +pplx::task etcd::Client::leasettl(int64_t lease_id, bool keys) { + etcdv3::ActionParameters params; + params.auth_token.assign(this->auth_token); + params.lease_id = lease_id; + params.keys = keys; + params.lease_stub = leaseServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncLeaseTimeToLiveAction(params)); return Response::create(call); } @@ -472,7 +528,8 @@ pplx::task etcd::Client::lock(std::string const &key) { params.auth_token.assign(this->auth_token); params.key = key; params.lock_stub = lockServiceStub.get(); - std::shared_ptr call(new etcdv3::AsyncLockAction(params)); + std::shared_ptr call( + new etcdv3::AsyncLockAction(params)); return Response::create(call); } @@ -481,14 +538,16 @@ pplx::task etcd::Client::unlock(std::string const &key) { params.auth_token.assign(this->auth_token); params.key = key; params.lock_stub = lockServiceStub.get(); - std::shared_ptr call(new etcdv3::AsyncUnlockAction(params)); + std::shared_ptr call( + new etcdv3::AsyncUnlockAction(params)); return Response::create(call); } pplx::task etcd::Client::txn(etcdv3::Transaction const &txn) { etcdv3::ActionParameters params; params.auth_token.assign(this->auth_token); - params.kv_stub = kvServiceStub .get(); - std::shared_ptr call(new etcdv3::AsyncTxnAction(params, txn)); + params.kv_stub = kvServiceStub.get(); + std::shared_ptr call( + new etcdv3::AsyncTxnAction(params, txn)); return Response::create(call); } diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp new file mode 100644 index 0000000..16ec9cf --- /dev/null +++ b/src/KeepAlive.cpp @@ -0,0 +1,127 @@ +#include "etcd/KeepAlive.hpp" + +#include +#include + +#include "etcd/v3/AsyncLeaseKeepAliveAction.hpp" +#include "etcd/v3/AsyncLeaseRevokeAction.hpp" + +etcd::KeepAlive::KeepAlive(Client& client) : client_(client) { stub_ = etcdserverpb::Lease::NewStub(client.channel); } + +pplx::task etcd::KeepAlive::start(int refresh_in_ms) { + pplx::task_completion_event tce; + + // Start a repetetive timer that will trigger our lease keepalive refresh + timer_ = std::unique_ptr>(new pplx::timer(refresh_in_ms, 0, nullptr, true)); + + // Open the stream + stream_ = stub_->AsyncLeaseKeepAlive(&context_, &cq_, reinterpret_cast(Type::CONNECT)); + + // Define the callback that is going to be repeatedly called to create a queue of lease id's + // that need to be refreshed + auto queueKeepAlivesCallback = new pplx::call([this](int) { + std::cout << " ** Queueing keepalives: " << std::endl; + + // Copy the lease id's to a queue for processing + pplx::concurrent_unordered_map::iterator itr; + for (itr = leases_.begin(); itr != leases_.end(); ++itr) { + leaseQueue_.push(*itr); + } + + // Kick off the process by sending the first keepalive command + // You can have at most one write or at most one read at any given time, + // so we are not sending another keepalive on the stream until the previous one + // responds + sendNextKeepAlive(); + // tce.set(); + }); + + // Create the long running task that will monitor the Completion Queue and + // send/receive data on the stream + // On a CONNECT, this will kick off the timer which then triggers the keepalive's + currentTask_ = pplx::task([this, tce, queueKeepAlivesCallback]() { + while (true) { + void* got_tag; + bool ok = false; + // Block until the next result is available in the completion queue + // "cq". The return value of Next should always be checked. This + // return value tells us whether there is any kind of event or the cq_ + // is shutting down. + std::cout << "Waiting for next event..." << std::endl; + if (!cq_.Next(&got_tag, &ok)) { + std::cerr << "Client stream closed. Quitting" << std::endl; + break; + } + + // It's important to process all tags even if the ok is false. One + // might want to deallocate memory that has be reinterpret_cast'ed to + // void* when the tag got initialized. For our example, we cast an int + // to a void*, so we don't have extra memory management to take care + // of. + if (ok) { + std::cout << std::endl << "**** Processing completion queue tag " << got_tag << std::endl; + + switch (static_cast(reinterpret_cast(got_tag))) { + case Type::READ: + std::cout << "Read a new message, sending next." << std::endl; + sendNextKeepAlive(); + break; + case Type::WRITE: + std::cout << "Sent message (async), attempting to read response." << std::endl; + readNextMessage(); + break; + case Type::CONNECT: + std::cout << "Server connected." << std::endl; + tce.set(); + timer_->link_target(queueKeepAlivesCallback); + timer_->start(); + break; + case Type::WRITES_DONE: + std::cout << "Server disconnecting." << std::endl; + timer_->stop(); + break; + case Type::FINISH: + std::cout << "Client finish; status = " << (finish_status_.ok() ? "ok" : "cancelled") << std::endl; + context_.TryCancel(); + cq_.Shutdown(); + break; + default: + std::cerr << "Unexpected tag " << got_tag << std::endl; + } + } + } + }); + + // Returning a task that completes once the CONNECT completes, so that the + // client can perform anything they need to at that time. + pplx::task start_completed(tce); + return start_completed; +} + +void etcd::KeepAlive::add(int64_t leaseid, int ttl) { leases_.insert(std::make_pair(leaseid, ttl)); } + +void etcd::KeepAlive::remove(int64_t leaseid, bool revoke) { + leases_.unsafe_erase(leaseid); + if (revoke) { + client_.leaserevoke(leaseid); + } +} + +void etcd::KeepAlive::sendNextKeepAlive() { + std::pair lease; + if (leaseQueue_.try_pop(lease)) { + std::cout << "sending keepalive for " << lease.first << std::endl; + LeaseKeepAliveRequest request; + request.set_id(lease.first); + stream_->Write(request, reinterpret_cast(Type::WRITE)); + } else { + std::cout << "no keepalives left" << std::endl; + } +} + +void etcd::KeepAlive::readNextMessage() { + std::cout << " ** Got response: " << response_.ttl() << std::endl; + stream_->Read(&response_, reinterpret_cast(Type::READ)); +} + +etcd::KeepAlive::~KeepAlive() { cq_.Shutdown(); } diff --git a/src/Response.cpp b/src/Response.cpp index 57ac8b8..7e42e23 100644 --- a/src/Response.cpp +++ b/src/Response.cpp @@ -31,6 +31,9 @@ etcd::Response::Response(const etcdv3::V3Response& reply, std::chrono::microseco // duration _duration = duration; + + _leases = reply.get_leases(); + _leaseinfo = reply.get_leaseinfo(); } @@ -118,3 +121,7 @@ std::vector const & etcd::Response::events() const { std::chrono::microseconds const& etcd::Response::duration() const { return this->_duration; } + +std::vector const& etcd::Response::leases() const { return _leases; } + +etcdv3::LeaseInfo const& etcd::Response::leaseinfo() const { return _leaseinfo; } diff --git a/src/v3/AsyncLeaseKeepAliveAction.cpp b/src/v3/AsyncLeaseKeepAliveAction.cpp new file mode 100644 index 0000000..a23c174 --- /dev/null +++ b/src/v3/AsyncLeaseKeepAliveAction.cpp @@ -0,0 +1,69 @@ +#include "etcd/v3/AsyncLeaseKeepAliveAction.hpp" + +#include "etcd/v3/Transaction.hpp" +#include "etcd/v3/action_constants.hpp" + +using etcdserverpb::LeaseKeepAliveRequest; +using etcdserverpb::RangeRequest; +using etcdserverpb::RangeResponse; + +etcdv3::AsyncLeaseKeepAliveAction::AsyncLeaseKeepAliveAction(etcdv3::ActionParameters param) : etcdv3::Action(param) { + char* createTag = "create"; + char* writeTag = "write"; + + stream = parameters.lease_stub->AsyncLeaseKeepAlive(&context, &cq_, (void*)createTag); + + LeaseKeepAliveRequest request; + request.set_id(parameters.lease_id); + + // wait "create" success (the stream becomes ready) + void* got_tag; + bool ok = false; + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)createTag) { + stream->Write(request, (void*)writeTag); + } else { + throw std::runtime_error("failed to create a keepalive connection"); + } + + // wait "write" (LeaseKeepAliveRequest) success, and start to read the first + // reply + if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void*)writeTag) { + stream->Read(&reply, (void*)this); + } else { + throw std::runtime_error("failed to write LeaseKeepAliveRequest to server"); + } +} + +void etcdv3::AsyncLeaseKeepAliveAction::waitForResponse() { + void* got_tag; + bool ok = false; + + while (cq_.Next(&got_tag, &ok)) { + if (ok == false) { + break; + } + if (got_tag == (void*)doneTag) { + cq_.Shutdown(); + break; + } + if (got_tag == (void*)this) // read tag + { + if (reply.ByteSize()) { + stream->WritesDone((void*)doneTag); + } else { + stream->Read(&reply, (void*)this); + } + } + } +} + +etcdv3::AsyncLeaseKeepAliveResponse etcdv3::AsyncLeaseKeepAliveAction::ParseResponse() { + AsyncLeaseKeepAliveResponse resp; + if (!status.ok()) { + resp.set_error_code(status.error_code()); + resp.set_error_message(status.error_message()); + } else { + resp.ParseResponse(reply); + } + return resp; +} diff --git a/src/v3/AsyncLeaseKeepAliveResponse.cpp b/src/v3/AsyncLeaseKeepAliveResponse.cpp new file mode 100644 index 0000000..e8786c2 --- /dev/null +++ b/src/v3/AsyncLeaseKeepAliveResponse.cpp @@ -0,0 +1,13 @@ +#include "etcd/v3/AsyncLeaseKeepAliveResponse.hpp" + +#include + +#include "etcd/v3/action_constants.hpp" + +using namespace std; +void etcdv3::AsyncLeaseKeepAliveResponse::ParseResponse(LeaseKeepAliveResponse& reply) { + index = reply.header().revision(); + value.kvs.set_lease(reply.id()); + value.set_ttl(reply.ttl()); + return; +} diff --git a/src/v3/AsyncLeaseLeasesAction.cpp b/src/v3/AsyncLeaseLeasesAction.cpp new file mode 100644 index 0000000..7705b98 --- /dev/null +++ b/src/v3/AsyncLeaseLeasesAction.cpp @@ -0,0 +1,26 @@ +#include "etcd/v3/AsyncLeaseLeasesAction.hpp" + +#include "etcd/v3/Transaction.hpp" +#include "etcd/v3/action_constants.hpp" + + +using etcdserverpb::LeaseLeasesRequest; + +etcdv3::AsyncLeaseLeasesAction::AsyncLeaseLeasesAction(etcdv3::ActionParameters param) : etcdv3::Action(param) { + etcdv3::Transaction transaction; + transaction.setup_lease_leases_operation(); + + response_reader = parameters.lease_stub->AsyncLeaseLeases(&context, transaction.leaseleases_request, &cq_); + response_reader->Finish(&reply, &status, (void*)this); +} + +etcdv3::AsyncLeaseLeasesResponse etcdv3::AsyncLeaseLeasesAction::ParseResponse() { + AsyncLeaseLeasesResponse lease_resp; + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} diff --git a/src/v3/AsyncLeaseLeasesResponse.cpp b/src/v3/AsyncLeaseLeasesResponse.cpp new file mode 100644 index 0000000..424c36a --- /dev/null +++ b/src/v3/AsyncLeaseLeasesResponse.cpp @@ -0,0 +1,11 @@ +#include "etcd/v3/AsyncLeaseLeasesResponse.hpp" +#include "etcd/v3/action_constants.hpp" + +void etcdv3::AsyncLeaseLeasesResponse::ParseResponse( + LeaseLeasesResponse &resp) { + index = resp.header().revision(); + + for (int index = 0; index < resp.leases_size(); index++) { + leases.push_back(resp.leases(index).id()); + } +} diff --git a/src/v3/AsyncLeaseRevokeAction.cpp b/src/v3/AsyncLeaseRevokeAction.cpp new file mode 100644 index 0000000..5e4de24 --- /dev/null +++ b/src/v3/AsyncLeaseRevokeAction.cpp @@ -0,0 +1,28 @@ +#include "etcd/v3/AsyncLeaseRevokeAction.hpp" +#include "etcd/v3/Transaction.hpp" +#include "etcd/v3/action_constants.hpp" + +using etcdserverpb::LeaseRevokeRequest; + +etcdv3::AsyncLeaseRevokeAction::AsyncLeaseRevokeAction( + etcdv3::ActionParameters param) + : etcdv3::Action(param) { + etcdv3::Transaction transaction; + transaction.setup_lease_revoke_operation(parameters.lease_id); + + response_reader = parameters.lease_stub->AsyncLeaseRevoke( + &context, transaction.leaserevoke_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncLeaseRevokeResponse +etcdv3::AsyncLeaseRevokeAction::ParseResponse() { + AsyncLeaseRevokeResponse lease_resp; + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} diff --git a/src/v3/AsyncLeaseRevokeResponse.cpp b/src/v3/AsyncLeaseRevokeResponse.cpp new file mode 100644 index 0000000..81e598d --- /dev/null +++ b/src/v3/AsyncLeaseRevokeResponse.cpp @@ -0,0 +1,7 @@ +#include "etcd/v3/AsyncLeaseRevokeResponse.hpp" +#include "etcd/v3/action_constants.hpp" + +void etcdv3::AsyncLeaseRevokeResponse::ParseResponse( + LeaseRevokeResponse &resp) { + index = resp.header().revision(); +} diff --git a/src/v3/AsyncLeaseTimeToLiveAction.cpp b/src/v3/AsyncLeaseTimeToLiveAction.cpp new file mode 100644 index 0000000..fc69c10 --- /dev/null +++ b/src/v3/AsyncLeaseTimeToLiveAction.cpp @@ -0,0 +1,29 @@ +#include "etcd/v3/AsyncLeaseTimeToLiveAction.hpp" +#include "etcd/v3/Transaction.hpp" +#include "etcd/v3/action_constants.hpp" + +using etcdserverpb::LeaseTimeToLiveRequest; + +etcdv3::AsyncLeaseTimeToLiveAction::AsyncLeaseTimeToLiveAction( + etcdv3::ActionParameters param) + : etcdv3::Action(param) { + etcdv3::Transaction transaction; + transaction.setup_lease_timetolive_operation(parameters.lease_id, + parameters.keys); + + response_reader = parameters.lease_stub->AsyncLeaseTimeToLive( + &context, transaction.leasetimetolive_request, &cq_); + response_reader->Finish(&reply, &status, (void *)this); +} + +etcdv3::AsyncLeaseTimeToLiveResponse +etcdv3::AsyncLeaseTimeToLiveAction::ParseResponse() { + AsyncLeaseTimeToLiveResponse lease_resp; + if (!status.ok()) { + lease_resp.set_error_code(status.error_code()); + lease_resp.set_error_message(status.error_message()); + } else { + lease_resp.ParseResponse(reply); + } + return lease_resp; +} diff --git a/src/v3/AsyncLeaseTimeToLiveResponse.cpp b/src/v3/AsyncLeaseTimeToLiveResponse.cpp new file mode 100644 index 0000000..b8479a9 --- /dev/null +++ b/src/v3/AsyncLeaseTimeToLiveResponse.cpp @@ -0,0 +1,15 @@ +#include "etcd/v3/AsyncLeaseTimeToLiveResponse.hpp" +#include "etcd/v3/action_constants.hpp" + +void etcdv3::AsyncLeaseTimeToLiveResponse::ParseResponse( + LeaseTimeToLiveResponse &resp) { + index = resp.header().revision(); + + leaseinfo.set_lease(resp.id()); + leaseinfo.set_ttl(resp.ttl()); + leaseinfo.set_grantedttl(resp.grantedttl()); + + for (int index = 0; index < resp.keys_size(); index++) { + leaseinfo.add_key(resp.keys(index)); + } +} diff --git a/src/v3/LeaseInfo.cpp b/src/v3/LeaseInfo.cpp new file mode 100644 index 0000000..80b550b --- /dev/null +++ b/src/v3/LeaseInfo.cpp @@ -0,0 +1,18 @@ +#include "etcd/v3/LeaseInfo.hpp" + +etcdv3::LeaseInfo::LeaseInfo() { + leaseid_ = 0; + ttl_ = 0; + grantedttl_ = 0; + keys_.clear(); +} + +void etcdv3::LeaseInfo::set_lease(int64_t id) { leaseid_ = id; } +void etcdv3::LeaseInfo::set_ttl(int ttl) { ttl_ = ttl; } +void etcdv3::LeaseInfo::set_grantedttl(int ttl) { grantedttl_ = ttl; } +void etcdv3::LeaseInfo::add_key(std::string key) { keys_.push_back(key); } + +int64_t etcdv3::LeaseInfo::get_lease() const { return leaseid_; } +int etcdv3::LeaseInfo::get_ttl() const { return ttl_; } +int etcdv3::LeaseInfo::get_grantedttl() const { return grantedttl_; } +std::vector etcdv3::LeaseInfo::get_keys() const { return keys_; } diff --git a/src/v3/Transaction.cpp b/src/v3/Transaction.cpp index d1b32db..e078082 100644 --- a/src/v3/Transaction.cpp +++ b/src/v3/Transaction.cpp @@ -160,6 +160,24 @@ void etcdv3::Transaction::setup_lease_grant_operation(int ttl) leasegrant_request.set_ttl(ttl); } +void etcdv3::Transaction::setup_lease_leases_operation() { + // leaseleases_request.set_id(lease_id); +} + +void etcdv3::Transaction::setup_lease_keepalive_operation(int64_t lease_id) { + leasekeepalive_request.set_id(lease_id); +} + +void etcdv3::Transaction::setup_lease_revoke_operation(int64_t lease_id) { + leaserevoke_request.set_id(lease_id); +} + +void etcdv3::Transaction::setup_lease_timetolive_operation(int64_t lease_id, + bool keys) { + leasetimetolive_request.set_id(lease_id); + leasetimetolive_request.set_keys(keys); +} + void etcdv3::Transaction::setup_put(std::string const &key, std::string const &value) { std::unique_ptr put_request(new PutRequest()); put_request->set_key(key); diff --git a/src/v3/V3Response.cpp b/src/v3/V3Response.cpp index 3e76730..94e6507 100644 --- a/src/v3/V3Response.cpp +++ b/src/v3/V3Response.cpp @@ -46,6 +46,10 @@ std::vector const & etcdv3::V3Response::get_prev_values() cons return prev_values; } +std::vector const& etcdv3::V3Response::get_leases() const { + return leases; +} + etcdv3::KeyValue const & etcdv3::V3Response::get_value() const { return value; @@ -56,11 +60,19 @@ etcdv3::KeyValue const & etcdv3::V3Response::get_prev_value() const return prev_value; } +etcdv3::LeaseInfo const& etcdv3::V3Response::get_leaseinfo() const { + return leaseinfo; +} + bool etcdv3::V3Response::has_values() const { return values.size() > 0; } +bool etcdv3::V3Response::has_leases() const { + return leases.size() > 0; +} + void etcdv3::V3Response::set_lock_key(std::string const &key) { this->lock_key = key; }