Skip to content

Commit

Permalink
#latest should pick the latest available path (feathr-ai#1146)
Browse files Browse the repository at this point in the history
* #latest should pick the latest available path

* update gradle.properties

* add empty folder

---------

Co-authored-by: Rakesh Kashyap Hanasoge Padmanabha <[email protected]>
  • Loading branch information
2 people authored and Yuqing-cat committed May 23, 2023
1 parent 32c5c8a commit f7e9107
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ object HdfsUtils {
* @return A normalized (no multiple consecutive or trailing "/") String with all the identifiers replaced with the provided localDateTime in the given format
*/
def replaceTimestamp(inputPath: String,
localDateTime: LocalDateTime = LocalDateTime.now(ZoneOffset.UTC),
dateTimeFormatterPattern: DateTimeFormatter = timeStampFormatter): String = {
localDateTime: LocalDateTime = LocalDateTime.now(ZoneOffset.UTC),
dateTimeFormatterPattern: DateTimeFormatter = timeStampFormatter): String = {
validatePath(inputPath, timestampIdentifier)

val dateTimeString = localDateTime.format(dateTimeFormatterPattern)
Expand Down Expand Up @@ -185,10 +185,10 @@ object HdfsUtils {
* @return A Seq of paths under basePath covering the period [startInclusive, endExclusive).
*/
def getPaths(
basePath: String,
startInclusive: LocalDateTime,
endExclusive: LocalDateTime,
unit: ChronoUnit): Seq[String] = {
basePath: String,
startInclusive: LocalDateTime,
endExclusive: LocalDateTime,
unit: ChronoUnit): Seq[String] = {
val formatter = unit match {
case ChronoUnit.DAYS =>
TemporalPathFormats.Daily.formatter
Expand All @@ -211,11 +211,11 @@ object HdfsUtils {
* @return A Seq of paths under basePath covering the period [startInclusive, endExclusive).
*/
def getPaths(
basePath: String,
startInclusive: LocalDateTime,
endExclusive: LocalDateTime,
unit: ChronoUnit,
formatter: DateTimeFormatter): Seq[String] = {
basePath: String,
startInclusive: LocalDateTime,
endExclusive: LocalDateTime,
unit: ChronoUnit,
formatter: DateTimeFormatter): Seq[String] = {
// Walk the half-open period [startInclusive, endExclusive) in steps of `unit`, render each
// step with the caller-supplied formatter, and attach the rendered value to basePath.
// NOTE(review): createStringPath presumably joins and normalizes the two path segments;
// confirm against its definition, which is not visible in this hunk.
getTemporalRange(startInclusive, endExclusive, unit)
.map { time => createStringPath(basePath, formatter.format(time)) }
}
Expand Down Expand Up @@ -255,15 +255,15 @@ object HdfsUtils {
* that starts with either `.` or `_`.
*/
def listFiles(path: String,
recursive: Boolean = true,
excludeFilesPrefixList: Seq[String] = List(".", "_"),
conf: Configuration = conf): Seq[String] = {
recursive: Boolean = true,
excludeFilesPrefixList: Seq[String] = List(".", "_"),
conf: Configuration = conf): Seq[String] = {

@tailrec
def traverseIterator(path: String,
iter: RemoteIterator[LocatedFileStatus],
accumulator: List[String],
conf: Configuration = conf): Seq[String] = {
iter: RemoteIterator[LocatedFileStatus],
accumulator: List[String],
conf: Configuration = conf): Seq[String] = {
if (!iter.hasNext) {
accumulator
} else {
Expand Down Expand Up @@ -292,11 +292,11 @@ object HdfsUtils {
* any char in excludeFilesPrefixList, it will be excluded.
*/
def getAllFilesOfGivenType(path: String,
fileType: String = "",
recursive: Boolean = true,
errorOnMissingFiles: Boolean = true,
excludeFilesPrefixList: Seq[String] = List(".", "_"),
conf: Configuration = conf): Seq[String] = {
fileType: String = "",
recursive: Boolean = true,
errorOnMissingFiles: Boolean = true,
excludeFilesPrefixList: Seq[String] = List(".", "_"),
conf: Configuration = conf): Seq[String] = {
def filterFileName(fileName: String): Boolean = {
val lastComponent = fileName.split("/").last
fileName.endsWith(fileType)
Expand All @@ -320,9 +320,9 @@ object HdfsUtils {
* @return A Seq of LocalDateTime covering the period [startInclusive, endExclusive).
*/
private def getTemporalRange(
startInclusive: LocalDateTime,
endExclusive: LocalDateTime,
unit: ChronoUnit): Seq[LocalDateTime] = {
startInclusive: LocalDateTime,
endExclusive: LocalDateTime,
unit: ChronoUnit): Seq[LocalDateTime] = {
if (startInclusive.truncatedTo(unit) != startInclusive) {
throw new IllegalArgumentException(
s"Invalid argument: The startInclusive (${startInclusive}}) " +
Expand Down Expand Up @@ -377,7 +377,7 @@ object HdfsUtils {
* @return modified output path with a date representation.
*/
def appendDateFormatted(outputPath: String, dateTimeFormatter: DateTimeFormatter,
localDateTime: LocalDateTime): String = {
localDateTime: LocalDateTime): String = {
val dateFormattedExtension = dateTimeFormatter.format(localDateTime)

HdfsUtils.createStringPath(outputPath, dateFormattedExtension)
Expand All @@ -402,11 +402,12 @@ object HdfsUtils {
* @return A Seq[String] containing the paths of the subfolders sorted in reverse lexicographical order
*/
private def getSortedSubfolderPaths(basePath: String,
excludeDirsPrefixList: Seq[String] = List(".", "_"),
conf: Configuration = conf): Seq[String] = {
excludeDirsPrefixList: Seq[String] = List(".", "_"),
conf: Configuration = conf): Seq[String] = {
val fs: FileSystem = FileSystem.get(conf)
val directories: Seq[String] = fs.listStatus(new Path(basePath))
.filter(_.isDirectory)
.filter(x => fs.getContentSummary(x.getPath).getLength > 0) // Filter out empty directories.
.map(_.getPath.getName)
.filter(dirName =>
!excludeDirsPrefixList.exists(prefix => dirName.startsWith(prefix)))
Expand Down Expand Up @@ -460,8 +461,8 @@ object HdfsUtils {
* @return Array[String]
*/
def hdfsSubdir(inputPath: String,
excludePathsPrefixList: Seq[String] = List(".", "_"),
conf: Configuration = conf): Array[String] = {
excludePathsPrefixList: Seq[String] = List(".", "_"),
conf: Configuration = conf): Array[String] = {
val filter = new PathFilter() {
override def accept(path : Path): Boolean =
!excludePathsPrefixList.exists(prefix => path.getName.startsWith(prefix))
Expand Down Expand Up @@ -513,7 +514,7 @@ object HdfsUtils {
* Returns temp file path
*/
def hdfsCreateTempFile(User: String, tempDir: String, prefix: String, suffix: String,
attempts: Int = maxAttempts, conf: Configuration = conf): // scalaStyle:off magic.number
attempts: Int = maxAttempts, conf: Configuration = conf): // scalaStyle:off magic.number
Option[String] = {
val usePrefix = if (prefix.nonEmpty) prefix else "file"
val useSuffix = if (suffix.nonEmpty) suffix else "tmp"
Expand Down Expand Up @@ -544,7 +545,7 @@ object HdfsUtils {
* Converts Iterator to InputStream on HDFS
*/
def hdfsConvertIteratorToInputStream(User: String, tempDirPrefix: String,
iter: Iterator[String], attempts: Int = maxAttempts, conf: Configuration = conf):
iter: Iterator[String], attempts: Int = maxAttempts, conf: Configuration = conf):
Option[InputStream] = {
val tempPathOpt = hdfsCreateTempFile(User, tempDirPrefix,
"iter", "tmp", attempts)
Expand Down
Binary file not shown.
Binary file not shown.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,15 @@ class TestDataSourceAccessor extends TestFeathr {
assertTrue(accessor.isInstanceOf[NonTimeBasedDataSourceAccessor])
assertEquals(accessor.get().count(), 10L)
}

@Test(description = "test loading dataframe with BatchDataLoader having #LATEST in its path, also ensures that empty folders are not picked out.")
def testBatchDataLoaderWithLatestPathAndEmptyFolders(): Unit = {
  // The avroLatest fixture contains an empty 10/01 folder; resolving the #LATEST tokens must
  // not select it, otherwise the row count asserted below would not hold.
  val latestPath = "src/test/resources/avroLatest/#LATEST/#LATEST/#LATEST"
  val fixedPathSource = DataSource(latestPath, SourceFormatType.FIXED_PATH)
  val dataAccessor = DataSourceAccessor(
    ss = ss,
    source = fixedPathSource,
    dateIntervalOpt = sourceInterval,
    expectDatumType = None,
    failOnMissingPartition = false,
    dataPathHandlers = List())

  // A FIXED_PATH source is expected to yield the non-time-based accessor.
  assertTrue(dataAccessor.isInstanceOf[NonTimeBasedDataSourceAccessor])

  // The fixture data that should be loaded holds exactly 10 records.
  val loadedRowCount = dataAccessor.get().count()
  assertEquals(loadedRowCount, 10L)
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=1.0.2-rc4
version=1.0.2-rc5
SONATYPE_AUTOMATIC_RELEASE=true
POM_ARTIFACT_ID=feathr_2.12

0 comments on commit f7e9107

Please sign in to comment.