CC-News benchmark #600

Draft: MaxDall wants to merge 3 commits into master

Changes from 1 commit:
add first Benchmark iteration
MaxDall committed Jul 17, 2024
commit aed8b8c15546d337eca6e9fd95f8250ab47727ec
158 changes: 148 additions & 10 deletions src/fundus/scraping/crawler.py
@@ -1,17 +1,20 @@
from __future__ import annotations

import contextlib
import datetime as dt
import gzip
import itertools
import json
import logging.config
import math
import multiprocessing
import os
import pickle
import random
import re
import threading
import time
from abc import ABC, abstractmethod
from datetime import datetime
from functools import lru_cache, partial, wraps
from multiprocessing import Manager
from multiprocessing.context import TimeoutError
@@ -22,10 +25,13 @@
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    Iterator,
    List,
    Literal,
    Mapping,
    NamedTuple,
    Optional,
    Pattern,
    Set,
@@ -37,25 +43,28 @@
)
from urllib.parse import urljoin, urlparse

+import dateutil.rrule as rrule
import dill
import fastwarc.stream_io
+import matplotlib.pyplot as plt
import more_itertools
+import numpy as np
import requests
import urllib3.exceptions
-from dateutil.rrule import MONTHLY, rrule
from more_itertools import roundrobin
from tqdm import tqdm
-from typing_extensions import ParamSpec, TypeAlias
+from typing_extensions import ParamSpec, Self, TypeAlias

from fundus.logging import create_logger, get_current_config
from fundus.publishers.base_objects import Publisher, PublisherGroup
-from fundus.scraping.article import Article
+from fundus.scraping.article import Article, Stat
from fundus.scraping.delay import Delay
from fundus.scraping.filter import ExtractionFilter, Requires, RequiresAll, URLFilter
-from fundus.scraping.html import CCNewsSource
+from fundus.scraping.html import HTML, CCNewsSource
from fundus.scraping.scraper import CCNewsScraper, WebScraper
from fundus.scraping.session import session_handler
from fundus.scraping.url import URLSource
from fundus.utils.serialization import JSONVal
from fundus.utils.timeout import Timeout

logger = create_logger(__name__)
@@ -435,8 +444,8 @@ class CCNewsCrawler(CrawlerBase):
    def __init__(
        self,
        *publishers: PublisherType,
-        start: datetime = datetime(2016, 8, 1),
-        end: datetime = datetime.now(),
+        start: dt.datetime = dt.datetime(2016, 8, 1),
+        end: dt.datetime = dt.datetime.now(),
        processes: int = -1,
        retries: int = 3,
        disable_tqdm: bool = False,
@@ -561,13 +570,13 @@ def _get_warc_paths(self) -> List[str]:
        if self.start >= self.end:
            raise ValueError("Start date has to be < end date.")

-        if self.start < datetime(2016, 8, 1):
+        if self.start < dt.datetime(2016, 8, 1):
            raise ValueError("The default, and earliest possible, start date is 2016/08/01.")

-        if self.end > datetime.now():
+        if self.end > dt.datetime.now():
            raise ValueError("The specified end date is in the future. We don't want to give spoilers, do we?")

-        date_sequence: List[datetime] = list(rrule(MONTHLY, dtstart=self.start, until=self.end))
+        date_sequence: List[dt.datetime] = list(rrule.rrule(rrule.MONTHLY, dtstart=self.start, until=self.end))
        urls: List[str] = [
            f"{self.server_address}crawl-data/CC-NEWS/{date.strftime('%Y/%m')}/warc.paths.gz" for date in date_sequence
        ]
@@ -629,3 +638,132 @@ def _build_article_iterator(
            yield from self._single_crawl(warc_paths, article_task)
        else:
            yield from self._parallel_crawl(warc_paths, article_task)

    def benchmark(
        self, sample_rate: int = rrule.MONTHLY, sample_size: Optional[int] = 1000, keep_html: bool = True
    ) -> Benchmark:
        # dateutil frequency constants grow larger as they grow finer
        # (YEARLY == 0, ..., DAILY == 3, ..., SECONDLY == 6)
        if sample_rate > rrule.DAILY:
            raise ValueError("Sample rates finer than rrule.DAILY are not supported")

        benchmark = Benchmark(*self.publishers, keep_html=keep_html)

        dates = list(rrule.rrule(freq=sample_rate, dtstart=self.start, until=self.end))

        # TODO: add date filter
        for date in tqdm(reversed(dates), total=len(dates), desc="samples", position=0, disable=self.disable_tqdm):
            crawler = CCNewsCrawler(*self.publishers, start=date, end=date + dt.timedelta(days=1), disable_tqdm=True)

            for article in tqdm(
                crawler.crawl(max_articles=sample_size, only_complete=False),
                total=sample_size,
                desc="articles",
                position=1,
                leave=False,
                disable=self.disable_tqdm,
            ):
                benchmark.add(article)

        return benchmark


class Entry(NamedTuple):
    stat: Stat
    html: Optional[HTML] = None

    def __repr__(self) -> str:
        return f"{self.stat!r}"


class Series(List[Entry]):
    def __init__(self):
        super().__init__()

    @property
    def avg(self) -> float:
        # guard against an empty series to avoid ZeroDivisionError
        return sum(entry.stat.completeness for entry in self) / len(self) if self else 0.0

    def __repr__(self) -> str:
        return f"{self.avg:.2%}"


class TimeFrame(Dict[str, Series]):
    def __init__(self, *publishers: str, keep_html: bool = True):
        super().__init__({publisher: Series() for publisher in publishers})
        self._keep_html = keep_html

    def add(self, article: Article):
        # series are keyed by publisher name
        self[article.html.source_info.publisher].append(Entry(article.complete, article.html if self._keep_html else None))

    def squeeze(self, threshold: float) -> Self:
        # keep only entries whose completeness is at or below <threshold>
        for publisher, series in self.items():
            tmp = Series()
            for entry in series:
                if entry.stat.completeness <= threshold:
                    tmp.append(entry)
            self[publisher] = tmp
        return self

    def reduce(self, percentage: float) -> Self:
        # shrink the total entry count to <percentage> of its current size by
        # repeatedly popping from the longest per-publisher series
        new_length = math.ceil(len(self) * percentage)
        while len(self) > new_length:
            max_list = max(self.values(), key=len)
            max_list.pop()
        return self

    def trim(self, max_length: int) -> Self:
        # randomly subsample each publisher's series to at most <max_length> entries
        for series in self.values():
            if len(series) <= max_length:
                continue
            random.shuffle(series)
            del series[max_length:]  # truncate in place so the value remains a Series
        return self

    def __len__(self) -> int:
        return sum(len(entries) for entries in self.values())


class Benchmark(Dict[dt.date, TimeFrame]):
    def __init__(self, *publishers: Publisher, keep_html: bool = True):
        self.keep_html = keep_html
        self._publishers = {publisher.name for publisher in publishers}
        super().__init__()

    def add(self, article: Article):
        # bucket by crawl date; __missing__ creates missing frames lazily
        record = self[article.html.crawl_date.date()]
        record.add(article)

    def squeeze(self, threshold: float) -> Self:
        for frame in self.values():
            frame.squeeze(threshold)
        return self

    def reduce(self, percentage: float) -> Self:
        for frame in self.values():
            frame.reduce(percentage)
        return self

    def trim(self, max_length: int) -> Self:
        for frame in self.values():
            frame.trim(max_length)
        return self

    def save(self, path: Union[Path, str]) -> None:
        with gzip.open(path, "wb") as file:
            file.write(pickle.dumps(self))

    @classmethod
    def load(cls, path: Union[Path, str]) -> Benchmark:
        with gzip.open(path, "rb") as file:
            return pickle.loads(file.read())

    def plot(self, path: Union[Path, str]) -> None:
        # plot average completeness per crawl day and save the figure to <path>
        dates = sorted(self)
        fig, ax = plt.subplots()
        ax.plot(dates, [np.mean([series.avg for series in self[date].values() if series]) for date in dates])
        fig.savefig(path)

    def __len__(self) -> int:
        return sum(len(record) for record in self.values())

    def __missing__(self, key: dt.date) -> TimeFrame:
        new = TimeFrame(*self._publishers, keep_html=self.keep_html)
        self[key] = new
        return new
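
Taken together, the commit sketches a complete benchmarking loop: crawl a sample of CC-NEWS per period, bucket the results per crawl day and publisher, then filter, persist, and plot. A minimal usage sketch, assuming the draft lands as-is (`PublisherCollection` is Fundus' standard publisher registry; the file names here are illustrative):

import datetime as dt

import dateutil.rrule as rrule

from fundus import PublisherCollection
from fundus.scraping.crawler import Benchmark, CCNewsCrawler

# sample up to 500 articles per month, starting 2023, without retaining raw HTML
crawler = CCNewsCrawler(PublisherCollection.us, start=dt.datetime(2023, 1, 1))
benchmark = crawler.benchmark(sample_rate=rrule.MONTHLY, sample_size=500, keep_html=False)

# keep only entries at or below 90% completeness, cap each publisher's
# per-day series at 100 entries, and persist the result
benchmark.squeeze(0.9).trim(100).save("cc_news_benchmark.pkl.gz")

# reload later and plot average completeness over time
restored = Benchmark.load("cc_news_benchmark.pkl.gz")
restored.plot("completeness.png")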