blog

Netty ノート - RPC アプリケーションを手書きする

前のプロジェクトに基づいて、新しいサブプロジェクト03-netty-rpcを作成し、プロジェクトの依存関係とMavenの設定は、GitHubのプロジェクトリポジトリを参照してください。 RPCリクエス...

Feb 21, 2020 · 10 min. read
シェア

RPC(Remote Proceduce Callリモートプロシージャコール)は、一般的に異なるマシン上に配置されたシステム間のメソッドコールを実装するために使用され、プログラムがリモートシステムリソースにアクセスするためにネットワーク転送を介してリモートシステムリソースへのアクセスのようにローカルシステムのリソースにアクセスできるようにします。

ここでは、Nettyを使って非常に簡単なRPCプログラムを書いてみます:

前のプロジェクトに基づいて、新しいサブプロジェクト03-netty-rpcを作成します。プロジェクトの依存関係とMaven設定については、GitHubプロジェクトリポジトリを参照してください。

、プロトコル RPC リクエストプロトコル

RPCリクエストのデータ形式を定義する新しいクラスRpcProtocol.javaを作成します。リモート・プロシージャ・コールは、Nettyネットワークを通して、このクラスを通してカプセル化され、送信用のバイナリ・データ・ストリームにシリアライズされて送信されなければなりません。これはJavaのネイティブなシリアライズとNetty独自のオブジェクトのエンコードとデコードを使用します。

TODOは、返されるデータのシリアライズと統一されたカプセル化にprotobuf / kyroを使用します。

/**
 * RPCリクエストのフォーマットには、以下のフィールドが必要である。
 */
@Data
public class RpcProtocol implements Serializable {
 private static final long serialVersionUID = L;
 /**
 * インターフェイス名は、サーバー側は、インターフェイス名にリモートサービスクラスの実際の実装を呼び出すためによる。
 */
 private String interfaceName;
 /**
 * メソッド名、インターフェース メソッド名
 */
 private String methodName;
 /**
 * パラメータ値
 */
 private Object[] paramValues;
 /**
 * パラメータの種類
 */
 private Class<?>[] paramTypes;
}

、プロバイダーサーバー側

Provider サーバーサイドは、次の 3 つの主要なクラスで構成されています。

  • Provider: Netty サービスを開始するサーバー側のエントリクラスです。
public class ProviderRegister {
 /**
 * サービス名とオブジェクト
 */
 private static final Map<String, Object> SERVICE_MAP = new ConcurrentHashMap<>();
 /**
 * RPCプロバイダにサービスを追加する
 */
 public <T> void addService(T service, Class<T> clazz) {
 // getCanonicalName() 渡されたクラスの出力をjava言語仕様で定義されたフォーマットで取得する。
 String serviceName = clazz.getCanonicalName();
 log.info("という名前のサービスを追加する。{}", serviceName);
 if (!SERVICE_MAP.containsKey(serviceName)) {
 // サービス名と対応するオブジェクトを SERVICE に追加する。_MAP
 SERVICE_MAP.put(serviceName, service);
 }
 }
 /**
 * RPCプロバイダ側でサービスを取得する
 */
 public Object getService(String serviceName) {
 Object service = SERVICE_MAP.get(serviceName);
 if (service == null) {
 log.debug("PRCサービスが見つからない");
 return null;
 }
 log.info("サービスを見つける{}", serviceName);
 return service;
 }
}

ProviderHandlerは、RpcProtocolリクエストデータのインターフェイス名に従って、対応するサービスを取得し、結果を書き戻します。

@Slf4j
public class ProviderHandler extends ChannelInboundHandlerAdapter {
 private final ProviderRegister register = new ProviderRegister();
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 Object result;
 RpcProtocol rpcProtocol = (RpcProtocol) msg;
 try {
 // サービスがプロバイダから利用可能かどうかを調べる。
 Object service = register.getService(rpcProtocol.getInterfaceName());
 // メソッド名とパラメータの型に基づいて、サービスから特定のメソッドを取得する。
 Method method = service.getClass().getMethod(rpcProtocol.getMethodName(),
 rpcProtocol.getParamTypes());
 // このメソッドを実行する
 result = method.invoke(service, rpcProtocol.getParamValues());
 // 結果を
 ctx.writeAndFlush(result);
 log.info("サービス名{}呼び出すメソッドは{}", rpcProtocol.getInterfaceName(), rpcProtocol.getMethodName());
 } catch (NoSuchMethodException | IllegalArgumentException |
 InvocationTargetException | IllegalAccessException e) {
 log.error("サービスが見つからない。");
 } finally {
 ctx.flush();
 ctx.close();
 }
 }
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 cause.printStackTrace();
 ctx.close();
 }
}

Providerはサーバーサイドのエントリクラスで、Nettyサービスを開始します。これは前の2つのデモとほとんど同じですが、エンコーダーとデコーダーを置き換えて、特定のイベントを処理するProviderHandlerを登録する点が異なります。

