Cloud Hadoop에서 Spark Scala Job 제출하기

목차
시작하기 전에
로컬에서 Scala 코드 작성 및 컴파일
클러스터 종료

이 페이지에서는 Spark Scala Job을 생성하고 Cloud Hadoop 클러스터에 제출하는 방법에 대해서 설명합니다.

시작하기 전에

다음과 같은 단계로 Cloud Hadoop을 설정하세요.

  1. Object Storage 버킷 생성하기
  2. Cloud Hadoop 클러스터 생성하기

로컬에서 Scala 코드 작성 및 컴파일

이 섹션에서는 두가지 방법을 이용해서, Spark Application을 Scala로 작성하고 jar로 패키징하는 방법을 설명합니다.

Scala 사용

이 섹션에서는 터미널에서 "HelloScala"를 출력하는 매우 간단한 Scala 코드를 작성한 뒤 컴파일 하여 jar로 패키징합니다.

  1. Scala 바이너리를 다운로드하고 압축을 풉니다.

    MacOS에서 homebrew를 사용하는 경우 다음과 같이 설치할 수 있습니다.

     brew install scala
    
  2. 실행 파일(예: .bashrc)에 SCALA_HOME 환경변수를 설정하고, PATH$SCALA_HOME을 추가합니다.

     export SCALA_HOME=/usr/local/share/scala
     export PATH=$PATH:$SCALA_HOME/
    
  3. Scala REPL을 실행해서 scala로 class를 작성합니다.

     ❯ scala
     # Welcome to Scala version ...
     # Type in expressions to have them evaluated.
     # Type :help for more information.
     # scala>
    

    아래와 같이 HelloWorld.scala를 작성하고 저장합니다.

     object HelloWorld {
     def main(args: Array[String]): Unit = {
         println("Hello, world!")
     }
     }
    
     scala> :save HelloWorld.scala
     scala> :q
    
  4. scalac로 컴파일합니다.

     ❯ scalac  HelloWorld.scala
    

    컴파일된 .class파일을 확인할 수 있습니다.

     ❯ ls HelloWorld*.class
     HelloWorld$.class HelloWorld.class
    
  5. jar 파일 만들기

    jar 명령을 사용하여 jar 파일을 생성합니다.

    jar 명령어를 실행하려면, Java SE, JRE가 설치되어 있어야 합니다.

    HelloWorld*.class 파일이 있는 디렉토리로 이동한 후, 아래와 같은 명령어를 실행하여 클래스파일을 jar로 패키징 합니다.

     ❯ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
     added manifest
     adding: HelloWorld$.class(in = 670) (out= 432)(deflated 35%)
     adding: HelloWorld.class(in = 645) (out= 524)(deflated 18%)
    

    패키징된 jar 파일에서 HelloWorld class가 애플리케이션의 entry point로 설정된 것을 MANIFEST.MF에서 확인할 수 있습니다.

     ❯ unzip -q -c HelloWorld.jar META-INF/MANIFEST.MF
     Manifest-Version: 1.0
     Created-By: 1.8.0_181 (Oracle Corporation)
     Main-Class: HelloWorld # entry point
    

IntelliJ SBT 플러그인 사용

이 섹션에서는 Spark Application을 개발/디버그하기 위한 환경을 IntelliJ에 설정하고 'Hello Scala'라는 Word Count Job을 빌드합니다. 빌드 매니저로는 SBT를 사용합니다. 예시를 작성한 환경은 MacOS Mojave, IntelliJ Ultimate 2018.2.5입니다.

  1. IntelliJ 설정
  2. 새 프로젝트 생성
  3. Import SBT 라이브러리
  4. Spark Application 작성
  5. jar 파일 생성

IntelliJ 설정

IntelliJ를 시작 후, Configure > Plugins > Browse Repositories에서 Scala를 검색하여 설치합니다. 설치 후, Plugin을 반영하기 위해서 IntelliJ를 재시작합니다.

새 프로젝트 생성

IntelliJ를 시작한 후, Create New Project를 선택합니다.

Scala와 빌드매니저로 sbt를 선택한 후, Next를 클릭합니다.

프로젝트의 이름을 WordCount로 지정한 후, sbtScala 버전을 선택합니다. Finish를 클릭하여 프로젝트 생성을 마무리합니다.

