blog

データ転送、ストレージ、プレゼンテーション、EMQ X + TDengineがMQTT IoTデータ可視化プラットフォームを構築する

IoTデータ収集は、大量のデバイスアクセス、大量の時系列データ伝送を含み、EMQXメッセージングミドルウェアとTDengineビッグデータプラットフォームを組み合わせた技術スタックは、シナリオ内の大量...

Aug 14, 2020 · 13 min. read
シェア

IoTデータ収集は、大量のデバイスアクセスと大量の時系列データ伝送を伴いますが、EMQ XメッセージングミドルウェアとTDengineビッグデータプラットフォームを組み合わせた技術スタックは、このシナリオにおける大量の時系列モニタリングデータの伝送、保存、計算が十分に可能です。

データがデータベースに格納された後、それは多くの場合、データの監視、指標の統計やその他のビジネスニーズを達成するために、ルールに従ってデータをカウントし、表示するために、データ可視化システムなどの他の方法を使用する必要がありますデータの価値を最大限に発揮するために、オープンソースソフトウェアGrafanaとTDengineはすぐにIoTデータ可視化プラットフォームを構築することができます。

上記の完全なソリューションはコード開発を必要とせず、関係する製品はオープンソースソフトウェア、エンタープライズサービス、クラウドSaaSサービスのさまざまなレベルの配信モードを提供することができ、プロジェクトの要件に応じて無料またはエンタープライズバージョンのプライベートランディングとクラウド展開を実現することができます。

ソリューション

EMQ X

EMQ X 、並行性の高いErlang/OTP言語プラットフォームをベースとしたオープンソースのMQTTメッセージングサーバで、数百万の接続と分散クラスタアーキテクチャ、パブリッシュ・サブスクライブモデルをサポートしています。 EMQ Xには、すぐに使える機能が多数組み込まれており、オープンソース版のEMQ X Brokerとエンタープライズ版のEMQ X Enterpriseは 、ルールエンジンを通じてTDengineへのデバイスメッセージの保存をサポートしています。TDengine

TDengine

TDengineは、モノのインターネット(IoT)、自動車のインターネット(IoV)、インダストリアル・インターネット(IIoT)、IT運用保守(O&M)向けに設計・最適化されたビッグデータプラットフォームです。コアとなる10倍高速な時系列データベース機能に加え、キャッシング、データサブスクリプション、ストリーミングコンピューティングなどの機能を提供し、R&Dや運用保守の複雑さを最小限に抑え、クラスタ機能を含むコアコードはすべてオープンソースです。

TDengineには、Community、Enterprise、Cloudの各エディションがあります。 インストール/使用方法のチュートリアルについては、 TDengine ドキュメント 参照してください。

Grafana

Grafanaは、さまざまな種類のデータソースからデータをクエリ処理し、視覚的に表示することができるクロスプラットフォーム、オープンソースのメトリクス分析および可視化ツールです。クライアントサイドのチャートを迅速かつ柔軟に作成することができ、パネルのプラグインは、メトリックやログを視覚化するための多くの異なる方法を持っています。公式ライブラリには、ヒートマップ、折れ線グラフ、チャート、および他の表示方法などのダッシュボードのプラグインが豊富にあります。Elasticsearch、CloudWatch、KairosDBなどのデータソースをサポートし、データアイテムの独立/混合クエリ表示をサポートします。カスタムアラートルールを作成し、他のメッセージ処理サービスやコンポーネントに通知することができます。

ビジネスシナリオ

本論文では、一定のデータを持つ既存の環境データ収集ポイントが存在し、収集ポイントからのすべてのデータがMQTTプロトコルを介して収集プラットフォームに送信されると仮定して、IoT環境データ収集シナリオをシミュレートし、以下のようにテーマを設計します:

sensor/data

センサーから送信されるデータ形式はJSONで、データには温度、湿度、騒音量、PM10、PM2.5、二酸化硫黄、二酸化窒素、一酸化炭素、センサーID、地域、センサーが収集した収集時間が含まれます。

{
 "temperature": 30,
 "humidity" : 20,
 "volume": 44.5,
 "PM10": 23,
 "pm25": 61,
 "SO2": 14,
 "NO2": 4,
 "CO": 5,
 "id": "10-c6-1f-1a-1f-47",
 "area": 1,
 "ts": 170
}

その後いつでもデータを見ることができるリアルタイム・ストレージの必要性は、現在、次のような要件を提示しています:

  • 各デバイスは5秒ごとにデータを報告し、データベースはその後の回顧分析のために各データを保存する必要があります;
  • 可視化システムにより、任意のエリアおよび時間間隔における平均値、最大値、最小値などの指標データを表示します。

環境の準備

この記事で使用しているコンポーネントは、ダウンロードとインストールを簡単にするためにいくつかの設定変更が必要なEMQ Xを除いて、すべてDockerイメージを持っています。

インストールパッケージのリソースやチュートリアルについては、それぞれのウェブサイトを参照してください:

EMQ Xのインストール

EMQ X を初めてお使いになる場合は、 EMQ X ご覧になることをお勧めします!

