To implement logistic regression using Spark with Python
Set up Spark Context and Spark session
Load the Data set
Deal with categorical data and convert the data to dense vectors (features and label)
Transform the dataset to a DataFrame
Identify categorical features, and index them
Split the data into train and test set
Fit the logistic regression model
Predict using the test set
Evaluate the metrics
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler,IndexToString
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.classification import LogisticRegression
from sklearn.metrics import confusion_matrix
# Set up the SparkSession (entry point for the DataFrame and ML APIs).
spark = (
    SparkSession.builder
    .appName("Python spark logistic regression example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Load the data set: the first row is treated as the header and column
# types are inferred from the data.
# NOTE: the path below is a placeholder -- point it at the real location of
# weight-height.csv before running.
df = (
    spark.read.format("com.databricks.spark.csv")
    .options(header="True", inferschema="True")
    .load("/home/…../weight-height.csv")
)
def get_dummy(df, categoricalCols, continuousCols, labelCol):
    """Build a two-column (``features``, ``label``) DataFrame from ``df``.

    Every column in ``categoricalCols`` is string-indexed and then one-hot
    encoded; the encoded columns are assembled together with
    ``continuousCols`` into a single vector column named ``features``.
    The column named by ``labelCol`` is copied, unchanged, to ``label``.

    Args:
        df: Input Spark DataFrame.
        categoricalCols: Names of the columns to index and one-hot encode
            (may be empty).
        continuousCols: Names of the columns fed to the assembler as-is.
        labelCol: Name of the column to expose as ``label``.

    Returns:
        A DataFrame containing only the ``features`` and ``label`` columns.
    """
    indexers = [
        StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
        for c in categoricalCols
    ]
    # default setting: dropLast=True
    encoders = [
        OneHotEncoder(
            inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol()),
        )
        for indexer in indexers
    ]
    # list(...) lets callers pass any sequence (e.g. a tuple) of column names.
    assembler = VectorAssembler(
        inputCols=[encoder.getOutputCol() for encoder in encoders]
        + list(continuousCols),
        outputCol="features",
    )
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model = pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn("label", col(labelCol))
    return data.select("features", "label")
# No categorical predictors are used here; Height and Weight are passed as
# continuous columns and Gender is the label to predict.
catcols = []
num_cols = ["Height", "Weight"]
labelCol = "Gender"
data = get_dummy(df, catcols, num_cols, labelCol)
# Index labels, adding metadata to the label column.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Preview the first five rows with the new indexedLabel column.
labelIndexer.transform(data).show(5, True)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(data)
# Split the data into training and test sets (20% held out for testing).
(trainingData, testData) = data.randomSplit([0.8, 0.2])
logr = LogisticRegression(featuresCol="indexedFeatures", labelCol="indexedLabel")
# Convert indexed labels back to original labels.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)
# Chain the indexers, the logistic regression and the label converter in a Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, logr, labelConverter])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions on the held-out test set.
predictions = model.transform(testData)
# Collect the true and predicted labels to the driver in a single pass so the
# two columns are guaranteed to stay row-aligned.
_label_frame = predictions.select("label", "predictedLabel").toPandas()
y_true = _label_frame[["label"]]
y_pred = _label_frame[["predictedLabel"]]
# Class names ordered by how often each label occurs in the test set
# (most frequent first); this fixes the row/column order of the matrix.
class_temp = (
    predictions.select("label")
    .groupBy("label")
    .count()
    .sort("count", ascending=False)
    .toPandas()
)
class_temp = class_temp["label"].values.tolist()
# map() returns a lazy iterator in Python 3; confusion_matrix needs a
# concrete sequence for ``labels``, so materialise it as a list.
class_names = list(map(str, class_temp))
cnf_matrix = confusion_matrix(y_true, y_pred, labels=class_names)
print("Confusion Matrix : ", cnf_matrix)