[Spark] Apache Spark Examples

[Spark] Apache Spark Examples

Apache Spark Examples

from : http://spark.apache.org/examples.html

이 예제들은 Spark API의 전체를 간략하게 보여준다. Spark는 분산 데이터셋의 개념으로 만들어 져있다. 이는 Java, Python 객체들을 포함하고 있다. 외부 데이터로 부터 데이터셋을 생성하고 병렬로 그것들을 처리할 수 있다. Spark API의 building block 을 확인해보자. RDD API에는 2가지 타입의 오퍼레이션을 제공하고 있다. transformation으로 이전것으로 부터 새로운 데이터셋을 만들어 내는 작업을 한다. 다음으로 actions로 클러스터 상에서 작업을 시작하는 것이다. Spark의 RDD API의 최상위에는 고수준의 API를 제공하며 DataFrame API와 Machine Learning API가 그것이다. 이 고수준 API는 특정 데이터 오퍼레이션에 대한 간단한 방법으로 작업을 수행할 수 있도록 제공한다. 이 페이지에서는 RDD 고수준 API를 이용한 간단한 예제를 살펴볼 것이다.

RDD API Examples

Word Count

이 예제는 몇가지 transformation으로 데이터셋을 만들고 난 후, (String, Int)쌍의 데이터셋을 만들어 내는 작업을 한다. 이는 count라고 부른다. 그리고 다음으로 결과를 파일로 저장한다.

1. Python 버젼 : 


text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b : a + b)
counts.saveAsTextFile("hdfs://...")

2. Scala 버젼 : 

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

3. Java 버젼 : 

JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) { return new Tuple2<String, String>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){
    public Integer call(Integer a, Integer b) {return a + b;}
});
counts.saveAsTextFile("hdfs://...");

Pi Estimation

Spark는 계산에 집중적인 태스크들에 사용된다. 이 코드는 원에 "throwing darts"를 수행해서 파이를 계산하는 예제이다. 우리는 ((0, 0) to (1, 1)) 사각 정방영역에 랜덤 포인트를 찍고, 얼마나 많이 단위 단위 원 내에 들어가는지 검사하고자 한다. fraction은 Pi / 4이며, 우리는 이 공식을 우리의 계산에 이용할 것이다. 

1. Python 버전 : 

def sample(p) :
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0
count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
        .reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

2. Scala 버젼 : 

val count = sc.parallelize(1 to NUM_SAMPLES).map{ i =>
    val x = Math.random()
    val y = Math.random()
    if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)

3. Java 버젼 :

List<Integer> l = new ArrayList<Integer>(NUM_SAMPLES);
for (int i = 0; i < NUM_SAMPLES; i++) {
    l.add(i);
}
long count = sc.parallelize(l).filter(new Function<Integer, Boolean>() {
    public Boolean call(Integer i) {
        double x = Math.random();
        double y = Math.random();
        return x * x + y * y < 1;
    }
}).count();
System.out.println("Pi is roughly " + 4.0 * count / NUM_SAMPLES);

DataFrame API Examples

Spark에서 DataFrame는 이름으로 된 칼럼 조합의 분산 데이터 집합이다. 사용자는 DataFrame API를 이용하여 외부 데이터 소스와 스파크의 내장 분산 컬렉션에 대해서 다양한 관계 연산을 수행할 수 있다. 이때 데이터 처리를 위한 별도의 프로시저 없이 이러한 연산이 가능하다. 또한 DataFrame API기반의 프로그램은 자동적으로 스파크 내장 옵티마이저에 의해서 최적화 된다. 

Text Search

이 예제에서는 로그파일에서 에러 메시지를 찾는 예제이다. 

1. Python 버젼 

textFile = sc.textFile("hdfs://...")
# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r : Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).count()

2. Scala 버젼 

val textFile = sc.textFile("hdfs://...")
// Creates a DataFrame having a single column named "line"
val df = textFile.toDF("line")
val errors = df.filter(col("line").like("%ERROR%"))
// Counts all the errors
errors.count()
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
// Fetches the MySQL errors as an array of Strings
errors.filter(col("line").like("%MySQL%")).collect()

3. Java 버젼

// Creates a DataFrame having a single column named "line"
JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaRDD<Row> rowRDD = textFile.map(
  new Function<String, Row>() {
    public Row call(String line) throws Exception {
      return RowFactory.create(line);
    }
  });
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("line", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
DataFrame errors = df.filter(col("line").like("%ERROR%"));
// Counts all the errors
errors.count();
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count();
// Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect();

Simple Data Operations

이번 예제는 데이터베이스에 있는 테이블을 읽고 각 나이별 사람의 수를 계산하는 예제이다. 최종적으로 우리는 계산된 결과를 JSON타입으로 S3에 저장한다. 단순한 MySQL테이블 "people"에는 "name"과 "age" 2개의 칼럼이 있다. 

1. Python 버젼

# Creates a DataFrame based on a table named "people"
# stored in a MySQL database
url = \
    "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
    .read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", "people") \
    .load()
# Looks the schema of this DataFrame
df.printSchema()
# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()
# Save countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")

2. Scala 버젼

// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
val url =
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
val df = sqlContext
  .read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "people")
  .load()
// Looks the schema of this DataFrame.
df.printSchema()
// Counts people by age
val countsByAge = df.groupBy("age").count()
countsByAge.show()
// Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")

3. Java 버젼 

// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
String url =
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword";
DataFrame df = sqlContext
  .read()
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "people")
  .load();
// Looks the schema of this DataFrame.
df.printSchema();
// Counts people by age
DataFrame countsByAge = df.groupBy("age").count();
countsByAge.show();
// Saves countsByAge to S3 in the JSON format.
countsByAge.write().format("json").save("s3a://...");

Machine Learning Example

MLib는 Spark 머신러닝 라이브러리이다. 이는 많은 ML알고리즘을 제공해주고 있다. 이 알고리즘들은 extraction, classification, regression, clustering, recommendation, 등을 제공한다. MLlib 는 또한 워크플로우 구성을 위한 파이프라인을 제공하며, 튜닝 파라미터를 위한 CrossValidator, 그리고 모델의 저장 및 로드를 위한 퍼시스턴스 모델을 제공한다.

Prediction with Learning Example

이 예제에서는 labels과 feature 벡터들의 데이터셋을 획득하고, feature 벡터들에서 부터 labels를 예측하는 학습 수행할 것이다. 이는 Logistic Regression 알고리즘을 이용한다. 

1. Python 버젼

# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])
# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)
# Fit the model to the data.
model = lr.fit(df)
# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()

2. Scala 버젼

// Every record of this DataFrame contains the label and
// features represented by a vector.
val df = sqlContext.createDataFrame(data).toDF("label", "features")
// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
val lr = new LogisticRegression().setMaxIter(10)
// Fit the model to the data.
val model = lr.fit(df)
// Inspect the model: get the feature weights.
val weights = model.weights
// Given a dataset, predict each point's label, and show the results.
model.transform(df).show()

3. Java 버젼

// Every record of this DataFrame contains the label and
// features represented by a vector.
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
LogisticRegression lr = new LogisticRegression().setMaxIter(10);
// Fit the model to the data.
LogisticRegressionModel model = lr.fit(df);
// Inspect the model: get the feature weights.
Vector weights = model.weights();
// Given a dataset, predict each point's label, and show the results.
model.transform(df).show();







[Spark] Programming-guide

[Spark] Programming-guide

Spark Programming Guide

from : https://spark.apache.org/docs/latest/programming-guide.html

개요 : 

각 스파크 어플리케이션은 main함수인 driver program과 각 클러스터에서 다양한 병렬 오퍼레이션으로 구성되어 실행된다.

스파크의 메인 추상화는 resilient distributed dataset(RDD)이다. 이것은 클러스터의 노드들간에  파티션된 엘리먼트의 컬렉션들이며 이들은 병렬로 처리되어진다.
RDD는 HDFS 에서 생성되어지며 (혹은 다른 하둡 지원의 파일 시스템) 혹은 드라이버 프로그램 내에서 스칼라 컬렉션으로 존재하는 데이터이며, 이들은 transforming되기도 한다.
사용자는 RDD를 메모리에 persist하도록 요청할 수 있으며, 병렬 처리를 위해 상호간에 효과적으로 재 사용 할 수 있도록 해준다.
마지막으로 RDD는 노드가 실패하는 경우 자동으로 복구 해준다.

두번째 추상화는 병렬 처리에서 사용될 수 있는 shared variables이다. 기본적으로 스파크가 서로다른 노드에서 태스크의 집합을 병렬로 수행할때 각 태스크의 함수에서 사용되는 각 변수들을 복제하여 옮긴다.
가끔 변수들은 태스크들 끼리 공유되거나 태스크와 드라이버 프로그램 사이에 공유된다.
스파크는 2개의 공유 변수를 제공한다.
broadcast variables : 모든 노드의 메모리에 캐시되어 사용될 수 있다.
accumulators : 이 변수는 counters와 sums와 같은 작업을 위해서 추가되는 변수이다.

# 참고 : 이문서는 python만을 정리하였다.
bin/pyspark 
상기 스크립트를 실행해보자.

Linking with Spark

Spark 1.6.1 은 Python 2.6 + 와 Python 3.4+ 에서 동작한다.
또한 CPython 인터프리터를 이용할 수 있다. NumPy와 같은 C라이브러리가 사용될 수 있다. 또한 이는 PyPy 2.3+ 버젼과 호환된다.

Spark어플리케이션을 Python에서 실행하기 위해서는 bin/spark-submit 스크립트를 이용할 수 있다.
이 스크립트는 Spark의 Java/Scala라이브러리를 로드할 것이다. 그리고 클러스터에 어플리케이션을 submit하게 해준다.
또한 bin/Pyspark 스크립트는 대화영 Python shell이다.

만약 HDFS데이터를 이용하고자 한다면, PySpark linking 빌드를 HDFS버젼과 연동 해야한다.
이는 Prebuilt packages에서 해당하는 HDFS 버젼에 맞게 설치하면 된다.

마지막으로 다음과 같이 프로그램에 Spark 클래스를 import할 필요가 있다.
from pyspark import SparkContext, SparkConf
PySpark 는 Python의 마이너 버젼과 동일한 버젼을 필요로 한다.
기본 파이선 버젼을 PATH에 걸어주는 작업이 필요하다. 특정 버젼을 설정하고자 한다면 PYSPARK_PYTHON의 값을 다음과 같이 설정하자.
$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python.pi.py

Initializing Spark

스파크 프로그램의 시작은 SparkContext를 생성하는 것으로 시작한다. 이것은 Spark에게 어떻게 클러스터에 접근할지를 알려준다.
SparkContext를 생성하기 위해서는 어플리케이션에 대한 정보를 포함하는 SparkConf를 빌드 해야한다.
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
appName은 어플리케이션의 이름으로 cluster UI에 나타난다. master는 Spark, Mesos혹은 YARN클러스터의 URL이거나 혹은 "local"(로컬모드 지원)이 올 수 있다.
실제에서는 클러스터에서 수행할때 프로그램에서 master를 하드코딩 하지 않을 것이다. 그러나 대신에 spark-submit으로 어플리케이션을 런치 하고, 결과를 수신 받을 것이다. 그러나 로컬 테스트와 유닛 테스트를 위해서는 "local"을 전달해서 스파크를 실행할 수 있다.

Using the Shell

PySpark쉘은 SparkContext가 이미 생성되어 있는 인터프리터이다. 이 변수는 sc라고 불린다.
자신의 SparkContext를 만들면 동작하지 않는다. 또한 master 아규먼트를 이용하여 마스터 컨텍스트 커넥션을 설정할 수 있다. 그리고 Python에 .zip, .egg, .py파일을 --py-files에 콤마로 나누어진 리스트를 전달할 수 있다.
또한 의존성을 쉘 세션에 전달할 수 있다. 이는 메이븐과 연동될 리스트를 콤마로 분리된 값을 전달하면 가능하며 --packages아규먼트에 실어서 보내면 된다. 만약 의존성에 필요한 추가적인 repository가 필요한경우라면 --repositories 아규먼트에 기술한다. 그리고 python에 필요한 스파크 의존성은 pip를 이용하여 필요한 의존성을 추가하자. 예를 들면 다음과 같이 할수 있다.

$./bin/pyspark --master local[4]
혹은 code.py 를 추가할 수있다.
$./bin/pyspark --master local[4] --py-files code.py
전체 옵션 항목을 확인해보기를 원한다면 pyspark --help 를 입력하자.
pyspark 는 더 많은 일반 spark-submit script를 호출한다.

