Skip to content

Commit

Permalink
Add ErrorConfig to hold errors contacting locations. (#484)
Browse files Browse the repository at this point in the history
Co-authored-by: Benjamin Gaidioz <[email protected]>
  • Loading branch information
miguelbranco80 and bgaidioz authored Aug 14, 2024
1 parent 3c5c420 commit eda8692
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 29 deletions.
5 changes: 5 additions & 0 deletions protocol/src/main/protobuf/raw/protocol/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ message LocationConfig {
DropboxUsernamePasswordConfig dropboxUsernamePassword = 14;
HttpHeadersConfig httpHeaders = 15;
SecretConfig secret = 99;
ErrorConfig error = 9999;
}
}

Expand Down Expand Up @@ -145,4 +146,8 @@ message HttpHeadersConfig {
message SecretConfig {
string name = 1;
string value = 2;
}

message ErrorConfig {
string message = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ object LocationDescription extends StrictLogging {
val delay = parser.getDuration("delay").toMillis
urlToLocationDescription(delegateUri, programEnvironment).right.map(MockPathLocationDescription(delay, _))
} catch {
case _: ConfigException => Left(s"not a mock location: $url")
case _: ConfigException => Left("not a mock location")
}
}
case "s3" =>
Expand Down Expand Up @@ -624,6 +624,8 @@ object LocationDescription extends StrictLogging {
objectKey
)
)
case Some(l) if l.hasError => Left(l.getError.getMessage)
case Some(_) => Left("not a S3 credential")
case None =>
// Anonymous access.
Right(S3PathLocationDescription(bucketName, None, None, None, objectKey))
Expand All @@ -636,6 +638,7 @@ object LocationDescription extends StrictLogging {
// In Dropbox, the host is the name of the credential
val DROPBOX_REGEX(name, path) = url
if (name == null) {
logger.warn("missing 'name' in Dropbox location")
return Left("missing Dropbox credential")
}
programEnvironment.locationConfigs.get(name) match {
Expand All @@ -651,7 +654,28 @@ object LocationDescription extends StrictLogging {
path
)
)
case None => Left("missing Dropbox credential")
case Some(l) if l.hasHttpHeaders =>
if (l.getHttpHeaders.getHeadersMap.containsKey("Authorization")) {
val splitted = l.getHttpHeaders.getHeadersMap.get("Authorization").split("Bearer ")
if (splitted.length == 2) {
Right(
DropboxAccessTokenLocationDescription(
splitted(1),
path
)
)
} else {
Left("invalid Dropbox credential")
}
} else {
logger.warn("missing Dropbox 'Authorization'")
Left("missing Dropbox credential")
}
case Some(l) if l.hasError => Left(l.getError.getMessage)
case Some(_) => Left("not a Dropbox credential")
case None =>
logger.warn("missing Dropbox credential")
Left("missing Dropbox credential")
}
case _ => Left(s"unsupported protocol: $protocol")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class MySQLInferAndReadEntry extends SugarEntryExtension {
new MySqlTableLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword, table)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a MySQL server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -394,6 +395,7 @@ class MySQLInferAndQueryEntry extends SugarEntryExtension {
new MySqlServerLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a MySQL server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class OracleInferAndReadEntry extends SugarEntryExtension {
new OracleTableLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword, schema, table)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -423,6 +424,7 @@ class OracleInferAndQueryEntry extends SugarEntryExtension {
new OracleServerLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class PostgreSQLInferAndReadEntry extends SugarEntryExtension {
)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a PostgreSQL server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -433,6 +434,7 @@ class PostgreSQLInferAndQueryEntry extends SugarEntryExtension {
new PostgresqlServerLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class SQLServerInferAndReadEntry extends SugarEntryExtension {
)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -435,6 +436,7 @@ class SQLServerInferAndQueryEntry extends SugarEntryExtension {
new SqlServerServerLocation(l1.getHost, l1.getPort, l1.getDatabase, l1.getUser, l1.getPassword)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not an Oracle server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class SnowflakeInferAndReadEntry extends SugarEntryExtension {
)(
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a Snowflake server")
case None => return Left(s"unknown credential: $db")
}
Expand Down Expand Up @@ -501,6 +502,7 @@ class SnowflakeInferAndQueryEntry extends SugarEntryExtension {
l1.getParametersMap,
programContext.settings
)
case Some(l) if l.hasError => return Left(l.getError.getMessage)
case Some(_) => return Left("not a Snowflake server")
case None => return Left(s"unknown credential: $db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ public LocationConfig getLocationConfig(String name) {
if (maybeLocationConfig.isEmpty()) {
throw new RawTruffleRuntimeException("unknown credential: " + name);
}
return maybeLocationConfig.get();
LocationConfig locationConfig = maybeLocationConfig.get();
if (locationConfig.hasError()) {
throw new RawTruffleRuntimeException(locationConfig.getError().getMessage());
}
return locationConfig;
}

@CompilerDirectives.TruffleBoundary
Expand Down
17 changes: 12 additions & 5 deletions sql-client/src/main/scala/raw/client/sql/SqlConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ import java.util.concurrent.Executor
class SqlConnection(connectionPool: SqlConnectionPool, conn: Connection) extends java.sql.Connection {

override def close(): Unit = {
// We do not ACTUALLY close the connection; instead, we just release the borrow.
connectionPool.releaseConnection(
this,
isAlive = false // We are not sure if the connection is alive or not, e.g. it could be closed because it failed.
)
if (isClosed) {
// If the connection closed in the meantime (e.g. due to a crash), we cannot release it back to the pool.
connectionPool.actuallyRemoveConnection(this)
} else {
// If the connection seems "sane", then we do not ACTUALLY close the connection.
// Instead, we just release the borrow.
connectionPool.releaseConnection(
this,
isAlive =
false // We are not sure if the connection is alive or not, e.g. it could be closed because it failed.
)
}
}

// This is called by the connection pool when we *actually* want to close the connection.
Expand Down
47 changes: 26 additions & 21 deletions sql-client/src/main/scala/raw/client/sql/SqlConnectionPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class SqlConnectionPool()(implicit settings: RawSettings) extends RawService wit
logger.debug(s"Checking the connection health for $conn (state: ${connectionUrls(conn)})")
// Found one connection to check.
try {
if (conn.isValid(isValidSeconds)) {
if (!conn.isClosed() && conn.isValid(isValidSeconds)) {
logger.debug(s"Connection $conn is healthy")
// All good, so release borrow.
// This will update the last check is alive time.
Expand Down Expand Up @@ -225,27 +225,32 @@ class SqlConnectionPool()(implicit settings: RawSettings) extends RawService wit
}
}

private def actuallyRemoveConnection(conn: SqlConnection): Boolean = {
logger.debug(s"Actually removing connection $conn")
def actuallyRemoveConnection(conn: SqlConnection): Boolean = {
connectionPoolLock.synchronized {
val jdbcUrl = connectionUrls(conn)
try {
// First try to actually close the connection (note the use of actuallyClose), then clean up all the state.
conn.actuallyClose()
connectionState.remove(conn)
connectionUrls.remove(conn)
connectionCache.get(jdbcUrl) match {
case Some(conns) =>
val nconns = conns - conn
if (nconns.isEmpty) connectionCache.remove(jdbcUrl)
else connectionCache.put(jdbcUrl, nconns)
case None => // Nothing to do.
}
true
} catch {
case NonFatal(t) =>
// We failed to actually close the connection.
logger.warn(s"Failed to actually close the connection $conn", t)
connectionUrls.get(conn) match {
case Some(jdbcUrl) =>
logger.debug(s"Actually removing connection $conn")
try {
// First try to actually close the connection (note the use of actuallyClose), then clean up all the state.
conn.actuallyClose()
connectionState.remove(conn)
connectionUrls.remove(conn)
connectionCache.get(jdbcUrl) match {
case Some(conns) =>
val nconns = conns - conn
if (nconns.isEmpty) connectionCache.remove(jdbcUrl)
else connectionCache.put(jdbcUrl, nconns)
case None => // Nothing to do.
}
true
} catch {
case NonFatal(t) =>
// We failed to actually close the connection.
logger.warn(s"Failed to actually close the connection $conn", t)
false
}
case None =>
// Connection didn't exist/was removed already.
false
}
}
Expand Down

0 comments on commit eda8692

Please sign in to comment.