Skip to content

Commit

Permalink
enhancement: delegate connect action to Connector (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjenzhou authored Aug 21, 2021
1 parent 365f73b commit 9eda588
Show file tree
Hide file tree
Showing 22 changed files with 357 additions and 236 deletions.
14 changes: 14 additions & 0 deletions api/src/main/java/de/xab/porter/api/annoation/Inject.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package de.xab.porter.api.annoation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Indicates the method is injectable
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Inject {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* {@link SrcConnection} {@link SinkConnection}
*/
public class DataConnection {
private String connectorType;
private String type;
private String url;
private String username;
Expand All @@ -17,6 +18,7 @@ public DataConnection() {
}

protected DataConnection(AbstractBuilder<?> builder) {
this.connectorType = builder.connectorType;
this.type = builder.type;
this.url = builder.url;
this.username = builder.username;
Expand All @@ -27,6 +29,14 @@ protected DataConnection(AbstractBuilder<?> builder) {
}

//setter and getter
public String getConnectorType() {
return connectorType;
}

public void setConnectorType(String connectorType) {
this.connectorType = connectorType;
}

public String getType() {
return type;
}
Expand Down Expand Up @@ -87,6 +97,7 @@ public void setTable(String table) {
* builder
*/
public abstract static class AbstractBuilder<T extends DataConnection> {
private String connectorType;
private String type;
private String url;
private String username;
Expand All @@ -95,6 +106,11 @@ public abstract static class AbstractBuilder<T extends DataConnection> {
private String schema;
private String table;

public AbstractBuilder<T> connectorType(String connectorType) {
this.connectorType = connectorType;
return this;
}

public AbstractBuilder<T> type(String type) {
this.type = type;
return this;
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
exports de.xab.porter.api.task;
exports de.xab.porter.api.dataconnection;
exports de.xab.porter.api.exception;
exports de.xab.porter.api.annoation;
}
14 changes: 5 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
group 'de.0xab'
version "$porter"

wrapper.gradleVersion = "$gradleWrapper"

buildscript {
repositories {
mavenLocal()
Expand All @@ -16,6 +18,7 @@ allprojects {

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

repositories {
mavenLocal()
mavenCentral()
Expand All @@ -30,11 +33,8 @@ subprojects {
testImplementation "org.slf4j:slf4j-simple:$slf4j"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$jupiter"
}
test.useJUnitPlatform()
}

wrapper {
gradleVersion = "$gradleWrapper"
test.useJUnitPlatform()
}

task codeCoverageReport(type: JacocoReport) {
Expand All @@ -45,9 +45,5 @@ task codeCoverageReport(type: JacocoReport) {
additionalSourceDirs.from files(subprojects.sourceSets.main.allSource.srcDirs)
classDirectories.from files(subprojects.sourceSets.main.output)

reports {
xml.required = true
csv.required = false
html.required = false
}
reports.xml.required = true
}
82 changes: 50 additions & 32 deletions common/src/main/java/de/xab/porter/common/spi/ExtensionLoader.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package de.xab.porter.common.spi;

import de.xab.porter.api.annoation.Inject;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -48,20 +50,6 @@ public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> service) {
return loader;
}

public T loadExtension(String type) {
Class<T> clazz = loadExtensionClass(type);
if (!clazz.getModule().isOpen(clazz.getPackageName(), this.getClass().getModule())) {
throw new RuntimeException(String.format(
"cannot access class %s at %s", clazz.getName(), this.getClass().getModule()));
}
try {
return clazz.getConstructor().newInstance();
} catch (InstantiationException | NoSuchMethodException
| IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(String.format("cannot create extension %s of %s", type, this.service), e);
}
}

private Class<T> loadExtensionClass(String type) {
Class<T> extensionClass = this.extensions.get(type);
ClassLoader classLoader = findClassLoader(this.service);
Expand All @@ -79,16 +67,61 @@ private Class<T> loadExtensionClass(String type) {
throw new IllegalArgumentException(
String.format("cannot load class %s for %s: %s", extensionName, type, this.service));
}
if (!implementedInterface(extensionClass, this.service)) {
if (!isServiceImplementation(extensionClass, this.service)) {
throw new IllegalStateException(extensionName + " not implemented " + this.service);
}
this.extensions.put(type, extensionClass);
}
}
}
if (!extensionClass.getModule().isOpen(extensionClass.getPackageName(), this.getClass().getModule())) {
throw new RuntimeException(String.format(
"cannot access class %s at %s", extensionClass.getName(), this.getClass().getModule()));
}
return extensionClass;
}

public T loadExtension(String delegationType, String type) {
Class<T> clazz = loadExtensionClass(type);
T t;
try {
t = clazz.getConstructor().newInstance();
} catch (InstantiationException | NoSuchMethodException
| IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(String.format("cannot create extension %s of %s", type, this.service), e);
}
inject(clazz, t, delegationType);
return t;
}

/**
* inject dependencies on setter
*/
private void inject(Class<T> clazz, T t, String delegationType) {
if (delegationType != null) {
for (Method method : clazz.getMethods()) {
Class<?> dependencyClass = null;
if (injectable(method)) {
try {
dependencyClass = method.getParameterTypes()[0];
method.invoke(t, ExtensionLoader.getExtensionLoader(dependencyClass).
loadExtension(null, delegationType));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(String.format("unable to inject %s to method %s",
dependencyClass, method.getName()), e);
}
}
}
}
}

private boolean injectable(Method method) {
return method.isAnnotationPresent(Inject.class)
&& method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers());
}

private String findExtensionName(ClassLoader classLoader, String type) throws IOException {
Enumeration<URL> urls;
String resourceFolder = FOLDER + this.service.getName();
Expand Down Expand Up @@ -133,35 +166,20 @@ private Map.Entry<String, String> parseLine(String line) {
return null;
}

private boolean implementedInterface(Class<T> extensionClass, Class<T> serviceClass) {
private boolean isServiceImplementation(Class<T> extensionClass, Class<T> serviceClass) {
Class<?> currentClass = extensionClass;
boolean isImplemented = false;
while (!isImplemented) {
if (currentClass == Object.class) {
break;
}
isImplemented = Arrays.stream(currentClass.getInterfaces()).
anyMatch(oneInterface -> oneInterface == serviceClass);
anyMatch(service -> service == serviceClass);
currentClass = currentClass.getSuperclass();
}
return isImplemented;
}

private void injectExtension(T instance, String type) throws InvocationTargetException, IllegalAccessException {
for (Method method : instance.getClass().getMethods()) {
if (!isTypeSetter(method)) {
continue;
}
method.invoke(instance, type);
}
}

private boolean isTypeSetter(Method method) {
return method.getName().startsWith("setType")
&& Modifier.isPublic(method.getModifiers())
&& method.getParameterCount() == 1;
}

private ClassLoader findClassLoader(Class<T> clazz) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
Expand Down
10 changes: 5 additions & 5 deletions common/src/test/java/de/xab/porter/common/test/spi/SPITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,31 @@
public class SPITest {
@Test
public void testExists() {
MockService impl = ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("impl");
MockService impl = ExtensionLoader.getExtensionLoader(MockService.class).loadExtension(null, "impl");
assertEquals("hello world", impl.mock());
}

@Test
public void testNotExists() {
assertThrows(TypeNotPresentException.class, () ->
ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("none"));
ExtensionLoader.getExtensionLoader(MockService.class).loadExtension(null, "none"));
}

@Test
public void testTypoImpl() {
assertThrows(IllegalArgumentException.class, () ->
ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("typo"));
ExtensionLoader.getExtensionLoader(MockService.class).loadExtension(null, "typo"));
}

@Test
public void testNoImplemented() {
assertThrows(IllegalStateException.class, () ->
ExtensionLoader.getExtensionLoader(MockService.class).loadExtension("noimpl"));
ExtensionLoader.getExtensionLoader(MockService.class).loadExtension(null, "noimpl"));
}

@Test
public void testNoneRegistered() {
assertThrows(TypeNotPresentException.class, () ->
ExtensionLoader.getExtensionLoader(UnregisteredService.class).loadExtension("any"));
ExtensionLoader.getExtensionLoader(UnregisteredService.class).loadExtension(null, "any"));
}
}
16 changes: 9 additions & 7 deletions core/src/main/java/de/xab/porter/core/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@
public class Task {
private final Logger logger = Loggers.getLogger(this.getClass());
private final Context context;
private Reader reader;
private List<Map.Entry<Writer, Channel>> writers;
private Reader<?> reader;
private List<Map.Entry<? extends Writer<?>, Channel>> writers;

public Task(Context context) {
this.context = context;
}

public void init() {
SrcConnection srcConnection = context.getSrcConnection();
this.reader = ExtensionLoader.getExtensionLoader(Reader.class).loadExtension(srcConnection.getType());
this.reader = ExtensionLoader.getExtensionLoader(Reader.class).
loadExtension(srcConnection.getConnectorType(), srcConnection.getType());
this.reader.setChannels(new ArrayList<>());
register();
//todo split
Expand All @@ -43,17 +44,18 @@ public void init() {
*/
public void register() {
List<SinkConnection> sinkConnections = context.getSinkConnections();
this.writers = sinkConnections.stream().
writers = sinkConnections.stream().
map(sink -> {
Writer writer = ExtensionLoader.getExtensionLoader(Writer.class).loadExtension(sink.getType());
Writer<?> writer = ExtensionLoader.getExtensionLoader(Writer.class).
loadExtension(sink.getConnectorType(), sink.getType());
try {
writer.connect(sink);
} catch (ConnectionException e) {
logger.log(Level.WARNING, "writer connection failed" + e.getCause());
writer.close();
}
Channel channel = ExtensionLoader.getExtensionLoader(Channel.class).
loadExtension(this.context.getProperties().getChannel());
loadExtension(null, this.context.getProperties().getChannel());
channel.setOnReadListener(writer::write);
reader.getChannels().add(channel);
return Map.entry(writer, channel);
Expand Down Expand Up @@ -87,4 +89,4 @@ public void start() {
writers.forEach(writer -> writer.getKey().close());
}
}
}
}
3 changes: 3 additions & 0 deletions demo/src/test/java/de/xab/porter/demo/test/DemoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
public class DemoTest {
private final int rows = 4;
private final String connectorType = "hikari";
private final String type = "demo";
private final String catalog = "porter";
private final String schema = "PUBLIC";
Expand Down Expand Up @@ -57,6 +58,7 @@ public void testNewDataSource() {
Session session = new Session();
Context context = new Context();
SrcConnection srcConn = ((SrcConnection.Builder) new SrcConnection.Builder().
connectorType(connectorType).
type(type).
username("").
password("").
Expand All @@ -66,6 +68,7 @@ public void testNewDataSource() {
build();

SinkConnection sinkConnection = ((SinkConnection.Builder) new SinkConnection.Builder().
connectorType(connectorType).
type(type).
username("").
password("").
Expand Down
Loading

0 comments on commit 9eda588

Please sign in to comment.