Skip to content

Commit

Permalink
Merge pull request #1282 from kkdlau/bugfix/1214-grab-zero-byte-pdf
Browse files Browse the repository at this point in the history
#1214 Only take PDFs that are good for processing
  • Loading branch information
Frooodle authored May 30, 2024
2 parents 316b4e4 + 65b9544 commit 5d6e23d
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 4 deletions.
26 changes: 26 additions & 0 deletions src/main/java/stirling/software/SPDF/config/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.function.Predicate;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
Expand Down Expand Up @@ -108,4 +110,28 @@ public boolean bookAndHtmlFormatsInstalled() {
// Unconditionally reports false.
// NOTE(review): any annotations on this method sit above the visible diff hunk; presumably this
// is a conditional bean definition - confirm against the full file.
public boolean missingActivSecurity() {
return false;
}

/**
 * Exposes the watched-folders root as a String bean named {@code watchedFoldersDir}, so that
 * consumers can inject it with {@code @Qualifier("watchedFoldersDir")}.
 *
 * @return the watched-folders directory path, relative to the working directory
 */
@Bean(name = "watchedFoldersDir")
public String watchedFoldersDir() {
return "./pipeline/watchedFolders/";
}

/**
 * Exposes the finished-folders root as a String bean named {@code finishedFoldersDir}, so that
 * consumers can inject it with {@code @Qualifier("finishedFoldersDir")}.
 *
 * @return the finished-folders directory path, relative to the working directory
 */
@Bean(name = "finishedFoldersDir")
public String finishedFoldersDir() {
return "./pipeline/finishedFolders/";
}

/**
 * Builds the path filter registered as the {@code directoryFilter} bean.
 *
 * <p>The returned predicate accepts:
 *
 * <ul>
 *   <li>a directory, unless its path string contains {@code "processing"};
 *   <li>any other path whose file name ends with {@code .pdf}, compared case-insensitively so
 *       that names such as {@code REPORT.PDF} are accepted too.
 * </ul>
 *
 * @return a predicate returning {@code true} for paths that should be monitored
 */
@Bean(name = "directoryFilter")
public Predicate<Path> processPDFOnlyFilter() {
    return path -> {
        if (Files.isDirectory(path)) {
            return !path.toString().contains("processing");
        }

        // getFileName() is null for a path with no name elements (e.g. a bare root).
        Path fileNamePath = path.getFileName();
        if (fileNamePath == null) {
            return false;
        }

        String fileName = fileNamePath.toString();
        String extension = ".pdf";
        // regionMatches returns false (instead of throwing) when the name is shorter than
        // the extension, because the computed offset is then negative.
        return fileName.regionMatches(
                true,
                fileName.length() - extension.length(),
                extension,
                0,
                extension.length());
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -28,18 +29,26 @@

import stirling.software.SPDF.model.PipelineConfig;
import stirling.software.SPDF.model.PipelineOperation;
import stirling.software.SPDF.utils.FileMonitor;

@Service
public class PipelineDirectoryProcessor {

private static final Logger logger = LoggerFactory.getLogger(PipelineDirectoryProcessor.class);
@Autowired private ObjectMapper objectMapper;
@Autowired private ApiDocService apiDocService;
@Autowired PipelineProcessor processor;
@Autowired FileMonitor fileMonitor;

final String watchedFoldersDir = "./pipeline/watchedFolders/";
final String finishedFoldersDir = "./pipeline/finishedFolders/";
final String watchedFoldersDir;
final String finishedFoldersDir;

@Autowired PipelineProcessor processor;
/**
 * Creates the processor with its two directory roots supplied as qualified String beans
 * instead of hard-coded literals.
 *
 * @param watchedFoldersDir value of the bean qualified as {@code watchedFoldersDir}
 * @param finishedFoldersDir value of the bean qualified as {@code finishedFoldersDir}
 */
public PipelineDirectoryProcessor(
@Qualifier("watchedFoldersDir") String watchedFoldersDir,
@Qualifier("finishedFoldersDir") String finishedFoldersDir) {
this.watchedFoldersDir = watchedFoldersDir;
this.finishedFoldersDir = finishedFoldersDir;
}

@Scheduled(fixedRate = 60000)
public void scanFolders() {
Expand Down Expand Up @@ -130,7 +139,7 @@ private File[] collectFilesForProcessing(Path dir, Path jsonFile, PipelineOperat
throws IOException {
try (Stream<Path> paths = Files.list(dir)) {
if ("automated".equals(operation.getParameters().get("fileInput"))) {
return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile))
return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile) && fileMonitor.isFileReadyForProcessing(path))
.map(Path::toFile)
.toArray(File[]::new);
} else {
Expand Down
167 changes: 167 additions & 0 deletions src/main/java/stirling/software/SPDF/utils/FileMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package stirling.software.SPDF.utils;

import static java.nio.file.StandardWatchEventKinds.*;

import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Tracks files below a root directory with a {@link WatchService} and reports which of them
 * have been left untouched for one full polling interval.
 *
 * <p>All bookkeeping collections except {@link #readyForProcessingFiles} are plain (non-concurrent)
 * collections that are only touched from {@link #trackFiles()} and its private helpers; other
 * callers are expected to use {@link #isFileReadyForProcessing(Path)} only.
 */
@Component
public class FileMonitor {
    private static final Logger logger = LoggerFactory.getLogger(FileMonitor.class);

    /** Watch key of every directory currently registered with {@link #watchService}. */
    private final Map<Path, WatchKey> path2KeyMapping;

    /** Every tracked file that has been seen and not reported as deleted since. */
    private final Set<Path> newlyDiscoveredFiles;

    /** Files that produced no event during the most recent {@link #trackFiles()} pass. */
    private final ConcurrentHashMap.KeySetView<Path, Boolean> readyForProcessingFiles;

    private final WatchService watchService;
    private final Predicate<Path> pathFilter;
    private final Path rootDir;

    /** Candidates for the current pass; any file that produces an event is removed again. */
    private Set<Path> stagingFiles;

    /**
     * @param rootDirectory the root directory to monitor
     * @param pathFilter the filter to apply to the paths, return true if the path should be
     *     monitored, false otherwise
     * @throws IOException if the default file system cannot create a watch service
     */
    @Autowired
    public FileMonitor(
            @Qualifier("watchedFoldersDir") String rootDirectory,
            @Qualifier("directoryFilter") Predicate<Path> pathFilter)
            throws IOException {
        this.newlyDiscoveredFiles = new HashSet<>();
        this.path2KeyMapping = new HashMap<>();
        this.stagingFiles = new HashSet<>();
        this.pathFilter = pathFilter;
        this.readyForProcessingFiles = ConcurrentHashMap.newKeySet();
        this.watchService = FileSystems.getDefault().newWatchService();
        this.rootDir = Path.of(rootDirectory);
    }

    private boolean shouldNotProcess(Path path) {
        return !pathFilter.test(path);
    }

    /**
     * Canonical form used for every entry of the tracking sets. {@link Path#equals(Object)} is
     * sensitive to spelling ({@code ./a/b} vs {@code a/b}), so both the stored paths and the
     * looked-up paths are made absolute and normalised first.
     */
    private static Path toKey(Path path) {
        return path.toAbsolutePath().normalize();
    }

    /**
     * Registers {@code dir} with the watch service, records the files it already contains, and
     * descends into every sub-directory accepted by the path filter.
     */
    private void recursivelyRegisterEntry(Path dir) throws IOException {
        WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
        path2KeyMapping.put(dir, key);
        logger.info("Registered directory: {}", dir);

        try (Stream<Path> directoryVisitor = Files.walk(dir, 1)) {
            final Iterator<Path> iterator = directoryVisitor.iterator();
            while (iterator.hasNext()) {
                Path path = iterator.next();
                // Files.walk also yields the starting directory itself.
                if (path.equals(dir) || shouldNotProcess(path)) {
                    continue;
                }

                if (Files.isDirectory(path)) {
                    recursivelyRegisterEntry(path);
                } else if (Files.isRegularFile(path)) {
                    handleFileCreation(path);
                }
            }
        }
    }

    /**
     * Drains all pending watch events and recomputes the set of files ready for processing.
     *
     * <p>Every known file starts the pass as a staging file. A file that produces a create,
     * modify or delete event during the pass is taken out of staging again; whatever remains at
     * the end is published as ready for processing.
     */
    @Scheduled(fixedRate = 5000)
    public void trackFiles() {
        stagingFiles = new HashSet<>(newlyDiscoveredFiles);
        readyForProcessingFiles.clear();

        if (path2KeyMapping.isEmpty()) {
            logger.warn(
                    "not monitoring any directory, even the root directory itself: {}", rootDir);
            if (Files.exists(rootDir)) {
                // The root directory exists (again): (re-)register it and its sub-directories.
                try {
                    recursivelyRegisterEntry(rootDir);
                } catch (IOException e) {
                    logger.error("unable to register monitoring", e);
                }
            }
        }

        WatchKey key;
        while ((key = watchService.poll()) != null) {
            final Path watchingDir = (Path) key.watchable();
            for (WatchEvent<?> evt : key.pollEvents()) {
                handleEvent(watchingDir, evt);
            }

            // The key becomes invalid when the watched directory itself no longer exists.
            if (!key.reset()) {
                path2KeyMapping.remove(watchingDir);
            }
        }
        readyForProcessingFiles.addAll(stagingFiles);
    }

    /**
     * Applies a single watch event to the tracking state. Never throws, so that the caller
     * always reaches {@code key.reset()} and the key keeps delivering events.
     */
    private void handleEvent(Path watchingDir, WatchEvent<?> evt) {
        final WatchEvent.Kind<?> kind = evt.kind();
        if (kind == OVERFLOW) {
            // Overflow events carry no path (their context may be null); nothing to resolve.
            logger.warn("Watch events were lost or discarded for directory: {}", watchingDir);
            return;
        }

        // The event context is relative to the watched directory. It must be resolved before
        // any file-system check, otherwise it is interpreted against the JVM working directory.
        final Path path = watchingDir.resolve((Path) evt.context());
        try {
            if (shouldNotProcess(path)) {
                return;
            }

            if (Files.isDirectory(path)) {
                if (kind == ENTRY_CREATE) {
                    handleDirectoryCreation(path);
                }
                /*
                 Directory deletion or modification needs no handling here:
                 - deletion surfaces as an invalid key on key.reset()
                 - modification means a file was created or deleted inside it, which is
                   reported through that directory's own watch key
                */
                return;
            }

            if (kind == ENTRY_CREATE) {
                handleFileCreation(path);
            } else if (kind == ENTRY_DELETE) {
                handleFileRemoval(path);
            } else if (kind == ENTRY_MODIFY) {
                handleFileModification(path);
            }
        } catch (Exception e) {
            logger.error("Error while processing file: {}", path, e);
        }
    }

    /**
     * Starts watching a newly created directory. Registration is recursive so that files and
     * sub-directories already present by the time the create event is handled are picked up.
     */
    private void handleDirectoryCreation(Path dir) throws IOException {
        recursivelyRegisterEntry(dir);
    }

    private void handleFileRemoval(Path path) {
        Path key = toKey(path);
        newlyDiscoveredFiles.remove(key);
        stagingFiles.remove(key);
    }

    private void handleFileCreation(Path path) {
        Path key = toKey(path);
        newlyDiscoveredFiles.add(key);
        // Activity during this pass: the file must stay quiet for another pass to become ready.
        stagingFiles.remove(key);
    }

    private void handleFileModification(Path path) {
        // the logic is the same
        handleFileCreation(path);
    }

    /**
     * Check if the file is ready for processing.
     *
     * <p>A file is ready for processing if it has not been modified for 5000ms, i.e. it produced
     * no watch event during the most recent {@link #trackFiles()} pass.
     *
     * @param path the path of the file; it is made absolute and normalised before the lookup
     * @return true if the file is ready for processing, false otherwise
     */
    public boolean isFileReadyForProcessing(Path path) {
        return readyForProcessingFiles.contains(toKey(path));
    }
}

0 comments on commit 5d6e23d

Please sign in to comment.