diff --git a/README.md b/README.md index 2a8a45e..a7bc4f6 100644 --- a/README.md +++ b/README.md @@ -423,27 +423,34 @@ send a periodic refresh to the server. This can be done using a one time command called `leasekeepalive(lease_id)`. However, that is inefficient since it creates and destroys a bidirectional stream on each call. Instead, it is recommend to use the `etcd::KeepAlive` service, which is a long running task that will periodically -refresh the leases that you specify. You can create multiple `etcd::KeepAlive` instances -if you need to have different refresh intervals for different leases. +refresh the leases that you specify. You can add multiple leases with different +refresh intervals to the KeepAlive service, and it will start up timers for each. +You should set the refresh interval to something less than your lease TTL, so that +any small delays in processing do not cause your lease to expire. The first refresh +request is sent immediately after adding the lease to the service, and then at +each `refresh_interval` thereafter. ```c++ etcd::Client etcd("http://127.0.0.1:4001"); - etcd::KeepAlive keepalive{etcd}; // Creates the KeepAlive service, but does not start it - keepalive.start(2000); // Starts the service with a 2s refresh interval. + { + etcd::KeepAlive keepalive{etcd}; // Creates the KeepAlive service - etcd::Response resp = etcd.leasegrant(5).get(); // create a lease - int64_t lease_id = resp.value().lease(); - etcd.set("/test/key2", "bar", lease_id); // attach lease to the key - std::cout << "ttl" << resp.value().ttl() << std::endl; - - // Tell the KeepAlive service to refresh this lease every 2 seconds - keepalive.add(resp.value().lease()); - - // ... 
+ etcd::Response resp = etcd.leasegrant(5).get(); // create a lease + int64_t lease_id = resp.value().lease(); + etcd.set("/test/key2", "bar", lease_id); // attach lease to the key + std::cout << "ttl" << resp.value().ttl() << std::endl; + + // Tell the KeepAlive service to refresh this lease every 1.5 seconds + keepalive.add(resp.value().lease(), std::chrono::milliseconds(1500)); + + // ... - // Remove the lease from the KeepAlive, and also immediately revoke it - keepalive.remove(lease_id, true); - // '/test/key2' is now deleted + // Remove the lease from the KeepAlive, and also immediately revoke it + keepalive.remove(lease_id, true); + // '/test/key2' is now deleted + } + // When the KeepAlive service object goes out of scope and is deleted, + // all leases are removed but are not immediately revoked. ``` ### TODO diff --git a/etcd/KeepAlive.hpp b/etcd/KeepAlive.hpp index e3daf5f..64afb95 100644 --- a/etcd/KeepAlive.hpp +++ b/etcd/KeepAlive.hpp @@ -1,9 +1,14 @@ #pragma once -#include -#include #include +#include +#include +#include +#include +#include +#include +#include #include #include "etcd/Client.hpp" @@ -18,34 +23,71 @@ using etcdserverpb::KV; using grpc::Channel; namespace etcd { -class KeepAlive { - enum class Type { READ = 1, WRITE = 2, CONNECT = 3, WRITES_DONE = 4, FINISH = 5 }; +/** + * Wrapper for a boost::asio timer that provides simple start/stop/repeat + * functions + */ +class KeepAliveTimer { +public: + typedef boost::function + handler_function; - public: + KeepAliveTimer(boost::asio::io_context &io, + boost::asio::chrono::milliseconds interval) + : interval_(interval), timer_(boost::asio::steady_timer(io, interval)) {} + ~KeepAliveTimer() { timer_.cancel(); } + + void setCallback(handler_function handler) { handler_ = handler; } + + void start() { + timer_.expires_from_now(boost::asio::chrono::milliseconds( + 50) /* expire immediately; set to interval_ otherwise*/); + timer_.async_wait(boost::bind(handler_, boost::asio::placeholders::error, + 
this)); // boost::ref(*this))); + } + + void repeat() { + timer_.expires_at(timer_.expiry() + interval_); + timer_.async_wait(boost::bind(handler_, boost::asio::placeholders::error, + this)); // boost::ref(*this))); + } + + void stop() { timer_.cancel(); } + +private: + boost::asio::steady_timer timer_; + boost::asio::chrono::milliseconds interval_; + handler_function handler_; +}; + +/** + * The main KeepAlive service class. You only need one per process since it can + * service many leases with different refresh intervals + */ +class KeepAlive { + enum class Type { + READ = 1, + WRITE = 2, + CONNECT = 3, + WRITES_DONE = 4, + FINISH = 5 + }; + +public: /** * Create an instance of the KeepAlive service - * Call start() to being the timer that automatically sends keepalives - * Call add(leaseid) to add a leaseid to be kept alive by this service + * Call add(leaseid, refresh_interval) to add a leaseid to be kept alive by + * this service * @param client the client etcd connection to use */ - KeepAlive(Client& client); - - /** - * Start processing keepalive's - * The refresh time must be less than the granted TTL of your - * leases, otherwise they will expire. 
Create multiple KeepAlive - * services with different refreshes if you have many leases with varied - * TTL's - * @param refresh_in_ms the time between refreshes (default: 5000ms) - */ - pplx::task start(int refresh_in_ms = 5000); + KeepAlive(Client &client); /** * Add a lease to be kept alive by this service * @param leaseid the id of the lease - * @param ttl the ttl of the lease (in seconds) + * @param refresh_interval the interval to refresh the lease */ - void add(int64_t leaseid, int ttl = 5); + void add(int64_t leaseid, boost::asio::chrono::milliseconds refresh_interval); /** * Remove a lease from being kept alive, and optionally revoke it immediately @@ -54,32 +96,61 @@ class KeepAlive { */ void remove(int64_t leaseid, bool revoke = false); + /** + * Remove all leases that the service is currently refreshing. + * @param revoke true to immediately revoke the leases (defaults to false) + */ + void removeAll(bool revoke = false); + ~KeepAlive(); - private: +private: + KeepAlive(KeepAlive const &rhs); // prevent copying + KeepAlive &operator=(KeepAlive const &rhs); // prevent assignment + + /** + * Moves the registered leases to a processing queue, and then resets the + * timer and triggers the first lease renewal request + */ + void queueKeepAlivesCallback(const boost::system::error_code &error, + KeepAliveTimer *timer); + /** * Sends a keep alive for the next lease in the queue */ - void sendNextKeepAlive(); + void sendNextKeepAlive(std::unique_ptr> &stream); + /** - * + * Read the response off the stream */ - void readNextMessage(); + void readNextMessage(std::unique_ptr> &stream); - // The timer that is triggering refreshes - std::unique_ptr> timer_; + // Boost io_context, which is responsible for doing the timer work + boost::asio::io_context io_; + // Thread for the io_context to use + std::unique_ptr t_; - // The map of leases that have been registered with this service to be kept alive - pplx::concurrent_unordered_map leases_; - - // The current queue of 
leases that still need to be refreshed on this pass of the timer - pplx::concurrent_queue> leaseQueue_; + // A mutex to use to protect all of our lease maps/queues when manipulating + // them + std::mutex mutex_; + // The map of leases that have been registered with this service to be kept + // alive + std::map> leases_; + // All of the timers that are currently running + std::map> + timers_; + // The current queue of keepalives that need to be sent + std::queue keepalive_queue_; // The long running task for this service - pplx::task currentTask_; + pplx::task current_task_; // The client that we are attached to. - Client& client_; + Client &client_; // Context for the client. It could be used to convey extra information to // the server and/or tweak certain RPC behaviors. @@ -95,7 +166,8 @@ class KeepAlive { // The bidirectional, asynchronous stream for sending/receiving messages. std::unique_ptr< - grpc::ClientAsyncReaderWriter> + grpc::ClientAsyncReaderWriter> stream_; // Allocated protobuf that holds the response. In real clients and servers, @@ -107,4 +179,4 @@ class KeepAlive { // Finish status when the client is done with the stream. 
grpc::Status finish_status_ = grpc::Status::OK; }; -} // namespace etcd +} // namespace etcd diff --git a/etcd/v3/AsyncLeaseKeepAliveAction.hpp b/etcd/v3/AsyncLeaseKeepAliveAction.hpp index 5e2ebfe..7458ad0 100644 --- a/etcd/v3/AsyncLeaseKeepAliveAction.hpp +++ b/etcd/v3/AsyncLeaseKeepAliveAction.hpp @@ -5,7 +5,7 @@ #include "etcd/Response.hpp" #include "etcd/v3/Action.hpp" -#include "etcd/v3/AsyncleaseKeepAliveResponse.hpp" +#include "etcd/v3/AsyncLeaseKeepAliveResponse.hpp" #include "proto/rpc.grpc.pb.h" using etcd::Response; diff --git a/src/KeepAlive.cpp b/src/KeepAlive.cpp index 7bda8d2..e89bf03 100644 --- a/src/KeepAlive.cpp +++ b/src/KeepAlive.cpp @@ -1,55 +1,46 @@ +/** + * file: KeepAlive.cpp + * credit to https://github.com/grpc/grpc/pull/8934 for an example on using grpc + * bidi in c++ + */ #include "etcd/KeepAlive.hpp" -#include -#include - #include "etcd/v3/AsyncLeaseKeepAliveAction.hpp" #include "etcd/v3/AsyncLeaseRevokeAction.hpp" -etcd::KeepAlive::KeepAlive(Client& client) : client_(client) { stub_ = etcdserverpb::Lease::NewStub(client.channel); } +etcd::KeepAlive::KeepAlive(Client &client) + : client_(client) { // , strand_(boost::asio::make_strand(io_)) { + stub_ = etcdserverpb::Lease::NewStub(client.channel); -pplx::task etcd::KeepAlive::start(int refresh_in_ms) { - pplx::task_completion_event tce; + // Open the grpc bidi stream + stream_ = stub_->AsyncLeaseKeepAlive(&context_, &cq_, + reinterpret_cast(Type::CONNECT)); - // Start a repetetive timer that will trigger our lease keepalive refresh - timer_ = std::unique_ptr>(new pplx::timer(refresh_in_ms, 0, nullptr, true)); + // Start up the long-running task that will manage the stream + current_task_ = pplx::task([this]() { + // Keep the io context running even without any work (otherwise it exits + // immediately before any timers are started) + boost::asio::executor_work_guard work{ + io_.get_executor()}; + // spin up a thread that runs the boost io_context for our timer + t_ = std::unique_ptr( 
+ new boost::thread(boost::bind(&boost::asio::io_context::run, &io_))); - // Open the stream - stream_ = stub_->AsyncLeaseKeepAlive(&context_, &cq_, reinterpret_cast(Type::CONNECT)); - - // Define the callback that is going to be repeatedly called to create a queue of lease id's - // that need to be refreshed - auto queueKeepAlivesCallback = new pplx::call([this](int) { - // std::cout << " ** Queueing keepalives: " << std::endl; - - // Copy the lease id's to a queue for processing - pplx::concurrent_unordered_map::iterator itr; - for (itr = leases_.begin(); itr != leases_.end(); ++itr) { - leaseQueue_.push(*itr); - } - - // Kick off the process by sending the first keepalive command - // You can have at most one write or at most one read at any given time, - // so we are not sending another keepalive on the stream until the previous one - // responds - sendNextKeepAlive(); - // tce.set(); - }); - - // Create the long running task that will monitor the Completion Queue and - // send/receive data on the stream - // On a CONNECT, this will kick off the timer which then triggers the keepalive's - currentTask_ = pplx::task([this, tce, queueKeepAlivesCallback]() { - while (true) { - void* got_tag; + bool shutdown = false; + while (!shutdown) { + void *got_tag; bool ok = false; - // Block until the next result is available in the completion queue - // "cq". The return value of Next should always be checked. This - // return value tells us whether there is any kind of event or the cq_ - // is shutting down. - // std::cout << "Waiting for next event..." << std::endl; +// Block until the next result is available in the completion queue +// "cq". The return value of Next should always be checked. This +// return value tells us whether there is any kind of event or the cq_ +// is shutting down. +#if DEBUG + std::cout << "Waiting for next event..." << std::endl; +#endif if (!cq_.Next(&got_tag, &ok)) { - // std::cerr << "Client stream closed. 
Quitting" << std::endl; +#if DEBUG + std::cerr << "Client stream closed. Quitting" << std::endl; +#endif break; } @@ -59,69 +50,237 @@ pplx::task etcd::KeepAlive::start(int refresh_in_ms) { // to a void*, so we don't have extra memory management to take care // of. if (ok) { - // std::cout << std::endl << "**** Processing completion queue tag " << got_tag << std::endl; - +#if DEBUG + std::cout << std::endl + << "**** Processing completion queue tag " << got_tag + << std::endl; +#endif switch (static_cast(reinterpret_cast(got_tag))) { - case Type::READ: - // std::cout << "Read a new message, sending next." << std::endl; - sendNextKeepAlive(); - break; - case Type::WRITE: - // std::cout << "Sent message (async), attempting to read response." << std::endl; - readNextMessage(); - break; - case Type::CONNECT: - // std::cout << "Server connected." << std::endl; - tce.set(); - timer_->link_target(queueKeepAlivesCallback); - timer_->start(); - break; - case Type::WRITES_DONE: - // std::cout << "Server disconnecting." << std::endl; - timer_->stop(); - break; - case Type::FINISH: - // std::cout << "Client finish; status = " << (finish_status_.ok() ? "ok" : "cancelled") << std::endl; - context_.TryCancel(); - cq_.Shutdown(); - break; - default: - // std::cerr << "Unexpected tag " << got_tag << std::endl; + case Type::READ: +#if DEBUG + std::cout << "Read a new message, sending next." << std::endl; +#endif + sendNextKeepAlive(stream_); + break; + case Type::WRITE: +#if DEBUG + std::cout << "Sent message (async), attempting to read response." + << std::endl; +#endif + readNextMessage(stream_); + break; + case Type::CONNECT: +#if DEBUG + std::cout << "Server connected." << std::endl; +#endif + break; + case Type::WRITES_DONE: +#if DEBUG + std::cout << "Server disconnecting." << std::endl; +#endif + stream_->Finish(&finish_status_, + reinterpret_cast(Type::FINISH)); + break; + case Type::FINISH: +#if DEBUG + std::cout << "Client finish; status = " + << (finish_status_.ok() ? 
"ok" : "cancelled") << std::endl; +#endif + context_.TryCancel(); + cq_.Shutdown(); + io_.stop(); +#if DEBUG + std::cout << "Cleanup complete." << std::endl; +#endif + shutdown = true; + break; + default: +#if DEBUG + std::cerr << "Unexpected tag " << got_tag << std::endl; +#endif + break; } } } }); - - // Returning a task that completes once the CONNECT completes, so that the - // client can perform anything they need to at that time. - pplx::task start_completed(tce); - return start_completed; } -void etcd::KeepAlive::add(int64_t leaseid, int ttl) { leases_.insert(std::make_pair(leaseid, ttl)); } +void etcd::KeepAlive::queueKeepAlivesCallback( + const boost::system::error_code &error, KeepAliveTimer *timer) { +#if DEBUG + std::time_t time = + std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + std::cout << " ** Queueing keepalives: " << std::ctime(&time); +#endif + + if (error == boost::asio::error::operation_aborted) { +#if DEBUG + std::cout << " ** Aborting" << std::endl; +#endif + return; + } + + // Lock the mutex and copy the lease id's to a queue for processing + { + std::lock_guard lg(mutex_); + std::map>::iterator itr; + for (itr = leases_.begin(); itr != leases_.end(); ++itr) { + if (itr->second.get() == timer) { +#if DEBUG + std::cout << " ** Queueing keepalive lease: " << itr->first + << std::endl; +#endif + keepalive_queue_.push(itr->first); + } + } + } + + // Trigger the timer to repeat + timer->repeat(); + + // Kick off the queue processing by sending the first keepalive command + // You can have at most one write or at most one read at any given time, + // so we are not sending another keepalive on the stream until the previous + // one responds + sendNextKeepAlive(stream_); +} + +void etcd::KeepAlive::add(int64_t leaseid, + boost::asio::chrono::milliseconds refresh_interval) { + std::lock_guard lg(mutex_); + + // See if this lease is already being handled + auto leases_search = leases_.find(leaseid); + if (leases_search != 
leases_.end()) { + // Lease already exists + return; + } + + // Look to see if we already have a timer with this interval + auto search = timers_.find(refresh_interval); + if (search != timers_.end()) { + // Existing timer, add this lease to the timer + // std::cout << "Found timer with refresh interval: " << + // search->first.count() << std::endl; + leases_.insert(std::make_pair(leaseid, search->second)); + + } else { + // No existing timer, create one and add this lease + // std::cout << "No timer found with refresh interval: " << + // refresh_interval.count() << std::endl; + auto timer = std::make_shared(io_, refresh_interval); + timer->setCallback(boost::bind(&KeepAlive::queueKeepAlivesCallback, this, + boost::asio::placeholders::error, + timer.get())); + timer->start(); + timers_.insert(std::make_pair(refresh_interval, timer)); + leases_.insert(std::make_pair(leaseid, timer)); + } +} void etcd::KeepAlive::remove(int64_t leaseid, bool revoke) { - leases_.unsafe_erase(leaseid); + { + std::lock_guard lg(mutex_); + + // Search for the leaseid + auto lease = leases_.find(leaseid); + if (lease != leases_.end()) { + // Lease exists, let's remove it + // Check if the timer has any other leases attached or if it needs to be + // removed + bool delete_timer = true; + for (auto timer_search = leases_.begin(); timer_search != leases_.end(); + ++timer_search) { + if (timer_search->second == lease->second && + timer_search->first != lease->first) { + // Found another lease using this timer, so don't delete the timer + delete_timer = false; + break; + } + } + // If we need to delete the timer... + if (delete_timer) { + // First make sure to stop the timer. 
+ lease->second->stop(); + // Find the timer in our timers_ map + for (auto timer_search = timers_.begin(); timer_search != timers_.end(); + ++timer_search) { + if (timer_search->second == lease->second) { + timers_.erase(timer_search); + break; + } + } + } + // Finally, we can erase the lease entry + leases_.erase(leaseid); + } // else = no record of the lease + } + if (revoke) { client_.leaserevoke(leaseid); } } -void etcd::KeepAlive::sendNextKeepAlive() { - std::pair lease; - if (leaseQueue_.try_pop(lease)) { - // std::cout << "sending keepalive for " << lease.first << std::endl; - LeaseKeepAliveRequest request; - request.set_id(lease.first); - stream_->Write(request, reinterpret_cast(Type::WRITE)); - } else { - // std::cout << "no keepalives left" << std::endl; +void etcd::KeepAlive::sendNextKeepAlive( + std::unique_ptr< + grpc::ClientAsyncReaderWriter> + &stream) { + LeaseKeepAliveRequest request; + + { + std::lock_guard lg(mutex_); + if (!keepalive_queue_.empty()) { + int64_t leaseid = keepalive_queue_.front(); + keepalive_queue_.pop(); +#if DEBUG + std::cout << "sending keepalive for " << leaseid << std::endl; +#endif + request.set_id(leaseid); + } else { + // std::cout << "no keepalives left" << std::endl; + return; + } } + + stream->Write(request, reinterpret_cast(Type::WRITE)); } -void etcd::KeepAlive::readNextMessage() { +void etcd::KeepAlive::readNextMessage( + std::unique_ptr< + grpc::ClientAsyncReaderWriter> + &stream) { // std::cout << " ** Got response: " << response_.ttl() << std::endl; - stream_->Read(&response_, reinterpret_cast(Type::READ)); + stream->Read(&response_, reinterpret_cast(Type::READ)); } -etcd::KeepAlive::~KeepAlive() { cq_.Shutdown(); } +void etcd::KeepAlive::removeAll(bool revoke) { + { + // empty the queue + std::lock_guard lg(mutex_); + std::queue empty; + std::swap(keepalive_queue_, empty); + } + + if (revoke) { + // revoke each lease + for (auto lease : leases_) { + client_.leaserevoke(lease.first); + } + } + // Clear the 
maps, which will destroy the timers as the shared_ptr's go out of + // scope + std::lock_guard lg(mutex_); + leases_.clear(); + timers_.clear(); +} + +etcd::KeepAlive::~KeepAlive() { + removeAll(); + stream_->WritesDone(reinterpret_cast(Type::WRITES_DONE)); + try { + t_->join(); + } catch (const boost::thread_interrupted &) { /* suppress */ + }; +}