
Commit

Add a custom sc.textFile which returns an RDD of records associated with the file path they come from
xavierguihot committed Feb 8, 2018
1 parent 23f2096 commit b2b4cdc
Showing 7 changed files with 174 additions and 11 deletions.
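
For orientation, here is a minimal usage sketch of the new helper; it is not part of the commit itself. It assumes spark_helper v1.0.21 is on the classpath and that an existing SparkContext named `sc` is available; the input path and the per-file count are illustrative.

```scala
import com.spark_helper.SparkHelper

import org.apache.spark.rdd.RDD

// Each record is paired with the path of the file it was read from
// (the folder is traversed recursively):
val records: RDD[(String, String)] =
  SparkHelper.textFileWithFileName("/my/input/folder", sc)

// For instance, count how many records each source file contributed:
val recordsPerFile: RDD[(String, Long)] =
  records.mapValues(_ => 1L).reduceByKey(_ + _)
```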
8 changes: 4 additions & 4 deletions README.md
@@ -5,7 +5,7 @@
## Overview


Version: 1.0.20
Version: 1.0.21

API Scaladoc: [SparkHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.SparkHelper$)

@@ -131,7 +131,7 @@ With sbt, add these lines to your build.sbt:
```scala
resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "v1.0.20"
libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "v1.0.21"
```

With maven, add these lines to your pom.xml:
@@ -147,7 +147,7 @@ With maven, add these lines to your pom.xml:
<dependency>
<groupId>com.github.xavierguihot</groupId>
<artifactId>spark_helper</artifactId>
<version>v1.0.20</version>
<version>v1.0.21</version>
</dependency>
```

@@ -161,7 +161,7 @@ allprojects {
}
dependencies {
compile 'com.github.xavierguihot:spark_helper:v1.0.20'
compile 'com.github.xavierguihot:spark_helper:v1.0.21'
}
```

2 changes: 1 addition & 1 deletion build.sbt
@@ -1,6 +1,6 @@
name := "spark_helper"

version := "1.0.20"
version := "1.0.21"

scalaVersion := "2.11.12"

42 changes: 41 additions & 1 deletion docs/com/spark_helper/SparkHelper$.html
@@ -57,7 +57,11 @@ <h4 id="signature" class="signature">
<span class="cmt">// line, it reads records spread over several lines.</span>
<span class="cmt">// This way, xml, json, yml or any multi-line record file format can be used</span>
<span class="cmt">// with Spark:</span>
SparkHelper.textFileWithDelimiter(<span class="lit">"/my/input/folder/path"</span>, sparkContext, <span class="lit">"---\n"</span>)</pre><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
SparkHelper.textFileWithDelimiter(<span class="lit">"/my/input/folder/path"</span>, sparkContext, <span class="lit">"---\n"</span>)
<span class="cmt">// Same as SparkContext.textFile, but instead of returning an RDD of</span>
<span class="cmt">// records, it returns an RDD of tuples containing both the record and the</span>
<span class="cmt">// path of the file it comes from:</span>
SparkHelper.textFileWithFileName(<span class="lit">"folder"</span>, sparkContext)</pre><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
/main/scala/com/spark_helper/SparkHelper.scala">SparkHelper</a>
</p></div><dl class="attributes block"> <dt>Since</dt><dd><p>2017-02</p></dd></dl><div class="toggleContainer block">
<span class="toggle">Linear Supertypes</span>
@@ -665,6 +669,42 @@ <h4 class="signature">
assert(computedRecords == expectedRecords)</pre></div><dl class="paramcmts block"><dt class="param">hdfsPath</dt><dd class="cmt"><p>the path of the file to read (folder or file, '*' works as
well).</p></dd><dt class="param">sparkContext</dt><dd class="cmt"><p>the SparkContext</p></dd><dt class="param">delimiter</dt><dd class="cmt"><p>the specific record delimiter which replaces &quot;\n&quot;</p></dd><dt class="param">maxRecordLength</dt><dd class="cmt"><p>the max length (not sure which unit) of a record
before considering the record too long to fit into memory.</p></dd><dt>returns</dt><dd class="cmt"><p>the RDD of records</p></dd></dl></div>
</li><li name="com.spark_helper.SparkHelper#textFileWithFileName" visbl="pub" data-isabs="false" fullComment="yes" group="Ungrouped">
<a id="textFileWithFileName(hdfsPath:String,sparkContext:org.apache.spark.SparkContext):org.apache.spark.rdd.RDD[(String,String)]"></a>
<a id="textFileWithFileName(String,SparkContext):RDD[(String,String)]"></a>
<h4 class="signature">
<span class="modifier_kind">
<span class="modifier"></span>
<span class="kind">def</span>
</span>
<span class="symbol">
<span class="name">textFileWithFileName</span><span class="params">(<span name="hdfsPath">hdfsPath: <span class="extype" name="scala.Predef.String">String</span></span>, <span name="sparkContext">sparkContext: <span class="extype" name="org.apache.spark.SparkContext">SparkContext</span></span>)</span><span class="result">: <span class="extype" name="org.apache.spark.rdd.RDD">RDD</span>[(<span class="extype" name="scala.Predef.String">String</span>, <span class="extype" name="scala.Predef.String">String</span>)]</span>
</span>
</h4><span class="permalink">
<a href="../../index.html#com.spark_helper.SparkHelper$@textFileWithFileName(hdfsPath:String,sparkContext:org.apache.spark.SparkContext):org.apache.spark.rdd.RDD[(String,String)]" title="Permalink" target="_top">
<img src="../../lib/permalink.png" alt="Permalink" />
</a>
</span>
<p class="shortcomment cmt">Equivalent to sparkContext.textFile(), but for each line is associated
with its file path.</p><div class="fullcomment"><div class="comment cmt"><p>Equivalent to sparkContext.textFile(), but for each line is associated
with its file path.</p><p>Produces a RDD[(file_name, line)] which provides a way to know from which
file a given line comes from.</p><pre><span class="cmt">// Considering this folder:</span>
<span class="cmt">// folder/file_1.txt whose content is data1\ndata2\ndata3</span>
<span class="cmt">// folder/file_2.txt whose content is data4\ndata4</span>
<span class="cmt">// folder/folder_1/file_3.txt whose content is data6\ndata7</span>
<span class="cmt">// then:</span>
SparkHelper.textFileWithFileName(<span class="lit">"folder"</span>, sparkContext)
<span class="cmt">// will return:</span>
RDD(
(<span class="lit">"file:/path/on/machine/folder/file_1.txt"</span>, <span class="lit">"data1"</span>),
(<span class="lit">"file:/path/on/machine/folder/file_1.txt"</span>, <span class="lit">"data2"</span>),
(<span class="lit">"file:/path/on/machine/folder/file_1.txt"</span>, <span class="lit">"data3"</span>),
(<span class="lit">"file:/path/on/machine/folder/file_2.txt"</span>, <span class="lit">"data4"</span>),
(<span class="lit">"file:/path/on/machine/folder/file_2.txt"</span>, <span class="lit">"data5"</span>),
(<span class="lit">"file:/path/on/machine/folder/folder_1/file_3.txt"</span>, <span class="lit">"data6"</span>),
(<span class="lit">"file:/path/on/machine/folder/folder_1/file_3.txt"</span>, <span class="lit">"data7"</span>)
)</pre></div><dl class="paramcmts block"><dt class="param">hdfsPath</dt><dd class="cmt"><p>the path of the folder (or structure of folders) to read</p></dd><dt class="param">sparkContext</dt><dd class="cmt"><p>the SparkContext</p></dd><dt>returns</dt><dd class="cmt"><p>the RDD of records where a record is a tuple containing the path
of the file the record comes from and the record itself.</p></dd></dl></div>
</li><li name="scala.AnyRef#toString" visbl="pub" data-isabs="false" fullComment="yes" group="Ungrouped">
<a id="toString():String"></a>
<a id="toString():String"></a>
6 changes: 5 additions & 1 deletion docs/com/spark_helper/package.html
@@ -178,7 +178,11 @@ <h4 class="signature">
<span class="cmt">// line, it reads records spread over several lines.</span>
<span class="cmt">// This way, xml, json, yml or any multi-line record file format can be used</span>
<span class="cmt">// with Spark:</span>
SparkHelper.textFileWithDelimiter(<span class="lit">"/my/input/folder/path"</span>, sparkContext, <span class="lit">"---\n"</span>)</pre><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
SparkHelper.textFileWithDelimiter(<span class="lit">"/my/input/folder/path"</span>, sparkContext, <span class="lit">"---\n"</span>)
<span class="cmt">// Same as SparkContext.textFile, but instead of returning an RDD of</span>
<span class="cmt">// records, it returns an RDD of tuples containing both the record and the</span>
<span class="cmt">// path of the file it comes from:</span>
SparkHelper.textFileWithFileName(<span class="lit">"folder"</span>, sparkContext)</pre><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
/main/scala/com/spark_helper/SparkHelper.scala">SparkHelper</a>
</p></div><dl class="attributes block"> <dt>Since</dt><dd><p>2017-02</p></dd></dl></div>
</li><li name="com.spark_helper.monitoring" visbl="pub" data-isabs="false" fullComment="no" group="Ungrouped">
3 changes: 3 additions & 0 deletions docs/index/index-t.html
@@ -19,6 +19,9 @@
</div><div class="entry">
<div class="name">textFileWithDelimiter</div>
<div class="occurrences"><a href="../com/spark_helper/SparkHelper$.html" class="extype" name="com.spark_helper.SparkHelper">SparkHelper</a> </div>
</div><div class="entry">
<div class="name">textFileWithFileName</div>
<div class="occurrences"><a href="../com/spark_helper/SparkHelper$.html" class="extype" name="com.spark_helper.SparkHelper">SparkHelper</a> </div>
</div><div class="entry">
<div class="name">thresholdType</div>
<div class="occurrences"><a href="../com/spark_helper/monitoring/Test.html" class="extype" name="com.spark_helper.monitoring.Test">Test</a> </div>
63 changes: 61 additions & 2 deletions src/main/scala/com/spark_helper/SparkHelper.scala
@@ -1,14 +1,14 @@
package com.spark_helper

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD

import org.apache.spark.rdd.{RDD, HadoopRDD}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat => TextInputFormat2}

import scala.util.Random

@@ -28,6 +28,10 @@ import scala.util.Random
* // This way, xml, json, yml or any multi-line record file format can be used
* // with Spark:
* SparkHelper.textFileWithDelimiter("/my/input/folder/path", sparkContext, "---\n")
* // Same as SparkContext.textFile, but instead of returning an RDD of
* // records, it returns an RDD of tuples containing both the record and the
* // path of the file it comes from:
* SparkHelper.textFileWithFileName("folder", sparkContext)
* }}}
*
* Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
@@ -497,6 +501,61 @@ object SparkHelper extends Serializable {
Some(compressionCodec))
}

/** Equivalent to sparkContext.textFile(), but each line is associated
* with its file path.
*
* Produces an RDD[(file_name, line)] which provides a way to know which
* file a given line comes from.
*
* {{{
* // Considering this folder:
* // folder/file_1.txt whose content is data1\ndata2\ndata3
* // folder/file_2.txt whose content is data4\ndata5
* // folder/folder_1/file_3.txt whose content is data6\ndata7
* // then:
* SparkHelper.textFileWithFileName("folder", sparkContext)
* // will return:
* RDD(
* ("file:/path/on/machine/folder/file_1.txt", "data1"),
* ("file:/path/on/machine/folder/file_1.txt", "data2"),
* ("file:/path/on/machine/folder/file_1.txt", "data3"),
* ("file:/path/on/machine/folder/file_2.txt", "data4"),
* ("file:/path/on/machine/folder/file_2.txt", "data5"),
* ("file:/path/on/machine/folder/folder_1/file_3.txt", "data6"),
* ("file:/path/on/machine/folder/folder_1/file_3.txt", "data7")
* )
* }}}
*
* @param hdfsPath the path of the folder (or structure of folders) to read
* @param sparkContext the SparkContext
* @return the RDD of records where a record is a tuple containing the path
* of the file the record comes from and the record itself.
*/
def textFileWithFileName(
hdfsPath: String,
sparkContext: SparkContext
): RDD[(String, String)] = {

// In order to go through the folder structure recursively:
sparkContext.hadoopConfiguration
.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

sparkContext
.hadoopFile(
hdfsPath,
classOf[TextInputFormat2],
classOf[LongWritable],
classOf[Text],
sparkContext.defaultMinPartitions
)
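// The cast to HadoopRDD gives access to mapPartitionsWithInputSplit; for
// text files each input split is a FileSplit, whose getPath is the path of
// the file the partition's records come from: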
.asInstanceOf[HadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit {
case (inputSplit, iterator) =>
val file = inputSplit.asInstanceOf[FileSplit]
iterator.map(tpl => (file.getPath.toString, tpl._2.toString))
}
}

//////
// Internal core:
//////
61 changes: 59 additions & 2 deletions src/test/scala/com/spark_helper/SparkHelperTest.scala
@@ -1,6 +1,6 @@
package com.spark_helper

import com.holdenkarau.spark.testing.SharedSparkContext
import com.holdenkarau.spark.testing.{SharedSparkContext, RDDComparisons}

import org.scalatest.FunSuite

@@ -9,7 +9,10 @@ import org.scalatest.FunSuite
* @author Xavier Guihot
* @since 2017-02
*/
class SparkHelperTest extends FunSuite with SharedSparkContext {
class SparkHelperTest
extends FunSuite
with SharedSparkContext
with RDDComparisons {

test("Save as Single Text File") {

@@ -252,4 +255,58 @@ class SparkHelperTest extends FunSuite with SharedSparkContext {

HdfsHelper.deleteFolder("src/test/resources/re_coalescence_test_output")
}

test(
"Extract lines of files to an RDD of tuple containing the line and file " +
"the line comes from") {

HdfsHelper.deleteFolder("src/test/resources/with_file_name")
HdfsHelper.writeToHdfsFile(
"data_1_a\ndata_1_b\ndata_1_c",
"src/test/resources/with_file_name/file_1.txt")
HdfsHelper.writeToHdfsFile(
"data_2_a\ndata_2_b",
"src/test/resources/with_file_name/file_2.txt")
HdfsHelper.writeToHdfsFile(
"data_3_a\ndata_3_b\ndata_3_c\ndata_3_d",
"src/test/resources/with_file_name/folder_1/file_3.txt")

val computedRdd = SparkHelper
.textFileWithFileName("src/test/resources/with_file_name", sc)
// We remove the part of the path which is specific to the local machine
// on which the test runs:
.map {
case (filePath, line) =>
val nonLocalPath = filePath.split("src/test/") match {
case Array(localPartOfPath, projectRelativePath) =>
"file:/.../src/test/" + projectRelativePath
}
(nonLocalPath, line)
}

val expectedRDD = sc.parallelize(
Array(
("file:/.../src/test/resources/with_file_name/file_1.txt", "data_1_a"),
("file:/.../src/test/resources/with_file_name/file_1.txt", "data_1_b"),
("file:/.../src/test/resources/with_file_name/file_1.txt", "data_1_c"),
(
"file:/.../src/test/resources/with_file_name/folder_1/file_3.txt",
"data_3_a"),
(
"file:/.../src/test/resources/with_file_name/folder_1/file_3.txt",
"data_3_b"),
(
"file:/.../src/test/resources/with_file_name/folder_1/file_3.txt",
"data_3_c"),
(
"file:/.../src/test/resources/with_file_name/folder_1/file_3.txt",
"data_3_d"),
("file:/.../src/test/resources/with_file_name/file_2.txt", "data_2_a"),
("file:/.../src/test/resources/with_file_name/file_2.txt", "data_2_b")
))

assertRDDEquals(computedRdd, expectedRDD)

HdfsHelper.deleteFolder("src/test/resources/with_file_name")
}
}
