Fixes memory leak issue inside the watcher (#197)

Signed-off-by: Tao He <sighingnow@gmail.com>
This commit is contained in:
Tao He 2023-03-05 23:08:10 +08:00 committed by GitHub
parent 80b4d2178f
commit b12fc293b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 263 additions and 77 deletions

View File

@ -6,6 +6,9 @@ concurrency:
group: ${{ github.repository }}-${{ github.event.number || github.head_ref || github.sha }}-${{ github.workflow }} group: ${{ github.repository }}-${{ github.event.number || github.head_ref || github.sha }}-${{ github.workflow }}
cancel-in-progress: true cancel-in-progress: true
env:
SEGFAULT_SIGNALS: all
jobs: jobs:
build: build:
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
@ -129,6 +132,10 @@ jobs:
libgrpc++-dev \ libgrpc++-dev \
protobuf-compiler-grpc protobuf-compiler-grpc
# install libsegfault.so
sudo apt-get install libc6 || true
sudo apt-get install glibc-tools || true
- name: Install dependencies for Mac - name: Install dependencies for Mac
if: runner.os == 'macOS' if: runner.os == 'macOS'
run: | run: |
@ -172,6 +179,10 @@ jobs:
make -j`nproc` make -j`nproc`
sudo make install sudo make install
- name: Setup tmate session
if: false
uses: mxschmitt/action-tmate@v3
- name: Build - name: Build
run: | run: |
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib:/usr/local/lib/x86_64-linux-gnu export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib:/usr/local/lib/x86_64-linux-gnu
@ -211,12 +222,18 @@ jobs:
echo "Run the etcd test ........................." echo "Run the etcd test ........................."
./build/bin/EtcdTest ./build/bin/EtcdTest
echo "Run the etcd campaign test ........................."
./build/bin/CampaignTest
echo "Run the etcd memory leak test ........................." echo "Run the etcd memory leak test ........................."
./build/bin/MemLeakTest ./build/bin/MemLeakTest
echo "Run the etcd watcher test ........................." echo "Run the etcd watcher test ........................."
./build/bin/WatcherTest ./build/bin/WatcherTest
echo "Run the etcd memory leak in watcher test ........................."
./build/bin/MemLeakWatcherTest
echo "Run the etcd keepalive test ........................." echo "Run the etcd keepalive test ........................."
./build/bin/KeepAliveTest ./build/bin/KeepAliveTest

View File

@ -39,7 +39,7 @@ if(NOT "${ETCD_CMAKE_CXX_STANDARD}")
set(ETCD_CMAKE_CXX_STANDARD ${CMAKE_CXX_STANDARD}) set(ETCD_CMAKE_CXX_STANDARD ${CMAKE_CXX_STANDARD})
endif() endif()
endif() endif()
message("Building etcd-cpp-apiv3 with C++${ETCD_CMAKE_CXX_STANDARD}") message(STATUS "Building etcd-cpp-apiv3 with C++${ETCD_CMAKE_CXX_STANDARD}")
# reference: https://gitlab.kitware.com/cmake/community/-/wikis/doc/cmake/RPATH-handling#always-full-rpath # reference: https://gitlab.kitware.com/cmake/community/-/wikis/doc/cmake/RPATH-handling#always-full-rpath
set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE) set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE)
@ -134,7 +134,7 @@ else()
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/FindGRPC.cmake) include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/FindGRPC.cmake)
set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC_GRPC++_LIBRARY}) set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC_GRPC++_LIBRARY})
endif() endif()
message("-- Found GRPC: ${GRPC_LIBRARIES} (found version: \"${gRPC_VERSION}\")") message("-- Found GRPC: ${GRPC_LIBRARIES}, ${GRPC_CPP_PLUGIN} (found version: \"${gRPC_VERSION}\")")
# avoid use the apt-get installed libgrpc-dev (version v1.13) on Ubuntu 18.04 # avoid use the apt-get installed libgrpc-dev (version v1.13) on Ubuntu 18.04
if(gRPC_FOUND AND gRPC_VERSION VERSION_LESS "1.14") if(gRPC_FOUND AND gRPC_VERSION VERSION_LESS "1.14")
message(FATAL_ERROR "gRPC '${gRPC_VERSION}' is not supported, please install a newer gRPC library " message(FATAL_ERROR "gRPC '${gRPC_VERSION}' is not supported, please install a newer gRPC library "

View File

@ -42,6 +42,7 @@ namespace etcdv3 {
namespace detail { namespace detail {
std::string string_plus_one(std::string const &value); std::string string_plus_one(std::string const &value);
std::string resolve_etcd_endpoints(std::string const &default_endpoints);
} }
} }

View File

@ -66,6 +66,8 @@ namespace etcdv3
public: public:
Action(etcdv3::ActionParameters const &params); Action(etcdv3::ActionParameters const &params);
Action(etcdv3::ActionParameters && params); Action(etcdv3::ActionParameters && params);
virtual ~Action();
void waitForResponse(); void waitForResponse();
const std::chrono::high_resolution_clock::time_point startTimepoint(); const std::chrono::high_resolution_clock::time_point startTimepoint();
protected: protected:
@ -83,6 +85,7 @@ namespace etcdv3
namespace detail { namespace detail {
std::string string_plus_one(std::string const &value); std::string string_plus_one(std::string const &value);
std::string resolve_etcd_endpoints(std::string const &default_endpoints);
} }
} }
#endif #endif

