Apache Spark Examples
from : http://spark.apache.org/examples.html이 예제들은 Spark API의 전체를 간략하게 보여준다. Spark는 분산 데이터셋의 개념으로 만들어 져있다. 이는 Java, Python 객체들을 포함하고 있다. 외부 데이터로 부터 데이터셋을 생성하고 병렬로 그것들을 처리할 수 있다. Spark API의 building block 을 확인해보자. RDD API에는 2가지 타입의 오퍼레이션을 제공하고 있다. transformation으로 이전것으로 부터 새로운 데이터셋을 만들어 내는 작업을 한다. 다음으로 actions로 클러스터 상에서 작업을 시작하는 것이다. Spark의 RDD API의 최상위에는 고수준의 API를 제공하며 DataFrame API와 Machine Learning API가 그것이다. 이 고수준 API는 특정 데이터 오퍼레이션에 대한 간단한 방법으로 작업을 수행할 수 있도록 제공한다. 이 페이지에서는 RDD 고수준 API를 이용한 간단한 예제를 살펴볼 것이다.
RDD API Examples
Word Count
이 예제는 몇가지 transformation으로 데이터셋을 만들고 난 후, (String, Int)쌍의 데이터셋을 만들어 내는 작업을 한다. 이는 count라고 부른다. 그리고 다음으로 결과를 파일로 저장한다.1. Python 버젼 :
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b : a + b)
counts.saveAsTextFile("hdfs://...")
2. Scala 버젼 :
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
3. Java 버젼 :
JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) { return new Tuple2<String, String>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){
public Integer call(Integer a, Integer b) {return a + b;}
});
counts.saveAsTextFile("hdfs://...");
Pi Estimation
Spark는 계산에 집중적인 태스크들에 사용된다. 이 코드는 원에 "throwing darts"를 수행해서 파이를 계산하는 예제이다. 우리는 ((0, 0) to (1, 1)) 사각 정방영역에 랜덤 포인트를 찍고, 얼마나 많이 단위 단위 원 내에 들어가는지 검사하고자 한다. fraction은 Pi / 4이며, 우리는 이 공식을 우리의 계산에 이용할 것이다.
1. Python 버전 :
def sample(p) :
x, y = random(), random()
return 1 if x*x + y*y < 1 else 0
count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
.reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
2. Scala 버젼 :
val count = sc.parallelize(1 to NUM_SAMPLES).map{ i =>
val x = Math.random()
val y = Math.random()
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
3. Java 버젼 :
List<Integer> l = new ArrayList<Integer>(NUM_SAMPLES);
for (int i = 0; i < NUM_SAMPLES; i++) {
l.add(i);
}
long count = sc.parallelize(l).filter(new Function<Integer, Boolean>() {
public Boolean call(Integer i) {
double x = Math.random();
double y = Math.random();
return x * x + y * y < 1;
}
}).count();
System.out.println("Pi is roughly " + 4.0 * count / NUM_SAMPLES);
DataFrame API Examples
Spark에서 DataFrame는 이름으로 된 칼럼 조합의 분산 데이터 집합이다. 사용자는 DataFrame API를 이용하여 외부 데이터 소스와 스파크의 내장 분산 컬렉션에 대해서 다양한 관계 연산을 수행할 수 있다. 이때 데이터 처리를 위한 별도의 프로시저 없이 이러한 연산이 가능하다. 또한 DataFrame API기반의 프로그램은 자동적으로 스파크 내장 옵티마이저에 의해서 최적화 된다.
Text Search
이 예제에서는 로그파일에서 에러 메시지를 찾는 예제이다.
1. Python 버젼
textFile = sc.textFile("hdfs://...")
# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r : Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).count()
2. Scala 버젼
val textFile = sc.textFile("hdfs://...")
// Creates a DataFrame having a single column named "line"
val df = textFile.toDF("line")
val errors = df.filter(col("line").like("%ERROR%"))
// Counts all the errors
errors.count()
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
// Fetches the MySQL errors as an array of Strings
errors.filter(col("line").like("%MySQL%")).collect()
3. Java 버젼
// Creates a DataFrame having a single column named "line"
JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaRDD<Row> rowRDD = textFile.map(
new Function<String, Row>() {
public Row call(String line) throws Exception {
return RowFactory.create(line);
}
});
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("line", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
DataFrame errors = df.filter(col("line").like("%ERROR%"));
// Counts all the errors
errors.count();
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count();
// Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect();
Simple Data Operations
이번 예제는 데이터베이스에 있는 테이블을 읽고 각 나이별 사람의 수를 계산하는 예제이다. 최종적으로 우리는 계산된 결과를 JSON타입으로 S3에 저장한다. 단순한 MySQL테이블 "people"에는 "name"과 "age" 2개의 칼럼이 있다.
1. Python 버젼
# Creates a DataFrame based on a table named "people"
# stored in a MySQL database
url = \
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", "people") \
.load()
# Looks the schema of this DataFrame
df.printSchema()
# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()
# Save countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
2. Scala 버젼
// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
val url =
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
val df = sqlContext
.read
.format("jdbc")
.option("url", url)
.option("dbtable", "people")
.load()
// Looks the schema of this DataFrame.
df.printSchema()
// Counts people by age
val countsByAge = df.groupBy("age").count()
countsByAge.show()
// Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
3. Java 버젼
// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
String url =
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword";
DataFrame df = sqlContext
.read()
.format("jdbc")
.option("url", url)
.option("dbtable", "people")
.load();
// Looks the schema of this DataFrame.
df.printSchema();
// Counts people by age
DataFrame countsByAge = df.groupBy("age").count();
countsByAge.show();
// Saves countsByAge to S3 in the JSON format.
countsByAge.write().format("json").save("s3a://...");
Machine Learning Example
MLib는 Spark 머신러닝 라이브러리이다. 이는 많은 ML알고리즘을 제공해주고 있다. 이 알고리즘들은 extraction, classification, regression, clustering, recommendation, 등을 제공한다. MLlib 는 또한 워크플로우 구성을 위한 파이프라인을 제공하며, 튜닝 파라미터를 위한 CrossValidator, 그리고 모델의 저장 및 로드를 위한 퍼시스턴스 모델을 제공한다.
Prediction with Learning Example
이 예제에서는 labels과 feature 벡터들의 데이터셋을 획득하고, feature 벡터들에서 부터 labels를 예측하는 학습 수행할 것이다. 이는 Logistic Regression 알고리즘을 이용한다.
1. Python 버젼
# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])
# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)
# Fit the model to the data.
model = lr.fit(df)
# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
2. Scala 버젼
// Every record of this DataFrame contains the label and
// features represented by a vector.
val df = sqlContext.createDataFrame(data).toDF("label", "features")
// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
val lr = new LogisticRegression().setMaxIter(10)
// Fit the model to the data.
val model = lr.fit(df)
// Inspect the model: get the feature weights.
val weights = model.weights
// Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
3. Java 버젼
// Every record of this DataFrame contains the label and
// features represented by a vector.
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
LogisticRegression lr = new LogisticRegression().setMaxIter(10);
// Fit the model to the data.
LogisticRegressionModel model = lr.fit(df);
// Inspect the model: get the feature weights.
Vector weights = model.weights();
// Given a dataset, predict each point's label, and show the results.
model.transform(df).show();
EmoticonEmoticon