IPython에서  PySpark 쉘을 런칭하는 것도가능하다. 이는 Python 인터럽터기능을 향상 시킨다. PySpark는 IPython 1.0.0과  이후 버젼에 호환한다. IPython을 사용하는 것은 PYSPARK_DRIVER_PYTHON 변수에 ipython 값을 지정하면된다.
$PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
또한 ipython커맨드를 PYSPARK_DRIVER_PYTHON_OPTS 를 설정하여 커스터마이징 할 수 있다. 예를 들어 IPython Notebook 을 런칭할때 PyLab plot를 지원하도록 할 수 있다.
$PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark
IPython Notebook 서버가 런칭 된 이후에 "Python 2" 노트북을 "Files"탭으로부터 생성할 수 있다. notebook 내에서 %pylab inlines 커맨드를 IPython 노트북에서 Spark를 실행하기 전에 입력할 수 있다.

Resilient Distributed Datasets(RDDs)

스파크는 RDD(Resilient Distributed Datasets)의 개념을 바탕으로 수행된다. 이것은 엘리먼트들의 컬렉션으로 fault-talerant 한 컬렉션이다. 이것은 병렬로 수행될 수 있다.
RDD를 생성하기 위한 2가지 방법으로 드라이버 프로그램 내부에 존재하는 컬렉션을 parallelizing을 통해서 생성하는 것과 외부 저장 시스템에서 데이터 셋 혹은 공유 파일시스템, HDFS, HBase, 하둡 인풋 형식을 따르는 다양한 데이터 소스등을 참조하는 방법으로 생성이 가능하다.

Parallelized Collections

Parallelized Collections는 SparkContext의 parallelize메소드를 통해서 생성된다. 이것은 드라이버 프로그램 내부에 존재하는 반복가능한 데이터나 컬렉션을 이용한다. 컬렉션 엘리먼트들은 분산된 데이터 셋으로 부터 복제되어 병렬로 수행될 수 있다. 예를 들어 다음은 숫자 1 에서 5까지 값을 가지고 있는 컬렉션을 생성하는 것이다.

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
일단 생성이 되면 분산된 데이터셋(distData)는 병렬로 수행이 가능하다. 예를 들어 우리는 distData.reduce(lambda a, b: a + b)를 호출하여 각 엘리먼트의 리스트를 더할 수 있다.

중요한 파라미터중에 하나는 데이터를 잘라낼 파티션의 개수를 지정하는 것이다. 스파크는 각 클러스터의 파티션마다 하나의 태스크를 수행하게 된다. 일반적으로 CPU당 2 - 4개의 파티션을 생성한다. 보통 스파크는 클러스터에 따라 자동으로 파티션을 설정하는 작업을 시도한다. 그러나 수동으로 두번째 파라미터로 전달이 가능하다.

예)
sc.parallelize(data, 10)

External Datasets

PySpark는 Hadoop에 의해서 분산저장되어 있는 데이터셋을 생성할 수 있다. 로컬파일 시스템이나, HDFS, Cassandra, HBase, Amazon S3등으로 생성이 가능하다. Spark는 text 파일, SequenceFile, 그리고 다른 하둡 입력 포맷등을 지원한다.

Text 파일 RDD는 SparkContext의 textFile 메소드를 이용하여 생성이 가능하다. 이 메소드는 파일의 URI(hdfs://, s3n://, etc URI)를 획득한다. 그리고 라인의 컬렉션을 읽어 들인다.

>>> distFile = sc.textFile("data.txt")
일단 생성이 되면 dataFile는 dataset오퍼레이션에 의해서 동작된다. 예를 들어 우리는 모든 라인의 길이를 더하는 작업을 할 것이다. 이때 map과 reduce를 이용하는 예는 다음과 같다.
distFile.map(lambda s : len(s)).reduce(lambda a, b : a + b)
스파크에서 파일을 읽는 몇가지 주의 사항

1. 로컬 파일 시스템에서 패스를 이용하는 경우 파일은 반드시 워커 노드에서 동일하게 접근이 가능해야한다, 각 파일 복제본은 모든 워커들 혹은 네트워크 마운트로 공유된 파일 시스템으로 복제 된다.

2. 파일 기반의 입력 메소드는 모두 textFile를 포함한다. 디렉토리, 압축파일, 그리고 와일드카드 모두 가능하다. 예를 들면 다음과 같다.
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
3. textFile메소드는 또한 추가적인 아규먼트를 가지며 이는 파일의 파티션 넘버를 지정하는 것이다. 기본적으로 스파크는 파일의 각 블록마다 하나의 파티션을 지정한다. (한 블록은 HDFS에서 기본적으로 64MB이다.) 더 큰 값을 지정할 수 있다. 그러나 주의할 점은 블록보다 더 적은 파티션을 지정하면 안된다.

텍스트 파일로 부터 스파크 파이선 API는 몇가지 데이터 포맷도 지원하고 있다.
1. SparkContext.wholeTextFiles
- 복수개의 작은 텍스트 파일들을 포함하는 디렉토리를 읽도록 한다. 그리고 그것들의 반환한다. (filename, context)쌍을 반환한다. 이것은 textFile과는 대조적으로 각 파일에서 각 라인은 하나의 레코드를 반환하게 된다.

2. RDD.saveAsPickleFile, SparkContext.pickleFile
- 파이선 객체의 피클로 구성된 단순 포맷으로 RDD를 저장한다. 이는 배치로 pickle serialization을 이용하며 기본 배치 크기는 10이다.

3. SequenceFile 그리고 Hadoop Input/Output 포맷

주의 : 이 기능은 현재 Experimental(실험용)로 마크 되어 있다. 그리고 advanced user을 대상으로 제공한다. 앞으로는 아마도 SparkSQL을 기준으로 read/write를 지원하게 될것이다. 앞으로는 Spark SQL을 선호되어질 것이다.

Writable Support
PySpark SequenceFile는 key-value 쌍의 RDD를 로드한다. 이는 또한 Java 타입으로 쓰기 가능한 형태로 변환되며 pickle들은 Java객체로 Pyrolite를 이용하여 변환된다. RDD의 key-value쌍을 SequenceFile로 저장할때에는 PySpark는 역으로 동작한다. Python object는 unpickle를 통해서 Java객체로 변환되고, 이 값은 writable 하도록 변환이 된다. 다음은 상호 자동 변환을 보여주는 표이다.
Writable TypePython Type
Textunicode str
IntWritableint
FloatWritablefloat
DoubleWritablefloat
BooleanWritablebool
BytesWritablebytearray
NullWritableNone
MapWritabledict
Arrays는 처리되지 않는다. 사용자는 특정 커스텀 ArrayWritable 서브 타입을 필요로 하게 된다. 쓰기를 할때 사용자들은 특정 커스텀 컨버터들을 필요로 하며 이는 arrays를 커스텀 ArrayWritable서브 타입으로 변환한다. 읽기시에 기본 컨버터는 커스텀 ArrayWritable서브타입을 Java객체 Object[]로 변환한다. 이 것은 pickle된 값을 Python tuples로 변환한다. Python array.array에서 프리미티브 타입의 배열을 획득하는 경우에도 사용자는 custom converter가 필요하다.

Saving and Loading SequenceFiles

텍스트파일과 마찬가지로 SequenceFiles는 특정 경로에 저장 및 로드할 수 있다. 키와 값의 클래스들이 지정될 수 있다. 그러나 표준 writables들은 필요하지 않다.

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x : (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
Saving and Loading Other Hadoop Input/Output Formats

PySpark는 또한 다른 Hadoop InputFormat으로부터 읽거나, Hadoop OutputFormat으로 부터 쓰기를 할 수 있다. new와 old하둡의 MapReduce API들 둘다 가능하다. 만약 필요한경우에 Hadoop설정은 Python dict에 전달될 수 있다. 다음 예제는 Elasticsearch ESInputFormat를 이용한 예제이다.
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource" : "index/type"}  #참고 elasticsearch가 로컬에 수행되고 있다고 가정
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", \
            "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first() # 결과는 MapWritable이며 이는 Python dict로 변환된다.
(u'Elasticsearch ID',
    {u'field1' : True,
      u'field2' : u'Some Text',
      u'field3' : 12345} )
만약 InputFormat가 단순히 Hadoop 설정 input path에 의존되거나, key, value클래스가 쉽게 상단 테이블로 변환되어진다면, 해당 케이스에서 잘 동작 할 것이다.

만약 커스텀 serialized 바이너리 데이터를 가지고 있다면 (Cassandra / HBase로 부터 읽어들인 데이터같은..), 우선 데이터를 Scala/Java 진영의 객체로 변환될 필요가 있다. 이러한 작업은 Pyrolite의 pickler에서 처리 되도록 해준다. Converter 의 특징은 이것을 위해 제공된다. 단순히 이 특징과 convert메소드에서 변환된 코드들로 확장이 가능하다. 기억할 것은 이 클래스는 InputFormat로 접근할때 어따한 의존성이 필요하지 않아야 한다. 그리고 PySpark의 클래스 패스에 해당 경로의 jar파일을 포함하고 있어야 한다.

Cassandra / HBase의 InputFormat과 OutputFormat에 대한 커스텀 컨버터의 예제를 보려면 다음을 참조하자.
Python example, Converter example

RDD Operations

RDD는 2가지 타입의 오퍼레이션을 지원한다.
1. transformation
- 하나의 RDD에서 새로운 데이터 셋을 생성해 내는 작업을 말한다.
- map은 transformation이다. 이것은 각 데이터 엘리먼트를 특정 함수로 처리하여 새로운 RDD를 결과로 반환한다.

2. actions
- 데이터셋에서 계산을 마치고 드라이버 프로그램으로 값을 반환한다.
- reduce는 action으로 특정 function을 이용하여 RDD의 엘리먼트의 전체를 aggregate하고, 최종 결과를 driver 프로그램으로 반환한다. (참고, reduceByKey는 분산된 데이터 셋을 반환한다. 병렬로...)

모든 transformation은 lazy이다. 이것은 해당 결과가 바로 처리되지 않는다는 것이다. 대신에 변환이 적용되어야 한다는 것을 단지 기억만 한다. transformation은 action이 호출되는 경우에만 수행이 되며, 이는 드라이버 프로그램으로 최종값이 전달된다.
이러한 설계는 spark를 더욱 효과적으로 동작하게 만들어 준다. 예를 들어 우리는 map를 통해서 생성된 데이터는 reduce에 사용하기 위해서 만들어 진다는 것을 알고 있다. 그리고 reduce는 결국 driver로 전달되기 위해서 사용된다.

기본적으로 각 transform 처리된 RDD는 action이 호출될때 매번 재 계산이 된다.  그러나 RDD를 메모리에 저장할 수 있다. 이때에는 persist 나 cache메소드를 호출하면 된다. 이것은 다음 실행때 스파크를 더욱 빠르게 동작하도록 해준다. 또한 Disk에 RDD를 저장하거나 복수개의 노드에 복제하도록 할 수 도 있다.

Basis

RDD의 기본을 위해서 다음 프로그램을 살펴보자.
lines = sc.textFile("data.txt")
lineLength = lines.map(lambda s : len(s))
totalLength = lineLengths.reduce(lambda a, b : a + b)
첫번재 라인은 외부 파일로 RDD를 정의한다. 이 데이터셋은 action이 호출되기 전에는 로드되지 않는다. lines는 단지 파일의 포인터일 뿐이다.
두번째 라인은 lineLengths를 정의하고 잇으며 이는 map 변환을 수행하고 있다. 다시한변 lineLengths는 즉시 수행되지 않는다. 이는 laziness 때문이다.
최종적으로 reduce를 수행하며 이것은 action이다. 이 포인트에서 Spark는 태스크들을 몇몇 머신으로 작업을 분산 시키고, 각 머신은 map의 부분을 수행한다. 그리고 local reduction작업을 수행한다. 결과적으로 최종 결과는 driver 프로그램으로 반환된다.

만약 lineLengths라는 것을 이후에도 사용하고자 한다면 다음과 같이 하면 된다.
lineLengths.persist()
reduce이전에 lineLengths로 인해서 처음 수행이 끝이나면 메모리로 저장이 된다.

Passing Functions to Spark

Spark의 API는 driver 프로그램에서 클러스터로 수행되기 위해서 전달된다. 여기에는 3가지 추천 방법이 있다.
1. Lambda expressions
- 단순 함수를 위해서 표현식을 사용할 수 있다. (Lambda는 복수개의 함수들 혹은 스테이트먼트를 지원하지 않는다. 이것은 값을 바환하지 않는다.)

2. Local defs로 Spark로 호출되는 함수이다. 더 긴 코드

3. 모듈로 구성된 최상위 레벨의 함수들

예를 들어 lambda를 이용하는 것 보다 더 긴 함수를 전달하는 것은 다음과 같다.
"""MyScript.py"""
if __name__ == "__main__" :
    def myFunc(s) :
        words = s.split(" ")
        return len(words)
    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)
주의 : 클래스 인스턴스에 메소드의 참조를 전달할 수 있다. ( singleton object 에 반대하는것과 같이) 이것은 객체를 전달하는 것을 필요로 한다. 이러한 내용은 클래스가 메소드를 포함하고 있다는 것이다.
다음예를 보자.
class MyClass(object) :
    def func(self, s) :
        return s
    def doStuff(self, rdd) :
        return rdd.map(self.func)
만약 우리가 MyClass를 생성하고, doStuff를 호출한다면 map 내부에서는 MyClass인스턴스의 func메소드를 참조하고 있게 된다. 그래서 전체 객체가 클러스터에 전달 되어야 하는 문제가 생긴다.

유사한 방법으로 외부 객체의 접근은 전체 객체를 참조하게 된다.
class MyClass(object) :
    def __init__(self) :
        self.field = "Hello"
    def doStuff(self, rdd) :
        return rdd.map(lambda s : self.field + s)
이러한 이슈를 피하기 위해서는 단순한 방법으로 로컬 변수에 필드 값을 복사해서 해당 값을 전달하는 것이다. 전체 객체를 전달하는 것 대신에.
def doStuff(self, rdd) :
    field = self.field
    return rdd.map(lambda s : field + s)

Understanding closures

Spark에서 어려운 작업중에 하나는 scope의 이해와 변수의 라이프사이클, 그리고 클러스터들 사이에서 코드가 실행될때 메소드 들이다.
RDD작업은 자신의 scope의 외부 변수들을 수정한다. 이것은 자주 혼란한 소스를 만들어 낸다. 아래 예제에서 우리는 foreach를 이용하여 counter를 증가시키는 예를 그러한 내용을 확인해볼 것이다. 그러나 유사한 이슈가 다른 작업을 위해 발생할 수 있다.

Example
Native RDD 엘리먼트의 sum을 생각해보자. 이것은 동일한 JVM상에서 실행이 되고 있는것인지에 따라서 차이가 난다. 공통적인 예로 Spark가 local모드로 수행되고 있을때와 (--master = local[n]) Spark가 클러스터에서 수행될때(spark-submit로 YARN에서 수행될때)가 그것이다.

counter = 0
rdd = sc.parallelize(data)
# 오류 : 이렇게 하지말라.
def increment_counter(x)
    global counter
    counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
 Local Vs Cluster modes
상단코드의 행동은 정의되지 않았다. 그리고 의도되로 동작하지 않을 것이다. 작업 수행을 위해서 Spark는 RDD오퍼레이션 처리를 태스크로 분리하게 된다. 각 태스크는 executor에 의해서 수행된다. 이전 실행은 Spark 는 task의 closure를 계산한다. Clouser는 변수들이며, 메소드들이다. 이것은 RDD상에서 수행되기 위해서 visible하게 되어야 한다. (이 케이스에서는 foreach()임). 이 clouser는 serialized되며 각 executor에 전달된다.

closure와 함께 변수들은 각 executor에 전달되며, 이는 복제가 이루어진다. closure가 foreach함수에서 참조될때는 더이상 드라이버 노드의 counter이 아니다. driver node의 counter는 메모리에 존재하지만, executor에서는 보이지 않는다. executor들은 오직 serialized closure에서 복제된 경우에만 보여진다. 그러므로 counter의 최종값은 여전히 0으로 남아 있게 된다. 모든 오퍼레이션은 serialized closure에 대한 변수값을 참조하게 된다.

로컬 모드에서 몇몇 환경에서 foreach함수는 driver와 같이 같은 JVM에서 수행될 것이다. 그리고 동일한 원본 counter를 참조한다. 그리고 아마 실제 업데이트가 잘 될 것이다.

잘 동작하도록 정의하기 위해서는 이 시나리오를 정렬하여 Accumulator를 이용할 수 있도록 해야한다. Accumulators는 스파크에서 안전하게 변수값을 업데이트 하는 메커니즘을 위한 특별한 기능으로 사용된다. 이는 클러스터에서 워커 노드들 사이에 분산되어 수행될때 유용하다. Accumulator 섹션에서 더 자세히 다룰 것이다.

일반적으로 closures - constructs는 loop 혹은 로컬에 정의된 메소드와 같다. 이는 변경이 되는 몇가지 글로벌 상태값에는 사용되지 않는것이 보통이다. 스파크는 이러한 행동을 정의하지 않거나 혹은 보장하지 않는다. 이런 상황은 closures의 외부로 부터 참조된 객체를 변형하는 작업을 의미한다. 몇몇 코드는 로컬 모드에서 동작한다. 그러나 이는 우연으로 수행된 것이며 분산 모드에서는 정상적으로 수행되지 않는다. Accumulator의 사용은 만약 글로벌 집계가 필요한경우에 대신해서 사용 한다.

Printing elements of an RDD

또 하나의 공통 용어는 RDD를 rdd.foreach(println) 혹은 rdd.map(println)을 이용하여 엘리먼트를 출력하는 것이다.
단일 머신에서는 기대한 값을 출력할 것이다. 그리고 모든 RDD의 엘리먼트역시 출력된다.
그러나 클러스터 모드에서는 stdout을 이용한 출력은 executor에 의해서 호출되며, 이것은 executor의 stdout을 대신해서 출력한다. 이것은 driver의 stdout이 아니다. 그러므로 driver의 stdout은 보여지지 않게 된다.
모든 엘리먼트를 드라이버에서 보고자 한다면 collect() 메소드를 이용하여 우선 RDD를 driver노드로 보내야한다.
예) rdd.collect().foreach(println)
이것은 드라이버가 수행될때 out of memory가 발생할 수 있는 여지가 있다. 왜나하면 collect()는 전체 RDD의 값을 단일 머신으로 패치하는 일을 하기 때문이다. 만약 RDD의 몇몇 element들만 출력하고자 한다면 더 안전한 접근법인 take()메소드를 이용하자.
예) rdd.take(100).foreach(println)

