[Spark] Programming-guide

Spark Programming Guide

from : https://spark.apache.org/docs/latest/programming-guide.html

개요 : 

각 스파크 어플리케이션은 main함수인 driver program과 각 클러스터에서 다양한 병렬 오퍼레이션으로 구성되어 실행된다.

스파크의 메인 추상화는 resilient distributed dataset(RDD)이다. 이것은 클러스터의 노드들간에  파티션된 엘리먼트의 컬렉션들이며 이들은 병렬로 처리되어진다.
RDD는 HDFS 에서 생성되어지며 (혹은 다른 하둡 지원의 파일 시스템) 혹은 드라이버 프로그램 내에서 스칼라 컬렉션으로 존재하는 데이터이며, 이들은 transforming되기도 한다.
사용자는 RDD를 메모리에 persist하도록 요청할 수 있으며, 병렬 처리를 위해 상호간에 효과적으로 재 사용 할 수 있도록 해준다.
마지막으로 RDD는 노드가 실패하는 경우 자동으로 복구 해준다.

두번째 추상화는 병렬 처리에서 사용될 수 있는 shared variables이다. 기본적으로 스파크가 서로다른 노드에서 태스크의 집합을 병렬로 수행할때 각 태스크의 함수에서 사용되는 각 변수들을 복제하여 옮긴다.
가끔 변수들은 태스크들 끼리 공유되거나 태스크와 드라이버 프로그램 사이에 공유된다.
스파크는 2개의 공유 변수를 제공한다.
broadcast variables : 모든 노드의 메모리에 캐시되어 사용될 수 있다.
accumulators : 이 변수는 counters와 sums와 같은 작업을 위해서 추가되는 변수이다.

# 참고 : 이문서는 python만을 정리하였다.
bin/pyspark 
상기 스크립트를 실행해보자.

Linking with Spark

Spark 1.6.1 은 Python 2.6 + 와 Python 3.4+ 에서 동작한다.
또한 CPython 인터프리터를 이용할 수 있다. NumPy와 같은 C라이브러리가 사용될 수 있다. 또한 이는 PyPy 2.3+ 버젼과 호환된다.

Spark어플리케이션을 Python에서 실행하기 위해서는 bin/spark-submit 스크립트를 이용할 수 있다.
이 스크립트는 Spark의 Java/Scala라이브러리를 로드할 것이다. 그리고 클러스터에 어플리케이션을 submit하게 해준다.
또한 bin/Pyspark 스크립트는 대화영 Python shell이다.

만약 HDFS데이터를 이용하고자 한다면, PySpark linking 빌드를 HDFS버젼과 연동 해야한다.
이는 Prebuilt packages에서 해당하는 HDFS 버젼에 맞게 설치하면 된다.

마지막으로 다음과 같이 프로그램에 Spark 클래스를 import할 필요가 있다.
from pyspark import SparkContext, SparkConf
PySpark 는 Python의 마이너 버젼과 동일한 버젼을 필요로 한다.
기본 파이선 버젼을 PATH에 걸어주는 작업이 필요하다. 특정 버젼을 설정하고자 한다면 PYSPARK_PYTHON의 값을 다음과 같이 설정하자.
$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python.pi.py

Initializing Spark

스파크 프로그램의 시작은 SparkContext를 생성하는 것으로 시작한다. 이것은 Spark에게 어떻게 클러스터에 접근할지를 알려준다.
SparkContext를 생성하기 위해서는 어플리케이션에 대한 정보를 포함하는 SparkConf를 빌드 해야한다.
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
appName은 어플리케이션의 이름으로 cluster UI에 나타난다. master는 Spark, Mesos혹은 YARN클러스터의 URL이거나 혹은 "local"(로컬모드 지원)이 올 수 있다.
실제에서는 클러스터에서 수행할때 프로그램에서 master를 하드코딩 하지 않을 것이다. 그러나 대신에 spark-submit으로 어플리케이션을 런치 하고, 결과를 수신 받을 것이다. 그러나 로컬 테스트와 유닛 테스트를 위해서는 "local"을 전달해서 스파크를 실행할 수 있다.

Using the Shell

PySpark쉘은 SparkContext가 이미 생성되어 있는 인터프리터이다. 이 변수는 sc라고 불린다.
자신의 SparkContext를 만들면 동작하지 않는다. 또한 master 아규먼트를 이용하여 마스터 컨텍스트 커넥션을 설정할 수 있다. 그리고 Python에 .zip, .egg, .py파일을 --py-files에 콤마로 나누어진 리스트를 전달할 수 있다.
또한 의존성을 쉘 세션에 전달할 수 있다. 이는 메이븐과 연동될 리스트를 콤마로 분리된 값을 전달하면 가능하며 --packages아규먼트에 실어서 보내면 된다. 만약 의존성에 필요한 추가적인 repository가 필요한경우라면 --repositories 아규먼트에 기술한다. 그리고 python에 필요한 스파크 의존성은 pip를 이용하여 필요한 의존성을 추가하자. 예를 들면 다음과 같이 할수 있다.

$./bin/pyspark --master local[4]
혹은 code.py 를 추가할 수있다.
$./bin/pyspark --master local[4] --py-files code.py
전체 옵션 항목을 확인해보기를 원한다면 pyspark --help 를 입력하자.
pyspark 는 더 많은 일반 spark-submit script를 호출한다.

IPython에서  PySpark 쉘을 런칭하는 것도가능하다. 이는 Python 인터럽터기능을 향상 시킨다. PySpark는 IPython 1.0.0과  이후 버젼에 호환한다. IPython을 사용하는 것은 PYSPARK_DRIVER_PYTHON 변수에 ipython 값을 지정하면된다.
$PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
또한 ipython커맨드를 PYSPARK_DRIVER_PYTHON_OPTS 를 설정하여 커스터마이징 할 수 있다. 예를 들어 IPython Notebook 을 런칭할때 PyLab plot를 지원하도록 할 수 있다.
$PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark
IPython Notebook 서버가 런칭 된 이후에 "Python 2" 노트북을 "Files"탭으로부터 생성할 수 있다. notebook 내에서 %pylab inlines 커맨드를 IPython 노트북에서 Spark를 실행하기 전에 입력할 수 있다.

Resilient Distributed Datasets(RDDs)

스파크는 RDD(Resilient Distributed Datasets)의 개념을 바탕으로 수행된다. 이것은 엘리먼트들의 컬렉션으로 fault-talerant 한 컬렉션이다. 이것은 병렬로 수행될 수 있다.
RDD를 생성하기 위한 2가지 방법으로 드라이버 프로그램 내부에 존재하는 컬렉션을 parallelizing을 통해서 생성하는 것과 외부 저장 시스템에서 데이터 셋 혹은 공유 파일시스템, HDFS, HBase, 하둡 인풋 형식을 따르는 다양한 데이터 소스등을 참조하는 방법으로 생성이 가능하다.

Parallelized Collections

