Spark DataFrame insert into Exasol Table #37

Merged: 13 commits merged on Jan 21, 2019
Conversation

@morazow (Contributor) commented Jan 16, 2019

What changes are proposed in this PR?

Add a new feature that enables connector users to save a Spark DataFrame into an Exasol table.

In order to save a dataframe, the table option should be provided to identify the Exasol table. Please note that it should also include the Exasol schema in the table name, e.g. my_schema.my_table.

For example:

df.write
  .mode("overwrite")
  .option("host", "10.0.0.11")
  .option("port", "8563")
  .option("username", "sys")
  .option("password", "exaTheBest")
  .option("table", "my_schema.my_table")
  .format("exasol")
  .save()

It is important to know what each Spark SaveMode means here. The Spark write API comes with several SaveMode values; please check their descriptions:

/**
 * SaveMode is used to specify the expected behavior of saving a DataFrame to a
 * data source.
 *
 * @since 1.3.0
 */
@InterfaceStability.Stable
public enum SaveMode {
  /** 
   * Append mode means that when saving a DataFrame to a data source, if
   * data/table already exists, contents of the DataFrame are expected to be
   * appended to existing data.
   *
   * @since 1.3.0
   */
  Append,
  /** 
   * Overwrite mode means that when saving a DataFrame to a data source, if
   * data/table already exists, existing data is expected to be overwritten by
   * the contents of the DataFrame.
   *
   * @since 1.3.0
   */
  Overwrite,
  /** 
   * ErrorIfExists mode means that when saving a DataFrame to a data source, if
   * data already exists, an exception is expected to be thrown.
   *
   * @since 1.3.0
   */
  ErrorIfExists,
  /** 
   * Ignore mode means that when saving a DataFrame to a data source, if data
   * already exists, the save operation is expected to not save the contents of
   * the DataFrame and to not change the existing data.
   *
   * @since 1.3.0
   */
  Ignore
}

So if the mode is overwrite, the table will be truncated first and the dataframe will then be saved into it.
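
For instance, appending to an existing table only changes the mode (same connection options as above):

df.write
  .mode("append")
  .option("host", "10.0.0.11")
  .option("port", "8563")
  .option("username", "sys")
  .option("password", "exaTheBest")
  .option("table", "my_schema.my_table")
  .format("exasol")
  .save()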

Additionally, I introduced a new user-provided parameter called create_table, which is set to false by default. When enabled, it allows the connector to create the table if it does not exist.

For instance:

df.write
  .option("host", "10.0.0.11")
  .option("port", "8563")
  .option("username", "sys")
  .option("password", "exaTheBest")
  .option("create_table", "true")
  .option("table", "my_schema.non_existing_table)
  .format("exasol")
  .save()

Addresses #32.

Commit messages:

Add more comments and refactor tests.
Moreover, bump some dependency and plugin versions.
They will be useful when doing the Exasol insertion, so that we can check whether the table is already available
or truncate it if needed.
This commit adds the save modes overwrite and append.
The problem:

If we have more Exasol data nodes than Dataframe partitions, we will open more sub-connections than needed and some of them will not be used at all. This is currently a problem, since all opened sub-connections have to be connected and closed.

The solution:

It is easy to repartition the dataframe and increase the number of partitions, which is what the current commit does (see the sketch below).

However, `repartition` is an expensive operation that might involve a shuffle. Therefore, we should check whether we can instead open only as many sub-connections as there are partitions when initiating them.
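
A rough sketch of the repartition idea described above (the node count and names are illustrative, not the connector's actual code):

// Make sure every Exasol data node (sub-connection) receives at least one partition.
// `nodesCount` is assumed to come from the Exasol connection metadata.
val nodesCount = 4 // illustrative value
val writeDF =
  if (df.rdd.getNumPartitions < nodesCount) df.repartition(nodesCount)
  else df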

With some of the save modes (for instance, Overwrite or Append), if the table does not exist in Exasol, we first create it using the dataframe schema as the table schema.

This allows saving a dataframe even though the table was not available in Exasol beforehand (a rough sketch of the idea follows below).
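
A simplified sketch of how a CREATE TABLE statement could be derived from the dataframe schema (the type mapping here is illustrative and not the connector's actual implementation):

import org.apache.spark.sql.types._

// Illustrative only: map a Spark schema to an Exasol CREATE TABLE statement.
def createTableStatement(table: String, schema: StructType): String = {
  val columns = schema.fields.map { field =>
    val sqlType = field.dataType match {
      case IntegerType => "INTEGER"
      case LongType    => "BIGINT"
      case DoubleType  => "DOUBLE"
      case _           => "VARCHAR(2000000)" // simplified fallback
    }
    s"${field.name} $sqlType"
  }
  s"CREATE TABLE $table (${columns.mkString(", ")})"
}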

However, the table creation action should perhaps only be permitted via a user-provided parameter.
`create_table`:

  This is used to create the table, when saving a dataframe, if it does not already exist in Exasol.
  If it is not set to "true" and the table does not exist, the connector throws an exception. By
  default it is "false".

`batch_size`:

  This parameter configures the batch size used when writing rows through the Exasol JDBC statement.
  The default value is `1000`.
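
For example, both parameters can be set on the write (the batch_size value here is illustrative):

df.write
  .mode("append")
  .option("host", "10.0.0.11")
  .option("port", "8563")
  .option("username", "sys")
  .option("password", "exaTheBest")
  .option("create_table", "true")
  .option("batch_size", "5000")
  .option("table", "my_schema.my_table")
  .format("exasol")
  .save()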

@morazow (Contributor, Author) commented Jan 16, 2019

Hey @3cham, @jpizagno,

I am not sure whether saving a dataframe into Exasol is interesting for you; nevertheless, please feel free to review this pull request.

@3cham (Contributor) commented Jan 16, 2019

@morazow: sure, I will look into it tonight.

@jpizagno (Contributor) commented:

@morazow Yes, saving a dataframe into Exasol seems like a great idea.

@3cham (Contributor) commented Jan 17, 2019

Well done @morazow, at first glance it looks good to me; however, I could not test it thoroughly yet. @jpizagno and I will review more this afternoon.

@morazow (Contributor, Author) commented Jan 17, 2019

Hey @3cham,

Thanks a lot for looking into it so far! Please let me know if you have any feedback.

@3cham (Contributor) commented Jan 17, 2019

Hi @morazow, so far the integration tests worked, including our added test. However, when we use our cluster for testing it, we get the following exception. We couldn't understand why it occurs:

java.lang.RuntimeException: com.exasol.spark.DefaultSource does not allow create table as select.
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:476)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
  ... 48 elided

@morazow (Contributor, Author) commented Jan 17, 2019

Hey @3cham,

It seems that error is thrown when the data source does not provide a CreatableRelationProvider or FileFormat implementation. Could you please check whether the connector jar inside the cluster environment is the latest assembled jar that includes the commits from this pull request?

providingClass.newInstance() match {
   case dataSource: CreatableRelationProvider =>
     dataSource.createRelation(
       sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
   case format: FileFormat =>
     // ...
   case _ =>
     sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}   
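
For reference, a write-capable source implements the CreatableRelationProvider trait roughly like this (an illustrative sketch, not the actual com.exasol.spark.DefaultSource code):

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}

class WritableSource extends CreatableRelationProvider {
  // Spark dispatches df.write.format(...).save() to this method; if the source
  // does not implement the trait, the "does not allow create table as select"
  // error above is raised instead.
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    // write `data` according to `mode`, then return a relation for the written table
    ???
  }
}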

Could you please also provide the Spark version and the query / insert syntax? Maybe I can try to reproduce the error on my side as well.

@3cham (Contributor) commented Jan 17, 2019

Hi @morazow: yeah, the jar is built from your branch. I used Spark 2.2.0 for testing it.

val exasolQueryString = s"SELECT * FROM $schema.$table"

val df = spark.read
  .format("exasol")
  .option("host", "10.*.*.*")
  .option("port", "8563")
  .option("username", "USERNAME")
  .option("password", "****")
  .option("query", exasolQueryString)
  .load()

df.write
  .mode("overwrite")
  .option("host", "10.*.*.*")
  .option("port", "8563")
  .option("table", "SCHEMA.TABLE2")
  .format("exasol")
  .save()

Could the Spark version be causing the problem? I will test with a newer Spark version.

@3cham (Contributor) commented Jan 17, 2019

Hi @morazow, there was an older jar built into my Spark distribution. After removing the old jar, another exception occurs:

java.sql.SQLException: object A.TYPE not found [line 1, column 25] (Session: 1622932759260545923)
  at com.exasol.jdbc.ExceptionFactory.createSQLException(ExceptionFactory.java:175)
  at com.exasol.jdbc.EXASQLException.getSQLExceptionIntern(EXASQLException.java:50)
  at com.exasol.jdbc.AbstractEXAStatement.execute(AbstractEXAStatement.java:468)
  at com.exasol.jdbc.EXAStatement.execute(EXAStatement.java:278)
  at com.exasol.jdbc.AbstractEXAStatement.executeQuery(AbstractEXAStatement.java:601)
  at com.exasol.spark.rdd.ExasolRDD.getPartitions(ExasolRDD.scala:80)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:267)
  at com.exasol.spark.DefaultSource.repartitionPerNode(DefaultSource.scala:178)
  at com.exasol.spark.DefaultSource.saveDFTable(DefaultSource.scala:131)
  at com.exasol.spark.DefaultSource.createRelation(DefaultSource.scala:92)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
  ... 49 elided

@morazow (Contributor, Author) commented Jan 18, 2019

Hey @3cham,

No, I do not think the Spark version should be a problem. The connector should be compatible with all versions from Spark 2.1 onwards.

However, the second error happens in getPartitions just before saving. The error is related to the query string and the Exasol execute statement.

Could you please check whether the 'TYPE' column is available? Similarly, look for a log line starting with Running with enriched query: ....

@3cham (Contributor) commented Jan 18, 2019

Hi @morazow, I should have elaborated more in my last comment. However, the good news is that the data could be written back into Exasol. Great job!

The cause of the first problem is that there was an older artifact of spark-exasol-connector in the distribution that I used. Spark somehow ignored the new one :)

As for the second problem: the table that I used has a column defined in lowercase. This is possible in Exasol if you wrap the column name inside double quotes (""). However, in the enrichQuery method we do not add the quotes: https://github.com/exasol/spark-exasol-connector/blob/master/src/main/scala/com/exasol/spark/ExasolRelation.scala#L105.

I think

    val columnStr = if (columns.isEmpty) "COUNT(*)" else columns.map(c => s"A.$c").mkString(", ")
should be
    val columnStr = if (columns.isEmpty) "COUNT(*)" else columns.map(c => s"A.\"$c\"").mkString(", ")

That is why the column type was interpreted directly as TYPE.
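
A small illustration of the difference (hypothetical column names; unquoted identifiers are upper-cased by Exasol, while quoted identifiers keep their case):

val columns = Seq("id", "type")
columns.map(c => s"A.$c").mkString(", ")           // A.id, A.type     -> resolved as A.ID, A.TYPE
columns.map(c => "A.\"" + c + "\"").mkString(", ") // A."id", A."type" -> lowercase names preserved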

@morazow (Contributor, Author) commented Jan 19, 2019

Hey @3cham,

Ah okay. But it is good news that you could save the dataframe!

Then, if there are no more reviews, I am going to merge this and do a release at the beginning of next week.

However, let's address the column quoting issue in a separate ticket. I think it is a good next task to consider. Similarly, the related reserved keywords issue (#14) has been open for a while now.

Thanks a lot for doing the review!

@3cham (Contributor) commented Jan 20, 2019

@morazow: yes, LGTM. Thank you for this PR.

morazow merged commit 3a4c43e into exasol:master on Jan 21, 2019
morazow mentioned this pull request on Jan 21, 2019