SparkカーネルはScala言語で開発されているので、Scala言語を使ってSparkアプリケーションを開発するのは自然な流れです。Scala言語に馴染みがない場合は、Webチュートリアル「A Scala Tutorial for Java Programmers」や関連するScala書籍を読んで学ぶことができます。
この記事では、Sparkの3つの代表的なアプリケーションであるWordCount、TopK、SparkJoinの3つのScala Sparkプログラミング例を紹介します。
1.WordCountプログラミング例
WordCountは最も単純な分散アプリケーションの例の1つで、主な機能は入力ディレクトリ内の全単語の出現回数の合計を数えることです:
Spark On YARNの場合、最も重要なのは最初の2つのパラメータで、***最初のパラメータは "yarn-standalone"、2番目のパラメータはカスタム文字列です。standalone"、2番目のパラメータはカスタム文字列です:
val sc = new SparkContext(args(0), "WordCount",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
ステップ2:入力データを読み込むHDFSからテキストデータを読み込むには、SparkConの
val textFile = sc.textFile(args(1))
val inputFormatClass = classOf[SequenceFileInputFormat[Text,Text]]
var hadoopRdd = sc.hadoopRDD(conf, inputFormatClass, classOf[Text], classOf[Text])
または、HadoopRDD オブジェクトを作成します:
var hadoopRdd = new HadoopRDD(sc, conf,
classOf[SequenceFileInputFormat[Text,Text, classOf[Text], classOf[Text])
ステップ3:RDD変換演算子によるRDDの操作と変換、WordCountの場合、まず入力データの文字列の各行から単語を解析し、同じ単語をバケツに入れ、***例として、各バケツ内の各単語の出現頻度の統計を取る必要があります:
val result = hadoopRdd.flatMap{
case(key, value) => value.toString().split("\s+");
}.map(word => (word, 1)). reduceByKey (_ + _)
ステップ4: 結果のRDDデータセットをHDFSに保存します。SparkContextのsaveAsTextFileハッシュを使用して、データセットをHDFSディレクトリに保存できます。デフォルトでは、Hadoopが提供するTextOutputFormatが使用され、各レコードは""の形式で出力されます。saveAsSequenceFile 関数を使用して SequenceFile 形式などで保存することもできます:
result.saveAsSequenceFile(args(2))
もちろん、一般的なSparkプログラムを書く際には、以下の2つのヘッダーファイルをインクルードする必要があります:
import org.apache.spark._
import SparkContext._
WordCount の完全な手順は、「Apache Spark Learning: Using Eclipse to build the Spark Integrated Development Environment」の記事で紹介しましたので、今回は割愛します。
入力ファイルと出力ファイルを指定する場合は、hdfsのURIを指定する必要があることに注意する必要があります。例えば、入力ディレクトリはhdfs://hadoop-test/tmp/input、出力ディレクトリはhdfs://hadoop-test/tmp/outputです。-test "はHadoop設定ファイルcore-site.xmlのパラメータfs.default.nameで指定されます。
2.TopKプログラミングの例
TopKプログラムのタスクは、テキストの束に対して単語の頻度カウントを実行し、頻度***で出現するK個の単語を返すことです。MapReduceの実装を使用する場合、WordCountとTopKの2つのジョブを記述する必要がありますが、Sparkの使用は1つのジョブのみで、WordCountの部分は前の実装によって実装されており、次のTopKの単語を見つけるために、前の実装に従います。この記事の実装は***ではないことに注意してください、多くの改善の余地があります。
ステップ1:まず、すべての単語を単語頻度順に並べ替えます:
val sorted = result.map {
case(key, value) => (value, key); //exchange key and value
}.sortByKey(true, 1)
ステップ2:最初のKに戻ります:
val topK = sorted.top(args(3).toInt)
ステップ3:Kの単語をそれぞれプリントアウトします:
topK.foreach(println)
アプリケーションの標準出力の内容については、YARN はコンテナの標準出力ログに保存することに注意してください。YARNでは、各Containerにstdout、stderr、syslogの3つのログファイルがあります。 最初の2つは標準出力によって生成された内容を保存し、3つ目はlog4jによって出力されたログを保存します。
この手順のコード一式、コンパイル済みjarパッケージ、実行スクリプトはこちらからダウンロードできます。ダウンロード後は、「Apache Spark Learning: Using Eclipse to build the Spark integrated development environment」の記事に従って操作を行ってください。
3.SparkJoinプログラミング例
import org.apache.spark._
import SparkContext._
object SparkJoin {
def main(args: Array[String]) {
if (args.length != 4 ){
println("usage is org.test.WordCount <master> <rating> <movie> <output>")
return
}
val sc = new SparkContext(args(0), "WordCount",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
// Read rating from HDFS file
val textFile = sc.textFile(args(1))
//extract (movieid, rating)
val rating = textFile.map(line => {
val fileds = line.split("::")
(fileds(1).toInt, fileds(2).toDouble)
})
val movieScores = rating
.groupByKey()
.map(data => {
val avg = data._2.sum / data._2.size
(data._1, avg)
})
// Read movie from HDFS file
val movies = sc.textFile(args(2))
val movieskey = movies.map(line => {
val fileds = line.split("::")
(fileds(0).toInt, fileds(1))
}).keyBy(tup => tup._1)
// by join, we get <movie, averageRating, movieName>
val result = movieScores
.keyBy(tup => tup._1)
.join(movieskey)
.filter(f => f._2._1._2 > 4.0)
.map(f => (f._1, f._2._1._2, f._2._2._2))
result.saveAsTextFile(args(3))
}
}
コード、コンパイル済みjarパッケージ、実行スクリプトはここからできます。
4.まとめ
HadoopプログラミングでJava言語があまり必要ないように、SparkプログラミングではScala言語があまり必要ありません。最低限の構文でプログラムを書くことが可能で、一般的な構文や表現はほとんどありません。一般的に、プログラムはまず、Scala、Java、Pythonの例を含む公式の例をモデルにして書かれます。