Building a Real-Time Slackbot With Generative AI
Learn how to build a cool Slackbot with Apache NiFi, LLM, Foundation Models, and streaming. We will cover model choices and integration.
In this article, I will show you how to use Cloudera DataFlow powered by Apache NiFi to interact with IBM WatsonX.AI foundation large language models in real time. We can work with any of the foundation models such as Google FLAN T5 XXL or IBM Granite models.
I’ll show you how easy it is to build a real-time data pipeline that feeds questions from your Slack-like and mobile applications directly to secure WatsonX.AI models running in IBM Cloud. We will handle all the security, management, lineage, and governance with Cloudera DataFlow. As part of decision-making, we can choose different WatsonX.AI models on the fly based on the type of prompt. For example, I can pick one model to continue a sentence and another to answer a question. For question answering, Google FLAN T5 XXL works well. To continue sentences, I would use one of the IBM Granite models.
You will notice how quickly the WatsonX.AI models return the results we need. I do some quick enrichment and transformation and then send the results on their way to Cloudera Apache Kafka for continuous analytics and distribution to many other applications, systems, platforms, and downstream consumers. We also output our answers to the original requester, who could be someone in a Slack channel or someone in an application. All of this happens in real time, with no code, full governance, lineage, data management, and security at any scale and on any platform.
The power of IBM and Cloudera together in private, public, and hybrid cloud environments for real-time data and AI is just getting started. Try it today.
Step By Step Real-Time Flow
First, in Slack, I type a question:
“Q: What is a good way to integrate Generative AI and Apache NiFi?”
Once that question is typed, the Slack server sends these events to our registered service. This can be hosted anywhere publicly facing.
- (Click here for Slack API link)
Once enabled, your server will start receiving JSON events for each Slack post. This is easy to receive and parse in NiFi. Cloudera DataFlow enables receiving secure HTTPS REST calls in the public cloud-hosted edition with ease, even in Designer mode.
In the first part of the flow, we receive the REST JSON POST, which looks as follows.
Slackbot 1.0 (+https://api.slack.com/robots) application/json POST HTTP/1.1 { "token" : "qHvJe59yetAp1bao6wmQzH0C", "team_id" : "T1SD6MZMF", "context_team_id" : "T1SD6MZMF", "context_enterprise_id" : null, "api_app_id" : "A04U64MN9HS", "event" : { "type" : "message", "subtype" : "bot_message", "text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 20
This is a very rich, detailed JSON file that we could immediately push raw to an Apache Iceberg Open Cloud Lakehouse, a Kafka topic, or an object store as a JSON document (Enhancement Option). I am just going to parse what I need.
We parse out the channel ID and plain text of the post. I only want messages from general (“C1SD6N197”). Then I copy the text to an inputs field, as is required for Hugging Face.
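As a rough illustration of that parsing step outside NiFi, here is a minimal Python sketch. It assumes the Slack event carries the channel ID in `event.channel` and the message in `event.text`; the function name `extract_inputs` is hypothetical and not part of the flow.

```python
from typing import Optional

GENERAL_CHANNEL = "C1SD6N197"  # channel ID quoted in the article


def extract_inputs(slack_event: dict, allowed_channel: str = GENERAL_CHANNEL) -> Optional[dict]:
    """Return {"inputs": text} for posts in the allowed channel, else None."""
    event = slack_event.get("event", {})
    if event.get("channel") != allowed_channel:
        return None  # ignore posts from every other channel
    # Copy the plain message text into an "inputs" field
    return {"inputs": event.get("text", "")}


sample = {"event": {"type": "message", "channel": "C1SD6N197", "text": "Q: What is NiFi?"}}
print(extract_inputs(sample))
```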
We check our input: if it’s stocks or weather (more to come) we avoid calling the LLM.
SELECT * FROM FLOWFILE WHERE upper(inputs) like '%WEATHER%' AND not upper(inputs) like '%LLM SKIPPED%'
SELECT * FROM FLOWFILE WHERE upper(inputs) like '%STOCK%' AND not upper(inputs) like '%LLM SKIPPED%'
SELECT * FROM FLOWFILE WHERE (upper(inputs) like 'QUESTION:%' OR upper(inputs) like 'Q:%') and not upper(inputs) like '%WEATHER%' and not upper(inputs) like '%STOCK%'
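To make the routing rules easier to reason about, the three queries can be mirrored in a few lines of Python. This is only a sketch of the same conditions; the function name `matching_routes` and the route labels are made up for illustration.

```python
def matching_routes(inputs: str) -> list:
    """Return every route whose condition the message satisfies."""
    text = inputs.upper()
    skipped = "LLM SKIPPED" in text  # the bot's own replies carry this marker
    routes = []
    if "WEATHER" in text and not skipped:
        routes.append("weather")
    if "STOCK" in text and not skipped:
        routes.append("stock")
    # Only messages that start with "Question:" or "Q:" go to the LLM
    if text.startswith(("QUESTION:", "Q:")) and "WEATHER" not in text and "STOCK" not in text:
        routes.append("llm")
    return routes


print(matching_routes("Q: What is a good way to integrate Generative AI and Apache NiFi?"))
print(matching_routes("What is the weather in Boston?"))
```

Note that a message mentioning both weather and stocks satisfies the first two conditions, and the "LLM Skipped" check keeps the bot from reacting to its own stock and weather replies.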
For Stocks processing:
To find which stock we need, I use my OpenNLP processor.
You will need to download the processor and the entity extraction models.
Then we pass that company name to an HTTP REST endpoint from AlphaVantage that converts the company name to a stock symbol. In free accounts, you only get a few calls a day, so if the call fails we bypass this step and just try whatever name was passed in.
Using RouteOnContent, we filter out error messages.
Then we use a QueryRecord processor to convert from CSV to JSON and filter.
SELECT name as companyName, symbol FROM FLOWFILE ORDER BY matchScore DESC LIMIT 1
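The same "keep the best match" logic can be sketched in Python. The column names `symbol`, `name`, and `matchScore` come from the query above; the sample CSV rows are invented for illustration and are not real API output.

```python
import csv
import io

# Hypothetical lookup response, for illustration only
SAMPLE_CSV = """symbol,name,matchScore
IBMA,Illustrative Business Machines A,0.5000
IBM,International Business Machines Corp,1.0000
"""


def best_match(csv_text: str):
    """Return the row with the highest matchScore as companyName/symbol."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    if not rows:
        return None  # nothing matched, caller falls back to the raw name
    top = max(rows, key=lambda row: float(row["matchScore"]))
    return {"companyName": top["name"], "symbol": top["symbol"]}


print(best_match(SAMPLE_CSV))
```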
We do a SplitRecord to ensure we have only one record. We then run EvaluateJsonPath to get our fields as attributes.
In an UpdateAttribute we trim the symbol just in case.
${stockSymbol:trim()}
We then pass that stock symbol to Twelve Data via InvokeHTTP to get our stock data.
We then get a lot of stock data back.
{ "meta" : { "symbol" : "IBM", "interval" : "1min", "currency" : "USD", "exchange_timezone" : "America/New_York", "exchange" : "NYSE", "mic_code" : "XNYS", "type" : "Common Stock" }, "values" : [ { "datetime" : "2023-11-15 10:37:00", "open" : "152.07001", "high" : "152.08000", "low" : "151.99500", "close" : "152.00999", "volume" : "8525" }, { "datetime" : "2023-11-15 10:36:00", "open" : "152.08501", "high" : "152.12250", "low" : "152.08000", "close" : "152.08501", "volume" : "15204" } ...
We then run EvaluateJsonPath to grab the exchange information.
We fork the record to just get one record as this is just to return to Slack. We use UpdateRecord calls to enrich the stock data with other values. We then run a QueryRecord to limit us to 1 record to send to Slack.
SELECT * FROM FLOWFILE ORDER BY "datetime" DESC LIMIT 1
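For comparison, here is a small Python sketch of picking the most recent record from the stock response. It uses two records from the sample output shown earlier and relies on the `YYYY-MM-DD HH:MM:SS` timestamps sorting correctly as plain strings; `latest_value` is a hypothetical helper name.

```python
SAMPLE_STOCK = {
    "meta": {"symbol": "IBM", "exchange": "NYSE"},
    "values": [
        {"datetime": "2023-11-15 10:36:00", "close": "152.08501"},
        {"datetime": "2023-11-15 10:37:00", "close": "152.00999"},
    ],
}


def latest_value(payload: dict):
    """Return the record with the newest datetime, plus the exchange name."""
    values = payload.get("values") or []
    if not values:
        return None
    newest = max(values, key=lambda value: value["datetime"])
    return {**newest, "exchange": payload.get("meta", {}).get("exchange")}


print(latest_value(SAMPLE_STOCK))
```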
We run an EvaluateJsonPath to get the most valuable fields to display.
We then run a PutSlack with our message.
LLM Skipped. Stock Value for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} is ${closeStockValue}. stock date ${stockdateTime}. stock exchange ${exchange}
We also have a separate flow that is split from Company Name.
In the first step, we call Yahoo Finance to get RSS headlines for that stock.
https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}&region=US&lang=en-US
We use QueryRecord to convert RSS/XML Records to JSON.
We then run a SplitJson to break out the news items.
We run a SplitRecord to limit to 1 record. We use EvaluateJsonPath to get the fields we need for our Slack message.
We then run UpdateRecord to finalize our JSON.
We then send this message to Slack.
LLM Skipped. Stock News Information for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} ${title} : ${description}. ${guid} article date ${pubdate}
For messages that ask about weather, we follow a route similar to the one for stocks (we should add caching with Redis @ Aiven). We use my OpenNLP processor to extract the locations you might want weather for.
The next step is taking the output of the processor and building a value to send to our geocoder.
weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "New York City")}
If we can’t find a valid location, I default to “New York City.” We could use some other lookup. I am doing some work on loading all locations and could do some advanced PostgreSQL searches on that, or perhaps use OpenSearch or a vectorized datastore.
I pass that location to Open Meteo via InvokeHTTP to find its coordinates.
https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json
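The default-location fallback and the URL construction can be sketched together in Python. The helper name `geocoding_url` is hypothetical, and unlike the NiFi expression this sketch also treats an empty string as a missing location.

```python
from urllib.parse import urlencode

GEOCODING_ENDPOINT = "https://geocoding-api.open-meteo.com/v1/search"


def geocoding_url(nlp_location, default: str = "New York City") -> str:
    """Build the geocoding URL, falling back to a default location."""
    # None or "" both fall back to the default (an assumption of this sketch)
    location = (nlp_location or default).strip()
    query = urlencode({"name": location, "count": 1, "language": "en", "format": "json"})
    return GEOCODING_ENDPOINT + "?" + query


print(geocoding_url(None))
print(geocoding_url(" Boston "))
```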
We then parse the values we need from the results.
{ "results" : [ { "id" : 5128581, "name" : "New York", "latitude" : 40.71427, "longitude" : -74.00597, "elevation" : 10.0, "feature_code" : "PPL", "country_code" : "US", "admin1_id" : 5128638, "timezone" : "America/New_York", "population" : 8175133, "postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ], "country_id" : 6252001, "country" : "United States", "admin1" : "New York" } ], "generationtime_ms" : 0.92196465 }
We then parse the results so we can call another API to get the current weather for that latitude and longitude via InvokeHTTP.
https://api.weather.gov/points/${latitude:trim()},${longitude:trim()}
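A minimal Python sketch of this hand-off, assuming the geocoding response has the `results[0].latitude` and `results[0].longitude` fields shown in the sample above; `points_url` is a hypothetical helper name.

```python
def points_url(geocode_response: dict):
    """Turn the first geocoding result into an api.weather.gov points URL."""
    results = geocode_response.get("results") or []
    if not results:
        return None  # no location found, nothing to look up
    first = results[0]
    return f"https://api.weather.gov/points/{first['latitude']},{first['longitude']}"


sample = {"results": [{"name": "New York", "latitude": 40.71427, "longitude": -74.00597}]}
print(points_url(sample))
```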
The results from api.weather.gov are GeoJSON.
{ "@context": [ "https://geojson.org/geojson-ld/geojson-context.jsonld", { "@version": "1.1", "wx": "https://api.weather.gov/ontology#", "s": "https://schema.org/", "geo": "http://www.opengis.net/ont/geosparql#", "unit": "http://codes.wmo.int/common/unit/", "@vocab": "https://api.weather.gov/ontology#", "geometry": { "@id": "s:GeoCoordinates", "@type": "geo:wktLiteral" }, "city": "s:addressLocality", "state": "s:addressRegion", "distance": { "@id": "s:Distance", "@type": "s:QuantitativeValue" }, "bearing": { "@type": "s:QuantitativeValue" }, "value": { "@id": "s:value" }, "unitCode": { "@id": "s:unitCode", "@type": "@id" }, "forecastOffice": { "@type": "@id" }, "forecastGridData": { "@type": "@id" }, "publicZone": { "@type": "@id" }, "county": { "@type": "@id" } } ], "id": "https://api.weather.gov/points/40.7143,-74.006", "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.006, 40.714300000000001 ] }, "properties": { "@id": "https://api.weather.gov/points/40.7143,-74.006", "@type": "wx:Point", "cwa": "OKX", "forecastOffice": "https://api.weather.gov/offices/OKX", "gridId": "OKX", "gridX": 33, "gridY": 35, "forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast", "forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly", "forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35", "observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations", "relativeLocation": { "type": "Feature", "geometry": { "type": "Point", "coordinates": [ -74.0279259, 40.745251000000003 ] }, "properties": { "city": "Hoboken", "state": "NJ", "distance": { "unitCode": "wmoUnit:m", "value": 3906.1522008034999 }, "bearing": { "unitCode": "wmoUnit:degree_(angle)", "value": 151 } } }, "forecastZone": "https://api.weather.gov/zones/forecast/NYZ072", "county": "https://api.weather.gov/zones/county/NYC061", "fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212", "timeZone": "America/New_York", "radarStation": "KDIX" 
} }
We use EvaluateJsonPath to grab a forecast URL.
Then we call that forecast URL via InvokeHTTP.
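In Python terms, grabbing the forecast URL amounts to reading `properties.forecast` from the points response shown above. This is a sketch; the exact JsonPath expression configured in the flow is not given in the article.

```python
def forecast_url(points_response: dict):
    """Read the forecast URL from an api.weather.gov points response."""
    return points_response.get("properties", {}).get("forecast")


sample = {"properties": {"forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast"}}
print(forecast_url(sample))
```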
The forecast call produces a larger JSON output that we will parse for the results we want to return to Slack.
{
"@context": [
"https://geojson.org/geojson-ld/geojson-context.jsonld",
{
"@version": "1.1",
"wx": "https://api.weather.gov/ontology#",
"geo": "http://www.opengis.net/ont/geosparql#",
"unit": "http://codes.wmo.int/common/unit/",
"@vocab": "https://api.weather.gov/ontology#"
}
],
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [
[
[
-74.025095199999996,
40.727052399999998
],
[
-74.0295579,
40.705361699999997
],
[
-74.000948300000005,
40.701977499999998
],
[
-73.996479800000003,
40.723667899999995
],
[
-74.025095199999996,
40.727052399999998
]
]
]
},
"properties": {
"updated": "2023-11-15T14:34:46+00:00",
"units": "us",
"forecastGenerator": "BaselineForecastGenerator",
"generatedAt": "2023-11-15T15:11:39+00:00",
"updateTime": "2023-11-15T14:34:46+00:00",
"validTimes": "2023-11-15T08:00:00+00:00/P7DT17H",
"elevation": {
"unitCode": "wmoUnit:m",
"value": 2.1335999999999999
},
"periods": [
{
"number": 1,
"name": "Today",
"startTime": "2023-11-15T10:00:00-05:00",
"endTime": "2023-11-15T18:00:00-05:00",
"isDaytime": true,
"temperature": 51,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 2.2222222222222223
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 68
},
"windSpeed": "1 to 7 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/day/bkn?size=medium",
"shortForecast": "Partly Sunny",
"detailedForecast": "Partly sunny, with a high near 51. Southwest wind 1 to 7 mph."
},
{
"number": 2,
"name": "Tonight",
"startTime": "2023-11-15T18:00:00-05:00",
"endTime": "2023-11-16T06:00:00-05:00",
"isDaytime": false,
"temperature": 44,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 3.8888888888888888
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 82
},
"windSpeed": "8 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/night/sct?size=medium",
"shortForecast": "Partly Cloudy",
"detailedForecast": "Partly cloudy, with a low around 44. Southwest wind around 8 mph."
},
{
"number": 3,
"name": "Thursday",
"startTime": "2023-11-16T06:00:00-05:00",
"endTime": "2023-11-16T18:00:00-05:00",
"isDaytime": true,
"temperature": 60,
"temperatureUnit": "F",
"temperatureTrend": "falling",
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 5.5555555555555554
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 82
},
"windSpeed": "6 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/day/few?size=medium",
"shortForecast": "Sunny",
"detailedForecast": "Sunny. High near 60, with temperatures falling to around 58 in the afternoon. Southwest wind around 6 mph."
},
{
"number": 4,
"name": "Thursday Night",
"startTime": "2023-11-16T18:00:00-05:00",
"endTime": "2023-11-17T06:00:00-05:00",
"isDaytime": false,
"temperature": 47,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 6.1111111111111107
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 80
},
"windSpeed": "3 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/night/few?size=medium",
"shortForecast": "Mostly Clear",
"detailedForecast": "Mostly clear, with a low around 47. Southwest wind around 3 mph."
},
{
"number": 5,
"name": "Friday",
"startTime": "2023-11-17T06:00:00-05:00",
"endTime": "2023-11-17T18:00:00-05:00",
"isDaytime": true,
"temperature": 63,
"temperatureUnit": "F",
"temperatureTrend": "falling",
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 20
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 12.222222222222221
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 86
},
"windSpeed": "2 to 10 mph",
"windDirection": "S",
"icon": "https://api.weather.gov/icons/land/day/bkn/rain,20?size=medium",
"shortForecast": "Partly Sunny then Slight Chance Light Rain",
"detailedForecast": "A slight chance of rain after 1pm. Partly sunny. High near 63, with temperatures falling to around 61 in the afternoon. South wind 2 to 10 mph. Chance of precipitation is 20%."
},
{
"number": 6,
"name": "Friday Night",
"startTime": "2023-11-17T18:00:00-05:00",
"endTime": "2023-11-18T06:00:00-05:00",
"isDaytime": false,
"temperature": 51,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 70
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 12.777777777777779
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 100
},
"windSpeed": "6 to 10 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/night/rain,60/rain,70?size=medium",
"shortForecast": "Light Rain Likely",
"detailedForecast": "Rain likely. Cloudy, with a low around 51. Chance of precipitation is 70%. New rainfall amounts between a quarter and half of an inch possible."
},
{
"number": 7,
"name": "Saturday",
"startTime": "2023-11-18T06:00:00-05:00",
"endTime": "2023-11-18T18:00:00-05:00",
"isDaytime": true,
"temperature": 55,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 70
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 11.111111111111111
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 100
},
"windSpeed": "8 to 18 mph",
"windDirection": "NW",
"icon": "https://api.weather.gov/icons/land/day/rain,70/rain,30?size=medium",
"shortForecast": "Light Rain Likely",
"detailedForecast": "Rain likely before 1pm. Partly sunny, with a high near 55. Chance of precipitation is 70%."
},
{
"number": 8,
"name": "Saturday Night",
"startTime": "2023-11-18T18:00:00-05:00",
"endTime": "2023-11-19T06:00:00-05:00",
"isDaytime": false,
"temperature": 40,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 1.1111111111111112
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 65
},
"windSpeed": "12 to 17 mph",
"windDirection": "NW",
"icon": "https://api.weather.gov/icons/land/night/few?size=medium",
"shortForecast": "Mostly Clear",
"detailedForecast": "Mostly clear, with a low around 40."
},
{
"number": 9,
"name": "Sunday",
"startTime": "2023-11-19T06:00:00-05:00",
"endTime": "2023-11-19T18:00:00-05:00",
"isDaytime": true,
"temperature": 50,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": -0.55555555555555558
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 65
},
"windSpeed": "10 to 14 mph",
"windDirection": "W",
"icon": "https://api.weather.gov/icons/land/day/few?size=medium",
"shortForecast": "Sunny",
"detailedForecast": "Sunny, with a high near 50."
},
{
"number": 10,
"name": "Sunday Night",
"startTime": "2023-11-19T18:00:00-05:00",
"endTime": "2023-11-20T06:00:00-05:00",
"isDaytime": false,
"temperature": 38,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": -0.55555555555555558
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 67
},
"windSpeed": "13 mph",
"windDirection": "NW",
"icon": "https://api.weather.gov/icons/land/night/few?size=medium",
"shortForecast": "Mostly Clear",
"detailedForecast": "Mostly clear, with a low around 38."
},
{
"number": 11,
"name": "Monday",
"startTime": "2023-11-20T06:00:00-05:00",
"endTime": "2023-11-20T18:00:00-05:00",
"isDaytime": true,
"temperature": 46,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": -1.6666666666666667
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 70
},
"windSpeed": "13 mph",
"windDirection": "NW",
"icon": "https://api.weather.gov/icons/land/day/sct?size=medium",
"shortForecast": "Mostly Sunny",
"detailedForecast": "Mostly sunny, with a high near 46."
},
{
"number": 12,
"name": "Monday Night",
"startTime": "2023-11-20T18:00:00-05:00",
"endTime": "2023-11-21T06:00:00-05:00",
"isDaytime": false,
"temperature": 38,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": -1.1111111111111112
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 70
},
"windSpeed": "10 mph",
"windDirection": "N",
"icon": "https://api.weather.gov/icons/land/night/sct?size=medium",
"shortForecast": "Partly Cloudy",
"detailedForecast": "Partly cloudy, with a low around 38."
},
{
"number": 13,
"name": "Tuesday",
"startTime": "2023-11-21T06:00:00-05:00",
"endTime": "2023-11-21T18:00:00-05:00",
"isDaytime": true,
"temperature": 49,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 30
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 2.7777777777777777
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 73
},
"windSpeed": "9 to 13 mph",
"windDirection": "E",
"icon": "https://api.weather.gov/icons/land/day/bkn/rain,30?size=medium",
"shortForecast": "Partly Sunny then Chance Light Rain",
"detailedForecast": "A chance of rain after 1pm. Partly sunny, with a high near 49. Chance of precipitation is 30%."
},
{
"number": 14,
"name": "Tuesday Night",
"startTime": "2023-11-21T18:00:00-05:00",
"endTime": "2023-11-22T06:00:00-05:00",
"isDaytime": false,
"temperature": 46,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 50
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 7.7777777777777777
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 86
},
"windSpeed": "13 to 18 mph",
"windDirection": "S",
"icon": "https://api.weather.gov/icons/land/night/rain,50?size=medium",
"shortForecast": "Chance Light Rain",
"detailedForecast": "A chance of rain. Mostly cloudy, with a low around 46. Chance of precipitation is 50%."
}
]
}
}
We parse the data with EvaluateJsonPath to get primary fields for the weather.
We then format those fields for PutSlack.
LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude} Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend} There is a wind ${winddirection} at ${windspeed}. ${detailedforecast}
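As an illustration of that last step, the sketch below pulls the first forecast period out of the JSON and builds a shortened version of the Slack message. The sample period is copied from the forecast output above; the function names and the trimmed-down message layout are assumptions of this sketch.

```python
SAMPLE_FORECAST = {
    "properties": {
        "periods": [
            {
                "name": "Today",
                "temperature": 51,
                "temperatureUnit": "F",
                "temperatureTrend": None,
                "windSpeed": "1 to 7 mph",
                "windDirection": "SW",
                "detailedForecast": "Partly sunny, with a high near 51. Southwest wind 1 to 7 mph.",
            }
        ]
    }
}


def first_period_fields(forecast: dict) -> dict:
    """Extract the fields of the first (current) forecast period."""
    period = forecast["properties"]["periods"][0]
    return {
        "temperature": period["temperature"],
        "temperatureunit": period["temperatureUnit"],
        "temperaturetrend": period["temperatureTrend"],
        "winddirection": period["windDirection"],
        "windspeed": period["windSpeed"],
        "detailedforecast": period["detailedForecast"],
    }


def slack_message(fields: dict, location: str) -> str:
    """Format a short weather reply; the trend is omitted when it is null."""
    trend = f" - {fields['temperaturetrend']}" if fields["temperaturetrend"] else ""
    return (
        f"LLM Skipped. Read forecast for {location}. "
        f"Temp: {fields['temperature']} {fields['temperatureunit']}{trend} "
        f"There is a wind {fields['winddirection']} at {fields['windspeed']}. "
        f"{fields['detailedforecast']}"
    )


print(slack_message(first_period_fields(SAMPLE_FORECAST), "New York City"))
```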
If we do have an LLM question, let’s make sure it’s just one record.
We use a few different models available in IBM WatsonX.AI on IBM Cloud, which we can reach quickly through REST prompts.
I built and tested the prompts initially in IBM’s Prompt Lab and then copied the initial curl statement from there.
Click here for supported foundation models available with IBM watsonx.ai.
ibm/mpt-7b-instruct2
meta-llama/llama-2-70b-chat
ibm/granite-13b-chat-v1
We have to send our unique secure key to IBM and they will give us a token to use in our next call.
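A sketch of what that key-for-token exchange can look like in Python is shown below. It assumes the standard IBM Cloud IAM token endpoint and API-key grant type, neither of which is spelled out in the article, and it only builds the request without sending it; `build_token_request` is a hypothetical helper name.

```python
import urllib.parse
import urllib.request

# Assumption: the public IBM Cloud IAM token endpoint
IAM_URL = "https://iam.cloud.ibm.com/identity/token"


def build_token_request(api_key: str) -> urllib.request.Request:
    """Build (but do not send) the POST that trades an API key for a token."""
    body = urllib.parse.urlencode(
        {
            "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
            "apikey": api_key,
        }
    ).encode("utf-8")
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json",
    }
    return urllib.request.Request(IAM_URL, data=body, headers=headers, method="POST")


request = build_token_request("YOUR_API_KEY")  # placeholder, not a real key
print(request.get_method(), request.full_url)
```

Sending the request with `urllib.request.urlopen(request)` should return JSON containing the token, which then goes into the `Authorization: Bearer ...` header of the generation call.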
We parse out the question and then send it to WatsonX via REST API.
We build a prompt to send to IBM as follows.
{ "model_id": "meta-llama/llama-2-70b-chat", "input": "${inputs:urlEncode()}", "parameters": { "decoding_method": "greedy", "max_new_tokens": 200, "min_new_tokens": 50, "stop_sequences": [], "repetition_penalty": 1 }, "project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" }
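For readers who want to build the same body outside NiFi, here is a minimal Python sketch. The parameter values are copied from the request above; the project ID is a placeholder, and the sketch relies on `json.dumps` to escape quotes and newlines in the question rather than URL-encoding it, which is a choice of this sketch and not necessarily how the flow behaves.

```python
import json


def build_payload(question: str, project_id: str,
                  model_id: str = "meta-llama/llama-2-70b-chat") -> str:
    """Serialize a text-generation request body as a JSON string."""
    payload = {
        "model_id": model_id,
        "input": question,  # json.dumps handles escaping of quotes and newlines
        "parameters": {
            "decoding_method": "greedy",
            "max_new_tokens": 200,
            "min_new_tokens": 50,
            "stop_sequences": [],
            "repetition_penalty": 1,
        },
        "project_id": project_id,
    }
    return json.dumps(payload)


print(build_payload("Q: What is a good way to integrate Generative AI and Apache NiFi?",
                    "YOUR_PROJECT_ID"))
```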
We parse the generated text which is our Generative AI results plus some helpful metadata on timings.
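The parsing step can be sketched as follows. The response shape (`results[0].generated_text`, `generated_token_count`, `input_token_count`, `stop_reason`) is an assumption inferred from the metadata posted further down, and the sample response is abbreviated by hand.

```python
# Hand-built sample; field names are assumed, generated_text is shortened
SAMPLE_RESPONSE = {
    "model_id": "meta-llama/llama-2-70b-chat",
    "created_at": "2023-11-15T15:43:29.248Z",
    "results": [
        {
            "generated_text": "You can use Apache NiFi to integrate Generative AI models in several ways:",
            "generated_token_count": 200,
            "input_token_count": 38,
            "stop_reason": "max_tokens",
        }
    ],
}


def parse_generation(response: dict) -> dict:
    """Flatten the first result and the top-level metadata into one dict."""
    results = response.get("results") or [{}]
    first = results[0]
    return {
        "generated_text": first.get("generated_text", "").strip(),
        "generated_token": first.get("generated_token_count"),
        "tokencount": first.get("input_token_count"),
        "stop_reason": first.get("stop_reason"),
        "model_id": response.get("model_id"),
        "created_at": response.get("created_at"),
    }


print(parse_generation(SAMPLE_RESPONSE))
```

A `stop_reason` of `max_tokens` means the answer was cut off at the `max_new_tokens` limit, which is why the Slack reply shown next ends mid-sentence.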
The result posted to Slack is as follows:
“You can use Apache NiFi to integrate Generative AI models in several ways:
- Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.
- Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi’s PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.
- Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.
- Real-time Inference: You can use NiFi’s StreamingJobs”
After the Slackbot posted the results, it posted metrics and debugging information to the chat channel.
All of the metadata is posted to another Slack channel for administrator monitoring.
==== NiFi to IBM WatsonX.AI LLM Answers
On Date: Wed, 15 Nov 2023 15:43:29 GMT
Created: 2023-11-15T15:43:29.248Z
Prompt: Q: What is a good way to integrate Generative AI and Apache NiFi?
Response: ) You can use Apache NiFi to integrate Generative AI models in several ways:
1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.
2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.
3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.
4. Real-time Inference: You can use NiFi's StreamingJobs
Token: 200
Req Duration: 8153
HTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb
IBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx
IBM Msg ID: disclaimer_warning
Model ID: meta-llama/llama-2-70b-chat
Stop Reason: max_tokens
Token Count: 38
TX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756
UUID: da0806cb-6133-4bf4-808e-1fbf419c09e3
Corr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756
Global TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b
Service Time: 478
Request ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c
File Name: 1a3c4386-86d2-4969-805b-37649c16addb
Request Duration: 8153
Request URL: https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29
cf-ray: 82689bfd28e48ce2-EWR
=====
Make Your Own Slackbot
Slack Output
Kafka Distribute
Apache Flink SQL Table Creation DDL
CREATE TABLE `ssb`.`Meetups`.`watsonairesults` (
  `date` VARCHAR(2147483647),
  `x_global_transaction_id` VARCHAR(2147483647),
  `x_request_id` VARCHAR(2147483647),
  `cf_ray` VARCHAR(2147483647),
  `inputs` VARCHAR(2147483647),
  `created_at` VARCHAR(2147483647),
  `stop_reason` VARCHAR(2147483647),
  `x_correlation_id` VARCHAR(2147483647),
  `x_proxy_upstream_service_time` VARCHAR(2147483647),
  `message_id` VARCHAR(2147483647),
  `model_id` VARCHAR(2147483647),
  `invokehttp_request_duration` VARCHAR(2147483647),
  `message` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `generated_text` VARCHAR(2147483647),
  `transaction_id` VARCHAR(2147483647),
  `tokencount` VARCHAR(2147483647),
  `generated_token` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  `advisoryId` VARCHAR(2147483647),
  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
  'deserialization.failure.policy' = 'ignore_and_log',
  'properties.request.timeout.ms' = '120000',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'watsonxaillmanswers',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'watsonxaillmconsumer'
)

CREATE TABLE `ssb`.`Meetups`.`watsonxresults` (
  `date` VARCHAR(2147483647),
  `x_global_transaction_id` VARCHAR(2147483647),
  `x_request_id` VARCHAR(2147483647),
  `cf_ray` VARCHAR(2147483647),
  `inputs` VARCHAR(2147483647),
  `created_at` VARCHAR(2147483647),
  `stop_reason` VARCHAR(2147483647),
  `x_correlation_id` VARCHAR(2147483647),
  `x_proxy_upstream_service_time` VARCHAR(2147483647),
  `message_id` VARCHAR(2147483647),
  `model_id` VARCHAR(2147483647),
  `invokehttp_request_duration` VARCHAR(2147483647),
  `message` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `generated_text` VARCHAR(2147483647),
  `transaction_id` VARCHAR(2147483647),
  `tokencount` VARCHAR(2147483647),
  `generated_token` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
  'deserialization.failure.policy' = 'ignore_and_log',
  'properties.request.timeout.ms' = '120000',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'watsonxaillm',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'allwatsonx1'
)
Example Prompt
{"inputs":"Please answer to the following question. What is the capital of the United States?"}
IBM DB2 SQL
alter table "DB2INST1"."TRAVELADVISORY" add column "summary" VARCHAR(2048);

-- DB2INST1.TRAVELADVISORY definition
CREATE TABLE "DB2INST1"."TRAVELADVISORY" (
  "TITLE" VARCHAR(250 OCTETS) ,
  "PUBDATE" VARCHAR(250 OCTETS) ,
  "LINK" VARCHAR(250 OCTETS) ,
  "GUID" VARCHAR(250 OCTETS) ,
  "ADVISORYID" VARCHAR(250 OCTETS) ,
  "DOMAIN" VARCHAR(250 OCTETS) ,
  "CATEGORY" VARCHAR(4096 OCTETS) ,
  "DESCRIPTION" VARCHAR(4096 OCTETS) ,
  "UUID" VARCHAR(250 OCTETS) NOT NULL ,
  "TS" BIGINT NOT NULL ,
  "summary" VARCHAR(2048 OCTETS)
) IN "IBMDB2SAMPLEREL" ORGANIZE BY ROW;

ALTER TABLE "DB2INST1"."TRAVELADVISORY" ADD PRIMARY KEY ("UUID") ENFORCED;

GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1";
GRANT CONTROL ON INDEX "SYSIBM "."SQL230620142604860" TO USER "DB2INST1";

SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE
FROM DB2INST1.TRAVELADVISORY t
WHERE "summary" IS NOT NULL
ORDER BY ts DESC
Example Output Email
Video
Source Code
Published at DZone with permission of Tim Spann, DZone MVB. See the original article here.