From 15feaddc4f6688e25178a63a154a4cd4e360292c Mon Sep 17 00:00:00 2001
From: JP Swinski
Date: Thu, 23 Feb 2023 16:54:28 +0000
Subject: [PATCH] parquet builder buffers records before writing out to table

---
 packages/arrow/ParquetBuilder.cpp     | 451 ++++++++++++++++++--------
 packages/arrow/ParquetBuilder.h       |  38 ++-
 platforms/windows/OsApi.h             |  11 +
 plugins/gedi/endpoints/gedi04ap.lua   |  12 +-
 plugins/icesat2/endpoints/atl03sp.lua |  12 +-
 plugins/icesat2/endpoints/atl06p.lua  |  12 +-
 plugins/icesat2/endpoints/atl08p.lua  |  12 +-
 7 files changed, 364 insertions(+), 184 deletions(-)

diff --git a/packages/arrow/ParquetBuilder.cpp b/packages/arrow/ParquetBuilder.cpp
index f6a2cf337..627cb8af7 100644
--- a/packages/arrow/ParquetBuilder.cpp
+++ b/packages/arrow/ParquetBuilder.cpp
@@ -65,8 +65,8 @@ struct ParquetBuilder::impl
     shared_ptr<arrow::Schema> schema;
     unique_ptr<parquet::arrow::FileWriter> parquetWriter;
 
-    static shared_ptr<arrow::Schema> defineTableSchema (field_list_t& field_list, const char* rec_type, const geo_data_t& geo);
-    static bool addFieldsToSchema (vector<shared_ptr<arrow::Field>>& schema_vector, field_list_t& field_list, const geo_data_t& geo, const char* rec_type, int offset);
+    static shared_ptr<arrow::Schema> defineTableSchema (field_list_t& field_list, const char* batch_rec_type, const geo_data_t& geo);
+    static bool addFieldsToSchema (vector<shared_ptr<arrow::Field>>& schema_vector, field_list_t& field_list, const geo_data_t& geo, const char* batch_rec_type, int offset);
     static void appendGeoMetaData (const std::shared_ptr<arrow::KeyValueMetadata>& metadata);
     static void appendServerMetaData (const std::shared_ptr<arrow::KeyValueMetadata>& metadata);
     static void appendPandasMetaData (const std::shared_ptr<arrow::KeyValueMetadata>& metadata, const shared_ptr<arrow::Schema>& _schema, const field_iterator_t* field_iterator, const char* index_key);
@@ -75,10 +75,10 @@
 /*----------------------------------------------------------------------------
  * defineTableSchema
  *----------------------------------------------------------------------------*/
-shared_ptr<arrow::Schema> ParquetBuilder::impl::defineTableSchema (field_list_t& field_list, const char* rec_type, const geo_data_t& geo)
+shared_ptr<arrow::Schema> ParquetBuilder::impl::defineTableSchema (field_list_t& field_list, const char* batch_rec_type, const geo_data_t& geo)
 {
     vector<shared_ptr<arrow::Field>> schema_vector;
-    addFieldsToSchema(schema_vector, field_list, geo, rec_type, 0);
+    addFieldsToSchema(schema_vector, field_list, geo, batch_rec_type, 0);
     if(geo.as_geo) schema_vector.push_back(arrow::field("geometry", arrow::binary()));
     return make_shared<arrow::Schema>(schema_vector);
 }
