"""
=============================================================
Distribute sklearn model prediction with PySpark using skdist
=============================================================
In this example we train 3 sklearn estimators on 3 different
types of feature space: a standard 2-D numpy array ready for
direct input into a sklearn estimator, a single text field
ready for input into a text vectorizer piped into a sklearn
estimator, and finally a pandas DataFrame ready for input
into a custom feature pipeline or a sklearn ColumnTransformer
piped into a sklearn estimator.

The skdist.predict module handles all 3 of these use cases,
covering most of the ground needed for a wide range of
prediction workloads.

In these examples we don't have a large amount of data at
predict time, so we simply parallelize the sample datasets
into a PySpark DataFrame for illustrative purposes. In
practice, skdist.predict is only useful when there is a
large amount of data from another source ready for
prediction.
Here is a sample output run:
+-----+--------------------+
|preds| scores|
+-----+--------------------+
| 0|[0.99988026795692...|
| 1|[4.75035277837040...|
| 2|[2.94811218592164...|
| 3|[1.63438595023762...|
| 4|[1.11339868338047...|
| 5|[1.47300432716012...|
| 6|[1.08560009259480...|
| 7|[3.02428232165044...|
| 8|[7.65445972596079...|
| 9|[3.97610488897298...|
| 0|[0.99918670844137...|
| 1|[2.65336456879078...|
| 2|[1.85886361541580...|
| 3|[2.89824009324990...|
| 4|[2.84813979824305...|
| 5|[2.70090567992820...|
| 6|[1.10907772018062...|
| 7|[3.06455862370095...|
| 8|[2.38739344440480...|
| 9|[8.23628591704589...|
+-----+--------------------+
only showing top 20 rows
+-----+--------------------+
|preds| scores|
+-----+--------------------+
| 4|[0.03736128393565...|
| 0|[0.09792807410478...|
| 17|[0.05044543817914...|
| 11|[0.03443972986074...|
| 10|[0.04757471929521...|
| 15|[0.04555477151025...|
| 4|[0.04025302976824...|
| 17|[0.04606538206124...|
| 4|[0.05296440750891...|
| 12|[0.04526243345294...|
| 4|[0.03733198188990...|
| 6|[0.04041213769366...|
| 4|[0.04252566904405...|
| 15|[0.04738860601686...|
| 4|[0.03942044494467...|
| 11|[0.04281835124858...|
| 11|[0.03675331309090...|
| 4|[0.03287753061778...|
| 12|[0.04517622045917...|
| 11|[0.04878195327579...|
+-----+--------------------+
only showing top 20 rows
+-----+--------------------+
|preds| scores|
+-----+--------------------+
| 4|[0.03736128393565...|
| 0|[0.09792807410478...|
| 17|[0.05044543817914...|
| 11|[0.03443972986074...|
| 10|[0.04757471929521...|
| 15|[0.04555477151025...|
| 4|[0.04025302976824...|
| 17|[0.04606538206124...|
| 4|[0.05296440750891...|
| 12|[0.04526243345294...|
| 4|[0.03733198188990...|
| 6|[0.04041213769366...|
| 4|[0.04252566904405...|
| 15|[0.04738860601686...|
| 4|[0.03942044494467...|
| 11|[0.04281835124858...|
| 11|[0.03675331309090...|
| 4|[0.03287753061778...|
| 12|[0.04517622045917...|
| 11|[0.04878195327579...|
+-----+--------------------+
only showing top 20 rows
"""
print(__doc__)

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.datasets import fetch_20newsgroups, load_digits
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.pipeline import Pipeline
from skdist.distribute.predict import get_prediction_udf
from pyspark.sql import SparkSession, functions as F

# spark session initialization
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# simple 2-D numpy features
data = load_digits()
X = data["data"]
y = data["target"]
model = LogisticRegression(solver="liblinear", multi_class="auto")
model.fit(X, y)
# get UDFs with default 'numpy' feature types
predict = get_prediction_udf(model, method="predict")
predict_proba = get_prediction_udf(model, method="predict_proba")
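# NOTE: the returned objects are PySpark UDFs; with the default
# 'numpy' feature type they take one column per feature (64 pixel
# columns for the digits data) and reassemble them into a 2-D array
# before calling the fitted model's predict/predict_proba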
# create PySpark DataFrame from features
pdf = pd.DataFrame(X)
sdf = spark.createDataFrame(pdf)
cols = [F.col(str(c)) for c in sdf.columns]
# apply predict UDFs and select prediction output
prediction_df = (
    sdf.withColumn("scores", predict_proba(*cols))
    .withColumn("preds", predict(*cols))
    .select("preds", "scores")
)
prediction_df.show()
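# (optional) a minimal post-processing sketch, assuming `scores` comes
# back as an array<double> column; F.array_max (PySpark >= 2.4) would
# pull out the winning class probability:
# prediction_df.withColumn("top_score", F.array_max("scores")).show()
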
# single text feature
data = fetch_20newsgroups(
    shuffle=True, random_state=1, remove=("headers", "footers", "quotes")
)
X = data["data"][:100]
y = data["target"][:100]
model = Pipeline(
    [
        ("vec", HashingVectorizer()),
        ("clf", LogisticRegression(solver="liblinear", multi_class="auto")),
    ]
)
model.fit(X, y)
# get UDFs with 'text' feature types
predict = get_prediction_udf(model, method="predict", feature_type="text")
predict_proba = get_prediction_udf(model, method="predict_proba", feature_type="text")
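# NOTE: with feature_type="text" the UDF expects a single string column;
# each row's raw document is fed straight into the fitted Pipeline, whose
# HashingVectorizer handles featurization at predict time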
# create PySpark DataFrame from features
pdf = pd.DataFrame(X)
sdf = spark.createDataFrame(pdf)
cols = [F.col(str(c)) for c in sdf.columns]
# apply predict UDFs and select prediction output
prediction_df = (
    sdf.withColumn("scores", predict_proba(*cols))
    .withColumn("preds", predict(*cols))
    .select("preds", "scores")
)
prediction_df.show()
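# (optional) in a real job the predictions would typically be written
# out rather than shown; a sketch with a hypothetical output path:
# prediction_df.write.mode("overwrite").parquet("/tmp/skdist_predictions")
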
# complex feature space as pandas DataFrame
X = pd.DataFrame({"text": data["data"][:100]})
y = data["target"][:100]
model = Pipeline(
    [
        ("vec", ColumnTransformer([("text", HashingVectorizer(), "text")])),
        ("clf", LogisticRegression(solver="liblinear", multi_class="auto")),
    ]
)
model.fit(X, y)
# get UDFs with 'pandas' feature types
# NOTE: This time we must supply an ordered list
# of column names to the `get_prediction_udf` function
predict = get_prediction_udf(
    model, method="predict", feature_type="pandas", names=list(X.columns)
)
predict_proba = get_prediction_udf(
    model, method="predict_proba", feature_type="pandas", names=list(X.columns)
)
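# the incoming columns are reassembled row-wise into a pandas DataFrame
# with these names before prediction, so the ColumnTransformer can select
# the "text" column by name; `names` must therefore match the order in
# which the columns are passed to the UDF below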
# create PySpark DataFrame from features
sdf = spark.createDataFrame(X)
cols = [F.col(str(c)) for c in sdf.columns]
# apply predict UDFs and select prediction output
prediction_df = (
    sdf.withColumn("scores", predict_proba(*cols))
    .withColumn("preds", predict(*cols))
    .select("preds", "scores")
)
prediction_df.show()
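
# tidy-up for a standalone run: release Spark resources once the
# example output has been shown
spark.stop()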