Working with Key-Value Pairs

대부분의 Spark operation들은 RDD에 포함된 데이터 타입의 객체를 이용하여 수행된다. 몇가지 특정 오퍼레이션은 RDD의 Key-Value쌍에서만 수행이 가능하다. 가장 공통적인 부분은 "shuffle"오퍼레이션이다. grouping 혹은 aggregating 을 키를 이용하여 수행한다.

Python에서는 이러한 operation은 Python에 내장되어 있는 튜플을 포함하는 RDD에서 수행이 된다. 튜플은 (1, 2)의 형식이다. 단순히 튜플들을 생성하고 필요한 오퍼레이션을 호출한다.

예를 들어 다음 코드들은 reduceByKey오퍼레이션을 이용하며 이는 key-value쌍을 이용하여 각 라인이 얼마나 자주 나타나는지 카운트 하는 것이다.
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.sortByKey()를 이용할 수 있다. 예를 들어 알파벳으로 소트를 수행하고 최종적으로counts.collect()의 결과를 드라이버 프로그램으로 가져올 수 있다.

Transformations

다음 테이블 리스트는 Spark에서 제공하는 몇가지 공통 transformation을 나열한 것이다. RDD API를 참고하여 더욱 상세한 내용을 살펴보자.

TransformationMeaning
map(func)해당 함수를 이용하여 전달된 각 엘리먼트를 처리하고, 형식화된 데이터 셋을 생성한다. 
filter(func)func기 true값을 반환하는 소스 에리먼트들을 선택하여 새로운 형식화된 데이터셋을 생성한다. 
flatMap(func)map과 유사하다. 그러나 입력 아이템은 0 혹은 더 이상이 될 수 있다. (그래서 func는 단일 아이템 대신에 Seq를 반환하는 것이 맞다.)
mapPartitions(func)map과 유사하다. 그러나 각 파티션에서 분산된 RDD 수행된다. func는 반드시 Iterator<T> ==> Iterator<U>와 같이 설정해야한다. 이때에는 타입 T의 RDD에서 수행될때이다. 
mapPartitionsWithIndex(func)mapPartitions와 유사하다. 그러나 func를 지원하며 처리할 파티션의 인덱스 값이 정수 값을 받는다. 그러므로 func는 반드시 Type T의 RDD인경우에는 (int, Iterator<T>) ==> Iterator<U>의 타입이어야한다.
sample(withReplacementfractionseed)데이터의 일부에 대한 샘플 조각, replacement를 하거나 혹은 하지 않은 정보가 될수 있으며 랜덤 넘버를 위한 seed를 이용한다. 
union(otherDataset)소스데이터와 아규먼트로 들어온 데이터 셋을 union한 결과를 반환한다. 
intersection(otherDataset)소스 데이터셋과 아규먼트로 들어온 데이터 셋의 교집합을 구한 결과를 반환한다. 
distinct([numTasks]))소스 데이터셋의 엘리먼트를 중복 제거한 결과를 바환한다. 
groupByKey([numTasks])(K, V) 쌍의 데이터셋에 대해서 호출한다. 이 결과는 (K, Iterable<V>)쌍의 데이터셋을 반환한다.
Note: 만약 aggregation에 대한 작업을 수행에 대한 그루핑을 한다. (합계나, 평균) 에 대한 결과를 구하며 이는 각 키에 대해서 reduceByKey혹은 aggregateByKey를 이용하면 더 낳은 성능을 보여준다.
Note: 기본적으로 병렬 레벨은 부모 RDD의 파티션 수에 의해서 결정이 된다. numTasks아규먼트를 선택적으로 전달할 수 있으며 이 것으로 서로다른 개수의 tasks를 설정할 수 있다. 
reduceByKey(func, [numTasks])(K, V)쌍의 데이터셋에 대해서 호출하며 (K, V)쌍의 데이터셋을 반환한다. 이 것은 각 키에 대한 집계된 데이터를 Value로 하며 이는 주어진 reduce 함수인 func를 이용하여 집계된다. 이 식은 반드시 type(V, V) => V로 되어야 한다. groupByKey와 같이 리듀스 태스크의 수는 선택적으로 두번재 아규먼트를 이용하여 설정이 가능하다. 
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])(K, V) 쌍의 데이터 셋에 대해서 호출하며, (K, U)쌍의 데이터셋을 반환한다. 각 키에 대해서 주어진 컴바인 함수를 이용하여 집계하며, 기본값이 "zero"값으로 설정된다. 또한 입력된 값의 타입과 집계 결과 타입이 다른 것을 허용한다. 이는 불필요한 할당을 피하도록 한다. groupByKey와 같이 리듀스 태스크의 수는 옵션값으로 전달할 수 있다. 
sortByKey([ascending], [numTasks])(K, V)쌍의 데이터셋에 대해서 호출하며 K값에 따라 정렬된 결과를 반환한다. 결과는 (K, V)의 데이터셋으로 키에 의해서 asc와 desc로 졍렬된다. assending에 대해서 boolean값을 설정할 수 있다.
join(otherDataset, [numTasks])(K, V)와 (K, W) 타입의 데이터셋에 대해서 호출하며, (K, (V, W))쌍의 데이터셋을 반환한다. 이것은 각 키에 대해서 모든 엘리먼트의 쌍을 반환한다. Outer joins는 leftOuterJoin, rightOuterJoin, fullOuterJoin을 통해서 제공된다. 
cogroup(otherDataset, [numTasks])(K, V)와 (K, W) 타입의 데이터 셋에 대해서 호출된다. 이 결과는 (K, (Iterable<V>, Iterable<W>)) 튜플을 반환한다. 이 오퍼레이션은 또한 groupWith로 불린다. 
cartesian(otherDataset)T와 U 타입의 데이터셋에 대해서 호출한다. (T, U)쌍의 데이터셋을 반환한다. (모든 엘리먼트의 쌍을 반환한다.)
pipe(command[envVars])RDD의 각 파티션에 대해서 쉘 커맨드를 통해서 파이프처리한다. 쉘은 Perl 혹은 bash script등이 될수 있을 것이다. RDD엘리먼트들은 프로세스의 stdin으로 쓰기를 하고, 처리된 라인들의 출력을 위해 stdout은 스트링의 RDD로 반환된다. 
coalesce(numPartitions) : 합치기RDD에서 파티션들의 수를 numPartitions까지 감소 시킨다. 이것은 큰 데이터셋을 필터링 다운한 이후에 더 효과적인 처리를 수행하기 위해서 유용하게 작용한다. 
repartition(numPartitions)RDD의 데이터를 랜덤으로 셔플하여 더 적은 혹은 많은 파티션으로  재 파티션 데이터를 생성한다. 그리고 그것들 사이에 밸런스를 맞춘다. 이것은 항상 모든 데이터를 네트워크를 넘어서 셔플하도록 처리한다. 
repartitionAndSortWithinPartitions(partitioner)주어진 파티셔너에 따라 RDD를 재 파티션 한다. 그리고 각 결과 파티션은 그들의 키에 따라 소팅한다. 이것은 repartition 호출에 비해서 더 효과적으로 처리된다. 그리고 각 파티션에 대한 소팅도 된다. 왜냐하면 셔플 머신으로 소팅처리되게 푸시할 수 있기 때문이다. 

