diff --git a/playwright_recaptcha/recaptchav2/async_solver.py b/playwright_recaptcha/recaptchav2/async_solver.py index f5d1be5..594c7f4 100644 --- a/playwright_recaptcha/recaptchav2/async_solver.py +++ b/playwright_recaptcha/recaptchav2/async_solver.py @@ -11,17 +11,14 @@ import httpx import pydub import speech_recognition -from playwright.async_api import Frame, Locator, Page, Response +from playwright.async_api import Page, Response from playwright_recaptcha.errors import ( RecaptchaNotFoundError, RecaptchaRateLimitError, RecaptchaSolveError, ) -from playwright_recaptcha.recaptchav2.utils import ( - get_recaptcha_checkbox, - get_recaptcha_frame, -) +from playwright_recaptcha.recaptchav2.recaptcha_box import AsyncRecaptchaBox class AsyncSolver: @@ -71,74 +68,6 @@ async def __aenter__(self) -> AsyncSolver: async def __aexit__(self, *args: Any) -> None: self.close() - async def _random_delay(self) -> None: - """Delay the execution for a random amount of time between 1 and 4 seconds.""" - await self._page.wait_for_timeout(random.randint(1000, 4000)) - - async def _extract_token(self, response: Response) -> None: - """ - Extract the g-recaptcha-response token from the userverify response. - - Parameters - ---------- - response : Response - The response to extract the g-recaptcha-response token from. - """ - if re.search("/recaptcha/(api2|enterprise)/userverify", response.url) is None: - return - - token_match = re.search('"uvresp","(.*?)"', await response.text()) - - if token_match is not None: - self.token = token_match.group(1) - - async def _get_audio_url(self, recaptcha_frame: Frame) -> str: - """ - Get the reCAPTCHA audio URL. - - Parameters - ---------- - recaptcha_frame : Frame - The reCAPTCHA frame. - - Returns - ------- - str - The reCAPTCHA audio URL. - - Raises - ------ - RecaptchaRateLimitError - If the reCAPTCHA rate limit has been exceeded. - """ - audio_challenge_button = recaptcha_frame.get_by_role( - "button", name="Get an audio challenge" - ) - - if await audio_challenge_button.is_visible(): - await audio_challenge_button.click(force=True) - - audio_challenge_text = recaptcha_frame.get_by_text("Press PLAY to listen") - rate_limit = recaptcha_frame.get_by_text("Try again later") - - while True: - if ( - await audio_challenge_text.is_visible() - and await audio_challenge_text.is_enabled() - ): - break - - if await rate_limit.is_visible(): - raise RecaptchaRateLimitError - - await self._page.wait_for_timeout(100) - - audio_url = recaptcha_frame.get_by_role( - "link", name="Alternatively, download audio as MP3" - ) - - return await audio_url.get_attribute("href") - @staticmethod async def _convert_audio_to_text(audio_url: str) -> Optional[str]: """ @@ -187,18 +116,70 @@ async def _convert_audio_to_text(audio_url: str) -> Optional[str]: return text["alternative"][0]["transcript"] if text else None + async def _random_delay(self) -> None: + """Delay the execution for a random amount of time between 1 and 4 seconds.""" + await self._page.wait_for_timeout(random.randint(1000, 4000)) + + async def _extract_token(self, response: Response) -> None: + """ + Extract the g-recaptcha-response token from the userverify response. + + Parameters + ---------- + response : Response + The response to extract the g-recaptcha-response token from. + """ + if re.search("/recaptcha/(api2|enterprise)/userverify", response.url) is None: + return + + token_match = re.search('"uvresp","(.*?)"', await response.text()) + + if token_match is not None: + self.token = token_match.group(1) + + async def _get_audio_url(self, recaptcha_box: AsyncRecaptchaBox) -> str: + """ + Get the reCAPTCHA audio URL. + + Parameters + ---------- + recaptcha_box : AsyncRecaptchaBox + The reCAPTCHA box. + + Returns + ------- + str + The reCAPTCHA audio URL. + + Raises + ------ + RecaptchaRateLimitError + If the reCAPTCHA rate limit has been exceeded. + """ + if await recaptcha_box.audio_challenge_button.is_visible(): + await recaptcha_box.audio_challenge_button.click(force=True) + + while True: + if await recaptcha_box.audio_challenge_is_visible(): + break + + if await recaptcha_box.rate_limit_is_visible(): + raise RecaptchaRateLimitError + + await self._page.wait_for_timeout(100) + + return await recaptcha_box.audio_download_button.get_attribute("href") + async def _submit_audio_text( - self, recaptcha_frame: Frame, recaptcha_checkbox: Locator, text: str + self, recaptcha_box: AsyncRecaptchaBox, text: str ) -> None: """ Submit the reCAPTCHA audio text. Parameters ---------- - recaptcha_frame : Frame - The reCAPTCHA frame. - recaptcha_checkbox : Locator - The reCAPTCHA checkbox. + recaptcha_box : AsyncRecaptchaBox + The reCAPTCHA box. text : str The reCAPTCHA audio text. @@ -207,26 +188,17 @@ async def _submit_audio_text( RecaptchaRateLimitError If the reCAPTCHA rate limit has been exceeded. """ - textbox = recaptcha_frame.get_by_role("textbox", name="Enter what you hear") - verify_button = recaptcha_frame.get_by_role("button", name="Verify") - - await textbox.fill(text) - await verify_button.click() - - solve_failure = recaptcha_frame.get_by_text( - "Multiple correct solutions required - please solve more." - ) + await recaptcha_box.audio_challenge_textbox.fill(text) + await recaptcha_box.audio_challenge_verify_button.click() - rate_limit = recaptcha_frame.get_by_text("Try again later") - - while not recaptcha_frame.is_detached(): + while recaptcha_box.frames_are_attached(): if ( - await recaptcha_checkbox.is_checked() - or await solve_failure.is_visible() + await recaptcha_box.checkbox.is_checked() + or await recaptcha_box.solve_failure_is_visible() ): break - if await rate_limit.is_visible(): + if await recaptcha_box.rate_limit_is_visible(): raise RecaptchaRateLimitError await self._page.wait_for_timeout(100) @@ -261,32 +233,29 @@ async def solve_recaptcha(self, attempts: Optional[int] = None) -> str: RecaptchaSolveError If the reCAPTCHA could not be solved. """ + self.token = None self._page.on("response", self._extract_token) - attempts = attempts or self._attempts - await self._page.wait_for_load_state("networkidle") - recaptcha_frame = get_recaptcha_frame(self._page.frames) - recaptcha_checkbox = get_recaptcha_checkbox(self._page.frames) + attempts = attempts or self._attempts + recaptcha_box = await AsyncRecaptchaBox.from_frames(self._page.frames) - if await recaptcha_checkbox.is_hidden(): + if await recaptcha_box.checkbox.is_hidden(): raise RecaptchaNotFoundError - await recaptcha_checkbox.click(force=True) - audio_challenge_text = recaptcha_frame.get_by_text("Press PLAY to listen") - - audio_challenge_button = recaptcha_frame.get_by_role( - "button", name="Get an audio challenge" - ) + await recaptcha_box.checkbox.click(force=True) while True: if ( - await audio_challenge_text.is_visible() - or await audio_challenge_button.is_visible() - and await audio_challenge_button.is_enabled() + await recaptcha_box.audio_challenge_is_visible() + or await recaptcha_box.audio_challenge_button.is_visible() + and await recaptcha_box.audio_challenge_button.is_enabled() ): break - if await recaptcha_checkbox.is_checked(): + if ( + not recaptcha_box.frames_are_attached() + or await recaptcha_box.checkbox.is_checked() + ): if self.token is None: raise RecaptchaSolveError @@ -294,30 +263,29 @@ async def solve_recaptcha(self, attempts: Optional[int] = None) -> str: await self._page.wait_for_timeout(100) - new_challenge_button = recaptcha_frame.get_by_role( - "button", name="Get a new challenge" - ) - while attempts > 0: await self._random_delay() - url = await self._get_audio_url(recaptcha_frame) + url = await self._get_audio_url(recaptcha_box) text = await self._convert_audio_to_text(url) if text is None: - await new_challenge_button.click() + await recaptcha_box.new_challenge_button.click() attempts -= 1 continue await self._random_delay() - await self._submit_audio_text(recaptcha_frame, recaptcha_checkbox, text) + await self._submit_audio_text(recaptcha_box, text) - if recaptcha_frame.is_detached() or await recaptcha_checkbox.is_checked(): + if ( + not recaptcha_box.frames_are_attached() + or await recaptcha_box.checkbox.is_checked() + ): if self.token is None: raise RecaptchaSolveError return self.token - await new_challenge_button.click() + await recaptcha_box.new_challenge_button.click() attempts -= 1 raise RecaptchaSolveError diff --git a/playwright_recaptcha/recaptchav2/recaptcha_box.py b/playwright_recaptcha/recaptchav2/recaptcha_box.py new file mode 100644 index 0000000..2d0e2f8 --- /dev/null +++ b/playwright_recaptcha/recaptchav2/recaptcha_box.py @@ -0,0 +1,503 @@ +from __future__ import annotations + +import re +from abc import ABC, abstractmethod +from typing import Iterable, List, Tuple, Union + +from playwright.async_api import Frame as AsyncFrame +from playwright.async_api import Locator as AsyncLocator +from playwright.sync_api import Frame as SyncFrame +from playwright.sync_api import Locator as SyncLocator + +from playwright_recaptcha.errors import RecaptchaNotFoundError, RecaptchaSolveError + +Locator = Union[AsyncLocator, SyncLocator] +Frame = Union[AsyncFrame, SyncFrame] + + +class RecaptchaBox(ABC): + """ + The base class for reCAPTCHA v2 boxes. + + Attributes + ---------- + anchor_frame : Frame + The reCAPTCHA anchor frame. + bframe_frame : Frame + The reCAPTCHA bframe frame. + checkbox : Locator + The reCAPTCHA checkbox locator. + audio_challenge_button : Locator + The reCAPTCHA audio challenge button locator. + new_challenge_button : Locator + The reCAPTCHA new challenge button locator. + audio_download_button : Locator + The reCAPTCHA audio download button locator. + audio_challenge_textbox : Locator + The reCAPTCHA audio challenge textbox locator. + audio_challenge_verify_button : Locator + The reCAPTCHA audio challenge verify button locator. + + Methods + ------- + from_frames(frames: Iterable[Frame]) -> Union[AsyncRecaptchaBox, SyncRecaptchaBox] + Create a reCAPTCHA box using a list of frames. + frames_are_attached() + Check if the reCAPTCHA frames are attached. + rate_limit_is_visible() + Check if the reCAPTCHA rate limit message is visible. + solve_failure_is_visible() + Check if the reCAPTCHA solve failure message is visible. + audio_challenge_is_visible() + Check if the reCAPTCHA audio challenge is visible. + + Raises + ------ + RecaptchaNotFoundError + If the reCAPTCHA was not found. + RecaptchaSolveError + If no unchecked reCAPTCHA boxes were found. + """ + + @staticmethod + def _get_recaptcha_frame_pairs( + frames: Iterable[Frame], + ) -> List[Tuple[Frame, Frame]]: + """ + Get the reCAPTCHA anchor and bframe frame pairs. + + Parameters + ---------- + frames : Iterable[Frame] + A list of frames to search for the reCAPTCHA anchor and bframe frames. + + Returns + ------- + List[Tuple[Frame, Frame]] + A list of reCAPTCHA anchor and bframe frame pairs. + + Raises + ------ + RecaptchaNotFoundError + If no reCAPTCHA anchor and bframe frame pairs were found. + """ + anchor_frames = list( + filter( + lambda frame: re.search( + "/recaptcha/(api2|enterprise)/anchor", frame.url + ) + is not None, + frames, + ) + ) + + bframe_frames = list( + filter( + lambda frame: re.search( + "/recaptcha/(api2|enterprise)/bframe", frame.url + ) + is not None, + frames, + ) + ) + + frame_pairs = [] + + for anchor_frame in anchor_frames: + frame_id = anchor_frame.name[2:] + + for bframe_frame in bframe_frames: + if frame_id in bframe_frame.name: + frame_pairs.append((anchor_frame, bframe_frame)) + + if not frame_pairs: + raise RecaptchaNotFoundError + + return frame_pairs + + @property + def checkbox(self) -> Locator: + """The reCAPTCHA checkbox locator.""" + return self.anchor_frame.get_by_role("checkbox", name="I'm not a robot") + + @property + def audio_challenge_button(self) -> Locator: + """The reCAPTCHA audio challenge button locator.""" + return self.bframe_frame.get_by_role("button", name="Get an audio challenge") + + @property + def new_challenge_button(self) -> Locator: + """The reCAPTCHA new challenge button locator.""" + return self.bframe_frame.get_by_role("button", name="Get a new challenge") + + @property + def audio_download_button(self) -> Locator: + """The reCAPTCHA audio download button locator.""" + return self.bframe_frame.get_by_role( + "link", name="Alternatively, download audio as MP3" + ) + + @property + def audio_challenge_textbox(self) -> Locator: + """The reCAPTCHA audio challenge textbox locator.""" + return self.bframe_frame.get_by_role("textbox", name="Enter what you hear") + + @property + def audio_challenge_verify_button(self) -> Locator: + """The reCAPTCHA audio challenge verify button locator.""" + return self.bframe_frame.get_by_role("button", name="Verify") + + def frames_are_attached(self) -> bool: + """ + Check if the reCAPTCHA frames are attached. + + Returns + ------- + bool + True if the reCAPTCHA frames are attached, False otherwise. + """ + return ( + not self.bframe_frame.is_detached() and not self.anchor_frame.is_detached() + ) + + @property + @abstractmethod + def anchor_frame(self) -> Frame: + """The reCAPTCHA anchor frame.""" + + @property + @abstractmethod + def bframe_frame(self) -> Frame: + """The reCAPTCHA bframe frame.""" + + @classmethod + @abstractmethod + def from_frames( + cls, + frames: Iterable[Frame], + ) -> Union[AsyncRecaptchaBox, SyncRecaptchaBox]: + """ + Create a reCAPTCHA box using a list of frames. + + Parameters + ---------- + frames : Iterable[Frame] + A list of frames to search for the reCAPTCHA frames. + + Returns + ------- + Union[AsyncRecaptchaBox, SyncRecaptchaBox] + The reCAPTCHA box. + + Raises + ------ + RecaptchaNotFoundError + If the reCAPTCHA frames were not found. + RecaptchaSolveError + If no unchecked reCAPTCHA boxes were found. + """ + + @abstractmethod + def rate_limit_is_visible(self) -> bool: + """ + Check if the reCAPTCHA rate limit message is visible. + + Returns + ------- + bool + True if the reCAPTCHA rate limit message is visible, False otherwise. + """ + + @abstractmethod + def solve_failure_is_visible(self) -> bool: + """ + Check if the reCAPTCHA solve failure message is visible. + + Returns + ------- + bool + True if the reCAPTCHA solve failure message is visible, False otherwise. + """ + + @abstractmethod + def audio_challenge_is_visible(self) -> bool: + """ + Check if the reCAPTCHA audio challenge is visible. + + Returns + ------- + bool + True if the reCAPTCHA audio challenge is visible, False otherwise. + """ + + +class SyncRecaptchaBox(RecaptchaBox): + """ + The synchronous class for reCAPTCHA v2 boxes. + + Parameters + ---------- + anchor_frame : SyncFrame + The reCAPTCHA anchor frame. + bframe_frame : SyncFrame + The reCAPTCHA bframe frame. + + Attributes + ---------- + anchor_frame : Frame + The reCAPTCHA anchor frame. + bframe_frame : Frame + The reCAPTCHA bframe frame. + checkbox : Locator + The reCAPTCHA checkbox locator. + audio_challenge_button : Locator + The reCAPTCHA audio challenge button locator. + new_challenge_button : Locator + The reCAPTCHA new challenge button locator. + audio_download_button : Locator + The reCAPTCHA audio download button locator. + audio_challenge_textbox : Locator + The reCAPTCHA audio challenge textbox locator. + audio_challenge_verify_button : Locator + The reCAPTCHA audio challenge verify button locator. + + Methods + ------- + from_frames(frames: Iterable[SyncFrame]) -> SyncRecaptchaBox + Create a reCAPTCHA box using a list of frames. + frames_are_attached() + Check if the reCAPTCHA frames are attached. + rate_limit_is_visible() + Check if the reCAPTCHA rate limit message is visible. + solve_failure_is_visible() + Check if the reCAPTCHA solve failure message is visible. + audio_challenge_is_visible() + Check if the reCAPTCHA audio challenge is visible. + + Raises + ------ + RecaptchaNotFoundError + If the reCAPTCHA was not found. + RecaptchaSolveError + If no unchecked reCAPTCHA boxes were found. + """ + + def __init__(self, anchor_frame: SyncFrame, bframe_frame: SyncFrame) -> None: + self._anchor_frame = anchor_frame + self._bframe_frame = bframe_frame + + def __repr__(self) -> str: + return f"SyncRecaptchaBox(anchor_frame={self._anchor_frame!r}, bframe_frame={self._bframe_frame!r})" + + @classmethod + def from_frames(cls, frames: Iterable[SyncFrame]) -> SyncRecaptchaBox: + """ + Create a reCAPTCHA box using a list of frames. + + Parameters + ---------- + frames : Iterable[SyncFrame] + A list of frames to search for the reCAPTCHA frames. + + Returns + ------- + SyncRecaptchaBox + The reCAPTCHA box. + + Raises + ------ + RecaptchaNotFoundError + If the reCAPTCHA frames were not found. + RecaptchaSolveError + If no unchecked reCAPTCHA boxes were found. + """ + frame_pairs = cls._get_recaptcha_frame_pairs(frames) + + for anchor_frame, bframe_frame in frame_pairs: + if not anchor_frame.get_by_role( + "checkbox", name="I'm not a robot" + ).is_checked(): + return cls(anchor_frame, bframe_frame) + + raise RecaptchaSolveError("No unchecked reCAPTCHA boxes were found.") + + @property + def anchor_frame(self) -> SyncFrame: + """The reCAPTCHA anchor frame.""" + return self._anchor_frame + + @property + def bframe_frame(self) -> SyncFrame: + """The reCAPTCHA bframe frame.""" + return self._bframe_frame + + def rate_limit_is_visible(self) -> bool: + """ + Check if the reCAPTCHA rate limit message is visible. + + Returns + ------- + bool + True if the reCAPTCHA rate limit message is visible, False otherwise. + """ + return self.bframe_frame.get_by_text("Try again later").is_visible() + + def solve_failure_is_visible(self) -> bool: + """ + Check if the reCAPTCHA solve failure message is visible. + + Returns + ------- + bool + True if the reCAPTCHA solve failure message is visible, False otherwise. + """ + return self.bframe_frame.get_by_text( + "Multiple correct solutions required - please solve more." + ).is_visible() + + def audio_challenge_is_visible(self) -> bool: + """ + Check if the reCAPTCHA audio challenge is visible. + + Returns + ------- + bool + True if the reCAPTCHA audio challenge is visible, False otherwise. + """ + return self.bframe_frame.get_by_text("Press PLAY to listen").is_visible() + + +class AsyncRecaptchaBox(RecaptchaBox): + """ + The asynchronous class for reCAPTCHA v2 boxes. + + Parameters + ---------- + anchor_frame : AsyncFrame + The reCAPTCHA anchor frame. + bframe_frame : AsyncFrame + The reCAPTCHA bframe frame. + + Attributes + ---------- + anchor_frame : Frame + The reCAPTCHA anchor frame. + bframe_frame : Frame + The reCAPTCHA bframe frame. + checkbox : Locator + The reCAPTCHA checkbox locator. + audio_challenge_button : Locator + The reCAPTCHA audio challenge button locator. + new_challenge_button : Locator + The reCAPTCHA new challenge button locator. + audio_download_button : Locator + The reCAPTCHA audio download button locator. + audio_challenge_textbox : Locator + The reCAPTCHA audio challenge textbox locator. + audio_challenge_verify_button : Locator + The reCAPTCHA audio challenge verify button locator. + + Methods + ------- + from_frames(frames: Iterable[AsyncFrame]) -> AsyncRecaptchaBox + Create a reCAPTCHA box using a list of frames. + frames_are_attached() + Check if the reCAPTCHA frames are attached. + rate_limit_is_visible() + Check if the reCAPTCHA rate limit message is visible. + solve_failure_is_visible() + Check if the reCAPTCHA solve failure message is visible. + audio_challenge_is_visible() + Check if the reCAPTCHA audio challenge is visible. + + Raises + ------ + RecaptchaNotFoundError + If the reCAPTCHA was not found. + RecaptchaSolveError + If no unchecked reCAPTCHA boxes were found. + """ + + def __init__(self, anchor_frame: AsyncFrame, bframe_frame: AsyncFrame) -> None: + self._anchor_frame = anchor_frame + self._bframe_frame = bframe_frame + + def __repr__(self) -> str: + return f"AsyncRecaptchaBox(anchor_frame={self._anchor_frame!r}, bframe_frame={self._bframe_frame!r})" + + @classmethod + async def from_frames(cls, frames: Iterable[AsyncFrame]) -> AsyncRecaptchaBox: + """ + Create a reCAPTCHA box using a list of frames. + + Parameters + ---------- + frames : Iterable[AsyncFrame] + A list of frames to search for the reCAPTCHA frames. + + Returns + ------- + AsyncRecaptchaBox + The reCAPTCHA box. + + Raises + ------ + RecaptchaNotFoundError + If the reCAPTCHA frames were not found. + RecaptchaSolveError + If no unchecked reCAPTCHA boxes were found. + """ + frame_pairs = cls._get_recaptcha_frame_pairs(frames) + + for anchor_frame, bframe_frame in frame_pairs: + if not await anchor_frame.get_by_role( + "checkbox", name="I'm not a robot" + ).is_checked(): + return cls(anchor_frame, bframe_frame) + + raise RecaptchaSolveError("No unchecked reCAPTCHA boxes were found.") + + @property + def anchor_frame(self) -> AsyncFrame: + """The reCAPTCHA anchor frame.""" + return self._anchor_frame + + @property + def bframe_frame(self) -> AsyncFrame: + """The reCAPTCHA bframe frame.""" + return self._bframe_frame + + async def rate_limit_is_visible(self) -> bool: + """ + Check if the reCAPTCHA rate limit message is visible. + + Returns + ------- + bool + True if the reCAPTCHA rate limit message is visible, False otherwise. + """ + return await self.bframe_frame.get_by_text("Try again later").is_visible() + + async def solve_failure_is_visible(self) -> bool: + """ + Check if the reCAPTCHA solve failure message is visible. + + Returns + ------- + bool + True if the reCAPTCHA solve failure message is visible, False otherwise. + """ + return await self.bframe_frame.get_by_text( + "Multiple correct solutions required - please solve more." + ).is_visible() + + async def audio_challenge_is_visible(self) -> bool: + """ + Check if the reCAPTCHA audio challenge is visible. + + Returns + ------- + bool + True if the reCAPTCHA audio challenge is visible, False otherwise. + """ + return await self.bframe_frame.get_by_text("Press PLAY to listen").is_visible() diff --git a/playwright_recaptcha/recaptchav2/sync_solver.py b/playwright_recaptcha/recaptchav2/sync_solver.py index 18267db..095197f 100644 --- a/playwright_recaptcha/recaptchav2/sync_solver.py +++ b/playwright_recaptcha/recaptchav2/sync_solver.py @@ -8,17 +8,14 @@ import httpx import pydub import speech_recognition -from playwright.sync_api import Frame, Locator, Page, Response +from playwright.sync_api import Page, Response from playwright_recaptcha.errors import ( RecaptchaNotFoundError, RecaptchaRateLimitError, RecaptchaSolveError, ) -from playwright_recaptcha.recaptchav2.utils import ( - get_recaptcha_checkbox, - get_recaptcha_frame, -) +from playwright_recaptcha.recaptchav2.recaptcha_box import SyncRecaptchaBox class SyncSolver: @@ -68,6 +65,36 @@ def __enter__(self) -> SyncSolver: def __exit__(self, *args: Any) -> None: self.close() + @staticmethod + def _convert_audio_to_text(audio_url: str) -> Optional[str]: + """ + Convert the reCAPTCHA audio to text. + + Parameters + ---------- + audio_url : str + The reCAPTCHA audio URL. + + Returns + ------- + Optional[str] + The reCAPTCHA audio text. + """ + response = httpx.get(audio_url) + + wav_audio = io.BytesIO() + mp3_audio = io.BytesIO(response.content) + audio = pydub.AudioSegment.from_mp3(mp3_audio) + audio.export(wav_audio, format="wav") + + recognizer = speech_recognition.Recognizer() + + with speech_recognition.AudioFile(wav_audio) as source: + audio_data = recognizer.record(source) + + text = recognizer.recognize_google(audio_data, show_all=True) + return text["alternative"][0]["transcript"] if text else None + def _random_delay(self) -> None: """Delay the execution for a random amount of time between 1 and 4 seconds.""" self._page.wait_for_timeout(random.randint(1000, 4000)) @@ -89,14 +116,14 @@ def _extract_token(self, response: Response) -> None: if token_match is not None: self.token = token_match.group(1) - def _get_audio_url(self, recaptcha_frame: Frame) -> str: + def _get_audio_url(self, recaptcha_box: SyncRecaptchaBox) -> str: """ Get the reCAPTCHA audio URL. Parameters ---------- - recaptcha_frame : Frame - The reCAPTCHA frame. + recaptcha_box : SyncRecaptchaBox + The reCAPTCHA box. Returns ------- @@ -108,73 +135,28 @@ def _get_audio_url(self, recaptcha_frame: Frame) -> str: RecaptchaRateLimitError If the reCAPTCHA rate limit has been exceeded. """ - audio_challenge_button = recaptcha_frame.get_by_role( - "button", name="Get an audio challenge" - ) - - if audio_challenge_button.is_visible(): - audio_challenge_button.click(force=True) - - audio_challenge_text = recaptcha_frame.get_by_text("Press PLAY to listen") - rate_limit = recaptcha_frame.get_by_text("Try again later") + if recaptcha_box.audio_challenge_button.is_visible(): + recaptcha_box.audio_challenge_button.click(force=True) while True: - if audio_challenge_text.is_visible(): + if recaptcha_box.audio_challenge_is_visible(): break - if rate_limit.is_visible(): + if recaptcha_box.rate_limit_is_visible(): raise RecaptchaRateLimitError self._page.wait_for_timeout(100) - audio_url = recaptcha_frame.get_by_role( - "link", name="Alternatively, download audio as MP3" - ) - - return audio_url.get_attribute("href") - - @staticmethod - def _convert_audio_to_text(audio_url: str) -> Optional[str]: - """ - Convert the reCAPTCHA audio to text. - - Parameters - ---------- - audio_url : str - The reCAPTCHA audio URL. - - Returns - ------- - Optional[str] - The reCAPTCHA audio text. - """ - response = httpx.get(audio_url) - - wav_audio = io.BytesIO() - mp3_audio = io.BytesIO(response.content) - audio = pydub.AudioSegment.from_mp3(mp3_audio) - audio.export(wav_audio, format="wav") - - recognizer = speech_recognition.Recognizer() - - with speech_recognition.AudioFile(wav_audio) as source: - audio_data = recognizer.record(source) - - text = recognizer.recognize_google(audio_data, show_all=True) - return text["alternative"][0]["transcript"] if text else None + return recaptcha_box.audio_download_button.get_attribute("href") - def _submit_audio_text( - self, recaptcha_frame: Frame, recaptcha_checkbox: Locator, text: str - ) -> None: + def _submit_audio_text(self, recaptcha_box: SyncRecaptchaBox, text: str) -> None: """ Submit the reCAPTCHA audio text. Parameters ---------- - recaptcha_frame : Frame - The reCAPTCHA frame. - recaptcha_checkbox : Locator - The reCAPTCHA checkbox. + recaptcha_box : SyncRecaptchaBox + The reCAPTCHA box. text : str The reCAPTCHA audio text. @@ -183,20 +165,17 @@ def _submit_audio_text( RecaptchaRateLimitError If the reCAPTCHA rate limit has been exceeded. """ - recaptcha_frame.get_by_role("textbox", name="Enter what you hear").fill(text) - recaptcha_frame.get_by_role("button", name="Verify").click() - - solve_failure = recaptcha_frame.get_by_text( - "Multiple correct solutions required - please solve more." - ) + recaptcha_box.audio_challenge_textbox.fill(text) + recaptcha_box.audio_challenge_verify_button.click() - rate_limit = recaptcha_frame.get_by_text("Try again later") - - while not recaptcha_frame.is_detached(): - if recaptcha_checkbox.is_checked() or solve_failure.is_visible(): + while recaptcha_box.frames_are_attached(): + if ( + recaptcha_box.checkbox.is_checked() + or recaptcha_box.solve_failure_is_visible() + ): break - if rate_limit.is_visible(): + if recaptcha_box.rate_limit_is_visible(): raise RecaptchaRateLimitError self._page.wait_for_timeout(100) @@ -231,32 +210,29 @@ def solve_recaptcha(self, attempts: Optional[int] = None) -> str: RecaptchaSolveError If the reCAPTCHA could not be solved. """ + self.token = None self._page.on("response", self._extract_token) - attempts = attempts or self._attempts - self._page.wait_for_load_state("networkidle") - recaptcha_frame = get_recaptcha_frame(self._page.frames) - recaptcha_checkbox = get_recaptcha_checkbox(self._page.frames) + attempts = attempts or self._attempts + recaptcha_box = SyncRecaptchaBox.from_frames(self._page.frames) - if recaptcha_checkbox.is_hidden(): + if recaptcha_box.checkbox.is_hidden(): raise RecaptchaNotFoundError - recaptcha_checkbox.click(force=True) - audio_challenge_text = recaptcha_frame.get_by_text("Press PLAY to listen") - - audio_challenge_button = recaptcha_frame.get_by_role( - "button", name="Get an audio challenge" - ) + recaptcha_box.checkbox.click(force=True) while True: if ( - audio_challenge_text.is_visible() - or audio_challenge_button.is_visible() - and audio_challenge_button.is_enabled() + recaptcha_box.audio_challenge_is_visible() + or recaptcha_box.audio_challenge_button.is_visible() + and recaptcha_box.audio_challenge_button.is_enabled() ): break - if recaptcha_checkbox.is_checked(): + if ( + not recaptcha_box.frames_are_attached() + or recaptcha_box.checkbox.is_checked() + ): if self.token is None: raise RecaptchaSolveError @@ -264,30 +240,29 @@ def solve_recaptcha(self, attempts: Optional[int] = None) -> str: self._page.wait_for_timeout(100) - new_challenge_button = recaptcha_frame.get_by_role( - "button", name="Get a new challenge" - ) - while attempts > 0: self._random_delay() - url = self._get_audio_url(recaptcha_frame) + url = self._get_audio_url(recaptcha_box) text = self._convert_audio_to_text(url) if text is None: - new_challenge_button.click() + recaptcha_box.new_challenge_button.click() attempts -= 1 continue self._random_delay() - self._submit_audio_text(recaptcha_frame, recaptcha_checkbox, text) + self._submit_audio_text(recaptcha_box, text) - if recaptcha_frame.is_detached() or recaptcha_checkbox.is_checked(): + if ( + not recaptcha_box.frames_are_attached() + or recaptcha_box.checkbox.is_checked() + ): if self.token is None: raise RecaptchaSolveError return self.token - new_challenge_button.click() + recaptcha_box.new_challenge_button.click() attempts -= 1 raise RecaptchaSolveError diff --git a/playwright_recaptcha/recaptchav2/utils.py b/playwright_recaptcha/recaptchav2/utils.py deleted file mode 100644 index 540c303..0000000 --- a/playwright_recaptcha/recaptchav2/utils.py +++ /dev/null @@ -1,65 +0,0 @@ -import re -from typing import Iterable, Union - -from playwright.async_api import Frame as AsyncFrame -from playwright.async_api import Locator as AsyncLocator -from playwright.sync_api import Frame as SyncFrame -from playwright.sync_api import Locator as SyncLocator - -from playwright_recaptcha.errors import RecaptchaNotFoundError - - -def get_recaptcha_frame( - frames: Iterable[Union[AsyncFrame, SyncFrame]] -) -> Union[AsyncFrame, SyncFrame]: - """ - Find the reCAPTCHA frame in the provided list of frames. - - Parameters - ---------- - frames : Iterable[Union[AsyncFrame, SyncFrame]] - A list of frames to search for the reCAPTCHA frame. - - Returns - ------- - Union[AsyncFrame, SyncFrame] - The reCAPTCHA frame. - - Raises - ------ - RecaptchaNotFoundError - If the reCAPTCHA frame was not found. - """ - for frame in frames: - if re.search("/recaptcha/(api2|enterprise)/bframe", frame.url) is not None: - return frame - - raise RecaptchaNotFoundError - - -def get_recaptcha_checkbox( - frames: Iterable[Union[AsyncFrame, SyncFrame]] -) -> Union[AsyncLocator, SyncLocator]: - """ - Find the reCAPTCHA checkbox in the provided list of frames. - - Parameters - ---------- - frames : Iterable[Union[AsyncFrame, SyncFrame]] - A list of frames to search for the reCAPTCHA checkbox. - - Returns - ------- - Union[AsyncLocator, SyncLocator] - The reCAPTCHA checkbox. - - Raises - ------ - RecaptchaNotFoundError - If the reCAPTCHA checkbox was not found. - """ - for frame in frames: - if re.search("/recaptcha/(api2|enterprise)/anchor", frame.url) is not None: - return frame.get_by_role("checkbox", name="I'm not a robot") - - raise RecaptchaNotFoundError diff --git a/playwright_recaptcha/recaptchav3/async_solver.py b/playwright_recaptcha/recaptchav3/async_solver.py index a5a9478..088e93a 100644 --- a/playwright_recaptcha/recaptchav3/async_solver.py +++ b/playwright_recaptcha/recaptchav3/async_solver.py @@ -18,7 +18,7 @@ class AsyncSolver: page : Page The playwright page to solve the reCAPTCHA on. timeout : int, optional - The timeout in seconds, by default 30. + The solve timeout in seconds, by default 30. Attributes ---------- @@ -35,7 +35,7 @@ class AsyncSolver: Raises ------ RecaptchaTimeoutError - If the timeout has been exceeded. + If the solve timeout has been exceeded. RecaptchaVersionError If the reCAPTCHA is not version 3. """ @@ -85,7 +85,7 @@ async def solve_recaptcha(self, timeout: Optional[int] = None) -> str: Parameters ---------- timeout : Optional[int], optional - The timeout in seconds, by default 30. + The solve timeout in seconds, by default 30. Returns ------- @@ -95,11 +95,13 @@ async def solve_recaptcha(self, timeout: Optional[int] = None) -> str: Raises ------ RecaptchaTimeoutError - If the timeout has been exceeded. + If the solve timeout has been exceeded. RecaptchaVersionError If the reCAPTCHA is not version 3. """ + self.token = None self._page.on("response", self._extract_token) + timeout = timeout or self._timeout start_time = time.time() diff --git a/playwright_recaptcha/recaptchav3/sync_solver.py b/playwright_recaptcha/recaptchav3/sync_solver.py index c09e41d..b1ffa3c 100644 --- a/playwright_recaptcha/recaptchav3/sync_solver.py +++ b/playwright_recaptcha/recaptchav3/sync_solver.py @@ -18,7 +18,7 @@ class SyncSolver: page : Page The playwright page to solve the reCAPTCHA on. timeout : int, optional - The timeout in seconds, by default 30. + The solve timeout in seconds, by default 30. Attributes ---------- @@ -35,7 +35,7 @@ class SyncSolver: Raises ------ RecaptchaTimeoutError - If the timeout has been exceeded. + If the solve timeout has been exceeded. RecaptchaVersionError If the reCAPTCHA is not version 3. """ @@ -85,7 +85,7 @@ def solve_recaptcha(self, timeout: Optional[int] = None) -> str: Parameters ---------- timeout : Optional[int], optional - The timeout in seconds, by default 30. + The solve timeout in seconds, by default 30. Returns ------- @@ -95,11 +95,13 @@ def solve_recaptcha(self, timeout: Optional[int] = None) -> str: Raises ------ RecaptchaTimeoutError - If the timeout has been exceeded. + If the solve timeout has been exceeded. RecaptchaVersionError If the reCAPTCHA is not version 3. """ + self.token = None self._page.on("response", self._extract_token) + timeout = timeout or self._timeout start_time = time.time()