Skip to content

Commit

Permalink
Implement release connection in batch (#1062)
Browse files Browse the repository at this point in the history
* Implement release connetion

* Add example

* Improve API and fix code review issues

* Add prepare batch section to README.md
  • Loading branch information
EpicStep authored Aug 10, 2023
1 parent bd62571 commit 9ffd7d0
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 34 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The client is tested against the currently [supported versions](https://github.c
* Connection pool
* Failover and load balancing
* [Bulk write support](examples/clickhouse_api/batch.go) (for `database/sql` [use](examples/std/batch.go) `begin->prepare->(in loop exec)->commit`)
* [PrepareBatch options](#preparebatch-options)
* [AsyncInsert](benchmark/v2/write-async/main.go) (more details in [Async insert](#async-insert) section)
* Named and numeric placeholders support
* LZ4/ZSTD compression support
Expand Down Expand Up @@ -281,6 +282,11 @@ HTTP protocol supports batching. It can be enabled by setting `async_insert` whe

For more details please see [asynchronous inserts](https://clickhouse.com/docs/en/optimize/asynchronous-inserts#enabling-asynchronous-inserts) documentation.

## PrepareBatch options

Available options:
- [WithReleaseConnection](examples/clickhouse_api/batch_release_connection.go) - after PrepareBatch connection will be returned to the pool. It can help you make a long-lived batch.

## Benchmark

| [V1 (READ)](benchmark/v1/read/main.go) | [V2 (READ) std](benchmark/v2/read/main.go) | [V2 (READ) clickhouse API](benchmark/v2/read-native/main.go) |
Expand All @@ -305,6 +311,7 @@ go get -u github.com/ClickHouse/clickhouse-go/v2
### native interface

* [batch](examples/clickhouse_api/batch.go)
* [batch with release connection](examples/clickhouse_api/batch_release_connection.go)
* [async insert](examples/clickhouse_api/async.go)
* [batch struct](examples/clickhouse_api/append_struct.go)
* [columnar](examples/clickhouse_api/columnar_insert.go)
Expand Down
14 changes: 12 additions & 2 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,28 @@ func (ch *clickhouse) Exec(ctx context.Context, query string, args ...any) error
return nil
}

func (ch *clickhouse) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) {
func (ch *clickhouse) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
conn, err := ch.acquire(ctx)
if err != nil {
return nil, err
}
batch, err := conn.prepareBatch(ctx, query, ch.release, ch.acquire)
batch, err := conn.prepareBatch(ctx, query, getPrepareBatchOptions(opts...), ch.release, ch.acquire)
if err != nil {
return nil, err
}
return batch, nil
}

func getPrepareBatchOptions(opts ...driver.PrepareBatchOption) driver.PrepareBatchOptions {
var options driver.PrepareBatchOptions

for _, opt := range opts {
opt(&options)
}

return options
}

func (ch *clickhouse) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
conn, err := ch.acquire(ctx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions clickhouse_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ type stdConnect interface {
query(ctx context.Context, release func(*connect, error), query string, args ...any) (*rows, error)
exec(ctx context.Context, query string, args ...any) error
ping(ctx context.Context) (err error)
prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error)
prepareBatch(ctx context.Context, query string, options ldriver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error)
asyncInsert(ctx context.Context, query string, wait bool, args ...any) error
}

Expand Down Expand Up @@ -270,7 +270,7 @@ func (std *stdDriver) Prepare(query string) (driver.Stmt, error) {
}

func (std *stdDriver) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
batch, err := std.conn.prepareBatch(ctx, query, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil })
batch, err := std.conn.prepareBatch(ctx, query, ldriver.PrepareBatchOptions{}, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil })
if err != nil {
if isConnBrokenError(err) {
std.debugf("PrepareContext got a fatal error, resetting connection: %v\n", err)
Expand Down
66 changes: 39 additions & 27 deletions conn_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
var splitInsertRe = regexp.MustCompile(`(?i)\sVALUES\s*\(`)
var columnMatch = regexp.MustCompile(`.*\((?P<Columns>.+)\)$`)

func (c *connect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
//defer func() {
// if err := recover(); err != nil {
// fmt.Printf("panic occurred on %d:\n", c.num)
Expand Down Expand Up @@ -74,7 +74,8 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(*
if err = block.SortColumns(columns); err != nil {
return nil, err
}
return &batch{

b := &batch{
ctx: ctx,
query: query,
conn: c,
Expand All @@ -83,16 +84,22 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(*
connRelease: release,
connAcquire: acquire,
onProcess: onProcess,
}, nil
}

if opts.ReleaseConnection {
b.release(b.closeQuery())
}

return b, nil
}

type batch struct {
err error
ctx context.Context
query string
conn *connect
sent bool
released bool
sent bool // sent signalize that batch is send to ClickHouse.
released bool // released signalize that conn was returned to pool and can't be used.
block *proto.Block
connRelease func(*connect, error)
connAcquire func(context.Context) (*connect, error)
Expand Down Expand Up @@ -175,47 +182,35 @@ func (b *batch) Send() (err error) {
b.sent = true
b.release(err)
}()
if b.sent {
return b.retry()
}
if b.err != nil {
return b.err
}
if b.sent || b.released {
if err = b.resetConnection(); err != nil {
return err
}
}
if b.block.Rows() != 0 {
if err = b.conn.sendData(b.block, ""); err != nil {
return err
}
}
if err = b.conn.sendData(&proto.Block{}, ""); err != nil {
return err
}
if err = b.conn.process(b.ctx, b.onProcess); err != nil {
if err = b.closeQuery(); err != nil {
return err
}
return nil
}

func (b *batch) retry() (err error) {
// exit early if Send() hasn't been attepted
if !b.sent {
return ErrBatchNotSent
}

if err = b.resetConnection(); err != nil {
return err
}

b.sent = false
b.released = false
return b.Send()
}

func (b *batch) resetConnection() (err error) {
// acquire a new conn
if b.conn, err = b.connAcquire(b.ctx); err != nil {
return err
}

defer func() {
b.released = false
}()

options := queryOptions(b.ctx)
if deadline, ok := b.ctx.Deadline(); ok {
b.conn.conn.SetDeadline(deadline)
Expand All @@ -242,6 +237,11 @@ func (b *batch) Flush() error {
if b.err != nil {
return b.err
}
if b.released {
if err := b.resetConnection(); err != nil {
return err
}
}
if b.block.Rows() != 0 {
if err := b.conn.sendData(b.block, ""); err != nil {
return err
Expand All @@ -255,6 +255,18 @@ func (b *batch) Rows() int {
return b.block.Rows()
}

func (b *batch) closeQuery() error {
if err := b.conn.sendData(&proto.Block{}, ""); err != nil {
return err
}

if err := b.conn.process(b.ctx, b.onProcess); err != nil {
return err
}

return nil
}

type batchColumn struct {
err error
batch driver.Batch
Expand Down
5 changes: 3 additions & 2 deletions conn_http_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
// \x60 represents a backtick
var httpInsertRe = regexp.MustCompile(`(?i)^INSERT INTO\s+\x60?([\w.^\(]+)\x60?\s*(\([^\)]*\))?`)

// release is ignored, because http used by std with empty release function
func (h *httpConnect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
// release is ignored, because http used by std with empty release function.
// Also opts ignored because all options unused in http batch.
func (h *httpConnect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
matches := httpInsertRe.FindStringSubmatch(query)
if len(matches) < 3 {
return nil, errors.New("cannot get table name from query")
Expand Down
125 changes: 125 additions & 0 deletions examples/clickhouse_api/batch_release_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package clickhouse_api

import (
"context"
"errors"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

func BatchWithReleaseConnection() error {
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
if err := conn.Exec(ctx, `DROP TABLE IF EXISTS example`); err != nil {
return err
}
err = conn.Exec(ctx, `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt64,
Col2 String
) engine=Memory
`)

batch, err := New(ctx, conn, "INSERT INTO example")
if err != nil {
return err
}

if err = batch.Append(1, "test-1"); err != nil {
return err
}

if err = batch.Send(); err != nil {
return err
}

if err = batch.Append(2, "test-2"); err != nil {
return err
}

if err = batch.Send(); err != nil {
return err
}

var count uint64
if err = conn.QueryRow(context.Background(), `SELECT COUNT(*) FROM example`).Scan(&count); err != nil {
return err
}

if count != uint64(2) {
return errors.New("count must be 2")
}

return nil
}

type YourBatch struct {
ctx context.Context

insertStatement string

conn driver.Conn
batch driver.Batch
}

func New(ctx context.Context, conn driver.Conn, insertStatement string) (*YourBatch, error) {
batch, err := conn.PrepareBatch(ctx, insertStatement, driver.WithReleaseConnection())
if err != nil {
return nil, err
}

return &YourBatch{
ctx: ctx,
insertStatement: insertStatement,
conn: conn,
batch: batch,
}, nil
}

func (b *YourBatch) Append(col1 uint64, col2 string) error {
return b.batch.Append(
col1,
col2,
)
}

func (b *YourBatch) Send() error {
if err := b.batch.Send(); err != nil {
return err
}

return b.reset()
}

func (b *YourBatch) reset() error {
batch, err := b.conn.PrepareBatch(b.ctx, b.insertStatement, driver.WithReleaseConnection())
if err != nil {
return err
}

b.batch = batch

return nil
}
4 changes: 4 additions & 0 deletions examples/clickhouse_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func TestBatchInsert(t *testing.T) {
require.NoError(t, BatchInsert())
}

func TestBatchWithReleaseConnection(t *testing.T) {
require.NoError(t, BatchWithReleaseConnection())
}

func TestAuthConnect(t *testing.T) {
require.NoError(t, Auth())
}
Expand Down
2 changes: 1 addition & 1 deletion lib/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type (
Select(ctx context.Context, dest any, query string, args ...any) error
Query(ctx context.Context, query string, args ...any) (Rows, error)
QueryRow(ctx context.Context, query string, args ...any) Row
PrepareBatch(ctx context.Context, query string) (Batch, error)
PrepareBatch(ctx context.Context, query string, opts ...PrepareBatchOption) (Batch, error)
Exec(ctx context.Context, query string, args ...any) error
AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error
Ping(context.Context) error
Expand Down
13 changes: 13 additions & 0 deletions lib/driver/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package driver

type PrepareBatchOptions struct {
ReleaseConnection bool
}

type PrepareBatchOption func(options *PrepareBatchOptions)

func WithReleaseConnection() PrepareBatchOption {
return func(options *PrepareBatchOptions) {
options.ReleaseConnection = true
}
}
Loading

0 comments on commit 9ffd7d0

Please sign in to comment.