Parallelized Collections는 SparkContext의 parallelize메소드를 통해서 생성된다. 이것은 드라이버 프로그램 내부에 존재하는 반복가능한 데이터나 컬렉션을 이용한다. 컬렉션 엘리먼트들은 분산된 데이터 셋으로 부터 복제되어 병렬로 수행될 수 있다. 예를 들어 다음은 숫자 1 에서 5까지 값을 가지고 있는 컬렉션을 생성하는 것이다.

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
일단 생성이 되면 분산된 데이터셋(distData)는 병렬로 수행이 가능하다. 예를 들어 우리는 distData.reduce(lambda a, b: a + b)를 호출하여 각 엘리먼트의 리스트를 더할 수 있다.

중요한 파라미터중에 하나는 데이터를 잘라낼 파티션의 개수를 지정하는 것이다. 스파크는 각 클러스터의 파티션마다 하나의 태스크를 수행하게 된다. 일반적으로 CPU당 2 - 4개의 파티션을 생성한다. 보통 스파크는 클러스터에 따라 자동으로 파티션을 설정하는 작업을 시도한다. 그러나 수동으로 두번째 파라미터로 전달이 가능하다.

예)
sc.parallelize(data, 10)

External Datasets

PySpark는 Hadoop에 의해서 분산저장되어 있는 데이터셋을 생성할 수 있다. 로컬파일 시스템이나, HDFS, Cassandra, HBase, Amazon S3등으로 생성이 가능하다. Spark는 text 파일, SequenceFile, 그리고 다른 하둡 입력 포맷등을 지원한다.