@@ -86,12 +86,12 @@ shared_ptr<arrow::Schema> ParquetBuilder::impl::defineTableSchema (field_list_t&
 /*----------------------------------------------------------------------------
  * addFieldsToSchema
  *----------------------------------------------------------------------------*/
-bool ParquetBuilder::impl::addFieldsToSchema (vector<shared_ptr<arrow::Field>>& schema_vector, field_list_t& field_list, const geo_data_t& geo, const char* rec_type, int offset)
+bool ParquetBuilder::impl::addFieldsToSchema (vector<shared_ptr<arrow::Field>>& schema_vector, field_list_t& field_list, const geo_data_t& geo, const char* batch_rec_type, int offset)
 {
     /* Loop Through Fields in Record */
     char** field_names = NULL;
     RecordObject::field_t** fields = NULL;
-    int num_fields = RecordObject::getRecordFields(rec_type, &field_names, &fields);
+    int num_fields = RecordObject::getRecordFields(batch_rec_type, &field_names, &fields);
     for(int i = 0; i < num_fields; i++)
     {
         bool add_field_to_list = true;
@@ -369,6 +369,7 @@ void ParquetBuilder::impl::appendPandasMetaData (const std::shared_ptr<arrow::KeyValueMetadata>& metadata, const shared_ptr<arrow::Schema>& _schema, const field_iterator_t* field_iterator, const char* index_key)
 
 /*----------------------------------------------------------------------------
- * luaCreate - :parquet(<parms>, <outq_name>, <rec_type>, <id>, [<lon_key>, <lat_key>], [<index_key>])
+ * luaCreate - :parquet(<parms>, <outq_name>, <inq_name>, <rec_type>, <batch_rec_type>, <id>, [<lon_key>, <lat_key>], [<index_key>])
*----------------------------------------------------------------------------*/ int ParquetBuilder::luaCreate (lua_State* L) { @@ -404,11 +405,13 @@ int ParquetBuilder::luaCreate (lua_State* L) /* Get Parameters */ parms = (ArrowParms*)getLuaObject(L, 1, ArrowParms::OBJECT_TYPE); const char* outq_name = getLuaString(L, 2); - const char* rec_type = getLuaString(L, 3); - const char* id = getLuaString(L, 4); - const char* lon_key = getLuaString(L, 5, true, NULL); - const char* lat_key = getLuaString(L, 6, true, NULL); - const char* index_key = getLuaString(L, 7, true, NULL); + const char* inq_name = getLuaString(L, 3); + const char* rec_type = getLuaString(L, 4); + const char* batch_rec_type = getLuaString(L, 5); + const char* id = getLuaString(L, 6); + const char* lon_key = getLuaString(L, 7, true, NULL); + const char* lat_key = getLuaString(L, 8, true, NULL); + const char* index_key = getLuaString(L, 9, true, NULL); /* Build Geometry Fields */ geo_data_t geo; @@ -417,21 +420,21 @@ int ParquetBuilder::luaCreate (lua_State* L) { geo.as_geo = true; - geo.lon_field = RecordObject::getDefinedField(rec_type, lon_key); + geo.lon_field = RecordObject::getDefinedField(batch_rec_type, lon_key); if(geo.lon_field.type == RecordObject::INVALID_FIELD) { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract longitude field [%s] from record type <%s>", lon_key, rec_type); + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract longitude field [%s] from record type <%s>", lon_key, batch_rec_type); } - geo.lat_field = RecordObject::getDefinedField(rec_type, lat_key); + geo.lat_field = RecordObject::getDefinedField(batch_rec_type, lat_key); if(geo.lat_field.type == RecordObject::INVALID_FIELD) { - throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract latitude field [%s] from record type <%s>", lat_key, rec_type); + throw RunTimeException(CRITICAL, RTE_ERROR, "Unable to extract latitude field [%s] from record type <%s>", lat_key, batch_rec_type); } } /* Create Dispatch */ - return createLuaObject(L, new ParquetBuilder(L, parms, outq_name, rec_type, id, geo, index_key)); + return createLuaObject(L, new ParquetBuilder(L, parms, outq_name, inq_name, rec_type, batch_rec_type, id, geo, index_key)); } catch(const RunTimeException& e) { @@ -464,12 +467,17 @@ void ParquetBuilder::deinit (void) /*---------------------------------------------------------------------------- * Constructor *----------------------------------------------------------------------------*/ -ParquetBuilder::ParquetBuilder (lua_State* L, ArrowParms* _parms, const char* outq_name, const char* rec_type, const char* id, geo_data_t geo, const char* index_key): - DispatchObject(L, LuaMetaName, LuaMetaTable) +ParquetBuilder::ParquetBuilder (lua_State* L, ArrowParms* _parms, + const char* outq_name, const char* inq_name, + const char* rec_type, const char* batch_rec_type, + const char* id, geo_data_t geo, const char* index_key): + LuaObject(L, OBJECT_TYPE, LuaMetaName, LuaMetaTable) { assert(parms); assert(outq_name); + assert(inq_name); assert(rec_type); + assert(batch_rec_type); assert(id); /* Save Parms */ @@ -478,11 +486,17 @@ ParquetBuilder::ParquetBuilder (lua_State* L, ArrowParms* _parms, const char* ou /* Allocate Private Implementation */ pimpl = new ParquetBuilder::impl; - /* Initialize Publisher */ - outQ = new Publisher(outq_name); + /* Row Based Parameters */ + rowSizeBytes = RecordObject::getRecordDataSize(batch_rec_type); + maxRowsInGroup = ROW_GROUP_SIZE / rowSizeBytes; - /* Row Size */ - rowSizeBytes = 
RecordObject::getRecordDataSize(rec_type); + /* Initialize Record Type */ + recType = StringLib::duplicate(rec_type); + + /* Initialize Queues */ + int qdepth = maxRowsInGroup * QUEUE_BUFFER_FACTOR; + outQ = new Publisher(outq_name, Publisher::defaultFree, qdepth); + inQ = new Subscriber(inq_name, MsgQ::SUBSCRIBER_OF_CONFIDENCE, qdepth); /* Initialize GeoParquet Option */ geoData = geo; @@ -491,7 +505,7 @@ ParquetBuilder::ParquetBuilder (lua_State* L, ArrowParms* _parms, const char* ou indexKey = StringLib::duplicate(index_key); /* Define Table Schema */ - pimpl->schema = pimpl->defineTableSchema(fieldList, rec_type, geoData); + pimpl->schema = pimpl->defineTableSchema(fieldList, batch_rec_type, geoData); fieldIterator = new field_iterator_t(fieldList); /* Create Unique Temporary Filename */ @@ -534,6 +548,10 @@ ParquetBuilder::ParquetBuilder (lua_State* L, ArrowParms* _parms, const char* ou if(result.ok()) pimpl->parquetWriter = std::move(result).ValueOrDie(); else mlog(CRITICAL, "Failed to open parquet writer: %s", result.status().ToString().c_str()); #endif + + /* Start Builder Thread */ + active = true; + builderPid = new Thread(builderThread, this); } /*---------------------------------------------------------------------------- @@ -541,31 +559,140 @@ ParquetBuilder::ParquetBuilder (lua_State* L, ArrowParms* _parms, const char* ou *----------------------------------------------------------------------------*/ ParquetBuilder::~ParquetBuilder(void) { + active = false; + delete builderPid; parms->releaseLuaObject(); delete [] fileName; if(indexKey) delete [] indexKey; + delete [] recType; delete outQ; + delete inQ; delete fieldIterator; delete pimpl; } /*---------------------------------------------------------------------------- - * processRecord + * builderThread *----------------------------------------------------------------------------*/ -bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) +void* ParquetBuilder::builderThread(void* parm) { - (void)key; + ParquetBuilder* builder = (ParquetBuilder*)parm; + int row_cnt = 0; + + /* Early Exit on No Writer */ + if(!builder->pimpl->parquetWriter) + { + return NULL; + } - /* Determine Number of Rows in Record */ - int record_size_bytes = record->getAllocatedDataSize(); - int num_rows = record_size_bytes / rowSizeBytes; - int left_over = record_size_bytes % rowSizeBytes; - if(left_over > 0) + /* Loop Forever */ + while(builder->active) { - mlog(ERROR, "Invalid record size received for %s: %d %% %d != 0", record->getRecordType(), record_size_bytes, rowSizeBytes); - return false; + /* Receive Message */ + Subscriber::msgRef_t ref; + int recv_status = builder->inQ->receiveRef(ref, SYS_TIMEOUT); + if(recv_status > 0) + { + /* Process Record */ + if(ref.size > 0) + { + /* Get Record and Match to Type being Processed */ + RecordInterface* record = new RecordInterface((unsigned char*)ref.data, ref.size); + if(!StringLib::match(record->getRecordType(), builder->recType)) + { + delete record; + builder->outQ->postCopy(ref.data, ref.size); + builder->inQ->dereference(ref); + continue; + } + + /* Determine Rows in Record */ + int record_size_bytes = record->getAllocatedDataSize(); + int num_rows = record_size_bytes / builder->rowSizeBytes; + int left_over = record_size_bytes % builder->rowSizeBytes; + if(left_over > 0) + { + mlog(ERROR, "Invalid record size received for %s: %d %% %d != 0", record->getRecordType(), record_size_bytes, builder->rowSizeBytes); + delete record; // record is not batched, so must delete here + 
builder->inQ->dereference(ref); // record is not batched, so must dereference here
+                    continue;
+                }
+
+                /* Create Batch Structure */
+                batch_t batch = {
+                    .ref = ref,
+                    .record = record,
+                    .rows = num_rows
+                };
+
+                /* Add Batch to Ordering */
+                builder->recordBatch.add(row_cnt, batch);
+                row_cnt += num_rows;
+                if(row_cnt >= builder->maxRowsInGroup)
+                {
+                    builder->processRecordBatch(row_cnt);
+                    row_cnt = 0;
+                }
+            }
+            else
+            {
+                /* Terminating Message */
+                mlog(DEBUG, "Terminator received on %s, exiting parquet builder", builder->inQ->getName());
+                builder->active = false; // breaks out of loop
+                builder->inQ->dereference(ref); // terminator is not batched, so must dereference here
+            }
+        }
+        else if(recv_status != MsgQ::STATE_TIMEOUT)
+        {
+            /* Break Out on Failure */
+            mlog(CRITICAL, "Failed queue receive on %s with error %d", builder->inQ->getName(), recv_status);
+            builder->active = false; // breaks out of loop
+        }
     }
 
+    /* Process Remaining Records */
+    builder->processRecordBatch(row_cnt);
+    row_cnt = 0;
+
+    /* Close Parquet Writer */
+    (void)builder->pimpl->parquetWriter->Close();
+
+    /* Send File to User */
+    const char* _path = builder->parms->path;
+    int _path_len = StringLib::size(_path);
+    if((_path_len > 5) &&
+       (_path[0] == 's') &&
+       (_path[1] == '3') &&
+       (_path[2] == ':') &&
+       (_path[3] == '/') &&
+       (_path[4] == '/'))
+    {
+        #ifdef __aws__
+        builder->send2S3(&_path[5]);
+        #else
+        LuaEndpoint::generateExceptionStatus(RTE_ERROR, CRITICAL, builder->outQ, NULL, "Output path specifies S3, but server compiled without AWS support");
+        #endif
+    }
+    else
+    {
+        /* Stream Back to Client */
+        builder->send2Client();
+    }
+
+    /* Signal Completion */
+    builder->signalComplete();
+
+    /* Exit Thread */
+    return NULL;
+}
+
+/*----------------------------------------------------------------------------
+ * processRecordBatch
+ *----------------------------------------------------------------------------*/
+void ParquetBuilder::processRecordBatch (int num_rows)
+{
+    batch_t batch;
+
     /* Loop Through Fields in Schema */
     vector<shared_ptr<arrow::Array>> columns;
     for(int i = 0; i < fieldIterator->length; i++)
@@ -580,10 +707,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key)
         {
             arrow::DoubleBuilder builder;
             (void)builder.Reserve(num_rows);
-            for(int row = 0; row < num_rows; row++)
+            unsigned long key = recordBatch.first(&batch);
+            while(key != (unsigned long)INVALID_KEY)
             {
-                builder.UnsafeAppend((double)record->getValueReal(field));
-                field.offset += rowSizeBytes * 8;
+                int32_t starting_offset = field.offset;
+                for(int row = 0; row < batch.rows; row++)
+                {
+                    builder.UnsafeAppend((double)batch.record->getValueReal(field));
+                    field.offset += rowSizeBytes * 8;
+                }
+                field.offset = starting_offset;
+                key = recordBatch.next(&batch);
             }
             (void)builder.Finish(&column);
             break;
@@ -593,10 +727,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key)
         {
             arrow::FloatBuilder builder;
             (void)builder.Reserve(num_rows);
-            for(int row = 0; row < num_rows; row++)
+            unsigned long key = recordBatch.first(&batch);
+            while(key != (unsigned long)INVALID_KEY)
             {
-                builder.UnsafeAppend((float)record->getValueReal(field));
-                field.offset += rowSizeBytes * 8;
+                int32_t starting_offset = field.offset;
+                for(int row = 0; row < batch.rows; row++)
+                {
+                    builder.UnsafeAppend((float)batch.record->getValueReal(field));
+                    field.offset += rowSizeBytes * 8;
+                }
+                field.offset = starting_offset;
+                key = recordBatch.next(&batch);
             }
             (void)builder.Finish(&column);
             break;
@@ -606,10 +747,17 @@ bool ParquetBuilder::processRecord
(RecordObject* record, okey_t key) { arrow::Int8Builder builder; (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - builder.UnsafeAppend((int8_t)record->getValueInteger(field)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + builder.UnsafeAppend((int8_t)batch.record->getValueInteger(field)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } (void)builder.Finish(&column); break; @@ -619,10 +767,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) { arrow::Int16Builder builder; (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - builder.UnsafeAppend((int16_t)record->getValueInteger(field)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + builder.UnsafeAppend((int16_t)batch.record->getValueInteger(field)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } (void)builder.Finish(&column); break; @@ -632,10 +787,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) { arrow::Int32Builder builder; (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - builder.UnsafeAppend((int32_t)record->getValueInteger(field)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + builder.UnsafeAppend((int32_t)batch.record->getValueInteger(field)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } (void)builder.Finish(&column); break; @@ -645,10 +807,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) { arrow::Int64Builder builder; (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - builder.UnsafeAppend((int64_t)record->getValueInteger(field)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + builder.UnsafeAppend((int64_t)batch.record->getValueInteger(field)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } (void)builder.Finish(&column); break; @@ -658,10 +827,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) { arrow::UInt8Builder builder; (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - builder.UnsafeAppend((uint8_t)record->getValueInteger(field)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + builder.UnsafeAppend((uint8_t)batch.record->getValueInteger(field)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } (void)builder.Finish(&column); break; @@ -671,10 +847,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) { 
arrow::UInt16Builder builder; (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - builder.UnsafeAppend((uint16_t)record->getValueInteger(field)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + builder.UnsafeAppend((uint16_t)batch.record->getValueInteger(field)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } (void)builder.Finish(&column); break; @@ -684,10 +867,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) { arrow::UInt32Builder builder; (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - builder.UnsafeAppend((uint32_t)record->getValueInteger(field)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + builder.UnsafeAppend((uint32_t)batch.record->getValueInteger(field)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } (void)builder.Finish(&column); break; @@ -697,10 +887,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) { arrow::UInt64Builder builder; (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - builder.UnsafeAppend((uint64_t)record->getValueInteger(field)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + builder.UnsafeAppend((uint64_t)batch.record->getValueInteger(field)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } (void)builder.Finish(&column); break; @@ -710,10 +907,17 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) { arrow::TimestampBuilder builder(arrow::timestamp(arrow::TimeUnit::NANO), arrow::default_memory_pool()); (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - builder.UnsafeAppend((int64_t)record->getValueInteger(field)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + builder.UnsafeAppend((int64_t)batch.record->getValueInteger(field)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } (void)builder.Finish(&column); break; @@ -723,11 +927,18 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key) { arrow::StringBuilder builder; (void)builder.Reserve(num_rows); - for(int row = 0; row < num_rows; row++) + unsigned long key = recordBatch.first(&batch); + while(key != (unsigned long)INVALID_KEY) { - const char* str = record->getValueText(field); - builder.UnsafeAppend(str, StringLib::size(str)); - field.offset += rowSizeBytes * 8; + int32_t starting_offset = field.offset; + for(int row = 0; row < batch.rows; row++) + { + const char* str = batch.record->getValueText(field); + builder.UnsafeAppend(str, StringLib::size(str)); + field.offset += rowSizeBytes * 8; + } + field.offset = starting_offset; + key = recordBatch.next(&batch); } 
(void)builder.Finish(&column);
             break;
 
@@ -750,83 +961,51 @@ bool ParquetBuilder::processRecord (RecordObject* record, okey_t key)
         RecordObject::field_t lat_field = geoData.lat_field;
         shared_ptr<arrow::Array> column;
         arrow::BinaryBuilder builder;
-        for(int row = 0; row < num_rows; row++)
+        unsigned long key = recordBatch.first(&batch);
+        while(key != (unsigned long)INVALID_KEY)
         {
-            wkbpoint_t point = {
-                #ifdef __be__
-                .byteOrder = 0,
-                #else
-                .byteOrder = 1,
-                #endif
-                .wkbType = 1,
-                .x = record->getValueReal(lon_field),
-                .y = record->getValueReal(lat_field)
-            };
-            (void)builder.Append((uint8_t*)&point, sizeof(wkbpoint_t));
-            lon_field.offset += rowSizeBytes * 8;
-            lat_field.offset += rowSizeBytes * 8;
+            int32_t starting_lon_offset = lon_field.offset;
+            int32_t starting_lat_offset = lat_field.offset;
+            for(int row = 0; row < batch.rows; row++)
+            {
+                wkbpoint_t point = {
+                    #ifdef __be__
+                    .byteOrder = 0,
+                    #else
+                    .byteOrder = 1,
+                    #endif
+                    .wkbType = 1,
+                    .x = batch.record->getValueReal(lon_field),
+                    .y = batch.record->getValueReal(lat_field)
+                };
+                (void)builder.Append((uint8_t*)&point, sizeof(wkbpoint_t));
+                lon_field.offset += rowSizeBytes * 8;
+                lat_field.offset += rowSizeBytes * 8;
+            }
+            lon_field.offset = starting_lon_offset;
+            lat_field.offset = starting_lat_offset;
+            key = recordBatch.next(&batch);
         }
         (void)builder.Finish(&column);
         columns.push_back(column);
     }
 
-    /* Write Table */
+    /* Build and Write Table */
     if(pimpl->parquetWriter)
     {
-        tableMut.lock();
-        {
-            /* Build and Write Table */
-            shared_ptr<arrow::Table> table = arrow::Table::Make(pimpl->schema, columns);
-            (void)pimpl->parquetWriter->WriteTable(*table, num_rows);
-        }
-        tableMut.unlock();
+        shared_ptr<arrow::Table> table = arrow::Table::Make(pimpl->schema, columns);
+        (void)pimpl->parquetWriter->WriteTable(*table, num_rows);
     }
 
-    /* Return Success */
-    return true;
-}
-
-/*----------------------------------------------------------------------------
- * processTimeout
- *----------------------------------------------------------------------------*/
-bool ParquetBuilder::processTimeout (void)
-{
-    return true;
-}
-
-/*----------------------------------------------------------------------------
- * processTermination
- *
- * Note that RecordDispatcher will only call this once
- *----------------------------------------------------------------------------*/
-bool ParquetBuilder::processTermination (void)
-{
-    /* Early Exit on No Writer */
-    if(!pimpl->parquetWriter) return false;
-
-    /* Close Parquet Writer */
-    (void)pimpl->parquetWriter->Close();
-
-    /* Send File to User */
-    int file_path_len = StringLib::size(parms->path);
-    if((file_path_len > 5) &&
-       (parms->path[0] == 's') &&
-       (parms->path[1] == '3') &&
-       (parms->path[2] == ':') &&
-       (parms->path[3] == '/') &&
-       (parms->path[4] == '/'))
-    {
-        #ifdef __aws__
-        return send2S3(&parms->path[5]);
-        #else
-        LuaEndpoint::generateExceptionStatus(RTE_ERROR, CRITICAL, outQ, NULL, "Output path specifies S3, but server not compiled with AWS support");
-        #endif
-    }
-    else
+    /* Clear Record Batch */
+    unsigned long key = recordBatch.first(&batch);
+    while(key != (unsigned long)INVALID_KEY)
     {
-        /* Stream Back to Client */
-        return send2Client();
+        delete batch.record;
+        inQ->dereference(batch.ref);
+        key = recordBatch.next(&batch);
     }
+    recordBatch.clear();
 }
 
 /*----------------------------------------------------------------------------
diff --git a/packages/arrow/ParquetBuilder.h b/packages/arrow/ParquetBuilder.h
index 39f16d43a..40b5e179b 100644
--- a/packages/arrow/ParquetBuilder.h
+++ b/packages/arrow/ParquetBuilder.h
@@ -33,8 +33,8 @@
 #define __parquet_builder__
 
 /*
- * ParquetBuilder works on batches of records. It expects the `rec_type` passed
- * into the constructor to be the type that defines each of the column headings,
+ * ParquetBuilder works on batches of records. It expects the `batch_rec_type`
+ * passed into the constructor to be the type that defines each of the column headings,
  * then it expects to receive records that are arrays (or batches) of that record
  * type. The field defined as an array is transparent to this class - it just
  * expects the record to be a single array.
@@ -46,8 +46,8 @@
 
 #include "MsgQ.h"
 #include "LuaObject.h"
+#include "Ordering.h"
 #include "RecordObject.h"
-#include "DispatchObject.h"
 #include "ArrowParms.h"
 #include "OsApi.h"
 #include "MsgQ.h"
@@ -56,7 +56,7 @@
  * PARQUET BUILDER DISPATCH CLASS
  ******************************************************************************/
 
-class ParquetBuilder: public DispatchObject
+class ParquetBuilder: public LuaObject
 {
     public:
 
@@ -67,7 +67,10 @@ class ParquetBuilder: public DispatchObject
         static const int LIST_BLOCK_SIZE = 32;
         static const int FILE_NAME_MAX_LEN = 128;
         static const int FILE_BUFFER_RSPS_SIZE = 0x1000000; // 16MB
+        static const int ROW_GROUP_SIZE = 0x4000000; // 64MB
+        static const int QUEUE_BUFFER_FACTOR = 3;
 
+        static const char* OBJECT_TYPE;
         static const char* LuaMetaName;
         static const struct luaL_Reg LuaMetaTable[];
 
@@ -123,16 +126,27 @@ class ParquetBuilder: public DispatchObject
             double y;
         } ALIGN_PACKED wkbpoint_t;
 
+        typedef struct {
+            Subscriber::msgRef_t ref;
+            RecordObject* record;
+            int rows;
+        } batch_t;
+
         /*--------------------------------------------------------------------
         * Data
         *--------------------------------------------------------------------*/
 
+        Thread* builderPid;
+        bool active;
+        Subscriber* inQ;
+        const char* recType;
+        Ordering<batch_t> recordBatch;
         ArrowParms* parms;
         field_list_t fieldList;
         field_iterator_t* fieldIterator;
-        Mutex tableMut;
         Publisher* outQ;
         int rowSizeBytes;
+        int maxRowsInGroup;
         const char* fileName; // used locally to build file
         geo_data_t geoData;
         const char* indexKey;
@@ -144,16 +158,16 @@ class ParquetBuilder: public DispatchObject
         * Methods
         *--------------------------------------------------------------------*/
 
-        ParquetBuilder (lua_State* L, ArrowParms* parms, const char* outq_name, const char* rec_type, const char* id, geo_data_t geo, const char* index_key);
+        ParquetBuilder (lua_State* L, ArrowParms* parms,
+                        const char* outq_name, const char* inq_name,
+                        const char* rec_type, const char* batch_rec_type,
+                        const char* id, geo_data_t geo, const char* index_key);
         ~ParquetBuilder (void);
-        bool processRecord (RecordObject* record, okey_t key) override;
-        bool processTimeout (void) override;
-        bool processTermination (void) override;
-        bool send2Client (void);
+        static void* builderThread (void* parm);
+        void processRecordBatch (int num_rows);
         bool send2S3 (const char* s3dst);
-        const char* buildGeoMetaData (void);
-        const char* buildServerMetaData (void);
+        bool send2Client (void);
 };
 
 #endif /* __parquet_builder__ */
diff --git a/platforms/windows/OsApi.h b/platforms/windows/OsApi.h
index d6a554dd8..e0cba70dc 100644
--- a/platforms/windows/OsApi.h
+++ b/platforms/windows/OsApi.h
@@ -83,6 +83,17 @@
  * DEFINES
  ******************************************************************************/
 
+typedef enum {
+    DEBUG = 0,
+    INFO = 1,
+    WARNING = 2,
+    ERROR = 3,
+    CRITICAL = 4,
+    INVALID_EVENT_LEVEL = 5
+} event_level_t;
+
+typedef unsigned long okey_t;
+
 #define fileno _fileno
#define PATH_DELIMETER '\\' diff --git a/plugins/gedi/endpoints/gedi04ap.lua b/plugins/gedi/endpoints/gedi04ap.lua index 0710e5dd9..b54a63f10 100644 --- a/plugins/gedi/endpoints/gedi04ap.lua +++ b/plugins/gedi/endpoints/gedi04ap.lua @@ -35,20 +35,14 @@ local rsps_from_nodes = rspq local terminate_proxy_stream = false -- Handle Output Options -- -local output_dispatch = nil +local parquet_builder = nil if parms[arrow.PARMS] then local output_parms = arrow.parms(parms[arrow.PARMS]) -- Parquet Writer -- if output_parms:isparquet() then rsps_from_nodes = rspq .. "-parquet" terminate_proxy_stream = true - local except_pub = core.publish(rspq) - local parquet_builder = arrow.parquet(output_parms, rspq, "gedi04a.footprint", rqstid, "longitude", "latitude") - output_dispatch = core.dispatcher(rsps_from_nodes) - output_dispatch:attach(parquet_builder, "gedi04a") - output_dispatch:attach(except_pub, "exceptrec") -- exception records - output_dispatch:attach(except_pub, "eventrec") -- event records - output_dispatch:run() + parquet_builder = arrow.parquet(output_parms, rspq, rsps_from_nodes, "gedi04a", "gedi04a.footprint", rqstid, "longitude", "latitude") end end @@ -66,7 +60,7 @@ end -- Wait Until Dispatch Completion -- if terminate_proxy_stream then - while (userlog:numsubs() > 0) and not output_dispatch:waiton(interval * 1000) do + while (userlog:numsubs() > 0) and not parquet_builder:waiton(interval * 1000) do duration = duration + interval if timeout >= 0 and duration >= timeout then userlog:sendlog(core.ERROR, string.format("proxy dispatch <%s> timed-out after %d seconds", rspq, duration)) diff --git a/plugins/icesat2/endpoints/atl03sp.lua b/plugins/icesat2/endpoints/atl03sp.lua index e2c43915e..c158af73d 100644 --- a/plugins/icesat2/endpoints/atl03sp.lua +++ b/plugins/icesat2/endpoints/atl03sp.lua @@ -31,20 +31,14 @@ local rsps_from_nodes = rspq local terminate_proxy_stream = false -- Handle Output Options -- -local output_dispatch = nil +local parquet_builder = nil if parms[arrow.PARMS] then local output_parms = arrow.parms(parms[arrow.PARMS]) -- Parquet Writer -- if output_parms:isparquet() then rsps_from_nodes = rspq .. 
"-parquet" terminate_proxy_stream = true - local except_pub = core.publish(rspq) - local parquet_builder = arrow.parquet(output_parms, rspq, "flat03rec.photons", rqstid, "photon.longitude", "photon.latitude", "time") - output_dispatch = core.dispatcher(rsps_from_nodes) - output_dispatch:attach(parquet_builder, "flat03rec") - output_dispatch:attach(except_pub, "exceptrec") -- exception records - output_dispatch:attach(except_pub, "eventrec") -- event records - output_dispatch:run() + parquet_builder = arrow.parquet(output_parms, rspq, rsps_from_nodes, "flat03rec", "flat03rec.photons", rqstid, "photon.longitude", "photon.latitude", "time") end end @@ -64,7 +58,7 @@ end -- Wait Until Dispatch Completion -- if terminate_proxy_stream then - while (userlog:numsubs() > 0) and not output_dispatch:waiton(interval * 1000) do + while (userlog:numsubs() > 0) and not parquet_builder:waiton(interval * 1000) do duration = duration + interval if timeout >= 0 and duration >= timeout then userlog:sendlog(core.ERROR, string.format("proxy dispatch <%s> timed-out after %d seconds", rspq, duration)) diff --git a/plugins/icesat2/endpoints/atl06p.lua b/plugins/icesat2/endpoints/atl06p.lua index fc5b71caa..87d8536db 100644 --- a/plugins/icesat2/endpoints/atl06p.lua +++ b/plugins/icesat2/endpoints/atl06p.lua @@ -35,20 +35,14 @@ local rsps_from_nodes = rspq local terminate_proxy_stream = false -- Handle Output Options -- -local output_dispatch = nil +local parquet_builder = nil if parms[arrow.PARMS] then local output_parms = arrow.parms(parms[arrow.PARMS]) -- Parquet Writer -- if output_parms:isparquet() then rsps_from_nodes = rspq .. "-parquet" terminate_proxy_stream = true - local except_pub = core.publish(rspq) - local parquet_builder = arrow.parquet(output_parms, rspq, "atl06rec.elevation", rqstid, "lon", "lat", "time") - output_dispatch = core.dispatcher(rsps_from_nodes) - output_dispatch:attach(parquet_builder, "atl06rec") - output_dispatch:attach(except_pub, "exceptrec") -- exception records - output_dispatch:attach(except_pub, "eventrec") -- event records - output_dispatch:run() + parquet_builder = arrow.parquet(output_parms, rspq, rsps_from_nodes, "atl06rec", "atl06rec.elevation", rqstid, "lon", "lat", "time") end end @@ -66,7 +60,7 @@ end -- Wait Until Dispatch Completion -- if terminate_proxy_stream then - while (userlog:numsubs() > 0) and not output_dispatch:waiton(interval * 1000) do + while (userlog:numsubs() > 0) and not parquet_builder:waiton(interval * 1000) do duration = duration + interval if timeout >= 0 and duration >= timeout then userlog:sendlog(core.ERROR, string.format("proxy dispatch <%s> timed-out after %d seconds", rspq, duration)) diff --git a/plugins/icesat2/endpoints/atl08p.lua b/plugins/icesat2/endpoints/atl08p.lua index 7cda1cb40..d83c8378c 100644 --- a/plugins/icesat2/endpoints/atl08p.lua +++ b/plugins/icesat2/endpoints/atl08p.lua @@ -35,20 +35,14 @@ local rsps_from_nodes = rspq local terminate_proxy_stream = false -- Handle Output Options -- -local output_dispatch = nil +local parquet_builder = nil if parms[arrow.PARMS] then local output_parms = arrow.parms(parms[arrow.PARMS]) -- Parquet Writer -- if output_parms:isparquet() then rsps_from_nodes = rspq .. 
"-parquet" terminate_proxy_stream = true - local except_pub = core.publish(rspq) - local parquet_builder = arrow.parquet(output_parms, rspq, "atl08rec.vegetation", rqstid, "lon", "lat", "time") - output_dispatch = core.dispatcher(rsps_from_nodes) - output_dispatch:attach(parquet_builder, "atl08rec") - output_dispatch:attach(except_pub, "exceptrec") -- exception records - output_dispatch:attach(except_pub, "eventrec") -- event records - output_dispatch:run() + parquet_builder = arrow.parquet(output_parms, rspq, rsps_from_nodes, "atl08rec", "atl08rec.vegetation", rqstid, "lon", "lat", "time") end end @@ -66,7 +60,7 @@ end -- Wait Until Dispatch Completion -- if terminate_proxy_stream then - while (userlog:numsubs() > 0) and not output_dispatch:waiton(interval * 1000) do + while (userlog:numsubs() > 0) and not parquet_builder:waiton(interval * 1000) do duration = duration + interval if timeout >= 0 and duration >= timeout then userlog:sendlog(core.ERROR, string.format("proxy dispatch <%s> timed-out after %d seconds", rspq, duration))