Deep Dive Into Apache Flink's TumblingWindow — Part 2
In this post, I am going to provide examples of overriding these TumblingWindow defaults and working with event time.
In my previous post, I shared examples of how to define and use TumblingWindow with its default behavior. In this post, I am going to provide examples of overriding these defaults and of working with event time.
Customizing Tumbling Window's Time Offset
Consider the example below, which continues from the previous post (where the Flink stream execution environment was created and a simple integer generator serves as the source).
intStream
    .windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ), Time.seconds( 2 ) ) ) // 5 s windows shifted by a 2 s offset
    .process( new ProcessAllWindowFunction<Integer, Integer, TimeWindow>()
    {
        @Override
        public void process( Context context, Iterable<Integer> input, Collector<Integer> output ) throws Exception
        {
            logger.info( "Computing sum for {}", input );
            // sum all integers collected in this window
            int sum = 0;
            for ( int i : input ) {
                sum += i;
            }
            output.collect( sum );
        }
    })
    .print();
The only change here is the windowAll() call: instead of timeWindowAll( Time.seconds( 5 ) ), the code now uses the more explicit windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ), Time.seconds( 2 ) ) ).
timeWindowAll() is a convenience wrapper. With the default processing-time characteristic it is equivalent to windowAll(TumblingProcessingTimeWindows.of(size)), i.e. a fixed-size window measured in processing time (the system clock of the machine where the Flink job runs).
Note: Flink has more than one notion of time, which I discuss later in this post.
As I shared in the previous post, Flink by default aligns windows to clock boundaries, but the second parameter of TumblingProcessingTimeWindows.of() sets an offset that shifts those boundaries.
In a sample run of the above code, the first five log lines show Flink starting a window and collecting integers. At 19:26:37 the window closes, and the sum of [1,2,3,4] is printed on line 6.
Note: Without the offset, Flink would have closed this window at 19:26:35. The 2-second offset shifts every window boundary 2 seconds past the clock boundary, so the window ends at 19:26:37 instead.
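To see where those boundaries come from, here is a small standalone Java sketch of the alignment arithmetic. It is my reconstruction of the rule Flink's tumbling window assigners apply (see TimeWindow.getWindowStartWithOffset() in the Flink sources), not Flink code. The class name TumblingWindowMath is hypothetical. For readability, it measures time in milliseconds since midnight instead of epoch milliseconds. This gives the same 5-second boundaries for any time zone whose UTC offset is a whole number of minutes.

```java
import java.time.LocalTime;

// Hypothetical helper reproducing the alignment rule of tumbling windows; not Flink code.
class TumblingWindowMath {

    // Start of the tumbling window (size and offset in ms) that contains timestamp
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the remainder non-negative even when timestamp < offset
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Window end is exclusive: the window is [start, start + size)
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    // Readable HH:mm:ss for a millisecond-of-day value
    static String fmt(long millisOfDay) {
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L).toString();
    }

    public static void main(String[] args) {
        // 19:26:33 expressed as milliseconds since midnight (assumed input)
        long ts = LocalTime.of(19, 26, 33).toNanoOfDay() / 1_000_000L;
        long size = 5_000L; // Time.seconds( 5 )

        System.out.println("no offset:  [" + fmt(windowStart(ts, 0L, size))
                + ", " + fmt(windowEnd(ts, 0L, size)) + ")");
        System.out.println("2 s offset: [" + fmt(windowStart(ts, 2_000L, size))
                + ", " + fmt(windowEnd(ts, 2_000L, size)) + ")");
    }
}
```

Running main() prints [19:26:30, 19:26:35) for the default alignment and [19:26:32, 19:26:37) with the 2-second offset, which matches the window closing at 19:26:37 in the sample run.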
TumblingWindow With EventTime
So far, we have treated 'time' as the system time of the machine where Flink executes the job. In many use cases, however, we want to use the actual time of the event, i.e. when the event was created at its source. To handle such scenarios, Flink supports three notions of time: processing time, event time, and ingestion time. Let us look at event time and how it can be used in Flink.
With event time, elements are grouped into windows based on the timestamp carried by each element, not on any system clock. Let us see an example.
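Before the Flink version, a plain-Java illustration of this grouping rule may help. No Flink is involved. The sketch assigns each (value, timestamp) pair to a 10-second tumbling window using only the element's own timestamp. The class EventTimeGrouping and its Event type are hypothetical, and the timestamps are made up. The sketch deliberately ignores watermarks and lateness, which in a real Flink job decide when a window is emitted and whether a late element is still accepted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of event-time grouping; not Flink code.
class EventTimeGrouping {

    // Minimal stand-in for an event: a value plus the time it was created at the source
    static final class Event {
        final int value;
        final long timestampMillis;

        Event(int value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // Groups values by the start of their epoch-aligned tumbling window (no offset).
    // Arrival order and the system clock play no role; only the event timestamp does.
    static Map<Long, List<Integer>> groupByWindow(List<Event> events, long sizeMillis) {
        Map<Long, List<Integer>> windows = new TreeMap<>();
        for (Event e : events) {
            long start = e.timestampMillis - Math.floorMod(e.timestampMillis, sizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(e.value);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Made-up timestamps in ms; event 3 arrives before event 2
        List<Event> arrivals = List.of(
                new Event(1, 1_000L),
                new Event(3, 12_500L),
                new Event(2, 9_000L),
                new Event(4, 15_000L));
        System.out.println(groupByWindow(arrivals, 10_000L));
    }
}
```

Although event 3 arrives before event 2, main() prints {0=[1, 2], 10000=[3, 4]}. Each value lands in the window its own timestamp belongs to.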
First, I defined a simple POJO class named Element, as follows. I used Lombok annotations to generate its getters and setters.
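import lombok.Getter;
import lombok.Setter;

// Lombok generates getValue()/getTimestamp() and the matching setters
@Getter
@Setter
public class Element
{
    Integer value;
    Long timestamp;

    // public no-arg constructor so Flink can treat Element as a POJO type
    public Element() {}

    public Element( int counter, long currTime )
    {
        this.value = counter;
        this.timestamp = currTime;
    }

    @Override
    public String toString()
    {
        return String.valueOf( value );
    }
}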
Next, I defined a simple source class called ElementGeneratorSource. It creates objects of type Element and assigns them randomly increasing timestamps, starting 20 seconds before the job starts. This ensures that an Element's timestamp does not simply match the system time. In practice, the timestamp would come as part of the event itself.
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ElementGeneratorSource implements SourceFunction<Element>
{
    volatile boolean isRunning = true;
    // static, so the logger is not serialized along with the source function
    private static final Logger logger = LoggerFactory.getLogger( ElementGeneratorSource.class );

    @Override
    public void run( SourceContext<Element> ctx ) throws Exception
    {
        int counter = 1;
        // 20 seconds behind the Flink program's start time
        long eventStartTime = System.currentTimeMillis() - 20000;
        // create the first event using the above timestamp
        Element element = new Element( counter++, eventStartTime );
        while ( isRunning )
        {
            logger.info( "Produced Element with value {} and timestamp {}", element.getValue(), printTime( element.getTimestamp() ) );
            ctx.collect( element );
            // next timestamp lies between 1 and 6 seconds after the previous one, so it drifts away from the system clock
            element = new Element( counter++, element.getTimestamp() + ThreadLocalRandom.current().nextLong( 1000, 6000 ) );
            Thread.sleep( 1000 );
        }
    }

    @Override
    public void cancel()
    {
        isRunning = false;
    }

    // helper function to print epoch time in a readable format
    String printTime( long longValue )
    {
        return LocalDateTime.ofInstant( Instant.ofEpochMilli( longValue ), ZoneId.systemDefault() ).toString();
    }
}
Now let us define a pipeline that processes these elements using TumblingEventTimeWindows. (I have removed the class and method declaration lines to focus on the important code.)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// set to EventTime, otherwise it defaults to ProcessingTime
env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );

DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );

elementStream
    .assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>()
    {
        @Override
        public long extractAscendingTimestamp( Element element )
        {
            return element.getTimestamp();
        }
    })
    .windowAll( TumblingEventTimeWindows.of( Time.seconds( 10 ) ) )
    .process( new ProcessAllWindowFunction<Element, Integer, TimeWindow>()
    {
        @Override
        public void process( Context context, Iterable<Element> input, Collector<Integer> output ) throws Exception
        {
            logger.info( "Computing sum for {}", input );
            // sum the values of all elements assigned to this window
            int sum = 0;
            for ( Element e : input ) {
                sum += e.getValue();
            }
            output.collect( sum );
        }
    })
    .print();

env.execute();
StreamExecutionEnvironment.getExecutionEnvironment(): Defines the streaming execution environment that every Flink streaming job starts from.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime): Must be set to EventTime. Otherwise, Flink ignores the timestamps inside the elements and uses processing time, i.e. the system clock. (From Flink 1.12 onward, event time is the default and this method is deprecated.)
env.addSource(new ElementGeneratorSource()): Creates a DataStream with ElementGeneratorSource (discussed earlier in this article) as its source.
assignTimestampsAndWatermarks(): Before defining a window, I need to tell Flink how to get a timestamp and a watermark for each element it receives.
In this example, I am using the handy class AscendingTimestampExtractor. The Flink documentation describes it as "A timestamp assigner and watermark generator for streams where timestamps are monotonously ascending. In this case, the local watermarks for the streams are easy to generate, because they strictly follow the timestamps." Another benefit of this Flink-provided class is that it generates the watermarks for me. A watermark is how Flink tracks the progress of event time. A watermark with time t tells Flink that no more elements with a timestamp at or below t are expected, so any window whose last timestamp is at or below t can be closed.
In short, assignTimestampsAndWatermarks() tells Flink how to read the timestamp from each incoming event/element and, most importantly, how to compute watermarks.
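The watermark logic for ascending timestamps can be modelled in a few lines of plain Java. The sketch below is a simplified, hypothetical model (the class name AscendingWatermarks is made up), not Flink's implementation. It assumes, as I understand AscendingTimestampExtractor to do, that the watermark trails the highest timestamp seen so far by 1 ms. It also assumes that a window [start, end) may fire once the watermark reaches end - 1, the window's last valid timestamp.

```java
// Hypothetical, simplified model of watermarks for ascending timestamps; not Flink code.
class AscendingWatermarks {

    // Highest timestamp seen so far; Long.MIN_VALUE means "no element yet"
    private long currentTimestamp = Long.MIN_VALUE;

    // Records an element's timestamp; an older timestamp never moves the watermark back
    void onElement(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        } else {
            System.out.println("Timestamp monotony violated: " + timestamp + " < " + currentTimestamp);
        }
    }

    // Assumed rule: the watermark trails the highest timestamp by 1 ms
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    // A window [start, end) may fire once the watermark reaches its last timestamp, end - 1
    boolean canFire(long windowEndExclusive) {
        return currentWatermark() >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarks wm = new AscendingWatermarks();
        long windowEnd = 10_000L; // hypothetical window [0, 10000)
        for (long ts : new long[] {2_000L, 7_500L, 10_200L}) {
            wm.onElement(ts);
            System.out.println("ts=" + ts + " watermark=" + wm.currentWatermark()
                    + " window [0, 10000) fires: " + wm.canFire(windowEnd));
        }
    }
}
```

In main(), the window [0, 10000) stays open for the elements at 2000 and 7500 ms. It becomes ready to fire only when the 10200 ms element pushes the watermark to 10199.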
windowAll(TumblingEventTimeWindows.of(Time.seconds(10))): Defines a tumbling event-time window with a size of 10 seconds.
The rest of the code is similar to the previous example: we sum the values and print the result.
Sample output of the above example
The first three log lines show three elements being produced, each with a timestamp that differs from the system clock (the system clock time is printed first in each log line, before the log level).
When the 3rd element is produced with timestamp "2020-02-22T22:22:02.495", the watermark moves past the end of the current window, which triggers the window's closure. With a 10-second window, the current window covers [22:21:50, 22:22:00), so the last timestamp it accepts is "2020-02-22T22:21:59.999". The current window therefore collects only the first two values.
The next window covers [22:22:00, 22:22:10) and closes once the watermark passes "2020-02-22T22:22:09.999". Values 3 and 4 are collected in this new window. It fires when the element on line 7 of the output arrives with a timestamp beyond the window's end.
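The numbers in this sample run can be double-checked with a short standalone Java sketch. It is hypothetical and independent of Flink. It converts timestamps assuming UTC; any zone whose offset is a whole number of minutes yields the same 10-second boundaries. It uses epoch-aligned windows and assumes that the watermark trails the latest element timestamp by 1 ms.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical check of the sample run's numbers; not part of the Flink job.
class SampleRunCheck {

    static final long SIZE_MS = 10_000L; // Time.seconds( 10 )
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");

    // Local date-time -> epoch ms, assuming UTC
    static long millis(LocalDateTime t) {
        return t.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Epoch ms -> readable date-time with milliseconds, assuming UTC
    static String fmt(long ms) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(ms), ZoneOffset.UTC).format(FMT);
    }

    // Epoch-aligned start of the window containing ts (no offset)
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, SIZE_MS);
    }

    public static void main(String[] args) {
        long third = millis(LocalDateTime.parse("2020-02-22T22:22:02.495"));
        long watermark = third - 1; // assumed: watermark = latest timestamp - 1 ms

        long start = windowStart(third);
        long previousMax = start - 1; // last timestamp the previous window accepts

        System.out.println("element 3 window:      [" + fmt(start) + ", " + fmt(start + SIZE_MS) + ")");
        System.out.println("previous window ends:  " + fmt(previousMax));
        System.out.println("previous window fires: " + (watermark >= previousMax));
    }
}
```

It prints the window [2020-02-22T22:22:00.000, 2020-02-22T22:22:10.000) for element 3 and a last valid timestamp of 2020-02-22T22:21:59.999 for the previous window. It also prints true for whether that window fires.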
Conclusion
In this article, we discussed overriding the default clock-boundary alignment of windows and how to work with TumblingEventTimeWindows. We also saw an example of assigning timestamps to elements.
In the next part, I will discuss handling late elements, i.e. elements that arrive after the watermark has already passed the end of their window.