How To Use SingleStore With Spark ML for Fraud Detection
Abstract
In this final part of our Fraud Detection series, we’ll use Spark to build a Logistic Regression model from data stored in SingleStore.
The notebook files used in this article series are available on GitHub in DBC, HTML, and iPython formats.
Introduction
This is a multi-part article series and it is structured as follows:
- Configure Databricks CE.
- Load the Credit Card data into SingleStore.
- Create and evaluate a Logistic Regression model.
In the first part of this Fraud Detection series, we created and configured a Databricks CE cluster. In the second part, we loaded credit card data into our Spark environment. This third article covers Part 3, creating and evaluating a Logistic Regression model. If you are following along with this series, please ensure that you have successfully completed the setup and requirements described in the two previous articles.
According to Andrea Dal Pozzolo, who was involved in the collection of the original dataset we are using for this example use case, fraud detection is a classification problem. Also, since investigators may only review a limited number of transactions, the probability that a transaction is fraudulent is more important than the true classification. Logistic Regression is therefore a good algorithm for the initial analysis: the outcome has only two possible values, and the model produces a probability for each prediction.
Fill Out the Notebook
Let’s now create a new notebook called Fraud Detection using Logistic Regression and attach it to our Spark cluster.
In the first code cell, let’s add the following:
%run ./Setup
This executes the Setup notebook that we previously created. We need to ensure that the server address and password for our SingleStore Managed Service cluster have been added to that notebook.
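For reference, a minimal sketch of what that Setup notebook might contain is shown below; the placeholder values are assumptions and should be replaced with your own cluster details.
# A minimal sketch of a possible Setup notebook (values are placeholders, not real credentials).
cluster = "<TO DO>"   # SingleStore Managed Service cluster address (DDL endpoint)
password = "<TO DO>"  # password for the admin user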
In the next code cell we’ll set some parameters for the SingleStore Spark Connector, as follows:
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
These are parameters for the SingleStore cluster, username, password, and whether Pushdown is enabled or disabled. We’ll discuss Pushdown in a separate article.
In the next code cell, let’s read the data from the SingleStore table into a Spark Dataframe, as follows:
%%time
df = (spark.read
      .format("singlestore")
      .load("fraud_detection.credit_card_tx"))
Using %%time in the code cell allows us to measure the read operation. It should take just milliseconds to complete.
In the next code cell, we’ll get the number of rows:
df.count()
This value should match the result we obtained in the previous article. In the next code cell, we’ll drop any null values and then count the number of rows again, as follows:
df = df.dropna()
df.count()
The result should show that there are no null values.
As previously mentioned, the dataset is highly skewed. There are a number of solutions we can use to manage a skewed dataset. The initial approach we can take is to under-sample. We'll keep all the 492 fraudulent transactions and reduce the number of non-fraudulent transactions. There are several ways we could perform this dataset reduction:
- Randomly select majority class examples.
- Select every nth row from the majority class examples.
For our initial analysis, let’s use the first approach and select 1% of the majority class examples.
First, we’ll separate the two possible outcomes into two Dataframes in a code cell, as follows:
is_fraud = df.select("*").filter("Class == 1")
no_fraud = df.select("*").filter("Class == 0")
In the next code cell, we’ll randomly sample 1% of non-fraudulent transactions, without replacement, as follows:
no_fraud = no_fraud.sample(False, 0.01, seed = 123)
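For comparison, the second approach listed above (selecting every nth row of the majority class) is not used in this series, but a hedged sketch of how it might look is shown below; the value of n is an assumption chosen to keep roughly 1% of the rows.
from pyspark.sql import functions as F, Window

# Sketch only: number each non-fraudulent row by Time, then keep every nth row.
# Note that an unpartitioned window moves all rows to a single partition.
n = 100
w = Window.orderBy("Time")
no_fraud_nth = (no_fraud
                .withColumn("row_num", F.row_number().over(w))
                .filter(F.col("row_num") % n == 0)
                .drop("row_num"))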
In the next code cell, we’ll concatenate the two Dataframes, sort on the Time column, and print out the number of rows:
df_concat = no_fraud.union(is_fraud)
df = df_concat.sort("Time")
df.count()
In the next code cell, we’ll check the structure of the Dataframe:
display(df)
Next, in a code cell we’ll create our train-test split:
train, test = df.randomSplit([0.7, 0.3], seed = 123)
print("train =", train.count(), " test =", test.count())
Here we are using 70% and 30% for train and test, respectively.
The code in the following sections was inspired by a Logistic Regression example available on GitHub. In the next code cell, we’ll generate an is_fraud label column for the training data, using a UDF, as follows:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
is_fraud = udf(lambda fraud: 1.0 if fraud > 0 else 0.0, DoubleType())
train = train.withColumn("is_fraud", is_fraud(train.Class))
We are now ready to create and fit a Spark Machine Learning model:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
# Create the feature vectors.
assembler = VectorAssembler(
    inputCols = [x for x in train.columns if x not in ["Time", "Class", "is_fraud"]],
    outputCol = "features")

# Use Logistic Regression.
lr = LogisticRegression().setParams(
    maxIter = 100000,
    labelCol = "is_fraud",
    predictionCol = "prediction")

model = Pipeline(stages = [assembler, lr]).fit(train)
For the VectorAssembler, we want to use the columns V1 to V28 as well as the Amount of the transaction. Therefore, we ignore the Time, Class, and is_fraud columns. Using Logistic Regression, we create our model.
Next, we’ll predict whether a transaction is fraudulent or not, using the test data, as follows:
predicted = model.transform(test)
And show the predictions as follows:
display(predicted)
Finally, we’ll check the performance of our model using a confusion matrix:
predicted = predicted.withColumn("is_fraud", is_fraud(predicted.Class))
predicted.crosstab("is_fraud", "prediction").show()
Overall, the results should show that our initial model makes good predictions. Because Data Science and Machine Learning are iterative processes, we can look for ways to improve and tune our classifier. For example, normalizing the data could be very useful and something to explore in the next iteration.
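As a hedged sketch of what that next iteration might look like, a StandardScaler stage could be inserted between the VectorAssembler and the Logistic Regression; the scaler and the raw_features column name are assumptions and are not part of the model built above.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

# Assemble the same features as before, but into an intermediate column.
assembler = VectorAssembler(
    inputCols = [x for x in train.columns if x not in ["Time", "Class", "is_fraud"]],
    outputCol = "raw_features")

# Standardise each feature to zero mean and unit variance.
scaler = StandardScaler(
    inputCol = "raw_features",
    outputCol = "features",
    withMean = True,
    withStd = True)

lr = LogisticRegression().setParams(
    maxIter = 100000,
    labelCol = "is_fraud",
    predictionCol = "prediction")

scaled_model = Pipeline(stages = [assembler, scaler, lr]).fit(train)
We could then evaluate scaled_model in the same way, using the test data and a confusion matrix, and compare the results with those of the initial model.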
Summary
In this article series, we have seen how easily SingleStore can be used with Spark. The key benefits of the SingleStore Spark Connector can be summarised as follows:
- Implemented as a native Spark SQL plugin.
- Accelerates ingest from Spark via compression.
- Supports data loading and extraction from database tables and Spark Dataframes.
- Integrates with the Catalyst query optimizer and supports robust SQL Pushdown.
- Accelerates ML workloads.
In a future article, we'll explore External Functions and discuss how they could be useful for Machine Learning.
In the next article series, we’ll look at an example of Pushdown. Stay tuned!