Skip to content

Commit

Permalink
[SPARKNLP-1113] Adding Text Reader
Browse files Browse the repository at this point in the history
  • Loading branch information
danilojsl committed Feb 17, 2025
1 parent 7d2bed7 commit e9c8493
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 1 deletion.
18 changes: 18 additions & 0 deletions python/sparknlp/reader/sparknlp_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,22 @@ def doc(self, docPath):
if not isinstance(docPath, str):
raise TypeError("docPath must be a string")
jdf = self._java_obj.doc(docPath)
return self.getDataFrame(self.spark, jdf)

def txt(self, docPath):
    """Reads TXT files and returns a Spark DataFrame.

    Delegates to the JVM-side ``SparkNLPReader.txt`` and wraps the
    resulting Java DataFrame as a PySpark DataFrame.

    Parameters
    ----------
    docPath : str
        Path to a TXT file or to a directory of TXT files.

    Returns
    -------
    pyspark.sql.DataFrame
        A DataFrame containing parsed document content.

    Raises
    ------
    TypeError
        If ``docPath`` is not a string.
    """
    if not isinstance(docPath, str):
        raise TypeError("docPath must be a string")
    jdf = self._java_obj.txt(docPath)
    return self.getDataFrame(self.spark, jdf)
15 changes: 14 additions & 1 deletion python/test/sparknlp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,17 @@ def runTest(self):
word_df = sparknlp.read().doc(self.word_file)
word_df.show()

self.assertTrue(word_df.select("doc").count() > 0)
self.assertTrue(word_df.select("doc").count() > 0)

@pytest.mark.fast
class SparkNLPTestTXTFilesSpec(unittest.TestCase):
    # Verifies that sparknlp.read().txt(...) parses a plain-text file into a
    # DataFrame with a populated "txt" column.

    def setUp(self):
        self.data = SparkContextForTest.data
        resource = "../src/test/resources/reader/txt/simple-text.txt"
        self.txt_file = "file:///" + os.getcwd() + "/" + resource

    def runTest(self):
        result_df = sparknlp.read().txt(self.txt_file)
        result_df.show()

        # At least one row of parsed elements must come back.
        self.assertTrue(result_df.select("txt").count() > 0)
58 changes: 58 additions & 0 deletions src/main/scala/com/johnsnowlabs/reader/SparkNLPReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,62 @@ class SparkNLPReader(params: java.util.Map[String, String] = new java.util.HashM
wordReader.doc(docPath)
}

/** Instantiates class to read txt files.
*
* filePath: this is a path to a directory of TXT files or a path to a TXT file. E.g.
* "path/txt/files"
*
* ==Example==
* {{{
* val filePath = "home/user/txt/files"
* val sparkNLPReader = new SparkNLPReader()
* val txtDf = sparkNLPReader.txt(filePath)
* }}}
*
* ==Example 2==
* You can use SparkNLP in one line of code
* {{{
* val txtDf = SparkNLP.read.txt(filePath)
* }}}
*
* {{{
* txtDf.select("txt").show(false)
* +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |txt |
* +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |[{Title, BIG DATA ANALYTICS, {paragraph -> 0}}, {NarrativeText, Apache Spark is a fast and general-purpose cluster computing system.\nIt provides high-level APIs in Java, Scala, Python, and R., {paragraph -> 0}}, {Title, MACHINE LEARNING, {paragraph -> 1}}, {NarrativeText, Spark's MLlib provides scalable machine learning algorithms.\nIt includes tools for classification, regression, clustering, and more., {paragraph -> 1}}]|
* +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*
* txtDf.printSchema()
* root
* |-- path: string (nullable = true)
* |-- content: binary (nullable = true)
* |-- txt: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- elementType: string (nullable = true)
* | | |-- content: string (nullable = true)
* | | |-- metadata: map (nullable = true)
* | | | |-- key: string
* | | | |-- value: string (valueContainsNull = true)
* }}}
*
* @param filePath
* Path to a TXT file or to a directory of TXT files
* @return
* DataFrame with columns path, content, and txt (the parsed elements); the title-length
* threshold is taken from the "titleLengthSize" entry in params (default 50)
*/
def txt(filePath: String): DataFrame = {
val textReader = new TextReader(getTitleLengthSize)
textReader.txt(filePath)
}

/** Reads the "titleLengthSize" entry from the reader params, falling back to
* 50 when the entry is absent or not a valid integer.
*/
private def getTitleLengthSize: Int = {
val raw = params.asScala.getOrElse("titleLengthSize", "50")
// NumberFormatException is an IllegalArgumentException, so malformed
// values quietly fall back to the default of 50.
try raw.toInt
catch { case _: IllegalArgumentException => 50 }
}

}
110 changes: 110 additions & 0 deletions src/main/scala/com/johnsnowlabs/reader/TextReader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2017-2025 John Snow Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.johnsnowlabs.reader

import com.johnsnowlabs.nlp.util.io.ResourceHelper
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

import scala.collection.mutable

class TextReader(titleLengthSize: Int = 50) extends Serializable {

  private val spark = ResourceHelper.spark
  import spark.implicits._