Text 파일 RDD는 SparkContext의 textFile 메소드를 이용하여 생성이 가능하다. 이 메소드는 파일의 URI(hdfs://, s3n://, etc URI)를 획득한다. 그리고 라인의 컬렉션을 읽어 들인다.

>>> distFile = sc.textFile("data.txt")
일단 생성이 되면 dataFile는 dataset오퍼레이션에 의해서 동작된다. 예를 들어 우리는 모든 라인의 길이를 더하는 작업을 할 것이다. 이때 map과 reduce를 이용하는 예는 다음과 같다.
distFile.map(lambda s : len(s)).reduce(lambda a, b : a + b)
스파크에서 파일을 읽는 몇가지 주의 사항

1. 로컬 파일 시스템에서 패스를 이용하는 경우 파일은 반드시 워커 노드에서 동일하게 접근이 가능해야한다, 각 파일 복제본은 모든 워커들 혹은 네트워크 마운트로 공유된 파일 시스템으로 복제 된다.

2. 파일 기반의 입력 메소드는 모두 textFile를 포함한다. 디렉토리, 압축파일, 그리고 와일드카드 모두 가능하다. 예를 들면 다음과 같다.
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
3. textFile메소드는 또한 추가적인 아규먼트를 가지며 이는 파일의 파티션 넘버를 지정하는 것이다. 기본적으로 스파크는 파일의 각 블록마다 하나의 파티션을 지정한다. (한 블록은 HDFS에서 기본적으로 64MB이다.) 더 큰 값을 지정할 수 있다. 그러나 주의할 점은 블록보다 더 적은 파티션을 지정하면 안된다.

텍스트 파일로 부터 스파크 파이선 API는 몇가지 데이터 포맷도 지원하고 있다.
1. SparkContext.wholeTextFiles
- 복수개의 작은 텍스트 파일들을 포함하는 디렉토리를 읽도록 한다. 그리고 그것들의 반환한다. (filename, context)쌍을 반환한다. 이것은 textFile과는 대조적으로 각 파일에서 각 라인은 하나의 레코드를 반환하게 된다.

2. RDD.saveAsPickleFile, SparkContext.pickleFile
- 파이선 객체의 피클로 구성된 단순 포맷으로 RDD를 저장한다. 이는 배치로 pickle serialization을 이용하며 기본 배치 크기는 10이다.

3. SequenceFile 그리고 Hadoop Input/Output 포맷

주의 : 이 기능은 현재 Experimental(실험용)로 마크 되어 있다. 그리고 advanced user을 대상으로 제공한다. 앞으로는 아마도 SparkSQL을 기준으로 read/write를 지원하게 될것이다. 앞으로는 Spark SQL을 선호되어질 것이다.

Writable Support
PySpark SequenceFile는 key-value 쌍의 RDD를 로드한다. 이는 또한 Java 타입으로 쓰기 가능한 형태로 변환되며 pickle들은 Java객체로 Pyrolite를 이용하여 변환된다. RDD의 key-value쌍을 SequenceFile로 저장할때에는 PySpark는 역으로 동작한다. Python object는 unpickle를 통해서 Java객체로 변환되고, 이 값은 writable 하도록 변환이 된다. 다음은 상호 자동 변환을 보여주는 표이다.
Writable TypePython Type
Textunicode str
IntWritableint
FloatWritablefloat
DoubleWritablefloat
BooleanWritablebool
BytesWritablebytearray
NullWritableNone
MapWritabledict
Arrays는 처리되지 않는다. 사용자는 특정 커스텀 ArrayWritable 서브 타입을 필요로 하게 된다. 쓰기를 할때 사용자들은 특정 커스텀 컨버터들을 필요로 하며 이는 arrays를 커스텀 ArrayWritable서브 타입으로 변환한다. 읽기시에 기본 컨버터는 커스텀 ArrayWritable서브타입을 Java객체 Object[]로 변환한다. 이 것은 pickle된 값을 Python tuples로 변환한다. Python array.array에서 프리미티브 타입의 배열을 획득하는 경우에도 사용자는 custom converter가 필요하다.

Saving and Loading SequenceFiles

텍스트파일과 마찬가지로 SequenceFiles는 특정 경로에 저장 및 로드할 수 있다. 키와 값의 클래스들이 지정될 수 있다. 그러나 표준 writables들은 필요하지 않다.

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x : (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
Saving and Loading Other Hadoop Input/Output Formats

PySpark는 또한 다른 Hadoop InputFormat으로부터 읽거나, Hadoop OutputFormat으로 부터 쓰기를 할 수 있다. new와 old하둡의 MapReduce API들 둘다 가능하다. 만약 필요한경우에 Hadoop설정은 Python dict에 전달될 수 있다. 다음 예제는 Elasticsearch ESInputFormat를 이용한 예제이다.
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource" : "index/type"}  #참고 elasticsearch가 로컬에 수행되고 있다고 가정
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", \
            "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first() # 결과는 MapWritable이며 이는 Python dict로 변환된다.
(u'Elasticsearch ID',
    {u'field1' : True,
      u'field2' : u'Some Text',
      u'field3' : 12345} )
만약 InputFormat가 단순히 Hadoop 설정 input path에 의존되거나, key, value클래스가 쉽게 상단 테이블로 변환되어진다면, 해당 케이스에서 잘 동작 할 것이다.

만약 커스텀 serialized 바이너리 데이터를 가지고 있다면 (Cassandra / HBase로 부터 읽어들인 데이터같은..), 우선 데이터를 Scala/Java 진영의 객체로 변환될 필요가 있다. 이러한 작업은 Pyrolite의 pickler에서 처리 되도록 해준다. Converter 의 특징은 이것을 위해 제공된다. 단순히 이 특징과 convert메소드에서 변환된 코드들로 확장이 가능하다. 기억할 것은 이 클래스는 InputFormat로 접근할때 어따한 의존성이 필요하지 않아야 한다. 그리고 PySpark의 클래스 패스에 해당 경로의 jar파일을 포함하고 있어야 한다.

Cassandra / HBase의 InputFormat과 OutputFormat에 대한 커스텀 컨버터의 예제를 보려면 다음을 참조하자.
Python example, Converter example

RDD Operations

RDD는 2가지 타입의 오퍼레이션을 지원한다.
1. transformation
- 하나의 RDD에서 새로운 데이터 셋을 생성해 내는 작업을 말한다.
- map은 transformation이다. 이것은 각 데이터 엘리먼트를 특정 함수로 처리하여 새로운 RDD를 결과로 반환한다.

2. actions
- 데이터셋에서 계산을 마치고 드라이버 프로그램으로 값을 반환한다.
- reduce는 action으로 특정 function을 이용하여 RDD의 엘리먼트의 전체를 aggregate하고, 최종 결과를 driver 프로그램으로 반환한다. (참고, reduceByKey는 분산된 데이터 셋을 반환한다. 병렬로...)

모든 transformation은 lazy이다. 이것은 해당 결과가 바로 처리되지 않는다는 것이다. 대신에 변환이 적용되어야 한다는 것을 단지 기억만 한다. transformation은 action이 호출되는 경우에만 수행이 되며, 이는 드라이버 프로그램으로 최종값이 전달된다.
이러한 설계는 spark를 더욱 효과적으로 동작하게 만들어 준다. 예를 들어 우리는 map를 통해서 생성된 데이터는 reduce에 사용하기 위해서 만들어 진다는 것을 알고 있다. 그리고 reduce는 결국 driver로 전달되기 위해서 사용된다.

기본적으로 각 transform 처리된 RDD는 action이 호출될때 매번 재 계산이 된다.  그러나 RDD를 메모리에 저장할 수 있다. 이때에는 persist 나 cache메소드를 호출하면 된다. 이것은 다음 실행때 스파크를 더욱 빠르게 동작하도록 해준다. 또한 Disk에 RDD를 저장하거나 복수개의 노드에 복제하도록 할 수 도 있다.

Basis

RDD의 기본을 위해서 다음 프로그램을 살펴보자.
lines = sc.textFile("data.txt")
lineLength = lines.map(lambda s : len(s))
totalLength = lineLengths.reduce(lambda a, b : a + b)
첫번재 라인은 외부 파일로 RDD를 정의한다. 이 데이터셋은 action이 호출되기 전에는 로드되지 않는다. lines는 단지 파일의 포인터일 뿐이다.
두번째 라인은 lineLengths를 정의하고 잇으며 이는 map 변환을 수행하고 있다. 다시한변 lineLengths는 즉시 수행되지 않는다. 이는 laziness 때문이다.
최종적으로 reduce를 수행하며 이것은 action이다. 이 포인트에서 Spark는 태스크들을 몇몇 머신으로 작업을 분산 시키고, 각 머신은 map의 부분을 수행한다. 그리고 local reduction작업을 수행한다. 결과적으로 최종 결과는 driver 프로그램으로 반환된다.

만약 lineLengths라는 것을 이후에도 사용하고자 한다면 다음과 같이 하면 된다.
lineLengths.persist()
reduce이전에 lineLengths로 인해서 처음 수행이 끝이나면 메모리로 저장이 된다.

Passing Functions to Spark

Spark의 API는 driver 프로그램에서 클러스터로 수행되기 위해서 전달된다. 여기에는 3가지 추천 방법이 있다.
1. Lambda expressions
- 단순 함수를 위해서 표현식을 사용할 수 있다. (Lambda는 복수개의 함수들 혹은 스테이트먼트를 지원하지 않는다. 이것은 값을 바환하지 않는다.)

2. Local defs로 Spark로 호출되는 함수이다. 더 긴 코드

3. 모듈로 구성된 최상위 레벨의 함수들

예를 들어 lambda를 이용하는 것 보다 더 긴 함수를 전달하는 것은 다음과 같다.
"""MyScript.py"""
if __name__ == "__main__" :
    def myFunc(s) :
        words = s.split(" ")
        return len(words)
    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)
주의 : 클래스 인스턴스에 메소드의 참조를 전달할 수 있다. ( singleton object 에 반대하는것과 같이) 이것은 객체를 전달하는 것을 필요로 한다. 이러한 내용은 클래스가 메소드를 포함하고 있다는 것이다.
다음예를 보자.
class MyClass(object) :
    def func(self, s) :
        return s
    def doStuff(self, rdd) :
        return rdd.map(self.func)
만약 우리가 MyClass를 생성하고, doStuff를 호출한다면 map 내부에서는 MyClass인스턴스의 func메소드를 참조하고 있게 된다. 그래서 전체 객체가 클러스터에 전달 되어야 하는 문제가 생긴다.

유사한 방법으로 외부 객체의 접근은 전체 객체를 참조하게 된다.
class MyClass(object) :
    def __init__(self) :
        self.field = "Hello"
    def doStuff(self, rdd) :
        return rdd.map(lambda s : self.field + s)
이러한 이슈를 피하기 위해서는 단순한 방법으로 로컬 변수에 필드 값을 복사해서 해당 값을 전달하는 것이다. 전체 객체를 전달하는 것 대신에.
def doStuff(self, rdd) :
    field = self.field
    return rdd.map(lambda s : field + s)

Understanding closures

Spark에서 어려운 작업중에 하나는 scope의 이해와 변수의 라이프사이클, 그리고 클러스터들 사이에서 코드가 실행될때 메소드 들이다.
RDD작업은 자신의 scope의 외부 변수들을 수정한다. 이것은 자주 혼란한 소스를 만들어 낸다. 아래 예제에서 우리는 foreach를 이용하여 counter를 증가시키는 예를 그러한 내용을 확인해볼 것이다. 그러나 유사한 이슈가 다른 작업을 위해 발생할 수 있다.

Example
Native RDD 엘리먼트의 sum을 생각해보자. 이것은 동일한 JVM상에서 실행이 되고 있는것인지에 따라서 차이가 난다. 공통적인 예로 Spark가 local모드로 수행되고 있을때와 (--master = local[n]) Spark가 클러스터에서 수행될때(spark-submit로 YARN에서 수행될때)가 그것이다.

counter = 0
rdd = sc.parallelize(data)
# 오류 : 이렇게 하지말라.
def increment_counter(x)
    global counter
    counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
 Local Vs Cluster modes
상단코드의 행동은 정의되지 않았다. 그리고 의도되로 동작하지 않을 것이다. 작업 수행을 위해서 Spark는 RDD오퍼레이션 처리를 태스크로 분리하게 된다. 각 태스크는 executor에 의해서 수행된다. 이전 실행은 Spark 는 task의 closure를 계산한다. Clouser는 변수들이며, 메소드들이다. 이것은 RDD상에서 수행되기 위해서 visible하게 되어야 한다. (이 케이스에서는 foreach()임). 이 clouser는 serialized되며 각 executor에 전달된다.

closure와 함께 변수들은 각 executor에 전달되며, 이는 복제가 이루어진다. closure가 foreach함수에서 참조될때는 더이상 드라이버 노드의 counter이 아니다. driver node의 counter는 메모리에 존재하지만, executor에서는 보이지 않는다. executor들은 오직 serialized closure에서 복제된 경우에만 보여진다. 그러므로 counter의 최종값은 여전히 0으로 남아 있게 된다. 모든 오퍼레이션은 serialized closure에 대한 변수값을 참조하게 된다.

로컬 모드에서 몇몇 환경에서 foreach함수는 driver와 같이 같은 JVM에서 수행될 것이다. 그리고 동일한 원본 counter를 참조한다. 그리고 아마 실제 업데이트가 잘 될 것이다.

잘 동작하도록 정의하기 위해서는 이 시나리오를 정렬하여 Accumulator를 이용할 수 있도록 해야한다. Accumulators는 스파크에서 안전하게 변수값을 업데이트 하는 메커니즘을 위한 특별한 기능으로 사용된다. 이는 클러스터에서 워커 노드들 사이에 분산되어 수행될때 유용하다. Accumulator 섹션에서 더 자세히 다룰 것이다.

일반적으로 closures - constructs는 loop 혹은 로컬에 정의된 메소드와 같다. 이는 변경이 되는 몇가지 글로벌 상태값에는 사용되지 않는것이 보통이다. 스파크는 이러한 행동을 정의하지 않거나 혹은 보장하지 않는다. 이런 상황은 closures의 외부로 부터 참조된 객체를 변형하는 작업을 의미한다. 몇몇 코드는 로컬 모드에서 동작한다. 그러나 이는 우연으로 수행된 것이며 분산 모드에서는 정상적으로 수행되지 않는다. Accumulator의 사용은 만약 글로벌 집계가 필요한경우에 대신해서 사용 한다.

Printing elements of an RDD

또 하나의 공통 용어는 RDD를 rdd.foreach(println) 혹은 rdd.map(println)을 이용하여 엘리먼트를 출력하는 것이다.
단일 머신에서는 기대한 값을 출력할 것이다. 그리고 모든 RDD의 엘리먼트역시 출력된다.
그러나 클러스터 모드에서는 stdout을 이용한 출력은 executor에 의해서 호출되며, 이것은 executor의 stdout을 대신해서 출력한다. 이것은 driver의 stdout이 아니다. 그러므로 driver의 stdout은 보여지지 않게 된다.
모든 엘리먼트를 드라이버에서 보고자 한다면 collect() 메소드를 이용하여 우선 RDD를 driver노드로 보내야한다.
예) rdd.collect().foreach(println)
이것은 드라이버가 수행될때 out of memory가 발생할 수 있는 여지가 있다. 왜나하면 collect()는 전체 RDD의 값을 단일 머신으로 패치하는 일을 하기 때문이다. 만약 RDD의 몇몇 element들만 출력하고자 한다면 더 안전한 접근법인 take()메소드를 이용하자.
예) rdd.take(100).foreach(println)