프로젝트를 생성하면, 기본적으로 다음과 같은 구조로 디렉토리 및 파일이 생성됩니다.

  • .idea: IntelliJ 구성 파일입니다.
  • project: 컴파일에 사용되는 파일입니다.
  • src: 소스코드. 애플리케이션 코드의 대부분은 src/main에 있어야 합니다. src/test는 테스트 스크립트를 위한 공간입니다.
  • target: 프로젝트를 컴파일하면 이 공간 안에 저장됩니다.
  • build.sbt: SBT 구성 파일입니다.

Import SBT 라이브러리

IntelliJ가 Spark 코드를 인식하기 위해서는 spark-core 라이브러리 및 문서를 import해야 합니다.

spark-core 라이브러리는 특정 버전의 Scala와만 호환 가능하기 때문에, 라이브러리를 import할 때에는 spark-core와 Scala 버전을 각각 확인해야 합니다.

mvn repository에서 Artifact Id와 함께 호환되는 Scala 버전도 확인할 수 있습니다.

알맞은 spark-core 라이브러리, Scala 버전을 선택한 후에, build.sbt에 위의 내용처럼 한 줄을 추가합니다.

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

이제 라이브러리가 정상적으로 import되었는지 Build 콘솔을 확인합니다.

SBT에서 라이브러리를 import할 때는 다음과 같은 syntax를 사용합니다.

Group Id %% Artifact Id % Revision

Spark Application 작성

이 섹션에서 작성할 Spark Application은 셰익스피어 소넷 텍스트 파일을 데이터 셋으로 사용하여, 소넷에 포함된 단어를 세는 간단한 Word Count 애플리케이션입니다. shakespeare.txt을 다운로드하여 src/main/resources에 놓습니다. Cloud Hadoop 클러스터에서 이 애플리케이션을 실행할 때에는 S3 버킷 또는 HDFS에 데이터 셋을 업로드해서 사용합니다.

src/main/scala 우클릭 > New > Scala Class를 클릭해서 WordCount/src/main/scala 아래 새로운 클래스를 생성합니다. 이때 KindObject입니다.

실제 애플리케이션을 작성하기 전에, 설정이 제대로 됐는지 확인하기 위하여 WordCount.scala에 아래와 같은 샘플 코드를 복사해서 실행(Ctrl+Shift+R)해 봅니다.

object WordCount {
    def main(args: Array[String]): Unit = {
      println("This is WordCount application")
    }
}

아래와 같이 정상적으로 프린트되었다면 정상입니다.

이제 단어 수를 세는 코드로 WordCount.scala를 대체합니다.

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]) {

    //Create a SparkContext to initialize Spark
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Word Count")
    val sc = new SparkContext(conf)

    // Load the text into a Spark RDD, which is a distributed representation of each line of text
    val textFile = sc.textFile("src/main/resources/shakespeare.txt")

    //word count
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.foreach(println)
    System.out.println("Total words: " + counts.count());
    counts.saveAsTextFile("/tmp/sonnetWordCount")
  }

}

Master URLs
Spark 배포 환경에 따라 Master URL이 달라집니다.

  • local(pseudo-cluster): local, local[N], local[*] (사용하는 thread 수에 따라 나뉨, *은 JVM에서 최대한 사용할 수 있는 프로세서만큼의 threads를 사용)
  • clustered
    • Spark Standalone: spark://host:port,host1:port1...
    • Spark on Hadoop YARN: yarn
    • Spark on Apache Mesos: mesos://

WordCount.scala를 실행하면 출력 결과는 아래와 같습니다.

jar 파일 생성

jar 파일을 생성해서 Cloud Hadoop 클러스터에 배포하기 전에, Object Storage 버킷에 데이터 셋을 업로드합니다. 그리고 소스코드의 resource 파일 경로를 다음과 같이 변경해야 합니다. 데이터 셋을 HDFS에 업로드해서 사용하려면 s3a:// 대신에 hdfs://를 사용하면 됩니다.

// FROM
conf.setMaster("local")

// TO
conf.setMaster("yarn-cluster")

// FROM
val textFile = sc.textFile("src/main/resources/shakespeare.txt")
// TO
val textFile = sc.textFile("s3a://deepdrive-hue/tmp/shakespeare.txt")

