[refactor][doc] Improve schema docs #18434

Merged
merged 22 commits into from
Dec 14, 2022
Changes from 20 commits
426 changes: 236 additions & 190 deletions site2/docs/admin-api-schemas.md

Large diffs are not rendered by default.

Binary file removed site2/docs/assets/schema-consumer.png
Binary file not shown.
1 change: 1 addition & 0 deletions site2/docs/assets/schema-consumer.svg
Binary file removed site2/docs/assets/schema-producer.png
Binary file not shown.
1 change: 1 addition & 0 deletions site2/docs/assets/schema-producer.svg
1 change: 1 addition & 0 deletions site2/docs/assets/schema.svg
173 changes: 1 addition & 172 deletions site2/docs/client-libraries-python.md
@@ -293,8 +293,6 @@ The schema definition is like this.

### Declare and validate schema

You can send messages using `BytesSchema`, `StringSchema`, `AvroSchema`, and `JsonSchema`.

Before the producer is created, the Pulsar broker validates that the existing topic schema is of the correct type and that its format is compatible with the schema definition of the class. If the format of the topic schema is incompatible with the schema definition, an exception occurs during producer creation.

Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class, as sketched below.
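For instance, here is a minimal sketch of how this enforcement surfaces (assuming a local broker at `pulsar://localhost:6650`; `Example` and the topic name are illustrative):

```python
import pulsar
from pulsar.schema import AvroSchema, Integer, Record

class Example(Record):
    a = Integer()
    b = Integer()

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('example-topic', schema=AvroSchema(Example))

producer.send(Example(a=1, b=2))  # accepted: an instance of the declared class

try:
    producer.send({'a': 1, 'b': 2})  # a plain dict is not an Example instance
except Exception as e:
    print('rejected by the client:', e)

client.close()
```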
@@ -321,176 +319,7 @@ while True:
consumer.negative_acknowledge(msg)
```

````mdx-code-block
<Tabs
defaultValue="BytesSchema"
values={[{"label":"BytesSchema","value":"BytesSchema"},{"label":"StringSchema","value":"StringSchema"},{"label":"AvroSchema","value":"AvroSchema"},{"label":"JsonSchema","value":"JsonSchema"}]}>

<TabItem value="BytesSchema">

You can send byte data using a `BytesSchema`.

**Example**

```python
producer = client.create_producer(
    'bytes-schema-topic',
    schema=BytesSchema())
producer.send(b"Hello")

consumer = client.subscribe(
    'bytes-schema-topic',
    'sub',
    schema=BytesSchema())
msg = consumer.receive()
data = msg.value()
```

</TabItem>
<TabItem value="StringSchema">

You can send string data using a `StringSchema`.

**Example**

```python
producer = client.create_producer(
    'string-schema-topic',
    schema=StringSchema())
producer.send("Hello")

consumer = client.subscribe(
    'string-schema-topic',
    'sub',
    schema=StringSchema())
msg = consumer.receive()
value = msg.value()  # avoid shadowing the built-in `str`
```

</TabItem>
<TabItem value="AvroSchema">

You can declare an `AvroSchema` using one of the following methods.

#### Method 1: Record

You can declare an `AvroSchema` by passing a class that inherits
from `pulsar.schema.Record` and defines the fields as
class variables.

**Example**

```python
class Example(Record):
    a = Integer()
    b = Integer()

producer = client.create_producer(
    'avro-schema-topic',
    schema=AvroSchema(Example))
r = Example(a=1, b=2)
producer.send(r)

consumer = client.subscribe(
    'avro-schema-topic',
    'sub',
    schema=AvroSchema(Example))
msg = consumer.receive()
e = msg.value()
```

#### Method 2: JSON definition

You can declare an `AvroSchema` using a JSON definition of the Avro schema.

**Example**

Below is an `AvroSchema` defined using a JSON file (_company.avsc_).

```json
{
    "doc": "this is doc",
    "namespace": "example.avro",
    "type": "record",
    "name": "Company",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "address", "type": ["null", "string"]},
        {"name": "employees", "type": ["null", {"type": "array", "items": {
            "type": "record",
            "name": "Employee",
            "fields": [
                {"name": "name", "type": ["null", "string"]},
                {"name": "age", "type": ["null", "int"]}
            ]
        }}]},
        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
    ]
}
```

You can load a schema definition from a file by using [`avro.schema`](https://avro.apache.org/docs/current/getting-started-python/) or [`fastavro.schema`](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema).

If you use the "JSON definition" method to declare an `AvroSchema`, pay attention to the following points:

- You need to use a [Python dict](https://developers.google.com/edu/python/dict-files) to produce and consume messages, which is different from using the "Record" method.

- When generating an `AvroSchema` object, set the `_record_cls` parameter to `None`.

**Example**

```python
from fastavro.schema import load_schema
from pulsar.schema import *

topic = 'avro-schema-topic'  # any topic name
i = 0                        # example index; the snippet assumes a surrounding loop

schema_definition = load_schema("examples/company.avsc")
avro_schema = AvroSchema(None, schema_definition=schema_definition)
producer = client.create_producer(
    topic=topic,
    schema=avro_schema)
consumer = client.subscribe(topic, 'test', schema=avro_schema)
company = {
    "name": "company-name" + str(i),
    "address": 'xxx road xxx street ' + str(i),
    "employees": [
        {"name": "user" + str(i), "age": 20 + i},
        {"name": "user" + str(i), "age": 30 + i},
        {"name": "user" + str(i), "age": 35 + i},
    ],
    "labels": {
        "industry": "software" + str(i),
        "scale": ">100",
        "funds": "1000000.0"
    }
}
producer.send(company)
msg = consumer.receive()
# The consumer gets a dict object via the value() method.
msg.value()
```

</TabItem>
<TabItem value="JsonSchema">

#### Record

You can declare a `JsonSchema` by passing a class that inherits
from `pulsar.schema.Record` and defines the fields as class variables. This is similar to using `AvroSchema`; the only difference is to use `JsonSchema` instead of `AvroSchema` when defining the schema type, as shown below. For how to use `AvroSchema` via record, see [here](#method-1-record).

```python
producer = client.create_producer(
    'json-schema-topic',
    schema=JsonSchema(Example))

consumer = client.subscribe(
    'json-schema-topic',
    'sub',
    schema=JsonSchema(Example))
```

</TabItem>

</Tabs>
````
For more code examples, see [Schema - Get started](schema-get-started.md).

## End-to-end encryption

168 changes: 5 additions & 163 deletions site2/docs/schema-evolution-compatibility.md
@@ -4,167 +4,9 @@ title: Schema evolution and compatibility
sidebar_label: "Schema evolution and compatibility"
---

Normally, schemas do not stay the same over a long period of time. Instead, they undergo evolution to satisfy new needs.
````mdx-code-block
import useLocationParentPath from '@site/src/libs/useLocationParentPath';
import {Redirect} from "@docusaurus/router";

This chapter introduces how Pulsar schemas evolve and which compatibility check strategies Pulsar adopts.

## Schema evolution

A message produced with a `SchemaInfo` is tagged with a schema version. When the message is consumed by a Pulsar client, the client can use the schema version to retrieve the corresponding `SchemaInfo` and deserialize the data with the correct schema.

Schemas store the details of attributes and types. To satisfy new business requirements, schemas inevitably need to be updated over time, which is called **schema evolution**.

Any schema changes affect downstream consumers. Schema evolution ensures that the downstream consumers can seamlessly handle data encoded with both old schemas and new schemas.

### How does a schema evolve?

The answer lies in the [schema compatibility check strategy](#schema-compatibility-check-strategy). It determines how the broker compares new schemas with the existing schemas in a topic.

### How does Pulsar support schema evolution?

Pulsar supports schema evolution as follows.

1. The producer/consumer/reader sends the `SchemaInfo` of its client to brokers.

2. Brokers recognize the schema type and deploy the schema compatibility checker `schemaRegistryCompatibilityCheckers` for that schema type to enforce the schema compatibility check. By default, the value of `schemaRegistryCompatibilityCheckers` in the `conf/broker.conf` or `conf/standalone.conf` file is as follows.

```properties
schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
```

:::note

Each schema type corresponds to one instance of the schema compatibility checker. Currently, Avro, JSON, and Protobuf have their own compatibility checkers, while all other schema types share the default compatibility checker, which disables schema evolution. In short, schema evolution is only available for Avro, JSON, and Protobuf schemas.

:::

3. Brokers use the schema compatibility checker to check if the `SchemaInfo` is compatible with the latest schema of the topic by applying its [compatibility check strategy](#schema-compatibility-check-strategy). Currently, the compatibility check strategy is configured at the namespace level and applied to all the topics within that namespace.

For more details, see [`schemaRegistryCompatibilityCheckers`](https://github.com/apache/pulsar/blob/bf194b557c48e2d3246e44f1fc28876932d8ecb8/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java).
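As a concrete illustration, the namespace-level strategy can be changed through the admin REST API. Below is a minimal sketch using Python's `requests`; the endpoint path, the `public/default` namespace, and the default admin port `8080` are assumptions to verify against your broker's admin API reference:

```python
import requests

ADMIN = 'http://localhost:8080'  # assumption: admin service on the default port
NS = 'public/default'            # tenant/namespace to configure

# Apply the BACKWARD strategy to every topic in the namespace.
resp = requests.put(
    f'{ADMIN}/admin/v2/namespaces/{NS}/schemaCompatibilityStrategy',
    json='BACKWARD')
resp.raise_for_status()

# Read the strategy back to confirm the change.
print(requests.get(
    f'{ADMIN}/admin/v2/namespaces/{NS}/schemaCompatibilityStrategy').json())
```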


## Schema compatibility check strategy

The following table outlines the 8 schema compatibility check strategies and how each of them works.

Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is the oldest and V3 is the latest.

| Compatibility check strategy | Definition | Changes allowed | Check against which schema | Upgrade first |
| --- | --- | --- | --- | --- |
| `ALWAYS_COMPATIBLE` | Disable schema compatibility check. | All changes are allowed | All previous versions | Any order |
| `ALWAYS_INCOMPATIBLE` | Disable schema evolution, that is, any schema change is rejected. | All changes are disabled | None | None |
| `BACKWARD` | Consumers using schema V3 can process data written by producers using the **last schema version** V2. | <li>Add optional fields </li><li>Delete fields </li> | Latest version | Consumers |
| `BACKWARD_TRANSITIVE` | Consumers using schema V3 can process data written by producers using **all previous schema versions** V2 and V1. | <li>Add optional fields </li><li>Delete fields </li> | All previous versions | Consumers |
| `FORWARD` | Consumers using the **last schema version** V2 can process data written by producers using a new schema V3, even though they may not be able to use the full capabilities of the new schema. | <li>Add fields </li><li>Delete optional fields </li> | Latest version | Producers |
| `FORWARD_TRANSITIVE` | Consumers using **all previous schema versions** V2 or V1 can process data written by producers using a new schema V3. | <li>Add fields </li><li>Delete optional fields </li> | All previous versions | Producers |
| `FULL` | Schemas are both backward and forward compatible. <li>Consumers using the last schema V2 can process data written by producers using the new schema V3. </li><li>Consumers using the new schema V3 can process data written by producers using the last schema V2.</li> | <li>Modify optional fields </li> | Latest version | Any order |
| `FULL_TRANSITIVE` | Backward and forward compatible among schema V3, V2, and V1. <li>Consumers using the schema V3 can process data written by producers using schema V2 and V1. </li><li>Consumers using the schema V2 or V1 can process data written by producers using the schema V3.</li> | <li>Modify optional fields </li> | All previous versions | Any order |

:::tip

* The default schema compatibility check strategy varies depending on schema types.
* For Avro and JSON, the default one is `FULL`.
* For others, the default one is `ALWAYS_INCOMPATIBLE`.
* You can set the schema compatibility check strategy at the topic, namespace, or broker level. For how to set the strategy, see [here](admin-api-schemas.md#set-schema-compatibility-check-strategy).

:::
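For example, here is a minimal sketch of an allowed evolution under `BACKWARD` — deleting a field (a local standalone broker and illustrative class and topic names are assumed):

```python
import pulsar
from pulsar.schema import AvroSchema, Integer, Record, String

class UserV1(Record):  # the schema already registered on the topic
    name = String()
    age = Integer()

class UserV2(Record):  # evolved schema: the `age` field is deleted
    name = String()

client = pulsar.Client('pulsar://localhost:6650')

# The first producer registers V1.
p1 = client.create_producer('user-topic', schema=AvroSchema(UserV1))

# Deleting a field is an allowed change under BACKWARD, so this producer
# passes the broker's compatibility check against the latest schema (V1).
p2 = client.create_producer('user-topic', schema=AvroSchema(UserV2))

client.close()
```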

### ALWAYS_COMPATIBLE example

In some situations, an application needs to store events of several different types in the same Pulsar topic.

In particular, when developing a data model in an `Event Sourcing` style, you might have several kinds of events that affect the state of an entity.

For example, for a user entity, there are `userCreated`, `userAddressChanged` and `userEnquiryReceived` events. The application requires that those events are always read in the same order.

Consequently, those events need to go to the same Pulsar partition to maintain order. Such an application can use `ALWAYS_COMPATIBLE` to allow different kinds of events to co-exist on the same topic, as sketched below.
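A minimal sketch of this pattern with the Python client (the event classes and topic name are illustrative; the namespace is assumed to be set to `ALWAYS_COMPATIBLE`):

```python
import pulsar
from pulsar.schema import AvroSchema, Record, String

class UserCreated(Record):
    user_id = String()

class UserAddressChanged(Record):
    user_id = String()
    address = String()

client = pulsar.Client('pulsar://localhost:6650')

# With ALWAYS_COMPATIBLE, the compatibility check is disabled, so both
# (otherwise incompatible) schemas can be registered on the same topic.
p1 = client.create_producer('user-events', schema=AvroSchema(UserCreated))
p2 = client.create_producer('user-events', schema=AvroSchema(UserAddressChanged))

p1.send(UserCreated(user_id='u-1'))
p2.send(UserAddressChanged(user_id='u-1', address='1 Main St'))

client.close()
```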

### ALWAYS_INCOMPATIBLE example

Sometimes you also need to make incompatible changes, for example, modifying a field type from `string` to `int`.

In this case, you need to:

* Upgrade all producers and consumers to the new schema versions at the same time.

* Optionally, create a new topic and start migrating applications to use the new topic and the new schema, avoiding the need to handle two incompatible versions in the same topic.

### BACKWARD and BACKWARD_TRANSITIVE example

* Example 1

Remove a field.

A consumer built against the new schema (without the field) can still process events written with the old schema that contains the field; the consumer simply ignores that field.

* Example 2

You want to load all Pulsar data into a Hive data warehouse and run SQL queries against the data.

The same SQL queries must continue to work even as the data changes. To support this, you can evolve the schemas using the `BACKWARD` strategy.

### FORWARD and FORWARD_TRANSITIVE example

* Example 1

Add a field.

In most data formats, consumers written to process events without new fields can continue doing so even when they receive new events containing new fields.

* Example 2

If a consumer's application logic is tied to a full version of a schema, the logic may not be updated immediately when the schema evolves.

In this case, you need to project data with a new schema onto an old schema that the application understands.

Consequently, you can evolve the schemas using the `FORWARD` strategy to ensure that the old schema can process data encoded with the new schema.

### FULL and FULL_TRANSITIVE example

In some data formats, for example, Avro, you can define fields with default values. Consequently, adding or removing a field with a default value is a fully compatible change, as sketched below.
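For instance, with the Python client's `Record` fields (a sketch; the `default` and `required` keyword arguments are assumed to behave as in recent client releases):

```python
from pulsar.schema import Integer, Record, String

class Product(Record):
    name = String(required=True)
    # Adding (or later removing) this field is a fully compatible change:
    # old readers ignore it, and new readers fall back to the default
    # value when the field is absent from the data.
    stock = Integer(default=0, required=False)
```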


## Schema validation

When a producer or a consumer tries to connect to a topic, a broker performs some checks to verify a schema.

### Validation on producers

By default, `schemaValidationEnforced` is **disabled** for producers, which means:
* A producer without a schema can produce any kind of message to a topic with schemas, which may result in junk data in the topic.
* Clients written in languages that don't support schemas can still produce messages to a topic with schemas.

However, if you want a stronger guarantee on the topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis.

With `schemaValidationEnforced` enabled, when a producer tries to connect to a topic (assuming schema auto-creation is disabled), the broker checks whether the schema carried by the producer exists in the schema registry.
* If the schema is already registered, the producer connects to the broker and produces messages with that schema.
* If the schema is not registered, Pulsar verifies whether the schema is allowed to be registered based on the configured compatibility check strategy.
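For instance, here is a minimal sketch of the rejection case (assuming `schemaValidationEnforced=true` in `broker.conf` and a topic `user-topic` that already has a schema):

```python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

try:
    # No schema is declared, so with schemaValidationEnforced=true the
    # broker refuses a schema-less producer on a topic that has a schema.
    producer = client.create_producer('user-topic')
except Exception as e:
    print('rejected by broker:', e)
finally:
    client.close()
```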

### Validation on consumers

When a consumer tries to connect to a topic, a broker checks if a carried schema is compatible with a registered schema based on the configured schema compatibility check strategy.

| Compatibility check strategy | Check logic |
|------------------------------|--------------------------|
| `ALWAYS_COMPATIBLE` | All pass |
| `ALWAYS_INCOMPATIBLE` | No pass |
| `BACKWARD` | Can read the last schema |
| `BACKWARD_TRANSITIVE` | Can read all schemas |
| `FORWARD` | Can read the last schema |
| `FORWARD_TRANSITIVE` | Can read the last schema |
| `FULL` | Can read the last schema |
| `FULL_TRANSITIVE` | Can read all schemas |

## Order of upgrading clients

The order of upgrading client applications is determined by the [schema compatibility check strategy](#schema-compatibility-check-strategy).

For example, producers use schemas to write data to Pulsar, and consumers use schemas to read data from Pulsar.

| Compatibility check strategy | Upgrade first | Description |
| --- | --- | --- |
| `ALWAYS_COMPATIBLE` | Any order | The compatibility check is disabled. Consequently, you can upgrade the producers and consumers in **any order**. |
| `ALWAYS_INCOMPATIBLE` | None | The schema evolution is disabled. |
| <li>`BACKWARD` </li><li>`BACKWARD_TRANSITIVE` </li> | Consumers | There is no guarantee that consumers using the old schema can read data produced using the new schema. Consequently, **upgrade all consumers first**, and then start producing new data. |
| <li>`FORWARD` </li><li>`FORWARD_TRANSITIVE` </li> | Producers | There is no guarantee that consumers using the new schema can read data produced using the old schema. Consequently, **upgrade all producers first** to use the new schema, ensure that data already produced using the old schemas is not available to consumers, and then upgrade the consumers. |
| <li>`FULL` </li><li>`FULL_TRANSITIVE` </li> | Any order | It is guaranteed that consumers using the old schema can read data produced using the new schema and consumers using the new schema can read data produced using the old schema. Consequently, you can upgrade the producers and consumers in **any order**. |
<Redirect to={useLocationParentPath() + "schema-understand#schema-evolution"} />
````