-
Notifications
You must be signed in to change notification settings - Fork 109
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #35 from omerb01/add_functional_test
add functional test script
- Loading branch information
Showing
3 changed files
with
316 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
http://archive.ics.uci.edu/ml/machine-learning-databases/bag-of-words/vocab.enron.txt | ||
http://archive.ics.uci.edu/ml/machine-learning-databases/bag-of-words/vocab.kos.txt | ||
http://archive.ics.uci.edu/ml/machine-learning-databases/bag-of-words/vocab.nips.txt | ||
http://archive.ics.uci.edu/ml/machine-learning-databases/bag-of-words/vocab.nytimes.txt | ||
http://archive.ics.uci.edu/ml/machine-learning-databases/bag-of-words/vocab.pubmed.txt |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,291 @@ | ||
import sys | ||
import yaml | ||
import os | ||
import unittest | ||
import ibm_boto3 | ||
from ibm_botocore.client import Config | ||
from ibm_botocore.client import ClientError | ||
import pywren_ibm_cloud as pywren | ||
import urllib.request | ||
|
||
PREFIX = '__pywren.test' | ||
|
||
try: | ||
with open('data', 'r') as data_file: | ||
TEST_FILES_URLS = [url for url in data_file.read().split()] | ||
except: | ||
print("can't open data file") | ||
sys.exit() | ||
|
||
try: | ||
config_path = os.path.join(os.path.expanduser("~/.pywren_config")) | ||
with open(config_path, 'r') as config_file: | ||
CONFIG = yaml.safe_load(config_file) | ||
except: | ||
print("can't open config file") | ||
sys.exit() | ||
|
||
def initCos(): | ||
return ibm_boto3.resource("s3", | ||
ibm_api_key_id=CONFIG['ibm_cos']['api_key'], | ||
ibm_auth_endpoint='https://iam.ng.bluemix.net/oidc/token', | ||
config=Config(signature_version="oauth"), | ||
endpoint_url=CONFIG['ibm_cos']['endpoint'] | ||
) | ||
|
||
|
||
def putFileToCOS(cos, bucket_name, key, bytes): | ||
try: | ||
cos.Object(bucket_name, key).put(Body=bytes) | ||
print("Upload file: {} - SUCCESS".format(key)) | ||
except ClientError as be: | ||
print("CLIENT ERROR: {0}\n".format(be)) | ||
except Exception as e: | ||
print("Unable to create bucket: {0}".format(e)) | ||
|
||
|
||
def getFilenamesFromCOS(cos, bucket_name, prefix): | ||
print("Retrieving items' names from bucket: {0}, prefix: {1}".format(bucket_name, prefix)) | ||
result = [] | ||
try: | ||
for data in cos.Bucket(bucket_name).objects.filter(Prefix=prefix): | ||
result.append(data.key) | ||
except ClientError as be: | ||
print("CLIENT ERROR: {0}\n".format(be)) | ||
except Exception as e: | ||
print("Unable to delete item: {0}".format(e)) | ||
return result | ||
|
||
|
||
def getFileFromCOS(cos, bucket_name, key): | ||
print("Retrieving item from bucket: {0}, key: {1}".format(bucket_name, key)) | ||
try: | ||
file = cos.Object(bucket_name, key).get() | ||
return file["Body"].read() | ||
except ClientError as be: | ||
print("CLIENT ERROR: {0}\n".format(be)) | ||
except Exception as e: | ||
print("Unable to retrieve file contents: {0}".format(e)) | ||
|
||
|
||
def deleteFileFromCOS(cos, bucket_name, key): | ||
try: | ||
cos.Object(bucket_name, key).delete() | ||
print("File: {0} deleted!".format(key)) | ||
except ClientError as be: | ||
print("CLIENT ERROR: {0}\n".format(be)) | ||
except Exception as e: | ||
print("Unable to delete item: {0}".format(e)) | ||
|
||
|
||
class TestPywren(unittest.TestCase): | ||
|
||
def hello_world(self, param): | ||
return "Hello World!" | ||
|
||
def simple_map_function(self, x, y): | ||
return x + y | ||
|
||
def simple_reduce_function(self, results): | ||
total = 0 | ||
for map_result in results: | ||
total = total + map_result | ||
return total | ||
|
||
def test_call_async(self): | ||
pw = pywren.ibm_cf_executor() | ||
pw.call_async(self.hello_world, "") | ||
result = pw.get_result() | ||
self.assertEqual(result, "Hello World!") | ||
|
||
pw = pywren.ibm_cf_executor() | ||
pw.call_async(self.simple_map_function, [4, 6]) | ||
result = pw.get_result() | ||
self.assertEqual(result, 10) | ||
|
||
pw = pywren.ibm_cf_executor() | ||
pw.call_async(self.simple_map_function, {'x': 2, 'y': 8}) | ||
result = pw.get_result() | ||
self.assertEqual(result, 10) | ||
|
||
def test_map(self): | ||
iterdata = [[1, 1], [2, 2], [3, 3], [4, 4]] | ||
pw = pywren.ibm_cf_executor() | ||
pw.map(self.simple_map_function, iterdata) | ||
result = pw.get_result() | ||
self.assertEqual(result, [2, 4, 6, 8]) | ||
|
||
def test_map_reduce(self): | ||
iterdata = [[1, 1], [2, 2], [3, 3], [4, 4]] | ||
pw = pywren.ibm_cf_executor() | ||
pw.map_reduce(self.simple_map_function, iterdata, self.simple_reduce_function) | ||
result = pw.get_result() | ||
self.assertEqual(result, 20) | ||
|
||
|
||
def initTests(): | ||
print('Uploading test files...') | ||
|
||
cos = initCos() | ||
result_to_compare = 1 # including result's word | ||
i = 0 | ||
for url in TEST_FILES_URLS: | ||
content = urllib.request.urlopen(url).read() | ||
putFileToCOS(cos, CONFIG['pywren']['storage_bucket'], PREFIX + '/test' + str(i), content) | ||
result_to_compare += len(content.split()) | ||
i += 1 | ||
|
||
putFileToCOS(cos, CONFIG['pywren']['storage_bucket'], PREFIX + '/result', str(result_to_compare).encode()) | ||
|
||
print("ALL DONE") | ||
|
||
|
||
def cleanTests(): | ||
print('Deleting test files...') | ||
|
||
cos = initCos() | ||
for key in getFilenamesFromCOS(cos, CONFIG['pywren']['storage_bucket'], PREFIX): | ||
deleteFileFromCOS(cos, CONFIG['pywren']['storage_bucket'], key) | ||
|
||
print("ALL DONE") | ||
|
||
|
||
class TestPywrenCos(unittest.TestCase): | ||
|
||
def my_map_function_bucket(self, bucket, key, data_stream): | ||
print('I am processing the object {}'.format(key)) | ||
counter = {} | ||
|
||
data = data_stream.read() | ||
|
||
for line in data.splitlines(): | ||
for word in line.decode('utf-8').split(): | ||
if word not in counter: | ||
counter[word] = 1 | ||
else: | ||
counter[word] += 1 | ||
|
||
return counter | ||
|
||
def my_map_function_key(self, key, data_stream): | ||
print('I am processing the object {}'.format(key)) | ||
counter = {} | ||
|
||
data = data_stream.read() | ||
|
||
for line in data.splitlines(): | ||
for word in line.decode('utf-8').split(): | ||
if word not in counter: | ||
counter[word] = 1 | ||
else: | ||
counter[word] += 1 | ||
|
||
return counter | ||
|
||
def my_map_function_url(self, url, data_stream): | ||
print('I am processing the object from {}'.format(url)) | ||
counter = {} | ||
|
||
data = data_stream.read() | ||
|
||
for line in data.splitlines(): | ||
for word in line.decode('utf-8').split(): | ||
if word not in counter: | ||
counter[word] = 1 | ||
else: | ||
counter[word] += 1 | ||
|
||
return counter | ||
|
||
def my_reduce_function(self, results): | ||
final_result = 0 | ||
|
||
for count in results: | ||
for word in count: | ||
final_result += count[word] | ||
|
||
return final_result | ||
|
||
def checkResult(self, cos, result): | ||
result_to_compare = getFileFromCOS(cos, CONFIG['pywren']['storage_bucket'], PREFIX + '/result') | ||
|
||
if isinstance(result, list): | ||
total = 0 | ||
for r in result: | ||
total += r | ||
else: | ||
total = result | ||
|
||
self.assertEqual(total, int(result_to_compare)) | ||
|
||
def test_map_reduce_cos_bucket(self): | ||
data_prefix = CONFIG['pywren']['storage_bucket'] + '/' + PREFIX | ||
chunk_size = 4 * 1024 ** 2 # 4MB | ||
pw = pywren.ibm_cf_executor() | ||
pw.map_reduce(self.my_map_function_bucket, data_prefix, self.my_reduce_function, chunk_size) | ||
result = pw.get_result() | ||
self.checkResult(initCos(), result) | ||
|
||
def test_map_reduce_cos_bucket_one_reducer_per_object(self): | ||
data_prefix = CONFIG['pywren']['storage_bucket'] + '/' + PREFIX | ||
chunk_size = 4 * 1024 ** 2 # 4MB | ||
pw = pywren.ibm_cf_executor() | ||
pw.map_reduce(self.my_map_function_bucket, data_prefix, self.my_reduce_function, chunk_size, | ||
reducer_one_per_object=True) | ||
result = pw.get_result() | ||
self.checkResult(initCos(), result) | ||
|
||
def test_map_reduce_cos_key(self): | ||
cos = initCos() | ||
bucket_name = CONFIG['pywren']['storage_bucket'] | ||
iterdata = [bucket_name + '/' + key for key in getFilenamesFromCOS(cos, bucket_name, PREFIX)] | ||
chunk_size = 4 * 1024 ** 2 # 4MB | ||
pw = pywren.ibm_cf_executor() | ||
pw.map_reduce(self.my_map_function_key, iterdata, self.my_reduce_function, chunk_size) | ||
result = pw.get_result() | ||
self.checkResult(cos, result) | ||
|
||
def test_map_reduce_cos_key_one_reducer_per_object(self): | ||
cos = initCos() | ||
bucket_name = CONFIG['pywren']['storage_bucket'] | ||
iterdata = [bucket_name + '/' + key for key in getFilenamesFromCOS(cos, bucket_name, PREFIX)] | ||
chunk_size = 4 * 1024 ** 2 # 4MB | ||
pw = pywren.ibm_cf_executor() | ||
pw.map_reduce(self.my_map_function_key, iterdata, self.my_reduce_function, chunk_size, | ||
reducer_one_per_object=True) | ||
result = pw.get_result() | ||
self.checkResult(cos, result) | ||
|
||
def test_map_reduce_url(self): | ||
chunk_size = 4 * 1024 ** 2 # 4MB | ||
pw = pywren.ibm_cf_executor() | ||
pw.map_reduce(self.my_map_function_url, TEST_FILES_URLS, self.my_reduce_function, chunk_size) | ||
result = pw.get_result() | ||
self.checkResult(initCos(), result + 1) | ||
|
||
|
||
if __name__ == '__main__': | ||
if len(sys.argv) <= 1: | ||
task = 'full' | ||
else: | ||
task = sys.argv[1] | ||
|
||
if task == 'init': | ||
initTests() | ||
elif task == 'clean': | ||
cleanTests() | ||
else: | ||
suite = unittest.TestSuite() | ||
if task == 'pywren': | ||
suite.addTest(unittest.makeSuite(TestPywren)) | ||
elif task == 'pywren_cos': | ||
suite.addTest(unittest.makeSuite(TestPywrenCos)) | ||
elif task == 'full': | ||
suite.addTest(unittest.makeSuite(TestPywren)) | ||
suite.addTest(unittest.makeSuite(TestPywrenCos)) | ||
else: | ||
print('Unknown Command... use: "init", "pywren", "pywren_cos" or "clean".') | ||
sys.exit() | ||
|
||
runner = unittest.TextTestRunner() | ||
runner.run(suite) |