Machine Learning on Spark using Java
References
- Download Apache Spark
- MLlib is a built-in library of Spark
- Spark supports Python, Scala, and Java
- Spark Programming Guide
- Spark Configuration
- Spark Java Examples
- MLlib Guide
- Apache Spark Machine Learning Tutorial
- Spark Java API doc
- Sampling Large Datasets using Spark
Startup
A Simple Application (Word Count)
Sample Code
/* WordCount.java */
/**
* Illustrates a wordcount in Java
*/
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class WordCount {
    private static Logger logger = Logger.getLogger(WordCount.class);

    public static void main(String[] args) throws Exception {
        String inputFile = args[0];
        String outputPath = args[1];
        // Create a Java Spark Context.
        SparkConf conf = new SparkConf().setAppName("Simple Project");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Load our input data.
        JavaRDD<String> input = sc.textFile(inputFile);
        // Split each line into words. Since Spark 2.x, FlatMapFunction.call returns an Iterator.
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                return Arrays.asList(x.split(" ")).iterator();
            }
        });
        // Transform each word into a <word, one> pair; cache() keeps the pair RDD in memory in case it is reused.
        JavaPairRDD<String, Integer> word_one = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        }).cache();
        // Sum the ones for each word.
        JavaPairRDD<String, Integer> counts = word_one.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        List<Tuple2<String, Integer>> result = counts.collect();
        counts.saveAsTextFile(outputPath);
        for (Tuple2<String, Integer> r : result)
            logger.info(r);
        sc.stop();
    }
}
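For comparison, the same pipeline can be written far more compactly with Java 8 lambda expressions, which Spark 2.x accepts wherever the function interfaces above are expected. The following is an illustrative sketch only; the class name WordCountLambda and the reuse of args[0]/args[1] are assumptions, not part of the project above.
/* WordCountLambda.java (illustrative sketch) */
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class WordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Project");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(args[0]);
        // Each lambda replaces one of the anonymous classes used in WordCount above.
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairRDD<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);
        counts.saveAsTextFile(args[1]);
        sc.stop();
    }
}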
<!--pom.xml-->
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
</project>
- Note that the name passed to SparkConf().setAppName in the main Java class is the application name shown in the Spark UI; here it matches project->name in pom.xml, but the two are not required to be identical.
- Note that project->dependencies in pom.xml must provide, directly or transitively, every library imported in our Java classes.
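Depending on your JDK and Maven defaults, you may also need to tell the Maven compiler plugin to target Java 8 (recommended for Spark 2.x, and required for the lambda variant sketched earlier). This is an optional addition to pom.xml using the standard Maven compiler properties:
<!-- Optional: compile for Java 8 (adjust to your JDK) -->
<properties>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
</properties>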
Canonical Maven directory structure
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/WordCount.java
Package the application using Maven
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
Prepare input file and output environment
# Make input directory
$ hadoop fs -mkdir test/input
# Copy input file(s) from local to remote
$ hadoop fs -copyFromLocal ./input/data test/input
# Remove any existing output directory (saveAsTextFile fails if the target already exists)
$ hadoop fs -rm -r test/output
Execute it with spark-submit
# Use spark-submit to run your application
$ spark-submit \
  --class "WordCount" \
  --master local[4] \
  target/simple-project-1.0.jar \
  test/input/data test/output
- Note that the name of your main Java class goes after --class, and that the input and output paths are passed as application arguments after the JAR.
- Note that we run with local[4], i.e. locally with 4 worker threads, which represents "minimal" parallelism.
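To run the same JAR on a cluster instead, only the --master (and optionally --deploy-mode) arguments change. A hedged sketch, assuming a YARN cluster is available and HADOOP_CONF_DIR points at its configuration:
# Run the same application on YARN instead of a local master
$ spark-submit \
  --class "WordCount" \
  --master yarn \
  --deploy-mode cluster \
  target/simple-project-1.0.jar \
  test/input/data test/output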
View and download the output files
# List the output files
$ hadoop fs -ls test/output
# View the output files
$ hadoop fs -cat test/output/part-*
# Download output files from remote to local
for i in `seq 0 10`;
do
  hadoop fs -copyToLocal test/output/part-$(printf "%05d" $i) ./
done
- Note that you can adjust seq 0 10 to match the number of output part files (saveAsTextFile writes one zero-padded part-* file per partition).
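Alternatively, if you simply want all of the output in a single local file, hadoop fs -getmerge concatenates every part file in one step (the local file name counts.txt below is arbitrary):
# Merge all remote part files into one local file
$ hadoop fs -getmerge test/output ./counts.txt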