Spark - WordCount 예시
2024. 5. 14. 00:01ㆍSPARK
Spark를 설치하지 않았다면 spark부터 다운하자
다운 후 버전 확인해보기
$spark-submit --version
<주의>
$gradle wrapper --> 1. gradle 2. gradlew 이 두가지가 있어야 다른곳에서 compile가능하니 꼭 이 명령어를 쳐주자.
$gradlew build --> build.gradle을 생성하는 명령어니 꼭 쓰도록 하자.
<WordCount 코드>
Rdd transformation, action 흐름 - transformation
1. words가 작성되어 있는 파일 읽어오기 - transformation
2. 텍스트 파일을 구분자('|' 또는 ';' 또는 '&')로 자르기 - transformation
3. 각 word들이 key가 되고 value는 1로 초기화 후 tuple로 변경 - transformation
4. 같은 key들 전부 aggregation 진행 - transformation
5. 결과는 다시 텍스트 파일로 저장 - action
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class Main {
public static void main(String[] args) {
// SparkContext 생성
JavaSparkContext sc = new JavaSparkContext("local", "WordCount");
// 1. 텍스트 파일 읽어오기 - transformation
JavaRDD<String> lines = sc.textFile("input.txt");
// 2. 텍스트 파일을 구분자('|' 또는 ';' 또는 '&')로 자르기 - transformation
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split("[|;&]")).iterator();
}
});
// 3. 각 word들이 key가 되고 value는 1로 초기화 후 tuple로 변경 - transformation
JavaRDD<Tuple2<String, Integer>> wordTuple = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<>(word.trim(), 1);
}
});
// 4. 같은 key들 전부 aggregation 진행 - transformation
JavaRDD<Tuple2<String, Integer>> wordCounts = wordTuple.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a + b;
}
});
// 5. 결과를 다시 텍스트 파일로 저장 - action
wordCounts.map(new Function<Tuple2<String, Integer>, String>() {
@Override
public String call(Tuple2<String, Integer> tuple) throws Exception {
return tuple._1 + ": " + tuple._2;
}
}).saveAsTextFile("output");
// SparkContext 종료
sc.stop();
}
}
spark 실행법
'SPARK' 카테고리의 다른 글
Spark - RDD 다루기 (0) | 2024.05.13 |
---|