Upgrade to Flink 1.15 & FLIP-27 Source APIs #37

Merged
merged 12 commits into from
Apr 28, 2022

Owner

@knaufk knaufk commented Mar 22, 2022

No description provided.

Contributor

@MartijnVisser MartijnVisser left a comment

I've checked out this PR and built it locally, together with a locally built Flink 1.15. I've tried some of the SQL recipes, but I'm getting some weird results on timestamps.

CREATE TABLE orders ( 
    bidtime TIMESTAMP(3),
    price DOUBLE, 
    item STRING,
    supplier STRING,
    WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECONDS
) WITH (
  'connector' = 'faker',
  'fields.bidtime.expression' = '#{date.past ''30'',''SECONDS''}',
  'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
  'fields.item.expression' = '#{Commerce.productName}',
  'fields.supplier.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
  'rows-per-second' = '100'
);

I'm getting the same result for every row:

[screenshot of the query output]

One more question: since this version won't be compatible with Flink 1.14 and lower, will you release this with a major/minor version bump?

@knaufk
Owner Author

knaufk commented Mar 24, 2022

@MartijnVisser Thank you for testing.

  • I will add a compatibility matrix to the README and I am planning a minor version bump.

  • I can reproduce the issue with the repeated output when using a parallelism of 1. Can you confirm that your job was running with a parallelism of 1? I think this is actually unrelated to this change and instead related to the migration to Datafaker: when I replace Datafaker with java-faker again, the issue disappears in tests. @bodiam, do you have an idea what could be causing this?

@MartijnVisser
Contributor

@knaufk Yes, I was indeed testing with a parallelism of 1. It also occurs when the parallelism is set to 2 (and probably higher).

@bodiam

bodiam commented Mar 24, 2022

@knaufk Yes, I'm very sorry about this, but it seems a regression slipped in due to some aggressive caching. Going back to version 1.1.0 should fix this, and it's also fixed in 1.3.0, but that one hasn't been released yet (it's planned to be released in 7 days). My apologies for this!

@snuyanzin
Contributor

snuyanzin commented Mar 24, 2022

Just for the record, the problem with the same data for every row was fixed in datafaker-net/datafaker#88

@knaufk
Owner Author

knaufk commented Mar 24, 2022

Thanks everyone. I've added a test to guard against this kind of regression in the future and downgraded to 1.1.0 for now.
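
For readers following along, a regression guard of this kind can be sketched roughly as follows. This is not the test that was added in the PR; it is a minimal, dependency-free illustration. The class RepeatedRowGuard and the helpers freshGenerator and cachingGenerator are hypothetical stand-ins: the first mimics a generator that evaluates its expression for every row, the second mimics the regression by replaying one cached value. The guard simply draws a batch of rows and checks that more than one distinct value comes back.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch, not code from flink-faker or Datafaker.
final class RepeatedRowGuard {

    /** Draws n rows from the generator and returns how many distinct rows were seen. */
    static <T> int distinctCount(Supplier<T> rowGenerator, int n) {
        Set<T> seen = new HashSet<>();
        for (int i = 0; i < n; i++) {
            seen.add(rowGenerator.get());
        }
        return seen.size();
    }

    /** Stand-in for a healthy generator: a new random value on every call. */
    static Supplier<Long> freshGenerator(long seed) {
        Random random = new Random(seed);
        return random::nextLong;
    }

    /** Stand-in for the regression: the first value is computed once and replayed forever. */
    static <T> Supplier<T> cachingGenerator(Supplier<T> delegate) {
        T cached = delegate.get();
        return () -> cached;
    }

    public static void main(String[] args) {
        int rows = 100;
        int fresh = distinctCount(freshGenerator(42L), rows);
        int cached = distinctCount(cachingGenerator(freshGenerator(42L)), rows);
        System.out.println("fresh generator:   " + fresh + " distinct rows");
        System.out.println("caching generator: " + cached + " distinct rows");
        // A guard in a real test would fail the build when only one distinct row shows up.
        if (fresh <= 1) {
            throw new AssertionError("generator returned the same row every time");
        }
    }
}
```

In an actual integration test the supplier would be replaced by rows collected from the faker table source; the distinct-count check itself stays the same.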

@knaufk knaufk force-pushed the flink115 branch 2 times, most recently from d5d760b to 9e37222 March 24, 2022 16:25
@MartijnVisser
Contributor

I'm having another look. I do find it weird that after pulling your latest commits, I'm now getting a Flink-related error. That's probably something for the Flink community to look into, because I wouldn't expect such errors to appear when running mvn clean package on an external connector:

/Users/martijnvisser/.sdkman/candidates/java/8.0.292.hs-adpt/bin/java -Dmaven.multiModuleProjectDirectory=/Users/martijnvisser/Developer/flink-faker -Dmaven.home=/Users/martijnvisser/Library/Application Support/JetBrains/Toolbox/apps/IDEA-C/ch-0/213.6777.52/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven3 -Dclassworlds.conf=/Users/martijnvisser/Library/Application Support/JetBrains/Toolbox/apps/IDEA-C/ch-0/213.6777.52/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven3/bin/m2.conf -Dmaven.ext.class.path=/Users/martijnvisser/Library/Application Support/JetBrains/Toolbox/apps/IDEA-C/ch-0/213.6777.52/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven-event-listener.jar -javaagent:/Users/martijnvisser/Library/Application Support/JetBrains/Toolbox/apps/IDEA-C/ch-0/213.6777.52/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=53680:/Users/martijnvisser/Library/Application Support/JetBrains/Toolbox/apps/IDEA-C/ch-0/213.6777.52/IntelliJ IDEA CE.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/martijnvisser/Library/Application Support/JetBrains/Toolbox/apps/IDEA-C/ch-0/213.6777.52/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven3/boot/plexus-classworlds.license:/Users/martijnvisser/Library/Application Support/JetBrains/Toolbox/apps/IDEA-C/ch-0/213.6777.52/IntelliJ IDEA CE.app/Contents/plugins/maven/lib/maven3/boot/plexus-classworlds-2.6.0.jar org.codehaus.classworlds.Launcher -Didea.version=2021.3.2 clean package
[INFO] Scanning for projects...
[INFO] 
[INFO] -------------------< com.github.knaufk:flink-faker >--------------------
[INFO] Building flink-faker 0.4.1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ flink-faker ---
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ flink-faker ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.10.1:compile (default-compile) @ flink-faker ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 7 source files to /Users/martijnvisser/Developer/flink-faker/target/classes
[ERROR] error reading /Users/martijnvisser/.m2/repository/org/apache/flink/flink-rpc-akka-loader/1.15.0/flink-rpc-akka-loader-1.15.0.jar; zip file is empty
[ERROR] error reading /Users/martijnvisser/.m2/repository/org/apache/flink/flink-rpc-akka-loader/1.15.0/flink-rpc-akka-loader-1.15.0.jar; zip file is empty
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ flink-faker ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.10.1:testCompile (default-testCompile) @ flink-faker ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 5 source files to /Users/martijnvisser/Developer/flink-faker/target/test-classes
[ERROR] error reading /Users/martijnvisser/.m2/repository/org/apache/flink/flink-rpc-akka-loader/1.15.0/flink-rpc-akka-loader-1.15.0.jar; zip file is empty
[ERROR] error reading /Users/martijnvisser/.m2/repository/org/apache/flink/flink-rpc-akka-loader/1.15.0/flink-rpc-akka-loader-1.15.0.jar; zip file is empty
[INFO] /Users/martijnvisser/Developer/flink-faker/src/test/java/com/github/knaufk/flink/faker/FlinkFakerTableSourceFactoryTest.java: /Users/martijnvisser/Developer/flink-faker/src/test/java/com/github/knaufk/flink/faker/FlinkFakerTableSourceFactoryTest.java uses unchecked or unsafe operations.
[INFO] /Users/martijnvisser/Developer/flink-faker/src/test/java/com/github/knaufk/flink/faker/FlinkFakerTableSourceFactoryTest.java: Recompile with -Xlint:unchecked for details.
[INFO] 
[INFO] --- maven-surefire-plugin:3.0.0-M5:test (default-test) @ flink-faker ---
[INFO] 
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.github.knaufk.flink.faker.FlinkFakerTableSourceFactoryTest
[INFO] Tests run: 8, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.218 s - in com.github.knaufk.flink.faker.FlinkFakerTableSourceFactoryTest
[INFO] Running com.github.knaufk.flink.faker.FlinkFakerIntegrationTest
[ERROR] Tests run: 5, Failures: 0, Errors: 5, Skipped: 0, Time elapsed: 2.82 s <<< FAILURE! - in com.github.knaufk.flink.faker.FlinkFakerIntegrationTest
[ERROR] com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testWithComputedColumn  Time elapsed: 2.313 s  <<< ERROR!
org.apache.flink.table.api.TableException: Failed to execute sql
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testWithComputedColumn(FlinkFakerIntegrationTest.java:78)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not load RpcSystem.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testWithComputedColumn(FlinkFakerIntegrationTest.java:78)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not initialize RPC system. Run 'mvn clean package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader -DskipTests' on the command-line instead.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testWithComputedColumn(FlinkFakerIntegrationTest.java:78)
Caused by: java.lang.NullPointerException
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testWithComputedColumn(FlinkFakerIntegrationTest.java:78)

[ERROR] com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithComplexTypes  Time elapsed: 0.222 s  <<< ERROR!
org.apache.flink.table.api.TableException: Failed to execute sql
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithComplexTypes(FlinkFakerIntegrationTest.java:154)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not load RpcSystem.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithComplexTypes(FlinkFakerIntegrationTest.java:154)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not initialize RPC system. Run 'mvn clean package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader -DskipTests' on the command-line instead.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithComplexTypes(FlinkFakerIntegrationTest.java:154)
Caused by: java.lang.NullPointerException
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithComplexTypes(FlinkFakerIntegrationTest.java:154)

[ERROR] com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testRandomResultsWithoutParallelism  Time elapsed: 0.064 s  <<< ERROR!
org.apache.flink.table.api.TableException: Failed to execute sql
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testRandomResultsWithoutParallelism(FlinkFakerIntegrationTest.java:42)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not load RpcSystem.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testRandomResultsWithoutParallelism(FlinkFakerIntegrationTest.java:42)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not initialize RPC system. Run 'mvn clean package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader -DskipTests' on the command-line instead.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testRandomResultsWithoutParallelism(FlinkFakerIntegrationTest.java:42)
Caused by: java.lang.NullPointerException
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testRandomResultsWithoutParallelism(FlinkFakerIntegrationTest.java:42)

[ERROR] com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithLimitedNumberOfRows  Time elapsed: 0.055 s  <<< ERROR!
org.apache.flink.table.api.TableException: Failed to execute sql
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithLimitedNumberOfRows(FlinkFakerIntegrationTest.java:114)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not load RpcSystem.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithLimitedNumberOfRows(FlinkFakerIntegrationTest.java:114)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not initialize RPC system. Run 'mvn clean package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader -DskipTests' on the command-line instead.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithLimitedNumberOfRows(FlinkFakerIntegrationTest.java:114)
Caused by: java.lang.NullPointerException
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testFlinkFakerWithLimitedNumberOfRows(FlinkFakerIntegrationTest.java:114)

[ERROR] com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testLimitPushDown  Time elapsed: 0.157 s  <<< ERROR!
org.apache.flink.table.api.TableException: Failed to execute sql
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testLimitPushDown(FlinkFakerIntegrationTest.java:194)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not load RpcSystem.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testLimitPushDown(FlinkFakerIntegrationTest.java:194)
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcLoaderException: Could not initialize RPC system. Run 'mvn clean package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader -DskipTests' on the command-line instead.
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testLimitPushDown(FlinkFakerIntegrationTest.java:194)
Caused by: java.lang.NullPointerException
	at com.github.knaufk.flink.faker.FlinkFakerIntegrationTest.testLimitPushDown(FlinkFakerIntegrationTest.java:194)

[INFO] Running com.github.knaufk.flink.faker.FlinkFakerGeneratorTest
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.043 s - in com.github.knaufk.flink.faker.FlinkFakerGeneratorTest
[INFO] Running com.github.knaufk.flink.faker.FlinkFakerLookupFunctionTest
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.015 s - in com.github.knaufk.flink.faker.FlinkFakerLookupFunctionTest
[INFO] 
[INFO] Results:
[INFO] 
[ERROR] Errors: 
[ERROR]   FlinkFakerIntegrationTest.testFlinkFakerWithComplexTypes:154 » Table Failed to...
[ERROR]   FlinkFakerIntegrationTest.testFlinkFakerWithLimitedNumberOfRows:114 » Table Fa...
[ERROR]   FlinkFakerIntegrationTest.testLimitPushDown:194 » Table Failed to execute sql
[ERROR]   FlinkFakerIntegrationTest.testRandomResultsWithoutParallelism:42 » Table Faile...
[ERROR]   FlinkFakerIntegrationTest.testWithComputedColumn:78 » Table Failed to execute ...
[INFO] 
[ERROR] Tests run: 19, Failures: 0, Errors: 5, Skipped: 0
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  9.051 s
[INFO] Finished at: 2022-03-24T18:38:06+01:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) on project flink-faker: There are test failures.
[ERROR] 
[ERROR] Please refer to /Users/martijnvisser/Developer/flink-faker/target/surefire-reports for the individual test results.
[ERROR] Please refer to dump files (if any exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

Process finished with exit code 1

@knaufk
Owner Author

knaufk commented Mar 24, 2022

@MartijnVisser I suspect this is somehow related to your local setup. Neither the CI nor I can reproduce this locally.

@MartijnVisser
Contributor

MartijnVisser commented Mar 25, 2022

@knaufk Yes, I needed to remove the locally downloaded Maven artifacts.
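
The "zip file is empty" lines in the build log above point at a zero-byte jar in the local Maven repository. As a rough aid for spotting such files before deleting them, here is a minimal sketch that lists every zero-byte .jar under a repository directory. The class name EmptyJarFinder is hypothetical, and the default location ~/.m2/repository is an assumption (a custom localRepository setting would change it). The sketch only reports files and deletes nothing.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of flink-faker, Flink, or Maven.
final class EmptyJarFinder {

    /** Returns every regular *.jar file under root whose size is zero bytes, sorted by path. */
    static List<Path> findEmptyJars(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".jar"))
                    .filter(EmptyJarFinder::isZeroBytes)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    private static boolean isZeroBytes(Path file) {
        try {
            return Files.size(file) == 0L;
        } catch (IOException e) {
            // Unreadable files are skipped rather than reported as empty.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the default local repository location is used unless a path is passed in.
        Path repository = args.length > 0
                ? Paths.get(args[0])
                : Paths.get(System.getProperty("user.home"), ".m2", "repository");
        if (!Files.isDirectory(repository)) {
            System.out.println("No repository directory at " + repository);
            return;
        }
        for (Path jar : findEmptyJars(repository)) {
            System.out.println(jar);
        }
    }
}
```

Once the affected artifact directories are removed, Maven fetches them again on the next build.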

Contributor

@MartijnVisser MartijnVisser left a comment

Thanks for the PR. I've confirmed that it works with Flink 1.15 RC0

@bodiam

bodiam commented Apr 1, 2022

@knaufk @MartijnVisser Datafaker 1.3.0 has been released; this fixes the issue related to the cached data generation.

@knaufk knaufk merged commit fc1691a into master Apr 28, 2022