Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Connector-V2] Fix paimon sink restore error #8598

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ public synchronized void init(long initializationTimestamp, boolean restart) thr
try {
Thread.currentThread().setContextClassLoader(classLoader);
if (!restart
&& !logicalDag.isStartWithSavePoint()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this?

Copy link
Author

@uniding uniding Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Execute the following command to start from savepoint, which will result in errors. Please refer to the bug for details
    seatunnel.sh -c paimon.config -r 938988912087400449 -n m2p
  2. After analysis and debugging, it was found that starting from savepoint does not execute the JobMaster::handleSaveMode method, so the handleSchemaSaveMode method of the PaimonSaveModeHandler class is not executed,Causing PaimonSink related member variables(paimonTable variable) to not be initialized resulting in a null pointer error when writing paimon
    3.The relevant code call stack is shown in the following figure:
image image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should move setLoadTable method out from handleSchemaSaveMode.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should move setLoadTable method out from handleSchemaSaveMode.

why add !logicalDag.isStartWithSavePoint()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SchemaSaveMode used to create table or truncate table. When job start with savepoint, that's meaning the table already created or truncated. So we can not do it again.

&& ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
.get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
.equals(SaveModeExecuteLocation.CLUSTER)) {
Expand Down