@Slf4j
public class Provider {
 private final int port;
 private final String host;
 private final ProviderRegister register = new ProviderRegister();
 public Provider(String host, int port) {
 this.port = port;
 this.host = host;
 }
 /**
 * Nettyサービスの起動は前回のデモと同様だが、エンコーダーとデコーダーの違いがある。
 */
 public void start() {
 log.info("でRPCサービスを開始する。{} ポート番号は以下の通りだ:{}", host, port);
 EventLoopGroup bossGroup = new NioEventLoopGroup();
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 ServerBootstrap bootstrap = new ServerBootstrap();
 bootstrap.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 // 接続のタイムアウトは、この時間を過ぎても接続が確立できない場合は、接続に失敗する。
 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
 // TCPデフォルトでは、Nagleアルゴリズムが有効になっている。これは、ネットワークトラフィックを減らすために、大きなデータをできるだけ速く送信するように設計されている。
 // TCP_NODELAY パラメータの役割は、Nagleアルゴリズムを有効にするかどうかを制御することである。
 .childOption(ChannelOption.TCP_NODELAY, true)
 // TCPの基礎となるハートビート機構を有効にするかどうか
 .childOption(ChannelOption.SO_KEEPALIVE, true)
 //は、3回のハンドシェイクを終えたリクエストを一時的に保存するためにシステムが使用するキューの最大長を示す。 接続が頻繁に確立されると、サーバーは新しい接続の作成を処理するのが遅くなる。
 // このパラメータを増やすことができる
 .option(ChannelOption.SO_BACKLOG, 128)
 // Channel チャネルバインディング ChannelPipeline
 .childHandler(new ChannelInitializer<SocketChannel>() {
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
 ChannelPipeline pipeline = ch.pipeline();
 // JDK独自のシリアライゼーション・メカニズムを使用する TODOでは、シリアライゼーションにprotobufやkryoを使用している。
 // オブジェクトデコーダ
 pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
 ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
 // オブジェクトエンコーダ
 pipeline.addLast(new ObjectEncoder());
 pipeline.addLast(new ProviderHandler());
 }
 });
 // bindでホストとポートをリッスンする
 ChannelFuture future = bootstrap.bind(host, port).sync();
 future.channel().closeFuture().sync();
 } catch (InterruptedException e) {
 log.error("サービスエラーを開く:", e);
 } finally {
 log.info("bossGroupとworkerGroupをオフにする");
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
 }
 /**
 * PRC Provider サーバ側でサービスを登録する
 *
 * @param service  
 * @param clazz サービスインターフェースで定義されるクラス
 * @param <T> サービス固有の実装クラス
 */
 public <T> void addService(T service, Class<T> clazz) {
 register.addService(service, clazz);
 }
}

、消費者クライアント

  • Consumer:Providerサーバーに接続し、リクエストを送信します。
  • ConsumerHandler : サーバーから返されたデータを処理します。
  • ConsumerProxy: InvocationHandler を使用して、動的プロキシオブジェクトへのメソッド呼び出しを処理します。
// 返された結果を取り出す
public class ConsumerHandler extends ChannelInboundHandlerAdapter {
 private Object result;
 public Object getResult() {
 return result;
 }
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
 result = msg;
 }
}

Netty を使ってプロバイダー側と通信するコンシューマー側の基本的な使い方は、Netty のクライアントメソッドと同じです。

public class Consumer {
 private final int port;
 private final String host;
 private final RpcProtocol protocol;
 public Consumer(String host, int port, RpcProtocol protocol) {
 this.port = port;
 this.host = host;
 this.protocol = protocol;
 }
 public Object start() throws InterruptedException {
 // TODO Netty 接続の再利用は、これらのサービスを抽出する
 EventLoopGroup group = new NioEventLoopGroup();
 ConsumerHandler consumerHandler = new ConsumerHandler();
 try {
 Bootstrap bootstrap = new Bootstrap();
 bootstrap.group(group)
 .channel(NioSocketChannel.class)
 // 接続のタイムアウトは、この時間を過ぎても接続が確立できない場合は、接続に失敗する。
 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
 // TCPハートビート機構を有効にするかどうか
 .option(ChannelOption.SO_KEEPALIVE, true)
 // TCPデフォルトでは、Nagleアルゴリズムが有効になっている。これは、ネットワーク転送を減らすために、できるだけ速くビッグデータを送信するように設計されている。_NODELAY パラメータの役割は、Nagleアルゴリズムを有効にするかどうかを制御することである。
 .option(ChannelOption.TCP_NODELAY, true)
 // Channel チャネルバインディング ChannelPipeline
 .handler(new ChannelInitializer<SocketChannel>() {
 @Override
 protected void initChannel(SocketChannel ch) {
 ChannelPipeline pipeline = ch.pipeline();
 // オブジェクトパラメータ型デコーダ
 pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
 ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
 // オブジェクト・パラメータ・タイプ・エンコーダ
 pipeline.addLast(new ObjectEncoder());
 pipeline.addLast(consumerHandler);
 }
 });
 // サーバにリンクし、返されたデータを受信するためにChannelFutureを使用する。
 ChannelFuture future = bootstrap.connect(host, port).sync();
 // リクエストを送信する
 future.channel().writeAndFlush(protocol).sync();
 future.channel().closeFuture().sync();
 } finally {
 group.shutdownGracefully().sync();
 }
 return consumerHandler.getResult();
 }
}

