Reading AWS S3 File Content to Kafka Topic
How to read S3 files into a Kafka topic using CamelAWSS3SourceConnector.
Apache Camel
Apache Camel is an open-source framework for message-oriented middleware with a rule-based routing and mediation engine. It provides a Java object-based implementation of the Enterprise Integration Patterns and uses an application programming interface to configure routing and mediation rules.
Red Hat AMQ Streams
Red Hat AMQ Streams is a massively-scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.
The main components comprise:
- Kafka Broker: messaging broker responsible for delivering records from producing clients to consuming clients.
Apache ZooKeeper is a core dependency for Kafka, providing a cluster coordination service for highly reliable distributed coordination.
AMQ Streams architecture
camel-kafka-connector
Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors.
This is a "Camel Kafka connector adapter" that aims to provide a user-friendly way to use all Apache Camel components in Kafka Connect.
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. For more information, see the Kafka Connect documentation.
Prerequisites
For this demonstration, you will need the following technologies set up in your development environment:
- Apache Maven 3.6.3+
- JDK 11
- A Kafka cluster
- An AWS account with files available in an S3 bucket
In this article, we demonstrate how to read files from an S3 bucket and write their content to a Kafka topic using CamelAWSS3SourceConnector.
Prepare the Needed Bits to Develop the Example
Set up the plugin.path property in your Kafka installation.
Open $KAFKA_HOME/config/connect-standalone.properties and set the plugin.path property to your chosen location:
plugin.path=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors/
This example uses /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors as the plugin directory.
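If you prefer to script this step rather than edit the file by hand, the following is a minimal sketch. The function name set_plugin_path is hypothetical (it is not part of Kafka), and it assumes the properties file uses the plain key=value layout shown above.

```shell
# Hypothetical helper (not part of Kafka): set plugin.path in a Connect
# worker properties file, replacing any existing plugin.path line.
set_plugin_path() {
  local props_file="$1" plugin_dir="$2" tmp_file
  tmp_file="$(mktemp)"
  # Keep every line except an active (uncommented) plugin.path entry.
  grep -v '^plugin\.path=' "$props_file" > "$tmp_file" || true
  # Append the new value.
  printf 'plugin.path=%s\n' "$plugin_dir" >> "$tmp_file"
  # Write back in place so the original file keeps its permissions.
  cat "$tmp_file" > "$props_file" && rm -f "$tmp_file"
}

# Example call (paths are the ones used in this article):
#   set_plugin_path "$KAFKA_HOME/config/connect-standalone.properties" \
#     /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors
```

Commented-out entries such as "#plugin.path=" are left untouched, so the file ends up with exactly one active plugin.path line.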
Download camel-aws-s3-kafka-connector and unzip it in the plugin.path directory:
wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.4.0/camel-aws-s3-kafka-connector-0.4.0-package.zip
unzip camel-aws-s3-kafka-connector-0.4.0-package.zip
Configure the connector in the file CamelAWSS3SourceConnector.properties:
name=CamelAWSS3SourceConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
camel.source.maxPollDuration=10000
topics=mytopic
camel.component.aws-s3.access-key=xxxxx
camel.component.aws-s3.secret-key=yyyyyy
camel.component.aws-s3.region=US_EAST_2
camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector
camel.source.endpoint.autocloseBody=true
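Before starting Kafka Connect, it can help to confirm that the properties file contains the keys used above. The sketch below is only an illustration: check_connector_props is a hypothetical helper, and the key list simply mirrors this article's configuration rather than any official list of mandatory settings.

```shell
# Hypothetical pre-flight check (not part of Kafka or Camel): report which
# of the keys used in this article are missing from a properties file.
check_connector_props() {
  local props_file="$1" missing=0 key
  for key in name connector.class topics \
             camel.component.aws-s3.access-key \
             camel.component.aws-s3.secret-key \
             camel.component.aws-s3.region \
             camel.source.path.bucketNameOrArn; do
    # Split each line on "=" and compare the key part exactly.
    if ! awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$props_file"; then
      echo "missing: $key"
      missing=1
    fi
  done
  # Return 0 when every key was found, 1 otherwise.
  return "$missing"
}

# Example call:
#   check_connector_props CamelAWSS3SourceConnector.properties
```

The function prints one "missing: <key>" line per absent key and returns a non-zero status if any key is missing, so it can gate a startup script.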
Start ZooKeeper and then the Kafka server, each in its own terminal:
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
Create the topic mytopic:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
Now start the connector in standalone mode:
./bin/connect-standalone.sh /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/connect-standalone.properties /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connector-examples/examples/aws-s3/CamelAWSS3SourceConnector.properties
Now upload any file to the S3 bucket 'kkakarla-test-kafka-connector'
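One way to upload a file is the AWS CLI command "aws s3 cp". The sketch below assumes the AWS CLI is installed and configured with credentials that can write to the bucket; the function name upload_to_bucket and the DRY_RUN switch are hypothetical conveniences, not part of the AWS CLI.

```shell
# Hypothetical helper: upload a local file to a bucket with the AWS CLI.
# With DRY_RUN=1 it only prints the command it would run.
upload_to_bucket() {
  local file="$1" bucket="$2"
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "aws s3 cp $file s3://$bucket/"
  else
    aws s3 cp "$file" "s3://$bucket/"
  fi
}

# Example call (assumes a local hello.txt exists):
#   upload_to_bucket hello.txt kkakarla-test-kafka-connector
```

Running it once with DRY_RUN=1 is a cheap way to confirm the bucket name before anything is sent.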
Logs:
[2020-09-01 12:43:17,149] INFO Kafka version: 2.5.0.redhat-00003 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-09-01 12:43:17,149] INFO Kafka commitId: f960e3745ec74111 (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-09-01 12:43:17,149] INFO Kafka startTimeMs: 1598944397149 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-09-01 12:43:17,156] INFO Starting CamelSourceTask connector task (org.apache.camel.kafkaconnector.CamelSourceTask:77)
[2020-09-01 12:43:17,156] INFO [Producer clientId=connector-producer-CamelAWSS3SourceConnector-0] Cluster ID: jmyQzmm2QUe12p8is0zNAQ (org.apache.kafka.clients.Metadata:280)
[2020-09-01 12:43:17,156] INFO Created connector CamelAWSS3SourceConnector (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-09-01 12:43:17,157] INFO CamelAwss3SourceConnectorConfig values:
camel.component.aws-s3.accelerateModeEnabled = false
camel.component.aws-s3.accessKey = null
camel.component.aws-s3.amazonS3Client = null
camel.component.aws-s3.autoCreateBucket = true
camel.component.aws-s3.autocloseBody = true
camel.component.aws-s3.basicPropertyBinding = false
camel.component.aws-s3.bridgeErrorHandler = false
camel.component.aws-s3.chunkedEncodingDisabled = false
camel.component.aws-s3.configuration = null
camel.component.aws-s3.deleteAfterRead = true
camel.component.aws-s3.delimiter = null
camel.component.aws-s3.dualstackEnabled = false
camel.component.aws-s3.encryptionMaterials = null
camel.component.aws-s3.endpointConfiguration = null
camel.component.aws-s3.fileName = null
camel.component.aws-s3.forceGlobalBucketAccessEnabled = false
camel.component.aws-s3.includeBody = true
camel.component.aws-s3.pathStyleAccess = false
camel.component.aws-s3.payloadSigningEnabled = false
camel.component.aws-s3.policy = null
camel.component.aws-s3.prefix = null
camel.component.aws-s3.proxyHost = null
camel.component.aws-s3.proxyPort = null
camel.component.aws-s3.proxyProtocol = HTTPS
camel.component.aws-s3.region = US_EAST_2
camel.component.aws-s3.secretKey = null
camel.component.aws-s3.useEncryption = false
camel.component.aws-s3.useIAMCredentials = false
camel.source.camelMessageHeaderKey = null
camel.source.component = aws-s3
camel.source.contentLogLevel = OFF
camel.source.endpoint.accelerateModeEnabled = false
camel.source.endpoint.accessKey = null
camel.source.endpoint.amazonS3Client = null
camel.source.endpoint.autoCreateBucket = true
camel.source.endpoint.autocloseBody = true
camel.source.endpoint.backoffErrorThreshold = null
camel.source.endpoint.backoffIdleThreshold = null
camel.source.endpoint.backoffMultiplier = null
camel.source.endpoint.basicPropertyBinding = false
camel.source.endpoint.bridgeErrorHandler = false
camel.source.endpoint.chunkedEncodingDisabled = false
camel.source.endpoint.delay = 500
camel.source.endpoint.deleteAfterRead = true
camel.source.endpoint.delimiter = null
camel.source.endpoint.dualstackEnabled = false
camel.source.endpoint.encryptionMaterials = null
camel.source.endpoint.endpointConfiguration = null
camel.source.endpoint.exceptionHandler = null
camel.source.endpoint.exchangePattern = null
camel.source.endpoint.fileName = null
camel.source.endpoint.forceGlobalBucketAccessEnabled = false
camel.source.endpoint.greedy = false
camel.source.endpoint.includeBody = true
camel.source.endpoint.initialDelay = 1000
camel.source.endpoint.maxConnections = 60
camel.source.endpoint.maxMessagesPerPoll = 10
camel.source.endpoint.pathStyleAccess = false
camel.source.endpoint.payloadSigningEnabled = false
camel.source.endpoint.policy = null
camel.source.endpoint.pollStrategy = null
camel.source.endpoint.prefix = null
camel.source.endpoint.proxyHost = null
camel.source.endpoint.proxyPort = null
camel.source.endpoint.proxyProtocol = HTTPS
camel.source.endpoint.region = null
camel.source.endpoint.repeatCount = 0
camel.source.endpoint.runLoggingLevel = TRACE
camel.source.endpoint.scheduledExecutorService = null
camel.source.endpoint.scheduler = none
camel.source.endpoint.schedulerProperties = null
camel.source.endpoint.secretKey = null
camel.source.endpoint.sendEmptyMessageWhenIdle = false
camel.source.endpoint.startScheduler = true
camel.source.endpoint.synchronous = false
camel.source.endpoint.timeUnit = MILLISECONDS
camel.source.endpoint.useEncryption = false
camel.source.endpoint.useFixedDelay = true
camel.source.endpoint.useIAMCredentials = false
camel.source.marshal = null
camel.source.maxBatchPollSize = 1000
camel.source.maxPollDuration = 10000
camel.source.path.bucketNameOrArn = arn:aws:s3:::kkakarla-test-kafka-connector
camel.source.pollingConsumerBlockTimeout = 0
camel.source.pollingConsumerBlockWhenFull = true
camel.source.pollingConsumerQueueSize = 1000
camel.source.unmarshal = null
camel.source.url = null
topics = mytopic
(org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnectorConfig:347)
[2020-09-01 12:43:17,239] INFO Setting initial properties in Camel context: [{connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector, camel.source.endpoint.autocloseBody=true, camel.source.maxPollDuration=10000, topics=mytopic, camel.component.aws-s3.region=US_EAST_2, camel.source.component=aws-s3, task.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceTask, camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector, camel.component.aws-s3.access-key=xxxxxxxx, name=CamelAWSS3SourceConnector, value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter, camel.component.aws-s3.secret-key=xxxxxxxxxxxx, key.converter=org.apache.kafka.connect.storage.StringConverter}] (org.apache.camel.kafkaconnector.utils.CamelMainSupport:91)
[2020-09-01 12:43:17,244] INFO Using properties from: classpath:application.properties;optional=true (org.apache.camel.main.BaseMainSupport:463)
[2020-09-01 12:43:17,271] INFO No additional Camel XML routes discovered from: classpath:camel/*.xml (org.apache.camel.main.DefaultRoutesCollector:126)
[2020-09-01 12:43:17,272] INFO No additional Camel XML rests discovered from: classpath:camel-rest/*.xml (org.apache.camel.main.DefaultRoutesCollector:162)
[2020-09-01 12:43:17,285] INFO Creating Camel route from({}) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:102)
[2020-09-01 12:43:17,285] INFO .to(direct:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:130)
[2020-09-01 12:43:17,299] INFO Starting CamelContext (org.apache.camel.kafkaconnector.utils.CamelMainSupport:138)
[2020-09-01 12:43:17,360] INFO Apache Camel 3.4.2 (camel-1) is starting (org.apache.camel.impl.engine.AbstractCamelContext:2630)
[2020-09-01 12:43:17,361] INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html (org.apache.camel.impl.engine.AbstractCamelContext:2773)
[2020-09-01 12:43:19,848] INFO Route: route1 started and consuming from: aws-s3://arn:aws:s3:::kkakarla-test-kafka-connector (org.apache.camel.impl.engine.InternalRouteStartupManager:158)
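The last log line above is the one that confirms the route is polling the bucket. If the Connect output has been saved to a file (for example by piping it through tee), a small check like the following can look for that line. This is a sketch: route_started is a hypothetical helper, and the log file path is whatever you chose when capturing the output.

```shell
# Hypothetical check: succeed if the Connect log shows the Camel route
# consuming from the given S3 bucket ARN.
route_started() {
  local log_file="$1" bucket_arn="$2"
  # -F treats the pattern as a literal string; -q suppresses output.
  grep -qF "started and consuming from: aws-s3://$bucket_arn" "$log_file"
}

# Example call (connect.log is an assumed file name):
#   route_started connect.log arn:aws:s3:::kkakarla-test-kafka-connector && echo "route is up"
```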
The content of the file is written to the Kafka topic mytopic. Read it back with the console consumer:
[kkakarla kafka_2.12-2.5.0.redhat-00003]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mytopic
hi hello how are you
For the complete list of properties, refer to the camel-aws-s3-kafka-connector source configuration reference.
Summary
Camel Kafka connectors help those who do not want to write custom code to move data between external systems and Kafka.