Skip to content

Commit 8203165

Browse files
author
Michael Usachenko
committed
pre-pypi commit
1 parent b84f1ff commit 8203165

File tree

5 files changed

+220
-0
lines changed

5 files changed

+220
-0
lines changed

AsyncPQ/ASPQ_Wrapper.py

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import asyncio
2+
3+
class AsyncPQWrapper(object):
4+
5+
def __init__(self):
6+
self.queue = []
7+
8+
class Unit(object):
9+
10+
def __init__(self, functionRef, fArgs, priority):
11+
12+
self.funcRef = functionRef
13+
self.funcArgs = fArgs
14+
self.priority = priority
15+
16+
def addQUnit(self, functionRef, fArgs, priority):
17+
self.queue.append(self.Unit(functionRef, fArgs, priority))
18+
19+
def preprocessUnits(self):
20+
21+
units = [self.queue.pop() for u in range(len(self.queue))]
22+
if (len(units) == 0): return None
23+
taskloop = asyncio.new_event_loop()
24+
asyncio.set_event_loop(taskloop)
25+
return (units, taskloop)
26+
27+
def processUnits(self): pass
28+
29+
class AsyncRWQueue(AsyncPQWrapper):
30+
31+
def __init__(self):
32+
super().__init__()
33+
self.WRITE_PRIORITY = 1
34+
self.READ_PRIORITY = 0
35+
36+
def processUnits(self):
37+
38+
units, taskloop = self.preprocessUnits()
39+
wUnits = list(filter(lambda x: x.priority == self.WRITE_PRIORITY, units))
40+
rUnits = list(filter(lambda x: x.priority == self.READ_PRIORITY, units))
41+
wTasks, rTasks = [], []
42+
43+
for unit in wUnits:
44+
wTasks.append(asyncio.ensure_future(unit.funcRef(*unit.funcArgs)))
45+
for unit in rUnits:
46+
rTasks.append(asyncio.ensure_future(unit.funcRef(*unit.funcArgs)))
47+
48+
wFuture, rFuture = asyncio.gather(*wTasks), asyncio.gather(*rTasks)
49+
taskloop.run_until_complete(wFuture)
50+
rValues = taskloop.run_until_complete(rFuture)
51+
taskloop.stop()
52+
taskloop.close()
53+
54+
return tuple(rValues[::-1])
55+
56+
class AsyncPQueue(AsyncPQWrapper):
57+
58+
def __init__(self): super().__init__()
59+
60+
def processUnits(self):
61+
62+
units, taskloop = self.preprocessUnits()
63+
uPriorList = sorted(list(set([unit.priority for unit in units])))[::-1]
64+
sortedUnits = [list(filter(lambda x: x.priority == u, units)) for u in uPriorList]
65+
66+
sFutures = []
67+
for sUnit in sortedUnits:
68+
sortedTasks = []
69+
for unit in sUnit:
70+
sortedTasks.append(asyncio.ensure_future(unit.funcRef(*unit.funcArgs)))
71+
sFutures.append(asyncio.gather(*sortedTasks))
72+
73+
for sFuture in sFutures:
74+
taskloop.run_until_complete(sFuture)
75+
76+
taskloop.stop()
77+
taskloop.close()
78+

