Introduction to Machine Learning with Spark ML – II

In the earlier post, we went over some concepts of machine learning with Spark ML. We saw primarily two types of objects relating to machine learning:

  1. Transformers: Objects that take a DataFrame, change something in it and return a DataFrame. The method used here is “transform”.
  2. Estimators: Objects that are passed a DataFrame and apply an algorithm to it to return a Transformer, e.g. GBTClassifier. We use the “fit” method to apply the algorithm to the DataFrame.

In our last example of predicting income level using the Adult dataset, we had to change our input dataset to a format suitable for machine learning. We made a sequence of changes, e.g. converting categorical variables to numeric, One Hot Encoding and assembling the columns into a single column. Every time additional data becomes available (which will happen numerous times), we will need to repeat these steps.

In this post, we will introduce a new object that organises these steps in a sequence that can be run as many times as needed: the Pipeline. The Pipeline chains together various transformers and estimators in sequence. While we could do the machine learning without the Pipeline, it is standard practice to put the sequence of steps in one. Before we get there, let’s add one more data preparation step: identifying and removing null data. This is indicated in our dataset as ‘?’.

To know how many null values exist, let’s run this command:

from pyspark.sql.functions import isnull, when, count, col
adultDF.select([count(when(isnull(c), c)).alias(c) for c in adultDF.columns]).show()

The result shows that there are no null values. Inspecting the data, we see that missing values are recorded as “?” instead. We need to remove these rows from our dataset. As a first attempt, we can replace the ? with null values as follows:

adultDF = adultDF.replace('?', None)

Surprisingly, this doesn’t change the ? values. It turns out that the ? is padded with spaces, so an exact match on '?' finds nothing. Instead, we will use the when and trim functions as follows:

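from pyspark.sql.functions import isnull, when, count, col, trim
# Set a value to null if it equals '?' once surrounding whitespace is trimmed; keep every other value unchanged
adultDF = adultDF.select([when(trim(col(c))=='?',None).otherwise(col(c)).alias(c) for c in adultDF.columns])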

This replaces ? with null, and we can now drop those rows from our dataframe using the dropna() function. The number of rows remaining is now 30,162.
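
The cleaning logic can be sketched outside Spark as well. The snippet below is plain Python rather than PySpark, and the three sample rows and the helper name clean_rows are made up for illustration. It shows why the exact match failed (the stored value is " ?" with a leading space) and what trimming followed by a drop of incomplete rows achieves:

```python
# Plain-Python sketch of the cleaning logic; sample rows are illustrative only.
MISSING_MARKER = "?"


def clean_rows(rows):
    """Turn padded '?' strings into None, then drop rows that contain None."""
    cleaned = []
    for row in rows:
        normalised = {
            key: None
            if isinstance(value, str) and value.strip() == MISSING_MARKER
            else value
            for key, value in row.items()
        }
        # Same idea as dropna(): keep only fully populated rows.
        if all(value is not None for value in normalised.values()):
            cleaned.append(normalised)
    return cleaned


sample_rows = [
    {"age": 39, "workclass": " State-gov", "occupation": " Adm-clerical"},
    {"age": 54, "workclass": " ?", "occupation": " ?"},  # note the leading space
    {"age": 25, "workclass": " Private", "occupation": " Sales"},
]

# An exact comparison misses the padded marker, which is what replace('?', None) ran into.
exact_matches = sum(1 for row in sample_rows if row["workclass"] == MISSING_MARKER)

kept_rows = clean_rows(sample_rows)
print(exact_matches, len(kept_rows))
```

Here the exact comparison finds no matches, while the trimmed comparison removes the one incomplete row.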

Now let’s organise these steps in a Pipeline as follows:

from pyspark.ml import Pipeline

adultPipeline = Pipeline(stages = [wcindexer,eduindexer,maritalindexer,occupationindexer,relindexer,raceindexer,sexindexer,nativecountryindexer,categoryindexer,ohencoder,colvectors])

The stages list contains all the feature preparation steps we used to convert the raw data into a dataset ready for machine learning. This includes all the StringIndexers, the OneHotEncoder and the VectorAssembler. Next, the process of defining the GBTClassifier and BinaryClassificationEvaluator remains the same as in the earlier post. You can now include the GBTClassifier in the pipeline as well and run fit() on this pipeline with the train dataset as follows:

adultMLTrainingPipeline = Pipeline(stages = [adultPipeline,gbtclassifier])
gbmodel  = adultMLTrainingPipeline.fit(train)

However, we can perform another optimization at this point. The model currently trained is based on a single random split of the dataset. Cross validation can help the model generalise better: it picks the best parameters from a list of candidates by training and evaluating on more than one train/test split of the data (called folds). The list of parameters is supplied as a ParamGrid as follows:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder()\
  .addGrid(gbtclassifier.maxDepth, [2, 5])\
  .addGrid(gbtclassifier.maxIter, [10, 100])\
  .build()

# Declare the CrossValidator, which performs the model tuning.
cv = CrossValidator(estimator=gbtclassifier, evaluator=eval, estimatorParamMaps=paramGrid)

The cross validator object takes the estimator, evaluator and paramGrid objects. The pipeline needs to be modified to use this cross validator in place of the classifier object we used earlier, as follows:

# Before: the classifier is the final stage
adultMLTrainingPipeline = Pipeline(stages = [adultPipeline,gbtclassifier])

# After: the cross validator is the final stage
adultMLTrainingPipeline = Pipeline(stages = [adultPipeline,cv])
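
To get a feel for how much work the cross validator does, here is a plain-Python sketch (not Spark code) that expands the same grid and counts the model fits. The helper names build_param_grid and k_fold_indices are hypothetical, the folds are assigned round-robin for simplicity whereas Spark assigns rows to folds randomly, and the fold count of 3 is CrossValidator's default numFolds since the snippet above does not set it:

```python
from itertools import product


def build_param_grid(grid):
    """Expand {param: [values]} into one dict per parameter combination."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


def k_fold_indices(n_rows, num_folds):
    """Return (training, validation) row-index lists, one pair per fold."""
    folds = [list(range(i, n_rows, num_folds)) for i in range(num_folds)]
    splits = []
    for i in range(num_folds):
        validation = folds[i]
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        splits.append((training, validation))
    return splits


param_maps = build_param_grid({"maxDepth": [2, 5], "maxIter": [10, 100]})
num_folds = 3  # assumption: the default, because numFolds is not set above
splits = k_fold_indices(n_rows=9, num_folds=num_folds)

# Every parameter combination is trained once per fold.
total_fits = len(param_maps) * num_folds
print(len(param_maps), total_fits)  # 4 12
```

Four parameter combinations across three folds means twelve training runs, followed by one more fit of the best combination on the full training data, which goes some way towards explaining a long running time.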

With these settings, the experiment ran for 22 minutes and the evaluation result came out to be 91.37% area under the ROC curve.

Introduction to Machine Learning with Spark ML – I

Machine learning is most widely done using Python and the scikit-learn toolkit. The biggest disadvantage of this combination is that training runs on a single machine. This limits the amount of data that can be used for training to the memory available on that computer.

Industrial/enterprise datasets tend to run into terabytes, hence the need for a parallel processing framework that can handle enormous datasets. This is where Spark comes in. Spark comes with a machine learning framework called Spark ML that can run training in parallel. Spark ML is based on the same DataFrame API that is widely used within the Spark ecosystem, so preprocessing raw data requires minimal additional learning.

In this post, we will cover how to train a model using Spark ML.

In the next post, we will introduce the concept of Spark ML pipelines that allow us to process the data in a defined sequence.

The final post will cover the MLOps capabilities that the MLflow framework provides for operationalising our machine learning models.

We are going to use the Adult dataset from the UCI Machine Learning Repository. Go ahead and download the dataset from the “Data Folder” link on the page. The file you need is named “adult.data” and contains the actual data. Since the format of this dataset is CSV, I saved it on my local machine as adult.data.csv. The schema of this dataset is available in another file titled adult.names. The schema of the dataset is as follows:

age: continuous.
workclass: categorical.
fnlwgt: continuous.
education: categorical.
education-num: continuous.
marital-status: categorical.
occupation: categorical.
relationship: categorical.
race: categorical.
sex: categorical.
capital-gain: continuous.
capital-loss: continuous.
hours-per-week: continuous.
native-country: categorical.
summary: categorical.

The prediction task is to determine whether a person makes over 50K a year, which is contained in the summary field (read in as the category column in the schema we apply below). This field contains the value <=50K or >50K and is our target variable. The machine learning task is that of binary classification.

Upload file in DBFS

The first step was to upload the dataset to a location from where it is accessible. I chose DBFS for ease of use and uploaded the file at the following location: /dbfs/FileStore/Abhishek-kant/adult_dataset.csv

Once loaded in DBFS, we need to access it as a DataFrame. We will apply a schema while reading the data, since the data doesn’t come with a header row, as indicated below:

adultSchema = "age int,workclass string,fnlwgt float,education string,educationnum float,maritalstatus string,occupation string,relationship string,race string,sex string,capitalgain double,capitalloss double,hoursperweek double,nativecountry string,category string"

# inferSchema is left out because an explicit schema is supplied
adultDF = spark.read.csv("/FileStore/Abhishek-kant/adult_dataset.csv", header = False, schema = adultSchema)

A sample of the data is shown below:

We need to move towards making this dataset machine learning ready. Spark ML only works with numeric data. We have many text values in the dataframe that will need to be converted to numeric values.

One of the key changes is to convert categorical variables expressed as strings into labels expressed as numbers. This can be done using the StringIndexer object (available in pyspark.ml.feature namespace) as illustrated below:

eduindexer = StringIndexer(inputCol="education", outputCol ="edu")

The inputCol indicates the column to be transformed and outputCol is the name of the column that will get added to the dataframe after converting to the categorical label. For example, when the workclass column is indexed, Private is converted to 0 while State-gov is converted to 4.
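
How the numbers are assigned is worth a quick look. By default StringIndexer orders labels by descending frequency, so the most common value gets index 0. The plain-Python sketch below (not Spark code) mimics that ordering on a made-up list of workclass values; the helper name fit_string_index is hypothetical and the alphabetical tie-break is an assumption of this sketch:

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent value first."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


# Illustrative values only, not the real column distribution.
workclass_values = ["Private", "State-gov", "Private", "Self-emp", "Private", "Self-emp"]

label_to_index = fit_string_index(workclass_values)
indexed = [label_to_index[value] for value in workclass_values]

print(label_to_index)  # {'Private': 0.0, 'Self-emp': 1.0, 'State-gov': 2.0}
print(indexed)         # [0.0, 2.0, 0.0, 1.0, 0.0, 1.0]
```

The indices carry no numeric meaning beyond the frequency rank, which is one reason the indexed columns are one-hot encoded later on.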

This conversion will need to be done for every categorical column:

#Convert Categorical variables to numeric
from pyspark.ml.feature import StringIndexer

wcindexer = StringIndexer(inputCol="workclass", outputCol ="wc")
eduindexer = StringIndexer(inputCol="education", outputCol ="edu")
maritalindexer = StringIndexer(inputCol="maritalstatus", outputCol ="marital")
occupationindexer = StringIndexer(inputCol="occupation", outputCol ="occ")
relindexer = StringIndexer(inputCol="relationship", outputCol ="relation")
raceindexer = StringIndexer(inputCol="race", outputCol ="racecolor")
sexindexer = StringIndexer(inputCol="sex", outputCol ="gender")
nativecountryindexer = StringIndexer(inputCol="nativecountry", outputCol ="country")
categoryindexer = StringIndexer(inputCol="category", outputCol ="catlabel")

Each StringIndexer produces what can be thought of as a “dense” representation, where a single column contains all the category values as indices. Further, we will need to convert this to a “sparse” one-hot representation, where each category value gets its own position holding a 0 or 1. This conversion can be done using the OneHotEncoder object (available in pyspark.ml.feature namespace) as shown below:

from pyspark.ml.feature import OneHotEncoder
ohencoder = OneHotEncoder(inputCols=["wc"], outputCols=["v_wc"])

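The inputCols is the list of columns to be one-hot encoded and outputCols is the list of new column names, one per input column. In practice the encoder covers every indexed feature column, since the assembler further down expects v_wc, v_edu, v_marital and so on. The confusion sometimes is around how a set of 0/1 columns fits in a single column: OneHotEncoder stores the result as a sparse vector in that one column.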

Note that we will not one-hot encode the target variable, i.e. “summary” (the category column).
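
To make the single-column idea concrete, here is a plain-Python sketch (not Spark code) of a sparse one-hot value stored as a (size, indices, values) triple, which is roughly how Spark displays a sparse vector. The helper names are hypothetical. The sketch follows OneHotEncoder's default of dropping the last category, so that category is represented by an all-zero vector:

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values) for a one-hot encoded category index."""
    size = num_categories - 1 if drop_last else num_categories
    position = int(index)
    if position < size:
        return (size, [position], [1.0])
    # The dropped last category is encoded as an all-zero vector.
    return (size, [], [])


def to_dense(sparse):
    """Expand a (size, indices, values) triple into a plain list of floats."""
    size, indices, values = sparse
    dense = [0.0] * size
    for i, value in zip(indices, values):
        dense[i] = value
    return dense


# Five categories with indices 0.0 to 4.0, as a StringIndexer would produce.
first = one_hot_sparse(0.0, num_categories=5)
last = one_hot_sparse(4.0, num_categories=5)

print(first, to_dense(first))  # (4, [0], [1.0]) [1.0, 0.0, 0.0, 0.0]
print(last, to_dense(last))    # (4, [], []) [0.0, 0.0, 0.0, 0.0]
```

Only the position of the 1 is stored, which keeps the column compact even when a category has many distinct values.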

The final step in preparing our data for machine learning is to “vectorise” it. Unlike most machine learning frameworks, which take a matrix for training, Spark ML requires all feature columns to be combined into a single vector column. This is achieved using the VectorAssembler object (available in pyspark.ml.feature namespace) as shown below:

from pyspark.ml.feature import VectorAssembler
colvectors = VectorAssembler(inputCols=["age","v_wc","fnlwgt","educationnum","capitalgain","capitalloss","v_edu","v_marital","v_occ","v_relation","v_racecolor","v_gender","v_country","hoursperweek"],
outputCol="features")

As you can see above, we are adding all the columns to a vector called “features”. With this, our dataframe is ready for the machine learning task.
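
Conceptually, the assembler walks through the input columns in order and concatenates scalars and vectors into one flat feature vector per row. A plain-Python sketch of that idea follows (not Spark code; the helper name assemble and the sample row are made up, and the one-hot column is written out densely for readability):

```python
def assemble(row, input_cols):
    """Concatenate scalar and vector columns into one flat list of floats."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            # Vector-valued column: append each element in order.
            features.extend(float(v) for v in value)
        else:
            # Scalar column: append the single value.
            features.append(float(value))
    return features


sample_row = {"age": 39, "v_wc": [0.0, 1.0, 0.0], "hoursperweek": 40.0}
features = assemble(sample_row, ["age", "v_wc", "hoursperweek"])

print(features)  # [39.0, 0.0, 1.0, 0.0, 40.0]
```

The order of inputCols therefore determines the position of every feature in the final vector.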

We will proceed to split the prepared dataframe (adultMLDF here) into training and test datasets using the randomSplit function of the dataframe as shown:

(train, test) = adultMLDF.randomSplit([0.7,0.3])

This will split our dataframe into train and test dataframes in an approximately 70:30 ratio.
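
The ratio is approximate because each row is assigned to a side at random rather than by exact count. The plain-Python sketch below (not Spark code; the helper name random_split is hypothetical) shows the idea, and also why fixing a seed matters if you want the same split on every run. Spark's randomSplit accepts an optional seed argument for the same purpose:

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one of two lists with probability given by weights."""
    threshold = weights[0] / sum(weights)
    rng = random.Random(seed)
    first, second = [], []
    for row in rows:
        (first if rng.random() < threshold else second).append(row)
    return first, second


rows = list(range(1000))
train_rows, test_rows = random_split(rows, [0.7, 0.3], seed=42)
train_again, test_again = random_split(rows, [0.7, 0.3], seed=42)

# Sizes are close to 700 and 300 but not guaranteed to be exact.
print(len(train_rows), len(test_rows))
# The same seed reproduces the same split.
print(train_rows == train_again)
```
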

The classifier used will be the Gradient Boosting classifier, available as the GBTClassifier object and initialised as follows:

from pyspark.ml.classification import GBTClassifier

classifier = GBTClassifier(labelCol="catlabel", featuresCol="features")

The target variable and features vector column are passed as arguments to the object. Once the classifier object is initialised, we can use it to train our model using the “fit” method, passing the training dataset as an argument:

gbmodel = classifier.fit(train)

Once the training is done, you can get predictions on the test dataset using the “transform” method of the model with the test dataset passed in as an argument:

adultTestDF = gbmodel.transform(test)

The result of this function is the addition of three columns to the dataset: rawPrediction, probability and prediction.

A very important task in machine learning is to determine the efficacy of the model. To evaluate how the model performed, we can use the BinaryClassificationEvaluator object as follows:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

eval = BinaryClassificationEvaluator(labelCol = "catlabel", rawPredictionCol="rawPrediction")

eval.evaluate(adultTestDF)

In the initialisation of the BinaryClassificationEvaluator, the labelCol attribute specifies the actual value and rawPredictionCol names the column holding the model’s raw prediction scores, here rawPrediction. The evaluate function returns the area under the ROC curve (areaUnderROC, the evaluator’s default metric) on the test dataset. This measures how well the model ranks positive examples above negative ones and is not the same as accuracy.
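
One way to read the area under the ROC curve is as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes it from that pairwise definition on made-up labels and scores. It is an illustration of the metric, not a description of how Spark computes it internally, and the helper name area_under_roc is hypothetical:

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for label, s in zip(labels, scores) if label == 1]
    negatives = [s for label, s in zip(labels, scores) if label == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Illustrative labels (1 = positive class) and model scores.
labels = [1, 0, 1, 0, 1]
scores = [0.9, 0.4, 0.35, 0.2, 0.8]

auc = area_under_roc(labels, scores)
print(round(auc, 4))  # 0.8333
```

Five of the six positive/negative pairs are ordered correctly here, giving 5/6. A value of 0.5 corresponds to random ranking and 1.0 to a perfect one.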

You would definitely want to save the trained model for later use, which is done as follows:

gbmodel.save(path)

You can later retrieve this model using “load” function of the specific classifier:

from pyspark.ml.classification import GBTClassificationModel
classifierModel = GBTClassificationModel.load(path)

You can now use this classification model for inferencing as required.

Constructing a PySpark DataFrame Dynamically

Spark provides a lot of connectors to load data from various formats. Whether it is a CSV or JSON or Parquet you can use the magic of “spark.read”.

However, there are times when you would like to create a DataFrame dynamically using code. The one use case that I was presented with was to create a dataframe out of a very twisted incoming JSON from an API. So, I decided to parse the JSON manually and create a dataframe.

The approach we are going to use is to create a list of structured Row types and we are using PySpark for the task. The steps are as follows:

1. Define the custom row class

from pyspark.sql import Row

personRow = Row("name","age")

2. Create an empty list to populate later

community = []

3. Create row objects with the specific data in them. In my case, this data is coming from the response that we get from calling the API.

qr = personRow(name, age)

4. Append the row objects to the list. In our program, we are using a loop to append multiple Row objects to the list.

community.append(qr)

5. Define the schema using StructType

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

person_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

6. Create the dataframe using createDataFrame. The two parameters required are the data and the schema to be applied.

communityDF = spark.createDataFrame(community, person_schema)

Using the steps above, we are able to create a dataframe for use in Spark applications dynamically.
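
The part that varies most from case to case is getting from the raw API response to the list of rows. The sketch below shows that step in plain Python. The payload shape, its field names and the helper name extract_people are all made up for illustration, since the actual API response is not shown here. It produces a list of (name, age) tuples, which createDataFrame also accepts alongside a StructType schema:

```python
import json

# Hypothetical nested response; the real API payload will look different.
payload = json.loads("""
{"result": {"people": [
    {"profile": {"name": "Asha"}, "stats": {"age": "31"}},
    {"profile": {"name": "Ravi"}, "stats": {"age": null}}
]}}
""")


def extract_people(response):
    """Flatten the nested response into (name, age) tuples."""
    rows = []
    for entry in response["result"]["people"]:
        name = entry["profile"]["name"]
        raw_age = entry["stats"]["age"]
        # Ages arrive as strings in this made-up payload; keep missing ones as None.
        age = int(raw_age) if raw_age is not None else None
        rows.append((name, age))
    return rows


community = extract_people(payload)
print(community)  # [('Asha', 31), ('Ravi', None)]
```

Converting the values to the types declared in the schema before building the dataframe avoids type mismatch errors at creation time, and the nullable flag on each StructField allows the None values through.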

2017

Webinar Schedule for Jan-Feb 2017

We are in the last week of 2016 and getting ready to welcome 2017. We wish you all a very Happy & Prosperous New Year 2017. We here at Progress are busy trying to bring you new learnings in 2017. Our team has been brainstorming on possible topics for the Jan/Feb 2017 webinars and we can’t wait to take them live. In this blog post we take a look at the Jan/Feb 2017 webinar schedule. So read on.

Data Preparation Made Easy with Easyl

Resources for Webinar “Preparing Big Data for Analysis with Progress/Telerik Easyl”

IMPORTANT: Progress has since stopped active development on Easyl.

We are back with one more webinar. On Jun 11 2015 we conducted a webinar titled “Preparing Big Data for Analysis with Progress/Telerik Easyl” at a new time: the June and July webinars will be held from 12PM to 1PM IST instead of 3PM IST. This blog post provides a recap of the webinar so you can catch up in case you missed it live.

About Easyl:

HIGH-SPEED DATA PREPARATION YOU CAN DO YOURSELF – The slog of data preparation is over. What was slow and brutal, weighed down by days of Excel and email, is now fast and brilliant. For everyone. Business user, vendor, builder or partner — Progress Easyl puts you ahead of the pack. Modern, high-performance data preparation is now in your hands.

Data Preparation Made Easy with Easyl

You can check out more information on Easyl here: Easyl

Slide Deck:

Here is the slide deck used in the webinar:

Q&A:

Q: What is Eloqua?
A: It’s a marketing automation SaaS product from Oracle. The presenter is using it as one of the data sources.

Q: How does Easyl help in industry?
A: You can check out the features of Easyl here – https://www.progress.com/easyl/features. This will help you understand the capabilities of Easyl.

Q: Hi, can we use Easyl as a data source for other BI tools like Tableau?
A: Yes, Tableau will be supported in the first GA release scheduled for the end of June 2015.

Q: How much amount of data space is provided by Easyl?
A: The data space depends on the plan your Easyl account is subscribed to. You can get information on Easyl plans at https://pacific.progress.com/cs-ui/service/plans after registering with Progress Pacific.

Q: Is it integrated with Telerik Test Studio?
A: Progress Easyl is part of the Progress Pacific platform. It is not integrated with Telerik Test Studio.

Q: Is SAP database supported in your tool ?
A: SAP is not currently supported.

Q: Is there any protection for data in Easyl?
A: Data generated in Easyl by a user can be accessed only by the owner and the sharees with whom the report is shared. All data generated by the user is strictly bound to that user’s account.

Q: Can we secure data in Easyl?
A: Setting password for selected reports is not possible.

Q: One more thing: you mentioned that it will be able to map with Tableau, so will that be Tableau Online or an on-premise Tableau Server?
A: Tableau Online.

Q: Will there be a DEC (from Telerik) data connector available?
A: A DEC connector for Easyl is currently not available.

Q: Which cloud service is being used for storing data?
A: Amazon EC2 instances.

Q: Can you show the Google Analytics part in Easyl?
A: A tutorial on the Google Analytics integration will be available in the Easyl documentation after the GA release.

Q: Can we secure data in the database with passwords, like MS Access?
A: Only Easyl user authentication is available for the time being. Password protecting individual reports is not possible.

Q: Can Easyl be used as some sort of OData server so that I can connect it with my on-premise Tableau Server?
A: This is currently not possible. But you could generate your .tde files and work with your Tableau desktop application.

Q: Is multi-tenancy supported?
A: Multi-tenancy is supported in the form of corporate accounts, where a group of accounts can be managed together using the Progress Pacific cloud services.

Q: Regarding multi-tenancy, I am referring to data-level multi-tenancy. For example, if an admin creates a report and would like to share it with analysts at different locations, can those users see only the data in the report for the locations they have access to?
A: Yes, it is possible by using a corporate plan.

Q: Is this tool free?
A: It is available for a 30 day trial period, after which you’ll have to subscribe to a plan to resume working with full features.

Q: Can we attach Easyl to development software like Visual Studio?
A: No.

Q: How do I open the Easyl website?
A: You can register at https://pacific.progress.com/ and then select Easyl from the Pacific console page.

Q: What is licensing cost ?
A: It depends on the plan your Easyl account is subscribed to. You can get information on Easyl plans at https://pacific.progress.com/cs-ui/service/plans after registering with Progress Pacific.

Q: How can I access Easyl?
A: You can register at https://pacific.progress.com/ and then select Easyl from the Pacific console page.

Q: Is there any support team for Easyl?
A: Yes. You could visit http://forms.progress.com/forms/ContactUs-v2 to contact our sales and service teams.

Q: Does Easyl offer some form of automatic de-duplication of data when multiple data sources are used, say HubSpot and Google Analytics, where journeys are obviously connected?
A: Not currently.

Q: Can Easyl be installed and configured on on-premise servers?
A: Easyl is available only as a cloud service. It cannot be installed as a standalone on-premise service.

Q: How can I believe that my data is secure in Easyl?
A: Data is secured in safe Amazon instances which are closely monitored by an operations team.

Q: Can you show how to connect Easyl to SQL Server?
A: You just need to create a new datasource and enter the connection parameters in the SQL Server Datasource tab. Please refer to the documentation for details.

Q: In which fields will Easyl be helpful?
A: Data Preparation for data analytics and business intelligence.

Q: Does Easyl support RTL languages (Arabic)?
A: Not currently.

Q: Is there any security for Easyl from virus attacks?
A: Data is secured in safe Amazon instances which are closely monitored by an operations team to avoid any such attacks.

Hope you liked the webinar and will give Easyl a spin. Till next time – Happy Coding.