How to implement a decision tree model for regression using Spark with Python


To implement a decision tree model for regression using Spark with Python:


  Set up the SparkContext and SparkSession

  Load the Data set

  Deal with categorical data and convert the data to dense vectors (features and label)

  Transform the dataset to dataframe

  Identify categorical features, and index them

  Split the data into train and test sets

  Train the decision tree model on the training set

  Predict using the test set

  Evaluate the metrics
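
The categorical-data step above can be pictured without Spark. The following is a minimal pure-Python sketch, not Spark's implementation: the helper names index_categories and one_hot_drop_last are hypothetical, and the sample values are made up for illustration. It mimics how StringIndexer assigns indices by descending frequency and how OneHotEncoder with dropLast=True emits one slot fewer than there are categories.

```python
from collections import Counter


def index_categories(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {category: idx for idx, category in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """Encode an index into num_categories - 1 slots; the last index becomes all zeros."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


# Illustrative values only (not taken from servo.csv)
motor = ["E", "B", "E", "D", "B", "E"]
mapping = index_categories(motor)
encoded = [one_hot_drop_last(mapping[m], len(mapping)) for m in motor]
```

Dropping the last slot keeps the encoded columns from summing to one in every row, which is why three categories produce vectors of length two here.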

Sample Code

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
# Set up SparkContext and SparkSession
spark = SparkSession \
    .builder \
    .appName("Python spark regression example") \
    .getOrCreate()
# Load the data set
df = spark.read.format('com.databricks.spark.csv').options(header='True', inferschema='True').load("/home/…./servo.csv")
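# Automatically identify categorical features, and index them.
def get_dummy(df, categoricalCols, continuousCols, labelCol):
    # Index each categorical column
    indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                for c in categoricalCols]
    # default setting: dropLast=True
    # (the "_encoded" output column name is an assumption)
    encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                              outputCol="{0}_encoded".format(indexer.getOutputCol()))
                for indexer in indexers]
    # Combine the encoded categorical columns and the continuous columns into one vector
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model = pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label', col(labelCol))
    # Return only the columns used downstream (assumed selection)
    return data.select('features', 'label')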
catcols = ["Motor", "Screw"]
num_cols = ["Pgain", "Vgain"]
labelCol = 'Class'
data = get_dummy(df, catcols, num_cols, labelCol)
# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel').fit(data)
labelIndexer.transform(data).show(5, True)
# Set maxCategories so features with > 8 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories=8).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
treeModel = model.stages[1]
print(treeModel)  # summary only
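
To make the evaluation metric concrete, here is a small pure-Python sketch of what an RMSE value represents: the square root of the mean squared difference between predictions and labels. The function name root_mean_squared_error and the numbers are hypothetical, chosen only for illustration; this is not how Spark computes the metric internally, only the same formula on plain lists.

```python
import math


def root_mean_squared_error(predictions, labels):
    """Return the RMSE between two equal-length, non-empty sequences of numbers."""
    if len(predictions) != len(labels) or not predictions:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(p - y) ** 2 for p, y in zip(predictions, labels)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up numbers purely for illustration
example_rmse = root_mean_squared_error([2.0, 4.0, 6.0], [1.0, 4.0, 8.0])
```

Because the errors are squared before averaging, a single large miss raises the RMSE more than several small ones, and the result is in the same units as the label.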
