Using the Stream Real-Time Firehose With AWS SQS, Lambda, and SNS
In this tutorial, we will discuss how to use AWS to respond to feed updates and create a real-time data stream for your app.
Stream enables you to listen to feed changes in near real-time using SQS, webhooks, or websockets. In this tutorial, we will discuss how to use AWS to respond to feed updates.
While websockets are the preferred method to listen to changes, SQS notifications have a special spot in the feed infrastructure. Stream really shines in its real-time capabilities because you can act on the results that you receive. For example, you can listen for changes to a feed and respond by emailing a user to entice them to pull the trigger on a purchase, or by sending an SMS message based on an action taken within your application, among hundreds of other scenarios.
The goal of this post is for you to walk away with a strong understanding of AWS SQS, Lambda, and, of course, Stream.
Note: To ensure that you can follow along from start to finish, please create an account or make sure that you have the necessary permissions to use the services mentioned above. We won't be discussing permissions within AWS.
Setting Up SQS
Setting up AWS SQS is rather straightforward. Once signed in, head on over to the SQS page. From there, we'll have the option to create a new queue. Click the "Get Started Now" button.
Next, you will be presented with a settings page for your new queue. For the name, go ahead and set it to "STREAM." Then, choose the option on the left (the "Standard Queue").
Once created, we'll need to specify permissions for the newly created STREAM queue. For this tutorial, we're going to open it up to the world.
Note: Never do this - I'm only doing this for example purposes. Always keep your services locked down and only let in applications and users that require access.
Click "Add Permission" and you should be set!
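As the note says, an open queue is only acceptable for a demo. One narrower alternative is a queue policy that lets a single IAM principal send messages. The following is a minimal sketch of such a policy document built in Node.js. The function name `buildQueuePolicy`, the account ID, and both ARNs are hypothetical placeholders, not values from this tutorial.

```javascript
// Sketch of a queue policy that lets one IAM principal send messages.
// Both ARNs passed in below are hypothetical placeholders.
function buildQueuePolicy(queueArn, principalArn) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Sid: 'AllowSingleSender',
        Effect: 'Allow',
        Principal: { AWS: principalArn }, // only this principal is allowed
        Action: 'sqs:SendMessage',        // and only to send messages
        Resource: queueArn
      }
    ]
  };
}

const policy = buildQueuePolicy(
  'arn:aws:sqs:us-east-1:123456789012:STREAM',
  'arn:aws:iam::123456789012:user/STREAM'
);

// Print the JSON so it can be reviewed before use.
console.log(JSON.stringify(policy, null, 2));
```

Whether this exact policy suits your setup depends on how your account is organized, so treat it as a starting point rather than a drop-in replacement.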
Setting Up Stream for Use With SQS
Before we get started, we'll need to create a user for Stream. I prefer to create a separate user so that I can change permissions; however, you're free to use an existing key and secret if you'd like (just skip the first part of this section if that is the case).
Create a New IAM User and Role
The first step in this process is to head over to the IAM section of AWS. Once you are there, click "Users" and then "Add User."
For the "User name" enter "STREAM" and for the "Access type" select "Programmatic access." Click the next button to continue to the next step.
In the above image, you'll notice that we selected AmazonSQSFullAccess as our Policy name. This will ensure that the user you are creating has access to SQS. You will need to do the same, in addition to setting the "Group name" to "STREAM."
Note: You will need to search for this policy. To do so, simply drop in SQS in the search box.
With the "STREAM" role created, continue on to the review stage.
Last, go ahead and create a user!
Copy the "Access key ID" and "Secret access key" — you'll need them in the next step.
Hook it Up to Stream
Now, let's go ahead and drop our API Key and Secret into the Stream dashboard so that the Stream backend knows where to route outbound messages.
- Create a flat feed with the feed name of "user" (be sure to enable notifications).
- Click on the "user" feed.
- Enable SQS notifications by clicking the button labeled "Active."
- Fill out the SQS URL, AWS Key, and AWS Secret.
- Click the save button in the top right-hand corner.
Configuring Lambda
AWS Lambda will allow us to run code without provisioning or managing servers. This service is going to come in handy for decoding the base64 encoded string that is sent to SQS as the payload. Once the message is decoded, you can do anything - send an SMS, email, etc.
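The decoding step might look like the sketch below. It assumes the message body is JSON, either as plain text or as base64-encoded text. The helper name `decodeBody` and the sample payload are hypothetical, and the real payload shape may differ.

```javascript
// Sketch: decode an SQS message body that is either plain JSON
// or base64-encoded JSON. The payload shape is an assumption.
function decodeBody(body) {
  try {
    // Plain JSON bodies parse directly.
    return JSON.parse(body);
  } catch (err) {
    // Otherwise, treat the body as base64-encoded JSON text.
    const text = Buffer.from(body, 'base64').toString('utf8');
    return JSON.parse(text);
  }
}

// Hypothetical payload, base64-encoded the way a sender might encode it.
const sample = Buffer.from(JSON.stringify({ verb: 'tweet' })).toString('base64');
console.log(decodeBody(sample));
```

Trying plain JSON first keeps the helper usable when the body arrives without base64 encoding.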
Let's kick things off by setting up Lambda. First things first, you'll want to head over to the Lambda section of AWS (this can easily be found with the search bar). If you're a new user, go through the short two minute tutorial. If you've done this before, jump right in.
The first step will be to create a new Lambda function as shown in the screenshot below. To copy my configuration, you'll want to do the following:
- Select the "Author from scratch" box.
- Name your function "STREAM."
- Update the Node.js runtime to Node.js v8.10 (outdated, but it is the latest version that is available on Lambda).
- Specify your "Role" name as "STREAM."
- Include "SQS Poller" permissions.
Once you've done that, click "Create Function" on the lower right-hand side of the screen.
All set! Let's continue to the next step. If successful, you should see a screen that looks nearly identical to this:
To allow the Lambda function to access SQS, there are some minor configurations that need to be made. Luckily, this step only requires a couple of clicks.
- Under the "Designer" section, click "SQS" in the scrollable section.
- In the "Configure triggers" section, search for your SQS queue (in our case, it will be named "STREAM").
- Keep the batch size at 10.
- Check "Enable trigger."
- Finally, click on "Add."
Sending SQS Messages to Lambda
To connect SQS to Lambda, we'll use the ARN for the Lambda function. The easiest way to do this is to go to the SQS page, click on "Queue Actions," and then click on "Trigger a Lambda Function." Doing so will open a dialog.
Select "STREAM" from the dropdown menu and the Lambda Function ARN should automatically populate. Click "Save" once you're finished. It takes around 30 seconds to 1 minute for the connection to get fully up and running. You can see the status under the "Lambda Triggers" section as shown in the screenshot below.
Now, when a message enters the queue, it'll automatically be piped to a Lambda function for processing!
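With a batch size of 10, a single invocation can carry more than one message in `event.Records`. The following is a minimal sketch of a handler that walks the whole batch instead of reading only the first record. The sample event at the bottom is made up for illustration.

```javascript
// Sketch: process every record in an SQS batch.
// In index.js this function would be assigned to exports.handler.
const handler = async (event) => {
  const records = (event && event.Records) || [];
  for (const record of records) {
    // Each SQS record carries its own message ID and body.
    console.log(record.messageId, record.body);
  }
  // Return the number of records seen in this invocation.
  return records.length;
};

// Hypothetical two-message batch for a quick local check.
handler({
  Records: [
    { messageId: 'local-1', body: 'first' },
    { messageId: 'local-2', body: 'second' }
  ]
});
```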
Testing Lambda Functionality
Now that we have SQS, Lambda and Stream connected, let's run an end to end test to ensure that messages are making it to the Lambda function.
On the Lambda page, specify the contents of your function to be the following:
exports.handler = async (event) => {
  console.log(event.Records[0].body);
};
Then click on the "Save" button in the top right-hand corner.
Once you've successfully saved your Lambda code, you will need to click "Monitoring" which will take you to another page. Once redirected, click "View logs in CloudWatch."
Head back over to the Stream dashboard where you previously configured your SQS settings. There will be a handy button labeled "Test SQS." Clicking on this will send a test payload to SQS, which will then be forwarded over to Lambda, where it will be logged out into CloudWatch. Whew!
What you should see in the payload is an example tweet activity. If nothing shows up in CloudWatch, try reloading. If that doesn't work, run through the steps outlined above one more time to be sure that you've checked every box.
Piping the Payload to AWS SNS
Let's do something with the example payload. I'm going to propose that we use AWS SNS to forward the payload as a text message. To enable SNS, we'll need to add the AmazonSNSFullAccess policy to the STREAM user. To do that, head back to the IAM section of AWS. From there, click Groups > STREAM > Add Permissions > Attach Policy and choose AmazonSNSFullAccess and AWSLambdaFullAccess from the dropdown menu. Once selected, click "Attach Policy" in the bottom right-hand corner.
Your permissions should now look like this:
The STREAM user now has permissions for SNS, so let's go ahead and set it up. Head over to the SNS section of AWS and click "Get Started." Next steps are listed below:
Steps:
- Click "Create topic."
- Set your "Topic name" and "Display name" to STREAM and then click "Create topic."
- Copy your "Topic ARN" as you'll need that in a bit.
Next, we need to modify our Lambda. As you probably noticed, modifying Lambda code in the editor is pretty difficult, or at least it would be on a larger scale. Instead, I recommend creating a new Node.js project somewhere on your computer and installing node-lambda. This awesome little module has the ability to set up, dry-run, package, and deploy your Lambda to AWS with almost no effort.
Make a directory on your local machine called "stream" and move into that directory. Once inside, run the command node-lambda setup and the node-lambda package should bootstrap a Lambda for you. After running setup, it's a good idea to ignore the generated event.json and .env files, as well as .lambda.
echo -e ".env\ndeploy.env\nevent.json\n.lambda" >> .gitignore
Copy and Paste the Following Commands
touch index.js && yarn add node-lambda aws-sdk --dev && yarn add moment && yarn init
Note: For those who are not familiar with the command line, the command above creates a new file (index.js) and installs all of the node modules that we'll need.
Follow the prompts in the terminal to finish up.
Modify Your package.json File
Open package.json and change the scripts section to this:
"scripts": {
  "start": "node index.js",
  "dry-run": "node-lambda run",
  "deploy": "node-lambda deploy --configFile deploy.env",
  "setup": "node-lambda setup",
  "package": "node-lambda package"
}
Open .env and Modify Your Keys
Inside of the directory you generated, there should be a .env file. Open it and paste in your AWS credentials and role ARN (which you've copied before), then fill in the rest of the parameters to match your configuration in AWS. I recommend that you specify the following settings:
AWS_ENVIRONMENT=development
AWS_ACCESS_KEY_ID=YOUR_AWS_KEY
AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET
AWS_ROLE_ARN=YOUR_ROLE_ARN
AWS_REGION=us-east-1
AWS_FUNCTION_NAME=STREAM
AWS_HANDLER=index.handler
AWS_MEMORY_SIZE=128
AWS_TIMEOUT=3
AWS_RUNTIME=nodejs8.10
EXCLUDE_GLOBS="event.json"
PACKAGE_DIRECTORY=build
Update Your index.js File
Update your index.js file with the following code:
exports.handler = async (event) => {
  const data = event.Records[0].body;
  console.log(data);
};
Note: We're still logging out the contents that Stream sends as an example.
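Before deploying, it can help to dry-run the handler against a local event file. The following is a minimal sketch that builds an SQS-shaped event whose JSON could be saved as event.json. The helper name `makeSqsEvent` and the sample activity are hypothetical. The nested shape mirrors how the final handler in this post reads the body, which is an assumption about the real payload.

```javascript
// Sketch: build an SQS-shaped test event for local dry runs.
function makeSqsEvent(bodies) {
  return {
    Records: bodies.map((body, i) => ({
      messageId: `local-${i}`, // made-up ID for local use
      eventSource: 'aws:sqs',
      // SQS bodies are strings, so serialize objects to JSON.
      body: typeof body === 'string' ? body : JSON.stringify(body)
    }))
  };
}

// Hypothetical activity payload; field names follow the final handler.
const activity = { verb: 'tweet', tweet: 'Hello', time: '2018-06-01T12:00:00.000Z' };
const event = makeSqsEvent([[{ new: [activity] }]]);

// Print the JSON so it can be pasted into event.json.
console.log(JSON.stringify(event, null, 2));
```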
And most importantly, deploy your code!
yarn run deploy
Fix the SQS Connection
When we deployed this time around, it created a Lambda called "STREAM-development" or should have, assuming you followed the directions to a tee. If you did not name your Lambda "STREAM," follow along with your naming convention.
Provided that this is a new Lambda, we'll need to add a new "trigger." This is the same as we did previously in the post, so there's nothing new here. Just click on SQS from the left-hand drawer on the AWS Lambda page (for your new Lambda), and specify the configuration if necessary.
Associate Lambda With SNS Topic
Remember the ARN from the topic we created? Now is the time to use it. Open your .env file and create a new environment variable called AWS_SNS_TOPIC_ARN and set the value to your generated SNS Topic ARN.
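Reading that variable inside the function might look like the sketch below. The helper name `resolveTopicArn`, the fallback argument, and the example ARN are hypothetical. Whether the variable is present in the deployed function depends on how your deployment passes environment variables, so the sketch fails loudly when it is missing.

```javascript
// Sketch: resolve the SNS topic ARN from the environment.
// AWS_SNS_TOPIC_ARN is the variable name used in this post; the
// fallback is a hypothetical hardcoded value for deployed functions.
function resolveTopicArn(env, fallback) {
  const arn = env.AWS_SNS_TOPIC_ARN || fallback;
  // SNS topic ARNs start with "arn:aws:sns:" (or a partition variant).
  if (!arn || !/^arn:aws[\w-]*:sns:/.test(arn)) {
    throw new Error('SNS topic ARN is missing or malformed');
  }
  return arn;
}

// Example with a made-up ARN; in a handler you would pass process.env.
const topicArn = resolveTopicArn({
  AWS_SNS_TOPIC_ARN: 'arn:aws:sns:us-east-1:123456789012:STREAM'
});
console.log(topicArn);
```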
Create Your SNS Subscription
Before we wrap up and test, we need to generate a subscription for your phone number. This can be done under the SNS section of the AWS dashboard. To create a subscription, click "Topics," select your existing topic, and click "Create Subscription." Specify the protocol as SMS; the endpoint should be a valid E.164-formatted phone number (e.g. +15555555555).
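E.164 numbers consist of a leading plus sign followed by up to 15 digits, with no spaces or dashes. The following is a small sketch for checking a number before you enter it as the endpoint. The helper name `isE164` is hypothetical.

```javascript
// Sketch: check whether a phone number is in E.164 format
// ("+" followed by 2 to 15 digits, first digit not zero).
const E164_PATTERN = /^\+[1-9]\d{1,14}$/;

function isE164(phone) {
  return E164_PATTERN.test(phone);
}

console.log(isE164('+15555555555'));   // true
console.log(isE164('1-555-555-5555')); // false (no "+", contains dashes)
```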
Upload the Final Code
The second to last part of this SNS setup is uploading some updated code to your Lambda function. Simply update your index.js file with the code below and use the yarn deploy command to push it to AWS.
// Dependencies
const AWS = require('aws-sdk');
const moment = require('moment');

const sns = new AWS.SNS();

exports.handler = (event, context, callback) => {
  // Parse incoming event payload
  const payload = JSON.parse(event.Records[0].body)[0].new[0];

  // This should be your database
  const cache = {
    actors: ['Nick'],
    verbs: {
      tweet: 'tweeted'
    }
  };

  const actor = cache.actors[0]; // Hardcoding user, should be a database lookup
  const verb = cache.verbs[payload.verb]; // Hardcoding verb (should be a ternary or if/else statement)
  const tweet = payload.tweet; // The example always contains a tweet
  const time = moment(payload.time).format('MMMM Do YYYY [at] h:mm:ss A'); // Time is ISO-8601 formatted

  // Build params
  const params = {
    Message: `${actor} ${verb} "${tweet}" on ${time}`, // SMS message that gets sent to the subscriber (AKA your device)
    TopicArn: 'YOUR_AWS_SNS_TOPIC_ARN' // Replace with your SNS Topic ARN (hardcode the value for AWS deployments)
  };

  // Send an SMS via AWS SNS
  sns.publish(params, (err, data) => {
    if (err) {
      return callback(err.stack);
    } else {
      callback(null, data);
    }
  });
};
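The message-building step can also be pulled out into a pure function, which makes it easy to check without calling SNS. The following is a minimal sketch. The function name `buildMessage` is hypothetical, and it swaps moment for the built-in Date to stay dependency-free, so the timestamp format differs from the handler above.

```javascript
// Sketch: build the SMS text as a pure function (no AWS calls).
// Verb lookup table, mirroring the hardcoded cache in the handler.
const VERBS = { tweet: 'tweeted' };

function buildMessage(actor, payload) {
  // Fall back to the raw verb if there is no friendly form for it.
  const verb = VERBS[payload.verb] || payload.verb;
  // Built-in Date instead of moment; output is an ISO-8601 UTC string.
  const time = new Date(payload.time).toISOString();
  return `${actor} ${verb} "${payload.tweet}" at ${time}`;
}

// Hypothetical payload with an explicit UTC timestamp.
const message = buildMessage('Nick', {
  verb: 'tweet',
  tweet: 'Hello',
  time: '2018-06-01T12:00:00.000Z'
});
console.log(message); // Nick tweeted "Hello" at 2018-06-01T12:00:00.000Z
```

Note that a timestamp without a time zone suffix would be interpreted as local time by Date, so the sample uses an explicit "Z".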
All Done!
Congratulations! You've set up an end-to-end test using Stream, AWS SQS, Lambda, and SNS to send an SMS message when an event happens to a feed (even if you are the one pressing the "Test SQS" button).
I've hosted the repo on GitHub, so feel free to clone it if you run into issues.
Happy coding!
Published at DZone with permission of Nick Parsons, DZone MVB. See the original article here.