Action

다음 테이블 리스트는 공통적인 action을 보여준다.
ActionMeaning
reduce(func)데이터셋의 엘리먼트를 집계한다. 이때 func로 전달된 함수를 이용한다. (이는 보통 2개의 아규먼트를 입력받고 하나를 반환한다.) 함수는 commutative (결과가 동일)하며, associative(결합의)한 결과를 되도록 한다. 이것은 병렬 처리에서 계산을 올바르게 할 수 있다. 
collect()데이터셋의 모든 엘리먼트를 배열로 만들어 드라이버 프로그램에 반환한다. 이것은 보통 필터 혹은 다른 오퍼레이션을 처리한 후에 유용하며, 데이터의 충분히 작은 서브셋을 바환한다..
count()데이터셋의 엘리먼트 개수를 반환한다. 
first()데이터셋의 첫번째 엘리먼트를 반환한다. (take(1)과 유사하다)
take(n)데이터셋의 첫번째 n개의 엘리먼트를 배열로 반환한다. 
takeSample(withReplacement,num, [seed])num으로 들어온 개수만큼 데이터셋의 엘리먼트를 랜덤하게 샘플링한 결과를 배열로 반환한다. 이는 또한 replacement처리를 하고 선택적으로 사전에 지정한 랜덤넘버 생성 seed를 전달할 수 있다. 
takeOrdered(n[ordering])자연적인 순서 그리고 커스텀 비교를 이용하여 RDD의 엘리먼트에서 첫번째 n개의 결과를 반환한다. 
saveAsTextFile(path)데이터셋의 엘리먼트들을 텍스트 파일(텍스트 파일의 셋)으로 주어진 로컬 파일 시스템의 디렉토리에 저장한다. HDFS혹은 특정 다른 Hadoop-supported파일 시스템에 저장가능, 스파크는 각 엘리먼트를 파일에 저장할때 하나의 라인의 스트링으로 변환하기 위해서 toString을 호출한다. 
saveAsSequenceFile(path)
(Java and Scala)
데이터셋의 엘리먼트를 Hadoop SequenceFile에 저장한다. 이는 로컬 파일 시스템의 주어진 패스에 저장하거나,  HDFS혹은 다은 Hadoop서포트 파일 시스템에 저장된다. 이것은 Key-Value쌍의 RDD에서도 가능하며 Hadoop의 writable인터페이스이다. 스칼라에서는 이것은 암묵적으로 변환될 수 있고 쓰기가 될 수 있다. (스파크는 기본 타입으로 변환을 지원한다. Int, Double, String등)
saveAsObjectFile(path)
(Java and Scala)
자바의 serialization을 이용한 단순한 형식으로 데이터셋의 엘리먼트를 저장한다. 이것을 로드하기 위해서는 usingSparkContext.objectFile()를 이용할 수 있다. 
countByKey()(K, V)의 형식의 RDD만 가능하다. 이는 (K, Int)의 해시맵 쌍을 반환한다. 즉, 각 키의 카운트를 반환한다. 
foreach(func)각 데이터셋의 엘리먼트에 대해서 func를 통과하도록 한다. 이것은 보통 Accumulator 업데이팅 혹은 외부 스토리지 시스템과의 상호작용등의 사이드 이펙을 만든다.
Note : Accumulator 대신에 변수들의 변형은 정의되지 않은 행동의 결과를 낼 것이다. Understanding closures를 참조하자. 

Shuffle operations

이벤트에 대해 스파크 트리거와 함께 수행하는 오퍼레이션은 shuffle로 알려져 있다. 셔플은 스파크 메커니즘으로 데이터를 재 분배하고 서로다른 파티션으로 그룹 지어 주는 역할을 한다. 이것은 일반적으로 각 executors들과 machines로 데이터를 카피하는 것을 포함한다. 셔플은 복잡하고, 비용을 수반하는 작업이다.

Background
셔플이 진행되는 동안 어떠한 일이 일어나는지 이해하기 위해서 reduceByKey오퍼레이션의 예를 살펴볼 수 있다. reduceByKey오퍼레이션은 새로운 RDD를 생성한다. 이는 단일 키에 대한 튜플이 컴바인 되는 형태이다. 키와 리듀스 함수를 키에 대해서 모든 값들이 처리된다.

어려운 일은 단일 키에 대한 모든 값이 필수적으로 동일한 파티션이나 동일 머신에 존재하지 않는 것이다. 그러나 그것들은 반드시 결과를 계산하기 위해서 서로 같이 위치해야한다는 것이다.

스파크에서 데이터는 일반적으로 특정 오퍼레이션을 위해서 필요한 위치내에 존재하기 위해서 파티션들 사이에 분산되어 있지 않다. 계산이 수행되는 동안 단일 태스크는 단일 파티션에서 수행될 것이다. 그러므로 단일 리듀스 작업을 위한 reduceByKey를 위해 모든 데이터를 수행하기 위해 조직화 된다. 스파크는 all-to-all오퍼레이션을 필요로 한다. 이것은 모든 키를 위해서 모든 값을 찾는 행위는 모든 파티션에서 읽어야 한다. 그리고 계산을 위해서 파티션들 사이에서 가져와서 최종적으로 각 키에 대해 수행한다. 이것을 shuffle라고 부른다.

새롭게 셔플된 데이터 각 파티션 내의 엘리먼트 집합은 결정적이게 된다. 그리고 이것은 파티션들 그 자체의 정렬이 이루어지며, 이러한 엘리먼트의 졍렬은 아니다. 만약 예상대로 정렬된 데이터를 필요로 하면 다음 셔플을 따른다.
- mapPartitions 는 각 파티션을 이용하여 소트한다. 예) .sorted
- repartitionAndSortWithinParitions 는 동시에 리파티셔닝을 수행하는 동안 파티션의 소트를 효과적으로 수행한다.
- sortBy 는 글로벌적으로 정렬된 RDD를 만든다.

오퍼레이션들은 셔플의 원인이 될수 있다. 이는 repartition오퍼레이션들을 포함하며 repartition이나 coalesce 와 같다. 'ByKey'오퍼레이션들(카운팅은 예외)은 groupByKey그리고 reduceByKey와 같다. 그리고 join오퍼레이션은 cogroup과 join과 같다.
Performance Impact
Shupple은 매우 배싼 오퍼레이션이다. 이것은 disk I/O와 데이터 직렬화 그리고 network I/O를 수반한다.
셔플을 통해 데이터를 조직화 하기 위해서 Spark는 태스크의 셋들을 생성한다. map태스크는 데이터를 조직하고 reduce태스크의 셋들은 집계를 한다. 이 명명법은 MapReduce에서 따왔다. 그리고 직접적으로 Spark의 map과 reduce의 오퍼레이션과 연관되어 있지 않다. 

내부적으로 각 태스크들의 맵으로 부터 결과는 메모리에 유지된다. 그리고 이들은 타켓 파티션에서 소트된 기준으로 존재한다. 그리고 단일 파일에 쓰여진다. 리듀스 사이드에서 태스크는 관련된 소트된 블록을 읽는다. 

특정 셔플 오러페이션들은 매우 많은 양의 힙 메모리를 소비할 수 있다. 이들은 데이터를을 변환 전 혹운 후에 메모리 데이터 구조화를 수행할때 발생한다. 특별히 reduceByKey와 aggregateByKey는 이러한 구조를 맵 사이드에서 생성한다. 그리고 'ByKey'오퍼레이션들은 리듀스 사이드에서 진행된다. 만약 데이터가 메모리에 맞아떨어지지 않는경우 이것은 이러한 테이블을 디스크로 저장한다. 이는 추가적인 I/O오버헤드를 발생시키고 가비지 컬렉션을 증가 시킨다. 

셔플은 또한 매우큰 중간 파일을 디스크로 남긴다. Spark 1.3의 예와 같이 이러한 파일은 상응하는 RDD가 더이상 사용되지 않거나 가비지 컬렉터 되어지 전까지 보존된다. 이것은 만약 lineage가 재 계산되어지면 셔플 파일은 필요하지 않게 된다. 가비지 컬렉션은 오랜 기간의 작업 이후에 아마 발생할 것이다. 만약 어플리케이션이 RDD의 참조를 포함하고 있거나, GC가 자주 발생하지 않을경우가 그렇다. 이 의미는 오래 수행되는 스파크 잡은 아마 큰 용량의 디스크 공간을 차지할 것이다. 임시 스토리지 디렉토리는 thespark.local.dir설정을 통해서 수정할 수 있으며 이를 수행하여 Spark Context를 설정할 수 있다. 

셔플의 행동은 다양한 설정 파일을 수정하는 것에 달려 잇다. 셔플 행동부분 참조, 섹션은  Spark Configuration Guide에 포함된다. 

RDD Persistence

스파크에서 가장 중요한 기능중에 하나는 persisting(caching) 으로 오퍼레이션 간 데이터셋을 메모리에 저장하는 것이다. RDD를 persist할때 각 노드의 특정 파티션에 저장한다. 메모리에서 계산을 하고 데이터 셋 (혹은 데이터셋에서 드라이빙된 데이터셋)상에서 다른 엑션을 수행하기 위해 재 사용된다. 이는 이후 작업을 더욱 빠르게 해준다. (종종 10배 정도 더 빠르다.) Caching은 반복적인 알고리즘을 위한 핵심 툴이며 빠르게 사용이 가능하다. 
RDD는 persist()와 cache()메소드들을 이용하여 저장되도록 할 수 있다. 첫번째는 액션에서 수행되어진다. 이것은 노드들의 메모리에 저장된다. 스파크의 캐시는 fault-tolerant 이다. RDD의 특정 파티션이 유실되면 자동적으로 원본으로 부터 변환되어 다시 재 계산되어진다. 
또한 각 퍼시스트 된 RDD는 서로다른 스토리지 레벨을 이용하여 저장된다. 예를 들어 디스크에 데이터셋을 퍼시스트 하거나, 메모리에 하지만 자바 객체로 시리얼 라이즈 한 내용을 저장할 수 있다. (저장 공간을 위해서)
노드들간에 복제를 수행하거나 혹은 Tachyon내에 힙 외부에 저장될 수 있다. 이러한 레벨들은 StorageLevel객체를 전달하여 설정할수 있으며 persist()메소드에 이용한다. cache()메소드는 빠르게 사용하기 위한 메소드로 기본 스토리지 레벨을 가지고 있다. 이것은 StorageLevel.MEMORY_ONLY(로 메모리네에 역직렬화 되어 저장된다.)
스토리지 레벨은 다음과 같다. 
Storage LevelMeaning
MEMORY_ONLYRDD를 JVM내의 자바 객체로 역 직렬화 하여 저장한다. 만약 RDD가 메모리에 맞지 않을경우 몇몇 파티션은 캐시 되지 않을 것이다 그리고 필요한경우 매번 재 계산된다. 이것은 디폴트 레벨이다. 
MEMORY_AND_DISKRDD를 JVM내의 자바 객체로 역 직렬화 하여 저장한다. 만약 RDD가 메모리에 맞지 않을경우 파티션의 디스크에 저장하며 필요한경우 읽어 들인다. 
MEMORY_ONLY_SERRDD를 자바 객체의 직렬화된 객체로 저장한다. (각 파티션의 바이트 어레이로 저장된다.) 이것은 일반적으로 역직렬화된 객체에 비해서 공간을 더욱 절약할 수 있다. 특별히 fast serializer를 이용할때 더욱 그러하며, 읽기 작업시 CPU를 더 많이 쓴다. )
MEMORY_AND_DISK_SERMEMORY_ONLY_SER 와 유사하다. 그러나 메모리가 적당하지 않을때 파티션에 분산해서 저장한다. 이것은 매번 필요한경우 재계산을 하지 않기 위해 이런 작업을 한다. 
DISK_ONLYRDD파티션을 디스크로 저장한다. 
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.상단 레벨과 동일하다. 그러나 각 파티션을 2개의 노드에 복제본을 만든다. 
OFF_HEAP (experimental)
실험적인 레벨
RDD를 직렬화 하여 Tachyon에 저장한다. 이는 MEMORY_ONLY_SER, OFF_HEAP 과 비교하여 가비지 컬렉션 오버헤드가 줄어 들었다. 그리고 executors가 더 작고 메모리 풀을 공유하도록 해준다. 이는 큰 힙을 가진 환경이나, 멀티플 컨커런트 어플레케이션에 매력적인 모델이다.
나아가 Tachyon내부에 존재하는 RDD는 executor의 크래쉬가 캐시 메모리의 데이터를 유실하도록 하지 않는다. 이 모드는 Tachyon내 메모리는 버릴 수 있다. 그러므로 Tachyon은 블록을 재구성하는 것을 시도하지 않으며, 메모리로 부터 제거된다. 만약 Tachyon을 off heap 스토어로 사용할 계획이라면 사프크는 Tachyon out-of-the-box와 함께 호환이 된다. 다음 페이지를 참조하여 필요한 버젼을 확인해보라. http://tachyon-project.org/master/Running-Spark-on-Tachyon.html
Note: Python에서 저장된 객체는 항상 Pickle라이브러리를 이용하여 시리얼라이즈 된다. 그래서 어떤 시리얼 라이즈 레벨을 선택하든 상관이 없다. 
스파크는 또한 자동적으로 셔플 오퍼레이션에서 중간 데이터를 persists한다. (예, reduceByKey) 이것은 persist를 호출하지 않더라도 수행된다. 이는 셔플이 실패하는경우 전에 입력을 재 처리하지 않도록 한다. 우리는 여전히 재사용하고자 하는 경우라면 persist를 호출하는 것을 추천한다. 

Which Storage Level to Choose?

스파크의 스토리지 레벨은 메모리 사용과 CPU효율 사이에 트리이드 오프에 대한 차이로 인해 무엇을 쓸지 결정된다.
다음 선택 기준을 추천한다. 
  • RDD가 기본 스토리지 레벨로 적합하다면 (MEMORY_ONLY) 그대로 두라. 이것은 가장 CPU효율적인 옵션이다. 이는 가능하면 RDD에 대한 오퍼레이션을 빠르게 해준다. 
  • 그렇지 않다면 MEMORY_ONLY_SER과 selecting a fast serialization library를 이용해보라. 이것은 공간 효율을 더 좋게 한다. 그러나 여전히 접근빠른 접근을 유지한다. 
  • 계산된 데이터셋들이 비싸거나 필터가 매우 큰 데이터에 대해 수행되는 것과 같은 기능이 아니라면 디스크로 분산하지 마라, 그외의 경우는 디스크로 부터 가능하면 빠르게 수행될 수 있도록 파티션을 재 처리하라. 
  • 빠른 fault recovery를 원한다면 replicated storage 레벨을 이용하자. (만약 스파크를 이용하여 웹 어플리케이션으로 부터 요청을 처리한다면). 모든 스토리지 레벨은 유실된 데이터를 재 계산하는 것으로 full fault tolerance를 제공한다. 그러나 복제된 것은 분시될 파티션의 재 계산을 기다리지 않고 RDD상에서 태스크를 계속해서 수행할 수 있도록 해준다. 
  • 높은 모메리 양의 환경에서 혹은 복수 어플리케이션에서는 실험적인 OOF_HEAP 모드가 몇가지 이점이 있다. 
    • Tachyon에서 동일한 메모리 폴을 공유하여 복수개의 executors가 공유하도록 해준다. 
    • GC코스트를 극적으로 줄여준다. 
    • 긱 execitprs의 크래시가 나는경우라도 캐시된 데이터는 유실되지 않는다. 

Removing Data

스파크는 자동적으로 각 노드의 캐시 사용을 모니터 하고 LRU알고리즘을 통해서 오래된 파티션은 제거시킨다. 만약 수동으로 RDD를 먼저 제거하고자 한다면 RDD.unpresist()메소드를 이용하면 된다. 

Shared Variables

보통 함수가 Spark오퍼레이션에 전달되면 (map혹은 리듀스) 원격지의 클러스터 노드에서 수행된다. 이것은 함수내에 사용되는 모든 변수들의 복제본에서 수행된ㄷ.ㅏ 이러한 변수들은 각 머신으로 복제 되며, 원격 머신에서 변수의 업데이트 없이 드라이버 프로그램으로 전파된다. 일반적인 지원은 태스크들 사이에 읽기-쓰기 공유 변수들로 비 효율적이다. 그러나 스파크는 shared variables에 대해서 2가지 사용 패턴에 대해서 2가지의 제약된 타입을 제공한다.
broadcast variables와 accumulators이다. 

Broadcast Variables

Broadcast변수는 프로그래머가 읽기 전용의 변수값을 각 머신에 캐시할 수 있도록 해준다. 이는 실행때 해당 변수를 실도록 복제하는 것 대신에 수행된다. 이들은 사음과 같이 사용될 수 있다. 각 노드에 효과적인 처리를 위해서 큰 입력 데이터셋의 복제본을 제공하는 것이다. 또한 스파크는 broadcast 변슈들을 효과적인 브로드 캐스트 알고리즘을 이용하여 커뮤니케이션의 비용을 줄이는 방향으로 분배를 시도한다. 
스파크 액션들은 stages의 집합을 통해서 수행된다. 이는 shuffle 오퍼레이션을 통해서 분산되어 수행된다. 스파크는 자동적으로 브로드캐스트를 통해 각 태스크들이 각 스테이지마다 필요한 공통 데이터를 분배한다. 데이터 브로드캐스트는 각 태스크가 실행되기 이전에 시리얼라이즈된 형태의 캐시된 데이터를 역 시리얼라이즈 한다. 이 의미는 명시적으로 브로드캐스트 변수들을 생성하는 것은 동일한 데이터에 대해서 복수개의 스테이지를 통해서 태스크가 수행될때만 유용하거나 혹은 역질력화된 데이터 형태로 캐시될때 중요하다. 
브로드캐스트 변수들은 변수 v로 부터 SparkContext.broadcast(v)를 이용하여 생성된다. 브로드캐스트 변수는 v를 감싸고 있으며, 변수는 value메소드를 통해서 접근이 가능하다. 다음 코드를 확인해보자.

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]

broadcast variable가 생성된 이후에 변수 v대신에 클러스터의 특정 함수에서 사용되어진다. 그래서 v는 한번 이상 적재 되지 않는다. 게다가 객체 v는 모든 노드가 브로드캐스트된 변수를 공유하기 때문에 한번 브로드캐스드 된 이후에는 변하지 않는 것을 보장한다. 

Accumulators

Accumulators는 변수들이며 이는 연관된 오퍼레이션들을 통해서 추가된다. 그리고 병렬로 효과적으로 제공된다. 그것들은 카운터를 구현하여 사용될 수 있다. (MapReduce) 에서 혹은 sum으로 이용된다.
스파크는 자연적으로 숫자형 타입의 accumulators를 제공하며 프로그래머는 새로운 타입을 제공할 수 있다. 만약 accumulators가 이름과 함께 생성이 되면 이는 Spark UI에서 나타난다. 이것은 현재 수행되고 있는 스테이지의 처리상태를 이해하는데 좋다. (노트 : Python에서는 제공되지 않는다.)

accumulator는 SparkContext.accumulator(v)를 호출하여 변수 v를 초기화 할 수 있다. 클러스터에서 수행되는 태스크들은 add메소드 혹은 +=오퍼레이터를 이용하여 추가될 수 있다.(스칼라나, 파이션) 그러나 값을 읽을 수 없다. 오직 드라이버 프로그램만이 accumulators의 값을 읽고 value메소드를 이용할 수 있다. 

아래 코드는 accumulator를 배열의 엘리먼트에 추가하는 예를 보여주고 있다. 
class VectorAccumulatorParam(AccumulatorParam) :
    def zero(self, initialValue) :
        return Vector.zeros(initialValue.size)
    def addInPlace(self, v1, v2) :
        v1 += v2
        return v1
# Then, create an Accumulator of this type :
    vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
accumulator업데이트는 오직 action내부에서만 수행된다. 스파크는 이러한 태스크들의 업데이트들이 단 한번만 이루어 지는것을 보장한다. 태스크를 재 시작할경우 value값을 업데이트 하지 않을 것이다. transformation에서 사용자들은 각 태스크들의 업데이트를 수행할때 주의해야한다. 만약 태스크나 잡 스테이지가 재 실행되는 경우에는 한번 이상 이루어질 수 있다. 
Accumulators는 스파크의 lazy evaluation 모델을 변경하지 않는다. 만약 RDD오퍼레이션 상에서 수정이 되었다면 그 값은 오직 action의 파트에서 단 한번만 계슨으로 인해서 업데이트 된 것이다.
결과적으로 accumulator업데이트는 lazy transformation 을 수행할때에는 실행에 대한 보장을 하지 않는다. 아래 에제 코드는 이러한 속성을 잘 보여준다.

accum = sc.accumulator(0)
def g(x) :
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the 'map' to be computed.
Deploying to a Cluster

application submission guide에서는 어떻게 클러스터에 어플리케이션을 submit하는지 보여준다. 간단하게 어플리케이션을 우선 JAR(Java / Scala)로 패키징하거나, .py혹은 .zip파일(파이선을 위해서) 묶고 나면 bin/spark-submit 스크립트에 전달하여 클러스터 매니저를 수행할 수 있다.

Launching Spark jobs from Java / Scala
org.apacke.spark.launcher 패키지는 스파크 잡들을 자식 프로세스들에 런칭하기 위한 클래스를 제공한다. 이는 단순히 Java API를 이용하고 있다. 

Unit Testing

스파크는 유닛테스트에 프렌들리하게 인기있는 유니세 테스트 프레임워크를 제공한다. 단순하게 SparkContext를 생성하고 마스터 URL을 local로 세팅하고 오퍼레이션을 수행한다. 그리고 SparkContext.stop()를 tear down으로 수행하면 된다. finally블록에서 컨텍스트를 정지하거나 tearDown메소드를 수행하여 마지막 작업을 할 수 있다. 스파크는 2개의 컨텍스트를 동시에 하나의 프로그램에 수행 할 수 없다.

Migrating from pre-1.0 Versions of Spark

스파크 1.0dms 1.X시리즈의 스파크 코어의 API를 고정하고 있다. 어떤 API이든 현재 가능하며, "experimental"혹은 "developer API"로 마크 되어 있지 않고, 앞으로 버젼에는 제공될 것이다. Python사용자를 위해서 변경된 것은 그루핑 오퍼레이션으로  groupByKey, cogroup그리고 join이 그런것이며 (key, list of variables)쌍을 반환하던 것을 (key, iterable of values)로 변경되었다. 

Where to Go from Here

스파크 웹사이트에 있는 몇가지 예제 프로그램을 살펴보자. 추가적으로 스파크는 몇가지 examples디렉토리에 샘플 예제를 포함하고 있다. 이는 다음 명령어를 통해서 실행할 수 있다.
./bin/run-example SparkPi
파이썬의 예제의 경우 spark-submit를 이용하여 실행할 수 있다.
./bin/spark-submit examples/src/main/python/pi.py 
R의 예제는 다음과 같이 실행할 수 있다.
./bin/spark-submit examples/src/main/r/dataframe.R 
프로그램의 최적화를 위해서는 configurationtuning가이드에서 실전 예제를 통해서 제공하고 있다.
이는 메모리에 저장되는 데이터의 효과적인 포맷에 대한 가이드를 제공해준다. 분산된 오퍼레이션과 클러스터 메니저에 대한 컴포넌트의 설명은 cluster mode overview의 예제를 살펴보자.
마지낙으로 전체 API도큐먼트는 ScalaJavaPython and R 에서 살펴보자. 

[Spark] Cluster Mode

[Spark] Cluster Mode

