blog

Pulsar Functionsに基づくイベント処理デザイン・パターン

この記事では、一般的なリアルタイム・ストリーミング・パターンとその実装を紹介します。 最初に、Apache Pulsar Functionsを使ったコンテンツ・ベース・ルーティングの実装方法を確認しま...

Mar 12, 2020 · 10 min. read
シェア

この記事では、一般的なリアルタイム・ストリーミング・モードとその実装について紹介します。

パターン1:ダイナミック・ルーティング

最初に、Apache Pulsar Functionsを使ってコンテンツ・ベース・ルーティングを実装する方法を確認します。コンテンツ・ベース・ルーティングは統合パターンです。このパターンは何年も前から存在し、イベント・センターやメッセージング・フレームワークでよく使われています。基本的な考え方は、各メッセージの内容を調べ、メッセージの内容に基づいて異なる宛先にメッセージをルーティングすることです。

以下の例では、3つの異なる値を設定できるApache Pulsar SDKを使用しています。

  • メッセージ内のマッチを見つけるために使用される正規表現
  • 式パターンにマッチしたときにメッセージが送信されたトピック。
  • 式パターンにマッチしない場合にメッセージが送信されるトピック。

この例では、機能ロジックに基づいてイベントの送信先を動的に決定するPulsar Functionsの威力を紹介します。

 import java.util.regex.*;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 public ContentBasedRoutingFunction implements Function<String, String> {
 String process(String input, Context context) throws Exception {
 String regex = context
 .getUserConfigValue(「正規表現”).toString();
 String matchedTopic = context
 .getUserConfigValue(「マッチド・トピック”).toString();
 String unmatchedTopic = context
 .getUserConfigValue(「無比のトピック”).toString();
 Pattern p = Pattern.compile(regex);
 Matcher m = p.matcher(input);
 if (m.matches()) {
 context.publish(matchedTopic, input);
 } else {
 context.publish(unmatchedTopic, input);
 }
 }
 }

パターン2:フィルタリング

フィルタモードは、指定された条件を満たすイベントのみを保持することで、トピック上の ほとんどのイベントを除外する場合に選択します。フィルタ モードは、一定額以上のクレジット カードによる支払い、ログ ファイル内のエラー メッセージ、一定のしきい値を超えるセンサーの読み取り値など、関心のあるイベントのみを検索するのに特に効果的です。

あるユーザがクレジットカード取引のイベントフローを監視し、不正行為や不審な行動を検知しようとし ているとします。取引量が多く、「同意する/同意しない」を選択できる時間が限られているため、ユーザはまず、キャッシングや多額の支払いなど、「リスクが高い」特徴を持つ取引をフィルタリングする必要があります。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;
public class FraudFilter implements Function<Purchase, Purchase> {
 Purchase process(Purchase p, Context context) throws Exception {
 if (p.getTransactionType() == キャッシング') ||
 p.getAmount > 500.00) {
 return p;
 }
 return null;
 }
}

フィルタは、「リスク」特性を持つトランザクションをフィルタリングするために使用できます。フィルタはこれらの「リスク」特性を特定し、これらのトランザクションのみを別のトピックにルーティングし、さらに評価します。フィルタリング後、すべてのクレジット カード決済を「不正の可能性」トピックにルーティングし、さらに評価することができます。

図 2 は、3 つの別々の支払いオブジェクトに基づく FraudFilter 関数を示しています。最初の支払いは所定の基準を満たし、さらなる評価のために「不正の可能性」トピックにルーティングされますが、2 番目と 3 番目の支払いは不正基準を満たしていないためフィルタリングされます。

パターン3:変換

変換モードは、イベントをあるタイプから別のタイプに変換したり、入力イベントの値を追加、削除、変更するために使用されます。

投影

プロジェクション・モードは、リレーショナル代数におけるプロジェクション演算子に似ており、入力イベントの属性のサブセットを選択し、それらの属性のみを含む出力イベントを作成します。プロジェクション・パターンは、イベントからセンシティブなフィールドを削除したり、イベントの必要な属性のみを保持するために使用できます。図 3 は、下流のトピックにレコードをパブリッシュする前に、入力された社会保障番号を「マス ク」するための、プロジェクション・モードのアプリケーションを示しています。

エンリッチメント・パターン

