blog

Twitter Stormの高度な初期設定

このブログは、Stormの簡単な入門例であり、読者にStormがどのような操作メカニズムであるかを理解してもらうことが目的です。また、その後のStormのいくつかの高度な機能のリリースや、最終的なHa...

Dec 11, 2015 · 5 min. read
シェア

このブログは、読者にStormがどのように動作するかを理解してもらうことを目的とした、Stormを使い始める簡単な例です。その後、Stormのいくつかの高度な機能がリリースされ、最終的にはStormがHadoop 2.x YARNに統合されます。読者の目的は、高度なビッグデータHadoop、Sparkユーザーを持っているか、またはStormを理解することですStormの読者ユーザーの理解を深めたい。

プロジェクト・ポム

<repositories> 
<repository> 
<id>central</id> 
<name>Maven Repository Switchboard</name> 
<layout>default</layout> 
<url>http://..net/content/groups/public/</url> 
<snapshots> 
<enabled>false</enabled> 
</snapshots> 
</repository> 
<repository> 
<id>clojars</id> 
<url>https://.org/repo/</url> 
<snapshots> 
<enabled>false</enabled> 
</snapshots> 
<releases> 
<enabled>true</enabled> 
</releases> 
</repository> 
</repositories> 
<dependencies> 
<dependency> 
<groupId>org.yaml</groupId> 
<artifactId>snakeyaml</artifactId> 
<version>1.13</version> 
</dependency> 
<dependency> 
<groupId>org.apache.zookeeper</groupId> 
<artifactId>zookeeper</artifactId> 
<version>3.3.3</version> 
</dependency> 
<dependency> 
<groupId>org.clojure</groupId> 
<artifactId>clojure</artifactId> 
<version>1.5.1</version> 
</dependency> 
<dependency> 
<groupId>storm</groupId> 
<artifactId>storm</artifactId> 
<version>0.9.0.1</version> 
</dependency> 
<dependency> 
<groupId>storm</groupId> 
<artifactId>libthrift7</artifactId> 
<version>0.7.0</version> 
</dependency> 
</dependencies> 

以下に、Storm用HelloWordの例を示します。コードは簡略化されており、Stormに詳しい読者であれば、コードを整理して完全な例にすることができます。

public static void main(String[] args) { 
Config conf = new Config(); 
conf.put(Config.STORM_LOCAL_DIR, "/Volumes/Study/data/storm"); 
conf.put(Config.STORM_CLUSTER_MODE, "local"); 
//conf.put("storm.local.mode.zmq", "false"); 
conf.put("storm.zookeeper.root", "/storm"); 
conf.put("storm.zookeeper.session.timeout", 50000); 
conf.put("storm.zookeeper.servers", "nowledgedata-n15"); 
conf.put("storm.zookeeper.port", 2181); 
//conf.setDebug(true); 
//conf.setNumWorkers(2); 
TopologyBuilder builder = new TopologyBuilder(); 
builder.setSpout("words", new TestWordSpout(), 2); 
builder.setBolt("exclaim2", new DefaultStringBolt(), 5) 
.shuffleGrouping("words"); 
LocalCluster cluster = new LocalCluster(); 
cluster.submitTopology("test", conf, builder.createTopology()); 
} 

Config.STORM_LOCAL_DIRは、Stormが設定情報と一時データを書き込むローカルパスを設定します。

Config.STORM_CLUSTER_MODEはランタイムモードで、ローカルモードと分散モードの2つのオプションがあります。実行時のローカルモードは、マルチスレッドシミュレーション、開発、テスト、分散クラスタの分散モードは、マルチプロセス、真に分散されています。

デバッグは、より詳細なログ情報を持つテストモードです。

TestWordSpout は、ランダムな新しい String[] {"nathan", "mike", "jackson", "golda", "bertels"}; データソースを提供するために使用される文字列のリストです。

DefaultStringBoltのソースコード:

OutputCollector collector; 
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 
this.collector = collector; 
} 
public void execute(Tuple tuple) { 
log.info("rev a message: " + tuple.getString(0)); 
collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); 
collector.ack(tuple); 
} 

ランニングログ

10658 [Thread-29-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 
10658 [Thread-31-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 
10758 [Thread-26-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike 
10758 [Thread-33-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 
10859 [Thread-26-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 
10859 [Thread-29-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 
10961 [Thread-31-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 
10961 [Thread-33-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 
11061 [Thread-35-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 
11062 [Thread-35-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 
11162 [Thread-26-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 
11163 [Thread-26-exclaim2] INFO .pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 

データはSpoutと呼ばれるStormによって生成され、バックエンドのBlotのチェーンに渡され、最終的に変換されて消費されます。そしてSpoutもBlotも並列で、並列度はすべて自分で設定できます。例えば

builder.setSpout("words", new TestWordSpout(), 2); 
builder.setBolt("exclaim2", new DefaultStringBolt(), 5) 

ノズルTestWordSpoutの平行度は2、DefaultStringBoltの平行度は5です。

ログからわかるように、データはノズルを通り、事前にスケジュールされたBlotに送られ、ログが出力されました。私のテストコードに設定された並列度は5で、ログは確かに5スレッドであることをカウントしています:

Thread-29-exclaim2 
Thread-31-exclaim2 
Thread-26-exclaim2 
Thread-33-exclaim2 
Thread-35-exclaim2 

ストームとは何ですか?詳しい説明は。

OSCのユーザーの言葉を借りれば、Hadoopはショッピングモールの自動エレベーターで、ユーザーは列に並んで待つ必要があり、フロアを押して選択し、それから到着します。一方、ストームはエスカレーターのようなもので、エスカレーターはあらかじめ動くように設定されており、入ってきた人はすぐに運ばれ、目的地は明確です。

Stormのこの機能は、ビッグデータ型のETLシステム開発に適しています。

Read next

自分に合ったセキュリティ製品が一番安全だ!

近年、セキュリティ事件が頻発しているため、IT管理者は警戒を強めており、情報技術構築は重要な課題となっています。ネットワーク攻撃の多様化、新しい攻撃手法の出現により、IT管理者は多くのセキュリティソリューションに直面し、途方に暮れることが多く、ファイアウォール、侵入防御システム、アンチウィルスゲートウェイ、Webアプリケーションファイアウォール......結局、どの製品が自社に適しているのでしょうか?

Dec 9, 2015 · 5 min read