Working with Key-Value Pairs

대부분의 Spark operation들은 RDD에 포함된 데이터 타입의 객체를 이용하여 수행된다. 몇가지 특정 오퍼레이션은 RDD의 Key-Value쌍에서만 수행이 가능하다. 가장 공통적인 부분은 "shuffle"오퍼레이션이다. grouping 혹은 aggregating 을 키를 이용하여 수행한다.

Python에서는 이러한 operation은 Python에 내장되어 있는 튜플을 포함하는 RDD에서 수행이 된다. 튜플은 (1, 2)의 형식이다. 단순히 튜플들을 생성하고 필요한 오퍼레이션을 호출한다.

예를 들어 다음 코드들은 reduceByKey오퍼레이션을 이용하며 이는 key-value쌍을 이용하여 각 라인이 얼마나 자주 나타나는지 카운트 하는 것이다.
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.sortByKey()를 이용할 수 있다. 예를 들어 알파벳으로 소트를 수행하고 최종적으로counts.collect()의 결과를 드라이버 프로그램으로 가져올 수 있다.

Transformations

다음 테이블 리스트는 Spark에서 제공하는 몇가지 공통 transformation을 나열한 것이다. RDD API를 참고하여 더욱 상세한 내용을 살펴보자.

TransformationMeaning
map(func)해당 함수를 이용하여 전달된 각 엘리먼트를 처리하고, 형식화된 데이터 셋을 생성한다. 
filter(func)func기 true값을 반환하는 소스 에리먼트들을 선택하여 새로운 형식화된 데이터셋을 생성한다. 
flatMap(func)map과 유사하다. 그러나 입력 아이템은 0 혹은 더 이상이 될 수 있다. (그래서 func는 단일 아이템 대신에 Seq를 반환하는 것이 맞다.)
mapPartitions(func)map과 유사하다. 그러나 각 파티션에서 분산된 RDD 수행된다. func는 반드시 Iterator<T> ==> Iterator<U>와 같이 설정해야한다. 이때에는 타입 T의 RDD에서 수행될때이다. 
mapPartitionsWithIndex(func)mapPartitions와 유사하다. 그러나 func를 지원하며 처리할 파티션의 인덱스 값이 정수 값을 받는다. 그러므로 func는 반드시 Type T의 RDD인경우에는 (int, Iterator<T>) ==> Iterator<U>의 타입이어야한다.
sample(withReplacementfractionseed)데이터의 일부에 대한 샘플 조각, replacement를 하거나 혹은 하지 않은 정보가 될수 있으며 랜덤 넘버를 위한 seed를 이용한다. 
union(otherDataset)소스데이터와 아규먼트로 들어온 데이터 셋을 union한 결과를 반환한다. 
intersection(otherDataset)소스 데이터셋과 아규먼트로 들어온 데이터 셋의 교집합을 구한 결과를 반환한다. 
distinct([numTasks]))소스 데이터셋의 엘리먼트를 중복 제거한 결과를 바환한다. 
groupByKey([numTasks])(K, V) 쌍의 데이터셋에 대해서 호출한다. 이 결과는 (K, Iterable<V>)쌍의 데이터셋을 반환한다.
Note: 만약 aggregation에 대한 작업을 수행에 대한 그루핑을 한다. (합계나, 평균) 에 대한 결과를 구하며 이는 각 키에 대해서 reduceByKey혹은 aggregateByKey를 이용하면 더 낳은 성능을 보여준다.
Note: 기본적으로 병렬 레벨은 부모 RDD의 파티션 수에 의해서 결정이 된다. numTasks아규먼트를 선택적으로 전달할 수 있으며 이 것으로 서로다른 개수의 tasks를 설정할 수 있다. 
reduceByKey(func, [numTasks])(K, V)쌍의 데이터셋에 대해서 호출하며 (K, V)쌍의 데이터셋을 반환한다. 이 것은 각 키에 대한 집계된 데이터를 Value로 하며 이는 주어진 reduce 함수인 func를 이용하여 집계된다. 이 식은 반드시 type(V, V) => V로 되어야 한다. groupByKey와 같이 리듀스 태스크의 수는 선택적으로 두번재 아규먼트를 이용하여 설정이 가능하다. 
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])(K, V) 쌍의 데이터 셋에 대해서 호출하며, (K, U)쌍의 데이터셋을 반환한다. 각 키에 대해서 주어진 컴바인 함수를 이용하여 집계하며, 기본값이 "zero"값으로 설정된다. 또한 입력된 값의 타입과 집계 결과 타입이 다른 것을 허용한다. 이는 불필요한 할당을 피하도록 한다. groupByKey와 같이 리듀스 태스크의 수는 옵션값으로 전달할 수 있다. 
sortByKey([ascending], [numTasks])(K, V)쌍의 데이터셋에 대해서 호출하며 K값에 따라 정렬된 결과를 반환한다. 결과는 (K, V)의 데이터셋으로 키에 의해서 asc와 desc로 졍렬된다. assending에 대해서 boolean값을 설정할 수 있다.
join(otherDataset, [numTasks])(K, V)와 (K, W) 타입의 데이터셋에 대해서 호출하며, (K, (V, W))쌍의 데이터셋을 반환한다. 이것은 각 키에 대해서 모든 엘리먼트의 쌍을 반환한다. Outer joins는 leftOuterJoin, rightOuterJoin, fullOuterJoin을 통해서 제공된다. 
cogroup(otherDataset, [numTasks])(K, V)와 (K, W) 타입의 데이터 셋에 대해서 호출된다. 이 결과는 (K, (Iterable<V>, Iterable<W>)) 튜플을 반환한다. 이 오퍼레이션은 또한 groupWith로 불린다. 
cartesian(otherDataset)T와 U 타입의 데이터셋에 대해서 호출한다. (T, U)쌍의 데이터셋을 반환한다. (모든 엘리먼트의 쌍을 반환한다.)
pipe(command[envVars])RDD의 각 파티션에 대해서 쉘 커맨드를 통해서 파이프처리한다. 쉘은 Perl 혹은 bash script등이 될수 있을 것이다. RDD엘리먼트들은 프로세스의 stdin으로 쓰기를 하고, 처리된 라인들의 출력을 위해 stdout은 스트링의 RDD로 반환된다. 
coalesce(numPartitions) : 합치기RDD에서 파티션들의 수를 numPartitions까지 감소 시킨다. 이것은 큰 데이터셋을 필터링 다운한 이후에 더 효과적인 처리를 수행하기 위해서 유용하게 작용한다. 
repartition(numPartitions)RDD의 데이터를 랜덤으로 셔플하여 더 적은 혹은 많은 파티션으로  재 파티션 데이터를 생성한다. 그리고 그것들 사이에 밸런스를 맞춘다. 이것은 항상 모든 데이터를 네트워크를 넘어서 셔플하도록 처리한다. 
repartitionAndSortWithinPartitions(partitioner)주어진 파티셔너에 따라 RDD를 재 파티션 한다. 그리고 각 결과 파티션은 그들의 키에 따라 소팅한다. 이것은 repartition 호출에 비해서 더 효과적으로 처리된다. 그리고 각 파티션에 대한 소팅도 된다. 왜냐하면 셔플 머신으로 소팅처리되게 푸시할 수 있기 때문이다. 

