From a61d6901f0ddd1826b3ce582600a6afc9acdb5d5 Mon Sep 17 00:00:00 2001 From: Sergei Petrunia Date: Fri, 8 Aug 2014 00:35:03 +0400 Subject: [PATCH] Issue #4: Efficient VARCHAR storage for keys - Implemented efficient storage for VARCHARs in Keys. - Also, NULL values are stored as one NULL-byte (without "default value" as in KeyTupleFormat). --- storage/rocksdb/ha_rocksdb.cc | 73 ++---- storage/rocksdb/rdb_datadic.cc | 446 ++++++++++++++++++++++++++------- storage/rocksdb/rdb_datadic.h | 115 ++++++--- 3 files changed, 469 insertions(+), 165 deletions(-) diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index c82b5f8e5326..871fe5fc23b2 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -749,41 +749,6 @@ int ha_rocksdb::convert_record_from_storage_format(rocksdb::Slice *slice, } -/* - A simple string reader. -*/ - -class Stream_reader -{ - const char* ptr; - uint len; -public: - Stream_reader(const std::string &str) - { - ptr= &str.at(0); - len= str.length(); - } - /* - Read the next @param size bytes. Returns pointer to the bytes read, or - NULL if the remaining string doesn't have that many bytes. - */ - const char *read(uint size) - { - const char *res; - if (len < size) - res= NULL; - else - { - res= ptr; - ptr += size; - len -= size; - } - return res; - } - uint remaining_bytes() { return len; } -}; - - /* Unpack the record in this->retrieved_record from storage format into buf (which can be table->record[0] or table->record[1]) @@ -893,6 +858,10 @@ static void make_dbname_tablename(StringBuffer<64> *str, TABLE *table_arg) } +/* + Setup data needed to convert table->record[] to and from record storage format. 
+*/ + void ha_rocksdb::setup_field_converters() { uint i; @@ -906,7 +875,7 @@ void ha_rocksdb::setup_field_converters() { Field *field= table->field[i]; field_enc[i].field_type= field->real_type(); - //TODO: setup other copying data + if (field->real_maybe_null()) { field_enc[i].null_mask= cur_null_mask; @@ -1303,8 +1272,11 @@ int ha_rocksdb::secondary_index_read(int keyno, uchar *buf) { int rc; //TODO: We could here check if we have ran out of range we're scanning - uint size= key_descr[keyno]->get_primary_key_tuple(pk_descr, key.data(), + uint size= key_descr[keyno]->get_primary_key_tuple(pk_descr, &key, (char*) pk_packed_tuple); + if (size == (uint)-1) + return HA_ERR_INTERNAL_ERROR; + last_rowkey.copy((const char*)pk_packed_tuple, size, &my_charset_bin); if (keyread_only && !lock_rows) @@ -1533,7 +1505,7 @@ int ha_rocksdb::index_read_map(uchar *buf, const uchar *key, if (find_flag == HA_READ_PREFIX_LAST) { packed_size= kd->pack_index_tuple(table, sec_key_packed_tuple, key, - n_used_parts); + keypart_map); /* Check if the record has the same search prefix. 
*/ @@ -1582,21 +1554,26 @@ int ha_rocksdb::index_read_map(uchar *buf, const uchar *key, } else { - pk_size= kd->get_primary_key_tuple(pk_descr, rkey.data(), + pk_size= kd->get_primary_key_tuple(pk_descr, &rkey, (char*) pk_packed_tuple); - last_rowkey.copy((const char*)pk_packed_tuple, pk_size, &my_charset_bin); - - if (keyread_only && !lock_rows) + if (pk_size != (uint)-1) { - /* Get the key columns and primary key value */ - rocksdb::Slice value= scan_it->value(); - if (kd->unpack_record(table, buf, &rkey, &value)) - rc= HA_ERR_INTERNAL_ERROR; + last_rowkey.copy((const char*)pk_packed_tuple, pk_size, &my_charset_bin); + + if (keyread_only && !lock_rows) + { + /* Get the key columns and primary key value */ + rocksdb::Slice value= scan_it->value(); + if (kd->unpack_record(table, buf, &rkey, &value)) + rc= HA_ERR_INTERNAL_ERROR; + else + rc= 0; + } else - rc= 0; + rc= get_row_by_rowid(buf, (const char*)pk_packed_tuple, pk_size); } else - rc= get_row_by_rowid(buf, (const char*)pk_packed_tuple, pk_size); + rc= HA_ERR_INTERNAL_ERROR; } if (rc) diff --git a/storage/rocksdb/rdb_datadic.cc b/storage/rocksdb/rdb_datadic.cc index 357f8b11b4d3..da5a24b583ae 100644 --- a/storage/rocksdb/rdb_datadic.cc +++ b/storage/rocksdb/rdb_datadic.cc @@ -48,10 +48,12 @@ uint32 read_int(char **data) RDBSE_KEYDEF::~RDBSE_KEYDEF() { - if (pk_key_parts) - my_free(pk_key_parts); + if (pk_part_no) + my_free(pk_part_no); if (pack_info) my_free(pack_info); + if (pack_buffer) + my_free(pack_buffer); } @@ -72,12 +74,12 @@ void RDBSE_KEYDEF::setup(TABLE *tbl) if (keyno != tbl->s->primary_key) { n_pk_key_parts= pk_info->actual_key_parts; - pk_key_parts= (PK_KEY_PART*)my_malloc(sizeof(PK_KEY_PART) * n_pk_key_parts, MYF(0)); + pk_part_no= (uint*)my_malloc(sizeof(uint)*n_pk_key_parts, MYF(0)); } else { pk_info= NULL; - pk_key_parts= 0; + pk_part_no= NULL; } // "unique" secondary keys support: @@ -94,36 +96,39 @@ void RDBSE_KEYDEF::setup(TABLE *tbl) size_t size= sizeof(Field_pack_info) * m_key_parts; 
pack_info= (Field_pack_info*)my_malloc(size, MYF(0)); - uint len= INDEX_NUMBER_SIZE; + size_t max_len= INDEX_NUMBER_SIZE; int unpack_len= 0; KEY_PART_INFO *key_part= key_info->key_part; + int max_part_len= 0; /* this loop also loops over the 'extended key' tail */ for (uint i= 0; i < m_key_parts; i++) { Field *field= key_part->field; if (field->real_maybe_null()) - len +=1; // NULL-byte + max_len +=1; // NULL-byte pack_info[i].setup(field); - pack_info[i].image_offset= len; pack_info[i].unpack_data_offset= unpack_len; if (pk_info) { + pk_part_no[i]= -1; for (uint j= 0; j < n_pk_key_parts; j++) { if (field->field_index == pk_info->key_part[j].field->field_index) { - pk_key_parts[j].offset= len; - pk_key_parts[j].size= pack_info[i].image_len; + pk_part_no[i]= j; + break; } } } - len += pack_info[i].image_len; + max_len += pack_info[i].max_image_len; unpack_len += pack_info[i].unpack_data_len; + max_part_len= std::max(max_part_len, pack_info[i].max_image_len); + key_part++; /* For "unique" secondary indexes, pretend they have "index extensions" */ if (unique_secondary_index && i+1 == key_info->actual_key_parts) @@ -131,8 +136,10 @@ void RDBSE_KEYDEF::setup(TABLE *tbl) key_part= pk_info->key_part; } } - maxlength= len; + maxlength= max_len; unpack_data_len= unpack_len; + + pack_buffer= (uchar*)my_malloc(max_part_len, MYF(0)); } } @@ -158,24 +165,75 @@ void RDBSE_KEYDEF::setup(TABLE *tbl) */ uint RDBSE_KEYDEF::get_primary_key_tuple(RDBSE_KEYDEF *pk_descr, - const char *key, char *pk_buffer) + const rocksdb::Slice *key, + char *pk_buffer) { uint size= 0; char *buf= pk_buffer; DBUG_ASSERT(n_pk_key_parts); - // copy the PK number + /* Put the PK number */ store_index_number((uchar*)buf, pk_descr->index_number); buf += INDEX_NUMBER_SIZE; size += INDEX_NUMBER_SIZE; - for (uint j= 0; j < n_pk_key_parts; j++) + const char* start_offs[MAX_REF_PARTS]; + const char* end_offs[MAX_REF_PARTS]; + int pk_key_part; + uint i; + Stream_reader reader(key); + + // Skip the index number + 
if ((!reader.read(INDEX_NUMBER_SIZE))) + return (uint)-1; + + for (i= 0; i < m_key_parts; i++) + { + if ((pk_key_part= pk_part_no[i]) != -1) + { + start_offs[pk_key_part]= reader.get_current_ptr(); + } + + bool have_value= true; + /* It is impossible to unpack the column. Skip it. */ + if (pack_info[i].maybe_null) + { + const char* nullp; + if (!(nullp= reader.read(1))) + return (uint)-1; + if (*nullp == 0) + { + /* This is a NULL value */ + have_value= false; + } + else + { + /* If NULL marker is not '0', it can be only '1' */ + if (*nullp != 1) + return (uint)-1; + } + } + + if (have_value) + { + if (pack_info[i].skip_func(&pack_info[i], &reader)) + return (uint)-1; + } + + if (pk_key_part != -1) + { + end_offs[pk_key_part]= reader.get_current_ptr(); + } + } + + for (i=0; i < n_pk_key_parts; i++) { - uint len= pk_key_parts[j].size; - memcpy(buf, key + pk_key_parts[j].offset, len); - buf += len; - size += len; + uint part_size= end_offs[i] - start_offs[i]; + memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]); + buf += part_size; + size += part_size; } + return size; } @@ -227,6 +285,7 @@ static Field *get_field_by_keynr(TABLE *tbl, KEY *key_info, uint part) } } + /* Get index columns from the record and pack them into mem-comparable form. @@ -241,6 +300,9 @@ static Field *get_field_by_keynr(TABLE *tbl, KEY *key_info, uint part) @detail Some callers do not need the unpack information, they can pass unpack_info=NULL, unpack_info_len=NULL. + + @return + Length of the packed tuple */ uint RDBSE_KEYDEF::pack_record(TABLE *tbl, const uchar *record, @@ -261,43 +323,35 @@ uint RDBSE_KEYDEF::pack_record(TABLE *tbl, const uchar *record, for (uint i=0; i < n_key_parts; i++) { - /* - Field *field= (i < key_info->actual_key_parts)? 
- key_info->key_part[i].field : - tbl->key_info[tbl->s->primary_key].key_part[i - - key_info->actual_key_parts].field;*/ Field *field= get_field_by_keynr(tbl, key_info, i); my_ptrdiff_t ptr_diff= record - tbl->record[0]; - field->move_field_offset(ptr_diff); - const int length= pack_info[i].image_len; if (field->real_maybe_null()) { - if (field->is_real_null()) + if (field->is_real_null(ptr_diff)) { /* NULL value. store '\0' so that it sorts before non-NULL values */ *tuple++ = 0; - memset(tuple, 0, length); + /* That's it, don't store anything else */ + continue; } else { - // store '1' + /* Not a NULL value. Store '1' */ *tuple++ = 1; - field->make_sort_key(tuple, length); } } - else - field->make_sort_key(tuple, length); - tuple += length; + field->move_field_offset(ptr_diff); + pack_info[i].pack_func(&pack_info[i], field, pack_buffer, &tuple); - if (unpack_end && pack_info && pack_info[i].make_unpack_info_func) + /* Make "unpack info" to be stored in the value */ + if (unpack_end && pack_info[i].make_unpack_info_func) { pack_info[i].make_unpack_info_func(&pack_info[i], field, unpack_end); unpack_end += pack_info[i].unpack_data_len; } - field->move_field_offset(-ptr_diff); } @@ -308,68 +362,122 @@ uint RDBSE_KEYDEF::pack_record(TABLE *tbl, const uchar *record, } +void pack_with_make_sort_key(Field_pack_info *fpi, Field *field, + uchar *buf __attribute__((unused)), + uchar **dst) +{ + const int max_len= fpi->max_image_len; + field->make_sort_key(*dst, max_len); + *dst += max_len; +} + + /* Take mem-comparable form and unpack_info and unpack it to Table->record @detail not all indexes support this + + @return + 0 - Ok + 1 - Data format error. 
*/ int RDBSE_KEYDEF::unpack_record(TABLE *table, uchar *buf, const rocksdb::Slice *packed_key, const rocksdb::Slice *unpack_info) { - int res= 0; KEY * const key_info= &table->key_info[keyno]; - const uchar * const key_ptr= (const uchar*)packed_key->data(); + Stream_reader reader(packed_key); const uchar * const unpack_ptr= (const uchar*)unpack_info->data(); - - if (packed_key->size() != max_storage_fmt_length()) - return 1; + my_ptrdiff_t ptr_diff= buf - table->record[0]; if (unpack_info->size() != unpack_data_len) return 1; + // Skip the index number + if ((!reader.read(INDEX_NUMBER_SIZE))) + return (uint)-1; + for (uint i= 0; i < m_key_parts ; i++) { Field_pack_info *fpi= &pack_info[i]; - //Field *field= fpi->field; Field *field= get_field_by_keynr(table, key_info, i); if (fpi->unpack_func) { - my_ptrdiff_t ptr_diff= buf - table->record[0]; - field->move_field_offset(ptr_diff); + /* It is possible to unpack this column. Do it. */ if (fpi->maybe_null) { - if (*(key_ptr + (fpi->image_offset - 1)) == 0) - field->set_null(); + const char* nullp; + if (!(nullp= reader.read(1))) + return 1; + if (*nullp == 0) + { + field->set_null(ptr_diff); + continue; + } + else if (*nullp == 1) + field->set_notnull(ptr_diff); else - field->set_notnull(); + return 1; } - res= fpi->unpack_func(fpi, field, key_ptr + fpi->image_offset, + field->move_field_offset(ptr_diff); + int res= fpi->unpack_func(fpi, field, &reader, unpack_ptr + fpi->unpack_data_offset); field->move_field_offset(-ptr_diff); if (res) - break; /* Error */ + return 1; + } + else + { + /* It is impossible to unpack the column. Skip it. 
*/ + if (fpi->maybe_null) + { + const char* nullp; + if (!(nullp= reader.read(1))) + return 1; + if (*nullp == 0) + { + /* This is a NULL value */ + continue; + } + /* If NULL marker is not '0', it can be only '1' */ + if (*nullp != 1) + return 1; + } + if (fpi->skip_func(fpi, &reader)) + return 1; } } - return res; + return 0; } /////////////////////////////////////////////////////////////////////////////////////////// // Field_pack_info /////////////////////////////////////////////////////////////////////////////////////////// +int skip_max_length(Field_pack_info *fpi, Stream_reader *reader) +{ + if (!reader->read(fpi->max_image_len)) + return 1; + return 0; +} + + int unpack_integer(Field_pack_info *fpi, Field *field, - const uchar *from, const uchar *unpack_info) + Stream_reader *reader, const uchar *unpack_info) { - const int length= field->pack_length(); + const int length= fpi->max_image_len; uchar *to= field->ptr; + const uchar *from; + + if (!(from= (const uchar*)reader->read(length))) + return 1; /* Mem-comparable image doesn't have enough bytes */ #ifdef WORDS_BIGENDIAN { @@ -394,12 +502,20 @@ int unpack_integer(Field_pack_info *fpi, Field *field, } -/* Unpack the string by copying it over */ +/* + Unpack the string by copying it over. + This is for BINARY(n) where the value occupies the whole length. 
+*/ + int unpack_binary_str(Field_pack_info *fpi, Field *field, - const uchar *tuple, + Stream_reader *reader, const uchar *unpack_info) { - memcpy(field->ptr + fpi->field_data_offset, tuple, fpi->image_len); + const char* from; + if (!(from= reader->read(fpi->max_image_len))) + return 1; /* Mem-comparable image doesn't have enough bytes */ + + memcpy(field->ptr + fpi->field_data_offset, from, fpi->max_image_len); return 0; } @@ -410,14 +526,17 @@ int unpack_binary_str(Field_pack_info *fpi, Field *field, */ int unpack_utf8_str(Field_pack_info *fpi, Field *field, - const uchar *tuple, + Stream_reader *reader, const uchar *unpack_info) { CHARSET_INFO *cset= (CHARSET_INFO*)field->charset(); - const uchar *src= tuple; - const uchar *src_end= tuple + fpi->image_len; + const uchar *src; + if (!(src= (const uchar*)reader->read(fpi->max_image_len))) + return 1; /* Mem-comparable image doesn't have enough bytes */ + + const uchar *src_end= src + fpi->max_image_len; uchar *dst= field->ptr + fpi->field_data_offset; - uchar *dst_end= dst + fpi->image_len; + uchar *dst_end= dst + field->pack_length(); while (src < src_end) { @@ -433,38 +552,181 @@ int unpack_utf8_str(Field_pack_info *fpi, Field *field, } -int unpack_binary_varchar(Field_pack_info *fpi, Field *field, - const uchar *tuple, - const uchar *unpack_info) -{ - uint32 length_bytes= ((Field_varstring*)field)->length_bytes; - //copy the length bytes - memcpy(field->ptr, unpack_info, length_bytes); +/* + (ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not + split in the middle of an UTF-8 character. +*/ +const uint ESCAPE_LENGTH=9; + - return unpack_binary_str(fpi, field, tuple, unpack_info); +void pack_with_varchar_encoding(Field_pack_info *fpi, Field *field, uchar *buf, + uchar **dst) +{ + const CHARSET_INFO *charset= field->charset(); + Field_varstring *field_var= (Field_varstring*)field; + + size_t value_length= (field_var->length_bytes == 1) ? 
+ (uint) *field->ptr : + uint2korr(field->ptr); + size_t xfrm_len; + xfrm_len= charset->coll->strnxfrm(charset, + buf, fpi->max_image_len, + field_var->char_length(), + field_var->ptr + field_var->length_bytes, + value_length, + 0); + + /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */ + + size_t encoded_size= 0; + uchar *ptr= *dst; + while (1) + { + size_t copy_len= std::min((size_t)ESCAPE_LENGTH-1, xfrm_len); + size_t padding_bytes= ESCAPE_LENGTH - 1 - copy_len; + memcpy(ptr, buf, copy_len); + ptr += copy_len; + buf += copy_len; + // pad with zeros if necessary; + for (size_t idx= 0; idx < padding_bytes; idx++) + *(ptr++)= 0; + *(ptr++) = 255 - padding_bytes; + + xfrm_len -= copy_len; + encoded_size += ESCAPE_LENGTH; + if (padding_bytes !=0) + break; + } + *dst += encoded_size; } -int unpack_utf8_varchar(Field_pack_info *fpi, Field *field, - const uchar *tuple, - const uchar *unpack_info) +int unpack_binary_or_utf8_varchar(Field_pack_info *fpi, Field *field, + Stream_reader *reader, + const uchar *unpack_info) { - uint32 length_bytes= ((Field_varstring*)field)->length_bytes; - //copy the length bytes - memcpy(field->ptr, unpack_info, length_bytes); + const uchar *ptr; + size_t len= 0; + bool finished= false; + uchar *dst= field->ptr + fpi->field_data_offset; + Field_varstring* field_var= (Field_varstring*)field; + size_t dst_len= field_var->pack_length() - field_var->length_bytes; // How much we can unpack + uchar *dst_end= dst + dst_len; + + /* Decode the length-emitted encoding here */ + while ((ptr= (const uchar*)reader->read(ESCAPE_LENGTH))) + { + /* + ESCAPE_LENGTH-th byte has: + Set it to (255 - #pad) where #pad is 0 when the var length field filled + all N-1 previous bytes and #pad is otherwise the number of padding + bytes used. 
+ */ + uchar pad= 255 - ptr[ESCAPE_LENGTH - 1]; //number of padding bytes + uchar used_bytes= ESCAPE_LENGTH - 1 - pad; + + if (used_bytes > ESCAPE_LENGTH - 1) + return 1; /* cannot store that much, invalid data */ + + if (dst_len < used_bytes) + { + /* Encoded index tuple is longer than the size in the record buffer? */ + return 1; + } - return unpack_utf8_str(fpi, field, tuple, unpack_info); + /* + Now, we need to decode used_bytes of data and append them to the value. + */ + if (fpi->varchar_charset == &my_charset_utf8_bin) + { + if (used_bytes & 1) + { + /* + UTF-8 characters are encoded into two-byte entities. There is no way + we can get an odd number of bytes after encoding. + */ + return 1; + } + + const uchar *src= ptr; + const uchar *src_end= ptr + used_bytes; + while (src < src_end) + { + my_wc_t wc= (src[0] <<8) | src[1]; + src += 2; + const CHARSET_INFO *cset= fpi->varchar_charset; + int res= cset->cset->wc_mb(cset, wc, dst, dst_end); + DBUG_ASSERT(res > 0 && res <=3); + if (res < 0) + return 1; + dst += res; + len += res; + dst_len -= res; + } + } + else + { + memcpy(dst, ptr, used_bytes); + dst += used_bytes; + dst_len -= used_bytes; + len += used_bytes; + } + + if (used_bytes < ESCAPE_LENGTH - 1) + { + finished= true; + break; + } + } + + if (!finished) + return 1; + + /* Save the length */ + if (field_var->length_bytes == 1) + { + field->ptr[0]= len; + } + else + { + DBUG_ASSERT(field_var->length_bytes == 2); + int2store(field->ptr, len); + } + return 0; } -/* - For varchar, save the length. -*/ -void make_varchar_unpack_info(Field_pack_info *fsi, Field *field, uchar *unpack_data) +int skip_variable_length(Field_pack_info *fpi, Stream_reader *reader) { - // TODO: use length from fsi. 
- Field_varstring *fv= (Field_varstring*)field; - memcpy(unpack_data, fv->ptr, fv->length_bytes); + const uchar *ptr; + bool finished= false; + + /* Decode the length-emitted encoding here */ + while ((ptr= (const uchar*)reader->read(ESCAPE_LENGTH))) + { + /* + ESCAPE_LENGTH-th byte has: + Set it to (255 - #pad) where #pad is 0 when the var length field filled + all N-1 previous bytes and #pad is otherwise the number of padding + bytes used. + */ + uchar pad= 255 - ptr[ESCAPE_LENGTH - 1]; //number of padding bytes + uchar used_bytes= ESCAPE_LENGTH - 1 - pad; + + if (used_bytes > ESCAPE_LENGTH - 1) + return 1; /* cannot store that much, invalid data */ + + if (used_bytes < ESCAPE_LENGTH - 1) + { + finished= true; + break; + } + } + + if (!finished) + return 1; + + return 0; } @@ -491,7 +753,14 @@ bool Field_pack_info::setup(Field *field) field_data_offset= 0; /* Calculate image length. By default, is is pack_length() */ - image_len= field->pack_length(); + max_image_len= field->pack_length(); + + skip_func= skip_max_length; + pack_func= pack_with_make_sort_key; + + make_unpack_info_func= NULL; + unpack_data_len= 0; + if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) { /* @@ -499,7 +768,7 @@ bool Field_pack_info::setup(Field *field) field->field_length = field->char_length() * cs->mbmaxlen. 
*/ const CHARSET_INFO *cs= field->charset(); - image_len= cs->coll->strnxfrmlen(cs, field->field_length); + max_image_len= cs->coll->strnxfrmlen(cs, field->field_length); } if (type == MYSQL_TYPE_LONGLONG || @@ -509,31 +778,32 @@ bool Field_pack_info::setup(Field *field) type == MYSQL_TYPE_TINY) { unpack_func= unpack_integer; - make_unpack_info_func= NULL; return true; } const bool is_varchar= (type == MYSQL_TYPE_VARCHAR); + + const CHARSET_INFO *cs= field->charset(); if (is_varchar) { - make_unpack_info_func= make_varchar_unpack_info; - unpack_data_len= ((Field_varstring*)field)->length_bytes; + varchar_charset= cs; field_data_offset= ((Field_varstring*)field)->length_bytes; + skip_func= skip_variable_length; + pack_func= pack_with_varchar_encoding; + max_image_len= (max_image_len/(ESCAPE_LENGTH-1) + 1) * ESCAPE_LENGTH; } if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) { - const CHARSET_INFO *cs= field->charset(); - if (cs == &my_charset_bin || cs == &my_charset_latin1_bin) { - unpack_func= is_varchar? unpack_binary_varchar : unpack_binary_str; + unpack_func= is_varchar? unpack_binary_or_utf8_varchar : unpack_binary_str; res= true; } else if(cs == &my_charset_utf8_bin) { - unpack_func= is_varchar? unpack_utf8_varchar : unpack_utf8_str; + unpack_func= is_varchar? unpack_binary_or_utf8_varchar : unpack_utf8_str; res= true; } } diff --git a/storage/rocksdb/rdb_datadic.h b/storage/rocksdb/rdb_datadic.h index 784518286c42..2e2695db5e04 100644 --- a/storage/rocksdb/rdb_datadic.h +++ b/storage/rocksdb/rdb_datadic.h @@ -30,6 +30,59 @@ inline void store_index_number(uchar *dst, uint32 number) #endif } + +/* + A simple string reader. + - it keeps position within the string that we read from + - it prevents one from reading beyond the end of the string. 
+ (todo: rename to String_reader) +*/ + +class Stream_reader +{ + const char* ptr; + uint len; +public: + Stream_reader(const std::string &str) + { + ptr= &str.at(0); + len= str.length(); + } + + Stream_reader(const rocksdb::Slice *slice) + { + ptr= slice->data(); + len= slice->size(); + } + + /* + Read the next @param size bytes. Returns pointer to the bytes read, or + NULL if the remaining string doesn't have that many bytes. + */ + const char *read(uint size) + { + const char *res; + if (len < size) + res= NULL; + else + { + res= ptr; + ptr += size; + len -= size; + } + return res; + } + uint remaining_bytes() { return len; } + + /* + Return pointer to data that will be read by next read() call (if there is + nothing left to read, returns pointer to beyond the end of previous read() + call) + */ + const char *get_current_ptr() { return ptr; } +}; + + /* An object of this class represents information about an index in an SQL table. It provides services to encode and decode index tuples. @@ -101,6 +154,8 @@ class RDBSE_KEYDEF This can be used to compare prefixes. if X is a prefix of Y, then we consider that X = Y. */ + // psergey-todo: this seems to work for variable-length keys, does it? + // {pb, b_len} describe the lookup key, which can be a prefix of pa/a_len. 
int cmp_full_keys(const char *pa, uint a_len, const char *pb, uint b_len, uint n_parts) { @@ -124,8 +179,8 @@ class RDBSE_KEYDEF } /* Must only be called for secondary keys: */ - uint get_primary_key_tuple(RDBSE_KEYDEF *pk_descr, const char *key, - char *pk_buffer); + uint get_primary_key_tuple(RDBSE_KEYDEF *pk_descr, + const rocksdb::Slice *key, char *pk_buffer); /* Return max length of mem-comparable form */ uint max_storage_fmt_length() @@ -135,11 +190,12 @@ class RDBSE_KEYDEF RDBSE_KEYDEF(uint indexnr_arg, uint keyno_arg) : index_number(indexnr_arg), - pk_key_parts(NULL), + pk_part_no(NULL), pack_info(NULL), keyno(keyno_arg), m_key_parts(0), - maxlength(0) // means 'not intialized' + maxlength(0), // means 'not initialized' + pack_buffer(NULL) { store_index_number(index_number_storage_form, indexnr_arg); } @@ -160,22 +216,14 @@ class RDBSE_KEYDEF friend class RDBSE_TABLE_DEF; // for index_number above - class PK_KEY_PART - { - public: - uint offset; - uint size; - }; + /* Number of key parts in the primary key */ + uint n_pk_key_parts; /* - Array of descriptions of primary key columns. - - element #0 describes the first PK column, - - element #1 describes the second PK column, and so forth. - the offsets are offsets of column representation in StorageFormat - representation of this index. + pk_part_no[X]=Y means that keypart #X of this key is key part #Y of the + primary key. Y==-1 means this column is not present in the primary key. */ - PK_KEY_PART *pk_key_parts; - uint n_pk_key_parts; + uint *pk_part_no; /* Array of index-part descriptors. */ Field_pack_info *pack_info; @@ -188,22 +236,25 @@ class RDBSE_KEYDEF */ uint m_key_parts; - /* - Length of the mem-comparable form. In the encoding we're using, it is - constant (any value will have this length). - */ + /* Maximum length of the mem-comparable form. 
*/ uint maxlength; /* Length of the unpack_data */ uint unpack_data_len; + + uchar *pack_buffer; }; typedef void (*make_unpack_info_t) (Field_pack_info *fpi, Field *field, uchar *dst); typedef int (*index_field_unpack_t)(Field_pack_info *fpi, Field *field, - const uchar *tuple, + Stream_reader *reader, const uchar *unpack_info); +typedef int (*index_field_skip_t)(Field_pack_info *fpi, Stream_reader *reader); + +typedef void (*index_field_pack_t)(Field_pack_info *fpi, Field *field, uchar* buf, uchar **dst); + /* This stores information about how a field can be packed to mem-comparable form and unpacked back. @@ -212,14 +263,8 @@ typedef int (*index_field_unpack_t)(Field_pack_info *fpi, Field *field, class Field_pack_info { public: - /* - Offset of the image of this field in the mem-comparable image. This field - must be set from outside of the class - */ - int image_offset; - /* Length of mem-comparable image of the field, in bytes */ - int image_len; + int max_image_len; /* Length of image in the unpack data */ int unpack_data_len; @@ -230,6 +275,13 @@ class Field_pack_info bool maybe_null; /* TRUE <=> NULL-byte is stored */ + /* + Valid only for VARCHAR fields. + */ + const CHARSET_INFO *varchar_charset; + + index_field_pack_t pack_func; + /* Pack function is assumed to be: - store NULL-byte, if needed @@ -246,6 +298,11 @@ class Field_pack_info */ index_field_unpack_t unpack_func; + /* + This function skips over mem-comparable form. + */ + index_field_skip_t skip_func; + bool setup(Field *field); };