Skip to content

Commit

Permalink
Merge pull request #14 from Xewdy444/RecaptchaBox-class-implementation
Browse files Browse the repository at this point in the history
RecaptchaBox class implementation
  • Loading branch information
Xewdy444 authored Feb 19, 2023
2 parents 566fcb2 + cfbc675 commit 66de1cb
Show file tree
Hide file tree
Showing 6 changed files with 671 additions and 286 deletions.
202 changes: 85 additions & 117 deletions playwright_recaptcha/recaptchav2/async_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,14 @@
import httpx
import pydub
import speech_recognition
from playwright.async_api import Frame, Locator, Page, Response
from playwright.async_api import Page, Response

from playwright_recaptcha.errors import (
RecaptchaNotFoundError,
RecaptchaRateLimitError,
RecaptchaSolveError,
)
from playwright_recaptcha.recaptchav2.utils import (
get_recaptcha_checkbox,
get_recaptcha_frame,
)
from playwright_recaptcha.recaptchav2.recaptcha_box import AsyncRecaptchaBox


class AsyncSolver:
Expand Down Expand Up @@ -71,74 +68,6 @@ async def __aenter__(self) -> AsyncSolver:
async def __aexit__(self, *args: Any) -> None:
self.close()

async def _random_delay(self) -> None:
"""Delay the execution for a random amount of time between 1 and 4 seconds."""
await self._page.wait_for_timeout(random.randint(1000, 4000))

async def _extract_token(self, response: Response) -> None:
"""
Extract the g-recaptcha-response token from the userverify response.
Parameters
----------
response : Response
The response to extract the g-recaptcha-response token from.
"""
if re.search("/recaptcha/(api2|enterprise)/userverify", response.url) is None:
return

token_match = re.search('"uvresp","(.*?)"', await response.text())

if token_match is not None:
self.token = token_match.group(1)

async def _get_audio_url(self, recaptcha_frame: Frame) -> str:
"""
Get the reCAPTCHA audio URL.
Parameters
----------
recaptcha_frame : Frame
The reCAPTCHA frame.
Returns
-------
str
The reCAPTCHA audio URL.
Raises
------
RecaptchaRateLimitError
If the reCAPTCHA rate limit has been exceeded.
"""
audio_challenge_button = recaptcha_frame.get_by_role(
"button", name="Get an audio challenge"
)

if await audio_challenge_button.is_visible():
await audio_challenge_button.click(force=True)

audio_challenge_text = recaptcha_frame.get_by_text("Press PLAY to listen")
rate_limit = recaptcha_frame.get_by_text("Try again later")

while True:
if (
await audio_challenge_text.is_visible()
and await audio_challenge_text.is_enabled()
):
break

if await rate_limit.is_visible():
raise RecaptchaRateLimitError

await self._page.wait_for_timeout(100)

audio_url = recaptcha_frame.get_by_role(
"link", name="Alternatively, download audio as MP3"
)

return await audio_url.get_attribute("href")

