Consuming Twitter Streaming API With Spring Integration
Let's take a look at consuming Twitter's Streaming API with Spring Integration and explore some alternatives.
Overview
Spring Integration is known for its myriad of connectors for interacting with external systems. Twitter was no exception: for a long time, Spring Social was the out-of-the-box solution that Spring Integration leveraged to connect to social networks.
Spring Social EOL
Unfortunately, Spring Social has reached its end of life, and the project is now in maintenance mode. The Spring team decided not to develop Spring Social further because it had become tedious to keep the API bindings in sync with the APIs of social networks.
In addition, after Spring Framework 5 was released, developers wished to leverage its reactive programming model, which would have required the team to implement reactive Spring Social bindings alongside the existing ones.
Developers are now advised to either implement their own binding or use one of the purpose-built libraries to connect to social networks.
Spring Integration's Twitter Module Moved to Extensions
The fact that Spring Social is now in maintenance mode forced the Spring Integration team to move the Twitter support module from the main project to the extensions. As Spring Social isn't going to receive updates, it will remain built upon an earlier Spring Framework version. That would lead to classpath conflicts and would also hamper the development of Spring Integration.
Therefore, as of Spring Integration 5.1, the Twitter module is available as an extension.
What Are the Alternatives?
Twitter4J is an unofficial Java library for Twitter's API developed and maintained by Yusuke Yamamoto. The official HBC library (built by Twitter) is a Java HTTP client for consuming Twitter's Streaming API. The latter hasn't seen major updates since 2016, while Twitter4J is receiving regular updates.
Implementing your own API binding is also an option. In Spring-based projects, RestTemplate is an easy way to make REST calls.
This guide uses Twitter4J in streaming mode in a way that can be integrated into a Spring Integration message flow.
How Does Twitter Streaming Work?
In a nutshell, your app opens a single connection to Twitter's API, and new results are sent through that connection whenever new matches occur. In contrast, the alternative is to fetch data in batches through repeated requests to a REST API.
Streaming provides a low-latency delivery mechanism that can support very high throughput without having to deal with rate limiting.
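To make the difference between the two delivery styles concrete, here is a minimal plain-Java sketch. It doesn't talk to Twitter at all; PolledFeed and StreamedFeed are hypothetical in-memory stand-ins, introduced only to contrast fetching batches on request with receiving each item through a callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory feeds, used only to contrast the two delivery styles. */
class DeliveryStyles {

  /** Pull: the client asks repeatedly and gets whatever accumulated since the last request. */
  static class PolledFeed {
    private final List<String> pending = new ArrayList<>();

    void publish(String item) {
      pending.add(item);
    }

    List<String> fetchNewItems() {
      List<String> batch = new ArrayList<>(pending);
      pending.clear();
      return batch;
    }
  }

  /** Push: the client registers a callback once and each item is delivered as it occurs. */
  static class StreamedFeed {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    void addListener(Consumer<String> listener) {
      listeners.add(listener);
    }

    void publish(String item) {
      listeners.forEach(listener -> listener.accept(item));
    }
  }

  public static void main(String[] args) {
    PolledFeed polled = new PolledFeed();
    polled.publish("tweet 1");
    polled.publish("tweet 2");
    // Nothing reaches the client until it asks for the next batch
    System.out.println("polled batch: " + polled.fetchNewItems());

    StreamedFeed streamed = new StreamedFeed();
    streamed.addListener(item -> System.out.println("streamed: " + item));
    // Each item is handed to the listener immediately
    streamed.publish("tweet 1");
    streamed.publish("tweet 2");
  }
}
```

With the pull style, latency depends on how often the client asks; with the push style, it depends only on how quickly the callback is invoked.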
Example Project
The example project, which demonstrates the integration of Twitter's Streaming API into a Spring Integration message flow, is available on GitHub: https://github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming.
Maven Dependencies
As Spring Social is EOL now, we won't build upon it. All we pull in are spring-integration-core and twitter4j-stream.
<dependencies>
  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
  </dependency>
  <dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>4.0.1</version>
  </dependency>
</dependencies>
This project also uses Lombok and Spring Boot testing support, but these are optional.
Listenable Message Source With Spring Integration
Spring Integration provides support for implementing inbound message components. They're divided into polling and listening behaviors.
The original Inbound Twitter Channel Adapter, the one which builds upon Spring Social and has now moved to the extensions, is a polling consumer. That is, you have to provide a poller configuration to use it. However, Twitter enforces rate limits to manage how often an application can fetch updates. When using the old Twitter channel adapter, you have to take rate limiting into consideration so that your configured poller intervals comply with Twitter's policies.
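As a rough illustration of how a rate limit translates into a poller interval, the following sketch divides the rate-limit window by the number of requests allowed in it. The figures (180 requests per 15-minute window) are illustrative assumptions rather than the documented limit of any particular endpoint, and PollIntervalCalculator is a hypothetical helper, not part of Spring Integration.

```java
import java.time.Duration;

/** Hypothetical helper that derives a safe fixed-rate poll interval from a rate limit. */
class PollIntervalCalculator {

  /** Shortest interval that keeps a fixed-rate poller within the allowed requests per window. */
  static Duration minimumPollInterval(int requestsPerWindow, Duration window) {
    if (requestsPerWindow <= 0) {
      throw new IllegalArgumentException("requestsPerWindow must be positive");
    }
    return window.dividedBy(requestsPerWindow);
  }

  public static void main(String[] args) {
    // Assumed numbers for illustration only: 180 requests per 15-minute window
    Duration interval = minimumPollInterval(180, Duration.ofMinutes(15));
    System.out.println(interval.getSeconds() + " seconds"); // prints "5 seconds"
  }
}
```

In practice you would probably choose a somewhat longer interval than the computed minimum, to leave headroom for other requests that count against the same limit.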
Listening inbound components, by contrast, are simpler and typically only require extending MessageProducerSupport. Such a listening component looks like this.
public class MyMessageProducer extends MessageProducerSupport {

  public MyMessageProducer(MessageChannel outputChannel) {
    // Defining an output channel is required
    setOutputChannel(outputChannel);
  }

  @Override
  protected void onInit() {
    super.onInit();
    // Custom initialization - if applicable - comes here
  }

  @Override
  public void doStart() {
    // Lifecycle method for starting receiving messages
  }

  @Override
  public void doStop() {
    // Lifecycle method for stopping receiving messages
  }

  private void receiveMessage() {
    // Receive data from upstream service
    SomeData data = ...;
    // Convert it to a message as appropriate and send it out
    this.sendMessage(MessageBuilder.withPayload(data).build());
  }

}
There are only two required elements:
- An output message channel has to be defined
- sendMessage has to be called whenever the component receives a message
Optionally you might want to take control over the component's initialization and manage its lifecycle.
As Twitter's Streaming API is inherently message-driven, the listening behavior is a natural fit. Let's see how Twitter4J can be incorporated in such a context.
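Before bringing in Twitter4J, the start/stop contract of a listening component can be sketched in plain Java. This is only an analogy under stated assumptions: UpstreamSource and Producer are hypothetical stand-ins (not Spring Integration or Twitter4J types), and a BlockingQueue plays the role of the output channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class ListeningProducerSketch {

  /** Hypothetical upstream service that pushes events to a single registered listener. */
  static class UpstreamSource {
    private Consumer<String> listener;

    void setListener(Consumer<String> listener) {
      this.listener = listener;
    }

    void clearListener() {
      this.listener = null;
    }

    void emit(String event) {
      if (listener != null) {
        listener.accept(event);
      }
    }
  }

  /** Stand-in for a listening inbound component: forwards events only while started. */
  static class Producer {
    private final UpstreamSource source;
    private final BlockingQueue<String> outputChannel;

    Producer(UpstreamSource source, BlockingQueue<String> outputChannel) {
      this.source = source;
      this.outputChannel = outputChannel;
    }

    void start() {
      // Attach to the upstream; every event is forwarded to the output channel
      source.setListener(outputChannel::offer);
    }

    void stop() {
      // Detach from the upstream; later events are no longer forwarded
      source.clearListener();
    }
  }

  public static void main(String[] args) {
    UpstreamSource source = new UpstreamSource();
    BlockingQueue<String> channel = new LinkedBlockingQueue<>();
    Producer producer = new Producer(source, channel);

    source.emit("ignored: not started yet");
    producer.start();
    source.emit("delivered");
    producer.stop();
    source.emit("ignored: already stopped");

    System.out.println(channel); // prints [delivered]
  }
}
```

The same shape carries over to the real component: attach the listener when the component starts, detach it when the component stops, and forward everything received in between.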
Connect to Twitter Streaming API With Twitter4J
Twitter4J manages the nuances of connection handling and receiving updates from Twitter's Streaming API. All we need to do is acquire a TwitterStream instance, attach a listener, and define filtering.
Instantiate TwitterStream
Streaming examples on Twitter4J's website suggest that a TwitterStream instance should be created through TwitterStreamFactory. That makes perfect sense; however, in a Spring application context we want it to be a managed bean.
Spring's FactoryBean facility is a clean and easy way to contain the details of creating a singleton TwitterStream instance.
public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {

  @Override
  public Class<?> getObjectType() {
    return TwitterStream.class;
  }

  @Override
  protected TwitterStream createInstance() {
    return new twitter4j.TwitterStreamFactory().getInstance();
  }

  @Override
  protected void destroyInstance(TwitterStream twitterStream) {
    twitterStream.shutdown();
  }

}
Although we could also expose it as a regular bean without a FactoryBean, shutting it down properly would then depend on a destroy method being declared or inferred for that bean. The FactoryBean makes the shutdown explicit.
Attaching a Listener and Defining Filtering
That's going to be the responsibility of our custom MessageProducer implementation.
@Slf4j
public class TwitterMessageProducer extends MessageProducerSupport {

  private final TwitterStream twitterStream;

  private List<Long> follows;
  private List<String> terms;

  private StatusListener statusListener;
  private FilterQuery filterQuery;

  public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {
    this.twitterStream = twitterStream;
    setOutputChannel(outputChannel);
  }

  @Override
  protected void onInit() {
    super.onInit();

    statusListener = new StatusListener();

    // FilterQuery expects arrays; they stay null when nothing was configured
    long[] followsArray = null;
    if (!CollectionUtils.isEmpty(follows)) {
      followsArray = new long[follows.size()];
      for (int i = 0; i < follows.size(); i++) {
        followsArray[i] = follows.get(i);
      }
    }

    String[] termsArray = null;
    if (!CollectionUtils.isEmpty(terms)) {
      termsArray = terms.toArray(new String[0]);
    }

    filterQuery = new FilterQuery(0, followsArray, termsArray);
  }

  @Override
  public void doStart() {
    // Attach the listener first, then start the filtered stream
    twitterStream.addListener(statusListener);
    twitterStream.filter(filterQuery);
  }

  @Override
  public void doStop() {
    // Stop the stream and detach the listener
    twitterStream.cleanUp();
    twitterStream.clearListeners();
  }

  public void setFollows(List<Long> follows) {
    this.follows = follows;
  }

  public void setTerms(List<String> terms) {
    this.terms = terms;
  }

  // Package-private accessors used by the test case
  StatusListener getStatusListener() {
    return statusListener;
  }

  FilterQuery getFilterQuery() {
    return filterQuery;
  }

  class StatusListener extends StatusAdapter {

    @Override
    public void onStatus(Status status) {
      // Every received status becomes a message on the output channel
      sendMessage(MessageBuilder.withPayload(status).build());
    }

    @Override
    public void onException(Exception ex) {
      log.error(ex.getMessage(), ex);
    }

    @Override
    public void onStallWarning(StallWarning warning) {
      log.warn(warning.toString());
    }

  }

}
The lifecycle methods provided by MessageProducerSupport and TwitterStream's management interface play nicely together. That also enables us to stop and start the component at runtime when needed.
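As a side note, the manual loop in onInit() that unboxes the follows list into a long[] could also be written with the Stream API. The sketch below is one possible variant, not the author's code; FilterArrays is a hypothetical helper name. It keeps the behavior of the code above by returning null when a list is null or empty.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper converting the configured lists into the arrays a filter query expects. */
class FilterArrays {

  /** Unboxes a list of user IDs; returns null for a null or empty list, like the loop above. */
  static long[] toLongArray(List<Long> values) {
    if (values == null || values.isEmpty()) {
      return null;
    }
    return values.stream().mapToLong(Long::longValue).toArray();
  }

  /** Copies the search terms into an array; returns null for a null or empty list. */
  static String[] toStringArray(List<String> values) {
    if (values == null || values.isEmpty()) {
      return null;
    }
    return values.toArray(new String[0]);
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(toLongArray(Arrays.asList(12L, 34L))));
    System.out.println(Arrays.toString(toStringArray(Arrays.asList("java", "spring"))));
  }
}
```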
Java Configuration
Although Spring could auto-wire components, I still prefer controlling dependencies with manual configuration.
@Slf4j
@Configuration
public class TwitterConfig {

  @Bean
  TwitterStreamFactory twitterStreamFactory() {
    return new TwitterStreamFactory();
  }

  @Bean
  TwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) throws Exception {
    // AbstractFactoryBean exposes the instance it manages through getObject()
    return twitterStreamFactory.getObject();
  }

  @Bean
  MessageChannel outputChannel() {
    return MessageChannels.direct().get();
  }

  @Bean
  TwitterMessageProducer twitterMessageProducer(
      TwitterStream twitterStream, MessageChannel outputChannel) {

    TwitterMessageProducer twitterMessageProducer =
        new TwitterMessageProducer(twitterStream, outputChannel);

    twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));

    return twitterMessageProducer;
  }

  @Bean
  IntegrationFlow twitterFlow(MessageChannel outputChannel) {
    return IntegrationFlows.from(outputChannel)
        .transform(Status::getText)
        .handle(m -> log.info(m.getPayload().toString()))
        .get();
  }

}
The important part here is how our custom message producer integrates with a message flow. Basically, we don't need to do anything other than listen to messages on the producer's output channel.
Testing
Only Chuck Norris tests code in production. However, ordinary mortal folks like you and me, we do write test cases.
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TestConfig.class)
public class TwitterMessageProducerTest {

  @MockBean
  private TwitterStream twitterStream;

  @Autowired
  private PollableChannel outputChannel;

  @Autowired
  private TwitterMessageProducer twitterMessageProducer;

  @Test
  public void shouldBeInitialized() {
    StatusListener statusListener = twitterMessageProducer.getStatusListener();
    verify(twitterStream).addListener(statusListener);

    FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();
    verify(twitterStream).filter(filterQuery);
  }

  @Test
  public void shouldReceiveStatus() {
    StatusListener statusListener = twitterMessageProducer.getStatusListener();

    Status status = mock(Status.class);
    statusListener.onStatus(status);

    Message<?> statusMessage = outputChannel.receive();
    assertSame(status, statusMessage.getPayload());
  }

  @Import(TwitterConfig.class)
  static class TestConfig {

    @Bean
    MessageChannel outputChannel() {
      return MessageChannels.queue(1).get();
    }

  }

}
I like Twitter4J's design because it leverages interfaces. Most of the important parts of the library are exposed as ordinary interfaces, and TwitterStream is no exception. That is, it can be mocked out easily in test cases.
Conclusion
- Spring Social is EoL now — it's not going to receive new features
- Spring Integration's Twitter module is available as an extension — it's been moved out from the main project.
- Twitter Inbound Channel adapter is a polling consumer — you have to deal with rate limiting when choosing your poll interval
- Twitter's Streaming API fits with the listening behavior of an inbound channel adapter
Published at DZone with permission of Laszlo Csontos, DZone MVB. See the original article here.