From ff4862a8af9bba40b8f34e5b61948ac7f4e588c9 Mon Sep 17 00:00:00 2001 From: liyanl Date: Mon, 18 Oct 2021 16:46:46 +0800 Subject: [PATCH 1/3] metaclient timeout use paramter from configs --- .../com/vesoft/nebula/connector/nebula/MetaProvider.scala | 4 ++-- .../nebula/connector/reader/NebulaPartitionReader.scala | 2 +- .../vesoft/nebula/connector/reader/NebulaSourceReader.scala | 2 +- .../com/vesoft/nebula/connector/writer/NebulaWriter.scala | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala index ab1abdea..effb1472 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala @@ -15,10 +15,10 @@ import com.vesoft.nebula.meta.{PropertyType, Schema} import scala.collection.JavaConverters._ import scala.collection.mutable -class MetaProvider(addresses: List[Address]) extends AutoCloseable { +class MetaProvider(addresses: List[Address],timeout:Int, connectionRetry:Int, executionRetry:Int) extends AutoCloseable { val metaAddress = addresses.map(address => new HostAddress(address._1, address._2)).asJava - val client = new MetaClient(metaAddress) + val client = new MetaClient(metaAddress,timeout,connectionRetry,executionRetry) client.connect() def getPartitionNumber(space: String): Int = { diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala index aad4a7ad..64bedab9 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala @@ -46,7 +46,7 @@ abstract class NebulaPartitionReader extends InputPartitionReader[InternalRow] { this() this.schema = schema - metaProvider = new MetaProvider(nebulaOptions.getMetaAddress) + metaProvider = new MetaProvider(nebulaOptions.getMetaAddress,nebulaOptions.timeout,nebulaOptions.connectionRetry,nebulaOptions.executionRetry) val address: ListBuffer[HostAddress] = new ListBuffer[HostAddress] for (addr <- nebulaOptions.getMetaAddress) { diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala index d83aef2b..8cb5ac55 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala @@ -42,7 +42,7 @@ abstract class NebulaSourceReader(nebulaOptions: NebulaOptions) extends DataSour val returnCols = nebulaOptions.getReturnCols val noColumn = nebulaOptions.noColumn val fields: ListBuffer[StructField] = new ListBuffer[StructField] - val metaProvider = new MetaProvider(nebulaOptions.getMetaAddress) + val metaProvider = new MetaProvider(nebulaOptions.getMetaAddress,nebulaOptions.timeout,nebulaOptions.connectionRetry,nebulaOptions.executionRetry) import scala.collection.JavaConverters._ var schemaCols: Seq[ColumnDef] = Seq() diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala index d1d83ded..2dd342cb 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala @@ -21,7 +21,7 @@ class NebulaWriter(nebulaOptions: NebulaOptions) extends Serializable { val failedExecs: ListBuffer[String] = new ListBuffer[String] val graphProvider = new GraphProvider(nebulaOptions.getGraphAddress) - val metaProvider = new MetaProvider(nebulaOptions.getMetaAddress) + val metaProvider = new MetaProvider(nebulaOptions.getMetaAddress,nebulaOptions.timeout,nebulaOptions.connectionRetry,nebulaOptions.executionRetry) val isVidStringType = metaProvider.getVidType(nebulaOptions.spaceName) == VidType.STRING def prepareSpace(): Unit = { From 04717b830f324859d1c788055a6e0d7c3ffc2dd1 Mon Sep 17 00:00:00 2001 From: liyanl Date: Wed, 20 Oct 2021 10:01:33 +0800 Subject: [PATCH 2/3] format code --- .../com/vesoft/nebula/connector/nebula/MetaProvider.scala | 4 ++-- .../nebula/connector/reader/NebulaPartitionReader.scala | 5 ++++- .../vesoft/nebula/connector/reader/NebulaSourceReader.scala | 5 ++++- .../com/vesoft/nebula/connector/writer/NebulaWriter.scala | 5 ++++- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala index effb1472..08a8b4c7 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala @@ -15,10 +15,10 @@ import com.vesoft.nebula.meta.{PropertyType, Schema} import scala.collection.JavaConverters._ import scala.collection.mutable -class MetaProvider(addresses: List[Address],timeout:Int, connectionRetry:Int, executionRetry:Int) extends AutoCloseable { +class MetaProvider(addresses: List[Address], timeout: Int, connectionRetry: Int, executionRetry: Int) extends AutoCloseable { val metaAddress = addresses.map(address => new HostAddress(address._1, address._2)).asJava - val client = new MetaClient(metaAddress,timeout,connectionRetry,executionRetry) + val client = new MetaClient(metaAddress, timeout, connectionRetry, executionRetry) client.connect() def getPartitionNumber(space: String): Int = { diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala index 64bedab9..ed043092 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala @@ -46,7 +46,10 @@ abstract class NebulaPartitionReader extends InputPartitionReader[InternalRow] { this() this.schema = schema - metaProvider = new MetaProvider(nebulaOptions.getMetaAddress,nebulaOptions.timeout,nebulaOptions.connectionRetry,nebulaOptions.executionRetry) + metaProvider = new MetaProvider(nebulaOptions.getMetaAddress, + nebulaOptions.timeout, + nebulaOptions.connectionRetry, + nebulaOptions.executionRetry) val address: ListBuffer[HostAddress] = new ListBuffer[HostAddress] for (addr <- nebulaOptions.getMetaAddress) { diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala index 8cb5ac55..64b34903 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala @@ -42,7 +42,10 @@ abstract class NebulaSourceReader(nebulaOptions: NebulaOptions) extends DataSour val returnCols = nebulaOptions.getReturnCols val noColumn = nebulaOptions.noColumn val fields: ListBuffer[StructField] = new ListBuffer[StructField] - val metaProvider = new MetaProvider(nebulaOptions.getMetaAddress,nebulaOptions.timeout,nebulaOptions.connectionRetry,nebulaOptions.executionRetry) + val metaProvider = new MetaProvider(nebulaOptions.getMetaAddress, + nebulaOptions.timeout, + nebulaOptions.connectionRetry, + nebulaOptions.executionRetry) import scala.collection.JavaConverters._ var schemaCols: Seq[ColumnDef] = Seq() diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala index 2dd342cb..e93adfd4 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaWriter.scala @@ -21,7 +21,10 @@ class NebulaWriter(nebulaOptions: NebulaOptions) extends Serializable { val failedExecs: ListBuffer[String] = new ListBuffer[String] val graphProvider = new GraphProvider(nebulaOptions.getGraphAddress) - val metaProvider = new MetaProvider(nebulaOptions.getMetaAddress,nebulaOptions.timeout,nebulaOptions.connectionRetry,nebulaOptions.executionRetry) + val metaProvider = new MetaProvider(nebulaOptions.getMetaAddress, + nebulaOptions.timeout, + nebulaOptions.connectionRetry, + nebulaOptions.executionRetry) val isVidStringType = metaProvider.getVidType(nebulaOptions.spaceName) == VidType.STRING def prepareSpace(): Unit = { From 4292d0e32b318884abfe5cd19ab24490d3a63e5d Mon Sep 17 00:00:00 2001 From: liyanl Date: Fri, 22 Oct 2021 16:09:04 +0800 Subject: [PATCH 3/3] metaclient, sync the test code --- .../com/vesoft/nebula/connector/nebula/MetaProviderTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala index 75a86d10..4cdbe261 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/nebula/MetaProviderTest.scala @@ -18,7 +18,7 @@ class MetaProviderTest extends AnyFunSuite with BeforeAndAfterAll { override def beforeAll(): Unit = { val addresses: List[Address] = List(new Address("127.0.0.1", 9559)) - metaProvider = new MetaProvider(addresses) + metaProvider = new MetaProvider(addresses, 6000, 3, 3) val graphMock = new NebulaGraphMock graphMock.mockStringIdGraph()