[HUDI-9147] Support HoodieFileGroupReader for Flink and use FileGroup reader in compaction #13078
Conversation
cc @danny0405 PTAL, thanks.
@Override
public String getRecordKey(RowData record, Schema schema) {
  return Objects.toString(getValue(record, schema, RECORD_KEY_METADATA_FIELD));
is the record key metadata always there in the row data?
This just follows the current FG-reader-based compaction in HoodieCompactor (line 152 in 45dedd8):

&& config.populateMetaFields(); // Virtual key support by fg reader is not ready

i.e., one of the prerequisites for FG-reader-based compaction is that populateMetaFields is enabled.
      (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
  // delete record
  if (recordOption.isEmpty()) {
    return new HoodieEmptyRecord<>(hoodieKey, HoodieRecord.HoodieRecordType.FLINK);
Do we need to set up the ordering value correctly?
Nice catch, will update.
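A rough sketch of the fix being agreed on here, assuming HoodieEmptyRecord exposes an overload that also carries the ordering value; extractOrderingValue is an illustrative stand-in for wherever the value actually comes from:

// delete record: keep the ordering value instead of dropping it, so merging
// against the delete still orders correctly.
if (recordOption.isEmpty()) {
  // Illustrative helper: reads the ordering value out of the metadata map / ordering field.
  Comparable<?> orderingValue = extractOrderingValue(metadataMap);
  return new HoodieEmptyRecord<>(
      hoodieKey,
      null,              // operation
      orderingValue,     // assumed overload that accepts the ordering value
      HoodieRecord.HoodieRecordType.FLINK);
}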
public RowData seal(RowData rowData) {
  if (rowDataSerializer == null) {
    RowType requiredRowType = (RowType) AvroSchemaConverter.convertToDataType(getSchemaHandler().getRequiredSchema()).getLogicalType();
    rowDataSerializer = new RowDataSerializer(requiredRowType);
Do we need to cache the serializer?
The serializer here is not created at record level; it's a member field of FlinkRowDataReaderContext.
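In other words (a sketch restating the reply; the copy in seal is assumed from context): the serializer is a lazily initialized member of the reader context, created at most once rather than per record:

// Member of FlinkRowDataReaderContext: initialized on first use, then reused
// for every record that flows through seal().
private RowDataSerializer rowDataSerializer;

public RowData seal(RowData rowData) {
  if (rowDataSerializer == null) {
    RowType requiredRowType =
        (RowType) AvroSchemaConverter.convertToDataType(getSchemaHandler().getRequiredSchema()).getLogicalType();
    rowDataSerializer = new RowDataSerializer(requiredRowType);
  }
  // Copying detaches the row from any reused backing objects (assumed from context).
  return rowDataSerializer.copy(rowData);
}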
...ient/hudi-flink-client/src/main/java/org/apache/hudi/client/model/AbstractHoodieRowData.java
} | ||
} | ||
|
||
AbstractHoodieRowData rowWithMetaFields = HoodieRowDataCreation.create(metaFields, data, withOperationField, withMetaFields); |
Should we just store StringData for the AbstractHoodieRowData metadata fields?
We can do that for memory efficiency.
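A minimal sketch of what the suggestion could look like; the field names and constructor shape here are illustrative, not the actual AbstractHoodieRowData layout:

// Keep meta fields as Flink's internal StringData so getString(pos) can return
// them directly, avoiding a String -> StringData conversion on every access.
private final StringData[] metaFields;
private final RowData data;

protected AbstractHoodieRowData(String[] metaFieldValues, RowData data) { // illustrative ctor
  this.metaFields = new StringData[metaFieldValues.length];
  for (int i = 0; i < metaFieldValues.length; i++) {
    this.metaFields[i] = StringData.fromString(metaFieldValues[i]);
  }
  this.data = data;
}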
@@ -309,7 +309,7 @@ public Map<String, Object> generateMetadataForRecord(
   * @param schema The Avro schema of the record.
   * @return A mapping containing the metadata.
   */
-  public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
+  public Map<String, Object> generateMetadataForRecord(T record, Schema schema, Option<String> orderingFieldName) {
why this change?
orderingFieldName is added to generate the ordering value in FlinkRowDataReaderContext. By default, generateMetadataForRecord only generates the record key, and the generated metadata map is then used to construct a HoodieFlinkRecord in constructHoodieRecord(Option<RowData> recordOption, Map<String, Object> metadataMap), where the ordering value is necessary.
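A rough sketch of that flow, assuming the INTERNAL_META_* constants from HoodieReaderContext (the exact keys and extraction used by the PR may differ):

// generateMetadataForRecord: besides the record key, also stash the ordering
// value so constructHoodieRecord can build a HoodieFlinkRecord with it.
Map<String, Object> metadataMap = new HashMap<>();
metadataMap.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
orderingFieldName.ifPresent(field ->
    metadataMap.put(INTERNAL_META_ORDERING_FIELD, getValue(record, schema, field))); // key name assumed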
@@ -138,6 +139,15 @@ private FlinkOptions() {
      + "These merger impls will filter by record.merger.strategy. "
      + "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)");

  @AdvancedConfig
  public static final ConfigOption<String> RECORD_MERGE_MODE = ConfigOptions
No need to add the option for Hoodie core options; all the Hoodie options are applied automatically.
Currently, users configure the merging strategy through the option payload.class (the default value is EventTimeAvroPayload, for event-time merging semantics), which is Avro-based. After we introduce FG-reader-based compaction, users should not rely on the legacy Avro-payload config, so the merge mode options should be exposed to let users choose the expected merging behavior. Btw, the compatibility work for the payload config is also included in this PR.
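For illustration, a hedged sketch of what picking the merge behavior through the new option might look like; the value strings follow Hudi's RecordMergeMode naming and are an assumption here:

// Configure the merge behavior explicitly instead of relying on the legacy
// Avro payload.class; EVENT_TIME_ORDERING keeps the event-time semantics that
// EventTimeAvroPayload provided.
Configuration flinkConf = new Configuration();
flinkConf.set(FlinkOptions.RECORD_MERGE_MODE, "EVENT_TIME_ORDERING"); // or COMMIT_TIME_ORDERING / CUSTOM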
...link-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
...datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
    String instantTime,
    Option<EngineBroadcastManager> broadcastManagerOpt) throws IOException {
  Configuration conf = metaClient.getStorage().getConf().unwrapAs(Configuration.class);
  FlinkRowDataReaderContext readerContext = new FlinkRowDataReaderContext(
Looks like there is no need to store the Flink conf in FlinkRowDataReaderContext; the InternalSchemaManager needs the Flink conf for two things:
- deciding whether schema evolution is enabled: we can move this check into HoodieWriteConfig.isSchemaEvolutionEnabled;
- generating the Hadoop conf from it, but the Hadoop conf is already in FlinkRowDataReaderContext.
Besides the usages you mentioned, the Flink conf stored in FlinkRowDataReaderContext is also used to:
- generate partition specs for FlinkParquetReader;
- get the config read.utc-timezone to create the field converter (Flink value -> Avro value) in getOrderingValue (see the sketch below).

Any recommended cleaner way to achieve these?
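For the second item, a hedged sketch of why the Flink conf matters in getOrderingValue; the conversion shown is illustrative, not the PR's actual converter:

// "read.utc-timezone" decides how TimestampData is interpreted when the Flink
// internal value is turned into an Avro-comparable ordering value.
boolean utcTimezone = flinkConf.get(FlinkOptions.UTC_TIMEZONE);
TimestampData ts = rowData.getTimestamp(orderingFieldPos, precision); // position/precision are illustrative
long orderingMillis = utcTimezone
    ? ts.getMillisecond()
    : ts.toLocalDateTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();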
… reader in compaction
fix test update
Change Logs
Impact
Improve performance for Flink compaction.
Risk level (write none, low medium or high below)
medium
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make changes to the website.
Contributor's checklist