ソケットは長いリンクを確立し、双方がハンドシェイクした後、一方がドロップし続けるとチャネルは常に存在します。この記事のビジネスシナリオは、温度と湿度センサーがゲートウェイを介してサーバーに温度と湿度のデータを送信し、サーバーがメッセージを受信して応答することをシミュレートします。
プロジェクトはspringboot2.1.3ビルドに基づいており、主にradishとcommons-codecを補助的に使用しています。
<!--バイト変換-->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
<scope>provided</scope>
</dependency>
具体的なアイデア
まず、サーバー側のソケットを起動し、SocketServerを作成します。
@Slf4j
@Data
@Component
@NoArgsConstructor
public class SocketServer {
private Integer port = 9090;
private boolean started;
private ServerSocket serverSocket;
// ソケットスレッドリンクオブジェクトの重複生成によるリソースの浪費を防ぐ
private ExecutorService executorService = Executors.newCachedThreadPool();
public void start(){
start(null);
}
public void start(Integer port){
log.info("port: {}, {}", this.port, port);
try {
serverSocket = new ServerSocket(port == null ? this.port : port);
started = true;
log.info("Socketサービスが開始され、ポートを占有している: {}", serverSocket.getLocalPort());
} catch (IOException e) {
log.error("ポートの競合、例外メッセージ:{}", e);
System.exit(0);
}
while (true){
try {
// ソケットリッスンを有効にする
Socket socket = serverSocket.accept();
ClientSocket register = register(socket);
// ここで、ソケットオブジェクトの生成スレッドが重複しているかどうかを判断する。
if (register != null){
executorService.submit(register);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
次に、サーバにリンクされたソケットSocketPoolを格納するグローバルMapを作成します。
public class SocketPool {
private static final ConcurrentHashMap<String, ClientSocket> ONLINE_SOCKET_MAP = new ConcurrentHashMap<>();
public static void add(ClientSocket clientSocket){
if (clientSocket != null && !clientSocket.getKey().isEmpty())
ONLINE_SOCKET_MAP.put(clientSocket.getKey(), clientSocket);
}
public static void remove(String key){
if (!key.isEmpty())
ONLINE_SOCKET_MAP.remove(key);
}
}
第三に、各リンクがClientSocketオブジェクトであるソケットオブジェクトClientSocketをカプセル化します。このタイプの操作では、クライアントの効果を聞くために受信することができます。また、その後のビジネスニーズのためにデータベースに受信したデータを保存することができます。
@Slf4j
@Data
public class ClientSocket implements Runnable {
private Socket socket;
private DataInputStream inputStream;
private DataOutputStream outputStream;
private String key;
private String message;
@Override
public void run() {
while (true){
try {
onMessage(this);
log.info(LocalDateTime.now()+"電流デバイス:"+this.key+" 受信データ: <<<<<<" + this.message);
} catch (Exception e) {
e.printStackTrace();
}
if (isSocketClosed(this)){
log.info("クライアントはクローズされ、そのKey値は次のようになる:{}", this.getKey());
//対応するサーバー側リソースを閉じる
閉じる;
break;
}
}
}
}
第四に、ソケットタスクプロセッサSocketHandlerを作成し、データの読み取りと書き込み、リンクの破棄、その他のメソッドをカプセル化します。
@Slf4j
public class SocketHandler {
/**
* 接続されたソケットをソケットプールに登録する
* @param socket
* @return
*/
public static ClientSocket register(Socket socket){
ClientSocket clientSocket = new ClientSocket();
clientSocket.setSocket(socket);
try {
clientSocket.setInputStream(new DataInputStream(socket.getInputStream()));
clientSocket.setOutputStream(new DataOutputStream(socket.getOutputStream()));
byte[] bytes = new byte[1024];
clientSocket.getInputStream().read(bytes);
clientSocket.setKey(new String(bytes, "utf-8"));
add(clientSocket);
return clientSocket;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 指定したクライアントにメッセージを送信する
* @param clientSocket
* @param message
*/
public static void sendMessage(ClientSocket clientSocket, String message){
try {
log.info("クライアントにメッセージを送る : >>>>>" + message);
clientSocket.getOutputStream().write(message.getBytes("utf-8"));
//clientSocket.getOutputStream().writeUTF(message);
} catch (IOException e) {
log.error("アノマリーにメッセージを送る:{}", e);
close(clientSocket);
}
}
/**
* 指定したクライアントのアップロード情報を取得する
* @param clientSocket
* @return
*/
public static void onMessage(ClientSocket clientSocket){
byte[] keyByte = new byte[1024];
byte[] msgByte = new byte[1];
try {
// 最初にシーケンス番号を送信する
if(StringUtils.isEmpty(clientSocket.getKey())) {
clientSocket.getInputStream().read(keyByte);
clientSocket.setKey(new String(keyByte, "UTF-8"));
//return clientSocket.getKey();
// 後でデータを送信する
}else {
String info = "";
while (true) {
if (clientSocket.getInputStream().available() > 0) {
clientSocket.getInputStream().read(msgByte);
String tempStr = HexEcodeUtil.ByteArrayToHexStr(msgByte);
info += tempStr;
//が読み込まれた
if (clientSocket.getInputStream().available() == 0) {
//リセットしないと、受信するたびにデータが加算されてしまう。
clientSocket.setMessage(info);
break;
}
}
}
//return clientSocket.getMessage();
}
} catch (IOException e) {
e.printStackTrace();
close(clientSocket);
}
//return null;
}
/**
* ソケットリソース回復を指定する
* @param clientSocket
*/
public static void close(ClientSocket clientSocket){
log.info("資源回収を行う");
if (clientSocket != null){
log.info("のキーでソケット関連リソースの再利用を開始する。{}", clientSocket.getKey());
remove(clientSocket.getKey());
Socket socket = clientSocket.getSocket();
try {
socket.shutdownInput();
socket.shutdownOutput();
} catch (IOException e) {
log.error("入出力ストリーム例外を閉じる。{}", e);
}finally {
try {
socket.close();
} catch (IOException e) {
log.error("ソケットを閉じる例外{}", e);
}
}
}
}
/**
* パケットを送信してデータ接続の状態を判断する
* @param clientSocket
* @return
*/
public static boolean isSocketClosed(ClientSocket clientSocket){
try {
clientSocket.getSocket().sendUrgentData(1);
return false;
} catch (IOException e) {
return true;
}
}
}
最後のステップは、スプリングコンテナの起動クラスと一緒にソケットを起動することです。
public static void main(String[] args) {
ApplicationContext run = SpringApplication.run(CollectionApplication.class, args);
run.getBean(SocketServer.class).start();
}
このデモは基本的に完了し、一部の人々は、ゲートウェイデバイスのプッシュデータが16進数コードであるため、omons-codecをインポートする前に何を使用するか奇妙になりますが、これはピットであり、読み取りデータの文字列は、トランスコードは文字化けされ、デバイスのシリアル番号は文字化けされません....一度悩まされた私は、方法がわからないし、データの応答をチェックし、エンコーディングの問題である可能性があり、その後、トランスコードutilを(パケットが16進数コードでない場合はomons -コーデックパッケージを使用していない)を書きました
以下はその利用方法です。
/**
* 16システム整数コンバーター
*/
@Slf4j
public class HexEcodeUtil {
//1610進数文字セット
public static final String HEXMAXSTRING = "0123456789ABCDEF";
public static final String HEXMINSTRING = "0123456789abcdef";
/**
* byte[]16進数に変換する
*
* @param byteArray
*/
public static String ByteArrayToHexStr(byte[] byteArray){
if (byteArray == null)
return null;
char[] hexArray = HEXMAXSTRING.toCharArray();
char[] hexChars = new char[byteArray.length * 2];
for (int i = 0; i < byteArray.length; i++){
int temp = byteArray[i] & 0xFF;
hexChars[i * 2] = hexArray[temp >>> 4];
hexChars[i * 2 + 1] = hexArray[temp & 0x0F];
}
return new String(hexChars);
}
/**
* Str16進数に変換する
*
* @param str
* @return
*/
public static String StrToHexStr(String str) {
//デフォルトのエンコーディングに基づいてバイト配列を取得する
byte[] bytes = str.getBytes();
StringBuilder stringBuilder = new StringBuilder(bytes.length * 2);
//バイト配列の各バイトを2ビットの16進整数に分割する。
for (int i = 0; i < bytes.length; i++){
stringBuilder.append("0x");
stringBuilder.append(HEXMAXSTRING.charAt((bytes[i] & 0xf0) >> 4));
stringBuilder.append(HEXMAXSTRING.charAt((bytes[i] & 0x0f) >> 0));
//最後のコンマを取り除く
if (i != bytes.length - 1)
stringBuilder.append(",");
}
return stringBuilder.toString();
}
/**
* 16バイナリからバイトへ[]
*
* @param hexStr 空白、0x、カンマを除いた16進数の文字列。:06EEF7F1
* @return
*/
public static byte[] HexStrToByteArray(String hexStr){
byte[] byteArray = new byte[hexStr.length() / 2];
for (int i = 0; i < byteArray.length; i++){
String subStr = hexStr.substring(2 * i, 2 * i + 2);
byteArray[i] = ((byte) Integer.parseInt(subStr, 16));
}
return byteArray;
}
/**
* 温湿度データ変換
*
*/
public static Map<String , String> HexToRead(String info){
HashMap<String, String> hashMap = new HashMap<>();
return hashMap;
}
/**
* 16進文字列をバイト配列に変換する
* @param hexItr 16丸付き文字列
* @return
*/
public static byte[] hexItr2Arr(String hexItr) {
try {
return Hex.decodeHex(hexItr);
} catch (DecoderException e) {
log.info("16丸めた文字列をバイト例外に変換する。!");
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
//byte[] bytes = hexItr2Arr("454E383739465134563249393936394F");
byte[] bytes = hexItr2Arr("010300000002C40B010304012202585B5F");
try {
String s = new String(bytes, "UTF-8");
log.info(s);
} catch (Exception e) {
e.printStackTrace();
}
}
}