Skip to content

Commit

Permalink
parquet builder buffers records before writing out to table
Browse files Browse the repository at this point in the history
  • Loading branch information
jpswinski committed Feb 23, 2023
1 parent 8fc2ca5 commit 15feadd
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 184 deletions.
451 changes: 315 additions & 136 deletions packages/arrow/ParquetBuilder.cpp

Large diffs are not rendered by default.

38 changes: 26 additions & 12 deletions packages/arrow/ParquetBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
#define __parquet_builder__

/*
* ParquetBuilder works on batches of records. It expects the `rec_type` passed
* into the constructor to be the type that defines each of the column headings,
* ParquetBuilder works on batches of records. It expects the `batch_rec_type`
* passed into the constructor to be the type that defines each of the column headings,
* then it expects to receive records that are arrays (or batches) of that record
* type. The field defined as an array is transparent to this class - it just
* expects the record to be a single array.
Expand All @@ -46,8 +46,8 @@

#include "MsgQ.h"
#include "LuaObject.h"
#include "Ordering.h"
#include "RecordObject.h"
#include "DispatchObject.h"
#include "ArrowParms.h"
#include "OsApi.h"
#include "MsgQ.h"
Expand All @@ -56,7 +56,7 @@
* PARQUET BUILDER DISPATCH CLASS
******************************************************************************/

class ParquetBuilder: public DispatchObject
class ParquetBuilder: public LuaObject
{
public:

Expand All @@ -67,7 +67,10 @@ class ParquetBuilder: public DispatchObject
static const int LIST_BLOCK_SIZE = 32;
static const int FILE_NAME_MAX_LEN = 128;
static const int FILE_BUFFER_RSPS_SIZE = 0x1000000; // 16MB
static const int ROW_GROUP_SIZE = 0x4000000; // 64MB
static const int QUEUE_BUFFER_FACTOR = 3;

static const char* OBJECT_TYPE;
static const char* LuaMetaName;
static const struct luaL_Reg LuaMetaTable[];

Expand Down Expand Up @@ -123,16 +126,27 @@ class ParquetBuilder: public DispatchObject
double y;
} ALIGN_PACKED wkbpoint_t;

typedef struct {
Subscriber::msgRef_t ref;
RecordObject* record;
int rows;
} batch_t;

/*--------------------------------------------------------------------
* Data
*--------------------------------------------------------------------*/

Thread* builderPid;
bool active;
Subscriber* inQ;
const char* recType;
Ordering<batch_t> recordBatch;
ArrowParms* parms;
field_list_t fieldList;
field_iterator_t* fieldIterator;
Mutex tableMut;
Publisher* outQ;
int rowSizeBytes;
int maxRowsInGroup;
const char* fileName; // used locally to build file
geo_data_t geoData;
const char* indexKey;
Expand All @@ -144,16 +158,16 @@ class ParquetBuilder: public DispatchObject
* Methods
*--------------------------------------------------------------------*/

ParquetBuilder (lua_State* L, ArrowParms* parms, const char* outq_name, const char* rec_type, const char* id, geo_data_t geo, const char* index_key);
ParquetBuilder (lua_State* L, ArrowParms* parms,
const char* outq_name, const char* inq_name,
const char* rec_type, const char* batch_rec_type,
const char* id, geo_data_t geo, const char* index_key);
~ParquetBuilder (void);

bool processRecord (RecordObject* record, okey_t key) override;
bool processTimeout (void) override;
bool processTermination (void) override;
bool send2Client (void);
static void* builderThread (void* parm);
void processRecordBatch (int num_rows);
bool send2S3 (const char* s3dst);
const char* buildGeoMetaData (void);
const char* buildServerMetaData (void);
bool send2Client (void);
};

#endif /* __parquet_builder__ */
11 changes: 11 additions & 0 deletions platforms/windows/OsApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@
* DEFINES
******************************************************************************/

typedef enum {
DEBUG = 0,
INFO = 1,
WARNING = 2,
ERROR = 3,
CRITICAL = 4,
INVALID_EVENT_LEVEL = 5
} event_level_t;

typedef unsigned long okey_t;

#define fileno _fileno

#define PATH_DELIMETER '\\'
Expand Down
12 changes: 3 additions & 9 deletions plugins/gedi/endpoints/gedi04ap.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,14 @@ local rsps_from_nodes = rspq
local terminate_proxy_stream = false

-- Handle Output Options --
local output_dispatch = nil
local parquet_builder = nil
if parms[arrow.PARMS] then
local output_parms = arrow.parms(parms[arrow.PARMS])
-- Parquet Writer --
if output_parms:isparquet() then
rsps_from_nodes = rspq .. "-parquet"
terminate_proxy_stream = true
local except_pub = core.publish(rspq)
local parquet_builder = arrow.parquet(output_parms, rspq, "gedi04a.footprint", rqstid, "longitude", "latitude")
output_dispatch = core.dispatcher(rsps_from_nodes)
output_dispatch:attach(parquet_builder, "gedi04a")
output_dispatch:attach(except_pub, "exceptrec") -- exception records
output_dispatch:attach(except_pub, "eventrec") -- event records
output_dispatch:run()
parquet_builder = arrow.parquet(output_parms, rspq, rsps_from_nodes, "gedi04a", "gedi04a.footprint", rqstid, "longitude", "latitude")
end
end

Expand All @@ -66,7 +60,7 @@ end

