blog

Spark-NLPでテキスト分類モデルを構築する

概要\nJohn Snow LabsのSpark NLPをAWS EHR上にセットアップし、BBCの記事の簡単なテキスト分類のためにライブラリを使用します。\n\nプレゼンテーション\n自然言語処理は...

Jun 25, 2020 · 9 min. read
シェア

概要

  • John Snow LabsのSpark NLPをAWS EHR上に構築し、BBCの記事の簡単なテキスト分類のためにライブラリを使用します。

プレゼンテーション

自然言語処理は、世界中のデータサイエンスチームにとって重要なプロセスの1つです。データの継続的な増加に伴い、ほとんどの組織はapachehadoopなどのビッグデータプラットフォームやAWS、Azure、GCPなどのクラウドオファリングに移行しています。

これらのプラットフォームは、ビッグデータを扱うことができるだけでなく、組織が非構造化データの大規模な分析を実行することを可能にします。しかし、機械学習に関しては、ビッグデータ・システムと機械学習ツールの間にはまだギャップがあります。

scikit-learnやGensimなどの一般的な機械学習Pythonライブラリは、シングルノードコンピュータでの実行に高度に最適化されており、分散環境向けに設計されていません。

Apache Spark MLlibは、線形回帰、ロジスティック回帰、サポートベクターマシン、ランダムフォレスト、K-means、LDAなどの機械学習モデルのほとんどを提供し、最も一般的な機械学習タスクを実行することで、このギャップを埋めるのに役立つ多くのツールの1つです。

基本的な NLP パイプラインを構築するには、これらのコンバーターとエクストラクターで十分ですが、より包括的でプロダクショングレードのパイプラインを構築するには、ステミング分析、レマタイゼーション、語彙タグ付け、名前付きエンティティ認識など、より高度なテクニックが必要です。

Spark NLPには、高度なNLPタスクを実行するためのさまざまなアノテーターが用意されています。詳細については、ウェブサイトのアノテーターとその使用法のリストを参照してください。

nlp.johnsnowlabs.com/docs/ja/ann...

環境のセットアップ

それでは、AWS EMR上でSpark NLPをセットアップする方法を見ていきましょう。

1.EMRクラスタを起動するには、ブートオペレーションを作成する必要があります。ブートストラップ操作は、他のソフトウェアまたはカスタム・クラスタ・ノードの構成を設定するために使用されます。以下のブートストラップ操作は、EMRクラスタ上でSpark NLPをセットアップするために使用できます。

#!/bin/bashsudo yum install -y python36-devel python36-pip python36-setuptools python36-virtualenvsudo python36 -m pip install --upgrade pip
#
sudo python36 -m pip install pandas
#
sudo python36 -m pip install boto3
#
sudo python36 -m pip install re
#
sudo python36 -m pip install spark-nlp==2.4.5

シェルスクリプトを作成したら、AWS S3の任意の場所にスクリプトをコピーします。必要に応じて他のpythonパッケージもインストールできます。

2.EMRクラスタは、AWSコンソール、API、またはpythonのboto3ライブラリを使用して開始することができます。Pythonを使用する利点は、クラスタのインスタンス化やワークフローへの追加が必要なときにいつでもコードを再利用できることです。

以下はEMRクラスタをインスタンス化するpythonコードです。