Action

다음 테이블 리스트는 공통적인 action을 보여준다.
ActionMeaning
reduce(func)데이터셋의 엘리먼트를 집계한다. 이때 func로 전달된 함수를 이용한다. (이는 보통 2개의 아규먼트를 입력받고 하나를 반환한다.) 함수는 commutative (결과가 동일)하며, associative(결합의)한 결과를 되도록 한다. 이것은 병렬 처리에서 계산을 올바르게 할 수 있다. 
collect()데이터셋의 모든 엘리먼트를 배열로 만들어 드라이버 프로그램에 반환한다. 이것은 보통 필터 혹은 다른 오퍼레이션을 처리한 후에 유용하며, 데이터의 충분히 작은 서브셋을 바환한다..
count()데이터셋의 엘리먼트 개수를 반환한다. 
first()데이터셋의 첫번째 엘리먼트를 반환한다. (take(1)과 유사하다)
take(n)데이터셋의 첫번째 n개의 엘리먼트를 배열로 반환한다. 
takeSample(withReplacement,num, [seed])num으로 들어온 개수만큼 데이터셋의 엘리먼트를 랜덤하게 샘플링한 결과를 배열로 반환한다. 이는 또한 replacement처리를 하고 선택적으로 사전에 지정한 랜덤넘버 생성 seed를 전달할 수 있다. 
takeOrdered(n[ordering])자연적인 순서 그리고 커스텀 비교를 이용하여 RDD의 엘리먼트에서 첫번째 n개의 결과를 반환한다. 
saveAsTextFile(path)데이터셋의 엘리먼트들을 텍스트 파일(텍스트 파일의 셋)으로 주어진 로컬 파일 시스템의 디렉토리에 저장한다. HDFS혹은 특정 다른 Hadoop-supported파일 시스템에 저장가능, 스파크는 각 엘리먼트를 파일에 저장할때 하나의 라인의 스트링으로 변환하기 위해서 toString을 호출한다. 
saveAsSequenceFile(path)
(Java and Scala)
데이터셋의 엘리먼트를 Hadoop SequenceFile에 저장한다. 이는 로컬 파일 시스템의 주어진 패스에 저장하거나,  HDFS혹은 다은 Hadoop서포트 파일 시스템에 저장된다. 이것은 Key-Value쌍의 RDD에서도 가능하며 Hadoop의 writable인터페이스이다. 스칼라에서는 이것은 암묵적으로 변환될 수 있고 쓰기가 될 수 있다. (스파크는 기본 타입으로 변환을 지원한다. Int, Double, String등)
saveAsObjectFile(path)
(Java and Scala)
자바의 serialization을 이용한 단순한 형식으로 데이터셋의 엘리먼트를 저장한다. 이것을 로드하기 위해서는 usingSparkContext.objectFile()를 이용할 수 있다. 
countByKey()(K, V)의 형식의 RDD만 가능하다. 이는 (K, Int)의 해시맵 쌍을 반환한다. 즉, 각 키의 카운트를 반환한다. 
foreach(func)각 데이터셋의 엘리먼트에 대해서 func를 통과하도록 한다. 이것은 보통 Accumulator 업데이팅 혹은 외부 스토리지 시스템과의 상호작용등의 사이드 이펙을 만든다.
Note : Accumulator 대신에 변수들의 변형은 정의되지 않은 행동의 결과를 낼 것이다. Understanding closures를 참조하자. 

