Update protobuf, implments lock, fixes watch and improves txn (#1)

This commit is contained in:
Tao He 2020-04-07 10:49:49 +08:00
parent ac9cce8815
commit 4e9d17c188
41 changed files with 1741 additions and 225 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
build/
compile_commands.json
proto/**/*.pb.cc
proto/**/*.pb.h

View File

@ -1,18 +1,43 @@
cmake_minimum_required (VERSION 3.1.3 FATAL_ERROR)
project (etcd-cpp-api)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set (etcd-cpp-api_VERSION_MAJOR 0)
set (etcd-cpp-api_VERSION_MINOR 1)
find_library(CPPREST_LIB NAMES cpprest)
find_path(CPPREST_INCLUDE_DIR NAMES cpprest/http_client.h)
find_package(Boost REQUIRED COMPONENTS system thread locale random)
find_package(OpenSSL REQUIRED)
find_package(Protobuf REQUIRED)
set (etcd-cpp-api_VERSION_MAJOR 0)
set (etcd-cpp-api_VERSION_MINOR 1)
set(GRPC_LIBRARY_PATH /usr/lib
/usr/lib64
/usr/local/lib
/usr/local/lib64
/usr/local/opt/grpc)
find_library(GPR_LIBRARY NAMES gpr PATHS ${GRPC_LIBRARY_PATH})
find_library(GRPC_LIBRARY NAMES grpc PATHS ${GRPC_LIBRARY_PATH})
find_library(GRPC++_LIBRARY NAMES grpc++ PATHS ${GRPC_LIBRARY_PATH})
set(GRPC_LIBRARIES ${GPR_LIBRARY} ${GRPC_LIBRARY} ${GRPC++_LIBRARY})
file(GLOB_RECURSE PROTO_SRC RELATIVE "${CMAKE_SOURCE_DIR}/proto" "proto/*.proto")
add_custom_target(proto-gen
COMMAND protoc -I . --cpp_out=. ${PROTO_SRC}
COMMAND protoc -I . --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ./rpc.proto ./v3lock.proto
COMMENT "Generate protobuf stuffs"
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/proto)
enable_testing()
include_directories(SYSTEM ${CPPREST_INCLUDE_DIR} ${Boost_INCLUDE_DIR})
include_directories(SYSTEM ${CPPREST_INCLUDE_DIR}
${Boost_INCLUDE_DIR}
${PROTOBUF_INCLUDE_DIRS}
${OPENSSL_INCLUDE_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wpedantic -Werror -std=c++11")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wpedantic -Werror -Wno-string-compare -std=c++11")
add_subdirectory(src)
add_subdirectory(tst)

View File

@ -2,18 +2,21 @@
#define __ETCD_CLIENT_HPP__
#include "etcd/Response.hpp"
#include "v3/include/Transaction.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/Action.hpp"
#include <string>
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "proto/v3lock.grpc.pb.h"
using etcdserverpb::KV;
using etcdserverpb::Watch;
using etcdserverpb::Lease;
using v3lockpb::Lock;
namespace etcdv3 {
class Transaction;
}
namespace etcd
{
@ -181,11 +184,29 @@ namespace etcd
*/
pplx::task<Response> leasegrant(int ttl);
private:
/**
* Gains a lock at a key.
* @param key is the key to be used to request the lock.
*/
pplx::task<Response> lock(std::string const &key);
/**
* Releases a lock at a key.
* @param key is the lock key to release.
*/
pplx::task<Response> unlock(std::string const &key);
/**
* Execute a etcd transaction.
* @param txn is the transaction object to be executed.
*/
pplx::task<Response> txn(etcdv3::Transaction const &txn);
private:
std::unique_ptr<KV::Stub> stub_;
std::unique_ptr<Watch::Stub> watchServiceStub;
std::unique_ptr<Lease::Stub> leaseServiceStub;
std::unique_ptr<Lock::Stub> lockServiceStub;
};

View File

@ -6,12 +6,13 @@
#include "etcd/Value.hpp"
#include <grpc++/grpc++.h>
#include "v3/include/V3Response.hpp"
#include "proto/kv.pb.h"
#include <iostream>
namespace etcdv3 {
class AsyncWatchAction;
class V3Response;
}
namespace etcd
@ -98,7 +99,17 @@ namespace etcd
*/
std::string const & key(int index) const;
protected:
/**
* Returns the lock key.
*/
std::string const & lock_key() const;
/**
* Returns the watched events.
*/
std::vector<mvccpb::Event> const & events() const;
protected:
Response(const etcdv3::V3Response& response);
Response(int error_code, char const * error_message);
@ -110,6 +121,8 @@ namespace etcd
Value _prev_value;
Values _values;
Keys _keys;
std::string _lock_key; // for lock
std::vector<mvccpb::Event> _events; // for watch
friend class SyncClient;
friend class etcdv3::AsyncWatchAction;
friend class Client;

View File

@ -4,7 +4,10 @@
#include <cpprest/http_client.h>
#include <string>
#include <vector>
#include "v3/include/KeyValue.hpp"
namespace etcdv3 {
class KeyValue;
}
namespace etcd
{

View File

@ -3,9 +3,13 @@
#include <string>
#include "etcd/Response.hpp"
#include "v3/include/AsyncWatchAction.hpp"
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
namespace etcdv3 {
class AsyncWatchAction;
}
using etcdserverpb::KV;
using etcdserverpb::Watch;

View File

@ -1,23 +1,37 @@
syntax = "proto3";
package authpb;
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
message UserAddOptions {
bool no_password = 1;
};
// User is a single entry in the bucket authUsers
message User {
bytes name = 1;
bytes password = 2;
repeated string roles = 3;
UserAddOptions options = 4;
}
// Permission is a single entity
message Permission {
bytes key = 1;
enum Type {
READ = 0;
WRITE = 1;
READWRITE = 2;
}
Type permType = 2;
Type permType = 1;
bytes key = 2;
bytes range_end = 3;
}
// Role is a single entry in the bucket authRoles

View File

@ -1,27 +1,34 @@
syntax = "proto2";
package etcdserverpb;
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
message Request {
optional uint64 ID = 1;
optional string Method = 2;
optional string Path = 3;
optional string Val = 4;
optional bool Dir = 5;
optional string PrevValue = 6;
optional uint64 PrevIndex = 7;
optional bool PrevExist = 8;
optional int64 Expiration = 9;
optional bool Wait = 10;
optional uint64 Since = 11;
optional bool Recursive = 12;
optional bool Sorted = 13;
optional bool Quorum = 14;
optional int64 Time = 15;
optional bool Stream = 16;
optional bool Refresh = 17;
optional uint64 ID = 1 [(gogoproto.nullable) = false];
optional string Method = 2 [(gogoproto.nullable) = false];
optional string Path = 3 [(gogoproto.nullable) = false];
optional string Val = 4 [(gogoproto.nullable) = false];
optional bool Dir = 5 [(gogoproto.nullable) = false];
optional string PrevValue = 6 [(gogoproto.nullable) = false];
optional uint64 PrevIndex = 7 [(gogoproto.nullable) = false];
optional bool PrevExist = 8 [(gogoproto.nullable) = true];
optional int64 Expiration = 9 [(gogoproto.nullable) = false];
optional bool Wait = 10 [(gogoproto.nullable) = false];
optional uint64 Since = 11 [(gogoproto.nullable) = false];
optional bool Recursive = 12 [(gogoproto.nullable) = false];
optional bool Sorted = 13 [(gogoproto.nullable) = false];
optional bool Quorum = 14 [(gogoproto.nullable) = false];
optional int64 Time = 15 [(gogoproto.nullable) = false];
optional bool Stream = 16 [(gogoproto.nullable) = false];
optional bool Refresh = 17 [(gogoproto.nullable) = true];
}
message Metadata {
optional uint64 NodeID = 1;
optional uint64 ClusterID = 2;
optional uint64 NodeID = 1 [(gogoproto.nullable) = false];
optional uint64 ClusterID = 2 [(gogoproto.nullable) = false];
}

144
proto/gogoproto/gogo.proto Normal file
View File

@ -0,0 +1,144 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto2";
package gogoproto;
import "google/protobuf/descriptor.proto";
option java_package = "com.google.protobuf";
option java_outer_classname = "GoGoProtos";
option go_package = "github.com/gogo/protobuf/gogoproto";
extend google.protobuf.EnumOptions {
optional bool goproto_enum_prefix = 62001;
optional bool goproto_enum_stringer = 62021;
optional bool enum_stringer = 62022;
optional string enum_customname = 62023;
optional bool enumdecl = 62024;
}
extend google.protobuf.EnumValueOptions {
optional string enumvalue_customname = 66001;
}
extend google.protobuf.FileOptions {
optional bool goproto_getters_all = 63001;
optional bool goproto_enum_prefix_all = 63002;
optional bool goproto_stringer_all = 63003;
optional bool verbose_equal_all = 63004;
optional bool face_all = 63005;
optional bool gostring_all = 63006;
optional bool populate_all = 63007;
optional bool stringer_all = 63008;
optional bool onlyone_all = 63009;
optional bool equal_all = 63013;
optional bool description_all = 63014;
optional bool testgen_all = 63015;
optional bool benchgen_all = 63016;
optional bool marshaler_all = 63017;
optional bool unmarshaler_all = 63018;
optional bool stable_marshaler_all = 63019;
optional bool sizer_all = 63020;
optional bool goproto_enum_stringer_all = 63021;
optional bool enum_stringer_all = 63022;
optional bool unsafe_marshaler_all = 63023;
optional bool unsafe_unmarshaler_all = 63024;
optional bool goproto_extensions_map_all = 63025;
optional bool goproto_unrecognized_all = 63026;
optional bool gogoproto_import = 63027;
optional bool protosizer_all = 63028;
optional bool compare_all = 63029;
optional bool typedecl_all = 63030;
optional bool enumdecl_all = 63031;
optional bool goproto_registration = 63032;
optional bool messagename_all = 63033;
optional bool goproto_sizecache_all = 63034;
optional bool goproto_unkeyed_all = 63035;
}
extend google.protobuf.MessageOptions {
optional bool goproto_getters = 64001;
optional bool goproto_stringer = 64003;
optional bool verbose_equal = 64004;
optional bool face = 64005;
optional bool gostring = 64006;
optional bool populate = 64007;
optional bool stringer = 67008;
optional bool onlyone = 64009;
optional bool equal = 64013;
optional bool description = 64014;
optional bool testgen = 64015;
optional bool benchgen = 64016;
optional bool marshaler = 64017;
optional bool unmarshaler = 64018;
optional bool stable_marshaler = 64019;
optional bool sizer = 64020;
optional bool unsafe_marshaler = 64023;
optional bool unsafe_unmarshaler = 64024;
optional bool goproto_extensions_map = 64025;
optional bool goproto_unrecognized = 64026;
optional bool protosizer = 64028;
optional bool compare = 64029;
optional bool typedecl = 64030;
optional bool messagename = 64033;
optional bool goproto_sizecache = 64034;
optional bool goproto_unkeyed = 64035;
}
extend google.protobuf.FieldOptions {
optional bool nullable = 65001;
optional bool embed = 65002;
optional string customtype = 65003;
optional string customname = 65004;
optional string jsontag = 65005;
optional string moretags = 65006;
optional string casttype = 65007;
optional string castkey = 65008;
optional string castvalue = 65009;
optional bool stdtime = 65010;
optional bool stdduration = 65011;
optional bool wktpointer = 65012;
}

View File

@ -0,0 +1,31 @@
// Copyright (c) 2015, Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.api;
import "google/api/http.proto";
import "google/protobuf/descriptor.proto";
option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "AnnotationsProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";
extend google.protobuf.MethodOptions {
// See `HttpRule`.
HttpRule http = 72295728;
}

376
proto/google/api/http.proto Normal file
View File

@ -0,0 +1,376 @@
// Copyright 2019 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
syntax = "proto3";
package google.api;
option cc_enable_arenas = true;
option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "HttpProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";
// Defines the HTTP configuration for an API service. It contains a list of
// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method
// to one or more HTTP REST API methods.
message Http {
// A list of HTTP configuration rules that apply to individual API methods.
//
// **NOTE:** All service configuration rules follow "last one wins" order.
repeated HttpRule rules = 1;
// When set to true, URL path parameters will be fully URI-decoded except in
// cases of single segment matches in reserved expansion, where "%2F" will be
// left encoded.
//
// The default behavior is to not decode RFC 6570 reserved characters in multi
// segment matches.
bool fully_decode_reserved_expansion = 2;
}
// # gRPC Transcoding
//
// gRPC Transcoding is a feature for mapping between a gRPC method and one or
// more HTTP REST endpoints. It allows developers to build a single API service
// that supports both gRPC APIs and REST APIs. Many systems, including [Google
// APIs](https://github.com/googleapis/googleapis),
// [Cloud Endpoints](https://cloud.google.com/endpoints), [gRPC
// Gateway](https://github.com/grpc-ecosystem/grpc-gateway),
// and [Envoy](https://github.com/envoyproxy/envoy) proxy support this feature
// and use it for large scale production services.
//
// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies
// how different portions of the gRPC request message are mapped to the URL
// path, URL query parameters, and HTTP request body. It also controls how the
// gRPC response message is mapped to the HTTP response body. `HttpRule` is
// typically specified as an `google.api.http` annotation on the gRPC method.
//
// Each mapping specifies a URL path template and an HTTP method. The path
// template may refer to one or more fields in the gRPC request message, as long
// as each field is a non-repeated field with a primitive (non-message) type.
// The path template controls how fields of the request message are mapped to
// the URL path.
//
// Example:
//
// service Messaging {
// rpc GetMessage(GetMessageRequest) returns (Message) {
// option (google.api.http) = {
// get: "/v1/{name=messages/*}"
// };
// }
// }
// message GetMessageRequest {
// string name = 1; // Mapped to URL path.
// }
// message Message {
// string text = 1; // The resource content.
// }
//
// This enables an HTTP REST to gRPC mapping as below:
//
// HTTP | gRPC
// -----|-----
// `GET /v1/messages/123456` | `GetMessage(name: "messages/123456")`
//
// Any fields in the request message which are not bound by the path template
// automatically become HTTP query parameters if there is no HTTP request body.
// For example:
//
// service Messaging {
// rpc GetMessage(GetMessageRequest) returns (Message) {
// option (google.api.http) = {
// get:"/v1/messages/{message_id}"
// };
// }
// }
// message GetMessageRequest {
// message SubMessage {
// string subfield = 1;
// }
// string message_id = 1; // Mapped to URL path.
// int64 revision = 2; // Mapped to URL query parameter `revision`.
// SubMessage sub = 3; // Mapped to URL query parameter `sub.subfield`.
// }
//
// This enables a HTTP JSON to RPC mapping as below:
//
// HTTP | gRPC
// -----|-----
// `GET /v1/messages/123456?revision=2&sub.subfield=foo` |
// `GetMessage(message_id: "123456" revision: 2 sub: SubMessage(subfield:
// "foo"))`
//
// Note that fields which are mapped to URL query parameters must have a
// primitive type or a repeated primitive type or a non-repeated message type.
// In the case of a repeated type, the parameter can be repeated in the URL
// as `...?param=A&param=B`. In the case of a message type, each field of the
// message is mapped to a separate parameter, such as
// `...?foo.a=A&foo.b=B&foo.c=C`.
//
// For HTTP methods that allow a request body, the `body` field
// specifies the mapping. Consider a REST update method on the
// message resource collection:
//
// service Messaging {
// rpc UpdateMessage(UpdateMessageRequest) returns (Message) {
// option (google.api.http) = {
// patch: "/v1/messages/{message_id}"
// body: "message"
// };
// }
// }
// message UpdateMessageRequest {
// string message_id = 1; // mapped to the URL
// Message message = 2; // mapped to the body
// }
//
// The following HTTP JSON to RPC mapping is enabled, where the
// representation of the JSON in the request body is determined by
// protos JSON encoding:
//
// HTTP | gRPC
// -----|-----
// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id:
// "123456" message { text: "Hi!" })`
//
// The special name `*` can be used in the body mapping to define that
// every field not bound by the path template should be mapped to the
// request body. This enables the following alternative definition of
// the update method:
//
// service Messaging {
// rpc UpdateMessage(Message) returns (Message) {
// option (google.api.http) = {
// patch: "/v1/messages/{message_id}"
// body: "*"
// };
// }
// }
// message Message {
// string message_id = 1;
// string text = 2;
// }
//
//
// The following HTTP JSON to RPC mapping is enabled:
//
// HTTP | gRPC
// -----|-----
// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id:
// "123456" text: "Hi!")`
//
// Note that when using `*` in the body mapping, it is not possible to
// have HTTP parameters, as all fields not bound by the path end in
// the body. This makes this option more rarely used in practice when
// defining REST APIs. The common usage of `*` is in custom methods
// which don't use the URL at all for transferring data.
//
// It is possible to define multiple HTTP methods for one RPC by using
// the `additional_bindings` option. Example:
//
// service Messaging {
// rpc GetMessage(GetMessageRequest) returns (Message) {
// option (google.api.http) = {
// get: "/v1/messages/{message_id}"
// additional_bindings {
// get: "/v1/users/{user_id}/messages/{message_id}"
// }
// };
// }
// }
// message GetMessageRequest {
// string message_id = 1;
// string user_id = 2;
// }
//
// This enables the following two alternative HTTP JSON to RPC mappings:
//
// HTTP | gRPC
// -----|-----
// `GET /v1/messages/123456` | `GetMessage(message_id: "123456")`
// `GET /v1/users/me/messages/123456` | `GetMessage(user_id: "me" message_id:
// "123456")`
//
// ## Rules for HTTP mapping
//
// 1. Leaf request fields (recursive expansion nested messages in the request
// message) are classified into three categories:
// - Fields referred by the path template. They are passed via the URL path.
// - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They are passed via the HTTP
// request body.
// - All other fields are passed via the URL query parameters, and the
// parameter name is the field path in the request message. A repeated
// field can be represented as multiple query parameters under the same
// name.
// 2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL query parameter, all fields
// are passed via URL path and HTTP request body.
// 3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP request body, all
// fields are passed via URL path and URL query parameters.
//
// ### Path template syntax
//
// Template = "/" Segments [ Verb ] ;
// Segments = Segment { "/" Segment } ;
// Segment = "*" | "**" | LITERAL | Variable ;
// Variable = "{" FieldPath [ "=" Segments ] "}" ;
// FieldPath = IDENT { "." IDENT } ;
// Verb = ":" LITERAL ;
//
// The syntax `*` matches a single URL path segment. The syntax `**` matches
// zero or more URL path segments, which must be the last part of the URL path
// except the `Verb`.
//
// The syntax `Variable` matches part of the URL path as specified by its
// template. A variable template must not contain other variables. If a variable
// matches a single path segment, its template may be omitted, e.g. `{var}`
// is equivalent to `{var=*}`.
//
// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL`
// contains any reserved character, such characters should be percent-encoded
// before the matching.
//
// If a variable contains exactly one path segment, such as `"{var}"` or
// `"{var=*}"`, when such a variable is expanded into a URL path on the client
// side, all characters except `[-_.~0-9a-zA-Z]` are percent-encoded. The
// server side does the reverse decoding. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{var}`.
//
// If a variable contains multiple path segments, such as `"{var=foo/*}"`
// or `"{var=**}"`, when such a variable is expanded into a URL path on the
// client side, all characters except `[-_.~/0-9a-zA-Z]` are percent-encoded.
// The server side does the reverse decoding, except "%2F" and "%2f" are left
// unchanged. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{+var}`.
//
// ## Using gRPC API Service Configuration
//
// gRPC API Service Configuration (service config) is a configuration language
// for configuring a gRPC service to become a user-facing product. The
// service config is simply the YAML representation of the `google.api.Service`
// proto message.
//
// As an alternative to annotating your proto file, you can configure gRPC
// transcoding in your service config YAML files. You do this by specifying a
// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same
// effect as the proto annotation. This can be particularly useful if you
// have a proto that is reused in multiple services. Note that any transcoding
// specified in the service config will override any matching transcoding
// configuration in the proto.
//
// Example:
//
// http:
// rules:
// # Selects a gRPC method and applies HttpRule to it.
// - selector: example.v1.Messaging.GetMessage
// get: /v1/messages/{message_id}/{sub.subfield}
//
// ## Special notes
//
// When gRPC Transcoding is used to map a gRPC to JSON REST endpoints, the
// proto to JSON conversion must follow the [proto3
// specification](https://developers.google.com/protocol-buffers/docs/proto3#json).
//
// While the single segment variable follows the semantics of
// [RFC 6570](https://tools.ietf.org/html/rfc6570) Section 3.2.2 Simple String
// Expansion, the multi segment variable **does not** follow RFC 6570 Section
// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion
// does not expand special characters like `?` and `#`, which would lead
// to invalid URLs. As the result, gRPC Transcoding uses a custom encoding
// for multi segment variables.
//
// The path variables **must not** refer to any repeated or mapped field,
// because client libraries are not capable of handling such variable expansion.
//
// The path variables **must not** capture the leading "/" character. The reason
// is that the most common use case "{var}" does not capture the leading "/"
// character. For consistency, all path variables must share the same behavior.
//
// Repeated message fields must not be mapped to URL query parameters, because
// no client library can support such complicated mapping.
//
// If an API needs to use a JSON array for request or response body, it can map
// the request or response body to a repeated field. However, some gRPC
// Transcoding implementations may not support this feature.
message HttpRule {
// Selects a method to which this rule applies.
//
// Refer to [selector][google.api.DocumentationRule.selector] for syntax details.
string selector = 1;
// Determines the URL pattern is matched by this rules. This pattern can be
// used with any of the {get|put|post|delete|patch} methods. A custom method
// can be defined using the 'custom' field.
oneof pattern {
// Maps to HTTP GET. Used for listing and getting information about
// resources.
string get = 2;
// Maps to HTTP PUT. Used for replacing a resource.
string put = 3;
// Maps to HTTP POST. Used for creating a resource or performing an action.
string post = 4;
// Maps to HTTP DELETE. Used for deleting a resource.
string delete = 5;
// Maps to HTTP PATCH. Used for updating a resource.
string patch = 6;
// The custom pattern is used for specifying an HTTP method that is not
// included in the `pattern` field, such as HEAD, or "*" to leave the
// HTTP method unspecified for this rule. The wild-card rule is useful
// for services that provide content to Web (HTML) clients.
CustomHttpPattern custom = 8;
}
// The name of the request field whose value is mapped to the HTTP request
// body, or `*` for mapping all request fields not captured by the path
// pattern to the HTTP body, or omitted for not having any HTTP request body.
//
// NOTE: the referred field must be present at the top-level of the request
// message type.
string body = 7;
// Optional. The name of the response field whose value is mapped to the HTTP
// response body. When omitted, the entire response message will be used
// as the HTTP response body.
//
// NOTE: The referred field must be present at the top-level of the response
// message type.
string response_body = 12;
// Additional HTTP bindings for the selector. Nested bindings must
// not contain an `additional_bindings` field themselves (that is,
// the nesting may only be one level deep).
repeated HttpRule additional_bindings = 11;
}
// A custom pattern is used for defining custom HTTP verb.
message CustomHttpPattern {
// The name of this custom HTTP verb.
string kind = 1;
// The path matched by this custom verb.
string path = 2;
}

View File

@ -1,6 +1,14 @@
syntax = "proto3";
package mvccpb;
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
message KeyValue {
// key is the key in bytes. An empty key is not allowed.
bytes key = 1;

View File

@ -1,34 +1,65 @@
syntax = "proto3";
package etcdserverpb;
import "gogoproto/gogo.proto";
import "kv.proto";
import "auth.proto";
// for grpc-gateway
import "google/api/annotations.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
service KV {
// Range gets the keys in the range from the key-value store.
rpc Range(RangeRequest) returns (RangeResponse) {}
rpc Range(RangeRequest) returns (RangeResponse) {
option (google.api.http) = {
post: "/v3/kv/range"
body: "*"
};
}
// Put puts the given key into the key-value store.
// A put request increments the revision of the key-value store
// and generates one event in the event history.
rpc Put(PutRequest) returns (PutResponse) {}
rpc Put(PutRequest) returns (PutResponse) {
option (google.api.http) = {
post: "/v3/kv/put"
body: "*"
};
}
// DeleteRange deletes the given range from the key-value store.
// A delete request increments the revision of the key-value store
// and generates a delete event in the event history for every deleted key.
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {}
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {
option (google.api.http) = {
post: "/v3/kv/deleterange"
body: "*"
};
}
// Txn processes multiple requests in a single transaction.
// A txn request increments the revision of the key-value store
// and generates events with the same revision for every completed request.
// It is not allowed to modify the same key several times within one txn.
rpc Txn(TxnRequest) returns (TxnResponse) {}
rpc Txn(TxnRequest) returns (TxnResponse) {
option (google.api.http) = {
post: "/v3/kv/txn"
body: "*"
};
}
// Compact compacts the event history in the etcd key-value store. The key-value
// store should be periodically compacted or the event history will continue to grow
// indefinitely.
rpc Compact(CompactionRequest) returns (CompactionResponse) {}
rpc Compact(CompactionRequest) returns (CompactionResponse) {
option (google.api.http) = {
post: "/v3/kv/compaction"
body: "*"
};
}
}
service Watch {
@ -37,107 +68,305 @@ service Watch {
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
// for several watches at once. The entire event history can be watched starting from the
// last compaction revision.
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {}
rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
option (google.api.http) = {
post: "/v3/watch"
body: "*"
};
}
}
service Lease {
// LeaseGrant creates a lease which expires if the server does not receive a keepAlive
// within a given time to live period. All keys attached to the lease will be expired and
// deleted if the lease expires. Each expired key generates a delete event in the event history.
rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) {}
rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) {
option (google.api.http) = {
post: "/v3/lease/grant"
body: "*"
};
}
// LeaseRevoke revokes a lease. All keys attached to the lease will expire and be deleted.
rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) {}
rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) {
option (google.api.http) = {
post: "/v3/lease/revoke"
body: "*"
additional_bindings {
post: "/v3/kv/lease/revoke"
body: "*"
}
};
}
// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client
// to the server and streaming keep alive responses from the server to the client.
rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {}
rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {
option (google.api.http) = {
post: "/v3/lease/keepalive"
body: "*"
};
}
// TODO(xiangli) List all existing Leases?
// TODO(xiangli) Get details information (expirations, leased keys, etc.) of a lease?
// LeaseTimeToLive retrieves lease information.
rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) {
option (google.api.http) = {
post: "/v3/lease/timetolive"
body: "*"
additional_bindings {
post: "/v3/kv/lease/timetolive"
body: "*"
}
};
}
// LeaseLeases lists all existing leases.
rpc LeaseLeases(LeaseLeasesRequest) returns (LeaseLeasesResponse) {
option (google.api.http) = {
post: "/v3/lease/leases"
body: "*"
additional_bindings {
post: "/v3/kv/lease/leases"
body: "*"
}
};
}
}
service Cluster {
// MemberAdd adds a member into the cluster.
rpc MemberAdd(MemberAddRequest) returns (MemberAddResponse) {}
rpc MemberAdd(MemberAddRequest) returns (MemberAddResponse) {
option (google.api.http) = {
post: "/v3/cluster/member/add"
body: "*"
};
}
// MemberRemove removes an existing member from the cluster.
rpc MemberRemove(MemberRemoveRequest) returns (MemberRemoveResponse) {}
rpc MemberRemove(MemberRemoveRequest) returns (MemberRemoveResponse) {
option (google.api.http) = {
post: "/v3/cluster/member/remove"
body: "*"
};
}
// MemberUpdate updates the member configuration.
rpc MemberUpdate(MemberUpdateRequest) returns (MemberUpdateResponse) {}
rpc MemberUpdate(MemberUpdateRequest) returns (MemberUpdateResponse) {
option (google.api.http) = {
post: "/v3/cluster/member/update"
body: "*"
};
}
// MemberList lists all the members in the cluster.
rpc MemberList(MemberListRequest) returns (MemberListResponse) {}
rpc MemberList(MemberListRequest) returns (MemberListResponse) {
option (google.api.http) = {
post: "/v3/cluster/member/list"
body: "*"
};
}
// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
rpc MemberPromote(MemberPromoteRequest) returns (MemberPromoteResponse) {
option (google.api.http) = {
post: "/v3/cluster/member/promote"
body: "*"
};
}
}
service Maintenance {
// Alarm activates, deactivates, and queries alarms regarding cluster health.
rpc Alarm(AlarmRequest) returns (AlarmResponse) {}
rpc Alarm(AlarmRequest) returns (AlarmResponse) {
option (google.api.http) = {
post: "/v3/maintenance/alarm"
body: "*"
};
}
// Status gets the status of the member.
rpc Status(StatusRequest) returns (StatusResponse) {}
rpc Status(StatusRequest) returns (StatusResponse) {
option (google.api.http) = {
post: "/v3/maintenance/status"
body: "*"
};
}
// Defragment defragments a member's backend database to recover storage space.
rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {}
rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {
option (google.api.http) = {
post: "/v3/maintenance/defragment"
body: "*"
};
}
// Hash returns the hash of the local KV state for consistency checking purpose.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
rpc Hash(HashRequest) returns (HashResponse) {}
// Hash computes the hash of whole backend keyspace,
// including key, lease, and other buckets in storage.
// This is designed for testing ONLY!
// Do not rely on this in production with ongoing transactions,
// since Hash operation does not hold MVCC locks.
// Use "HashKV" API instead for "key" bucket consistency checks.
rpc Hash(HashRequest) returns (HashResponse) {
option (google.api.http) = {
post: "/v3/maintenance/hash"
body: "*"
};
}
// HashKV computes the hash of all MVCC keys up to a given revision.
// It only iterates "key" bucket in backend storage.
rpc HashKV(HashKVRequest) returns (HashKVResponse) {
option (google.api.http) = {
post: "/v3/maintenance/hash"
body: "*"
};
}
// Snapshot sends a snapshot of the entire backend from a member over a stream to a client.
rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {}
rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {
option (google.api.http) = {
post: "/v3/maintenance/snapshot"
body: "*"
};
}
// MoveLeader requests current leader node to transfer its leadership to transferee.
rpc MoveLeader(MoveLeaderRequest) returns (MoveLeaderResponse) {
option (google.api.http) = {
post: "/v3/maintenance/transfer-leadership"
body: "*"
};
}
}
service Auth {
// AuthEnable enables authentication.
rpc AuthEnable(AuthEnableRequest) returns (AuthEnableResponse) {}
rpc AuthEnable(AuthEnableRequest) returns (AuthEnableResponse) {
option (google.api.http) = {
post: "/v3/auth/enable"
body: "*"
};
}
// AuthDisable disables authentication.
rpc AuthDisable(AuthDisableRequest) returns (AuthDisableResponse) {}
rpc AuthDisable(AuthDisableRequest) returns (AuthDisableResponse) {
option (google.api.http) = {
post: "/v3/auth/disable"
body: "*"
};
}
// Authenticate processes an authenticate request.
rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse) {}
rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse) {
option (google.api.http) = {
post: "/v3/auth/authenticate"
body: "*"
};
}
// UserAdd adds a new user.
rpc UserAdd(AuthUserAddRequest) returns (AuthUserAddResponse) {}
// UserAdd adds a new user. User name cannot be empty.
rpc UserAdd(AuthUserAddRequest) returns (AuthUserAddResponse) {
option (google.api.http) = {
post: "/v3/auth/user/add"
body: "*"
};
}
// UserGet gets detailed user information.
rpc UserGet(AuthUserGetRequest) returns (AuthUserGetResponse) {}
rpc UserGet(AuthUserGetRequest) returns (AuthUserGetResponse) {
option (google.api.http) = {
post: "/v3/auth/user/get"
body: "*"
};
}
// UserList gets a list of all users.
rpc UserList(AuthUserListRequest) returns (AuthUserListResponse) {}
rpc UserList(AuthUserListRequest) returns (AuthUserListResponse) {
option (google.api.http) = {
post: "/v3/auth/user/list"
body: "*"
};
}
// UserDelete deletes a specified user.
rpc UserDelete(AuthUserDeleteRequest) returns (AuthUserDeleteResponse) {}
rpc UserDelete(AuthUserDeleteRequest) returns (AuthUserDeleteResponse) {
option (google.api.http) = {
post: "/v3/auth/user/delete"
body: "*"
};
}
// UserChangePassword changes the password of a specified user.
rpc UserChangePassword(AuthUserChangePasswordRequest) returns (AuthUserChangePasswordResponse) {}
rpc UserChangePassword(AuthUserChangePasswordRequest) returns (AuthUserChangePasswordResponse) {
option (google.api.http) = {
post: "/v3/auth/user/changepw"
body: "*"
};
}
// UserGrant grants a role to a specified user.
rpc UserGrantRole(AuthUserGrantRoleRequest) returns (AuthUserGrantRoleResponse) {}
rpc UserGrantRole(AuthUserGrantRoleRequest) returns (AuthUserGrantRoleResponse) {
option (google.api.http) = {
post: "/v3/auth/user/grant"
body: "*"
};
}
// UserRevokeRole revokes a role of specified user.
rpc UserRevokeRole(AuthUserRevokeRoleRequest) returns (AuthUserRevokeRoleResponse) {}
rpc UserRevokeRole(AuthUserRevokeRoleRequest) returns (AuthUserRevokeRoleResponse) {
option (google.api.http) = {
post: "/v3/auth/user/revoke"
body: "*"
};
}
// RoleAdd adds a new role.
rpc RoleAdd(AuthRoleAddRequest) returns (AuthRoleAddResponse) {}
// RoleAdd adds a new role. Role name cannot be empty.
rpc RoleAdd(AuthRoleAddRequest) returns (AuthRoleAddResponse) {
option (google.api.http) = {
post: "/v3/auth/role/add"
body: "*"
};
}
// RoleGet gets detailed role information.
rpc RoleGet(AuthRoleGetRequest) returns (AuthRoleGetResponse) {}
rpc RoleGet(AuthRoleGetRequest) returns (AuthRoleGetResponse) {
option (google.api.http) = {
post: "/v3/auth/role/get"
body: "*"
};
}
// RoleList gets lists of all roles.
rpc RoleList(AuthRoleListRequest) returns (AuthRoleListResponse) {}
rpc RoleList(AuthRoleListRequest) returns (AuthRoleListResponse) {
option (google.api.http) = {
post: "/v3/auth/role/list"
body: "*"
};
}
// RoleDelete deletes a specified role.
rpc RoleDelete(AuthRoleDeleteRequest) returns (AuthRoleDeleteResponse) {}
rpc RoleDelete(AuthRoleDeleteRequest) returns (AuthRoleDeleteResponse) {
option (google.api.http) = {
post: "/v3/auth/role/delete"
body: "*"
};
}
// RoleGrantPermission grants a permission of a specified key or range to a specified role.
rpc RoleGrantPermission(AuthRoleGrantPermissionRequest) returns (AuthRoleGrantPermissionResponse) {}
rpc RoleGrantPermission(AuthRoleGrantPermissionRequest) returns (AuthRoleGrantPermissionResponse) {
option (google.api.http) = {
post: "/v3/auth/role/grant"
body: "*"
};
}
// RoleRevokePermission revokes a key or range permission of a specified role.
rpc RoleRevokePermission(AuthRoleRevokePermissionRequest) returns (AuthRoleRevokePermissionResponse) {}
rpc RoleRevokePermission(AuthRoleRevokePermissionRequest) returns (AuthRoleRevokePermissionResponse) {
option (google.api.http) = {
post: "/v3/auth/role/revoke"
body: "*"
};
}
}
message ResponseHeader {
@ -146,6 +375,9 @@ message ResponseHeader {
// member_id is the ID of the member which sent the response.
uint64 member_id = 2;
// revision is the key-value store revision when the request was applied.
// For watch progress responses, the header.revision indicates progress. All future events
// recieved in this stream are guaranteed to have a higher revision number than the
// header.revision number.
int64 revision = 3;
// raft_term is the raft term when the request was applied.
uint64 raft_term = 4;
@ -169,11 +401,12 @@ message RangeRequest {
bytes key = 1;
// range_end is the upper bound on the requested range [key, range_end).
// If range_end is '\0', the range is all keys >= key.
// If the range_end is one bit larger than the given key,
// then the range requests get the all keys with the prefix (the given key).
// If both key and range_end are '\0', then range requests returns all keys.
// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
// then the range request gets all keys prefixed with key.
// If both key and range_end are '\0', then the range request returns all keys.
bytes range_end = 2;
// limit is a limit on the number of keys returned for the request.
// limit is a limit on the number of keys returned for the request. When limit is set to 0,
// it is treated as no limit.
int64 limit = 3;
// revision is the point-in-time of the key-value store to use for the range.
// If revision is less or equal to zero, the range is over the newest key-value store.
@ -196,9 +429,25 @@ message RangeRequest {
// keys_only when set returns only the keys and not the values.
bool keys_only = 8;
// count_only when set returns only the count of the keys in the range.
bool count_only = 9;
// min_mod_revision is the lower bound for returned key mod revisions; all keys with
// lesser mod revisions will be filtered away.
int64 min_mod_revision = 10;
// max_mod_revision is the upper bound for returned key mod revisions; all keys with
// greater mod revisions will be filtered away.
int64 max_mod_revision = 11;
// min_create_revision is the lower bound for returned key create revisions; all keys with
// lesser create revisions will be filtered away.
int64 min_create_revision = 12;
// max_create_revision is the upper bound for returned key create revisions; all keys with
// greater create revisions will be filtered away.
int64 max_create_revision = 13;
}
message RangeResponse {
@ -224,6 +473,14 @@ message PutRequest {
// If prev_kv is set, etcd gets the previous key-value pair before changing it.
// The previous key-value pair will be returned in the put response.
bool prev_kv = 4;
// If ignore_value is set, etcd updates the key using its current value.
// Returns an error if the key does not exist.
bool ignore_value = 5;
// If ignore_lease is set, etcd updates the key using its current lease.
// Returns an error if the key does not exist.
bool ignore_lease = 6;
}
message PutResponse {
@ -237,11 +494,13 @@ message DeleteRangeRequest {
bytes key = 1;
// range_end is the key following the last key to delete for the range [key, range_end).
// If range_end is not given, the range is defined to contain only the key argument.
// If range_end is one bit larger than the given key, then the range is all the keys
// with the prefix (the given key).
// If range_end is '\0', the range is all keys greater than or equal to the key argument.
bytes range_end = 2;
// If prev_kv is set, etcd gets the previous key-value pairs before deleting it.
// The previous key-value pairs will be returned in the delte response.
// The previous key-value pairs will be returned in the delete response.
bool prev_kv = 3;
}
@ -259,6 +518,7 @@ message RequestOp {
RangeRequest request_range = 1;
PutRequest request_put = 2;
DeleteRangeRequest request_delete_range = 3;
TxnRequest request_txn = 4;
}
}
@ -268,6 +528,7 @@ message ResponseOp {
RangeResponse response_range = 1;
PutResponse response_put = 2;
DeleteRangeResponse response_delete_range = 3;
TxnResponse response_txn = 4;
}
}
@ -276,12 +537,14 @@ message Compare {
EQUAL = 0;
GREATER = 1;
LESS = 2;
NOT_EQUAL = 3;
}
enum CompareTarget {
VERSION = 0;
CREATE = 1;
MOD = 2;
VALUE= 3;
VALUE = 3;
LEASE = 4;
}
// result is logical comparison operation for this comparison.
CompareResult result = 1;
@ -298,7 +561,15 @@ message Compare {
int64 mod_revision = 6;
// value is the value of the given key, in bytes.
bytes value = 7;
// lease is the lease id of the given key.
int64 lease = 8;
// leave room for more target_union field tags, jump to 64
}
// range_end compares the given target to all keys in the range [key, range_end).
// See RangeRequest for more details on key ranges.
bytes range_end = 64;
// TODO: fill out with most of the rest of RangeRequest fields when needed.
}
// From google paxosdb paper:
@ -341,7 +612,7 @@ message TxnResponse {
// CompactionRequest compacts the key-value store up to a given revision. All superseded keys
// with a revision less than the compaction revision will be removed.
message CompactionRequest {
// revision is the key-value store revision for the compaction operation.
// revision is the key-value store revision for the compaction operation.
int64 revision = 1;
// physical is set so the RPC will wait until the compaction is physically
// applied to the local database such that compacted entries are totally
@ -356,9 +627,22 @@ message CompactionResponse {
message HashRequest {
}
message HashKVRequest {
// revision is the key-value store revision for the hash operation.
int64 revision = 1;
}
message HashKVResponse {
ResponseHeader header = 1;
// hash is the hash value computed from the responding member's MVCC keys up to a given revision.
uint32 hash = 2;
// compact_revision is the compacted revision of key-value store when hash begins.
int64 compact_revision = 3;
}
message HashResponse {
ResponseHeader header = 1;
// hash is the hash value computed from the responding member's key-value store.
// hash is the hash value computed from the responding member's KV's backend.
uint32 hash = 2;
}
@ -382,18 +666,24 @@ message WatchRequest {
oneof request_union {
WatchCreateRequest create_request = 1;
WatchCancelRequest cancel_request = 2;
WatchProgressRequest progress_request = 3;
}
}
message WatchCreateRequest {
// key is the key to register for watching.
bytes key = 1;
// range_end is the end of the range [key, range_end) to watch. If range_end is not given,
// only the key argument is watched. If range_end is equal to '\0', all keys greater than
// or equal to the key argument are watched.
// If the range_end is one bit larger than the given key,
// then all keys with the prefix (the given key) will be watched.
bytes range_end = 2;
// start_revision is an optional revision to watch from (inclusive). No start_revision is "now".
int64 start_revision = 3;
// progress_notify is set so that the etcd server will periodically send a WatchResponse with
// no events to the new watcher if there are no recent events. It is useful when clients
// wish to recover a disconnected watcher starting from a recent known revision.
@ -401,17 +691,28 @@ message WatchCreateRequest {
bool progress_notify = 4;
enum FilterType {
// filter out put event.
NOPUT = 0;
// filter out delete event.
NODELETE = 1;
// filter out put event.
NOPUT = 0;
// filter out delete event.
NODELETE = 1;
}
// filters filter the events at server side before it sends back to the watcher.
repeated FilterType filters = 5;
// If prev_kv is set, created watcher gets the previous KV before the event happens.
// If the previous KV is already compacted, nothing will be returned.
bool prev_kv = 6;
// If watch_id is provided and non-zero, it will be assigned to this watcher.
// Since creating a watcher in etcd is not a synchronous operation,
// this can be used ensure that ordering is correct when creating multiple
// watchers on the same stream. Creating a watcher with an ID already in
// use on the stream will cause an error to be returned.
int64 watch_id = 7;
// fragment enables splitting large revisions into multiple watch responses.
bool fragment = 8;
}
message WatchCancelRequest {
@ -419,33 +720,47 @@ message WatchCancelRequest {
int64 watch_id = 1;
}
// Requests the a watch stream progress status be sent in the watch response stream as soon as
// possible.
message WatchProgressRequest {
}
message WatchResponse {
ResponseHeader header = 1;
// watch_id is the ID of the watcher that corresponds to the response.
int64 watch_id = 2;
// created is set to true if the response is for a create watch request.
// The client should record the watch_id and expect to receive events for
// the created watcher from the same stream.
// All events sent to the created watcher will attach with the same watch_id.
bool created = 3;
// canceled is set to true if the response is for a cancel watch request.
// No further events will be sent to the canceled watcher.
bool canceled = 4;
// compact_revision is set to the minimum index if a watcher tries to watch
// at a compacted index.
//
// This happens when creating a watcher at a compacted revision or the watcher cannot
// catch up with the progress of the key-value store.
// catch up with the progress of the key-value store.
//
// The client should treat the watcher as canceled and should not try to create any
// watcher with the same start_revision again.
int64 compact_revision = 5;
int64 compact_revision = 5;
// cancel_reason indicates the reason for canceling the watcher.
string cancel_reason = 6;
// framgment is true if large watch response was split over multiple responses.
bool fragment = 7;
repeated mvccpb.Event events = 11;
}
message LeaseGrantRequest {
// TTL is the advisory time-to-live in seconds.
// TTL is the advisory time-to-live in seconds. Expired lease will return -1.
int64 TTL = 1;
// ID is the requested ID for the lease. If ID is set to 0, the lessor chooses an ID.
int64 ID = 2;
@ -469,6 +784,22 @@ message LeaseRevokeResponse {
ResponseHeader header = 1;
}
message LeaseCheckpoint {
// ID is the lease ID to checkpoint.
int64 ID = 1;
// Remaining_TTL is the remaining time until expiry of the lease.
int64 remaining_TTL = 2;
}
message LeaseCheckpointRequest {
repeated LeaseCheckpoint checkpoints = 1;
}
message LeaseCheckpointResponse {
ResponseHeader header = 1;
}
message LeaseKeepAliveRequest {
// ID is the lease ID for the lease to keep alive.
int64 ID = 1;
@ -482,6 +813,38 @@ message LeaseKeepAliveResponse {
int64 TTL = 3;
}
message LeaseTimeToLiveRequest {
// ID is the lease ID for the lease.
int64 ID = 1;
// keys is true to query all the keys attached to this lease.
bool keys = 2;
}
message LeaseTimeToLiveResponse {
ResponseHeader header = 1;
// ID is the lease ID from the keep alive request.
int64 ID = 2;
// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds.
int64 TTL = 3;
// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
int64 grantedTTL = 4;
// Keys is the list of keys attached to this lease.
repeated bytes keys = 5;
}
message LeaseLeasesRequest {
}
message LeaseStatus {
int64 ID = 1;
// TODO: int64 TTL = 2;
}
message LeaseLeasesResponse {
ResponseHeader header = 1;
repeated LeaseStatus leases = 2;
}
message Member {
// ID is the member ID for this member.
uint64 ID = 1;
@ -491,17 +854,23 @@ message Member {
repeated string peerURLs = 3;
// clientURLs is the list of URLs the member exposes to clients for communication. If the member is not started, clientURLs will be empty.
repeated string clientURLs = 4;
// isLearner indicates if the member is raft learner.
bool isLearner = 5;
}
message MemberAddRequest {
// peerURLs is the list of URLs the added member will use to communicate with the cluster.
repeated string peerURLs = 1;
// isLearner indicates if the added member is raft learner.
bool isLearner = 2;
}
message MemberAddResponse {
ResponseHeader header = 1;
// member is the member information for the added member.
Member member = 2;
// members is a list of all members after adding the new member.
repeated Member members = 3;
}
message MemberRemoveRequest {
@ -511,6 +880,8 @@ message MemberRemoveRequest {
message MemberRemoveResponse {
ResponseHeader header = 1;
// members is a list of all members after removing the member.
repeated Member members = 2;
}
message MemberUpdateRequest {
@ -522,6 +893,8 @@ message MemberUpdateRequest {
message MemberUpdateResponse{
ResponseHeader header = 1;
// members is a list of all members after updating the member.
repeated Member members = 2;
}
message MemberListRequest {
@ -533,6 +906,17 @@ message MemberListResponse {
repeated Member members = 2;
}
message MemberPromoteRequest {
// ID is the member ID of the member to promote.
uint64 ID = 1;
}
message MemberPromoteResponse {
ResponseHeader header = 1;
// members is a list of all members after promoting the member.
repeated Member members = 2;
}
message DefragmentRequest {
}
@ -540,9 +924,19 @@ message DefragmentResponse {
ResponseHeader header = 1;
}
message MoveLeaderRequest {
// targetID is the node ID for the new leader.
uint64 targetID = 1;
}
message MoveLeaderResponse {
ResponseHeader header = 1;
}
enum AlarmType {
NONE = 0; // default, used to query if any alarm is active
NOSPACE = 1; // space quota is exhausted
CORRUPT = 2; // kv store corruption detected
}
message AlarmRequest {
@ -582,14 +976,22 @@ message StatusResponse {
ResponseHeader header = 1;
// version is the cluster protocol version used by the responding member.
string version = 2;
// dbSize is the size of the backend database, in bytes, of the responding member.
// dbSize is the size of the backend database physically allocated, in bytes, of the responding member.
int64 dbSize = 3;
// leader is the member ID which the responding member believes is the current leader.
uint64 leader = 4;
// raftIndex is the current raft index of the responding member.
// raftIndex is the current raft committed index of the responding member.
uint64 raftIndex = 5;
// raftTerm is the current raft term of the responding member.
uint64 raftTerm = 6;
// raftAppliedIndex is the current raft applied index of the responding member.
uint64 raftAppliedIndex = 7;
// errors contains alarm/health information and status.
repeated string errors = 8;
// dbSizeInUse is the size of the backend database logically in use, in bytes, of the responding member.
int64 dbSizeInUse = 9;
// isLearner indicates if the member is raft learner.
bool isLearner = 10;
}
message AuthEnableRequest {
@ -606,6 +1008,7 @@ message AuthenticateRequest {
message AuthUserAddRequest {
string name = 1;
string password = 2;
authpb.UserAddOptions options = 3;
}
message AuthUserGetRequest {
@ -664,8 +1067,8 @@ message AuthRoleGrantPermissionRequest {
message AuthRoleRevokePermissionRequest {
string role = 1;
string key = 2;
string range_end = 3;
bytes key = 2;
bytes range_end = 3;
}
message AuthEnableResponse {

65
proto/v3lock.proto Normal file
View File

@ -0,0 +1,65 @@
syntax = "proto3";
package v3lockpb;
import "gogoproto/gogo.proto";
import "rpc.proto";
// for grpc-gateway
import "google/api/annotations.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
// The lock service exposes client-side locking facilities as a gRPC interface.
service Lock {
// Lock acquires a distributed shared lock on a given named lock.
// On success, it will return a unique key that exists so long as the
// lock is held by the caller. This key can be used in conjunction with
// transactions to safely ensure updates to etcd only occur while holding
// lock ownership. The lock is held until Unlock is called on the key or the
// lease associate with the owner expires.
rpc Lock(LockRequest) returns (LockResponse) {
option (google.api.http) = {
post: "/v3/lock/lock"
body: "*"
};
}
// Unlock takes a key returned by Lock and releases the hold on lock. The
// next Lock caller waiting for the lock will then be woken up and given
// ownership of the lock.
rpc Unlock(UnlockRequest) returns (UnlockResponse) {
option (google.api.http) = {
post: "/v3/lock/unlock"
body: "*"
};
}
}
message LockRequest {
// name is the identifier for the distributed shared lock to be acquired.
bytes name = 1;
// lease is the ID of the lease that will be attached to ownership of the
// lock. If the lease expires or is revoked and currently holds the lock,
// the lock is automatically released. Calls to Lock with the same lease will
// be treated as a single acquisition; locking twice with the same lease is a
// no-op.
int64 lease = 2;
}
message LockResponse {
etcdserverpb.ResponseHeader header = 1;
// key is a key that will exist on etcd for the duration that the Lock caller
// owns the lock. Users should not modify this key or the lock may exhibit
// undefined behavior.
bytes key = 2;
}
message UnlockRequest {
// key is the lock ownership key granted by Lock.
bytes key = 1;
}
message UnlockResponse {
etcdserverpb.ResponseHeader header = 1;
}

View File

@ -1,7 +1,18 @@
add_library(etcd-cpp-api SHARED ../proto/kv.pb.cc ../proto/auth.pb.cc ../proto/rpc.pb.cc ../proto/rpc.grpc.pb.cc ../v3/src/AsyncTxnResponse.cpp ../v3/src/AsyncRangeResponse.cpp ../v3/src/Transaction.cpp ../v3/src/action_constants.cpp ../v3/src/AsyncSetAction.cpp ../v3/src/AsyncCompareAndSwapAction.cpp ../v3/src/AsyncUpdateAction.cpp ../v3/src/AsyncGetAction.cpp ../v3/src/AsyncDeleteAction.cpp ../v3/src/AsyncCompareAndDeleteAction.cpp ../v3/src/Action.cpp ../v3/src/AsyncWatchAction.cpp ../v3/src/V3Response.cpp ../v3/src/AsyncDeleteRangeResponse.cpp ../v3/src/AsyncWatchResponse.cpp ../v3/src/AsyncLeaseGrantResponse.cpp ../v3/src/AsyncLeaseGrantAction.cpp ../v3/src/KeyValue.cpp Client.cpp Response.cpp Value.cpp SyncClient.cpp Watcher.cpp)
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
file(GLOB_RECURSE CPP_CLIENT_SRC RELATIVE "${CMAKE_SOURCE_DIR}/src"
"${CMAKE_SOURCE_DIR}/proto/*.cc"
"${CMAKE_SOURCE_DIR}/src/*.cpp"
"${CMAKE_SOURCE_DIR}/v3/src/*.cpp")
target_link_libraries(etcd-cpp-api ${CPPREST_LIB} boost_system ssl crypto protobuf grpc++)
add_library(etcd-cpp-api SHARED ${CPP_CLIENT_SRC})
set_property(TARGET etcd-cpp-api PROPERTY CXX_STANDARD 11)
target_include_directories(etcd-cpp-api PRIVATE ${CMAKE_SOURCE_DIR}/proto)
target_link_libraries(etcd-cpp-api PUBLIC
${CPPREST_LIB}
${Boost_LIBRARIES}
${PROTOBUF_LIBRARIES}
${OPENSSL_LIBRARIES}
${GRPC_LIBRARIES})
install (TARGETS etcd-cpp-api DESTINATION lib)
install (FILES ../etcd/Client.hpp
@ -10,4 +21,18 @@ install (FILES ../etcd/Client.hpp
../etcd/Value.hpp
../etcd/Watcher.hpp
DESTINATION include/etcd)
install (FILES ../proto/auth.pb.h
../proto/kv.pb.h
../proto/rpc.pb.h
../proto/rpc.grpc.pb.h
../proto/v3lock.pb.h
../proto/v3lock.grpc.pb.h
DESTINATION include/etcd/proto)
install (FILES ../v3/include/Transaction.hpp
DESTINATION include/etcd/v3/include)
install (FILES ../proto/gogoproto/gogo.pb.h
DESTINATION include/etcd/proto/gogoproto)
install (FILES ../proto/google/api/annotations.pb.h
../proto/google/api/http.pb.h
DESTINATION include/etcd/proto/google/api)

View File

@ -1,10 +1,12 @@
#include <memory>
#include "etcd/Client.hpp"
#include "v3/include/action_constants.hpp"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/AsyncRangeResponse.hpp"
#include "v3/include/AsyncWatchResponse.hpp"
#include "v3/include/AsyncDeleteRangeResponse.hpp"
#include "v3/include/AsyncLockResponse.hpp"
#include "v3/include/Transaction.hpp"
#include <iostream>
@ -16,6 +18,8 @@
#include "v3/include/AsyncDeleteAction.hpp"
#include "v3/include/AsyncWatchAction.hpp"
#include "v3/include/AsyncLeaseGrantAction.hpp"
#include "v3/include/AsyncLockAction.hpp"
#include "v3/include/AsyncTxnAction.hpp"
using grpc::Channel;
@ -32,9 +36,11 @@ etcd::Client::Client(std::string const & address)
stripped_address = address.substr(i+substr.length());
}
std::shared_ptr<Channel> channel = grpc::CreateChannel(stripped_address, grpc::InsecureChannelCredentials());
std::cout << "channel is: " << channel << std::endl;
stub_= KV::NewStub(channel);
watchServiceStub= Watch::NewStub(channel);
leaseServiceStub= Lease::NewStub(channel);
lockServiceStub = Lock::NewStub(channel);
}
@ -325,5 +331,25 @@ pplx::task<etcd::Response> etcd::Client::leasegrant(int ttl)
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::lock(std::string const &key) {
etcdv3::ActionParameters params;
params.key = key;
params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncLockAction> call(new etcdv3::AsyncLockAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::unlock(std::string const &key) {
etcdv3::ActionParameters params;
params.key = key;
params.lock_stub = lockServiceStub.get();
std::shared_ptr<etcdv3::AsyncUnlockAction> call(new etcdv3::AsyncUnlockAction(params));
return Response::create(call);
}
pplx::task<etcd::Response> etcd::Client::txn(etcdv3::Transaction const &txn) {
etcdv3::ActionParameters params;
params.kv_stub = stub_.get();
std::shared_ptr<etcdv3::AsyncTxnAction> call(new etcdv3::AsyncTxnAction(params, txn));
return Response::create(call);
}

View File

@ -1,4 +1,5 @@
#include "etcd/Response.hpp"
#include "v3/include/V3Response.hpp"
#include <iostream>
@ -25,6 +26,8 @@ etcd::Response::Response(const etcdv3::V3Response& reply)
_prev_value = Value(reply.get_prev_value());
_lock_key = reply.get_lock_key();
_events = reply.get_events();
}
@ -95,3 +98,11 @@ std::string const & etcd::Response::key(int index) const
{
return _keys[index];
}
std::string const & etcd::Response::lock_key() const {
return _lock_key;
}
std::vector<mvccpb::Event> const & etcd::Response::events() const {
return this->_events;
};

View File

@ -1,4 +1,5 @@
#include "etcd/Watcher.hpp"
#include "v3/include/AsyncWatchAction.hpp"
etcd::Watcher::Watcher(std::string const & address, std::string const & key, std::function<void(Response)> callback)
{

View File

@ -1,8 +1,12 @@
find_path(CATCH_INCLUDE_DIR NAMES catch.hpp)
find_path(CATCH_INCLUDE_DIR NAMES catch.hpp PATHS ${PROJECT_SOURCE_DIR})
include_directories(${CATCH_INCLUDE_DIR})
add_executable(etcd_test EtcdTest.cpp EtcdSyncTest.cpp WatcherTest.cpp)
add_executable(etcd_test EtcdTest.cpp
EtcdSyncTest.cpp
WatcherTest.cpp
LockTest.cpp)
set_property(TARGET etcd_test PROPERTY CXX_STANDARD 11)
target_include_directories(etcd_test PRIVATE ${CMAKE_SOURCE_DIR}/proto)
target_link_libraries(etcd_test etcd-cpp-api)

View File

@ -343,6 +343,21 @@ TEST_CASE("watch changes in the past")
CHECK("45" == res.value().as_string());
}
TEST_CASE("watch multiple keys and use promise") {
etcd::Client etcd("http://127.0.0.1:2379");
int start_index = etcd.add("/test/key1", "value1").get().index();
etcd.add("/test/key2", "value2").get();
pplx::task<size_t> res = etcd.watch("/test", start_index, true)
.then([](pplx::task<etcd::Response> const &resp_task) -> size_t {
auto const &resp = resp_task.get();
return resp.events().size();
});
size_t event_size = res.get();
CHECK(2 == event_size);
}
TEST_CASE("lease grant")
{
etcd::Client etcd("http://127.0.0.1:2379");

78
tst/LockTest.cpp Normal file
View File

@ -0,0 +1,78 @@
#include <atomic>
#include <catch.hpp>
#include <iostream>
#include <thread>
#include "etcd/Client.hpp"
TEST_CASE("lock and unlock")
{
etcd::Client etcd("http://127.0.0.1:2379");
// lock
etcd::Response resp1 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
// unlock
etcd::Response resp2 = etcd.unlock(resp1.lock_key()).get();
CHECK("unlock" == resp2.action());
REQUIRE(resp2.is_ok());
REQUIRE(0 == resp2.error_code());
}
TEST_CASE("double lock will fail")
{
etcd::Client etcd("http://127.0.0.1:2379");
// lock
etcd::Response resp1 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp1.action());
REQUIRE(resp1.is_ok());
REQUIRE(0 == resp1.error_code());
bool first_lock_release = false;
std::string lock_key = resp1.lock_key();
auto second_lock_thr = std::thread([&](){
// lock again
etcd::Response resp2 = etcd.lock("/test/abcd").get();
CHECK("lock" == resp2.action());
REQUIRE(resp2.is_ok());
REQUIRE(0 == resp2.error_code());
lock_key = resp2.lock_key();
// will success after first lock released.
REQUIRE(first_lock_release);
});
auto first_lock_thr = std::thread([&]() {
// check the lock key exists.
etcd::Response resp3 = etcd.get(resp1.lock_key()).get();
CHECK("get" == resp3.action());
REQUIRE(resp3.is_ok());
REQUIRE(0 == resp3.error_code());
// create a duration
first_lock_release = true;
std::this_thread::sleep_for(std::chrono::seconds(1));
// unlock the first lock
etcd::Response resp4 = etcd.unlock(lock_key).get();
CHECK("unlock" == resp4.action());
REQUIRE(resp4.is_ok());
REQUIRE(0 == resp4.error_code());
});
first_lock_thr.join();
second_lock_thr.join();
// cleanup: unlock the second lock
etcd::Response resp5 = etcd.unlock(lock_key).get();
CHECK("unlock" == resp5.action());
REQUIRE(resp5.is_ok());
REQUIRE(0 == resp5.error_code());
}

View File

@ -59,64 +59,8 @@ TEST_CASE("create watcher")
}
CHECK(2 == watcher_called);
// TEST_CASE("wait for a value change")
// {
// etcd::Client etcd(etcd_uri);
// etcd.set("/test/key1", "42").wait();
// pplx::task<etcd::Response> res = etcd.watch("/test/key1");
// CHECK(!res.is_done());
// etcd.set("/test/key1", "43").get();
// sleep(1);
// REQUIRE(res.is_done());
// REQUIRE("set" == res.get().action());
// CHECK("43" == res.get().value().as_string());
// }
// TEST_CASE("wait for a directory change")
// {
// etcd::Client etcd(etcd_uri);
// pplx::task<etcd::Response> res = etcd.watch("/test", true);
// etcd.add("/test/key4", "44").wait();
// REQUIRE(res.is_done());
// CHECK("create" == res.get().action());
// CHECK("44" == res.get().value().as_string());
// pplx::task<etcd::Response> res2 = etcd.watch("/test", true);
// etcd.set("/test/key4", "45").wait();
// sleep(1);
// REQUIRE(res2.is_done());
// CHECK("set" == res2.get().action());
// CHECK("45" == res2.get().value().as_string());
// }
// TEST_CASE("watch changes in the past")
// {
// etcd::Client etcd(etcd_uri);
// int index = etcd.set("/test/key1", "42").get().index();
// etcd.set("/test/key1", "43").wait();
// etcd.set("/test/key1", "44").wait();
// etcd.set("/test/key1", "45").wait();
// etcd::Response res = etcd.watch("/test/key1", ++index).get();
// CHECK("set" == res.action());
// CHECK("43" == res.value().as_string());
// res = etcd.watch("/test/key1", ++index).get();
// CHECK("set" == res.action());
// CHECK("44" == res.value().as_string());
// res = etcd.watch("/test", ++index, true).get();
// CHECK("set" == res.action());
// CHECK("45" == res.value().as_string());
// }
etcd.rmdir("/test", true).error_code();
}
// TEST_CASE("request cancellation")
// {
@ -143,5 +87,4 @@ TEST_CASE("create watcher")
// std::cout << "std::exception: " << ex.what() << "\n";
// }
// }
etcd.rmdir("/test", true).error_code();
}

View File

@ -3,6 +3,7 @@
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "proto/v3lock.grpc.pb.h"
using grpc::ClientContext;
using grpc::CompletionQueue;
@ -11,6 +12,7 @@ using grpc::Status;
using etcdserverpb::KV;
using etcdserverpb::Watch;
using etcdserverpb::Lease;
using v3lockpb::Lock;
namespace etcdv3
{
@ -34,6 +36,7 @@ namespace etcdv3
KV::Stub* kv_stub;
Watch::Stub* watch_stub;
Lease::Stub* lease_stub;
Lock::Stub* lock_stub;
};
class Action

View File

@ -0,0 +1,41 @@
#ifndef __ASYNC_LOCKACTION_HPP__
#define __ASYNC_LOCKACTION_HPP__
#include <grpc++/grpc++.h>
#include "proto/v3lock.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncLockResponse.hpp"
#include "etcd/Response.hpp"
using grpc::ClientAsyncResponseReader;
using v3lockpb::LockRequest;
using v3lockpb::LockResponse;
using v3lockpb::UnlockRequest;
using v3lockpb::UnlockResponse;
namespace etcdv3
{
class AsyncLockAction : public etcdv3::Action
{
public:
AsyncLockAction(etcdv3::ActionParameters param);
AsyncLockResponse ParseResponse();
private:
LockResponse reply;
std::unique_ptr<ClientAsyncResponseReader<LockResponse>> response_reader;
};
class AsyncUnlockAction : public etcdv3::Action
{
public:
AsyncUnlockAction(etcdv3::ActionParameters param);
AsyncUnlockResponse ParseResponse();
private:
UnlockResponse reply;
std::unique_ptr<ClientAsyncResponseReader<UnlockResponse>> response_reader;
};
}
#endif

View File

@ -0,0 +1,33 @@
#ifndef __ASYNC_LOCK_HPP__
#define __ASYNC_LOCK_HPP__
#include <grpc++/grpc++.h>
#include "proto/v3lock.grpc.pb.h"
#include "v3/include/V3Response.hpp"
using grpc::ClientAsyncResponseReader;
using v3lockpb::LockRequest;
using v3lockpb::LockResponse;
using v3lockpb::UnlockRequest;
using v3lockpb::UnlockResponse;
namespace etcdv3
{
class AsyncLockResponse : public etcdv3::V3Response
{
public:
AsyncLockResponse(){};
void ParseResponse(LockResponse& resp);
};
class AsyncUnlockResponse : public etcdv3::V3Response
{
public:
AsyncUnlockResponse(){};
void ParseResponse(UnlockResponse& resp);
};
}
#endif

View File

@ -0,0 +1,28 @@
#ifndef __ASYNC_TXNACTION_HPP__
#define __ASYNC_TXNACTION_HPP__
#include <grpc++/grpc++.h>
#include "proto/rpc.grpc.pb.h"
#include "v3/include/Action.hpp"
#include "v3/include/AsyncTxnResponse.hpp"
#include "v3/include/Transaction.hpp"
using grpc::ClientAsyncResponseReader;
using etcdserverpb::TxnResponse;
using etcdserverpb::KV;
namespace etcdv3
{
class AsyncTxnAction : public etcdv3::Action
{
public:
AsyncTxnAction(etcdv3::ActionParameters param, etcdv3::Transaction const &tx);
AsyncTxnResponse ParseResponse();
private:
TxnResponse reply;
std::unique_ptr<ClientAsyncResponseReader<TxnResponse>> response_reader;
};
}
#endif

View File

@ -12,7 +12,8 @@ namespace etcdv3
{
public:
AsyncTxnResponse(){};
void ParseResponse(std::string const& key, bool prefix,TxnResponse& resp);
void ParseResponse(TxnResponse& resp);
void ParseResponse(std::string const& key, bool prefix, TxnResponse& resp);
};
}

View File

@ -24,10 +24,14 @@ public:
void setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive);
void setup_delete_failure_operation(std::string const &key, std::string const &range_end, bool recursive);
void setup_compare_and_delete_operation(std::string const& key);
void setup_lease_grant_operation(int ttl);
void setup_lease_grant_operation(int ttl);
// update without `get` and no `prev_kv` returned
void setup_put(std::string const &key, std::string const &value);
void setup_delete(std::string const &key);
etcdserverpb::TxnRequest txn_request;
etcdserverpb::LeaseGrantRequest leasegrant_request;
etcdserverpb::LeaseGrantRequest leasegrant_request;
private:
std::string key;

View File

@ -2,6 +2,8 @@
#define __V3_RESPONSE_HPP__
#include <grpc++/grpc++.h>
#include "proto/kv.pb.h"
#include "v3/include/KeyValue.hpp"
namespace etcdv3
@ -22,6 +24,9 @@ namespace etcdv3
etcdv3::KeyValue const & get_value() const;
etcdv3::KeyValue const & get_prev_value() const;
bool has_values() const;
void set_lock_key(std::string const &key);
std::string const &get_lock_key() const;
std::vector<mvccpb::Event> const & get_events() const;
protected:
int error_code;
int index;
@ -31,6 +36,8 @@ namespace etcdv3
etcdv3::KeyValue prev_value;
std::vector<etcdv3::KeyValue> values;
std::vector<etcdv3::KeyValue> prev_values;
std::string lock_key; // for lock
std::vector<mvccpb::Event> events; // for watch
};
}
#endif

View File

@ -10,7 +10,9 @@ namespace etcdv3
extern char const * DELETE_ACTION;
extern char const * COMPARESWAP_ACTION;
extern char const * COMPAREDELETE_ACTION;
extern char const * LOCK_ACTION;
extern char const * UNLOCK_ACTION;
extern char const * TXN_ACTION;
}
#endif

View File

@ -0,0 +1,63 @@
#include "v3/include/AsyncLockAction.hpp"
#include "v3/include/action_constants.hpp"
using v3lockpb::LockRequest;
using v3lockpb::UnlockRequest;
etcdv3::AsyncLockAction::AsyncLockAction(ActionParameters param)
: etcdv3::Action(param)
{
LockRequest lock_request;
lock_request.set_name(parameters.key);
response_reader = parameters.lock_stub->AsyncLock(&context, lock_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncLockResponse etcdv3::AsyncLockAction::ParseResponse()
{
AsyncLockResponse lock_resp;
if(!status.ok())
{
std::cout << "lock error message is: " << status.error_message() << std::endl;
lock_resp.set_error_code(status.error_code());
lock_resp.set_error_message(status.error_message());
}
else
{
lock_resp.ParseResponse(reply);
lock_resp.set_action(etcdv3::LOCK_ACTION);
}
return lock_resp;
}
etcdv3::AsyncUnlockAction::AsyncUnlockAction(ActionParameters param)
: etcdv3::Action(param)
{
UnlockRequest unlock_request;
unlock_request.set_key(parameters.key);
response_reader = parameters.lock_stub->AsyncUnlock(&context, unlock_request, &cq_);
response_reader->Finish(&reply, &status, (void*)this);
}
etcdv3::AsyncUnlockResponse etcdv3::AsyncUnlockAction::ParseResponse()
{
AsyncUnlockResponse unlock_resp;
if(!status.ok())
{
std::cout << "unlock error message is: " << status.error_message() << std::endl;
unlock_resp.set_error_code(status.error_code());
unlock_resp.set_error_message(status.error_message());
}
else
{
unlock_resp.ParseResponse(reply);
unlock_resp.set_action(etcdv3::UNLOCK_ACTION);
}
return unlock_resp;
}

View File

@ -0,0 +1,14 @@
#include "v3/include/AsyncLockResponse.hpp"
#include "v3/include/action_constants.hpp"
void etcdv3::AsyncLockResponse::ParseResponse(LockResponse& resp)
{
index = resp.header().revision();
lock_key = resp.key();
}
void etcdv3::AsyncUnlockResponse::ParseResponse(UnlockResponse& resp)
{
index = resp.header().revision();
}

View File

@ -10,7 +10,7 @@ etcdv3::AsyncSetAction::AsyncSetAction(etcdv3::ActionParameters param, bool crea
etcdv3::Transaction transaction(parameters.key);
isCreate = create;
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_EQUAL,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_basic_create_sequence(parameters.key, parameters.value, parameters.lease_id);

37
v3/src/AsyncTxnAction.cpp Normal file
View File

@ -0,0 +1,37 @@
#include "v3/include/action_constants.hpp"
#include "v3/include/AsyncTxnAction.hpp"
#include "v3/include/Transaction.hpp"
etcdv3::AsyncTxnAction::AsyncTxnAction(etcdv3::ActionParameters param, etcdv3::Transaction const &tx)
: etcdv3::Action(param)
{
response_reader = parameters.kv_stub->AsyncTxn(&context, tx.txn_request, &cq_);
response_reader->Finish(&reply, &status, (void *)this);
}
etcdv3::AsyncTxnResponse etcdv3::AsyncTxnAction::ParseResponse()
{
AsyncTxnResponse txn_resp;
if(!status.ok())
{
txn_resp.set_error_code(status.error_code());
txn_resp.set_error_message(status.error_message());
}
else
{
txn_resp.ParseResponse(parameters.key, parameters.withPrefix, reply);
txn_resp.set_action(etcdv3::TXN_ACTION);
//if there is an error code returned by parseResponse, we must
//not overwrite it.
if(!reply.succeeded() && !txn_resp.get_error_code())
{
txn_resp.set_error_code(101);
txn_resp.set_error_message("compare failed");
}
}
return txn_resp;
}

View File

@ -5,6 +5,10 @@
using etcdserverpb::ResponseOp;
void etcdv3::AsyncTxnResponse::ParseResponse(TxnResponse& reply) {
index = reply.header().revision();
}
void etcdv3::AsyncTxnResponse::ParseResponse(std::string const& key, bool prefix, TxnResponse& reply)
{
index = reply.header().revision();

View File

@ -15,7 +15,7 @@ etcdv3::AsyncUpdateAction::AsyncUpdateAction(etcdv3::ActionParameters param)
{
etcdv3::Transaction transaction(parameters.key);
transaction.init_compare(Compare::CompareResult::Compare_CompareResult_GREATER,
Compare::CompareTarget::Compare_CompareTarget_VERSION);
Compare::CompareTarget::Compare_CompareTarget_VERSION);
transaction.setup_compare_and_swap_sequence(parameters.value, parameters.lease_id);

View File

@ -27,8 +27,22 @@ etcdv3::AsyncWatchAction::AsyncWatchAction(etcdv3::ActionParameters param)
}
watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*)"write");
stream->Read(&reply, (void*)this);
// wait "create" success (the stream becomes ready)
void *got_tag;
bool ok = false;
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"create") {
stream->Write(watch_req, (void *)"write");
} else {
throw std::runtime_error("failed to create a watch connection");
}
// wait "write" success
if (cq_.Next(&got_tag, &ok) && ok && got_tag == (void *)"write") {
stream->Read(&reply, (void*)this);
} else {
throw std::runtime_error("failed to read proper reply from server");
}
}
@ -39,10 +53,15 @@ void etcdv3::AsyncWatchAction::waitForResponse()
while(cq_.Next(&got_tag, &ok))
{
if(ok == false || (got_tag == (void*)"writes done"))
if(ok == false)
{
break;
}
if(got_tag == (void*)"writes done") {
isCancelled = true;
cq_.Shutdown();
break;
}
if(got_tag == (void*)this) // read tag
{
if(reply.events_size())
@ -79,6 +98,8 @@ void etcdv3::AsyncWatchAction::waitForResponse(std::function<void(etcd::Response
if(got_tag == (void*)"writes done")
{
isCancelled = true;
cq_.Shutdown();
break;
}
else if(got_tag == (void*)this) // read tag
{

View File

@ -4,6 +4,11 @@
void etcdv3::AsyncWatchResponse::ParseResponse(WatchResponse& reply)
{
index = reply.header().revision();
for (auto const &e: reply.events()) {
events.emplace_back(e);
}
std::cout << "index = " << index << ", event size = " << reply.events().size()
<< ", res event size = " << events.size() << std::endl;
for(int cnt =0; cnt < reply.events_size(); cnt++)
{
auto event = reply.events(cnt);

View File

@ -56,8 +56,8 @@ void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, st
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_failure = txn_request.add_failure();
req_failure->set_allocated_request_put(put_request.release());
@ -67,6 +67,58 @@ void etcdv3::Transaction::setup_set_failure_operation(std::string const &key, st
req_failure->set_allocated_request_range(get_request.release());
}
/**
* add key and then get new value of key
*/
void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, std::string const& value, int64_t leaseid) {
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
}
/**
* get key value then modify and get new value
*/
void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& value, int64_t leaseid) {
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
}
/**
* get key, delete
*/
void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive) {
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
del_request->set_prev_kv(true);
if(recursive)
{
del_request->set_range_end(range_end);
}
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_delete_range(del_request.release());
}
/**
* get key, delete
*/
@ -95,62 +147,10 @@ void etcdv3::Transaction::setup_delete_failure_operation(std::string const &key,
req_failure->set_allocated_request_delete_range(del_request.release());
}
/**
* add key and then get new value of key
*/
void etcdv3::Transaction::setup_basic_create_sequence(std::string const& key, std::string const& value, int64_t leaseid) {
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
}
/**
* get key value then modify and get new value
*/
void etcdv3::Transaction::setup_compare_and_swap_sequence(std::string const& value, int64_t leaseid) {
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(true);
put_request->set_lease(leaseid);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
std::unique_ptr<RangeRequest> get_request(new RangeRequest());
get_request->set_key(key);
req_success = txn_request.add_success();
req_success->set_allocated_request_range(get_request.release());
}
/**
* get key, delete
*/
void etcdv3::Transaction::setup_delete_sequence(std::string const &key, std::string const &range_end, bool recursive) {
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
del_request->set_prev_kv(true);
if(recursive)
{
del_request->set_range_end(range_end);
}
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_delete_range(del_request.release());
}
void etcdv3::Transaction::setup_compare_and_delete_operation(std::string const& key) {
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
del_request->set_prev_kv(true);
del_request->set_prev_kv(true);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_delete_range(del_request.release());
}
@ -160,6 +160,23 @@ void etcdv3::Transaction::setup_lease_grant_operation(int ttl)
leasegrant_request.set_ttl(ttl);
}
void etcdv3::Transaction::setup_put(std::string const &key, std::string const &value) {
std::unique_ptr<PutRequest> put_request(new PutRequest());
put_request->set_key(key);
put_request->set_value(value);
put_request->set_prev_kv(false);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_put(put_request.release());
}
void etcdv3::Transaction::setup_delete(std::string const &key) {
std::unique_ptr<DeleteRangeRequest> del_request(new DeleteRangeRequest());
del_request->set_key(key);
del_request->set_prev_kv(false);
RequestOp* req_success = txn_request.add_success();
req_success->set_allocated_request_delete_range(del_request.release());
}
etcdv3::Transaction::~Transaction() {
}

View File

@ -60,3 +60,15 @@ bool etcdv3::V3Response::has_values() const
{
return values.size() > 0;
}
void etcdv3::V3Response::set_lock_key(std::string const &key) {
this->lock_key = key;
}
std::string const & etcdv3::V3Response::get_lock_key() const {
return this->lock_key;
}
std::vector<mvccpb::Event> const & etcdv3::V3Response::get_events() const {
return this->events;
}

View File

@ -7,4 +7,7 @@ char const * etcdv3::SET_ACTION = "set";
char const * etcdv3::GET_ACTION = "get";
char const * etcdv3::DELETE_ACTION = "delete";
char const * etcdv3::COMPAREDELETE_ACTION = "compareAndDelete";
char const * etcdv3::LOCK_ACTION = "lock";
char const * etcdv3::UNLOCK_ACTION = "unlock";
char const * etcdv3::TXN_ACTION = "txn";