Amazing technological breakthrough possible @S-Logix pro@slogix.in

Office Address

  • #5, First Floor, 4th Street Dr. Subbarayan Nagar Kodambakkam, Chennai-600 024 Landmark : Samiyar Madam
  • pro@slogix.in
  • +91- 81240 01111

Social List

How to implement Random Forest for classification using Spark with Python

Objective

To implement Random Forest for classification using Spark with python

Process

  Set up Spark Context and Spark session

  Load the Data set

  Deal with categorical data and Covert the data to dense vectors(Features and Label)

  Transform the dataset to dataframe

  Identify categorical features, and index them

  Split the data into train and test set

  Fit the random forest model

  Predict using the test set

  Evaluate the metrics

Sample Code

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler,IndexToString
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.classification import RandomForestClassifier
from sklearn.metrics import confusion_matrix
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType
#Set up SparkContext and SparkSession
spark=SparkSession \
.builder \
.appName(“Python spark Random forest example”)\
.config(“spark.some.config.option”,”some-value”)\
.getOrCreate()
#Load the data set
df=spark.read.format(‘com.databricks.spark.csv’).options(header=’True’,inferschema=’True’).load(“/home/………../creditcard.csv”)

# Convert the label into a valid categorical value of type string
#in this case convert 0 as Non-Fraudulent and 1 as Fraudulent
def string_to_float(x):
return float(x)
#
def condition(r):
if(r==0):
label = “Non-Fraudulent”
else:
label = “Fraudulent”
return label
string_to_float_udf = udf(string_to_float, DoubleType())
class_udf = udf(lambda x: condition(x), StringType())
df = df.withColumn(“Class”, class_udf(“Class”))
#df.show(5,True)
#df.printSchema()

#To deal with categorical input data
def get_dummy(df,categoricalCols,continuousCols,labelCol):
indexers = [ StringIndexer(inputCol=c, outputCol=”{0}_indexed”.format(c))
for c in categoricalCols ]
# default setting: dropLast=True
encoders = [ OneHotEncoder(inputCsoft23/soft23/Sathish/practiceol=indexer.getOutputCol(),
outputCol=”{0}_encoded”.format(indexer.getOutputCol()))
for indexer in indexers ]
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
+ continuousCols, outputCol=”features”)
pipeline = Pipeline(stages=indexers + encoders + [assembler])
model=pipeline.fit(df)
data = model.transform(df)
data = data.withColumn(‘label’,col(labelCol))
return data.select(‘features’,’label’)
catcols =[]
num_cols = df.columns[:-1]
labelCol = ‘Class’
data = get_dummy(df,catcols,num_cols,labelCol)
# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol=’label’,outputCol=’indexedLabel’).fit(data)
#labelIndexer.transform(data).show(5, True)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =VectorIndexer(inputCol=”features”, \
outputCol=”indexedFeatures”, \
maxCategories=4).fit(data)
# Split the data into training and test sets (20% held out for testing)
(trainingData, testData) = data.randomSplit([0.8, 0.2])
rf = RandomForestClassifier(featuresCol=’indexedFeatures’, labelCol=’indexedLabel’)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol=”prediction”, outputCol=”predictedLabel”,
labels=labelIndexer.labels)
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf,labelConverter])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
y_true = predictions.select(“label”).toPandas()
y_pred = predictions.select(“predictedLabel”).toPandas()
class_temp = predictions.select(“label”).groupBy(“label”).count().sort(‘count’, ascending=False).toPandas()
class_temp = class_temp[“label”].values.tolist()
class_names = map(str, class_temp)
# # # print(class_name)
class_names
cnf_matrix = confusion_matrix(y_true, y_pred,labels=class_names)
print(“Confusion Matrix : “,cnf_matrix)

Screenshots
implement Random Forest for classification using Spark with Python