How to implement FP-Growth using Apache Spark in Java


Frequent itemsets are extracted from transactions using the spark.mllib package, which provides a parallel implementation of FP-growth for mining frequent itemsets. Spark takes a JavaRDD of transactions as input, where each transaction is a list of items. An FPGrowth instance is then configured with two parameters: setMinSupport, the minimum support required for an itemset to be considered frequent, and setNumPartitions, the number of partitions used to distribute the work. Finally, FPGrowth's run method is called on the transactions, returning an FPGrowthModel that stores the frequent itemsets together with their frequencies.
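To make the setMinSupport parameter concrete: with minSupport = 0.2, an itemset is frequent only if it occurs in at least 20% of the transactions. A minimal sketch in plain Java (the transactions here are a hypothetical example, independent of Spark):

```java
import java.util.Arrays;
import java.util.List;

public class MinSupportCheck {
    public static void main(String[] args) {
        // Hypothetical transactions: each inner list is one basket of items
        List<List<String>> transactions = Arrays.asList(
            Arrays.asList("bread", "milk"),
            Arrays.asList("bread", "butter"),
            Arrays.asList("milk", "butter"),
            Arrays.asList("bread", "milk", "butter"),
            Arrays.asList("bread"));

        double minSupport = 0.2;
        // An itemset must occur in at least ceil(minSupport * N) transactions
        long threshold = (long) Math.ceil(minSupport * transactions.size());

        // Count how many transactions contain "bread"
        long breadCount = transactions.stream()
            .filter(t -> t.contains("bread"))
            .count();

        System.out.println("threshold=" + threshold);                      // threshold=1
        System.out.println("breadFrequent=" + (breadCount >= threshold));  // breadFrequent=true
    }
}
```

With 5 transactions and minSupport = 0.2, the count threshold is ceil(0.2 × 5) = 1, so "bread" (appearing 4 times) easily qualifies.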

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;

public class SparkFPGrowth {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Spark-FPGrowth").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Each line of the input file is one transaction: items separated by spaces
        JavaRDD<String> data = sc.textFile("hdfs://localhost:54310/sparkinput/FPGrowth/fpgrowth.txt");
        JavaRDD<List<String>> transactions = data.map(new Function<String, List<String>>() {
            public List<String> call(String line) {
                String[] parts = line.split(" ");
                return Arrays.asList(parts);
            }
        });

        // Minimum support of 0.2, work distributed across 10 partitions
        FPGrowth fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(10);
        FPGrowthModel<String> model = fpg.run(transactions);

        // Print each frequent itemset with its frequency
        for (FPGrowth.FreqItemset<String> itemset : model.freqItemsets().toJavaRDD().collect()) {
            System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
        }

        sc.stop();
    }
}
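A natural follow-up is deriving association rules from the fitted model, which spark.mllib supports via FPGrowthModel.generateAssociationRules (the rules are instances of org.apache.spark.mllib.fpm.AssociationRules.Rule). A hedged sketch, assuming the model from the listing above is in scope; the minimum confidence of 0.8 is an arbitrary choice for illustration:

```java
// Continues from the listing above: `model` is the fitted FPGrowthModel<String>.
// Keep only rules whose confidence is at least 0.8 (assumed threshold).
double minConfidence = 0.8;
for (AssociationRules.Rule<String> rule
        : model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
    System.out.println(
        rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
```

Each rule maps an antecedent itemset to a consequent item, with confidence equal to the fraction of transactions containing the antecedent that also contain the consequent.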

[Screenshot] Input in HDFS for FP-Growth using Apache Spark

[Screenshot] Spark job status for FP-Growth using Apache Spark

[Screenshot] Frequent itemsets for FP-Growth using Apache Spark