blog

RPCフレームワークを手書きできるようにするために、それを読み取る。

リモート・プロシージャ・コール。 RPC はクライアント・サーバ構造を使用し、Request-Response メッセージ・パターンで実装されます。 サーバー側プロシージャの呼び出しにおけるサーバー・...

May 7, 2020 · 18 min. read
シェア

RPC基本

Remote Procedure Call:リモートプロシージャコール。

プロセスとは?

プロセスとは、ビジネスプロセス、計算タスクのことであり、より端的に理解すれば、手続きのことです。

RPCはClient-Server構造を使用し、Request-Responseメッセージパターンで実装されます。



  • クライアントスタブは、クライアントの処理中に呼び出され、パラメータを渡します;
  • Clientスタブは引数をメッセージにグループ化し、システムコールを介してサーバーにメッセージを送信します;
  • クライアントのローカル・オペレーティング・システムは、クライアント・マシンからサーバー・マシンにメッセージを送信します;
  • サーバーオペレーティングシステムは、受信したパケットをサーバースタブに渡します。
  • サーバスタブのグループ解除メッセージを引数に指定します;
  • ServerスタブはServer側プロシージャを呼び出しており、プロシージャの実行結果は同じ手順で逆方向にクライアントに応答されます。
  • Client stubサーバ・スタブの開発;
  • パラメータをメッセージにグループ化する方法と、メッセージのグループ化を解除する方法;
  • メッセージの送信方法
  • プロセスの結果がどのように表現され、例外がどのように処理されるか;
  • 安全なアクセス制御を実現するには

RPC呼び出しのプロセスでは、送信するためにパラメータをメッセージにグループ化する必要があり、受信側ではメッセージをパラメータにグループ化解除する必要があり、プロセスの結果もグループ化解除する必要があります。メッセージはどの部分から構成され、メッセージ表現形式がメッセージ・プロトコルを構成します。

RPCプロトコルは、リクエスト・メッセージとレスポンス・メッセージのフォーマットを指定します。

オプションまたはカスタマイズされたメッセージング・プロトコルは、RPCメッセージのやり取りを完了するためにTCP上で使用することができます。

一般的な標準プロトコルを選択することもできますが、独自のニーズに応じて、独自のメッセージ・プロトコルを定義することもできます。

パラメータ・グルーピング、メッセージ・アングルーピング、および基礎となるネットワーク通信をカプセル化するRPCプログラム開発フレームワークの利便性は、手続き的なコード記述のみに集中することで、それを直接構築できることです。

Javaドメイン:

  • 新しいマイクロサービスフレームワーク:Dubbo、spring cloud、Apache Thriftなど。

手書き RPC

は、プロジェクト内のスターターの導入、および簡単な設定を通じて、単純なRPCフレームワークを書いて、それをleisure-rpc-spring-boot-starterと呼ぶことにしましょう、リモートサービスを提供する能力を持っているプロジェクト。

カスタムアノテーション @Service を記述すると、このアノテーションでアノテーションされたクラスがリモートサービスを提供します。



どのようなサービスが利用可能かを知った上で、サービスコールを実行するサービスエージェントも存在しなければなりません。サービスと通信したいクライアントは、同じメッセージングプロトコルを持たなければなりません。リモートサービスを呼び出したいクライアントは、ネットワークリクエストを行う能力、すなわちネットワーク層の機能を持たなければなりません。

もちろん、これはクライアントが必要とする最も基本的な機能であり、実際にはロードバランシングのような拡張可能な機能もあります。

まず、クライアント・コードの構造を見てみましょう:



インターフェース指向プログラミングの概念に基づき、異なる役割は対応する仕様を定義するインターフェースを実装します。ここにはメッセージプロトコルに関連するコンテンツはありません。それは、サーバー側もメッセージプロトコルを必要とするため、それを抽出してパブリックレイヤーに配置しているからです。

