From b2b4cdcb3e07f679498d5b29beafcc7bc398fc8d Mon Sep 17 00:00:00 2001
From: Xavier GUIHOT
Date: Thu, 8 Feb 2018 19:18:49 +0100
Subject: [PATCH] Add a custom sc.textFile which returns an RDD of records
 associated with the file path they come from

---
 README.md                                     |  8 +--
 build.sbt                                     |  2 +-
 docs/com/spark_helper/SparkHelper$.html       | 42 ++++++++++++-
 docs/com/spark_helper/package.html            |  6 +-
 docs/index/index-t.html                       |  3 +
 .../scala/com/spark_helper/SparkHelper.scala  | 63 ++++++++++++++++++-
 .../com/spark_helper/SparkHelperTest.scala    | 61 +++++++++++++++++-
 7 files changed, 174 insertions(+), 11 deletions(-)

diff --git a/README.md b/README.md
index c4ff407..0365781 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
 
 ## Overview
 
-Version: 1.0.20
+Version: 1.0.21
 
 API Scaladoc: [SparkHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.SparkHelper$)
 
@@ -131,7 +131,7 @@ With sbt, add these lines to your build.sbt:
 ```scala
 resolvers += "jitpack" at "https://jitpack.io"
 
-libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "v1.0.20"
+libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "v1.0.21"
 ```
 
 With maven, add these lines to your pom.xml:
@@ -147,7 +147,7 @@ With maven, add these lines to your pom.xml:
 	<dependency>
 		<groupId>com.github.xavierguihot</groupId>
 		<artifactId>spark_helper</artifactId>
-		<version>v1.0.20</version>
+		<version>v1.0.21</version>
 	</dependency>
 ```
 
@@ -161,7 +161,7 @@ allprojects {
 }
 
 dependencies {
-	compile 'com.github.xavierguihot:spark_helper:v1.0.20'
+	compile 'com.github.xavierguihot:spark_helper:v1.0.21'
 }
 ```

diff --git a/build.sbt b/build.sbt
index 43594bd..24d76b2 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,6 +1,6 @@
 name := "spark_helper"
 
-version := "1.0.20"
+version := "1.0.21"
 
 scalaVersion := "2.11.12"

diff --git a/docs/com/spark_helper/SparkHelper$.html b/docs/com/spark_helper/SparkHelper$.html
index bf988fb..f7024b1 100644
--- a/docs/com/spark_helper/SparkHelper$.html
+++ b/docs/com/spark_helper/SparkHelper$.html
@@ -57,7 +57,11 @@

 // line, it reads records spread over several lines.
 // This way, xml, json, yml or any multi-line record file format can be used
 // with Spark:
-SparkHelper.textFileWithDelimiter("/my/input/folder/path", sparkContext, "---\n")
+SparkHelper.textFileWithDelimiter("/my/input/folder/path", sparkContext, "---\n")
+// Same as SparkContext.textFile, but instead of returning an RDD of
+// records, it returns an RDD of tuples containing both the record and the
+// path of the file it comes from:
+SparkHelper.textFileWithFileName("folder", sparkContext)
 Source SparkHelper
 Since
 2017-02
 Linear Supertypes
@@ -665,6 +669,42 @@

assert(computedRecords == expectedRecords)

hdfsPath

the path of the file to read (folder or file, '*' works as well).

sparkContext

the SparkContext

delimiter

the specific record delimiter which replaces "\n"

maxRecordLength

the max length (not sure which unit) of a record before considering the record too long to fit into memory.

returns

the RDD of records
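
For context, a minimal usage sketch of textFileWithDelimiter (not part of this patch): it assumes a SparkContext is at hand and uses the three-argument call shown in the overview above; the wrapper object name is illustrative.

```scala
import com.spark_helper.SparkHelper
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object TextFileWithDelimiterSketch {
  // Reads records delimited by "---\n" instead of "\n"; each element of the
  // returned RDD is then a full multi-line block (e.g. one yml document).
  def readBlocks(sc: SparkContext): RDD[String] =
    SparkHelper.textFileWithDelimiter("/my/input/folder/path", sc, "---\n")
}
```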

+def textFileWithFileName(hdfsPath: String, sparkContext: SparkContext): RDD[(String, String)]
Equivalent to sparkContext.textFile(), but each line is associated with its file path.

Produces an RDD[(file_name, line)], which provides a way to know which file a given line comes from.

+// Considering this folder:
+// folder/file_1.txt whose content is data1\ndata2\ndata3
+// folder/file_2.txt whose content is data4\ndata5
    +// folder/folder_1/file_3.txt whose content is data6\ndata7
    +// then:
    +SparkHelper.textFileWithFileName("folder", sparkContext)
    +// will return:
    +RDD(
    +  ("file:/path/on/machine/folder/file_1.txt", "data1"),
    +  ("file:/path/on/machine/folder/file_1.txt", "data2"),
    +  ("file:/path/on/machine/folder/file_1.txt", "data3"),
    +  ("file:/path/on/machine/folder/file_2.txt", "data4"),
    +  ("file:/path/on/machine/folder/file_2.txt", "data5"),
    +  ("file:/path/on/machine/folder/folder_1/file_3.txt", "data6"),
    +  ("file:/path/on/machine/folder/folder_1/file_3.txt", "data7")
    +)
    hdfsPath

    the path of the folder (or structure of folders) to read

    sparkContext

    the SparkContext

    returns

the RDD of records where a record is a tuple containing the path of the file the record comes from and the record itself.
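
A short usage sketch (not part of the patch) of what this (file path, record) pairing enables; the wrapper object and method names are illustrative and a SparkContext is assumed:

```scala
import com.spark_helper.SparkHelper
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object TextFileWithFileNameUsageSketch {
  // Counts how many lines each input file contributes, using the
  // (file path, line) tuples returned by textFileWithFileName:
  def linesPerFile(folder: String, sc: SparkContext): RDD[(String, Long)] =
    SparkHelper
      .textFileWithFileName(folder, sc)
      .map { case (filePath, _) => (filePath, 1L) }
      .reduceByKey(_ + _)
}
```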

diff --git a/docs/com/spark_helper/package.html b/docs/com/spark_helper/package.html
index 7b46059..fef8491 100644
--- a/docs/com/spark_helper/package.html
+++ b/docs/com/spark_helper/package.html
@@ -178,7 +178,11 @@

 // line, it reads records spread over several lines.
 // This way, xml, json, yml or any multi-line record file format can be used
 // with Spark:
-SparkHelper.textFileWithDelimiter("/my/input/folder/path", sparkContext, "---\n")
+SparkHelper.textFileWithDelimiter("/my/input/folder/path", sparkContext, "---\n")
+// Same as SparkContext.textFile, but instead of returning an RDD of
+// records, it returns an RDD of tuples containing both the record and the
+// path of the file it comes from:
+SparkHelper.textFileWithFileName("folder", sparkContext)
 Source SparkHelper
 Since
 2017-02

diff --git a/docs/index/index-t.html b/docs/index/index-t.html
index 67f3a04..bb8d1a4 100644
--- a/docs/index/index-t.html
+++ b/docs/index/index-t.html
@@ -19,6 +19,9 @@
    textFileWithDelimiter
+textFileWithFileName
    thresholdType
diff --git a/src/main/scala/com/spark_helper/SparkHelper.scala b/src/main/scala/com/spark_helper/SparkHelper.scala
index e630a77..7237260 100644
--- a/src/main/scala/com/spark_helper/SparkHelper.scala
+++ b/src/main/scala/com/spark_helper/SparkHelper.scala
@@ -1,14 +1,14 @@
 package com.spark_helper
 
 import org.apache.spark.{HashPartitioner, SparkContext}
-import org.apache.spark.rdd.RDD
-
+import org.apache.spark.rdd.{RDD, HadoopRDD}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapred.{FileSplit, TextInputFormat => TextInputFormat2}
 
 import scala.util.Random
 
@@ -28,6 +28,10 @@ import scala.util.Random
  * // This way, xml, json, yml or any multi-line record file format can be used
  * // with Spark:
  * SparkHelper.textFileWithDelimiter("/my/input/folder/path", sparkContext, "---\n")
+ * // Same as SparkContext.textFile, but instead of returning an RDD of
+ * // records, it returns an RDD of tuples containing both the record and the
+ * // path of the file it comes from:
+ * SparkHelper.textFileWithFileName("folder", sparkContext)
  * }}}
  *
  * Source
+          val file = inputSplit.asInstanceOf[FileSplit]
+          iterator.map(tpl => (file.getPath.toString, tpl._2.toString))
+      }
+  }
+
 //////
 // Internal core:
 //////

diff --git a/src/test/scala/com/spark_helper/SparkHelperTest.scala b/src/test/scala/com/spark_helper/SparkHelperTest.scala
index 932faa2..8d04bc5 100644
--- a/src/test/scala/com/spark_helper/SparkHelperTest.scala
+++ b/src/test/scala/com/spark_helper/SparkHelperTest.scala
@@ -1,6 +1,6 @@
 package com.spark_helper
 
-import com.holdenkarau.spark.testing.SharedSparkContext
+import com.holdenkarau.spark.testing.{SharedSparkContext, RDDComparisons}
 
 import org.scalatest.FunSuite
 
@@ -9,7 +9,10 @@ import org.scalatest.FunSuite
  * @author Xavier Guihot
  * @since 2017-02
  */
-class SparkHelperTest extends FunSuite with SharedSparkContext {
+class SparkHelperTest
+    extends FunSuite
+    with SharedSparkContext
+    with RDDComparisons {
 
   test("Save as Single Text File") {
 
@@ -252,4 +255,58 @@ class SparkHelperTest extends FunSuite with SharedSparkContext {
 
     HdfsHelper.deleteFolder("src/test/resources/re_coalescence_test_output")
   }
+
+  test(
+    "Extract lines of files to an RDD of tuples containing the line and the " +
+      "file the line comes from") {
+
+    HdfsHelper.deleteFolder("src/test/resources/with_file_name")
+    HdfsHelper.writeToHdfsFile(
+      "data_1_a\ndata_1_b\ndata_1_c",
+      "src/test/resources/with_file_name/file_1.txt")
+    HdfsHelper.writeToHdfsFile(
+      "data_2_a\ndata_2_b",
+      "src/test/resources/with_file_name/file_2.txt")
+    HdfsHelper.writeToHdfsFile(
+      "data_3_a\ndata_3_b\ndata_3_c\ndata_3_d",
+      "src/test/resources/with_file_name/folder_1/file_3.txt")
+
+    val computedRdd = SparkHelper
+      .textFileWithFileName("src/test/resources/with_file_name", sc)
+      // We remove the part of the path which is specific to the local machine
+      // on which the tests run:
+      .map {
+        case (filePath, line) =>
+          val nonLocalPath = filePath.split("src/test/") match {
+            case Array(localPartOfPath, projectRelativePath) =>
+              "file:/.../src/test/" + projectRelativePath
+          }
+          (nonLocalPath, line)
+      }
+
+    val expectedRDD = sc.parallelize(
+      Array(
+        ("file:/.../src/test/resources/with_file_name/file_1.txt", "data_1_a"),
+        ("file:/.../src/test/resources/with_file_name/file_1.txt", "data_1_b"),
+        ("file:/.../src/test/resources/with_file_name/file_1.txt", "data_1_c"),
+        (
+          "file:/.../src/test/resources/with_file_name/folder_1/file_3.txt",
+          "data_3_a"),
+        (
+          "file:/.../src/test/resources/with_file_name/folder_1/file_3.txt",
+          "data_3_b"),
+        (
+          "file:/.../src/test/resources/with_file_name/folder_1/file_3.txt",
+          "data_3_c"),
+        (
+          "file:/.../src/test/resources/with_file_name/folder_1/file_3.txt",
+          "data_3_d"),
+        ("file:/.../src/test/resources/with_file_name/file_2.txt", "data_2_a"),
+        ("file:/.../src/test/resources/with_file_name/file_2.txt", "data_2_b")
+      ))
+
+    assertRDDEquals(computedRdd, expectedRDD)
+
+    HdfsHelper.deleteFolder("src/test/resources/with_file_name")
+  }
 }