Make "CancelWatch()" thread-safe and happen only once, fixes the potential assert failure in gRPC routines.
Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
This commit is contained in:
parent
7fe755ae53
commit
dd1f106150
|
|
@ -1,18 +1,18 @@
|
||||||
#ifndef __ASYNC_WATCHACTION_HPP__
|
#ifndef __ASYNC_WATCHACTION_HPP__
|
||||||
#define __ASYNC_WATCHACTION_HPP__
|
#define __ASYNC_WATCHACTION_HPP__
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
#include <grpc++/grpc++.h>
|
#include <grpc++/grpc++.h>
|
||||||
#include "proto/rpc.grpc.pb.h"
|
#include "proto/rpc.grpc.pb.h"
|
||||||
#include "etcd/v3/Action.hpp"
|
#include "etcd/v3/Action.hpp"
|
||||||
#include "etcd/v3/AsyncWatchResponse.hpp"
|
#include "etcd/v3/AsyncWatchResponse.hpp"
|
||||||
#include "etcd/Response.hpp"
|
#include "etcd/Response.hpp"
|
||||||
|
|
||||||
|
|
||||||
using grpc::ClientAsyncReaderWriter;
|
using grpc::ClientAsyncReaderWriter;
|
||||||
using etcdserverpb::WatchRequest;
|
using etcdserverpb::WatchRequest;
|
||||||
using etcdserverpb::WatchResponse;
|
using etcdserverpb::WatchResponse;
|
||||||
|
|
||||||
|
|
||||||
namespace etcdv3
|
namespace etcdv3
|
||||||
{
|
{
|
||||||
class AsyncWatchAction : public etcdv3::Action
|
class AsyncWatchAction : public etcdv3::Action
|
||||||
|
|
@ -29,6 +29,7 @@ namespace etcdv3
|
||||||
WatchResponse reply;
|
WatchResponse reply;
|
||||||
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
|
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest,WatchResponse>> stream;
|
||||||
bool isCancelled;
|
bool isCancelled;
|
||||||
|
std::mutex protect_is_cancalled;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ file(GLOB_RECURSE CPP_CLIENT_SRC
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/**/*.cpp")
|
"${CMAKE_CURRENT_SOURCE_DIR}/**/*.cpp")
|
||||||
|
|
||||||
set_source_files_properties(${PROTOBUF_GENERATES} PROPERTIES GENERATED TRUE)
|
set_source_files_properties(${PROTOBUF_GENERATES} PROPERTIES GENERATED TRUE)
|
||||||
add_library(etcd-cpp-api SHARED ${CPP_CLIENT_SRC} ${PROTOBUF_GENERATES})
|
add_library(etcd-cpp-api ${CPP_CLIENT_SRC} ${PROTOBUF_GENERATES})
|
||||||
add_dependencies(etcd-cpp-api protobuf_generates)
|
add_dependencies(etcd-cpp-api protobuf_generates)
|
||||||
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
|
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -87,10 +87,12 @@ void etcdv3::AsyncWatchAction::waitForResponse()
|
||||||
|
|
||||||
void etcdv3::AsyncWatchAction::CancelWatch()
|
void etcdv3::AsyncWatchAction::CancelWatch()
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> scope_lock(this->protect_is_cancalled);
|
||||||
if(isCancelled == false)
|
if(isCancelled == false)
|
||||||
{
|
{
|
||||||
stream->WritesDone((void*)"writes done");
|
stream->WritesDone((void*)"writes done");
|
||||||
}
|
}
|
||||||
|
isCancelled = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool etcdv3::AsyncWatchAction::Cancelled() const {
|
bool etcdv3::AsyncWatchAction::Cancelled() const {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
#define CATCH_CONFIG_MAIN
|
#define CATCH_CONFIG_MAIN
|
||||||
#include <catch.hpp>
|
#include <catch.hpp>
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include "etcd/Watcher.hpp"
|
#include "etcd/Watcher.hpp"
|
||||||
#include "etcd/SyncClient.hpp"
|
#include "etcd/SyncClient.hpp"
|
||||||
|
|
||||||
|
|
@ -70,6 +72,21 @@ TEST_CASE("watch should exit normally")
|
||||||
watcher.Cancel();
|
watcher.Cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("watch should can be cancelled repeatedly")
|
||||||
|
{
|
||||||
|
// cancal immediately after start watch.
|
||||||
|
etcd::Watcher watcher(etcd_uri, "/test", printResponse, true);
|
||||||
|
std::vector<std::thread> threads(10);
|
||||||
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
|
threads[i] = std::thread([&]() {
|
||||||
|
watcher.Cancel();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
|
threads[i].join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TEST_CASE("request cancellation")
|
// TEST_CASE("request cancellation")
|
||||||
// {
|
// {
|
||||||
// etcd::Client etcd(etcd_uri);
|
// etcd::Client etcd(etcd_uri);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue