diff --git a/.circleci/config.yml b/.circleci/config.yml index ddb7037f0..342b85159 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -100,7 +100,7 @@ jobs: - run: name: Run tests no_output_timeout: 20m - command: go test -timeout 20m -v ./... -skip "TestPostgres|TestMySQL|TestMongo|TestRedis" + command: go test -timeout 20m -v ./... -skip "TestPostgres|TestMySQL|TestMongo|TestRedis|TestClickHouse" unit-test-windows: executor: win/server-2022 @@ -139,7 +139,7 @@ jobs: - run: name: Run tests no_output_timeout: 20m - command: go test -timeout 20m -v ./... -skip "TestPostgres|TestMySQL|TestMongo|TestRedis" + command: go test -timeout 20m -v ./... -skip "TestPostgres|TestMySQL|TestMongo|TestRedis|TestClickHouse" workflows: unit-test: diff --git a/.github/workflows/build_test.yml b/.github/workflows/build_test.yml index 7fefe409a..680ad4108 100644 --- a/.github/workflows/build_test.yml +++ b/.github/workflows/build_test.yml @@ -50,6 +50,16 @@ jobs: --health-timeout 5s --health-retries 5 + clickhouse: + image: clickhouse/clickhouse-server:22 + ports: + - 8123 + options: >- + --health-cmd="clickhouse-client --query 'SELECT 1'" + --health-interval=10s + --health-timeout=5s + --health-retries=5 + redis: image: redis/redis-stack:6.2.6-v9 ports: @@ -102,6 +112,8 @@ jobs: POSTGRES_URI: postgres://gorse:gorse_pass@localhost:${{ job.services.postgres.ports[5432] }}/ # MongoDB MONGO_URI: mongodb://root:password@localhost:${{ job.services.mongo.ports[27017] }}/ + # ClickHouse + CLICKHOUSE_URI: clickhouse://localhost:${{ job.services.clickhouse.ports[8123] }}/ # Redis REDIS_URI: redis://localhost:${{ job.services.redis.ports[6379] }}/ @@ -113,7 +125,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - database: [mysql, postgres, mongo] + database: [mysql, postgres, mongo, clickhouse] steps: - uses: actions/checkout@v1 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f2d0a0ec6..061fc961d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -80,6 +80,7 @@ The 
default database URLs are directed to these databases in `storage/docker-com | `MYSQL_URI` | `mysql://root:password@tcp(127.0.0.1:3306)/` | | `POSTGRES_URI` | `postgres://gorse:gorse_pass@127.0.0.1/` | | `MONGO_URI` | `mongodb://root:password@127.0.0.1:27017/` | +| `CLICKHOUSE_URI` | `clickhouse://127.0.0.1:8123/` | | `REDIS_URI` | `redis://127.0.0.1:6379/` | For example, use TiDB as a test database by: diff --git a/client/docker-compose.yml.j2 b/client/docker-compose.yml.j2 index ac3de45b0..62e745903 100644 --- a/client/docker-compose.yml.j2 +++ b/client/docker-compose.yml.j2 @@ -63,6 +63,22 @@ services: timeout: 5s retries: 5 + {% elif database == 'clickhouse' %} + + clickhouse: + image: clickhouse/clickhouse-server:22 + ports: + - 8123:8123 + environment: + CLICKHOUSE_DB: gorse + CLICKHOUSE_USER: gorse + CLICKHOUSE_PASSWORD: gorse_pass + healthcheck: + test: clickhouse-client --user $$CLICKHOUSE_USER --password $$CLICKHOUSE_PASSWORD --query "SELECT 1" + interval: 10s + timeout: 5s + retries: 5 + {% endif %} worker: @@ -117,6 +133,8 @@ services: GORSE_DATA_STORE: postgres://gorse:gorse_pass@postgres/gorse?sslmode=disable {% elif database == 'mongo' %} GORSE_DATA_STORE: mongodb://root:password@mongo:27017/gorse?authSource=admin&connect=direct + {% elif database == 'clickhouse' %} + GORSE_DATA_STORE: clickhouse://gorse:gorse_pass@clickhouse:8123/gorse?mutations_sync=2 {% endif %} command: > -c /etc/gorse/config.toml @@ -138,6 +156,9 @@ services: {% elif database == 'mongo' %} mongo: condition: service_healthy + {% elif database == 'clickhouse' %} + clickhouse: + condition: service_healthy {% endif %} volumes: diff --git a/config/config.go b/config/config.go index 5a44e2f70..f13d92f0b 100644 --- a/config/config.go +++ b/config/config.go @@ -607,6 +607,9 @@ func (config *Config) Validate(oneModel bool) error { storage.MySQLPrefix, storage.PostgresPrefix, storage.PostgreSQLPrefix, + storage.ClickhousePrefix, + storage.CHHTTPPrefix, + storage.CHHTTPSPrefix, } if 
oneModel { prefixes = append(prefixes, storage.SQLitePrefix) diff --git a/config/config.toml b/config/config.toml index e0dd91ee0..e6fb194a5 100644 --- a/config/config.toml +++ b/config/config.toml @@ -9,10 +9,13 @@ # mongodb+srv://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]] cache_store = "redis://localhost:6379/0" -# The database for persist data, support MySQL, Postgres and MongoDB: +# The database for persist data, support MySQL, Postgres, ClickHouse and MongoDB: # mysql://[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN] # postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full # postgresql://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full +# clickhouse://user:password@host[:port]/database?param1=value1&...&paramN=valueN +# chhttp://user:password@host[:port]/database?param1=value1&...&paramN=valueN +# chhttps://user:password@host[:port]/database?param1=value1&...&paramN=valueN # mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]] # mongodb+srv://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]] data_store = "mysql://gorse:gorse_pass@tcp(localhost:3306)/gorse" diff --git a/docker-compose.yml b/docker-compose.yml index 55f9d76c4..27354f54c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,6 +41,17 @@ services: # volumes: # - mongo_data:/data/db + # clickhouse: + # image: clickhouse/clickhouse-server:22 + # ports: + # - 8123:8123 + # environment: + # CLICKHOUSE_DB: gorse + # CLICKHOUSE_USER: gorse + # CLICKHOUSE_PASSWORD: gorse_pass + # volumes: + # - clickhouse_data:/var/lib/clickhouse + worker: image: zhenghaoz/gorse-worker restart: unless-stopped @@ -84,6 +95,7 @@ services: GORSE_DATA_STORE: mysql://gorse:gorse_pass@tcp(mysql:3306)/gorse # GORSE_DATA_STORE: postgres://gorse:gorse_pass@postgres/gorse?sslmode=disable # GORSE_DATA_STORE: mongodb://root:password@mongo:27017/gorse?authSource=admin&connect=direct + # 
GORSE_DATA_STORE: clickhouse://gorse:gorse_pass@clickhouse:8123/gorse command: > -c /etc/gorse/config.toml --log-path /var/log/gorse/master.log @@ -97,6 +109,7 @@ services: - mysql # - postgres # - mongo + # - clickhouse volumes: worker_data: @@ -106,3 +119,4 @@ volumes: mysql_data: # postgres_data: # mongo_data: + # clickhouse_data: diff --git a/go.mod b/go.mod index d03af06a5..ad9b3ee3d 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/klauspost/cpuid/v2 v2.2.3 github.com/lafikl/consistent v0.0.0-20220512074542-bdd3606bfc3e github.com/lib/pq v1.10.6 + github.com/mailru/go-clickhouse/v2 v2.0.1-0.20221121001540-b259988ad8e5 github.com/mitchellh/mapstructure v1.5.0 github.com/orcaman/concurrent-map v1.0.0 github.com/prometheus/client_golang v1.13.0 @@ -68,6 +69,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 gorgonia.org/gorgonia v0.9.18-0.20230327110624-d1c17944ed22 gorgonia.org/tensor v0.9.23 + gorm.io/driver/clickhouse v0.4.2 gorm.io/driver/mysql v1.3.4 gorm.io/driver/postgres v1.3.5 gorm.io/driver/sqlite v1.3.4 @@ -101,6 +103,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v2.0.6+incompatible // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect @@ -179,8 +182,9 @@ require ( modernc.org/token v1.0.1 // indirect ) -replace gorm.io/driver/sqlite v1.3.4 => github.com/gorse-io/sqlite v1.3.3-0.20220713123255-c322aec4e59e - -replace gorgonia.org/tensor v0.9.23 => github.com/gorse-io/tensor v0.0.0-20230617102451-4c006ddc5162 - -replace gorgonia.org/gorgonia v0.9.18-0.20230327110624-d1c17944ed22 => github.com/gorse-io/gorgonia v0.0.0-20230817132253-6dd1dbf95849 +replace ( + gorgonia.org/gorgonia v0.9.18-0.20230327110624-d1c17944ed22 => github.com/gorse-io/gorgonia 
v0.0.0-20230817132253-6dd1dbf95849 + gorgonia.org/tensor v0.9.23 => github.com/gorse-io/tensor v0.0.0-20230617102451-4c006ddc5162 + gorm.io/driver/clickhouse v0.4.2 => github.com/gorse-io/clickhouse v0.3.3-0.20220715124633-688011a495bb + gorm.io/driver/sqlite v1.3.4 => github.com/gorse-io/sqlite v1.3.3-0.20220713123255-c322aec4e59e +) diff --git a/go.sum b/go.sum index 49488f278..ab7e3b29f 100644 --- a/go.sum +++ b/go.sum @@ -288,6 +288,7 @@ github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S3 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -298,6 +299,8 @@ github.com/gorgonia/bindgen v0.0.0-20180812032444-09626750019e/go.mod h1:YzKk63P github.com/gorgonia/bindgen v0.0.0-20210223094355-432cd89e7765/go.mod h1:BLHSe436vhQKRfm6wxJgebeK4fDY+ER/8jV3vVH9yYU= github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorse-io/clickhouse v0.3.3-0.20220715124633-688011a495bb h1:z/oOWE+Vy0PLcwIulZmIug4FtmvE3dJ1YOGprLeHwwY= +github.com/gorse-io/clickhouse v0.3.3-0.20220715124633-688011a495bb/go.mod h1:iILWzbul8U+gsf4kqbheF2QzBmdvVp63mloGGK8emDI= github.com/gorse-io/dashboard v0.0.0-20230729051855-6c53a42d2bd4 h1:x0bLXsLkjEZdztd0Tw+Hx38vIjzabyj2Fk0EDitKcLk= github.com/gorse-io/dashboard v0.0.0-20230729051855-6c53a42d2bd4/go.mod h1:bv2Yg9Pn4Dca4xPJbvibpF6LH6BjoxcjsEdIuojNano= 
github.com/gorse-io/gorgonia v0.0.0-20230817132253-6dd1dbf95849 h1:Hwywr6NxzYeZYn35KwOsw7j8ZiMT60TBzpbn1MbEido= @@ -309,6 +312,9 @@ github.com/gorse-io/tensor v0.0.0-20230617102451-4c006ddc5162/go.mod h1:1dsOegMm github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= +github.com/hashicorp/go-version v1.5.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -442,6 +448,8 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mailru/go-clickhouse/v2 v2.0.1-0.20221121001540-b259988ad8e5 h1:JgQ+kJg8uKs6JjnDxnMgkKT4PPH36uU6chpYw2PQc9Q= +github.com/mailru/go-clickhouse/v2 v2.0.1-0.20221121001540-b259988ad8e5/go.mod h1:TwxN829KnFZ7jAka9l9EoCV+U0CBFq83SFev4oLbnNU= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= diff --git a/server/bench_test.go 
b/server/bench_test.go index 6439bfadd..e39c83b4b 100644 --- a/server/bench_test.go +++ b/server/bench_test.go @@ -19,16 +19,6 @@ import ( "database/sql" "encoding/json" "fmt" - "math/rand" - "net" - "net/http" - "os" - "runtime" - "strconv" - "strings" - "testing" - "time" - "github.com/emicklei/go-restful/v3" "github.com/go-resty/resty/v2" "github.com/redis/go-redis/v9" @@ -41,6 +31,15 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "google.golang.org/protobuf/proto" + "math/rand" + "net" + "net/http" + "os" + "runtime" + "strconv" + "strings" + "testing" + "time" ) const ( @@ -61,7 +60,7 @@ func init() { } return defaultValue } - benchDataStore = env("BENCH_DATA_STORE", "mysql://root:password@tcp(127.0.0.1:3306)/") + benchDataStore = env("BENCH_DATA_STORE", "clickhouse://127.0.0.1:8123/") benchCacheStore = env("BENCH_CACHE_STORE", "redis://127.0.0.1:6379/") } @@ -192,6 +191,17 @@ func (s *benchServer) prepareData(b *testing.B, url, benchName string) string { err = db.Close() require.NoError(b, err) return url + strings.ToLower(dbName) + "?sslmode=disable&TimeZone=UTC" + } else if strings.HasPrefix(url, "clickhouse://") { + uri := "http://" + url[len("clickhouse://"):] + db, err := sql.Open("clickhouse", uri) + require.NoError(b, err) + _, err = db.Exec("DROP DATABASE IF EXISTS " + dbName) + require.NoError(b, err) + _, err = db.Exec("CREATE DATABASE " + dbName) + require.NoError(b, err) + err = db.Close() + require.NoError(b, err) + return url + dbName + "?mutations_sync=2" } else if strings.HasPrefix(url, "mongodb://") { ctx := context.Background() cli, err := mongo.Connect(ctx, options.Client().ApplyURI(url+"?authSource=admin&connect=direct")) diff --git a/server/bench_test.sh b/server/bench_test.sh index ad7bacd34..f3e380079 100644 --- a/server/bench_test.sh +++ b/server/bench_test.sh @@ -40,6 +40,9 @@ case $CACHE_ARG in esac case $DATA_ARG in + clickhouse) + export BENCH_DATA_STORE='clickhouse://127.0.0.1:8123/' + 
;; mysql) export BENCH_DATA_STORE='mysql://root:password@tcp(127.0.0.1:3306)/' ;; diff --git a/storage/data/database.go b/storage/data/database.go index c0c020fc6..13bf4579e 100644 --- a/storage/data/database.go +++ b/storage/data/database.go @@ -17,6 +17,7 @@ package data import ( "context" "encoding/json" + "net/url" "reflect" "sort" "strings" @@ -33,6 +34,7 @@ import ( "go.mongodb.org/mongo-driver/x/mongo/driver/connstring" "go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo" semconv "go.opentelemetry.io/otel/semconv/v1.12.0" + "gorm.io/driver/clickhouse" "gorm.io/driver/mysql" "gorm.io/driver/postgres" "gorm.io/driver/sqlite" @@ -166,6 +168,10 @@ type Feedback struct { Comment string `gorm:"column:comment" mapsstructure:"comment"` } +type UserFeedback Feedback + +type ItemFeedback Feedback + // SortFeedbacks sorts feedback from latest to oldest. func SortFeedbacks(feedback []Feedback) { sort.Sort(feedbackSorter(feedback)) @@ -244,6 +250,7 @@ type Database interface { Init() error Ping() error Close() error + Optimize() error Purge() error BatchInsertItems(ctx context.Context, items []Item) error BatchGetItems(ctx context.Context, namespace string, itemIds []string) ([]Item, error) @@ -318,6 +325,32 @@ func Open(path, tablePrefix string) (Database, error) { return nil, errors.Trace(err) } return database, nil + } else if strings.HasPrefix(path, storage.ClickhousePrefix) || strings.HasPrefix(path, storage.CHHTTPPrefix) || strings.HasPrefix(path, storage.CHHTTPSPrefix) { + // replace schema + parsed, err := url.Parse(path) + if err != nil { + return nil, errors.Trace(err) + } + if strings.HasPrefix(path, storage.CHHTTPSPrefix) { + parsed.Scheme = "https" + } else { + parsed.Scheme = "http" + } + uri := parsed.String() + database := new(SQLDatabase) + database.driver = ClickHouse + database.TablePrefix = storage.TablePrefix(tablePrefix) + if database.client, err = otelsql.Open("chhttp", uri, + 
otelsql.WithAttributes(semconv.DBSystemKey.String("clickhouse")), + otelsql.WithSpanOptions(otelsql.SpanOptions{DisableErrSkip: true}), + ); err != nil { + return nil, errors.Trace(err) + } + database.gormDB, err = gorm.Open(clickhouse.New(clickhouse.Config{Conn: database.client}), storage.NewGORMConfig(tablePrefix)) + if err != nil { + return nil, errors.Trace(err) + } + return database, nil } else if strings.HasPrefix(path, storage.MongoPrefix) || strings.HasPrefix(path, storage.MongoSrvPrefix) { // connect to database database := new(MongoDB) diff --git a/storage/data/database_test.go b/storage/data/database_test.go index fd702e3a0..eeea8fc40 100644 --- a/storage/data/database_test.go +++ b/storage/data/database_test.go @@ -35,6 +35,7 @@ var ( positiveFeedbackType = "positiveFeedbackType" negativeFeedbackType = "negativeFeedbackType" duplicateFeedbackType = "duplicateFeedbackType" + dateTime64Zero = time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC) ) type baseTestSuite struct { @@ -126,6 +127,14 @@ func (suite *baseTestSuite) getFeedbackStream(ctx context.Context, batchSize int return feedbacks } +func (suite *baseTestSuite) isClickHouse() bool { + if sqlDB, isSQL := suite.Database.(*SQLDatabase); !isSQL { + return false + } else { + return sqlDB.driver == ClickHouse + } +} + func (suite *baseTestSuite) TearDownSuite() { err := suite.Database.Close() suite.NoError(err) @@ -183,6 +192,8 @@ func (suite *baseTestSuite) TestUsers() { // test override err = suite.Database.BatchInsertUsers(ctx, []User{{UserId: "1", Comment: "override"}}) suite.NoError(err) + err = suite.Database.Optimize() + suite.NoError(err) user, err = suite.Database.GetUser(ctx, "1") suite.NoError(err) suite.Equal("override", user.Comment) @@ -193,6 +204,8 @@ func (suite *baseTestSuite) TestUsers() { suite.NoError(err) err = suite.Database.ModifyUser(ctx, "1", UserPatch{Subscribe: []string{"d", "e", "f"}}) suite.NoError(err) + err = suite.Database.Optimize() + suite.NoError(err) user, err = 
suite.Database.GetUser(ctx, "1") suite.NoError(err) suite.Equal("modify", user.Comment) @@ -259,12 +272,19 @@ func (suite *baseTestSuite) TestFeedback() { feedbackFromStream = suite.getFeedbackStream(ctx, 3, WithBeginUserId("1"), WithEndUserId("3"), WithEndTime(time.Now()), WithFeedbackTypes(positiveFeedbackType)) suite.Equal(feedback[1:4], feedbackFromStream) // Get items + err = suite.Database.Optimize() + suite.NoError(err) items := suite.getItems(ctx, 3) suite.Equal(5, len(items)) for i, item := range items { suite.Equal(strconv.Itoa(i*2), item.ItemId) if item.ItemId != "0" { - suite.Zero(item.Timestamp) + if suite.isClickHouse() { + // ClickHouse returns 1900-01-01 00:00:00 +0000 UTC as zero date. + suite.Equal(dateTime64Zero, item.Timestamp) + } else { + suite.Zero(item.Timestamp) + } suite.Empty(item.Labels) suite.Empty(item.Comment) } @@ -291,9 +311,10 @@ func (suite *baseTestSuite) TestFeedback() { // Get typed feedback by user ret, err = suite.Database.GetUserFeedback(ctx, "namespace", "2", lo.ToPtr(time.Now()), positiveFeedbackType) suite.NoError(err) - suite.Equal(1, len(ret)) - suite.Equal("2", ret[0].UserId) - suite.Equal("4", ret[0].ItemId) + if suite.Equal(1, len(ret)) { + suite.Equal("2", ret[0].UserId) + suite.Equal("4", ret[0].ItemId) + } // Get all feedback by user ret, err = suite.Database.GetUserFeedback(ctx, "namespace", "2", lo.ToPtr(time.Now())) suite.NoError(err) @@ -314,6 +335,8 @@ func (suite *baseTestSuite) TestFeedback() { Comment: "override", }}, true, true, true) suite.NoError(err) + err = suite.Database.Optimize() + suite.NoError(err) ret, err = suite.Database.GetUserFeedback(ctx, "namespace", "0", lo.ToPtr(time.Now()), positiveFeedbackType) suite.NoError(err) suite.Equal(1, len(ret)) @@ -324,6 +347,8 @@ func (suite *baseTestSuite) TestFeedback() { Comment: "not_override", }}, true, true, false) suite.NoError(err) + err = suite.Database.Optimize() + suite.NoError(err) ret, err = suite.Database.GetUserFeedback(ctx, "namespace", "0", 
lo.ToPtr(time.Now()), positiveFeedbackType) suite.NoError(err) suite.Equal(1, len(ret)) @@ -441,6 +466,8 @@ func (suite *baseTestSuite) TestItems() { // test override err = suite.Database.BatchInsertItems(ctx, []Item{{Namespace: "namespace", ItemId: "4", IsHidden: false, Categories: []string{"b"}, Labels: []string{"o"}, Comment: "override"}}) suite.NoError(err) + err = suite.Database.Optimize() + suite.NoError(err) item, err := suite.Database.GetItem(ctx, "namespace", "4") suite.NoError(err) suite.False(item.IsHidden) @@ -460,6 +487,8 @@ func (suite *baseTestSuite) TestItems() { suite.NoError(err) err = suite.Database.ModifyItem(ctx, "namespace", "2", ItemPatch{Timestamp: &timestamp}) suite.NoError(err) + err = suite.Database.Optimize() + suite.NoError(err) item, err = suite.Database.GetItem(ctx, "namespace", "2") suite.NoError(err) suite.True(item.IsHidden) @@ -553,14 +582,21 @@ func (suite *baseTestSuite) TestDeleteFeedback() { // delete user-item feedback deleteCount, err := suite.Database.DeleteUserItemFeedback(ctx, "namespace", "2", "3") suite.NoError(err) - suite.Equal(3, deleteCount) + if !suite.isClickHouse() { + // RowAffected isn't supported by ClickHouse, + suite.Equal(3, deleteCount) + } + err = suite.Database.Optimize() + suite.NoError(err) ret, err = suite.Database.GetUserItemFeedback(ctx, "namespace", "2", "3") suite.NoError(err) suite.Empty(ret) feedbackType1 := "type1" deleteCount, err = suite.Database.DeleteUserItemFeedback(ctx, "namespace", "1", "3", feedbackType1) suite.NoError(err) - suite.Equal(1, deleteCount) + if !suite.isClickHouse() { + suite.Equal(1, deleteCount) + } ret, err = suite.Database.GetUserItemFeedback(ctx, "namespace", "1", "3", feedbackType2) suite.NoError(err) suite.Empty(ret) @@ -670,6 +706,8 @@ func (suite *baseTestSuite) TestTimezone() { suite.NoError(err) err = suite.Database.ModifyItem(ctx, "namespace", "200", ItemPatch{Timestamp: &now}) suite.NoError(err) + err = suite.Database.Optimize() + suite.NoError(err) switch database 
:= suite.Database.(type) { case *SQLDatabase: switch suite.Database.(*SQLDatabase).driver { @@ -680,6 +718,13 @@ func (suite *baseTestSuite) TestTimezone() { item, err = suite.Database.GetItem(ctx, "namespace", "200") suite.NoError(err) suite.Equal(now.Round(time.Microsecond).In(time.UTC), item.Timestamp) + case ClickHouse: + item, err := suite.Database.GetItem(ctx, "namespace", "100") + suite.NoError(err) + suite.Equal(now.Truncate(time.Second).In(time.UTC), item.Timestamp) + item, err = suite.Database.GetItem(ctx, "namespace", "200") + suite.NoError(err) + suite.Equal(now.Truncate(time.Second).In(time.UTC), item.Timestamp) case SQLite: item, err := suite.Database.GetItem(ctx, "namespace", "100") suite.NoError(err) @@ -742,9 +787,9 @@ func (suite *baseTestSuite) TestPurge() { func (suite *baseTestSuite) TestNamespace() { // insert items items := []Item{ - {Namespace: "namespace1", ItemId: "0"}, - {Namespace: "namespace1", ItemId: "1"}, - {Namespace: "namespace2", ItemId: "0"}, + {Namespace: "namespace1", ItemId: "0", Timestamp: time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC)}, + {Namespace: "namespace1", ItemId: "1", Timestamp: time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC)}, + {Namespace: "namespace2", ItemId: "0", Timestamp: time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC)}, } err := suite.Database.BatchInsertItems(context.Background(), items) suite.NoError(err) @@ -754,11 +799,11 @@ func (suite *baseTestSuite) TestNamespace() { // insert feedbacks feedbacks := []Feedback{ - {FeedbackKey: FeedbackKey{"namespace1", "type1", "0", "0"}}, - {FeedbackKey: FeedbackKey{"namespace1", "type1", "0", "1"}}, - {FeedbackKey: FeedbackKey{"namespace2", "type1", "0", "0"}}, - {FeedbackKey: FeedbackKey{"namespace3", "type1", "0", "0"}}, - {FeedbackKey: FeedbackKey{"namespace4", "type1", "0", "0"}}, + {FeedbackKey: FeedbackKey{"namespace1", "type1", "0", "0"}, Timestamp: time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC)}, + {FeedbackKey: FeedbackKey{"namespace1", "type1", "0", "1"}, Timestamp: 
time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC)}, + {FeedbackKey: FeedbackKey{"namespace2", "type1", "0", "0"}, Timestamp: time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC)}, + {FeedbackKey: FeedbackKey{"namespace3", "type1", "0", "0"}, Timestamp: time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC)}, + {FeedbackKey: FeedbackKey{"namespace4", "type1", "0", "0"}, Timestamp: time.Date(1996, 3, 15, 0, 0, 0, 0, time.UTC)}, } err = suite.Database.BatchInsertFeedback(context.Background(), feedbacks, true, true, true) suite.NoError(err) diff --git a/storage/data/mongodb.go b/storage/data/mongodb.go index 806234487..9703dbd8c 100644 --- a/storage/data/mongodb.go +++ b/storage/data/mongodb.go @@ -65,6 +65,11 @@ type MongoDB struct { dbName string } +// Optimize is used by ClickHouse only. +func (db *MongoDB) Optimize() error { + return nil +} + // Init collections and indices in MongoDB. func (db *MongoDB) Init() error { ctx := context.Background() diff --git a/storage/data/no_database.go b/storage/data/no_database.go index 7d6d9409b..1096f7459 100644 --- a/storage/data/no_database.go +++ b/storage/data/no_database.go @@ -22,6 +22,11 @@ import ( // NoDatabase means that no database used. type NoDatabase struct{} +// Optimize is used by ClickHouse only. +func (NoDatabase) Optimize() error { + return ErrNoDatabase +} + // Init method of NoDatabase returns ErrNoDatabase. 
func (NoDatabase) Init() error { return ErrNoDatabase diff --git a/storage/data/no_database_test.go b/storage/data/no_database_test.go index 91d66fa2b..806a1ce77 100644 --- a/storage/data/no_database_test.go +++ b/storage/data/no_database_test.go @@ -16,11 +16,10 @@ package data import ( "context" - "testing" - "time" - "github.com/samber/lo" "github.com/stretchr/testify/assert" + "testing" + "time" ) func TestNoDatabase(t *testing.T) { @@ -29,6 +28,8 @@ func TestNoDatabase(t *testing.T) { err := database.Close() assert.ErrorIs(t, err, ErrNoDatabase) + err = database.Optimize() + assert.ErrorIs(t, err, ErrNoDatabase) err = database.Init() assert.ErrorIs(t, err, ErrNoDatabase) err = database.Ping() diff --git a/storage/data/sql.go b/storage/data/sql.go index 0dd5c1516..af1ee0bba 100644 --- a/storage/data/sql.go +++ b/storage/data/sql.go @@ -26,6 +26,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/juju/errors" _ "github.com/lib/pq" + _ "github.com/mailru/go-clickhouse/v2" "github.com/samber/lo" "github.com/zhenghaoz/gorse/base/jsonutil" "github.com/zhenghaoz/gorse/base/log" @@ -42,6 +43,7 @@ type SQLDriver int const ( MySQL SQLDriver = iota Postgres + ClickHouse SQLite ) @@ -87,6 +89,34 @@ func NewSQLUser(user User) (sqlUser SQLUser) { return } +type ClickHouseItem struct { + SQLItem `gorm:"embedded"` + Version time.Time `gorm:"column:version"` +} + +func NewClickHouseItem(item Item) (clickHouseItem ClickHouseItem) { + clickHouseItem.SQLItem = NewSQLItem(item) + clickHouseItem.Timestamp = item.Timestamp.In(time.UTC) + clickHouseItem.Version = time.Now().In(time.UTC) + return +} + +type ClickhouseUser struct { + SQLUser `gorm:"embedded"` + Version time.Time `gorm:"column:version"` +} + +func NewClickhouseUser(user User) (clickhouseUser ClickhouseUser) { + clickhouseUser.SQLUser = NewSQLUser(user) + clickhouseUser.Version = time.Now().In(time.UTC) + return +} + +type ClickHouseFeedback struct { + Feedback `gorm:"embedded"` + Version time.Time 
`gorm:"column:version"` +} + // SQLDatabase use MySQL as data storage. type SQLDatabase struct { storage.TablePrefix @@ -95,6 +125,19 @@ type SQLDatabase struct { driver SQLDriver } +// Optimize is used by ClickHouse only. +func (d *SQLDatabase) Optimize() error { + if d.driver == ClickHouse { + for _, tableName := range []string{d.UsersTable(), d.ItemsTable(), d.FeedbackTable(), d.UserFeedbackTable(), d.ItemFeedbackTable()} { + _, err := d.client.Exec("OPTIMIZE TABLE " + tableName) + if err != nil { + return errors.Trace(err) + } + } + } + return nil +} + // Init tables and indices in MySQL. func (d *SQLDatabase) Init() error { switch d.driver { @@ -185,6 +228,73 @@ func (d *SQLDatabase) Init() error { if err != nil { return errors.Trace(err) } + case ClickHouse: + // create tables + type Items struct { + Namespace string `gorm:"column:namespace;type:String"` + ItemId string `gorm:"column:item_id;type:String"` + IsHidden int `gorm:"column:is_hidden;type:Boolean;default:0"` + Categories string `gorm:"column:categories;type:String;default:'[]'"` + Timestamp time.Time `gorm:"column:time_stamp;type:Datetime64(9,'UTC')"` + Labels string `gorm:"column:labels;type:String;default:'[]'"` + Comment string `gorm:"column:comment;type:String"` + Version struct{} `gorm:"column:version;type:DateTime"` + } + err := d.gormDB.Set("gorm:table_options", "ENGINE = ReplacingMergeTree(version) ORDER BY (namespace, item_id)").AutoMigrate(Items{}) + if err != nil { + return errors.Trace(err) + } + type Users struct { + UserId string `gorm:"column:user_id;type:String"` + Labels string `gorm:"column:labels;type:String;default:'[]'"` + Subscribe string `gorm:"column:subscribe;type:String;default:'[]'"` + Comment string `gorm:"column:comment;type:String"` + Version struct{} `gorm:"column:version;type:DateTime"` + } + err = d.gormDB.Set("gorm:table_options", "ENGINE = ReplacingMergeTree(version) ORDER BY user_id").AutoMigrate(Users{}) + if err != nil { + return errors.Trace(err) + } + type 
Feedback struct { + Namespace string `gorm:"column:namespace;type:String"` + FeedbackType string `gorm:"column:feedback_type;type:String"` + UserId string `gorm:"column:user_id;type:String"` + ItemId string `gorm:"column:item_id;type:String"` + Timestamp time.Time `gorm:"column:time_stamp;type:DateTime64(9,'UTC')"` + Comment string `gorm:"column:comment;type:String"` + Version struct{} `gorm:"column:version;type:DateTime"` + } + err = d.gormDB.Set("gorm:table_options", + "ENGINE = ReplacingMergeTree(version) ORDER BY (namespace, feedback_type, user_id, item_id)"). + AutoMigrate(Feedback{}) + if err != nil { + return errors.Trace(err) + } + // create materialized views + type UserFeedback Feedback + err = d.gormDB.Set("gorm:table_options", + "ENGINE = ReplacingMergeTree(version) ORDER BY (user_id, namespace, item_id, feedback_type)"). + AutoMigrate(UserFeedback{}) + if err != nil { + return errors.Trace(err) + } + err = d.gormDB.Exec(fmt.Sprintf("CREATE MATERIALIZED VIEW IF NOT EXISTS %s_mv TO %s AS SELECT * FROM %s", + d.UserFeedbackTable(), d.UserFeedbackTable(), d.FeedbackTable())).Error + if err != nil { + return errors.Trace(err) + } + type ItemFeedback Feedback + err = d.gormDB.Set("gorm:table_options", + "ENGINE = ReplacingMergeTree(version) ORDER BY (namespace, item_id, user_id, feedback_type)"). 
+ AutoMigrate(ItemFeedback{}) + if err != nil { + return errors.Trace(err) + } + err = d.gormDB.Exec(fmt.Sprintf("CREATE MATERIALIZED VIEW IF NOT EXISTS %s_mv TO %s AS SELECT * FROM %s", + d.ItemFeedbackTable(), d.ItemFeedbackTable(), d.FeedbackTable())).Error + if err != nil { + return errors.Trace(err) + } } return nil } @@ -199,11 +309,21 @@ func (d *SQLDatabase) Close() error { } func (d *SQLDatabase) Purge() error { - tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable()} - for _, tableName := range tables { - err := d.gormDB.Exec(fmt.Sprintf("DELETE FROM %s", tableName)).Error - if err != nil { - return errors.Trace(err) + if d.driver == ClickHouse { + tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable(), d.UserFeedbackTable(), d.ItemFeedbackTable()} + for _, tableName := range tables { + err := d.gormDB.Exec(fmt.Sprintf("alter table %s delete where 1=1", tableName)).Error + if err != nil { + return errors.Trace(err) + } + } + } else { + tables := []string{d.ItemsTable(), d.FeedbackTable(), d.UsersTable()} + for _, tableName := range tables { + err := d.gormDB.Exec(fmt.Sprintf("DELETE FROM %s", tableName)).Error + if err != nil { + return errors.Trace(err) + } } } return nil @@ -214,23 +334,36 @@ func (d *SQLDatabase) BatchInsertItems(ctx context.Context, items []Item) error if len(items) == 0 { return nil } - rows := make([]SQLItem, 0, len(items)) - memo := mapset.NewSet[ItemUID]() - for _, item := range items { - if !memo.Contains(item.ItemUID()) { - memo.Add(item.ItemUID()) - row := NewSQLItem(item) - if d.driver == SQLite { - row.Timestamp = row.Timestamp.In(time.UTC) + if d.driver == ClickHouse { + rows := make([]ClickHouseItem, 0, len(items)) + memo := mapset.NewSet[ItemUID]() + for _, item := range items { + if !memo.Contains(item.ItemUID()) { + memo.Add(item.ItemUID()) + rows = append(rows, NewClickHouseItem(item)) } - rows = append(rows, row) } + err := d.gormDB.Create(rows).Error + return errors.Trace(err) + } else { + 
rows := make([]SQLItem, 0, len(items)) + memo := mapset.NewSet[ItemUID]() + for _, item := range items { + if !memo.Contains(item.ItemUID()) { + memo.Add(item.ItemUID()) + row := NewSQLItem(item) + if d.driver == SQLite { + row.Timestamp = row.Timestamp.In(time.UTC) + } + rows = append(rows, row) + } + } + err := d.gormDB.WithContext(ctx).Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "namespace"}, {Name: "item_id"}}, + DoUpdates: clause.AssignmentColumns([]string{"is_hidden", "categories", "time_stamp", "labels", "comment"}), + }).Create(rows).Error + return errors.Trace(err) } - err := d.gormDB.WithContext(ctx).Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "namespace"}, {Name: "item_id"}}, - DoUpdates: clause.AssignmentColumns([]string{"is_hidden", "categories", "time_stamp", "labels", "comment"}), - }).Create(rows).Error - return errors.Trace(err) } func (d *SQLDatabase) BatchGetItems(ctx context.Context, namespace string, itemIds []string) ([]Item, error) { @@ -257,16 +390,22 @@ func (d *SQLDatabase) BatchGetItems(ctx context.Context, namespace string, itemI // DeleteItem deletes a item from MySQL. func (d *SQLDatabase) DeleteItem(ctx context.Context, namespace string, itemId string) error { - if err := d.gormDB.WithContext(ctx).Delete(&SQLItem{ - Namespace: namespace, - ItemId: itemId, - }).Error; err != nil { + if err := d.gormDB.WithContext(ctx). + Delete(&SQLItem{}, "namespace = ? and item_id = ?", namespace, itemId).Error; err != nil { return errors.Trace(err) } if err := d.gormDB.WithContext(ctx). Delete(&Feedback{}, "namespace = ? 
and item_id = ?", namespace, itemId).Error; err != nil { return errors.Trace(err) } + if d.driver == ClickHouse { + if err := d.gormDB.WithContext(ctx).Delete(&ItemFeedback{}, "namespace = ? and item_id = ?", namespace, itemId).Error; err != nil { + return errors.Trace(err) + } + if err := d.gormDB.WithContext(ctx).Delete(&UserFeedback{}, "namespace = ? and item_id = ?", namespace, itemId).Error; err != nil { + return errors.Trace(err) + } + } return nil } @@ -319,7 +458,7 @@ func (d *SQLDatabase) ModifyItem(ctx context.Context, namespace string, itemId s } if patch.Timestamp != nil { switch d.driver { - case SQLite: + case ClickHouse, SQLite: attributes["time_stamp"] = patch.Timestamp.In(time.UTC) default: attributes["time_stamp"] = patch.Timestamp @@ -420,11 +559,18 @@ func (d *SQLDatabase) GetItemStream(ctx context.Context, batchSize int, timeLimi // GetItemFeedback returns feedback of a item from MySQL. func (d *SQLDatabase) GetItemFeedback(ctx context.Context, namespace string, itemId string, feedbackTypes ...string) ([]Feedback, error) { - tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()). - Select("namespace, user_id, item_id, feedback_type, time_stamp") + tx := d.gormDB.WithContext(ctx) + if d.driver == ClickHouse { + tx = tx.Table(d.ItemFeedbackTable()) + } else { + tx = tx.Table(d.FeedbackTable()) + } + tx = tx.Select("namespace, user_id, item_id, feedback_type, time_stamp") switch d.driver { case SQLite: tx.Where("time_stamp <= DATETIME() AND namespace = ? AND item_id = ?", namespace, itemId) + case ClickHouse: + tx.Where("time_stamp <= NOW('UTC') AND namespace = ? AND item_id = ?", namespace, itemId) default: tx.Where("time_stamp <= NOW() AND namespace = ? 
AND item_id = ?", namespace, itemId) } @@ -452,19 +598,32 @@ func (d *SQLDatabase) BatchInsertUsers(ctx context.Context, users []User) error if len(users) == 0 { return nil } - rows := make([]SQLUser, 0, len(users)) - memo := mapset.NewSet[string]() - for _, user := range users { - if !memo.Contains(user.UserId) { - memo.Add(user.UserId) - rows = append(rows, NewSQLUser(user)) + if d.driver == ClickHouse { + rows := make([]ClickhouseUser, 0, len(users)) + memo := mapset.NewSet[string]() + for _, user := range users { + if !memo.Contains(user.UserId) { + memo.Add(user.UserId) + rows = append(rows, NewClickhouseUser(user)) + } + } + err := d.gormDB.WithContext(ctx).Create(rows).Error + return errors.Trace(err) + } else { + rows := make([]SQLUser, 0, len(users)) + memo := mapset.NewSet[string]() + for _, user := range users { + if !memo.Contains(user.UserId) { + memo.Add(user.UserId) + rows = append(rows, NewSQLUser(user)) + } } + err := d.gormDB.WithContext(ctx).Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "user_id"}}, + DoUpdates: clause.AssignmentColumns([]string{"labels", "subscribe", "comment"}), + }).Create(rows).Error + return errors.Trace(err) } - err := d.gormDB.WithContext(ctx).Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "user_id"}}, - DoUpdates: clause.AssignmentColumns([]string{"labels", "subscribe", "comment"}), - }).Create(rows).Error - return errors.Trace(err) } // DeleteUser deletes a user from MySQL. 
@@ -475,6 +634,14 @@ func (d *SQLDatabase) DeleteUser(ctx context.Context, userId string) error { if err := d.gormDB.WithContext(ctx).Delete(&Feedback{}, "user_id = ?", userId).Error; err != nil { return errors.Trace(err) } + if d.driver == ClickHouse { + if err := d.gormDB.WithContext(ctx).Delete(&ItemFeedback{}, "user_id = ?", userId).Error; err != nil { + return errors.Trace(err) + } + if err := d.gormDB.WithContext(ctx).Delete(&UserFeedback{}, "user_id = ?", userId).Error; err != nil { + return errors.Trace(err) + } + } return nil } @@ -590,8 +757,13 @@ func (d *SQLDatabase) GetUserStream(ctx context.Context, batchSize int) (chan [] // GetUserFeedback returns feedback of a user from MySQL. func (d *SQLDatabase) GetUserFeedback(ctx context.Context, namespace string, userId string, endTime *time.Time, feedbackTypes ...string) ([]Feedback, error) { - tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()). - Select("namespace, feedback_type, user_id, item_id, time_stamp, comment"). + tx := d.gormDB.WithContext(ctx) + if d.driver == ClickHouse { + tx = tx.Table(d.UserFeedbackTable()) + } else { + tx = tx.Table(d.FeedbackTable()) + } + tx = tx.Select("namespace, feedback_type, user_id, item_id, time_stamp, comment"). Where("namespace = ? 
and user_id = ?", namespace, userId) if endTime != nil { tx.Where("time_stamp <= ?", d.convertTimeZone(endTime)) @@ -634,18 +806,33 @@ func (d *SQLDatabase) BatchInsertFeedback(ctx context.Context, feedback []Feedba // insert users if insertUser { userList := users.ToSlice() - err := tx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "user_id"}}, - DoNothing: true, - }).Create(lo.Map(userList, func(userId string, _ int) SQLUser { - return SQLUser{ - UserId: userId, - Labels: "null", - Subscribe: "null", + if d.driver == ClickHouse { + err := tx.Create(lo.Map(userList, func(userId string, _ int) ClickhouseUser { + return ClickhouseUser{ + SQLUser: SQLUser{ + UserId: userId, + Labels: "[]", + Subscribe: "[]", + }, + } + })).Error + if err != nil { + return errors.Trace(err) + } + } else { + err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "user_id"}}, + DoNothing: true, + }).Create(lo.Map(userList, func(userId string, _ int) SQLUser { + return SQLUser{ + UserId: userId, + Labels: "null", + Subscribe: "null", + } + })).Error + if err != nil { + return errors.Trace(err) } - })).Error - if err != nil { - return errors.Trace(err) } } else { for _, user := range users.ToSlice() { @@ -663,22 +850,38 @@ func (d *SQLDatabase) BatchInsertFeedback(ctx context.Context, feedback []Feedba // insert items if insertItem { itemList := items.ToSlice() - err := tx.Clauses(clause.OnConflict{ - Columns: []clause.Column{ - {Name: "namespace"}, - {Name: "item_id"}, - }, - DoNothing: true, - }).Create(lo.Map(itemList, func(uid ItemUID, _ int) SQLItem { - return SQLItem{ - Namespace: uid.Namespace, - ItemId: uid.ItemId, - Labels: "null", - Categories: "null", + if d.driver == ClickHouse { + err := tx.Create(lo.Map(itemList, func(uid ItemUID, _ int) ClickHouseItem { + return ClickHouseItem{ + SQLItem: SQLItem{ + Namespace: uid.Namespace, + ItemId: uid.ItemId, + Labels: "[]", + Categories: "[]", + }, + } + })).Error + if err != nil { + return 
errors.Trace(err) + } + } else { + err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{ + {Name: "namespace"}, + {Name: "item_id"}, + }, + DoNothing: true, + }).Create(lo.Map(itemList, func(uid ItemUID, _ int) SQLItem { + return SQLItem{ + Namespace: uid.Namespace, + ItemId: uid.ItemId, + Labels: "null", + Categories: "null", + } + })).Error + if err != nil { + return errors.Trace(err) } - })).Error - if err != nil { - return errors.Trace(err) } } else { for _, item := range items.ToSlice() { @@ -696,33 +899,55 @@ func (d *SQLDatabase) BatchInsertFeedback(ctx context.Context, feedback []Feedba } } // insert feedback - rows := make([]Feedback, 0, len(feedback)) - memo := mapset.NewSet[FeedbackKey]() - for _, f := range feedback { - if users.Contains(f.UserId) && items.Contains(f.ItemUID()) { - if !memo.Contains(f.FeedbackKey) { - memo.Add(f.FeedbackKey) - if d.driver == SQLite { + if d.driver == ClickHouse { + rows := make([]ClickHouseFeedback, 0, len(feedback)) + memo := mapset.NewSet[FeedbackKey]() + for _, f := range feedback { + if users.Contains(f.UserId) && items.Contains(f.ItemUID()) { + if !memo.Contains(f.FeedbackKey) { + memo.Add(f.FeedbackKey) f.Timestamp = f.Timestamp.In(time.UTC) + rows = append(rows, ClickHouseFeedback{ + Feedback: f, + Version: lo.If(overwrite, time.Now().In(time.UTC)).Else(time.Time{}), + }) } - rows = append(rows, f) } } + if len(rows) == 0 { + return nil + } + err := tx.Create(rows).Error + return errors.Trace(err) + } else { + rows := make([]Feedback, 0, len(feedback)) + memo := mapset.NewSet[FeedbackKey]() + for _, f := range feedback { + if users.Contains(f.UserId) && items.Contains(f.ItemUID()) { + if !memo.Contains(f.FeedbackKey) { + memo.Add(f.FeedbackKey) + if d.driver == SQLite { + f.Timestamp = f.Timestamp.In(time.UTC) + } + rows = append(rows, f) + } + } + } + if len(rows) == 0 { + return nil + } + err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{ + {Name: "namespace"}, + {Name: 
"feedback_type"}, + {Name: "user_id"}, + {Name: "item_id"}, + }, + DoNothing: !overwrite, + DoUpdates: lo.If(overwrite, clause.AssignmentColumns([]string{"time_stamp", "comment"})).Else(nil), + }).Create(rows).Error + return errors.Trace(err) } - if len(rows) == 0 { - return nil - } - err := tx.Clauses(clause.OnConflict{ - Columns: []clause.Column{ - {Name: "namespace"}, - {Name: "feedback_type"}, - {Name: "user_id"}, - {Name: "item_id"}, - }, - DoNothing: !overwrite, - DoUpdates: lo.If(overwrite, clause.AssignmentColumns([]string{"time_stamp", "comment"})).Else(nil), - }).Create(rows).Error - return errors.Trace(err) } // GetFeedback returns feedback from MySQL. @@ -833,8 +1058,13 @@ func (d *SQLDatabase) GetFeedbackStream(ctx context.Context, batchSize int, scan // GetUserItemFeedback gets a feedback by user id and item id from MySQL. func (d *SQLDatabase) GetUserItemFeedback(ctx context.Context, namespace, userId, itemId string, feedbackTypes ...string) ([]Feedback, error) { - tx := d.gormDB.WithContext(ctx).Table(d.FeedbackTable()). - Select("namespace, feedback_type, user_id, item_id, time_stamp, comment"). + tx := d.gormDB.WithContext(ctx) + if d.driver == ClickHouse { + tx = tx.Table(d.UserFeedbackTable()) + } else { + tx = tx.Table(d.FeedbackTable()) + } + tx = tx.Select("namespace, feedback_type, user_id, item_id, time_stamp, comment"). Where("namespace = ? AND user_id = ? AND item_id = ?", namespace, userId, itemId) if len(feedbackTypes) > 0 { tx.Where("feedback_type IN ?", feedbackTypes) @@ -857,23 +1087,37 @@ func (d *SQLDatabase) GetUserItemFeedback(ctx context.Context, namespace, userId // DeleteUserItemFeedback deletes a feedback by user id and item id from MySQL. func (d *SQLDatabase) DeleteUserItemFeedback(ctx context.Context, namespace, userId, itemId string, feedbackTypes ...string) (int, error) { - tx := d.gormDB.WithContext(ctx).Where("namespace = ? AND user_id = ? 
AND item_id = ?", namespace, userId, itemId) - if len(feedbackTypes) > 0 { - tx.Where("feedback_type IN ?", feedbackTypes) + deleteUserItemFeedback := func(value any) (int, error) { + tx := d.gormDB.WithContext(ctx).Where("namespace = ? AND user_id = ? AND item_id = ?", namespace, userId, itemId) + if len(feedbackTypes) > 0 { + tx.Where("feedback_type IN ?", feedbackTypes) + } + tx.Delete(value) + if tx.Error != nil { + return 0, errors.Trace(tx.Error) + } + return int(tx.RowsAffected), nil } - tx.Delete(&Feedback{}) - if tx.Error != nil { - return 0, errors.Trace(tx.Error) + rowAffected, err := deleteUserItemFeedback(&Feedback{}) + if err != nil { + return 0, errors.Trace(err) } - if tx.Error != nil { - return 0, errors.Trace(tx.Error) + if d.driver == ClickHouse { + _, err = deleteUserItemFeedback(&UserFeedback{}) + if err != nil { + return 0, errors.Trace(err) + } + _, err = deleteUserItemFeedback(&ItemFeedback{}) + if err != nil { + return 0, errors.Trace(err) + } } - return int(tx.RowsAffected), nil + return rowAffected, nil } func (d *SQLDatabase) convertTimeZone(timestamp *time.Time) time.Time { switch d.driver { - case SQLite: + case ClickHouse, SQLite: return timestamp.In(time.UTC) default: return *timestamp diff --git a/storage/data/sql_test.go b/storage/data/sql_test.go index dccff5bd8..98836cd37 100644 --- a/storage/data/sql_test.go +++ b/storage/data/sql_test.go @@ -27,8 +27,9 @@ import ( ) var ( - mySqlDSN string - postgresDSN string + mySqlDSN string + postgresDSN string + clickhouseDSN string ) func init() { @@ -41,6 +42,7 @@ func init() { } mySqlDSN = env("MYSQL_URI", "mysql://root:password@tcp(127.0.0.1:3306)/") postgresDSN = env("POSTGRES_URI", "postgres://gorse:gorse_pass@127.0.0.1/") + clickhouseDSN = env("CLICKHOUSE_URI", "clickhouse://127.0.0.1:8123/") } type MySQLTestSuite struct { @@ -106,6 +108,34 @@ func TestPostgres(t *testing.T) { suite.Run(t, new(PostgresTestSuite)) } +type ClickHouseTestSuite struct { + baseTestSuite +} + +func 
(suite *ClickHouseTestSuite) SetupSuite() { + var err error + // create database + databaseComm, err := sql.Open("chhttp", "http://"+clickhouseDSN[len(storage.ClickhousePrefix):]) + suite.NoError(err) + const dbName = "gorse_data_test" + _, err = databaseComm.Exec("DROP DATABASE IF EXISTS " + dbName) + suite.NoError(err) + _, err = databaseComm.Exec("CREATE DATABASE " + dbName) + suite.NoError(err) + err = databaseComm.Close() + suite.NoError(err) + // connect database + suite.Database, err = Open(clickhouseDSN+dbName+"?mutations_sync=2", "gorse_") + suite.NoError(err) + // create schema + err = suite.Database.Init() + suite.NoError(err) +} + +func TestClickHouse(t *testing.T) { + suite.Run(t, new(ClickHouseTestSuite)) +} + type SQLiteTestSuite struct { baseTestSuite } diff --git a/storage/docker-compose.yml b/storage/docker-compose.yml index 18d768152..fa3f7edfb 100644 --- a/storage/docker-compose.yml +++ b/storage/docker-compose.yml @@ -30,3 +30,8 @@ services: environment: MONGO_INITDB_ROOT_USERNAME: root MONGO_INITDB_ROOT_PASSWORD: password + + clickhouse: + image: clickhouse/clickhouse-server:22 + ports: + - 8123:8123 diff --git a/storage/scheme.go b/storage/scheme.go index 6fb1d5f5f..12c22a3f7 100644 --- a/storage/scheme.go +++ b/storage/scheme.go @@ -34,6 +34,9 @@ const ( MongoSrvPrefix = "mongodb+srv://" PostgresPrefix = "postgres://" PostgreSQLPrefix = "postgresql://" + ClickhousePrefix = "clickhouse://" + CHHTTPPrefix = "chhttp://" + CHHTTPSPrefix = "chhttps://" SQLitePrefix = "sqlite://" RedisPrefix = "redis://" RedissPrefix = "rediss://" @@ -122,6 +125,16 @@ func (tp TablePrefix) FeedbackTable() string { return string(tp) + "feedback" } +// UserFeedbackTable returns the materialized view of user feedback. +func (tp TablePrefix) UserFeedbackTable() string { + return string(tp) + "user_feedback" +} + +// ItemFeedbackTable returns the materialized view of item feedback. 
+func (tp TablePrefix) ItemFeedbackTable() string { + return string(tp) + "item_feedback" +} + func (tp TablePrefix) Key(key string) string { return string(tp) + key } @@ -143,6 +156,9 @@ func NewGORMConfig(tablePrefix string) *gorm.Config { "SQLScore", "Scores", "PostgresScore", "Scores", "TimeSeriesPoint", "time_series_points", + "ClickhouseUser", "Users", + "ClickHouseItem", "Items", + "ClickHouseFeedback", "Feedback", ), }, }