Startup

A Simple Application (Word Count)

Sample Code

/* SimpleApp.java */
/**
 * Illustrates word count in Java using Spark.
 */
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SimpleApp {
    private static final Logger logger = Logger.getLogger(SimpleApp.class);

    public static void main(String[] args) throws Exception {
        String inputFile = args[0];
        String outputPath = args[1];

        // Create a Java Spark Context.
        SparkConf conf = new SparkConf().setAppName("Simple Project");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load our input data.
        JavaRDD<String> input = sc.textFile(inputFile);

        // Split each line into words.
        // Note: in Spark 2.x, FlatMapFunction.call returns an Iterator, not an Iterable.
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                return Arrays.asList(x.split(" ")).iterator();
            }
        });

        // Transform each word into a <word, one> pair.
        JavaPairRDD<String, Integer> wordOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        }).cache();

        // Sum the counts for each word.
        JavaPairRDD<String, Integer> counts = wordOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        List<Tuple2<String, Integer>> result = counts.collect();
        counts.saveAsTextFile(outputPath);
        for (Tuple2<String, Integer> r : result)
            logger.info(r);

        // Release resources once the job is done.
        sc.stop();
    }
}
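
With Java 8, the same pipeline can be written much more compactly using lambdas. A minimal sketch of the equivalent transformations (an alternative to the anonymous classes above, assuming the build targets Java 8 or later):

// Word count with Java 8 lambdas; replaces the flatMap/mapToPair/reduceByKey
// steps above without changing behavior.
JavaRDD<String> words = input.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairRDD<String, Integer> counts = words
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((v1, v2) -> v1 + v2);
counts.saveAsTextFile(outputPath);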
<!--pom.xml-->
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>edu.berkeley</groupId>
    <artifactId>simple-project</artifactId>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
</project>
  • Note that the string passed to SparkConf().setAppName only names the application in the Spark UI and logs; it does not have to match project->name in pom.xml, though keeping them consistent avoids confusion.
  • Note that project->dependencies in pom.xml must declare every external library imported by the Java classes.

Canonical Maven directory structure

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

Package the application using Maven.

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

Prepare input file and output environment

# Make the input directory (-p also creates missing parent directories)
$ hadoop fs -mkdir -p test/input
# Copy input file(s) from local to remote
$ hadoop fs -copyFromLocal ./input/data test/input
# Remove the output directory first: saveAsTextFile fails if it already exists
$ hadoop fs -rm -r test/output

Execute it with spark-submit

# Use spark-submit to run your application
$ spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar \
  test/input test/output
  • Note that the value passed to --class is the fully qualified name of your main class (here simply SimpleApp, since the class declares no package).
  • Note that --master local[4] runs Spark locally with 4 worker threads - only minimal parallelism; pass a cluster's master URL instead for real deployments.
  • Note that arguments placed after the jar path (here test/input and test/output) reach main() as args[0] and args[1].
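
For quick local tests, the master can also be hard-coded in the application instead of being passed to spark-submit. A sketch (not recommended for production, where the master should remain a deployment-time choice):

// Local-testing alternative: set the master in code instead of via --master.
SparkConf conf = new SparkConf()
        .setAppName("Simple Project")
        .setMaster("local[4]"); // 4 local worker threads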

View and download the output files

# List the output files
$ hadoop fs -ls test/output
# View the output files
$ hadoop fs -cat test/output/part-*
# Download output files from remote to local
for i in `seq 0 10`;
do
    hadoop fs -copyToLocal test/output/part-$(printf '%05d' $i) ./
done
  • Note that you can adjust the seq 0 10 range as needed. Part files are zero-padded to five digits (part-00000, part-00001, ...), hence the printf '%05d'; simply appending $i would break once i reaches 10.

MapReduce Functions

Map

map

flatMap

mapToPair

flatMapToPair

mapValues
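
The examples below sketch each map-family transformation, assuming Spark 2.x with Java 8 lambdas and a JavaSparkContext sc as in the word count application (input data and variable names are illustrative):

JavaRDD<String> lines = sc.parallelize(Arrays.asList("to be", "or not"));

// map: exactly one output element per input element.
JavaRDD<Integer> lengths = lines.map(s -> s.length());  // [5, 6]

// flatMap: zero or more output elements per input element.
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());  // [to, be, or, not]

// mapToPair: one key-value pair per input element.
JavaPairRDD<String, Integer> pairs = words.mapToPair(w -> new Tuple2<>(w, 1));

// flatMapToPair: zero or more key-value pairs per input element.
JavaPairRDD<Character, String> initials =
        words.flatMapToPair(w -> Arrays.asList(new Tuple2<>(w.charAt(0), w)).iterator());

// mapValues: transform only the values of a pair RDD, leaving the keys untouched.
JavaPairRDD<String, Integer> doubled = pairs.mapValues(v -> v * 2);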

Filter

filter
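
A sketch of filter, reusing the words RDD from the example above:

// filter: keep only the elements for which the predicate returns true.
JavaRDD<String> shortWords = words.filter(w -> w.length() <= 2);  // [to, be, or]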

Reduce

reduce

reduceByKey
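
A sketch of the reduce operations, again reusing the RDDs above. Note the difference: reduce is an action that returns a single value to the driver, while reduceByKey is a transformation that yields another pair RDD.

// reduce: fold all elements into one value; the function must be
// commutative and associative.
Integer totalLength = lengths.reduce((a, b) -> a + b);  // 11

// reduceByKey: merge the values of each key, exactly as in the word count.
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);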

Others

groupBy

sortByKey

distinct
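
Finally, a sketch of the remaining operations under the same assumptions:

// groupBy: group elements by a computed key, yielding <key, Iterable<value>> pairs.
JavaPairRDD<Character, Iterable<String>> byInitial = words.groupBy(w -> w.charAt(0));

// sortByKey: sort a pair RDD by key (ascending by default).
JavaPairRDD<String, Integer> sorted = counts.sortByKey();

// distinct: remove duplicate elements.
JavaRDD<String> uniqueWords = words.distinct();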