Data Flow Pipeline Using StreamSets
Learn about configuring JDBC Query Consumer, performing JDBC lookup with multiple tables, creating a data flow pipeline, and monitoring the stage and pipeline stats.
StreamSets Data Collector is an open-source, lightweight, powerful engine used to stream data in real time. It provides continuous big data ingestion and enterprise-grade infrastructure to route and process data in your data streams, and it accelerates time to analysis by bringing transparency and processing to data in motion.
In this blog, let's discuss generating a data flow pipeline using StreamSets.
Prerequisites
- Install Java 1.8
- Install streamsets-datacollector-2.5.1.1
Use Case
Generating a data flow pipeline using StreamSets via JDBC connections.
What we need to do:
- Install StreamSets data collector.
- Create JDBC origin.
- Create JDBC lookup.
- Create a data flow pipeline.
- View pipeline and stage statistics.
Installing StreamSets Data Collector
The core software is developed in Java, and the web interface is developed in JavaScript (AngularJS and D3.js), HTML, and CSS.
To install StreamSets, perform the following:
Installing Java
To install Java, use the below commands:

```shell
sudo apt-add-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
```
Use the `whereis java` command to check the Java location. Ensure that the `JAVA_HOME` variable is set to `/usr/lib/jvm/java-8-oracle`. To set `JAVA_HOME`, use the below command:

```shell
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
```
Installing StreamSets (From Tarball)
To install StreamSets, perform the following:
Create a directory as follows:

```shell
mkdir /home/streamsets
```
Download the Data Collector tarball from the StreamSets website. Then, extract the TAR file using the below command:

```shell
tar -xzf streamsets-datacollector-core-2.5.1.1.tgz
```
Create a system user and group named `sdc` using the below commands:

```shell
sudo addgroup sdc
sudo adduser --ingroup sdc sdc
```
Create the `/etc/init.d` directory (as root) using the below command:

```shell
mkdir /etc/init.d
```
Copy `/home/streamsets/streamsets-datacollector-2.5.1.1/initd/_sdcinitd_prototype` to the `/etc/init.d` directory and change the ownership of the file to `sdc` using the below commands:

```shell
cp /home/streamsets/streamsets-datacollector-2.5.1.1/initd/_sdcinitd_prototype /etc/init.d/sdc
chown sdc:sdc /etc/init.d/sdc
```
Edit the `/etc/init.d/sdc` file and set the `$SDC_DIST` and `$SDC_HOME` environment variables to the location where the tarball was extracted:

```shell
export SDC_DIST="/home/ubuntu/streamsets/streamsets-datacollector-2.5.1.1/"
export SDC_HOME="/home/ubuntu/streamsets/streamsets-datacollector-2.5.1.1/"
```
Make the `sdc` file executable using the below command:

```shell
chmod 755 /etc/init.d/sdc
```
Create the Data Collector configuration directory at `/etc/sdc` (as root) using the below command:

```shell
mkdir /etc/sdc
```
Copy all files from the `etc` directory of the extracted tarball into the Data Collector configuration directory you just created, using the below command:

```shell
cp -R etc/ /etc/sdc
```
Change the ownership of the `/etc/sdc` directory and all files in it to `sdc:sdc` using the below command:

```shell
chown -R sdc:sdc /etc/sdc
```
Restrict the `form-realm.properties` file in the `/etc/sdc` directory so that only the owner has access, using the below command:

```shell
chmod go-rwx /etc/sdc/form-realm.properties
```
Create the Data Collector log directory at `/var/log/sdc` and change the ownership to `sdc:sdc` using the below commands:

```shell
mkdir /var/log/sdc
chown sdc:sdc /var/log/sdc
```
Create a Data Collector data directory at `/var/lib/sdc` and change the ownership to `sdc:sdc` using the below commands (as root):

```shell
mkdir /var/lib/sdc
chown sdc:sdc /var/lib/sdc
```
Create the Data Collector resources directory at `/var/lib/sdc-resources` and change the ownership to `sdc:sdc` using the below commands:

```shell
mkdir /var/lib/sdc-resources
chown sdc:sdc /var/lib/sdc-resources
```
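The directory layout built up in the steps above can be sketched end to end in a short script. This version uses a scratch prefix in place of `/` (an assumption for illustration) so it runs without root:

```python
import os
import tempfile

# Scratch prefix standing in for "/" so no root access is needed (illustrative only).
prefix = tempfile.mkdtemp()

# The Data Collector directories created in the steps above.
sdc_dirs = [
    "etc/sdc",                # configuration directory
    "var/log/sdc",            # log directory
    "var/lib/sdc",            # data directory
    "var/lib/sdc-resources",  # resources directory
]

for d in sdc_dirs:
    os.makedirs(os.path.join(prefix, d), exist_ok=True)

created = [d for d in sdc_dirs if os.path.isdir(os.path.join(prefix, d))]
print(created)
```

On a real install, each of these directories is created under `/` and chowned to `sdc:sdc` as shown above.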
Start Data Collector as a service using the below command:

```shell
service sdc start
```
Note: If you get an "sdc is dead" error, check the configured limit for the current user using the below command:

```shell
ulimit -n
```

Set the session limit using the below command:

```shell
ulimit -u unlimited
```
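The open-file limit that `ulimit -n` reports can also be inspected programmatically. This sketch uses Python's `resource` module (Unix-only, an assumption about your environment); the 32768 threshold in the comment reflects the generous descriptor limit Data Collector generally wants, not an official constant:

```python
import resource

# RLIMIT_NOFILE is the per-process open-file limit (what `ulimit -n` shows).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("soft limit:", soft)
print("hard limit:", hard)

# Data Collector wants a generous open-file limit; flag a low soft limit.
if soft != resource.RLIM_INFINITY and soft < 32768:
    print("soft limit may be too low for Data Collector")
```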
Access the Data Collector console by entering the following URL in the address bar of the browser: http://<system-ip>:18630/.
Note: The default username is “admin” and password is “admin”.
Creating JDBC Origin
To create a JDBC origin, perform the following steps:
- Click Create New Pipeline to create a pipeline.
- Add Title for the pipeline.
Note: In this analysis, the JDBC Query Consumer origin is used.
Download the JDBC origin using the Package Manager as shown in the below diagram:
Note: You can also install the package manually using the below command:

```shell
/home/streamsets/streamsets-datacollector-2.5.1.1/bin/streamsets stagelibs -install=streamsets-datacollector-jdbc-lib
```
Add configurations to the JDBC Query Consumer origin.
Uncheck Incremental Mode in Configuration so that the JDBC Query Consumer does not require `WHERE` and `ORDER BY` clauses to execute the query, as shown in the below diagram. When incremental mode is used, add the `WHERE` and `ORDER BY` clauses using the offset value.
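The incremental pattern can be sketched with SQLite standing in for the JDBC source (the `applicant` table and `applicantid` offset column here are illustrative assumptions): each run queries only rows past the last stored offset, ordered by the offset column.

```python
import sqlite3

# In-memory SQLite table standing in for the JDBC source (illustrative data).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE applicant (applicantid INTEGER, name TEXT)")
conn.executemany("INSERT INTO applicant VALUES (?, ?)",
                 [(1, "alice"), (2, "bob"), (3, "carol")])

def incremental_batch(conn, last_offset):
    """Mimic the query shape the JDBC Query Consumer uses in incremental mode:
    a WHERE clause on the offset column plus an ORDER BY on it."""
    rows = conn.execute(
        "SELECT applicantid, name FROM applicant "
        "WHERE applicantid > ? ORDER BY applicantid",
        (last_offset,),
    ).fetchall()
    new_offset = rows[-1][0] if rows else last_offset
    return rows, new_offset

batch, offset = incremental_batch(conn, 0)       # first run: all rows
again, offset = incremental_batch(conn, offset)  # second run: nothing new
print(batch, again, offset)
```

Each subsequent run picks up only rows inserted after the stored offset, which is exactly what the offset value buys you in incremental mode.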
Click Validate to check the connection.
Note: If you are unable to connect with the JDBC Query Consumer, move `mysql-connector-java-5.1.27-bin.jar` to the below path:

```
/home/streamsets/streamsets-datacollector-2.5.1.1/streamsets-libs/streamsets-datacollector-jdbc-lib/lib
```
Creating JDBC Lookup
To create a JDBC lookup, lookup columns are required from the source and lookup tables. For example, use the `applicantId` field in the `applicant` (source) table to look up the `applicantid` column in the `application` (lookup) table using the below query:

```sql
SELECT *
FROM application
WHERE applicantid = '${record:value('/applicantid')}'
```

The query uses the value of the `applicantId` column from the `applicant` (source) table. In this example, three tables are used for lookups.
The result of the above JDBC lookup is given as input to the next lookup table, `loan_raw`, using the below query:

```sql
SELECT *
FROM loan_raw
WHERE applicationid = '${record:value('/applicationid')}'
```
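The chained lookups can be simulated with SQLite standing in for the JDBC connection (the column names follow the queries above; the sample rows are invented for illustration): each source record drives a lookup into `application`, whose result in turn drives the lookup into `loan_raw`.

```python
import sqlite3

# In-memory stand-ins for the source and the two lookup tables.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE applicant   (applicantid INTEGER, name TEXT);
    CREATE TABLE application (applicationid INTEGER, applicantid INTEGER, amount INTEGER);
    CREATE TABLE loan_raw    (applicationid INTEGER, status TEXT);
    INSERT INTO applicant   VALUES (1, 'alice');
    INSERT INTO application VALUES (10, 1, 5000);
    INSERT INTO loan_raw    VALUES (10, 'approved');
""")

enriched = []
for applicantid, name in conn.execute("SELECT applicantid, name FROM applicant"):
    # First lookup: applicant -> application (mirrors the first query above).
    app = conn.execute(
        "SELECT applicationid, amount FROM application WHERE applicantid = ?",
        (applicantid,)).fetchone()
    # Second lookup: application -> loan_raw (mirrors the second query above).
    loan = conn.execute(
        "SELECT status FROM loan_raw WHERE applicationid = ?",
        (app[0],)).fetchone()
    enriched.append({"applicantid": applicantid, "name": name,
                     "applicationid": app[0], "amount": app[1],
                     "status": loan[0]})
print(enriched)
```

Each JDBC Lookup stage in the pipeline performs one of these per-record queries, appending the result fields to the record before passing it downstream.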
Creating Dataflow Pipeline
Different processors are used to create the data flow pipeline.
Field Remover
It discards unnecessary fields in the pipeline.
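In spirit, it works like the following sketch (the field names are invented for illustration):

```python
def remove_fields(record, fields_to_remove):
    """Drop the listed fields from a record, keeping everything else."""
    return {k: v for k, v in record.items() if k not in fields_to_remove}

record = {"applicantid": 1, "name": "alice", "ssn": "000-00-0000"}
trimmed = remove_fields(record, ["ssn"])
print(trimmed)
```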
Expression Evaluator
It performs calculations and writes the results to new or existing fields. It is also used to add or modify record header attributes and field attributes.
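A rough equivalent of that behavior, with the field names and the expression being assumptions for illustration:

```python
def evaluate_expressions(record, expressions):
    """Add or overwrite fields by evaluating a function per target field,
    in the spirit of the Expression Evaluator processor."""
    out = dict(record)
    for field, fn in expressions.items():
        out[field] = fn(out)
    return out

record = {"amount": 5000, "term_months": 12}
result = evaluate_expressions(record, {
    "monthly_payment": lambda r: r["amount"] / r["term_months"],
})
print(result)
```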
Stream Selector
It passes data to streams based on conditions and uses a default stream to pass records unmatched with user-defined conditions. You can also define a condition for each stream of data.
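The routing logic can be sketched as a list of ordered conditions with a default stream catching whatever matches none of them (the conditions and records here are invented for illustration):

```python
def stream_select(records, conditions):
    """Route each record to the first stream whose condition matches;
    unmatched records fall through to the 'default' stream."""
    streams = {name: [] for name, _ in conditions}
    streams["default"] = []
    for rec in records:
        for name, cond in conditions:
            if cond(rec):
                streams[name].append(rec)
                break
        else:
            streams["default"].append(rec)
    return streams

records = [{"status": "approved", "amount": 5000},
           {"status": "rejected", "amount": 800},
           {"status": "pending",  "amount": 1200}]
streams = stream_select(records, [
    ("approved", lambda r: r["status"] == "approved"),
    ("rejected", lambda r: r["status"] == "rejected"),
])
print({name: len(rs) for name, rs in streams.items()})
```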
The Local FS destination is used to store the resultant data.
The full data flow pipeline is as follows:
Viewing Pipeline and Stage Statistics
A pipeline can be monitored while running it. Real-time summary and error statistics can be viewed for the pipeline and for the stages in the pipeline. By default, the Data Collector console displays pipeline monitoring information while running the pipeline. Any stage can be selected to view its statistics. Similarly, error information for the pipeline and its stages can be viewed.
Previewing Dataflow Pipeline
In the Data Collector pipeline, upon clicking Preview, input and output data can be seen at each stage.
Viewing Pipeline States
The pipeline state is the current condition of the pipeline, such as "running" or "stopped." It is displayed in the All Pipelines list and in the Data Collector log.
Viewing Pipeline Statistics
Record count, record and batch throughput, batch processing statistics, and heap memory usage are displayed for the pipeline as shown in the below diagram:
Values of the parameters currently used by the pipeline are displayed for a pipeline started with runtime parameters as shown in the below diagram:
Viewing Stage Statistics
Record and batch throughput, batch processing statistics, and heap memory usage are displayed for a stage as shown in the below diagram:
Conclusion
In this blog, we discussed configuring JDBC Query Consumer, performing JDBC lookup with more than one table, creating a data flow pipeline, and monitoring the stage statistics and pipeline statistics.
Published at DZone with permission of Rathnadevi Manivannan. See the original article here.