diff --git a/docs/sql-data-sources-load-save-functions.md b/docs/sql-data-sources-load-save-functions.md
index 0866f37fbdc71..25df34ef5b008 100644
--- a/docs/sql-data-sources-load-save-functions.md
+++ b/docs/sql-data-sources-load-save-functions.md
@@ -105,9 +105,11 @@ To load a CSV file you can use:
The extra options are also used during the write operation.
For example, you can control bloom filters and dictionary encodings for ORC data sources.
The following ORC example will create a bloom filter and use dictionary encoding only for `favorite_color`.
-For Parquet, there exists `parquet.enable.dictionary`, too.
+For Parquet, there exist `parquet.bloom.filter.enabled` and `parquet.enable.dictionary`, too; column-specific variants take a `#` suffix with the column name, as in `parquet.bloom.filter.enabled#favorite_color` below.
To find more detailed information about the extra ORC/Parquet options,
-visit the official Apache ORC/Parquet websites.
+visit the official Apache [ORC](https://orc.apache.org/docs/spark-config.html) / [Parquet](https://github.com/apache/parquet-mr/tree/master/parquet-hadoop) websites.
+
+ORC data source:
@@ -146,6 +148,46 @@ OPTIONS (
+Parquet data source:
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+{% include_example manual_save_options_parquet scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% include_example manual_save_options_parquet java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% include_example manual_save_options_parquet python/sql/datasource.py %}
+</div>
+
+<div data-lang="r" markdown="1">
+{% include_example manual_save_options_parquet r/RSparkSQLExample.R %}
+</div>
+
+<div data-lang="SQL" markdown="1">
+
+{% highlight sql %}
+CREATE TABLE users_with_options (
+ name STRING,
+ favorite_color STRING,
+ favorite_numbers array<integer>
+) USING parquet
+OPTIONS (
+ `parquet.bloom.filter.enabled#favorite_color` true,
+ `parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
+ parquet.enable.dictionary true,
+ parquet.page.write-checksum.enabled true
+)
+{% endhighlight %}
+
+</div>
+
+</div>
+
### Run SQL on files directly
Instead of using the read API to load a file into a DataFrame and query it, you can also query that
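
A reader-side counterpart may be useful here (editor's sketch, not part of the patch): on the scan side, bloom filtering is governed by the Hadoop option `parquet.filter.bloom.enabled`, the same key the test below toggles via `ParquetInputFormat.BLOOM_FILTERING_ENABLED`. A minimal Scala sketch, assuming the `users_with_options.parquet` output written by the examples above and an illustrative lookup value:

```scala
// Session SQL confs are copied into the Hadoop configuration Spark hands
// to the Parquet reader, so this enables bloom-filter row-group pruning
// on scan (it defaults to true in recent parquet-mr anyway).
spark.conf.set("parquet.filter.bloom.enabled", "true")

// A lookup for a value the bloom filter rules out can skip whole row
// groups without decoding their pages.
val misses = spark.read.parquet("users_with_options.parquet")
  .filter("favorite_color = 'no-such-color'") // value assumed absent
  .count() // expected 0
```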
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index 53eb8fd355a53..5dcf321a4c830 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -195,6 +195,14 @@ private static void runBasicDataSourceExample(SparkSession spark) {
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");
// $example off:manual_save_options_orc$
+ // $example on:manual_save_options_parquet$
+ usersDF.write().format("parquet")
+ .option("parquet.bloom.filter.enabled#favorite_color", "true")
+ .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
+ .option("parquet.enable.dictionary", "true")
+ .option("parquet.page.write-checksum.enabled", "false")
+ .save("users_with_options.parquet");
+ // $example off:manual_save_options_parquet$
// $example on:direct_sql$
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
index f3ad65f5a7a5f..4d7aa045b4b87 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -126,6 +126,16 @@ def basic_datasource_example(spark):
.save("users_with_options.orc"))
# $example off:manual_save_options_orc$
+ # $example on:manual_save_options_parquet$
+ df = spark.read.parquet("examples/src/main/resources/users.parquet")
+ (df.write.format("parquet")
+ .option("parquet.bloom.filter.enabled#favorite_color", "true")
+ .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
+ .option("parquet.enable.dictionary", "true")
+ .option("parquet.page.write-checksum.enabled", "false")
+ .save("users_with_options.parquet"))
+ # $example off:manual_save_options_parquet$
+
# $example on:write_sorting_and_bucketing$
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
# $example off:write_sorting_and_bucketing$
diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R
index 86ad5334248bc..15118e118ab3a 100644
--- a/examples/src/main/r/RSparkSQLExample.R
+++ b/examples/src/main/r/RSparkSQLExample.R
@@ -157,6 +157,11 @@ df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
# $example off:manual_save_options_orc$
+# $example on:manual_save_options_parquet$
+df <- read.df("examples/src/main/resources/users.parquet", "parquet")
+write.parquet(df, "users_with_options.parquet", `parquet.bloom.filter.enabled#favorite_color` = "true", `parquet.bloom.filter.expected.ndv#favorite_color` = "1000000", parquet.enable.dictionary = "true", `parquet.page.write-checksum.enabled` = "false")
+# $example off:manual_save_options_parquet$
+
# $example on:direct_sql$
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
# $example off:direct_sql$
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index 207961bbea325..6bd2bd6d3bf5e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -129,6 +129,14 @@ object SQLDataSourceExample {
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
// $example off:manual_save_options_orc$
+ // $example on:manual_save_options_parquet$
+ usersDF.write.format("parquet")
+ .option("parquet.bloom.filter.enabled#favorite_color", "true")
+ .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
+ .option("parquet.enable.dictionary", "true")
+ .option("parquet.page.write-checksum.enabled", "false")
+ .save("users_with_options.parquet")
+ // $example off:manual_save_options_parquet$
// $example on:direct_sql$
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 329a3e4983792..94bda56bc8738 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1634,6 +1634,35 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}
}
+
+ test("SPARK-34562: Bloom filter push down") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ spark.range(100).selectExpr("id * 2 AS id")
+ .write
+ .option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#id", true)
+ // Disable dictionary because the number of distinct values is less than 40000.
+ .option(ParquetOutputFormat.ENABLE_DICTIONARY, false)
+ .parquet(path)
+
+ Seq(true, false).foreach { bloomFilterEnabled =>
+ withSQLConf(ParquetInputFormat.BLOOM_FILTERING_ENABLED -> bloomFilterEnabled.toString) {
+ val accu = new NumRowGroupsAcc
+ sparkContext.register(accu)
+
+ val df = spark.read.parquet(path).filter("id = 19")
+ df.foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
+ if (bloomFilterEnabled) {
+ assert(accu.value === 0)
+ } else {
+ assert(accu.value > 0)
+ }
+
+ AccumulatorContext.remove(accu.id)
+ }
+ }
+ }
+ }
}
@ExtendedSQLTest
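
To confirm that the writer options above actually produced a bloom filter, one can inspect the file footer with the parquet-hadoop API that ships with Spark. A minimal standalone sketch (editor's addition, not part of the patch); the method names are as of parquet-mr 1.12, so treat them as an assumption on other versions:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import scala.collection.JavaConverters._

// Returns true if any row group in `file` stores a bloom filter for `column`.
def hasBloomFilter(file: String, column: String): Boolean = {
  val reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path(file), new Configuration()))
  try {
    reader.getRowGroups.asScala.exists { block =>
      val bloomReader = reader.getBloomFilterDataReader(block)
      block.getColumns.asScala
        .filter(_.getPath.toDotString == column)
        .exists(col => bloomReader.readBloomFilter(col) != null)
    }
  } finally {
    reader.close()
  }
}
```

Pointed at a part file written by the examples, `favorite_color` should report a filter, while a column the options never mentioned, such as `name`, should not.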