Logistic Regression Deployment Using Java Spark
Preparing Data
- For details, please check https://hackmd.io/s/SkEYWCnjg
- Note 1: the target class must be in the last column.
- Note 2: the header row must be removed from the input dataset (see the preprocessing sketch after the table below).
| | Year | Month | DayofMonth | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | ... | TaxiIn | TaxiOut | Cancelled | CancellationCode | Diverted | CarrierDelay | WeatherDelay | NASDelay | SecurityDelay | LateAircraftDelay |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2000 | 1 | 28 | 5 | 1647.0 | 1647 | 1906.0 | 1859 | HP | 154 | ... | 15 | 11 | 0 | NaN | 0 | NaN | NaN | NaN | NaN | NaN | 
| 1 | 2000 | 1 | 29 | 6 | 1648.0 | 1647 | 1939.0 | 1859 | HP | 154 | ... | 5 | 47 | 0 | NaN | 0 | NaN | NaN | NaN | NaN | NaN | 
| 2 | 2000 | 1 | 30 | 7 | NaN | 1647 | NaN | 1859 | HP | 154 | ... | 0 | 0 | 1 | NaN | 0 | NaN | NaN | NaN | NaN | NaN | 
| 3 | 2000 | 1 | 31 | 1 | 1645.0 | 1647 | 1852.0 | 1859 | HP | 154 | ... | 7 | 14 | 0 | NaN | 0 | NaN | NaN | NaN | NaN | NaN | 
| 4 | 2000 | 1 | 1 | 6 | 842.0 | 846 | 1057.0 | 1101 | HP | 609 | ... | 3 | 8 | 0 | NaN | 0 | NaN | NaN | NaN | NaN | NaN | 
5 rows × 29 columns
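Note that the raw export above still contains a header row, string-valued columns such as UniqueCarrier, and NaN placeholders, none of which the Double.parseDouble calls in the main program below can handle. Below is a minimal Spark cleanup sketch covering the two notes; the file names are hypothetical, and the full preparation steps are in the HackMD notes linked above.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PrepareData {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("PrepareData");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Hypothetical raw file name; adjust to your actual export.
    JavaRDD<String> raw = jsc.textFile("test/input/airline_raw.csv");
    final String header = raw.first();

    JavaRDD<String> cleaned = raw
        .filter(line -> !line.equals(header))    // Note 2: drop the header row
        .filter(line -> !line.contains("NaN"));  // drop rows with missing values

    // String-valued columns (e.g. UniqueCarrier) would still need to be
    // encoded as numbers or dropped, with the target class moved to the
    // last column (Note 1), before LogisticRegression.java can parse the rows.
    cleaned.saveAsTextFile("test/input/logistic_input");
    jsc.stop();
  }
}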
Application Infrastructure
Canonical Maven directory structure
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/LogisticRegression.java
Main Program
/*
 * LogisticRegression.java
 */
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;

public class LogisticRegression {
  public static void main(String[] args) {
    String inputFile = args[0];
    String outputPath = args[1];
    SparkConf conf = new SparkConf().setAppName("LogisticRegression");
    SparkContext sc = new SparkContext(conf);
    
    // Load and parse the data
    int minPartition = 1;
    RDD<String> input = sc.textFile(inputFile, minPartition);
    JavaRDD<String> data = input.toJavaRDD();

    JavaRDD<LabeledPoint> parsedData = data.map(line -> {
      String[] features = line.split(",");
      double[] v = new double[features.length-1];
      for (int i = 0; i < features.length - 1; i++) {
        v[i] = Double.parseDouble(features[i]);
      }
      return new LabeledPoint(Double.parseDouble(features[features.length-1]), Vectors.dense(v));
    });
    // Split initial RDD into two... [60% training data, 40% testing data].
    JavaRDD<LabeledPoint>[] splits = parsedData.randomSplit(new double[] {0.6, 0.4}, 11L);
    JavaRDD<LabeledPoint> training = splits[0].cache();
    JavaRDD<LabeledPoint> test = splits[1];
    // Run training algorithm to build the model.
    LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
      .setNumClasses(10)
      .run(training.rdd());
    // Compute raw scores on the test set.
    JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(p ->
      new Tuple2<Object, Object>(model.predict(p.features()), p.label()));
    // Save Prediction result
    predictionAndLabels.saveAsTextFile(outputPath);
    // Get evaluation metrics.
    MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
    double precision = metrics.precision();
    System.out.println("Precision = " + precision);
    // Save and load model
    model.save(sc, outputPath+"/model/LogisticRegressionModel");
    sc.stop();
  }
}
Dependencies
...
  <properties>
    <java.version>1.8</java.version>
    <spark.version>1.5.0</spark.version>
  </properties>
  <dependencies>
    <!-- Spark core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
...
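One caveat: the <java.version> property above is not read by Maven on its own, so the Java 8 lambdas in LogisticRegression.java need the compiler level set explicitly. A typical sketch of the extra pom entry:
...
  <build>
    <plugins>
      <!-- Compile with Java 8 so the lambda expressions build -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
...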
Deploy Application To Train The Model
The full sequence is summarized below; each step is explained in the subsections that follow.
cd LogisticRegression
mvn package
hadoop fs -rm -r test/output
spark-submit --class "LogisticRegression" target/spark-sample-0.0.1.jar test/input test/output
hadoop fs -copyToLocal test/output/model/LogisticRegressionModel ./Model
Package the application using Maven.
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/spark-sample-0.0.1.jar
Prepare input file and output environment
# Make input directory
$ hadoop fs -mkdir -p test/input
# Copy input file(s) from local to remote
$ hadoop fs -copyFromLocal ./input/logistic_input test/input
# Remove output directory to prevent conflicts 
$ hadoop fs -rm -r test/output
Execute it with spark-submit
# Use spark-submit to run your application
$ spark-submit \
  --class "LogisticRegression" \
  target/spark-sample-0.0.1.jar \
  test/input test/output
Check Precision
The application splits the input data into two parts (60% training, 40% test), trains the model on the training split, and prints the precision of the trained model on the test split. (MulticlassMetrics reports this precision as the overall fraction of correctly classified points.)
INFO Precision = 0.81475
Fetch The Trained Model
If you want to use this model in a later step, you can simply load it:
LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc, "test/output/model/LogisticRegressionModel");
Alternatively, copy the trained model from the remote file system to local:
hadoop fs -copyToLocal test/output/model/LogisticRegressionModel ./Model
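A short sketch of reusing the persisted model inside another Spark program. The feature array below is a placeholder; in practice it must hold the same feature columns, in the same order, as the training data.
// Continues from a program with the same imports and SparkContext `sc`
// as LogisticRegression.java above.
LogisticRegressionModel sameModel = LogisticRegressionModel.load(
    sc, "test/output/model/LogisticRegressionModel");

// Placeholder feature vector: one slot per feature column after preprocessing
// (28 here, assuming 29 columns with the target class last).
double[] features = new double[28];
double predicted = sameModel.predict(Vectors.dense(features));
System.out.println("Predicted class = " + predicted);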
