From 61371022f127a5297f43ddae90307a1bddaa4978 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Wed, 17 Jun 2020 04:19:06 +0300 Subject: [PATCH] FeatureSets are delivered to Ingestion Job through Kafka (#792) * intermediate table in m2m Job <-> FeatureSet with delivery status + FeatureSet version switch to spring-kafka (configs) specService send message to kafka & expect ack & update status accordingly jobs runner to send source & specs config (source + ack) ingestion job to read specs from kafka and send ack return featureSets in ingestionJob generate uniq topic name for each test run prevent listJobs from failing when job failed on start * fix listJobs with filter --- core/pom.xml | 4 +- .../feast/core/config/FeastProperties.java | 14 + .../core/config/FeatureStreamConfig.java | 127 ++++++--- .../java/feast/core/config/JobConfig.java | 38 ++- .../java/feast/core/dao/JobRepository.java | 8 +- .../java/feast/core/job/JobUpdateTask.java | 43 +-- .../core/job/dataflow/DataflowJobManager.java | 48 ++-- .../job/direct/DirectRunnerJobManager.java | 33 +-- .../java/feast/core/model/FeatureSet.java | 37 ++- .../feast/core/model/FeatureSetJobStatus.java | 64 +++++ core/src/main/java/feast/core/model/Job.java | 22 +- .../core/service/JobCoordinatorService.java | 38 +-- .../java/feast/core/service/JobService.java | 11 +- .../java/feast/core/service/SpecService.java | 109 +++++++- .../feast/core/util/KafkaSerialization.java | 68 +++++ .../main/resources/application-override.yaml | 0 core/src/main/resources/application.yml | 4 + .../feast/core/job/JobUpdateTaskTest.java | 16 +- .../job/dataflow/DataflowJobManagerTest.java | 45 ++-- .../direct/DirectRunnerJobManagerTest.java | 37 +-- .../service/JobCoordinatorServiceTest.java | 18 +- .../feast/core/service/JobServiceTest.java | 7 +- .../feast/core/service/SpecServiceTest.java | 188 +++++++++++++- .../feast/core/service/TestObjectFactory.java | 21 +- .../java/feast/core/util/ModelHelpers.java | 41 +++ .../scripts/test-end-to-end-batch-dataflow.sh | 5 +- .../values-end-to-end-batch-dataflow.yaml | 3 + ingestion/pom.xml | 7 + .../main/java/feast/ingestion/ImportJob.java | 155 +++++------ .../ingestion/options/ImportOptions.java | 23 +- .../transform/FeatureRowToStoreAllocator.java | 93 +++++++ .../ProcessAndValidateFeatureRows.java | 22 +- .../transform/fn/ValidateFeatureRowDoFn.java | 25 +- .../specs/FilterRelevantFunction.java | 52 ++++ .../specs/KafkaRecordToFeatureSetSpec.java | 53 ++++ .../transform/specs/ReadFeatureSetSpecs.java | 107 ++++++++ .../specs/WriteFeatureSetSpecAck.java | 85 ++++++ .../java/feast/ingestion/utils/SpecUtil.java | 97 ++++--- .../java/feast/ingestion/ImportJobTest.java | 67 +++-- .../FeatureRowToStoreAllocatorTest.java | 99 +++++++ .../ProcessAndValidateFeatureRowsTest.java | 26 +- .../specs/FeatureSetSpecReadAndWriteTest.java | 244 ++++++++++++++++++ .../transform/specs/FilterRelevantTest.java | 109 ++++++++ .../src/test/java/feast/test/TestUtil.java | 64 +++-- protos/feast/core/FeatureSet.proto | 9 + protos/feast/core/IngestionJob.proto | 15 ++ 46 files changed, 1981 insertions(+), 420 deletions(-) create mode 100644 core/src/main/java/feast/core/model/FeatureSetJobStatus.java create mode 100644 core/src/main/java/feast/core/util/KafkaSerialization.java delete mode 100644 core/src/main/resources/application-override.yaml create mode 100644 core/src/test/java/feast/core/util/ModelHelpers.java create mode 100644 ingestion/src/main/java/feast/ingestion/transform/FeatureRowToStoreAllocator.java create mode 100644 
ingestion/src/main/java/feast/ingestion/transform/specs/FilterRelevantFunction.java create mode 100644 ingestion/src/main/java/feast/ingestion/transform/specs/KafkaRecordToFeatureSetSpec.java create mode 100644 ingestion/src/main/java/feast/ingestion/transform/specs/ReadFeatureSetSpecs.java create mode 100644 ingestion/src/main/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAck.java create mode 100644 ingestion/src/test/java/feast/ingestion/transform/FeatureRowToStoreAllocatorTest.java create mode 100644 ingestion/src/test/java/feast/ingestion/transform/specs/FeatureSetSpecReadAndWriteTest.java create mode 100644 ingestion/src/test/java/feast/ingestion/transform/specs/FilterRelevantTest.java diff --git a/core/pom.xml b/core/pom.xml index f28a1fa28b..81dcb144de 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -171,8 +171,8 @@ - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka diff --git a/core/src/main/java/feast/core/config/FeastProperties.java b/core/src/main/java/feast/core/config/FeastProperties.java index 6dad278242..b508de76e5 100644 --- a/core/src/main/java/feast/core/config/FeastProperties.java +++ b/core/src/main/java/feast/core/config/FeastProperties.java @@ -139,6 +139,10 @@ public static class StreamProperties { /* Feature stream options */ @NotNull private FeatureStreamOptions options; + /* FeatureSetSpec stream options - communication channel between SpecService and IngestionJob + * to update Spec inside job w/o restart */ + @NotNull private FeatureSetSpecStreamProperties specsOptions; + /** Feature stream options */ @Getter @Setter @@ -159,6 +163,16 @@ public static class FeatureStreamOptions { /* Number of Kafka partitions to to use for managed feature stream. */ @Positive private int partitions = 1; } + + @Getter + @Setter + public static class FeatureSetSpecStreamProperties { + /* Kafka topic to send feature set spec to ingestion streaming job */ + @NotBlank private String specsTopic = "feast-feature-set-specs"; + + /* Kafka topic to receive acknowledgment from ingestion job on successful processing of new specs */ + @NotBlank private String specsAckTopic = "feast-feature-set-specs-ack"; + } } /** Feast population job metrics */ diff --git a/core/src/main/java/feast/core/config/FeatureStreamConfig.java b/core/src/main/java/feast/core/config/FeatureStreamConfig.java index c1982604c3..40647bb2e2 100644 --- a/core/src/main/java/feast/core/config/FeatureStreamConfig.java +++ b/core/src/main/java/feast/core/config/FeatureStreamConfig.java @@ -16,30 +16,119 @@ */ package feast.core.config; -import com.google.common.base.Strings; import feast.core.config.FeastProperties.StreamProperties; import feast.core.model.Source; +import feast.core.util.KafkaSerialization; +import feast.proto.core.FeatureSetProto; +import feast.proto.core.IngestionJobProto; import feast.proto.core.SourceProto.KafkaSourceConfig; import feast.proto.core.SourceProto.SourceType; -import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.TopicConfig; 
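The beans that follow set up the Core side of the spec-delivery channel: a compacted specs topic whose records are keyed by FeatureSet reference and carry serialized FeatureSetSpec protos, an ack topic, a KafkaTemplate for publishing specs, and a consumer factory for the acks coming back from ingestion jobs. For orientation, here is a minimal sketch of what the matching ingestion-side consumer could look like; it is not the ReadFeatureSetSpecs transform added elsewhere in this patch, and the plain KafkaConsumer wiring, group id, and printout are illustrative assumptions only.

import com.google.protobuf.InvalidProtocolBufferException;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SpecsConsumerSketch {
  // Polls the compacted specs topic once. Keys are FeatureSet references
  // ("project/name"); values are FeatureSetSpec protos serialized by the
  // ProtoSerializer that Core's KafkaTemplate is configured with below.
  public static void pollOnce(String bootstrapServers, String specsTopic)
      throws InvalidProtocolBufferException {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", "ingestion-job-sketch");

    try (KafkaConsumer<String, byte[]> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new ByteArrayDeserializer())) {
      consumer.subscribe(Collections.singletonList(specsTopic));
      ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, byte[]> record : records) {
        FeatureSetSpec spec = FeatureSetSpec.parseFrom(record.value());
        System.out.printf("received spec %s, version %d%n", record.key(), spec.getVersion());
      }
    }
  }
}

Because the specs topic is created with a compact cleanup policy, a consumer that reads from the beginning always sees at least the latest spec per FeatureSet reference, so a restarted consumer can recover the current specs without Core resending them.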
+import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.*; @Slf4j @Configuration public class FeatureStreamConfig { String DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG = "15000"; + int DEFAULT_SPECS_TOPIC_PARTITIONING = 1; + short DEFAULT_SPECS_TOPIC_REPLICATION = 3; + + @Bean + public KafkaAdmin admin(FeastProperties feastProperties) { + String bootstrapServers = feastProperties.getStream().getOptions().getBootstrapServers(); + + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put( + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG); + return new KafkaAdmin(configs); + } + + @Bean + public NewTopic featureRowsTopic(FeastProperties feastProperties) { + StreamProperties streamProperties = feastProperties.getStream(); + + return new NewTopic( + streamProperties.getOptions().getTopic(), + streamProperties.getOptions().getPartitions(), + streamProperties.getOptions().getReplicationFactor()); + } + + @Bean + public NewTopic featureSetSpecsTopic(FeastProperties feastProperties) { + StreamProperties streamProperties = feastProperties.getStream(); + Map configs = new HashMap<>(); + configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); + + NewTopic topic = + new NewTopic( + streamProperties.getSpecsOptions().getSpecsTopic(), + DEFAULT_SPECS_TOPIC_PARTITIONING, + DEFAULT_SPECS_TOPIC_REPLICATION); + + topic.configs(configs); + return topic; + } + + @Bean + public NewTopic featureSetSpecsAckTopic(FeastProperties feastProperties) { + StreamProperties streamProperties = feastProperties.getStream(); + + return new NewTopic( + streamProperties.getSpecsOptions().getSpecsAckTopic(), + DEFAULT_SPECS_TOPIC_PARTITIONING, + (short) 1); + } + + @Bean + public KafkaTemplate specKafkaTemplate( + FeastProperties feastProperties) { + StreamProperties streamProperties = feastProperties.getStream(); + Map props = new HashMap<>(); + + props.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + streamProperties.getOptions().getBootstrapServers()); + + KafkaTemplate t = + new KafkaTemplate<>( + new DefaultKafkaProducerFactory<>( + props, new StringSerializer(), new KafkaSerialization.ProtoSerializer<>())); + t.setDefaultTopic(streamProperties.getSpecsOptions().getSpecsTopic()); + return t; + } + + @Bean + public ConsumerFactory ackConsumerFactory(FeastProperties feastProperties) { + StreamProperties streamProperties = feastProperties.getStream(); + Map props = new HashMap<>(); + + props.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + streamProperties.getOptions().getBootstrapServers()); + props.put( + ConsumerConfig.GROUP_ID_CONFIG, + String.format( + "core-service-%s", FeatureStreamConfig.class.getPackage().getImplementationVersion())); + + return new DefaultKafkaConsumerFactory<>( + props, + new StringDeserializer(), + new KafkaSerialization.ProtoDeserializer<>(IngestionJobProto.FeatureSetSpecAck.parser())); + } @Autowired @Bean @@ -50,31 +139,7 @@ public Source getDefaultSource(FeastProperties feastProperties) { case KAFKA: String bootstrapServers = streamProperties.getOptions().getBootstrapServers(); String topicName = streamProperties.getOptions().getTopic(); - Map map = new HashMap<>(); - 
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - map.put( - AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG); - AdminClient client = AdminClient.create(map); - - NewTopic newTopic = - new NewTopic( - topicName, - streamProperties.getOptions().getPartitions(), - streamProperties.getOptions().getReplicationFactor()); - CreateTopicsResult createTopicsResult = - client.createTopics(Collections.singleton(newTopic)); - try { - createTopicsResult.values().get(topicName).get(); - } catch (InterruptedException | ExecutionException e) { - if (e.getCause().getClass().equals(TopicExistsException.class)) { - log.warn( - Strings.lenientFormat( - "Unable to create topic %s in the feature stream, topic already exists, using existing topic.", - topicName)); - } else { - throw new RuntimeException(e.getMessage(), e); - } - } + KafkaSourceConfig sourceConfig = KafkaSourceConfig.newBuilder() .setBootstrapServers(bootstrapServers) diff --git a/core/src/main/java/feast/core/config/JobConfig.java b/core/src/main/java/feast/core/config/JobConfig.java index 30023de064..fd0dc6ef5c 100644 --- a/core/src/main/java/feast/core/config/JobConfig.java +++ b/core/src/main/java/feast/core/config/JobConfig.java @@ -24,8 +24,10 @@ import feast.core.job.dataflow.DataflowJobManager; import feast.core.job.direct.DirectJobRegistry; import feast.core.job.direct.DirectRunnerJobManager; +import feast.proto.core.IngestionJobProto; import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions; import feast.proto.core.RunnerProto.DirectRunnerConfigOptions; +import feast.proto.core.SourceProto; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -38,6 +40,30 @@ public class JobConfig { private final Gson gson = new Gson(); + /** + * Create SpecsStreamingUpdateConfig, which is used to set up communications (bi-directional + * channel) to send new FeatureSetSpec to IngestionJob and receive acknowledgments. + * + * @param feastProperties feast config properties + */ + @Bean + public IngestionJobProto.SpecsStreamingUpdateConfig createSpecsStreamingUpdateConfig( + FeastProperties feastProperties) { + FeastProperties.StreamProperties streamProperties = feastProperties.getStream(); + + return IngestionJobProto.SpecsStreamingUpdateConfig.newBuilder() + .setSource( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers(streamProperties.getOptions().getBootstrapServers()) + .setTopic(streamProperties.getSpecsOptions().getSpecsTopic()) + .build()) + .setAck( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers(streamProperties.getOptions().getBootstrapServers()) + .setTopic(streamProperties.getSpecsOptions().getSpecsAckTopic())) + .build(); + } + /** * Get a JobManager according to the runner type and Dataflow configuration. 
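The SpecsStreamingUpdateConfig assembled above is injected into the job managers, which (further down in this diff) print it with JsonFormat and hand it to the ingestion job through the new specsStreamingUpdateConfigJson pipeline option. Below is a minimal sketch of turning that JSON back into the proto on the ingestion side, assuming the generated classes from IngestionJob.proto; the helper itself is illustrative, since the real parsing belongs to the ingestion module's SpecUtil/ImportJob changes that this excerpt only lists.

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.proto.core.IngestionJobProto.SpecsStreamingUpdateConfig;

public class SpecsUpdateConfigParser {
  // Rebuilds the proto from the JSON produced by JsonFormat.printer() in
  // DataflowJobManager/DirectRunnerJobManager, exposing the specs topic and
  // the ack topic the ingestion job should use.
  public static SpecsStreamingUpdateConfig parse(String json)
      throws InvalidProtocolBufferException {
    SpecsStreamingUpdateConfig.Builder builder = SpecsStreamingUpdateConfig.newBuilder();
    JsonFormat.parser().merge(json, builder);
    return builder.build();
  }
}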
* @@ -45,7 +71,9 @@ public class JobConfig { */ @Bean @Autowired - public JobManager getJobManager(FeastProperties feastProperties) + public JobManager getJobManager( + FeastProperties feastProperties, + IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig) throws InvalidProtocolBufferException { JobProperties jobProperties = feastProperties.getJobs(); @@ -60,13 +88,17 @@ public JobManager getJobManager(FeastProperties feastProperties) DataflowRunnerConfigOptions.Builder dataflowRunnerConfigOptions = DataflowRunnerConfigOptions.newBuilder(); JsonFormat.parser().merge(configJson, dataflowRunnerConfigOptions); - return new DataflowJobManager(dataflowRunnerConfigOptions.build(), metrics); + return new DataflowJobManager( + dataflowRunnerConfigOptions.build(), metrics, specsStreamingUpdateConfig); case DIRECT: DirectRunnerConfigOptions.Builder directRunnerConfigOptions = DirectRunnerConfigOptions.newBuilder(); JsonFormat.parser().merge(configJson, directRunnerConfigOptions); return new DirectRunnerJobManager( - directRunnerConfigOptions.build(), new DirectJobRegistry(), metrics); + directRunnerConfigOptions.build(), + new DirectJobRegistry(), + metrics, + specsStreamingUpdateConfig); default: throw new IllegalArgumentException("Unsupported runner: " + runner); } diff --git a/core/src/main/java/feast/core/dao/JobRepository.java b/core/src/main/java/feast/core/dao/JobRepository.java index c61f3eacc0..58af8c7ec5 100644 --- a/core/src/main/java/feast/core/dao/JobRepository.java +++ b/core/src/main/java/feast/core/dao/JobRepository.java @@ -16,10 +16,8 @@ */ package feast.core.dao; -import feast.core.model.FeatureSet; +import feast.core.model.FeatureSetJobStatus; import feast.core.model.Job; -import feast.core.model.JobStatus; -import java.util.Collection; import java.util.List; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; @@ -27,13 +25,11 @@ /** JPA repository supplying Job objects keyed by ID. 
*/ @Repository public interface JobRepository extends JpaRepository { - List findByStatusNotIn(Collection statuses); - List findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName); // find jobs by feast store name List findByStoreName(String storeName); // find jobs by featureset - List findByFeatureSetsIn(List featureSets); + List findByFeatureSetJobStatusesIn(List featureSetsJobStatuses); } diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index 0c79bb620d..9abdfa3e45 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -20,12 +20,8 @@ import feast.core.log.Action; import feast.core.log.AuditLogger; import feast.core.log.Resource; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; -import feast.core.model.Source; -import feast.core.model.Store; -import feast.proto.core.FeatureSetProto.FeatureSetStatus; +import feast.core.model.*; +import feast.proto.core.FeatureSetProto; import java.time.Instant; import java.util.List; import java.util.Optional; @@ -103,16 +99,12 @@ public Job call() { } boolean requiresUpdate(Job job) { - // If set of feature sets has changed - if (!Sets.newHashSet(featureSets).equals(Sets.newHashSet(job.getFeatureSets()))) { + // if store subscriptions have changed + if (!Sets.newHashSet(store.getSubscriptions()) + .equals(Sets.newHashSet(job.getStore().getSubscriptions()))) { return true; } - // If any of the incoming feature sets were updated - for (FeatureSet featureSet : featureSets) { - if (featureSet.getStatus() == FeatureSetStatus.STATUS_PENDING) { - return true; - } - } + return false; } @@ -123,10 +115,15 @@ private Job createJob() { /** Start or update the job to ingest data to the sink. 
*/ private Job startJob(String jobId) { + Job job = new Job(); + job.setId(jobId); + job.setRunner(jobManager.getRunnerType()); + job.setSource(source); + job.setStore(store); + job.setStatus(JobStatus.PENDING); + + updateFeatureSets(job); - Job job = - new Job( - jobId, "", jobManager.getRunnerType(), source, store, featureSets, JobStatus.PENDING); try { logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName); @@ -151,9 +148,19 @@ private Job startJob(String jobId) { } } + private void updateFeatureSets(Job job) { + for (FeatureSet fs : featureSets) { + FeatureSetJobStatus status = new FeatureSetJobStatus(); + status.setFeatureSet(fs); + status.setJob(job); + status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + job.getFeatureSetJobStatuses().add(status); + } + } + /** Update the given job */ private Job updateJob(Job job) { - job.setFeatureSets(featureSets); + updateFeatureSets(job); job.setStore(store); logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerName); return jobManager.updateJob(job); diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 50a2322407..181b2339b3 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -31,21 +31,16 @@ import feast.core.exception.JobExecutionException; import feast.core.job.JobManager; import feast.core.job.Runner; -import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.*; import feast.ingestion.ImportJob; -import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.options.OptionCompressor; -import feast.proto.core.FeatureSetProto; +import feast.proto.core.IngestionJobProto; import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions; import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; import java.io.IOException; import java.security.GeneralSecurityException; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -62,15 +57,19 @@ public class DataflowJobManager implements JobManager { private final Dataflow dataflow; private final DataflowRunnerConfig defaultOptions; private final MetricsProperties metrics; + private final IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig; public DataflowJobManager( - DataflowRunnerConfigOptions runnerConfigOptions, MetricsProperties metricsProperties) { - this(runnerConfigOptions, metricsProperties, getGoogleCredential()); + DataflowRunnerConfigOptions runnerConfigOptions, + MetricsProperties metricsProperties, + IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig) { + this(runnerConfigOptions, metricsProperties, specsStreamingUpdateConfig, getGoogleCredential()); } public DataflowJobManager( DataflowRunnerConfigOptions runnerConfigOptions, MetricsProperties metricsProperties, + IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig, Credential credential) { defaultOptions = new DataflowRunnerConfig(runnerConfigOptions); @@ -91,6 +90,7 @@ public DataflowJobManager( this.metrics = metricsProperties; this.projectId = defaultOptions.getProject(); this.location = 
defaultOptions.getRegion(); + this.specsStreamingUpdateConfig = specsStreamingUpdateConfig; } private static Credential getGoogleCredential() { @@ -112,17 +112,9 @@ public Runner getRunnerType() { @Override public Job startJob(Job job) { try { - List featureSetProtos = new ArrayList<>(); - for (FeatureSet featureSet : job.getFeatureSets()) { - featureSetProtos.add(featureSet.toProto()); - } String extId = submitDataflowJob( - job.getId(), - featureSetProtos, - job.getSource().toProto(), - job.getStore().toProto(), - false); + job.getId(), job.getSource().toProto(), job.getStore().toProto(), false); job.setExtId(extId); return job; @@ -223,13 +215,9 @@ public JobStatus getJobStatus(Job job) { } private String submitDataflowJob( - String jobName, - List featureSetProtos, - SourceProto.Source source, - StoreProto.Store sink, - boolean update) { + String jobName, SourceProto.Source source, StoreProto.Store sink, boolean update) { try { - ImportOptions pipelineOptions = getPipelineOptions(jobName, featureSetProtos, sink, update); + ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sink, update); DataflowPipelineJob pipelineResult = runPipeline(pipelineOptions); String jobId = waitForJobToRun(pipelineResult); return jobId; @@ -240,19 +228,17 @@ private String submitDataflowJob( } private ImportOptions getPipelineOptions( - String jobName, - List featureSets, - StoreProto.Store sink, - boolean update) + String jobName, SourceProto.Source source, StoreProto.Store sink, boolean update) throws IOException, IllegalAccessException { ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class); - OptionCompressor> featureSetJsonCompressor = - new BZip2Compressor<>(new FeatureSetJsonByteConverter()); + JsonFormat.Printer jsonPrinter = JsonFormat.printer(); - pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); - pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); + pipelineOptions.setSpecsStreamingUpdateConfigJson( + jsonPrinter.print(specsStreamingUpdateConfig)); + pipelineOptions.setSourceJson(jsonPrinter.print(source)); + pipelineOptions.setStoreJson(Collections.singletonList(jsonPrinter.print(sink))); pipelineOptions.setProject(projectId); pipelineOptions.setDefaultFeastProject(Project.DEFAULT_NAME); pipelineOptions.setUpdate(update); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index 715adbdd43..242ac43496 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -22,22 +22,15 @@ import feast.core.exception.JobExecutionException; import feast.core.job.JobManager; import feast.core.job.Runner; -import feast.core.job.option.FeatureSetJsonByteConverter; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; -import feast.core.model.Project; +import feast.core.model.*; import feast.ingestion.ImportJob; -import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.options.OptionCompressor; -import feast.proto.core.FeatureSetProto; +import feast.proto.core.IngestionJobProto; import feast.proto.core.RunnerProto.DirectRunnerConfigOptions; +import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; import java.io.IOException; -import 
java.util.ArrayList; import java.util.Collections; -import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; @@ -51,14 +44,17 @@ public class DirectRunnerJobManager implements JobManager { private DirectRunnerConfig defaultOptions; private final DirectJobRegistry jobs; private MetricsProperties metrics; + private final IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig; public DirectRunnerJobManager( DirectRunnerConfigOptions directRunnerConfigOptions, DirectJobRegistry jobs, - MetricsProperties metricsProperties) { + MetricsProperties metricsProperties, + IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig) { this.defaultOptions = new DirectRunnerConfig(directRunnerConfigOptions); this.jobs = jobs; this.metrics = metricsProperties; + this.specsStreamingUpdateConfig = specsStreamingUpdateConfig; } @Override @@ -74,12 +70,8 @@ public Runner getRunnerType() { @Override public Job startJob(Job job) { try { - List featureSetProtos = new ArrayList<>(); - for (FeatureSet featureSet : job.getFeatureSets()) { - featureSetProtos.add(featureSet.toProto()); - } ImportOptions pipelineOptions = - getPipelineOptions(job.getId(), featureSetProtos, job.getStore().toProto()); + getPipelineOptions(job.getId(), job.getSource().toProto(), job.getStore().toProto()); PipelineResult pipelineResult = runPipeline(pipelineOptions); DirectJob directJob = new DirectJob(job.getId(), pipelineResult); jobs.add(directJob); @@ -93,15 +85,14 @@ public Job startJob(Job job) { } private ImportOptions getPipelineOptions( - String jobName, List featureSets, StoreProto.Store sink) + String jobName, SourceProto.Source source, StoreProto.Store sink) throws IOException, IllegalAccessException { ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class); - OptionCompressor> featureSetJsonCompressor = - new BZip2Compressor<>(new FeatureSetJsonByteConverter()); - - pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); + pipelineOptions.setSpecsStreamingUpdateConfigJson( + JsonFormat.printer().print(specsStreamingUpdateConfig)); + pipelineOptions.setSourceJson(JsonFormat.printer().print(source)); pipelineOptions.setJobName(jobName); pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setRunner(DirectRunner.class); diff --git a/core/src/main/java/feast/core/model/FeatureSet.java b/core/src/main/java/feast/core/model/FeatureSet.java index f7b2dc7cd4..e1b50098b9 100644 --- a/core/src/main/java/feast/core/model/FeatureSet.java +++ b/core/src/main/java/feast/core/model/FeatureSet.java @@ -29,6 +29,7 @@ import lombok.Getter; import lombok.Setter; import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.springframework.data.util.Pair; import org.tensorflow.metadata.v0.*; @Getter @@ -84,6 +85,12 @@ public class FeatureSet extends AbstractTimestampEntity { @Column(name = "labels", columnDefinition = "text") private String labels; + @Column(name = "version", columnDefinition = "integer default 0") + private int version; + + @OneToMany(mappedBy = "featureSet", cascade = CascadeType.ALL) + private Set jobStatuses = new HashSet<>(); + public FeatureSet() { super(); } @@ -125,6 +132,29 @@ public void setProject(Project project) { this.project = project; } + public String getReference() { + return String.format("%s/%s", getProjectName(), getName()); + } + + /** @return 
Pair <ProjectName, FeatureSetName> */ + public static Pair parseReference(String reference) { + String[] split = reference.split("/", 2); + if (split.length == 1) { + return Pair.of(Project.DEFAULT_NAME, split[0]); + } + + if (split.length > 2) { + throw new RuntimeException( + "FeatureSet reference must have the format /"); + } + + return Pair.of(split[0], split[1]); + } + + public int incVersion() { + return ++version; + } + public static FeatureSet fromProto(FeatureSetProto.FeatureSet featureSetProto) { FeatureSetSpec featureSetSpec = featureSetProto.getSpec(); Source source = Source.fromProto(featureSetSpec.getSource()); @@ -257,7 +287,8 @@ public FeatureSetProto.FeatureSet toProto() throws InvalidProtocolBufferExceptio .addAllEntities(entitySpecs) .addAllFeatures(featureSpecs) .putAllLabels(TypeConversion.convertJsonStringToMap(labels)) - .setSource(source.toProto()); + .setSource(source.toProto()) + .setVersion(version); return FeatureSetProto.FeatureSet.newBuilder().setMeta(meta).setSpec(spec).build(); } @@ -300,6 +331,10 @@ public boolean equals(Object obj) { return false; } + if (version != other.version) { + return false; + } + // Create a map of all fields in this feature set Map entitiesMap = new HashMap<>(); Map featuresMap = new HashMap<>(); diff --git a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java new file mode 100644 index 0000000000..7f21c26335 --- /dev/null +++ b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.core.model; + +import feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus; +import java.io.Serializable; +import javax.persistence.*; +import javax.persistence.Entity; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +@Entity +@Table( + name = "jobs_feature_sets", + indexes = { + @Index(name = "idx_jobs_feature_sets_job_id", columnList = "job_id"), + @Index(name = "idx_jobs_feature_sets_feature_sets_id", columnList = "feature_sets_id") + }) +public class FeatureSetJobStatus { + @Embeddable + @EqualsAndHashCode + public static class FeatureSetJobStatusKey implements Serializable { + public FeatureSetJobStatusKey() {} + + @Column(name = "job_id") + String jobId; + + @Column(name = "feature_sets_id") + long featureSetId; + } + + @EmbeddedId private FeatureSetJobStatusKey id = new FeatureSetJobStatusKey(); + + @ManyToOne + @MapsId("jobId") + @JoinColumn(name = "job_id") + private Job job; + + @ManyToOne + @MapsId("featureSetId") + @JoinColumn(name = "feature_sets_id") + private FeatureSet featureSet; + + @Enumerated(EnumType.STRING) + @Column(name = "delivery_status") + private FeatureSetJobDeliveryStatus deliveryStatus; +} diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 5fce3dffbe..ae54221b9c 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -21,7 +21,9 @@ import feast.proto.core.FeatureSetProto; import feast.proto.core.IngestionJobProto; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import javax.persistence.*; import javax.persistence.Entity; import lombok.AllArgsConstructor; @@ -59,17 +61,9 @@ public class Job extends AbstractTimestampEntity { @JoinColumn(name = "store_name") private Store store; - // FeatureSets populated by the job - @ManyToMany(cascade = CascadeType.ALL) - @JoinTable( - name = "jobs_feature_sets", - joinColumns = @JoinColumn(name = "job_id"), - inverseJoinColumns = @JoinColumn(name = "feature_sets_id"), - indexes = { - @Index(name = "idx_jobs_feature_sets_job_id", columnList = "job_id"), - @Index(name = "idx_jobs_feature_sets_feature_sets_id", columnList = "feature_sets_id") - }) - private List featureSets; + // FeatureSets populated by the job via intermediate FeatureSetJobStatus model + @OneToMany(mappedBy = "job", cascade = CascadeType.ALL) + private Set featureSetJobStatuses = new HashSet<>(); @Enumerated(EnumType.STRING) @Column(name = "status", length = 16) @@ -100,8 +94,8 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce // convert featuresets of job to protos List featureSetProtos = new ArrayList<>(); - for (FeatureSet featureSet : this.getFeatureSets()) { - featureSetProtos.add(featureSet.toProto()); + for (FeatureSetJobStatus featureSet : this.getFeatureSetJobStatuses()) { + featureSetProtos.add(featureSet.getFeatureSet().toProto()); } // build ingestion job proto with job data @@ -110,9 +104,9 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce .setId(this.getId()) .setExternalId(this.getExtId()) .setStatus(this.getStatus().toProto()) - .addAllFeatureSets(featureSetProtos) .setSource(this.getSource().toProto()) .setStore(this.getStore().toProto()) + .addAllFeatureSets(featureSetProtos) .build(); return ingestJob; diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java 
b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 90ee54ca16..69da3e507e 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -23,13 +23,9 @@ import feast.core.dao.JobRepository; import feast.core.job.JobManager; import feast.core.job.JobUpdateTask; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.Source; -import feast.core.model.Store; +import feast.core.model.*; import feast.proto.core.CoreServiceProto.ListStoresRequest.Filter; import feast.proto.core.CoreServiceProto.ListStoresResponse; -import feast.proto.core.FeatureSetProto.FeatureSetStatus; import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store.Subscription; import java.util.ArrayList; @@ -121,9 +117,6 @@ public void Poll() throws InvalidProtocolBufferException { log.info("Creating/Updating {} jobs...", jobUpdateTasks.size()); startOrUpdateJobs(jobUpdateTasks); - - log.info("Updating feature set status"); - updateFeatureSetStatuses(jobUpdateTasks); } void startOrUpdateJobs(List tasks) { @@ -148,35 +141,6 @@ void startOrUpdateJobs(List tasks) { executorService.shutdown(); } - // TODO: make this more efficient - private void updateFeatureSetStatuses(List jobUpdateTasks) { - Set ready = new HashSet<>(); - Set pending = new HashSet<>(); - for (JobUpdateTask task : jobUpdateTasks) { - getJob(task.getSource(), task.getStore()) - .ifPresent( - job -> { - if (job.isRunning()) { - ready.addAll(job.getFeatureSets()); - } else { - pending.addAll(job.getFeatureSets()); - } - }); - } - ready.removeAll(pending); - ready.forEach( - fs -> { - fs.setStatus(FeatureSetStatus.STATUS_READY); - featureSetRepository.save(fs); - }); - pending.forEach( - fs -> { - fs.setStatus(FeatureSetStatus.STATUS_JOB_STARTING); - featureSetRepository.save(fs); - }); - featureSetRepository.flush(); - } - @Transactional public Optional getJob(Source source, Store store) { List jobs = diff --git a/core/src/main/java/feast/core/service/JobService.java b/core/src/main/java/feast/core/service/JobService.java index cc125305ec..b0b7aeeb35 100644 --- a/core/src/main/java/feast/core/service/JobService.java +++ b/core/src/main/java/feast/core/service/JobService.java @@ -122,7 +122,11 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request) .collect(Collectors.toList()); // find jobs for the matching featuresets - Collection matchingJobs = this.jobRepository.findByFeatureSetsIn(featureSets); + Collection matchingJobs = + this.jobRepository.findByFeatureSetJobStatusesIn( + featureSets.stream() + .flatMap(fs -> fs.getJobStatuses().stream()) + .collect(Collectors.toList())); List jobIds = matchingJobs.stream().map(Job::getId).collect(Collectors.toList()); matchingJobIds = this.mergeResults(matchingJobIds, jobIds); } @@ -137,6 +141,11 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request) List ingestJobs = new ArrayList<>(); for (String jobId : matchingJobIds) { Job job = this.jobRepository.findById(jobId).get(); + // job that failed on start won't be converted toProto successfully + // and they're irrelevant here + if (job.getStatus() == JobStatus.ERROR) { + continue; + } ingestJobs.add(job.toProto()); } diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index 01cd264c76..9000ee983e 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ 
b/core/src/main/java/feast/core/service/SpecService.java @@ -16,6 +16,7 @@ */ package feast.core.service; +import static feast.core.model.FeatureSet.parseReference; import static feast.core.validators.Matchers.checkValidCharacters; import static feast.core.validators.Matchers.checkValidCharactersAllowAsterisk; @@ -24,10 +25,7 @@ import feast.core.dao.ProjectRepository; import feast.core.dao.StoreRepository; import feast.core.exception.RetrievalException; -import feast.core.model.FeatureSet; -import feast.core.model.Project; -import feast.core.model.Source; -import feast.core.model.Store; +import feast.core.model.*; import feast.core.validators.FeatureSetValidator; import feast.proto.core.CoreServiceProto.ApplyFeatureSetResponse; import feast.proto.core.CoreServiceProto.ApplyFeatureSetResponse.Status; @@ -42,13 +40,19 @@ import feast.proto.core.CoreServiceProto.UpdateStoreResponse; import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetStatus; +import feast.proto.core.IngestionJobProto; import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store.Subscription; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.util.Pair; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -60,21 +64,26 @@ @Service public class SpecService { + private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5; + private final FeatureSetRepository featureSetRepository; private final ProjectRepository projectRepository; private final StoreRepository storeRepository; private final Source defaultSource; + private final KafkaTemplate specPublisher; @Autowired public SpecService( FeatureSetRepository featureSetRepository, StoreRepository storeRepository, ProjectRepository projectRepository, - Source defaultSource) { + Source defaultSource, + KafkaTemplate specPublisher) { this.featureSetRepository = featureSetRepository; this.storeRepository = storeRepository; this.projectRepository = projectRepository; this.defaultSource = defaultSource; + this.specPublisher = specPublisher; } /** @@ -240,6 +249,7 @@ public ListStoresResponse listStores(ListStoresRequest.Filter filter) { * * @param newFeatureSet Feature set that will be created or updated. */ + @Transactional public ApplyFeatureSetResponse applyFeatureSet(FeatureSetProto.FeatureSet newFeatureSet) throws InvalidProtocolBufferException { // Autofill default project if not specified @@ -299,6 +309,39 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetProto.FeatureSet newFea status = Status.UPDATED; } + featureSet.incVersion(); + + // Sending latest version of FeatureSet to all currently running IngestionJobs (there's one + // topic for all sets). + // All related jobs would apply new FeatureSet on the fly. + // We wait for Kafka broker to ack that the message was added to topic before actually + // committing this FeatureSet. + // In case kafka doesn't respond within SPEC_PUBLISHING_TIMEOUT_SECONDS we abort current + // transaction and return error to client. 
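The comment above covers Core's half of the handshake: publish the new spec version to the specs topic, wait up to SPEC_PUBLISHING_TIMEOUT_SECONDS for the broker to accept it, and abort the transaction otherwise (the code follows right after). The ingestion-side half, acknowledging that a spec version has been applied, is handled by the new WriteFeatureSetSpecAck transform added in this patch, whose body is not part of this excerpt. Here is a rough standalone sketch of producing such an ack, keyed by the FeatureSet reference and carrying the jobName and featureSetVersion fields that listenAckFromJobs reads further down; the plain KafkaProducer wiring is an assumption, not the actual transform.

import feast.proto.core.IngestionJobProto.FeatureSetSpecAck;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SpecAckProducerSketch {
  // Sends one ack for the given FeatureSet reference ("project/name") so that
  // Core's listenAckFromJobs can flip the matching FeatureSetJobStatus to
  // STATUS_DELIVERED and, once all running jobs have acked, mark the
  // FeatureSet as STATUS_READY.
  public static void sendAck(
      String bootstrapServers,
      String ackTopic,
      String featureSetReference,
      int featureSetVersion,
      String jobName) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);

    FeatureSetSpecAck ack =
        FeatureSetSpecAck.newBuilder()
            .setFeatureSetVersion(featureSetVersion)
            .setJobName(jobName)
            .build();

    try (KafkaProducer<String, byte[]> producer =
        new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer())) {
      // Value bytes match the ProtoDeserializer(FeatureSetSpecAck.parser())
      // that Core's ackConsumerFactory is configured with.
      producer.send(new ProducerRecord<>(ackTopic, featureSetReference, ack.toByteArray()));
      producer.flush();
    }
  }
}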
+ try { + specPublisher + .sendDefault(featureSet.getReference(), featureSet.toProto().getSpec()) + .get(SPEC_PUBLISHING_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (Exception e) { + throw io.grpc.Status.UNAVAILABLE + .withDescription( + String.format( + "Unable to publish FeatureSet to Kafka. Cause: %s", + e.getCause() != null ? e.getCause().getMessage() : "unknown")) + .withCause(e) + .asRuntimeException(); + } + + // Updating delivery status for related jobs (that are currently using this FeatureSet). + // We now set status to IN_PROGRESS, so listenAckFromJobs would be able to + // monitor delivery progress for each new version. + featureSet.getJobStatuses().stream() + .filter(s -> s.getJob().isRunning()) + .forEach( + s -> + s.setDeliveryStatus( + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); + // Persist the FeatureSet object featureSet.setStatus(FeatureSetStatus.STATUS_PENDING); project.addFeatureSet(featureSet); @@ -347,4 +390,60 @@ public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) .setStore(updateStoreRequest.getStore()) .build(); } + + /** + * Listener for ACK messages coming from IngestionJob when FeatureSetSpec is installed (in + * pipeline). + * + *

Updates FeatureSetJobStatus for the respective FeatureSet (selected by reference) and Job (selected + * by Id). + * + *

When all related (running) to FeatureSet jobs are updated - FeatureSet receives READY status + * + * @param record ConsumerRecord with key: FeatureSet reference and value: Ack message + */ + @KafkaListener(topics = {"${feast.stream.specsOptions.specsAckTopic}"}) + @Transactional + public void listenAckFromJobs( + ConsumerRecord record) { + String setReference = record.key(); + Pair projectAndSetName = parseReference(setReference); + FeatureSet featureSet = + featureSetRepository.findFeatureSetByNameAndProject_Name( + projectAndSetName.getSecond(), projectAndSetName.getFirst()); + if (featureSet == null) { + log.warn( + String.format("ACKListener received message for unknown FeatureSet %s", setReference)); + return; + } + + if (featureSet.getVersion() != record.value().getFeatureSetVersion()) { + log.warn( + String.format( + "ACKListener received outdated ack for %s. Current %d, Received %d", + setReference, featureSet.getVersion(), record.value().getFeatureSetVersion())); + return; + } + + featureSet.getJobStatuses().stream() + .filter(js -> js.getJob().getId().equals(record.value().getJobName())) + .findFirst() + .ifPresent( + featureSetJobStatus -> + featureSetJobStatus.setDeliveryStatus( + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); + + boolean allDelivered = + featureSet.getJobStatuses().stream() + .filter(js -> js.getJob().isRunning()) + .allMatch( + js -> + js.getDeliveryStatus() + .equals(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); + + if (allDelivered) { + featureSet.setStatus(FeatureSetStatus.STATUS_READY); + featureSetRepository.saveAndFlush(featureSet); + } + } } diff --git a/core/src/main/java/feast/core/util/KafkaSerialization.java b/core/src/main/java/feast/core/util/KafkaSerialization.java new file mode 100644 index 0000000000..881d8eb9cf --- /dev/null +++ b/core/src/main/java/feast/core/util/KafkaSerialization.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.util; + +import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; + +/* +Serializer & Deserializer implementation to write & read protobuf object from/to kafka + */ +public class KafkaSerialization { + public static class ProtoSerializer implements Serializer { + @Override + public byte[] serialize(String topic, T data) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + data.writeTo(stream); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Unable to serialize object of type %s. 
Reason: %s", + data.getClass().getName(), e.getCause().getMessage())); + } + + return stream.toByteArray(); + } + } + + public static class ProtoDeserializer implements Deserializer { + private Parser parser; + + public ProtoDeserializer(Parser parser) { + this.parser = parser; + } + + @Override + public T deserialize(String topic, byte[] data) { + try { + return parser.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException( + String.format( + "Unable to deserialize object from topic %s. Reason: %s", + topic, e.getCause().getMessage())); + } + } + } +} diff --git a/core/src/main/resources/application-override.yaml b/core/src/main/resources/application-override.yaml deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index 5dc5698e88..9f02ed302d 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -78,6 +78,10 @@ feast: replicationFactor: 1 partitions: 1 + specsOptions: + specsTopic: feast-specs + specsAckTopic: feast-specs-ack + spring: jpa: properties.hibernate: diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java index d182673801..fc633ba15f 100644 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java @@ -24,11 +24,8 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; -import feast.core.model.Source; -import feast.core.model.Store; +import feast.core.model.*; +import feast.core.util.ModelHelpers; import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetMeta; import feast.proto.core.FeatureSetProto.FeatureSetSpec; @@ -92,7 +89,14 @@ public void setUp() { } Job makeJob(String extId, List featureSets, JobStatus status) { - return new Job("job", extId, RUNNER, source, store, featureSets, status); + return new Job( + "job", + extId, + RUNNER, + source, + store, + ModelHelpers.makeFeatureSetJobStatus(featureSets), + status); } JobUpdateTask makeTask(List featureSets, Optional currentJob) { diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index ea9caa91ff..5081288202 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -26,20 +26,19 @@ import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential; import com.google.api.services.dataflow.Dataflow; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.protobuf.Duration; import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.JsonFormat.Printer; import feast.core.config.FeastProperties.MetricsProperties; import feast.core.exception.JobExecutionException; import feast.core.job.Runner; -import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.*; -import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.options.OptionCompressor; import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetMeta; import 
feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.IngestionJobProto; import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions; import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions.Builder; import feast.proto.core.SourceProto; @@ -50,8 +49,6 @@ import feast.proto.core.StoreProto.Store.StoreType; import feast.proto.core.StoreProto.Store.Subscription; import java.io.IOException; -import java.util.Collections; -import java.util.List; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.PipelineResult.State; @@ -71,6 +68,7 @@ public class DataflowJobManagerTest { @Mock private Dataflow dataflow; private DataflowRunnerConfigOptions defaults; + private IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig; private DataflowJobManager dfJobManager; @Before @@ -94,7 +92,17 @@ public void setUp() { e.printStackTrace(); } - dfJobManager = new DataflowJobManager(defaults, metricsProperties, credential); + specsStreamingUpdateConfig = + IngestionJobProto.SpecsStreamingUpdateConfig.newBuilder() + .setSource( + KafkaSourceConfig.newBuilder() + .setTopic("specs_topic") + .setBootstrapServers("servers:9092") + .build()) + .build(); + + dfJobManager = + new DataflowJobManager(defaults, metricsProperties, specsStreamingUpdateConfig, credential); dfJobManager = spy(dfJobManager); } @@ -142,11 +150,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { expectedPipelineOptions.setLabels(defaults.getLabelsMap()); expectedPipelineOptions.setJobName(jobName); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); - - OptionCompressor> featureSetJsonCompressor = - new BZip2Compressor<>(new FeatureSetJsonByteConverter()); - expectedPipelineOptions.setFeatureSetJson( - featureSetJsonCompressor.compress(Collections.singletonList(featureSet))); + expectedPipelineOptions.setSourceJson(printer.print(source)); ArgumentCaptor captor = ArgumentCaptor.forClass(ImportOptions.class); @@ -155,6 +159,10 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { when(mockPipelineResult.getJobId()).thenReturn(expectedExtJobId); doReturn(mockPipelineResult).when(dfJobManager).runPipeline(any()); + + FeatureSetJobStatus featureSetJobStatus = new FeatureSetJobStatus(); + featureSetJobStatus.setFeatureSet(FeatureSet.fromProto(featureSet)); + Job job = new Job( jobName, @@ -162,7 +170,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), - Lists.newArrayList(FeatureSet.fromProto(featureSet)), + Sets.newHashSet(featureSetJobStatus), JobStatus.PENDING); Job actual = dfJobManager.startJob(job); @@ -184,9 +192,6 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { // Assume the files that are staged are correct expectedPipelineOptions.setFilesToStage(actualPipelineOptions.getFilesToStage()); - assertThat( - actualPipelineOptions.getFeatureSetJson(), - equalTo(expectedPipelineOptions.getFeatureSetJson())); assertThat( actualPipelineOptions.getDeadLetterTableSpec(), equalTo(expectedPipelineOptions.getDeadLetterTableSpec())); @@ -197,6 +202,11 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { equalTo(expectedPipelineOptions.getMetricsExporterType())); assertThat( actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson())); + assertThat( + 
actualPipelineOptions.getSourceJson(), equalTo(expectedPipelineOptions.getSourceJson())); + assertThat( + actualPipelineOptions.getSpecsStreamingUpdateConfigJson(), + equalTo(printer.print(specsStreamingUpdateConfig))); assertThat(actual.getExtId(), equalTo(expectedExtJobId)); } @@ -231,6 +241,9 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { doReturn(mockPipelineResult).when(dfJobManager).runPipeline(any()); + FeatureSetJobStatus featureSetJobStatus = new FeatureSetJobStatus(); + featureSetJobStatus.setFeatureSet(FeatureSet.fromProto(featureSet)); + Job job = new Job( "job", @@ -238,7 +251,7 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), - Lists.newArrayList(FeatureSet.fromProto(featureSet)), + Sets.newHashSet(featureSetJobStatus), JobStatus.PENDING); expectedException.expect(JobExecutionException.class); diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 0128f5aa0b..0e797dc64d 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -16,6 +16,7 @@ */ package feast.core.job.direct; +import static feast.core.util.ModelHelpers.makeFeatureSetJobStatus; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; @@ -31,17 +32,15 @@ import com.google.protobuf.util.JsonFormat.Printer; import feast.core.config.FeastProperties.MetricsProperties; import feast.core.job.Runner; -import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; import feast.core.model.Source; import feast.core.model.Store; -import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.options.OptionCompressor; import feast.proto.core.FeatureSetProto; import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.IngestionJobProto; import feast.proto.core.RunnerProto.DirectRunnerConfigOptions; import feast.proto.core.SourceProto; import feast.proto.core.SourceProto.KafkaSourceConfig; @@ -51,8 +50,6 @@ import feast.proto.core.StoreProto.Store.StoreType; import feast.proto.core.StoreProto.Store.Subscription; import java.io.IOException; -import java.util.Collections; -import java.util.List; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -71,6 +68,7 @@ public class DirectRunnerJobManagerTest { private DirectRunnerJobManager drJobManager; private DirectRunnerConfigOptions defaults; + private IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig; @Before public void setUp() { @@ -79,7 +77,18 @@ public void setUp() { MetricsProperties metricsProperties = new MetricsProperties(); metricsProperties.setEnabled(false); - drJobManager = new DirectRunnerJobManager(defaults, directJobRegistry, metricsProperties); + specsStreamingUpdateConfig = + IngestionJobProto.SpecsStreamingUpdateConfig.newBuilder() + .setSource( + KafkaSourceConfig.newBuilder() + .setTopic("specs_topic") + .setBootstrapServers("servers:9092") + .build()) + .build(); + + drJobManager = + new DirectRunnerJobManager( + defaults, 
directJobRegistry, metricsProperties, specsStreamingUpdateConfig); drJobManager = Mockito.spy(drJobManager); } @@ -125,11 +134,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setTargetParallelism(1); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); expectedPipelineOptions.setProject(""); - - OptionCompressor> featureSetJsonCompressor = - new BZip2Compressor<>(new FeatureSetJsonByteConverter()); - expectedPipelineOptions.setFeatureSetJson( - featureSetJsonCompressor.compress(Collections.singletonList(featureSet))); + expectedPipelineOptions.setSourceJson(printer.print(source)); ArgumentCaptor pipelineOptionsCaptor = ArgumentCaptor.forClass(ImportOptions.class); @@ -145,7 +150,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { Runner.DIRECT, Source.fromProto(source), Store.fromProto(store), - Lists.newArrayList(FeatureSet.fromProto(featureSet)), + makeFeatureSetJobStatus(FeatureSet.fromProto(featureSet)), JobStatus.PENDING); Job actual = drJobManager.startJob(job); verify(drJobManager, times(1)).runPipeline(pipelineOptionsCaptor.capture()); @@ -156,9 +161,6 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setOptionsId( actualPipelineOptions.getOptionsId()); // avoid comparing this value - assertThat( - actualPipelineOptions.getFeatureSetJson(), - equalTo(expectedPipelineOptions.getFeatureSetJson())); assertThat( actualPipelineOptions.getDeadLetterTableSpec(), equalTo(expectedPipelineOptions.getDeadLetterTableSpec())); @@ -169,6 +171,11 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { equalTo(expectedPipelineOptions.getMetricsExporterType())); assertThat( actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson())); + assertThat( + actualPipelineOptions.getSourceJson(), equalTo(expectedPipelineOptions.getSourceJson())); + assertThat( + actualPipelineOptions.getSpecsStreamingUpdateConfigJson(), + equalTo(printer.print(specsStreamingUpdateConfig))); assertThat(jobStarted.getPipelineResult(), equalTo(mockPipelineResult)); assertThat(jobStarted.getJobId(), equalTo(expectedJobId)); diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 8386efb28f..caec99f228 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -34,9 +34,8 @@ import feast.core.job.JobManager; import feast.core.job.JobMatcher; import feast.core.job.Runner; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; +import feast.core.model.*; +import feast.core.util.ModelHelpers; import feast.proto.core.CoreServiceProto.ListFeatureSetsRequest.Filter; import feast.proto.core.CoreServiceProto.ListFeatureSetsResponse; import feast.proto.core.CoreServiceProto.ListStoresResponse; @@ -50,7 +49,6 @@ import feast.proto.core.StoreProto.Store.RedisConfig; import feast.proto.core.StoreProto.Store.StoreType; import feast.proto.core.StoreProto.Store.Subscription; -import java.util.Arrays; import java.util.Collections; import java.util.List; import org.junit.Before; @@ -159,7 +157,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep Runner.DATAFLOW, feast.core.model.Source.fromProto(source), 
feast.core.model.Store.fromProto(store), - Arrays.asList(featureSet1, featureSet2), + ModelHelpers.makeFeatureSetJobStatus(featureSet1, featureSet2), JobStatus.PENDING); Job expected = @@ -169,7 +167,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), - Arrays.asList(featureSet1, featureSet2), + ModelHelpers.makeFeatureSetJobStatus(featureSet1, featureSet2), JobStatus.RUNNING); when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("%", "project1")) @@ -246,7 +244,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { Runner.DATAFLOW, feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), - Arrays.asList(featureSet1), + ModelHelpers.makeFeatureSetJobStatus(featureSet1), JobStatus.PENDING); Job expected1 = @@ -256,7 +254,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { Runner.DATAFLOW, feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), - Arrays.asList(featureSet1), + ModelHelpers.makeFeatureSetJobStatus(featureSet1), JobStatus.RUNNING); Job expectedInput2 = @@ -266,7 +264,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { Runner.DATAFLOW, feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), - Arrays.asList(featureSet2), + ModelHelpers.makeFeatureSetJobStatus(featureSet2), JobStatus.PENDING); Job expected2 = @@ -276,7 +274,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { Runner.DATAFLOW, feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), - Arrays.asList(featureSet2), + ModelHelpers.makeFeatureSetJobStatus(featureSet2), JobStatus.RUNNING); ArgumentCaptor> jobArgCaptor = ArgumentCaptor.forClass(List.class); diff --git a/core/src/test/java/feast/core/service/JobServiceTest.java b/core/src/test/java/feast/core/service/JobServiceTest.java index ff056287f9..27509620ff 100644 --- a/core/src/test/java/feast/core/service/JobServiceTest.java +++ b/core/src/test/java/feast/core/service/JobServiceTest.java @@ -16,6 +16,7 @@ */ package feast.core.service; +import static feast.core.util.ModelHelpers.makeFeatureSetJobStatus; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; @@ -25,6 +26,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; import feast.core.dao.JobRepository; import feast.core.job.JobManager; @@ -126,7 +128,8 @@ public void setupJobRepository() { when(this.jobRepository.findById(this.job.getId())).thenReturn(Optional.of(this.job)); when(this.jobRepository.findByStoreName(this.dataStore.getName())) .thenReturn(Arrays.asList(this.job)); - when(this.jobRepository.findByFeatureSetsIn(Arrays.asList(this.featureSet))) + when(this.jobRepository.findByFeatureSetJobStatusesIn( + Lists.newArrayList((this.featureSet.getJobStatuses())))) .thenReturn(Arrays.asList(this.job)); when(this.jobRepository.findAll()).thenReturn(Arrays.asList(this.job)); } @@ -155,7 +158,7 @@ private Job newDummyJob(String id, String extId, JobStatus status) { Runner.DATAFLOW, this.dataSource, this.dataStore, - Arrays.asList(this.featureSet), + makeFeatureSetJobStatus(this.featureSet), 
status); } diff --git a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java index b5fd03fc7f..9ec219b6c9 100644 --- a/core/src/test/java/feast/core/service/SpecServiceTest.java +++ b/core/src/test/java/feast/core/service/SpecServiceTest.java @@ -16,12 +16,13 @@ */ package feast.core.service; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; import com.google.api.client.util.Lists; @@ -45,16 +46,21 @@ import feast.proto.core.FeatureSetProto.EntitySpec; import feast.proto.core.FeatureSetProto.FeatureSetSpec; import feast.proto.core.FeatureSetProto.FeatureSpec; +import feast.proto.core.IngestionJobProto; import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store.RedisConfig; import feast.proto.core.StoreProto.Store.StoreType; import feast.proto.core.StoreProto.Store.Subscription; import feast.proto.types.ValueProto.ValueType.Enum; +import io.grpc.StatusRuntimeException; import java.sql.Date; import java.time.Instant; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.CancellationException; import java.util.stream.Collectors; +import lombok.SneakyThrows; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -62,6 +68,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mock; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.annotation.AsyncResult; import org.tensorflow.metadata.v0.BoolDomain; import org.tensorflow.metadata.v0.FeaturePresence; import org.tensorflow.metadata.v0.FeaturePresenceWithinGroup; @@ -86,6 +94,8 @@ public class SpecServiceTest { @Mock private ProjectRepository projectRepository; + @Mock private KafkaTemplate kafkaTemplate; + @Rule public final ExpectedException expectedException = ExpectedException.none(); private SpecService specService; @@ -143,8 +153,11 @@ public void setUp() { when(storeRepository.findById("SERVING")).thenReturn(Optional.of(store1)); when(storeRepository.findById("NOTFOUND")).thenReturn(Optional.empty()); + when(kafkaTemplate.sendDefault(any(), any())).thenReturn(new AsyncResult<>(null)); + specService = - new SpecService(featureSetRepository, storeRepository, projectRepository, defaultSource); + new SpecService( + featureSetRepository, storeRepository, projectRepository, defaultSource, kafkaTemplate); } @Test @@ -284,6 +297,7 @@ public void applyFeatureSetShouldApplyFeatureSetIfNotExists() .build(); assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.CREATED)); assertThat(applyFeatureSetResponse.getFeatureSet().getSpec(), equalTo(expected.getSpec())); + assertThat(applyFeatureSetResponse.getFeatureSet().getSpec().getVersion(), equalTo(1)); } @Test @@ -308,7 +322,12 @@ public void applyFeatureSetShouldUpdateAndSaveFeatureSetIfAlreadyExists() .toBuilder() .setMeta(incomingFeatureSet.getMeta().toBuilder().build()) .setSpec( - incomingFeatureSet.getSpec().toBuilder().setSource(defaultSource.toProto()).build()) + 
incomingFeatureSet + .getSpec() + .toBuilder() + .setVersion(2) + .setSource(defaultSource.toProto()) + .build()) .build(); ApplyFeatureSetResponse applyFeatureSetResponse = @@ -318,6 +337,74 @@ public void applyFeatureSetShouldUpdateAndSaveFeatureSetIfAlreadyExists() assertEquals( FeatureSet.fromProto(applyFeatureSetResponse.getFeatureSet()), FeatureSet.fromProto(expected)); + + assertThat(applyFeatureSetResponse.getFeatureSet().getSpec().getVersion(), equalTo(2)); + verify(kafkaTemplate) + .sendDefault(eq(featureSets.get(0).getReference()), any(FeatureSetSpec.class)); + } + + @Test + @SneakyThrows + public void applyFeatureSetShouldNotWorkWithoutKafkaAck() { + FeatureSet fsInTest = featureSets.get(1); + FeatureSetProto.FeatureSet incomingFeatureSet = fsInTest.toProto(); + CancellationException exc = new CancellationException(); + when(kafkaTemplate.sendDefault(eq(fsInTest.getReference()), any()).get()).thenThrow(exc); + + incomingFeatureSet = + incomingFeatureSet + .toBuilder() + .setMeta(incomingFeatureSet.getMeta()) + .setSpec( + incomingFeatureSet + .getSpec() + .toBuilder() + .addFeatures( + FeatureSpec.newBuilder().setName("feature2").setValueType(Enum.STRING)) + .build()) + .build(); + + expectedException.expect(StatusRuntimeException.class); + specService.applyFeatureSet(incomingFeatureSet); + verify(featureSetRepository, never()).saveAndFlush(ArgumentMatchers.any(FeatureSet.class)); + } + + @Test + @SneakyThrows + public void applyFeatureSetShouldUpdateDeliveryStatuses() { + FeatureSet fsInTest = featureSets.get(1); + FeatureSetJobStatus j1 = + newJob( + fsInTest, + JobStatus.RUNNING, + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED); + FeatureSetJobStatus j2 = + newJob( + fsInTest, + JobStatus.ABORTED, + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED); + + fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2)); + + FeatureSetProto.FeatureSet incomingFeatureSet = fsInTest.toProto(); + incomingFeatureSet = + incomingFeatureSet + .toBuilder() + .setMeta(incomingFeatureSet.getMeta()) + .setSpec( + incomingFeatureSet + .getSpec() + .toBuilder() + .addFeatures( + FeatureSpec.newBuilder().setName("feature2").setValueType(Enum.STRING)) + .build()) + .build(); + + specService.applyFeatureSet(incomingFeatureSet); + assertThat( + j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); + assertThat( + j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); } @Test @@ -681,6 +768,70 @@ public void getOrListFeatureSetShouldUseDefaultProjectIfProjectUnspecified() assertThat(listResponse.getFeatureSetsList(), equalTo(Arrays.asList(expected.toProto()))); } + @Test + public void specAckListenerShouldDoNothingWhenMessageIsOutdated() { + FeatureSet fsInTest = featureSets.get(1); + FeatureSetJobStatus j1 = + newJob( + fsInTest, + JobStatus.RUNNING, + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + FeatureSetJobStatus j2 = + newJob( + fsInTest, + JobStatus.RUNNING, + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + + fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2)); + + specService.listenAckFromJobs(newAckMessage("project/invalid", 0, j1.getJob().getId())); + specService.listenAckFromJobs(newAckMessage(fsInTest.getReference(), 0, "")); + specService.listenAckFromJobs(newAckMessage(fsInTest.getReference(), -1, j1.getJob().getId())); + + assertThat( + j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); + assertThat( + 
j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS)); + } + + @Test + public void specAckListenerShouldUpdateFeatureSetStatus() { + FeatureSet fsInTest = featureSets.get(1); + fsInTest.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING); + + FeatureSetJobStatus j1 = + newJob( + fsInTest, + JobStatus.RUNNING, + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + FeatureSetJobStatus j2 = + newJob( + fsInTest, + JobStatus.RUNNING, + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + FeatureSetJobStatus j3 = + newJob( + fsInTest, + JobStatus.ABORTED, + FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + + fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2, j3)); + + specService.listenAckFromJobs( + newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j1.getJob().getId())); + + assertThat( + j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); + assertThat(fsInTest.getStatus(), is(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)); + + specService.listenAckFromJobs( + newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j2.getJob().getId())); + + assertThat( + j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED)); + assertThat(fsInTest.getStatus(), is(FeatureSetProto.FeatureSetStatus.STATUS_READY)); + } + private FeatureSet newDummyFeatureSet(String name, String project) { FeatureSpec f1 = FeatureSpec.newBuilder() @@ -698,6 +849,33 @@ private FeatureSet newDummyFeatureSet(String name, String project) { return fs; } + private FeatureSetJobStatus newJob( + FeatureSet fs, JobStatus status, FeatureSetProto.FeatureSetJobDeliveryStatus deliveryStatus) { + Job job = new Job(); + job.setStatus(status); + job.setId(UUID.randomUUID().toString()); + + FeatureSetJobStatus featureSetJobStatus = new FeatureSetJobStatus(); + featureSetJobStatus.setJob(job); + featureSetJobStatus.setFeatureSet(fs); + featureSetJobStatus.setDeliveryStatus(deliveryStatus); + + return featureSetJobStatus; + } + + private ConsumerRecord newAckMessage( + String key, int version, String jobName) { + return new ConsumerRecord<>( + "topic", + 0, + 0, + key, + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setFeatureSetVersion(version) + .setJobName(jobName) + .build()); + } + private Store newDummyStore(String name) { // Add type to this method when we enable filtering by type Store store = new Store(); diff --git a/core/src/test/java/feast/core/service/TestObjectFactory.java b/core/src/test/java/feast/core/service/TestObjectFactory.java index 40c379d3cf..a498d6c21c 100644 --- a/core/src/test/java/feast/core/service/TestObjectFactory.java +++ b/core/src/test/java/feast/core/service/TestObjectFactory.java @@ -39,15 +39,18 @@ public class TestObjectFactory { public static FeatureSet CreateFeatureSet( String name, String project, List entities, List features) { - return new FeatureSet( - name, - project, - 100L, - entities, - features, - defaultSource, - new HashMap<>(), - FeatureSetProto.FeatureSetStatus.STATUS_READY); + FeatureSet fs = + new FeatureSet( + name, + project, + 100L, + entities, + features, + defaultSource, + new HashMap<>(), + FeatureSetProto.FeatureSetStatus.STATUS_READY); + fs.setVersion(1); + return fs; } public static Feature CreateFeature(String name, ValueProto.ValueType.Enum valueType) { diff --git a/core/src/test/java/feast/core/util/ModelHelpers.java b/core/src/test/java/feast/core/util/ModelHelpers.java new file mode 100644 
index 0000000000..f864a5562d --- /dev/null +++ b/core/src/test/java/feast/core/util/ModelHelpers.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.util; + +import feast.core.model.FeatureSet; +import feast.core.model.FeatureSetJobStatus; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ModelHelpers { + public static Set makeFeatureSetJobStatus(FeatureSet... featureSets) { + return Stream.of(featureSets) + .map( + fs -> { + FeatureSetJobStatus s = new FeatureSetJobStatus(); + s.setFeatureSet(fs); + return s; + }) + .collect(Collectors.toSet()); + } + + public static Set makeFeatureSetJobStatus(List featureSets) { + return makeFeatureSetJobStatus(featureSets.toArray(FeatureSet[]::new)); + } +} diff --git a/infra/scripts/test-end-to-end-batch-dataflow.sh b/infra/scripts/test-end-to-end-batch-dataflow.sh index 9ad2918557..e138a75bde 100755 --- a/infra/scripts/test-end-to-end-batch-dataflow.sh +++ b/infra/scripts/test-end-to-end-batch-dataflow.sh @@ -14,6 +14,8 @@ test -z ${K8_CLUSTER_NAME} && K8_CLUSTER_NAME="feast-e2e-dataflow" test -z ${HELM_RELEASE_NAME} && HELM_RELEASE_NAME="pr-$PULL_NUMBER" test -z ${HELM_COMMON_NAME} && HELM_COMMON_NAME="deps" test -z ${DATASET_NAME} && DATASET_NAME=feast_e2e_$(date +%s) +test -z ${SPECS_TOPIC} && SPECS_TOPIC=feast-specs-$(date +%s) + feast_kafka_1_ip_name="feast-kafka-1" feast_kafka_2_ip_name="feast-kafka-2" @@ -209,8 +211,9 @@ export GCLOUD_SUBNET=$GCLOUD_SUBNET export GCLOUD_REGION=$GCLOUD_REGION export HELM_COMMON_NAME=$HELM_COMMON_NAME export IMAGE_TAG=${PULL_PULL_SHA:1} +export SPECS_TOPIC=$SPECS_TOPIC -envsubst $'$TEMP_BUCKET $DATASET_NAME $GCLOUD_PROJECT $GCLOUD_NETWORK \ +envsubst $'$TEMP_BUCKET $DATASET_NAME $GCLOUD_PROJECT $GCLOUD_NETWORK $SPECS_TOPIC \ $GCLOUD_SUBNET $GCLOUD_REGION $IMAGE_TAG $HELM_COMMON_NAME $feast_kafka_1_ip $feast_kafka_2_ip $feast_kafka_3_ip $feast_redis_ip $feast_statsd_ip' < $ORIGINAL_DIR/infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml > $ORIGINAL_DIR/infra/charts/feast/values-end-to-end-batch-dataflow-updated.yaml diff --git a/infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml b/infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml index 345d523418..48231face6 100644 --- a/infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml +++ b/infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml @@ -15,6 +15,9 @@ feast-core: stream: options: bootstrapServers: $feast_kafka_1_ip:31090 + specsOptions: + specsTopic: $SPECS_TOPIC + specsAckTopic: $SPECS_TOPIC-ack jobs: active_runner: dataflow diff --git a/ingestion/pom.xml b/ingestion/pom.xml index 36d5cd6269..3eadcdca25 100644 --- a/ingestion/pom.xml +++ b/ingestion/pom.xml @@ -230,6 +230,13 @@ test + + org.slf4j + slf4j-simple + 1.7.30 + test + + com.google.guava guava diff --git 
a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 5cef8e5744..8dee52e08a 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -16,20 +16,20 @@ */ package feast.ingestion; -import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; import static feast.ingestion.utils.StoreUtil.getFeatureSink; import com.google.protobuf.InvalidProtocolBufferException; -import feast.ingestion.options.BZip2Decompressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.options.StringListStreamConverter; +import feast.ingestion.transform.FeatureRowToStoreAllocator; import feast.ingestion.transform.ProcessAndValidateFeatureRows; import feast.ingestion.transform.ReadFromSource; import feast.ingestion.transform.metrics.WriteFailureMetricsTransform; import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform; +import feast.ingestion.transform.specs.ReadFeatureSetSpecs; +import feast.ingestion.transform.specs.WriteFeatureSetSpecAck; import feast.ingestion.utils.SpecUtil; -import feast.proto.core.FeatureSetProto.FeatureSet; import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.IngestionJobProto.SpecsStreamingUpdateConfig; import feast.proto.core.SourceProto.Source; import feast.proto.core.StoreProto.Store; import feast.proto.types.FeatureRowProto.FeatureRow; @@ -39,15 +39,16 @@ import feast.storage.api.writer.WriteResult; import feast.storage.connectors.bigquery.writer.BigQueryDeadletterSink; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.values.*; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; public class ImportJob { @@ -73,11 +74,14 @@ public static void main(String[] args) throws IOException { public static PipelineResult runPipeline(ImportOptions options) throws IOException { /* * Steps: - * 1. Read messages from Feast Source as FeatureRow - * 2. Validate the feature rows to ensure the schema matches what is registered to the system - * 3. Write FeatureRow to the corresponding Store - * 4. Write elements that failed to be processed to a dead letter queue. - * 5. Write metrics to a metrics sink + * 1. Read FeatureSetSpec messages from kafka + * 2. Read messages from Feast Source as FeatureRow + * 3. Validate the feature rows to ensure the schema matches what is registered to the system + * 4. Distribute rows across stores by subscription + * 5. Write FeatureRow to the corresponding Store + * 6. Write elements that failed to be processed to a dead letter queue. + * 7. Write metrics to a metrics sink + * 8. 
Send ack on receiving FeatureSetSpec */ PipelineOptionsValidator.validate(ImportOptions.class, options); @@ -85,75 +89,71 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti log.info("Starting import job with settings: \n{}", options.toString()); - BZip2Decompressor> decompressor = - new BZip2Decompressor<>(new StringListStreamConverter()); - List featureSetJson = decompressor.decompress(options.getFeatureSetJson()); - List featureSets = SpecUtil.parseFeatureSetSpecJsonList(featureSetJson); List stores = SpecUtil.parseStoreJsonList(options.getStoreJson()); + Source source = SpecUtil.parseSourceJson(options.getSourceJson()); + SpecsStreamingUpdateConfig specsStreamingUpdateConfig = + SpecUtil.parseSpecsStreamingUpdateConfig(options.getSpecsStreamingUpdateConfigJson()); + + // Step 1. Read FeatureSetSpecs from Spec source + PCollection> featureSetSpecs = + pipeline.apply( + "ReadFeatureSetSpecs", + ReadFeatureSetSpecs.newBuilder() + .setSource(source) + .setStores(stores) + .setSpecsStreamingUpdateConfig(specsStreamingUpdateConfig) + .build()); + + PCollectionView>> globalSpecView = + featureSetSpecs.apply("GlobalSpecView", View.asMultimap()); + + // Step 2. Read messages from Feast Source as FeatureRow. + PCollectionTuple convertedFeatureRows = + pipeline.apply( + "ReadFeatureRowFromSource", + ReadFromSource.newBuilder() + .setSource(source) + .setSuccessTag(FEATURE_ROW_OUT) + .setFailureTag(DEADLETTER_OUT) + .build()); + + // Step 3. Process and validate incoming FeatureRows + PCollectionTuple validatedRows = + convertedFeatureRows + .get(FEATURE_ROW_OUT) + .apply( + ProcessAndValidateFeatureRows.newBuilder() + .setDefaultProject(options.getDefaultFeastProject()) + .setFeatureSetSpecs(globalSpecView) + .setSuccessTag(FEATURE_ROW_OUT) + .setFailureTag(DEADLETTER_OUT) + .build()); + + Map> storeTags = + stores.stream() + .map(s -> Pair.of(s, new TupleTag())) + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + + // Step 4. Allocate validated rows to stores by store subscription + PCollectionTuple storeAllocatedRows = + validatedRows + .get(FEATURE_ROW_OUT) + .apply( + FeatureRowToStoreAllocator.newBuilder() + .setStores(stores) + .setStoreTags(storeTags) + .build()); for (Store store : stores) { - List subscribedFeatureSets = - SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSets); - - // Generate tags by key - Map featureSetSpecsByKey = new HashMap<>(); - subscribedFeatureSets.stream() - .forEach( - fs -> { - String ref = getFeatureSetReference(fs.getSpec()); - featureSetSpecsByKey.put(ref, fs.getSpec()); - }); - - PCollection> staticSpecs = - pipeline - .apply(Create.of("")) - .apply( - "TemporarySpecSource", - ParDo.of( - new DoFn>() { - @ProcessElement - public void process(ProcessContext c) { - featureSetSpecsByKey.forEach((key, value) -> c.output(KV.of(key, value))); - } - })); - - FeatureSink featureSink = getFeatureSink(store, staticSpecs); - - // TODO: make the source part of the job initialisation options - Source source = subscribedFeatureSets.get(0).getSpec().getSource(); - - for (FeatureSet featureSet : subscribedFeatureSets) { - // Ensure Store has valid configuration and Feast can access it. - featureSink.prepareWrite(featureSet); - } + FeatureSink featureSink = getFeatureSink(store, featureSetSpecs); - // Step 1. Read messages from Feast Source as FeatureRow. 
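As a quick illustration of the new wiring, a minimal sketch of how the spec-update channel reaches the job (the broker address and topic names below are placeholders, not values from this patch): the job manager serializes a SpecsStreamingUpdateConfig to minified JSON and hands it to the job through ImportOptions, and the job parses it back with SpecUtil.parseSpecsStreamingUpdateConfig.

import com.google.protobuf.util.JsonFormat;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.utils.SpecUtil;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.SourceProto.KafkaSourceConfig;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SpecsUpdateConfigSketch {
  public static void main(String[] args) throws Exception {
    // Channel the ingestion job uses to receive FeatureSetSpec updates and to send acks back.
    IngestionJobProto.SpecsStreamingUpdateConfig specsConfig =
        IngestionJobProto.SpecsStreamingUpdateConfig.newBuilder()
            .setSource(
                KafkaSourceConfig.newBuilder()
                    .setBootstrapServers("kafka:9092") // placeholder broker
                    .setTopic("feast-feature-set-specs") // default from FeastProperties
                    .build())
            .setAck(
                KafkaSourceConfig.newBuilder()
                    .setBootstrapServers("kafka:9092")
                    .setTopic("feast-feature-set-specs-ack")
                    .build())
            .build();

    ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class);
    // The option must be minified JSON, as the ImportOptions description requires.
    options.setSpecsStreamingUpdateConfigJson(
        JsonFormat.printer().omittingInsignificantWhitespace().print(specsConfig));

    // Inside the job the same JSON is parsed back into the proto.
    IngestionJobProto.SpecsStreamingUpdateConfig parsed =
        SpecUtil.parseSpecsStreamingUpdateConfig(options.getSpecsStreamingUpdateConfigJson());
    System.out.println(parsed.getSource().getTopic());
  }
}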
- PCollectionTuple convertedFeatureRows = - pipeline.apply( - "ReadFeatureRowFromSource", - ReadFromSource.newBuilder() - .setSource(source) - .setSuccessTag(FEATURE_ROW_OUT) - .setFailureTag(DEADLETTER_OUT) - .build()); - - // Step 2. Process and validate incoming FeatureRows - PCollectionTuple validatedRows = - convertedFeatureRows - .get(FEATURE_ROW_OUT) - .apply( - ProcessAndValidateFeatureRows.newBuilder() - .setDefaultProject(options.getDefaultFeastProject()) - .setFeatureSetSpecs(featureSetSpecsByKey) - .setSuccessTag(FEATURE_ROW_OUT) - .setFailureTag(DEADLETTER_OUT) - .build()); - - // Step 3. Write FeatureRow to the corresponding Store. + // Step 5. Write FeatureRow to the corresponding Store. WriteResult writeFeatureRows = - validatedRows.get(FEATURE_ROW_OUT).apply("WriteFeatureRowToStore", featureSink.writer()); + storeAllocatedRows + .get(storeTags.get(store)) + .apply("WriteFeatureRowToStore", featureSink.writer()); - // Step 4. Write FailedElements to a dead letter table in BigQuery. + // Step 6. Write FailedElements to a dead letter table in BigQuery. if (options.getDeadLetterTableSpec() != null) { // TODO: make deadletter destination type configurable DeadletterSink deadletterSink = @@ -172,7 +172,7 @@ public void process(ProcessContext c) { .apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write()); } - // Step 5. Write metrics to a metrics sink. + // Step 7. Write metrics to a metrics sink. writeFeatureRows .getSuccessfulInserts() .apply("WriteSuccessMetrics", WriteSuccessMetricsTransform.create(store.getName())); @@ -182,6 +182,13 @@ public void process(ProcessContext c) { .apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName())); } + // Step 8. Send ack that FeatureSetSpec state is updated + featureSetSpecs.apply( + "WriteAck", + WriteFeatureSetSpecAck.newBuilder() + .setSpecsStreamingUpdateConfig(specsStreamingUpdateConfig) + .build()); + return pipeline.run(); } } diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index e3a1b841c4..9ee1ff891a 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -36,16 +36,29 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, @Required @Description( - "JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format." - + "FeatureSet follows the format in feast.core.FeatureSet proto." - + "Mutliple FeatureSetSpec can be passed by specifying '--featureSet={...}' multiple times" + "JSON string representation of the SpecsPipe configuration." + + "Job will use this to know where read new FeatureSetSpec from (kafka broker & topic)" + + "and where send acknowledgment on successful update of job's state to." + + "SpecsStreamingUpdateConfig follows the format in feast.core.IngestionJob.SpecsStreamingUpdateConfig proto." 
+ "The conversion of Proto message to JSON should follow this mapping:" + "https://developers.google.com/protocol-buffers/docs/proto3#json" + "Please minify and remove all insignificant whitespace such as newline in the JSON string" + "to prevent error when parsing the options") - byte[] getFeatureSetJson(); + String getSpecsStreamingUpdateConfigJson(); - void setFeatureSetJson(byte[] featureSetJson); + void setSpecsStreamingUpdateConfigJson(String json); + + @Required + @Description( + "JSON string representation of the Source that will be used to read FeatureRows from." + + "Source follows the format in featst.core.Source proto. Currently only kafka source is supported" + + "The conversion of Proto message to JSON should follow this mapping:" + + "https://developers.google.com/protocol-buffers/docs/proto3#json" + + "Please minify and remove all insignificant whitespace such as newline in the JSON string" + + "to prevent error when parsing the options") + String getSourceJson(); + + void setSourceJson(String json); @Required @Description( diff --git a/ingestion/src/main/java/feast/ingestion/transform/FeatureRowToStoreAllocator.java b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowToStoreAllocator.java new file mode 100644 index 0000000000..48889773c4 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/FeatureRowToStoreAllocator.java @@ -0,0 +1,93 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform; + +import static feast.ingestion.utils.SpecUtil.parseFeatureSetReference; + +import com.google.auto.value.AutoValue; +import feast.ingestion.utils.SpecUtil; +import feast.proto.core.StoreProto; +import feast.proto.types.FeatureRowProto; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.commons.lang3.tuple.Pair; + +/** + * For each incoming {@link FeatureRowProto.FeatureRow} allocator choose only stores that + * subscripted to its project and featureSet names. + * + *

Return PCollectionTuple with one {@link TupleTag} per {@link StoreProto.Store}. Tags must be + * generated in advance. + */ +@AutoValue +public abstract class FeatureRowToStoreAllocator + extends PTransform, PCollectionTuple> { + public abstract List getStores(); + + public abstract Map> getStoreTags(); + + public static Builder newBuilder() { + return new AutoValue_FeatureRowToStoreAllocator.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setStores(List stores); + + public abstract Builder setStoreTags( + Map> tags); + + public abstract FeatureRowToStoreAllocator build(); + } + + @Override + public PCollectionTuple expand(PCollection input) { + return input.apply( + "AssignRowToStore", + ParDo.of( + new DoFn() { + @ProcessElement + public void process(ProcessContext c, @Element FeatureRowProto.FeatureRow row) { + Pair projectAndSetNames = + parseFeatureSetReference(row.getFeatureSet()); + getStores().stream() + .filter( + s -> + SpecUtil.isSubscribedToFeatureSet( + s.getSubscriptionsList(), + projectAndSetNames.getLeft(), + projectAndSetNames.getRight())) + .forEach(s -> c.output(getStoreTags().get(s), row)); + } + }) + .withOutputTags( + getStoreTags().get(getStores().get(0)), + TupleTagList.of( + getStores().stream() + .skip(1) + .map(getStoreTags()::get) + .collect(Collectors.toList())))); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java b/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java index 2ce8918ccf..729bff69a7 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ProcessAndValidateFeatureRows.java @@ -19,25 +19,20 @@ import com.google.auto.value.AutoValue; import feast.ingestion.transform.fn.ProcessFeatureRowDoFn; import feast.ingestion.transform.fn.ValidateFeatureRowDoFn; -import feast.ingestion.values.FeatureSet; import feast.proto.core.FeatureSetProto; import feast.proto.types.FeatureRowProto.FeatureRow; import feast.storage.api.writer.FailedElement; import java.util.Map; -import java.util.stream.Collectors; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.sdk.values.*; @AutoValue public abstract class ProcessAndValidateFeatureRows extends PTransform, PCollectionTuple> { - public abstract Map getFeatureSetSpecs(); + public abstract PCollectionView>> + getFeatureSetSpecs(); public abstract String getDefaultProject(); @@ -53,7 +48,7 @@ public static Builder newBuilder() { public abstract static class Builder { public abstract Builder setFeatureSetSpecs( - Map featureSets); + PCollectionView>> featureSets); public abstract Builder setDefaultProject(String defaultProject); @@ -66,22 +61,17 @@ public abstract Builder setFeatureSetSpecs( @Override public PCollectionTuple expand(PCollection input) { - - Map featureSets = - getFeatureSetSpecs().entrySet().stream() - .map(e -> Pair.of(e.getKey(), new FeatureSet(e.getValue()))) - .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); - return input .apply("ProcessFeatureRows", ParDo.of(new ProcessFeatureRowDoFn(getDefaultProject()))) .apply( "ValidateFeatureRows", ParDo.of( 
ValidateFeatureRowDoFn.newBuilder() - .setFeatureSets(featureSets) + .setFeatureSets(getFeatureSetSpecs()) .setSuccessTag(getSuccessTag()) .setFailureTag(getFailureTag()) .build()) + .withSideInputs(getFeatureSetSpecs()) .withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag()))); } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java index 856d94828a..b89af89140 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java @@ -17,8 +17,10 @@ package feast.ingestion.transform.fn; import com.google.auto.value.AutoValue; +import com.google.common.collect.Iterators; import feast.ingestion.values.FeatureSet; import feast.ingestion.values.Field; +import feast.proto.core.FeatureSetProto; import feast.proto.types.FeatureRowProto.FeatureRow; import feast.proto.types.FieldProto; import feast.proto.types.ValueProto.Value.ValCase; @@ -27,12 +29,14 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @AutoValue public abstract class ValidateFeatureRowDoFn extends DoFn { - public abstract Map getFeatureSets(); + public abstract PCollectionView>> + getFeatureSets(); public abstract TupleTag getSuccessTag(); @@ -45,7 +49,8 @@ public static Builder newBuilder() { @AutoValue.Builder public abstract static class Builder { - public abstract Builder setFeatureSets(Map featureSets); + public abstract Builder setFeatureSets( + PCollectionView>> featureSets); public abstract Builder setSuccessTag(TupleTag successTag); @@ -58,9 +63,14 @@ public abstract static class Builder { public void processElement(ProcessContext context) { String error = null; FeatureRow featureRow = context.element(); - FeatureSet featureSet = getFeatureSets().get(featureRow.getFeatureSet()); + Iterable featureSetSpecs = + context.sideInput(getFeatureSets()).get(featureRow.getFeatureSet()); + List fields = new ArrayList<>(); - if (featureSet != null) { + if (featureSetSpecs != null) { + FeatureSetProto.FeatureSetSpec latestSpec = Iterators.getLast(featureSetSpecs.iterator()); + FeatureSet featureSet = new FeatureSet(latestSpec); + for (FieldProto.Field field : featureRow.getFieldsList()) { Field fieldSpec = featureSet.getField(field.getName()); if (fieldSpec == null) { @@ -98,9 +108,10 @@ public void processElement(ProcessContext context) { .setJobName(context.getPipelineOptions().getJobName()) .setPayload(featureRow.toString()) .setErrorMessage(error); - if (featureSet != null) { - String[] split = featureSet.getReference().split("/"); - failedElement = failedElement.setProjectName(split[0]).setFeatureSetName(split[1]); + if (featureSetSpecs != null) { + FeatureSetProto.FeatureSetSpec spec = Iterators.getLast(featureSetSpecs.iterator()); + failedElement = + failedElement.setProjectName(spec.getProject()).setFeatureSetName(spec.getName()); } context.output(getFailureTag(), failedElement.build()); } else { diff --git a/ingestion/src/main/java/feast/ingestion/transform/specs/FilterRelevantFunction.java b/ingestion/src/main/java/feast/ingestion/transform/specs/FilterRelevantFunction.java new file mode 100644 index 0000000000..cb2c84e2cd --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/specs/FilterRelevantFunction.java @@ -0,0 +1,52 @@ +/* + * 
SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.specs; + +import feast.ingestion.utils.SpecUtil; +import feast.proto.core.FeatureSetProto; +import feast.proto.core.SourceProto; +import feast.proto.core.StoreProto; +import java.util.List; +import org.apache.beam.sdk.transforms.ProcessFunction; +import org.apache.beam.sdk.values.KV; + +/** + * Selects only those FeatureSetSpecs that have the same source as current Job and their featureSet + * reference matches to current Job Subscriptions (extracted from Stores) + */ +public class FilterRelevantFunction + implements ProcessFunction, Boolean> { + private final List stores; + private final SourceProto.Source source; + + public FilterRelevantFunction(SourceProto.Source source, List stores) { + this.source = source; + this.stores = stores; + } + + @Override + public Boolean apply(KV input) throws Exception { + return stores.stream() + .anyMatch( + s -> + SpecUtil.isSubscribedToFeatureSet( + s.getSubscriptionsList(), + input.getValue().getProject(), + input.getValue().getName())) + && input.getValue().getSource().equals(source); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/specs/KafkaRecordToFeatureSetSpec.java b/ingestion/src/main/java/feast/ingestion/transform/specs/KafkaRecordToFeatureSetSpec.java new file mode 100644 index 0000000000..47aefd0c0b --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/specs/KafkaRecordToFeatureSetSpec.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.specs; + +import com.google.protobuf.InvalidProtocolBufferException; +import feast.proto.core.FeatureSetProto; +import org.apache.beam.sdk.io.kafka.KafkaRecord; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Parse FeatureSetSpec from KafkaRecord. We expect message with: + * + *

+ *   - key: String - FeatureSet reference
+ *   - value: FeatureSetSpec serialized in proto
+ *
+ */ +public class KafkaRecordToFeatureSetSpec + extends DoFn, KV> { + private static final Logger log = LoggerFactory.getLogger(KafkaRecordToFeatureSetSpec.class); + + @ProcessElement + public void process(ProcessContext c) { + try { + FeatureSetProto.FeatureSetSpec featureSetSpec = + FeatureSetProto.FeatureSetSpec.parseFrom(c.element().getKV().getValue()); + c.output(KV.of(new String(c.element().getKV().getKey()), featureSetSpec)); + } catch (InvalidProtocolBufferException e) { + log.error( + String.format( + "Unable to decode FeatureSetSpec with reference %s", c.element().getKV().getKey())); + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/specs/ReadFeatureSetSpecs.java b/ingestion/src/main/java/feast/ingestion/transform/specs/ReadFeatureSetSpecs.java new file mode 100644 index 0000000000..c777438be6 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/specs/ReadFeatureSetSpecs.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.specs; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import feast.proto.core.FeatureSetProto.FeatureSetSpec; +import feast.proto.core.IngestionJobProto; +import feast.proto.core.SourceProto; +import feast.proto.core.StoreProto; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.joda.time.Duration; + +/** + * Source of {@link FeatureSetSpec} + * + *

Reads {@link FeatureSetSpec} messages from the Kafka topic, starting from the earliest offset. + * + *

Keeps only the Specs that match the {@link feast.proto.core.StoreProto.Store} subscriptions and + * share this job's {@link feast.proto.core.SourceProto.Source}. + * + *

Compacts {@link FeatureSetSpec} by reference (if it was not yet compacted in Kafka). + */ +@AutoValue +public abstract class ReadFeatureSetSpecs + extends PTransform>> { + public abstract IngestionJobProto.SpecsStreamingUpdateConfig getSpecsStreamingUpdateConfig(); + + public abstract SourceProto.Source getSource(); + + public abstract List getStores(); + + public static Builder newBuilder() { + return new AutoValue_ReadFeatureSetSpecs.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setSpecsStreamingUpdateConfig( + IngestionJobProto.SpecsStreamingUpdateConfig config); + + public abstract Builder setSource(SourceProto.Source source); + + public abstract Builder setStores(List stores); + + public abstract ReadFeatureSetSpecs build(); + } + + @Override + public PCollection> expand(PBegin input) { + return input + .apply( + KafkaIO.readBytes() + .withBootstrapServers( + getSpecsStreamingUpdateConfig().getSource().getBootstrapServers()) + .withTopic(getSpecsStreamingUpdateConfig().getSource().getTopic()) + .withConsumerConfigUpdates( + ImmutableMap.of( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + false))) + .apply("ParseFeatureSetSpec", ParDo.of(new KafkaRecordToFeatureSetSpec())) + .apply("OnlyRelevantSpecs", Filter.by(new FilterRelevantFunction(getSource(), getStores()))) + .apply( + Window.>into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.ZERO)) + .apply( + Combine.perKey( + (SerializableFunction, FeatureSetSpec>) + specs -> { + ArrayList featureSetSpecs = Lists.newArrayList(specs); + featureSetSpecs.sort( + Comparator.comparing(FeatureSetSpec::getVersion).reversed()); + return featureSetSpecs.get(0); + })); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAck.java b/ingestion/src/main/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAck.java new file mode 100644 index 0000000000..b5d49b7076 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/specs/WriteFeatureSetSpecAck.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.transform.specs; + +import com.google.auto.value.AutoValue; +import feast.proto.core.FeatureSetProto; +import feast.proto.core.IngestionJobProto; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +/** + * Converts input {@link feast.proto.core.FeatureSetProto.FeatureSetSpec} into {@link + * feast.proto.core.IngestionJobProto.FeatureSetSpecAck} message and writes it to kafka (ack-topic). + */ +@AutoValue +public abstract class WriteFeatureSetSpecAck + extends PTransform>, PDone> { + public abstract IngestionJobProto.SpecsStreamingUpdateConfig getSpecsStreamingUpdateConfig(); + + public static Builder newBuilder() { + return new AutoValue_WriteFeatureSetSpecAck.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setSpecsStreamingUpdateConfig( + IngestionJobProto.SpecsStreamingUpdateConfig config); + + public abstract WriteFeatureSetSpecAck build(); + } + + @Override + public PDone expand(PCollection> input) { + return input + .apply("FeatureSetSpecToAckMessage", ParDo.of(new BuildAckMessage())) + .apply( + "ToKafka", + KafkaIO.write() + .withBootstrapServers( + getSpecsStreamingUpdateConfig().getAck().getBootstrapServers()) + .withTopic(getSpecsStreamingUpdateConfig().getAck().getTopic()) + .withKeySerializer(StringSerializer.class) + .withValueSerializer(ByteArraySerializer.class)); + } + + private static class BuildAckMessage + extends DoFn, KV> { + @ProcessElement + public void process(ProcessContext c) throws IOException { + ByteArrayOutputStream encodedAck = new ByteArrayOutputStream(); + + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setFeatureSetReference(c.element().getKey()) + .setJobName(c.getPipelineOptions().getJobName()) + .setFeatureSetVersion(c.element().getValue().getVersion()) + .build() + .writeTo(encodedAck); + + c.output(KV.of(c.element().getKey(), encodedAck.toByteArray())); + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java b/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java index a1356584b3..79c81c162d 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java @@ -20,9 +20,11 @@ import com.google.protobuf.util.JsonFormat; import feast.ingestion.values.Field; import feast.proto.core.FeatureSetProto.EntitySpec; -import feast.proto.core.FeatureSetProto.FeatureSet; import feast.proto.core.FeatureSetProto.FeatureSetSpec; import feast.proto.core.FeatureSetProto.FeatureSpec; +import feast.proto.core.IngestionJobProto; +import feast.proto.core.IngestionJobProto.SpecsStreamingUpdateConfig; +import feast.proto.core.SourceProto.Source; import feast.proto.core.StoreProto.Store; import feast.proto.core.StoreProto.Store.Subscription; import java.util.ArrayList; @@ -30,62 +32,60 @@ import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import org.apache.commons.lang3.tuple.Pair; public class SpecUtil { + public static String PROJECT_DEFAULT_NAME = "default"; public static String 
getFeatureSetReference(FeatureSetSpec featureSetSpec) { return String.format("%s/%s", featureSetSpec.getProject(), featureSetSpec.getName()); } + public static Pair parseFeatureSetReference(String reference) { + String[] split = reference.split("/", 2); + if (split.length == 1) { + return Pair.of(PROJECT_DEFAULT_NAME, split[0]); + } else { + return Pair.of(split[0], split[1]); + } + } + /** Get only feature set specs that matches the subscription */ - public static List getSubscribedFeatureSets( - List subscriptions, List featureSets) { - List subscribed = new ArrayList<>(); - for (FeatureSet featureSet : featureSets) { - for (Subscription sub : subscriptions) { - // If configuration missing, fail - if (sub.getProject().isEmpty() || sub.getName().isEmpty()) { - throw new IllegalArgumentException( - String.format("Subscription is missing arguments: %s", sub.toString())); - } + public static boolean isSubscribedToFeatureSet( + List subscriptions, String projectName, String featureSetName) { - // If all wildcards, subscribe to everything - if (sub.getProject().equals("*") || sub.getName().equals("*")) { - subscribed.add(featureSet); - break; - } + for (Subscription sub : subscriptions) { + // If configuration missing, fail + if (sub.getProject().isEmpty() || sub.getName().isEmpty()) { + throw new IllegalArgumentException( + String.format("Subscription is missing arguments: %s", sub.toString())); + } - // Match project name - if (!featureSet.getSpec().getProject().equals(sub.getProject())) { - continue; - } + // If all wildcards, subscribe to everything + if (sub.getProject().equals("*") || sub.getName().equals("*")) { + return true; + } - // Convert wildcard to regex - String subName = sub.getName(); - if (!sub.getName().contains(".*")) { - subName = subName.replace("*", ".*"); - } + // Match project name + if (!projectName.equals(sub.getProject())) { + continue; + } - // Match feature set name to pattern - Pattern pattern = Pattern.compile(subName); - if (!pattern.matcher(featureSet.getSpec().getName()).matches()) { - continue; - } - subscribed.add(featureSet); + // Convert wildcard to regex + String subName = sub.getName(); + if (!sub.getName().contains(".*")) { + subName = subName.replace("*", ".*"); } - } - return subscribed; - } - public static List parseFeatureSetSpecJsonList(List jsonList) - throws InvalidProtocolBufferException { - List featureSets = new ArrayList<>(); - for (String json : jsonList) { - FeatureSetSpec.Builder builder = FeatureSetSpec.newBuilder(); - JsonFormat.parser().merge(json, builder); - featureSets.add(FeatureSet.newBuilder().setSpec(builder.build()).build()); + // Match feature set name to pattern + Pattern pattern = Pattern.compile(subName); + if (!pattern.matcher(featureSetName).matches()) { + continue; + } + return true; } - return featureSets; + + return false; } public static List parseStoreJsonList(List jsonList) @@ -99,6 +99,19 @@ public static List parseStoreJsonList(List jsonList) return stores; } + public static Source parseSourceJson(String jsonSource) throws InvalidProtocolBufferException { + Source.Builder builder = Source.newBuilder(); + JsonFormat.parser().merge(jsonSource, builder); + return builder.build(); + } + + public static IngestionJobProto.SpecsStreamingUpdateConfig parseSpecsStreamingUpdateConfig( + String jsonConfig) throws InvalidProtocolBufferException { + SpecsStreamingUpdateConfig.Builder builder = SpecsStreamingUpdateConfig.newBuilder(); + JsonFormat.parser().merge(jsonConfig, builder); + return builder.build(); + } + public 
static Map getFieldsByName(FeatureSetSpec featureSetSpec) { Map fieldByName = new HashMap<>(); for (EntitySpec entitySpec : featureSetSpec.getEntitiesList()) { diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 39e4296378..47b5aa3f9b 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -16,15 +16,18 @@ */ package feast.ingestion; +import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; + +import com.google.common.collect.ImmutableList; import com.google.common.io.Files; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; -import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; import feast.proto.core.FeatureSetProto.EntitySpec; import feast.proto.core.FeatureSetProto.FeatureSet; import feast.proto.core.FeatureSetProto.FeatureSetSpec; import feast.proto.core.FeatureSetProto.FeatureSpec; +import feast.proto.core.IngestionJobProto; import feast.proto.core.SourceProto.KafkaSourceConfig; import feast.proto.core.SourceProto.Source; import feast.proto.core.SourceProto.SourceType; @@ -53,6 +56,7 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -74,6 +78,9 @@ public class ImportJobTest { private static final String KAFKA_BOOTSTRAP_SERVERS = KAFKA_HOST + ":" + KAFKA_PORT; private static final short KAFKA_REPLICATION_FACTOR = 1; private static final String KAFKA_TOPIC = "topic_1"; + private static final String KAFKA_SPECS_TOPIC = "topic_specs_1"; + private static final String KAFKA_SPECS_ACK_TOPIC = "topic_specs_ack_1"; + private static final long KAFKA_PUBLISH_TIMEOUT_SEC = 10; @SuppressWarnings("UnstableApiUsage") @@ -117,6 +124,30 @@ public static void tearDown() { @Test public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() throws IOException, InterruptedException { + Source featureSource = + Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setBootstrapServers(KAFKA_HOST + ":" + KAFKA_PORT) + .setTopic(KAFKA_TOPIC) + .build()) + .build(); + + IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig = + IngestionJobProto.SpecsStreamingUpdateConfig.newBuilder() + .setSource( + KafkaSourceConfig.newBuilder() + .setBootstrapServers(KAFKA_HOST + ":" + KAFKA_PORT) + .setTopic(KAFKA_SPECS_TOPIC) + .build()) + .setAck( + KafkaSourceConfig.newBuilder() + .setBootstrapServers(KAFKA_HOST + ":" + KAFKA_PORT) + .setTopic(KAFKA_SPECS_ACK_TOPIC) + .build()) + .build(); + FeatureSetSpec spec = FeatureSetSpec.newBuilder() .setName("feature_set") @@ -140,15 +171,7 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.STRING).build()) .addFeatures( FeatureSpec.newBuilder().setName("feature_3").setValueType(Enum.INT64).build()) - .setSource( - Source.newBuilder() - .setType(SourceType.KAFKA) - .setKafkaSourceConfig( - KafkaSourceConfig.newBuilder() - .setBootstrapServers(KAFKA_HOST + ":" + KAFKA_PORT) - .setTopic(KAFKA_TOPIC) - .build()) - .build()) + .setSource(featureSource) 
.build(); FeatureSet featureSet = FeatureSet.newBuilder().setSpec(spec).build(); @@ -167,20 +190,16 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() .build(); ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class); - BZip2Compressor compressor = - new BZip2Compressor<>( - option -> { - JsonFormat.Printer printer = - JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); - return printer.print(option).getBytes(); - }); - options.setFeatureSetJson(compressor.compress(spec)); + + options.setSpecsStreamingUpdateConfigJson( + JsonFormat.printer().print(specsStreamingUpdateConfig)); + options.setSourceJson(JsonFormat.printer().print(featureSource)); options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis))); options.setDefaultFeastProject("myproject"); options.setProject(""); options.setBlockOnRun(false); - List input = new ArrayList<>(); + List> input = new ArrayList<>(); Map expected = new HashMap<>(); LOGGER.info("Generating test data ..."); @@ -189,7 +208,7 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() i -> { FeatureRow randomRow = TestUtil.createRandomFeatureRow(featureSet.getSpec()); RedisKey redisKey = TestUtil.createRedisKey(featureSet.getSpec(), randomRow); - input.add(randomRow); + input.add(Pair.of("", randomRow)); List fields = randomRow.getFieldsList().stream() .filter( @@ -216,7 +235,13 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() Assert.assertEquals(pipelineResult.getState(), State.RUNNING); LOGGER.info("Publishing {} Feature Row messages to Kafka ...", input.size()); - TestUtil.publishFeatureRowsToKafka( + TestUtil.publishToKafka( + KAFKA_BOOTSTRAP_SERVERS, + KAFKA_SPECS_TOPIC, + ImmutableList.of(Pair.of(getFeatureSetReference(spec), spec)), + ByteArraySerializer.class, + KAFKA_PUBLISH_TIMEOUT_SEC); + TestUtil.publishToKafka( KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, input, diff --git a/ingestion/src/test/java/feast/ingestion/transform/FeatureRowToStoreAllocatorTest.java b/ingestion/src/test/java/feast/ingestion/transform/FeatureRowToStoreAllocatorTest.java new file mode 100644 index 0000000000..0ba5d56369 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/FeatureRowToStoreAllocatorTest.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.transform; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import feast.proto.core.StoreProto; +import feast.proto.types.FeatureRowProto; +import java.util.Map; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Rule; +import org.junit.Test; + +public class FeatureRowToStoreAllocatorTest { + @Rule public transient TestPipeline p = TestPipeline.create(); + + private StoreProto.Store newStore(String s) { + return StoreProto.Store.newBuilder() + .addSubscriptions( + StoreProto.Store.Subscription.newBuilder().setProject("project").setName(s).build()) + .build(); + } + + @Test + public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscription() { + StoreProto.Store bqOnlyStore = newStore("bq*"); + StoreProto.Store redisOnlyStore = newStore("redis*"); + StoreProto.Store anyStore = newStore("*"); + + Map> storeTags = + ImmutableMap.of( + bqOnlyStore, new TupleTag<>(), + redisOnlyStore, new TupleTag<>(), + anyStore, new TupleTag<>()); + + PCollectionTuple allocatedRows = + p.apply( + Create.of( + FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project/bq_1").build(), + FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project/bq_2").build(), + FeatureRowProto.FeatureRow.newBuilder() + .setFeatureSet("project/redis_1") + .build(), + FeatureRowProto.FeatureRow.newBuilder() + .setFeatureSet("project/redis_2") + .build(), + FeatureRowProto.FeatureRow.newBuilder() + .setFeatureSet("project/redis_3") + .build())) + .apply( + FeatureRowToStoreAllocator.newBuilder() + .setStoreTags(storeTags) + .setStores(ImmutableList.of(bqOnlyStore, redisOnlyStore, anyStore)) + .build()); + + PAssert.that( + allocatedRows + .get(storeTags.get(bqOnlyStore)) + .setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class)) + .apply("CountBq", Count.globally())) + .containsInAnyOrder(2L); + + PAssert.that( + allocatedRows + .get(storeTags.get(redisOnlyStore)) + .setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class)) + .apply("CountRedis", Count.globally())) + .containsInAnyOrder(3L); + + PAssert.that( + allocatedRows + .get(storeTags.get(anyStore)) + .setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class)) + .apply("CountAll", Count.globally())) + .containsInAnyOrder(5L); + + p.run(); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java index 8c5d7bd8ed..010a593ab8 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/ProcessAndValidateFeatureRowsTest.java @@ -34,7 +34,9 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.junit.Rule; import org.junit.Test; @@ -104,6 +106,9 @@ public void shouldWriteSuccessAndFailureTagsCorrectly() { 
input.add(FeatureRow.newBuilder().setFeatureSet("invalid").build()); + PCollectionView>> specsView = + p.apply("StaticSpecs", Create.of(featureSetSpecs)).apply(View.asMultimap()); + PCollectionTuple output = p.apply(Create.of(input)) .setCoder(ProtoCoder.of(FeatureRow.class)) @@ -112,7 +117,7 @@ public void shouldWriteSuccessAndFailureTagsCorrectly() { .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) - .setFeatureSetSpecs(featureSetSpecs) + .setFeatureSetSpecs(specsView) .build()); PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected); @@ -154,6 +159,9 @@ public void shouldStripVersions() { randomRow = randomRow.toBuilder().setFeatureSet("myproject/feature_set:1").build(); input.add(randomRow); + PCollectionView>> specsView = + p.apply("StaticSpecs", Create.of(featureSetSpecs)).apply(View.asMultimap()); + PCollectionTuple output = p.apply(Create.of(input)) .setCoder(ProtoCoder.of(FeatureRow.class)) @@ -162,7 +170,7 @@ public void shouldStripVersions() { .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) - .setFeatureSetSpecs(featureSetSpecs) + .setFeatureSetSpecs(specsView) .build()); PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected); @@ -203,6 +211,9 @@ public void shouldApplyDefaultProject() { randomRow = randomRow.toBuilder().setFeatureSet("feature_set").build(); input.add(randomRow); + PCollectionView>> specsView = + p.apply("StaticSpecs", Create.of(featureSetSpecs)).apply(View.asMultimap()); + PCollectionTuple output = p.apply(Create.of(input)) .setCoder(ProtoCoder.of(FeatureRow.class)) @@ -211,7 +222,7 @@ public void shouldApplyDefaultProject() { .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) - .setFeatureSetSpecs(featureSetSpecs) + .setFeatureSetSpecs(specsView) .build()); PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected); @@ -241,8 +252,8 @@ public void shouldExcludeUnregisteredFields() { FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build()) .build(); - Map featureSets = new HashMap<>(); - featureSets.put("myproject/feature_set", fs1); + Map featureSetSpecs = new HashMap<>(); + featureSetSpecs.put("myproject/feature_set", fs1); List input = new ArrayList<>(); List expected = new ArrayList<>(); @@ -258,6 +269,9 @@ public void shouldExcludeUnregisteredFields() { .setValue(Value.newBuilder().setStringVal("hello"))) .build()); + PCollectionView>> specsView = + p.apply("StaticSpecs", Create.of(featureSetSpecs)).apply(View.asMultimap()); + PCollectionTuple output = p.apply(Create.of(input)) .setCoder(ProtoCoder.of(FeatureRow.class)) @@ -266,7 +280,7 @@ public void shouldExcludeUnregisteredFields() { .setDefaultProject("myproject") .setFailureTag(FAILURE_TAG) .setSuccessTag(SUCCESS_TAG) - .setFeatureSetSpecs(featureSets) + .setFeatureSetSpecs(specsView) .build()); PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected); diff --git a/ingestion/src/test/java/feast/ingestion/transform/specs/FeatureSetSpecReadAndWriteTest.java b/ingestion/src/test/java/feast/ingestion/transform/specs/FeatureSetSpecReadAndWriteTest.java new file mode 100644 index 0000000000..b0cad59b5e --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/specs/FeatureSetSpecReadAndWriteTest.java @@ -0,0 +1,244 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.specs; + +import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import com.google.protobuf.InvalidProtocolBufferException; +import feast.proto.core.FeatureSetProto; +import feast.proto.core.IngestionJobProto; +import feast.proto.core.SourceProto; +import feast.proto.core.StoreProto; +import feast.test.TestUtil; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; +import org.apache.beam.runners.direct.DirectOptions; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.joda.time.Duration; +import org.junit.*; + +public class FeatureSetSpecReadAndWriteTest { + @Rule public transient TestPipeline p = TestPipeline.fromOptions(makePipelineOptions()); + + private static final String KAFKA_HOST = "localhost"; + private static final int KAFKA_PORT = 29092; + private static final String KAFKA_BOOTSTRAP_SERVERS = KAFKA_HOST + ":" + KAFKA_PORT; + private static final short KAFKA_REPLICATION_FACTOR = 1; + private static final String KAFKA_TOPIC = "topic"; + private static final String KAFKA_SPECS_TOPIC = "topic_specs"; + private static final String KAFKA_SPECS_ACK_TOPIC = "topic_specs_ack"; + + private static final long KAFKA_PUBLISH_TIMEOUT_SEC = 10; + private static final long KAFKA_POLL_TIMEOUT_SEC = 10; + + private KafkaConsumer consumer; + + @SuppressWarnings("UnstableApiUsage") + private static final String ZOOKEEPER_DATA_DIR = Files.createTempDir().getAbsolutePath(); + + private static final String ZOOKEEPER_HOST = "localhost"; + private static final int ZOOKEEPER_PORT = 2183; + + @BeforeClass + public static void setupClass() throws IOException, InterruptedException { + TestUtil.LocalKafka.start( + KAFKA_HOST, + KAFKA_PORT, + KAFKA_REPLICATION_FACTOR, + true, + ZOOKEEPER_HOST, + ZOOKEEPER_PORT, + ZOOKEEPER_DATA_DIR); + } + + @Before + public void setup() { + consumer = + TestUtil.makeKafkaConsumer( + KAFKA_BOOTSTRAP_SERVERS, KAFKA_SPECS_ACK_TOPIC, AckMessageDeserializer.class); + } + + @AfterClass + public static void tearDown() { + TestUtil.LocalKafka.stop(); + } + + public static PipelineOptions makePipelineOptions() { + DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class); + options.setJobName("test_job"); + options.setBlockOnRun(false); + return options; + } + + @Test + 
public void pipelineShouldReadSpecsAndAcknowledge() { + SourceProto.Source source = + SourceProto.Source.newBuilder() + .setKafkaSourceConfig( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .setTopic(KAFKA_TOPIC) + .build()) + .build(); + + StoreProto.Store store = + StoreProto.Store.newBuilder() + .addSubscriptions( + StoreProto.Store.Subscription.newBuilder() + .setProject("project") + .setName("*") + .build()) + .build(); + + IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig = + IngestionJobProto.SpecsStreamingUpdateConfig.newBuilder() + .setSource( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .setTopic(KAFKA_SPECS_TOPIC) + .build()) + .setAck( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .setTopic(KAFKA_SPECS_ACK_TOPIC) + .build()) + .build(); + + p.apply( + ReadFeatureSetSpecs.newBuilder() + .setSource(source) + .setStores(ImmutableList.of(store)) + .setSpecsStreamingUpdateConfig(specsStreamingUpdateConfig) + .build()) + .apply( + WriteFeatureSetSpecAck.newBuilder() + .setSpecsStreamingUpdateConfig(specsStreamingUpdateConfig) + .build()); + + // specs' history is being compacted on the initial read + publishSpecToKafka("project", "fs", 1, source); + publishSpecToKafka("project", "fs", 2, source); + publishSpecToKafka("project", "fs", 3, source); + publishSpecToKafka("project", "fs_2", 2, source); + + PipelineResult run = p.run(); + run.waitUntilFinish(Duration.standardSeconds(10)); + + List acks = getFeatureSetSpecAcks(); + + assertThat( + acks, + hasItem( + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setJobName("test_job") + .setFeatureSetVersion(3) + .setFeatureSetReference("project/fs") + .build())); + assertThat( + acks, + hasItem( + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setJobName("test_job") + .setFeatureSetVersion(2) + .setFeatureSetReference("project/fs_2") + .build())); + + // in-flight update 1 + publishSpecToKafka("project", "fs", 4, source); + + run.waitUntilFinish(Duration.standardSeconds(5)); + + assertThat( + getFeatureSetSpecAcks(), + hasItem( + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setJobName("test_job") + .setFeatureSetVersion(4) + .setFeatureSetReference("project/fs") + .build())); + + // in-flight update 2 + publishSpecToKafka("project", "fs_2", 3, source); + + run.waitUntilFinish(Duration.standardSeconds(5)); + + assertThat( + getFeatureSetSpecAcks(), + hasItem( + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setJobName("test_job") + .setFeatureSetVersion(3) + .setFeatureSetReference("project/fs_2") + .build())); + } + + private List getFeatureSetSpecAcks() { + ConsumerRecords consumerRecords = + consumer.poll(java.time.Duration.ofSeconds(KAFKA_POLL_TIMEOUT_SEC)); + + return Lists.newArrayList(consumerRecords.records(KAFKA_SPECS_ACK_TOPIC)).stream() + .map(ConsumerRecord::value) + .collect(Collectors.toList()); + } + + private void publishSpecToKafka( + String project, String name, int version, SourceProto.Source source) { + FeatureSetProto.FeatureSetSpec spec = + FeatureSetProto.FeatureSetSpec.newBuilder() + .setProject(project) + .setName(name) + .setVersion(version) + .setSource(source) + .build(); + + TestUtil.publishToKafka( + KAFKA_BOOTSTRAP_SERVERS, + KAFKA_SPECS_TOPIC, + ImmutableList.of(Pair.of(getFeatureSetReference(spec), spec)), + ByteArraySerializer.class, + KAFKA_PUBLISH_TIMEOUT_SEC); + } + + public static class AckMessageDeserializer + 
implements Deserializer { + + @Override + public IngestionJobProto.FeatureSetSpecAck deserialize(String topic, byte[] data) { + try { + return IngestionJobProto.FeatureSetSpecAck.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + return null; + } + } + } +} diff --git a/ingestion/src/test/java/feast/ingestion/transform/specs/FilterRelevantTest.java b/ingestion/src/test/java/feast/ingestion/transform/specs/FilterRelevantTest.java new file mode 100644 index 0000000000..33736dad41 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/specs/FilterRelevantTest.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.specs; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import feast.proto.core.FeatureSetProto; +import feast.proto.core.SourceProto; +import feast.proto.core.StoreProto; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +public class FilterRelevantTest { + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Test + public void onlySpecsThatMatchStoreSubscriptionShouldPass() { + SourceProto.Source source = + SourceProto.Source.newBuilder() + .setKafkaSourceConfig( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers("localhost") + .setTopic("topic") + .build()) + .build(); + + StoreProto.Store store1 = + StoreProto.Store.newBuilder() + .addSubscriptions( + StoreProto.Store.Subscription.newBuilder() + .setProject("project") + .setName("fs*") + .build()) + .build(); + + StoreProto.Store store2 = + StoreProto.Store.newBuilder() + .addSubscriptions( + StoreProto.Store.Subscription.newBuilder() + .setProject("project_2") + .setName("fs_1") + .build()) + .build(); + + SourceProto.Source irrelevantSource = + SourceProto.Source.newBuilder() + .setKafkaSourceConfig( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers("localhost") + .setTopic("other_topic") + .build()) + .build(); + + PCollection> filtered = + p.apply( + Create.of( + ImmutableMap.of( + "project/fs_1", makeSpecBuilder(source).build(), // pass + "project/fs_2", + makeSpecBuilder(irrelevantSource).build(), // different source + "invalid/fs_3", + makeSpecBuilder(source) + .setProject("invalid") + .build(), // invalid project + "project/invalid", + makeSpecBuilder(source).setName("invalid").build(), // invalid name + "project_2/fs_1", + makeSpecBuilder(source).setProject("project_2").build() // pass + ))) + .apply(Filter.by(new FilterRelevantFunction(source, ImmutableList.of(store1, store2)))); + + 
PAssert.that(filtered) + .containsInAnyOrder( + KV.of("project/fs_1", makeSpecBuilder(source).build()), + KV.of("project_2/fs_1", makeSpecBuilder(source).setProject("project_2").build())); + + PAssert.that(filtered.apply(Count.globally())).containsInAnyOrder(2L); + + p.run(); + } + + private FeatureSetProto.FeatureSetSpec.Builder makeSpecBuilder(SourceProto.Source source) { + return FeatureSetProto.FeatureSetSpec.newBuilder() + .setProject("project") + .setName("fs_1") + .setSource(source); + } +} diff --git a/ingestion/src/test/java/feast/test/TestUtil.java b/ingestion/src/test/java/feast/test/TestUtil.java index 3204d93bcd..91a8b9ba7e 100644 --- a/ingestion/src/test/java/feast/test/TestUtil.java +++ b/ingestion/src/test/java/feast/test/TestUtil.java @@ -18,7 +18,10 @@ import static feast.ingestion.utils.SpecUtil.getFeatureSetReference; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import com.google.protobuf.util.Timestamps; import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform; import feast.proto.core.FeatureSetProto.FeatureSet; @@ -43,15 +46,20 @@ import java.util.concurrent.TimeoutException; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.joda.time.Duration; @@ -113,16 +121,19 @@ public static void start( kafkaProp.put("zookeeper.connect", zookeeperHost + ":" + zookeeperPort); kafkaProp.put("host.name", kafkaHost); kafkaProp.put("port", kafkaPort); + kafkaProp.put("log.dir", Files.createTempDir().getAbsolutePath()); kafkaProp.put("offsets.topic.replication.factor", kafkaReplicationFactor); KafkaConfig kafkaConfig = new KafkaConfig(kafkaProp); server = new KafkaServerStartable(kafkaConfig); new Thread(server::startup).start(); + Thread.sleep(5000); } public static void stop() { if (server != null) { try { server.shutdown(); + LocalZookeeper.stop(); } catch (Exception e) { e.printStackTrace(); } @@ -139,23 +150,24 @@ public static void stop() { * @param valueSerializer in Feast this valueSerializer should be "ByteArraySerializer.class" * @param publishTimeoutSec duration to wait for publish operation (of each message) to succeed */ - public static void publishFeatureRowsToKafka( + public static void publishToKafka( String bootstrapServers, String topic, - List messages, + List> messages, Class valueSerializer, long publishTimeoutSec) { - Long defaultKey = 1L; + Properties prop = new Properties(); - prop.put("bootstrap.servers", bootstrapServers); - prop.put("key.serializer", 
LongSerializer.class); - prop.put("value.serializer", valueSerializer); - Producer producer = new KafkaProducer<>(prop); + prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); + Producer producer = new KafkaProducer<>(prop); messages.forEach( featureRow -> { - ProducerRecord record = - new ProducerRecord<>(topic, defaultKey, featureRow.toByteArray()); + ProducerRecord record = + new ProducerRecord<>( + topic, featureRow.getLeft(), featureRow.getRight().toByteArray()); try { producer.send(record).get(publishTimeoutSec, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { @@ -164,6 +176,22 @@ public static void publishFeatureRowsToKafka( }); } + public static KafkaConsumer makeKafkaConsumer( + String bootstrapServers, String topic, Class valueDeserializer) { + Properties prop = new Properties(); + prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); + prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + prop.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + + KafkaConsumer consumer = new KafkaConsumer<>(prop); + + consumer.subscribe(ImmutableList.of(topic)); + return consumer; + } + /** * Create a Feature Row with random value according to the FeatureSetSpec * @@ -315,20 +343,26 @@ public static RedisKey createRedisKey(FeatureSetSpec featureSetSpec, FeatureRow } private static class LocalZookeeper { + public static Thread thread; static void start(int zookeeperPort, String zookeeperDataDir) { - final ZooKeeperServerMain zookeeper = new ZooKeeperServerMain(); + ZooKeeperServerMain zookeeper = new ZooKeeperServerMain(); final ServerConfig serverConfig = new ServerConfig(); serverConfig.parse(new String[] {String.valueOf(zookeeperPort), zookeeperDataDir}); - new Thread( + thread = + new Thread( () -> { try { zookeeper.runFromConfig(serverConfig); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); } - }) - .start(); + }); + thread.start(); + } + + static void stop() { + thread.interrupt(); } } diff --git a/protos/feast/core/FeatureSet.proto b/protos/feast/core/FeatureSet.proto index b0b15276bc..de22388ce7 100644 --- a/protos/feast/core/FeatureSet.proto +++ b/protos/feast/core/FeatureSet.proto @@ -63,6 +63,10 @@ message FeatureSetSpec { // User defined metadata map labels = 8; + + // Read-only self-incrementing version that increases monotonically + // when changes are made to a feature set + int32 version = 9; } message EntitySpec { @@ -150,3 +154,8 @@ enum FeatureSetStatus { STATUS_JOB_STARTING = 3; STATUS_READY = 2; } + +enum FeatureSetJobDeliveryStatus { + STATUS_IN_PROGRESS = 0; + STATUS_DELIVERED = 1; +} \ No newline at end of file diff --git a/protos/feast/core/IngestionJob.proto b/protos/feast/core/IngestionJob.proto index c63e573ffe..0ce0f9681c 100644 --- a/protos/feast/core/IngestionJob.proto +++ b/protos/feast/core/IngestionJob.proto @@ -63,3 +63,18 @@ enum IngestionJobStatus { // job has been suspended SUSPENDED = 8; } + +// Config for bi-directional communication channel between Core Service and Ingestion Job +message SpecsStreamingUpdateConfig { + // out-channel for publishing new FeatureSetSpecs (by Core). 
+ // IngestionJob uses it as the source of existing FeatureSetSpecs and of new real-time updates + feast.core.KafkaSourceConfig source = 1; + // ack-channel for sending acknowledgments when new FeatureSetSpecs are installed in the Job + feast.core.KafkaSourceConfig ack = 2; +} + +message FeatureSetSpecAck { + string feature_set_reference = 1; + int32 feature_set_version = 2; + string job_name = 3; +} \ No newline at end of file
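
Illustrative note (not part of the patch): the new SpecsStreamingUpdateConfig reaches the ingestion job as JSON through the ImportOptions setters used in ImportJobTest above (setSpecsStreamingUpdateConfigJson / setSourceJson), and SpecUtil.parseSpecsStreamingUpdateConfig converts it back into the proto inside the job. The sketch below mirrors that wiring under assumed values; the broker address and topic names are placeholders, not values mandated by this change.

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.ingestion.options.ImportOptions;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.SourceProto;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SpecsUpdateConfigSketch {
  public static ImportOptions buildOptions() throws InvalidProtocolBufferException {
    // Channel pair: Core publishes FeatureSetSpecs on "source", the job acknowledges on "ack".
    IngestionJobProto.SpecsStreamingUpdateConfig specsConfig =
        IngestionJobProto.SpecsStreamingUpdateConfig.newBuilder()
            .setSource(
                SourceProto.KafkaSourceConfig.newBuilder()
                    .setBootstrapServers("kafka:9092")   // placeholder broker
                    .setTopic("specs-topic")             // placeholder specs topic
                    .build())
            .setAck(
                SourceProto.KafkaSourceConfig.newBuilder()
                    .setBootstrapServers("kafka:9092")
                    .setTopic("specs-ack-topic")         // placeholder ack topic
                    .build())
            .build();

    // Feature row source consumed by the job (same shape as in ImportJobTest).
    SourceProto.Source featureSource =
        SourceProto.Source.newBuilder()
            .setType(SourceProto.SourceType.KAFKA)
            .setKafkaSourceConfig(
                SourceProto.KafkaSourceConfig.newBuilder()
                    .setBootstrapServers("kafka:9092")
                    .setTopic("feature-rows")            // placeholder feature row topic
                    .build())
            .build();

    ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class);
    options.setSpecsStreamingUpdateConfigJson(JsonFormat.printer().print(specsConfig));
    options.setSourceJson(JsonFormat.printer().print(featureSource));
    return options;
  }
}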
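FeatureSetSpecAck travels in the opposite direction: once a spec is installed, the job reports the feature set reference, the spec version and its own job name on the ack channel, which is what FeatureSetSpecReadAndWriteTest asserts and what presumably lets Core track delivery (FeatureSetJobDeliveryStatus). Below is a minimal standalone sketch of producing such an ack; it is not the WriteFeatureSetSpecAck transform itself, and the broker, topic name and the choice of keying the record by the feature set reference are assumptions made for illustration.

import feast.proto.core.IngestionJobProto;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SpecAckSketch {
  public static void main(String[] args) {
    // Ack payload: which spec version this job has applied.
    IngestionJobProto.FeatureSetSpecAck ack =
        IngestionJobProto.FeatureSetSpecAck.newBuilder()
            .setFeatureSetReference("project/feature_set")  // placeholder reference
            .setFeatureSetVersion(1)
            .setJobName("ingestion-job-1")                  // placeholder job name
            .build();

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");  // placeholder broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

    // Publish the serialized ack to the ack topic, keyed by the feature set reference.
    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
      producer.send(
          new ProducerRecord<>(
              "specs-ack-topic", ack.getFeatureSetReference(), ack.toByteArray()));
      producer.flush();
    }
  }
}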