
Feature/1556 file access PoC using Hadoop FS API #1586

Merged: 12 commits, Nov 9, 2020
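
The change below reworks Standardization's file access so that a data path always travels together with the Hadoop FileSystem used to read it: the bare path: String plus implicit FileSystem parameters are replaced by a single PathWithFs value. Judging only by the call sites in the hunks (rawInput.path, rawInput.fileSystem, preparationResult.pathCfg.raw), PathWithFs is presumably a minimal pairing along these lines; a sketch inferred from usage, not the exact source:

    import org.apache.hadoop.fs.FileSystem

    // Inferred shape: a value object carrying a data path together with
    // the FileSystem instance that should be used to access it.
    case class PathWithFs(path: String, fileSystem: FileSystem)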
@@ -57,8 +57,6 @@ trait StandardizationExecution extends CommonJobExecution {
     val rawFs = preparationResult.pathCfg.raw.fileSystem
     val rawFsUtils = rawFs.toFsUtils

-    val stdFs = preparationResult.pathCfg.standardization.fileSystem
-
     val stdDirSize = rawFsUtils.getDirectorySize(preparationResult.pathCfg.raw.path)
     preparationResult.performance.startMeasurement(stdDirSize)

@@ -112,10 +110,9 @@ trait StandardizationExecution extends CommonJobExecution {

   protected def readStandardizationInputData[T](schema: StructType,
                                                 cmd: StandardizationConfigParser[T],
-                                                path: String,
+                                                rawInput: PathWithFs,
                                                 dataset: Dataset)
                                                (implicit spark: SparkSession,
-                                                rawFs: FileSystem,
                                                 dao: MenasDAO): DataFrame = {
     val numberOfColumns = schema.fields.length
     val standardizationReader = new StandardizationPropertiesProvider()
@@ -127,9 +124,9 @@ trait StandardizationExecution extends CommonJobExecution {
       val inputSchema = PlainSchemaGenerator.generateInputSchema(schema, optColumnNameOfCorruptRecord)
       dfReaderConfigured.schema(inputSchema)
     }
-    val dfWithSchema = readerWithOptSchema.load(s"$path/*")
+    val dfWithSchema = readerWithOptSchema.load(s"${rawInput.path}/*")

-    ensureSplittable(dfWithSchema, path, schema)(spark, rawFs.toFsUtils)
+    ensureSplittable(dfWithSchema, rawInput, schema)
   }

   private def getColumnNameOfCorruptRecord[R](schema: StructType, cmd: StandardizationConfigParser[R])
@@ -222,9 +219,10 @@ trait StandardizationExecution extends CommonJobExecution {

   //scalastyle:off parameter.number

-  private def ensureSplittable(df: DataFrame, path: String, schema: StructType)
-                              (implicit spark: SparkSession, fsUtils: DistributedFsUtils): DataFrame = {
-    if (fsUtils.isNonSplittable(path)) {
+  private def ensureSplittable(df: DataFrame, input: PathWithFs, schema: StructType)
+                              (implicit spark: SparkSession): DataFrame = {
+    implicit val fsUtils = input.fileSystem.toFsUtils
+    if (fsUtils.isNonSplittable(input.path)) {
       convertToSplittable(df, schema)
     } else {
       df
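
The new ensureSplittable no longer asks callers for an implicit DistributedFsUtils; it derives one locally from the bundled FileSystem via input.fileSystem.toFsUtils, the same extension already used for rawFs in the first hunk. One plausible way such an extension could be provided is sketched here; the names FileSystemExtensions, FileSystemExt, and the HdfsUtils-backed implementation are assumptions, not the project's confirmed API:

    import org.apache.hadoop.fs.FileSystem

    object FileSystemExtensions {
      // Hypothetical enrichment enabling the fs.toFsUtils syntax seen in the
      // diff; assumes HdfsUtils is a FileSystem-backed DistributedFsUtils.
      implicit class FileSystemExt(val fs: FileSystem) extends AnyVal {
        def toFsUtils: DistributedFsUtils = new HdfsUtils(fs)
      }
    }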
@@ -39,8 +39,7 @@ object StandardizationJob extends StandardizationExecution {

     val preparationResult = prepareJob()
     val schema = prepareStandardization(args, menasCredentials, preparationResult)
-    implicit val rawFs: FileSystem = preparationResult.pathCfg.raw.fileSystem
-    val inputData = readStandardizationInputData(schema, cmd, preparationResult.pathCfg.raw.path, preparationResult.dataset)
+    val inputData = readStandardizationInputData(schema, cmd, preparationResult.pathCfg.raw, preparationResult.dataset)

     try {
       val result = standardize(inputData, schema, cmd)
@@ -36,8 +36,7 @@ object StandardizationAndConformanceJob extends StandardizationAndConformanceExecution {

     val preparationResult = prepareJob()
     val schema = prepareStandardization(args, menasCredentials, preparationResult)
-    implicit val rawFs = preparationResult.pathCfg.raw.fileSystem
-    val inputData = readStandardizationInputData(schema, cmd, preparationResult.pathCfg.raw.path, preparationResult.dataset)
+    val inputData = readStandardizationInputData(schema, cmd, preparationResult.pathCfg.raw, preparationResult.dataset)

     try {
       val standardized = standardize(inputData, schema, cmd)
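
Both job objects end up with the same call-site simplification: no implicit FileSystem has to be declared next to a bare path string, because the PathWithFs argument carries both and downstream helpers such as ensureSplittable derive their fsUtils from it internally. A condensed before/after for illustration:

    // Before: the path and its file system travel separately,
    // with the FileSystem supplied implicitly.
    implicit val rawFs: FileSystem = preparationResult.pathCfg.raw.fileSystem
    val inputData = readStandardizationInputData(schema, cmd, preparationResult.pathCfg.raw.path, preparationResult.dataset)

    // After: one PathWithFs value carries both path and file system.
    val inputData = readStandardizationInputData(schema, cmd, preparationResult.pathCfg.raw, preparationResult.dataset)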