gRPC中拦截器的使用详解
作者:童话ing
前言
本次主要介绍在gRPC中使用拦截器,包括一元拦截器和流式拦截器,在拦截器中添加JWT认证,客户端登录之后会获得token,请求特定的API时候需要带上token才能访问。由于代码中我们使用了grpc-gateway提供http服务,因此需要安装gateway的一些依赖
在本文中就简单介绍一下拦截器的使用,RPC请求分为一元RPC请求和流式RPC请求,所谓一元RPC指的就是请求和响应都是一次完成的,gRPC是基于HTTP2.0的,因此,一元RPC就可以看成客户端请求一次,服务端就响应一次。而流式RPC则是像流一样多次进行响应或者请求传输的。
相应地,拦截器分为服务端拦截和客户端拦截器,根据功能的不同又分为一元拦截器和流式拦截器。
服务端拦截器
1、一元拦截器:UnaryInterceptor
源码中写得比较清楚了, UnaryServerInterceptor 提供了一个钩子来拦截服务器上一元RPC的执行。 info 参数包含了这个RPC拦截器能操作的所有信息, handler 是服务方法实现的一个包装器,用于供拦截器中调用来处理RPC请求的逻辑。
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the // server. Only one unary interceptor can be installed. The construction of multiple // interceptors (e.g., chaining) can be implemented at the caller. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { return newFuncServerOption(func(o *serverOptions) { if o.unaryInt != nil { panic("The unary server interceptor was already set and may not be reset.") } o.unaryInt = i }) }
主要看其中包含的参数 UnaryServerInterceptor :
// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info // contains all the information of this RPC the interceptor can operate on. And handler // is the wrapper of the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC. type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
其中几个参数解释为:
- ctx context.Context:请求上下文
- req interface{}:RPC 方法的请求参数
- info *UnaryServerInfo:包含了RPC 方法的所有信息
- handler UnaryHandler:RPC 方法真正执行逻辑
2、流式拦截器:StreamInterceptor
StreamServerInterceptor 提供了一个钩子来拦截服务器上流式RPC的执行。 info 包含拦截器可以操作的RPC的所有信息。 handler 是服务方法实现。拦截器负责调用 handler 来完成RPC。
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the // server. Only one stream interceptor can be installed. func StreamInterceptor(i StreamServerInterceptor) ServerOption { return newFuncServerOption(func(o *serverOptions) { if o.streamInt != nil { panic("The stream server interceptor was already set and may not be reset.") } o.streamInt = i }) }
StreamServerInterceptor:
// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC //on the server.info contains all the information of this RPC the interceptor can operate //on. And handler is the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC. type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
3、实现服务端拦截器
下面我们进行简单的实现:
// 一元拦截器 func Unary() grpc.UnaryServerInterceptor { return func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { log.Print("---------> the unaryServerInterceptor: ", info.FullMethod) err := interceptor.authorize(ctx, info.FullMethod)//实现拦截验证的逻辑,自行实现,我这里是截取的完整代码中的,也可以参考我的gitee上的代码 if err != nil { return nil, err } return handler(ctx, req) } } //stream拦截器 func Stream() grpc.StreamServerInterceptor { return func( srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { log.Print("--------->the streamServerInterceptor: ", info.FullMethod) err := interceptor.authorize(stream.Context(), info.FullMethod)//实现拦截验证的逻辑,自行实现,我这里是截取的完整代码中的,也可以参考我的gitee上的代码 if err != nil { return err } return handler(srv, stream) } }
将上面实现的拦截器加入到Server中即可:
serverOptions := []grpc.ServerOption{ grpc.UnaryInterceptor(Unary()), grpc.StreamInterceptor(Stream()), } grpcServer := grpc.NewServer(serverOptions...)
客户端拦截器
1、一元拦截器:WithUnaryInterceptor
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for // unary RPCs. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { return newFuncDialOption(func(o *dialOptions) { o.unaryInt = f }) }
UnaryClientInterceptor
// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. // Unary interceptors can be specified as a DialOption, using // WithUnaryInterceptor() or WithChainUnaryInterceptor(), when creating a // ClientConn. When a unary interceptor(s) is set on a ClientConn, gRPC // delegates all unary RPC invocations to the interceptor, and it is the // responsibility of the interceptor to call invoker to complete the processing // of the RPC. // // method is the RPC name. req and reply are the corresponding request and // response messages. cc is the ClientConn on which the RPC was invoked. invoker // is the handler to complete the RPC and it is the responsibility of the // interceptor to call it. opts contain all applicable call options, including // defaults from the ClientConn as well as per-call options. // // The returned error must be compatible with the status package. type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
2、流式拦截器:WithStreamInterceptor
// WithStreamInterceptor returns a DialOption that specifies the interceptor for // streaming RPCs. func WithStreamInterceptor(f StreamClientInterceptor) DialOption { return newFuncDialOption(func(o *dialOptions) { o.streamInt = f }) }
StreamClientInterceptor 拦截客户端流 ClientStream 的创建,流式拦截器可以指定为一个 Dial 选项。当创建一个客户端连接时,使用 WithStreamInterceptor() 或者 WithChainStreamInterceptor() 。当一个流拦截器被设置在客户端连接中的时候,gRPC将所有流的创建都交给拦截器,拦截器调用streamer。
// StreamClientInterceptor intercepts the creation of a ClientStream. Stream // interceptors can be specified as a DialOption, using WithStreamInterceptor() // or WithChainStreamInterceptor(), when creating a ClientConn. When a stream // interceptor(s) is set on the ClientConn, gRPC delegates all stream creations // to the interceptor, and it is the responsibility of the interceptor to call // streamer. // // desc contains a description of the stream. cc is the ClientConn on which the // RPC was invoked. streamer is the handler to create a ClientStream and it is // the responsibility of the interceptor to call it. opts contain all applicable // call options, including defaults from the ClientConn as well as per-call // options. // // StreamClientInterceptor may return a custom ClientStream to intercept all I/O // operations. The returned error must be compatible with the status package. type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
参数解释:
- ctx context.Context是请求上下文
- desc *StreamDesc包含了流中描述的信息
- cc *ClientConn是调用RPC的客户端连接
- method string是请求的方法名
- streamer Streamer是一个创建客户端流的处理器,流式拦截器中需要调用它。
- opts ...CallOption包含了所有适用呼叫选项,包括来自于客户端连接的默认选项和所有的呼叫。
3、实现客户端拦截器
下面是具体实现:
func Unary() grpc.UnaryClientInterceptor { return func( ctx context.Context, method string, req, reply interface{}, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, //回调函数 opts ...grpc.CallOption, ) error { log.Printf("-------> unary interceptor: %s", method) if authMethods[method] { //如果是拦截的方法,在调用实际的rpc方法之前将token添加到context中,这个存储需要拦截的方法 return invoker(attachToken(ctx), method, req, reply, conn, opts...) //attachToken为自己实现的客户端拦截请求之后附加token的方法 } return invoker(ctx, method, req, reply, conn, opts...) } } // Stream returns a client interceptor to authenticate stream RPC func Stream() grpc.StreamClientInterceptor { return func( ctx context.Context, desc *grpc.StreamDesc, conn *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { log.Printf("-------> stream interceptor: %s", method) if interceptor.authMethods[method] { return streamer(attachToken(ctx), desc, conn, method, opts...) } return streamer(ctx, desc, conn, method, opts...) } }
使用也比较简单,在Dial中添加即可:
conn2, err := grpc.Dial( Address, //地址 transportOption, //SSL/TLS证书选择 grpc.WithUnaryInterceptor(Unary()), grpc.WithStreamInterceptor(Stream())) if err != nil { log.Fatal("cannot dial server: ", err) }
到此这篇关于gRPC中拦截器的使用详解的文章就介绍到这了,更多相关gRPC中的拦截器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!