import copy
import os
from multiprocessing import Lock

import utilities


# An Orchestrator is a proxying manager that manages the shared cache data
# between processes. The Orchestrator instance is managed by an internal
# server process, and clients interact with it through proxies.
class Orchestrator(object):
    def __init__(self, number_clusters):
        # One dict per cluster; each maps a keyword to the set of document
        # ids that contain it.
        self.cached_data_clusters = [{} for _ in range(number_clusters)]
        self.lock = Lock()
    def getClusterByIndex(self, cluster_index):
        # Return a deep copy so callers cannot mutate the shared state.
        with self.lock:
            return copy.deepcopy(self.cached_data_clusters[cluster_index])
    def getAllLocalCacheWithoutLock(self):
        # Debug helper: dump a snapshot of the whole cache to tmp/cache and
        # print size statistics. Despite the name, the snapshot itself is
        # taken under the lock; only the accounting below runs unlocked.
        with self.lock:
            img = copy.deepcopy(self.cached_data_clusters)
        utilities.dump_data_to_file(img, "tmp", "cache")
        content = self.getClusterSizeAllWithoutLock()
        print("Cache pairs " + str(content))
        print("Cache (mb) " + str(os.path.getsize(os.path.join("tmp", "cache")) / (1024 * 1024.0)))
    def getClusterSizeByIndex(self, cluster_index):
        # Total number of (keyword, docId) pairs stored in one cluster.
        with self.lock:
            return sum(len(ids) for ids in self.cached_data_clusters[cluster_index].values())
    def getClusterSizeAll(self):
        # Per-cluster (keyword, docId) pair counts, returned as a list.
        with self.lock:
            return [sum(len(ids) for ids in cluster.values())
                    for cluster in self.cached_data_clusters]
    def getClusterSizeAllWithoutLock(self):
        # Total (keyword, docId) pair count across all clusters. Deliberately
        # takes no lock, so results may race with concurrent writers.
        return sum(len(ids)
                   for cluster in self.cached_data_clusters
                   for ids in cluster.values())
    def getClusterKeyNum(self):
        # Number of distinct keywords per cluster, as a list.
        with self.lock:
            return [len(cluster) for cluster in self.cached_data_clusters]
    def clearClusterByIndex(self, cluster_index):
        with self.lock:
            self.cached_data_clusters[cluster_index].clear()
    def readCacheSizeAll(self):
        # Total number of distinct keywords across all clusters.
        with self.lock:
            return sum(len(cluster) for cluster in self.cached_data_clusters)
    def updateEntriesByDocument(self, keyword, docId, cluster_index):
        # Add docId to keyword's posting set in the given cluster.
        with self.lock:
            self.cached_data_clusters[cluster_index].setdefault(keyword, set()).add(docId)
    def updateClusterEntries(self, key_id_pairs):
        # Batch variant: key_id_pairs is an iterable of
        # (keyword, docId, cluster_index) triples, applied under a single
        # lock acquisition.
        with self.lock:
            for keyword, docId, cluster_index in key_id_pairs:
                self.cached_data_clusters[cluster_index].setdefault(keyword, set()).add(docId)
    def updateClusterByIndex(self, cluster_index, falling_back_data):
        # Merge a {keyword: ids} mapping into the given cluster.
        with self.lock:
            for keyword, ids in falling_back_data.items():
                self.cached_data_clusters[cluster_index].setdefault(keyword, set()).update(ids)
    def search_local_cache(self, keyword):
        # Scan every cluster for the keyword. Taken under the lock, and the
        # posting set is copied, so callers never observe concurrent mutation.
        with self.lock:
            for cluster in self.cached_data_clusters:
                if keyword in cluster:
                    return set(cluster[keyword])
        return None
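

# Usage sketch (illustrative, not part of the original module): the comment at
# the top says the Orchestrator is managed by an internal server process. A
# minimal way to get that with the standard library is
# multiprocessing.managers.BaseManager. The CacheManager class, _demo_worker
# helper, and the cluster count of 4 below are assumptions for demonstration
# only.
def _demo_worker(cache, cluster_index):
    # Runs in a child process; "cache" is a proxy whose method calls are
    # forwarded to the Orchestrator living in the manager's server process.
    cache.updateEntriesByDocument("example", 42, cluster_index)


if __name__ == "__main__":
    from multiprocessing import Process
    from multiprocessing.managers import BaseManager

    class CacheManager(BaseManager):
        pass

    # Registering the class lets the manager construct the single shared
    # instance inside its server process and hand proxies to clients.
    CacheManager.register("Orchestrator", Orchestrator)

    manager = CacheManager()
    manager.start()
    orchestrator = manager.Orchestrator(4)  # proxy to the shared instance

    procs = [Process(target=_demo_worker, args=(orchestrator, i)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    print(orchestrator.getClusterKeyNum())  # expected: [1, 1, 1, 1]
    manager.shutdown()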