エンリッチメント・パターンは、入力属性に存在しないデータを出力イベントに追加するために使用されます。典型的なエンリッチメント・パターンは、入力イベント内のキー値に基づいて参照されるデータのある種のルックアップを含みます。次の例は、入力イベントに含まれる IP アドレスに基づいて、地理的な場所を出力イベントに追加する方法を示しています。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;
import com.company.services.GeoService;
public class IPLookup implements Function<Purchase, Purchase> {
 Purchase process(Purchase p) throws Exception {
 Geo g = GeoService.getByIp(p.getIPAddress());
 // By default, these fields are blank, so we just modify the object
 p.setLongitude(g.getLon());
 p.setLatitiude(g.getLat());
 return p;
 }
}

分離パターン

スプリットモードでは、イベントプロセッサは1つの入力イベントを複数の出力イベントに分割します。分割モードは、入力イベントが複数の個別イベントを含むバッチで、各イベントを個別に処理したい場合に便利です。次の図は、分割モードでの処理を示しています。入力は改行で区切られ、設定された出力トピックに 1 行ずつポストされます。

この機能は以下のように実装されています:

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class Splitter implements Function<String, String> {
 String process(String s, Context context) throws Exception {
 Arrays.asLists(s.split( \R”).forEach(line ->
 context.publish(context.getOutputTopic(), line));
 return null;
 }
}

モード4:アラームとしきい値

アラームとしきい値モードは、検出条件に基づくアラームの検出と生成を可能にします。アラームは、単純な値に基づいて生成することも、より複雑な条件に基づいて生成することもできます。

以下の例では、ユーザが設定したしきい値パラメータと、アラート通知を受信する電子メー ルアドレスに基づいて、アラートを生成します。この関数は、設定されたしきい値を超えるセンサーイベントを受信すると、電子メールを送信します。

import javax.mail.*;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public SimpleAlertFunction implements Function<Sensor, Void> {
 Void process(Sensor sensor, Context context) throws Exception {
 Double threshold = context
 .getUserConfigValue(閾値”).toString();
 String alertEmail = context
 .getUserConfigValue(「アラート・メール”).toString();
 if (sensor.getReading() >= threshold) {
 Session s = Session.getDefaultInstance();
 MimeMessage msg = new MineMessage(s);
 msg.setText(「センサーへの警告:” + sensor.getId());
 Transport.send(msg);
 }
 return null;
 }
}

以下は、特定のセンサー読み取り値の成長率に基づいてアラートを生成するステートフルファンクションの例です。アラートを発生させるかどうかを決定する際、過去のセンサー読み取り値にアクセスする必要があります。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public ComplexAlertFunction implements Function<Sensor, Void> {
 Void process(Sensor sensor, Context context) throws Exception {
 Double threshold = context
 .getUserConfigValue(閾値”).toString();
 String alertTopic = context
 .getUserConfigValue(「アラート・トピック”).toString();
 // Get previous & current metric values
 Float previous = context.getState(sensor.getId() + 「メトリック”);
 Long previous_time = context.getState(sensor.getId() + "-metric-time”);
 Float current = sensor.getMetric();
 Long current_time = sensor.getMetricTime();
 // Calculate Rate of change & compare to threshold.
 Double rateOfChange = (current-previous) /
 (current_time-previous_time);
 if (abs(rateOfChange) >= threshold) {
 // Publish the sensor ID to the alert topic for handling
 context.publish(alertTopic, sensor.getId());
 }
 // Update metric values
 context.putState(sensor.getId() + 「メトリック”, current);
 context.putState(sensor.getId() + "-metric-time”, current_time);
 }
}

アパッチ・パルサー機能ステータス管理機能では、以前の測定値と時間のみが保持され、センサーIDがこれらの値に追加されます。簡単のため、イベントは正しい順序で到着するものとします。つまり、常に最新の測定値が表示され、順序から外れた測定値は表示されません。

さらに、今回は、センサー ID は、電子メールを送信するだけでなく、さらなる処理のために専用のアラートトピックに転送されます。このようにして、イベントをさらに充実させることができます。例えば、センサーの地理的位置を取得し、適切な人々に通知することができます。

モード5:単純カウントとウィンドウカウント

Simple CountモードとWindow Countモードでは、入力としてイベントのコレクションを受け取り、入力イベントに関数を適用することで目的の出力イベントを生成する集計関数を使用します。集約関数には、合計、平均、最大、最小、パーセンタイルなどがあります。

以下は、Pulsar Functionsを使って、与えられたトピック内の各単語の出現回数の合計を計算する「単語カウント」を実装する例です。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public WordCountFunction implements Function<String, Void> {
 Void process(String s, Context context) throws Exception {
 Arrays.asLists(s.split( \.”).forEach(word -> context.incrCounter(word, 1));
 return null;
 }
}

ストリーミング・データ・ソースのエンドレスな性質を考えると、これらの計算は通常データ・ウィンドウにわたって実行されるため、不定な集約はほとんど役に立ちません。

図7に示すように、データ・ウィンドウはイベント・ストリームの有限のサブセットを表します。しかし、データ・ウィンドウの境界はどのように定義すべきでしょうか?ウィンドウを定義するために使用される一般的なプロパティが2つあります:

  • **トリガー・ポリシー:** 関数コードが実行される、またはトリガーされるタイミングを制御します。Apache Pulsar Functionフレームワークは、これらのルールを使って、処理ウィンドウに収集された全てのデータをコードに通知します。
  • **クリア・ポリシー:** ウィンドウに保持されるデータ量を制御します。これらのルールは、ウィンドウからデータ要素をパージするかどうかを決定するために使用されます。

どちらの戦略も、時間またはウィンドウ内のデータ量によって動きます。この2つの違いは何でしょうか?また、両者はどのように連動するのでしょうか?複数のウィンドウのテクニックのうち、最もよく使われるのはスクロールウィンドウとスライドウィンドウです。

スクロール・ウィンドウ

ウィンドウが一杯になることだけが、スクロールウィンドウを消去するストラテジーの条件なので、トリガーストラテジーを使いたいことを指定するだけです。カウントベースのスクロールウィンドウはどのように動作しますか?

図8の最初の例では、トリガー・ポリシーが2に設定されています。これは、ウィンドウに2つのアイテムがあるとトリガーが発火し、パルサー・ファンクション・コードの実行が始まることを意味します。この一連の動作は時間に依存せず、ウィンドウ・カウントが2に達するまで5秒かかったか5時間かかったかは関係ありません。重要なのは、ウィンドウ・カウントが2に達したことです。

上記のカウントベースのスクロールウィンドウと時間ベースのスクロールウィンドウを比較してください。10秒のインターバルの後、ウィンドウ内のイベント数に関係なく関数コードがトリガーされます。下図では、1つ目のウィンドウには7つのイベントがあり、2つ目のウィンドウには3つのイベントしかありません。

スライディング・ウィンドウ

スライディング ウィンドウ カウントはウィンドウの長さを定義し、ウィンドウの長さは処理のために保持されるデータ量を制限するクリア ポリシーを設定し、スライディング インターバルはトリガ ポリシーを定義します。ローリング・ウィンドウ・ポリシーとスライディング・ウィンドウ・ポリシーの両方は、時間または長さに基づいて定義できます。

つまり、2秒より前のデータはクリアされ、計算に使用されません。スライディング間隔は1秒で、これはパルサー関数コードが1秒ごとに実行されることを意味します。このようにして、ウィンドウの全長にわたってデータを処理することができます。

これまでの例では、すべて時間に基づいてクリアおよびトリガポリシーを定義していますが、長さに基づいてクリアまたはトリガポリシーを定義することもできます。

Pulsar Functionsでどちらのタイプのウィンドウ関数を実装するのも簡単で、以下のように入力タイプとしてjava.util.Collectionを指定し、関数を作成する際に-userConfigフラグで適切なウィンドウ構成プロパティを指定するだけです。

前述の時間窓の4つのシナリオを実装するために使用される設定パラメータは以下のとおりです:

  • "-windowLengthCount": ウィンドウごとのメッセージ数
  • "-windowLengthDurationMs": ウィンドウの長さ
  • "-slidingIntervalCount": ウィンドウスライド後のメッセージ数
  • "-slidingIntervalDurationMs": ウィンドウスライド後の時間

正しい組み合わせは下表の通りです:

時間、バッチウィンドウ -windowLengthDurationMs = XXXX
長さ、スライディングウィンドウ

-windowLengthCount = XXXX -slidingIntervalCount = XXXX
長さ、バッチウィンドウ -windowLengthCount = XXXX

概要

この記事では、Apache Pulsar Functionsを使用して汎用ストリーム処理パターンを実装する方法をいくつか説明します。これらの処理モードには、コンテンツ・ベースのルーティング、フィルタリング、変換、警告、単純なカウント・アプリケーションなどが含まれます。また、基本的なウィンドウの概念とApache Pulsar Functionsが提供するウィンドウ機能についても説明します。

Read next

Flutterアプリケーションの起動解析(Androidの視点)

Flutterプロジェクトはandで設定されているので、アプリケーション起動時に何が行われるかを見てみましょう。 主な初期化は、aotの共有ライブラリ名、flutterのAscetsディレクトリ...など、多くの変数を初期化することです。

Mar 12, 2020 · 40 min read