Feature/1556 file access PoC using Hadoop FS API (#1586)
* S3 access using the Hadoop FS API
* S3 SDK usage removed (pom, classes, tests)
* Atum final version 3.1.0 used
* readStandardizationInputData(... path: String)(implicit ... fs: FileSystem) -> readStandardizationInputData(input: PathWithFs)
dk1844 authored Nov 9, 2020
1 parent 5dfc40f commit 5b5628e
Showing 47 changed files with 423 additions and 1,103 deletions.
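The core of the change, per the last commit bullet: file access that previously threaded a path plus an implicit Hadoop FileSystem through method signatures now passes a single PathWithFs value, so the resolved FileSystem travels with the path it belongs to. A minimal sketch of the idea, assuming PathWithFs is essentially a path paired with its resolved FileSystem (the real definitions are not part of this diff, so names and bodies below are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical stand-in for za.co.absa.enceladus.utils.config.PathWithFs:
// a path paired with the FileSystem resolved for it.
final case class PathWithFs(path: String, fileSystem: FileSystem)

object PathWithFs {
  // Resolve the matching FileSystem (HDFS, S3 via s3a, local, ...) from the path's URI scheme.
  def fromPath(path: String)(implicit hadoopConf: Configuration): PathWithFs =
    PathWithFs(path, new Path(path).getFileSystem(hadoopConf))
}

object StandardizationReaderSketch {
  // Before: def readStandardizationInputData(path: String)(implicit spark: SparkSession, fs: FileSystem): DataFrame
  // After:  the caller resolves the FileSystem once and hands over both together.
  def readStandardizationInputData(input: PathWithFs)(implicit spark: SparkSession): DataFrame =
    spark.read.parquet(input.path) // illustrative read; the real method body is not shown in this diff
}
```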
@@ -18,7 +18,7 @@ package za.co.absa.enceladus.dao.auth
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import sun.security.krb5.internal.ktab.KeyTab
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.fs.{FileSystemUtils, HadoopFsUtils}

sealed abstract class MenasCredentials {
val username: String
@@ -40,9 +40,9 @@ object MenasPlainCredentials {
* @return An instance of Menas Credentials.
*/
def fromFile(path: String)(implicit spark: SparkSession): MenasPlainCredentials = {
val fsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)
val fs = FileSystemUtils.getFileSystemFromPath(path)(spark.sparkContext.hadoopConfiguration)

val conf = ConfigFactory.parseString(fsUtils.getLocalOrDistributedFileContent(path))
val conf = ConfigFactory.parseString(HadoopFsUtils.getOrCreate(fs).getLocalOrDistributedFileContent(path))
MenasPlainCredentials(conf.getString("username"), conf.getString("password"))
}
}
@@ -55,9 +55,9 @@ object MenasKerberosCredentials {
* @return An instance of Menas Credentials.
*/
def fromFile(path: String)(implicit spark: SparkSession): MenasKerberosCredentials = {
val fsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)
val fs = FileSystemUtils.getFileSystemFromPath(path)(spark.sparkContext.hadoopConfiguration)

val localKeyTabPath = fsUtils.getLocalPathToFileOrCopyToLocal(path)
val localKeyTabPath = HadoopFsUtils.getOrCreate(fs).getLocalPathToFileOrCopyToLocal(path)
val keytab = KeyTab.getInstance(localKeyTabPath)
val username = keytab.getOneName.getName

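The pattern introduced in the credentials helpers above: resolve the FileSystem from the path's URI scheme, then obtain utils bound to that FileSystem via HadoopFsUtils.getOrCreate, so the same code handles hdfs://, s3a:// and file:// paths alike. A minimal usage sketch of that call sequence (the wrapper name is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import za.co.absa.enceladus.utils.fs.{FileSystemUtils, HadoopFsUtils}

object CredentialsFileSketch {
  // Same call sequence as in the diff above; getOrCreate presumably returns
  // a utils instance bound to (and cached per) the given FileSystem.
  def readSmallFileContent(path: String)(implicit spark: SparkSession): String = {
    val fs = FileSystemUtils.getFileSystemFromPath(path)(spark.sparkContext.hadoopConfiguration)
    HadoopFsUtils.getOrCreate(fs).getLocalOrDistributedFileContent(path)
  }
}
```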
@@ -23,10 +23,10 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.RestDaoFactory
import za.co.absa.enceladus.examples.interpreter.rules.custom.UppercaseCustomConformanceRule
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.testUtils.HadoopFsTestBase
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample1 {
object CustomRuleSample1 extends HadoopFsTestBase {

case class ExampleRow(id: Int, makeUpper: String, leave: String)
case class OutputRow(id: Int, makeUpper: String, leave: String, doneUpper: String)
@@ -38,8 +38,6 @@ object CustomRuleSample1 {
.getOrCreate()
TimeZoneNormalizer.normalizeAll(spark) //normalize the timezone of JVM and the spark session

implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

def main(args: Array[String]) {
// scalastyle:off magic.number
val menasBaseUrls = List("http://localhost:8080/menas")
@@ -24,10 +24,10 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.examples.interpreter.rules.custom.LPadCustomConformanceRule
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.testUtils.HadoopFsTestBase
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample2 {
object CustomRuleSample2 extends HadoopFsTestBase {

case class ExampleRow(id: Int, addPad: String, leave: String)
case class OutputRow(id: Int, addPad: String, leave: String, donePad: String)
@@ -39,8 +39,6 @@ object CustomRuleSample2 {
.getOrCreate()
TimeZoneNormalizer.normalizeAll(spark) //normalize the timezone of JVM and the spark session

implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

def main(args: Array[String]) {
// scalastyle:off magic.number
val conf = ConfigFactory.load()
@@ -24,17 +24,16 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.examples.interpreter.rules.custom.{LPadCustomConformanceRule, UppercaseCustomConformanceRule}
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.testUtils.HadoopFsTestBase
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample3 {
object CustomRuleSample3 extends HadoopFsTestBase {
implicit val spark: SparkSession = SparkSession.builder
.master("local[*]")
.appName("CustomRuleSample3")
.config("spark.sql.codegen.wholeStage", value = false)
.getOrCreate()
TimeZoneNormalizer.normalizeAll(spark) //normalize the timezone of JVM and the spark session
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

def main(args: Array[String]): Unit = {
val conf = ConfigFactory.load()
@@ -26,10 +26,10 @@ import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.examples.interpreter.rules.custom.{LPadCustomConformanceRule, UppercaseCustomConformanceRule}
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.testUtils.HadoopFsTestBase
import za.co.absa.enceladus.utils.time.TimeZoneNormalizer

object CustomRuleSample4 {
object CustomRuleSample4 extends HadoopFsTestBase {
TimeZoneNormalizer.normalizeJVMTimeZone() //normalize JVM time zone as soon as possible

/**
@@ -136,10 +136,10 @@ object CustomRuleSample4 {
result
}

implicit val spark: SparkSession = buildSparkSession()

def main(args: Array[String]): Unit = {
val cmd: CmdConfigLocal = getCmdLineArguments(args)
implicit val spark: SparkSession = buildSparkSession()
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val conf = ConfigFactory.load()
val menasBaseUrls = MenasConnectionStringParser.parse(conf.getString("menas.rest.uri"))
@@ -23,8 +23,8 @@ import za.co.absa.enceladus.conformance.config.ConformanceConfig
import za.co.absa.enceladus.conformance.interpreter.{DynamicInterpreter, FeatureSwitches}
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.testUtils.SparkTestBase
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
import za.co.absa.enceladus.utils.testUtils.{HadoopFsTestBase, SparkTestBase}


case class TestInputRow(id: Int, mandatoryString: String, nullableString: Option[String])
@@ -33,12 +33,12 @@ object TestOutputRow {
def apply(input: TestInputRow, doneUpper: String): TestOutputRow = TestOutputRow(input.id, input.mandatoryString, input.nullableString, doneUpper)
}

class UppercaseCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase with MockitoSugar {
class UppercaseCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase with MockitoSugar with HadoopFsTestBase {
import spark.implicits._

implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = mock[MenasDAO] // you may have to hard-code your own implementation here (if not working with menas)
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)


val experimentalMR = true
val isCatalystWorkaroundEnabled = true
@@ -26,21 +26,20 @@ import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.auth.MenasKerberosCredentials
import za.co.absa.enceladus.dao.rest.{MenasConnectionStringParser, RestDaoFactory}
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.utils.fs.HdfsUtils
import za.co.absa.enceladus.utils.testUtils.SparkTestBase
import za.co.absa.enceladus.utils.fs.HadoopFsUtils
import za.co.absa.enceladus.utils.testUtils.{HadoopFsTestBase, SparkTestBase}

case class XPadTestInputRow(intField: Int, stringField: Option[String])
case class XPadTestOutputRow(intField: Int, stringField: Option[String], targetField: String)
object XPadTestOutputRow {
def apply(input: XPadTestInputRow, targetField: String): XPadTestOutputRow = XPadTestOutputRow(input.intField, input.stringField, targetField)
}

class LpadCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase with MockitoSugar {
class LpadCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase with MockitoSugar with HadoopFsTestBase {
import spark.implicits._

implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = mock[MenasDAO] // you may have to hard-code your own implementation here (if not working with menas)
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val experimentalMR = true
val isCatalystWorkaroundEnabled = true
@@ -180,7 +179,7 @@ class LpadCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase with
}


class RpadCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase {
class RpadCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase with HadoopFsTestBase {

import spark.implicits._

@@ -189,7 +188,6 @@ class RpadCustomConformanceRuleSuite extends AnyFunSuite with SparkTestBase {
private val meansCredentials = MenasKerberosCredentials("[email protected]", "src/test/resources/user.keytab.example")
implicit val progArgs: ConformanceConfig = ConformanceConfig() // here we may need to specify some parameters (for certain rules)
implicit val dao: MenasDAO = RestDaoFactory.getInstance(meansCredentials, menasBaseUrls) // you may have to hard-code your own implementation here (if not working with menas)
implicit val fsUtils: HdfsUtils = new HdfsUtils(spark.sparkContext.hadoopConfiguration)

val experimentalMR = true
val isCatalystWorkaroundEnabled = true
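The sample applications and test suites above stop declaring `implicit val fsUtils: HdfsUtils` themselves and instead mix in HadoopFsTestBase. The trait's definition is not shown in this excerpt; a plausible minimal shape, assuming it derives an implicit FileSystem and HadoopFsUtils from the Spark session's Hadoop configuration, would be:

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession
import za.co.absa.enceladus.utils.fs.HadoopFsUtils

// Hypothetical sketch only -- the real trait lives outside this diff.
trait HadoopFsTestBase {
  implicit def spark: SparkSession // supplied by SparkTestBase or by the extending object

  // lazy so that the extending object can initialize its SparkSession first
  implicit lazy val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  implicit lazy val fsUtils: HadoopFsUtils = HadoopFsUtils.getOrCreate(fs)
}
```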
20 changes: 2 additions & 18 deletions pom.xml
@@ -144,7 +144,7 @@
<maven.gpg.plugin.version>1.6</maven.gpg.plugin.version>
<!--dependency versions-->
<abris.version>3.1.1</abris.version>
<atum.version>3.0.0</atum.version>
<atum.version>3.1.0</atum.version>
<aws.java.sdk.version>2.13.65</aws.java.sdk.version>
<bower.chart.js.version>2.7.3</bower.chart.js.version>
<bson.codec.jsr310.version>3.5.4</bson.codec.jsr310.version>
@@ -204,18 +204,6 @@
<log.specialfilters.acceptonmatch>false</log.specialfilters.acceptonmatch>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>${aws.java.sdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
@@ -239,17 +227,13 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<!-- using AWS SDK v2 instead of v1 -->
<!-- using AWS SDK v2 instead of v1 if desired -->
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
6 changes: 0 additions & 6 deletions spark-jobs/src/main/resources/reference.conf
@@ -96,9 +96,3 @@ timezone="UTC"
#kafka.security.protocol="SASL_SSL"
#kafka.sasl.mechanism="GSSAPI"

# S3 specific settings:
s3.region = "eu-west-1" # default region, overridable

# s3.kmsKeyId is recommended to set externally only:
# s3.kmsKeyId = "arn:aws:kms:eu-west-1:XXXX:key/YYYY"

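With the S3 SDK usage gone, these Enceladus-specific s3.* keys are dropped as well; S3 access now goes through hadoop-aws (the s3a FileSystem), so region/endpoint and SSE-KMS settings would be supplied as standard s3a Hadoop properties instead. An illustrative sketch only, not part of this commit, and the exact property names vary by Hadoop version:

```scala
import org.apache.spark.sql.SparkSession

object S3aConfigSketch {
  // Rough s3a equivalents of the removed keys (assumed mapping, not taken from this commit):
  //   s3.region   -> fs.s3a.endpoint (or fs.s3a.endpoint.region on newer Hadoop)
  //   s3.kmsKeyId -> fs.s3a.server-side-encryption.key with SSE-KMS enabled
  def configureS3a(spark: SparkSession): Unit = {
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.endpoint", "s3.eu-west-1.amazonaws.com")
    hadoopConf.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
    hadoopConf.set("fs.s3a.server-side-encryption.key", "arn:aws:kms:eu-west-1:XXXX:key/YYYY")
  }
}
```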
@@ -19,25 +19,24 @@ import java.text.MessageFormat
import java.time.Instant

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.regions.Region
import za.co.absa.atum.AtumImplicits
import za.co.absa.atum.core.{Atum, ControlType}
import za.co.absa.atum.persistence.S3KmsSettings
import za.co.absa.enceladus.common.Constants.{InfoDateColumn, InfoVersionColumn}
import za.co.absa.enceladus.common.config.{JobConfigParser, PathConfig, S3Config}
import za.co.absa.enceladus.common.config.{JobConfigParser, PathConfig}
import za.co.absa.enceladus.common.plugin.PostProcessingService
import za.co.absa.enceladus.common.plugin.menas.{MenasPlugin, MenasRunUrl}
import za.co.absa.enceladus.common.version.SparkVersionGuard
import za.co.absa.enceladus.dao.MenasDAO
import za.co.absa.enceladus.dao.rest.MenasConnectionStringParser
import za.co.absa.enceladus.model.Dataset
import za.co.absa.enceladus.plugins.builtin.errorsender.params.ErrorSenderPluginParams
import za.co.absa.enceladus.utils.config.{ConfigReader, SecureConfig}
import za.co.absa.enceladus.utils.fs.{DistributedFsUtils, S3FsUtils}
import za.co.absa.enceladus.utils.config.{ConfigReader, PathWithFs, SecureConfig}
import za.co.absa.enceladus.utils.fs.{FileSystemUtils, HadoopFsUtils}
import za.co.absa.enceladus.utils.general.ProjectMetadataTools
import za.co.absa.enceladus.utils.modules.SourcePhase
import za.co.absa.enceladus.utils.modules.SourcePhase.Standardization
@@ -52,7 +51,6 @@ trait CommonJobExecution {
protected case class PreparationResult(dataset: Dataset,
reportVersion: Int,
pathCfg: PathConfig,
s3Config: S3Config,
performance: PerformanceMeasurer)

TimeZoneNormalizer.normalizeJVMTimeZone()
@@ -82,18 +80,18 @@
protected def prepareJob[T]()
(implicit dao: MenasDAO,
cmd: JobConfigParser[T],
fsUtils: DistributedFsUtils,
spark: SparkSession): PreparationResult = {
val confReader: ConfigReader = new ConfigReader(conf)
confReader.logEffectiveConfigProps(Constants.ConfigKeysToRedact)

dao.authenticate()

implicit val hadoopConf = spark.sparkContext.hadoopConfiguration

val dataset = dao.getDataset(cmd.datasetName, cmd.datasetVersion)
val reportVersion = getReportVersion(cmd, dataset)
val pathCfg: PathConfig = getPathConfig(cmd, dataset, reportVersion)
val s3Config: S3Config = getS3Config

validateOutputPath(s3Config, pathCfg)
validateOutputPath(pathCfg)

// Enable Spline
import za.co.absa.spline.harvester.SparkLineageInitializer._
@@ -102,24 +100,25 @@
// Enable non-default persistence storage level if provided in the command line
cmd.persistStorageLevel.foreach(Atum.setCachingStorageLevel)

PreparationResult(dataset, reportVersion, pathCfg, s3Config, new PerformanceMeasurer(spark.sparkContext.appName))
PreparationResult(dataset, reportVersion, pathCfg, new PerformanceMeasurer(spark.sparkContext.appName))
}

protected def validateOutputPath(s3Config: S3Config, pathConfig: PathConfig)(implicit fsUtils: DistributedFsUtils): Unit
protected def validateOutputPath(pathConfig: PathConfig): Unit

protected def validateIfPathAlreadyExists(s3Config: S3Config, path: String)(implicit fsUtils: DistributedFsUtils): Unit = {
if (fsUtils.exists(path)) {
protected def validateIfPathAlreadyExists(entry: PathWithFs): Unit = {
val fsUtils = HadoopFsUtils.getOrCreate(entry.fileSystem)
if (fsUtils.exists(entry.path)) {
throw new IllegalStateException(
s"Path $path already exists. Increment the run version, or delete $path"
s"Path ${entry.path} already exists. Increment the run version, or delete ${entry.path}"
)
}
}

protected def runPostProcessing[T](sourcePhase: SourcePhase, preparationResult: PreparationResult, jobCmdConfig: JobConfigParser[T])
(implicit spark: SparkSession, fileSystemVersionUtils: DistributedFsUtils): Unit = {
(implicit spark: SparkSession): Unit = {
val outputPath = sourcePhase match {
case Standardization => preparationResult.pathCfg.standardizationPath
case _ => preparationResult.pathCfg.publishPath
case Standardization => preparationResult.pathCfg.standardization.path
case _ => preparationResult.pathCfg.publish.path
}

log.info(s"rereading outputPath $outputPath to run postProcessing")
@@ -161,24 +160,14 @@
}
}

protected def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int): PathConfig = {
PathConfig(
rawPath = buildRawPath(cmd, dataset, reportVersion),
publishPath = buildPublishPath(cmd, dataset, reportVersion),
standardizationPath = getStandardizationPath(cmd, reportVersion)
)
}
protected def getPathConfig[T](cmd: JobConfigParser[T], dataset: Dataset, reportVersion: Int)
(implicit hadoopConf: Configuration): PathConfig = {

protected def getS3Config: S3Config = {
val keyId = conf.getString("s3.kmsKeyId")
val region = Region.of(conf.getString("s3.region"))
val rawPath = buildRawPath(cmd, dataset, reportVersion)
val publishPath = buildPublishPath(cmd, dataset, reportVersion)
val standardizationPath = getStandardizationPath(cmd, reportVersion)

S3Config(region, keyId)
}

protected def getS3FsUtil(implicit credentialsProvider: AwsCredentialsProvider): S3FsUtils = {
val s3Config = getS3Config
S3FsUtils(s3Config.region, S3KmsSettings(s3Config.kmsKeyId))
PathConfig.fromPaths(rawPath, publishPath, standardizationPath)
}

private def buildPublishPath[T](cmd: JobConfigParser[T], ds: Dataset, reportVersion: Int): String = {
@@ -251,10 +240,15 @@
}
}

private def getReportVersion[T](jobConfig: JobConfigParser[T], dataset: Dataset)(implicit fsUtils: DistributedFsUtils): Int = {
private def getReportVersion[T](jobConfig: JobConfigParser[T], dataset: Dataset)(implicit hadoopConf: Configuration): Int = {
jobConfig.reportVersion match {
case Some(version) => version
case None =>

// publishFs for this specific feature (needed for missing reportVersion until reusable
// common "PathConfig" with FS objects is established)
val tempPublishFs: FileSystem = FileSystemUtils.getFileSystemFromPath(dataset.hdfsPublishPath)
val fsUtils = HadoopFsUtils.getOrCreate(tempPublishFs)
val newVersion = fsUtils.getLatestVersion(dataset.hdfsPublishPath, jobConfig.reportDate) + 1
log.warn(s"Report version not provided, inferred report version: $newVersion")
log.warn("This is an EXPERIMENTAL feature.")