Cloud HadoopでSpark Scala Jobを提出する

目次
始める前に
ローカルでScalaコードの作成及びコンパイル
クラスタの終了

このページではSpark Scala Jobを生成してCloud Hadoopクラスタに提出する方法についてご説明します。

始める前に

次のような順番でCloud Hadoopを設定してください。

  1. Object Storageバケットを生成する
  2. Cloud Hadoopクラスタを生成する

ローカルでScalaコードの作成及びコンパイル

このセクションでは二つの方法を利用してSpark ApplicationをScalaで作成し、jarにパッケージングする方法をご説明します。

Scalaを使用

このセクションではターミナルで "HelloScala"を出力する非常に簡単なScalaコードを作成してコンパイルし、 jarにパッケージングします。

  1. Scala バイナリーを ダウンロードして圧縮を解除します。

    MacOSでhomebrewを使用する際は次のように設置できます。

     brew install scala
    
  2. 実行ファイル(例:.bashrc)で SCALA_HOME環境変数を設定して PATH$SCALA_HOMEを追加します。

     export SCALA_HOME=/usr/local/share/scala
     export PATH=$PATH:$SCALA_HOME/
    
  3. Scala REPLを実行してscalaでclassを作成します。

     ❯ scala
     # Welcome to Scala version ...
     # Type in expressions to have them evaluated.
     # Type :help for more information.
     # scala>
    

    以下のように HelloWorld.scalaを作成して保存します。

     object HelloWorld {
     def main(args: Array[String]): Unit = {
         println("Hello, world!")
     }
     }
    
     scala> :save HelloWorld.scala
     scala> :q
    
  4. scalacでコンパイルします。

     ❯ scalac  HelloWorld.scala
    

    コンパイルされた .classファイルを確認できます。

     ❯ ls HelloWorld*.class
     HelloWorld$.class HelloWorld.class
    
  5. jar ファイルを生成する

    jar 命令を使用してjarファイルを生成します。

    jar命令語を実行するためには Java SE, JREが設置されていなければなりません。

    HelloWorld*.class ファイルがあるディレクトリに移動後、以下のような命令語を実行してクラスファイルを jarにパッケージングします。

     ❯ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
     added manifest
     adding: HelloWorld$.class(in = 670) (out= 432)(deflated 35%)
     adding: HelloWorld.class(in = 645) (out= 524)(deflated 18%)
    

    パッケージングされた jarファイルでHelloWorld classがアプリのentry pointとして設定されたのを MANIFEST.MFで確認できます。

     ❯ unzip -q -c HelloWorld.jar META-INF/MANIFEST.MF
     Manifest-Version: 1.0
     Created-By: 1.8.0_181 (Oracle Corporation)
     Main-Class: HelloWorld # entry point
    

IntelliJ SBT プラグインの使用

このセクションでは Spark Applicationを開発/デバッグするための環境をntelliJに設定して'Hello Scala'というWord Count Jobをビルドします。ビルドマネージャにはSBTを使用します。例を作成した環境はMacOS Mojave, IntelliJ Ultimate 2018.2.5です。

  1. IntelliJ の設定
  2. 新たなプロジェクトの生成
  3. Import SBTライブラリー
  4. Spark Applicationの作成
  5. jarファイルの生成

IntelliJの設定

IntelliJをスタートして Configure > Plugins > Browse Repositoriesから Scalaを検索して設置します。設置後はPluginを反映するためにIntelliJをリスタートします。

新たなプロジェクトの生成

IntelliJをスタートして Create New Projectを選択します。

Scalaとビルドマネージャにsbtを選択し、Nextをクリックします。

プロジェクト名をWordCountに指定した後、sbtScala のバージョンを選択します。Finishをクリックしてプロジェクトの生成を終わらせます。

プロジェクトを生成すると、基本的に次のような構造でディレクトリとファイルが生成されます。

  • .idea: IntelliJ 構成ファイルです。
  • project: コンパイルに使用されるファイルです。
  • src: ソースコード。アプリコードのほとんどはsrc/mainにある必要があります。 src/testはテストスクリプトのためのスペースです。
  • target: プロジェクトをコンパイルするとこのスペース内に保存されます。
  • build.sbt: SBT の構成ファイルです。

Import SBTライブラリー

IntelliJがSparkコードを認識するためには spark-core ライブラリー及び文書をimportしなければなりません。

spark-coreライブラリーは特定バージョンのScalaとのみ互換できるため、ライブラリーを importする際はspark-coreとScalaバージョンをそれぞれ確認する必要があります。

mvn repositoryでArtifact Idと一緒に互換されるScalaバージョンも確認できます。

適切なspark-core ライブラリー、Scalaバージョンを選択後、build.sbtに上の内容のように一行を追加します。

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

それからライブラリーが正常にimportされたかBuildコンソールを確認します。

SBTでライブラリーをimportする際は次のようなsyntaxを使用します。

Group Id %% Artifact Id % Revision

Spark Applicationの作成

このセクションで作成するSpark Applicationはシェイクスピアソネットテキストファイルをデータセットとして使用して、ソネットに含まれた単語を数える簡単なWord Countアプリです。shakespeare.txtをダウンロードして src/main/resourcesに置きます。 Cloud Hadoopクラスタでこのアプリを実行する際はS3 バケットまたはHDFSにデータセットをアップロードして使用します。

src/main/scala 右クリック > New > Scala Classをクリックして WordCount/src/main/scala 下に新たなクラスを生成します。この際 KindObjectです。

