FeatureSets are delivered to Ingestion Job through Kafka (#792)
* intermediate table in m2m Job <-> FeatureSet with delivery status + FeatureSet version
* switch to spring-kafka (configs)
* SpecService sends message to Kafka, expects ack & updates status accordingly
* job runner sends source & specs config (source + ack)
* ingestion job reads specs from Kafka and sends ack
* return featureSets in IngestionJob
* generate unique topic name for each test run
* prevent listJobs from failing when job failed on start
* fix listJobs with filter
Oleksii Moskalenko authored Jun 17, 2020
1 parent f56e5f2 commit 6137102
Showing 46 changed files with 1,981 additions and 420 deletions.
4 changes: 2 additions & 2 deletions core/pom.xml
@@ -171,8 +171,8 @@
</dependency>

     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>

<!--compileOnly 'org.projectlombok:lombok:1.18.2'-->
14 changes: 14 additions & 0 deletions core/src/main/java/feast/core/config/FeastProperties.java
@@ -139,6 +139,10 @@ public static class StreamProperties {
/* Feature stream options */
@NotNull private FeatureStreamOptions options;

/* FeatureSetSpec stream options - communication channel between SpecService and IngestionJob
* to update Spec inside job w/o restart */
@NotNull private FeatureSetSpecStreamProperties specsOptions;

/** Feature stream options */
@Getter
@Setter
@@ -159,6 +163,16 @@ public static class FeatureStreamOptions {
    /* Number of Kafka partitions to use for managed feature stream. */
@Positive private int partitions = 1;
}

@Getter
@Setter
public static class FeatureSetSpecStreamProperties {
/* Kafka topic to send feature set spec to ingestion streaming job */
@NotBlank private String specsTopic = "feast-feature-set-specs";

/* Kafka topic to receive acknowledgment from ingestion job on successful processing of new specs */
@NotBlank private String specsAckTopic = "feast-feature-set-specs-ack";
}
}

/** Feast population job metrics */
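
For orientation, a minimal usage sketch of the new properties (not from this commit); the YAML keys are assumptions based on Spring Boot's relaxed binding of the specsOptions field:

    // Minimal sketch, assuming standard Spring Boot relaxed binding such as:
    //   feast.stream.specs-options.specs-topic: feast-feature-set-specs
    //   feast.stream.specs-options.specs-ack-topic: feast-feature-set-specs-ack
    FeastProperties.StreamProperties stream = feastProperties.getStream();
    String specsTopic = stream.getSpecsOptions().getSpecsTopic();  // "feast-feature-set-specs" by default
    String ackTopic = stream.getSpecsOptions().getSpecsAckTopic(); // "feast-feature-set-specs-ack" by default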
127 changes: 96 additions & 31 deletions core/src/main/java/feast/core/config/FeatureStreamConfig.java
@@ -16,30 +16,119 @@
*/
package feast.core.config;

-import com.google.common.base.Strings;
 import feast.core.config.FeastProperties.StreamProperties;
 import feast.core.model.Source;
+import feast.core.util.KafkaSerialization;
+import feast.proto.core.FeatureSetProto;
+import feast.proto.core.IngestionJobProto;
 import feast.proto.core.SourceProto.KafkaSourceConfig;
 import feast.proto.core.SourceProto.SourceType;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.*;

@Slf4j
@Configuration
public class FeatureStreamConfig {

String DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG = "15000";
int DEFAULT_SPECS_TOPIC_PARTITIONING = 1;
short DEFAULT_SPECS_TOPIC_REPLICATION = 3;

@Bean
public KafkaAdmin admin(FeastProperties feastProperties) {
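    // A KafkaAdmin bean makes spring-kafka create every NewTopic bean declared in this
    // context on startup, replacing the hand-rolled AdminClient topic creation that this
    // commit removes from getDefaultSource() below.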
String bootstrapServers = feastProperties.getStream().getOptions().getBootstrapServers();

Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG);
return new KafkaAdmin(configs);
}

@Bean
public NewTopic featureRowsTopic(FeastProperties feastProperties) {
StreamProperties streamProperties = feastProperties.getStream();

return new NewTopic(
streamProperties.getOptions().getTopic(),
streamProperties.getOptions().getPartitions(),
streamProperties.getOptions().getReplicationFactor());
}

@Bean
public NewTopic featureSetSpecsTopic(FeastProperties feastProperties) {
StreamProperties streamProperties = feastProperties.getStream();
Map<String, String> configs = new HashMap<>();
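    // A compacted topic retains the latest record per key, so a consumer can recover the
    // current spec of every feature set by replaying the topic from the beginning.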
configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);

NewTopic topic =
new NewTopic(
streamProperties.getSpecsOptions().getSpecsTopic(),
DEFAULT_SPECS_TOPIC_PARTITIONING,
DEFAULT_SPECS_TOPIC_REPLICATION);

topic.configs(configs);
return topic;
}

@Bean
public NewTopic featureSetSpecsAckTopic(FeastProperties feastProperties) {
StreamProperties streamProperties = feastProperties.getStream();

return new NewTopic(
streamProperties.getSpecsOptions().getSpecsAckTopic(),
DEFAULT_SPECS_TOPIC_PARTITIONING,
(short) 1);
}

@Bean
public KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specKafkaTemplate(
FeastProperties feastProperties) {
StreamProperties streamProperties = feastProperties.getStream();
Map<String, Object> props = new HashMap<>();

props.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
streamProperties.getOptions().getBootstrapServers());

KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> t =
new KafkaTemplate<>(
new DefaultKafkaProducerFactory<>(
props, new StringSerializer(), new KafkaSerialization.ProtoSerializer<>()));
t.setDefaultTopic(streamProperties.getSpecsOptions().getSpecsTopic());
return t;
}

@Bean
public ConsumerFactory<?, ?> ackConsumerFactory(FeastProperties feastProperties) {
StreamProperties streamProperties = feastProperties.getStream();
Map<String, Object> props = new HashMap<>();

props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
streamProperties.getOptions().getBootstrapServers());
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
String.format(
"core-service-%s", FeatureStreamConfig.class.getPackage().getImplementationVersion()));

return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new KafkaSerialization.ProtoDeserializer<>(IngestionJobProto.FeatureSetSpecAck.parser()));
}

@Autowired
@Bean
@@ -50,31 +139,7 @@ public Source getDefaultSource(FeastProperties feastProperties) {
case KAFKA:
String bootstrapServers = streamProperties.getOptions().getBootstrapServers();
String topicName = streamProperties.getOptions().getTopic();
-        Map<String, Object> map = new HashMap<>();
-        map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        map.put(
-            AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG);
-        AdminClient client = AdminClient.create(map);
-
-        NewTopic newTopic =
-            new NewTopic(
-                topicName,
-                streamProperties.getOptions().getPartitions(),
-                streamProperties.getOptions().getReplicationFactor());
-        CreateTopicsResult createTopicsResult =
-            client.createTopics(Collections.singleton(newTopic));
-        try {
-          createTopicsResult.values().get(topicName).get();
-        } catch (InterruptedException | ExecutionException e) {
-          if (e.getCause().getClass().equals(TopicExistsException.class)) {
-            log.warn(
-                Strings.lenientFormat(
-                    "Unable to create topic %s in the feature stream, topic already exists, using existing topic.",
-                    topicName));
-          } else {
-            throw new RuntimeException(e.getMessage(), e);
-          }
-        }

KafkaSourceConfig sourceConfig =
KafkaSourceConfig.newBuilder()
.setBootstrapServers(bootstrapServers)
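
Taken together, these beans give core a producer for specs and a consumer for acks. A rough usage sketch for the sending side — the helper below and the keying by feature set reference are illustrative assumptions, not code from this commit:

  // Hypothetical caller sketch; only specKafkaTemplate and the topics are from this commit.
  public void publishSpec(
      KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specKafkaTemplate,
      String featureSetReference,
      FeatureSetProto.FeatureSetSpec spec) throws Exception {
    // sendDefault() targets the specs topic configured on the template; blocking on the
    // returned future surfaces broker errors synchronously.
    specKafkaTemplate.sendDefault(featureSetReference, spec).get(15, java.util.concurrent.TimeUnit.SECONDS);
    // Delivery status stays IN_PROGRESS until the ingestion job replies on the ack topic
    // consumed via ackConsumerFactory.
  }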
38 changes: 35 additions & 3 deletions core/src/main/java/feast/core/config/JobConfig.java
@@ -24,8 +24,10 @@
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.direct.DirectJobRegistry;
import feast.core.job.direct.DirectRunnerJobManager;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
import feast.proto.core.SourceProto;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -38,14 +40,40 @@
public class JobConfig {
private final Gson gson = new Gson();

/**
   * Creates the SpecsStreamingUpdateConfig used to set up the bi-directional channel for
   * sending new FeatureSetSpecs to IngestionJobs and receiving acknowledgments.
*
* @param feastProperties feast config properties
*/
@Bean
public IngestionJobProto.SpecsStreamingUpdateConfig createSpecsStreamingUpdateConfig(
FeastProperties feastProperties) {
FeastProperties.StreamProperties streamProperties = feastProperties.getStream();

return IngestionJobProto.SpecsStreamingUpdateConfig.newBuilder()
.setSource(
SourceProto.KafkaSourceConfig.newBuilder()
.setBootstrapServers(streamProperties.getOptions().getBootstrapServers())
.setTopic(streamProperties.getSpecsOptions().getSpecsTopic())
.build())
.setAck(
SourceProto.KafkaSourceConfig.newBuilder()
.setBootstrapServers(streamProperties.getOptions().getBootstrapServers())
.setTopic(streamProperties.getSpecsOptions().getSpecsAckTopic()))
.build();
}

/**
* Get a JobManager according to the runner type and Dataflow configuration.
*
* @param feastProperties feast config properties
*/
@Bean
@Autowired
-  public JobManager getJobManager(FeastProperties feastProperties)
+  public JobManager getJobManager(
+      FeastProperties feastProperties,
+      IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig)
throws InvalidProtocolBufferException {

JobProperties jobProperties = feastProperties.getJobs();
@@ -60,13 +88,17 @@ public JobManager getJobManager(FeastProperties feastProperties)
DataflowRunnerConfigOptions.Builder dataflowRunnerConfigOptions =
DataflowRunnerConfigOptions.newBuilder();
JsonFormat.parser().merge(configJson, dataflowRunnerConfigOptions);
-        return new DataflowJobManager(dataflowRunnerConfigOptions.build(), metrics);
+        return new DataflowJobManager(
+            dataflowRunnerConfigOptions.build(), metrics, specsStreamingUpdateConfig);
case DIRECT:
DirectRunnerConfigOptions.Builder directRunnerConfigOptions =
DirectRunnerConfigOptions.newBuilder();
JsonFormat.parser().merge(configJson, directRunnerConfigOptions);
         return new DirectRunnerJobManager(
-            directRunnerConfigOptions.build(), new DirectJobRegistry(), metrics);
+            directRunnerConfigOptions.build(),
+            new DirectJobRegistry(),
+            metrics,
+            specsStreamingUpdateConfig);
default:
throw new IllegalArgumentException("Unsupported runner: " + runner);
}
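
The SpecsStreamingUpdateConfig built above is handed to both job managers, which pass it on to each ingestion job. On the job side the real consumer is wired through Beam; the sketch below shows the equivalent logic with plain kafka-clients, and the ack payload is left empty because its schema is not visible in this diff:

  // Hypothetical spec-watcher sketch for the ingestion-job side; the group id and the
  // empty ack are assumptions, not code from this commit.
  import feast.proto.core.FeatureSetProto;
  import feast.proto.core.IngestionJobProto;
  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.*;

  class SpecWatcher {
    void watch(IngestionJobProto.SpecsStreamingUpdateConfig cfg) throws Exception {
      Properties props = new Properties();
      props.put("bootstrap.servers", cfg.getSource().getBootstrapServers());
      props.put("group.id", "ingestion-job-specs"); // group id assumed
      try (KafkaConsumer<String, byte[]> consumer =
              new KafkaConsumer<>(props, new StringDeserializer(), new ByteArrayDeserializer());
          KafkaProducer<String, byte[]> producer =
              new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer())) {
        consumer.subscribe(Collections.singletonList(cfg.getSource().getTopic()));
        while (true) {
          for (ConsumerRecord<String, byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
            FeatureSetProto.FeatureSetSpec spec =
                FeatureSetProto.FeatureSetSpec.parseFrom(rec.value());
            // ... apply the updated spec to the running transforms ...
            // Acknowledge on the ack topic; no fields are set because the ack's schema
            // is not shown in this diff.
            byte[] ack = IngestionJobProto.FeatureSetSpecAck.newBuilder().build().toByteArray();
            producer.send(new ProducerRecord<>(cfg.getAck().getTopic(), rec.key(), ack));
          }
        }
      }
    }
  }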
8 changes: 2 additions & 6 deletions core/src/main/java/feast/core/dao/JobRepository.java
@@ -16,24 +16,20 @@
*/
package feast.core.dao;

-import feast.core.model.FeatureSet;
+import feast.core.model.FeatureSetJobStatus;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import java.util.Collection;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/** JPA repository supplying Job objects keyed by ID. */
@Repository
public interface JobRepository extends JpaRepository<Job, String> {
List<Job> findByStatusNotIn(Collection<JobStatus> statuses);

List<Job> findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName);

// find jobs by feast store name
List<Job> findByStoreName(String storeName);

-  // find jobs by featureset
-  List<Job> findByFeatureSetsIn(List<FeatureSet> featureSets);
+  List<Job> findByFeatureSetJobStatusesIn(List<FeatureSetJobStatus> featureSetsJobStatuses);
}
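
A usage sketch for the new finder; the getJobStatuses() accessor on FeatureSet is inferred from the new join entity and is an assumption, not shown in this diff:

  // Hypothetical: find every job that currently carries a given feature set, going through
  // the new FeatureSetJobStatus join entity instead of the removed direct m2m relation.
  List<Job> jobs =
      jobRepository.findByFeatureSetJobStatusesIn(
          new java.util.ArrayList<>(featureSet.getJobStatuses())); // accessor name assumed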
43 changes: 25 additions & 18 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
@@ -20,12 +20,8 @@
import feast.core.log.Action;
import feast.core.log.AuditLogger;
import feast.core.log.Resource;
-import feast.core.model.FeatureSet;
-import feast.core.model.Job;
-import feast.core.model.JobStatus;
-import feast.core.model.Source;
-import feast.core.model.Store;
-import feast.proto.core.FeatureSetProto.FeatureSetStatus;
+import feast.core.model.*;
+import feast.proto.core.FeatureSetProto;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
@@ -103,16 +99,12 @@ public Job call() {
}

   boolean requiresUpdate(Job job) {
-    // If set of feature sets has changed
-    if (!Sets.newHashSet(featureSets).equals(Sets.newHashSet(job.getFeatureSets()))) {
+    // if store subscriptions have changed
+    if (!Sets.newHashSet(store.getSubscriptions())
+        .equals(Sets.newHashSet(job.getStore().getSubscriptions()))) {
       return true;
     }
-    // If any of the incoming feature sets were updated
-    for (FeatureSet featureSet : featureSets) {
-      if (featureSet.getStatus() == FeatureSetStatus.STATUS_PENDING) {
-        return true;
-      }
-    }

     return false;
   }

@@ -123,10 +115,15 @@ private Job createJob() {

   /** Start or update the job to ingest data to the sink. */
   private Job startJob(String jobId) {
-    Job job =
-        new Job(
-            jobId, "", jobManager.getRunnerType(), source, store, featureSets, JobStatus.PENDING);
+    Job job = new Job();
+    job.setId(jobId);
+    job.setRunner(jobManager.getRunnerType());
+    job.setSource(source);
+    job.setStore(store);
+    job.setStatus(JobStatus.PENDING);
+
+    updateFeatureSets(job);
+
     try {
logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);

@@ -151,9 +148,19 @@
}
}

private void updateFeatureSets(Job job) {
for (FeatureSet fs : featureSets) {
FeatureSetJobStatus status = new FeatureSetJobStatus();
status.setFeatureSet(fs);
status.setJob(job);
status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
job.getFeatureSetJobStatuses().add(status);
}
}

   /** Update the given job */
   private Job updateJob(Job job) {
-    job.setFeatureSets(featureSets);
+    updateFeatureSets(job);
     job.setStore(store);
logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerName);
return jobManager.updateJob(job);
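updateFeatureSets() above is the producing half of the delivery handshake: every (job, feature set) pair is written as STATUS_IN_PROGRESS, and core updates it once the matching ack arrives on the ack topic. A sketch of what such a listener could look like — the listener, container factory name, and delivered-status handling are all assumptions, not part of this diff:

  // Hypothetical ack listener on the core side (names assumed throughout).
  @KafkaListener(
      topics = "${feast.stream.specs-options.specs-ack-topic}", // property key assumed
      containerFactory = "kafkaAckListenerContainerFactory") // factory name assumed
  public void onAck(IngestionJobProto.FeatureSetSpecAck ack) {
    // Look up the FeatureSetJobStatus row matching the acked job/feature-set/version and
    // mark it delivered; the ack's exact fields are not visible in this diff.
  }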
