To implement linear regression using Spark with Python:
Set up Spark Context and Spark session
Load the Data set
Convert the data to dense vectors (features and label)
Transform the dataset into a DataFrame
Identify categorical features, and index them
Split the data into train and test set
Fit the linear regression model
Predict using the test set
Take the summary of the model and evaluate metrics
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
import sklearn.metrics
# Set up the SparkSession, the entry point for the DataFrame API.
# The original used typographic quotes, which Python cannot parse; plain
# ASCII quotes are used here. Parentheses replace the backslash
# continuations so the chain stays valid if a line is edited.
spark = (
    SparkSession.builder
    .appName("Python spark regression example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
# Load the data set. header='True' takes column names from the first row;
# inferschema='True' asks the reader to infer column types from the data.
# NOTE(review): the path below looks like a placeholder -- point it at the
# real location of weight-height.csv before running.
# NOTE(review): on Spark 2.0+ the built-in "csv" format should be
# equivalent to this legacy source name -- confirm before switching.
df = (
    spark.read.format('com.databricks.spark.csv')
    .options(header='True', inferschema='True')
    .load("/home/…../weight-height.csv")
)
# Convert the dataset to dense vectors
def transData(data):
    """Build the two-column ``features``/``label`` DataFrame used for fitting.

    Args:
        data: Spark DataFrame. For every row, the second-to-last column is
            used as the (single) feature and the last column as the label.
            Presumably both are numeric -- verify against the input file.

    Returns:
        A DataFrame with columns ``features`` (a dense vector built from
        the second-to-last column) and ``label`` (the last column).
    """
    return data.rdd.map(
        lambda r: [Vectors.dense(r[-2]), r[-1]]
    ).toDF(['features', 'label'])
transformed = transData(df)

# Identify categorical features and index them. The indexer is fitted on
# the full data set (before the train/test split) and writes its result to
# the "indexedFeatures" column; maxCategories=4 is the distinct-value
# threshold VectorIndexer uses to decide which features are categorical.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(transformed)
# Split the data into training and test sets (20% held out for testing)
(trainingData, testData) = transformed.randomSplit([0.8, 0.2])

# Define the LinearRegression algorithm.
# The regressor must read the column written by the indexer stage
# ("indexedFeatures"). With the default featuresCol ("features") the
# indexer's output was computed but never used by the model.
lr = LinearRegression(featuresCol="indexedFeatures")

# Chain indexing and regression so the same transformation is applied at
# fit time and at prediction time.
pipeline = Pipeline(stages=[featureIndexer, lr])
model = pipeline.fit(trainingData)

# The last pipeline stage is the fitted linear regression model.
lrm = model.stages[-1]
# Print the coefficients and intercept of the fitted model (summary)
print("Coefficient : " + str(lrm.coefficients))
print("Intercept : " + str(lrm.intercept))

# Predict on the held-out test set and show the first 5 rows
pred = model.transform(testData)
pred.select("features", "label", "prediction").show(5)
# Evaluate the RMSE on the test-set predictions
evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(pred)
print("The RMSE value is : ", rmse)

# Compute R^2 with scikit-learn. Both columns are collected to the driver
# in a single toPandas() call, so labels and predictions are guaranteed to
# be row-aligned and only one Spark job runs instead of two.
scored = pred.select("label", "prediction").toPandas()
# Double brackets keep y_true / y_pred as single-column pandas DataFrames,
# the same type the separate toPandas() calls produced.
y_true = scored[["label"]]
y_pred = scored[["prediction"]]
r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print("R^2 score: {0}".format(r2_score))