Shuffle operations

이벤트에 대해 스파크 트리거와 함께 수행하는 오퍼레이션은 shuffle로 알려져 있다. 셔플은 스파크 메커니즘으로 데이터를 재 분배하고 서로다른 파티션으로 그룹 지어 주는 역할을 한다. 이것은 일반적으로 각 executors들과 machines로 데이터를 카피하는 것을 포함한다. 셔플은 복잡하고, 비용을 수반하는 작업이다.

Background
셔플이 진행되는 동안 어떠한 일이 일어나는지 이해하기 위해서 reduceByKey오퍼레이션의 예를 살펴볼 수 있다. reduceByKey오퍼레이션은 새로운 RDD를 생성한다. 이는 단일 키에 대한 튜플이 컴바인 되는 형태이다. 키와 리듀스 함수를 키에 대해서 모든 값들이 처리된다.

어려운 일은 단일 키에 대한 모든 값이 필수적으로 동일한 파티션이나 동일 머신에 존재하지 않는 것이다. 그러나 그것들은 반드시 결과를 계산하기 위해서 서로 같이 위치해야한다는 것이다.

스파크에서 데이터는 일반적으로 특정 오퍼레이션을 위해서 필요한 위치내에 존재하기 위해서 파티션들 사이에 분산되어 있지 않다. 계산이 수행되는 동안 단일 태스크는 단일 파티션에서 수행될 것이다. 그러므로 단일 리듀스 작업을 위한 reduceByKey를 위해 모든 데이터를 수행하기 위해 조직화 된다. 스파크는 all-to-all오퍼레이션을 필요로 한다. 이것은 모든 키를 위해서 모든 값을 찾는 행위는 모든 파티션에서 읽어야 한다. 그리고 계산을 위해서 파티션들 사이에서 가져와서 최종적으로 각 키에 대해 수행한다. 이것을 shuffle라고 부른다.

새롭게 셔플된 데이터 각 파티션 내의 엘리먼트 집합은 결정적이게 된다. 그리고 이것은 파티션들 그 자체의 정렬이 이루어지며, 이러한 엘리먼트의 졍렬은 아니다. 만약 예상대로 정렬된 데이터를 필요로 하면 다음 셔플을 따른다.
- mapPartitions 는 각 파티션을 이용하여 소트한다. 예) .sorted
- repartitionAndSortWithinParitions 는 동시에 리파티셔닝을 수행하는 동안 파티션의 소트를 효과적으로 수행한다.
- sortBy 는 글로벌적으로 정렬된 RDD를 만든다.

오퍼레이션들은 셔플의 원인이 될수 있다. 이는 repartition오퍼레이션들을 포함하며 repartition이나 coalesce 와 같다. 'ByKey'오퍼레이션들(카운팅은 예외)은 groupByKey그리고 reduceByKey와 같다. 그리고 join오퍼레이션은 cogroup과 join과 같다.
Performance Impact
Shupple은 매우 배싼 오퍼레이션이다. 이것은 disk I/O와 데이터 직렬화 그리고 network I/O를 수반한다.
셔플을 통해 데이터를 조직화 하기 위해서 Spark는 태스크의 셋들을 생성한다. map태스크는 데이터를 조직하고 reduce태스크의 셋들은 집계를 한다. 이 명명법은 MapReduce에서 따왔다. 그리고 직접적으로 Spark의 map과 reduce의 오퍼레이션과 연관되어 있지 않다. 

내부적으로 각 태스크들의 맵으로 부터 결과는 메모리에 유지된다. 그리고 이들은 타켓 파티션에서 소트된 기준으로 존재한다. 그리고 단일 파일에 쓰여진다. 리듀스 사이드에서 태스크는 관련된 소트된 블록을 읽는다. 

특정 셔플 오러페이션들은 매우 많은 양의 힙 메모리를 소비할 수 있다. 이들은 데이터를을 변환 전 혹운 후에 메모리 데이터 구조화를 수행할때 발생한다. 특별히 reduceByKey와 aggregateByKey는 이러한 구조를 맵 사이드에서 생성한다. 그리고 'ByKey'오퍼레이션들은 리듀스 사이드에서 진행된다. 만약 데이터가 메모리에 맞아떨어지지 않는경우 이것은 이러한 테이블을 디스크로 저장한다. 이는 추가적인 I/O오버헤드를 발생시키고 가비지 컬렉션을 증가 시킨다. 

셔플은 또한 매우큰 중간 파일을 디스크로 남긴다. Spark 1.3의 예와 같이 이러한 파일은 상응하는 RDD가 더이상 사용되지 않거나 가비지 컬렉터 되어지 전까지 보존된다. 이것은 만약 lineage가 재 계산되어지면 셔플 파일은 필요하지 않게 된다. 가비지 컬렉션은 오랜 기간의 작업 이후에 아마 발생할 것이다. 만약 어플리케이션이 RDD의 참조를 포함하고 있거나, GC가 자주 발생하지 않을경우가 그렇다. 이 의미는 오래 수행되는 스파크 잡은 아마 큰 용량의 디스크 공간을 차지할 것이다. 임시 스토리지 디렉토리는 thespark.local.dir설정을 통해서 수정할 수 있으며 이를 수행하여 Spark Context를 설정할 수 있다. 

셔플의 행동은 다양한 설정 파일을 수정하는 것에 달려 잇다. 셔플 행동부분 참조, 섹션은  Spark Configuration Guide에 포함된다. 

RDD Persistence

