Complex Event Processing - Using Drools Fusion
Drools can help you process streams to perform tasks such as threat detection, real-time event monitoring, tracking user behavior, and more.
GitHub Link (Code Samples from the Article & More)
https://github.com/sumithpuri/skp-code-marathon-sherlock
Complex Event Processing (CEP) processes a large stream of information and can be used for real-time event monitoring or correlation. Events can be processed in one of two modes: 'stream' mode or 'cloud' mode. The following image illustrates the difference between the two modes:
Stream Mode vs. Cloud Mode [Image Available From SlideShare]
The continuous flow of information or events can be classified into one of these brackets (or even both) for analysis or correlation. The cloud mode would be useful in the following circumstances: user behavior, market data, and activity monitoring. The stream mode could be most useful in applications, such as real-time event monitoring, event correlation, and sensor networks.
The most useful end applications include threat detection, anomaly detection, airport security, market prediction, profit forecasting, and automated algorithmic trading decisions, among a host of others.
Two concepts, the Sliding Window and the Batch Window, need clarification before any discussion of Complex Event Processing. For most architects and engineers reading about them for the first time, they will come across as a novel way of analyzing information.
The Batch Window illustration below shows the information window being processed in discrete, fixed slots (blocks) of events.
[Image Available From Oracle]
The Sliding Window illustration below shows the information window being processed as a continuous, moving slot (block) of events.
[Image Available From Oracle]
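To make the difference between the two windows concrete, here is a minimal sketch in plain Java. It is not taken from Drools or from the Sherlock code: WindowSketch and its two methods are hypothetical names, and the choice to treat an event exactly one window-length old as already outside the sliding window is an assumption made for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative only: WindowSketch is a hypothetical helper, not a Drools class.
final class WindowSketch {

    // Batch window: events fall into fixed, non-overlapping slots of
    // windowMillis each; the result holds the event count per slot.
    static List<Integer> batchCounts(long[] timestamps, long windowMillis) {
        List<Integer> counts = new ArrayList<>();
        for (long t : timestamps) {
            int slot = (int) (t / windowMillis);
            while (counts.size() <= slot) {
                counts.add(0); // create empty slots up to the current one
            }
            counts.set(slot, counts.get(slot) + 1);
        }
        return counts;
    }

    // Sliding window: after each event, count the events seen within the
    // last windowMillis. Timestamps are assumed to arrive in ascending order.
    static List<Integer> slidingCounts(long[] timestamps, long windowMillis) {
        List<Integer> counts = new ArrayList<>();
        Deque<Long> window = new ArrayDeque<>();
        for (long t : timestamps) {
            window.addLast(t);
            // Drop events that have moved out of the window.
            while (t - window.peekFirst() >= windowMillis) {
                window.removeFirst();
            }
            counts.add(window.size());
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 4000L, 9000L, 11000L, 12000L, 25000L};
        System.out.println("batch:   " + batchCounts(ts, 10000L));
        System.out.println("sliding: " + slidingCounts(ts, 10000L));
    }
}
```

For the six timestamps in main and a 10-second window, the batch counts are [3, 2, 1] and the sliding counts are [1, 2, 3, 3, 4, 1]. The sliding window reaches a count of 4 at the fifth event, a burst that the fixed slots split across two blocks.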
Introducing Sherlock (Mystery That is Data), an event correlation application that demonstrates the above concepts of Complex Event Processing. It is built for the banking domain, for anomaly and threat detection. It analyzes the following use-cases, which are listed among the 'Top Threats, Especially in Banking Sector By SANS Institute'. SANS Institute is a co-operative research and training institute for information security.
Detect if there are more than 10 port or IP scan attempts from the same IP address (and port) in any of the last 10 seconds [port scan and IP scan by SANS Institute].
Detect if there are more than five repeated login attempts from the same IP address in any of the last 30 seconds [database login or intrusion attempts by SANS Institute].
Detect if the traffic on a port x has spiked — if any of the last 30 seconds had more than five accesses [repeated port access by SANS Institute].
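All three use-cases share one shape: count events per source over a recent time window and flag the source once the count exceeds a threshold. The following is a rough sketch of that idea in plain Java, outside Drools. It is not the Sherlock implementation: ThresholdDetector, its fields, and the "ip:port" key format are hypothetical and chosen only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Sherlock or Drools.
final class ThresholdDetector {
    private final long windowMillis;
    private final int threshold;
    // Recent event timestamps, kept separately for each source key.
    private final Map<String, Deque<Long>> recentByKey = new HashMap<>();

    ThresholdDetector(long windowMillis, int threshold) {
        this.windowMillis = windowMillis;
        this.threshold = threshold;
    }

    // Records one event and returns true when the key has produced more than
    // `threshold` events within the last windowMillis.
    // Assumes timestamps for a given key arrive in ascending order.
    boolean record(String key, long timestampMillis) {
        Deque<Long> recent = recentByKey.computeIfAbsent(key, k -> new ArrayDeque<>());
        recent.addLast(timestampMillis);
        while (timestampMillis - recent.peekFirst() >= windowMillis) {
            recent.removeFirst(); // event has left the window
        }
        return recent.size() > threshold;
    }

    public static void main(String[] args) {
        // Use-case 01: more than 10 scans from one source within 10 seconds.
        ThresholdDetector detector = new ThresholdDetector(10_000L, 10);
        for (int i = 0; i <= 10; i++) {
            boolean threat = detector.record("10.0.0.1:4444", i * 100L);
            System.out.println("event " + (i + 1) + " threat=" + threat);
        }
    }
}
```

The second and third use-cases would only change the constructor arguments (a 30-second window and a threshold of five) and the key used to group events.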
I will demonstrate only the first use-case in this blog (including how to run the 'Intelligent Data Loader' and possibly hook it up with a 'User Interface') to illustrate anomaly detection and complex event processing. You may need to do the following before you download and work through Sherlock!:
A. Download Drools 6.1.0 Distribution (Include in Classpath)
B. Download the Eclipse Plugin for Drools (Include in Classpath)
C. Use JDK 1.8.0 and JEE 1.7 Libraries (If Required) (Include in Classpath)
D. Brief Read on MVEL Dialect and Drools Fusion (Above/Official)
Create the SherlockEvent (and SherlockEventCorrelation) Java Object
package com.bw2015.sherlock.biz.vo;
/**
* @author spuri
*
*/
public class SherlockEvent {
private int eventId;
private String eventType;
private String eventDescription;
private String eventSourceIp;
private String eventDestinationIp;
private String eventSourcePort;
private String eventDestinationPort;
private String eventSourceCountry;
private String eventDestinationCountry;
private String eventSourceUsername;
private String eventDestinationUsername;
private String eventRemarks;
private long eventSourceTime;
private long eventDestinationTimestamp; ... // Refer Bundled Code
Code the 'Rule/Condition' Using Drools 'MVEL' Dialect (Use-Case 01)
package com.bw2015.sherlock.biz.cep
// list any import classes here.
import com.bw2015.sherlock.biz.vo.SherlockEvent;
import com.bw2015.sherlock.biz.vo.SherlockEventCorrelation;
declare SherlockEvent
@role(event)
@expires(20s)
@timestamp (eventDestinationTimestamp)
end
declare SherlockEventCorrelation
@role(event)
@expires(20s)
@timestamp (eventDestinationTimestamp)
end
global Long startTime;
global Long startMemory;
global Long totalFactCount;
global java.util.HashMap threatMap;
// use case 01
// detect if there are more than ten port or ip scan attempts from the same ip address (and port)
// to the destination ip address (and multiple ports) in the given window
rule "Port and IP Scan Event Processing Initial"
dialect "mvel"
no-loop
when
e1: SherlockEvent(eventType == "port and ip scan") over window:time(10s)
not SherlockEventCorrelation(eventSourceIp == e1.eventSourceIp, eventDestinationIp == e1.eventDestinationIp, eventSourcePort == e1.eventSourcePort)
then
SherlockEventCorrelation plec = new SherlockEventCorrelation();
plec.setEventSourceIp(e1.eventSourceIp);
plec.setEventDestinationIp(e1.eventDestinationIp);
plec.setEventSourcePort(e1.eventSourcePort);
plec.setEventDestinationPort(e1.eventDestinationPort);
plec.setEventCorrelation(0);
insert(plec);
end
rule "Port and IP Scan Event Processing Correlation"
dialect "mvel"
no-loop
when
e1: SherlockEvent(eventType == "port and ip scan") over window:time(10s)
ce: SherlockEventCorrelation(eventSourceIp == e1.eventSourceIp, eventDestinationIp == e1.eventDestinationIp, eventSourcePort == e1.eventSourcePort, $eventCorr : eventCorrelation >= 0)
then
$eventCorr++;
ce.eventCorrelation=$eventCorr;
if(ce.eventCorrelation >= 10) {
System.out.println("");
System.out.println("+++++++++++++++ USE CASE 01 +++++++++++++++");
System.out.println("SOURCE INET: " + ce.eventSourceIp);
System.out.println("SOURCE PORT: " + ce.eventSourcePort);
System.out.println("DESTIN INET: " + ce.eventDestinationIp);
System.out.println("EVENT ACTN: " + "port and ip scan");
System.out.println("TIMESTAMP : " + new java.util.Date(ce.eventDestinationTimestamp));
System.out.println("OCCURENCES : " + $eventCorr);
System.out.println("+++++++++++++++++++++++++++++++++++++++++++");
System.out.println("");
}
update( ce );
threatMap.put(new java.util.Date(), ce);
end
Configure Drools Fusion
<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule">
<kbase name="event" packages="event" eventProcessingMode="stream">
<ksession name="sherlock-event"/>
</kbase>
</kmodule>
The above file, named 'kmodule.xml', goes in the META-INF directory of your project. Make sure it is available on the classpath of your main class.
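A quick way to confirm that the file really is visible on the classpath is to ask the class loader for it before starting the Drools runtime. The snippet below is a small sketch using only the standard Java API. KmoduleCheck and isOnClasspath are hypothetical names, not part of Sherlock or Drools.

```java
import java.net.URL;

// Hypothetical diagnostic helper; not part of Sherlock or Drools.
final class KmoduleCheck {

    // Returns true when the named resource is visible to the given class loader.
    static boolean isOnClasspath(String resourcePath, ClassLoader loader) {
        URL url = loader.getResource(resourcePath);
        return url != null;
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        boolean found = isOnClasspath("META-INF/kmodule.xml", loader);
        System.out.println("META-INF/kmodule.xml on classpath: " + found);
    }
}
```

If this prints false, the session named in kmodule.xml is unlikely to be found when the runtime is initialized, so it is worth fixing the project layout first.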
Code the Drools Java Runtime to Send or Process Events
The SherlockComplexEventProcessing class contains the Java code for the Drools Fusion runtime. The following are the most important activities performed by this runtime.
Declare Drools Java Runtime Variables
package com.bw2015.sherlock.biz.cep;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import org.kie.api.KieBaseConfiguration;
import org.kie.api.KieServices;
import org.kie.api.conf.EventProcessingOption;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.internal.KnowledgeBase;
import org.kie.internal.KnowledgeBaseFactory;
import org.kie.internal.builder.KnowledgeBuilder;
import org.kie.internal.builder.KnowledgeBuilderFactory;
import com.bw2015.sherlock.biz.vo.SherlockEvent;
import com.bw2015.sherlock.biz.vo.SherlockEventCorrelation;
/**
* @author spuri
*
* SherlockComplexEventProcessing is a Sherlock service that provides the most
* essential part of feeding data to the Knowledge Is Everything API of Drools.
* It will provide data that is in order or even out-of-order. In essence, it
* provides the core of the Sherlock Intellect.
*
*/
public class SherlockComplexEventProcessing {
private static SherlockComplexEventProcessing cepService = null;
// Drools Fusion Runtime Configuration
private KieBaseConfiguration kieConfiguration;
private KnowledgeBase kieBase;
private KieServices ks;
private KieContainer kContainer;
private KieSession kSession;
private KnowledgeBuilder kbuilder; ... // Refer Bundled Code
Initialize and Instantiate Drools Variables
public void init() {
try {
System.out.println("initializing kie runtime for drools fusion...");
kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
// kbuilder.add(ResourceFactory.newClassPathResource("event.drl"),
// ResourceType.DRL);
if (kbuilder.hasErrors()) {
System.out.println(kbuilder.getErrors().toString());
}
kieConfiguration = KieServices.Factory.get().newKieBaseConfiguration();
kieConfiguration.setProperty("drools.dialect.mvel.strict", "false");
kieConfiguration.setProperty("org.kie.demo", "false");
kieConfiguration.setOption(EventProcessingOption.STREAM);
ks = KieServices.Factory.get();
kContainer = ks.getKieClasspathContainer();
kieBase = KnowledgeBaseFactory.newKnowledgeBase(kieConfiguration);
kieBase.addKnowledgePackages(kbuilder.getKnowledgePackages());
// clock type for the session
KieSessionConfiguration sessionConfiguration = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
sessionConfiguration.setOption(ClockTypeOption.get("realtime"));
kSession = kContainer.newKieSession("sherlock-event", sessionConfiguration);
// keyed by java.util.Date, matching threatMap.put(new java.util.Date(), ce) in the rule
kSession.setGlobal("threatMap", new HashMap<Date, SherlockEventCorrelation>());
kSession.setGlobal("startTime", new Date().getTime());
kSession.setGlobal("startMemory", Runtime.getRuntime().freeMemory());
kSession.setGlobal("totalFactCount", totalFactCount);
System.out.println("initialized the kie runtime for drools fusion...");
} catch (Exception e) {
e.printStackTrace();
}
}
Send Event by Event for Complex Event Processing to Drools Runtime
public void execute(SherlockEvent event) {
// try {
// anything to do with the event object
kSession.setGlobal("totalFactCount", totalFactCount++);
kSession.insert(event);
kSession.fireAllRules();
HashMap threatM=(HashMap) kSession.getGlobal("threatMap");
LinkedList list=new LinkedList();
list.addAll(threatM.values());
threats.pushAll(list);
if(prevTime==0) prevTime=Long.parseLong(kSession.getGlobal("startTime").toString());
currTime=new Date().getTime();
}
}
Setup the Data Loader (Asynchronous Is Preferred — Think JMS Extension)
Now run the SherlockDataLoaderDriver, which in turn starts the SherlockDataLoaderThread to intelligently load random data and 'Inject Positive Cases' into the large stream of information. The data load is throttled to create only 100 random records and then wait for 10 seconds. You can change this for your demo or POC purposes to suit a larger data stream and a shorter or longer wait time.
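The loading pattern described above can be sketched roughly as follows. This is not the bundled SherlockDataLoaderThread: LoaderSketch, the ATTACKER source, the "ip:port" record format, and the injectEvery parameter are all assumptions made for illustration, and the records here are plain strings rather than SherlockEvent objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical loader sketch; the real loader ships with the Sherlock code.
final class LoaderSketch {
    // Assumed source that plays the role of the injected positive case.
    static final String ATTACKER = "10.0.0.99:4444";

    // Builds one batch of `size` source keys. Most are random, but the
    // attacker source is injected at every `injectEvery`-th position.
    static List<String> nextBatch(Random random, int size, int injectEvery) {
        List<String> batch = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            if (i % injectEvery == 0) {
                batch.add(ATTACKER);
            } else {
                batch.add("192.168." + random.nextInt(256) + "." + random.nextInt(256)
                        + ":" + (1024 + random.nextInt(60000)));
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        // One batch of 100 records, as in the article; a real loader would
        // hand each record to the runtime and then pause before the next batch.
        List<String> batch = nextBatch(new Random(42L), 100, 5);
        System.out.println("generated " + batch.size() + " records");
    }
}
```

With a batch of 100 and an injection interval of 5, the attacker source appears 20 times per batch, which is enough to cross the threshold of use-case 01 if the batch is delivered within the 10-second window.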
Output or Outcome, of 'Anomaly/Threat Detection', From Sherlock (Use-Case 01)
You may use the Sherlock code for a demo, a hack, or a proof-of-concept of Drools Expert, Drools Fusion, or simply Complex Event Processing.
Download the entire Sherlock code, Data Loader, and Basic User Interface as an Eclipse Project.
[Sherlock! was my hackathon creation for the Societe Generale Brainwaves 2015 (along with three other team members, Team: The_Big_Billionaire$). It was also based on my previous experience at an information security company as a Java Senior Technical Architect. You may additionally refer to the Sherlock! presentation slides, which I created for the hackathon, to understand Sherlock! better. We also created a Basic User Interface, which can be used on a PC and adapted to mobile as well. I have not explained how to integrate it, but if you go through the code, there is Java Servlet code to get you started.]
Published at DZone with permission of Sumith Puri. See the original article here.