[Spark] Quick Start

Spark Quick Start

Interactive Analysis with the Spark Shell

기본 : 

스파크 쉘을 다음과 같이 실행하자. 이는 대화형 데이터 분석을 위한 강력한 툴이다.
./bin/pyspark
스파크의 중요한 추상화는 Resilient Distributed Dataset(RDD)라고 부르는 분산 컬렉션이다.
RDD는 Hadoop 입력 포맷 이나 transformation등을 통해서 생성된다. 스파크 패키지에 존재하는 README 파일을 읽어 들이는 예제를 보자.
>>> textFile = sc.textFile("README.md")
RDD는 actions과 transformations를 가진다. action은 반환 값을 가지며, transformation은 새로운 RDD를 생성한다.
다음 몇개의 액션들을 보자.
>>> textFile.count()       # 이 RDD에서 아이템의 총 개수를 센다.
126
>>> textFile.first()          # 이 RDD에서 첫번째 아이템을 출력한다.
u'# Apache Spark'
다음은 transformation을 사용한 예제이다.
이것은 filter transformation을 통해서 새로운 RDD를 생성한다. 이는 file의 subset이다.
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
그리고 transformations과 actions의 체인을 걸어줄 수 있다.
>>> textFile.filter(lambda line: "Spark" in line).count()   # "Spark"를 포함하는 라인이 몇개인가?
15

RDD 연산 더보기 

RDD actions와 transformations는 더 복잡한 계산을 위해서 사용이 가능하다.
가장 긴 단어의 라인을 찾는다고 해보자.
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
15
이 첫번째 맵은 라인을 정수 값으로 하는 새로운 RDD를 생성한다. reduce는 가장 긴 라인 카운트를 찾는데 호출된다.map 아규먼트와 recude아규먼트는 (lambda)로 anonymous functions이다. 우리는 최상위 레벨의 파이선 함수를 전달할 수도 있다. 예를 들어 우리는 max함수를 이해하기 쉽게 다음과 같이 만들 수 있다.

>>> def max(a, b) :
...            if a > b:
...                return a
...            else:
...                return b
>>> textFile.map(lambda line: len(line.split())).reduce(max)
15
하나는 Hadoop에 잘 알려진 데이터 흐름 패턴인 MapReduce이다. Spark는 다음과 같이 쉽게 MapReduce를 구현할 수 있다.
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
여기에서 우리는 flatMap, map과 reduceByKey transformations를 서로 연결하여 RDD에 존재하는 각 단어의 개수를 새로운 RDD로 (string, int) 쌍으로 만들어 낸다.
우리에 쉘에서는 단어 수를 세는 작업은 collect 액션을 이용하여 구현한다.
>>> wordCounts.collect()
[[(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1), (u'"local"', 1), (u'variable', 1), ...]

Caching

Spark는 또한 여러 클러스들에 넓게 메모리 캐시에 데이터를 넣을 수 있는 기능을 제공한다. 이것은 데이터를 반복적으로 접근할때 매우 유용한 기술이다. 작은 "hot" 데이터셋을 쿼리 할때나, PageRank와 같은 알고리즘을 반복적으로 수행하고자 할때 유용하다. 다음은 linesWithSpark데이터 셋을 캐시 하는 예이다.

>>> linesWithSpark.cache()
>>> linesWithSpark.count()
19
>>> linesWithSpark.count()
19

만약 100라인짜리 텍스트 파일을 캐시한다면 매우 우스운 일일 것이다. 흥미로운 것은 이러한 동일한 함수를 통해서 매우 큰 데이터 셋을 사용할 수 있다는 것이다. 비록 이들이 10개에서 수백개의 노드에 흩어져 있더라도 말이다. 또한 bin/pyspark를 통해서 클러스터에 연결하여 이러한 작업을 수행할 수 있다. 이는 프로그래밍 가이드를 참조하자. https://spark.apache.org/docs/latest/programming-guide.html#initializing-spark

Self-Contained Applications

우리는 Spark API를 이용하여 self-contained application을 작성할 수 있다. 
다음 SimpleApp.py 파일을 살펴보자. 

"""SimpleApp.py"""
from pyspark import SparkContext
logFile = "YOU_SPARK_HOME/README.md"
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
이 프로그램은 텍스트 파일에서 각 라인중에 a와 b를 포함한 라인의 수를 카운트 한다. YOUR_SPARK_HOME은 인스톨된 스파크의 위치를 지정하면 된다. SparkContext는 RDD를 생성하기 위해서 사용된다. 또한 Spark에 Python 함수를 전달할수 있다. 이것은 자동으로 해당 함수를 참조하도록 시리얼라이즈 된다. 어플리케이션에서 사용하기 위해 커스텀 클래스혹은 서드파티 라이브러리를 이용할 수 있다. 또한 이 외부 어플리케이션을 실행하기 위해서 spark-submit를 호출할 수 있다. 자세한 내용은 spark-submit --help를 통해서 사용법을 확인해보자. 

다음 과 같이 함수를 실행해보자. 
# spark-submit 를 실행하여 어플리케이션을 실행한다. 
$ YOUR_SPARK_HOME/bin/spark-submit \
   --master local[4] \
   SimpleApp.py
...
Lines with a: 46, Lines with b: 23



Share this

Related Posts

Previous
Next Post »