스파크에서 가장 중요한 기능중에 하나는 persisting(caching) 으로 오퍼레이션 간 데이터셋을 메모리에 저장하는 것이다. RDD를 persist할때 각 노드의 특정 파티션에 저장한다. 메모리에서 계산을 하고 데이터 셋 (혹은 데이터셋에서 드라이빙된 데이터셋)상에서 다른 엑션을 수행하기 위해 재 사용된다. 이는 이후 작업을 더욱 빠르게 해준다. (종종 10배 정도 더 빠르다.) Caching은 반복적인 알고리즘을 위한 핵심 툴이며 빠르게 사용이 가능하다. 
RDD는 persist()와 cache()메소드들을 이용하여 저장되도록 할 수 있다. 첫번째는 액션에서 수행되어진다. 이것은 노드들의 메모리에 저장된다. 스파크의 캐시는 fault-tolerant 이다. RDD의 특정 파티션이 유실되면 자동적으로 원본으로 부터 변환되어 다시 재 계산되어진다. 
또한 각 퍼시스트 된 RDD는 서로다른 스토리지 레벨을 이용하여 저장된다. 예를 들어 디스크에 데이터셋을 퍼시스트 하거나, 메모리에 하지만 자바 객체로 시리얼 라이즈 한 내용을 저장할 수 있다. (저장 공간을 위해서)
노드들간에 복제를 수행하거나 혹은 Tachyon내에 힙 외부에 저장될 수 있다. 이러한 레벨들은 StorageLevel객체를 전달하여 설정할수 있으며 persist()메소드에 이용한다. cache()메소드는 빠르게 사용하기 위한 메소드로 기본 스토리지 레벨을 가지고 있다. 이것은 StorageLevel.MEMORY_ONLY(로 메모리네에 역직렬화 되어 저장된다.)
스토리지 레벨은 다음과 같다. 
Storage LevelMeaning
MEMORY_ONLYRDD를 JVM내의 자바 객체로 역 직렬화 하여 저장한다. 만약 RDD가 메모리에 맞지 않을경우 몇몇 파티션은 캐시 되지 않을 것이다 그리고 필요한경우 매번 재 계산된다. 이것은 디폴트 레벨이다. 
MEMORY_AND_DISKRDD를 JVM내의 자바 객체로 역 직렬화 하여 저장한다. 만약 RDD가 메모리에 맞지 않을경우 파티션의 디스크에 저장하며 필요한경우 읽어 들인다. 
MEMORY_ONLY_SERRDD를 자바 객체의 직렬화된 객체로 저장한다. (각 파티션의 바이트 어레이로 저장된다.) 이것은 일반적으로 역직렬화된 객체에 비해서 공간을 더욱 절약할 수 있다. 특별히 fast serializer를 이용할때 더욱 그러하며, 읽기 작업시 CPU를 더 많이 쓴다. )
MEMORY_AND_DISK_SERMEMORY_ONLY_SER 와 유사하다. 그러나 메모리가 적당하지 않을때 파티션에 분산해서 저장한다. 이것은 매번 필요한경우 재계산을 하지 않기 위해 이런 작업을 한다. 
DISK_ONLYRDD파티션을 디스크로 저장한다. 
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.상단 레벨과 동일하다. 그러나 각 파티션을 2개의 노드에 복제본을 만든다. 
OFF_HEAP (experimental)
실험적인 레벨
RDD를 직렬화 하여 Tachyon에 저장한다. 이는 MEMORY_ONLY_SER, OFF_HEAP 과 비교하여 가비지 컬렉션 오버헤드가 줄어 들었다. 그리고 executors가 더 작고 메모리 풀을 공유하도록 해준다. 이는 큰 힙을 가진 환경이나, 멀티플 컨커런트 어플레케이션에 매력적인 모델이다.
나아가 Tachyon내부에 존재하는 RDD는 executor의 크래쉬가 캐시 메모리의 데이터를 유실하도록 하지 않는다. 이 모드는 Tachyon내 메모리는 버릴 수 있다. 그러므로 Tachyon은 블록을 재구성하는 것을 시도하지 않으며, 메모리로 부터 제거된다. 만약 Tachyon을 off heap 스토어로 사용할 계획이라면 사프크는 Tachyon out-of-the-box와 함께 호환이 된다. 다음 페이지를 참조하여 필요한 버젼을 확인해보라. http://tachyon-project.org/master/Running-Spark-on-Tachyon.html
Note: Python에서 저장된 객체는 항상 Pickle라이브러리를 이용하여 시리얼라이즈 된다. 그래서 어떤 시리얼 라이즈 레벨을 선택하든 상관이 없다. 
스파크는 또한 자동적으로 셔플 오퍼레이션에서 중간 데이터를 persists한다. (예, reduceByKey) 이것은 persist를 호출하지 않더라도 수행된다. 이는 셔플이 실패하는경우 전에 입력을 재 처리하지 않도록 한다. 우리는 여전히 재사용하고자 하는 경우라면 persist를 호출하는 것을 추천한다. 

Which Storage Level to Choose?

스파크의 스토리지 레벨은 메모리 사용과 CPU효율 사이에 트리이드 오프에 대한 차이로 인해 무엇을 쓸지 결정된다.
다음 선택 기준을 추천한다. 
  • RDD가 기본 스토리지 레벨로 적합하다면 (MEMORY_ONLY) 그대로 두라. 이것은 가장 CPU효율적인 옵션이다. 이는 가능하면 RDD에 대한 오퍼레이션을 빠르게 해준다. 
  • 그렇지 않다면 MEMORY_ONLY_SER과 selecting a fast serialization library를 이용해보라. 이것은 공간 효율을 더 좋게 한다. 그러나 여전히 접근빠른 접근을 유지한다. 
  • 계산된 데이터셋들이 비싸거나 필터가 매우 큰 데이터에 대해 수행되는 것과 같은 기능이 아니라면 디스크로 분산하지 마라, 그외의 경우는 디스크로 부터 가능하면 빠르게 수행될 수 있도록 파티션을 재 처리하라. 
  • 빠른 fault recovery를 원한다면 replicated storage 레벨을 이용하자. (만약 스파크를 이용하여 웹 어플리케이션으로 부터 요청을 처리한다면). 모든 스토리지 레벨은 유실된 데이터를 재 계산하는 것으로 full fault tolerance를 제공한다. 그러나 복제된 것은 분시될 파티션의 재 계산을 기다리지 않고 RDD상에서 태스크를 계속해서 수행할 수 있도록 해준다. 
  • 높은 모메리 양의 환경에서 혹은 복수 어플리케이션에서는 실험적인 OOF_HEAP 모드가 몇가지 이점이 있다. 
    • Tachyon에서 동일한 메모리 폴을 공유하여 복수개의 executors가 공유하도록 해준다. 
    • GC코스트를 극적으로 줄여준다. 
    • 긱 execitprs의 크래시가 나는경우라도 캐시된 데이터는 유실되지 않는다. 

Removing Data

스파크는 자동적으로 각 노드의 캐시 사용을 모니터 하고 LRU알고리즘을 통해서 오래된 파티션은 제거시킨다. 만약 수동으로 RDD를 먼저 제거하고자 한다면 RDD.unpresist()메소드를 이용하면 된다. 

Shared Variables

보통 함수가 Spark오퍼레이션에 전달되면 (map혹은 리듀스) 원격지의 클러스터 노드에서 수행된다. 이것은 함수내에 사용되는 모든 변수들의 복제본에서 수행된ㄷ.ㅏ 이러한 변수들은 각 머신으로 복제 되며, 원격 머신에서 변수의 업데이트 없이 드라이버 프로그램으로 전파된다. 일반적인 지원은 태스크들 사이에 읽기-쓰기 공유 변수들로 비 효율적이다. 그러나 스파크는 shared variables에 대해서 2가지 사용 패턴에 대해서 2가지의 제약된 타입을 제공한다.
broadcast variables와 accumulators이다. 

Broadcast Variables

