Skip to content

Commit

Permalink
[NOID] Fixes #4141: apoc.load.jdbcUpdate inside apoc.periodic.iterate…
Browse files Browse the repository at this point in the history
… leaves idle connections from 5.19 and forward (#4196)
  • Loading branch information
vga91 committed Nov 19, 2024
1 parent 76b33c5 commit a4530c7
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 19 deletions.
2 changes: 2 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
testImplementation 'net.sourceforge.jexcelapi:jxl:2.6.12'

compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.12.770'
compileOnly group: 'us.fatehi', name: 'schemacrawler', version: '16.22.2'

testImplementation group: 'org.reflections', name: 'reflections', version: '0.9.12'
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
Expand All @@ -79,6 +80,7 @@ dependencies {

testImplementation group: 'org.neo4j', name: 'neo4j-kernel', version: neo4jVersionEffective, classifier: "tests"
testImplementation group: 'org.neo4j', name: 'neo4j-io', version: neo4jVersionEffective, classifier: "tests"
testImplementation group: 'us.fatehi', name: 'schemacrawler', version: '16.22.2'


def withoutServers = {
Expand Down
40 changes: 26 additions & 14 deletions core/src/main/java/apoc/load/util/JdbcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
package apoc.load.util;

import apoc.util.Util;
import us.fatehi.utility.datasource.DatabaseConnectionSource;
import us.fatehi.utility.datasource.DatabaseConnectionSources;
import us.fatehi.utility.datasource.MultiUseUserCredentials;

import java.net.URI;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.DriverManager;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.login.LoginContext;
import java.sql.SQLException;

public class JdbcUtil {

Expand All @@ -37,26 +41,22 @@ public class JdbcUtil {

private JdbcUtil() {}

public static Connection getConnection(String jdbcUrl, LoadJdbcConfig config) throws Exception {
if (config.hasCredentials()) {
return createConnection(
jdbcUrl,
config.getCredentials().getUser(),
config.getCredentials().getPassword());
public static Object getConnection(String jdbcUrl, LoadJdbcConfig config, Class<?> classType) throws Exception {
if(config.hasCredentials()) {
return createConnection(jdbcUrl, config.getCredentials().getUser(), config.getCredentials().getPassword(), classType);
} else {
URI uri = new URI(jdbcUrl.substring("jdbc:".length()));
String userInfo = uri.getUserInfo();
if (userInfo != null) {
String cleanUrl =
jdbcUrl.substring(0, jdbcUrl.indexOf("://") + 3) + jdbcUrl.substring(jdbcUrl.indexOf("@") + 1);
String cleanUrl = jdbcUrl.substring(0, jdbcUrl.indexOf("://") + 3) + jdbcUrl.substring(jdbcUrl.indexOf("@") + 1);
String[] user = userInfo.split(":");
return createConnection(cleanUrl, user[0], user[1]);
return createConnection(cleanUrl, user[0], user[1], classType);
}
return DriverManager.getConnection(jdbcUrl);
}
}

private static Connection createConnection(String jdbcUrl, String userName, String password) throws Exception {
private static Object createConnection(String jdbcUrl, String userName, String password, Class<?> classType) throws Exception {
if (jdbcUrl.contains(";auth=kerberos")) {
String client = System.getProperty("java.security.auth.login.config.client", "KerberosClient");
LoginContext lc = new LoginContext(client, callbacks -> {
Expand All @@ -68,14 +68,26 @@ private static Connection createConnection(String jdbcUrl, String userName, Stri
lc.login();
Subject subject = lc.getSubject();
try {
return Subject.doAs(subject, (PrivilegedExceptionAction<Connection>)
() -> DriverManager.getConnection(jdbcUrl, userName, password));
return Subject.doAs(subject, (PrivilegedExceptionAction<?>) () -> createConnectionByClass(jdbcUrl, userName, password, classType));
} catch (PrivilegedActionException pae) {
throw pae.getException();
}
} else {
return DriverManager.getConnection(jdbcUrl, userName, password);
return createConnectionByClass(jdbcUrl, userName, password, classType);
}
}

/**
* We return `DatabaseConnectionSources` for Model.java,
* as SchemaCrawlerUtility.getCatalog accepts only `DatabaseConnectionSource` class,
* otherwise we return a `Connection`, via `DriverManager.getConnection`, for Jdbc.java,
* as `DatabaseConnectionSource` causes these error: https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/4141
*/
private static Object createConnectionByClass(String jdbcUrl, String userName, String password, Class<?> classType) throws SQLException {
if (classType.isAssignableFrom(DatabaseConnectionSource.class)) {
return DatabaseConnectionSources.newDatabaseConnectionSource(jdbcUrl, new MultiUseUserCredentials(userName, password));
}
return DriverManager.getConnection(jdbcUrl, userName, password);
}

public static String getUrlOrKey(String urlOrKey) {
Expand Down
171 changes: 171 additions & 0 deletions extended-it/src/test/java/apoc/load/PostgresJdbcTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package apoc.load;

import apoc.periodic.Periodic;
import apoc.text.Strings;
import apoc.util.TestUtil;
import apoc.util.Util;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.neo4j.graphdb.Result;
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.PostgreSQLContainer;

import java.sql.SQLException;
import java.util.Map;

import static apoc.util.TestUtil.testCall;
import static apoc.util.TestUtil.testResult;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class PostgresJdbcTest extends AbstractJdbcTest {

// Even if Postgres support the `TIMESTAMP WITH TIMEZONE` type,
// the JDBC driver doesn't. Please check https://github.com/pgjdbc/pgjdbc/issues/996 and when the issue is closed fix this

@ClassRule
public static DbmsRule db = new ImpermanentDbmsRule();

public static JdbcDatabaseContainer postgress;

@BeforeClass
public static void setUp() throws Exception {
postgress = new PostgreSQLContainer().withInitScript("init_postgres.sql");
postgress.start();
TestUtil.registerProcedure(db,Jdbc.class, Periodic.class, Strings.class);
db.executeTransactionally("CALL apoc.load.driver('org.postgresql.Driver')");
}

@AfterClass
public static void tearDown() throws SQLException {
postgress.stop();
db.shutdown();
}

@Test
public void testLoadJdbc() throws Exception {
testCall(db, "CALL apoc.load.jdbc($url,'PERSON',[], $config)", Util.map("url", postgress.getJdbcUrl(),
"config", Util.map("schema", "test",
"credentials", Util.map("user", postgress.getUsername(), "password", postgress.getPassword()))),
(row) -> assertResult(row));
}

@Test
public void testLoadJdbSelect() throws Exception {
testCall(db, "CALL apoc.load.jdbc($url,'SELECT * FROM PERSON',[], $config)", Util.map("url", postgress.getJdbcUrl(),
"config", Util.map("schema", "test",
"credentials", Util.map("user", postgress.getUsername(), "password", postgress.getPassword()))),
(row) -> assertResult(row));
}

@Test
public void testLoadJdbSelectWithArrays() throws Exception {
testCall(db, "CALL apoc.load.jdbc($url,'SELECT * FROM ARRAY_TABLE',[], $config)", Util.map("url", postgress.getJdbcUrl(),
"config", Util.map("schema", "test",
"credentials", Util.map("user", postgress.getUsername(), "password", postgress.getPassword()))),
(result) -> {
Map<String, Object> row = (Map<String, Object>)result.get("row");
assertEquals("John", row.get("NAME"));
int[] intVals = (int[])row.get("INT_VALUES");
assertArrayEquals(intVals, new int[]{1, 2, 3});
double[] doubleVals = (double[])row.get("DOUBLE_VALUES");
assertArrayEquals(doubleVals, new double[]{ 1.0, 2.0, 3.0}, 0.01);
});
}

@Test
public void testLoadJdbcUpdate() throws Exception {
testCall(db, "CALL apoc.load.jdbcUpdate($url,'UPDATE PERSON SET \"SURNAME\" = ? WHERE \"NAME\" = ?', ['DOE', 'John'], $config)",
Util.map("url", postgress.getJdbcUrl(),
"config", Util.map("schema", "test",
"credentials", Util.map("user", postgress.getUsername(), "password", postgress.getPassword()))),
(row) -> assertEquals( Util.map("count", 1 ), row.get("row")));
}

@Test
public void testLoadJdbcParams() throws Exception {
testCall(db, "CALL apoc.load.jdbc($url,'SELECT * FROM PERSON WHERE \"NAME\" = ?',['John'], $config)", // YIELD row RETURN row
Util.map("url", postgress.getJdbcUrl(),
"config", Util.map("schema", "test",
"credentials", Util.map("user", postgress.getUsername(), "password", postgress.getPassword()))),
(row) -> assertResult(row));
}

@Test
public void testIssue4141PeriodicIterateWithJdbc() throws Exception {
var config = Util.map("url", postgress.getJdbcUrl(),
"config", Util.map("schema", "test",
"credentials", Util.map("user", postgress.getUsername(), "password", postgress.getPassword())));

String query = "WITH range(0, 100) as list UNWIND list as l CREATE (n:MyNode{id: l})";

db.executeTransactionally(query, Map.of(), Result::resultAsString);

// Redundant, only to reproduce issue 4141
query = "CALL apoc.load.driver(\"org.postgresql.Driver\")";

db.executeTransactionally(query, Map.of(), Result::resultAsString);

// To replicate the 4141 issue case,
// we cannot use container.getJdbcUrl() because it does not provide the url with username and password
String jdbUrl = getUrl(postgress);

query = """
CALL apoc.periodic.iterate(
"MATCH (n:MyNode) return n",
"WITH n, apoc.text.format('insert into nodes (my_id) values (\\\\\\'%d\\\\\\')',[n.id]) AS sql CALL apoc.load.jdbcUpdate('$url',sql) YIELD row AS row2 return row2,n",
{batchsize: 10,parallel: true})
yield operations
""".replace("$url", jdbUrl);

// check that periodic iterate doesn't throw errors
testResult(db, query, config, this::assertPeriodicIterate);

assertPgStatActivityHasOnlyActiveState();
}

private static void assertPgStatActivityHasOnlyActiveState() throws Exception {
// connect to postgres and execute the query `select state from pg_stat_activity`
String psql = postgress.execInContainer(
"psql", "postgresql://test:test@localhost/test", "-c", "select state from pg_stat_activity;")
.toString();

assertTrue("Current pg_stat_activity is: " + psql, psql.contains("active"));

// the result without the https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/4141 change
// is not deterministic, can be `too many clients already` or (not very often) `idle`
assertFalse("Current pg_stat_activity is: " + psql,
psql.contains("too many clients already") || psql.contains("idle"));

}

private void assertPeriodicIterate(Result result) {
Map<String, Object> res = result.next();
Map<String, Object> operations = (Map<String, Object>) res.get("operations");

long failed = (Long) operations.get("failed");
assertEquals(0L, failed);

long committed = (Long) operations.get("committed");
assertEquals(101L, committed);

assertFalse(result.hasNext());
}

private static String getUrl(JdbcDatabaseContainer container) {
return String.format(
"jdbc:postgresql://%s:%s@%s:%s/%s?loggerLevel=OFF",
container.getUsername(),
container.getPassword(),
container.getContainerIpAddress(),
container.getMappedPort(5432),
container.getDatabaseName()
);
}
}
2 changes: 1 addition & 1 deletion full/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ dependencies {
compileOnly group: 'org.ow2.asm', name: 'asm', version: '5.0.2'

// schemacrawler
implementation group: 'us.fatehi', name: 'schemacrawler', version: '15.04.01'
implementation group: 'us.fatehi', name: 'schemacrawler', version: '16.22.2'
testImplementation group: 'us.fatehi', name: 'schemacrawler-mysql', version: '15.04.01'

testImplementation group: 'org.apache.hive', name: 'hive-jdbc', version: '1.2.2', withoutServers
Expand Down
4 changes: 2 additions & 2 deletions full/src/main/java/apoc/load/Jdbc.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private Stream<RowResult> executeQuery(
String url = getUrlOrKey(urlOrKey);
String query = getSqlOrKey(tableOrSelect);
try {
Connection connection = getConnection(url, loadJdbcConfig);
Connection connection = (Connection) getConnection(url,loadJdbcConfig, Connection.class);
// see https://jdbc.postgresql.org/documentation/91/query.html#query-with-cursors
connection.setAutoCommit(loadJdbcConfig.isAutoCommit());
try {
Expand Down Expand Up @@ -162,7 +162,7 @@ private Stream<RowResult> executeUpdate(
String url = getUrlOrKey(urlOrKey);
LoadJdbcConfig jdbcConfig = new LoadJdbcConfig(config);
try {
Connection connection = getConnection(url, jdbcConfig);
Connection connection = (Connection) getConnection(url,jdbcConfig, Connection.class);
try {
PreparedStatement stmt =
connection.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
Expand Down
10 changes: 8 additions & 2 deletions full/src/main/java/apoc/model/Model.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
import apoc.Extended;
import apoc.load.util.LoadJdbcConfig;
import apoc.result.VirtualNode;
import org.neo4j.graphdb.*;
import org.neo4j.procedure.Procedure;
import org.neo4j.procedure.*;
import us.fatehi.utility.datasource.DatabaseConnectionSource;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -83,8 +88,9 @@ public Stream<DatabaseModel> jdbc(
SchemaCrawlerOptionsBuilder.builder().withSchemaInfoLevel(SchemaInfoLevelBuilder.standard());
SchemaCrawlerOptions options = optionsBuilder.toOptions();

Catalog catalog = SchemaCrawlerUtility.getCatalog(getConnection(url, new LoadJdbcConfig(config)), options);

DatabaseConnectionSource connectionSource = (DatabaseConnectionSource) getConnection( url, new LoadJdbcConfig(config), DatabaseConnectionSource.class );
Catalog catalog = SchemaCrawlerUtility.getCatalog(connectionSource, options);

DatabaseModel databaseModel = new DatabaseModel();

ModelConfig modelConfig = new ModelConfig(config != null ? config : Collections.emptyMap());
Expand Down
2 changes: 2 additions & 0 deletions full/src/test/resources/init_postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,5 @@ CREATE TABLE ARRAY_TABLE (
);
INSERT INTO ARRAY_TABLE ("NAME", "INT_VALUES", "DOUBLE_VALUES")
VALUES ('John', '{ 1, 2, 3}', '{ 1.0, 2.0, 3.0 }');

CREATE TABLE nodes (id serial PRIMARY KEY, my_id integer);

0 comments on commit a4530c7

Please sign in to comment.