このブログは、読者にStormがどのように動作するかを理解してもらうことを目的とした、Stormを使い始める簡単な例です。その後、Stormのいくつかの高度な機能がリリースされ、最終的にはStormがHadoop 2.x YARNに統合されます。読者の目的は、高度なビッグデータHadoop、Sparkユーザーを持っているか、またはStormを理解することですStormの読者ユーザーの理解を深めたい。
プロジェクト・ポム
<repositories>
<repository>
<id>central</id>
<name>Maven Repository Switchboard</name>
<layout>default</layout>
<url>http://..net/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>clojars</id>
<url>https://.org/repo/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>libthrift7</artifactId>
<version>0.7.0</version>
</dependency>
</dependencies>
以下に、Storm用HelloWordの例を示します。コードは簡略化されており、Stormに詳しい読者であれば、コードを整理して完全な例にすることができます。
public static void main(String[] args) {
Config conf = new Config();
conf.put(Config.STORM_LOCAL_DIR, "/Volumes/Study/data/storm");
conf.put(Config.STORM_CLUSTER_MODE, "local");
//conf.put("storm.local.mode.zmq", "false");
conf.put("storm.zookeeper.root", "/storm");
conf.put("storm.zookeeper.session.timeout", 50000);
conf.put("storm.zookeeper.servers", "nowledgedata-n15");
conf.put("storm.zookeeper.port", 2181);
//conf.setDebug(true);
//conf.setNumWorkers(2);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2);
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
.shuffleGrouping("words");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
}
Config.STORM_LOCAL_DIRは、Stormが設定情報と一時データを書き込むローカルパスを設定します。
Config.STORM_CLUSTER_MODEはランタイムモードで、ローカルモードと分散モードの2つのオプションがあります。実行時のローカルモードは、マルチスレッドシミュレーション、開発、テスト、分散クラスタの分散モードは、マルチプロセス、真に分散されています。
デバッグは、より詳細なログ情報を持つテストモードです。
TestWordSpout は、ランダムな新しい String[] {"nathan", "mike", "jackson", "golda", "bertels"}; データソースを提供するために使用される文字列のリストです。
DefaultStringBoltのソースコード:
OutputCollector collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
log.info("rev a message: " + tuple.getString(0));
collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
collector.ack(tuple);
}
ランニングログ
10658 [Thread-29-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
10658 [Thread-31-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
10758 [Thread-26-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
10758 [Thread-33-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
10859 [Thread-26-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
10859 [Thread-29-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
10961 [Thread-31-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
10961 [Thread-33-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11061 [Thread-35-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11062 [Thread-35-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11162 [Thread-26-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
11163 [Thread-26-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
データはSpoutと呼ばれるStormによって生成され、バックエンドのBlotのチェーンに渡され、最終的に変換されて消費されます。そしてSpoutもBlotも並列で、並列度はすべて自分で設定できます。例えば
builder.setSpout("words", new TestWordSpout(), 2);
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
ノズルTestWordSpoutの平行度は2、DefaultStringBoltの平行度は5です。
ログからわかるように、データはノズルを通り、事前にスケジュールされたBlotに送られ、ログが出力されました。私のテストコードに設定された並列度は5で、ログは確かに5スレッドであることをカウントしています:
Thread-29-exclaim2
Thread-31-exclaim2
Thread-26-exclaim2
Thread-33-exclaim2
Thread-35-exclaim2
ストームとは何ですか?詳しい説明は。
OSCのユーザーの言葉を借りれば、Hadoopはショッピングモールの自動エレベーターで、ユーザーは列に並んで待つ必要があり、フロアを押して選択し、それから到着します。一方、ストームはエスカレーターのようなもので、エスカレーターはあらかじめ動くように設定されており、入ってきた人はすぐに運ばれ、目的地は明確です。
Stormのこの機能は、ビッグデータ型のETLシステム開発に適しています。