[HUDI-7476] Incremental loading for archived timeline (apache#10807)
danny0405 authored Mar 8, 2024
1 parent f308aa2 commit 58bc859
Showing 4 changed files with 208 additions and 12 deletions.
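
At a glance: the archived (LSM) timeline now remembers the earliest instant it has loaded as a cursor, and a later reload with an older start timestamp reads only the missing range from disk and merges it with the instants already in memory. A minimal usage sketch (hypothetical timestamps, mirroring the new test below):

    // first load: instants at or after "10000043" are read from the archive
    HoodieArchivedTimeline archived = metaClient.getArchivedTimeline("10000043");

    // incremental reload: only the missing window ["10000011", "10000043")
    // is read from the LSM timeline files and appended in memory
    archived = archived.reload("10000011");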
org/apache/hudi/client/timeline/LSMTimelineWriter.java

@@ -20,6 +20,7 @@
 package org.apache.hudi.client.timeline;

 import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -72,21 +73,29 @@ public class LSMTimelineWriter {
   public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;

   private final HoodieWriteConfig config;
-  private final HoodieTable<?, ?, ?, ?> table;
+  private final TaskContextSupplier taskContextSupplier;
   private final HoodieTableMetaClient metaClient;

   private HoodieWriteConfig writeConfig;

   private LSMTimelineWriter(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> table) {
+    this(config, table.getTaskContextSupplier(), table.getMetaClient());
+  }
+
+  private LSMTimelineWriter(HoodieWriteConfig config, TaskContextSupplier taskContextSupplier, HoodieTableMetaClient metaClient) {
     this.config = config;
-    this.table = table;
-    this.metaClient = table.getMetaClient();
+    this.taskContextSupplier = taskContextSupplier;
+    this.metaClient = metaClient;
   }

   public static LSMTimelineWriter getInstance(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> table) {
     return new LSMTimelineWriter(config, table);
   }

+  public static LSMTimelineWriter getInstance(HoodieWriteConfig config, TaskContextSupplier taskContextSupplier, HoodieTableMetaClient metaClient) {
+    return new LSMTimelineWriter(config, taskContextSupplier, metaClient);
+  }
+
   /**
    * Writes the list of active actions into the timeline.
    *
@@ -366,7 +375,7 @@ private HoodieWriteConfig getOrCreateWriterConfig() {
   private HoodieFileWriter openWriter(Path filePath) {
     try {
       return HoodieFileWriterFactory.getFileWriter("", filePath, metaClient.getHadoopConf(), getOrCreateWriterConfig(),
-          HoodieLSMTimelineInstant.getClassSchema(), table.getTaskContextSupplier(), HoodieRecord.HoodieRecordType.AVRO);
+          HoodieLSMTimelineInstant.getClassSchema(), taskContextSupplier, HoodieRecord.HoodieRecordType.AVRO);
     } catch (IOException e) {
       throw new HoodieException("Unable to initialize archiving writer", e);
     }
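
The effect of the refactor above is that the writer no longer needs a full HoodieTable, only the two collaborators it actually uses, so non-engine callers (such as the new test below) can construct it directly. A sketch of the new factory overload, assuming a write config and meta client are at hand:

    // construct the writer without a HoodieTable, via the new overload
    LSMTimelineWriter writer = LSMTimelineWriter.getInstance(
        writeConfig,                    // HoodieWriteConfig
        new LocalTaskContextSupplier(), // engine-agnostic task context
        metaClient);                    // HoodieTableMetaClient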
org/apache/hudi/common/table/timeline/TestHoodieArchivedTimeline.java (new file)

@@ -0,0 +1,105 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hudi.common.table.timeline;

import org.apache.hudi.DummyActiveAction;
import org.apache.hudi.client.timeline.LSMTimelineWriter;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.engine.LocalTaskContextSupplier;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

/**
 * Test cases for {@link HoodieArchivedTimeline}.
 */
public class TestHoodieArchivedTimeline extends HoodieCommonTestHarness {

  @BeforeEach
  public void setUp() throws Exception {
    initMetaClient();
  }

  @AfterEach
  public void tearDown() throws Exception {
    cleanMetaClient();
  }

  @Test
  public void testLoadingInstantsIncrementally() throws Exception {
    writeArchivedTimeline(10, 10000000);
    // now we have 50 instants spread across 5 parquet files.
    HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline("10000043");
    assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""), is("10000043"));
    assertThat(archivedTimeline.lastInstant().map(HoodieInstant::getTimestamp).orElse(""), is("10000050"));
    // load incrementally
    archivedTimeline.reload("10000034");
    assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""), is("10000034"));
    archivedTimeline.reload("10000011");
    assertThat(archivedTimeline.firstInstant().map(HoodieInstant::getTimestamp).orElse(""), is("10000011"));
  }

  // -------------------------------------------------------------------------
  //  Utilities
  // -------------------------------------------------------------------------

  private void writeArchivedTimeline(int batchSize, long startTs) throws Exception {
    HoodieTestTable testTable = HoodieTestTable.of(this.metaClient);
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(this.metaClient.getBasePathV2().toString())
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
        .withMarkersType("DIRECT")
        .build();
    HoodieEngineContext engineContext = new HoodieLocalEngineContext(new Configuration());
    LSMTimelineWriter writer = LSMTimelineWriter.getInstance(writeConfig, new LocalTaskContextSupplier(), metaClient);
    List<ActiveAction> instantBuffer = new ArrayList<>();
    for (int i = 1; i <= 50; i++) {
      long instantTimeTs = startTs + i;
      String instantTime = String.valueOf(instantTimeTs);
      String completionTime = String.valueOf(instantTimeTs + 10);
      HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", instantTime, completionTime);
      HoodieCommitMetadata metadata = testTable.createCommitMetadata(instantTime, WriteOperationType.INSERT, Arrays.asList("par1", "par2"), 10, false);
      byte[] serializedMetadata = TimelineMetadataUtils.serializeCommitMetadata(metadata).get();
      instantBuffer.add(new DummyActiveAction(instant, serializedMetadata));
      if (i % batchSize == 0) {
        // archive 10 instants each time
        writer.write(instantBuffer, org.apache.hudi.common.util.Option.empty(), org.apache.hudi.common.util.Option.empty());
        writer.compactAndClean(engineContext);
        instantBuffer.clear();
      }
    }
  }
}
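
For reference, the numbers the assertions rely on, derived from the test parameters (the five-file count assumes each write/compactAndClean cycle leaves roughly one parquet file, per the in-test comment):

    // writeArchivedTimeline(10, 10000000) produces:
    //   instant times   : "10000001" .. "10000050"   (50 instants)
    //   completion times: instant time + 10
    //   batches         : 50 / 10 = 5 write + compactAndClean cycles
    // so getArchivedTimeline("10000043") initially sees ["10000043", "10000050"].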
org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java

@@ -72,6 +72,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {

   private static final Logger LOG = LoggerFactory.getLogger(HoodieArchivedTimeline.class);

+  /**
+   * Used for loading the archived timeline incrementally: the earliest loaded instant time is memorized
+   * each time the timeline is loaded, and that instant time is then used as the end boundary
+   * of the next load.
+   */
+  private String cursorInstant;
+
   /**
    * Loads all the archived instants.
    * Note that there is no lazy loading, so this may not work if the archived timeline range is really long.
@@ -80,6 +87,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
     this.metaClient = metaClient;
     setInstants(this.loadInstants());
+    this.cursorInstant = firstInstant().map(HoodieInstant::getTimestamp).orElse(null);
     // multiple casts will make this lambda serializable -
     // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
     this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
@@ -92,6 +100,7 @@ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
   public HoodieArchivedTimeline(HoodieTableMetaClient metaClient, String startTs) {
     this.metaClient = metaClient;
     setInstants(loadInstants(new StartTsFilter(startTs), LoadMode.METADATA));
+    this.cursorInstant = startTs;
     // multiple casts will make this lambda serializable -
     // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
     this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
@@ -152,6 +161,26 @@ public HoodieArchivedTimeline reload() {
     return new HoodieArchivedTimeline(metaClient);
   }

+  /**
+   * Reloads the archived timeline incrementally with the given beginning timestamp {@code startTs}.
+   * This method is not thread safe.
+   *
+   * <p>IMPORTANT: this is intended for repeated loading of one static snapshot of the timeline; if new
+   * instants have been archived since, use {@link #reload()} instead.
+   */
+  public HoodieArchivedTimeline reload(String startTs) {
+    if (this.cursorInstant != null) {
+      if (HoodieTimeline.compareTimestamps(startTs, LESSER_THAN, this.cursorInstant)) {
+        appendInstants(loadInstants(new ClosedOpenTimeRangeFilter(startTs, this.cursorInstant), LoadMode.METADATA));
+        this.cursorInstant = startTs;
+      }
+      return this;
+    } else {
+      // a null cursor instant indicates an empty timeline
+      return new HoodieArchivedTimeline(metaClient, startTs);
+    }
+  }
+
   private HoodieInstant readCommit(String instantTime, GenericRecord record, Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer) {
     final String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
     final String completionTime = record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
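
The cursor bookkeeping above is easiest to follow with concrete timestamps (hypothetical, matching the flow the test exercises):

    HoodieArchivedTimeline t = metaClient.getArchivedTimeline("10000043");
    // loaded: instants >= "10000043"; cursorInstant = "10000043"

    t.reload("10000034");
    // ClosedOpenTimeRangeFilter("10000034", "10000043") appends exactly
    // ["10000034", "10000043"); the cursor moves back to "10000034"

    t.reload("10000045");
    // startTs is not earlier than the cursor, so nothing is read;
    // the timeline is returned unchanged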
org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java

@@ -25,6 +25,7 @@
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;

 import org.slf4j.Logger;
@@ -35,6 +36,7 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
@@ -75,15 +77,23 @@ public HoodieDefaultTimeline(Stream<HoodieInstant> instants, Function<HoodieInstant, Option<byte[]>> details) {

   public void setInstants(List<HoodieInstant> instants) {
     this.instants = instants;
-    final MessageDigest md;
-    try {
-      md = MessageDigest.getInstance(HASHING_ALGORITHM);
-      this.instants.forEach(i -> md
-          .update(getUTF8Bytes(StringUtils.joinUsingDelim("_", i.getTimestamp(), i.getAction(), i.getState().name()))));
-    } catch (NoSuchAlgorithmException nse) {
-      throw new HoodieException(nse);
-    }
-    this.timelineHash = StringUtils.toHexString(md.digest());
+    this.timelineHash = computeTimelineHash(this.instants);
+    clearState();
+  }
+
+  public void appendInstants(List<HoodieInstant> newInstants) {
+    if (newInstants.isEmpty()) {
+      // the new instants are empty, nothing to do.
+      return;
+    }
+    if (this.instants.isEmpty()) {
+      // the existing instants are empty, set up the new ones directly.
+      setInstants(newInstants);
+      return;
+    }
+    this.instants = mergeInstants(newInstants, this.instants);
+    this.timelineHash = computeTimelineHash(this.instants);
+    clearState();
   }

/**
@@ -567,6 +577,11 @@ private static Option<HoodieInstant> findFirstNonSavepointCommit(List<HoodieInst
     return Option.fromJavaOptional(instants.stream().findFirst());
   }

+  private void clearState() {
+    instantTimeSet = null;
+    firstNonSavepointCommit = null;
+  }
+
   /**
    * Merge this timeline with the given timeline.
    */
@@ -581,4 +596,42 @@ public HoodieDefaultTimeline mergeTimeline(HoodieDefaultTimeline timeline) {
     };
     return new HoodieDefaultTimeline(instantStream, details);
   }
+
+  /**
+   * Computes and returns the timeline hash.
+   */
+  private String computeTimelineHash(List<HoodieInstant> instants) {
+    final MessageDigest md;
+    try {
+      md = MessageDigest.getInstance(HASHING_ALGORITHM);
+      instants.forEach(i -> md
+          .update(getUTF8Bytes(StringUtils.joinUsingDelim("_", i.getTimestamp(), i.getAction(), i.getState().name()))));
+    } catch (NoSuchAlgorithmException nse) {
+      throw new HoodieException(nse);
+    }
+    return StringUtils.toHexString(md.digest());
+  }
+
+  /**
+   * Merges the given instant lists into one and keeps the ordering.
+   */
+  private static List<HoodieInstant> mergeInstants(List<HoodieInstant> instants1, List<HoodieInstant> instants2) {
+    ValidationUtils.checkArgument(!instants1.isEmpty() && !instants2.isEmpty(), "The instants to merge can not be empty");
+    // some optimizations are based on the assumption that the instant lists are already sorted;
+    // the explicit sort can be skipped when one list's instants all precede the other's.
+    final List<HoodieInstant> merged;
+    if (HoodieTimeline.compareTimestamps(instants1.get(instants1.size() - 1).getTimestamp(), LESSER_THAN_OR_EQUALS, instants2.get(0).getTimestamp())) {
+      merged = new ArrayList<>(instants1);
+      merged.addAll(instants2);
+    } else if (HoodieTimeline.compareTimestamps(instants2.get(instants2.size() - 1).getTimestamp(), LESSER_THAN_OR_EQUALS, instants1.get(0).getTimestamp())) {
+      merged = new ArrayList<>(instants2);
+      merged.addAll(instants1);
+    } else {
+      merged = new ArrayList<>(instants1);
+      merged.addAll(instants2);
+      // sort the instants explicitly
+      Collections.sort(merged);
+    }
+    return merged;
+  }
 }
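
The two fast paths in mergeInstants are what incremental reload relies on: it always prepends a strictly older window to an already-sorted timeline, so the lists concatenate without any sort. The same idea with plain longs, as a generic illustration only (assuming java.util imports; not Hudi API):

    static List<Long> merge(List<Long> a, List<Long> b) {
      List<Long> merged = new ArrayList<>(a.size() + b.size());
      if (a.get(a.size() - 1) <= b.get(0)) {        // a entirely precedes b
        merged.addAll(a);
        merged.addAll(b);
      } else if (b.get(b.size() - 1) <= a.get(0)) { // b entirely precedes a
        merged.addAll(b);
        merged.addAll(a);
      } else {                                      // overlap: sort explicitly
        merged.addAll(a);
        merged.addAll(b);
        Collections.sort(merged);
      }
      return merged;
    }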
