Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: add file filter before job upload #2540

Merged
merged 11 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ spec:
value: "{{ .Values.monitoringSync.initialDelayMillis }}"
- name: UPLOAD_VALIDATION_FILETYPES_ALLOWLIST
value: "{{ .Values.uploadValidationFileTypesAllowList }}"
- name: UPLOAD_VALIDATION_FILETYPES_FILTER_LIST
value: "{{ .Values.uploadValidationFileTypesFilterList }}"
- name: DATAJOBS_TEMP_STORAGE_FOLDER
value: /datajobs_temp_storage
{{- if .Values.datajobTemplate.enabled }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ uploadGitReadWritePassword: ""
# If set to empty, then all file types are allowed.
uploadValidationFileTypesAllowList: ""

# Works like uploadValidationFileTypesAllowList above, except that matching files are deleted
# instead of failing the job upload. It runs before the allow list, so a file type present in
# both lists is deleted first and the job upload still succeeds.
uploadValidationFileTypesFilterList: ""

## [Required] The repository where the data job images will be stored (so write permission must be granted)
## Automatically a repository will be created for each data job in ${deploymentDockerRepository}/data-job-name
## (without https:// scheme)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class JobUpload {
private final FeatureFlags featureFlags;
private final AuthorizationProvider authorizationProvider;
private final JobUploadValidator jobUploadValidator;
private final JobUploadFileFilter jobUploadFileFilter;

@Autowired
public JobUpload(
Expand All @@ -48,13 +49,15 @@ public JobUpload(
GitWrapper gitWrapper,
FeatureFlags featureFlags,
AuthorizationProvider authorizationProvider,
JobUploadValidator jobUploadValidator) {
JobUploadValidator jobUploadValidator,
JobUploadFileFilter jobUploadFileFilter) {
this.datajobsTempStorageFolder = datajobsTempStorageFolder;
this.gitCredentialsProvider = gitCredentialsProvider;
this.gitWrapper = gitWrapper;
this.featureFlags = featureFlags;
this.authorizationProvider = authorizationProvider;
this.jobUploadValidator = jobUploadValidator;
this.jobUploadFileFilter = jobUploadFileFilter;
}

/**
Expand Down Expand Up @@ -116,6 +119,7 @@ public String publishDataJob(String jobName, Resource resource, String reason) {
try (var tempDirPath = new EphemeralFile(datajobsTempStorageFolder, jobName, "deploy")) {
File jobFolder =
FileUtils.unzipDataJob(resource, new File(tempDirPath.toFile(), "job"), jobName);
jobUploadFileFilter.filterDirectory(jobFolder, jobName);
jobUploadValidator.validateJob(jobName, jobFolder.toPath());

Git git =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2021-2023 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.service.upload;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
* This class filters out the job directory from forbidden files so that it can be uploaded without
* them. Forbidden files are specified in a comma separated list in apache tika format. Full list of
* supported files can be found - https://tika.apache.org ; This operation is intended to allow the
* upload of a data job by deleting any files specified in the:
* upload.validation.fileTypes.filterlist property.
*/
@Service
@Slf4j
public class JobUploadFileFilter {

private final String[] filterList;
private FileFormatDetector formatDetector;

public JobUploadFileFilter(
@Value("${upload.validation.fileTypes.filterlist:}") String[] filterList) {
this.filterList = filterList;
this.formatDetector = new FileFormatDetector();
}

/**
* Data job directory to be filtered from files present in the
* upload.validation.fileTypes.filterlist variable. All matching files are deleted from the
* directory and sub-directories before being uploaded to git version control.
*
* @param jobFolder
* @param jobName
*/
public void filterDirectory(File jobFolder, String jobName) {

if (this.filterList.length == 0) {
return;
}

try (Stream<Path> stream = Files.walk(Paths.get(jobFolder.getPath()))) {
stream
.filter(Files::isRegularFile)
.forEach(
file -> {
validateFile(file.toAbsolutePath(), jobName);
});
} catch (IOException e) {
log.error("Exception while processing filter list: {}", e);
throw new InvalidJobUpload(
jobName,
"The control-service was unable to process the filter list of files.",
"Please check for any corrupted files and try again or contact support.");
}
}

private void validateFile(Path filePath, String jobName) {
if (filePath.toFile().isDirectory()) {
return;
}
try {
String fileType = this.formatDetector.detectFileType(filePath);
if (Arrays.stream(filterList)
.anyMatch(allowed -> formatDetector.matchTypes(fileType, allowed))) {
filePath.toFile().delete();
}
} catch (IOException e) {
throw new InvalidJobUpload(
jobName,
"The control-service was unable to process the file: " + filePath.getFileName(),
"Please check the file and fix any issues/try again or contact support.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ datajobs.temp.storage.folder=${DATAJOBS_TEMP_STORAGE_FOLDER:}
# If set to empty, then all file types are allowed.
upload.validation.fileTypes.allowlist=${UPLOAD_VALIDATION_FILETYPES_ALLOWLIST:}

# List of file types that are automatically deleted from data job source code
# before upload. Works exactly like the upload.validation.fileTypes.allowlist above,
# only it deletes the files. Good candidates are program generated files such as
# *.pyc etc. Runs before the allowList file logic.
upload.validation.fileTypes.filterlist=${UPLOAD_VALIDATION_FILETYPES_FILTER_LIST:}

# If the job builder image is saved in a private docker registry then this
# property should have the name of the secret in the env.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<mime-type type="application/pyc">
<_comment>Python compiled file</_comment>
<glob pattern="*.pyc"/>
<sub-class-of type="application/octet-stream"/>
</mime-type>
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
 * Copyright 2021-2023 VMware, Inc.
 * SPDX-License-Identifier: Apache-2.0
 */

package com.vmware.taurus.service.upload;

import com.vmware.taurus.ControlplaneApplication;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.test.context.TestPropertySource;

/** Verifies that {@link JobUploadFileFilter} deletes filter-listed files before upload. */
@SpringBootTest(classes = ControlplaneApplication.class)
@TestPropertySource(properties = {"upload.validation.fileTypes.filterlist=application/pyc"})
public class JobFileFilterTest {

  @Autowired private JobUploadFileFilter jobUploadFileFilter;

  /**
   * Copies the "filter-job" classpath fixture into a fresh temporary directory. Filtering deletes
   * files, so working on a copy keeps the classpath resources intact and the test repeatable —
   * running the filter directly on the resource directory would make every subsequent run fail
   * the initial assertions.
   */
  static File getTestJob() throws IOException {
    File source = Paths.get(new ClassPathResource("filter-job").getURI()).toFile();
    File copy = Files.createTempDirectory("filter-job").toFile();
    copy.deleteOnExit();
    FileUtils.copyDirectory(source, copy);
    return copy;
  }

  @Test
  void testDeletePycFileBeforeUpload() throws IOException {
    File jobFolder = getTestJob();
    var jobDirectoryFiles =
        FileUtils.listFiles(jobFolder, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);

    boolean pycFileExists =
        jobDirectoryFiles.stream()
            .anyMatch(file -> file.toString().endsWith("10_python_step.cpython-39.pyc"));
    Assertions.assertTrue(pycFileExists);
    // JUnit's assertEquals takes the expected value first, then the actual one.
    Assertions.assertEquals(6, jobDirectoryFiles.size());

    jobUploadFileFilter.filterDirectory(jobFolder, "test-job");

    jobDirectoryFiles =
        FileUtils.listFiles(jobFolder, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
    pycFileExists =
        jobDirectoryFiles.stream()
            .anyMatch(file -> file.toString().endsWith("10_python_step.cpython-39.pyc"));
    Assertions.assertFalse(pycFileExists);
    Assertions.assertEquals(5, jobDirectoryFiles.size());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@

package com.vmware.taurus.service.upload;

import com.google.common.collect.Iterables;
import static com.vmware.taurus.service.upload.FileUtils.createTempDir;

import com.google.common.collect.Iterables;
import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.TestIOUtils;
import com.vmware.taurus.authorization.provider.AuthorizationProvider;
import com.vmware.taurus.base.FeatureFlags;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import net.lingala.zip4j.ZipFile;
import net.lingala.zip4j.exception.ZipException;
import net.lingala.zip4j.model.ZipParameters;
Expand All @@ -19,7 +27,11 @@
import org.eclipse.jgit.api.ResetCommand;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.revwalk.RevCommit;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
Expand All @@ -31,16 +43,6 @@
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;

import static com.vmware.taurus.service.upload.FileUtils.createTempDir;

@ExtendWith(MockitoExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest(classes = ControlplaneApplication.class)
Expand All @@ -59,6 +61,8 @@ public class JobUploadTest {

@Mock private JobUploadValidator jobUploadValidator;

@Mock private JobUploadFileFilter jobUploadFileFilter;

private JobUpload jobUpload;

@Value("${datajobs.git.branch}")
Expand All @@ -85,7 +89,8 @@ public void setup() throws GitAPIException, IOException {
gitWrapper,
featureFlags,
authorizationProvider,
jobUploadValidator);
jobUploadValidator,
jobUploadFileFilter);
}

@AfterEach
Expand Down Expand Up @@ -253,7 +258,8 @@ public void testICanOverrideTheDefaultTempDirectoryAndUploadAndDeleteStillWork()
gitWrapper,
featureFlags,
authorizationProvider,
jobUploadValidator);
jobUploadValidator,
jobUploadFileFilter);

Mockito.when(featureFlags.isSecurityEnabled()).thenReturn(true);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """
    VDK entry point: a python script is recognized and executed as a Data Job
    Python step only when it defines a function named `run`.

    The `job_input` object handed to every python step provides methods for:

    * executing queries against an OLAP Database.
    * ingesting data into a Data Lake.
    * processing Data Lake data into a dimensional model Data Warehouse.

    See the IJobInput documentation for details.
    """
    file_name = "example123.txt"
    write_directory = job_input.get_temporary_write_directory()
    file_path = write_directory.joinpath(file_name)

    with open(file_path, "a") as file:
        log.info(f"file: {file}")
        file.write("Content")

    log.info(f"file_path: {file_path}")
    log.info(f"file_name: {file_name}")
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# My shiny new job

Versatile Data Kit allows you to implement automated pull ingestion and batch data processing.

### Create the data job Files

Data Job directory can contain any files, however there are some files that are treated in a specific way:

* SQL files (.sql) - called SQL steps - are directly executed as queries against your configured database.
* Python files (.py) - called Python steps - are python scripts that define a run function that takes the job_input object as an argument.
* config.ini is needed in order to configure the Job. This is the only required file.
* requirements.txt is an optional file needed when your Python steps use external python libraries.

Delete all files you do not need and replace them with your own

### Data Job Code

VDK supports having many Python and/or SQL steps in a single Data Job. Steps are executed in ascending alphabetical order based on file names.
Prefixing file names with numbers makes it easy to use meaningful names while maintaining the steps' execution order.

Run the Data Job from a Terminal:
* Make sure you have vdk installed. See Platform documentation on how to install it.
```
vdk run <path to Data Job directory>
```

### Deploy Data Job

When the Job is ready to be deployed to the Versatile Data Kit runtime (cloud) to be executed on a regular schedule:
Run below command and follow its instructions (you can see its options with `vdk --help`)
```python
vdk deploy
```
Binary file not shown.
Loading