AsyncPQ/AsyncPQ.py

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import ASPQ_Wrapper as apq
2+
3+
class AsyncQueue(object):
4+
5+
def __init__(self, classDict = {}):
6+
7+
self.pQueue = None
8+
self.classDict = classDict
9+
10+
def updateClassDict(self, newDict):
11+
self.classDict.update(newDict)
12+
13+
def processTasks(self):
14+
return self.pQueue.processUnits()
15+
16+
class AsyncPriorityQueue(AsyncQueue):
17+
18+
def __init__(self, classDict = {}):
19+
20+
super().__init__(classDict)
21+
self.pQueue = apq.AsyncPQueue()
22+
23+
def addTask(self, priority, functionRef, *funcArgs):
24+
self.pQueue.addQUnit(functionRef, funcArgs, priority)
25+
26+
def addClassTask(self, priority, className, operation, *opArgs):
27+
28+
classInst = self.classDict[className]
29+
functionRef = classInst.funcDict[operation]
30+
self.pQueue.addQUnit(functionRef, opArgs, priority)
31+
32+
class AsyncReadWriteQueue(AsyncQueue):
33+
34+
def __init__(self, classDict = {}):
35+
36+
super().__init__(classDict)
37+
self.pQueue = apq.AsyncRWQueue()
38+
self.WRITE_PRIORITY = 1
39+
self.READ_PRIORITY = 0
40+
41+
def read(self, operation, *opargs):
42+
self.pQueue.addQUnit(operation, opargs, self.READ_PRIORITY)
43+
44+
def write(self, operation, *opargs):
45+
self.pQueue.addQUnit(operation, opargs, self.WRITE_PRIORITY)
46+
47+
def cdRead(self, className, operation, *opargs):
48+
49+
classInst = self.classDict[className]
50+
functionRef = classInst.funcDict[operation]
51+
self.pQueue.addQUnit(functionRef, opargs, self.READ_PRIORITY)
52+
53+
def cdWrite(self, className, operation, *opargs):
54+
55+
classInst = self.classDict[className]
56+
functionRef = classInst.funcDict[operation]
57+
self.pQueue.addQUnit(functionRef, opargs, self.WRITE_PRIORITY)
58+
59+
60+

MANIFEST.in

Whitespace-only changes.

examples/example.py

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import asyncio
2+
from AsyncPQ import AsyncPriorityQueue, AsyncReadWriteQueue
3+
4+
async def testFunc(arg):
5+
await asyncio.sleep(0)
6+
print(arg)
7+
8+
aspq = AsyncPriorityQueue()
9+
aspq.addTask(15, testFunc, 15)
10+
aspq.addTask(15, testFunc, 15)
11+
aspq.addTask(27, testFunc, 27)
12+
aspq.addTask(27, testFunc, 27)
13+
aspq.addTask(89, testFunc, 89)
14+
aspq.addTask(91, testFunc, 91)
15+
aspq.processTasks()
16+
print()
17+
18+
asrwq = AsyncReadWriteQueue()
19+
asrwq.read(testFunc, 15)
20+
asrwq.write(testFunc, 15)
21+
asrwq.read(testFunc, 27)
22+
asrwq.read(testFunc, 27)
23+
asrwq.write(testFunc, 89)
24+
asrwq.write(testFunc, 91)
25+
asrwq.processTasks()

setup.py

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from setuptools import setup, find_packages # Always prefer setuptools over distutils
2+
from codecs import open # To use a consistent encoding
3+
from os import path
4+
5+
here = path.abspath(path.dirname(__file__))
6+
7+
# Get the long description from the relevant file
8+
with open(path.join(here, 'README'), encoding='utf-8') as f:
9+
long_description = f.read()
10+
11+
setup(
12+
name='AsyncPQ',
13+
14+
# Versions should comply with PEP440. For a discussion on single-sourcing
15+
# the version across setup.py and the project code, see
16+
# http://packaging.python.org/en/latest/tutorial.html#version
17+
version='1.0.0',
18+
19+
description='A little exercise in concurrency. Super intuitive way to schedule asynchronous tasks with little overhead.',
20+
long_description=long_description, #this is the
21+
22+
# The project's main homepage.
23+
url='https://github.com/themichaelusa/AsyncPQ',
24+
25+
# Author details
26+
author='Michael USA',
27+
author_email='[email protected]',
28+
29+
# Choose your license
30+
license='MIT',
31+
32+
# See https://PyPI.python.org/PyPI?%3Aaction=list_classifiers
33+
classifiers=[
34+
# How mature is this project? Common values are
35+
# 3 - Alpha
36+
# 4 - Beta
37+
# 5 - Production/Stable
38+
'Development Status :: 3 - Alpha',
39+
40+
# Indicate who your project is intended for
41+
'Intended Audience :: Developers',
42+
'Topic :: Software Development :: Build Tools',
43+
44+
# Pick your license as you wish (should match "license" above)
45+
'License :: OSI Approved :: MIT License',
46+
47+
# Specify the Python versions you support here. In particular, ensure
48+
# that you indicate whether you support Python 2, Python 3 or both.
49+
'Programming Language :: Python :: 3.6',
50+
],
51+
52+
# What does your project relate to?
53+
keywords='sample setuptools development',
54+
55+
packages=["MY-PACKAGE"],
56+
57+
)

0 commit comments

Comments
 (0)