diff --git a/clickhouse_rows.go b/clickhouse_rows.go index 698905e7f0..46f8a32d6c 100644 --- a/clickhouse_rows.go +++ b/clickhouse_rows.go @@ -46,6 +46,9 @@ func (r *rows) Next() (result bool) { } next: if r.row >= r.block.Rows() { + if r.stream == nil { + return false + } select { case err := <-r.errors: if err != nil { @@ -95,7 +98,16 @@ func (r *rows) Columns() []string { } func (r *rows) Close() error { - active := 2 + if r.errors == nil && r.stream == nil { + return r.err + } + active := 0 + if r.errors != nil { + active++ + } + if r.stream != nil { + active++ + } for { select { case _, ok := <-r.stream: diff --git a/conn_http.go b/conn_http.go index bdd8d58e85..d1c1cd6971 100644 --- a/conn_http.go +++ b/conn_http.go @@ -75,49 +75,32 @@ type HTTPReaderWriter struct { method CompressionMethod } -func (rw HTTPReaderWriter) read(res *http.Response) ([]byte, error) { +// NewReader will return a reader that will decompress data if needed. +func (rw *HTTPReaderWriter) NewReader(res *http.Response) (io.Reader, error) { enc := res.Header.Get("Content-Encoding") if !res.Uncompressed && rw.method.String() == enc { switch rw.method { case CompressionGZIP: reader := rw.reader.(*gzip.Reader) - defer reader.Close() if err := reader.Reset(res.Body); err != nil { return nil, err } - body, err := io.ReadAll(reader) - if err != nil { - return nil, err - } - return body, nil + return reader, nil case CompressionDeflate: - reader := rw.reader.(io.ReadCloser) - defer reader.Close() - if err := rw.reader.(flate.Resetter).Reset(res.Body, nil); err != nil { - return nil, err - } - body, err := io.ReadAll(reader) - if err != nil { + reader := rw.reader + if err := reader.(flate.Resetter).Reset(res.Body, nil); err != nil { return nil, err } - return body, nil + return reader, nil case CompressionBrotli: reader := rw.reader.(*brotli.Reader) if err := reader.Reset(res.Body); err != nil { return nil, err } - body, err := io.ReadAll(reader) - if err != nil { - return nil, err - } - return body, nil + return reader, nil } } - body, err := io.ReadAll(res.Body) - if err != nil { - return nil, err - } - return body, nil + return res.Body, nil } func (rw *HTTPReaderWriter) reset(pw *io.PipeWriter) io.WriteCloser { @@ -436,27 +419,21 @@ func (h *httpConnect) sendQuery(ctx context.Context, query string, options *Quer func (h *httpConnect) readRawResponse(response *http.Response) (body []byte, err error) { rw := h.compressionPool.Get() - defer response.Body.Close() defer h.compressionPool.Put(rw) - if body, err = rw.read(response); err != nil { + + reader, err := rw.NewReader(response) + if err != nil { return nil, err } if h.compression == CompressionLZ4 || h.compression == CompressionZSTD { - result := make([]byte, len(body)) - reader := chproto.NewReader(bytes.NewReader(body)) - reader.EnableCompression() - defer reader.DisableCompression() - for { - b, err := reader.ReadByte() - if err != nil { - if errors.Is(err, io.EOF) { - break - } - return nil, err - } - result = append(result, b) - } - return result, nil + chReader := chproto.NewReader(reader) + chReader.EnableCompression() + reader = chReader + } + + body, err = io.ReadAll(reader) + if err != nil { + return nil, err } return body, nil } @@ -549,14 +526,13 @@ func (h *httpConnect) executeRequest(req *http.Request) (*http.Response, error) if err != nil { return nil, err } - if resp.StatusCode != http.StatusOK { + if resp.StatusCode != http.StatusOK { + defer resp.Body.Close() msg, err := h.readRawResponse(resp) - if err != nil { return nil, fmt.Errorf("clickhouse [execute]:: %d code: failed to read the response: %w", resp.StatusCode, err) } - return nil, fmt.Errorf("clickhouse [execute]:: %d code: %s", resp.StatusCode, string(msg)) } return resp, nil diff --git a/conn_http_query.go b/conn_http_query.go index 3716a28513..92298a71a3 100644 --- a/conn_http_query.go +++ b/conn_http_query.go @@ -18,12 +18,12 @@ package clickhouse import ( - "bytes" "context" "errors" + "io" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/ClickHouse/clickhouse-go/v2/lib/proto" - "io" ) // release is ignored, because http used by std with empty release function @@ -50,52 +50,48 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error), if err != nil { return nil, err } - defer res.Body.Close() - // detect compression from http Content-Encoding header - note user will need to have set enable_http_compression - // for CH to respond with compressed data - we don't set this automatically as they might not have permissions - var body []byte - //adding Accept-Encoding:gzip on your request means response won’t be automatically decompressed per https://github.com/golang/go/blob/master/src/net/http/transport.go#L182-L190 - - rw := h.compressionPool.Get() - body, err = rw.read(res) - bufferSize := h.blockBufferSize - if options.blockBufferSize > 0 { - // allow block buffer sze to be overridden per query - bufferSize = options.blockBufferSize - } - var ( - errCh = make(chan error) - stream = make(chan *proto.Block, bufferSize) - ) - if len(body) == 0 { - // queries with no results can get an empty body - go func() { - close(stream) - close(errCh) - }() + if res.ContentLength == 0 { + block := &proto.Block{} return &rows{ - err: nil, - stream: stream, - errors: errCh, - block: &proto.Block{}, - columns: []string{}, + block: block, + columns: block.ColumnsNames(), structMap: &structMap{}, }, nil } + + rw := h.compressionPool.Get() + // The HTTPReaderWriter.NewReader will create a reader that will decompress it if needed, + // cause adding Accept-Encoding:gzip on your request means response won’t be automatically decompressed + // per https://github.com/golang/go/blob/master/src/net/http/transport.go#L182-L190. + // Note user will need to have set enable_http_compression for CH to respond with compressed data. we don't set this + // automatically as they might not have permissions. + reader, err := rw.NewReader(res) if err != nil { + res.Body.Close() + h.compressionPool.Put(rw) return nil, err } - h.compressionPool.Put(rw) - reader := chproto.NewReader(bytes.NewReader(body)) - block, err := h.readData(ctx, reader) - if err != nil { + chReader := chproto.NewReader(reader) + block, err := h.readData(ctx, chReader) + if err != nil && !errors.Is(err, io.EOF) { + res.Body.Close() + h.compressionPool.Put(rw) return nil, err } + bufferSize := h.blockBufferSize + if options.blockBufferSize > 0 { + // allow block buffer sze to be overridden per query + bufferSize = options.blockBufferSize + } + var ( + errCh = make(chan error) + stream = make(chan *proto.Block, bufferSize) + ) go func() { for { - block, err := h.readData(ctx, reader) + block, err := h.readData(ctx, chReader) if err != nil { // ch-go wraps EOF errors if !errors.Is(err, io.EOF) { @@ -110,10 +106,15 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error), case stream <- block: } } + res.Body.Close() + h.compressionPool.Put(rw) close(stream) close(errCh) }() + if block == nil { + block = &proto.Block{} + } return &rows{ block: block, stream: stream,