View File

@ -26,6 +26,7 @@ namespace etcdv3
void CancelWatch(); void CancelWatch();
bool Cancelled() const; bool Cancelled() const;
private: private:
int64_t watch_id = -1;
WatchResponse reply; WatchResponse reply;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream; std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
std::atomic_bool isCancelled; std::atomic_bool isCancelled;

View File

@ -34,10 +34,13 @@ namespace etcdv3
extern char const * KEEPALIVE_WRITE; extern char const * KEEPALIVE_WRITE;
extern char const * KEEPALIVE_READ; extern char const * KEEPALIVE_READ;
extern char const * KEEPALIVE_DONE; extern char const * KEEPALIVE_DONE;
extern char const * KEEPALIVE_FINISH;
extern char const * WATCH_CREATE; extern char const * WATCH_CREATE;
extern char const * WATCH_WRITE; extern char const * WATCH_WRITE;
extern char const * WATCH_WRITE_CANCEL;
extern char const * WATCH_WRITES_DONE; extern char const * WATCH_WRITES_DONE;
extern char const * WATCH_FINISH;
extern char const * ELECTION_OBSERVE_CREATE; extern char const * ELECTION_OBSERVE_CREATE;

View File

@ -11,6 +11,12 @@ struct etcd::Watcher::EtcdServerStubs {
void etcd::Watcher::EtcdServerStubsDeleter::operator()(etcd::Watcher::EtcdServerStubs *stubs) { void etcd::Watcher::EtcdServerStubsDeleter::operator()(etcd::Watcher::EtcdServerStubs *stubs) {
if (stubs) { if (stubs) {
if (stubs->watchServiceStub) {
stubs->watchServiceStub.reset();
}
if (stubs->call) {
stubs->call.reset();
}
delete stubs; delete stubs;
} }
} }

View File

@ -15,6 +15,13 @@ etcdv3::Action::Action(etcdv3::ActionParameters && params)
this->InitAction(); this->InitAction();
} }
etcdv3::Action::~Action() {
cq_.Shutdown();
// cancel on-the-fly calls
context.TryCancel();
}
void etcdv3::Action::InitAction() { void etcdv3::Action::InitAction() {
if (!parameters.auth_token.empty()) { if (!parameters.auth_token.empty()) {
// use `token` as the key, see: // use `token` as the key, see:
@ -104,6 +111,10 @@ std::string etcdv3::detail::string_plus_one(std::string const &value) {
return s; return s;
} }
} }
return {etcdv3::NUL}; return {etcdv3::NUL};
} }
std::string etcdv3::detail::resolve_etcd_endpoints(std::string const&default_endpoints) {
const char *ep = std::getenv("ETCD_ENDPOINTS");
return ep ? ep : default_endpoints;
}

View File

