grpc框架


采用proto buffer作为idl语言。
采用http2进行通信,body采用protobuf序列化后的二进制传输,将字段名去掉,以数字代替。

四种模式:一个端口可以对应多个service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
syntax = "proto3";
package helloworld; // 命名空间
option go_package = "grpc/helloworld/proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto"; // go 里面的interface类型

service Greeter {
// simple rpc
rpc GetFeature(Point) returns (Feature) {}

// server-to-client streaming rpc,下载文件
rpc ListFeatures(Rectangle) returns (stream Feature) {}

// client-to-server streaming rpc,上传文件
rpc RecoredRoute(stream Point) returns (RouteSummary) {}

// Bidirectional streaming rpc,开多线程读写实现,问答场景
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

// 结构体以message开头
message Feature {
// 1-15 占一个字节
string name = 1;
Point location = 2;
google.protobuf.Timestamp birthday = 9;
Address address = 10; // 自定义类型
repeated string hobys = 11; // 多个兴趣
map<string, google.protobuf.Any> data = 12;
reserved 3, 4 to 8; // 保留字段
reserved "phone", "email";
}

enum Gender {
FEMALE = 0; // 必须0开始
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ListFeatures
std::unique_ptr<ClientReader<Feature>> reader(stub_->ListFeatures(&context, rect)); // 通过stub_
while (reader->Read(&feature)) { // 重复接收
std::cout << "Found feature called " << feature.name() << " at " << feature.location().latitude() / kCoordFactor_ << ", " << feature.location().longitude() / kCoordFactor_ << << std::endl;
}
Status status = reader->Finish();
if (status.ok()) {
std::cout << "ListFeatures rpc success." << std::endl;
} else {
std::cout << "ListFeatures rpc failed." << std::endl;
}

// 主线程读
RouteNote server_note;
while (stream->Read(&server_note)) {
std::cout << "Got message " << server_note.message() << " at " << server_note.location().latitude() << ", " << server_note.location().longitude() << std::endl;
}
writer.join(); // 等待写线程完成

grpc用于微服务所面临的问题:

  1. 用户鉴权问题,鉴权方案的具体实现还包括了多种场景下的解决方案,例如:基于 JWT 或 OAuth 认证、基于多种细粒度授权方案进行授权、支持 OIDC 等。开发者需要根据具体的业务需求和安全要求,选择合适的鉴权方案。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 在 gRPC 客户端中,为 gRPC 通道设置用户凭据(Credentials),创建了基于 SSL 的通道凭据和通过访问令牌(access token)创建的凭据,并将两种凭据合成为一个 CompositeCredentials 对象,用于构建 gRPC 客户端的通道。
    st::shared_ptr<ChannelCredentials> creds = grpc::SslCredentials(
    grpc::SslCredentialsOptions({}));
    creds = grpc::CompositeChannelCredentials(creds, grpc::AccessTokenCredentials("access_token"));

    std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel("localhost:50051", creds);
    // 服务端代码中,该拦截器检查 gRPC 服务的方法名称,仅在方法名称中包含“UnauthenticatedMethod”的情况下应用鉴权。然后该拦截器访问 gRPC 消息上下文中的客户端凭据,验证客户端身份是否合法,如果合法,则允许请求通过继续执行下一个拦截器或者是服务;如果不合法,则抛出 UNAUTHENTICATED 的异常信息。
    class AuthInterceptor : public grpc::Interceptor {
    public:
    explicit AuthInterceptor(const std::string& secret_token) : secret_token_(secret_token) {}

    void Intercept(grpc::experimental::InterceptorBatchMethods* methods) override {
    auto& ctx = methods->GetContex();
    auto method_name = ctx->method();
    auto call_creds = ctx->call_creds();

    if (method_name.find("UnauthenticatedMethod") == std::string::npos)
    return methods->Proceed();

    if (!call_creds || call_creds->ApplyToCall(method_name, nullptr).ok()) {
    return methods->Proceed();
    }

    return methods->Finish(
    grpc::Status(grpc::StatusCode::UNAUTHENTICATED, "Token invalid or expired"), nullptr);
    }

    private:
    std::string secret_token_;
    };
  2. grpc数据传输过程:

    • gRPC 序列化:当客户端在 gRPC 上调用远程过程时,客户端代码会将函数参数的数据打包(序列化),将其转换成文本或二进制形式(protobuf格式)。
    • gRPC 通信:序列化后的数据被传输到服务器,并从客户端通道发送到服务端通道。gRPC 通信使用 HTTP/2 协议进行传输,HTTP/2 支持双向流和二进制数据,因此 gRPC 可以同时支持请求和响应。
    • gRPC 反序列化:当服务端接收到请求时,它会对请求数据进行反序列化以获取方法参数。服务端代码执行完响应方法后,将返回值打包(序列化)并将其发送回客户端。
    • gRPC 解包:收到服务端返回的结果后,客户端反序列化结果。
    • 客户端调用响应回调函数:客户端调用响应回调函数,使用服务端响应结果完成调用。
  3. 拦截器,我们可以访问请求的元数据(Headers),验证身份、记录日志或者做其他任何自定义操作。需要注意的是,拦截器操作的顺序由服务的拦截器列表中拦截器的添加顺序决定,所以需要开发人员根据需要合理安排拦截器的顺序。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    void CustomInterceptor::Intercept(
    grpc::experimental::InterceptorBatchMethods* methods) {
    auto& ctx = methods->GetContex();

    // 在拦截器前执行的逻辑
    std::string method_name = ctx->method();
    std::string address = ctx->peer();
    ...

    // 执行下一个拦截器或服务逻辑
    methods->Proceed();

    // 在拦截器后执行的逻辑
    grpc::Status status = ctx->status();
    ...
    }
  4. 客户端负载均衡(如果服务端已经部署为负载均衡,那么无需客户端负载均衡)

    1
    2
    3
    4
    5
    6
    7
    grpc_core::RefCountedPtr<grpc_core::SubchannelInterface> PickSubchannel(
    grpc_core::LoadBalancingPolicy::PickSubchannelArgs args) override
    {
    grpc_core::SubchannelPicker::PickResult pick_result;
    pick_result.picked_subchannel = args.connections.at(0);
    return pick_result.subchannel;
    }

    nginx负载均衡

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    upstream redislock {
    server 192.168.1.3:8080 weight=1;
    server 192.168.1.3:8090 weight=1;
    }

    server {
    location / {
    root html;
    index index.html index.htm;
    proxy_pass http://redislock;
    }
    }
  5. 服务的健康检查,Watch 命令,可以在发生状态更改时更新客户端。客户端可以使用 Watch 命令订阅特定服务的状态。如果检测到状态更改,gRPC 服务器将使用更新的状态响应流进行响应。

  6. 服务之间的认证问题

    • SSL/TLS 是保护 gRPC 服务最常用的方法之一。使用 SSL/TLS 可以确保客户端和服务端之间的通信是加密的和私密的。gRPC 允许您在运行时将 SSL/TLS 配置添加到服务器和客户端中。您可以使用受信任的证书颁发机构 (CA) 颁发的证书或者使用自己的自签名证书。

      1
      2
      3
      4
      5
      // 生成私钥
      openssl genpkey -algorithm RSA -out server.key

      // 使用私钥生成自签名证书
      openssl req -new -key server.key -x509 -days 365 -out server.crt
    • OAuth2 是一种广泛使用的认证协议,允许客户端安全地访问受保护的资源。gRPC 支持 OAuth2 认证协议,可以使用 OAuth2 2.0 或 OpenID Connect 作为身份验证手段。在 gRPC 服务器端,您可以使用 grpc::AuthContext 对象来获取检索到的 OAuth2 访问令牌以进行身份验证。

      1
      2
      3
      4
      5
      6
      7
       // 服务端添加 OAuth2 认证
      std::shared_ptr<grpc::ServerCredentials> auth_creds = grpc::GoogleDefaultCredentials();
      builder.SetDefaultServiceAccount("my_account_id@appspot.gserviceaccount.com");
      builder.SetAuthCredentials(auth_creds);
      // 客户端添加 OAuth2 认证
      std::shared_ptr<grpc::ChannelCredentials> creds = grpc::GoogleDefaultCredentials();
      std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(server_address, creds);
    • Token-based authentication 是一种常用的身份验证机制,它使用短期访问令牌来确定客户端是否有权访问受保护的资源。您可以使用基于 JWT(JSON Web Token)的 token-based authentication 机制来保护您的 gRPC 服务。gRPC 提供了开箱即用的 token-based authentication 机制,可以使用 grpc::Metadata 对象来检索检索到的 JWT 令牌。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      // 添加 JWT 认证
      std::shared_ptr<grpc::experimental::ServerInterceptor> interceptor =
      grpc::jwt::CreateJwtVerificationInterceptor(
      std::bind(&MyServiceImpl::GetPublicKey, this, std::placeholders::_1));
      builder.AddListeningPort(server_address, creds);
      builder.RegisterService(&service);
      builder.AddInterceptor(interceptor); // 通过拦截器实现
      // 客户端添加 JWT 认证
      const std::string kToken = "my_access_token";
      std::shared_ptr<grpc::CallCredentials> creds =
      grpc::AccessTokenCredentials(kToken);
      std::shared_ptr<grpc::Channel> channel =
      grpc::CreateChannel(server_address, creds);
  7. 服务限流的问题,服务接口限流

    • 客户端限流:在gRPC客户端中实现限流,可以通过控制客户端请求的数量和速率来保护服务端免受过载和拒绝服务攻击。客户端可以使用gRPC提供的client_interceptor实现限流器,截获客户端的请求并进行限制,例如在一定时间窗口内只允许最多n个请求,或在单位时间内只允许最多m个请求等等。
    • 服务端限流:在gRPC服务端中实现限流,可以通过控制服务端的并发请求数和服务速率来保护系统免受过载和拒绝服务攻击。服务端可以利用gRPC的server_interceptor实现限流器,截获服务端的请求并进行限制,例如在一定时间窗口内只允许最多n个请求,或在单位时间内只允许最多m个请求等等。
    • 结合负载均衡进行限流:将限流逻辑与负载均衡策略结合,可以更加细粒度地对服务进行限制,并在不同服务节点之间进行均衡。负载均衡器可以基于不同的负载指标进行服务节点选择,同时限流器根据节点的负载情况进行流量控制,避免过载和服务不可用的情况。
    • 基于流控的限流: gRPC支持基于流控的限流,流控是一种通过调整请求和响应的发送速率来控制通信速率的技术。通过限制请求流的速率,可以避免服务端压力过大和无效负载。流控限速的具体实现,可以通过gRPC提供的拦截器和流过滤器进行控制和调整。
  8. 服务端熔断,通过判断发生错误的次数,对服务做降级。C++中,可以实现一个熔断器类用于处理熔断逻辑。需要定义状态机,当状态机状态从CLOSED状态进入OPEN状态时,需要触发熔断处理;当状态从OPEN状态进入CLOSED或HALF_OPEN时,需要重新接收客户请求并处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    struct CircuitBreakerConfig {
    int consecutiveErrorsThreshold; // 连续错误阈值
    int requestVolumeThreshold; // 请求阈值
    int sleepWindow; // 熔断窗口大小
    };

    class CircuitBreaker {
    public:
    CircuitBreaker(const CircuitBreakerConfig& config) : config_{config} {}
    bool isOpen(); // 判断是否熔断
    bool allowRequest(); //是否允许请求
    bool markSuccess(); // 标记请求成功
    bool markNonSuccess(); // 标记请求失败

    private:
    CircuitBreakerConfig config_;
    std::atomic<int> consecutiveErrors_{0}; // 连续错误次数
    std::atomic<int> requestVolume_{0}; // 请求数量
    std::mutex mtx_;
    enum State {
    CLOSED = 1,
    OPEN = 2,
    HALF_OPEN = 3,
    };
    std::atomic<State> state_{CLOSED}; // 状态机
    std::chrono::steady_clock::time_point lastTested_;
    };

    class CircuitBreakerInterceptor final : public grpc::experimental::Interceptor {
    public:
    CircuitBreakerInterceptor(CircuitBreaker* circuit_breaker,
    const std::shared_ptr<Tracer>& tracer)
    : circuit_breaker_{circuit_breaker}, tracer_{tracer} {}

    void Intercept(grpc::experimental::InterceptorBatchMethods* methods) override {
    if (!circuit_breaker_->allowRequest()) {
    methods->SetCallStatus(grpc::Status(grpc::StatusCode::UNAVAILABLE, "Circuit breaker is opened"));
    return;
    }

    tracer_->StartSpan("grpc_request");
    methods->Next();
    if (methods->QueryStatus().ok()) {
    circuit_breaker_->markSuccess();
    } else {
    circuit_breaker_->markNonSuccess();
    }
    tracer_->EndSpan();
    }

    private:
    CircuitBreaker* circuit_breaker_;
    const std::shared_ptr<Tracer>& tracer_;
    };
  9. 日志追踪,将日志记录器和追踪器集成到 gRPC 客户端和服务端中,可以通过选项或拦截器的方式实现。在拦截器中针对请求和响应数据,调用追踪器的接口记录相应的日志和追踪数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    std::shared_ptr<opentracing::Tracer> tracer = ...   // 创建追踪器
    std::shared_ptr<Tracer> tracerWrapper = std::make_shared<TracerImpl>(std::move(tracer)); // 将追踪器封装为Tracer实例
    LoggingInterceptor interceptor(loggerWrapper, tracerWrapper); // 将日志记录器和追踪器封装为LoggingInterceptor实例

    //分布式追踪是指在分布式系统中,通过记录每个服务的请求信息、传递信息和响应信息,并将其组合起来,形成一个完整的请求流程,从而实现分布式追踪和排查问题的目的。需要记录的信息包括请求时间、响应时间、传递的参数、调用链等。这些信息通常是跨多个进程和主机进行传递和组合,因此需要一种标准的接口和格式来描述这些信息。
    class TracerImpl : public Tracer {
    public:
    TracerImpl(std::unique_ptr<opentracing::Tracer>&& tracer) : tracer_{std::move(tracer)} {}

    void StartSpan(const std::string& operation_name) override {
    tracing::SpanPtr parent_span = tracing::SpanContext::GetSpan();
    tracing::SpanPtr span;
    if (parent_span != nullptr) {
    span = tracer_->StartSpan(operation_name, {opentracing::ChildOf(&parent_span->context())});
    } else {
    span = tracer_->StartSpan(operation_name);
    }
    tracing::SpanContext::SetSpan(std::move(span));
    }

    void EndSpan() override {
    tracing::SpanPtr span = tracing::SpanContext::GetSpan();
    if (span != nullptr) {
    span->Finish();
    tracing::SpanContext::SetSpan(nullptr);
    }
    }

    private:
    std::unique_ptr<opentracing::Tracer> tracer_;
    };

    class LoggingInterceptor final : public grpc::experimental::Interceptor {
    public:
    LoggingInterceptor(const std::shared_ptr<Logger>& logger, const std::shared_ptr<Tracer>& tracer)
    : logger_{logger}, tracer_{tracer} {}

    void Intercept(grpc::experimental::InterceptorBatchMethods* methods) override {
    tracer_->StartSpan("grpc_request");
    auto context = methods->GetCallContext();
    auto endpoint = context->peer();
    auto method = context->method();
    auto deadline = methods->GetDeadline().time_since_epoch().count();
    logger_->Log("Start grpc request: " + method + " from " + endpoint + " with deadline " + std::to_string(deadline));

    methods->Next();

    if (methods->QueryStatus().ok()) {
    logger_->Log("End grpc request successfully: " + method);
    } else {
    logger_->Log("End grpc request with error: " + methods->QueryStatus().error_message());
    }
    tracer_->EndSpan();
    }

    private:
    const std::shared_ptr<Logger>& logger_;
    const std::shared_ptr<Tracer>& tracer_;
    };

微服务优化方法主要涉及以下几个方面:

  1. 负载均衡:对于集群中的微服务,通过负载均衡来分配请求,确保所有服务实例能够得到平均的工作量,提高系统的可用性和可靠性。
    • 轮询策略是一种较为简单的负载均衡策略,将请求依次分配到每台服务器上,便于实现和管理。此策略实现简单,但难以处理请求处理时间长短不均衡的情况。
    • 随机策略是在请求到达时随机选择一台服务器处理。此策略实现简单,能在一定程度上处理负载均衡,但无法处理后端服务器不同或者资源不同的情况。
    • 最小连接数策略是指将新的请求发送到当前连接数最少的服务器上,以达到负载均衡的目的。此策略能更加均衡地分配负载,但难以处理负载突然爆发的情况。
    • 加权轮询策略是在轮询策略的基础上,增加了不同服务器的权重,服务器权重越高则处理请求的比例越高。此策略能根据不同服务器的处理能力分配请求,但权重设置需要经验和实践的调整。
    • IP Hash策略是根据请求的IP地址选择一个服务器处理,这样每个IP地址的请求都会被分配到同一台服务器上。此策略能在一定程度上减少会话迁移带来的开销,但会造成一个IP地址请求的负载不均衡情况。
  2. 异步处理:采用异步处理机制来提高系统的吞吐量,减少服务的响应时间。可以通过消息队列等方式来实现。例如,通过使用消息队列将请求剥离开来,服务可以实时地做出响应并处理其他请求,消息队列在服务空闲时可以异步地处理剩余请求。
  3. 熔断机制:通过熔断机制,及时发现故障并进行相应的快速处理,提高服务的稳定性。
    熔断机制是一种类似于保险丝的机制,通过类似的手段自动切换服务,当服务出现故障或者异常的情况时会触发熔断机制。熔断机制通常由三种状态组成(closed、open、half-open):
    • Closed状态:默认状态下,当服务正常运行时,熔断器处于关闭状态,所有请求正常处理。
    • Open状态:当服务异常达到一定阈值(例如错误率、请求超时等),熔断器自动进入打开状态,拒绝请求并调用回退函数来处理请求。
    • Half-Open状态:经历一段时间后,进入半打开状态,允许一个请求通过来测试服务状态。若请求测试成功,则熔断器重新进入关闭状态。否则,熔断器回退到打开状态
  4. 自动伸缩机制:自动化的服务伸缩处理策略,能够根据当前负载情况来动态调整服务的实例数量,并确保服务能够以高效的方式运行。Kubernetes是一种容器编排平台,可通过水平扩展器(Horizontal Pod Autoscaler)实现自动伸缩。该工具能够根据预设置的指标,自动增加或减少Pod数量,从而实现微服务的自动伸缩。
  5. 统一配置管理:在微服务中,使用统一的配置管理策略,以便轻松地管理服务中不同模块的配置信息。类似于云配置。
  6. 微服务监控:实时监控微服务的性能,及时发现和解决问题,在各种情况下保障服务的准备性、可用性、可靠性和性能。
  • Prometheus和Grafana: Prometheus是一种开源的指标收集系统,可用于监视微服务的性能和健康状况。Grafana是一个面向数据可视化的开源工具,它可以将Prometheus的指标数据进行可视化展示
  • ELK Stack: ELK Stack是一组工具,包括Elasticsearch、Logstash和Kibana,可用于实时分析和可视化微服务应用程序日志。Elasticsearch是一个开源搜索和分析引擎,Logstash是一个日志收集和转换工具,Kibana是一个数据可视化工具。

Logstash使用:

  • 配置input: 向Logstash提供数据的源称为“input”。可以在Logstash的配置文件中定义input,例如,从文件、网络套接字、管道或Kafka中读取数据。
  • 配置filter: 在input和output之间可以定义“filter”,用于处理和转换输入数据。filter可以加入多个步骤来清理、处理和分析数据。例如,可以配置grok filter来分解日志记录数据,并将其转换为可索引的格式。
  • 配置output: 最终,需要将处理过的数据输出到目标源或存储。output可以是网络套接字、文件、Kafka或Elasticsearch等等。例如,设置output为Elasticsearch,用于存储和检索数据。
nephen wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!