Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go语言 gPRC

Go语言中gPRC的使用

作者:爱吃芝麻汤圆

本文主要介绍了Go语言中gPRC的使用,包括Protobuf定义服务接口、HTTP/2协议与性能优势,以及流模式和发布-订阅系统的实现,具有一定的参考价值,感兴趣的可以了解一下

核心概念理解

1. Protobuf 与服务定义

gRPC 使用 Protobuf 来定义服务接口和消息格式。在你的例子中,hello.proto 文件定义了一个名为 HelloService 的服务,它包含一个方法 Hello,该方法接收一个 String 消息并返回一个 String 消息。

service HelloService {
  rpc Hello (String) returns (String);
}

这类似于定义一个函数接口,但它是跨网络调用的。

2. 代码生成

通过 Protobuf 编译器和 gRPC 插件,你可以生成客户端和服务器的代码:

protoc --go_out=plugins=grpc:. hello.proto

这行命令会生成:

3. 服务端实现

服务端需要实现 HelloServiceServer 接口:

type HelloServiceImpl struct{}

func (p *HelloServiceImpl) Hello(ctx context.Context, args *String) (*String, error) {
    reply := &String{Value: "hello:" + args.GetValue()}
    return reply, nil
}

这里的 Hello 方法是服务的具体实现,它接收一个字符串参数,添加 “hello:” 前缀后返回。

4. 服务启动

服务端需要启动 gRPC 服务器并注册服务:

func main() {
    grpcServer := grpc.NewServer()
    RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))

    lis, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal(err)
    }
    grpcServer.Serve(lis)
}

这部分代码:

5. 客户端调用

客户端需要连接到服务端并调用服务:

func main() {
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := NewHelloServiceClient(conn)
    reply, err := client.Hello(context.Background(), &String{Value: "hello"})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(reply.GetValue())
}

这部分代码:

与标准库 RPC 的区别

客户端异步调用

你提到 gRPC 不直接支持异步调用,但可以通过 Goroutine 实现:

// 客户端异步调用示例
go func() {
    reply, err := client.Hello(context.Background(), &String{Value: "async"})
    if err != nil {
        log.Println("Error:", err)
        return
    }
    fmt.Println("Async reply:", reply.GetValue())
}()

gPRC流

1. 为什么需要 gRPC 流?

传统 RPC 调用是"请求-响应"模式:

gRPC 流解决了这些问题,它允许:

2. 定义流服务

.proto 文件中,使用 stream 关键字定义流:

service HelloService {
  // 普通 RPC(一元)
  rpc Hello (String) returns (String);
  
  // 双向流 RPC
  rpc Channel (stream String) returns (stream String);
}

这里的 Channel 方法支持双向流:

3. 流接口详解

生成的代码中,流接口包含 SendRecv 方法:

// 服务端流接口
type HelloService_ChannelServer interface {
    Send(*String) error       // 向客户端发送消息
    Recv() (*String, error)   // 从客户端接收消息
    grpc.ServerStream
}

// 客户端流接口
type HelloService_ChannelClient interface {
    Send(*String) error       // 向服务器发送消息
    Recv() (*String, error)   // 从服务器接收消息
    grpc.ClientStream
}

这些方法允许双向、异步的数据交换。

4. 服务端实现

服务端实现 Channel 方法:

func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
    for {
        // 接收客户端消息
        args, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                return nil // 客户端关闭流
            }
            return err // 处理其他错误
        }

        // 处理消息并返回响应
        reply := &String{Value: "hello:" + args.GetValue()}
        err = stream.Send(reply)
        if err != nil {
            return err
        }
    }
}

关键点:

5. 客户端实现

客户端需要启动两个 Goroutine 分别处理发送和接收:

// 创建流
stream, err := client.Channel(context.Background())
if err != nil {
    log.Fatal(err)
}

// 发送消息的 Goroutine
go func() {
    for {
        if err := stream.Send(&String{Value: "hi"}); err != nil {
            log.Fatal(err)
        }
        time.Sleep(time.Second) // 每秒发送一次
    }
}()

// 接收消息的主循环
for {
    reply, err := stream.Recv()
    if err != nil {
        if err == io.EOF {
            break // 服务器关闭流
        }
        log.Fatal(err)
    }
    fmt.Println("Received:", reply.GetValue())
}

关键点:

6. 流的工作模式

gRPC 流有四种工作模式:

1. 一元 RPC(无流)

rpc UnaryMethod(Request) returns (Response) {}

2. 服务器流

rpc ServerStreamingMethod(Request) returns (stream Response) {}

3. 客户端流

rpc ClientStreamingMethod(stream Request) returns (Response) {}

4. 双向流

