Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [connector-tdengine] fix sql exception and concurrentmodifyexception when connect to taos and read data #6088

Merged
merged 7 commits into from
Aug 8, 2024

Conversation

alextinng
Copy link
Contributor

Purpose of this pull request

close #6032
close #5998

Does this PR introduce any user-facing change?

no

How was this patch tested?

test with seatunnel-engine-example, belowing is application log:

2023-12-26 16:12:21,967 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - checkpoint is disabled, because in batch mode and 'checkpoint.interval' of env is missing.
2023-12-26 16:12:22,001 INFO  org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask - received enough reader, starting enumerator...
2023-12-26 16:12:22,011 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplitEnumerator - Assign pendingSplits to readers [0]
2023-12-26 16:12:22,012 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplitEnumerator - Assign splits [org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit@6ace399c, org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit@432a36dc, org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit@7669b2b7, org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit@7599e205] to reader 0
2023-12-26 16:12:22,020 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplitEnumerator - No more splits to assign. Sending NoMoreSplitsEvent to reader [0].
2023-12-26 16:12:22,021 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - no more split accepted!
2023-12-26 16:12:22,061 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,062 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - starting run new split d_1, query sql: select `ts`,`s_1`,`s_2`,`city_code` from test.d_1!
2023-12-26 16:12:22,169 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,169 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - starting run new split d_2, query sql: select `ts`,`s_1`,`s_2`,`city_code` from test.d_2!
2023-12-26 16:12:22,171 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_1, 2023-12-26T13:43:54.609, 1.0, 2.0, HF
2023-12-26 16:12:22,171 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_1, 2023-12-26T13:44:05.303, 1.1, 2.0, HF
2023-12-26 16:12:22,171 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_1, 2023-12-26T13:44:09.812, 1.2, 2.1, HF
2023-12-26 16:12:22,181 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=4:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_2, 2023-12-26T13:44:24.184, 1.1, 2.2, HF
2023-12-26 16:12:22,181 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=5:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_2, 2023-12-26T13:44:27.687, 1.2, 2.2, HF
2023-12-26 16:12:22,181 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=6:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_2, 2023-12-26T13:44:32.231, 1.3, 2.3, HF
2023-12-26 16:12:22,184 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,184 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - starting run new split d_4, query sql: select `ts`,`s_1`,`s_2`,`city_code` from test.d_4!
2023-12-26 16:12:22,193 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=7:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_4, 2023-12-26T13:45:21.366, 1.1, 2.1, HF
2023-12-26 16:12:22,193 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=8:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_4, 2023-12-26T13:45:24.381, 1.2, 2.1, HF
2023-12-26 16:12:22,193 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=9:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_4, 2023-12-26T13:45:26.941, 1.3, 2.1, HF
2023-12-26 16:12:22,193 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=10:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_4, 2023-12-26T13:45:30.452, 1.4, 2.2, HF
2023-12-26 16:12:22,197 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,197 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - starting run new split d_3, query sql: select `ts`,`s_1`,`s_2`,`city_code` from test.d_3!
2023-12-26 16:12:22,222 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=11:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_3, 2023-12-26T13:44:43.803, 1.0, 2.0, HF
2023-12-26 16:12:22,223 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=12:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_3, 2023-12-26T13:44:49.397, 1.1, 2.0, HF
2023-12-26 16:12:22,223 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=13:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : d_3, 2023-12-26T13:44:52.932, 1.2, 2.1, HF
2023-12-26 16:12:22,237 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - polling new split from queue!
2023-12-26 16:12:22,237 INFO  org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceReader - Closed the bounded jdbc source
2023-12-26 16:12:22,361 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 9223372036854775807
2023-12-26 16:12:23,021 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(9223372036854775807/1@791940519440678913) notify finished!
2023-12-26 16:12:23,021 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, job id: 791940519440678913, pipeline id: 1, checkpoint id:9223372036854775807
2023-12-26 16:12:23,043 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start clean pending checkpoint cause CheckpointCoordinator completed.
2023-12-26 16:12:23,054 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - Turn checkpoint_state_791940519440678913_1 state from null to FINISHED
2023-12-26 16:12:23,126 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}
2023-12-26 16:12:23,149 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}
2023-12-26 16:12:23,150 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] Task TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1} complete with state FINISHED
2023-12-26 16:12:23,150 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-670882] [5.1] Received task end from execution TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}, state FINISHED
2023-12-26 16:12:23,155 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}] turned from state RUNNING to FINISHED.
2023-12-26 16:12:23,156 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}] state process is stopped
2023-12-26 16:12:23,157 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SplitEnumerator (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=1}] future complete with state FINISHED
2023-12-26 16:12:23,157 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}
2023-12-26 16:12:23,158 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-670882] [5.1] Task TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000} complete with state FINISHED
2023-12-26 16:12:23,158 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-670882] [5.1] Received task end from execution TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}, state FINISHED
2023-12-26 16:12:23,162 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}] turned from state RUNNING to FINISHED.
2023-12-26 16:12:23,163 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}] state process is stopped
2023-12-26 16:12:23,163 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-TDengine]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=791940519440678913, pipelineId=1, taskGroupId=30000}] future complete with state FINISHED
2023-12-26 16:12:23,163 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] will end with state FINISHED
2023-12-26 16:12:23,163 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] turned from state RUNNING to FINISHED.
2023-12-26 16:12:23,207 INFO  org.apache.seatunnel.engine.server.master.JobMaster - release the pipeline Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] resource
2023-12-26 16:12:23,215 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 791940519440678913, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=791940519440678913, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='19c5a1c0-2b4e-4dea-968a-cc15ff536616'}
2023-12-26 16:12:23,215 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 791940519440678913, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=791940519440678913, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='19c5a1c0-2b4e-4dea-968a-cc15ff536616'}
2023-12-26 16:12:23,215 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] state process is stop
2023-12-26 16:12:23,216 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job fake_to_console.conf (791940519440678913), Pipeline: [(1/1)] future complete with state FINISHED
2023-12-26 16:12:23,216 DEBUG org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Try to update the Job fake_to_console.conf (791940519440678913) state from RUNNING to FINISHED
2023-12-26 16:12:23,216 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job fake_to_console.conf (791940519440678913) turned from state RUNNING to FINISHED.
2023-12-26 16:12:23,217 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job fake_to_console.conf (791940519440678913) state process is stop
2023-12-26 16:12:23,251 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (791940519440678913) end with state FINISHED
2023-12-26 16:12:23,401 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - 
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2023-12-26 16:08:53
End Time                  : 2023-12-26 16:12:23
Total Time(s)             :                 209
Total Read Count          :                  13
Total Write Count         :                  13
Total Failed Count        :                   0
***********************************************