/**
 * サービス・ディスカバリー仕様を定義するサービス・ディスカバリー抽象クラス
 *
 * 
 * @since 1.0.0
 */
public interface ServiceDiscoverer {
 List<Service> getServices(String name);
}
/**
 * Zookeeperサービスディスカバラは、Zookeeper をレジストリとしてサービスディスカバリの詳細を定義する。
 *
 * 
 * @since 1.0.0
 */
public class ZookeeperServiceDiscoverer implements ServiceDiscoverer {
 private ZkClient zkClient;
 public ZookeeperServiceDiscoverer(String zkAddress) {
 zkClient = new ZkClient(zkAddress);
 zkClient.setZkSerializer(new ZookeeperSerializer());
 }
 /**
 * Zookeeper クライアントを使用して、サービス名でサービスのリストを取得する。
 * サービス名形式:インターフェース・フル・パス
 *
 * @param name  
 * @return サービスのリスト
 */
 @Override
 public List<Service> getServices(String name) {
 String servicePath = LeisureConstant.ZK_SERVICE_PATH + LeisureConstant.PATH_DELIMITER + name + "/service";
 List<String> children = zkClient.getChildren(servicePath);
 return Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> {
 String deCh = null;
 try {
 deCh = URLDecoder.decode(str, LeisureConstant.UTF_8);
 } catch (UnsupportedEncodingException e) {
 e.printStackTrace();
 }
 return JSON.parseObject(deCh, Service.class);
 }).collect(Collectors.toList());
 }
}

サービスディスカバラはZookeeperを使って実装されており、ZkClientを通してZKに登録されたサービスを簡単に発見することができます。もちろん、Redis など他のコンポーネントをレジストリとして利用することもできます。

/**
 * Web リクエスト・クライアントで、Web リクエスト仕様を定義する
 *
 * 
 * @since 1.0.0
 */
public interface NetClient {
 byte[] sendRequest(byte[] data, Service service) throws InterruptedException;
}
/**
 * Nettyネットワーク・リクエスト・クライアントでは、Netty経由でネットワーク・リクエストを実装する詳細を定義する。
 *
 * 
 * @since 1.0.0
 */
public class NettyNetClient implements NetClient {
 private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);
 /**
 * リクエストを送信する
 *
 * @param data データを要求する
 * @param service サービス情報
 * @return レスポンス・データ
 * @throws InterruptedException  
 */
 @Override
 public byte[] sendRequest(byte[] data, Service service) throws InterruptedException {
 String[] addInfoArray = service.getAddress().split(":");
 String serverAddress = addInfoArray[0];
 String serverPort = addInfoArray[1];
 SendHandler sendHandler = new SendHandler(data);
 byte[] respData;
 // クライアントを構成する
 EventLoopGroup group = new NioEventLoopGroup();
 try {
 Bootstrap b = new Bootstrap();
 b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>() {
 @Override
 public void initChannel(SocketChannel ch) {
 ChannelPipeline p = ch.pipeline();
 p.addLast(sendHandler);
 }
 });
 // クライアント接続を開始する
 b.connect(serverAddress, Integer.parseInt(serverPort)).sync();
 respData = (byte[]) sendHandler.rspData();
 logger.info("SendRequest get reply: {}", respData);
 } finally {
 // スレッド・グループ・リソースを解放する
 group.shutdownGracefully();
 }
 return respData;
 }
}
/**
 * Netty 受信処理の詳細を定義する処理クラスを送信する
 *
 * 
 * @since 1.0.0
 */