import boto3region_name='region_name'def get_security_group_id(group_name, region_name):
 ec2 = boto3.client('ec2', region_name=region_name)
 response = ec2.describe_security_groups(GroupNames=[group_name])
 return response['SecurityGroups'][0]['GroupId']emr = boto3.client('emr', region_name=region_name)cluster_response = emr.run_job_flow(
 Name='cluster_name', #  
 ReleaseLabel='emr-5.27.0',
 LogUri='s3_path_for_logs', #  
 Instances={
 'InstanceGroups': [
 {
 'Name': "Master nodes",
 'Market': 'ON_DEMAND',
 'InstanceRole': 'MASTER',
 'InstanceType': 'm5.2xlarge', # リクエストに応じて変更する
 'InstanceCount': 1 #マスターノードの高可用性のために、カウントを1以上に設定する
 },
 {
 'Name': "Slave nodes",
 'Market': 'ON_DEMAND',
 'InstanceRole': 'CORE',
 'InstanceType': 'm5.2xlarge', # リクエストに応じて変更する
 'InstanceCount': 2
 }
 ],
 'KeepJobFlowAliveWhenNoSteps': True,
 'Ec2KeyName' : 'key_pair_name', #  
 'EmrManagedMasterSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)
 'EmrManagedSlaveSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)
 },
 BootstrapActions=[ {
 'Name':'install_dependencies',
 'ScriptBootstrapAction':{
 'Args':[],
 'Path':'path_to_bootstrapaction_on_s3' #  
 }
 }],
 Steps = [],
 VisibleToAllUsers=True,
 JobFlowRole='EMR_EC2_DefaultRole',
 ServiceRole='EMR_DefaultRole',
 Applications=[
 { 'Name': 'hadoop' },
 { 'Name': 'spark' },
 { 'Name': 'hive' },
 { 'Name': 'zeppelin' },
 { 'Name': 'presto' }
 ],
 Configurations=[
 # YARN
 {
 "Classification": "yarn-site", 
 "Properties": {"yarn.nodemanager.vmem-pmem-ratio": "4",
 "yarn.nodemanager.pmem-check-enabled": "false",
 "yarn.nodemanager.vmem-check-enabled": "false"}
 },
 # HADOOP
 {
 "Classification": "hadoop-env", 
 "Configurations": [
 {
 "Classification": "export", 
 "Configurations": [], 
 "Properties": {"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
 }
 ], 
 "Properties": {}
 },
 # SPARK
 {
 "Classification": "spark-env", 
 "Configurations": [
 {
 "Classification": "export", 
 "Configurations": [], 
 "Properties": {"PYSPARK_PYTHON":"/usr/bin/python3",
 "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
 }
 ], 
 "Properties": {}
 },
 {
 "Classification": "spark",
 "Properties": {"maximizeResourceAllocation": "true"},
 "Configurations": []
 },
 {
 "Classification": "spark-defaults",
 "Properties": {
 "spark.dynamicAllocation.enabled": "true" #default is also true
 }
 }
 ]
 )

注:ログ記録とブートストラップ操作スクリプトの保存に使用するS3バケットへのアクセス権が正しいことを確認してください。

BBC記事のSpark-NLPベースのテキスト分類

クラスタの準備ができたので、Spark NLPとSpark MLlibを使ってBBCのデータで簡単なテキスト分類の例を作ってみましょう。

Sparkの初期化

# Spark NLPをインポートする
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline# Spark NLPでSparkセッションを開始する
#spark = sparknlp.start()spark = SparkSession.builder \
 .appName("BBC Text Categorization")\
 .config("spark.driver.memory","8G")\ change accordingly
 .config("spark.memory.offHeap.enabled",True)\
 .config("spark.memory.offHeap.size","8G") \
 .config("spark.driver.maxResultSize", "2G") \
 .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")\
 .config("spark.kryoserializer.buffer.max", "1000M")\
 .config("spark.network.timeout","3600s")\
 .getOrCreate()

テキストデータのロード

BBCのデータを使用します。このリンクからデータをダウンロードできます。下記のデータをダウンロード後、sparkコードで読み込んでください;

www.kaggle.com/yufengdev/b...

# ファイルの場所と種類
file_location = r'path\to\bbc-text.csv'
file_type = "csv"# CSV
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","df = spark.read.format(file_type) \
 .option("inferSchema", infer_schema) \
 .option("header", first_row_is_header) \
 .option("sep", delimiter) \
 .load(file_location)df.count()

データセットをトレーニングセットとテストセットに分割

(trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100)

randomSplit()関数は、weight配列とseedという2つの引数を取ります。この例では、70%をトレーニングデータ、30%をテストデータとして、70/30分割が使用されます。

Spark NLPによるNLPパイプライン

Spark NLPの最大の利点の1つは、Spark MLLibモジュールとネイティブに統合されていることで、変換器と推定器からなる包括的なMLパイプラインを構築するのに役立ちます。

このパイプラインには、CountVectorizerやHashingTF、IDFなどの特徴抽出モジュールを含めることができます。機械学習モデルもこのパイプラインに含めることができます。

以下は、特徴抽出と機械学習モデルからなるNLPパイプラインの例です;

from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator# テキストをnlpファイルに変換する
document_assembler = DocumentAssembler() \
 .setInputCol("text") \
 .setOutputCol("document")#ドキュメントを識別子の配列に変換する
tokenizer = Tokenizer() \
 .setInputCols(["document"]) \
 .setOutputCol("token")
# ロゴをきれいにする
normalizer = Normalizer() \
 .setInputCols(["token"]) \
 .setOutputCol("normalized")# 無効化された単語を削除する
stopwords_cleaner = StopWordsCleaner()\
 .setInputCols("normalized")\
 .setOutputCol("cleanTokens")\
 .setCaseSensitive(False)
stemmer = Stemmer() \
 .setInputCols(["cleanTokens"]) \
 .setOutputCol("stem")# カスタムドキュメント構造をアイデンティティの配列に変換する。
finisher = Finisher() \
 .setInputCols(["stem"]) \
 .setOutputCols(["token_features"]) \
 .setOutputAsArray(True) \
 .setCleanAnnotations(False)# 生成頻度
hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=1000)# 逆文書頻度を生成する
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)# ラベルを整数に変換する
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")# 単純な多項ロジスティック回帰モデルを定義する。ハイパーパラメータの様々な組み合わせを試して、どれがよりデータにフィットするかを確認する。スコアを比較するために異なるアルゴリズムを試すこともできる。
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)# インデックスを対応するクラスラベルに変換する
label_to_stringIdx = IndexToString(inputCol="label", outputCol="article_class")# NLPパイプラインを定義する
nlp_pipeline = Pipeline(
 stages=[document_assembler, 
 tokenizer,
 normalizer,
 stopwords_cleaner, 
 stemmer, 
 finisher,
 hashingTF,
 idf,
 label_stringIdx,
 lr,
 label_to_stringIdx])

