[improve][admin] PIP-369 Change default value of unload-scope in ns-isolation-policy set #2

Closed
wants to merge 75 commits into from

grssam
Owner

@grssam grssam commented Sep 4, 2024

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

Motivation

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the PIP label Sep 4, 2024
marekczajkowski and others added 29 commits September 4, 2024 16:05
Co-authored-by: duanlinlin <[email protected]>

Fixes: fix irregular method name
…pache#22372)

Co-authored-by: xiangying <[email protected]>
### Motivation
When no value is set on a message, the `nullValue` flag in the message metadata should be true, and it should change to false once a value is set. Otherwise, the message data is set to `[]` when no value is set, which causes the message data to be encoded and a `SchemaSerializationException` to be thrown when calling `reconsumeLater`.
```

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4

	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1131)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:467)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:452)
	at org.apache.pulsar.client.api.ConsumerRedeliveryTest.testRedeliverMessagesWithoutValue(ConsumerRedeliveryTest.java:445)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
	at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221)
	at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
	at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969)
	at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194)
	at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148)
	at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.testng.TestRunner.privateRun(TestRunner.java:829)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:437)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391)
	at org.testng.SuiteRunner.run(SuiteRunner.java:330)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1176)
	at org.testng.TestNG.runSuites(TestNG.java:1099)
	at org.testng.TestNG.run(TestNG.java:1067)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLater(ConsumerBase.java:462)
	... 29 more
Caused by: org.apache.pulsar.client.api.SchemaSerializationException: Size of data received by IntSchema is not 4
	at org.apache.pulsar.client.impl.schema.IntSchema.validate(IntSchema.java:49)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
	at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$value$3(TypedMessageBuilderImpl.java:157)
	at java.base/java.util.Optional.orElseGet(Optional.java:364)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:156)
	at org.apache.pulsar.client.impl.ConsumerImpl.doReconsumeLater(ConsumerImpl.java:689)
	at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doReconsumeLater(MultiTopicsConsumerImpl.java:550)
	at org.apache.pulsar.client.impl.ConsumerBase.reconsumeLaterAsync(ConsumerBase.java:574)
```
### Modifications
When no value is set on a message, the `nullValue` flag in the message metadata is true; it changes to false once a value is set.
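
The flag handling described above can be sketched roughly as follows. This is a minimal illustration, not the Pulsar client code: `NullValueBuilderSketch`, `encodeForRepublish`, and the fixed `expectedSize` check are hypothetical stand-ins for the message builder and the schema validation seen in the stack trace.

```java
// Hypothetical stand-in for a message builder; not the Pulsar client classes.
final class NullValueBuilderSketch {
    // True until value(...) supplies a payload.
    private boolean nullValue = true;
    private byte[] payload = new byte[0];

    NullValueBuilderSketch value(byte[] data) {
        if (data == null) {
            // Explicitly setting null keeps the message flagged as "no value".
            nullValue = true;
            payload = new byte[0];
        } else {
            nullValue = false;
            payload = data.clone();
        }
        return this;
    }

    boolean isNullValue() {
        return nullValue;
    }

    // Re-publish path: validate and encode only when a value is present,
    // so an empty payload is never checked against a fixed-size schema.
    byte[] encodeForRepublish(int expectedSize) {
        if (nullValue) {
            return null;
        }
        if (payload.length != expectedSize) {
            throw new IllegalStateException("Size of data is not " + expectedSize);
        }
        return payload.clone();
    }
}
```

With the flag left at true, a message without a value skips the size check entirely instead of failing on a zero-length array.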
… specific subscription (apache#22861)

Co-authored-by: duanlinlin <[email protected]>
[PIP-359](apache#22902)
Support a custom message listener thread pool for a specific subscription, so that a single subscription's listener that takes too long does not increase the consumption delay of other subscriptions.

### Motivation
In our scenario, there is a centralized message proxy service. This service uses the same PulsarClient instance to create many subscription groups that consume many topics and cache messages locally. The business then pulls messages from the proxy service's cache. This looked fine at first, but in use we found that when the message processing time of a few consumer groups (in listener mode) is very high, it affects almost all consumer groups handled by the proxy service and causes a large number of message delays.

By analyzing the source code, we found that by default all consumer instances created from the same PulsarClient share one thread pool for running message listeners, and the message listeners of multiple consumers are sometimes bound to the same thread. When one consumer blocks for a long time while processing messages, the messages of the other consumers bound to that thread cannot be processed in time, which results in message delays. For this scenario, it may therefore be necessary to support a dedicated message listener thread pool at the consumer level, so that different consumers do not affect each other.

### Modifications
Support a custom message listener thread pool for a specific subscription.
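
The isolation idea can be sketched with plain `java.util.concurrent` types. This is only an illustration under assumptions: `ListenerDispatchSketch`, `useDedicatedExecutor`, and `dispatch` are hypothetical names and do not reflect the API added by the PIP.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

// Hypothetical dispatcher: listener calls run on the shared pool unless the
// subscription registered its own executor.
final class ListenerDispatchSketch {
    private final ExecutorService sharedPool;
    private final Map<String, Executor> dedicated = new ConcurrentHashMap<>();

    ListenerDispatchSketch(ExecutorService sharedPool) {
        this.sharedPool = sharedPool;
    }

    // Give one subscription its own executor so a slow listener cannot
    // block listeners that belong to other subscriptions.
    void useDedicatedExecutor(String subscription, Executor executor) {
        dedicated.put(subscription, executor);
    }

    // Run the listener callback on the subscription's executor if present,
    // otherwise fall back to the shared pool.
    void dispatch(String subscription, Runnable listenerCall) {
        dedicated.getOrDefault(subscription, sharedPool).execute(listenerCall);
    }
}
```

A subscription known to be slow would register its own executor, while all remaining subscriptions keep sharing the default pool.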
…apache#23075)

### Motivation

Currently, the lookup process uses only the topic name as its parameter. However, to enhance this process, it's
beneficial for clients to provide additional information. This could be done by introducing the `lookupProperties` field
in the client configuration. Clients can then share these properties with the broker during lookup.

On the broker side, the broker could also contain some properties that are used for the lookup. We can also support the
lookupProperties for the broker. The broker can use these properties to make a better decision on which broker to
return.

Here is a rack-aware lookup scenario that uses the client properties for the lookup:
Assume there are two brokers: broker-0 configures the lookup property "rack" with "A" and broker-1 configures the
lookup property "rack" with "B". By using the lookup properties, clients can supply rack information during the lookup,
enabling the broker to identify and connect them to the nearest broker within the same rack. If a client that configures
the "rack" property with "A" connects to a lookup broker, the customized load manager can determine broker-0 as the
owner broker since the broker and the client have the same rack property.

### Modifications

Add a new configuration `lookupProperties` to the client. While looking up the broker, the client will send the properties
to the broker through the `CommandLookupTopic` request.

The `lookupProperties` will then be added to the `LookupOptions`. The Load Manager implementation can access
the `properties` through `LookupOptions` to make a better decision on which broker to return.

The properties are used only when the protocol is the binary protocol, starting with `pulsar://` or `pulsar+ssl://`, or
if the `loadManagerClassName` in the broker is a class that implements the `ExtensibleLoadManager` interface.

To support configuring the `lookupProperties` on the broker side, introduce a new broker
configuration `lookupPropertyPrefix`. Any broker configuration properties that start with the `lookupPropertyPrefix`
will be included in the `BrokerLookupData` and persisted in the metadata store. The broker can use these properties
during the lookup.

In this way, to support the rack-aware lookup scenario mentioned in the "Motivation" part, the client can set the rack
information in the client `lookupProperties`. Similarly, the broker can also set the rack information in the broker
configuration like `lookup.rack`. The `lookup.rack` will be stored in the `BrokerLookupData`. A customized load manager
can then be implemented. For each lookup request, it will go through the `BrokerLookupData` for all brokers and select
the broker in the same rack to return.
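
The rack-aware selection can be sketched as below. This is a simplified illustration under assumptions, not the load manager interface: `RackAwareLookupSketch` and its methods are hypothetical, plain maps stand in for `BrokerLookupData`, the prefix value `lookup.` is inferred from the `lookup.rack` example, and broker keys are assumed to keep their prefix when stored.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper; plain maps stand in for BrokerLookupData.
final class RackAwareLookupSketch {

    // Collect the broker configuration entries whose key starts with the prefix.
    // Assumption: keys are stored with the prefix intact (e.g. "lookup.rack").
    static Map<String, String> brokerLookupProperties(Properties brokerConf, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String key : brokerConf.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key, brokerConf.getProperty(key));
            }
        }
        return result;
    }

    // Go through the lookup data of all brokers and return the first broker
    // whose "<prefix>rack" value equals the client's "rack" property.
    static Optional<String> selectBroker(Map<String, Map<String, String>> lookupDataByBroker,
                                         Map<String, String> clientLookupProperties,
                                         String prefix) {
        String clientRack = clientLookupProperties.get("rack");
        if (clientRack == null) {
            return Optional.empty();
        }
        return lookupDataByBroker.entrySet().stream()
                .filter(e -> clientRack.equals(e.getValue().get(prefix + "rack")))
                .map(Map.Entry::getKey)
                .findFirst();
    }
}
```

For the two-broker scenario in the "Motivation" part, a client that sends `rack=A` would be matched to broker-0; a real load manager would still need a fallback when no broker matches.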
dao-jun and others added 28 commits September 4, 2024 16:05
…ext and SSL Engine generation (apache#23110)

Co-authored-by: Apurva Telang <[email protected]>
…apache#23223)

PIP: apache#23075

### Motivation

This is the implementation for the PIP: apache#23075
Currently, the lookup process uses only the topic name as its parameter. However, to enhance this process, it's
beneficial for clients to provide additional information. This could be done by introducing the `lookupProperties` field
in the client configuration. Clients can then share these properties with the broker during lookup.

On the broker side, the broker could also contain some properties that are used for the lookup. We can also support the
lookupProperties for the broker. The broker can use these properties to make a better decision on which broker to
return.

Here is a rack-aware lookup scenario that uses the client properties for the lookup:
Assume there are two brokers: broker-0 configures the lookup property "rack" with "A" and broker-1 configures the
lookup property "rack" with "B". By using the lookup properties, clients can supply rack information during the lookup,
enabling the broker to identify and connect them to the nearest broker within the same rack. If a client that configures
the "rack" property with "A" connects to a lookup broker, the customized load manager can determine broker-0 as the
owner broker since the broker and the client have the same rack property.

### Modifications

- Add a new configuration `lookupProperties` to the client. While looking up the broker, the client will send the properties
to the broker through the `CommandLookupTopic` request.
- Add a `properties` field to `CommandLookupTopic`.
- Add `lookupProperties` to `LookupOptions`. The Load Manager implementation can access
the `properties` through `LookupOptions` to make a better decision on which broker to return.
- Introduce a new broker configuration `lookupPropertyPrefix`. Any broker configuration properties that start with the `lookupPropertyPrefix`
will be included in the `BrokerLookupData` and persisted in the metadata store. The broker can use these properties
during the lookup.

Co-authored-by: Yunze Xu <[email protected]>
…ctional message (apache#21406)

### Motivation
The decision to write a snapshot before the first transaction message, instead of before building the producer, is based on the fact that only writing transactional messages signifies use of the transaction buffer. It is also only appropriate to schedule snapshot updates after this point.
Otherwise, a lot of unnecessary IO read and write operations are added and the topic load delay increases.

---
  **Scenario**
 * 1000 topics under namespace1.
 * Client1 enables transaction and sends transaction messages to topic 1.
 * Client1 sends normal messages to topic 2~500. 
 * Client2 disables transaction and sends messages to topic 501~1000. 
 
 **Internal Behavior**
 * Topics 1~500 will start 500 tasks to write snapshots into the same system topic, e.g., system topic 1.
 * All the topics (1~1000) will read this system topic 1 when loading.
### Modifications

This Pull Request aims to remove the unnecessary write operations. Snapshot writing now starts when the first transaction message is sent instead of when the producer is built.
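
The deferred start can be sketched as a one-time guard on the publish path. This is a minimal illustration under assumptions: `LazySnapshotSketch` is a hypothetical stand-in for a topic's transaction buffer, and the shared counter stands in for writes to the system topic.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a per-topic transaction buffer.
final class LazySnapshotSketch {
    private final AtomicBoolean snapshotStarted = new AtomicBoolean(false);
    // Shared counter standing in for snapshot writes to the system topic.
    private final AtomicInteger snapshotWrites;

    LazySnapshotSketch(AtomicInteger snapshotWrites) {
        this.snapshotWrites = snapshotWrites;
    }

    // Building a producer no longer triggers a snapshot write.
    void onProducerCreated() {
        // Intentionally empty.
    }

    // The first transactional message takes the first snapshot; normal
    // messages and later transactional messages do not write another one here.
    void publish(boolean transactional) {
        if (transactional && snapshotStarted.compareAndSet(false, true)) {
            snapshotWrites.incrementAndGet();
        }
    }
}
```

Applied to the scenario above, only topic 1 would write to the system topic, since it is the only topic that receives transactional messages.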
…s-isolation-policy set`

Also change behavior of all_matching to cover both removed and added regexes
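
One possible reading of "cover both removed and added regexes" is that `all_matching` selects every namespace matched by either the previous or the new regex list. The sketch below illustrates only that reading; `UnloadScopeSketch` and `namespacesToUnload` are hypothetical names and the actual admin implementation may differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical helper illustrating one reading of the all_matching scope.
final class UnloadScopeSketch {

    // Return the namespaces matched by any regex in the union of the old
    // (possibly removed) and new (possibly added) regex lists.
    static List<String> namespacesToUnload(List<String> oldRegexes,
                                           List<String> newRegexes,
                                           List<String> namespaces) {
        Set<String> union = new LinkedHashSet<>(oldRegexes);
        union.addAll(newRegexes);

        List<Pattern> patterns = new ArrayList<>();
        for (String regex : union) {
            patterns.add(Pattern.compile(regex));
        }

        List<String> result = new ArrayList<>();
        for (String namespace : namespaces) {
            for (Pattern pattern : patterns) {
                if (pattern.matcher(namespace).matches()) {
                    result.add(namespace);
                    break; // one match is enough
                }
            }
        }
        return result;
    }
}
```

Under this reading, a namespace that matched only a removed regex is still unloaded, so it can move off the brokers it was previously isolated to.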