@ -183,17 +183,23 @@ void etcdv3::AsyncLeaseKeepAliveAction::CancelKeepAlive()
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_DONE) { if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::KEEPALIVE_DONE) {
// ok // ok
} else { } else {
std::cerr << "Failed to mark a lease keep-alive connection as DONE" << std::endl; std::cerr << "Failed to mark a lease keep-alive connection as DONE: "
<< context.debug_error_string() << std::endl;
} }
grpc::Status status; grpc::Status status;
stream->Finish(&status, (void *)this); stream->Finish(&status, (void *)KEEPALIVE_FINISH);
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)this) { if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)KEEPALIVE_FINISH) {
// ok // ok
} else { } else {
std::cerr << "Failed to finish a lease keep-alive connection" << std::endl; std::cerr << "Failed to finish a lease keep-alive connection: "
<< status.error_message()
<< ", " << context.debug_error_string() << std::endl;
} }
// cancel on-the-fly calls
context.TryCancel();
cq_.Shutdown(); cq_.Shutdown();
} }
} }

View File

@ -2,8 +2,6 @@
#include "etcd/v3/action_constants.hpp" #include "etcd/v3/action_constants.hpp"
using etcdserverpb::RangeRequest;
using etcdserverpb::RangeResponse;
using etcdserverpb::WatchCreateRequest; using etcdserverpb::WatchCreateRequest;
etcdv3::AsyncWatchAction::AsyncWatchAction( etcdv3::AsyncWatchAction::AsyncWatchAction(
@ -45,6 +43,7 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(
// wait "write" (WatchCreateRequest) success, and start to read the first reply // wait "write" (WatchCreateRequest) success, and start to read the first reply
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::WATCH_WRITE) { if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)etcdv3::WATCH_WRITE) {
stream->Read(&reply, (void*)this); stream->Read(&reply, (void*)this);
this->watch_id = reply.watch_id();
} else { } else {
throw std::runtime_error("failed to write WatchCreateRequest to server"); throw std::runtime_error("failed to write WatchCreateRequest to server");
} }
@ -61,47 +60,60 @@ void etcdv3::AsyncWatchAction::waitForResponse()
{ {
break; break;
} }
if(isCancelled.load()) { if(got_tag == (void *)etcdv3::WATCH_WRITE_CANCEL) {
break; stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE);
continue;
} }
if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) { if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE)
isCancelled.store(true); {
grpc::Status status;
stream->Finish(&status, (void *)etcdv3::WATCH_FINISH);
continue;
}
if (got_tag == (void *)etcdv3::WATCH_FINISH) {
// shutdown
cq_.Shutdown(); cq_.Shutdown();
// cancel on-the-fly calls
context.TryCancel();
break; break;
} }
if(got_tag == (void*)this) // read tag if(got_tag == (void*)this) // read tag
{ {
if (reply.canceled()) { if (reply.canceled()) {
isCancelled.store(true); // cancel on-the-fly calls, but don't shutdown the completion queue as there
cq_.Shutdown(); // are still a inflight call to finish
break; context.TryCancel();
// cq_.Shutdown();
continue;
} }
else if ((reply.created() && reply.header().revision() < parameters.revision) ||
reply.events_size() > 0) {
// we stop watch under two conditions: // we stop watch under two conditions:
// //
// 1. watch for a future revision, return immediately with empty events set // 1. watch for a future revision, return immediately with empty events set
// 2. receive any effective events. // 2. receive any effective events.
isCancelled.store(true); if ((reply.created() && reply.header().revision() < parameters.revision) ||
reply.events_size() > 0) {
stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE);
grpc::Status status;
stream->Finish(&status, (void *)this);
cq_.Shutdown();
// leave a warning if the response is too large and been fragmented // leave a warning if the response is too large and been fragmented
if (reply.fragment()) { if (reply.fragment()) {
std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl; std::cerr << "WARN: The response hasn't been fully received and parsed" << std::endl;
} }
break;
} std::cout << "issue a watch cancel" << std::endl;
else // cancel the watcher after receiving the good response
{ this->CancelWatch();
// otherwise, start next round read-reply
// start the next round to read finish messages, read into "&dummy"
// (use nullptr, as it won't be touched).
stream->Read(nullptr, (void*)etcdv3::WATCH_FINISH);
} else {
// start the next round to read reply, read into "&reply"
stream->Read(&reply, (void*)this); stream->Read(&reply, (void*)this);
} }
continue;
}
if(isCancelled.load()) {
// invalid tag, and is cancelled
break;
} }
} }
} }
@ -109,12 +121,10 @@ void etcdv3::AsyncWatchAction::waitForResponse()
void etcdv3::AsyncWatchAction::CancelWatch() void etcdv3::AsyncWatchAction::CancelWatch()
{ {
if (!isCancelled.exchange(true)) { if (!isCancelled.exchange(true)) {
stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE); WatchRequest cancel_req;
cancel_req.mutable_cancel_request()->set_watch_id(this->watch_id);
grpc::Status status; stream->Write(cancel_req, (void *)etcdv3::WATCH_WRITE_CANCEL);
stream->Finish(&status, (void *)this); isCancelled.store(true);
cq_.Shutdown();
} }
} }
@ -133,32 +143,43 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
{ {
break; break;
} }
if(isCancelled.load()) { if(got_tag == (void *)etcdv3::WATCH_WRITE_CANCEL) {
break; stream->WritesDone((void*)etcdv3::WATCH_WRITES_DONE);
continue;
} }
if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE) if(got_tag == (void*)etcdv3::WATCH_WRITES_DONE)
{ {
isCancelled.store(true); grpc::Status status;
stream->Finish(&status, (void *)etcdv3::WATCH_FINISH);
continue;
}
if (got_tag == (void *)etcdv3::WATCH_FINISH) {
// shutdown
cq_.Shutdown(); cq_.Shutdown();
// cancel on-the-fly calls
context.TryCancel();
break; break;
} }
else if(got_tag == (void*)this) // read tag if(got_tag == (void*)this) // read tag
{ {
if (reply.canceled()) { if (reply.canceled()) {
isCancelled.store(true);
cq_.Shutdown();
if (reply.compact_revision() != 0) { if (reply.compact_revision() != 0) {
auto resp = etcd::Response(grpc::StatusCode::OUT_OF_RANGE /* error code */, auto resp = ParseResponse();
"required revision has been compacted"); auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
resp._compact_revision = reply.compact_revision(); std::chrono::high_resolution_clock::now() - start_timepoint);
callback(resp); callback(etcd::Response(resp, duration));
} }
break; // cancel on-the-fly calls, but don't shutdown the completion queue as there
// are still a inflight call to finish
context.TryCancel();
// cq_.Shutdown();
continue;
} }
// for the callback case, we don't invoke callback immediately if watching
// for a future revision, we wait until there are some effective events.
if(reply.events_size()) if(reply.events_size())
{ {
// for the callback case, we don't stop immediately if watching for a future revison,
// we wait until there are some expected events.
auto resp = ParseResponse(); auto resp = ParseResponse();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>( auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start_timepoint); std::chrono::high_resolution_clock::now() - start_timepoint);
@ -166,6 +187,11 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
start_timepoint = std::chrono::high_resolution_clock::now(); start_timepoint = std::chrono::high_resolution_clock::now();
} }
stream->Read(&reply, (void*)this); stream->Read(&reply, (void*)this);
continue;
}
if(isCancelled.load()) {
// invalid tag, and is cancelled
break;
} }
} }
} }

