blog

gRPCでStreamとTransportイベントをリスニングする

gRPC はリクエストを監視するインターセプターを提供しますが、ストリームまたはトランスポートの特定のイベントはインターセプターを通じて実装できません。gRPC はそのよ...

Sep 4, 2022 · 3 min. read


gRPCでSreamとTransportイベントをリスニングする

gRPC はリクエストを監視するインターセプターを提供しますが、ストリームまたはトランスポートの特定のイベントはインターセプターを通じて実装できません。gRPC はそのような機能をサポートする StreamTracer と TransportFilter を提供します。

StreamTracer

StreamTracer は、ストリームの終了、受信および送信メッセージ、受信および送信ストリーム サイズなどを含む、ストリームのすべてのイベントを監視するために使用されます。

StreamTracer にはクライアント用の ClientStreamTracer とサーバー用の ServerStreamTracer があります。

クライアント用の ClientStreamTracer

クライアントの StreamTracer はインターセプターに挿入されます。リクエストが実行されると、カスタム ClientStreamTracerFactory を callOptions に追加できるため、対応する StreamTracer が作成されて監視が実装されます。

  • MyClientInterceptor.java

インターセプタで指定する

public class MyClientInterceptor implements ClientInterceptor {
        @Override
        public <T, Q> ClientCall<T, Q> interceptCall(MethodDescriptor<T, Q> method, CallOptions options, Channel next) {
            options = options.withStreamTracerFactory(new CustomClientStreamTracerFactory(method, options, next));
    
            return next.newCall(method, options);
        }
    }
    
  • MyClientStreamTracerFactory.java

実装クラスで監視する必要があるイベントメソッドをオーバーライドします。

public class MyClientStreamTracerFactory<T, Q> extends ClientStreamTracer.Factory {
    
        private final MethodDescriptor<T, Q> method;
        private final CallOptions options;
        private final Channel next;
    
        public CustomClientStreamTracerFactory(MethodDescriptor<T, Q> method, CallOptions options, Channel next) {
            this.method = method;
            this.callOptions = options;
            this.next = next;
        }
    
        @Override
        public ClientStreamTracer newClientStreamTracer(ClientStreamTracer.StreamInfo info, Metadata headers) {
            return new CustomClientStreamTracer<>(method, options, next, info, headers);
        }
    }
    
    class MyClientStreamTracer<T, Q> extends ClientStreamTracer {
    
        public MyClientStreamTracer(MethodDescriptor<T, Q> method, CallOptions options, Channel next, StreamInfo info, Metadata headers) {
            // do something
        }
    }
    

サーバー用の ServerStreamTracer

サーバー側の StreamTracer はサーバーの構築時に指定され、リクエストの実行時に対応するメソッドをリッスンします。

  • サーバーの構築
Server server = ServerBuilder.forPort(11111) 
                                 .addStreamTracerFactory(new MyServerStreamTracerFactory())
                                 .build();
    
  • MyServerStreamTracerFactory.java

リッスンするイベントのメソッドは CustomServerStreamTracer でオーバーライドできます。

public class MyServerStreamTracerFactory extends ServerStreamTracer.Factory {
        @Override
        public ServerStreamTracer newServerStreamTracer(String methodName, Metadata metadata) {
            return new CustomServerStreamTracer(methodName, metadata);
        }
    }
    
    class MyServerStreamTracer extends ServerStreamTracer {
    
        private final String methodName;
    
        private final Metadata metadata;
    
        public CustomServerStreamTracer(String methodName, Metadata metadata) {
            this.methodName = methodName;
            this.metadata = metadata;
        }
    }
    

Transport リスニング

Transportの準備完了イベントと終了イベントをリッスンできます

サーバ側

  • MyServerTransportFilter.java
public class MyServerTransportFilter extends ServerTransportFilter {
    
        @Override
        public Attributes transportReady(Attributes attrs) {
            // do something
            return super.transportReady(attrs);
        }
    
        @Override
        public void transportTerminated(Attributes attrs) {
            // do something
            super.transportTerminated(attrs);
        }
    }
    
Read next