Skip to content

Commit

Permalink
feature/fix-manager
Browse files Browse the repository at this point in the history
  • Loading branch information
cgarciae authored Jul 9, 2020
1 parent 487d166 commit c8b7277
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 26 deletions.
26 changes: 26 additions & 0 deletions benchmarks/100_million_downloads/client-pypeln-idiomatic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# client-pypeln-pl.task.py

from aiohttp import ClientSession, TCPConnector
import asyncio
import sys
import pypeln as pl


limit = 1000
urls = ("http://localhost:8080/{}".format(i) for i in range(int(sys.argv[1])))


async def main():

async with ClientSession(connector=TCPConnector(limit=0)) as session:

async def fetch(url):
async with session.get(url) as response:
return await response.read()

await pl.task.each(
fetch, urls, workers=limit,
)


asyncio.run(main())
8 changes: 6 additions & 2 deletions pypeln/process/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@

from pypeln import utils as pypeln_utils

# multiprocessing = get_context("fork")
MANAGER = multiprocessing.Manager()
MANAGER = None


def Namespace(**kwargs) -> tp.Any:
global MANAGER

if MANAGER is None:
MANAGER = multiprocessing.Manager()

return MANAGER.Namespace(**kwargs)
2 changes: 1 addition & 1 deletion pypeln/sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ def slow_gt3(x):
from .api.to_iterable import to_iterable
from .api.ordered import ordered
from .stage import Stage
from .utils import get_namespace
from .utils import Namespace
8 changes: 3 additions & 5 deletions pypeln/sync/api/map_sync_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_map_square_event_start(nums):
nums_py = map(lambda x: x ** 2, nums)
nums_py = list(nums_py)

namespace = pl.sync.get_namespace()
namespace = pl.sync.Namespace()
namespace.x = 0

def on_start():
Expand Down Expand Up @@ -99,7 +99,7 @@ def test_kwargs():
nums = range(100)
n_workers = 4
letters = "abc"
namespace = pl.sync.get_namespace()
namespace = pl.sync.Namespace()
namespace.on_done = None

def on_start():
Expand All @@ -121,7 +121,7 @@ def on_done(y):
@hp.settings(max_examples=MAX_EXAMPLES)
def test_map_square_event_end(nums):

namespace = pl.sync.get_namespace()
namespace = pl.sync.Namespace()
namespace.x = 0
namespace.done = False
namespace.active_workers = -1
Expand Down Expand Up @@ -156,6 +156,4 @@ def test_map_square_workers(nums):

assert sorted(nums_pl) == sorted(nums_py)



assert nums_pl == nums_py
5 changes: 3 additions & 2 deletions pypeln/sync/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from collections import namedtuple
from queue import Empty, Full, Queue
from threading import Lock
import typing as tp

from pypeln import utils as pypeln_utils


def get_namespace():
return pypeln_utils.Namespace()
def Namespace(**kwargs) -> tp.Any:
return pypeln_utils.Namespace(**kwargs)


class StageStatus(object):
Expand Down
18 changes: 15 additions & 3 deletions pypeln/task/api/each.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def each(
timeout: float = 0,
on_start: tp.Callable = None,
on_done: tp.Callable = None,
run: bool = False,
) -> tp.Optional[Stage[B]]:
) -> Stage[B]:
...


Expand All @@ -52,8 +51,21 @@ def each(
timeout: float = 0,
on_start: tp.Callable = None,
on_done: tp.Callable = None,
) -> pypeln_utils.Partial[Stage[B]]:
...


@tp.overload
def each(
f: EachFn,
stage: tp.Union[Stage[A], tp.Iterable[A], tp.AsyncIterable[A]],
workers: int = 1,
maxsize: int = 0,
timeout: float = 0,
on_start: tp.Callable = None,
on_done: tp.Callable = None,
run: bool = False,
) -> pypeln_utils.Partial[tp.Optional[Stage[B]]]:
) -> tp.Optional[Stage[B]]:
...


Expand Down
9 changes: 1 addition & 8 deletions pypeln/task/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,8 @@
from pypeln import utils as pypeln_utils


class _Namespace:
def __init__(self, **kwargs):

for key, value in kwargs.items():
setattr(self, key, value)


def Namespace(**kwargs) -> tp.Any:
return _Namespace(**kwargs)
return pypeln_utils.Namespace(**kwargs)


def get_running_loop() -> asyncio.AbstractEventLoop:
Expand Down
2 changes: 1 addition & 1 deletion pypeln/thread/api/map_thread_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def f(x):

return x

nums_pl = pl.thread.map(f, nums, timeout=0.5)
nums_pl = pl.thread.map(f, nums, timeout=0.2)
nums_pl = list(nums_pl)

assert len(nums_pl) == 9
Expand Down
1 change: 1 addition & 0 deletions pypeln/thread/queue_thread_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def worker():
for i in nums:
queue.put(i)

time.sleep(0.01)
queue.stop()

processes = pl.thread.start_workers(worker)
Expand Down
5 changes: 1 addition & 4 deletions pypeln/thread/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@

from pypeln import utils as pypeln_utils

# multiprocessing = get_context("fork")
MANAGER = multiprocessing.Manager()


def Namespace(**kwargs) -> tp.Any:
return MANAGER.Namespace(**kwargs)
return pypeln_utils.Namespace(**kwargs)

0 comments on commit c8b7277

Please sign in to comment.