import numpy as np
import pandas as pd
from sagemaker.serializers import NumpySerializer, SimpleBaseSerializer


class ParquetSerializer(SimpleBaseSerializer):
    """Serialize data to a buffer using the .parquet format."""

    def __init__(self, content_type="application/x-parquet"):
        """Initialize a ``ParquetSerializer`` instance.

        Args:
            content_type (str): The MIME type to signal to the inference endpoint when sending
                request data (default: "application/x-parquet").
        """
        super(ParquetSerializer, self).__init__(content_type=content_type)

    def serialize(self, data):
        """Serialize data to a buffer using the .parquet format.

        Args:
            data (object): Data to be serialized. Can be a Pandas DataFrame,
                file, or buffer.

        Returns:
            bytes: The data serialized in the .parquet format.
        """
        if isinstance(data, pd.DataFrame):
            return data.to_parquet()

        # Files and buffers are assumed to already hold parquet-formatted data.
        if hasattr(data, "read"):
            return data.read()

        raise ValueError(
            f"{data} format is not supported. Please provide a DataFrame, parquet file, or buffer."
        )
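
# Example usage of ``ParquetSerializer`` (a minimal sketch; assumes a parquet engine
# such as pyarrow is installed, and the DataFrame below is illustrative):
#
#   serializer = ParquetSerializer()
#   df = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})
#   payload = serializer.serialize(df)  # parquet-formatted bytes for the request body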


class MultiModalSerializer(SimpleBaseSerializer):
    """Serializer for multi-modal use cases.

    When passed a DataFrame, the serializer serializes the data to the parquet format.
    When passed a numpy array, the serializer serializes the data to the numpy format.
    """

    def __init__(self, content_type="application/x-parquet"):
        """Initialize a ``MultiModalSerializer`` instance.

        Args:
            content_type (str): The MIME type to signal to the inference endpoint when sending
                request data (default: "application/x-parquet").
                Note that this content_type is not used by ``MultiModalSerializer``,
                as it does not support dynamic updating. Instead, the expected content_type
                is passed to the ``initial_args`` of the ``predict()`` call to the endpoint.
        """
        super(MultiModalSerializer, self).__init__(content_type=content_type)
        self.parquet_serializer = ParquetSerializer()
        self.numpy_serializer = NumpySerializer()

    def serialize(self, data):
        """Serialize data to a buffer using the .parquet format or numpy format.

        Args:
            data (object): Data to be serialized. Can be a Pandas DataFrame
                or a numpy array.

        Returns:
            bytes: The data serialized in the .parquet or .npy format.
        """
        if isinstance(data, pd.DataFrame):
            return self.parquet_serializer.serialize(data)

        if isinstance(data, np.ndarray):
            return self.numpy_serializer.serialize(data)

        raise ValueError(
            f"{data} format is not supported. Please provide a DataFrame or numpy array."
        )
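
# Example usage of ``MultiModalSerializer`` (a minimal sketch; the inputs are illustrative):
#
#   serializer = MultiModalSerializer()
#   tabular = pd.DataFrame({"feature": [0.1, 0.2]})
#   array = np.zeros((2, 3), dtype=np.float32)
#   serializer.serialize(tabular)  # routed to ParquetSerializer -> parquet bytes
#   serializer.serialize(array)    # routed to NumpySerializer -> .npy bytes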


class JsonLineSerializer(SimpleBaseSerializer):
    """Serialize data to a buffer using the .jsonl format."""

    def __init__(self, content_type="application/jsonl"):
        """Initialize a ``JsonLineSerializer`` instance.

        Args:
            content_type (str): The MIME type to signal to the inference endpoint when sending
                request data (default: "application/jsonl").
        """
        super(JsonLineSerializer, self).__init__(content_type=content_type)

    def serialize(self, data):
        """Serialize data to a buffer using the .jsonl format.

        Args:
            data (pd.DataFrame): Data to be serialized.

        Returns:
            str: The data serialized in the .jsonl format.
        """
        if isinstance(data, pd.DataFrame):
            return data.to_json(orient="records", lines=True)

        raise ValueError(f"{data} format is not supported. Please provide a DataFrame.")
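
# Example usage of ``JsonLineSerializer`` (a minimal sketch; the DataFrame is illustrative):
#
#   serializer = JsonLineSerializer()
#   df = pd.DataFrame([{"text": "hello"}, {"text": "world"}])
#   payload = serializer.serialize(df)  # one JSON object per line, returned as a str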