View File

@ -33,10 +33,13 @@ char const * etcdv3::KEEPALIVE_CREATE = "keepalive create";
char const * etcdv3::KEEPALIVE_WRITE = "keepalive write"; char const * etcdv3::KEEPALIVE_WRITE = "keepalive write";
char const * etcdv3::KEEPALIVE_READ = "keepalive read"; char const * etcdv3::KEEPALIVE_READ = "keepalive read";
char const * etcdv3::KEEPALIVE_DONE = "keepalive done"; char const * etcdv3::KEEPALIVE_DONE = "keepalive done";
char const * etcdv3::KEEPALIVE_FINISH = "keepalive finish";
char const * etcdv3::WATCH_CREATE = "watch create"; char const * etcdv3::WATCH_CREATE = "watch create";
char const * etcdv3::WATCH_WRITE = "watch write"; char const * etcdv3::WATCH_WRITE = "watch write";
char const * etcdv3::WATCH_WRITE_CANCEL = "watch write cancel";
char const * etcdv3::WATCH_WRITES_DONE = "watch writes done"; char const * etcdv3::WATCH_WRITES_DONE = "watch writes done";
char const * etcdv3::WATCH_FINISH = "watch finish";
char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create"; char const * etcdv3::ELECTION_OBSERVE_CREATE = "observe create";

