Skip to content

Commit

Permalink
Bring back ingest ab result to CH (#1223)
Browse files Browse the repository at this point in the history
I decided to revert this change and make it work in V2 architecture. 

Little nonsense in the dependencies, but at the cost of 100% feature
parity 🙃

---------

Signed-off-by: Przemysław Hejman <[email protected]>
  • Loading branch information
mieciu authored Jan 23, 2025
1 parent 6f0cfbb commit 36d4ae6
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 42 deletions.
3 changes: 1 addition & 2 deletions ci/it/testcases/test_ab.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func (a *ABTestcase) SetupContainers(ctx context.Context) error {
func (a *ABTestcase) RunTests(ctx context.Context, t *testing.T) error {
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
t.Run("test ingest to both connectors", func(t *testing.T) { a.testIngest(ctx, t) })
// TODO: temporarily disabled as we no longer ingest A/B results to CH whereas this test expects it
//t.Run("test A/B queries", func(t *testing.T) { a.testQueries(ctx, t) })
t.Run("test A/B queries with ClickHouse result store", func(t *testing.T) { a.testQueries(ctx, t) })
return nil
}

Expand Down
19 changes: 10 additions & 9 deletions quesma/ab_testing/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"github.com/QuesmaOrg/quesma/quesma/ab_testing"
"github.com/QuesmaOrg/quesma/quesma/backend_connectors"
"github.com/QuesmaOrg/quesma/quesma/buildinfo"
"github.com/QuesmaOrg/quesma/quesma/ingest"
"github.com/QuesmaOrg/quesma/quesma/logger"
"github.com/QuesmaOrg/quesma/quesma/quesma/recovery"
"time"
)

const abTestingLogsIndex = "ab_testing_logs"
//const abTestingLogsIndex = "ab_testing_logs"

type ResponseMismatch struct {
IsOK bool `json:"is_ok"` // true if responses are the same
Expand Down Expand Up @@ -70,7 +71,7 @@ func (r *InMemoryCollector) String() string {
return "InMemoryCollector(sends data to Quesma)"
}

func NewCollector(ctx context.Context, healthQueue chan<- ab_testing.HealthMessage, esConn *backend_connectors.ElasticsearchBackendConnector) *InMemoryCollector {
func NewCollector(ctx context.Context, healthQueue chan<- ab_testing.HealthMessage, _ *backend_connectors.ElasticsearchBackendConnector, ingester ingest.Ingester) *InMemoryCollector {

ctx, cancel := context.WithCancel(ctx)

Expand All @@ -88,14 +89,14 @@ func NewCollector(ctx context.Context, healthQueue chan<- ab_testing.HealthMessa
//&ppPrintFanout{},
//&mismatchedOnlyFilter{},
&redactOkResults{},
&elasticSearchFanout{
esConn: esConn,
indexName: abTestingLogsIndex,
},
//&internalIngestFanout{ // due to migration to V2 architecture, ClickHouse ingest is disabled
// indexName: ab_testing.ABTestingTableName,
// ingestProcessor: ingester,
//&elasticSearchFanout{
// esConn: esConn,
// indexName: abTestingLogsIndex,
//},
&internalIngestFanout{
indexName: ab_testing.ABTestingTableName,
ingestProcessor: ingester,
},
},
healthQueue: healthQueue,
processorErrorQueue: make(chan processorErrorMessage, 100),
Expand Down
17 changes: 10 additions & 7 deletions quesma/ab_testing/sender/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/QuesmaOrg/quesma/quesma/ab_testing"
"github.com/QuesmaOrg/quesma/quesma/ab_testing/collector"
"github.com/QuesmaOrg/quesma/quesma/backend_connectors"
"github.com/QuesmaOrg/quesma/quesma/ingest"
"github.com/QuesmaOrg/quesma/quesma/logger"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/quesma/recovery"
Expand All @@ -22,11 +23,12 @@ type SenderCoordinator struct {
sender *sender // sender managed by this coordinator

elasticsearchConn *backend_connectors.ElasticsearchBackendConnector
chIngester ingest.Ingester

enabled bool
}

func NewSenderCoordinator(cfg *config.QuesmaConfiguration) *SenderCoordinator {
func NewSenderCoordinator(cfg *config.QuesmaConfiguration, ip ingest.Ingester) *SenderCoordinator {

ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -43,11 +45,12 @@ func NewSenderCoordinator(cfg *config.QuesmaConfiguration) *SenderCoordinator {
}

return &SenderCoordinator{
sender: newSender(ctx),
ctx: ctx,
cancelFunc: cancel,
enabled: len(enabledForIndex) > 0,
elasticsearchConn: backend_connectors.NewElasticsearchBackendConnector(cfg.Elasticsearch),
sender: newSender(ctx),
ctx: ctx,
cancelFunc: cancel,
enabled: len(enabledForIndex) > 0,
//elasticsearchConn: backend_connectors.NewElasticsearchBackendConnector(cfg.Elasticsearch),
chIngester: ip,
// add quesma health monitor service here
}
}
Expand All @@ -61,7 +64,7 @@ func (c *SenderCoordinator) GetSender() ab_testing.Sender {
}

func (c *SenderCoordinator) newInMemoryProcessor(healthQueue chan<- ab_testing.HealthMessage) *collector.InMemoryCollector {
repo := collector.NewCollector(c.ctx, healthQueue, c.elasticsearchConn)
repo := collector.NewCollector(c.ctx, healthQueue, c.elasticsearchConn, c.chIngester)
repo.Start()
return repo
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func main() {

quesmaManagementConsole := ui.NewQuesmaManagementConsole(&cfg, lm, qmcLogChannel, phoneHomeAgent, schemaRegistry, tableResolver)

abTestingController := sender.NewSenderCoordinator(&cfg)
abTestingController := sender.NewSenderCoordinator(&cfg, ingestProcessor)
abTestingController.Start()

instance := constructQuesma(&cfg, tableDisco, lm, ingestProcessor, schemaRegistry, phoneHomeAgent, quesmaManagementConsole, qmcLogChannel, abTestingController.GetSender(), tableResolver)
Expand Down
21 changes: 19 additions & 2 deletions quesma/processors/es_to_ch_common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/QuesmaOrg/quesma/quesma/ab_testing/sender"
"github.com/QuesmaOrg/quesma/quesma/clickhouse"
"github.com/QuesmaOrg/quesma/quesma/common_table"
"github.com/QuesmaOrg/quesma/quesma/ingest"
"github.com/QuesmaOrg/quesma/quesma/persistence"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/quesma/ui"
Expand Down Expand Up @@ -99,6 +100,7 @@ type LegacyQuesmaDependencies struct {
TableResolver table_resolver.TableResolver
Adminconsole *ui.QuesmaManagementConsole
AbTestingController *sender.SenderCoordinator
IngestProcessor *ingest.IngestProcessor
}

func newLegacyQuesmaDependencies(
Expand All @@ -110,6 +112,7 @@ func newLegacyQuesmaDependencies(
schemaRegistry schema.Registry,
tableResolver table_resolver.TableResolver,
abTestingController *sender.SenderCoordinator,
ingestProcessor *ingest.IngestProcessor,
) *LegacyQuesmaDependencies {
return &LegacyQuesmaDependencies{
DependenciesImpl: baseDependencies,
Expand All @@ -120,6 +123,7 @@ func newLegacyQuesmaDependencies(
SchemaRegistry: schemaRegistry,
TableResolver: tableResolver,
AbTestingController: abTestingController,
IngestProcessor: ingestProcessor,
}
}

Expand All @@ -130,8 +134,21 @@ func InitializeLegacyQuesmaDependencies(baseDeps *quesma_api.DependenciesImpl, o
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, oldQuesmaConfig, clickhouse.SchemaTypeAdapter{})
schemaRegistry.Start()
dummyTableResolver := table_resolver.NewDummyTableResolver(oldQuesmaConfig.IndexConfig, oldQuesmaConfig.UseCommonTableForWildcard)
abTestingController := sender.NewSenderCoordinator(oldQuesmaConfig)

ingestProcessor := ingest.NewIngestProcessor(
oldQuesmaConfig,
connectionPool,
baseDeps.PhoneHomeAgent(),
tableDisco,
schemaRegistry,
virtualTableStorage,
dummyTableResolver,
)
ingestProcessor.Start()

abTestingController := sender.NewSenderCoordinator(oldQuesmaConfig, ingestProcessor)
abTestingController.Start()
legacyDependencies := newLegacyQuesmaDependencies(*baseDeps, oldQuesmaConfig, connectionPool, *virtualTableStorage, tableDisco, schemaRegistry, dummyTableResolver, abTestingController)

legacyDependencies := newLegacyQuesmaDependencies(*baseDeps, oldQuesmaConfig, connectionPool, *virtualTableStorage, tableDisco, schemaRegistry, dummyTableResolver, abTestingController, ingestProcessor)
return legacyDependencies
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) InstanceName() string {
}

func (p *ElasticsearchToClickHouseIngestProcessor) Init() error {
// TODO so we initialize the connection pool in `prepareTemporaryIngestProcessor`, maybe we should do it here?
p.legacyIngestProcessor = p.prepareTemporaryIngestProcessor()

p.legacyIngestProcessor = p.legacyDependencies.IngestProcessor
return nil
}

Expand All @@ -70,24 +68,6 @@ func (p *ElasticsearchToClickHouseIngestProcessor) GetSchemaRegistry() schema.Re
return p.legacyIngestProcessor.GetSchemaRegistry()
}

// prepareTemporaryIngestProcessor creates a temporary ingest processor which is a new version of the ingest processor,
// which uses `quesma_api.BackendConnector` instead of `*sql.DB` for the database connection.
func (p *ElasticsearchToClickHouseIngestProcessor) prepareTemporaryIngestProcessor() *ingest.IngestProcessor {

ip := ingest.NewIngestProcessor(
p.legacyDependencies.OldQuesmaConfig,
p.legacyDependencies.ConnectionPool,
p.legacyDependencies.PhoneHomeAgent(),
p.legacyDependencies.TableDiscovery,
p.legacyDependencies.SchemaRegistry,
&p.legacyDependencies.VirtualTableStorage,
p.legacyDependencies.TableResolver,
)

ip.Start()
return ip
}

func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) {
var data []byte

Expand Down

0 comments on commit 36d4ae6

Please sign in to comment.