Skip to content

Commit

Permalink
separate left and right stream classes
Browse files Browse the repository at this point in the history
  • Loading branch information
wernerdv authored and raminqaf committed Apr 12, 2024
1 parent bf9a27f commit 0e3cf15
Show file tree
Hide file tree
Showing 444 changed files with 11,443 additions and 7,046 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/docker_scan.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Docker Image CVE Scanner
on:
schedule:
# This job will run at 3:30 UTC daily
- cron: '30 3 * * *'
workflow_dispatch:
jobs:
scan_jvm:
runs-on: ubuntu-latest
strategy:
matrix:
# This is an array of supported tags. Make sure this array only contains the supported tags
supported_image_tag: ['latest', '3.7.0']
steps:
- name: Run CVE scan
uses: aquasecurity/trivy-action@master
if: always()
with:
image-ref: apache/kafka:${{ matrix.supported_image_tag }}
format: 'table'
severity: 'CRITICAL,HIGH'
output: scan_report_jvm_${{ matrix.supported_image_tag }}.txt
exit-code: '1'
- name: Upload CVE scan report
if: always()
uses: actions/upload-artifact@v3
with:
name: scan_report_jvm_${{ matrix.supported_image_tag }}.txt
path: scan_report_jvm_${{ matrix.supported_image_tag }}.txt
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@ Follow instructions in https://kafka.apache.org/quickstart
./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testTimeToNextUpdate

### Running a particular unit/integration test with log4j output ###
Change the log4j setting in either `clients/src/test/resources/log4j.properties` or `core/src/test/resources/log4j.properties`
By default, there will be only small number of logs output while testing. You can adjust it by changing the `log4j.properties` file in the module's `src/test/resources` directory.

