Redis Streams in Action (Part 4)
In the final part, we walk through how to build a serverless Go app to monitor tweets that have been abandoned due to processing failure or consumer instance failure.
Welcome to this series of blog posts that covers Redis Streams with the help of a practical example. We will use a sample application to make Twitter data available for search and query in real-time. RediSearch and Redis Streams serve as the backbone of this solution that consists of several co-operating components, each of which will be covered in a dedicated blog post.
The code is available in this GitHub repo - https://github.com/abhirockzz/redis-streams-in-action
We will continue from where we left off in the previous blog post and see how to build a monitoring app to make the overall system more robust in the face of high load or failure scenarios. This is because very often, data processing applications either slow down (due to high data volumes) or may even crash/stop due to circumstances beyond our control. If this happens with our tweets processing application, the messages that were assigned to a specific instance will be left unprocessed. The monitoring component covered in this blog post checks pending tweets (using XPENDING), claims them (XCLAIM), processes them (stores them as a HASH using HSET) and finally acknowledges them (XACK).
This is a Go application that will be deployed to Azure Functions — yes, we will be using a serverless model, wherein the monitoring system will be executed based on a pre-defined timer trigger. As always, we will first configure and deploy it to Azure, see it working, and finally, walk through the code.
Before we move on, here is some background about Go support in Azure Functions.
Serverless Go Apps on Azure, Thanks to Custom Handlers
Those who have worked with Azure Functions might recall that Go is not one of the language handlers that is supported by default. That's where custom handlers come into the picture.
In a nutshell, a custom handler is a lightweight web server that receives events from the Functions host. The only thing you need to implement a custom handler in your favorite runtime/language is HTTP support!
An event trigger (via HTTP, storage, event hubs, etc.) invokes the Functions host. Custom handlers differ from traditional functions in that the Functions host acts as a middle man: it sends a request to the web server of the custom handler (the function) with a payload that contains trigger and input binding data, along with other metadata for the function. The function returns a response to the Functions host, which passes data from the response to the function's output bindings for processing.
Here is a summary of how custom handlers work at a high level (the diagram below has been picked from the documentation).
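To make the flow above a bit more concrete, here is a minimal sketch of a custom handler in Go. It is not the code from the repo: the names invokeResponse, buildResponse and monitorHandler are made up for this example. It assumes the function is called monitor (so the Functions host posts to /monitor), that the host passes the port through the FUNCTIONS_CUSTOMHANDLER_PORT environment variable, and that the reply uses the Outputs/Logs/ReturnValue JSON shape expected for non-HTTP triggers.

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"
	"os"
)

// invokeResponse mirrors the JSON payload the Functions host reads back
// from a custom handler for non-HTTP triggers (such as a timer).
type invokeResponse struct {
	Outputs     map[string]interface{}
	Logs        []string
	ReturnValue interface{}
}

// buildResponse (hypothetical helper) serializes a response that carries
// only log lines and no output binding data.
func buildResponse(logs ...string) ([]byte, error) {
	return json.Marshal(invokeResponse{
		Outputs: map[string]interface{}{},
		Logs:    logs,
	})
}

// monitorHandler is called by the Functions host every time the trigger fires.
func monitorHandler(w http.ResponseWriter, r *http.Request) {
	// The actual work (XPENDING, XCLAIM, HSET, XACK) would happen here.
	body, err := buildResponse("monitor run completed")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(body)
}

func main() {
	// The Functions host tells the handler which port to listen on.
	port, ok := os.LookupEnv("FUNCTIONS_CUSTOMHANDLER_PORT")
	if !ok {
		log.Println("FUNCTIONS_CUSTOMHANDLER_PORT is not set; start this through the Functions host")
		return
	}
	// The route matches the function name (the folder holding function.json).
	http.HandleFunc("/monitor", monitorHandler)
	log.Fatal(http.ListenAndServe(":"+port, nil))
}
```

The point to take away is that the handler is just an ordinary HTTP server: everything specific to Azure Functions is limited to the port variable, the route name and the response shape.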
Alright, let's move on to the practical bits now.
Pre-Requisites
Please make sure that you read parts two and three of this series and have the respective applications up and running. Our monitoring application will build on top of the tweets producer and processor services that you deploy.
You will need an Azure account (which you can get for free) and the Azure CLI. Make sure to download and install Go if you don't have it already and also install the Azure Functions Core Tools, which will allow you to deploy the function using a CLI (and also run, test, and debug it locally).
The upcoming sections will guide you on how to deploy and configure the Azure Function.
Deploy the Monitoring Service to Azure Functions
To do this, you will:
- Create an Azure Functions app
- Configure it
- Deploy the Function to the app that you created
Start by creating a resource group to host all the components of the solution.
Search for Function App in the Azure portal and click add.
Enter the required details: you should select custom handler as the runtime stack.
In the Hosting section, choose Linux and Consumption (Serverless) for the operating system and plan type respectively.
Enable application insights (if you need to).
Review the final settings and click create to proceed.
Once the process is complete, the following resources will also be created along with the Function App:
- App Service plan (a Consumption/Serverless plan in this case)
- An Azure storage account
- An Azure Application Insights instance
Update the Function App Configuration
Our function needs a few environment variables to work properly. These can be added as Function Configuration using the Azure portal. Here is the list:
- Redis connectivity details:
  - REDIS_HOST: host and port for the Redis instance, e.g. myredis:10000
  - REDIS_PASSWORD: access key (password) for the Redis instance
- Redis Stream info:
  - STREAM_NAME: the name of the Redis Stream (use tweets_stream as the value)
  - STREAM_CONSUMER_GROUP_NAME: name of the Redis Streams consumer group (use redisearch_app_group as the value)
- Monitoring app metadata:
  - MONITORING_CONSUMER_NAME: name of the consumer instance represented by the monitoring app (it is part of the aforementioned consumer group)
  - MIN_IDLE_TIME_SEC: only pending messages that are older than the specified time interval will be claimed
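As an illustration of how these settings could be consumed, here is a small sketch that reads them into a struct and fails fast if one is missing. The config type and the loadConfig helper are hypothetical (the repo may well read the variables differently); only the variable names come from the list above.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// config groups the settings from the list above (hypothetical type).
type config struct {
	RedisHost     string
	RedisPassword string
	StreamName    string
	ConsumerGroup string
	ConsumerName  string
	MinIdle       time.Duration
}

// loadConfig reads every setting through getenv (os.Getenv in production) and
// returns an error if one is missing or MIN_IDLE_TIME_SEC is not a positive integer.
func loadConfig(getenv func(string) string) (config, error) {
	required := func(key string) (string, error) {
		v := getenv(key)
		if v == "" {
			return "", fmt.Errorf("missing environment variable %s", key)
		}
		return v, nil
	}

	var cfg config
	var err error
	targets := []struct {
		key string
		dst *string
	}{
		{"REDIS_HOST", &cfg.RedisHost},
		{"REDIS_PASSWORD", &cfg.RedisPassword},
		{"STREAM_NAME", &cfg.StreamName},
		{"STREAM_CONSUMER_GROUP_NAME", &cfg.ConsumerGroup},
		{"MONITORING_CONSUMER_NAME", &cfg.ConsumerName},
	}
	for _, t := range targets {
		if *t.dst, err = required(t.key); err != nil {
			return config{}, err
		}
	}

	raw, err := required("MIN_IDLE_TIME_SEC")
	if err != nil {
		return config{}, err
	}
	secs, err := strconv.Atoi(raw)
	if err != nil || secs <= 0 {
		return config{}, fmt.Errorf("MIN_IDLE_TIME_SEC must be a positive integer, got %q", raw)
	}
	cfg.MinIdle = time.Duration(secs) * time.Second
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Printf("monitoring %s/%s as %s (min idle %s)\n",
		cfg.StreamName, cfg.ConsumerGroup, cfg.ConsumerName, cfg.MinIdle)
}
```

Passing getenv as a parameter is only a convenience for trying the function out without touching the real environment.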
We're Now Ready to Deploy the Function
First, clone the GitHub repo and build the function:
git clone https://github.com/abhirockzz/redis-streams-in-action
cd redis-streams-in-action/monitoring-app
GOOS=linux go build -o processor_monitor cmd/main.go
GOOS=linux is used to build a Linux executable since we chose a Linux OS for our Function App.
To deploy, use the Azure Functions core tools CLI:
func azure functionapp publish <enter name of the Azure Function app>
Once completed, you should see the following logs:
Getting site publishing info...
Uploading package...
Uploading 3.71 MB [###############################################################################]
Upload completed successfully.
Deployment completed successfully.
Syncing triggers...
Functions in streams-monitor:
monitor - [timerTrigger]
You should see the function in the Azure portal as well:
The function is configured to trigger every 20 seconds ("schedule": "*/20 * * * * *"), as per function.json:
{
    "bindings": [
        {
            "type": "timerTrigger",
            "direction": "in",
            "name": "req",
            "schedule": "*/20 * * * * *"
        }
    ]
}
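If the six-field schedule looks unfamiliar: the first field is the seconds field, so */20 means "whenever the second is a multiple of 20" (0, 20 and 40 in every minute). The sketch below models just that one field to show when the next run happens. The secondsUntilNextRun helper is made up for illustration and is not how the Functions host evaluates schedules.

```go
package main

import "fmt"

// secondsUntilNextRun models the seconds field of a schedule such as
// "*/20 * * * * *": the timer fires when the second is a multiple of step,
// and the pattern starts over at the top of every minute.
func secondsUntilNextRun(second, step int) int {
	next := (second/step + 1) * step
	if next > 60 {
		next = 60 // wrap around to second 0 of the following minute
	}
	return next - second
}

func main() {
	for _, s := range []int{0, 5, 20, 45, 59} {
		fmt.Printf("at second %2d the next run is in %2d s\n", s, secondsUntilNextRun(s, 20))
	}
}
```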
Monitoring the Monitoring App
As before, we can inspect the state of our system using redis-cli. Execute the XPENDING command:
XPENDING tweets_stream redisearch_app_group
You will get an output similar to this (the numbers will differ in your case depending on how many tweets processor instances you were running and for how long):
1) (integer) 209
2) "1620973121009-0"
3) "1621054539960-0"
4) 1) 1) "consumer-1f20d41d-e63e-40d2-bc0f-749f11f15026"
      2) "3"
   2) 1) "monitoring_app"
      2) "206"
As explained before, the monitoring app will claim pending messages which haven't been processed by the other consumers (active or inactive). In the output above, notice that the number of messages currently being processed by monitoring_app (the name of our consumer) is 206; it claimed these from one or more other consumer instances. Once these messages have been claimed, their ownership moves from their original consumer to the monitoring_app consumer.
You can check the same using XPENDING tweets_stream redisearch_app_group again, but it might be hard to detect since the messages actually get processed pretty quickly.
Only pending messages that have been idle for at least 10 seconds (this is the MIN_IDLE_TIME_SEC we had specified) will be claimed and processed. The others will be ignored and picked up by the XPENDING call in a later run (if they are still in an unprocessed state). This is because we want to give our consumer applications some time to finish their work: 10 seconds is a pretty generous time frame for processing that involves saving to a HASH using HSET followed by XACK.
Please note that the 10 second time interval used above has been used as an example and you should determine these figures based on the end-to-end latencies required for your data pipelines/processing.
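To illustrate the idle-time rule, here is a small in-memory model of the selection. Redis applies this check itself when XCLAIM is given a minimum idle time, so the sketch is only meant to show which messages would qualify. The pendingEntry type and the claimableIDs helper are hypothetical stand-ins and are not part of the repo or the client library.

```go
package main

import (
	"fmt"
	"time"
)

// pendingEntry is a hypothetical stand-in for one row of the extended
// XPENDING reply: the message ID, its current owner and how long it has been idle.
type pendingEntry struct {
	ID       string
	Consumer string
	Idle     time.Duration
}

// claimableIDs returns the IDs of the entries that have been idle for at
// least minIdle. Everything else is left for a later run.
func claimableIDs(entries []pendingEntry, minIdle time.Duration) []string {
	var ids []string
	for _, e := range entries {
		if e.Idle >= minIdle {
			ids = append(ids, e.ID)
		}
	}
	return ids
}

func main() {
	// Made-up sample data, not real output.
	pending := []pendingEntry{
		{ID: "1620973121009-0", Consumer: "consumer-1", Idle: 15 * time.Second},
		{ID: "1620973121010-0", Consumer: "consumer-1", Idle: 3 * time.Second},
		{ID: "1620973121011-0", Consumer: "consumer-2", Idle: 42 * time.Second},
	}
	fmt.Println("would be claimed:", claimableIDs(pending, 10*time.Second))
}
```

With a 10-second threshold, the entry that has been idle for only 3 seconds stays with its original consumer, which still has a chance to finish and acknowledge it.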
You have complete flexibility in terms of how you want to run/operate such a "monitoring" component. I chose a serverless function but you could run it as a standalone program, as a scheduled cron job, or even as a Kubernetes job!
Don't forget to execute RediSearch queries to validate that you can search for tweets based on multiple criteria:
FT.SEARCH tweets-index hello
FT.SEARCH tweets-index hello|world
FT.SEARCH tweets-index "@location:India"
FT.SEARCH tweets-index "@user:jo* @location:India"
FT.SEARCH tweets-index "@user:jo* | @location:India"
FT.SEARCH tweets-index "@hashtags:{cov*}"
FT.SEARCH tweets-index "@hashtags:{cov*|Med*}"
Now that we have seen things in action, let's explore the code.
Code Walkthrough
Please refer to the code on GitHub
The app uses the excellent Go-Redis client library. As usual, it all starts with connecting to Redis (note the usage of TLS):
client := redis.NewClient(&redis.Options{Addr: host, Password: password, TLSConfig: &tls.Config{MinVersion: tls.VersionTLS12}})
err = client.Ping(context.Background()).Err()
if err != nil {
	log.Fatal(err)
}
Then comes the part where the bulk of the processing happens — think of it as a workflow with sub-parts.
We call XPENDING to detect the number of pending messages, e.g. XPENDING tweets_stream group1:
numPendingMessages := client.XPending(context.Background(), streamName, consumerGroupName).Val().Count
To get the pending messages, we invoke a different variant of XPENDING, to which we pass the number of messages we obtained in the previous call.
xpendingResult := client.XPendingExt(context.Background(), &redis.XPendingExtArgs{Stream: streamName,Group: consumerGroupName, Start: "-", End: "+", Count: numPendingMessages})
We can now claim the pending messages (toBeClaimed holds the IDs of the messages to claim). Ownership of these messages changes from the previous consumer to the new consumer (monitoringConsumerName) whose name we specified.
xclaim := client.XClaim(context.Background(), &redis.XClaimArgs{Stream: streamName, Group: consumerGroupName, Consumer: monitoringConsumerName, MinIdle: time.Duration(minIdleTimeSec) * time.Second, Messages: toBeClaimed})
Once the ownership is transferred, we can process them. This involves adding tweet info to a HASH (using HSET) and acknowledging successful processing (XACK). Goroutines are used to keep things efficient. For example, if we get 100 claimed messages in a batch, a scatter-gather process is followed where a goroutine is spawned to process each of these messages. A WaitGroup is used to "wait" for the current batch to complete before looking for the next set of pending messages (if any).
for _, claimed := range xclaim.Val() {
	if exitSignalled {
		return
	}
	waitGroup.Add(1)
	// pass the message as an argument so that each goroutine works on its own copy
	go func(tweetFromStream redis.XMessage) {
		hashName := fmt.Sprintf("%s%s", indexDefinitionHashPrefix, tweetFromStream.Values["id"])
		processed := false
		defer func() {
			waitGroup.Done()
		}()
		// use the goroutine's own message and a local err to avoid sharing state across goroutines
		err := client.HSet(context.Background(), hashName, tweetFromStream.Values).Err()
		if err != nil {
			return // don't proceed (ACK) if HSET fails
		}
		err = client.XAck(context.Background(), streamName, consumerGroupName, tweetFromStream.ID).Err()
		if err != nil {
			return
		}
		processed = true
	}(claimed)
}
waitGroup.Wait()
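If you want to play with the scatter-gather pattern without a Redis instance, here is a self-contained sketch of the same idea. The processBatch helper and the simulated failure are made up for this example: the process callback stands in for the HSET and XACK calls, and a message whose callback fails is simply left out of the result, just like an unacknowledged message stays pending.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// errSimulated stands in for a failed HSET or XACK call.
var errSimulated = errors.New("simulated processing failure")

// processBatch runs process for every ID in its own goroutine, waits for the
// whole batch to finish and returns the IDs that were processed successfully.
func processBatch(ids []string, process func(id string) error) []string {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex // guards succeeded, which is shared by all goroutines
		succeeded []string
	)
	for _, id := range ids {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			if err := process(id); err != nil {
				return // leave the message pending so that a later run retries it
			}
			mu.Lock()
			succeeded = append(succeeded, id)
			mu.Unlock()
		}(id)
	}
	wg.Wait()
	sort.Strings(succeeded) // goroutines finish in any order, so sort for stable output
	return succeeded
}

func main() {
	ids := []string{"1-0", "2-0", "3-0"}
	done := processBatch(ids, func(id string) error {
		if id == "2-0" {
			return errSimulated
		}
		return nil
	})
	fmt.Println("acknowledged:", done) // prints "acknowledged: [1-0 3-0]"
}
```

Each goroutine receives its ID as an argument and keeps its error local, so the only shared state is the result slice, which is protected by the mutex.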
Before we dive into the other areas, it might help to understand the nitty-gritty by exploring the code (which is relatively simple, by the way).
A Quick Note on the Application Structure
Here is how the app is set up (folder structure):
.
├── cmd
│   └── main.go
├── monitor
│   └── function.json
├── go.mod
├── go.sum
└── host.json
host.json tells the Functions host where to send requests by pointing to a web server capable of processing HTTP events. Notice customHandler.description.defaultExecutablePath, which defines that processor_monitor is the name of the executable that'll be used to run the web server.
{
    "version": "2.0",
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[1.*, 2.0.0)"
    },
    "customHandler": {
        "description": {
            "defaultExecutablePath": "processor_monitor"
        },
        "enableForwardingHttpRequest": true
    },
    "logging": {
        "logLevel": {
            "default": "Trace"
        }
    }
}
That's a Wrap!
This brings us to the end of this blog series. Let's recap what we learned:
- In the first part, you got an overview of the use case, architecture, its components, along with an introduction to Redis Streams and RediSearch. It set up the scene for the rest of the series.
- Part two dealt with the specifics of the Rust-based tweets consumer app that consumed from the Twitter Streaming API and queued up the tweets in Redis Streams for further processing.
- The third part was all about the Java app that processed these tweets by leveraging the Redis Streams Consumer Group feature and scaling out processing across multiple instances.
- And the final part (this one) was all about the Go app to monitor tweets that have been abandoned (in the pending entry list) either due to processing failure or consumer instance failure.
I hope you found this useful and apply it to building scalable solutions with Redis Streams. Happy coding!
Published at DZone with permission of Abhishek Gupta, DZone MVB. See the original article here.