Spark - WordCount 예시

2024. 5. 14. 00:01SPARK

Spark를 설치하지 않았다면 spark부터 다운하자

https://spark.apache.org/

 

Apache Spark™ - Unified Engine for large-scale data analytics

Run now Install with 'pip' $ pip install pyspark $ pyspark Use the official Docker image $ docker run -it --rm spark:python3 /opt/spark/bin/pyspark QuickStart Machine Learning Analytics & Data Science df = spark.read.json("logs.json") df.where("age > 21").

spark.apache.org

 

다운 후 버전 확인해보기

$spark-submit --version

 

<주의>

$gradle wrapper --> 1. gradle 2. gradlew 이 두가지가 있어야 다른곳에서 compile가능하니 꼭 이 명령어를 쳐주자.

$gradlew build  --> build.gradle을 생성하는 명령어니 꼭 쓰도록 하자.

 

<WordCount 코드>

Rdd transformation, action 흐름 - transformation

1. words가 작성되어 있는 파일 읽어오기 - transformation

2. 텍스트 파일을 구분자('|' 또는 ';' 또는 '&')로 자르기 - transformation

3. 각 word들이 key가 되고 value는 1로 초기화 후 tuple로 변경 - transformation

4. 같은 key들 전부 aggregation 진행 - transformation

5. 결과는 다시 텍스트 파일로 저장 - action

 

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;

public class Main {
    public static void main(String[] args) {
        // SparkContext 생성
        JavaSparkContext sc = new JavaSparkContext("local", "WordCount");

        // 1. 텍스트 파일 읽어오기 - transformation
        JavaRDD<String> lines = sc.textFile("input.txt");

        // 2. 텍스트 파일을 구분자('|' 또는 ';' 또는 '&')로 자르기 - transformation
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split("[|;&]")).iterator();
            }
        });

        // 3. 각 word들이 key가 되고 value는 1로 초기화 후 tuple로 변경 - transformation
        JavaRDD<Tuple2<String, Integer>> wordTuple = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word.trim(), 1);
            }
        });

        // 4. 같은 key들 전부 aggregation 진행 - transformation
        JavaRDD<Tuple2<String, Integer>> wordCounts = wordTuple.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });

        // 5. 결과를 다시 텍스트 파일로 저장 - action
        wordCounts.map(new Function<Tuple2<String, Integer>, String>() {
            @Override
            public String call(Tuple2<String, Integer> tuple) throws Exception {
                return tuple._1 + ": " + tuple._2;
            }
        }).saveAsTextFile("output");

        // SparkContext 종료
        sc.stop();
    }
}

 

spark 실행법

 

'SPARK' 카테고리의 다른 글

Spark - RDD 다루기  (0) 2024.05.13