Sensor Data Quality Management Using PySpark and Seaborn
Learn how to check data for required values, validate data types, and detect integrity violations using data quality management (DQM).
Data quality management (DQM) is the process of analyzing, defining, monitoring, and continuously improving the quality of data. A few data quality dimensions widely used by data practitioners are accuracy, completeness, consistency, timeliness, and validity. Various DQM rules are configured to apply DQM to existing data; these rules are applied to clean up, repair, and standardize incoming data and to identify and correct invalid data.
In this blog, let's check data for required values, validate data types, and detect integrity violations. DQM is applied to correct the data by providing default values, formatting numbers and dates, and removing missing values, null values, non-relevant values, duplicates, out-of-bounds values, referential integrity violations, and value integrity violations.
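For illustration only (the sensor pipeline used later in this post is different), a few such rules can be sketched in pandas using the hypothetical column names device_id and reading:
import pandas as pd
import numpy as np
# Hypothetical example data with a duplicate, a missing key, and an out-of-bounds reading
df = pd.DataFrame({'device_id': ['a', 'a', None, 'b'],
                   'reading': ['21.5', '21.5', ' ', '250.0']})
df = df.drop_duplicates()                         # remove duplicate records
df = df.dropna(subset=['device_id'])              # remove rows missing a key value
df = df.replace(r'^\s*$', np.nan, regex=True)     # treat blank strings as missing
df['reading'] = df['reading'].astype(float)       # standardize the data type
df['reading'] = df['reading'].fillna(df['reading'].median())  # default value for remaining gaps
df = df[df['reading'].between(-40, 60)]           # drop out-of-bounds values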
Prerequisites
Install the following Python packages:
- PySpark
- XGBoost
- Pandas
- Matplotlib
- Seaborn
- NumPy
- sklearn
Data Description
Sensor data from the PubNub source is used as the source file.
- Total record count: 6K
- File types: JSON and CSV
- # of columns: 11
- # of records: 600K
- # of duplicate records: 3.5K
- # of NA Values:
- Ambient temperature: 3370
- Humidity: 345
- Sensor IDs: 12
Use Case
Perform data quality management on sensor data using the Python API PySpark.
Data quality management process synopsis:
- Data integrity
- Data profiling
- Data cleansing
- Data transformation
Data Integrity
Data integrity is the process of guaranteeing the accuracy and consistency of the data in the database.
- Analyzed input sensor data with:
- 11 columns
- 6K records
- Validated source metadata
- Populated relationships for an entity
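The exact checks are not shown in the original post; a minimal sketch of the metadata validation step, assuming the PySpark DataFrame sensor_data_df loaded in the profiling section below and a subset of the expected column names, could look like this:
# Hypothetical metadata validation: confirm expected columns exist and count records
expected_columns = {'sensor_id', 'sensor_name', 'sensor_uuid',
                    'ambient_temperature', 'humidity', 'photosensor', 'radiation_level'}
missing_columns = expected_columns - set(sensor_data_df.columns)
if missing_columns:
    raise ValueError('Missing expected columns: %s' % sorted(missing_columns))
print('Record count:', sensor_data_df.count())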
Data Profiling
Data profiling is the process of discovering and analyzing enterprise metadata to discover patterns, entity relationships, data structure, and business rules. It provides statistics or informative summaries of the data to assess data issues and quality.
A few data profiling analyses include the following (a minimal sketch of these checks is given after the list):
- Completeness analysis: Analyze frequency of attribute population versus blank or null values.
- Uniqueness analysis: Analyze and find unique or distinct values and duplicate values for a given attribute across all records.
- Values distribution analysis: Analyze and find the distribution of records across different values of a given attribute.
- Range analysis: Analyze and find minimum, maximum, median, and average values of a given attribute.
- Pattern analysis: Analyze and find character patterns and pattern frequency.
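A minimal pandas sketch of these checks, assuming the DataFrame df and the ambient_temperature and sensor_name columns loaded in the next section, with ambient_temperature already cast to float:
# Completeness: populated versus missing values for an attribute
print(df['ambient_temperature'].isnull().sum(), 'missing of', len(df), 'records')
# Uniqueness: distinct values and duplicate records
print(df['ambient_temperature'].nunique(), 'distinct values')
print(df.duplicated().sum(), 'duplicate records')
# Values distribution: record counts across the values of an attribute
print(df['ambient_temperature'].value_counts().head())
# Range: minimum, maximum, median, and average values
print(df['ambient_temperature'].agg(['min', 'max', 'median', 'mean']))
# Pattern: character patterns and their frequency (digits -> 9, letters -> A/a)
patterns = (df['sensor_name'].astype(str)
            .str.replace(r'\d', '9', regex=True)
            .str.replace(r'[A-Z]', 'A', regex=True)
            .str.replace(r'[a-z]', 'a', regex=True))
print(patterns.value_counts().head())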
Generating Profile Reports
To generate profile reports, use either Pandas profiling or PySpark data profiling with the below commands:
Pandas profiling:
import pandas as pd
import pandas_profiling
import numpy as np
# Read the source file that contains the sensor data details
df = pd.read_json(r'E:\sensor_data.json', lines=True)
# Preprocessing: replace whitespace values with NaN and cast numeric columns to float
df = df.replace(r'\s+', np.nan, regex=True)
df['ambient_temperature'] = df['ambient_temperature'].astype(float)
df['humidity'] = df['humidity'].astype(float)
# Generate the profile report using pandas_profiling
report = pandas_profiling.ProfileReport(df)
# Convert the profile report to an HTML file
report.to_file(r'E:\sensor_data.html')
PySpark profiling:
import spark_df_profiling
# Initializing PySpark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
# Spark config
conf = SparkConf().setAppName("sample_app")
sc = SparkContext(conf=conf)
sql = SQLContext(sc)
# Load the sensor data
sensor_data_df = sql.read.format("com.databricks.spark.csv").option("header", "true").load(r"E:\spireon\Data\ganga\sensor_data.csv")
# Generate and save the profile report using spark_df_profiling
report = spark_df_profiling.ProfileReport(sensor_data_df)
report.to_file(r"E:\spireon\Data\ganga\pyspark_sensor_data_profiling_v2.html")
The profile report provides the following details:
- Essentials: Type, unique values, missing values
- Quantile statistics: Minimum value, Q1, median, Q3, maximum, range, interquartile range
- Descriptive statistics: Mean, mode, standard deviation, sum, median absolute deviation, coefficient of variation, kurtosis, and skewness
- Most frequent values
- Histogram
Profile report overview: the sample profile report for a single attribute (ambient temperature) is shown with its statistics, histogram, and extreme values.
To view the complete profile report, see the References section.
Data Cleansing
Data cleansing is the process of identifying incomplete, incorrect, inaccurate, duplicate, or irrelevant data and modifying, replacing, or deleting dirty data.
Analyzed the number of null (NaN) values in the dataset using the command df.isnull().sum().
The number of null values is as follows:
Deleted records where the String type columns (sensor_id, sensor_name, and sensor_uuid) are all NaN using the below command:
df_v1 = df.dropna(subset=['sensor_id', 'sensor_name','sensor_uuid'], how='all')
df_v1.isnull().sum()
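The data description above also reports about 3.5K duplicate records; the original commands for removing them are not shown, but a minimal pandas sketch would be:
# Drop exact duplicate records, keeping the first occurrence
print('Duplicates before:', df_v1.duplicated().sum())
df_v1 = df_v1.drop_duplicates(keep='first')
print('Duplicates after:', df_v1.duplicated().sum())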
- Imputed missing values using one of the below methods.
Method 1: Impute Package
Imputation is defined as the process of replacing the missing data with substituted values using any of the following options:
- most_frequent: Columns of dtype object (String) are imputed with the most frequent value in the column, as a mean or median cannot be computed for this data type.
- Mean: Ratio of the sum of the elements to the number of elements in the list.
- Median: Middle value of the sorted list (for an even number of elements, the average of the middle two values).
Note: If the missing values in the records are negligible, ignore those records.
In our use case, the most_frequent strategy is used for substituting the missing values using the below command:
from sklearn.preprocessing import Imputer
imputer = Imputer(missing_values='NaN', strategy='most_frequent', axis=0)
imputer = imputer.fit(df_v1.ix[:, [2, 3, 4, 5, 6]])
df_v1.ix[:, [2, 3, 4, 5, 6]] = imputer.transform(df_v1.ix[:, [2, 3, 4, 5, 6]])
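Note that Imputer and DataFrame.ix have since been removed from scikit-learn and pandas; on current versions, an equivalent sketch (assuming the same column positions) is:
import numpy as np
from sklearn.impute import SimpleImputer
# Impute columns 2-6 (by position) with the most frequent value in each column
imputer = SimpleImputer(missing_values=np.nan, strategy='most_frequent')
df_v1.iloc[:, 2:7] = imputer.fit_transform(df_v1.iloc[:, 2:7])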
Method 2: Linear Regression Model
To replace the missing data with the substituted values using the linear regression model, use the below commands:
from sklearn.linear_model import LinearRegression
# Split values into sets with known and unknown ambient_temperature values
df_v2 = df_v1[["ambient_temperature","humidity","photosensor","radiation_level"]]
knownTemperature = df_v2.loc[(df_v1.ambient_temperature.notnull())]
unknownTemperature = df_v2.loc[(df_v1.ambient_temperature.isnull())]
# All ambient_temperature values stored in a target array
Y = knownTemperature.values[:, 0]
# All the other values stored in the feature array
X = knownTemperature.values[:,1::]
# Create and fit a linear regression model
linear_regression = LinearRegression()
linear_regression.fit(X, Y)
# Use the fitted regression model to predict the missing values
predictedTemperature = linear_regression.predict(unknownTemperature.values[:, 1::])
# Assign those predicted values to the full data set
df_v1.loc[ (df_v1.ambient_temperature.isnull()), 'ambient_temperature' ] = predictedTemperature
Data Transformation
Data transformation deals with converting data from the source format into the required destination format.
- Converted attributes such as ambient_temperature and humidity from object type to float type using the below command:
#Preprocessing on data transformation
df = df.replace(r'\s+', np.nan, regex=True)
df['ambient_temperature']= df['ambient_temperature'].astype(float)
df['humidity'] = df['humidity'].astype(float)
- Converted the non-numeric value of sensor_name into numeric data using the below command:
from sklearn.preprocessing import LabelEncoder
labelencoder_X = LabelEncoder()
labelencoder_X.fit(df_v1.ix[:, 6])
list(labelencoder_X.classes_)
df_v1.ix[:, 6] = labelencoder_X.transform(df_v1.ix[:, 6])
- Converted a non-numeric sensor name into numeric data using the below command:
labelencoder_y = LabelEncoder()
labelencoder_y.fit(df_v1.ix[:, 4])
list(labelencoder_y.classes_)
df_v1.ix[:, 4] = labelencoder_y.transform(df_v1.ix[:, 4])
- Converted the non-numeric value of sensor ID into numeric data using the below command:
labelencoder_z = LabelEncoder()
labelencoder_z.fit(df_v1.ix[:, 5])
list(labelencoder_z.classes_)
df_v1.ix[:, 5] = labelencoder_z.transform(df_v1.ix[:, 5])
- Based on the above transformations, found feature importance using the built-in XGBoost plot_importance function and the below commands:
# plot feature importance using the XGBoost built-in function
from xgboost import XGBClassifier
from xgboost import plot_importance
from matplotlib import pyplot as plt
# split data into X and y
X = df_v1.ix[:,[0,1,2,3,4,5,6,7,10]]
Y = df_v1.ix[:,[10]]
plt.clf()
# fit the model on the training data
model = XGBClassifier()
model.fit(X, Y)
# plot feature importance
plot_importance(model)
plt.gcf().subplots_adjust(bottom=0.15)
plt.tight_layout()
plt.show()
Feature importance chart:
From the above chart, it is evident that the photosensor feature has the highest importance and the latitude feature has the lowest importance.
Correlation Analysis
Performed correlation analysis to explore the relationships between the sensor data variables, highlight weak relationships, and flag potentially incorrect ones. The correlation analysis between the sensor data variables is shown in the below diagram:
From the above diagram, it is evident that ambient_temperature is highly correlated with dewpoint and humidity, while latitude and longitude show negative correlations.
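The plotting commands are not included in the post; a correlation heatmap of this kind is typically produced with Seaborn roughly as follows (assuming df_v1 holds the cleaned data at this point):
import seaborn as sns
import matplotlib.pyplot as plt
# Compute pairwise correlations between the numeric sensor attributes
corr = df_v1.select_dtypes(include='number').corr()
# Plot the correlation matrix as an annotated heatmap
plt.figure(figsize=(10, 8))
sns.heatmap(corr, annot=True, fmt='.2f', cmap='coolwarm', square=True)
plt.title('Correlation analysis of sensor data attributes')
plt.tight_layout()
plt.show()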
Reference