core - sqlkv cache file (cloud-custodian#7659)
kapilt authored Aug 19, 2022
1 parent ce769a1 commit f62f6b9
Showing 11 changed files with 227 additions and 217 deletions.
137 changes: 89 additions & 48 deletions c7n/cache.py
@@ -5,9 +5,10 @@
 """
 import pickle  # nosec nosemgrep
 
+from datetime import datetime, timedelta
 import os
 import logging
-import time
+import sqlite3
 
 log = logging.getLogger('custodian.cache')

@@ -30,12 +31,11 @@ def factory(config):
         if not CACHE_NOTIFY:
             log.debug("Using in-memory cache")
             CACHE_NOTIFY = True
-        return InMemoryCache()
-
-    return FileCacheManager(config)
+        return InMemoryCache(config)
+    return SqlKvCache(config)
 
 
-class NullCache:
+class Cache:
 
     def __init__(self, config):
         self.config = config
@@ -52,74 +52,115 @@ def save(self, key, data):
     def size(self):
         return 0
 
+    def close(self):
+        pass
+
+    def __enter__(self):
+        self.load()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
 
 
-class InMemoryCache:
+class NullCache(Cache):
+    pass
+
+
+class InMemoryCache(Cache):
     # Running in a temporary environment, so keep as a cache.
 
     __shared_state = {}
 
-    def __init__(self):
+    def __init__(self, config):
+        super().__init__(config)
         self.data = self.__shared_state
 
     def load(self):
         return True
 
     def get(self, key):
-        return self.data.get(pickle.dumps(key))  # nosemgrep
+        return self.data.get(encode(key))
 
     def save(self, key, data):
-        self.data[pickle.dumps(key)] = data  # nosemgrep
+        self.data[encode(key)] = data
 
     def size(self):
         return sum(map(len, self.data.values()))
 
 
-class FileCacheManager:
+def encode(key):
+    return pickle.dumps(key, protocol=pickle.HIGHEST_PROTOCOL)  # nosemgrep
+
+
+def resolve_path(path):
+    return os.path.abspath(
+        os.path.expanduser(
+            os.path.expandvars(path)))
+
+
+class SqlKvCache(Cache):
+
+    create_table = """
+        create table if not exists c7n_cache (
+           key blob primary key,
+           value blob,
+           create_date timestamp
+        )
+    """
 
     def __init__(self, config):
-        self.config = config
+        super().__init__(config)
         self.cache_period = config.cache_period
-        self.cache_path = os.path.abspath(
-            os.path.expanduser(
-                os.path.expandvars(
-                    config.cache)))
-        self.data = {}
-
-    def get(self, key):
-        k = pickle.dumps(key)  # nosemgrep
-        return self.data.get(k)
+        self.cache_path = resolve_path(config.cache)
+        self.conn = None
 
     def load(self):
-        if self.data:
-            return True
-        if os.path.isfile(self.cache_path):
-            if (time.time() - os.stat(self.cache_path).st_mtime >
-                    self.config.cache_period * 60):
-                return False
+        # migration from pickle cache file
+        if os.path.exists(self.cache_path):
             with open(self.cache_path, 'rb') as fh:
-                try:
-                    self.data = pickle.load(fh)  # nosec nosemgrep
-                except EOFError:
-                    return False
-            log.debug("Using cache file %s" % self.cache_path)
-            return True
+                header = fh.read(15)
+            if header != b'SQLite format 3':
+                log.debug('removing old cache file')
+                os.remove(self.cache_path)
+        elif not os.path.exists(os.path.dirname(self.cache_path)):
+            # parent directory creation
+            os.makedirs(os.path.dirname(self.cache_path))
+        self.conn = sqlite3.connect(self.cache_path)
+        self.conn.execute(self.create_table)
+        with self.conn as cursor:
+            log.debug('expiring stale cache entries')
+            cursor.execute(
+                'delete from c7n_cache where create_date < ?',
+                [datetime.utcnow() - timedelta(minutes=self.cache_period)])
+        return True
 
-    def save(self, key, data):
-        try:
-            with open(self.cache_path, 'wb') as fh:  # nosec
-                self.data[pickle.dumps(key)] = data  # nosemgrep
-                pickle.dump(self.data, fh, protocol=2)  # nosemgrep
-        except Exception as e:
-            log.warning("Could not save cache %s err: %s" % (
-                self.cache_path, e))
-            if not os.path.exists(self.cache_path):
-                directory = os.path.dirname(self.cache_path)
-                log.info('Generating Cache directory: %s.' % directory)
-                try:
-                    os.makedirs(directory)
-                except Exception as e:
-                    log.warning("Could not create directory: %s err: %s" % (
-                        directory, e))
+    def get(self, key):
+        with self.conn as cursor:
+            r = cursor.execute(
+                'select value, create_date from c7n_cache where key = ?',
+                [sqlite3.Binary(encode(key))])
+            row = r.fetchone()
+            if row is None:
+                return None
+            value, create_date = row
+            create_date = sqlite3.converters['TIMESTAMP'](create_date.encode('utf8'))
+            if (datetime.utcnow() - create_date).total_seconds() / 60.0 > self.cache_period:
+                return None
+            return pickle.loads(value)  # nosec nosemgrep
+
+    def save(self, key, data, timestamp=None):
+        with self.conn as cursor:
+            timestamp = timestamp or datetime.utcnow()
+            cursor.execute(
+                'replace into c7n_cache (key, value, create_date) values (?, ?, ?)',
+                (sqlite3.Binary(encode(key)), sqlite3.Binary(encode(data)), timestamp))
 
     def size(self):
         return os.path.exists(self.cache_path) and os.path.getsize(self.cache_path) or 0
+
+    def close(self):
+        if self.conn:
+            self.conn.close()
+            self.conn = None
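
A minimal round-trip sketch of the new sqlite-backed cache (not part of the diff; the Namespace below stands in for custodian's config object, assumed here to need only cache and cache_period attributes):

    import os
    import tempfile
    from argparse import Namespace

    from c7n.cache import SqlKvCache

    config = Namespace(
        cache=os.path.join(tempfile.mkdtemp(), 'c7n.cache'),  # hypothetical path
        cache_period=15)  # minutes before an entry is considered stale

    # __enter__ calls load(), which creates the sqlite file and the c7n_cache
    # table and expires rows older than cache_period minutes; __exit__ calls close().
    with SqlKvCache(config) as cache:
        key = {'region': 'us-east-1', 'resource': 'ec2'}
        cache.save(key, [{'InstanceId': 'i-0123456789abcdef0'}])
        assert cache.get(key) == [{'InstanceId': 'i-0123456789abcdef0'}]

Keys and values are both pickled, so any picklable value works as either; stale entries are filtered on read as well as purged on load.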
3 changes: 1 addition & 2 deletions c7n/policy.py
@@ -1285,8 +1285,7 @@ def __call__(self):
             resources = mode.provision()
         else:
             resources = mode.run()
-        # clear out resource manager post run, to clear cache
-        self.resource_manager = self.load_resource_manager()
+
         return resources
 
     run = __call__
3 changes: 3 additions & 0 deletions c7n/query.py
@@ -528,6 +528,8 @@ def resources(self, query=None, augment=True) -> List[dict]:
             # Don't pollute cache with unaugmented resources.
             self._cache.save(cache_key, resources)
 
+        self._cache.close()
+
         resource_count = len(resources)
         with self.ctx.tracer.subsegment('filter'):
             resources = self.filter_resources(resources)
@@ -556,6 +558,7 @@ def _get_cached_resources(self, ids):
             m = self.get_model()
             id_set = set(ids)
             return [r for r in resources if r[m.id] in id_set]
+        self._cache.close()
         return None
 
     def get_resources(self, ids, cache=True, augment=True):
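
The explicit close() calls are new: the sqlkv backend holds an open sqlite connection, unlike the old pickle file that was read once into a dict. A standalone sketch of the load/save/close lifecycle the query layer now follows (path and key are hypothetical):

    from argparse import Namespace

    from c7n.cache import SqlKvCache

    cache = SqlKvCache(Namespace(cache='/tmp/c7n-example.cache', cache_period=15))
    cache.load()    # opens or creates the sqlite file
    cache.save({'resource': 'ec2', 'q': None}, [{'InstanceId': 'i-0abc'}])
    cache.close()   # releases the connection once resources are cached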
33 changes: 19 additions & 14 deletions c7n/resources/iam.py
@@ -1605,20 +1605,25 @@ def get_value_or_schema_default(self, k):
         return self.schema['properties'][k]['default']
 
     def get_credential_report(self):
-        report = self.manager._cache.get('iam-credential-report')
-        if report:
-            return report
-        data = self.fetch_credential_report()
-        report = {}
-        if isinstance(data, bytes):
-            reader = csv.reader(io.StringIO(data.decode('utf-8')))
-        else:
-            reader = csv.reader(io.StringIO(data))
-        headers = next(reader)
-        for line in reader:
-            info = dict(zip(headers, line))
-            report[info['user']] = self.process_user_record(info)
-        self.manager._cache.save('iam-credential-report', report)
+        cache = self.manager._cache
+        with cache:
+            cache_key = {'account': self.manager.config.account_id, 'iam-credential-report': True}
+            report = cache.get(cache_key)
+
+            if report:
+                return report
+            data = self.fetch_credential_report()
+            report = {}
+            if isinstance(data, bytes):
+                reader = csv.reader(io.StringIO(data.decode('utf-8')))
+            else:
+                reader = csv.reader(io.StringIO(data))
+            headers = next(reader)
+            for line in reader:
+                info = dict(zip(headers, line))
+                report[info['user']] = self.process_user_record(info)
+            cache.save(cache_key, report)
 
         return report
 
     @classmethod
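
The credential report is now cached under a structured key rather than a bare string, scoping the entry to the account. Keys are pickled via encode(), and equal dicts with the same insertion order produce byte-identical blobs, so the same key addresses the same row across runs; a sketch (account id hypothetical):

    from c7n.cache import encode

    key = {'account': '112233445566', 'iam-credential-report': True}
    assert encode(key) == encode(dict(key))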
35 changes: 17 additions & 18 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -50,6 +50,7 @@ psutil = "^5.7.0"
 twine = "^3.1.1"
 pytest-sugar = "^0.9.2"
 click = "^8.0"
+freezegun = "^1.2.2"
 
 [tool.black]
 skip-string-normalization = true
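
freezegun joins the dev dependencies, presumably to pin the clock when testing the new time-based cache expiry; a minimal sketch (not from this diff):

    from datetime import datetime

    from freezegun import freeze_time

    with freeze_time('2022-08-19 12:00:00'):
        assert datetime.utcnow() == datetime(2022, 8, 19, 12, 0)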