public class SendHandler extends ChannelInboundHandlerAdapter {
 private static Logger logger = LoggerFactory.getLogger(SendHandler.class);
 private CountDownLatch cdl;
 private Object readMsg = null;
 private byte[] data;
 public SendHandler(byte[] data) {
 cdl = new CountDownLatch(1);
 this.data = data;
 }
 /**
 * サーバーへの接続に成功したら、リクエスト・データを送信する。
 *
 * @param ctx チャネル・コンテキスト
 */
 @Override
 public void channelActive(ChannelHandlerContext ctx) {
 logger.info("Successful connection to server {}", ctx);
 ByteBuf reqBuf = Unpooled.buffer(data.length);
 reqBuf.writeBytes(data);
 logger.info("Client sends message {}", reqBuf);
 ctx.writeAndFlush(reqBuf);
 }
 /**
 * データを読み取り、データが読み取られたら CD ロックを解放する。
 *
 * @param ctx  
 * @param msg ByteBuf
 */
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
 logger.info("Client reads message: {}", msg);
 ByteBuf msgBuf = (ByteBuf) msg;
 byte[] resp = new byte[msgBuf.readableBytes()];
 msgBuf.readBytes(resp);
 readMsg = resp;
 cdl.countDown();
 }
 /**
 * データの読み取りが完了するまで待つ
 *
 * @return レスポンス・データ
 * @throws InterruptedException  
 */
 public Object rspData() throws InterruptedException {
 cdl.await();
 return readMsg;
 }
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) {
 ctx.flush();
 }
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 // Close the connection when an exception is raised.
 cause.printStackTrace();
 logger.error("Exception occurred {}", cause.getMessage());
 ctx.close();
 }
}

ここではNettyを使ってネットワーク・リクエスト・クライアントを実装していますが、もちろんMinaを使うこともできます。ネットワーク・リクエスト・クライアントは、リモート・サーバーに接続して、グループ化されたリクエスト・データをサーバーに送信し、サーバーが処理した後のレスポンス・データをサーバーからクライアントに返します。

/**
 * クライアント・プロキシ・ファクトリ:リモート・サービス・プロキシ・クラスの作成に使用する。
 * リクエストのグループ化、リクエストの送信、レスポンスのグループ化などの操作をカプセル化する。
 *
 * 
 * @since 1.0.0
 */
public class ClientProxyFactory {
 private ServiceDiscoverer serviceDiscoverer;
 private Map<String, MessageProtocol> supportMessageProtocols;
 private NetClient netClient;
 private Map<Class<?>, Object> objectCache = new HashMap<>();
 /**
 * Java ダイナミック・プロキシでサービス・プロキシ・クラスを取得する
 *
 * @param clazz プロキシされるクラス クラス
 * @param <T>  
 * @return サービス・プロキシ・クラス
 */
 @SuppressWarnings("unchecked")
 public <T> T getProxy(Class<T> clazz) {
 return (T) this.objectCache.computeIfAbsent(clazz,
 cls -> newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls}, new ClientInvocationHandler(cls)));
 }
 // getter setter ...
 /**
 * クライアント・サービス・プロキシ・クラスの呼び出し関数の詳細実装
 */
 private class ClientInvocationHandler implements InvocationHandler {
 private Class<?> clazz;
 private Random random = new Random();
 public ClientInvocationHandler(Class<?> clazz) {
 super();
 this.clazz = clazz;
 }
 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Exception {
 if (method.getName().equals("toString")) {
 return proxy.getClass().toString();
 }
 if (method.getName().equals("hashCode")) {
 return 0;
 }
 // サービス情報を取得する
 String serviceName = this.clazz.getName();
 List<Service> services = serviceDiscoverer.getServices(serviceName);
 if (services == null || services.isEmpty()) {
 throw new LeisureException("No provider available!");
 }
 // サービス・プロバイダーをランダムに選択する
 Service service = services.get(random.nextInt(services.size()));
 // リクエスト・オブジェクトを構築する
 LeisureRequest req = new LeisureRequest();
 req.setServiceName(service.getName());
 req.setMethod(method.getName());
 req.setParameterTypes(method.getParameterTypes());
 req.setParameters(args);
 // プロトコル層のグループ化
 // メソッドに対応するプロトコルを取得する
 MessageProtocol protocol = supportMessageProtocols.get(service.getProtocol());
 // グループ・リクエストをコンパイルする
 byte[] data = protocol.marshallingRequest(req);
 // ネットワーク・レイヤーを呼び出してリクエストを送信する
 byte[] repData = netClient.sendRequest(data, service);
 // 5応答メッセージのグループ化を解除する
 LeisureResponse rsp = protocol.unmarshallingResponse(repData);
 // 結果処理
 if (rsp.getException() != null) {
 throw rsp.getException();
 }
 return rsp.getReturnValue();
 }
 }
}

サービス・プロキシ・クラスはClientProxyFactoryクラスによって生成され、プロキシ・メソッドはJavaベースのダイナミック・プロキシです。処理クラスClientInvocationHandlerのinvoke関数では、サービスの取得、サービス・プロバイダの選択、リクエスト・オブジェクトの構築、リクエスト・オブジェクトのグループ化、ネットワーク・リクエスト・クライアントからのリクエスト送信、レスポンス・メッセージのグループ解除、例外処理など、一連の処理が定義されています。

/**
 * メッセージ・プロトコル、グループ化リクエスト、グループ化解除リクエスト、グループ化レスポンス、グループ化解除レスポンス仕様の定義
 *
 * 
 * @since 1.0.0
 */
public interface MessageProtocol {
 /**
 * グループ・リクエストをコンパイルする
 *
 * @param req 情報を要求する
 * @return リクエスト・バイト配列
 * @throws Exception コンパイル要求の例外
 */
 byte[] marshallingRequest(LeisureRequest req) throws Exception;
 /**
 * リクエストのグループ化を解除する
 *
 * @param data リクエスト・バイト配列
 * @return 情報を要求する
 * @throws Exception グループ化されていないリクエスト例外
 */
 LeisureRequest unmarshallingRequest(byte[] data) throws Exception;
 /**
 * コンパイル・レスポンス
 *
 * @param rsp レスポンス・メッセージ
 * @return レスポンス・バイト配列
 * @throws Exception コンパイル・レスポンス例外
 */
 byte[] marshallingResponse(LeisureResponse rsp) throws Exception;
 /**
 * レスポンスのグループ化を解除する
 *
 * @param data レスポンス・バイト配列
 * @return レスポンス・メッセージ
 * @throws Exception 応答例外のグループ化を解除する
 */
 LeisureResponse unmarshallingResponse(byte[] data) throws Exception;
}
/**
 * Javaシリアル化メッセージ・プロトコル
 *
 * 
 * @since 1.0.0
 */
public class JavaSerializeMessageProtocol implements MessageProtocol {
 private byte[] serialize(Object obj) throws Exception {
 ByteArrayOutputStream bout = new ByteArrayOutputStream();
 ObjectOutputStream out = new ObjectOutputStream(bout);
 out.writeObject(obj);
 return bout.toByteArray();
 }
 @Override
 public byte[] marshallingRequest(LeisureRequest req) throws Exception {
 return this.serialize(req);
 }
 @Override
 public LeisureRequest unmarshallingRequest(byte[] data) throws Exception {
 ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
 return (LeisureRequest) in.readObject();
 }
 @Override
 public byte[] marshallingResponse(LeisureResponse rsp) throws Exception {
 return this.serialize(rsp);
 }
 @Override
 public LeisureResponse unmarshallingResponse(byte[] data) throws Exception {
 ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
 return (LeisureResponse) in.readObject();
 }
}

メッセージングプロトコルは、主にどのようにクライアントのグループ化要求、グループ化応答、サーバー側のグループ化要求、これらの4つの運用仕様のグループ化応答を解除する方法を定義します。この記事では、Javaのシリアライズとデシリアライズの実装を提供し、興味のある読者は、他のシリアライズ技術に基づいて、他のメッセージプロトコルを実装することができます。

まずサーバー側のコード構造を見てみましょう:

  • @Serviceサービスの登録

以下はその実装コードです:

