Commit e9dcec1

Hotfix: issue 150 (#151)
* Remove unused code (#141)

* Revert "Setting version to 0.3.4-SNAPSHOT"

This reverts commit 2f1d7be.

* README: update to 0.3.3

* README: fix javadoc badge

* remove unused param

* [sbt] version updates

* [conf] allow not_analyzed string fields (#145)

* [not-analyzed-fields] do not analyze fields ending with _notanalyzed

* [hotfix] fixes issue 150

* [tests] issue 150

* fix typo

* [blockEntityLinkage] drop queryPartColumns
zouzias committed Mar 11, 2019
1 parent 32aa70b commit e9dcec1
Showing 3 changed files with 25 additions and 14 deletions.
9 changes: 5 additions & 4 deletions README.md
@@ -48,7 +48,7 @@ You can link against this library (for Spark 1.4+) in your program at the follow
 Using SBT:
 
 ```
-libraryDependencies += "org.zouzias" %% "spark-lucenerdd" % "0.3.4"
+libraryDependencies += "org.zouzias" %% "spark-lucenerdd" % "0.3.5"
 ```
 
 Using Maven:
@@ -57,15 +57,15 @@ Using Maven:
 <dependency>
   <groupId>org.zouzias</groupId>
   <artifactId>spark-lucenerdd_2.11</artifactId>
-  <version>0.3.4</version>
+  <version>0.3.5</version>
 </dependency>
 ```
 
 This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
 For example, to include it when starting the spark shell:
 
 ```
-$ bin/spark-shell --packages org.zouzias:spark-lucenerdd_2.11:0.3.4
+$ bin/spark-shell --packages org.zouzias:spark-lucenerdd_2.11:0.3.5
 ```
 
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
@@ -76,7 +76,8 @@ The project has the following compatibility with Apache Spark:

 Artifact | Release Date | Spark compatibility | Notes | Status
 ------------------------- | --------------- | -------------------------- | ----- | ----
-0.3.5-SNAPSHOT | | >= 2.3.1, JVM 8 | [develop](https://github.com/zouzias/spark-lucenerdd/tree/develop) | Under Development
+0.3.6-SNAPSHOT | | >= 2.3.1, JVM 8 | [develop](https://github.com/zouzias/spark-lucenerdd/tree/develop) | Under Development
+0.3.5 | 2019-02-07 | >= 2.4.0, JVM 8 | [tag v0.3.5](https://github.com/zouzias/spark-lucenerdd/tree/v0.3.5) | Released
 0.3.4 | 2018-11-27 | >= 2.4.0, JVM 8 | [tag v0.3.4](https://github.com/zouzias/spark-lucenerdd/tree/v0.3.4) | Released
 0.2.8 | 2017-05-30 | 2.1.x, JVM 7 | [tag v0.2.8](https://github.com/zouzias/spark-lucenerdd/tree/v0.2.8) | Released
 0.1.0 | 2016-09-26 | 1.4.x, 1.5.x, 1.6.x | [tag v0.1.0](https://github.com/zouzias/spark-lucenerdd/tree/v0.1.0) | Cross-released with 2.10/2.11
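(Aside: the README hunk above shows the `--packages` flag only with `spark-shell`; the same flag works with `spark-submit`. A hypothetical invocation — the class and jar names below are placeholders, not taken from this repository:)

```
$ bin/spark-submit --packages org.zouzias:spark-lucenerdd_2.11:0.3.5 \
    --class com.example.LinkageJob linkage-job-assembly.jar
```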
13 changes: 7 additions & 6 deletions src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala
@@ -496,17 +496,18 @@ object LuceneRDD extends Versionable
"Query Partition columns must be non-empty for block linkage")


val partColumn = "__PARTITION_COLUMN__"
val partColumnLeft = "__PARTITION_COLUMN_LEFT__"
val partColumnRight = "__PARTITION_COLUMN_RIGHT__"

// Prepare input DataFrames for cogroup operation.
// Keyed them on queryPartColumns and entityPartColumns
// I.e., Query/Entity DataFrame are now of type (String, Row)
val blocked = entities.withColumn(partColumn,
concat(entityPartColumns.map(entities.col): _*))
.rdd.keyBy(x => x.getString(x.fieldIndex(partColumn)))
val blockedQueries = queries.withColumn(partColumn,
val blocked = entities.withColumn(partColumnLeft,
concat(entityPartColumns.map(entities.col): _*))
.rdd.keyBy(x => x.getString(x.fieldIndex(partColumn)))
.rdd.keyBy(x => x.getString(x.fieldIndex(partColumnLeft)))
val blockedQueries = queries.withColumn(partColumnRight,
concat(queryPartColumns.map(queries.col): _*)).drop(queryPartColumns: _*)
.rdd.keyBy(x => x.getString(x.fieldIndex(partColumnRight)))

// Cogroup queries and entities. Map over each
// CoGrouped partition and instantiate Lucene index on partitioned
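The hunk above replaces the single shared `__PARTITION_COLUMN__` with distinct left/right key names and drops the query-side blocking columns once the key is built. To make the cogroup-based blocking mechanics concrete, here is a minimal sketch in plain Spark; the sample rows and column names are illustrative assumptions, and the final row-pairing step stands in for the per-block Lucene indexing the library actually performs:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.concat

object BlockingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("blocking-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical inputs; "email" plays the role of the blocking column.
    val entities = Seq(("anna", "[email protected]"), ("bob", "[email protected]")).toDF("name", "email")
    val queries  = Seq(("anne", "[email protected]")).toDF("name", "email")

    // Distinct synthetic key names per side, mirroring the fix above.
    val partColumnLeft  = "__PARTITION_COLUMN_LEFT__"
    val partColumnRight = "__PARTITION_COLUMN_RIGHT__"

    // Concatenate the blocking columns into one string key and key each RDD by it.
    val blocked = entities.withColumn(partColumnLeft, concat(Seq("email").map(entities.col): _*))
      .rdd.keyBy(row => row.getString(row.fieldIndex(partColumnLeft)))
    val blockedQueries = queries.withColumn(partColumnRight, concat(Seq("email").map(queries.col): _*))
      .rdd.keyBy(row => row.getString(row.fieldIndex(partColumnRight)))

    // Cogroup so that only rows sharing a blocking key ever meet;
    // the real library would build a Lucene index per block here instead.
    val pairs = blocked.cogroup(blockedQueries).flatMap { case (_, (ents, qs)) =>
      for (e <- ents; q <- qs) yield (e, q)
    }

    pairs.collect().foreach(println) // only the [email protected] block pairs up
    spark.stop()
  }
}
```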
17 changes: 13 additions & 4 deletions (BlockingLinkageSpec test suite)
@@ -37,12 +37,21 @@ class BlockingLinkageSpec extends FlatSpec
   val spark = SparkSession.builder().getOrCreate()
   import spark.implicits._
 
-  val people: Array[Person] = Array("fear", "death", "water", "fire", "house")
+  val peopleLeft: Array[Person] = Array("fear", "death", "water", "fire", "house")
     .zipWithIndex.map { case (str, index) =>
     val email = if (index % 2 == 0) "[email protected]" else "[email protected]"
     Person(str, index, email)
   }
-  val df = sc.parallelize(people).repartition(2).toDF()
+
+  val peopleRight: Array[Person] = Array("fear", "death", "water", "fire", "house")
+    .zipWithIndex.map { case (str, index) =>
+    val email = if (index % 2 == 0) "[email protected]" else "[email protected]"
+    Person(str, index, email)
+  }
+
+  val leftDF = sc.parallelize(peopleLeft).repartition(2).toDF()
+  val rightDF = sc.parallelize(peopleRight).repartition(3).toDF()
+
 
   val linker: Row => String = { row =>
     val name = row.getString(row.fieldIndex("name"))
@@ -51,10 +60,10 @@
   }
 
 
-  val linked = LuceneRDD.blockEntityLinkage(df, df, linker,
+  val linked = LuceneRDD.blockEntityLinkage(leftDF, rightDF, linker,
     Array("email"), Array("email"))
 
-  val linkedCount, dfCount = (linked.count, df.count())
+  val linkedCount, dfCount = (linked.count, leftDF.count())
 
   linkedCount should equal(dfCount)
 
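One detail worth flagging in the hunk above: Scala's multi-`val` syntax `val linkedCount, dfCount = (linked.count, leftDF.count())` binds both names to a copy of the same tuple, so `linkedCount should equal(dfCount)` compares two identical tuples rather than a linkage count against a DataFrame count. Below is a compact, self-contained sketch of what the spec exercises, comparing the counts directly; the `Person` field name `index` and the linker's exact query string are assumptions, since the spec's full linker body is elided above:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.zouzias.spark.lucenerdd.LuceneRDD

// Assumed shape of the spec's case class ("name" and "email" are confirmed
// by the diff; "index" is a guess for the middle field).
case class Person(name: String, index: Int, email: String)

object BlockLinkageExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("block-linkage-example").getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // Same synthetic data as the spec: emails alternate, forming two blocks.
    val people = Array("fear", "death", "water", "fire", "house").zipWithIndex.map {
      case (str, index) =>
        val email = if (index % 2 == 0) "[email protected]" else "[email protected]"
        Person(str, index, email)
    }
    val leftDF = sc.parallelize(people).repartition(2).toDF()
    val rightDF = sc.parallelize(people).repartition(3).toDF()

    // Assumed linker: each left row becomes a Lucene term query on "name".
    val linker: Row => String = { row =>
      val name = row.getString(row.fieldIndex("name"))
      s"name:$name"
    }

    // Block both sides on "email": rows are only linked within the same email block.
    val linked = LuceneRDD.blockEntityLinkage(leftDF, rightDF, linker,
      Array("email"), Array("email"))

    // Compare the counts themselves, not two copies of the same tuple.
    assert(linked.count == leftDF.count())
    spark.stop()
  }
}
```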
