Add optional redis caching to Priviblur (#26)
Showing 28 changed files with 717 additions and 81 deletions.
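The new caching layer talks to Redis through an asynchronous client stored on the application context as ctx.CacheDb. As a hedged sketch only (the redis-py asyncio client and the connection URL below are assumptions, not part of this diff), such a client could be built as follows; decode_responses=True matters because the code below compares cached values against the string "0" and reads string hash fields:

# Hypothetical sketch, not part of this commit: building an async Redis client
# suitable for use as ctx.CacheDb. Assumes the redis-py asyncio API.
import redis.asyncio as redis

def create_cache_client(url: str = "redis://localhost:6379/0") -> redis.Redis:
    # decode_responses=True so cached values come back as str rather than bytes
    return redis.from_url(url, decode_responses=True)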
@@ -0,0 +1,5 @@
from .poll_results import get_poll_results
from .search import get_search_results
from .explore import get_explore_results
from .tagged import get_tag_browse_results
from .blogs import get_blog_posts, get_blog_post
@@ -0,0 +1,116 @@
import abc
import typing

import orjson

from .. import priviblur_extractor


class AccessCache(abc.ABC):
    def __init__(self, ctx, prefix, cache_ttl, continuation=None, **kwargs):
        self.ctx = ctx
        self.prefix = prefix
        self.cache_ttl = cache_ttl

        self.continuation = continuation
        self.kwargs = kwargs

    @abc.abstractmethod
    def fetch(self) -> typing.Dict[str, typing.Any]:
        """Fetches results from Tumblr"""
        pass

    @abc.abstractmethod
    def parse(self, initial_results):
        """Parses the initial JSON response from Tumblr"""
        pass

    @abc.abstractmethod
    def build_key(self) -> str:
        """Creates a key to get/store an item within the cache"""
        pass

    def parse_cached_json(self, json):
        return priviblur_extractor.models.timeline.Timeline.from_json(json)

    def get_key(self):
        base_key = self.build_key()

        if self.continuation:
            full_key_with_continuation = f"{base_key}:{self.continuation}"
        else:
            full_key_with_continuation = base_key

        return base_key, full_key_with_continuation

    async def parse_and_cache(self, base_key, full_key_with_continuation, initial_results):
        """Parses the given results and inserts them into the cache under the given key

        Also allocates a placeholder cache entry for the next continuation batch when applicable.
        """
        pipeline = self.ctx.CacheDb.pipeline()

        timeline = self.parse(initial_results)

        pipeline.set(full_key_with_continuation, orjson.dumps(timeline.to_json_serialisable()))
        pipeline.expire(full_key_with_continuation, self.cache_ttl)

        # Allocate a key slot for the next continuation
        #
        # When a given continuation is invalid, Tumblr returns the data for the initial page.
        # Pre-allocating the slot lets get_cached() reject continuations it has never seen,
        # so a malicious user cannot arbitrarily insert data into the cache.
        #
        # "0" is used as a placeholder

        if timeline.next and timeline.next.cursor:
            next_key = f"{base_key}:{timeline.next.cursor}"
            pipeline.setnx(next_key, "0")
            pipeline.expire(next_key, self.cache_ttl)

            self.ctx.LOGGER.debug("Cache: Allocating a slot for continuation batch with key \"%s\"", next_key)

        await pipeline.execute()

        return timeline

    async def get_cached(self):
        """Retrieves an item from the cache

        Fetches new data from Tumblr and inserts it into the cache when no usable cached entry exists.
        """
        base_key, full_key_with_continuation = self.get_key()
        cached_result = await self.ctx.CacheDb.get(full_key_with_continuation)

        # See comment in self.parse_and_cache as to why "0"
        if not cached_result or cached_result == "0":
            initial_results = await self.fetch()

            # When the current request has a continuation token attached, we'll only cache
            # when a slot has already been allocated for it by the previous request.
            if self.continuation and not cached_result:
                return self.parse(initial_results)
            else:
                self.ctx.LOGGER.info("Cache: Adding \"%s\" to the cache", full_key_with_continuation)
                return await self.parse_and_cache(base_key, full_key_with_continuation, initial_results)
        else:
            self.ctx.LOGGER.info("Cache: Cached version of \"%s\" found", full_key_with_continuation)

            initial_results_from_cache = orjson.loads(cached_result)

            if initial_results_from_cache["version"] != priviblur_extractor.models.VERSION:
                self.ctx.LOGGER.debug(
                    "Cache: Version mismatch! Cached object is from a different version of Priviblur (%(cached_version)s != %(priviblur_version)s). Fetching new response...",
                    dict(cached_version=initial_results_from_cache["version"], priviblur_version=priviblur_extractor.models.VERSION)
                )
                new_initial_results = await self.fetch()
                return await self.parse_and_cache(base_key, full_key_with_continuation, new_initial_results)

            return self.parse_cached_json(initial_results_from_cache)

    async def get(self):
        """Retrieves data from either the cache or Tumblr itself"""
        if self.ctx.CacheDb:
            return await self.get_cached()
        else:
            initial_results = await self.fetch()
            return self.parse(initial_results)
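For illustration only (not part of this diff), a minimal concrete subclass just supplies fetch, parse, and build_key. The class name and the TumblrAPI method below are hypothetical, while cache_feed_for mirrors the config attribute used by the real subclasses later in this commit:

# Hypothetical example, not part of this commit: the smallest useful AccessCache subclass.
class TrendingCache(AccessCache):
    def __init__(self, ctx, continuation=None, **kwargs):
        super().__init__(
            ctx=ctx,
            prefix="explore:trending",
            cache_ttl=ctx.PRIVIBLUR_CONFIG.cache.cache_feed_for,  # same config attribute the explore cache uses
            continuation=continuation,
            **kwargs
        )

    async def fetch(self):
        # ctx.TumblrAPI.explore_trending is an assumed method name, used only for illustration
        return await self.ctx.TumblrAPI.explore_trending(continuation=self.continuation, **self.kwargs)

    def parse(self, initial_results):
        return priviblur_extractor.parse_timeline(initial_results)

    def build_key(self):
        # Cached pages end up under "explore:trending", plus ":<cursor>" for continuations (see get_key above)
        return self.prefix

A caller would then simply await TrendingCache(ctx).get(), which falls back to a direct Tumblr request whenever ctx.CacheDb is not configured.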
@@ -0,0 +1,74 @@
import orjson

from .base import AccessCache
from .. import priviblur_extractor


class BlogPostsCache(AccessCache):
    def __init__(self, ctx, blog, continuation, **kwargs):
        super().__init__(
            ctx=ctx,
            prefix=f"blog:{blog}",
            cache_ttl=ctx.PRIVIBLUR_CONFIG.cache.cache_blog_feed_for,
            continuation=continuation,
            **kwargs
        )

        self.blog = blog

    async def fetch(self):
        """Fetches blog posts from Tumblr"""
        return await self.ctx.TumblrAPI.blog_posts(self.blog, continuation=self.continuation, **self.kwargs)

    def parse(self, initial_results):
        return priviblur_extractor.parse_blog_timeline(initial_results)

    def parse_cached_json(self, json):
        return priviblur_extractor.models.blog.Blog.from_json(json)

    def build_key(self):
        # blog:<blog_name>:<kwargs>:<continuation>
        path_to_cached_results = [self.prefix]
        for k, v in self.kwargs.items():
            if v:
                path_to_cached_results.append(f"{k}:{v}")

        return ':'.join(path_to_cached_results)


class BlogPostCache(AccessCache):
    def __init__(self, ctx, blog, post_id, **kwargs):
        super().__init__(
            ctx=ctx,
            prefix=f"blog:{blog}:post:{post_id}",
            cache_ttl=ctx.PRIVIBLUR_CONFIG.cache.cache_blog_post_for,
            **kwargs
        )

        self.blog = blog
        self.post_id = post_id

    async def fetch(self):
        return await self.ctx.TumblrAPI.blog_post(self.blog, self.post_id, **self.kwargs)

    def parse(self, initial_results):
        return priviblur_extractor.parse_timeline(initial_results)

    def build_key(self):
        # blog:<blog_name>:post:<post_id>:<kwargs>
        path_to_cached_results = [self.prefix]
        for k, v in self.kwargs.items():
            if v:
                path_to_cached_results.append(f"{k}:{v}")

        return ':'.join(path_to_cached_results)


async def get_blog_posts(ctx, blog, continuation=None, **kwargs):
    blog_posts_cache = BlogPostsCache(ctx, blog, continuation, **kwargs)
    return await blog_posts_cache.get()


async def get_blog_post(ctx, blog, post_id, **kwargs):
    blog_post_cache = BlogPostCache(ctx, blog, post_id, **kwargs)
    return await blog_post_cache.get()
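As a hedged usage sketch (the import path, handler names, and blog name are hypothetical; ctx is the application context carrying TumblrAPI, CacheDb, and PRIVIBLUR_CONFIG as above), a caller might use these helpers like so:

# Hypothetical usage, not part of this commit. The import path is assumed.
from .cache import get_blog_posts, get_blog_post

async def show_blog(ctx, blog_name, continuation=None):
    # First page is cached under "blog:<blog_name>"; continuation pages land under
    # "blog:<blog_name>:<cursor>" once a slot has been allocated for them.
    return await get_blog_posts(ctx, blog_name, continuation=continuation)

async def show_post(ctx, blog_name, post_id):
    # Single posts live under "blog:<blog_name>:post:<post_id>" with their own TTL
    # (cache_blog_post_for instead of cache_blog_feed_for).
    return await get_blog_post(ctx, blog_name, post_id)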
@@ -0,0 +1,35 @@
import orjson

from .base import AccessCache
from .. import priviblur_extractor


class ExploreCache(AccessCache):
    def __init__(self, ctx, type_, continuation, fetch_function, **kwargs):
        super().__init__(
            ctx=ctx,
            prefix=f"explore:{type_}",
            cache_ttl=ctx.PRIVIBLUR_CONFIG.cache.cache_feed_for,
            continuation=continuation,
            **kwargs
        )

        self.fetch_function = fetch_function

    async def fetch(self):
        """Fetches explore results from Tumblr"""
        return await self.fetch_function(
            continuation=self.continuation,
            **self.kwargs
        )

    def parse(self, initial_results):
        return priviblur_extractor.parse_timeline(initial_results)

    def build_key(self):
        return self.prefix


async def get_explore_results(ctx, fetch_function, type_, continuation, **kwargs):
    explore_cache = ExploreCache(ctx, type_, continuation, fetch_function, **kwargs)
    return await explore_cache.get()
@@ -0,0 +1,50 @@
import orjson


async def get_poll_results(ctx, blog, post_id, poll_id, expired=False):
    """Gets poll results for the given post

    Attempts to retrieve the results from the cache first, and only requests Tumblr when
    no cached data is available.
    """
    if ctx.CacheDb:
        cached_result = await ctx.CacheDb.hgetall(f"polls:{poll_id}")
        if cached_result:
            timestamp = cached_result.pop("timestamp")
            poll_results = {k: int(v) for k, v in cached_result.items()}

            return {"timestamp": timestamp, "results": poll_results}
        else:
            initial_results = await _fetch_poll_results(ctx.TumblrAPI, blog, post_id, poll_id)
            await _cache_poll_results(ctx, initial_results, poll_id, expired)

            return initial_results
    else:
        return await _fetch_poll_results(ctx.TumblrAPI, blog, post_id, poll_id)


async def _fetch_poll_results(tumblr_api, blog, post_id, poll_id):
    """Requests poll results from Tumblr"""
    initial_results = await tumblr_api.poll_results(blog, post_id, poll_id)
    return initial_results["response"]


async def _cache_poll_results(ctx, results, poll_id, expired):
    """Caches the given poll results"""
    if expired:
        ttl = ctx.PRIVIBLUR_CONFIG.cache.cache_expired_poll_results_for
    else:
        ttl = ctx.PRIVIBLUR_CONFIG.cache.cache_active_poll_results_for

    pipeline = ctx.CacheDb.pipeline()

    cache_id = f"polls:{poll_id}"

    pipeline.hset(cache_id, mapping={
        **results["results"],
        "timestamp": results["timestamp"],
    })

    pipeline.expire(cache_id, ttl)

    await pipeline.execute()