./gradlew clients:test --tests RequestResponseTest
For example, if you want to see more logs for clients project tests, you can modify [the line](https://github.com/apache/kafka/blob/trunk/clients/src/test/resources/log4j.properties#L21) in `clients/src/test/resources/log4j.properties`
to `log4j.logger.org.apache.kafka=INFO` and then run:

./gradlew cleanTest clients:test --tests NetworkClientTest

And you should see `INFO` level logs in the file under the `clients/build/test-results/test` directory.

### Specifying test retries ###
By default, each failed test is retried once up to a maximum of five retries per test run. Tests are retried at the end of the test task. Adjust these parameters in the following way:
Expand Down Expand Up @@ -87,15 +92,25 @@ fail due to code changes. You can just run:

### Running a Kafka broker in KRaft mode

Using compiled files:

KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
./bin/kafka-server-start.sh config/kraft/server.properties

Using docker image:

docker run -p 9092:9092 apache/kafka:3.7.0

### Running a Kafka broker in ZooKeeper mode

Using compiled files:

./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

>Since ZooKeeper mode is already deprecated and planned to be removed in Apache Kafka 4.0, the docker image only supports running in KRaft mode
### Cleaning the build ###
./gradlew clean

Expand Down
46 changes: 36 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ subprojects {
options.compilerArgs << "-Xlint:-options"
}

// -parameters generates arguments with parameter names in TestInfo#getDisplayName.
// ref: https://github.com/junit-team/junit5/blob/4c0dddad1b96d4a20e92a2cd583954643ac56ac0/junit-jupiter-params/src/main/java/org/junit/jupiter/params/ParameterizedTest.java#L161-L164
compileTestJava.options.compilerArgs.add "-parameters"

// We should only set this if Java version is < 9 (--release is recommended for >= 9), but the Scala plugin for IntelliJ sets
// `-target` incorrectly if this is unset
sourceCompatibility = minJavaVersion
Expand All @@ -305,6 +309,24 @@ subprojects {
} else {
apply plugin: 'com.github.johnrengelman.shadow'
project.shadow.component(mavenJava)

// Fix for avoiding inclusion of runtime dependencies marked as 'shadow' in MANIFEST Class-Path.
// https://github.com/johnrengelman/shadow/issues/324
afterEvaluate {
pom.withXml { xml ->
if (xml.asNode().get('dependencies') == null) {
xml.asNode().appendNode('dependencies')
}
def dependenciesNode = xml.asNode().get('dependencies').get(0)
project.configurations.shadowed.allDependencies.each {
def dependencyNode = dependenciesNode.appendNode('dependency')
dependencyNode.appendNode('groupId', it.group)
dependencyNode.appendNode('artifactId', it.name)
dependencyNode.appendNode('version', it.version)
dependencyNode.appendNode('scope', 'runtime')
}
}
}
}

afterEvaluate {
Expand Down Expand Up @@ -748,7 +770,7 @@ subprojects {
}

if (userEnableTestCoverage) {
def coverageGen = it.path == ':core' ? 'reportScoverage' : 'jacocoTestReport'
def coverageGen = it.path == ':core' ? 'reportTestScoverage' : 'jacocoTestReport'
tasks.register('reportCoverage').configure { dependsOn(coverageGen) }
}

Expand Down Expand Up @@ -829,6 +851,7 @@ project(':server') {
implementation project(':transaction-coordinator')
implementation project(':raft')
implementation libs.metrics
implementation libs.jacksonDatabind

implementation libs.slf4jApi

Expand Down Expand Up @@ -1402,6 +1425,7 @@ project(':clients') {

configurations {
generator
shadowed
}

dependencies {
Expand All @@ -1412,10 +1436,10 @@ project(':clients') {
implementation libs.opentelemetryProto

// libraries which should be added as runtime dependencies in generated pom.xml should be defined here:
shadow libs.zstd
shadow libs.lz4
shadow libs.snappy
shadow libs.slf4jApi
shadowed libs.zstd
shadowed libs.lz4
shadowed libs.snappy
shadowed libs.slf4jApi

compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
compileOnly libs.jacksonJDK8Datatypes
Expand Down Expand Up @@ -1468,10 +1492,9 @@ project(':clients') {

// dependencies excluded from the final jar, since they are declared as runtime dependencies
dependencies {
exclude(dependency(libs.snappy))
exclude(dependency(libs.zstd))
exclude(dependency(libs.lz4))
exclude(dependency(libs.slf4jApi))
project.configurations.shadowed.allDependencies.each {
exclude(dependency(it.group + ':' + it.name + ':' + it.version))
}
// exclude proto files from the jar
exclude "**/opentelemetry/proto/**/*.proto"
exclude "**/google/protobuf/*.proto"
Expand Down Expand Up @@ -2160,6 +2183,7 @@ project(':streams') {
testImplementation project(':storage')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':server')
testImplementation libs.log4j
testImplementation libs.junitJupiter
testImplementation libs.junitVintageEngine
Expand Down Expand Up @@ -2742,6 +2766,7 @@ project(':jmh-benchmarks') {
exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
}
implementation project(':server-common')
implementation project(':server')
implementation project(':clients')
implementation project(':group-coordinator')
implementation project(':metadata')
Expand Down Expand Up @@ -2899,7 +2924,7 @@ project(':connect:json') {
api libs.jacksonAfterburner

implementation libs.slf4jApi

testImplementation libs.junitJupiter

testRuntimeOnly libs.slf4jlog4j
Expand Down Expand Up @@ -2969,6 +2994,7 @@ project(':connect:runtime') {
testImplementation project(':metadata')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
testImplementation project(':server')
testImplementation project(':storage')
testImplementation project(':connect:test-plugins')

Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-jmh-benchmarks.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
<allow pkg="kafka.message"/>
<allow pkg="org.mockito"/>
<allow pkg="kafka.security.authorizer"/>
<allow pkg="org.apache.kafka.security.authorizer"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage"/>
<allow pkg="org.apache.kafka.clients"/>
Expand Down
2 changes: 1 addition & 1 deletion checkstyle/import-control-server-common.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@
<allow pkg="org.apache.kafka.server.util.json" />

<allow class="org.apache.kafka.server.util.TopicFilter.IncludeList" />
<allow class="org.apache.kafka.test.TestUtils" />
<subpackage name="timer">
<allow class="org.apache.kafka.server.util.MockTime" />
<allow class="org.apache.kafka.server.util.ShutdownableThread" />
<allow class="org.apache.kafka.test.TestUtils" />
</subpackage>
</subpackage>
</subpackage>
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.eclipse.jetty.client"/>
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
<allow class="org.apache.kafka.server.config.ZkConfigs" />
</subpackage>
</subpackage>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ default CreateDelegationTokenResult createDelegationToken() {
* </ul>
*
* @param options The options to use when creating delegation token.
* @return The DeleteRecordsResult.
* @return The CreateDelegationTokenResult.
*/
CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,17 +1093,24 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio
return Collections.emptyMap();
}
final Timer timer = time.timer(timeout);
final ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
timestampsToSearch,
true,
timer);
ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
timestampsToSearch,
timer,
true);

// If timeout is set to zero return empty immediately; otherwise try to get the results
// and throw timeout exception if it cannot complete in time.
if (timeout.toMillis() == 0L)
return listOffsetsEvent.emptyResult();
if (timeout.toMillis() == 0L) {
applicationEventHandler.add(listOffsetsEvent);
return listOffsetsEvent.emptyResults();
}

return applicationEventHandler.addAndGet(listOffsetsEvent, timer);
return applicationEventHandler.addAndGet(listOffsetsEvent, timer)
.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().buildOffsetAndTimestamp()));
} finally {
release();
}
Expand Down Expand Up @@ -1141,21 +1148,32 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
if (partitions.isEmpty()) {
return Collections.emptyMap();
}

Map<TopicPartition, Long> timestampToSearch = partitions
.stream()
.collect(Collectors.toMap(Function.identity(), tp -> timestamp));
.stream()
.collect(Collectors.toMap(Function.identity(), tp -> timestamp));
Timer timer = time.timer(timeout);
ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
timestampToSearch,
false,
timer);
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet(
listOffsetsEvent,
timer);
return offsetAndTimestampMap
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
timestampToSearch,
timer,
false);

// If timeout is set to zero return empty immediately; otherwise try to get the results
// and throw timeout exception if it cannot complete in time.
if (timeout.isZero()) {
applicationEventHandler.add(listOffsetsEvent);
return listOffsetsEvent.emptyResults();
}

Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;
offsetAndTimestampMap = applicationEventHandler.addAndGet(
listOffsetsEvent,
timer);
return offsetAndTimestampMap.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().offset()));
} finally {
release();
}
Expand Down Expand Up @@ -1462,8 +1480,10 @@ private void updatePatternSubscription(Cluster cluster) {
final Set<String> topicsToSubscribe = cluster.topics().stream()
.filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe))
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
applicationEventHandler.add(new SubscriptionChangeEvent());
metadata.requestUpdateForNewTopics();
}
}

@Override
Expand Down Expand Up @@ -1667,11 +1687,9 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
maybeThrowFencedInstanceException();
maybeInvokeCommitCallbacks();
maybeUpdateSubscriptionMetadata();
backgroundEventProcessor.process();

// Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as
// in the previous implementation, because it will eventually involve group coordination
// logic
return updateFetchPositions(timer);
}

Expand Down Expand Up @@ -1750,8 +1768,8 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen
throwIfNoAssignorsConfigured();
log.info("Subscribed to pattern: '{}'", pattern);
subscriptions.subscribe(pattern, listener);
updatePatternSubscription(metadata.fetch());
metadata.requestUpdateForNewTopics();
updatePatternSubscription(metadata.fetch());
} finally {
release();
}
Expand Down Expand Up @@ -1943,4 +1961,10 @@ SubscriptionState subscriptions() {
return subscriptions;
}

private void maybeUpdateSubscriptionMetadata() {
if (subscriptions.hasPatternSubscription()) {
updatePatternSubscription(metadata.fetch());
}
}

}
Loading

0 comments on commit 0e3cf15

Please sign in to comment.