/**
 * このアノテーションでタグ付けされたサービスは、リモート RPC アクセス機能を提供する。
 *
 * 
 * @since 1.0.0
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Service {
 String value() default "";
}
/**
 * このアノテーションは、リモート・サービスを注入するために使用する。
 *
 * 
 * @since 1.0.0
 */
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface InjectService {
}
/**
 * サービス・レジストラ、サービス登録仕様を定義する
 *
 * 
 * @since 1.0.0
 */
public interface ServiceRegister {
 void register(ServiceObject so) throws Exception;
 ServiceObject getServiceObject(String name) throws Exception;
}
/**
 * デフォルトのサービス・レジストラ
 *
 * 
 * @since 1.0.0
 */
public class DefaultServiceRegister implements ServiceRegister {
 private Map<String, ServiceObject> serviceMap = new HashMap<>();
 protected String protocol;
 protected Integer port;
 @Override
 public void register(ServiceObject so) throws Exception {
 if (so == null) {
 throw new IllegalArgumentException("Parameter cannot be empty.");
 }
 this.serviceMap.put(so.getName(), so);
 }
 @Override
 public ServiceObject getServiceObject(String name) {
 return this.serviceMap.get(name);
 }
}
/**
 * Zookeeper例えば、@InjectServiceアノテーションを使用してリモート・サービスを注入する。
 *
 * 
 * @since 1.0.0
 */
public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister {
 /**
 * Zk 
 */
 private ZkClient client;
 public ZookeeperExportServiceRegister(String zkAddress, Integer port, String protocol) {
 client = new ZkClient(zkAddress);
 client.setZkSerializer(new ZookeeperSerializer());
 this.port = port;
 this.protocol = protocol;
 }
 /**
 * サービス登録
 *
 * @param so サービス・ホルダー
 * @throws Exception 登録例外
 */
 @Override
 public void register(ServiceObject so) throws Exception {
 super.register(so);
 Service service = new Service();
 String host = InetAddress.getLocalHost().getHostAddress();
 String address = host + ":" + port;
 service.setAddress(address);
 service.setName(so.getClazz().getName());
 service.setProtocol(protocol);
 this.exportService(service);
 }
 /**
 * サービスの公開
 *
 * @param serviceResource 公開するサービス情報
 */
 private void exportService(Service serviceResource) {
 String serviceName = serviceResource.getName();
 String uri = JSON.toJSONString(serviceResource);
 try {
 uri = URLEncoder.encode(uri, UTF_8);
 } catch (UnsupportedEncodingException e) {
 e.printStackTrace();
 }
 String servicePath = ZK_SERVICE_PATH + PATH_DELIMITER + serviceName + "/service";
 if (!client.exists(servicePath)) {
 client.createPersistent(servicePath, true);
 }
 String uriPath = servicePath + PATH_DELIMITER + uri;
 if (client.exists(uriPath)) {
 client.delete(uriPath);
 }
 client.createEphemeral(uriPath);
 }
}

このプロセスは、実際にはあまり言うことは、指定されたServiceObjectオブジェクトがシリアライズされ、クライアントが発見するためにZKに保存されます。同時に、サービスオブジェクトがキャッシュされます、クライアントがサービスを呼び出すときに、指定されたサービスのキャッシュされたServiceObjectオブジェクトの反射を介して、メソッドを呼び出します。

/**
 * RPCサービス側の抽象クラス
 *
 * 
 * @since 1.0.0
 */
public abstract class RpcServer {
 /**
 * サービス・ポート
 */
 protected int port;
 /**
 * サービス・プロトコル
 */
 protected String protocol;
 /**
 * リクエスト・ハンドラ
 */
 protected RequestHandler handler;
 public RpcServer(int port, String protocol, RequestHandler handler) {
 super();
 this.port = port;
 this.protocol = protocol;
 this.handler = handler;
 }
 /**
 * サービスを有効にする
 */
 public abstract void start();
 /**
 * サービスを停止する
 */
 public abstract void stop();
	// getter setter ...
}
/**
 * Netty RPCサーバーサイドで、Nettyネットワークサービスのオン/オフ機能を提供する
 *
 * 
 * @since 1.0.0
 */