@staticmethod
async def _convert_audio_to_text(audio_url: str) -> Optional[str]:
"""
Expand Down Expand Up @@ -187,18 +116,70 @@ async def _convert_audio_to_text(audio_url: str) -> Optional[str]:

return text["alternative"][0]["transcript"] if text else None

async def _random_delay(self) -> None:
"""Delay the execution for a random amount of time between 1 and 4 seconds."""
await self._page.wait_for_timeout(random.randint(1000, 4000))

async def _extract_token(self, response: Response) -> None:
"""
Extract the g-recaptcha-response token from the userverify response.
Parameters
----------
response : Response
The response to extract the g-recaptcha-response token from.
"""
if re.search("/recaptcha/(api2|enterprise)/userverify", response.url) is None:
return

token_match = re.search('"uvresp","(.*?)"', await response.text())

if token_match is not None:
self.token = token_match.group(1)

async def _get_audio_url(self, recaptcha_box: AsyncRecaptchaBox) -> str:
"""
Get the reCAPTCHA audio URL.
Parameters
----------
recaptcha_box : AsyncRecaptchaBox
The reCAPTCHA box.
Returns
-------
str
The reCAPTCHA audio URL.
Raises
------
RecaptchaRateLimitError
If the reCAPTCHA rate limit has been exceeded.
"""
if await recaptcha_box.audio_challenge_button.is_visible():
await recaptcha_box.audio_challenge_button.click(force=True)

while True:
if await recaptcha_box.audio_challenge_is_visible():
break

if await recaptcha_box.rate_limit_is_visible():
raise RecaptchaRateLimitError

await self._page.wait_for_timeout(100)

return await recaptcha_box.audio_download_button.get_attribute("href")

async def _submit_audio_text(
self, recaptcha_frame: Frame, recaptcha_checkbox: Locator, text: str
self, recaptcha_box: AsyncRecaptchaBox, text: str
) -> None:
"""
Submit the reCAPTCHA audio text.
Parameters
----------
recaptcha_frame : Frame
The reCAPTCHA frame.
recaptcha_checkbox : Locator
The reCAPTCHA checkbox.
recaptcha_box : AsyncRecaptchaBox
The reCAPTCHA box.
text : str
The reCAPTCHA audio text.
Expand All @@ -207,26 +188,17 @@ async def _submit_audio_text(
RecaptchaRateLimitError
If the reCAPTCHA rate limit has been exceeded.
"""
textbox = recaptcha_frame.get_by_role("textbox", name="Enter what you hear")
verify_button = recaptcha_frame.get_by_role("button", name="Verify")

await textbox.fill(text)
await verify_button.click()

solve_failure = recaptcha_frame.get_by_text(
"Multiple correct solutions required - please solve more."
)
await recaptcha_box.audio_challenge_textbox.fill(text)
await recaptcha_box.audio_challenge_verify_button.click()

rate_limit = recaptcha_frame.get_by_text("Try again later")

while not recaptcha_frame.is_detached():
while recaptcha_box.frames_are_attached():
if (
await recaptcha_checkbox.is_checked()
or await solve_failure.is_visible()
await recaptcha_box.checkbox.is_checked()
or await recaptcha_box.solve_failure_is_visible()
):
break

if await rate_limit.is_visible():
if await recaptcha_box.rate_limit_is_visible():
raise RecaptchaRateLimitError

await self._page.wait_for_timeout(100)
Expand Down Expand Up @@ -261,63 +233,59 @@ async def solve_recaptcha(self, attempts: Optional[int] = None) -> str:
RecaptchaSolveError
If the reCAPTCHA could not be solved.
"""
self.token = None
self._page.on("response", self._extract_token)
attempts = attempts or self._attempts

await self._page.wait_for_load_state("networkidle")
recaptcha_frame = get_recaptcha_frame(self._page.frames)
recaptcha_checkbox = get_recaptcha_checkbox(self._page.frames)
attempts = attempts or self._attempts
recaptcha_box = await AsyncRecaptchaBox.from_frames(self._page.frames)

if await recaptcha_checkbox.is_hidden():
if await recaptcha_box.checkbox.is_hidden():
raise RecaptchaNotFoundError

await recaptcha_checkbox.click(force=True)
audio_challenge_text = recaptcha_frame.get_by_text("Press PLAY to listen")

audio_challenge_button = recaptcha_frame.get_by_role(
"button", name="Get an audio challenge"
)
await recaptcha_box.checkbox.click(force=True)

while True:
if (
await audio_challenge_text.is_visible()
or await audio_challenge_button.is_visible()
and await audio_challenge_button.is_enabled()
await recaptcha_box.audio_challenge_is_visible()
or await recaptcha_box.audio_challenge_button.is_visible()
and await recaptcha_box.audio_challenge_button.is_enabled()
):
break

if await recaptcha_checkbox.is_checked():
if (
not recaptcha_box.frames_are_attached()
or await recaptcha_box.checkbox.is_checked()
):
if self.token is None:
raise RecaptchaSolveError

return self.token

await self._page.wait_for_timeout(100)

new_challenge_button = recaptcha_frame.get_by_role(
"button", name="Get a new challenge"
)

while attempts > 0:
await self._random_delay()
url = await self._get_audio_url(recaptcha_frame)
url = await self._get_audio_url(recaptcha_box)
text = await self._convert_audio_to_text(url)

if text is None:
await new_challenge_button.click()
await recaptcha_box.new_challenge_button.click()
attempts -= 1
continue

await self._random_delay()
await self._submit_audio_text(recaptcha_frame, recaptcha_checkbox, text)
await self._submit_audio_text(recaptcha_box, text)

if recaptcha_frame.is_detached() or await recaptcha_checkbox.is_checked():
if (
not recaptcha_box.frames_are_attached()
or await recaptcha_box.checkbox.is_checked()
):
if self.token is None:
raise RecaptchaSolveError

return self.token

await new_challenge_button.click()
await recaptcha_box.new_challenge_button.click()
attempts -= 1

raise RecaptchaSolveError
Loading

0 comments on commit 66de1cb

Please sign in to comment.