Cluster Mode Overview

from : https://spark.apache.org/docs/latest/cluster-overview.html

클러스터 모드에서 어떻게 스파크가 수행되는지 보여주는 문서이며, 컴포넌트들이 수행하는 일에 대해서 쉽게 이해할 수 있다.

Components

스파크 어플리케이션은 크러스터 상에서 독립된 처리 단위로 수행된다. 이는 SparkContext 객체에 의해서 코디네이트 된다. (이를 driver program이라고 한다.)

특히, 클러스터에서 수행될때 SparkContext는 몇가지 타입의 클러스터 관리자와 연결할 있다. (스파크의 스탠드 얼론 클러스터 관리자, Mesos, YARN등이 있다.) 이것은 어플리케이션 상호간의 자원을 할당한다. 한번 접속이 되면 스파크는 클러스터 내의 노드들에서 수행하며, 이 처리는 계산을 하거나 데이터를 저장하는 작업을 수행한다. 다음으로 어플리케이션 코드를 실행하는 executors에 전달한다. (JAR혹은 Python파일을 SparkContext에 전달한다.) 최종적으로 SparkContext는 태스크를 executors에 전달하여 실행한다.

Spark cluster components
이 아키텍쳐에는 주요 노트 몇가지가 있다.
1. 각 어플리케이션은 소유한 executor processes를 획득한다. 이것은 전체 어플리케이션의 기간동안 유지되며, 태스크를 멀티 쓰레드로 실행한다.
이것은 어플리케이션들 각각을 독립적으로 고립시키는 이점이 있다. 여기에는 스케줄링 사이드(각 드라이버는 자신의 태스크를 스케줄링한다.) 와 executor 사이드(서로다른 어플리케이션의 태스크들은 서로다른 JVM상에서 동작한다.)가 있다. 그러나 이 의미는 외부 스토리지 시스템에 해당 정보를 쓰는 것을 통하지 않고서는, 서로다른 스파크 어플리케이션 상호간에 데이터를 쉐어할 수 없다는 의미가 된다.

2. 스파크는 클러스터 매니저를 알지 못한다. 가능한 오랬동안 executor processes를 가질 수있으며, 이들 상호간에 서로 커뮤니케이션 하며, 비교직 쉽게 클러스터 매니저 상에서 실행이 가능하다. 이것은 또한 다른 어플리케이션을 지원한다. (Mesos/YARN)

3. 드라이버 프로그램은 executors의 라이프타임 내에서 들어오는 접근을 반드시 listen해야하며, 받아 들여야 한다. 드라이버 프로그램은 반드시 워커 노드들로 부터 네트워크 접근을 할 수 있도록 해야한다.

4. 클러스터에서 드라이버가 스케줄하기 때문에 워커노드 가까운 곳에서 수행되어야한다. 가급적이면 동일한 LAN내에 있어야 한다. 만약 원격지에 요청을 보내고자 한다면 드라이버에 대한 RPC를 오픈하고 오퍼레이션을 워커노드에서 가급적 가까운 곳에서 수행할 수 있도록 해야한다.

Cluster manager Types

현재 지원되는 클러스터 매니저는 다음과 같다.
Standalone - 단순 클러스터 매니저로 Spark를 포함하며, 클러스터를 쉽게 설정할 수 있다.
Apache Mesos - 일반적은 클러스터 관리자이며, Hadoop MapReduce와 서비스 어플리케이션을 수행할 수 있다.
Hadoop YARN - Hadoop2에 있는 리소스 매니저이다.

추가적으로 Spark의 EC2 launch scripts는 Amazone EC2에서 스탠드 얼론 클러스터에 런치하기 쉽게 해준다.

Submitting Applications

어플리케이션은 spark-submit스크립트를 이용하여 다양한 타입으로 서밋 할 수 있다.
다음 참조 : https://spark.apache.org/docs/latest/submitting-applications.html


Monitoring

각 드라이버 프로그램은 web UI를 가지고 있다. 일반적으로 4040포트를 이용하며 실행되는 태스크, executors, storage사용 등을 볼 수 있다. 단순히 http://<driver-node>:4040 에 접근하여 볼 수 있다.
다음을 참조 : https://spark.apache.org/docs/latest/monitoring.html

Job Scheduling

스파크는 어플리케이션들 간에 리소스 할당을 컨트롤 하도록 해준다. (클러스터 매니저의 역할에서) 그리고 어플리케이션 내부의 계산들은 동일한 SparkContext내에서 수행된다.
다음을 참조 : https://spark.apache.org/docs/latest/job-scheduling.html

Glossary

다음 표는 클러스터 개념을 나타내는 용어들을 보여준다.
TermMeaning
Application스파크내 사용자 프로그램, 드라이버 프로그램과 클러스터 내의 executors로 구성된다. 
Application jar사용자 스파크 어플리케이션을 포함하는 jar파일, 사용자는 "uber jar"를 생성하기를 원할수 있다. 이때 이 내부에는 의존성 패키지를 포함한다.
사용자의 jar는 Hadoop 혹은 Spark라이브럴리를 포함해서는 안된다. 이는 실행시점에 포함된다. 
Driver program어플리케이션의 main() 함수를 실행하며, SparkContext를 생성한다. 
Cluster manager클러스터 내의 필요한 리소스를 위한 외부 서비스이다. (standalone manager, Mesos, YARN등)
Deploy mode드라이버 어디에서 프로스세를 실행하는지를 구별한다. 클러스터 모드에서 프레임워크는 클래서터의 내부에 드라이버를 실행한다. 클라이언트 모드에서는 클러스터의 외부에서 드라이브를 런치한다. 
Worker node클러스터 내에서 어플리케이션 코드를 실행할 수 있는 특정 노드
Executor워커노드에서 어플리케이션을 위해서 린치된 프로세스이다. 이것은 태스크를 실행하고, 메모리에 데이터를 저장하거나 디스크에 접근하는 역할을 한다. 각 어플리케이션은 자신의 executors를 가지고 있다. 
Task하나에 executor에 전달될 작업의 단위
Job복수개의 태스크들로 구성된 병렬 계산처리를 하는 것으로 스파크 액션의 응답에서 만들어진다. (save, collect) 이것은 드라이버의 로그에서 이 용어를 사용한 것을 볼 수 있을 것이다. 
Stage각 job은 스테이지라고 불리는 태스크의 셋들로 나누어 진다. 이것은 서로 의존적이며 (이는 map, reduce와 유사하다.) 드라이버 로그에서 이 용어를 사용한 것을 볼 수 있을 것이다. 



[Spark] Quick Start

[Spark] Quick Start

Spark Quick Start

Interactive Analysis with the Spark Shell

기본 : 

스파크 쉘을 다음과 같이 실행하자. 이는 대화형 데이터 분석을 위한 강력한 툴이다.
./bin/pyspark
스파크의 중요한 추상화는 Resilient Distributed Dataset(RDD)라고 부르는 분산 컬렉션이다.
RDD는 Hadoop 입력 포맷 이나 transformation등을 통해서 생성된다. 스파크 패키지에 존재하는 README 파일을 읽어 들이는 예제를 보자.
>>> textFile = sc.textFile("README.md")
RDD는 actions과 transformations를 가진다. action은 반환 값을 가지며, transformation은 새로운 RDD를 생성한다.
다음 몇개의 액션들을 보자.
>>> textFile.count()       # 이 RDD에서 아이템의 총 개수를 센다.
126
>>> textFile.first()          # 이 RDD에서 첫번째 아이템을 출력한다.
u'# Apache Spark'
다음은 transformation을 사용한 예제이다.
이것은 filter transformation을 통해서 새로운 RDD를 생성한다. 이는 file의 subset이다.
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
그리고 transformations과 actions의 체인을 걸어줄 수 있다.
>>> textFile.filter(lambda line: "Spark" in line).count()   # "Spark"를 포함하는 라인이 몇개인가?
15

RDD 연산 더보기 

RDD actions와 transformations는 더 복잡한 계산을 위해서 사용이 가능하다.
가장 긴 단어의 라인을 찾는다고 해보자.
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
15
이 첫번째 맵은 라인을 정수 값으로 하는 새로운 RDD를 생성한다. reduce는 가장 긴 라인 카운트를 찾는데 호출된다.map 아규먼트와 recude아규먼트는 (lambda)로 anonymous functions이다. 우리는 최상위 레벨의 파이선 함수를 전달할 수도 있다. 예를 들어 우리는 max함수를 이해하기 쉽게 다음과 같이 만들 수 있다.