Broadcast변수는 프로그래머가 읽기 전용의 변수값을 각 머신에 캐시할 수 있도록 해준다. 이는 실행때 해당 변수를 실도록 복제하는 것 대신에 수행된다. 이들은 사음과 같이 사용될 수 있다. 각 노드에 효과적인 처리를 위해서 큰 입력 데이터셋의 복제본을 제공하는 것이다. 또한 스파크는 broadcast 변슈들을 효과적인 브로드 캐스트 알고리즘을 이용하여 커뮤니케이션의 비용을 줄이는 방향으로 분배를 시도한다. 
스파크 액션들은 stages의 집합을 통해서 수행된다. 이는 shuffle 오퍼레이션을 통해서 분산되어 수행된다. 스파크는 자동적으로 브로드캐스트를 통해 각 태스크들이 각 스테이지마다 필요한 공통 데이터를 분배한다. 데이터 브로드캐스트는 각 태스크가 실행되기 이전에 시리얼라이즈된 형태의 캐시된 데이터를 역 시리얼라이즈 한다. 이 의미는 명시적으로 브로드캐스트 변수들을 생성하는 것은 동일한 데이터에 대해서 복수개의 스테이지를 통해서 태스크가 수행될때만 유용하거나 혹은 역질력화된 데이터 형태로 캐시될때 중요하다. 
브로드캐스트 변수들은 변수 v로 부터 SparkContext.broadcast(v)를 이용하여 생성된다. 브로드캐스트 변수는 v를 감싸고 있으며, 변수는 value메소드를 통해서 접근이 가능하다. 다음 코드를 확인해보자.

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]

broadcast variable가 생성된 이후에 변수 v대신에 클러스터의 특정 함수에서 사용되어진다. 그래서 v는 한번 이상 적재 되지 않는다. 게다가 객체 v는 모든 노드가 브로드캐스트된 변수를 공유하기 때문에 한번 브로드캐스드 된 이후에는 변하지 않는 것을 보장한다. 

Accumulators

Accumulators는 변수들이며 이는 연관된 오퍼레이션들을 통해서 추가된다. 그리고 병렬로 효과적으로 제공된다. 그것들은 카운터를 구현하여 사용될 수 있다. (MapReduce) 에서 혹은 sum으로 이용된다.
스파크는 자연적으로 숫자형 타입의 accumulators를 제공하며 프로그래머는 새로운 타입을 제공할 수 있다. 만약 accumulators가 이름과 함께 생성이 되면 이는 Spark UI에서 나타난다. 이것은 현재 수행되고 있는 스테이지의 처리상태를 이해하는데 좋다. (노트 : Python에서는 제공되지 않는다.)

accumulator는 SparkContext.accumulator(v)를 호출하여 변수 v를 초기화 할 수 있다. 클러스터에서 수행되는 태스크들은 add메소드 혹은 +=오퍼레이터를 이용하여 추가될 수 있다.(스칼라나, 파이션) 그러나 값을 읽을 수 없다. 오직 드라이버 프로그램만이 accumulators의 값을 읽고 value메소드를 이용할 수 있다. 

아래 코드는 accumulator를 배열의 엘리먼트에 추가하는 예를 보여주고 있다. 
class VectorAccumulatorParam(AccumulatorParam) :
    def zero(self, initialValue) :
        return Vector.zeros(initialValue.size)
    def addInPlace(self, v1, v2) :
        v1 += v2
        return v1
# Then, create an Accumulator of this type :
    vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
accumulator업데이트는 오직 action내부에서만 수행된다. 스파크는 이러한 태스크들의 업데이트들이 단 한번만 이루어 지는것을 보장한다. 태스크를 재 시작할경우 value값을 업데이트 하지 않을 것이다. transformation에서 사용자들은 각 태스크들의 업데이트를 수행할때 주의해야한다. 만약 태스크나 잡 스테이지가 재 실행되는 경우에는 한번 이상 이루어질 수 있다. 
Accumulators는 스파크의 lazy evaluation 모델을 변경하지 않는다. 만약 RDD오퍼레이션 상에서 수정이 되었다면 그 값은 오직 action의 파트에서 단 한번만 계슨으로 인해서 업데이트 된 것이다.
결과적으로 accumulator업데이트는 lazy transformation 을 수행할때에는 실행에 대한 보장을 하지 않는다. 아래 에제 코드는 이러한 속성을 잘 보여준다.

accum = sc.accumulator(0)
def g(x) :
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the 'map' to be computed.
Deploying to a Cluster

application submission guide에서는 어떻게 클러스터에 어플리케이션을 submit하는지 보여준다. 간단하게 어플리케이션을 우선 JAR(Java / Scala)로 패키징하거나, .py혹은 .zip파일(파이선을 위해서) 묶고 나면 bin/spark-submit 스크립트에 전달하여 클러스터 매니저를 수행할 수 있다.

Launching Spark jobs from Java / Scala
org.apacke.spark.launcher 패키지는 스파크 잡들을 자식 프로세스들에 런칭하기 위한 클래스를 제공한다. 이는 단순히 Java API를 이용하고 있다. 

Unit Testing

스파크는 유닛테스트에 프렌들리하게 인기있는 유니세 테스트 프레임워크를 제공한다. 단순하게 SparkContext를 생성하고 마스터 URL을 local로 세팅하고 오퍼레이션을 수행한다. 그리고 SparkContext.stop()를 tear down으로 수행하면 된다. finally블록에서 컨텍스트를 정지하거나 tearDown메소드를 수행하여 마지막 작업을 할 수 있다. 스파크는 2개의 컨텍스트를 동시에 하나의 프로그램에 수행 할 수 없다.

Migrating from pre-1.0 Versions of Spark

스파크 1.0dms 1.X시리즈의 스파크 코어의 API를 고정하고 있다. 어떤 API이든 현재 가능하며, "experimental"혹은 "developer API"로 마크 되어 있지 않고, 앞으로 버젼에는 제공될 것이다. Python사용자를 위해서 변경된 것은 그루핑 오퍼레이션으로  groupByKey, cogroup그리고 join이 그런것이며 (key, list of variables)쌍을 반환하던 것을 (key, iterable of values)로 변경되었다. 

Where to Go from Here

스파크 웹사이트에 있는 몇가지 예제 프로그램을 살펴보자. 추가적으로 스파크는 몇가지 examples디렉토리에 샘플 예제를 포함하고 있다. 이는 다음 명령어를 통해서 실행할 수 있다.
./bin/run-example SparkPi
파이썬의 예제의 경우 spark-submit를 이용하여 실행할 수 있다.
./bin/spark-submit examples/src/main/python/pi.py 
R의 예제는 다음과 같이 실행할 수 있다.
./bin/spark-submit examples/src/main/r/dataframe.R 
프로그램의 최적화를 위해서는 configurationtuning가이드에서 실전 예제를 통해서 제공하고 있다.
이는 메모리에 저장되는 데이터의 효과적인 포맷에 대한 가이드를 제공해준다. 분산된 오퍼레이션과 클러스터 메니저에 대한 컴포넌트의 설명은 cluster mode overview의 예제를 살펴보자.
마지낙으로 전체 API도큐먼트는 ScalaJavaPython and R 에서 살펴보자. 

Share this

Related Posts

Previous
Next Post »