Skip to content

Commit

Permalink
[SPARK-32714][PYTHON] Initial pyspark-stubs port
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR proposes migration of [`pyspark-stubs`](https://github.com/zero323/pyspark-stubs) into Spark codebase.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes. This PR adds type annotations directly to Spark source.

This can impact interaction with development tools for users, which haven't used `pyspark-stubs`.

### How was this patch tested?

- [x] MyPy tests of the PySpark source
    ```
    mypy --no-incremental --config python/mypy.ini python/pyspark
    ```
- [x] MyPy tests of Spark examples
    ```
   MYPYPATH=python/ mypy --no-incremental --config python/mypy.ini examples/src/main/python/ml examples/src/main/python/sql examples/src/main/python/sql/streaming
    ```
- [x] Existing Flake8 linter

- [x] Existing unit tests

Tested against:

- `mypy==0.790+dev.e959952d9001e9713d329a2f9b196705b028f894`
- `mypy==0.782`

Closes #29591 from zero323/SPARK-32681.

Authored-by: zero323 <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
  • Loading branch information
zero323 authored and HyukjinKwon committed Sep 24, 2020
1 parent 0bc0e91 commit 31a16fb
Show file tree
Hide file tree
Showing 189 changed files with 14,053 additions and 119 deletions.
1 change: 1 addition & 0 deletions dev/.rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,4 @@ GangliaReporter.java
application_1578436911597_0052
config.properties
app-20200706201101-0003
py.typed
2 changes: 1 addition & 1 deletion dev/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ exclude=python/pyspark/cloudpickle/*.py,shared.py,python/docs/source/conf.py,wor

[flake8]
select = E901,E999,F821,F822,F823,F401,F405
exclude = python/pyspark/cloudpickle/*.py,shared.py,python/docs/source/conf.py,work/*/*.py,python/.eggs/*,dist/*,.git/*
exclude = python/pyspark/cloudpickle/*.py,shared.py*,python/docs/source/conf.py,work/*/*.py,python/.eggs/*,dist/*,.git/*,python/out,python/pyspark/sql/pandas/functions.pyi,python/pyspark/sql/column.pyi,python/pyspark/worker.pyi,python/pyspark/java_gateway.pyi
max-line-length = 100
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # Specify multiple Params.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # type: ignore

# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"} # Change output column name
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"} # type: ignore
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)
paramMapCombined.update(paramMap2) # type: ignore

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
Expand Down
6 changes: 3 additions & 3 deletions examples/src/main/python/ml/fm_classifier_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@
print("Test set accuracy = %g" % accuracy)

fmModel = model.stages[2]
print("Factors: " + str(fmModel.factors))
print("Linear: " + str(fmModel.linear))
print("Intercept: " + str(fmModel.intercept))
print("Factors: " + str(fmModel.factors)) # type: ignore
print("Linear: " + str(fmModel.linear)) # type: ignore
print("Intercept: " + str(fmModel.intercept)) # type: ignore
# $example off$

spark.stop()
6 changes: 3 additions & 3 deletions examples/src/main/python/ml/fm_regressor_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

fmModel = model.stages[1]
print("Factors: " + str(fmModel.factors))
print("Linear: " + str(fmModel.linear))
print("Intercept: " + str(fmModel.intercept))
print("Factors: " + str(fmModel.factors)) # type: ignore
print("Linear: " + str(fmModel.linear)) # type: ignore
print("Intercept: " + str(fmModel.intercept)) # type: ignore
# $example off$

spark.stop()
8 changes: 6 additions & 2 deletions examples/src/main/python/ml/pipeline_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
rid, text, prob, prediction = row # type: ignore
print(
"(%d, %s) --> prob=%s, prediction=%f" % (
rid, text, str(prob), prediction # type: ignore
)
)
# $example off$

spark.stop()
4 changes: 2 additions & 2 deletions examples/src/main/python/sql/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@


def dataframe_with_arrow_example(spark):
import numpy as np
import pandas as pd
import numpy as np # type: ignore[import]
import pandas as pd # type: ignore[import]

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
Expand Down
1 change: 1 addition & 0 deletions python/MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ recursive-include deps/data *.data *.txt
recursive-include deps/licenses *.txt
recursive-include deps/examples *.py
recursive-include lib *.zip
recursive-include pyspark *.pyi py.typed
include README.md
36 changes: 36 additions & 0 deletions python/mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
;
; Licensed to the Apache Software Foundation (ASF) under one or more
; contributor license agreements. See the NOTICE file distributed with
; this work for additional information regarding copyright ownership.
; The ASF licenses this file to You under the Apache License, Version 2.0
; (the "License"); you may not use this file except in compliance with
; the License. You may obtain a copy of the License at
;
; http://www.apache.org/licenses/LICENSE-2.0
;
; Unless required by applicable law or agreed to in writing, software
; distributed under the License is distributed on an "AS IS" BASIS,
; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
; See the License for the specific language governing permissions and
; limitations under the License.
;

[mypy]

[mypy-pyspark.cloudpickle.*]
ignore_errors = True

[mypy-py4j.*]
ignore_missing_imports = True

[mypy-numpy]
ignore_missing_imports = True

[mypy-scipy.*]
ignore_missing_imports = True

[mypy-pandas.*]
ignore_missing_imports = True

[mypy-pyarrow]
ignore_missing_imports = True
73 changes: 73 additions & 0 deletions python/pyspark/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Callable, Optional, TypeVar

from pyspark.accumulators import ( # noqa: F401
Accumulator as Accumulator,
AccumulatorParam as AccumulatorParam,
)
from pyspark.broadcast import Broadcast as Broadcast # noqa: F401
from pyspark.conf import SparkConf as SparkConf # noqa: F401
from pyspark.context import SparkContext as SparkContext # noqa: F401
from pyspark.files import SparkFiles as SparkFiles # noqa: F401
from pyspark.status import (
StatusTracker as StatusTracker,
SparkJobInfo as SparkJobInfo,
SparkStageInfo as SparkStageInfo,
) # noqa: F401
from pyspark.profiler import ( # noqa: F401
BasicProfiler as BasicProfiler,
Profiler as Profiler,
)
from pyspark.rdd import RDD as RDD, RDDBarrier as RDDBarrier # noqa: F401
from pyspark.serializers import ( # noqa: F401
MarshalSerializer as MarshalSerializer,
PickleSerializer as PickleSerializer,
)
from pyspark.status import ( # noqa: F401
SparkJobInfo as SparkJobInfo,
SparkStageInfo as SparkStageInfo,
StatusTracker as StatusTracker,
)
from pyspark.storagelevel import StorageLevel as StorageLevel # noqa: F401
from pyspark.taskcontext import ( # noqa: F401
BarrierTaskContext as BarrierTaskContext,
BarrierTaskInfo as BarrierTaskInfo,
TaskContext as TaskContext,
)
from pyspark.util import InheritableThread as InheritableThread # noqa: F401

# Compatiblity imports
from pyspark.sql import ( # noqa: F401
SQLContext as SQLContext,
HiveContext as HiveContext,
Row as Row,
)

T = TypeVar("T")
F = TypeVar("F", bound=Callable)

def since(version: str) -> Callable[[T], T]: ...
def copy_func(
f: F,
name: Optional[str] = ...,
sinceversion: Optional[str] = ...,
doc: Optional[str] = ...,
) -> F: ...
def keyword_only(func: F) -> F: ...
27 changes: 27 additions & 0 deletions python/pyspark/_globals.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# NOTE: This dynamically typed stub was automatically generated by stubgen.

from typing import Any

__ALL__: Any

class _NoValueType:
def __new__(cls): ...
def __reduce__(self): ...
33 changes: 33 additions & 0 deletions python/pyspark/_typing.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Callable, Iterable, Sized, TypeVar, Union
from typing_extensions import Protocol

F = TypeVar("F", bound=Callable)
T = TypeVar("T", covariant=True)

PrimitiveType = Union[bool, float, int, str]

class SupportsIAdd(Protocol):
def __iadd__(self, other: SupportsIAdd) -> SupportsIAdd: ...

class SupportsOrdering(Protocol):
def __le__(self, other: SupportsOrdering) -> bool: ...

class SizedIterable(Protocol, Sized, Iterable[T]): ...
71 changes: 71 additions & 0 deletions python/pyspark/accumulators.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Callable, Generic, Tuple, Type, TypeVar

import socketserver.BaseRequestHandler # type: ignore

from pyspark._typing import SupportsIAdd

T = TypeVar("T")
U = TypeVar("U", bound=SupportsIAdd)

import socketserver as SocketServer

class Accumulator(Generic[T]):
aid: int
accum_param: AccumulatorParam[T]
def __init__(
self, aid: int, value: T, accum_param: AccumulatorParam[T]
) -> None: ...
def __reduce__(
self,
) -> Tuple[
Callable[[int, int, AccumulatorParam[T]], Accumulator[T]],
Tuple[int, int, AccumulatorParam[T]],
]: ...
@property
def value(self) -> T: ...
@value.setter
def value(self, value: T) -> None: ...
def add(self, term: T) -> None: ...
def __iadd__(self, term: T) -> Accumulator[T]: ...

class AccumulatorParam(Generic[T]):
def zero(self, value: T) -> T: ...
def addInPlace(self, value1: T, value2: T) -> T: ...

class AddingAccumulatorParam(AccumulatorParam[U]):
zero_value: U
def __init__(self, zero_value: U) -> None: ...
def zero(self, value: U) -> U: ...
def addInPlace(self, value1: U, value2: U) -> U: ...

class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
def handle(self) -> None: ...

class AccumulatorServer(SocketServer.TCPServer):
auth_token: str
def __init__(
self,
server_address: Tuple[str, int],
RequestHandlerClass: Type[socketserver.BaseRequestHandler],
auth_token: str,
) -> None: ...
server_shutdown: bool
def shutdown(self) -> None: ...
46 changes: 46 additions & 0 deletions python/pyspark/broadcast.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import threading
from typing import Any, Generic, Optional, TypeVar

T = TypeVar("T")

class Broadcast(Generic[T]):
def __init__(
self,
sc: Optional[Any] = ...,
value: Optional[T] = ...,
pickle_registry: Optional[Any] = ...,
path: Optional[Any] = ...,
sock_file: Optional[Any] = ...,
) -> None: ...
def dump(self, value: Any, f: Any) -> None: ...
def load_from_path(self, path: Any): ...
def load(self, file: Any): ...
@property
def value(self) -> T: ...
def unpersist(self, blocking: bool = ...) -> None: ...
def destroy(self, blocking: bool = ...) -> None: ...
def __reduce__(self): ...

class BroadcastPickleRegistry(threading.local):
def __init__(self) -> None: ...
def __iter__(self) -> None: ...
def add(self, bcast: Any) -> None: ...
def clear(self) -> None: ...
Loading

0 comments on commit 31a16fb

Please sign in to comment.