// FROM
counts.saveAsTextFile("/tmp/sonnetWordCount");

// TO
counts.saveAsTextFile("s3a://deepdrive-hue/tmp/sonnetWordCount");

여기서는 Spark1.6을 기준으로 하므로 conf.setMaster()에서 yarn-cluster로 명시해야 합니다. Spark2부터는 yarn으로 사용 가능합니다.

이제 업데이트한 코드를 Cloud Hadoop 클러스터에 제출할 수 있는 compiled jar로 패키징해야 합니다. 이 jar 파일에는 애플리케이션 코드와 build.sbt에 정의한 모든 dependencies가 포함되어 있습니다.

프로젝트 홈으로 이동해서 다음과 같이 패키징합니다. sbt package 명령은 $PROJECT_HOME/target/scala-2.11 아래에 wordcount_2.11-0.1.jar 파일을 생성합니다.

cd ~/IdeaProjects/WordCount # PROJECT_HOME
sbt package

Cloud Hadoop 클러스터에 Spark Job 제출

이 섹션에서는 로컬에서 작성한 Spark Application(.jar)를 Cloud Hadoop에 배포하여 제출하는 방법을 설명합니다.

Object Storage에 jar 업로드

Hue의 S3브라우저 또는 Object Storage 콘솔을 사용하여 shakespeare.txt와 jar를 Object Storage 버킷에 복사합니다.

Job 제출하기

이 섹션에서는 jar 파일을 클러스터에 제출하는 두 가지 방법에 대해서 설명합니다.

spark-defaults.conf에 다음과 같은 property가 제대로 설정되어 있어야 합니다.

spark.hadoop.fs.s3a.access.key=OBEJCT-STORAGE-ACCESS-KEY
spark.hadoop.fs.s3a.endpoint=http://kr.objectstorage.ncloud.com
spark.hadoop.fs.s3a.secret.key=OBJECT-STORAGE-SECRET-KEY
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
  • Hue의 Spark Submit Jar 이용

  • spark clients 노드에서 제출

    클러스터에서 Spark Client가 설치된 노드에서 다음과 같이 spark-submit 명령을 실행합니다.

      spark-submit --class WordCount --master yarn-cluster --deploy-mode cluster s3a://deepdrive-hue/tmp/wordcount_2.11-0.1.jar
    

Job이 완료되면 아래와 같이 결과가 지정한 버킷 경로 안에 저장되는 것을 확인할 수 있습니다.

Deploy mode
Deploy mode는 배포 환경에서 드라이버(Spark Context)가 실행되는 위치에 따라 결정됩니다. 모드에는 아래와 같은 옵션이 있습니다.

  • client (default): Spark 애플리케이션이 실행되는 머신에서 드라이버가 실행
  • cluster: 클러스터안의 랜덤한 노드안에서 드라이버가 실행

spark-submit 명령어의 --deploy-mode cli 옵션, 또는 Spark property 구성에서 spark.submit.deployMode로 변경할 수 있습니다.

클러스터 종료

작업이 실행되고 있지 않을 때 비용이 나가는 것을 막으려면, Cloud Hadoop 클러스터를 종료합니다. 더 이상 필요하지 않은 데이터 셋 파일은 버킷에서 삭제하거나 버킷 자체를 삭제합니다.

삭제가 완료된 파일 또는 버킷은 복구할 수 없습니다.

Cloud Hadoop 클러스터 종료

Cloud Hadoop 클러스터를 종료하면, 노드의 로컬 파일 시스템이나 HDFS 안에 저장된 파일은 모두 유실됩니다. 필요한 파일은 Object Stroage 버킷에 복제 등, 별도 복제를 해두어야 합니다.

Cloud Hadoop 콘솔에서 종료할 클러스터를 선택한 후 [삭제] 버튼을 클릭합니다.

Object Stroage 파일 또는 버킷 삭제하기

Object Storage 콘솔에서 삭제할 파일을 선택한 후 편집 > 삭제하기를 클릭합니다.

버킷을 삭제하려면, 버킷 우측 버튼을 클릭해서 버킷 삭제를 클릭합니다.

""에 대한 건이 검색되었습니다.

    ""에 대한 검색 결과가 없습니다.

    처리중...