diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index ffa7a73fe6481..9ef9c7f6fe3d7 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -265,6 +265,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateRequest.java index 1b36227a70ef0..c7ab7ad01956b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteShareGroupStateRequest.java @@ -34,7 +34,7 @@ public static class Builder extends AbstractRequest.Builder metadataCache.getAliveBrokerNodes(config.interBrokerListenerName).asJava, Time.SYSTEM) persister.configure(new PersisterConfig(persisterStateManager)) - val serde = new RecordSerde - - groupCoordinator = createGroupCoordinator(serde, persister) - shareCoordinator = createShareCoordinator(serde) + groupCoordinator = createGroupCoordinator(persister) + shareCoordinator = createShareCoordinator() val producerIdManagerSupplier = () => ProducerIdManager.rpc( config.brokerId, @@ -580,7 +578,7 @@ class BrokerServer( } } - private def createGroupCoordinator(serde: RecordSerde, persister: Persister): GroupCoordinator = { //todo smjn: pass persister to group coord + private def createGroupCoordinator(persister: Persister): GroupCoordinator = { //todo smjn: pass persister to group coord // Create group coordinator, but don't start it until we've started replica manager. // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good // to fix the underlying issue. @@ -609,6 +607,7 @@ class BrokerServer( "group-coordinator-reaper", new SystemTimer("group-coordinator") ) + val serde = new RecordSerde val loader = new CoordinatorLoaderImpl[group.Record]( time, replicaManager, @@ -639,7 +638,7 @@ class BrokerServer( } } - private def createShareCoordinator(serde: RecordSerde): ShareCoordinator = { + private def createShareCoordinator(): ShareCoordinator = { if (!config.isShareGroupEnabled) { return null } @@ -654,6 +653,7 @@ class BrokerServer( "share-coordinator-reaper", new SystemTimer("share-coordinator") ) + val serde = new ShareRecordSerde val loader = new CoordinatorLoaderImpl[group.Record]( time, replicaManager, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 30288eb3838f0..28809aac1011c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -265,6 +265,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.SHARE_GROUP_HEARTBEAT => handleShareGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.SHARE_GROUP_DESCRIBE => handleShareGroupDescribe(request).exceptionally(handleError) case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request) + case ApiKeys.WRITE_SHARE_GROUP_STATE => handleShareGroupStateWrite(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -4800,6 +4801,13 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def handleShareGroupStateWrite(request: RequestChannel.Request): Unit = { + val writeShareRequest = request.body[WriteShareGroupStateRequest] + //todo smjn: add auth check for group, topic + val writeShareData = shareCoordinator.writeState(request.context, writeShareRequest.data).get() + 
requestHelper.sendMaybeThrottle(request, new WriteShareGroupStateResponse(writeShareData)) + } + private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordValidationStats): Unit = { diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 1360c73a38fa5..5609f4b60cee3 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -179,6 +179,17 @@ class BrokerMetadataPublisher( s"coordinator with local changes in $deltaName", t) } //todo smjn: add code for share coord + try { + updateCoordinator(newImage, + delta, + Topic.SHARE_GROUP_STATE_TOPIC_NAME, + shareCoordinator.onElection, + (partitionIndex, leaderEpochOpt) => shareCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt)) + ) + } catch { + case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating share " + + s"coordinator with local changes in $deltaName", t) + } try { // Notify the group coordinator about deleted topics. val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]() diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java index 402aa4c2cd57a..e98fb66ba6ed7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java @@ -41,7 +41,10 @@ import org.apache.kafka.coordinator.group.classic.ClassicGroup; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareSnapshotKey; +import org.apache.kafka.coordinator.group.generated.ShareSnapshotValue; import org.apache.kafka.coordinator.group.share.ShareGroupMember; +import org.apache.kafka.coordinator.group.share.ShareGroupOffset; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; @@ -641,6 +644,29 @@ public static Record newShareMemberSubscriptionTombstoneRecord( ); } + public static Record newShareSnapshotRecord(String groupId, Uuid topicId, int partitionId, ShareGroupOffset offsetData) { + return new Record( + new ApiMessageAndVersion(new ShareSnapshotKey() + .setGroupId(groupId) + .setTopicId(topicId) + .setPartition(partitionId), + (short) 0), + new ApiMessageAndVersion(new ShareSnapshotValue() + .setSnapshotEpoch(offsetData.snapshotEpoch) + .setStateEpoch(offsetData.stateEpoch) + .setLeaderEpoch(offsetData.leaderEpoch) + .setStartOffset(offsetData.startOffset) + .setStateBatches(offsetData.stateBatches.stream() + .map(batch -> new ShareSnapshotValue.StateBatch() + .setFirstOffset(batch.firstOffset()) + .setLastOffset(batch.lastOffset()) + .setDeliveryCount(batch.deliveryCount()) + .setDeliveryState(batch.deliveryState())) + .collect(Collectors.toList())), + (short) 0) + ); + } + private static List toTopicPartitions( Map> topicPartitions ) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java index e3f3ca63c71af..0619b1a9eb3bd 100644 --- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java @@ -121,7 +121,7 @@ private void readMessage(ApiMessage message, ByteBuffer buffer, short version, S } } - private ApiMessage apiMessageKeyFor(short recordType) { + protected ApiMessage apiMessageKeyFor(short recordType) { switch (recordType) { case 0: case 1: @@ -147,7 +147,7 @@ private ApiMessage apiMessageKeyFor(short recordType) { } } - private ApiMessage apiMessageValueFor(short recordType) { + protected ApiMessage apiMessageValueFor(short recordType) { switch (recordType) { case 0: case 1: diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/GroupTopicPartitionLeader.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/GroupTopicPartitionLeader.java new file mode 100644 index 0000000000000..2aacb080b591c --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/GroupTopicPartitionLeader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.share; + +import org.apache.kafka.common.Uuid; + +public class GroupTopicPartitionLeader { + public final String groupId; + public final Uuid topicId; + public final int partition; + public final int leaderEpoch; + + public GroupTopicPartitionLeader(String groupId, Uuid topicId, int partition, int leaderEpoch) { + this.groupId = groupId; + this.topicId = topicId; + this.partition = partition; + this.leaderEpoch = leaderEpoch; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinator.java index 80b234bc01903..ce3383b6c46f7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinator.java @@ -18,8 +18,13 @@ package org.apache.kafka.coordinator.group.share; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateResponseData; +import org.apache.kafka.common.requests.RequestContext; +import java.util.OptionalInt; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.function.IntSupplier; @InterfaceStability.Evolving @@ -51,4 +56,10 @@ public interface ShareCoordinator { * Stop the share coordinator */ void shutdown(); + + CompletableFuture writeState(RequestContext context, WriteShareGroupStateRequestData request); + + void onElection(int partitionIndex, int partitionLeaderEpoch); + + void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinatorService.java index 07038a1e764ef..dd10b2c3c7327 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinatorService.java @@ -17,7 +17,13 @@ package org.apache.kafka.coordinator.group.share; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateResponseData; +import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -29,14 +35,23 @@ import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier; import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor; import org.apache.kafka.coordinator.group.runtime.PartitionWriter; +import org.apache.kafka.server.group.share.ShareGroupHelper; import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.util.timer.Timer; import org.slf4j.Logger; import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.OptionalInt; import java.util.Properties; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntSupplier; +import java.util.stream.Collectors; public class ShareCoordinatorService implements ShareCoordinator { private final ShareCoordinatorConfig config; @@ -208,4 +223,86 @@ public void shutdown() { Utils.closeQuietly(shareCoordinatorMetrics, "share coordinator metrics"); log.info("Shutdown complete."); } + + @Override + public CompletableFuture writeState(RequestContext context, WriteShareGroupStateRequestData request) { + log.info("ShareCoordinatorService writeState request received - {}", request); + String groupId = request.groupId(); + Map>> futureMap = new HashMap<>(); + +// The request received here could have multiple keys of structure group:topic:partition. However, +// the writeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will +// be looping over the keys below and constructing new WriteShareGroupStateRequestData objects to pass +// onto the shard method. + request.topics().forEach(topicData -> { + Map> partitionFut = + futureMap.computeIfAbsent(topicData.topicId(), k -> new HashMap<>()); + topicData.partitions().forEach( + partitionData -> partitionFut.put(partitionData.partition(), runtime.scheduleWriteOperation( + "write-share-group-state", + topicPartitionFor(ShareGroupHelper.coordinatorKey(groupId, topicData.topicId(), partitionData.partition())), + Duration.ofMillis(config.writeTimeoutMs), + coordinator -> coordinator.writeState(context, new WriteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopicId(topicData.topicId()) + .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartition(partitionData.partition()) + .setStartOffset(partitionData.startOffset()) + .setLeaderEpoch(partitionData.leaderEpoch()) + .setStateEpoch(partitionData.stateEpoch()) + .setStateBatches(partitionData.stateBatches())))))))) + ); + }); + + + // Combine all futures into a single CompletableFuture + CompletableFuture combinedFuture = CompletableFuture.allOf(futureMap.values().stream() + .flatMap(partMap -> partMap.values().stream()).toArray(CompletableFuture[]::new)); + + return combinedFuture.thenApply(v -> { + List writeStateResults = futureMap.keySet().stream() + .map(topicId -> { + List partitionResults = futureMap.get(topicId).values().stream() + .map(future -> { + try { + WriteShareGroupStateResponseData partitionData = future.get(); + // error check if the partitionData results contains only 1 row (corresponding to topicId) + return partitionData.results().get(0).partitions(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }) + .flatMap(List::stream) + .collect(Collectors.toList()); + + return new WriteShareGroupStateResponseData.WriteStateResult() + .setTopicId(topicId) + .setPartitions(partitionResults); + }) + .collect(Collectors.toList()); + return new WriteShareGroupStateResponseData() + .setResults(writeStateResults); + }); + } + + @Override + public void onElection(int partitionIndex, int partitionLeaderEpoch) { + runtime.scheduleLoadOperation( + new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex), + partitionLeaderEpoch + ); + } + + @Override + public void onResignation(int partitionIndex, OptionalInt partitionLeaderEpoch) { + runtime.scheduleUnloadOperation( 
+ new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionIndex), + partitionLeaderEpoch + ); + } + + private TopicPartition topicPartitionFor(String key) { + return new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, partitionFor(key)); + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinatorShard.java index b1790af470c3b..438166d6a0400 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareCoordinatorShard.java @@ -18,20 +18,41 @@ package org.apache.kafka.coordinator.group.share; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.RecordHelpers; +import org.apache.kafka.coordinator.group.Utils; +import org.apache.kafka.coordinator.group.generated.ShareSnapshotKey; +import org.apache.kafka.coordinator.group.generated.ShareSnapshotValue; +import org.apache.kafka.coordinator.group.generated.ShareUpdateKey; +import org.apache.kafka.coordinator.group.generated.ShareUpdateValue; import org.apache.kafka.coordinator.group.metrics.CoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.CoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; import org.apache.kafka.coordinator.group.runtime.CoordinatorShard; import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilder; import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.group.share.ShareGroupHelper; import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; import org.slf4j.Logger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + public class ShareCoordinatorShard implements CoordinatorShard { private final Logger log; private final Time time; @@ -39,6 +60,9 @@ public class ShareCoordinatorShard implements CoordinatorShard { private final ShareCoordinatorConfig config; private final CoordinatorMetrics coordinatorMetrics; private final CoordinatorMetricsShard metricsShard; + // coord key -> ShareShap + private final TimelineHashMap shareStateMap; + private final Map leaderMap; public static class Builder implements CoordinatorShardBuilder { private ShareCoordinatorConfig config; @@ -115,7 +139,8 @@ public ShareCoordinatorShard build() { timer, config, coordinatorMetrics, - metricsShard + metricsShard, + snapshotRegistry ); } } @@ -126,7 +151,8 @@ public ShareCoordinatorShard build() { CoordinatorTimer timer, ShareCoordinatorConfig config, CoordinatorMetrics coordinatorMetrics, - CoordinatorMetricsShard metricsShard + CoordinatorMetricsShard 
metricsShard, + SnapshotRegistry snapshotRegistry ) { this.log = logContext.logger(ShareCoordinatorShard.class); this.time = time; @@ -134,6 +160,8 @@ public ShareCoordinatorShard build() { this.config = config; this.coordinatorMetrics = coordinatorMetrics; this.metricsShard = metricsShard; + this.shareStateMap = new TimelineHashMap<>(snapshotRegistry, 0); + this.leaderMap = new HashMap<>(); } @Override @@ -153,11 +181,104 @@ public void onUnloaded() { @Override public void replay(long offset, long producerId, short producerEpoch, Record record) throws RuntimeException { + ApiMessageAndVersion key = record.key(); + ApiMessageAndVersion value = record.value(); + switch (key.version()) { + case 0: // ShareSnapshot + handleShareSnapshot((ShareSnapshotKey) key.message(), (ShareSnapshotValue) Utils.messageOrNull(value)); + break; + case 1: // ShareUpdate + handleShareUpdate((ShareUpdateKey) key.message(), (ShareUpdateValue) Utils.messageOrNull(value)); + break; + default: + // noop + } + } + + private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue value) { + String mapKey = ShareGroupHelper.coordinatorKey(key.groupId(), key.topicId(), key.partition()); + Integer oldValue = leaderMap.get(mapKey); + if (oldValue == null) { + leaderMap.put(mapKey, value.leaderEpoch()); + } else if (oldValue < value.leaderEpoch()) { + leaderMap.put(mapKey, value.leaderEpoch()); + } + shareStateMap.put(mapKey, value); + } + + private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value) { + // TODO: update the internal maps once ShareUpdate replay is implemented } @Override public void replayEndTransactionMarker(long producerId, short producerEpoch, TransactionResult result) throws RuntimeException { CoordinatorShard.super.replayEndTransactionMarker(producerId, producerEpoch, result); } + + /** + * This method, as called by the ShareCoordinatorService, receives request data covering + * only a single key, i.e. group1:topic1:partition1. The implementation below is written + * with that assumption in mind.
+ * @param context - RequestContext + * @param request - WriteShareGroupStateRequestData for a single key + * @return CoordinatorResult(records, response) + */ + public CoordinatorResult writeState(RequestContext context, WriteShareGroupStateRequestData request) { + log.info("shard writeState request received - {}", request); + WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData(); + // records to write (with both key and value of snapshot type), response to caller + String groupId = request.groupId(); + final Map requestKeys = new HashMap<>(); + List recordList = request.topics().stream() + .map(topicData -> topicData.partitions().stream() + .map(partitionData -> { + requestKeys.put(ShareGroupHelper.coordinatorKey(groupId, topicData.topicId(), partitionData.partition()), + new GroupTopicPartitionLeader(groupId, topicData.topicId(), partitionData.partition(), partitionData.leaderEpoch())); + return RecordHelpers.newShareSnapshotRecord( + groupId, topicData.topicId(), partitionData.partition(), ShareGroupOffset.fromRequest(partitionData)); + }) + .collect(Collectors.toList())) + .flatMap(List::stream) + .collect(Collectors.toList()); + + for (Map.Entry entry : requestKeys.entrySet()) { // should only be 1 key + if (leaderMap.containsKey(entry.getKey()) && leaderMap.get(entry.getKey()) > entry.getValue().leaderEpoch) { + responseData.setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult() + .setTopicId(entry.getValue().topicId) + .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() + .setPartition(entry.getValue().partition) + .setErrorCode(Errors.FENCED_LEADER_EPOCH.code()) + .setErrorMessage(Errors.FENCED_LEADER_EPOCH.message()))))); + return new CoordinatorResult<>(Collections.emptyList(), responseData); + } + } + + List validRecords = new ArrayList<>(); + + for (Record record : recordList) { // should be single record + if (record.key().message() instanceof ShareSnapshotKey && record.value().message() instanceof ShareSnapshotValue) { + ShareSnapshotKey newKey = (ShareSnapshotKey) record.key().message(); + ShareSnapshotValue newValue = (ShareSnapshotValue) record.value().message(); + + responseData.setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult() + .setTopicId(newKey.topicId()) + .setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult() + .setPartition(newKey.partition()) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()))))); + + String mapKey = ShareGroupHelper.coordinatorKey(newKey.groupId(), newKey.topicId(), newKey.partition()); + + if (shareStateMap.containsKey(mapKey)) { + ShareSnapshotValue oldValue = shareStateMap.get(mapKey); + newValue.setSnapshotEpoch(oldValue.snapshotEpoch() + 1); // increment the snapshot epoch + } + validRecords.add(record); + shareStateMap.put(mapKey, newValue); + } + } + + return new CoordinatorResult<>(validRecords, responseData); + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareGroupOffset.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareGroupOffset.java new file mode 100644 index 0000000000000..1b99ef31b48b3 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareGroupOffset.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.share; + +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.server.group.share.PersisterStateBatch; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public class ShareGroupOffset { + public final int snapshotEpoch; + public final int stateEpoch; + public final int leaderEpoch; + public final long startOffset; + public final List stateBatches; + + public ShareGroupOffset(int snapshotEpoch, + int stateEpoch, + int leaderEpoch, + long startOffset, + List stateBatches) { + this.snapshotEpoch = snapshotEpoch; + this.stateEpoch = stateEpoch; + this.leaderEpoch = leaderEpoch; + this.startOffset = startOffset; + this.stateBatches = stateBatches; + } + + public static ShareGroupOffset fromRecord() { + return null; + } + + public static ShareGroupOffset fromRequest(WriteShareGroupStateRequestData.PartitionData data) { + return fromRequest(data, 0); + } + + public static ShareGroupOffset fromRequest(WriteShareGroupStateRequestData.PartitionData data, int snapshotEpoch) { + return new ShareGroupOffset(snapshotEpoch, + data.stateEpoch(), + data.leaderEpoch(), + data.startOffset(), + data.stateBatches().stream() + .map(PersisterStateBatch::from) + .collect(Collectors.toList())); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShareGroupOffset that = (ShareGroupOffset) o; + return snapshotEpoch == that.snapshotEpoch && stateEpoch == that.stateEpoch && leaderEpoch == that.leaderEpoch && startOffset == that.startOffset && Objects.equals(stateBatches, that.stateBatches); + } + + @Override + public int hashCode() { + return Objects.hash(snapshotEpoch, stateEpoch, leaderEpoch, startOffset, stateBatches); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareRecordSerde.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareRecordSerde.java new file mode 100644 index 0000000000000..794a2b7db8752 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/share/ShareRecordSerde.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.share; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.coordinator.group.RecordSerde; +import org.apache.kafka.coordinator.group.generated.ShareSnapshotKey; +import org.apache.kafka.coordinator.group.generated.ShareSnapshotValue; +import org.apache.kafka.coordinator.group.generated.ShareUpdateKey; +import org.apache.kafka.coordinator.group.generated.ShareUpdateValue; +import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; + +public class ShareRecordSerde extends RecordSerde { + @Override + protected ApiMessage apiMessageKeyFor(short recordType) { + switch (recordType) { + case 0: + return new ShareSnapshotKey(); + case 1: + return new ShareUpdateKey(); + default: + throw new CoordinatorLoader.UnknownRecordTypeException(recordType); + } + } + + @Override + protected ApiMessage apiMessageValueFor(short recordType) { + switch (recordType) { + case 0: + return new ShareSnapshotValue(); + case 1: + return new ShareUpdateValue(); + default: + throw new CoordinatorLoader.UnknownRecordTypeException(recordType); + } + } +} diff --git a/group-coordinator/src/main/resources/common/message/ShareSnapshotValue.json b/group-coordinator/src/main/resources/common/message/ShareSnapshotValue.json index 7921f7817ff21..9d49b2a735fd7 100644 --- a/group-coordinator/src/main/resources/common/message/ShareSnapshotValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareSnapshotValue.json @@ -25,6 +25,8 @@ "about": "The snapshot epoch." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch for this share-partition." }, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", + "about": "The leader epoch of the share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0", "about": "The share-partition start offset." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [ diff --git a/group-coordinator/src/main/resources/common/message/ShareUpdateValue.json b/group-coordinator/src/main/resources/common/message/ShareUpdateValue.json index f1145d2a7b470..4253061eaad80 100644 --- a/group-coordinator/src/main/resources/common/message/ShareUpdateValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareUpdateValue.json @@ -23,6 +23,8 @@ "fields": [ { "name": "SnapshotEpoch", "type": "uint16", "versions": "0", "about": "The snapshot epoch." }, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", + "about": "The leader epoch of the share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0", "about": "The share-partition start offset, or -1 if the start offset is not being updated." 
}, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [ diff --git a/server-common/src/main/java/org/apache/kafka/server/group/share/DefaultStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/group/share/DefaultStatePersister.java index ec769c4a8fade..38f785765ae45 100644 --- a/server-common/src/main/java/org/apache/kafka/server/group/share/DefaultStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/group/share/DefaultStatePersister.java @@ -17,14 +17,19 @@ package org.apache.kafka.server.group.share; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; /** @@ -60,6 +65,7 @@ public static Persister getInstance() { public void configure(PersisterConfig config) { this.persisterConfig = Objects.requireNonNull(config); this.stateManager = Objects.requireNonNull(config.stateManager); + this.stateManager.start(); } @Override @@ -90,7 +96,6 @@ public CompletableFuture initializeState(Initia * @return ReadShareGroupStateResult */ public CompletableFuture readState(ReadShareGroupStateParameters request) { - this.stateManager.start(); GroupTopicPartitionData gtp = request.groupTopicPartitionData(); String groupId = gtp.groupId(); List handlers = gtp.topicsData().stream() @@ -114,7 +119,57 @@ public CompletableFuture readState(ReadShareGroupStat * @return WriteShareGroupStateResult */ public CompletableFuture writeState(WriteShareGroupStateParameters request) { - throw new RuntimeException("not implemented"); + log.info("Write share group state request received - {}", request); + GroupTopicPartitionData gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + + Map>> futureMap = new HashMap<>(); + + List handlers = gtp.topicsData().stream() + .map(topicData -> topicData.partitions().stream() + .map(partitionData -> { + Map> partMap = futureMap.computeIfAbsent(topicData.topicId(), k -> new HashMap<>()); + if (!partMap.containsKey(partitionData.partition())) { + partMap.put(partitionData.partition(), new CompletableFuture<>()); + } + return stateManager.new WriteStateHandler( + groupId, topicData.topicId(), partitionData.partition(), partitionData.stateEpoch(), partitionData.leaderEpoch(), partitionData.startOffset(), partitionData.stateBatches(), + partMap.get(partitionData.partition())); + }) + .collect(Collectors.toList())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + CompletableFuture combinedFuture = CompletableFuture.allOf(futureMap.values().stream() + .flatMap(partMap -> partMap.values().stream()).toArray(CompletableFuture[]::new)); + + return combinedFuture.thenApply(v -> { + List> topicsData = futureMap.keySet().stream() + .map(topicId -> { + List partitionErrData = futureMap.get(topicId).values().stream() + .map(future -> { + try { + WriteShareGroupStateResponse partitionResponse = future.get(); + return partitionResponse.data().results().get(0).partitions().stream() + .map(partitionResult -> 
PartitionFactory.newPartitionErrorData(partitionResult.partition(), partitionResult.errorCode(), partitionResult.errorMessage())) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }) + .flatMap(List::stream) + .collect(Collectors.toList()); + return new TopicData<>(topicId, partitionErrData); + }) + .collect(Collectors.toList()); + return new WriteShareGroupStateResult.Builder() + .setTopicsData(topicsData) + .build(); + }); } /** diff --git a/server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateManager.java index 9fbf1ed482d84..515e538b434d2 100644 --- a/server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateManager.java @@ -25,12 +25,15 @@ import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.ReadShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.ReadShareGroupStateRequest; +import org.apache.kafka.common.requests.WriteShareGroupStateRequest; +import org.apache.kafka.common.requests.WriteShareGroupStateResponse; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.util.InterBrokerSendThread; import org.apache.kafka.server.util.RequestAndCompletionHandler; @@ -46,6 +49,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import java.util.stream.Collectors; public class PersisterStateManager { @@ -86,8 +90,17 @@ public void stop() throws InterruptedException { } } - public abstract static class PersisterStateManagerHandler implements RequestCompletionHandler { + public abstract class PersisterStateManagerHandler implements RequestCompletionHandler { protected Node coordinatorNode; + protected String groupId; + protected Uuid topicId; + protected int partition; + + public PersisterStateManagerHandler(String groupId, Uuid topicId, int partition) { + this.groupId = groupId; + this.topicId = topicId; + this.partition = partition; + } /** * looks for single group:topic:partition item @@ -101,66 +114,77 @@ public abstract static class PersisterStateManagerHandler implements RequestComp * * @return builder for the same */ - protected abstract AbstractRequest.Builder findShareCoordinatorBuilder(); + protected AbstractRequest.Builder findShareCoordinatorBuilder() { + return new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.SHARE.id()) + .setKey(coordinatorKey())); + } - protected abstract Node shareCoordinatorNode(); + protected Node shareCoordinatorNode() { + return coordinatorNode; + } protected boolean lookupNeeded() { return coordinatorNode == null; } - protected abstract String coordinatorKey(); + protected String coordinatorKey() { + return ShareGroupHelper.coordinatorKey(groupId, topicId, partition); + } protected 
boolean isFindCoordinatorResponse(ClientResponse response) { return response != null && response.requestHeader().apiKey() == ApiKeys.FIND_COORDINATOR; } - } - - public class ReadStateHandler extends PersisterStateManagerHandler { - // result of one group:topicId:partition call - private CompletableFuture> result = new CompletableFuture<>(); - private final String groupId; - private final Uuid topicId; - private final int partition; - private final int leaderEpoch; - private final String coordinatorKey; - - public ReadStateHandler(String groupId, Uuid topicId, int partition, int leaderEpoch) { - this.groupId = groupId; - this.topicId = topicId; - this.partition = partition; - this.leaderEpoch = leaderEpoch; - this.coordinatorKey = groupId + ":" + topicId + ":" + partition; - } @Override public void onComplete(ClientResponse response) { if (response != null && response.hasResponse()) { if (isFindCoordinatorResponse(response)) { - handleCoordinatorResponse(response); - } else if (response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE) { - handleReadShareGroupResponse(response); + handleFindCoordinatorResponse(response); + } else if (isRequestResponse(response)) { + handleRequestResponse(response); } } } - private void handleCoordinatorResponse(ClientResponse response) { + protected void handleFindCoordinatorResponse(ClientResponse response) { List coordinators = ((FindCoordinatorResponse) response.responseBody()).coordinators(); if (coordinators.size() != 1) { - log.error("ReadState coordinator response for {} is invalid", coordinatorKey); + log.error("Find coordinator response for {} is invalid", coordinatorKey()); //todo smjn: how to handle? } FindCoordinatorResponseData.Coordinator coordinatorData = coordinators.get(0); Errors error = Errors.forCode(coordinatorData.errorCode()); if (error == Errors.NONE) { - log.info("ReadState find coordinator response received."); + log.info("Find coordinator response received."); coordinatorNode = new Node(coordinatorData.nodeId(), coordinatorData.host(), coordinatorData.port()); // now we want the read state call to happen - // enqueue(this) //todo smjn: enable this when read RPC is working + enqueue(this); } } - private void handleReadShareGroupResponse(ClientResponse response) { + protected abstract void handleRequestResponse(ClientResponse response); + + protected abstract boolean isRequestResponse(ClientResponse response); + } + + public class ReadStateHandler extends PersisterStateManagerHandler { + // result of one group:topicId:partition call + private CompletableFuture> result = new CompletableFuture<>(); + private final int leaderEpoch; + + public ReadStateHandler(String groupId, Uuid topicId, int partition, int leaderEpoch) { + super(groupId, topicId, partition); + this.leaderEpoch = leaderEpoch; + } + + @Override + protected boolean isRequestResponse(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.info("ReadState response"); } @@ -176,28 +200,58 @@ protected AbstractRequest.Builder requestBuilder() { .setLeaderEpoch(leaderEpoch)))))); } - @Override - protected AbstractRequest.Builder findShareCoordinatorBuilder() { - return new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData() - .setKeyType(FindCoordinatorRequest.CoordinatorType.SHARE.id()) - .setKey(coordinatorKey())); + public CompletableFuture> result() { + return 
this.result; + } + } + + public class WriteStateHandler extends PersisterStateManagerHandler { + private final int stateEpoch; + private final int leaderEpoch; + private final long startOffset; + private final List batches; + + private final CompletableFuture result; + + public WriteStateHandler(String groupId, Uuid topicId, int partition, int stateEpoch, int leaderEpoch, long startOffset, List batches, CompletableFuture result) { + super(groupId, topicId, partition); + this.stateEpoch = stateEpoch; + this.leaderEpoch = leaderEpoch; + this.startOffset = startOffset; + this.batches = batches; + this.result = result; } @Override - protected Node shareCoordinatorNode() { - if (coordinatorNode != null) { - return coordinatorNode; - } - return null; + protected AbstractRequest.Builder requestBuilder() { + return new WriteShareGroupStateRequest.Builder(new WriteShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(Collections.singletonList( + new WriteShareGroupStateRequestData.WriteStateData() + .setTopicId(topicId) + .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartition(partition) + .setStateEpoch(stateEpoch) + .setLeaderEpoch(leaderEpoch) + .setStartOffset(startOffset) + .setStateBatches(batches.stream() + .map(batch -> new WriteShareGroupStateRequestData.StateBatch() + .setFirstOffset(batch.firstOffset()) + .setLastOffset(batch.lastOffset()) + .setDeliveryState(batch.deliveryState()) + .setDeliveryCount(batch.deliveryCount())) + .collect(Collectors.toList()))))))); } - public CompletableFuture> result() { - return this.result; + @Override + protected boolean isRequestResponse(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.WRITE_SHARE_GROUP_STATE; } @Override - protected String coordinatorKey() { - return this.coordinatorKey; + protected void handleRequestResponse(ClientResponse response) { + log.info("Write state response received. - {}", response); + this.result.complete((WriteShareGroupStateResponse) response.responseBody()); } } @@ -244,17 +298,15 @@ public Collection generateRequests() { handler.findShareCoordinatorBuilder(), handler )); + } else { + // share coord node already available + return Collections.singletonList(new RequestAndCompletionHandler( + System.currentTimeMillis(), + handler.shareCoordinatorNode(), + handler.requestBuilder(), + handler + )); } - //todo smjn: handle the read RPC here when its lifecycle is complete -// else { -// // share coord node already available -// return Collections.singletonList(new RequestAndCompletionHandler( -// System.currentTimeMillis(), -// handler.shareCoordinatorNode(), -// handler.requestBuilder(), -// handler -// )); -// } } return Collections.emptyList(); } diff --git a/server-common/src/main/java/org/apache/kafka/server/group/share/ShareGroupHelper.java b/server-common/src/main/java/org/apache/kafka/server/group/share/ShareGroupHelper.java new file mode 100644 index 0000000000000..28ea6722d6ada --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/group/share/ShareGroupHelper.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.group.share; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; + +public class ShareGroupHelper { + public static String coordinatorKey(String groupId, Uuid topicId, int partition) { + Objects.requireNonNull(groupId); + Objects.requireNonNull(topicId); + + return String.format("%s:%s:%d", groupId, topicId, partition); + } +}
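
Reviewer note: a minimal, hedged sketch of how the new write-state surface added in this patch might be exercised from the broker side. The class names and setters are taken from the diff itself (WriteShareGroupStateRequestData, ShareCoordinator.writeState); the example class, method name, group id, epochs, offsets and delivery values are illustrative assumptions, and the coordinator/context instances are assumed to come from the BrokerServer/KafkaApis wiring shown above.

// Sketch only, not part of the patch.
import java.util.Collections;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.coordinator.group.share.ShareCoordinator;

public class WriteStateExample {
    // coordinator and context are assumed to be supplied by the broker wiring; ids and values are illustrative.
    public static WriteShareGroupStateResponseData writeOnce(ShareCoordinator coordinator,
                                                             RequestContext context,
                                                             Uuid topicId) {
        WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
            .setGroupId("share-group-1")
            .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
                .setTopicId(topicId)
                .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
                    .setPartition(0)
                    .setStateEpoch(1)
                    .setLeaderEpoch(5)
                    .setStartOffset(100L)
                    .setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
                        .setFirstOffset(100L)
                        .setLastOffset(109L)
                        .setDeliveryCount((short) 1)
                        .setDeliveryState((byte) 0)))))));

        // ShareCoordinatorService fans the request out per group:topic:partition key and merges
        // the per-partition results; blocking with join() keeps the sketch simple.
        return coordinator.writeState(context, request).join();
    }
}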
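Similarly, a small hedged sketch of the persistence-side helpers introduced above: ShareGroupHelper.coordinatorKey builds the group:topicId:partition routing key used by ShareCoordinatorService.topicPartitionFor and the shard's in-memory maps, and RecordHelpers.newShareSnapshotRecord builds the ShareSnapshot record (key/value version 0) that ShareCoordinatorShard.replay dispatches on. The example class, the concrete epoch/offset values and the empty state-batch list are illustrative assumptions.

// Sketch only, not part of the patch.
import java.util.Collections;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.coordinator.group.share.ShareGroupOffset;
import org.apache.kafka.server.group.share.ShareGroupHelper;

public class ShareSnapshotRecordExample {
    public static Record buildSnapshot(String groupId, Uuid topicId, int partition) {
        // Routing key of the form "share-group-1:<topicId>:0".
        String key = ShareGroupHelper.coordinatorKey(groupId, topicId, partition);
        System.out.println("coordinator key: " + key);

        // snapshotEpoch=0, stateEpoch=1, leaderEpoch=5, startOffset=100, no state batches (illustrative).
        ShareGroupOffset offset = new ShareGroupOffset(0, 1, 5, 100L, Collections.emptyList());

        // ShareSnapshot records carry key version 0, which replay() routes to handleShareSnapshot.
        Record record = RecordHelpers.newShareSnapshotRecord(groupId, topicId, partition, offset);
        assert record.key().version() == 0;
        return record;
    }
}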