Word Count With Storm and Scala
Sure, knowing how to implement a word count Hadoop job using Scala and upload it to HDInsight is good, but having a real-world example of how to do so is better.
Apache Storm is a free and open-source distributed real-time computation system running on the JVM.
To get started, we will implement a very simple example. Previously, we implemented a word count Hadoop job in Scala and uploaded it to HDInsight. Here we will revisit the same word count concept for the real-time case, implementing a word count topology with Apache Storm. Our source code will be based on the official Storm examples.
Storm works with spouts and bolts: spouts are sources that emit streams of tuples, and bolts consume those streams to transform or aggregate them.
First, we shall implement a spout that will emit fake data events — in our case, sentences.
package com.gkatzioura.scala.storm

import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {

  var _collector: SpoutOutputCollector = _
  var _rand: Random = _

  // Emit a random sentence roughly every 100 ms.
  override def nextTuple(): Unit = {
    Utils.sleep(100)
    val sentences = Array("the cow jumped over the moon",
      "an apple a day keeps the doctor away",
      "four score and seven years ago",
      "snow white and the seven dwarfs",
      "i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
The next step is to implement a bolt that splits each sentence into words and emits them:
package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance
    boundary.setText(sentence)

    var start = boundary.first
    var end: Int = start
    while (end != BreakIterator.DONE) {
      end = boundary.next
      // boundary.next returns BreakIterator.DONE (-1) after the last
      // boundary, so guard the substring call against it.
      if (end != BreakIterator.DONE) {
        val word = sentence.substring(start, end).replaceAll("\\s+", "")
        start = end
        if (!word.equals("")) {
          collector.emit(new Values(word))
        }
      }
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
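BreakIterator gives locale-aware word boundaries, which is why the loop above is somewhat involved. If plain whitespace tokenization is good enough for your data, the bolt can be collapsed considerably. A minimal sketch of that alternative (the SimpleSplitSentenceBolt name is ours, not part of the original example):

package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

// Hypothetical alternative: split on whitespace instead of using BreakIterator.
class SimpleSplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    input.getString(0).split("\\s+").filter(_.nonEmpty)
      .foreach(word => collector.emit(new Values(word)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}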
The next step is the word count bolt:
package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt {

  // Per-task, in-memory running counts.
  val counts = scala.collection.mutable.Map[String, Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val word = input.getString(0)
    val count = counts.getOrElse(word, 0) + 1
    counts.put(word, count)
    // Emit the word together with its boxed count, matching the declared fields.
    collector.emit(new Values(word, Int.box(count)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word", "count"))
  }
}
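Since the count bolt emits a (word, count) tuple for every update, it is easy to hang one more bolt off it while debugging. A minimal sketch of such a printer bolt (our own addition for illustration, not part of the original example):

package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.Tuple

// Hypothetical debugging aid: log every (word, count) update it receives.
class PrinterBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    println(s"${input.getString(0)} -> ${input.getInteger(1)}")
  }

  // Terminal bolt: it emits nothing, so there are no output fields to declare.
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {}
}

It could be wired in when building the topology below with builder.setBolt("print", new PrinterBolt, 1).shuffleGrouping("count").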
The final step is to create our topology, which handles both running locally and submitting to a cluster environment:
package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology {

  def main(args: Array[String]): Unit = {
    println("Hello, world!")

    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

    if (args != null && args.length > 0) {
      // Cluster mode: submit under the topology name given as the first argument.
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    } else {
      // Local mode: run in-process for ten seconds, then shut down.
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}
Now, we shall build our app. To do so, we need to include the assembly plugin in our project/plugins.sbt file:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Our build.sbt file is as follows:
name := "ScalaStorm"
version := "1.0"
scalaVersion := "2.12.1"
scalacOptions += "-Yresolve-term-conflict:package"
libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"
Then, we issue a build:
sbt clean compile assembly
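The assembly step produces a fat JAR (with storm-core excluded, since it is marked as provided). To submit it to a running Storm cluster, you would typically use the storm jar command; for example (the JAR path follows sbt-assembly's default naming for the build settings above, and the trailing topology name is an arbitrary choice):

storm jar target/scala-2.12/ScalaStorm-assembly-1.0.jar com.gkatzioura.scala.storm.WordCountTopology word-count-topology

The first program argument becomes the topology name, which is what flips the main method into the StormSubmitter branch.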
You can find the source code on GitHub.
In the next post, we shall deploy our Storm app to HDInsight.