-- Wait Until Dispatch Completion --
if terminate_proxy_stream then
while (userlog:numsubs() > 0) and not output_dispatch:waiton(interval * 1000) do
while (userlog:numsubs() > 0) and not parquet_builder:waiton(interval * 1000) do
duration = duration + interval
if timeout >= 0 and duration >= timeout then
userlog:sendlog(core.ERROR, string.format("proxy dispatch <%s> timed-out after %d seconds", rspq, duration))
Expand Down
12 changes: 3 additions & 9 deletions plugins/icesat2/endpoints/atl03sp.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,14 @@ local rsps_from_nodes = rspq
local terminate_proxy_stream = false

-- Handle Output Options --
local output_dispatch = nil
local parquet_builder = nil
if parms[arrow.PARMS] then
local output_parms = arrow.parms(parms[arrow.PARMS])
-- Parquet Writer --
if output_parms:isparquet() then
rsps_from_nodes = rspq .. "-parquet"
terminate_proxy_stream = true
local except_pub = core.publish(rspq)
local parquet_builder = arrow.parquet(output_parms, rspq, "flat03rec.photons", rqstid, "photon.longitude", "photon.latitude", "time")
output_dispatch = core.dispatcher(rsps_from_nodes)
output_dispatch:attach(parquet_builder, "flat03rec")
output_dispatch:attach(except_pub, "exceptrec") -- exception records
output_dispatch:attach(except_pub, "eventrec") -- event records
output_dispatch:run()
parquet_builder = arrow.parquet(output_parms, rspq, rsps_from_nodes, "flat03rec", "flat03rec.photons", rqstid, "photon.longitude", "photon.latitude", "time")
end
end

Expand All @@ -64,7 +58,7 @@ end

-- Wait Until Dispatch Completion --
if terminate_proxy_stream then
while (userlog:numsubs() > 0) and not output_dispatch:waiton(interval * 1000) do
while (userlog:numsubs() > 0) and not parquet_builder:waiton(interval * 1000) do
duration = duration + interval
if timeout >= 0 and duration >= timeout then
userlog:sendlog(core.ERROR, string.format("proxy dispatch <%s> timed-out after %d seconds", rspq, duration))
Expand Down
12 changes: 3 additions & 9 deletions plugins/icesat2/endpoints/atl06p.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,14 @@ local rsps_from_nodes = rspq
local terminate_proxy_stream = false

-- Handle Output Options --
local output_dispatch = nil
local parquet_builder = nil
if parms[arrow.PARMS] then
local output_parms = arrow.parms(parms[arrow.PARMS])
-- Parquet Writer --
if output_parms:isparquet() then
rsps_from_nodes = rspq .. "-parquet"
terminate_proxy_stream = true
local except_pub = core.publish(rspq)
local parquet_builder = arrow.parquet(output_parms, rspq, "atl06rec.elevation", rqstid, "lon", "lat", "time")
output_dispatch = core.dispatcher(rsps_from_nodes)
output_dispatch:attach(parquet_builder, "atl06rec")
output_dispatch:attach(except_pub, "exceptrec") -- exception records
output_dispatch:attach(except_pub, "eventrec") -- event records
output_dispatch:run()
parquet_builder = arrow.parquet(output_parms, rspq, rsps_from_nodes, "atl06rec", "atl06rec.elevation", rqstid, "lon", "lat", "time")
end
end

Expand All @@ -66,7 +60,7 @@ end

-- Wait Until Dispatch Completion --
if terminate_proxy_stream then
while (userlog:numsubs() > 0) and not output_dispatch:waiton(interval * 1000) do
while (userlog:numsubs() > 0) and not parquet_builder:waiton(interval * 1000) do
duration = duration + interval
if timeout >= 0 and duration >= timeout then
userlog:sendlog(core.ERROR, string.format("proxy dispatch <%s> timed-out after %d seconds", rspq, duration))
Expand Down
12 changes: 3 additions & 9 deletions plugins/icesat2/endpoints/atl08p.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,14 @@ local rsps_from_nodes = rspq
local terminate_proxy_stream = false

-- Handle Output Options --
local output_dispatch = nil
local parquet_builder = nil
if parms[arrow.PARMS] then
local output_parms = arrow.parms(parms[arrow.PARMS])
-- Parquet Writer --
if output_parms:isparquet() then
rsps_from_nodes = rspq .. "-parquet"
terminate_proxy_stream = true
local except_pub = core.publish(rspq)
local parquet_builder = arrow.parquet(output_parms, rspq, "atl08rec.vegetation", rqstid, "lon", "lat", "time")
output_dispatch = core.dispatcher(rsps_from_nodes)
output_dispatch:attach(parquet_builder, "atl08rec")
output_dispatch:attach(except_pub, "exceptrec") -- exception records
output_dispatch:attach(except_pub, "eventrec") -- event records
output_dispatch:run()
parquet_builder = arrow.parquet(output_parms, rspq, rsps_from_nodes, "atl08rec", "atl08rec.vegetation", rqstid, "lon", "lat", "time")
end
end

Expand All @@ -66,7 +60,7 @@ end

-- Wait Until Dispatch Completion --
if terminate_proxy_stream then
while (userlog:numsubs() > 0) and not output_dispatch:waiton(interval * 1000) do
while (userlog:numsubs() > 0) and not parquet_builder:waiton(interval * 1000) do
duration = duration + interval
if timeout >= 0 and duration >= timeout then
userlog:sendlog(core.ERROR, string.format("proxy dispatch <%s> timed-out after %d seconds", rspq, duration))
Expand Down

0 comments on commit 15feadd

Please sign in to comment.