Skip to content

Commit

Permalink
Add support for merging meta data files
Browse files Browse the repository at this point in the history
  • Loading branch information
dadoonet committed Feb 13, 2025
1 parent c54a1eb commit 253e458
Show file tree
Hide file tree
Showing 53 changed files with 615 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,67 @@
package fr.pilato.elasticsearch.crawler.fs.beans;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerIllegalConfigurationException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.Locale;

import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.*;
import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.mapper;

public class DocUtils {

private static final Logger logger = LogManager.getLogger(DocUtils.class);

/**
 * Merge a json document with tags loaded from a file on disk.
 * The file extension drives the parser choice (see the String/InputStream overload).
 * @param doc The document to merge
 * @param filename Path of the tags file to read; may be null, in which case doc is returned unchanged
 * @return The merged document
 * @throws FsCrawlerIllegalConfigurationException If the tags can not be parsed
 */
public static Doc getMergedDoc(Doc doc, Path filename) throws FsCrawlerIllegalConfigurationException {
    // Guard clause: nothing to merge without a tags file
    if (filename == null) {
        return doc;
    }

    String simpleName = filename.getFileName().toString();
    try (InputStream tagsStream = Files.newInputStream(filename, StandardOpenOption.READ)) {
        // Delegate to the stream-based overload; the simple file name
        // is what drives the JSON vs YAML parser selection there.
        return getMergedDoc(doc, simpleName, tagsStream);
    } catch (IOException e) {
        logger.error("Error parsing tags", e);
        throw new FsCrawlerIllegalConfigurationException("Error parsing tags: " + e.getMessage());
    }
}

/**
 * Merge a json document with tags coming from a stream.
 * The mapper is selected from the file extension: {@code .json} is parsed as JSON,
 * anything else (typically {@code .yml}/{@code .yaml}) is parsed as YAML.
 * @param doc The document to merge
 * @param filename The filename the tags come from (only used to detect the format); may be null,
 *                 in which case doc is returned unchanged
 * @param tags The tags content to merge with the document
 * @return The merged document
 * @throws FsCrawlerIllegalConfigurationException If the tags can not be parsed
 */
public static Doc getMergedDoc(Doc doc, String filename, InputStream tags) throws FsCrawlerIllegalConfigurationException {
    if (filename == null) {
        return doc;
    }
    logger.trace("Reading tags from {}", filename);
    // We test if the extension is .json or .yml/.yaml.
    // Locale.ROOT keeps the comparison case-insensitive and locale-independent,
    // so "FOO.JSON" is also parsed as JSON.
    if (filename.toLowerCase(Locale.ROOT).endsWith(".json")) {
        return getMergedDoc(doc, tags, prettyMapper);
    } else {
        return getMergedDoc(doc, tags, ymlMapper);
    }
}

/**
* Merge a json document with tags
* @param doc The document to merge
Expand All @@ -23,6 +70,17 @@ public class DocUtils {
* @throws FsCrawlerIllegalConfigurationException If the tags can not be parsed
*/
public static Doc getMergedJsonDoc(Doc doc, InputStream tags) throws FsCrawlerIllegalConfigurationException {
    // Delegates to the generic merge using the default (compact) JSON mapper.
    return getMergedDoc(doc, tags, mapper);
}

/**
* Merge a json document with tags
* @param doc The document to merge
* @param tags The tags to merge
* @return The merged document
* @throws FsCrawlerIllegalConfigurationException If the tags can not be parsed
*/
public static Doc getMergedDoc(Doc doc, InputStream tags, ObjectMapper mapper) throws FsCrawlerIllegalConfigurationException {
if (tags == null) {
return doc;
}
Expand Down Expand Up @@ -70,4 +128,13 @@ private static JsonNode merge(JsonNode mainNode, JsonNode updateNode) {

return mainNode;
}

/**
 * Render a document as pretty-printed json, mainly for logging purposes.
 * @param doc The document to render
 * @return The json representation, or null if serialization fails
 */
public static String prettyPrint(Doc doc) {
    String json;
    try {
        json = prettyMapper.writeValueAsString(doc);
    } catch (JsonProcessingException e) {
        // Best effort only: log and fall back to null rather than failing the caller
        logger.warn("Can not pretty print the document as json", e);
        json = null;
    }
    return json;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package fr.pilato.elasticsearch.crawler.fs.beans;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;

import java.io.InputStream;
import java.nio.file.Path;

import static fr.pilato.elasticsearch.crawler.fs.beans.DocUtils.prettyPrint;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

public class DocUtilsTest {
    private static final Logger logger = LogManager.getLogger(DocUtilsTest.class);

    // Get the tags path from resources.
    // We go through URL#toURI() rather than URL#getPath() so URL-encoded
    // characters (e.g. spaces in the build directory path) are decoded
    // correctly before building the Path.
    Path path;
    {
        try {
            path = Path.of(DocUtilsTest.class.getResource("/tags").toURI());
        } catch (Exception e) {
            // URISyntaxException, or NPE if the /tags resource is missing
            throw new IllegalStateException("Can not locate the /tags test resource", e);
        }
    }

    @Test
    public void testMergeDocsWithNothing() {
        // A null stream must leave the document untouched
        Doc mergedDoc = DocUtils.getMergedJsonDoc(getDocSample(), (InputStream) null);
        logger.trace("Merged doc: {}", prettyPrint(mergedDoc));

        assertThat(mergedDoc.getContent(), is("This is a test"));
        assertThat(mergedDoc.getFile().getFilename(), is("foo.txt"));
        assertThat(mergedDoc.getMeta(), notNullValue());
        assertThat(mergedDoc.getPath(), notNullValue());
        assertThat(mergedDoc.getAttachment(), nullValue());
        assertThat(mergedDoc.getExternal(), nullValue());
        assertThat(mergedDoc.getAttributes(), nullValue());
    }

    @Test
    public void testMergeDocsWithAJsonFile() {
        testMergeDocsWithAFile(path.resolve("meta-as-json.json"));
    }

    @Test
    public void testMergeDocsWithAYamlFile() {
        testMergeDocsWithAFile(path.resolve(".meta.yml"));
    }

    // Shared assertions: merging either fixture must preserve the sample doc
    // fields and inject the "external" structure from the tags file.
    private void testMergeDocsWithAFile(Path file) {
        Doc mergedDoc = DocUtils.getMergedDoc(getDocSample(), file);
        logger.trace("Merged doc: {}", prettyPrint(mergedDoc));

        assertThat(mergedDoc.getContent(), is("This is a test"));
        assertThat(mergedDoc.getFile().getFilename(), is("foo.txt"));
        assertThat(mergedDoc.getMeta(), notNullValue());
        assertThat(mergedDoc.getPath(), notNullValue());
        assertThat(mergedDoc.getExternal(), notNullValue());
        assertThat(mergedDoc.getExternal().get("tenantId"), is(23));
        assertThat(mergedDoc.getAttachment(), nullValue());
        assertThat(mergedDoc.getAttributes(), nullValue());
    }

    // Minimal document used as the merge base in every test.
    private Doc getDocSample() {
        File file = new File();
        file.setFilename("foo.txt");
        Doc doc = new Doc();
        doc.setFile(file);
        doc.setContent("This is a test");
        return doc;
    }
}
18 changes: 18 additions & 0 deletions beans/src/test/resources/tags/.meta.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
external:
tenantId: 23
company: "shoe company"
projectId: 34
project: "business development"
daysOpen:
- "Mon"
- "Tue"
- "Wed"
- "Thu"
- "Fri"
products:
- brand: "nike"
size: 41
sub: "Air MAX"
- brand: "reebok"
size: 43
sub: "Pump"
27 changes: 27 additions & 0 deletions beans/src/test/resources/tags/meta-as-json.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"external": {
"tenantId" : 23,
"company": "shoe company",
"projectId": 34,
"project": "business development",
"daysOpen": [
"Mon",
"Tue",
"Wed",
"Thu",
"Fri"
],
"products": [
{
"brand": "nike",
"size": 41,
"sub": "Air MAX"
},
{
"brand": "reebok",
"size": 43,
"sub": "Pump"
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@

package fr.pilato.elasticsearch.crawler.fs;

import fr.pilato.elasticsearch.crawler.fs.beans.Attributes;
import fr.pilato.elasticsearch.crawler.fs.beans.Doc;
import fr.pilato.elasticsearch.crawler.fs.beans.Folder;
import fr.pilato.elasticsearch.crawler.fs.beans.FsJob;
import fr.pilato.elasticsearch.crawler.fs.beans.FsJobFileHandler;
import fr.pilato.elasticsearch.crawler.fs.beans.ScanStatistic;
import fr.pilato.elasticsearch.crawler.fs.beans.*;
import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractModel;
import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractor;
import fr.pilato.elasticsearch.crawler.fs.crawler.fs.FileAbstractorFile;
Expand All @@ -39,17 +34,18 @@
import fr.pilato.elasticsearch.crawler.fs.settings.Server.PROTOCOL;
import fr.pilato.elasticsearch.crawler.fs.tika.TikaDocParser;
import fr.pilato.elasticsearch.crawler.fs.tika.XmlDocParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.*;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
Expand All @@ -71,6 +67,7 @@ public abstract class FsParserAbstract extends FsParser {
private final MessageDigest messageDigest;
private final String pathSeparator;
private final FileAbstractor<?> fileAbstractor;
private final String metadataFilename;

private ScanStatistic stats;

Expand Down Expand Up @@ -103,6 +100,13 @@ public abstract class FsParserAbstract extends FsParser {
}

fileAbstractor = buildFileAbstractor(fsSettings);

if (fsSettings.getTags() != null && !StringUtils.isEmpty(fsSettings.getTags().getMetaFilename())) {
metadataFilename = fsSettings.getTags().getMetaFilename();
logger.debug("We are going to use [{}] as meta file if found whil crawling dirs", metadataFilename);
} else {
metadataFilename = null;
}
}

protected abstract FileAbstractor<?> buildFileAbstractor(FsSettings fsSettings);
Expand Down Expand Up @@ -251,6 +255,7 @@ private void addFilesRecursively(String filepath, LocalDateTime lastScanDate)

if (children != null) {
boolean ignoreFolder = false;
InputStream metadata = null;
for (FileAbstractModel child : children) {
// We check if we have a .fscrawlerignore file within this folder in which case
// we want to ignore all files and subdirs
Expand All @@ -259,13 +264,26 @@ private void addFilesRecursively(String filepath, LocalDateTime lastScanDate)
ignoreFolder = true;
break;
}

// We check if we have a .meta.yml file (or equivalent) within this folder in which case
// we want to merge its content with the current file metadata
if (child.getName().equalsIgnoreCase(metadataFilename)) {
logger.debug("We found a [{}] file in folder: [{}]", metadataFilename, filepath);
metadata = fileAbstractor.getInputStream(child);
}
}

if (!ignoreFolder) {
for (FileAbstractModel child : children) {
logger.trace("FileAbstractModel = {}", child);
String filename = child.getName();

// If the filename is the expected metadata file, we skip it
if (filename.equalsIgnoreCase(metadataFilename)) {
logger.trace("Skipping metadata file [{}]", filename);
continue;
}

String virtualFileName = computeVirtualPathName(stats.getRootPath(), computeRealPathName(filepath, filename));

// https://github.com/dadoonet/fscrawler/issues/1 : Filter documents
Expand All @@ -284,7 +302,7 @@ private void addFilesRecursively(String filepath, LocalDateTime lastScanDate)
if (fsSettings.getFs().isIndexContent() || fsSettings.getFs().isStoreSource()) {
inputStream = fileAbstractor.getInputStream(child);
}
indexFile(child, stats, filepath, inputStream, child.getSize());
indexFile(child, stats, filepath, inputStream, child.getSize(), metadata);
stats.addFile();
} catch (Exception e) {
if (fsSettings.getFs().isContinueOnError()) {
Expand Down Expand Up @@ -386,7 +404,7 @@ private Collection<String> getFolderDirectory(String path) throws Exception {
* Index a file
*/
private void indexFile(FileAbstractModel fileAbstractModel, ScanStatistic stats, String dirname, InputStream inputStream,
long filesize) throws Exception {
long filesize, InputStream externalTags) throws Exception {
final String filename = fileAbstractModel.getName();
final LocalDateTime created = fileAbstractModel.getCreationDate();
final LocalDateTime lastModified = fileAbstractModel.getLastModifiedDate();
Expand Down Expand Up @@ -451,16 +469,18 @@ private void indexFile(FileAbstractModel fileAbstractModel, ScanStatistic stats,
TikaDocParser.generate(fsSettings, inputStream, filename, fullFilename, doc, messageDigest, filesize);
}

Doc mergedDoc = DocUtils.getMergedDoc(doc, metadataFilename, externalTags);

// We index the data structure
if (isIndexable(doc.getContent(), fsSettings.getFs().getFilters())) {
if (isIndexable(mergedDoc.getContent(), fsSettings.getFs().getFilters())) {
if (!closed) {
FSCrawlerLogger.documentDebug(id,
computeVirtualPathName(stats.getRootPath(), fullFilename),
"Indexing content");
documentService.index(
fsSettings.getElasticsearch().getIndex(),
id,
doc,
mergedDoc,
fsSettings.getElasticsearch().getPipeline());
} else {
logger.warn("trying to add new file while closing crawler. Document [{}]/[{}] has been ignored",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,16 @@ private void loadAndPushIndexTemplate(int version, String name, String index) th
pushIndexTemplate(name + "_" + index, json);
}

/**
 * Delete an index template. A missing template is not an error:
 * the 404 from the cluster is swallowed and only traced.
 * @param indexTemplate Index template name
 * @throws ElasticsearchClientException in case of error
 */
public void deleteIndexTemplate(String indexTemplate) throws ElasticsearchClientException {
    logger.debug("delete index template [{}]", indexTemplate);
    try {
        httpDelete("_index_template/" + indexTemplate, null);
    } catch (NotFoundException e) {
        // Deleting a non-existing template is a no-op
        logger.trace("Index template [{}] does not exist", indexTemplate);
    }
}

/**
* Reads a resource file from the classpath or from a JAR.
* @param source The target
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ public interface IElasticsearchClient extends Closeable {
*/
void createIndexAndComponentTemplates() throws Exception;

/**
 * Delete an index template (only needed for tests).
 * Implementations should treat a missing template as a no-op.
 * @param indexTemplate Index template name to delete
 * @throws ElasticsearchClientException in case of error while talking to the cluster
 */
void deleteIndexTemplate(String indexTemplate) throws ElasticsearchClientException;

/**
* Run a search
* @param request Search Request
Expand Down
Loading

0 comments on commit 253e458

Please sign in to comment.