実際にアプリを作成する前に設定がきちんとできているか確認するため、 WordCount.scalaに以下のようなサンプルコードをコピーして実行(Ctrl+Shift+R)してみます。

object WordCount {
    def main(args: Array[String]): Unit = {
      println("This is WordCount application")
    }
}

以下のように正常にプリントされたら正常です。

それでは、単語数を数えるコードとして WordCount.scalaを代替します。

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]) {

    //Create a SparkContext to initialize Spark
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Word Count")
    val sc = new SparkContext(conf)

    // Load the text into a Spark RDD, which is a distributed representation of each line of text
    val textFile = sc.textFile("src/main/resources/shakespeare.txt")

    //word count
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.foreach(println)
    System.out.println("Total words: " + counts.count());
    counts.saveAsTextFile("/tmp/sonnetWordCount")
  }

}

Master URLs
Sparkの配備環境によってMaster URLが異なります。

  • local(pseudo-cluster): local, local[N], local[*] (使用するthread数によって分かれる、 *はJVMで最大に使用できるプロセス分の threadsを使用)
  • clustered
    • Spark Standalone: spark://host:port,host1:port1...
    • Spark on Hadoop YARN: yarn
    • Spark on Apache Mesos: mesos://

WordCount.scalaを実行すると出力結果は以下の通りです。

jarファイルの生成

jjarファイルを生成してCloud Hadoopクラスタにデプロイする前にObject Storageバケットにデータセットをアップロードします。そしてソースコードのresourceファイルルートを次のように変更する必要があります。データセットを HDFSにアップロードして使用する際は s3a://の代わりに hdfs://を使用します。

// FROM
conf.setMaster("local")

// TO
conf.setMaster("yarn-cluster")

// FROM
val textFile = sc.textFile("src/main/resources/shakespeare.txt")
// TO
val textFile = sc.textFile("s3a://deepdrive-hue/tmp/shakespeare.txt")

// FROM
counts.saveAsTextFile("/tmp/sonnetWordCount");

// TO
counts.saveAsTextFile("s3a://deepdrive-hue/tmp/sonnetWordCount");

ここではSpark1.6を基準にするので conf.setMaster()yarn-clusterに明示しなければなりません。Spark2からは yarnで使用できます。

次に、アップデートしたコードを Cloud Hadoopクラスタに提出できる compiled jarにパッケージングします。この jarファイルにはアプリコードとbuild.sbtに定義した全てのdependenciesが含まれています。

プロジェクトホームに移動して次のようにパッケージングします。 sbt package 命令は $PROJECT_HOME/target/scala-2.11 下に wordcount_2.11-0.1.jar ファイルを生成します。

cd ~/IdeaProjects/WordCount # PROJECT_HOME
sbt package

Cloud HadoopクラスタにSpark Jobを提出

このセクションではローカルで作成したSpark Application(.jar)をCloud Hadoopにデプロイして提出する方法をご説明します。

Object Storageにjarをアップロード

HueのS3ブラウザまたはObject Storageコンソールを使用してshakespeare.txtとjarをObject Storageバケットにコピーします。

Jobを提出する

このセクションではjar.ファイルをクラスタに提出する二つの方法についてご説明します。

spark-defaults.confに次のような propertyが正確に設定されていなければなりません。

spark.hadoop.fs.s3a.access.key=OBEJCT-STORAGE-ACCESS-KEY
spark.hadoop.fs.s3a.endpoint=http://kr.objectstorage.ncloud.com
spark.hadoop.fs.s3a.secret.key=OBJECT-STORAGE-SECRET-KEY
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
  • HueのSpark Submit Jar 利用

  • spark clients ノードで提出

    クラスタで Spark Clientが設置されたノードで次のように spark-submit 命令を実行します。

      spark-submit --class WordCount --master yarn-cluster --deploy-mode cluster s3a://deepdrive-hue/tmp/wordcount_2.11-0.1.jar
    

Jobが完了したら結果が以下のように指定したバケットルート内に保存されるのを確認できます。

Deploy mode
Deploy modeはデプロイ環境でドライバー(Spark Context)が実行される位置によって決まります。モードには以下のようなオプションがあります。

  • client (default): Spark アプリが実行されるマシーンでドライバーが実行
  • cluster: クラスタ内のランダムなノード内でドライバーが実行

spark-submit 命令語の --deploy-mode cliオプション、またはSpark property構成で spark.submit.deployModeに変更できます。

クラスタの終了

作業が実行されていない時にコストがかかるのを防ぐためには、Cloud Hadoopクラスタを終了します。これ以上不要なデータセットファイルはバケットから削除するか、バケット自体を削除します。

削除が完了したファイルまたはバケットはリカバリーできません。

Cloud Hadoopクラスタの終了

Cloud Hadoop クラスタを終了すると、ノードのローカルファイルシステムや HDFS内に保存されたファイルが全て無くなります。必要なファイルはObject Stroageバケットにコピーするなど、別途コピーしておく必要があります。

.Cloud Hadoopコンソールから終了するクラスタを選択して 削除 ボタンをクリックします。

Object Stroage ファイルまたはバケットを削除する

Object Storageコンソールから削除するファイルを選択して 編集 > 削除するをクリックします。

バケットを削除する際は、バケットの右側ボタンをクリックして バケットを削除をクリックします。

に対する検索結果は~件です。 ""

    に対する検索結果がありません。 ""

    処理中...