s3 persistence (atum, sdk fs usage, ...) (#1526)
* FsUtils divided into LocalFsUtils & HdfsUtils
* PathConfigSuite update
* S3FsUtils with tail-recursive pagination accumulation - now generic with an optional short-circuit breakOut (see the sketch below)
  TestRunnerJob updated to manually cover the cases - should serve as a basis for tests
* HdfsUtils replaced by trait DistributedFsUtils (except for MenasCredentials loading & nonSplittable splitting)
* using final version of s3-powered Atum (3.0.0)
* mockito version update, scalatest version update
* S3FsUtilsSuite: exists, read, sizeDir(hidden, non-hidden, recursive), non-splittable (simple, recursive with breakOut), delete (recursive), version find (simple - empty, recursive)
* explicit stubbing fix for hyperdrive
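The tail-recursive pagination mentioned in the S3FsUtils bullet is the interesting part of this change. Below is a minimal, self-contained sketch of the idea only — the `Page` case class, the `accumulatePages` helper and the fake page source are hypothetical illustrations, not the project's actual S3 SDK code. Each "page" stands in for one paginated listing response, and the optional `breakOut` predicate lets the fold stop before all remaining pages are fetched.

```scala
import scala.annotation.tailrec

object S3PaginationSketch {

  // Hypothetical stand-in for one paginated listing response (think S3 ListObjectsV2):
  // a batch of object keys plus an optional continuation token.
  final case class Page(keys: Seq[String], nextToken: Option[String])

  // Tail-recursively folds every page into an accumulator. The breakOut predicate
  // lets the caller short-circuit (e.g. "stop once a matching key is found")
  // without listing the remaining pages.
  @tailrec
  def accumulatePages[A](fetchPage: Option[String] => Page,
                         token: Option[String],
                         acc: A,
                         combine: (A, Seq[String]) => A,
                         breakOut: A => Boolean): A = {
    val page = fetchPage(token)
    val updated = combine(acc, page.keys)
    if (breakOut(updated) || page.nextToken.isEmpty) updated
    else accumulatePages(fetchPage, page.nextToken, updated, combine, breakOut)
  }

  def main(args: Array[String]): Unit = {
    // Three fake pages so the example runs without any AWS dependency.
    val pages: Map[Option[String], Page] = Map(
      None       -> Page(Seq("data/_INFO", "data/part-0001"), Some("t1")),
      Some("t1") -> Page(Seq("data/part-0002.gz"), Some("t2")),
      Some("t2") -> Page(Seq("data/part-0003"), None)
    )

    // Plain accumulation: count all keys across all pages.
    val keyCount = accumulatePages[Int](pages, None, 0, (n, ks) => n + ks.size, _ => false)

    // Short-circuiting accumulation: stop at the first ".gz" key.
    val hasGz = accumulatePages[Boolean](pages, None, false,
      (found, ks) => found || ks.exists(_.endsWith(".gz")),
      found => found)

    println(s"keys=$keyCount, containsGz=$hasGz") // keys=4, containsGz=true
  }
}
```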
dk1844 authored Oct 16, 2020
1 parent 2b3c39c commit 5dfc40f
Showing 154 changed files with 1,657 additions and 640 deletions.
@@ -18,7 +18,7 @@ package za.co.absa.enceladus.dao.auth
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import sun.security.krb5.internal.ktab.KeyTab
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.fs.HdfsUtils

sealed abstract class MenasCredentials {
val username: String
@@ -40,9 +40,9 @@ object MenasPlainCredentials {
* @return An instance of Menas Credentials.
*/
def fromFile(path: String)(implicit spark: SparkSession): MenasPlainCredentials = {
val fsUtils = new FileSystemVersionUtils(spark.sparkContext.hadoopConfiguration)
val fsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val conf = ConfigFactory.parseString(fsUtils.getFileContent(path))
val conf = ConfigFactory.parseString(fsUtils.getLocalOrDistributedFileContent(path))
MenasPlainCredentials(conf.getString("username"), conf.getString("password"))
}
}
@@ -55,9 +55,9 @@ object MenasKerberosCredentials {
* @return An instance of Menas Credentials.
*/
def fromFile(path: String)(implicit spark: SparkSession): MenasKerberosCredentials = {
val fsUtils = new FileSystemVersionUtils(spark.sparkContext.hadoopConfiguration)
val fsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val localKeyTabPath = fsUtils.getLocalPathToFile(path)
val localKeyTabPath = fsUtils.getLocalPathToFileOrCopyToLocal(path)
val keytab = KeyTab.getInstance(localKeyTabPath)
val username = keytab.getOneName.getName

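The renamed helpers in the hunks above (getLocalOrDistributedFileContent, getLocalPathToFileOrCopyToLocal) suggest reads that first try a local path and fall back to the Hadoop FileSystem when the file lives on HDFS or S3. The following is a rough sketch of that read behaviour, inferred from the method names rather than taken from HdfsUtils itself:

```scala
import java.nio.file.{Files, Paths}
import scala.io.Source

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object LocalOrDistributedReadSketch {

  // Reads a file's content, preferring the local filesystem and falling back to
  // the Hadoop FileSystem (HDFS, S3A, ...) resolved from the path and configuration.
  def readContent(pathStr: String, hadoopConf: Configuration): String = {
    val existsLocally = scala.util.Try(Files.exists(Paths.get(pathStr))).getOrElse(false)
    if (existsLocally) {
      val src = Source.fromFile(pathStr)
      try src.mkString finally src.close()
    } else {
      val path = new Path(pathStr)
      val fs = path.getFileSystem(hadoopConf)
      val in = fs.open(path)
      try Source.fromInputStream(in).mkString finally in.close()
    }
  }
}
```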
@@ -15,10 +15,12 @@

package za.co.absa.enceladus.dao.rest

import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
import org.mockito.scalatest.MockitoSugar
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers

abstract class BaseTestSuite extends WordSpec
abstract class BaseTestSuite extends AnyWordSpec
with Matchers
with MockitoSugar
with BeforeAndAfter
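The test changes here and throughout the commit follow the ScalaTest 3.x package reorganisation (WordSpec becomes org.scalatest.wordspec.AnyWordSpec, Matchers becomes org.scalatest.matchers.should.Matchers) and swap ScalaTest's old org.scalatest.mockito.MockitoSugar for mockito-scala's org.mockito.scalatest.MockitoSugar. A minimal suite in the new style might look like this — FileReader is a made-up collaborator used only to show the stubbing:

```scala
import org.mockito.scalatest.MockitoSugar
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

// Made-up collaborator, only here so there is something to stub.
trait FileReader { def read(path: String): String }

class FileReaderSpec extends AnyWordSpec with Matchers with MockitoSugar with BeforeAndAfter {

  private var reader: FileReader = _

  before {
    reader = mock[FileReader]
  }

  "FileReader" should {
    "return the stubbed content" in {
      when(reader.read("s3a://bucket/key")).thenReturn("content")
      reader.read("s3a://bucket/key") shouldBe "content"
    }
  }
}
```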
@@ -15,11 +15,12 @@

package za.co.absa.enceladus.dao.rest

import org.scalatest.{Matchers, WordSpec}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.enceladus.dao.UnauthorizedException
import za.co.absa.enceladus.dao.auth.{InvalidMenasCredentials, MenasKerberosCredentials, MenasPlainCredentials}

class RestDaoFactorySuite extends WordSpec with Matchers {
class RestDaoFactorySuite extends AnyWordSpec with Matchers {

private val menasApiBaseUrls = List("http://localhost:8080/menas/api")

@@ -16,15 +16,17 @@
package za.co.absa.enceladus.dao.rest.auth

import org.mockito.stubbing.OngoingStubbing
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
import org.scalatest.matchers.should.Matchers
import org.mockito.scalatest.MockitoSugar
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.BeforeAndAfter
import org.springframework.http.{HttpHeaders, ResponseEntity}
import org.springframework.util.LinkedMultiValueMap
import org.springframework.web.client.RestTemplate
import za.co.absa.enceladus.dao.UnauthorizedException
import za.co.absa.enceladus.dao.rest.{ApiCaller, ApiCallerStub, AuthClient}

abstract class AuthClientSuite() extends WordSpec
abstract class AuthClientSuite() extends AnyWordSpec
with Matchers
with MockitoSugar
with BeforeAndAfter {
@@ -15,12 +15,12 @@

package za.co.absa.enceladus.dao.rest.auth

import org.scalatest.WordSpec
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.enceladus.dao.auth.MenasPlainCredentials
import za.co.absa.enceladus.utils.fs.FileSystemVersionUtils
import za.co.absa.enceladus.utils.fs.LocalFsUtils
import za.co.absa.enceladus.utils.testUtils.SparkTestBase

class MenasPlainCredentialsSuite extends WordSpec with SparkTestBase {
class MenasPlainCredentialsSuite extends AnyWordSpec with SparkTestBase {

"MenasPlainCredentials" should {
"be read from *.conf" in {
@@ -42,9 +42,7 @@ class MenasPlainCredentialsSuite extends WordSpec with SparkTestBase {
val homeDir = System.getProperty("user.home")
val expected = s"$homeDir/dir/file"

val fsUtils = new FileSystemVersionUtils(spark.sparkContext.hadoopConfiguration)

val actual = fsUtils.replaceHome("~/dir/file")
val actual = LocalFsUtils.replaceHome("~/dir/file")
assert(actual == expected)
}
}
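With replaceHome now a method on the LocalFsUtils object, the call above needs no Hadoop configuration at all. Judging purely from the test's expectation, its behaviour is plain home-directory expansion; a tiny sketch of that (an assumption drawn from the test, not the project's implementation):

```scala
object ReplaceHomeSketch {
  // Expands a leading "~" to the current user's home directory,
  // mirroring what the test above expects of LocalFsUtils.replaceHome.
  def replaceHome(path: String): String =
    if (path.startsWith("~")) System.getProperty("user.home") + path.stripPrefix("~")
    else path
}
```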
6 changes: 6 additions & 0 deletions data-model/pom.xml
@@ -53,6 +53,12 @@
<version>${scalatest.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-funsuite_${scala.compat.version}</artifactId>
<version>${scalatest.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
@@ -17,9 +17,10 @@ package za.co.absa.enceladus.model.conformanceRule

import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.scalatest.{Matchers, WordSpec}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class ConformanceRuleTest extends WordSpec with Matchers {
class ConformanceRuleTest extends AnyWordSpec with Matchers {

private val objectMapper = new ObjectMapper()
.registerModule(DefaultScalaModule)
@@ -15,12 +15,12 @@

package za.co.absa.enceladus.model.menas.audit

import org.scalatest.FunSuite
import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.model.conformanceRule.{DropConformanceRule, LiteralConformanceRule}
import za.co.absa.enceladus.model.conformanceRule.ConformanceRule

class AuditableTest extends FunSuite {
class AuditableTest extends AnyFunSuite {
val obj1 = Dataset(name = "Test DS",
version = 0,
hdfsPath = "oldPath",
@@ -23,6 +23,7 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.RestDaoFactory
import za.co.absa.enceladus.examples.interpreter.rules.custom.UppercaseCustomConformanceRule
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample1 {
@@ -37,6 +38,8 @@ object CustomRuleSample1 {
.getOrCreate()
TimeZoneNormalizer.normalizeAll(spark) //normalize the timezone of JVM and the spark session

implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

def main(args: Array[String]) {
// scalastyle:off magic.number
val menasBaseUrls = List("http://localhost:8080/menas")
@@ -78,7 +81,7 @@ object CustomRuleSample1 {
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

outputData.show(false)
//scalastyle:on magicnumber
@@ -24,6 +24,7 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.examples.interpreter.rules.custom.LPadCustomConformanceRule
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample2 {
@@ -38,6 +39,8 @@ object CustomRuleSample2 {
.getOrCreate()
TimeZoneNormalizer.normalizeAll(spark) //normalize the timezone of JVM and the spark session

implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

def main(args: Array[String]) {
// scalastyle:off magic.number
val conf = ConfigFactory.load()
@@ -81,7 +84,7 @@ object CustomRuleSample2 {
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

outputData.show(false)
// scalastyle:on magic.number
@@ -24,6 +24,7 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.examples.interpreter.rules.custom.{LPadCustomConformanceRule, UppercaseCustomConformanceRule}
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample3 {
@@ -33,6 +34,7 @@ object CustomRuleSample3 {
.config("spark.sql.codegen.wholeStage", value = false)
.getOrCreate()
TimeZoneNormalizer.normalizeAll(spark) //normalize the timezone of JVM and the spark session
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

def main(args: Array[String]): Unit = {
val conf = ConfigFactory.load()
@@ -79,7 +81,7 @@ object CustomRuleSample3 {
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

outputData.show()
}
@@ -26,6 +26,7 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.examples.interpreter.rules.custom.{LPadCustomConformanceRule, UppercaseCustomConformanceRule}
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample4 {
@@ -138,6 +139,7 @@ object CustomRuleSample4 {
def main(args: Array[String]): Unit = {
val cmd: CmdConfigLocal = getCmdLineArguments(args)
implicit val spark: SparkSession = buildSparkSession()
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val conf = ConfigFactory.load()
val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
@@ -186,7 +188,7 @@ object CustomRuleSample4 {
.setCatalystWorkaroundEnabled(true)
.setControlFrameworkEnabled(false)

val outputData: DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)
outputData.show()
saveToCsv(outputData, cmd.outPath)
}
@@ -17,12 +17,13 @@ package za.co.absa.enceladus.examples.interpreter.rules.custom

import org.apache.spark.sql
import org.apache.spark.sql.DataFrame
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
import org.scalatest.funsuite.AnyFunSuite
import org.mockito.scalatest.MockitoSugar
import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.testUtils.SparkTestBase


@@ -32,11 +33,12 @@ object TestOutputRow {
def apply(input: TestInputRow, doneUpper: String): TestOutputRow = TestOutputRow(input.id, input.mandatoryString, input.nullableString, doneUpper)
}

class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase with MockitoSugar {
class UppercaseCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase with MockitoSugar {
import spark.implicits._

implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = mock[MenasDAO] // you may have to hard-code your own implementation here (if not working with menas)
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val experimentalMR = true
val isCatalystWorkaroundEnabled = true
@@ -67,7 +69,7 @@ class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase wi
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: sql.DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: sql.DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

val output: Seq[TestOutputRow] = outputData.as[TestOutputRow].collect().toSeq
val expected: Seq[TestOutputRow] = (input zip Seq("HELLO WORLD", "ONE RING TO RULE THEM ALL", "ALREADY CAPS")).map(x => TestOutputRow(x._1, x._2))
@@ -101,7 +103,7 @@ class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase wi
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: sql.DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: sql.DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

val output: Seq[TestOutputRow] = outputData.as[TestOutputRow].collect().toSeq
val expected: Seq[TestOutputRow] = (input zip Seq("1", "4", "9")).map(x => TestOutputRow(x._1, x._2))
@@ -134,7 +136,7 @@ class UppercaseCustomConformanceRuleSuite extends FunSuite with SparkTestBase wi
.setCatalystWorkaroundEnabled(isCatalystWorkaroundEnabled)
.setControlFrameworkEnabled(enableCF)

val outputData: sql.DataFrame = DynamicInterpreter.interpret(conformanceDef, inputData)
val outputData: sql.DataFrame = DynamicInterpreter().interpret(conformanceDef, inputData)

val output: List[TestOutputRow] = outputData.as[TestOutputRow].collect().toList
val expected: List[TestOutputRow] = (input zip Seq("WHAT A BEAUTIFUL PLACE", "ONE RING TO FIND THEM", null)).map(x => TestOutputRow(x._1, x._2)).toList