
Support non-json serializers #13

Open
asvetlov opened this issue Oct 9, 2023 · 4 comments

Comments

@asvetlov
Contributor

asvetlov commented Oct 9, 2023

Hey! Thank you for the library.

For my use case, I'd like to use a CBOR serializer instead of JSON.

The system can be configured easily enough by overriding the Pipeline and AbstractStep dumps() and loads() methods. This works pretty well, at least with the aio-pika broker, thanks to the fact that pika headers can be bytes as well as strings.

The solution looks a little cumbersome though; it requires overriding more methods than it should.

As a proper, generic resolution of the problem, I could imagine adding a new abstraction, AsyncBroker.serializer. It should be an instance of the following class:

from abc import ABC, abstractmethod
from typing import Any


class TaskiqSerializer(ABC):
    @abstractmethod
    def dumps(self, value: Any) -> bytes: ...

    @abstractmethod
    def loads(self, value: bytes) -> Any: ...

The difference between TaskiqSerializer and TaskiqFormatter is that TaskiqFormatter works with TaskiqMessage only, while the serializer should operate on any primitive type supported by the underlying serialization system (things like decimals, datetimes, etc.). The serializer supersedes the formatter ABC and could be used instead, with the TaskiqFormatter class and AsyncBroker.formatter attribute deprecated.
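To make the proposal concrete, a JSON-backed implementation of the proposed ABC might look like this. This is only a sketch: JSONSerializer is a hypothetical name, and the class just re-expresses today's default JSON behaviour through the new interface.

```python
import json
from abc import ABC, abstractmethod
from typing import Any


class TaskiqSerializer(ABC):
    """Proposed abstraction: (de)serialize arbitrary primitive values."""

    @abstractmethod
    def dumps(self, value: Any) -> bytes: ...

    @abstractmethod
    def loads(self, value: bytes) -> Any: ...


class JSONSerializer(TaskiqSerializer):
    """Hypothetical default mirroring the current JSON behaviour."""

    def dumps(self, value: Any) -> bytes:
        return json.dumps(value).encode("utf-8")

    def loads(self, value: bytes) -> Any:
        return json.loads(value)
```

A CBOR or msgpack serializer would then be a drop-in replacement assigned to the proposed AsyncBroker.serializer attribute.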

Another subtle problem is the type of TaskiqMessage.headers and BrokerMessage.headers. I don't think it is a showstopper; AsyncTaskiqDecoratedTask.labels is already Dict[str, Any], which I believe is the correct type.

What do you think? If you agree with my thoughts, I can try to create a PR for taskiq itself first and modify taskiq-pipelines later.

@s3rius
Member

s3rius commented Oct 9, 2023

I'm very glad you like the project, and the proposal seems very reasonable to me.

Also, we can benefit from it by making support for custom types easier and removing these checks from the kicker:
taskiq-python/taskiq@34db231/taskiq/kicker.py#L156

@s3rius
Member

s3rius commented Oct 9, 2023

Regarding headers: header copying happens only in the RabbitMQ broker, since not all brokers support message headers. The main reason we made it this way is that we wanted to give people control over how messages are sent.

I assume that the most elegant solution is to separate message headers and task labels. That will also reduce the number of bytes sent to the broker, since as of now we have duplicate data in messages and headers, which isn't good, I presume.

@asvetlov
Contributor Author

Regarding the headers/labels duplication -- I suggest leaving it for a separate improvement.

Speaking about the serializer, I think it should work with primitive terminal types only and know nothing about dataclasses or pydantic models. A datetime (even a timezone-aware one) is still a primitive type; a dataclass is not. The serializer has knowledge of JSON-like structures. It can also handle additional types if they are supported by the underlying binary format: for example, msgpack and CBOR both support tagged types, and CBOR has standard tags for decimals, dates, and even URIs.

But I really don't think that the serializer should do anything with pydantic models or dataclasses.
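As an illustration of the "primitives only" boundary, here is a stdlib-only sketch that emulates CBOR-style semantic tags on top of JSON (all names here are made up for the example; a real cbor2-based serializer would get decimal and datetime tags for free):

```python
import json
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any


# Stand-in for CBOR semantic tags: wrap each rich primitive in a tagged object.
def _encode(obj: Any) -> Any:
    if isinstance(obj, Decimal):
        return {"__tag__": "decimal", "v": str(obj)}
    if isinstance(obj, datetime):
        return {"__tag__": "datetime", "v": obj.isoformat()}
    raise TypeError(f"not a supported primitive: {type(obj)!r}")


def _decode(obj: dict) -> Any:
    if obj.get("__tag__") == "decimal":
        return Decimal(obj["v"])
    if obj.get("__tag__") == "datetime":
        return datetime.fromisoformat(obj["v"])
    return obj


def dumps(value: Any) -> bytes:
    return json.dumps(value, default=_encode).encode("utf-8")


def loads(data: bytes) -> Any:
    return json.loads(data, object_hook=_decode)


payload = {"price": Decimal("19.99"), "when": datetime(2023, 10, 9, tzinfo=timezone.utc)}
assert loads(dumps(payload)) == payload
```

A dataclass or pydantic model would still raise TypeError here, which is exactly the boundary being proposed: rich models stay the formatter's concern, terminal primitives are the serializer's.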

@s3rius
Member

s3rius commented Oct 10, 2023

Okay. I still cannot see how we would use serializers. For example, currently you can create a CBOR formatter to have all your tasks serialized in this format:

import cbor2
from taskiq.abc.formatter import TaskiqFormatter
from taskiq.message import BrokerMessage, TaskiqMessage


class CBORFormatter(TaskiqFormatter):
    def dumps(self, data: TaskiqMessage) -> BrokerMessage:
        return BrokerMessage(
            task_id=data.task_id,
            task_name=data.task_name,
            labels=data.labels,
            message=cbor2.dumps(data.model_dump()),
        )

    def loads(self, data: bytes) -> TaskiqMessage:
        # model_validate expects the decoded mapping itself, not **kwargs.
        return TaskiqMessage.model_validate(cbor2.loads(data))

If you subscribe to the topic and send a message, you will see that the message is properly formatted.


If you want to serialize the pipeline data using cbor, maybe it'd be easier to add a parameter to the PipelineMiddleware that is capable of serializing and deserializing pipeline data.

But I still want to understand when and how the new abstraction would be used. Since it takes priority over the formatter and operates on values rather than a whole message, I don't see where serialize and deserialize would be called.
