Skip to content

Commit 5f5ad7e

Browse files
authored
Merge pull request #347 from flairNLP/remove-asyncio-from-ccnews
Replace `asyncio` with thread-based solution for WARC-path download
2 parents 678f303 + d8b64ee commit 5f5ad7e

File tree

1 file changed

+17
-21
lines changed

1 file changed

+17
-21
lines changed

src/fundus/scraping/common_crawl/pipeline.py

+17 -21
Original file line number | Diff line number | Diff line change
@@ -1,19 +1,17 @@
11
from __future__ import annotations
22

3-
import asyncio
43
import gzip
54
import os
65
import re
76
from datetime import datetime
87
from functools import lru_cache, partial, wraps
98
from multiprocessing import Manager
109
from multiprocessing.context import TimeoutError
11-
from multiprocessing.pool import MapResult, Pool
10+
from multiprocessing.pool import MapResult, Pool, ThreadPool
1211
from queue import Empty, Queue
1312
from typing import (
1413
Any,
1514
Callable,
16-
Coroutine,
1715
Generic,
1816
Iterator,
1917
List,
@@ -27,11 +25,11 @@
2725
cast,
2826
)
2927

30-
import aiohttp
3128
import dill
3229
import more_itertools
30+
import requests
3331
from dateutil.rrule import MONTHLY, rrule
34-
from tqdm.asyncio import tqdm
32+
from tqdm import tqdm
3533
from typing_extensions import ParamSpec
3634

3735
from fundus.publishers.base_objects import PublisherEnum
@@ -147,24 +145,22 @@ def _get_warc_paths(self, start: datetime, end: datetime) -> List[str]:
147145
f"{self.server_address}crawl-data/CC-NEWS/{date.strftime('%Y/%m')}/warc.paths.gz" for date in date_sequence
148146
]
149147

150-
async def load_warc_paths_from(url: str) -> List[str]:
151-
async with aiohttp.ClientSession(raise_for_status=True) as session:
152-
async with session.get(url) as response:
153-
return gzip.decompress(await response.read()).decode("utf-8").split()
148+
with tqdm(total=len(urls), desc="Loading WARC Paths", leave=False) as bar:
154149

155-
load_warc_paths: Coroutine[Any, Any, List[List[str]]] = tqdm.gather(
156-
*[load_warc_paths_from(url) for url in urls],
157-
total=len(urls),
158-
desc="Loading WARC paths",
159-
leave=False,
160-
)
150+
def load_paths(url: str) -> List[str]:
151+
with requests.Session() as session:
152+
paths = gzip.decompress(session.get(url).content).decode("utf-8").split()
153+
bar.update()
154+
return paths
161155

162-
try:
163-
event_loop = asyncio.get_running_loop()
164-
except RuntimeError:
165-
nested_warc_paths = asyncio.run(load_warc_paths)
166-
else:
167-
nested_warc_paths = event_loop.run_until_complete(load_warc_paths)
156+
if self.processes == 0:
157+
nested_warc_paths = [load_paths(url) for url in urls]
158+
else:
159+
# use two threads per process, default two threads per core
160+
max_number_of_threads = self.processes * 2
161+
162+
with ThreadPool(processes=min(len(urls), max_number_of_threads)) as pool:
163+
nested_warc_paths = pool.map(load_paths, urls)
168164

169165
warc_paths: Iterator[str] = more_itertools.flatten(nested_warc_paths)
170166

0 commit comments

Comments
 (0)