Skip to content

Commit

Permalink
improve doc
Browse files Browse the repository at this point in the history
  • Loading branch information
labuladong committed Nov 21, 2022
1 parent 79d1e52 commit d67b62e
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 40 deletions.
40 changes: 20 additions & 20 deletions site2/docs/schema-get-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ This example demonstrates how to construct a [string schema](schema-understand.m

```java
Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
consumer.receive();
Message<String> message = consumer.receive();
```

## Construct a key/value schema
Expand All @@ -37,47 +37,47 @@ This example shows how to construct a [key/value schema](schema-understand.md#ke

1. Construct a key/value schema with `INLINE` encoding type.

```java
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.INLINE
);
```
```java
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.INLINE
);
```

2. Optionally, construct a key/value schema with `SEPARATED` encoding type.

```java
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);
```
```java
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);
```

3. Produce messages using a key/value schema.

```java
Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
.topic(TOPIC)
.topic(topicName)
.create();

final int key = 100;
final String value = "value-100";

// send the key/value message
producer.newMessage()
.value(new KeyValue(key, value))
.send();
.value(new KeyValue(key, value))
.send();
```

4. Consume messages using a key/value schema.

```java
Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
...
.topic(TOPIC)
.subscriptionName(SubscriptionName).subscribe();
.topic(topicName)
.subscriptionName(subscriptionName).subscribe();

// receive key/value pair
Message<KeyValue<Integer, String>> msg = consumer.receive();
Expand Down
41 changes: 30 additions & 11 deletions site2/docs/schema-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ This diagram illustrates how schema works on the Producer side.

5. If no, the broker verifies whether a schema can be automatically created in this namespace:

* If `isAllowAutoUpdateSchema` sets to **true**, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.
* If `isAllowAutoUpdateSchemaEnabled` sets to **true**, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.

* If `isAllowAutoUpdateSchema` sets to **false**, then a schema can not be created, and the producer is rejected to connect to the broker.
* If `isAllowAutoUpdateSchemaEnabled` sets to **false**, then a schema can not be created, and the producer is rejected to connect to the broker.

:::tip

`isAllowAutoUpdateSchema` can be set via **Pulsar admin API** or **REST API.**
`isAllowAutoUpdateSchemaEnabled` can be set via **Pulsar admin API** or **REST API.**

For how to set `isAllowAutoUpdateSchema` via Pulsar admin API, see [Manage AutoUpdate Strategy](admin-api-schemas.md#manage-autoupdate-strategy).
For how to set `isAllowAutoUpdateSchemaEnabled` via Pulsar admin API, see [Manage AutoUpdate Strategy](admin-api-schemas.md#manage-autoupdate-strategy).

:::
:::

6. If the schema is allowed to be updated, then the compatible strategy check is performed.

Expand All @@ -94,9 +94,9 @@ This diagram illustrates how schema works on the consumer side.

4. If a topic does not have all of them (a schema/data/a local consumer and a local producer):

* If `isAllowAutoUpdateSchema` sets to **true**, then the consumer registers a schema and it is connected to a broker.
* If `isAllowAutoUpdateSchemaEnabled` sets to **true**, then the consumer registers a schema and it is connected to a broker.

* If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is rejected to connect to a broker.
* If `isAllowAutoUpdateSchemaEnabled` sets to **false**, then the consumer is rejected to connect to a broker.

5. If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed.

Expand All @@ -112,12 +112,19 @@ This diagram illustrates how schema works on the consumer side.

You can use language-specific types of data when constructing and handling messages from simple data types like `string` to more complex application-specific types.

For example, you are using the _User_ class to define the messages sent to Pulsar topics.
For example, you are using the `User` class to define the messages sent to Pulsar topics.

```java
public class User {
String name;
int age;
public String name;
public int age;

User() {}

User(String name, int age) {
this.name = name;
this.age = age;
}
}
```

Expand All @@ -136,14 +143,26 @@ producer.send(message);

**With a schema**

This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize POJOs into bytes.
This example constructs a producer with the `JSONSchema`, and you can send the `User` class to topics directly without worrying about how to serialize POJOs into bytes.

```java
// send with json schema
Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
.topic(topic)
.create();
User user = new User("Tom", 28);
producer.send(user);

// receive with json schema
Consumer<User> consumer = client.newConsumer(JSONSchema.of(User.class))
.topic(schemaTopic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("schema-sub")
.subscribe();

Message<User> message = consumer.receive();
User user = message.getValue();
assert user.age == 28 && user.name.equals("Tom");
```

## What's next?
Expand Down
19 changes: 10 additions & 9 deletions site2/docs/schema-understand.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Currently, Pulsar supports the following complex types:

You can choose the encoding type when constructing the key/value schema.:
* `INLINE` - Key/value pairs are encoded together in the message payload.
* `SEPARATED` - see [Construct a key/value schema](schema-get-started.md#construct-a-keyvalue-schema).
* `SEPARATED` - Key is stored as message key, while value is stored as message payload.

#### `Struct` schema

Expand Down Expand Up @@ -148,10 +148,11 @@ Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReadin

The table below lists the possible scenarios when this connection attempt occurs and what happens in each scenario:

| Scenario | What happens |
| --- | --- |
| <li>No schema exists for the topic. </li> | (1) The producer is created using the given schema. (2) Since no existing schema is compatible with the `SensorReading` schema, the schema is transmitted to the broker and stored. (3) Any consumer created using the same schema or topic can consume messages from the `sensor-data` topic. |
| <li>A schema already exists. </li><li>The producer connects using the same schema that is already stored. </li> | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible. (3) The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it is used to tag produced messages. | <li>A schema already exists. </li><li>The producer connects using a new schema that is compatible. </li> | (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number). |
| Scenario | What happens |
|-----------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| <li>No schema exists for the topic. </li> | (1) The producer is created using the given schema. <br/>(2) Since no existing schema is compatible with the `SensorReading` schema, the schema is transmitted to the broker and stored. <br/>(3) Any consumer created using the same schema or topic can consume messages from the `sensor-data` topic. |
| <li>A schema already exists. </li><li>The producer connects using the same schema that is already stored. </li> | (1) The schema is transmitted to the broker. <br/>(2) The broker determines that the schema is compatible. <br/>(3) The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it is used to tag produced messages. |
| <li>A schema already exists. </li><li>The producer connects using a new schema that is compatible. </li> | (1) The schema is transmitted to the broker. <br/>(2) The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number). |

## Schema AutoUpdate

Expand Down Expand Up @@ -179,9 +180,9 @@ For a producer, the `AutoUpdate` happens in the following cases:

* If the schema is not registered:

* If `isAllowAutoUpdateSchema` sets to **false**, the producer is rejected to connect to a broker.
* If `isAllowAutoUpdateSchemaEnabled` sets to **false**, the producer is rejected to connect to a broker.

* If `isAllowAutoUpdateSchema` sets to **true**:
* If `isAllowAutoUpdateSchemaEnabled` sets to **true**:

* If the schema passes the compatibility check, then the broker registers a new schema automatically for the topic and the producer is connected.

Expand All @@ -199,9 +200,9 @@ For a consumer, the `AutoUpdate` happens in the following cases:

* If a topic does not have all of them (a schema/data/a local consumer and a local producer):

* If `isAllowAutoUpdateSchema` sets to **true**, then the consumer registers a schema and it is connected to a broker.
* If `isAllowAutoUpdateSchemaEnabled` sets to **true**, then the consumer registers a schema and it is connected to a broker.

* If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is rejected to connect to a broker.
* If `isAllowAutoUpdateSchemaEnabled` sets to **false**, then the consumer is rejected to connect to a broker.

* If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed.

Expand Down

0 comments on commit d67b62e

Please sign in to comment.