View File

@ -5,7 +5,7 @@
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
static const std::string etcd_url("http://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
TEST_CASE("setup with auth") TEST_CASE("setup with auth")
{ {

View File

@ -23,7 +23,15 @@ foreach(testfile ${TEST_FILES})
target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen) target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen)
target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen/proto) target_include_directories(${test_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/../proto/gen/proto)
target_link_libraries(${test_name} etcd-cpp-api) target_link_libraries(${test_name} PRIVATE etcd-cpp-api)
if(UNIX AND NOT APPLE)
if(CMAKE_VERSION VERSION_LESS 3.13)
target_link_libraries(${test_name} PRIVATE -Wl,--no-as-needed -lSegFault -Wl,--as-needed)
else()
target_link_options(${test_name} PRIVATE -Wl,--no-as-needed -lSegFault -Wl,--as-needed)
endif()
endif()
add_dependencies(etcd_tests ${test_name}) add_dependencies(etcd_tests ${test_name})
endforeach() endforeach()

39
tst/CampaignTest.cpp Normal file
View File

@ -0,0 +1,39 @@
#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <chrono>
#include <thread>
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
#include "etcd/Response.hpp"
#include "etcd/SyncClient.hpp"
#include "etcd/Value.hpp"
static std::string etcd_uri = etcdv3::detail::resolve_etcd_endpoints(
"http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579");
TEST_CASE("campaign and loadership using keepalive") {
etcd::Client etcd(etcd_uri);
auto keepalive = etcd.leasekeepalive(5).get();
auto lease_id = keepalive->Lease();
std::cout << lease_id << std::endl;
std::string value = std::string("192.168.1.6:1880");
auto resp1 = etcd.campaign("/leader", lease_id, value).get();
if (0 == resp1.error_code()) {
std::cout << "became leader: " << resp1.index() << std::endl;
} else {
std::cout << "error code: " << resp1.error_code()
<< "error message: " << resp1.error_message() << std::endl;
assert(false);
}
std::cout << "finish campaign" << std::endl;
auto resp2 = etcd.leader("/leader").get();
std::cout << resp2.value().as_string() << std::endl;
std::cout << resp2.value().key() << std::endl;
std::cout << "finish leader" << std::endl;
}

View File

@ -8,7 +8,7 @@
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp" #include "etcd/KeepAlive.hpp"
static const std::string etcd_url("http://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
TEST_CASE("setup") TEST_CASE("setup")
{ {

View File

@ -5,7 +5,7 @@
#include "etcd/SyncClient.hpp" #include "etcd/SyncClient.hpp"
static const std::string etcd_url("http://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
TEST_CASE("sync operations") TEST_CASE("sync operations")
{ {

View File

@ -7,7 +7,7 @@
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
static const std::string etcd_url("http://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
TEST_CASE("setup") TEST_CASE("setup")
{ {

View File

@ -12,7 +12,7 @@
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp" #include "etcd/KeepAlive.hpp"
static const std::string etcd_url("http://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
TEST_CASE("fork: set in child and get from self") TEST_CASE("fork: set in child and get from self")
{ {

View File

@ -10,7 +10,7 @@
#include "etcd/SyncClient.hpp" #include "etcd/SyncClient.hpp"
#include "etcd/Value.hpp" #include "etcd/Value.hpp"
static std::string etcd_uri( static std::string etcd_uri = etcdv3::detail::resolve_etcd_endpoints(
"http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579"); "http://127.0.0.1:2379,http://127.0.0.1:2479,http://127.0.0.1:2579");
TEST_CASE("keepalive revoke and check if alive") { TEST_CASE("keepalive revoke and check if alive") {

View File

@ -10,7 +10,7 @@
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp" #include "etcd/KeepAlive.hpp"
static const std::string etcd_url("http://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
TEST_CASE("lock and unlock") TEST_CASE("lock and unlock")
{ {

View File

@ -6,7 +6,7 @@
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp" #include "etcd/KeepAlive.hpp"
static const std::string etcd_url("http://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
class DistributedLock { class DistributedLock {
public: public:
@ -75,7 +75,7 @@ DistributedLock::~DistributedLock() noexcept {
int main() { int main() {
int i = 0, t = 0; int i = 0, t = 0;
while(t < 10 /* update this value to make it run for longer */) { while(t < 100 /* update this value to make it run for longer */) {
{ {
DistributedLock lock(std::to_string(i), 0); DistributedLock lock(std::to_string(i), 0);
if(!lock.lock_acquired()) { if(!lock.lock_acquired()) {
@ -88,6 +88,8 @@ int main() {
if (i == 10) { if (i == 10) {
i = 0; i = 0;
} }
if (t % 10 == 0) {
std::cout << "round: i = " << i << ", t = " << t << std::endl; std::cout << "round: i = " << i << ", t = " << t << std::endl;
} }
} }
}

View File

@ -0,0 +1,60 @@
#define CATCH_CONFIG_MAIN
#include <catch.hpp>
#include <future>
#include <memory>
#include "etcd/Client.hpp"
#include "etcd/KeepAlive.hpp"
#include "etcd/Watcher.hpp"
static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
static std::atomic_int watcher_called;
void print_response(etcd::Response const & resp)
{
watcher_called.fetch_add(1);
}
/**
* @brief emulate the behavior of creating watcher many times:
*
* 1. create a watcher
* 2. change a value
* 3. cancel the watcher
*/
void watch_once(etcd::Client & client, std::unique_ptr<etcd::Watcher> &watcher, const size_t round) {
const std::string my_prefix = "/test";
const std::string my_key = my_prefix + "/foo";
watcher.reset(new etcd::Watcher(client, my_prefix, print_response, true));
int k = watcher_called.load();
client.set(my_key, "bar-" + std::to_string(round)).wait();
while (true) {
if (watcher_called.load() > k) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// cancel the watcher
watcher->Cancel();
}
TEST_CASE("watch shouldn't leak memory")
{
watcher_called.store(0);
// issue some changes to see if the watcher works
etcd::Client client(etcd_url);
std::unique_ptr<etcd::Watcher> watcher;
for (int round = 0; round < 10 /* update this value to make it run for longer */; ++round) {
if (round % 50 == 0) {
std::cout << "starting round " << round << std::endl;
}
watch_once(client, watcher, round);
}
std::cout << "watcher been called for " << watcher_called.load() << " times" << std::endl;
}

View File

@ -8,7 +8,7 @@
#include "etcd/SyncClient.hpp" #include "etcd/SyncClient.hpp"
#include "etcd/Watcher.hpp" #include "etcd/Watcher.hpp"
static const std::string etcd_url("http://127.0.0.1:24799"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
static int watcher_called = 0; static int watcher_called = 0;

View File

@ -9,7 +9,7 @@ static std::string ca = "security-config/certs/ca.crt";
static std::string cert = "security-config/certs/etcd0.example.com.crt"; static std::string cert = "security-config/certs/etcd0.example.com.crt";
static std::string key = "security-config/private/etcd0.example.com.key"; static std::string key = "security-config/private/etcd0.example.com.key";
static const std::string etcd_url("https://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("https://127.0.0.1:2379");
TEST_CASE("setup with auth") TEST_CASE("setup with auth")
{ {

View File

@ -8,7 +8,7 @@
#include "etcd/Client.hpp" #include "etcd/Client.hpp"
#include "etcd/v3/Transaction.hpp" #include "etcd/v3/Transaction.hpp"
static const std::string etcd_url("http://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
TEST_CASE("setup") TEST_CASE("setup")
{ {

View File

@ -7,7 +7,7 @@
#include "etcd/Watcher.hpp" #include "etcd/Watcher.hpp"
#include "etcd/SyncClient.hpp" #include "etcd/SyncClient.hpp"
static const std::string etcd_url("http://127.0.0.1:2379"); static const std::string etcd_url = etcdv3::detail::resolve_etcd_endpoints("http://127.0.0.1:2379");
static int watcher_called = 0; static int watcher_called = 0;