public class NettyRpcServer extends RpcServer {
 private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);
 private Channel channel;
 public NettyRpcServer(int port, String protocol, RequestHandler handler) {
 super(port, protocol, handler);
 }
 @Override
 public void start() {
 // サーバーを構成する
 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 ServerBootstrap b = new ServerBootstrap();
 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
 @Override
 public void initChannel(SocketChannel ch) {
 ChannelPipeline p = ch.pipeline();
 p.addLast(new ChannelRequestHandler());
 }
 });
 // サービスを開始する
 ChannelFuture f = b.bind(port).sync();
 logger.info("Server started successfully.");
 channel = f.channel();
 // サービス・チャネルが閉じるのを待つ
 f.channel().closeFuture().sync();
 } catch (Exception e) {
 e.printStackTrace();
 } finally {
 // スレッド・グループ・リソースを解放する
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
 }
 @Override
 public void stop() {
 this.channel.close();
 }
 private class ChannelRequestHandler extends ChannelInboundHandlerAdapter {
 @Override
 public void channelActive(ChannelHandlerContext ctx) {
 logger.info("Channel active {}", ctx);
 }
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 logger.info("The server receives a message: {}", msg);
 ByteBuf msgBuf = (ByteBuf) msg;
 byte[] req = new byte[msgBuf.readableBytes()];
 msgBuf.readBytes(req);
 byte[] res = handler.handleRequest(req);
 logger.info("Send response {}", msg);
 ByteBuf respBuf = Unpooled.buffer(res.length);
 respBuf.writeBytes(res);
 ctx.write(respBuf);
 }
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) {
 ctx.flush();
 }
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 // Close the connection when an exception is raised.
 cause.printStackTrace();
 logger.error("Exception occurred {}", cause.getMessage());
 ctx.close();
 }
 }
}
/**
 * リクエスト・ハンドラ。リクエストのグループ解除、レスポンスのグループ化などの操作を提供する。
 *
 * 
 * @since 1.0.0
 */
public class RequestHandler {
 private MessageProtocol protocol;
 private ServiceRegister serviceRegister;
 public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) {
 super();
 this.protocol = protocol;
 this.serviceRegister = serviceRegister;
 }
 public byte[] handleRequest(byte[] data) throws Exception {
 // メッセージのグループ化を解除する
 LeisureRequest req = this.protocol.unmarshallingRequest(data);
 // サービス・オブジェクトを検索する
 ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName());
 LeisureResponse rsp = null;
 if (so == null) {
 rsp = new LeisureResponse(LeisureStatus.NOT_FOUND);
 } else {
 // 対応するプロシージャー・メソッドを反射的に呼び出す
 try {
 Method m = so.getClazz().getMethod(req.getMethod(), req.getParameterTypes());
 Object returnValue = m.invoke(so.getObj(), req.getParameters());
 rsp = new LeisureResponse(LeisureStatus.SUCCESS);
 rsp.setReturnValue(returnValue);
 } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
 | InvocationTargetException e) {
 rsp = new LeisureResponse(LeisureStatus.ERROR);
 rsp.setException(e);
 }
 }
 // 応答メッセージをコンパイルする
 return this.protocol.marshallingResponse(rsp);
 }
	// getter setter ...
}

ウェブサービスは、サービスを開始する詳細と、クライアントからのリクエストを処理する方法を定義します。

/**
 * RPC例えば、@InjectServiceアノテーションを使用してリモート・サービスを注入する。
 *
 * 
 * @since 1.0.0
 */
