How to Run Any Dockerized Application on Spring Cloud Data Flow
Let's address the issue of including any Docker image while building data pipelines using Spring Cloud Data Flow, which can natively interpret only Spring Boot images.
Spring Cloud Data Flow (SCDF) provides tools to build streaming and batch data pipelines. These pipelines can be deployed on top of a container orchestration engine such as Kubernetes. When Kubernetes is the runtime, the pipeline applications, whether starter apps or custom Spring Boot applications, are packaged as Docker images. Python, however, is a popular data munging tool, and SCDF cannot natively interpret Python Docker images. In this article, we will create a streaming data pipeline that starts a Python Docker image in one of its intermediate steps and uses the result of that computation in downstream components.
Scenario
Let us consider a scenario where we receive data from sensors that generate tag readings as time-series data. The sensor data is a JSON array, an example of which is shown below.
{
  "tags":[
    {
      "tagId":"d624523e-29eb-4d23-86eb-08d7b881a3a8",
      "tagType":0,
      "timestamp":1528927600678,
      "value":"10.0",
      "quality":3,
      "datatype":"String",
      "additionalProperties":{}
    }
  ]
}
Our data pipeline accepts this JSON array via an HTTP POST. Let there be a configurable parameter, say value-threshold, whose value is the upper limit for a reading to be valid. For each event in the array, we check whether its value is less than value-threshold, say 12, and add a property to the event indicating whether it is valid. Finally, we send the enriched event to a sink, as illustrated below.
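For example, with value-threshold set to 12, an event carrying the value 15.0 exceeds the upper limit and would be enriched as shown below (an illustrative sketch; the invalid flag comes from the processor we build next):
{
  "tagId":"d624523e-29eb-4d23-86eb-08d7b881a3a8",
  "tagType":0,
  "timestamp":1528927600678,
  "value":"15.0",
  "quality":3,
  "datatype":"String",
  "additionalProperties":{},
  "invalid":true
}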
To develop the pipeline, we'll create a custom processor called data-quality in the familiar Spring Boot style, dockerize it, and push it to Docker Hub. We then register the processor with the SCDF server using its Docker URI. The pipeline can be built with the SCDF DSL as shown below.
dataflow:>app register --type processor --name data-quality --uri docker:randhirkumars/data-quality:latest
dataflow:>stream create --name dataquality --definition "http --port=8086 | splitter --expression=#jsonPath(payload,'$.tags') --outputType=application/json | data-quality --value-threshold=12 --outputType=application/json | log"
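Note that stream create only defines the stream; it still has to be deployed, for example from the same shell:
dataflow:>stream deploy --name dataquality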
The SCDF dashboard also renders the corresponding graphical representation of the same data pipeline.
The data-quality processor sets a property on each item indicating whether it is valid. The code is shown below.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableConfigurationProperties(SensorReadingProperties.class)
@EnableBinding(Processor.class)
@SpringBootApplication
public class DataQualityApplication {

    public static void main(String[] args) {
        SpringApplication.run(DataQualityApplication.class, args);
    }

    @Autowired
    private SensorReadingProperties properties;

    // Flags an incoming reading as invalid when it fails the threshold
    // check and forwards the enriched reading downstream.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public SensorReading setValidity(SensorReading r) {
        Double validThreshold = properties.getValueThreshold();
        if (!isValid(r, String.valueOf(validThreshold))) {
            r.setInvalid(true);
        }
        return r;
    }

    // A reading is valid when its value is strictly below the configured
    // upper limit.
    private boolean isValid(SensorReading r, String threshold) {
        return Double.compare(Double.valueOf(r.getValue()), Double.valueOf(threshold)) < 0;
    }
}
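The processor reads the threshold from a small configuration-properties class. The article does not show SensorReadingProperties, but a minimal sketch, assuming Spring Boot's relaxed binding maps the --value-threshold stream parameter onto a valueThreshold field, could look like this:
import org.springframework.boot.context.properties.ConfigurationProperties;

// Hypothetical sketch of SensorReadingProperties (not shown in the original
// article). With no prefix, relaxed binding maps --value-threshold onto
// the valueThreshold field.
@ConfigurationProperties
public class SensorReadingProperties {

    // Upper limit below which a sensor reading is considered valid.
    private Double valueThreshold;

    public Double getValueThreshold() {
        return valueThreshold;
    }

    public void setValueThreshold(Double valueThreshold) {
        this.valueThreshold = valueThreshold;
    }
}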
This is all fine for a Spring Boot processor with simple validity logic. But what if the logic to determine validity has been comprehensively developed by analysts and is only available as a Python Docker image? For the sake of illustration, let the Python code be as shown below.
import os
import json

# The reading and the threshold arrive as environment variables;
# the validation verdict is written to stdout.
try:
    data = json.loads(os.environ["RECORD"])
    threshold = float(os.environ["THRESHOLD"])
    print("VALID" if float(data["value"]) < threshold else "INVALID")
except Exception:
    print("INVALID")
The item and the threshold parameters are passed to the container as environment variables, and the validation verdict is printed to the console.
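Assuming the script is published to Docker Hub as randhirkumars/python-docker, the image name the processor pulls later, it can be smoke-tested from a shell before being wired into the pipeline; with a threshold of 12, the reading below prints VALID:
docker run --rm -e RECORD='{"value":"10.0"}' -e THRESHOLD=12 randhirkumars/python-docker:latest
How can we include this Docker image in our data pipeline?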
Generic Processor
To address this issue, we'll modify our Spring Boot processor to remove the validation logic written in Java. In its place, we run the Python Docker image and use its result to set the validation property of the event. To run a Docker image from Java, we use the Docker API client for Java (docker-java). For this to work, we use Docker-in-Docker (DinD) as the base image for our Spring Boot application. To dockerize the application, the Google Jib Maven plugin is used. The plugin configuration in the application's pom.xml is shown below.
<plugin>
    <groupId>com.google.cloud.tools</groupId>
    <artifactId>jib-maven-plugin</artifactId>
    <version>0.9.9</version>
    <configuration>
        <from>
            <image>npalm/dind-java</image>
        </from>
        <to>
            <image>data-quality</image>
        </to>
    </configuration>
</plugin>
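For the processor to drive Docker from Java, the Docker API client for Java must also be on the classpath. A typical Maven dependency (the version shown here is illustrative; use whichever release matches your setup) looks like this:
<dependency>
    <groupId>com.github.docker-java</groupId>
    <artifactId>docker-java</artifactId>
    <version>3.1.5</version>
</dependency>
With the plugin configured, mvn compile jib:dockerBuild builds the image against the local Docker daemon, while mvn compile jib:build pushes it to a registry (which requires a registry-qualified <to> image name, such as randhirkumars/data-quality).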
The modified isValid method is shown below.
// Required docker-java imports (declared at the top of the class file):
import java.util.ArrayList;
import java.util.List;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateContainerResponse;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.PullResponseItem;
import com.github.dockerjava.api.model.Volume;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.command.LogContainerResultCallback;
import com.github.dockerjava.core.command.PullImageResultCallback;
import com.github.dockerjava.core.command.WaitContainerResultCallback;

private boolean isValid(SensorReading r, String threshold) {
    Boolean valid = true;
    String sensorReadingStr = r.toString();
    DockerClient dockerClient = DockerClientBuilder.getInstance().build();
    try {
        // Pull the analysts' Python image, logging pull progress as it arrives.
        dockerClient
            .pullImageCmd("randhirkumars/python-docker")
            .withTag("latest")
            .exec(new PullImageResultCallback() {
                @Override
                public void onNext(PullResponseItem item) {
                    super.onNext(item);
                    System.out.println(item.getStatus());
                }
            }).awaitCompletion();
        // Create the container, handing over the reading and the threshold
        // as environment variables and binding the host's Docker socket.
        CreateContainerResponse createContainerResponse = dockerClient
            .createContainerCmd("randhirkumars/python-docker")
            .withEnv("RECORD=" + sensorReadingStr, "THRESHOLD=" + threshold)
            .withBinds(new Bind("/var/run/docker.sock", new Volume("/var/run/docker.sock")))
            .exec();
        dockerClient
            .startContainerCmd(createContainerResponse.getId())
            .exec();
        // Block until the Python script inside the container finishes.
        dockerClient
            .waitContainerCmd(createContainerResponse.getId())
            .exec(new WaitContainerResultCallback())
            .awaitCompletion();
        // Scan the container's stdout for the verdict printed by the script.
        final List<Frame> loggingFrames = getLoggingFrames(dockerClient, createContainerResponse.getId());
        for (final Frame frame : loggingFrames) {
            if (frame.toString().contains("INVALID")) {
                valid = false;
            }
        }
    } catch (Exception e) {
        valid = false;
    }
    return valid;
}

private List<Frame> getLoggingFrames(DockerClient dockerClient, String containerId) throws Exception {
    FrameReaderITestCallback collectFramesCallback = new FrameReaderITestCallback();
    dockerClient.logContainerCmd(containerId).withStdOut(true).withStdErr(true)
        .withTailAll()
        .exec(collectFramesCallback).awaitCompletion();
    return collectFramesCallback.frames;
}

// Collects every log frame the container emits so isValid can inspect them.
public static class FrameReaderITestCallback extends LogContainerResultCallback {
    public List<Frame> frames = new ArrayList<>();

    @Override
    public void onNext(Frame item) {
        frames.add(item);
        super.onNext(item);
    }
}
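One detail worth noting: isValid passes r.toString() to the container through the RECORD environment variable, so SensorReading.toString() must serialize the reading as JSON for the Python script's json.loads() call to succeed. The article does not show the model class, but a minimal sketch, with only the fields the validation logic touches and Jackson doing the serialization, could look like this:
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical sketch of SensorReading (not shown in the original article).
// Only the fields used by the validation logic appear here; the real class
// would also carry tagId, tagType, timestamp, quality, datatype, and
// additionalProperties.
@JsonIgnoreProperties(ignoreUnknown = true)
public class SensorReading {

    private String value;    // numeric reading carried as a string, e.g. "15.0"
    private boolean invalid; // set by the processor when validation fails

    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }

    public boolean isInvalid() { return invalid; }
    public void setInvalid(boolean invalid) { this.invalid = invalid; }

    // Serialize as JSON so the Python container can parse RECORD directly.
    @Override
    public String toString() {
        try {
            return new ObjectMapper().writeValueAsString(this);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException(e);
        }
    }
}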
To test the pipeline, find the IP of the http source Kubernetes pod and send an HTTP POST as shown below.
dataflow:>http post --target http://<ip-of-http-source-pod>:8086 --data "{\"tags\":[{\"tagId\":\"d6\",\"tagType\":0,\"timestamp\":1528927600678,\"value\":\"15.0\",\"quality\":3,\"datatype\":\"String\",\"additionalProperties\":{}}]}"
Check the logs of the log sink.
dataflow:>! kubectl logs dataquality-log-65ccf64699-9dd7s
The logs should display the enriched sensor reading with its validity property set; because 15.0 exceeds the threshold of 12, the event is flagged as invalid.
2018-09-10 11:44:19.163 INFO 1 --- [y.dataquality-1] log-sink : {"tagId":"d6","tagType":0,"timestamp":1528927600678,"value":"15.0","quality":3,"datatype":"String","additionalProperties":{},"invalid":true}
Conclusion
In this article, we addressed the issue of including any Docker image while building data pipelines using Spring Cloud Data Flow, which can natively interpret only Spring Boot images. We created a Java processor with Docker-in-Docker as the base image, which allowed us to use the Docker API client for Java to run any Docker image as part of our processing. The code is available on GitHub.