EMQ Xオープンソース版の最新バージョンはv4.1.2で、zipパッケージのダウンロード手順は以下の通りです:

## ダウンロードしたパッケージを解凍する
unzip emqx-macosx-v4.1.1.zip
cd emqx
## EMQ Xをコンソールモードで起動すると、デバッグが容易になる。
./bin/emqx console

起動に成功したら、 にアクセスして EMQ X Management Console Dashboard にアクセスし、デフォルトのユーザー名とパスワード adminpublic を使用して初期ログインを完了します。

TDengineのインストール

テスト目的であれば、Docker経由でインストールすることもできますし、インストーラー・パッケージを使ってインストールすることもできます:

## コンテナをプルして起動する
docker run -d --name tdengine -p : tdengine/tdengine:latest
## 起動後にコンテナの状態を確認する
docker ps -a

Grafana

以下のコマンドを使用して、Docker経由でGrafanaをインストールして起動します:

docker run -d --name=grafana -p  grafana/grafana

起動に成功したら、 にアクセスしてGrafanaの可視化パネルにアクセスします。adminadminのデフォルトのユーザー名とパスワードを使用して初期ログインを完了し、プロンプトに従ってパスワードを変更し、新しいパスワードを使用してメインインターフェイスにログインします。

TDengine にデータを保存するための EMQ X の設定

TDengine データベースとデータテーブルの作成

TDengineのDockerコンテナに入ります:

docker exec -it tdengine bash

test "データベースを作成します。

taos
create database test;

TDengineのデータ構造とSQLコマンドについては、 TAOS SQL参照してください:

use test;
CREATE TABLE sensor_data (
 ts timestamp,
 temperature float,
 humidity float,
 volume float,
 PM10 float,
 pm25 float,
 SO2 float,
 NO2 float,
 CO float,
 sensor_id NCHAR(255), 
 area TINYINT,
 coll_time timestamp
);

EMQ Xルールエンジンの設定

EMQ X Dashboared を開き、 Rule Engine -> Rules ページに進み、 Create ボタンをクリックして作成ページに入ります。

ルールSQL

ルール SQL は、EMQ X メッセージとイベントのフィルタリングに使用されます。 以下の SQL は、センサー/データトピックからのペイロードデータのフィルタリングを表します:

SELECT
 payload
FROM
 "sensor/data"

SQLのテスト機能を使ってテストデータを入力し、フィルタリングの結果をテストします。 テスト結果が以下のように出力されれば、SQLが正しく記述されていることを示します:

{
 "payload": "{"temperature":30,"humidity":20,"volume":44.5,"PM10":23,"pm2.5":61,"SO2":14,"NO2":4,"CO":5,"id":"10-c6-1f-1a-1f-47","area":1,"ts":170}"
}

レスポンスアクション

さまざまな種類のプラットフォームの開発をサポートするために、TDengineはREST設計標準に準拠したAPIを提供し、 RESTful Connector TDengineに接続する最も簡単な方法を提供します。

EMQ X Enterpriseの 次期リリース4.1.1では、ネイティブでより高性能な書き込みコネクタが提供される予定です。

関連リソースとメッセージ・コンテンツ・テンプレートです。

  • 関連リソース: HTTPサーバの設定情報、TDengine用RESTful Connectorはこちらです。

  • メッセージの内容テンプレート: 以下は、データを含むINSERT SQLです。SQLでデータベース名を指定し、文字型をシングルクォートで囲む必要があることに注意してください:

制作プロセス

Response Actions(レスポンス・アクション)」の下の「Add(追加)」ボタンをクリックし、ポップアップ・ボックスで「Send Data to Web Service(データを Web サービスに送信)」を選択し、「 New Resource(新規リソース) 」をクリックして新しい WebHook リソースを作成します。

リソースタイプに Webhookを選択し、リクエスト し、認証情報としてAuthorizationリクエストヘッダを追加します。

レスポンスアクションの作成ページで新しく作成したリソースを選択し、メッセージテンプレートの内容を入力するだけです。

アナログデータの生成

以下のスクリプトは、10,000台のデバイスが過去24時間、5秒ごとにシミュレートされたデータを報告し、EMQ Xに送信するシナリオをシミュレートします。

  • メッセージ TPS: 0020
  • メッセージ TPS: 0020

読者はNode.jsをインストールし、必要に応じて設定パラメータを変更した後、以下のコマンドで起動できます:

npm install mqtt mockjs --save --registry=https://...rg
node mock.js

添付ファイル:EMQ Xコードに同時に送信されるデータの生成をシミュレートし、クラスタのパフォーマンスに応じて関連するパラメータを調整してください。