Check list

// signal to the source that we have reached the end of the data.
log.info("Closed the bounded TDengine source");
context.signalNoMoreElement();
log.info("polling new split from queue!");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL @hailin0

@alextinng
Copy link
Contributor Author

e2e test failed

image

@Hisoka-X
Copy link
Member

Hisoka-X commented Jan 3, 2024

please try merge from dev.

@alextinng
Copy link
Contributor Author

alextinng commented Jan 3, 2024

@Hisoka-X checked out dev and still got same error

image

@Hisoka-X
Copy link
Member

Hisoka-X commented Jan 3, 2024

Are you try rebuild source code? Because before run e2e, seatunnel need build to refresh jar.

@alextinng
Copy link
Contributor Author

alextinng commented Jan 3, 2024

@Hisoka-X I rebuild project and still got same error

command: mvn package -pl seatunnel-dist -am -Dmaven.test.skip=true

image

@hailin0
Copy link
Member

hailin0 commented Jan 13, 2024

Your commit list needs to be cleaned up

@hailin0 hailin0 added the no update The owner doesn't provide further feedback. label Jan 13, 2024
@alextinng alextinng force-pushed the fix/connector-tdengine_source_errors branch 2 times, most recently from 6c96d24 to cfe371a Compare January 22, 2024 02:41
@alextinng
Copy link
Contributor Author

@hailin0 PTAL

@liugddx
Copy link
Member

liugddx commented Jan 22, 2024

Can you provide some test cases?

@alextinng
Copy link
Contributor Author

Can you provide some test cases?

@liugddx done

}
} else if (noMoreSplit && sourceSplits.isEmpty()) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded jdbc source");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is more appropriate to replace jdbc source with TDengine source.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand what you mean~

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the log print content may need to be adjusted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the log print content may need to be adjusted.

hah~ get it!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alextinng alextinng force-pushed the fix/connector-tdengine_source_errors branch from 3a92d00 to a7eb663 Compare February 20, 2024 06:31
@Hisoka-X
Copy link
Member

Please fix ci.

@alextinng alextinng force-pushed the fix/connector-tdengine_source_errors branch from eeeab0a to e5feb0a Compare April 28, 2024 01:22
DESKTOP-GHPCOV0\dingaolong and others added 6 commits July 26, 2024 16:20
1. fix no suitable driver found exception while connecting to taos by jdbc
2. print cause when throw TDengineConnectorException
3. fix concurrentmodifyexception when operating splits
test poll and add split in reader
@alextinng alextinng force-pushed the fix/connector-tdengine_source_errors branch from e5feb0a to ed27f84 Compare July 26, 2024 08:27
liugddx
liugddx previously approved these changes Jul 30, 2024
Comment on lines +97 to +98
properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername());
properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can other jdbc parameters be added here?

@Hisoka-X Hisoka-X requested review from hailin0 and removed request for happyboy1024 August 7, 2024 01:32
StringUtils.join(
config.getUrl(),
config.getDatabase(),
"?user=",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why update this config?

Copy link
Contributor Author

@alextinng alextinng Aug 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why update this config?

It's not safe to put username and password in jdbc url, it's better to put password in properties and pass properties to jdbc driver in production

There's a risk that the JDBC URL, including the password, could be logged inadvertently, especially if the logging level is not properly configured to exclude sensitive information.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alextinng here

Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did you fix this?

No suitable driver found for jdbc:TAOS-RS://localhost:6041/test?user=root&password=taosdata

@alextinng
Copy link
Contributor Author

How did you fix this?

No suitable driver found for jdbc:TAOS-RS://localhost:6041/test?user=root&password=taosdata

already fixed in previous commit by others, see line: 134-135
// check td driver whether exist and if not, try to register
checkDriverExist(jdbcUrl);

Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Hisoka-X Hisoka-X merged commit a18fca8 into apache:dev Aug 8, 2024
9 checks passed
@alextinng alextinng deleted the fix/connector-tdengine_source_errors branch August 8, 2024 04:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants