|
6 | 6 | "fmt"
|
7 | 7 | "io/fs"
|
8 | 8 | "path"
|
| 9 | + "strings" |
9 | 10 | "sync"
|
10 | 11 | "sync/atomic"
|
11 | 12 | "time"
|
@@ -166,6 +167,8 @@ type Ingester struct {
|
166 | 167 |
|
167 | 168 | // Ingest rates
|
168 | 169 | ingestRates *rate.Map
|
| 170 | + |
| 171 | + skip500EntsErr atomic.Bool |
169 | 172 | }
|
170 | 173 |
|
171 | 174 | // NewIngester creates a new Ingester that uses a dagsync Subscriber to handle
|
@@ -208,6 +211,8 @@ func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *re
|
208 | 211 | return nil, err
|
209 | 212 | }
|
210 | 213 |
|
| 214 | + ing.skip500EntsErr.Store(cfg.Skip500EntriesError) |
| 215 | + |
211 | 216 | // Create and start subscriber. This also registers the storage hook to
|
212 | 217 | // index data as it is received.
|
213 | 218 | sub, err := dagsync.NewSubscriber(h, ing.dsTmp, ing.lsys, cfg.PubSubTopic,
|
@@ -258,6 +263,10 @@ func (ing *Ingester) MultihashesFromMirror() uint64 {
|
258 | 263 | return ing.mhsFromMirror.Load()
|
259 | 264 | }
|
260 | 265 |
|
| 266 | +func (ing *Ingester) Skip500EntriesError(skip bool) { |
| 267 | + ing.skip500EntsErr.Store(skip) |
| 268 | +} |
| 269 | + |
261 | 270 | func (ing *Ingester) generalDagsyncBlockHook(_ peer.ID, c cid.Cid, actions dagsync.SegmentSyncActions) {
|
262 | 271 | // The only kind of block we should get by loading CIDs here should be
|
263 | 272 | // Advertisement.
|
@@ -1114,6 +1123,8 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as
|
1114 | 1123 | }
|
1115 | 1124 | }
|
1116 | 1125 |
|
| 1126 | + skip500EntsErr := ing.skip500EntsErr.Load() |
| 1127 | + |
1117 | 1128 | total := len(assignment.adInfos)
|
1118 | 1129 | log.Infow("Running worker on ad stack", "headAdCid", headAdCid, "numAdsToProcess", total)
|
1119 | 1130 | var count int
|
@@ -1181,6 +1192,12 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as
|
1181 | 1192 | log.Errorw("Skipping ad because of a permanent error", "adCid", ai.cid, "err", err, "errKind", adIngestErr.state)
|
1182 | 1193 | stats.Record(context.Background(), metrics.AdIngestSkippedCount.M(1))
|
1183 | 1194 | err = nil
|
| 1195 | + case adIngestSyncEntriesErr: |
| 1196 | + if skip500EntsErr && strings.Contains(err.Error(), "failed to sync first entry") && strings.Contains(err.Error(), ": 500") { |
| 1197 | + log.Errorw("Skipping ad because of a permanent 500 error", "adCid", ai.cid, "err", err, "errKind", adIngestErr.state) |
| 1198 | + stats.Record(context.Background(), metrics.AdIngestSkippedCount.M(1)) |
| 1199 | + err = nil |
| 1200 | + } |
1184 | 1201 | }
|
1185 | 1202 | stats.RecordWithOptions(context.Background(),
|
1186 | 1203 | stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
|
|
0 commit comments