rpc BidirectionalStreamingMethod(stream Request) returns (stream Response) {}

7. 流的特性

  1. 全双工通信:客户端和服务器可以同时发送和接收数据
  2. 异步处理:发送和接收操作不阻塞其他操作
  3. 高效传输:基于 HTTP/2 的多路复用,单个连接支持多个流
  4. 大数据支持:适合传输大文件或持续数据流
  5. 实时性:数据可以立即传输,无需等待整个消息完成

8. 适用场景

9. 注意事项

  1. 错误处理:流可能因网络问题或服务器关闭而中断,需要适当处理错误
  2. 资源管理:流使用完后需要关闭,避免资源泄漏
  3. 并发控制:多个 Goroutine 访问同一个流时需要考虑同步问题
  4. 消息顺序:在双向流中,发送和接收的顺序可能不同步

gRPC发布-订阅

发布-订阅(PubSub)模式是一种消息传递模式,其中发送者(发布者)不会直接将消息发送给特定的接收者(订阅者),而是将消息分类发布到主题中。订阅者可以订阅一个或多个主题,只接收他们感兴趣的消息。这种模式实现了发布者和订阅者之间的解耦,非常适合构建分布式系统。

本地发布-订阅实现

首先,我们看一下基于 moby/moby/pkg/pubsub 包的本地实现:

import (
    "github.com/moby/moby/pkg/pubsub"
)

func main() {
    // 创建发布者,设置超时和队列大小
    p := pubsub.NewPublisher(100*time.Millisecond, 10)

    // 订阅golang主题
    golang := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            return strings.HasPrefix(key, "golang:")
        }
        return false
    })
    
    // 订阅docker主题
    docker := p.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            return strings.HasPrefix(key, "docker:")
        }
        return false
    })

    // 发布消息
    go p.Publish("hi")
    go p.Publish("golang: https://golang.org")
    go p.Publish("docker: https://www.docker.com/")
    
    // 接收消息
    go func() {
        fmt.Println("golang topic:", <-golang)
    }()
    go func() {
        fmt.Println("docker topic:", <-docker)
    }()

    <-make(chan bool) // 保持主程序运行
}

关键点:

基于gRPC的远程发布-订阅系统

现在,我们将使用gRPC扩展这个系统,使其能够跨网络工作。

1. 定义服务接口

service PubsubService {
  rpc Publish (String) returns (String);       // 发布消息
  rpc Subscribe (String) returns (stream String); // 订阅主题
}

这里:

2. 服务端实现

type PubsubService struct {
    pub *pubsub.Publisher
}

func NewPubsubService() *PubsubService {
    return &PubsubService{
        pub: pubsub.NewPublisher(100*time.Millisecond, 10),
    }
}

// 发布消息
func (p *PubsubService) Publish(ctx context.Context, arg *String) (*String, error) {
    p.pub.Publish(arg.GetValue())
    return &String{}, nil
}

// 订阅主题
func (p *PubsubService) Subscribe(arg *String, stream PubsubService_SubscribeServer) error {
    // 订阅特定主题
    ch := p.pub.SubscribeTopic(func(v interface{}) bool {
        if key, ok := v.(string); ok {
            return strings.HasPrefix(key, arg.GetValue())
        }
        return false
    })

    // 将接收到的消息通过流发送给客户端
    for v := range ch {
        if err := stream.Send(&String{Value: v.(string)}); err != nil {
            return err
        }
    }

    return nil
}

关键点:

3. 客户端发布消息

func main() {
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := NewPubsubServiceClient(conn)

    // 发布golang主题消息
    _, err = client.Publish(context.Background(), &String{Value: "golang: hello Go"})
    if err != nil {
        log.Fatal(err)
    }
    
    // 发布docker主题消息
    _, err = client.Publish(context.Background(), &String{Value: "docker: hello Docker"})
    if err != nil {
        log.Fatal(err)
    }
}

4. 客户端订阅消息

func main() {
    conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := NewPubsubServiceClient(conn)
    
    // 订阅golang主题
    stream, err := client.Subscribe(context.Background(), &String{Value: "golang:"})
    if err != nil {
        log.Fatal(err)
    }

    // 持续接收消息
    for {
        reply, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Fatal(err)
        }

        fmt.Println(reply.GetValue())
    }
}

工作原理详解

服务端

客户端发布

客户端订阅

优势与应用场景

这种基于gRPC的发布-订阅系统具有以下优势:

典型应用场景包括:

通过这种方式,你可以构建一个跨网络的、高效的发布-订阅系统,充分利用gRPC的流特性和类型安全优势。

到此这篇关于Go语言中gPRC的使用的文章就介绍到这了,更多相关Go语言 gPRC内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文