  /** Parses TXT files and returns a DataFrame.
    *
    * The DataFrame will contain:
    *   - "path": the file path,
    *   - "content": the raw text content,
    *   - "txt": a Seq[HTMLElement] containing the parsed elements.
    *
    * @param filePath
    *   path to a TXT file or to a directory of TXT files
    * @throws IllegalArgumentException
    *   if filePath does not point to a valid file or directory
    */
  def txt(filePath: String): DataFrame = {
    if (ResourceHelper.validFile(filePath)) {
      val textFilesRDD = spark.sparkContext.wholeTextFiles(filePath)
      textFilesRDD
        .toDF("path", "content")
        .withColumn("txt", parseTxtUDF($"content"))
    } else {
      throw new IllegalArgumentException(s"Invalid filePath: $filePath")
    }
  }

  private val parseTxtUDF = udf((text: String) => parseTxt(text))

  /** Parses the given text into a sequence of HTMLElements.
    *
    * Parsing logic:
    *   - Split the text into blocks using a delimiter of two or more consecutive newlines.
    *   - Using heuristics, consider a block a title if it is all uppercase (or title case)
    *     and short.
    *   - If a block is a title candidate and the following block exists and is not a title
    *     candidate, treat the first as the Title and the second as its NarrativeText.
    *   - Otherwise, treat blocks as narrative text.
    *   - Omit any element with empty content.
    */
  private def parseTxt(text: String): Seq[HTMLElement] = {
    // Blocks are separated by blank lines; empty blocks are dropped up front.
    val blocks = text.split("\\n\\n+").map(_.trim).filter(_.nonEmpty)
    val elements = mutable.ArrayBuffer[HTMLElement]()
    var i = 0
    while (i < blocks.length) {
      val currentBlock = blocks(i)
      if (isTitleCandidate(currentBlock)) {
        // i / 2 groups each Title with its following NarrativeText under the
        // same "paragraph" index.
        elements += HTMLElement(
          "Title",
          currentBlock,
          mutable.Map("paragraph" -> (i / 2).toString))
        if (i + 1 < blocks.length && !isTitleCandidate(blocks(i + 1))) {
          val narrative = blocks(i + 1)
          if (narrative.nonEmpty) {
            elements += HTMLElement(
              "NarrativeText",
              narrative,
              mutable.Map("paragraph" -> (i / 2).toString))
          }
          i += 2
        } else {
          i += 1
        }
      } else {
        elements += HTMLElement(
          "NarrativeText",
          currentBlock,
          mutable.Map("paragraph" -> (i / 2).toString))
        i += 1
      }
    }
    elements
  }

  /** Heuristic function to determine if a given line/block is a title candidate.
    *
    * A block is considered a title candidate when it:
    *   - contains at least one letter,
    *   - is all uppercase, or title case (every word starts with an uppercase letter),
    *   - is at most `titleLengthSize` characters long.
    */
  private def isTitleCandidate(text: String): Boolean = {
    val trimmed = text.trim
    if (trimmed.isEmpty) return false
    val isAllUpper = trimmed.forall(c => !c.isLetter || c.isUpper)
    val isTitleCase = trimmed.split("\\s+").forall(word => word.headOption.exists(_.isUpper))
    // Fix: honor the configurable titleLengthSize constructor parameter.
    // Previously this was hard-coded to 50, silently ignoring the value
    // passed in by SparkNLPReader.getTitleLengthSize.
    val isShort = trimmed.length <= titleLengthSize
    val hasLetters = trimmed.exists(_.isLetter)
    (isAllUpper || isTitleCase) && isShort && hasLetters
  }

}
9 changes: 9 additions & 0 deletions src/test/resources/reader/txt/simple-text.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
BIG DATA ANALYTICS

Apache Spark is a fast and general-purpose cluster computing system.
It provides high-level APIs in Java, Scala, Python, and R.

MACHINE LEARNING

Spark's MLlib provides scalable machine learning algorithms.
It includes tools for classification, regression, clustering, and more.
34 changes: 34 additions & 0 deletions src/test/scala/com/johnsnowlabs/reader/TextReaderTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2017-2025 John Snow Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.johnsnowlabs.reader

import com.johnsnowlabs.tags.FastTest
import org.apache.spark.sql.functions.col
import org.scalatest.flatspec.AnyFlatSpec

class TextReaderTest extends AnyFlatSpec {

  val txtDirectory = "src/test/resources/reader/txt/"

  /* Reads every TXT file under the resource directory and checks that the
   * parsed "txt" column of the first row is populated. */
  "Text Reader" should "read a directory of text files" taggedAs FastTest in {
    val textReader = new TextReader()
    // Fix: point at the directory itself so the test actually exercises the
    // directory-reading capability its name claims (the previous path read a
    // single file, and the s"$txtDirectory/..." interpolation also produced
    // a double slash since txtDirectory already ends with "/").
    val textDf = textReader.txt(txtDirectory)
    textDf.select("txt").show(false)

    assert(!textDf.select(col("txt").getItem(0)).isEmpty)
  }

}

0 comments on commit e9c8493

Please sign in to comment.