make templated create with shared_ptr arguement

with watch snippet(should be deleted next commit)
This commit is contained in:
arches 2016-06-14 11:21:29 -04:00
parent eedbcd4b70
commit 88e5298f86
3 changed files with 53 additions and 19 deletions

View File

@ -146,6 +146,7 @@ namespace etcd
web::http::client::http_client client; web::http::client::http_client client;
std::unique_ptr<KV::Stub> stub_; std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<Watch::Stub> stub1_;
pplx::task<etcd::Response> send_asyncput(const std::string& key, const std::string& value); pplx::task<etcd::Response> send_asyncput(const std::string& key, const std::string& value);
std::unique_ptr<Watch::Stub> watchServiceStub; std::unique_ptr<Watch::Stub> watchServiceStub;
pplx::task<etcd::Response> send_asyncadd(std::string const & key, const std::string& value); pplx::task<etcd::Response> send_asyncadd(std::string const & key, const std::string& value);

View File

@ -27,25 +27,22 @@ namespace etcd
static pplx::task<Response> createResponse(const etcdv3::V3Response& response); static pplx::task<Response> createResponse(const etcdv3::V3Response& response);
template<typename T>static pplx::task<etcd::Response> create(T call) template<typename T>static pplx::task<etcd::Response> create(std::shared_ptr<T> call)
{ {
return pplx::task<etcd::Response>([call]() return pplx::task<etcd::Response>([call]()
{ {
void* got_tag; void* got_tag;
bool ok = false; bool ok = false;
etcd::Response resp; etcd::Response resp;
//blocking //blocking
call->cq_.Next(&got_tag, &ok); call->cq_.Next(&got_tag, &ok);
GPR_ASSERT(got_tag == (void*)call); GPR_ASSERT(got_tag == (void*)call.get());
T call = static_cast<T>(got_tag);
auto v3resp = call->ParseResponse(); auto v3resp = call->ParseResponse();
resp = etcd::Response(v3resp); resp = etcd::Response(v3resp);
delete call; //todo:make this a smart pointer
return resp; return resp;
}); });
}; };

View File

@ -14,6 +14,10 @@ using etcdserverpb::DeleteRangeRequest;
using etcdserverpb::Compare; using etcdserverpb::Compare;
using etcdserverpb::RequestOp; using etcdserverpb::RequestOp;
using grpc::ClientReaderWriter;
using etcdserverpb::WatchRequest;
using etcdserverpb::WatchResponse;
using etcdserverpb::WatchCreateRequest;
etcd::Client::Client(std::string const & address) etcd::Client::Client(std::string const & address)
: client(address) : client(address)
@ -27,6 +31,7 @@ etcd::Client::Client(std::string const & address)
} }
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials()); std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
stub_= KV::NewStub(channel); stub_= KV::NewStub(channel);
stub1_= Watch::NewStub(channel);
} }
pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri) pplx::task<etcd::Response> etcd::Client::send_get_request(web::http::uri_builder & uri)
@ -295,11 +300,11 @@ pplx::task<etcd::Response> etcd::Client::send_asyncadd(std::string const & key,
req_success->set_allocated_request_range(get_request.release()); req_success->set_allocated_request_range(get_request.release());
etcdv3::AsyncTxnResponse* call(new etcdv3::AsyncTxnResponse("create")); std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("create"));
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call); call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
@ -349,11 +354,11 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify_if(std::string const &
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("compareAndSwap"); std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("compareAndSwap"));
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->rpcInstance->Finish(&call->putResponse, &call->status, (void*)call); call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
@ -397,7 +402,7 @@ pplx::task<etcd::Response> etcd::Client::send_asyncmodify(std::string const & ke
req_success->set_allocated_request_range(get_request.release()); req_success->set_allocated_request_range(get_request.release());
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("update"); std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("update"));
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
@ -419,11 +424,11 @@ pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key,
get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND); get_request.set_sort_order(RangeRequest::SortOrder::RangeRequest_SortOrder_ASCEND);
} }
etcdv3::AsyncRangeResponse* call= new etcdv3::AsyncRangeResponse(); std::shared_ptr<etcdv3::AsyncRangeResponse> call(new etcdv3::AsyncRangeResponse());
call->response_reader = stub_->AsyncRange(&call->context,get_request,&call->cq_); call->response_reader = stub_->AsyncRange(&call->context,get_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call); call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
@ -431,6 +436,39 @@ pplx::task<etcd::Response> etcd::Client::send_asyncget(std::string const & key,
pplx::task<etcd::Response> etcd::Client::send_asyncput(std::string const & key, std::string const & value) pplx::task<etcd::Response> etcd::Client::send_asyncput(std::string const & key, std::string const & value)
{ {
#if 0
//try watch here:
ClientContext context;
std::shared_ptr<ClientReaderWriter<WatchRequest,WatchResponse>> stream(stub1_->Watch(&context));
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req);
std::cout<< "write finished" << std::endl;
WatchResponse server_resp;
while(stream->Read(&server_resp))
{
std::cout<< "read...watch id: "<< server_resp.watch_id()<< std::endl;
if(server_resp.events_size())
{
std::cout << "event type: " << server_resp.events(0).type() << std::endl;
std::cout << "key: " << server_resp.events(0).kv().key() << std::endl;
std::cout << "value: " << server_resp.events(0).kv().value() << std::endl;
stream->WritesDone();
}
}
Status status = stream->Finish();
if(!status.ok())
{
std::cout << "rpc failed" << std::endl;
}
#endif
//check if key is not present //check if key is not present
TxnRequest txn_request; TxnRequest txn_request;
Compare* compare = txn_request.add_compare(); Compare* compare = txn_request.add_compare();
@ -469,11 +507,11 @@ pplx::task<etcd::Response> etcd::Client::send_asyncput(std::string const & key,
req_success = txn_request.add_success(); req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release()); req_success->set_allocated_request_range(get_request.release());
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("set"); std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("set"));
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call); call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }
@ -543,13 +581,11 @@ pplx::task<etcd::Response> etcd::Client::send_asyncdelete(std::string const & ke
req_failure->set_allocated_request_delete_range(del_request.release()); req_failure->set_allocated_request_delete_range(del_request.release());
std::shared_ptr<etcdv3::AsyncTxnResponse> call(new etcdv3::AsyncTxnResponse("delete"));
etcdv3::AsyncTxnResponse* call= new etcdv3::AsyncTxnResponse("delete");
call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_); call->response_reader = stub_->AsyncTxn(&call->context,txn_request,&call->cq_);
call->response_reader->Finish(&call->reply, &call->status, (void*)call); call->response_reader->Finish(&call->reply, &call->status, (void*)call.get());
return Response::create(call); return Response::create(call);
} }