>>> def max(a, b) :
...            if a > b:
...                return a
...            else:
...                return b
>>> textFile.map(lambda line: len(line.split())).reduce(max)
15
하나는 Hadoop에 잘 알려진 데이터 흐름 패턴인 MapReduce이다. Spark는 다음과 같이 쉽게 MapReduce를 구현할 수 있다.
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
여기에서 우리는 flatMap, map과 reduceByKey transformations를 서로 연결하여 RDD에 존재하는 각 단어의 개수를 새로운 RDD로 (string, int) 쌍으로 만들어 낸다.
우리에 쉘에서는 단어 수를 세는 작업은 collect 액션을 이용하여 구현한다.
>>> wordCounts.collect()
[[(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1), (u'"local"', 1), (u'variable', 1), ...]

Caching

Spark는 또한 여러 클러스들에 넓게 메모리 캐시에 데이터를 넣을 수 있는 기능을 제공한다. 이것은 데이터를 반복적으로 접근할때 매우 유용한 기술이다. 작은 "hot" 데이터셋을 쿼리 할때나, PageRank와 같은 알고리즘을 반복적으로 수행하고자 할때 유용하다. 다음은 linesWithSpark데이터 셋을 캐시 하는 예이다.

>>> linesWithSpark.cache()
>>> linesWithSpark.count()
19
>>> linesWithSpark.count()
19

만약 100라인짜리 텍스트 파일을 캐시한다면 매우 우스운 일일 것이다. 흥미로운 것은 이러한 동일한 함수를 통해서 매우 큰 데이터 셋을 사용할 수 있다는 것이다. 비록 이들이 10개에서 수백개의 노드에 흩어져 있더라도 말이다. 또한 bin/pyspark를 통해서 클러스터에 연결하여 이러한 작업을 수행할 수 있다. 이는 프로그래밍 가이드를 참조하자. https://spark.apache.org/docs/latest/programming-guide.html#initializing-spark

Self-Contained Applications

우리는 Spark API를 이용하여 self-contained application을 작성할 수 있다. 
다음 SimpleApp.py 파일을 살펴보자. 

"""SimpleApp.py"""
from pyspark import SparkContext
logFile = "YOU_SPARK_HOME/README.md"
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
이 프로그램은 텍스트 파일에서 각 라인중에 a와 b를 포함한 라인의 수를 카운트 한다. YOUR_SPARK_HOME은 인스톨된 스파크의 위치를 지정하면 된다. SparkContext는 RDD를 생성하기 위해서 사용된다. 또한 Spark에 Python 함수를 전달할수 있다. 이것은 자동으로 해당 함수를 참조하도록 시리얼라이즈 된다. 어플리케이션에서 사용하기 위해 커스텀 클래스혹은 서드파티 라이브러리를 이용할 수 있다. 또한 이 외부 어플리케이션을 실행하기 위해서 spark-submit를 호출할 수 있다. 자세한 내용은 spark-submit --help를 통해서 사용법을 확인해보자. 

다음 과 같이 함수를 실행해보자. 
# spark-submit 를 실행하여 어플리케이션을 실행한다. 
$ YOUR_SPARK_HOME/bin/spark-submit \
   --master local[4] \
   SimpleApp.py
...
Lines with a: 46, Lines with b: 23



[Mac] ssh 자동로그인처리

Mac에서 ssh기능 실행하기.


1. Mac에서 ssh기능 실행하기. 

- 우선 Mac에서 ssh기능을 실행하기 위해서는
"시스템 환경설정" > "공유" > "원격 로그인" 을 실행해주자.



2. 인증키 생성하기. 

- 원격 로그인을 자동으로 실행하기 위해서는 인증키를 생성하여 클라이언트에 공개 인증키를 배포하면 된다. 

2.1 인증키 생성하기. 

ssh-keygen -t rsa
mac에서는 위와 같이 ssh-keygen을 이용하면 된다. 

Enter file in which to save the key (/Users/KIDO/.ssh/id_rsa):
생성할 파일을 입력하는 것으로 기본값을 이용하기 위해서 enter키를 치고 넘어간다. 

Enter passphrase (empty for no passphrase):
Enter same passphrase again:
생성할 비밀번호를 입력한다. 
이때 비밀번호를 입력하지 않으면, 공개키만 가지고 해당 서버에 접근할 수 있으므로 보안에 위험이 생길 수 있으므로 비밀번호를 입력하자. 
비밀번호는 10자에서 30자까지 넣으면 된다. 

The key's randomart image is:
+---[RSA 2048]----+
|o.  .oo=oo       |
|o. o oo ++       |
| oo =   o+.      |
|o  oooo.E.=      |
|. . .+  .. o     |
|   o  ... .      |
|  .   ...o       |
+----[SHA256]-----+
결과로 위와같은 내용이 출력되고 다음과 같은 파일이 생성된다. 
-rw-------   1 KIDO  staff  1766  5 13 22:57 id_rsa
-rw-r--r--   1 KIDO  staff   404  5 13 22:57 id_rsa.pub

2.2 인증 퍼미션 설정하기. 

인증 퍼미션은 중요도에 따라 다음과 같이 설정한다. 
id_rsa는 private_key이므로 타인에게 공개해서는 안되는 중요한 파일이다. 

chomd 700 .ssh
chmod 600 .ssh/id_rsa
chmod 644 .ssh/id_rsa.pub

2.3 클라이언트용 공개키항목에 추가하기 

cat id_rsa.pub >> authorized_keys
보통 클라이언트는 authorized_keys에 공개키들을 나열하고 있다. 그러므로 상기 명령어로 공개키를 추가해주자. 





[Spark] Spark Install Guide and Standalone mode start

Apache Spark Install Guide 

1. 스파크 다운로드 및 구성요소 

1. Apache-Spark 다운로드

http://spark.apache.org/downloads.html로 이동한다.

2. 다운로드 버젼 설정 

스파크 최신 버젼을 선택했다.

사전빌드(Pre-build)이고, 대부분의 하둡 배포판에서 이용할 수 있다고 한 두번째 항목은 선택했다.
필요한경우 이미 설치되어 있는 적합한 하둡 버젼을 다운받으면 된다. 

# 참고 : 하둡은 반드시 설치 되어 있지 않아도 된다.

3. 다운로드 받은 파일 압축 풀기. 

txr -xvf spark-1.6.1-bin-without-hadoop.tgz
ln -s spark-1.6.1-bin-without-hadoop spark
vim ~/.bash_profile
export SPARK_HOME=/Users/KIDO/Documents/00.TOOLS/spark
export PATH=$JAVA_HOME:$MYSQL_HOME:$ACTIVATOR_HOME:$SPARK_HOME/bin:$PATH
상기 항목을 입력하고 :wq로 빠져나온다.

source .bash_profile

4. 스파크 디렉토리 둘러보기


bin
  - 실행 명령이 들어있다.
conf
  - 스파크 설정을 위한 설정파일
  - 스파크 환경설정, log설정, docker관련 설정, slave관련 설정등 다양하다.
examples
  - 스파크를 사용한 다양한 예제를 살펴볼 수 있다.
lib
  - 스파크에 필요한 라이브러리들이 존재한다.
  - spark-yarn 셔플 라이브러리, 하둡 라이브러리등이 있다.
python
  - 파이선 스파크를 위한 다양한 라이브러리와 스크립트가 있다.
R
  - spark R을 위한 라이브러리
sbin
  - 스파크 마스터서버 및 슬레이브 서버 실행 명령

2. Spark Standalone Mode 

출처 : http://spark.apache.org/docs/latest/spark-standalone.html

스파크는 Mesos혹은 YARN 상에서 수행이 된다. 그러나 Standalone모드를 지원하고 있다.
스탠드 얼론 모드는 하나의 클러스터에서 마스터와 워커를 수동으로 실행 할 수 있도록 제공한다.

1. 클러스터 수동 실행 

1.1 마스터 서버 실행하기
./sbin/start-master.sh
마스터를 실행하면 스파크는 ://HOST:PORT의 형식으로 URL을 프린트한다.
이 정보를 이용하여 마스터 서버에 접근하거나 SparkContext로 해당 정보를 전달 할 수 있다.
또한 마스터의 WEB-UI는 http://localhost:8080으로 접근할 수 있다.

1.2 슬레이브 워커 실행하기.
./sbin/start-slave.sh <master-spark-URL>
슬레이브 워커를 수행하면 master WEB-UI에서 신규 노드가 접속되었음을 확인할 수 있다.
또한 CPU와 메모리 정보등을 직접 확인할 수 있다.

1.3 마스터와 슬레이브 설정 파라미터
ArgumentMeaning
-h HOST--host HOST리슨할 호스트명을 설정한다.
-i HOST--ip HOST리슨할 호스트 이름을 설정한다. (deprecated됨, -h나 --host이용)
-p PORT--port PORT리슨할 서비스의 포트번호 (기본 : 7077 마스터, worker은 랜덤)
--webui-port PORTweb UI를 위한 포트 (기본 마스터 : 8080, 워커 : 8081)
-c CORES--cores CORES스파크 어플리케이션이 사용할 CPU코어의 총 개수,
머신에 따라 다르게 설정될 수 있다. (기본 : all available)
이것은 worker에만 설정한다. 
-m MEM--memory MEM스파크 어플리케이션이 사용할 총 메모리 양,
머신에 따라 다르게 설정될 수 있다. (기본 : 머신 총 RAM - 1GB)
형식 : 1000M 혹은 2G 등으로 설정
이것은 worker에만 설정한다. 
-d DIR--work-dir DIR작업에 사용할 공간과 job가 출력할 로그를 저장할 디렉토리
(기본 : SPARK_HOME/work)이며 worker에만 설정한다. 
--properties-file FILE로드할 커스텀 Spark 프로퍼티 패스를 지정한다.
(기본 : conf/spark-defaults.conf)

2. 클러스터 런치 스크립트 

스파크를 스탠드 얼론 모드로 스크립트로 수행하기 위해서는, conf/slave 라고 불리는 파일을 생성해야한다. 이는 spark디렉토리 내부에 존재한다.
이 파일에는 스파크 워커가 실행할 모든 머신의 호스트 명이 기술 되어있어야 한다.
만약 conf/slave가 존재하지 않는다면  lunch script는 기본적으로 단일 머신에서 수행된다. (localhost)
이는 테스팅을 위해서 유용하게 사용된다.
마스터 머신은 각 워커 머신에 ssh를 이용하여 접근하는것이 일반적이다. 기본적으로는 ssh를 이용하며, 이를 이용하면 password없이 서버간 access가 가능하다.
만약 password-less설정을 하지 않았다면 SPARK_SSH_FOREGROUND 변수값에 각 워커를 위한 password를 설정할 수 있다.

설정을 완료하고 나면 launch나 stop를 다음 쉘 스크립트를 통해서 수행할 수 있다. 이 쉘들은 SPARK_HOME/sbin에 들어있다.
  • sbin/start-master.sh - 마스터 인스턴스를 실행한다. 
  • sbin/start-slaves.sh - conf/slaves 파일에 지정된 머신에 슬레이브를 실행한다. 
  • sbin/start-slave.sh - 머신내에 슬레이브 인스턴스를 실행한다. 
  • sbin/start-all.sh - 상단에 기술한 모든 마스터와 슬레이브를 실행한다. 
  • sbin/stop-master.sh - bin/start-master.sh스크립트를 통해 실행한 마스터 인스턴스를 종료한다. 
  • sbin/stop-slaves.sh - conf/slaves파일에 지정된 머신에서 슬레이브를 종료시킨다. 
  • sbin/stop-all.sh - 마스터와 슬레이브 모두 종료한다. 
주의할 것은 상기 스크립트는 마스터 노드에서 실행해야한다. 슬레이브 노드에서 각각 실행해서는 안된다.

클러스터의 설정은 conf/spark-env.sh에 있는 환경 변수에 값을 설정하여 선택적으로 수행할 수 있다.
이 파일은 conf/spark-env.sh.template를 복사하여 모든 워커 머신에 설정값을 지정하면 된다.

환경변수 목록 : 
Environment VariableMeaning
SPARK_MASTER_IP특정 IP주소에 마스터를 바인딩한다. 
SPARK_MASTER_PORT다른 포트로 마스터를 실행한다. (기본 : 7077)
SPARK_MASTER_WEBUI_PORT마스터 web UI용 포트 (기본:8080)
SPARK_MASTER_OPTS마스터에 설정될 옵션들을 설정한다.
형식 : "-Dx=y" 의 형식으로 설정한다.
아래 가능한 옵션 참조. 
SPARK_LOCAL_DIRS스파크가 사용할 디렉토리를 지정한다. 이것은 map 출력파일, RDD 등을 저장한다. 성능을 위해서는 시스템 내의 로컬 디스크에 저장하는것이 좋다. 콤마를 이용하여 서로다른 디스크상의 디렉토리를 나열할 수 있다. 
SPARK_WORKER_CORES스파크 어플리케이션이 머신내에서 사용할 총 코어의 개수를 지정한다. (기본값 : all avaliable cores)
SPARK_WORKER_MEMORY스파크 어플리케이션이 머신내에서 사용할 총 메모리의 양을 지정한다.
형식 : 1000m, 2G (기본적으로 총 메모리 - 1G를 설정한다.)
각 어플리케이션들의 개별적인 메모리 설정은 spark.executor.memoryproperty를 이용하여 설정할 수 있다. 
SPARK_WORKER_PORT특정 포트로 스파크 워커를 실행한다. (기본값 : random)
SPARK_WORKER_WEBUI_PORT워커의 web UI를 수행할 포트를 지정한다. (기본값 : 8081)
SPARK_WORKER_INSTANCES각 머신에서 수행할 워커 인스턴스의 개수를 지정한다. (기본 : 1)
1 이상을 설정할 수 있으며, 매우 큰 머신에서 이용하거나, 복수개의 워커 프로세스를 두고자 할때 이 값을 증가 시킬 수 있다.
이 값을 변경한경우에는 SPARK_WORKER_CORES 를 함께 확인할 필요가 있다. 이것은 각 워커의 워커를 명시적으로 제한하기 때문이다.
혹은 각 워커는 모든 코어를 사용할 수 있다. 
SPARK_WORKER_DIR어플리케이션들을 수행할 디렉토리를 지정한다. 이곳에는 로그와 워커 작업 공간으로 이용된다. (기본 : SPARK_HOME/work)
SPARK_WORKER_OPTS워커에 대한 설정값을 지정하고자 할때 사용한다.
형식 : "-Dx=y"
아래 옵션을 참조하자. 
SPARK_DAEMON_MEMORY스파크 마스터와 워커 데몬 자체에 할당할 메모리를 지정한다. (기본 : 1g)
SPARK_DAEMON_JAVA_OPTSJVM옵션으로 스파크 마스터와 워커 데몬 자체에 설정하는 옵션값이다.
형식 : "-Dx=y"의 형태이다. 
SPARK_PUBLIC_DNSpublic DNS명을 지정한다. 
* 참고 : launch script는 window에서는 적용되지 않는다. 윈도우에서 실행하기 위해서는 수동으로 직접 설정해야한다.

SPARK_MASTER_OPTS 옵션 : 
Property NameDefaultMeaning
spark.deploy.retainedApplications200디스플레이할 완료된 어플리케이션의 총 개수를 지정한다. 오래된 어플리케이션은 UI에서 이 제한에 걸려 나타나지 않을 것이다. 
spark.deploy.retainedDrivers200디스프레이할 완료된 드라이버들의 최대 개수를 지정한다. 오래된 드라이버는 UI에서 이 제한에 걸려 나타나지 않을 것이다. 
spark.deploy.spreadOuttrue스탠드 얼론 클러스터 매니저는 노드들로 어플리케이션을 분산하게 될 것이다. 아니면 가능하면 몇개의 노드들로 통합해서 처리하게 된다. 분산 하는 작업은 보통 HDFS에서 더 낳은 데이터 로컬리티를 위해 이용된다. 그러나 통합 작업은 계산에 더 집중적인 작업을 위해서 효과적이다. 
spark.deploy.defaultCores(infinite)만약 spark.cores.max를 지정하지 않은경우라고 하면 스파크 스탠드 얼론에서 어플리케이션에 할당할 기본 코어수를 지정하도록 한다. 설정되지 않았다면 어플리케이션은 항상 모든 가능한 코어를 spark.cores.max에 할당한다. 공유된 클러스에 이 값을 낮게 지정하여 전체 클러스터를 사용자가 잡아버리는 것을 막기 위해 이용할 수 있다. 
spark.worker.timeout60만약 마스터가 허트비트를 수힌하지 못한경우 얼마의 시간이 지나면 워커가 유실 되었다는 것을 파악할지 지정하는 시간이다. (초단위)
SPARK_WORKER_OPTS 옵션 : 
Property NameDefaultMeaning
spark.worker.cleanup.enabledfalseworker의 /application 디렉토리를 주기적으로 클린업 할지 여부를 설정한다. 이것은 오직 standalone모드에서만 효과가 있는 설정값이다. WARN은 다르게 동작한다. 이것은 오직 정지된 어플리케이션의 디렉토리만을 클린업 하게 된다. 
spark.worker.cleanup.interval1800 (30 minutes)로컬머신에서 오래된 어플리케이션 작업의 디렉토리를 클린업 할 인터벌 시간을 나타낸다. 초단위이다. 
spark.worker.cleanup.appDataTtl7 * 24 * 3600 (7 days)각 워커에서 유지해야할 작업 디렉토리의 TTL을 지정한다. 초단위이며 이것은 Time To Live이며, 가능한 디스크의 용량에 의존하게 된다. 어플리케이션 로그와 jars들이 어플리케이션 워크 디렉토리에 다운로드 된다. 시간이 지나면 work dirs들은 빠르게 디스크 공간을 차지해 버리게 된다. 특히 job들을 매우 빈번하게 실행하면 말이다. 

3. 어플리케이션에서 Cluster로 커넥팅 하기

스파크 클러스터에서 어플리케이션을 수행하는 것은 단순히 spark://IP:PORT 를 SparkContext constructor에 전달하는 것으로 가능하다.
스파크 쉘이 클러스터와 상호 인터렉션 하는 것은 다음 커맨드로 가능하다.

./bin/spark-shell --master spark://IP:PORT
또한 옵션을 전달 할 수 있으며 --total-executor-cores <numCores> 를 지정하여 스파크 쉘이 클러스터에서 사용할 코어의 개수를 설정하게 할 수 있다.

4. Spark Application들 런칭하기. 

spark-submit 스크립트는 직관적으로 컴파일된 스파크 어플리케이션을 클러스터로 서밋 하게 할 수 있다.
standalone 클러스터들에서는 현재 제공되는 2가지 모드가 있다. client모드에서 드라이버는 동일한 프로세스에서 런치 되며, 클라이언트 어플리케이션에서 서밋이 된다. cluster모드에서 드라이버는 클러스터내의 하나의 워커 프로세스들의 하나로 부터 런치되며, 클라이언트 프로세스는 바로 어플리케이션의 서밋 작업에 대한 응답을 수행한다. 이는 어플리케이션이 종료되기 까지 대기하지 않고 수행된다.

만약 어플리케이션이 Spark 서밋을 통해서 런치되며 어플리케이션 jar는 자동적으로 모든 워커 노드로 배포된다. 어플리케이션에 의존된 다른 jar들은 --jars옵션에 콤마로 분리하여 (ex --jars jar1,jar2) 참조 가능하다.
어플리케이션의 설정 혹은 실행 환경을 컨트롤 하는 것은 Spark Configuration을 참조하자.

추가적으로 스탠드 얼론 클러스터 모드는 0이 아닌 값으로 종료 되었을때 자동으로 재 시작 하는 모드를 제공한다. 이러한 기능을 이용하기 위해서는 --supervise플래스에서 spark-submit를 전달하면 되며, 이는 어플리케이션 실행시에 전달된다. 그리고 반복되는 실패때문에 어플리케이션을 종료하고자 한다면 다음과 같이 수행하면 된다.

./bin/spark-class org.apache.spark.deploy.client kill <master url> <driver ID>
드라이버 아이디는 http://<master url>:8080에서 web UI를 이용하여 해당 아이디를 찾을 수 있다.

5. Resource Scheduling 

standalone 클러스터 모드에서는 현재 단순 FIFO 스케줄러만을 제공하고 있다.
그러나 복수의 동시 이용자를 허용하기 위해서 각 어플리케이션 이 사용할 최대 리소스의 개수를 조정할 수 있다. 기본적으로 이것은 클러스터 내의 모든 코어를 필요로 하게 되지만 한번에 하나의 어플리케이션만을 실행할때에만 유효하다. 이 설정은 spark.cores.max 를 SparkConf내에서 지정해줄 수 있다.

예제 )
val conf = new SparkConf()
            .setMaster(...)
            .setAppName(...)
            .set("spark.cores.max", "10")