ConsumerProxy は InvocationHandler インターフェースを実装しており、ダイナミックプロキシのメソッドを呼び出すと、実際には invoke() メソッドを呼び出します。

public class ConsumerProxy implements InvocationHandler {
 private final String host;
 private final int port;
 public ConsumerProxy(String host, int port) {
 this.host = host;
 this.port = port;
 }
 @SuppressWarnings("unchecked")
 public <T> T getProxy(Class<T> clazz) {
 // this つまり、ConsumerProxy のインスタンスを渡し、ダイナミックプロキシオブジェクトを呼び出すと、実際に ConsumerProxy を呼び出すことになる。.invoke() 以下は、Nettyを使って非常にシンプルなRPCサービスを実装する例である。
 return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
 }
 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 // リクエストをRPCとしてカプセル化する
 RpcProtocol protocol = new RpcProtocol();
 // メソッドのクラス名を取得する
 protocol.setInterfaceName(method.getDeclaringClass().getName());
 //  
 protocol.setMethodName(method.getName());
 // メソッドに渡されるパラメータの種類
 protocol.setParamTypes(method.getParameterTypes());
 // メソッドに渡されるパラメータの実際の値は、以下のようになる。
 protocol.setParamValues(args);
 // PRCコンシューマを起動する
 Consumer consumer = new Consumer(host, port, protocol);
 return consumer.start();
 }
}

このPRCプログラムを利用

  • interfaces : クライアント・サービスとサーバー・サービスによって定義されたインターフェース・クラス
  • provider : サーバーサイドのサービスおよびインターフェースの実装 インターフェースクラス
  • consumer:クライアント。プロジェクトのPRCプログラムを使用して、サーバー側のサービスを呼び出します。

インターフェイス:インターフェイス・サービスの定義

public interface HelloService {
 /** 文字列を返す*/
 String hello(String username);
}
@Data
@AllArgsConstructor
public class Message implements Serializable {
 private static final long serialVersionUID = -L;
 private int code;
 private String message;
}
public interface MessageService {
 /** カスタムJavaオブジェクトを返す*/
 Message sayMessage(String name);
}

プロバイダ:RPCサービスの登録とRPCリクエストのリッスン

package org.xian.rpc.test.provider
// 対応するインターフェイス HelloServiceImpl を実装する。.java
public class HelloServiceImpl implements HelloService {
 @Override
 public String hello(String username) {
 return "Hi " + username;
 }
}
// MessageServiceImpl.java
public class MessageServiceImpl implements MessageService {
 @Override
 public Message sayMessage(String name) {
 return new Message(200, "Your name is " + name);
 }
}
// PRCサーバ、ProviderApplicationを起動する。.java
public class ProviderApplication {
 public static void main(String[] args) {
 Provider server = new Provider(".1", 8080);
 // HelloService と MessageService サービスを登録する。
 server.addService(new HelloServiceImpl(), HelloService.class);
 server.addService(new MessageServiceImpl(), MessageService.class);
 // RPCサーバを起動する
 server.start();
 }
}

コンシューマー:RPCリモートサービスの呼び出し

package org.xian.rpc.test.consumer;
@Slf4j
public class ConsumerApplication {
 public static void main(String[] args) {
 ConsumerProxy proxy = new ConsumerProxy(".1", 8080);
 // 動的プロキシオブジェクトを生成する
 HelloService helloService = proxy.getProxy(HelloService.class);
 // 実際の呼び出しは invoke(Object proxy) である。, Method method, Object[] args)
 String result = helloService.hello("xian");
 log.info("HelloService 呼び出しの結果は{}", result);
 MessageService messageService = proxy.getProxy(MessageService.class);
 Message message = messageService.sayMessage("xiaoxian");
 log.info("MessageService 呼び出しの結果は{}", message.toString());
 }
}

この例では、Nettyを使用して非常にシンプルなRPCサービスプログラムを実装しています。もちろん、クライアント接続の再利用、Zookeeperやその他のサービスディスカバリを使用しないなど、最適化のポイントはまだたくさんあります。

github.com/guide-rpc-framework

Read next

Github ActionsでGithubページへのAngularデプロイを自動化する

最近、Angularを勉強していて、基本的な構文も覚えたので、githubに新しいコードリポジトリを作成し、ng-zorroを使ってバックエンドアプリケーションのテンプレートを構築し、今後小さなものを書くときにそのまま使えるようにしようと思っています。フロントエンドのプロジェクトは、実際に見ることができることがメインなので、自分のブログがgithuにデプロイされているので、デプロイする場所を見つけることを検討します。

Feb 21, 2020 · 10 min read