モデルのトレーニング

これでNLPパイプラインは、トレーニングデータに基づいて学習されたモデルを使用できるようになりました。

# トレーニングデータにパイプラインをフィッティングする
pipeline_model = nlp_pipeline.fit(trainingData)

予測の実装

学習が完了すれば、テストデータのクラスラベルを予測することができます。

# テストデータで予測を行う
predictions = pipeline_model.transform(testData)

モデルの評価

学習済みモデルを評価することは、そのモデルが未知のデータでどのように機能するかを理解するために重要です。3つの一般的な評価指標、accuracy、precision、recallが見られます。

  1. 精度
# 評価器をインポートする
from pyspark.ml.evaluation import MulticlassClassificationEvaluatorevaluator = MulticlassClassificationEvaluator(
 labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

  1. 精度
evaluator = MulticlassClassificationEvaluator(
 labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

evaluator = MulticlassClassificationEvaluator(
 labelCol="label", predictionCol="prediction", metricName="weightedRecall")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

ビジネスのユースケースに応じて、モデルを評価するためにどの指標を使用するかを決めることができます。

例機械学習モデルが特定のパラメータに基づいて癌を検出するように設計されている場合、企業は偽陰性のケースを許容することができないため、リコールを使用する方が良いでしょう。

パイプラインモデルの保存

モデルのトレーニング、テスト、評価に成功したら、モデルをディスクに保存して、別のSparkアプリケーションで使用することができます。モデルをディスクに保存するには、以下のコードを使用します;

pipeline_model.save('/path/to/storage_location')

結論

Sparl NLPは、Spark MLLibとシームレスに統合されており、分散環境でエンドツーエンドの自然言語処理プロジェクトを構築できます。

この論文では、AWS EMRにSpark NLPをインストールし、BBCデータのテキスト分類を実装する方法を調査しました。また、Spark MLlibの様々な評価指標を調査し、さらに使用するためにモデルを保存する方法を理解しました。

この記事を楽しんでいただけましたか?

パンチャンAIブログサイトへようこそ:panchuang.net/

Panchuangのブログリソースラウンドアップへようこそ:docs.panchuang.net/

Read next

CountDownLatch、Semaphore、CyclicBarrier、Conditionソースコード解析

他のスレッドで実行されている一連の処理が完了するまで、1つ以上のスレッドが待機できるようにする同期ヘルパークラスです。カウントダウン計算の概念。 最初にある整数のパラメータ値が与えられると、countDownメソッドを渡すすべてのプログラムは待機しなければなりません。

Jun 24, 2020 · 10 min read