public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent> {
 @Resource
 private ClientProxyFactory clientProxyFactory;
 @Resource
 private ServiceRegister serviceRegister;
 @Resource
 private RpcServer rpcServer;
 @Override
 public void onApplicationEvent(ContextRefreshedEvent event) {
 if (Objects.isNull(event.getApplicationContext().getParent())) {
 ApplicationContext context = event.getApplicationContext();
 // サービスを有効にする
 startServer(context);
 // InjectService
 injectService(context);
 }
 }
 private void startServer(ApplicationContext context) {
 Map<String, Object> beans = context.getBeansWithAnnotation(Service.class);
 if (beans.size() != 0) {
 boolean startServerFlag = true;
 for (Object obj : beans.values()) {
 try {
 Class<?> clazz = obj.getClass();
 Class<?>[] interfaces = clazz.getInterfaces();
 ServiceObject so;
 if (interfaces.length != 1) {
 Service service = clazz.getAnnotation(Service.class);
 String value = service.value();
 if (value.equals("")) {
 startServerFlag = false;
 throw new UnsupportedOperationException("The exposed interface is not specific with '" + obj.getClass().getName() + "'");
 }
 so = new ServiceObject(value, Class.forName(value), obj);
 } else {
 Class<?> superClass = interfaces[0];
 so = new ServiceObject(superClass.getName(), superClass, obj);
 }
 serviceRegister.register(so);
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 if (startServerFlag) {
 rpcServer.start();
 }
 }
 }
 private void injectService(ApplicationContext context) {
 String[] names = context.getBeanDefinitionNames();
 for (String name : names) {
 Class<?> clazz = context.getType(name);
 if (Objects.isNull(clazz)) continue;
 Field[] fields = clazz.getDeclaredFields();
 for (Field field : fields) {
 InjectService injectLeisure = field.getAnnotation(InjectService.class);
 if (Objects.isNull(injectLeisure)) continue;
 Class<?> fieldClass = field.getType();
 Object object = context.getBean(name);
 field.setAccessible(true);
 try {
 field.set(object, clientProxyFactory.getProxy(fieldClass));
 } catch (IllegalAccessException e) {
 e.printStackTrace();
 }
 }
 }
 }
}

RPC フレームワークの使用

フレームワークの非常に重要な特徴は、使い方が簡単であることです。フレームワークを使うために必要なのは、たった1つの条件と4つのステップだけです。

レジストリとしてZookeeperを用意する必要がありますが、1ノードで十分です。

Maven 依存関係を導入します:

<dependency>
 <groupId>wang.leisure</groupId>
 <artifactId>leisure-rpc-spring-boot-starter</artifactId>
 <version>1.0.0-SNAPSHOT</version>
</dependency>

依存関係の取得方法がわからない場合は、ソースコードをダウンロードした後にプロジェクトディレクトリに移動し、mvn installコマンドを実行してローカルリポジトリにmaven依存関係を生成してください。

レジストリのアドレスは、プロジェクトの設定ファイルで設定します:

leisure.rpc.register-address=192.168.199.241:2181

リモート・サービスには @Service アノテーションを使用します:

import wang.leisure.rpc.annotation.Service;
@Service
public class UserServiceImpl implements UserService {
 @Override
 public ApiResult<User> getUser(Long id) {
 User user = getFromDbOrCache(id);
 return ApiResult.success(user);
 }
 private User getFromDbOrCache(Long id) {
 return new User(id, "プログラマー・マクドナルド", 1, "https://.wang");
 }
}
@RestController
@RequestMapping("/index/")
public class IndexController {
 @InjectService
 private UserService userService;
 /**
 * ユーザー情報を取得する
 * "http://localhost:8080"/index/getUser?id=1
 *
 * @param id ユーザーID
 * @return ユーザー情報
 */
 @GetMapping("getUser")
 public ApiResult<User> getUser(Long id) {
 return userService.getUser(id);
 }
}



Read next

これらの手書きのコードだろうか?ジュニア

"外観:"\n"味:カニ豆腐"\n「調理時間: 5分\nこの記事はフロントエンドの社員食堂のGithub倉庫にあります!

May 7, 2020 · 5 min read