fix: Disable Comet shuffle with AQE coalesce partitions enabled #380

Merged · 6 commits · May 6, 2024
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -151,6 +151,17 @@ object CometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.shuffle.enforceMode.enabled")
+      .doc(
+        "Comet shuffle doesn't support Spark's AQE coalesce partitions. If AQE coalesce " +
+          "partitions is enabled, Comet shuffle won't be triggered even when it is " +
+          "enabled. This config forces Comet to trigger its shuffle even if AQE " +
+          "coalesce partitions is enabled. This is for testing purposes only.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
       .doc(
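As a usage sketch, the new flag can be toggled like any other Spark conf when building a test session. Everything below other than the two diffed keys and the shuffle manager class is illustrative scaffolding; in particular, COMET_EXEC_SHUFFLE_ENABLED is assumed to map to spark.comet.exec.shuffle.enabled:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: keep Comet shuffle on for a test even though AQE coalesce
// partitions is enabled (the combination this PR otherwise disables).
val spark = SparkSession
  .builder()
  .master("local[2]") // illustrative test setup
  .config(
    "spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .config("spark.comet.exec.shuffle.enabled", "true") // assumed key
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.comet.shuffle.enforceMode.enabled", "true") // test-only override
  .getOrCreate()
```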
@@ -916,7 +916,10 @@ object CometSparkSessionExtensions extends Logging {
   private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
     COMET_EXEC_SHUFFLE_ENABLED.get(conf) &&
       (conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") ==
-        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") &&
+      // TODO: AQE coalesce partitions feature causes Comet shuffle memory leak.

Contributor:

Coalesce partitions is a great feature of AQE, which is enabled by default in Spark. It would be better to handle the combined case in Comet shuffle rather than disable Comet shuffle when coalesce partitions is enabled. Do you have any clue why there's a memory leak?

Member Author:

As I mentioned in the ticket, when coalesce partitions is enabled, Spark will combine the partitions of multiple reducers. I suspect that produces a format the Java Arrow StreamReader cannot read correctly.

We should address this issue further to unblock Comet shuffle with coalesce partitions. But for now, I think it is better to disable it temporarily for the cases where we know it will cause issues.
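For context on that suspicion: the Java Arrow stream reader consumes exactly one IPC stream, that is, a schema message, then record batches, then an end-of-stream marker. A minimal sketch of that contract (the helper is illustrative, not Comet code):

```scala
import java.io.ByteArrayInputStream
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowStreamReader

// Counts the record batches in a single Arrow IPC stream. If coalescing
// concatenates the byte ranges of several reducer partitions into one
// "stream", a reader like this stops at the first end-of-stream marker and
// the remaining bytes are skipped or misparsed, the suspected failure mode.
def countBatches(bytes: Array[Byte]): Int = {
  val allocator = new RootAllocator(Long.MaxValue)
  val reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator)
  try {
    var n = 0
    while (reader.loadNextBatch()) n += 1
    n
  } finally {
    reader.close()
    allocator.close()
  }
}
```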

Member:

+1 for putting this behind a config and disabling it by default until we have a solution.

Contributor:

Is there an issue logged for the follow-up?

Contributor @advancedxy (May 6, 2024):

> But for now, I think it is better to disable it temporarily for the cases where we know it will cause issues.

Ah, yeah. It makes sense to disable it first if it would take a lot of time and resources to debug and fix later on.

The issue you described seems similar to a problem I encountered when adding support for CometRowToColumnar in #206, so I went ahead and did a quick investigation based on your branch. It seems that we cannot close the allocator prematurely, as the record might still be used on the native side; see these comments for more details: https://github.com/apache/datafusion-comet/pull/206/files#diff-04037044481f9a656275a63ebb6a3a63badf866f19700d4a6909d2e17c8d7b72R37-R46

I also submitted a new commit (advancedxy@48517ef) as a potential fix in my branch, hoping it helps you out. The test code is just modified for demonstration purposes.
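The potential fix referenced above amounts to a guarded close of the Arrow Java allocator. A minimal sketch of the idea, with an illustrative method name (this is not the actual commit):

```scala
import org.apache.arrow.memory.BufferAllocator

// Sketch of the guarded close: closing a BufferAllocator that still has
// outstanding allocations throws IllegalStateException, which is what shows
// up as the leak report. Skipping the close when memory is still allocated
// suppresses the report, at the risk of hiding a real leak (see below).
def closeIfDrained(allocator: BufferAllocator): Unit = {
  if (allocator.getAllocatedMemory == 0) {
    allocator.close()
  }
  // else: buffers may still be referenced, e.g. on the native side, so the
  // allocator is left open instead of failing the task.
}
```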

Member Author:

> Is there an issue logged for the follow-up?

Let me create one.

Member Author:

Created #387 to track it.

Member Author:

Thanks @advancedxy. I debugged this issue but didn't find a quick fix, so I decided to disable it temporarily.

I took a look at your branch. The Java allocator instance reports a memory leak when it is closed while its allocated memory size is larger than zero. So, as you did, if we see a non-zero size (getAllocatedMemory > 0) and skip closing the allocator, it won't report the leak.

However, I'm not sure whether that is the correct fix, or whether we would be ignoring a real memory leak. Maybe it is a false positive. But if it is a real memory leak and we ignore it, it will be a potential issue.

Contributor:

The allocator is force-closed in two places: the task completion callback and the CompletionIterator per input stream. A memory leak should be reported if the Arrow buffers are not released at those two points.
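For reference, the first of those two places typically looks like a Spark task-completion callback. A sketch assuming a per-task allocator variable (not Comet's actual code):

```scala
import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
import org.apache.spark.TaskContext

// Sketch: tie the allocator's lifetime to the task. If Arrow buffers are
// still allocated when the task completes, close() throws
// IllegalStateException, which surfaces as the reported memory leak.
val allocator: BufferAllocator = new RootAllocator(Long.MaxValue) // assumed per-task
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
  allocator.close()
}
```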

Contributor:

> Maybe it is a false positive. But if it is a real memory leak and we ignore it, it will be a potential issue.

Ah, yeah. I'm not 100 percent sure the memory leak report is a false positive, as I haven't verified it on the native side with the JVM running (it might be quite tricky). Based on previous experience, though, the allocator could be closed without failure at task completion.

😂😂 It comes back to our previous conclusion that we may need to bridge the Java side with arrow-rs instead of arrow-java in the long term. The allocator API in arrow-java is easy to misuse.

+      // We should disable Comet shuffle when AQE coalesce partitions is enabled.
+      (!conf.coalesceShufflePartitionsEnabled || COMET_SHUFFLE_ENFORCE_MODE_ENABLED.get())
 
   private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = {
     COMET_SCAN_ENABLED.get(conf)
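Given the new predicate, the practical workaround for users who want Comet shuffle before #387 is fixed is to turn off AQE's coalesce-partitions feature. These are standard Spark SQL configs; the Comet key is assumed to be the one behind COMET_EXEC_SHUFFLE_ENABLED:

```scala
// Comet shuffle stays disabled while AQE coalesce partitions is enabled, so
// turning coalescing off re-enables native shuffle without the test-only flag.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
spark.conf.set("spark.comet.exec.shuffle.enabled", "true") // assumed key
```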
@@ -13,20 +13,20 @@
 : : :- * SortMergeJoin LeftAnti (19)
 : : : :- * Project (13)
 : : : : +- * SortMergeJoin LeftSemi (12)
-: : : : :- * ColumnarToRow (6)
-: : : : : +- CometSort (5)
-: : : : : +- CometExchange (4)
+: : : : :- * Sort (6)
+: : : : : +- Exchange (5)
+: : : : : +- * ColumnarToRow (4)
 : : : : : +- CometProject (3)
 : : : : : +- CometFilter (2)
 : : : : : +- CometScan parquet spark_catalog.default.catalog_sales (1)
-: : : : +- * ColumnarToRow (11)
-: : : : +- CometSort (10)
-: : : : +- CometExchange (9)
+: : : : +- * Sort (11)
+: : : : +- Exchange (10)
+: : : : +- * ColumnarToRow (9)
 : : : : +- CometProject (8)
 : : : : +- CometScan parquet spark_catalog.default.catalog_sales (7)
-: : : +- * ColumnarToRow (18)
-: : : +- CometSort (17)
-: : : +- CometExchange (16)
+: : : +- * Sort (18)
+: : : +- Exchange (17)
+: : : +- * ColumnarToRow (16)
 : : : +- CometProject (15)
 : : : +- CometScan parquet spark_catalog.default.catalog_returns (14)
 : : +- BroadcastExchange (24)
@@ -61,16 +61,16 @@ Condition : ((isnotnull(cs_ship_date_sk#1) AND isnotnull(cs_ship_addr_sk#2)) AND
 Input [8]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7, cs_sold_date_sk#8]
 Arguments: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7], [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
 
-(4) CometExchange
+(4) ColumnarToRow [codegen id : 1]
 Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
-Arguments: hashpartitioning(cs_order_number#5, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1]
 
-(5) CometSort
+(5) Exchange
 Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
-Arguments: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7], [cs_order_number#5 ASC NULLS FIRST]
+Arguments: hashpartitioning(cs_order_number#5, 5), ENSURE_REQUIREMENTS, [plan_id=1]
 
-(6) ColumnarToRow [codegen id : 1]
+(6) Sort [codegen id : 2]
 Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
+Arguments: [cs_order_number#5 ASC NULLS FIRST], false, 0
 
 (unknown) Scan parquet spark_catalog.default.catalog_sales
 Output [3]: [cs_warehouse_sk#9, cs_order_number#10, cs_sold_date_sk#11]
@@ -82,24 +82,24 @@ ReadSchema: struct<cs_warehouse_sk:int,cs_order_number:int>
 Input [3]: [cs_warehouse_sk#9, cs_order_number#10, cs_sold_date_sk#11]
 Arguments: [cs_warehouse_sk#9, cs_order_number#10], [cs_warehouse_sk#9, cs_order_number#10]
 
-(9) CometExchange
+(9) ColumnarToRow [codegen id : 3]
 Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
-Arguments: hashpartitioning(cs_order_number#10, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]
 
-(10) CometSort
+(10) Exchange
 Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
-Arguments: [cs_warehouse_sk#9, cs_order_number#10], [cs_order_number#10 ASC NULLS FIRST]
+Arguments: hashpartitioning(cs_order_number#10, 5), ENSURE_REQUIREMENTS, [plan_id=2]
 
-(11) ColumnarToRow [codegen id : 2]
+(11) Sort [codegen id : 4]
 Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
+Arguments: [cs_order_number#10 ASC NULLS FIRST], false, 0
 
-(12) SortMergeJoin [codegen id : 3]
+(12) SortMergeJoin [codegen id : 5]
 Left keys [1]: [cs_order_number#5]
 Right keys [1]: [cs_order_number#10]
 Join type: LeftSemi
 Join condition: NOT (cs_warehouse_sk#4 = cs_warehouse_sk#9)
 
-(13) Project [codegen id : 3]
+(13) Project [codegen id : 5]
 Output [6]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
 Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]

@@ -113,18 +113,18 @@ ReadSchema: struct<cr_order_number:int>
 Input [2]: [cr_order_number#12, cr_returned_date_sk#13]
 Arguments: [cr_order_number#12], [cr_order_number#12]
 
-(16) CometExchange
+(16) ColumnarToRow [codegen id : 6]
 Input [1]: [cr_order_number#12]
-Arguments: hashpartitioning(cr_order_number#12, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=3]
 
-(17) CometSort
+(17) Exchange
 Input [1]: [cr_order_number#12]
-Arguments: [cr_order_number#12], [cr_order_number#12 ASC NULLS FIRST]
+Arguments: hashpartitioning(cr_order_number#12, 5), ENSURE_REQUIREMENTS, [plan_id=3]
 
-(18) ColumnarToRow [codegen id : 4]
+(18) Sort [codegen id : 7]
 Input [1]: [cr_order_number#12]
+Arguments: [cr_order_number#12 ASC NULLS FIRST], false, 0
 
-(19) SortMergeJoin [codegen id : 8]
+(19) SortMergeJoin [codegen id : 11]
 Left keys [1]: [cs_order_number#5]
 Right keys [1]: [cr_order_number#12]
 Join type: LeftAnti
@@ -145,20 +145,20 @@ Condition : (((isnotnull(d_date#15) AND (d_date#15 >= 2002-02-01)) AND (d_date#1
 Input [2]: [d_date_sk#14, d_date#15]
 Arguments: [d_date_sk#14], [d_date_sk#14]
 
-(23) ColumnarToRow [codegen id : 5]
+(23) ColumnarToRow [codegen id : 8]
 Input [1]: [d_date_sk#14]
 
 (24) BroadcastExchange
 Input [1]: [d_date_sk#14]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
 
-(25) BroadcastHashJoin [codegen id : 8]
+(25) BroadcastHashJoin [codegen id : 11]
 Left keys [1]: [cs_ship_date_sk#1]
 Right keys [1]: [d_date_sk#14]
 Join type: Inner
 Join condition: None
 
-(26) Project [codegen id : 8]
+(26) Project [codegen id : 11]
 Output [5]: [cs_ship_addr_sk#2, cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
 Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7, d_date_sk#14]

@@ -177,20 +177,20 @@ Condition : ((isnotnull(ca_state#17) AND (ca_state#17 = GA)) AND isnotnull(ca_ad
 Input [2]: [ca_address_sk#16, ca_state#17]
 Arguments: [ca_address_sk#16], [ca_address_sk#16]
 
-(30) ColumnarToRow [codegen id : 6]
+(30) ColumnarToRow [codegen id : 9]
 Input [1]: [ca_address_sk#16]
 
 (31) BroadcastExchange
 Input [1]: [ca_address_sk#16]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=5]
 
-(32) BroadcastHashJoin [codegen id : 8]
+(32) BroadcastHashJoin [codegen id : 11]
 Left keys [1]: [cs_ship_addr_sk#2]
 Right keys [1]: [ca_address_sk#16]
 Join type: Inner
 Join condition: None
 
-(33) Project [codegen id : 8]
+(33) Project [codegen id : 11]
 Output [4]: [cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
 Input [6]: [cs_ship_addr_sk#2, cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7, ca_address_sk#16]

@@ -209,38 +209,38 @@ Condition : ((isnotnull(cc_county#19) AND (cc_county#19 = Williamson County)) AN
 Input [2]: [cc_call_center_sk#18, cc_county#19]
 Arguments: [cc_call_center_sk#18], [cc_call_center_sk#18]
 
-(37) ColumnarToRow [codegen id : 7]
+(37) ColumnarToRow [codegen id : 10]
 Input [1]: [cc_call_center_sk#18]
 
 (38) BroadcastExchange
 Input [1]: [cc_call_center_sk#18]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=6]
 
-(39) BroadcastHashJoin [codegen id : 8]
+(39) BroadcastHashJoin [codegen id : 11]
 Left keys [1]: [cs_call_center_sk#3]
 Right keys [1]: [cc_call_center_sk#18]
 Join type: Inner
 Join condition: None
 
-(40) Project [codegen id : 8]
+(40) Project [codegen id : 11]
 Output [3]: [cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
 Input [5]: [cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7, cc_call_center_sk#18]
 
-(41) HashAggregate [codegen id : 8]
+(41) HashAggregate [codegen id : 11]
 Input [3]: [cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
 Keys [1]: [cs_order_number#5]
 Functions [2]: [partial_sum(UnscaledValue(cs_ext_ship_cost#6)), partial_sum(UnscaledValue(cs_net_profit#7))]
 Aggregate Attributes [2]: [sum(UnscaledValue(cs_ext_ship_cost#6))#20, sum(UnscaledValue(cs_net_profit#7))#21]
 Results [3]: [cs_order_number#5, sum#22, sum#23]
 
-(42) HashAggregate [codegen id : 8]
+(42) HashAggregate [codegen id : 11]
 Input [3]: [cs_order_number#5, sum#22, sum#23]
 Keys [1]: [cs_order_number#5]
 Functions [2]: [merge_sum(UnscaledValue(cs_ext_ship_cost#6)), merge_sum(UnscaledValue(cs_net_profit#7))]
 Aggregate Attributes [2]: [sum(UnscaledValue(cs_ext_ship_cost#6))#20, sum(UnscaledValue(cs_net_profit#7))#21]
 Results [3]: [cs_order_number#5, sum#22, sum#23]
 
-(43) HashAggregate [codegen id : 8]
+(43) HashAggregate [codegen id : 11]
 Input [3]: [cs_order_number#5, sum#22, sum#23]
 Keys: []
 Functions [3]: [merge_sum(UnscaledValue(cs_ext_ship_cost#6)), merge_sum(UnscaledValue(cs_net_profit#7)), partial_count(distinct cs_order_number#5)]
@@ -251,7 +251,7 @@ Results [3]: [sum#22, sum#23, count#25]
 Input [3]: [sum#22, sum#23, count#25]
 Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=7]
 
-(45) HashAggregate [codegen id : 9]
+(45) HashAggregate [codegen id : 12]
 Input [3]: [sum#22, sum#23, count#25]
 Keys: []
 Functions [3]: [sum(UnscaledValue(cs_ext_ship_cost#6)), sum(UnscaledValue(cs_net_profit#7)), count(distinct cs_order_number#5)]
@@ -1,8 +1,8 @@
-WholeStageCodegen (9)
+WholeStageCodegen (12)
 HashAggregate [sum,sum,count] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),count(cs_order_number),order count ,total shipping cost ,total net profit ,sum,sum,count]
 InputAdapter
 Exchange #1
-WholeStageCodegen (8)
+WholeStageCodegen (11)
 HashAggregate [cs_order_number] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),count(cs_order_number),sum,sum,count,sum,sum,count]
 HashAggregate [cs_order_number] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),sum,sum,sum,sum]
 HashAggregate [cs_order_number,cs_ext_ship_cost,cs_net_profit] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),sum,sum,sum,sum]
@@ -14,53 +14,59 @@ WholeStageCodegen (9)
 BroadcastHashJoin [cs_ship_date_sk,d_date_sk]
 SortMergeJoin [cs_order_number,cr_order_number]
 InputAdapter
-WholeStageCodegen (3)
+WholeStageCodegen (5)
 Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
 SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk]
 InputAdapter
-WholeStageCodegen (1)
-ColumnarToRow
+WholeStageCodegen (2)
+Sort [cs_order_number]
 InputAdapter
-CometSort [cs_order_number]
-CometExchange [cs_order_number] #2
-CometProject [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_warehouse_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
-CometFilter [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk]
-CometScan parquet spark_catalog.default.catalog_sales [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_warehouse_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit,cs_sold_date_sk]
+Exchange [cs_order_number] #2
+WholeStageCodegen (1)
+ColumnarToRow
+InputAdapter
+CometProject [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_warehouse_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
+CometFilter [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk]
+CometScan parquet spark_catalog.default.catalog_sales [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_warehouse_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit,cs_sold_date_sk]
 InputAdapter
-WholeStageCodegen (2)
-ColumnarToRow
+WholeStageCodegen (4)
+Sort [cs_order_number]
 InputAdapter
-CometSort [cs_order_number]
-CometExchange [cs_order_number] #3
-CometProject [cs_warehouse_sk,cs_order_number]
-CometScan parquet spark_catalog.default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk]
+Exchange [cs_order_number] #3
+WholeStageCodegen (3)
+ColumnarToRow
+InputAdapter
+CometProject [cs_warehouse_sk,cs_order_number]
+CometScan parquet spark_catalog.default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk]
 InputAdapter
-WholeStageCodegen (4)
-ColumnarToRow
+WholeStageCodegen (7)
+Sort [cr_order_number]
 InputAdapter
-CometSort [cr_order_number]
-CometExchange [cr_order_number] #4
-CometProject [cr_order_number]
-CometScan parquet spark_catalog.default.catalog_returns [cr_order_number,cr_returned_date_sk]
+Exchange [cr_order_number] #4
+WholeStageCodegen (6)
+ColumnarToRow
+InputAdapter
+CometProject [cr_order_number]
+CometScan parquet spark_catalog.default.catalog_returns [cr_order_number,cr_returned_date_sk]
 InputAdapter
 BroadcastExchange #5
-WholeStageCodegen (5)
+WholeStageCodegen (8)
 ColumnarToRow
 InputAdapter
 CometProject [d_date_sk]
 CometFilter [d_date,d_date_sk]
 CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_date]
 InputAdapter
 BroadcastExchange #6
-WholeStageCodegen (6)
+WholeStageCodegen (9)
 ColumnarToRow
 InputAdapter
 CometProject [ca_address_sk]
 CometFilter [ca_state,ca_address_sk]
 CometScan parquet spark_catalog.default.customer_address [ca_address_sk,ca_state]
 InputAdapter
 BroadcastExchange #7
-WholeStageCodegen (7)
+WholeStageCodegen (10)
 ColumnarToRow
 InputAdapter
 CometProject [cc_call_center_sk]