Skip to content

Commit

Permalink
Merge pull request #39 from JosepSampe/pywren-cf
Browse files: browse the repository at this point in the history
Use the CloudPickle module throughout the project for serializing/deserializing
  • Loading branch information
gilv authored Nov 29, 2018
2 parents dbfc50d + 4a05728 commit d341e33
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 110 deletions.
6 changes: 0 additions & 6 deletions pywren/pywren_ibm_cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,5 @@
# limitations under the License.
#

from __future__ import absolute_import

from pywren_ibm_cloud.wren import *
from pywren_ibm_cloud.version import __version__
import os


SOURCE_DIR = os.path.dirname(os.path.abspath(__file__))
16 changes: 16 additions & 0 deletions pywren/pywren_ibm_cloud/action/__main__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
#
# (C) Copyright IBM Corp. 2018
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
from pywren_ibm_cloud import wrenlogging
from pywren_ibm_cloud.action.wrenhandler import ibm_cloud_function_handler
Expand Down
4 changes: 2 additions & 2 deletions pywren/pywren_ibm_cloud/action/jobrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import time
import logging
import inspect
from six.moves import cPickle as pickle
from pywren_ibm_cloud.libs import cloudpickle as pickle
from pywren_ibm_cloud import wrenlogging
from pywren_ibm_cloud.storage import storage
from pywren_ibm_cloud.storage.backends.cos import COSBackend
Expand Down Expand Up @@ -74,7 +74,7 @@ def write_stat(stat, val):
stats_fid.flush()

try:
logger.info("Getting function from COS")
logger.info("Getting function and modules")
func_download_time_t1 = time.time()
func_obj = internal_storage.get_func(func_key)
loaded_func_all = pickle.loads(func_obj)
Expand Down
23 changes: 12 additions & 11 deletions pywren/pywren_ibm_cloud/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@
# limitations under the License.
#

