On Some Aspects of Big Data Processing in Apache Spark, Part 3: How To Deal With Malformed Data?
In this post, I present solutions for dealing with malformed date/time data and for setting a default value for malformed data.
In my previous post, I presented design patterns for programming Spark applications in a modular, maintainable, and serializable way. This time, I demonstrate a solution for dealing with malformed date/time data and for setting a default value for malformed data.
When I worked on a big data project, my tasks were to load data in different formats (JSON, ORC, etc.) from different sources (Kafka, Hadoop Distributed File System, Apache Hive, Postgres, Oracle), transform the data, and save it to the same or different sources. The simplest task was to load data from a single source (Postgres) and save it to another source (Hive) without any transformations.
Even in this simplest case, there was a lot of malformed data! Malformed date/time data, in particular, took our team a lot of time to deal with. There were also frequent null values and, occasionally, empty arrays. So, it is worthwhile to have compact and versatile solutions to process such irregularities.
This post is organized as follows:
- Section 1 describes how to deal with malformed date/time data,
- Section 2 describes a simple decorator pattern to assign a default value to a malformed piece of data.
Fully workable code can be found here. Let's go.
I. Malformed Date/Time Data.
In my case, data was received in JSON format either from Kafka or from JSON files. JSON-formatted data is built on two structures: a collection (object) of key-value pairs and an ordered list (array) of values. A value can be an object, an array, a string, a number, a boolean, or null. So, a piece of date/time data usually comes as a string.
Java has the legacy java.util.Date class and, since Java 8, the java.time package for working with date and time. The package contains LocalDate, LocalTime, LocalDateTime, ZonedDateTime, and other classes to store date/time data and parse date/time strings. There are also formatters, like SimpleDateFormat and DateTimeFormatter. These formatters accept a pattern string, like "dd.MM.yyyy HH:mm", and an input string, like "20.10.2020 12:30", and return a Date or LocalDateTime object parsed from the input string. These objects can then act as fields in a Hibernate @Entity.
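For a single, well-known format, the whole job takes a couple of lines. Here is a minimal sketch using the pattern and input strings mentioned above:
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class SinglePatternExample {
    public static void main(String[] args) {
        // One fixed pattern that matches the expected input format
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm");
        // Parse the input string into a LocalDateTime
        LocalDateTime dateTime = LocalDateTime.parse("20.10.2020 12:30", formatter);
        System.out.println(dateTime); // prints 2020-10-20T12:30
    }
}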
Looks pretty straightforward, right?
Unfortunately, no. In many big data projects, date/time data comes in many different formats. Moreover, I encountered examples where date/time strings contained substrings in a different language! So, it may not be possible to parse such date strings with a single pattern or even with a single parsing tool.
To attack this problem, let's recall the Chain of Responsibility design pattern. The pattern consists of a Handler interface and ConcreteHandler implementations. A ConcreteHandler refers to another ConcreteHandler; all the ConcreteHandlers form a linked list. The last ConcreteHandler refers to null.
In our case, this pattern is implemented as follows. Our Handler interface is called IChainDT:
public interface IChainDT {
    LocalDate getDateTime(String input);
    Date parseDateTime(String input);
    void setNextChain(IChainDT element);
}
Here the parseDateTime method parses date/time strings, getDateTime converts a Date object to a more convenient LocalDate object, and the setNextChain method sets a link to another parser. The converter is added to demonstrate how the parser can make output dates "prettier" before the dates are returned.
The SimpleDateTimeParser class implements the IChainDT interface:
public class SimpleDateTimeParser implements IChainDT {
    String shortDateTimePattern;
    IChainDT dateTimeParser = null;
    Date defaultTime = new Date(0L);

    public SimpleDateTimeParser(String pattern) {
        shortDateTimePattern = pattern;
    }

    public SimpleDateTimeParser(String pattern, IChainDT nextValidator) {
        this(pattern);
        this.dateTimeParser = nextValidator;
    }

    public LocalDate getDateTime(String json) {
        Date result = parseDateTime(json);
        return result.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    }

    public void setNextChain(IChainDT validator) {
        this.dateTimeParser = validator;
    }

    public Date parseDateTime(String input) {
        // The formatter is a local variable so that the parser stays serializable
        DateFormat simpleDateFormatter = new SimpleDateFormat(shortDateTimePattern);
        try {
            return simpleDateFormatter.parse(input);
        } catch (Exception e) {
            // Pass the input down the chain, or return the default value at the end of the chain
            if (this.dateTimeParser != null) return this.dateTimeParser.parseDateTime(input);
            else return defaultTime;
        }
    }
}
Here IChainDT dateTimeParser is a reference to another parser, and String shortDateTimePattern is a date/time pattern string. The next parser can be set either via the two-argument constructor or via the setNextChain setter.
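For instance, a chain of two parsers can be assembled through the two-argument constructor (the patterns below are just examples):
// The fallback parser handles the second format; the first parser delegates to it on failure
IChainDT fallback = new SimpleDateTimeParser("dd/MM/yyyy");
IChainDT parser = new SimpleDateTimeParser("yyyy-MM-dd", fallback);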
Notice how the parseDateTime method works. First, the method creates a SimpleDateFormat instance with a specific pattern; we need the instance to be a local variable for the SimpleDateTimeParser to be serializable (this post explains how Spark serializes tasks). If the simpleDateFormatter (with the specified pattern) fails to parse the input string, the formatter throws an exception.
The exception gets caught in the catch block. If there is a dateTimeParser next in the chain, dateTimeParser.parseDateTime(input) gets called. If the current parser is the last in the chain, its default value is returned; that value may be null.
Finally, let's see how this parser is called.
@Test
public void parserTest() {
    String pattern1 = "yyyy-MM-dd";
    String pattern2 = "yyyy.MM.dd";
    IChainDT validator1 = new SimpleDateTimeParser(pattern1);
    IChainDT validator2 = new SimpleDateTimeParser(pattern2);
    validator1.setNextChain(validator2);
    String testString = "2020-10-19";
    LocalDate result = validator1.getDateTime(testString);
    assertEquals(result.getYear(), 2020);
    testString = "2020.10.19";
    result = validator1.getDateTime(testString);
    assertEquals(result.getYear(), 2020);
    testString = "10/19/2020";
    result = validator1.getDateTime(testString);
    // neither parser can handle this string, so the default date (epoch) is returned;
    // in time zones behind UTC, the local year of the epoch is 1969
    assertEquals(result.getYear(), 1969);
}
First, we create a parser for every pattern string. Next, we chain the parsers. Finally, we call the first parser in the chain on a date/time string. If none of the parsers succeeds in parsing the string, the default value of the last parser in the chain (new Date(0L) in this case, converted to a LocalDate) is returned.
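To show where this matters in Spark, here is a hedged sketch of using the chained parser inside an RDD transformation. The SparkSession setup and the sample data are illustrative, and it assumes the parser classes are Serializable, as the serializability discussion above implies (see the linked code):
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.time.LocalDate;
import java.util.Arrays;

public class ParserInSparkExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("date-parser-example")
                .master("local[*]")
                .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        // A two-parser chain built via the two-argument constructor
        IChainDT parser = new SimpleDateTimeParser("yyyy-MM-dd",
                new SimpleDateTimeParser("yyyy.MM.dd"));

        JavaRDD<String> raw = sc.parallelize(Arrays.asList("2020-10-19", "2020.10.19", "garbage"));
        // The parser instance is shipped to the executors with the task closure
        JavaRDD<LocalDate> dates = raw.map(parser::getDateTime);

        dates.collect().forEach(System.out::println);
        spark.stop();
    }
}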
This parser can also be implemented via an abstract class. In this case, we define an abstract class AChainDT instead of the interface IChainDT:
public abstract class AChainDT {
    protected AChainDT nextParser = null;
    protected String shortDateTimePattern;
    protected Date defaultTime = new Date(0L);

    public AChainDT(String shortDateTimePattern) {
        this.shortDateTimePattern = shortDateTimePattern;
    }

    public void setNextParser(AChainDT nextParser) {
        this.nextParser = nextParser;
    }

    public LocalDate getDateTime(String input) {
        Date result = parseDateTime(input);
        return result.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    }

    public abstract Date parseDateTime(String input);
}
Here, the abstract class contains the common part of all parsers: a reference to the next parser, a pattern string, the default value, and the Date-to-LocalDate converter. A ConcreteHandler now looks more concise:
public class SimpleDateTimeParserA extends AChainDT {
    public SimpleDateTimeParserA(String shortDateTimePattern) {
        super(shortDateTimePattern);
    }

    @Override
    public Date parseDateTime(String input) {
        DateFormat simpleDateFormatter = new SimpleDateFormat(shortDateTimePattern);
        try {
            return simpleDateFormatter.parse(input);
        } catch (Exception e) {
            if (nextParser != null) return nextParser.parseDateTime(input);
            else return defaultTime;
        }
    }
}
Again, we create a SimpleDateFormat instance as a local variable for the parser to be serializable. This parser runs as before, except we replace IChainDT with AChainDT and SimpleDateTimeParser with SimpleDateTimeParserA. See the code for details.
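A brief sketch of chaining the abstract-class parsers (the patterns mirror the earlier test):
AChainDT first = new SimpleDateTimeParserA("yyyy-MM-dd");
AChainDT second = new SimpleDateTimeParserA("yyyy.MM.dd");
first.setNextParser(second);
// "2020.10.19" fails the first pattern and falls through to the second parser
LocalDate date = first.getDateTime("2020.10.19");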
II. Default Value Decorator.
As I mentioned in the introduction, a lot of nulls and empty arrays come as values in JSON strings. Also, sometimes when data is transferred from one database to another, the data types need to be converted, for example Integer to BigDecimal. In all these cases, NullPointerException, ArrayIndexOutOfBoundsException, and other exceptions need to get caught and processed.
A common scenario is when there is a functional interface to be fed as a callback to RDD transformations or actions. Let's decorate such an interface to catch and process exceptions.
import org.apache.spark.api.java.function.Function;

public interface IExceptionDecoratorSpark {
    static <Input, Output> Function<Input, Output> process(Function<Input, Output> fun, Output def) {
        return new Function<Input, Output>() {
            @Override
            public Output call(Input o) {
                try {
                    return fun.call(o);
                } catch (NullPointerException e) {
                    return null;
                } catch (Exception e) {
                    return def;
                }
            }
        };
    }
}
Here fun is an input function that implements the Function interface. This function's input is an Input-type object, and the output is an Output-type object. The anonymous class returned by the process method overrides the call method; inside call, fun is invoked. If an exception is thrown, it gets caught in the catch blocks, and either null or the provided default value def is returned. As is usual in Java, more specific exceptions should be handled first.
This decorator is called as follows:
@Test
public void basicProcessorSparkTest() throws Exception {
    Double def = 10000.0;
    Double shouldBe = 0.5;
    Function<Integer, Double> fun = (x) -> 1.0 / x;
    Function<Integer, Double> outFun = IExceptionDecoratorSpark.process(fun, def);
    Double result = outFun.call(2);
    assertEquals(result, shouldBe);
}
Here fun returns the inverse of its input; no exception is thrown, so the decorated function behaves exactly like the original.
On the other hand, here is an example where an exception is thrown and processed; the provided default value is returned as the result:
@Test
public void exceptionProcessorSparkTest() throws Exception {
    Integer def = 10;
    Integer[] input = new Integer[0];
    Function<Integer[], Integer> fun = (x) -> x[1];
    Function<Integer[], Integer> outFun = IExceptionDecoratorSpark.process(fun, def);
    Integer result = outFun.call(input);
    assertEquals(result, def);
}
Here fun returns the second element of an input array of integers. If such an element doesn't exist, the provided default value is returned.
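For completeness, here is a hedged sketch of plugging a decorated function into an actual RDD transformation; the local SparkSession and the sample data are illustrative:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;

public class DecoratorInSparkExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("decorator-example")
                .master("local[*]")
                .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        Function<Integer, Double> inverse = x -> 1.0 / x;              // throws NPE on null input
        Function<Integer, Double> safeInverse =
                IExceptionDecoratorSpark.process(inverse, Double.NaN); // NaN as the default value

        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, null));
        JavaRDD<Double> inverses = numbers.map(safeInverse);

        inverses.collect().forEach(System.out::println);               // 1.0, 0.5, null
        spark.stop();
    }
}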
Notice that the org.apache.spark.api.java.function.Function interface is not the same as java.util.function.Function. The former has to implement a call method and extends Serializable; the latter has to implement an apply method and is not necessarily serializable. The presented approach also works for java.util.function.Function, if we replace the input function type and swap the call method for an apply method. See the code for details.
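As a rough illustration (the interface name IExceptionDecorator is assumed here, not taken from the linked code), the java.util.function.Function variant might look like this:
import java.util.function.Function;

public interface IExceptionDecorator {
    static <Input, Output> Function<Input, Output> process(Function<Input, Output> fun, Output def) {
        return input -> {
            try {
                return fun.apply(input);
            } catch (NullPointerException e) {
                return null; // nulls map to null, as in the Spark version
            } catch (Exception e) {
                return def;  // any other failure falls back to the provided default
            }
        };
    }
}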
Conclusions
In this post, I demonstrated possible ways to process malformed date/time data and to create a default value decorator. The date/time processor can handle date/time strings that cannot be parsed by means of a single template string. The decorator returns different default values for different exceptions thrown. I hope these tricks will be helpful for you.