diff --git a/exiter/exiter.go b/exiter/exiter.go
new file mode 100644
index 0000000..e67c449
--- /dev/null
+++ b/exiter/exiter.go
@@ -0,0 +1,106 @@
+package exiter
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+)
+
+type Exiter interface {
+	SetSeedCount(int)
+	SetCancelFunc(context.CancelFunc)
+	IncrSeedCompleted(int)
+	IncrPlacesFound(int)
+	IncrPlacesCompleted(int)
+	Run(context.Context)
+}
+
+type exiter struct {
+	seedCount       int
+	seedCompleted   int
+	placesFound     int
+	placesCompleted int
+
+	mu         *sync.Mutex
+	cancelFunc context.CancelFunc
+}
+
+func New() Exiter {
+	return &exiter{
+		mu: &sync.Mutex{},
+	}
+}
+
+func (e *exiter) SetSeedCount(val int) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	e.seedCount = val
+}
+
+func (e *exiter) SetCancelFunc(fn context.CancelFunc) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	e.cancelFunc = fn
+}
+
+func (e *exiter) IncrSeedCompleted(val int) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	e.seedCompleted += val
+}
+
+func (e *exiter) IncrPlacesFound(val int) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	e.placesFound += val
+}
+
+func (e *exiter) IncrPlacesCompleted(val int) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	e.placesCompleted += val
+}
+
+func (e *exiter) Run(ctx context.Context) {
+	ticker := time.NewTicker(time.Second * 5)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if e.isDone() {
+				e.cancelFunc()
+
+				return
+			}
+		}
+	}
+}
+
+func (e *exiter) isDone() bool {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	fmt.Println("seedCount:", e.seedCount)
+	fmt.Println("seedCompleted:", e.seedCompleted)
+	fmt.Println("placesFound:", e.placesFound)
+	fmt.Println("placesCompleted:", e.placesCompleted)
+
+	if e.seedCompleted != e.seedCount {
+		return false
+	}
+
+	if e.placesFound != e.placesCompleted {
+		return false
+	}
+
+	return true
+}
diff --git a/gmaps/emailjob.go b/gmaps/emailjob.go
index 143c294..a3c716f 100644
--- a/gmaps/emailjob.go
+++ b/gmaps/emailjob.go
@@ -5,17 +5,21 @@ import (
 	"strings"
 
 	"github.com/PuerkitoBio/goquery"
+	"github.com/gosom/google-maps-scraper/exiter"
 	"github.com/gosom/scrapemate"
 	"github.com/mcnijman/go-emailaddress"
 )
 
+type EmailExtractJobOptions func(*EmailExtractJob)
+
 type EmailExtractJob struct {
 	scrapemate.Job
 
-	Entry *Entry
+	Entry       *Entry
+	ExitMonitor exiter.Exiter
 }
 
-func NewEmailJob(parentID string, entry *Entry) *EmailExtractJob {
+func NewEmailJob(parentID string, entry *Entry, opts ...EmailExtractJobOptions) *EmailExtractJob {
 	const (
 		defaultPrio       = scrapemate.PriorityHigh
 		defaultMaxRetries = 0
@@ -33,15 +37,31 @@ func NewEmailJob(parentID string, entry *Entry) *EmailExtractJob {
 
 	job.Entry = entry
 
+	for _, opt := range opts {
+		opt(&job)
+	}
+
 	return &job
 }
 
+func WithEmailJobExitMonitor(exitMonitor exiter.Exiter) EmailExtractJobOptions {
+	return func(j *EmailExtractJob) {
+		j.ExitMonitor = exitMonitor
+	}
+}
+
 func (j *EmailExtractJob) Process(ctx context.Context, resp *scrapemate.Response) (any, []scrapemate.IJob, error) {
 	defer func() {
 		resp.Document = nil
 		resp.Body = nil
 	}()
 
+	defer func() {
+		if j.ExitMonitor != nil {
+			j.ExitMonitor.IncrPlacesCompleted(1)
+		}
+	}()
+
 	log := scrapemate.GetLoggerFromContext(ctx)
 	log.Info("Processing email job", "url", j.URL)
 
diff --git a/gmaps/job.go b/gmaps/job.go
index 4249012..bd4c468 100644
--- a/gmaps/job.go
+++ b/gmaps/job.go
@@ -10,6 +10,7 @@ import (
 	"github.com/PuerkitoBio/goquery"
 	"github.com/google/uuid"
 	"github.com/gosom/google-maps-scraper/deduper"
+	"github.com/gosom/google-maps-scraper/exiter"
 	"github.com/gosom/scrapemate"
 	"github.com/playwright-community/playwright-go"
 )
@@ -23,7 +24,8 @@ type GmapJob struct {
 	LangCode     string
 	ExtractEmail bool
 
-	Deduper deduper.Deduper
+	Deduper     deduper.Deduper
+	ExitMonitor exiter.Exiter
 }
 
 func NewGmapJob(
@@ -80,6 +82,12 @@ func WithDeduper(d deduper.Deduper) GmapJobOptions {
 	}
 }
 
+func WithExitMonitor(e exiter.Exiter) GmapJobOptions {
+	return func(j *GmapJob) {
+		j.ExitMonitor = e
+	}
+}
+
 func (j *GmapJob) UseInResults() bool {
 	return false
 }
@@ -105,7 +113,12 @@ func (j *GmapJob) Process(ctx context.Context, resp *scrapemate.Response) (any,
 	} else {
 		doc.Find(`div[role=feed] div[jsaction]>a`).Each(func(_ int, s *goquery.Selection) {
 			if href := s.AttrOr("href", ""); href != "" {
-				nextJob := NewPlaceJob(j.ID, j.LangCode, href, j.ExtractEmail)
+				jopts := []PlaceJobOptions{}
+				if j.ExitMonitor != nil {
+					jopts = append(jopts, WithPlaceJobExitMonitor(j.ExitMonitor))
+				}
+
+				nextJob := NewPlaceJob(j.ID, j.LangCode, href, j.ExtractEmail, jopts...)
 
 				if j.Deduper == nil || j.Deduper.AddIfNotExists(ctx, href) {
 					next = append(next, nextJob)
@@ -114,6 +127,11 @@ func (j *GmapJob) Process(ctx context.Context, resp *scrapemate.Response) (any,
 		})
 	}
 
+	if j.ExitMonitor != nil {
+		j.ExitMonitor.IncrPlacesFound(len(next))
+		j.ExitMonitor.IncrSeedCompleted(1)
+	}
+
 	log.Info(fmt.Sprintf("%d places found", len(next)))
 
 	return nil, next, nil
diff --git a/gmaps/place.go b/gmaps/place.go
index 162420f..e7a240d 100644
--- a/gmaps/place.go
+++ b/gmaps/place.go
@@ -7,18 +7,22 @@ import (
 	"strings"
 
 	"github.com/google/uuid"
+	"github.com/gosom/google-maps-scraper/exiter"
 	"github.com/gosom/scrapemate"
 	"github.com/playwright-community/playwright-go"
 )
 
+type PlaceJobOptions func(*PlaceJob)
+
 type PlaceJob struct {
 	scrapemate.Job
 
 	UsageInResultststs bool
 	ExtractEmail       bool
+	ExitMonitor        exiter.Exiter
 }
 
-func NewPlaceJob(parentID, langCode, u string, extractEmail bool) *PlaceJob {
+func NewPlaceJob(parentID, langCode, u string, extractEmail bool, opts ...PlaceJobOptions) *PlaceJob {
 	const (
 		defaultPrio       = scrapemate.PriorityMedium
 		defaultMaxRetries = 3
@@ -39,9 +43,19 @@
 	job.UsageInResultststs = true
 	job.ExtractEmail = extractEmail
 
+	for _, opt := range opts {
+		opt(&job)
+	}
+
 	return &job
 }
 
+func WithPlaceJobExitMonitor(exitMonitor exiter.Exiter) PlaceJobOptions {
+	return func(j *PlaceJob) {
+		j.ExitMonitor = exitMonitor
+	}
+}
+
 func (j *PlaceJob) Process(_ context.Context, resp *scrapemate.Response) (any, []scrapemate.IJob, error) {
 	defer func() {
 		resp.Document = nil
@@ -66,11 +80,18 @@ func (j *PlaceJob) Process(_ context.Context, resp *scrapemate.Response) (any, [
 	}
 
 	if j.ExtractEmail && entry.IsWebsiteValidForEmail() {
-		emailJob := NewEmailJob(j.ID, &entry)
+		opts := []EmailExtractJobOptions{}
+		if j.ExitMonitor != nil {
+			opts = append(opts, WithEmailJobExitMonitor(j.ExitMonitor))
+		}
+
+		emailJob := NewEmailJob(j.ID, &entry, opts...)
 
 		j.UsageInResultststs = false
 
 		return nil, []scrapemate.IJob{emailJob}, nil
+	} else if j.ExitMonitor != nil {
+		j.ExitMonitor.IncrPlacesCompleted(1)
 	}
 
 	return &entry, nil, err
diff --git a/runner/databaserunner/databaserunner.go b/runner/databaserunner/databaserunner.go
index ecc6a6e..a1ae324 100644
--- a/runner/databaserunner/databaserunner.go
+++ b/runner/databaserunner/databaserunner.go
@@ -139,6 +139,7 @@ func (d *dbrunner) produceSeedJobs(ctx context.Context) error {
 		d.cfg.GeoCoordinates,
 		d.cfg.Zoom,
 		nil,
+		nil,
 	)
 	if err != nil {
 		return err
diff --git a/runner/filerunner/filerunner.go b/runner/filerunner/filerunner.go
index 343a08e..5727af5 100644
--- a/runner/filerunner/filerunner.go
+++ b/runner/filerunner/filerunner.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/gosom/google-maps-scraper/deduper"
+	"github.com/gosom/google-maps-scraper/exiter"
 	"github.com/gosom/google-maps-scraper/runner"
 	"github.com/gosom/google-maps-scraper/tlmt"
 	"github.com/gosom/scrapemate"
@@ -72,6 +73,7 @@ func (r *fileRunner) Run(ctx context.Context) (err error) {
 	}()
 
 	dedup := deduper.New()
+	exitMonitor := exiter.New()
 
 	seedJobs, err = runner.CreateSeedJobs(
 		r.cfg.LangCode,
@@ -81,11 +83,21 @@ func (r *fileRunner) Run(ctx context.Context) (err error) {
 		r.cfg.GeoCoordinates,
 		r.cfg.Zoom,
 		dedup,
+		exitMonitor,
 	)
 	if err != nil {
 		return err
 	}
 
+	exitMonitor.SetSeedCount(len(seedJobs))
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	exitMonitor.SetCancelFunc(cancel)
+
+	go exitMonitor.Run(ctx)
+
 	err = r.app.Start(ctx, seedJobs...)
 
 	return err
diff --git a/runner/jobs.go b/runner/jobs.go
index b40d4d9..0b56214 100644
--- a/runner/jobs.go
+++ b/runner/jobs.go
@@ -10,6 +10,7 @@ import (
 	"strings"
 
 	"github.com/gosom/google-maps-scraper/deduper"
+	"github.com/gosom/google-maps-scraper/exiter"
 	"github.com/gosom/google-maps-scraper/gmaps"
 	"github.com/gosom/scrapemate"
 )
@@ -22,6 +23,7 @@ func CreateSeedJobs(
 	geoCoordinates string,
 	zoom int,
 	dedup deduper.Deduper,
+	exitMonitor exiter.Exiter,
 ) (jobs []scrapemate.IJob, err error) {
 	scanner := bufio.NewScanner(r)
 
@@ -39,10 +41,15 @@ func CreateSeedJobs(
 		}
 
 		opts := []gmaps.GmapJobOptions{}
+
 		if dedup != nil {
 			opts = append(opts, gmaps.WithDeduper(dedup))
 		}
 
+		if exitMonitor != nil {
+			opts = append(opts, gmaps.WithExitMonitor(exitMonitor))
+		}
+
 		job := gmaps.NewGmapJob(id, langCode, query, maxDepth, email, geoCoordinates, zoom, opts...)
 
 		jobs = append(jobs, job)
diff --git a/runner/webrunner/webrunner.go b/runner/webrunner/webrunner.go
index 4018aa1..890fa91 100644
--- a/runner/webrunner/webrunner.go
+++ b/runner/webrunner/webrunner.go
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"github.com/gosom/google-maps-scraper/deduper"
+	"github.com/gosom/google-maps-scraper/exiter"
 	"github.com/gosom/google-maps-scraper/runner"
 	"github.com/gosom/google-maps-scraper/tlmt"
 	"github.com/gosom/google-maps-scraper/web"
@@ -174,6 +175,7 @@ func (w *webrunner) scrapeJob(ctx context.Context, job *web.Job) error {
 	}
 
 	dedup := deduper.New()
+	exitMonitor := exiter.New()
 
 	seedJobs, err := runner.CreateSeedJobs(
 		job.Data.Lang,
@@ -183,6 +185,7 @@ func (w *webrunner) scrapeJob(ctx context.Context, job *web.Job) error {
 		coords,
 		job.Data.Zoom,
 		dedup,
+		exitMonitor,
 	)
 	if err != nil {
 		err2 := w.svc.Update(ctx, job)
@@ -194,6 +197,8 @@ func (w *webrunner) scrapeJob(ctx context.Context, job *web.Job) error {
 	}
 
 	if len(seedJobs) > 0 {
+		exitMonitor.SetSeedCount(len(seedJobs))
+
 		allowedSeconds := max(60, len(seedJobs)*10*job.Data.Depth/50+120)
 
 		if job.Data.MaxTime > 0 {
@@ -209,8 +214,12 @@ func (w *webrunner) scrapeJob(ctx context.Context, job *web.Job) error {
 		mateCtx, cancel := context.WithTimeout(ctx, time.Duration(allowedSeconds)*time.Second)
 		defer cancel()
 
+		exitMonitor.SetCancelFunc(cancel)
+
+		go exitMonitor.Run(mateCtx)
+
 		err = mate.Start(mateCtx, seedJobs...)
-		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
+		if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
 			cancel()
 
 			err2 := w.svc.Update(ctx, job)
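
For reference, a minimal, illustrative sketch (not part of the patch) of the counting contract the runners rely on, assuming the exiter package introduced above is on the import path: the runner registers the expected number of seed jobs and a cancel func, each GmapJob reports the places it found and marks its seed complete, and each place or email job marks one place complete; once both pairs of counters match, the monitor's next 5-second tick cancels the context.

package main

import (
	"context"

	"github.com/gosom/google-maps-scraper/exiter"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	monitor := exiter.New()
	monitor.SetSeedCount(1) // one seed (search) job expected
	monitor.SetCancelFunc(cancel)

	go monitor.Run(ctx) // checks completion every 5 seconds

	// The seed job reports the places it discovered and marks itself
	// complete, as GmapJob.Process does above.
	monitor.IncrPlacesFound(2)
	monitor.IncrSeedCompleted(1)

	// Each place job (or the email job it spawns) reports one completion,
	// as PlaceJob.Process and EmailExtractJob.Process do above.
	monitor.IncrPlacesCompleted(1)
	monitor.IncrPlacesCompleted(1)

	// seedCompleted == seedCount and placesFound == placesCompleted, so the
	// monitor's next tick calls the registered cancel func.
	<-ctx.Done()
}

This mirrors the wiring in filerunner.Run and webrunner.scrapeJob above: create the monitor, pass it to runner.CreateSeedJobs, call SetSeedCount(len(seedJobs)), register the cancel func, and start Run in a goroutine before the scraping app is started.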