Logistic Regression Deployment Using Java Spark
Preparing Data
- For details, please check https://hackmd.io/s/SkEYWCnjg
- Note 1: the target class must be in the last column.
- Note 2: the header row must be removed from the input dataset (see the cleanup sketch after the sample table below).
| | Year | Month | DayofMonth | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | ... | TaxiIn | TaxiOut | Cancelled | CancellationCode | Diverted | CarrierDelay | WeatherDelay | NASDelay | SecurityDelay | LateAircraftDelay |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2000 | 1 | 28 | 5 | 1647.0 | 1647 | 1906.0 | 1859 | HP | 154 | ... | 15 | 11 | 0 | NaN | 0 | NaN | NaN | NaN | NaN | NaN |
| 1 | 2000 | 1 | 29 | 6 | 1648.0 | 1647 | 1939.0 | 1859 | HP | 154 | ... | 5 | 47 | 0 | NaN | 0 | NaN | NaN | NaN | NaN | NaN |
| 2 | 2000 | 1 | 30 | 7 | NaN | 1647 | NaN | 1859 | HP | 154 | ... | 0 | 0 | 1 | NaN | 0 | NaN | NaN | NaN | NaN | NaN |
| 3 | 2000 | 1 | 31 | 1 | 1645.0 | 1647 | 1852.0 | 1859 | HP | 154 | ... | 7 | 14 | 0 | NaN | 0 | NaN | NaN | NaN | NaN | NaN |
| 4 | 2000 | 1 | 1 | 6 | 842.0 | 846 | 1057.0 | 1101 | HP | 609 | ... | 3 | 8 | 0 | NaN | 0 | NaN | NaN | NaN | NaN | NaN |
5 rows × 29 columns
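The linked note covers the full preparation; as a rough illustration, a Spark-side cleanup that drops the header row (Note 2), removes records that still contain NaN, and moves the label column to the end (Note 1) could look like the sketch below. The paths, the class name, and the label index are assumptions, and non-numeric columns such as UniqueCarrier would additionally need to be encoded or dropped.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PrepareData {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PrepareData");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> raw = sc.textFile("test/raw");   // hypothetical raw CSV location

        // Note 2: drop the header row; also drop rows that still contain NaN.
        String header = raw.first();
        JavaRDD<String> cleaned = raw
                .filter(line -> !line.equals(header))
                .filter(line -> !line.contains("NaN"));

        // Note 1: move the label column to the end (index 21 is an assumption).
        final int labelIndex = 21;
        JavaRDD<String> prepared = cleaned.map(line -> {
            String[] cols = line.split(",");
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < cols.length; i++) {
                if (i != labelIndex) sb.append(cols[i]).append(',');
            }
            sb.append(cols[labelIndex]);                 // label goes last
            return sb.toString();
        });

        prepared.saveAsTextFile("test/input");           // consumed by the main program below
        sc.stop();
    }
}
```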
Application Infrastructure
Canonical Maven directory structure
```shell
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/LogisticRegression.java
```
Main Program
```java
/*
 * LogisticRegression.java
 */
import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.rdd.RDD;

public class LogisticRegression {
    public static void main(String[] args) {
        String inputFile = args[0];
        String outputPath = args[1];

        SparkConf conf = new SparkConf().setAppName("LogisticRegression");
        SparkContext sc = new SparkContext(conf);

        // Load and parse the data: every column except the last is a feature,
        // and the last column is the target class (see Note 1 above).
        int minPartition = 1;
        RDD<String> input = sc.textFile(inputFile, minPartition);
        JavaRDD<String> data = input.toJavaRDD();
        JavaRDD<LabeledPoint> parsedData = data.map(line -> {
            String[] features = line.split(",");
            double[] v = new double[features.length - 1];
            for (int i = 0; i < features.length - 1; i++) {
                v[i] = Double.parseDouble(features[i]);
            }
            return new LabeledPoint(
                    Double.parseDouble(features[features.length - 1]),
                    Vectors.dense(v));
        });

        // Split the initial RDD into two parts: 60% training data, 40% test data.
        JavaRDD<LabeledPoint>[] splits =
                parsedData.randomSplit(new double[] {0.6, 0.4}, 11L);
        JavaRDD<LabeledPoint> training = splits[0].cache();
        JavaRDD<LabeledPoint> test = splits[1];

        // Run the training algorithm to build the model; numClasses must match
        // the number of distinct label values in your dataset.
        LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
                .setNumClasses(10)
                .run(training.rdd());

        // Compute (prediction, label) pairs on the test set.
        JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
                p -> new Tuple2<Object, Object>(model.predict(p.features()), p.label()));

        // Save the prediction results.
        predictionAndLabels.saveAsTextFile(outputPath);

        // Get evaluation metrics.
        MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
        double precision = metrics.precision();
        System.out.println("Precision = " + precision);

        // Save the model so it can be reloaded later.
        model.save(sc, outputPath + "/model/LogisticRegressionModel");

        sc.stop();
    }
}
```
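To sanity-check the parsing step before training, it can help to print a few parsed records. This is a hypothetical debugging fragment, not part of the program above; it would go right after `parsedData` is built:

```java
// Peek at the first parsed records to verify the label/feature split.
for (LabeledPoint p : parsedData.take(3)) {
    System.out.println("label=" + p.label() + ", features=" + p.features());
}
```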
Dependencies
```xml
...
<properties>
    <java.version>1.8</java.version>
    <spark.version>1.5.0</spark.version>
</properties>
<dependencies>
    <!-- Spark core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <!-- Spark MLlib -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
...
```
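One thing to note: the `<java.version>` property above is not read by Maven automatically. Since the main program uses Java 8 lambdas, the compiler level should be set explicitly, for example via the compiler plugin in the `<build>` section (a sketch; the plugin version is an assumption):

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>
    </plugins>
</build>
```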
Deploy Application To Train The Model
The end-to-end workflow looks like this; each step is detailed below.

```shell
$ cd LogisticRegression
$ mvn package
$ hadoop fs -rm -r test/output
$ spark-submit --class "LogisticRegression" target/spark-sample-0.0.1.jar test/input test/output
$ hadoop fs -copyToLocal test/output/model/LogisticRegressionModel ./Model
```
Package the application with Maven
```shell
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/spark-sample-0.0.1.jar
```
Prepare input file and output environment
```shell
# Make input directory
$ hadoop fs -mkdir test/input
# Copy input file(s) from local to remote
$ hadoop fs -copyFromLocal ./input/logistic_input test/input
# Remove output directory to prevent conflicts
$ hadoop fs -rm -r test/output
```
Execute it with spark-submit
```shell
# Use spark-submit to run your application; the two arguments
# are the input file and the output directory
$ spark-submit \
  --class "LogisticRegression" \
  target/spark-sample-0.0.1.jar \
  test/input test/output
```
Check Precision
The application splits the input data into two parts (60% training, 40% test) and prints the precision of the trained model on the test set.

```
INFO Precision = 0.81475
```
Fetch the Trained Model
If you want to use this model in a later step, you can simply load it back:

```java
LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc, "test/output/model/LogisticRegressionModel");
```

Alternatively, copy the trained model from HDFS to your local file system:

```shell
$ hadoop fs -copyToLocal test/output/model/LogisticRegressionModel ./Model
```
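Putting this together, a minimal standalone follow-up program might look like the sketch below. The class name and the sample feature vector are hypothetical, and the vector length must match the number of feature columns used during training.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.linalg.Vectors;

public class PredictWithModel {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PredictWithModel");
        SparkContext sc = new SparkContext(conf);

        // Reload the model trained and saved above.
        LogisticRegressionModel model = LogisticRegressionModel.load(
                sc, "test/output/model/LogisticRegressionModel");

        // Score a single (hypothetical, truncated) feature vector.
        double prediction = model.predict(Vectors.dense(2000, 1, 28, 5, 1647.0));
        System.out.println("Predicted class = " + prediction);

        sc.stop();
    }
}
```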