diff --git a/.gitignore b/.gitignore index 3877c83a..1af985e2 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,13 @@ target/ .idea/ .eclipse/ *.iml +.project +.bloop +.metals +.settings +.vscode +.classpath +.factorypath spark-importer.ipr spark-importer.iws diff --git a/nebula-exchange/pom.xml b/nebula-exchange/pom.xml index 57251449..8f1b6cb7 100644 --- a/nebula-exchange/pom.xml +++ b/nebula-exchange/pom.xml @@ -736,6 +736,11 @@ clickhouse-jdbc 0.2.5 + + org.locationtech.jts + jts-core + 1.16.1 + diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala index 0cf028f3..f33faf31 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala @@ -1,3 +1,4 @@ + /* Copyright (c) 2020 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License. @@ -5,13 +6,14 @@ package com.vesoft.nebula.exchange.processor -import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value} +import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon} import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils} import com.vesoft.nebula.meta.PropertyType import org.apache.spark.sql.Row import org.apache.spark.sql.types.{IntegerType, LongType, StringType} - +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer /** * processor is a converter. 
* It is responsible for converting the dataframe row data into Nebula Graph's vertex or edge,
@@ -155,7 +157,9 @@ trait Processor extends Serializable {
         row.get(index).toString.toLong
       }
       case PropertyType.GEOGRAPHY => {
-        throw new IllegalArgumentException("sst import does not support GEOGRAPHY property yet.")
+        val wkt = row.get(index).toString
+        val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt)
+        convertJTSGeometryToGeography(jtsGeom)
       }
     }
   }
@@ -172,4 +176,52 @@ trait Processor extends Serializable {
       case StringType => row.getString(index).toLong
     }
   }
+
+  /**
+   * Converts a JTS geometry into a Nebula Geography value.
+   *
+   * Only geometries whose type name is "Point", "LineString" or "Polygon" are handled, and
+   * only the x and y ordinates of each JTS coordinate are copied.
+   *
+   * @param jtsGeom JTS geometry, e.g. the result of parsing a WKT string
+   * @return the Geography wrapping the converted point, line string or polygon
+   * @throws IllegalArgumentException if the geometry type is not one of the three above,
+   *                                  or if a point has no coordinate
+   */
+  def convertJTSGeometryToGeography(jtsGeom: org.locationtech.jts.geom.Geometry): Geography = {
+    // Copies the x/y of every JTS coordinate into a Java list of Nebula coordinates.
+    def toCoordList(
+        jtsCoords: Array[org.locationtech.jts.geom.Coordinate]): java.util.List[Coordinate] = {
+      val coords = new ListBuffer[Coordinate]()
+      jtsCoords.foreach(c => coords += new Coordinate(c.x, c.y))
+      coords.asJava
+    }
+
+    jtsGeom.getGeometryType match {
+      case "Point" => {
+        val jtsPoint = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Point]
+        // JTS returns a null coordinate for an empty point such as "POINT EMPTY".
+        val jtsCoord = Option(jtsPoint.getCoordinate).getOrElse(
+          throw new IllegalArgumentException("GEOGRAPHY point has no coordinate: " + jtsGeom))
+        Geography.ptVal(new Point(new Coordinate(jtsCoord.x, jtsCoord.y)))
+      }
+      case "LineString" => {
+        val jtsLineString = jtsGeom.asInstanceOf[org.locationtech.jts.geom.LineString]
+        Geography.lsVal(new LineString(toCoordList(jtsLineString.getCoordinates)))
+      }
+      case "Polygon" => {
+        val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon]
+        val rings = new java.util.ArrayList[java.util.List[Coordinate]]()
+        // The exterior ring goes first, followed by the interior rings in index order.
+        rings.add(toCoordList(jtsPolygon.getExteriorRing.getCoordinates))
+        for (i <- 0 until jtsPolygon.getNumInteriorRing) {
+          rings.add(toCoordList(jtsPolygon.getInteriorRingN(i).getCoordinates))
+        }
+        Geography.pgVal(new Polygon(rings))
+      }
+      case other =>
+        // Fail with a descriptive error instead of a scala.MatchError.
+        throw new IllegalArgumentException("unsupported GEOGRAPHY geometry type: " + other)
+    }
+  }
 }
diff --git a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala
index 6c1d7526..a4c0d539 100644
--- a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala
+++ b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala
@@ -6,7 +6,7 @@
 package scala.com.vesoft.nebula.exchange.processor

 import com.vesoft.nebula.exchange.processor.Processor
-import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value}
+import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon}
 import com.vesoft.nebula.meta.PropertyType
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.{
@@ -139,6 +139,28 @@ class ProcessorSuite extends Processor {
     val nullValue = new Value()
     nullValue.setNVal(NullType.__NULL__)
     assert(extraValueForSST(row, "col14", map).equals(nullValue))
+
+    // POINT(3 8)
+    val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8)))
+    val geogPointExpect = extraValueForSST(row, "col15", map)
+
+    assert(geogPointExpect.equals(geogPoint))
+    // LINESTRING(3 8, 4.7 73.23)
+    val line = new java.util.ArrayList[Coordinate]()
+    line.add(new Coordinate(3, 8))
+    line.add(new Coordinate(4.7, 73.23))
+    val geogLineString = Geography.lsVal(new LineString(line))
+    assert(extraValueForSST(row, "col16", map).equals(geogLineString))
+    // POLYGON((0 1, 1 2, 2 3, 0 1))
+    val shell: java.util.List[Coordinate] = new java.util.ArrayList[Coordinate]()
+    shell.add(new Coordinate(0, 1))
+    shell.add(new Coordinate(1, 2))
+    shell.add(new Coordinate(2, 3))
+    shell.add(new Coordinate(0, 1))
+    val rings = new java.util.ArrayList[java.util.List[Coordinate]]()
+    rings.add(shell)
+    val geogPolygon = Geography.pgVal(new Polygon(rings))
+    assert(extraValueForSST(row, "col17", map).equals(geogPolygon))
   }

   /**