// mock.js const mqtt = require('mqtt') const Mock = require('mockjs') const EMQX_SERVER = 'mqtt://localhost:1883' const CLIENT_NUM = 10000 const STEP = 5000 // シミュレーション取得間隔 ms const AWAIT = 5000 // 各送信後の休眠時間により、高速メッセージレートmsを防ぐ const CLIENT_POOL = [] startMock() function sleep(timer = 100) { return new Promise(resolve => { setTimeout(resolve, timer) }) } async function startMock() { const now = Date.now() for (let i = 0; i < CLIENT_NUM; i++) { const client = await createClient(`mock_client_${i}`) CLIENT_POOL.push(client) } // last 24h every 5s const last = * 1000 for (let ts = now - last; ts <= now; ts += STEP) { for (const client of CLIENT_POOL) { const mockData = generateMockData() const data = { ...mockData, id: client.clientId, area: 0, ts, } client.publish('sensor/data', JSON.stringify(data)) } const dateStr = new Date(ts).toLocaleTimeString() console.log(`${dateStr} send success.`) await sleep(AWAIT) } console.log(`Done, use ${(Date.now() - now) / 1000}s`) } /** * Init a virtual mqtt client * @param {string} clientId ClientID */ function createClient(clientId) { return new Promise((resolve, reject) => { const client = mqtt.connect(EMQX_SERVER, { clientId, }) client.on('connect', () => { console.log(`client ${clientId} connected`) resolve(client) }) client.on('reconnect', () => { console.log('reconnect') }) client.on('error', (e) => { console.error(e) reject(e) }) }) } /** * Generate mock data */ function generateMockData() { return { "temperature": parseFloat(Mock.Random.float().toFixed(2)), "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)), "volume": parseFloat(Mock.Random.float().toFixed(2)), "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)), "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)), "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)), "area": Mock.Random.integer(0, 20), "ts": 170, } }

可視化の構成

コンポーネントがインストールされ、シミュレートされたデータが正常に書き込まれたら、Grafanaビジュアライゼーションインターフェースの指示に従って、ビジネスに必要なデータビジュアライゼーションの設定を完了します。

データソースの追加

データソースを追加します。 TDengine タイプのデータソースを選択し、接続パラメータを入力して構成します。 デフォルトでは、キーとなる構成情報は以下のとおりです:

ダッシュボードの追加

データソースを追加した後、表示する必要があるデータダッシュボード情報を追加します。ダッシュボードは複数の視覚化パネルの集まりです。[ 新規ダッシュボード]をクリックした後、 [+ クエリ ] を選択してクエリ別にデータパネルを追加します。

パネルを作成するには、「 クエリ」、「ビジュアライゼーション」、「一般」、「アラート」の4つのステップと、作成するタイミングがあります

平均パネル

Grafanaのビジュアルクエリビルダーを使用して、すべてのデバイスの平均値をクエリします。

以下の SQL は、指定された期間および指定された時間間隔に従って、データ内の主要メトリクスの平均値をクエリします:

select avg(temperature), avg(humidity), avg(volume), avg(PM10), avg(pm25), avg(SO2), avg(NO2), avg(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)

General] で、パネルの名前を [Historical Average]に変更します。 ビジネスを監視してアラートする必要がある場合は、 [Alert] でアラート・ルールをアレンジできます。

ダッシュボードの作成が完了したら、左上隅の [Back] ボタンをクリックして、ダッシュボードにデータ パネルを追加します。上部のナビゲーションバーにある保存アイコンをクリックし、ダッシュボード名を入力してダッシュボードの作成を完了します。

最大値と最小値のパネル

引き続き、ダッシュボードの パネル追加 ボタンをクリックして、最大と最小のチャートを追加します。手順は平均値の追加と同じですが、クエリの SELECT 統計手法フィールドだけが AVG 関数を MAXと MINに調整するために調整されます:

select max(temperature), max(humidity), max(volume), max(PM10), max(pm25), max(SO2), max(NO2), max(CO), min(temperature), min(humidity), min(volume), min(PM10), min(pm25), min(SO2), min(NO2), min(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)

ダッシュボード効果

ダッシュボードを保存し、ドラッグ&ドロップで各データパネルのサイズと位置を調整し、最終的にデータダッシュボードのより良い視覚効果を得ることができます。ダッシュボードの右上隅には、時間間隔を選択することができ、自動更新時間は、この時点でデバイスがデータ収集データを送信し続け、ダッシュボードのデータ値は、より良い視覚効果を達成するために、変更されます。

まとめ

EMQ X + TDengineで、IoTデータの伝送、保存、プレゼンテーションの全プロセスが完成しました。 読者は、EMQ Xの豊富な拡張機能とTDengineの優れたビッグデータプラットフォーム機能が、IoTデータ収集にどのように適用できるかを学ぶことができます。Grafanaの他の機能について詳しく学んだ後、データの可視化、さらにはモニタリングシステムをカスタマイズすることができます。

Read next

統合テスト・エンジンの ShardingSphere 4.x テスト・エンジン

Junitはすべてのテストデータを集約し、ひとつずつテストメソッドに渡してアサーションを行います。データは砂時計の砂のように扱われます。メタデータとテストデータはファイルに定義されます。メタデータとテストデータはファイルの中で定義されます。例えば、開発者はライブラリやテーブルを構築するステートメントをファイル内でカスタマイズすることができます。 このファイルは...

Aug 14, 2020 · 4 min read

JavaScript入門

Aug 14, 2020 · 1 min read

リボン

Aug 14, 2020 · 2 min read