import logging
import time
from multiprocessing.pool import ThreadPool
from six.moves import cPickle as pickle
import os
import time
import logging
import inspect
import requests
import pywren_ibm_cloud as pywren
import pywren_ibm_cloud.version as version
import pywren_ibm_cloud.wrenutil as wrenutil
from pywren_ibm_cloud.wait import wait
from multiprocessing.pool import ThreadPool
from pywren_ibm_cloud.wrenconfig import MAX_AGG_DATA_SIZE
from pywren_ibm_cloud.partitioner import object_partitioner
from pywren_ibm_cloud.future import ResponseFuture, JobState
from pywren_ibm_cloud.runtime import get_runtime_preinstalls
from pywren_ibm_cloud.serialize import serialize, create_mod_data
from pywren_ibm_cloud.storage.storage_utils import create_keys, create_func_key, create_agg_data_key
from pywren_ibm_cloud.storage.backends.cos import COSBackend
from pywren_ibm_cloud.libs import cloudpickle as pickle


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -236,7 +236,7 @@ def multiple_call(self, map_function, iterdata, reduce_function=None,
part_func_args = [{'map_func_args': arg_data,
'chunk_size': obj_chunk_size}]

logger.debug("Calling map on partitions from COS flow")
logger.debug("Calling map on partitions from object storage flow")
return self.map(object_partitioner_function, part_func_args,
extra_env=extra_env,
extra_meta=extra_meta,
Expand All @@ -246,7 +246,7 @@ def multiple_call(self, map_function, iterdata, reduce_function=None,
overwrite_invoke_args=overwrite_invoke_args,
exclude_modules=exclude_modules)
else:
logger.debug("Map on anything else")
logger.debug("No need to process objects from object store")

def remote_invoker(input_data):
pw = pywren.ibm_cf_executor()
Expand Down Expand Up @@ -361,8 +361,7 @@ def map(self, func, iterdata, extra_env=None, extra_meta=None, invoke_pool_threa

module_data = create_mod_data(mod_paths)
# Create func and upload
func_module_str = pickle.dumps({'func': func_str,
'module_data': module_data}, -1)
func_module_str = pickle.dumps({'func': func_str, 'module_data': module_data}, -1)
host_job_meta['func_module_str_len'] = len(func_module_str)

func_upload_time = time.time()
Expand Down Expand Up @@ -455,18 +454,20 @@ def reduce_function_wrapper(fut_list, internal_storage, storage, ibm_cos):
for f in fut_list:
results.append(f.result(throw_except=throw_except, internal_storage=internal_storage))

reduce_func_args = {'results':results}
reduce_func_args = {'results': results}

# Run reduce function
func_sig = inspect.signature(reduce_function)
if 'futures' in func_sig.parameters:
reduce_func_args['futures'] = fut_list
else:
del fut_list
if 'storage' in func_sig.parameters:
reduce_func_args['storage'] = storage
if 'ibm_cos' in func_sig.parameters:
reduce_func_args['ibm_cos'] = ibm_cos

return reduce_function(**reduce_func_args)

return self.single_call(reduce_function_wrapper, [list_of_futures, ],
extra_env=extra_env, extra_meta=extra_meta)
return self.map(reduce_function_wrapper, [[list_of_futures, ]], extra_env=extra_env,
extra_meta=extra_meta, original_func_name=reduce_function.__name__)
8 changes: 2 additions & 6 deletions pywren/pywren_ibm_cloud/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,9 @@
import time
import enum
from six import reraise
from six.moves import cPickle as pickle
from pywren_ibm_cloud.storage import storage, storage_utils

try:
from tblib import pickling_support
except:
from pywren_ibm_cloud.libs.tblib import pickling_support
from pywren_ibm_cloud.libs import cloudpickle as pickle
from pywren_ibm_cloud.libs.tblib import pickling_support

pickling_support.install()

Expand Down
3 changes: 3 additions & 0 deletions pywren/pywren_ibm_cloud/libs/cloudpickle/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from pywren_ibm_cloud.libs.cloudpickle.cloudpickle import *

__version__ = '0.6.1'
37 changes: 0 additions & 37 deletions pywren/pywren_ibm_cloud/serialize/cloudpickle/__init__.py

This file was deleted.

60 changes: 14 additions & 46 deletions pywren/pywren_ibm_cloud/serialize/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,10 @@
# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import sys
import types
import time
import logging

if sys.version < '3':
try:
from cStringIO import StringIO
except ImportError:
from io import BytesIO as StringIO
PY3 = False
else:
types.ClassType = type
from io import BytesIO as StringIO
PY3 = True

from pywren_ibm_cloud.serialize.cloudpickle import CloudPickler
import logging
from io import BytesIO as StringIO
from pywren_ibm_cloud.libs.cloudpickle import CloudPickler
from pywren_ibm_cloud.serialize.module_dependency import ModuleDependencyAnalyzer

logger = logging.getLogger(__name__)
Expand All @@ -67,22 +54,17 @@ def __call__(self, list_of_objs, **kwargs):
preinstalled_modules = [name for name, _ in self.preinstalled_modules]
self._modulemgr.ignore(preinstalled_modules)

# f_kwargs = {}
# for k, v in kwargs.items():
# if not k.startswith('_'):
# f_kwargs[k] = v
# del kwargs[k]

cps = []
strs = []
for obj in list_of_objs:
s = StringIO()
cp = CloudPickler(s, 2)
# start = time.time()
cp.dump(obj)
# logger.debug('Time to pickle (CloudPickle): {} seconds'.format(round(time.time()-start, 3), '.3f'))
cps.append(cp)
strs.append(s)
file = StringIO()
try:
cp = CloudPickler(file)
cp.dump(obj)
cps.append(cp)
strs.append(file.getvalue())
finally:
file.close()

if '_ignore_module_dependencies' in kwargs:
ignore_modulemgr = kwargs['_ignore_module_dependencies']
Expand All @@ -95,22 +77,8 @@ def __call__(self, list_of_objs, **kwargs):
for cp in cps:
for module in cp.modules:
self._modulemgr.add(module.__name__)
# FIXME add logging
#print('inspected modules', self._modulemgr._inspected_modules)
#print('modules to inspect', self._modulemgr._modules_to_inspect)
#print('paths to trans', self._modulemgr._paths_to_transmit)

mod_paths = self._modulemgr.get_and_clear_paths()
#print("mod_paths=", mod_paths)

return ([s.getvalue() for s in strs], mod_paths)

if __name__ == "__main__":
serialize = SerializeIndependent()

def foo(x):
y = x + 10
return y + 1
mod_paths = self._modulemgr.get_and_clear_paths()
logger.debug("Modules to transmit: {}".format(None if not mod_paths else mod_paths))

sb, paths = serialize(foo, 6)
print("paths=", paths)
return (strs, mod_paths)
2 changes: 0 additions & 2 deletions pywren/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,5 @@
tests_requires=[
'pytest', 'numpy',
],
package_data={
'pywren': ['jobrunner/jobrunner.py']},
include_package_data=True
)

0 comments on commit d341e33

Please sign in to comment.