Calculating TF-IDF With Apache Spark
In this article, we look at how to use Apache Spark and its Spark SQL API to implement the TF-IDF algorithm in Scala for text mining.
Term Frequency-Inverse Document Frequency (TF-IDF) is a widely known technique in text processing. This technique assigns a weight to each term in a document: terms that occur frequently within a document receive high weights, while terms that appear in many documents across the corpus receive lower weights.
TF-IDF is used in a wide variety of applications. Typical use cases include:
- Document search.
- Document tagging.
- Text preprocessing and feature vector engineering for Machine Learning algorithms.
There is a vast number of resources on the web explaining the concept itself and the calculation algorithm. This article does not repeat that information; it simply illustrates the TF-IDF calculation with the help of Apache Spark. Emml Asimadi, in his excellent article Understanding TF-IDF, shares an approach based on the older Spark RDD API and the Python language. This article, on the other hand, uses the modern Spark SQL API and the Scala language.
Although Spark MLlib has an API to calculate TF-IDF, this API is not convenient for learning the concept. MLlib tools are intended to generate feature vectors for ML algorithms, and there is no straightforward way to get the weight of a particular term in a particular document. So let's implement the calculation from scratch; it will sharpen our skills.
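For comparison, the MLlib route looks roughly like the sketch below, given a DataFrame named documents with a bag-of-words column like the sample corpus introduced in the next section (the column names here are purely illustrative):

import org.apache.spark.ml.feature.{HashingTF, IDF}

// MLlib hashes terms into fixed-size feature vectors,
// so the weight of an individual term is hard to inspect afterwards.
val hashingTF = new HashingTF()
  .setInputCol("document")
  .setOutputCol("raw_features")
val featurized = hashingTF.transform(documents)

val tfIdfVectors = new IDF()
  .setInputCol("raw_features")
  .setOutputCol("features")
  .fit(featurized)
  .transform(featurized)

The resulting features column contains hashed feature vectors, which is exactly what ML algorithms want, but not what we need in order to inspect per-term weights.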
First, we will implement the TF-IDF calculation algorithm. Then we will plug our implementation into a simple document search engine.
Calculating TF-IDF
1. Term Frequency - TF
TF is the number of times a term occurs within a document. This means that a term has different TF values for different documents of the corpus.
But what is a document? In our case, we suppose that each document in our document corpus has already been preprocessed into a bag of words. For the sake of brevity, we omit preprocessing steps like tokenization, stop word removal, punctuation removal, and other types of cleanup. Let's assume that we have a data set of documents (a Spark DataFrame) like the one below:
+--------------------------------------------+
|document |
+--------------------------------------------+
|[one, flesh, one, bone, one, true, religion]|
|[all, flesh, is, grass] |
|[one, is, all, all, is, one] |
+--------------------------------------------+
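For reference, here is a minimal sketch of how such a DataFrame could be created; the local SparkSession, application name, and master are assumptions made just for this example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("tf-idf-example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Each document is already reduced to a bag of words.
val documents = Seq(
  Seq("one", "flesh", "one", "bone", "one", "true", "religion"),
  Seq("all", "flesh", "is", "grass"),
  Seq("one", "is", "all", "all", "is", "one")
).toDF("document")

The org.apache.spark.sql.functions import also covers the functions (monotonically_increasing_id, explode, col, and so on) used in the snippets below.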
First, we need unique identifiers for all documents in our document corpus. Let's generate them:
val docsWithIds = documents.withColumn("doc_id", monotonically_increasing_id())
+--------------------------------------------+------+
|document |doc_id|
+--------------------------------------------+------+
|[one, flesh, one, bone, one, true, religion]|0 |
|[all, flesh, is, grass] |1 |
|[one, is, all, all, is, one] |2 |
+--------------------------------------------+------+
Now, remember: we want to count terms in documents. We need to make a step from a set of documents to a set of tokens belonging to documents; in other words, we "unfold" each document:
val columns = docsWithIds.columns.map(col) :+
  (explode(col("document")) as "token")
val unfoldedDocs = docsWithIds.select(columns: _*)
+--------------------------------------------+------+--------+
|document |doc_id|token |
+--------------------------------------------+------+--------+
|[one, flesh, one, bone, one, true, religion]|0 |one |
|[one, flesh, one, bone, one, true, religion]|0 |flesh |
|[one, flesh, one, bone, one, true, religion]|0 |one |
|[one, flesh, one, bone, one, true, religion]|0 |bone |
|[one, flesh, one, bone, one, true, religion]|0 |one |
|[one, flesh, one, bone, one, true, religion]|0 |true |
|[one, flesh, one, bone, one, true, religion]|0 |religion|
|[all, flesh, is, grass] |1 |all |
|[all, flesh, is, grass] |1 |flesh |
|[all, flesh, is, grass] |1 |is |
|[all, flesh, is, grass] |1 |grass |
|[one, is, all, all, is, one] |2 |one |
|[one, is, all, all, is, one] |2 |is |
|[one, is, all, all, is, one] |2 |all |
|[one, is, all, all, is, one] |2 |all |
|[one, is, all, all, is, one] |2 |is |
|[one, is, all, all, is, one] |2 |one |
+--------------------------------------------+------+--------+
The explode function from the Spark SQL API does the job: it "explodes" an array of tokens so that each token ends up in a separate row.
Now we are ready to calculate term frequencies - just count them for each document:
unfoldedDocs.groupBy("doc_id", "token")
.agg(count("document") as "tf")
+------+--------+---+
|doc_id|token |tf |
+------+--------+---+
|0 |one |3 |
|1 |is |1 |
|1 |all |1 |
|0 |true |1 |
|2 |all |2 |
|1 |grass |1 |
|0 |religion|1 |
|1 |flesh |1 |
|2 |one |2 |
|0 |bone |1 |
|2 |is |2 |
|0 |flesh |1 |
+------+--------+---+
The result is as expected: for example, the term "one" occurs three times in the first document and twice in the third document. Now let's put this result aside for a while and move on to the next topic.
2. Document Frequency - DF
The DF of a term is the number of documents that contain this term. We can count documents for each term in the unfoldedDocs data set that we obtained in the previous step:
unfoldedDocs.groupBy("token")
.agg(countDistinct("doc_id") as "df")
+--------+---+
|token |df |
+--------+---+
|bone |1 |
|religion|1 |
|one |2 |
|grass |1 |
|flesh |2 |
|is |2 |
|all |2 |
|true |1 |
+--------+---+
We use the countDistinct function from the Spark SQL API to count distinct documents for each term.
3. Inverse Document Frequency - IDF
Thanks to IDF, we can suppress commonly used words like "is" or "one" from our sample document corpus. Such words, despite having high TF, do not convey information about relevance or irrelevance of a particular document. IDF efficiently down-weights "background noise" terms.
In this article, we use the same formula to calculate IDF that is included in Spark MLlib:
IDF(t,D) = log[ (|D| + 1) / (DF(t,D) + 1) ],
Where:
- IDF(t, D) is the IDF of the term t in the document corpus D.
- |D| is the total number of documents in the corpus D.
- DF(t, D) is the DF of the term t in the document corpus D.
Sound clear? Let's code it. We use the Swiss Army knife of the Spark SQL API, user-defined functions (UDFs), to calculate IDF for all rows of the DF data set from the previous step:
val calcIdfUdf = udf { df: Long => calcIdf(docCount, df) }
val tokensWithIdf = tokensWithDf.withColumn("idf", calcIdfUdf(col("df")))
+--------+---+-------------------+
|token |df |idf |
+--------+---+-------------------+
|bone |1 |0.6931471805599453 |
|religion|1 |0.6931471805599453 |
|one |2 |0.28768207245178085|
|grass |1 |0.6931471805599453 |
|flesh |2 |0.28768207245178085|
|is |2 |0.28768207245178085|
|all |2 |0.28768207245178085|
|true |1 |0.6931471805599453 |
+--------+---+-------------------+
In the code snippet above, calcIdf() is a function that literally implements the IDF formula, and docCount is the total number of documents |D| in the corpus. A possible implementation of both is sketched below.
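This is a minimal sketch under the assumption that docsWithIds is the data frame with document identifiers created earlier:

// Total number of documents in the corpus: |D|.
val docCount = docsWithIds.count()

// IDF(t, D) = log[ (|D| + 1) / (DF(t, D) + 1) ]
def calcIdf(docCount: Long, df: Long): Double =
  math.log((docCount.toDouble + 1) / (df.toDouble + 1))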
We are almost done. Having TF and IDF at hand, we are ready to make the final step: calculating TF-IDF.
4. TF-IDF
The TF-IDF of a term is the product of the term's TF and IDF. Thus, a term has different TF-IDF values for different documents in the corpus.
So far we have two data sets:
- tokensWithTf - term frequencies calculated at step 1.
- tokensWithIdf - inverse document frequencies obtained at step 3.
We have to join them in order to calculate TF-IDF for all terms on a per-document basis:
val tokensWithTfIdf = tokensWithTf
  .join(tokensWithIdf, Seq("token"), "left")
  .withColumn("tf_idf", col("tf") * col("idf"))
+--------+------+---+---+-------------------+-------------------+
|token |doc_id|tf |df |idf |tf_idf |
+--------+------+---+---+-------------------+-------------------+
|one |0 |3 |2 |0.28768207245178085|0.8630462173553426 |
|is |1 |1 |2 |0.28768207245178085|0.28768207245178085|
|all |1 |1 |2 |0.28768207245178085|0.28768207245178085|
|true |0 |1 |1 |0.6931471805599453 |0.6931471805599453 |
|all |2 |2 |2 |0.28768207245178085|0.5753641449035617 |
|grass |1 |1 |1 |0.6931471805599453 |0.6931471805599453 |
|religion|0 |1 |1 |0.6931471805599453 |0.6931471805599453 |
|flesh |1 |1 |2 |0.28768207245178085|0.28768207245178085|
|one |2 |2 |2 |0.28768207245178085|0.5753641449035617 |
|bone |0 |1 |1 |0.6931471805599453 |0.6931471805599453 |
|is |2 |2 |2 |0.28768207245178085|0.5753641449035617 |
|flesh |0 |1 |2 |0.28768207245178085|0.28768207245178085|
+--------+------+---+---+-------------------+-------------------+
Optionally, we can join this resulting data frame with the original data frame of documents in order to retain the document information, keeping only the columns we care about; one possible way to do this is sketched after the table below:
+------+--------------------------------------------+--------+-------------------+
|doc_id|document |token |tf_idf |
+------+--------------------------------------------+--------+-------------------+
|0 |[one, flesh, one, bone, one, true, religion]|one |0.8630462173553426 |
|0 |[one, flesh, one, bone, one, true, religion]|religion|0.6931471805599453 |
|0 |[one, flesh, one, bone, one, true, religion]|bone |0.6931471805599453 |
|0 |[one, flesh, one, bone, one, true, religion]|true |0.6931471805599453 |
|0 |[one, flesh, one, bone, one, true, religion]|flesh |0.28768207245178085|
|1 |[all, flesh, is, grass] |grass |0.6931471805599453 |
|1 |[all, flesh, is, grass] |all |0.28768207245178085|
|1 |[all, flesh, is, grass] |is |0.28768207245178085|
|1 |[all, flesh, is, grass] |flesh |0.28768207245178085|
|2 |[one, is, all, all, is, one] |all |0.5753641449035617 |
|2 |[one, is, all, all, is, one] |is |0.5753641449035617 |
|2 |[one, is, all, all, is, one] |one |0.5753641449035617 |
+------+--------------------------------------------+--------+-------------------+
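Here is a minimal sketch of such a join; the docsWithTfIdf name is introduced here just for illustration:

val docsWithTfIdf = tokensWithTfIdf
  .join(docsWithIds, Seq("doc_id"))
  .select("doc_id", "document", "token", "tf_idf")
  .orderBy(col("doc_id"), col("tf_idf").desc)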
That's it! We have calculated the TF-IDF weight of each term within all documents of the document corpus.
Using TF-IDF to Search Documents
Now we have quite a powerful weapon in our hands, so let's test it in the field. The rest of this article describes a simple (by no means competing with Google) document search engine based on TF-IDF ranking.
The source code is available on GitHub: spark-sql-tfidf. You can find the build and running instructions on the project page.
The search engine accepts a set of user-specified keywords and ranks all documents in the database against them. The ranking formula is trivial:
Rank(d, keywords) = TF-IDF(keyword1, d) + ... + TF-IDF(keywordN, d)
In other words, the rank of a document is the sum of the TF-IDF weights of all the user's keywords within this document. The search engine picks the top 5 documents and outputs them in order of relevance (highest rank first).
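A minimal sketch of such a ranking on top of the docsWithTfIdf data frame from the previous section might look like this; the hard-coded keyword list is just for illustration, and the actual project code may be organized differently:

// Hypothetical query: rank documents against the user's keywords.
val keywords = Seq("love", "forever")

val topDocs = docsWithTfIdf
  .filter(col("token").isin(keywords: _*))
  .groupBy("doc_id")
  .agg(sum("tf_idf") as "rank")
  .orderBy(col("rank").desc)
  .limit(5)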
The project has a dummy database of song lyrics, so you can try to look for songs matching your mood:
Enter keywords separated by spaces (CTRL-C for exit):
love forever
Found:
1. love_me_tender.txt
2. red_river_valley.txt
Happy text mining!