val sc = new SparkContext(conf)
추가적으로 spark.deploy.defaultCores를 클러스터 마스터 프로세스에서 설정할 수 있다.
이 설정은 conf/spark-env.sh에서 다음과 같이 설정할 수 있다.
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
이것은 공유 클러스터 상에서 유용하다. 사용자가 개인적으로 최대수의 cores를 지정하지는 않을때 말이다.

6. Monitoring and Logging

Spark 스탠드얼론 모드는 웹 기반의 사용자 인터페이스를 제공하여 클러스터 모니터링을 수행할 수 있도록 한다. 마스터와 각 워커들은 각자의 web UI를 가지고 있으며 각 클러스터와 잡들을 볼수 있다. 기본적으로 웹 UI에 8080을 통해서 접근할 수 있다. 포트는 설정 파일에서 변경하거나, 수행시 파라미터를 통해서 설정이 가능하다.
추가적으로 각 잡의 상세 로그는 각 슬레이브 노드의 워크 디렉토리에 저장된다. 기본적으로 SPARK_HOME/work디렉토리에 저장된다. 또한 잡들에 대해서 2개의 파일이 존재하며 stdout과 stderr로 저장이 된다.

7. Running Alongside Hadoop

존재하는 하둡을 이용하고자 하는 경우에는 단순히 동일한 머신에서 분리된 서비스로 이용하면 된다. Hadoop 데이터를 스파크에서 엑세스 하기 위해서는 hdfs://URL (보통 : hdfs://<namenode>:9000/path 이다. ) 반대로 완젼 다른 서버에서 하둡과 연동할 수 있다. 이때에는 네트워크를 통해서 접속할 수 있으며, 로컬 위치의 데이터 엑세스에 비해서 성능은 떨어진다.

8. Configuring Ports for Network Security

 스파크는 네트워크 트래픽을 많이 발생 시킨다. 또한 어떠한 시스템에서는 파이어월 설정을 이용하기 위해서 제한된 요구사항을 가지기도 한다. 이러한 설정을 확인하려면 다음 경로를 찾아가자. http://spark.apache.org/docs/latest/security.html#configuring-ports-for-network-security


9. High Availability

기본적으로 스탠드얼론 스케줄링 클러스터는 Worker의 실패들에 대해서 회복력이 있다. 그러나 스케줄러는 Master를 이용하여 스케줄링 결정을 내린다. 그리고 기본적으로 이것은 단일 실패지점 (SPOF)가 된다. 만약 마스터가 크래쉬되면, 어떠한 새로운 어플리케이션도 생성되지 않는다. 이러한 문제를 극복하기 이ㅜ해서 2개의 high availability 스키마를 아래와 같이 구성한다.

10. Standby Masters with ZooKeeper

Overview
zookeeper의 주 용도는 리더의 선출과 상태를 저장하는 것이다. 사용자는 복수개의 마스터를 클러스터에 설정하여 동일한 ZooKeeper인스턴스에 연결할 수 있다. 한대는 "leader"로 선출되고 나머지는 standby mode로 마스터를 구성한다. 만약 현재 leader가 죽으면, 다른것이 Master가 된다. 그리고 마스터의 상태를 복구하게 되고 동일한 스케줄링을 다시 수행하게 된다.
전체 복구 프로세스는 아마도 처음 마스터가 죽고 나서 부터 약 1 ~ 2분 정도 소요된다.
이 딜레이는 새로운 어플리케이션 스케줄링에만 영향을 준다. 어플리케이션들은 마스터 failover를 수행하는 동안 계속해서 수행되고 있는 상태가 된다.

Zookeeper에 대해서 더 많은 것을 보려면 http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html 을 참조하자.

Configuration
스파크 복구 모드는 다음과 같이 spark-env스크립트 내에 SPARK_DAEMON_JAVA_OPTS 에 설정할 수 있다.
System propertyMeaning
spark.deploy.recoveryModezookeeper를 standby 모드로 설정하도록 한다. (기본 : NONE)
spark.deploy.zookeeper.urlZooKeeper 클러스터 url을 설정한다.
예) 192.168.1.100:2181,168.1.101:2181
spark.deploy.zookeeper.dirThe directory in ZooKeeper to store recovery state (default: /spark).
멀티플 마스터를 가지고 있는 상태에서 마스터가 주키퍼를 이용하는 설정에 실패를 하게 되면 마스터는 서로를 확인하는 작업에 실패하게 되며, 모든 마스터들이 리더라고 생각하게 된다. 이때에는 올바른 클러스터 상태를 이용하기 어렵게 된다. (모든 마스터들은 독립적으로 스케줄링 하게 된다.)

Details
주키퍼 설정을 완료하고 나면 high availability를 기동할 수 있다. 단순히 복수개의 마스터를 서로다른 노드에서 실행하고 동일한 주키퍼 설정을 가지고 있으면 된다. (ZooKeeper URL과 디렉토리 설정). 마스터들을 언제든지 추가나 제거 할 수 있다.
새로운 어플리케이션을 스케줄 하거나 클러스터에 worker를 추가할때에는 현재 Master의 IP를 알아야 한다. 이것은 단순히 마스터 리스트에서 하나에다가 전달 하는 것으로 가능하다. 예를 들어 SparkContext를 다음과 같이 마스터의 위치를 포인트 할 수 있다. spark://host1:port1,host2:port2 로 지정하는 것이다.
이렇게 하면 SparkContext는 두개의 마스터에 접근을 시도한다. host1이 내려가 있으면 이 설정은 새로운 리더를 찾을 것이며 이것이 host2가 됨을 알려 줄 것이다.

"registering with a Master"과 normal operation사이에는 중요한 구별 되는 점이 있다. 어플리케이션이나 워커가 실행될때 현재의 마스터를 찾거나 등록하는 작업이 필요하다. 이 작업이 한번 성공적으로 수행되고 나면 이 내용을 Zookeeper에 저장한다. 만약 failover가 발생하면 새로운 리더가 모든 등록된 어플리케이션들과 Workers에 접근을 시도하며, 이후 모든 클러스터에게 새로운 리더를 알린다.

이러한 속성으로 인해서 새로운 마스터는 언제든지 생성될 수 있다, 고민해야할 것은 오직 새로운 어플리케이션과 워커들이 새로운 리더를 찾고, 이를 등록하는 작업만 하면 된다. 한번 등록하면 언제든지 이용할 수 있다.

11. Single-Node Recovery with Local File System

Overview
주키퍼는 high availability를 위한 가장 좋은 방법을 제공한다. 그러나 마스터가 다운되었을때 단지 리스타트를 하기를 원할 수 있다. FILESYSTEM모드는 이러한 작업을 할 수 있다. 어플리케이션과 워커가 등록될때 제공된 디렉토리에 상태를 저장하고, 이것은 마스터 프로세스가 리스타트 될때 이용 할 수 있다.

Configuration
복구 모드를 기동하기 위해서는 spark-env 설정파일의 SPARK_DAEMON_JAVA_OPTS에 지정할 수 있다.
System propertyMeaning
spark.deploy.recoveryModeFILESYSTEM을 설정하여 단일 노드 복구 모드를 실행할지 지정한다.
(기본 :  ㅜNONE)
spark.deploy.recoveryDirectorySpark가 복구 상태를 저장하고 접근할수있는 상태를 저장할 디렉토리를 지정한다. 
Detail 
  • This solution can be used in tandem with a process monitor/manager like monit, or just to enable manual recovery via restart.
  • While filesystem recovery seems straightforwardly better than not doing any recovery at all, this mode may be suboptimal for certain development or experimental purposes. In particular, killing a master via stop-master.sh does not clean up its recovery state, so whenever you start a new Master, it will enter recovery mode. This could increase the startup time by up to 1 minute if it needs to wait for all previously-registered Workers/clients to timeout.
  • While it’s not officially supported, you could mount an NFS directory as the recovery directory. If the original Master node dies completely, you could then start a Master on a different node, which would correctly recover all previously registered Workers/applications (equivalent to ZooKeeper recovery). Future applications will have to be able to find the new Master, however, in order to register.