Skip to content

Commit

Permalink
ml: support FLB_ML_TYPE_MAP to parse(fluent#4034)
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Nov 27, 2021
1 parent 32d98bd commit 5343f2e
Showing 1 changed file with 115 additions and 31 deletions.
146 changes: 115 additions & 31 deletions src/multiline/flb_ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -438,21 +438,14 @@ static int process_append(struct flb_ml_parser_ins *parser_i,
unpacked = FLB_TRUE;
}
else if (type == FLB_ML_TYPE_MAP) {
full_map = obj;
if (!full_map) {
off = 0;
msgpack_unpacked_init(&result);
ret = msgpack_unpack_next(&result, buf, size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
return -1;
}
full_map = &result.data;
unpacked = FLB_TRUE;
}
else if (full_map->type != MSGPACK_OBJECT_MAP) {
msgpack_unpacked_destroy(&result);
off = 0;
msgpack_unpacked_init(&result);
ret = msgpack_unpack_next(&result, buf, size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
return -1;
}
full_map = &result.data;
unpacked = FLB_TRUE;
}

/* Lookup for key_content entry */
Expand Down Expand Up @@ -504,6 +497,92 @@ static int process_append(struct flb_ml_parser_ins *parser_i,
return 0;
}

static int ml_append_try_parser_type_text(struct flb_ml_parser_ins *parser,
uint64_t stream_id,
int *type,
struct flb_time *tm, void *buf, size_t size,
msgpack_object *map,
void **out_buf, size_t *out_size, int *out_release,
struct flb_time *out_time)
{
int ret;

if (parser->ml_parser->parser) {
/* Parse incoming content */
ret = flb_parser_do(parser->ml_parser->parser, (char *) buf, size,
out_buf, out_size, out_time);
if (flb_time_to_double(out_time) == 0.0) {
flb_time_copy(out_time, tm);
}
if (ret >= 0) {
*out_release = FLB_TRUE;
*type = FLB_ML_TYPE_MAP;
}
else {
*out_buf = buf;
*out_size = size;
return -1;
}
}
else {
*out_buf = buf;
*out_size = size;
}
return 0;
}

static int ml_append_try_parser_type_map(struct flb_ml_parser_ins *parser,
uint64_t stream_id,
int *type,
struct flb_time *tm, void *buf, size_t size,
msgpack_object *map,
void **out_buf, size_t *out_size, int *out_release,
struct flb_time *out_time)
{
int map_size;
int i;
int len;
msgpack_object key;
msgpack_object val;

if (map == NULL || map->type != MSGPACK_OBJECT_MAP) {
flb_error("%s:invalid map", __FUNCTION__);
return -1;
}

if (parser->ml_parser->parser) {
/* lookup key_content */

len = flb_sds_len(parser->key_content);
map_size = map->via.map.size;
for(i = 0; i < map_size; i++) {
key = map->via.map.ptr[i].key;
val = map->via.map.ptr[i].val;
if (key.type == MSGPACK_OBJECT_STR &&
parser->key_content &&
key.via.str.size == len &&
strncmp(key.via.str.ptr, parser->key_content, len) == 0) {
/* key_content found */
if (val.type == MSGPACK_OBJECT_STR) {
/* try parse the value of key_content e*/
return ml_append_try_parser_type_text(parser, stream_id, type,
tm, (void*) val.via.str.ptr,
val.via.str.size,
map,
out_buf, out_size, out_release,
out_time);
} else {
return -1;
}
}
}
}
else {
*out_buf = buf;
*out_size = size;
}
return 0;
}

static int ml_append_try_parser(struct flb_ml_parser_ins *parser,
uint64_t stream_id,
Expand All @@ -520,27 +599,32 @@ static int ml_append_try_parser(struct flb_ml_parser_ins *parser,

flb_time_zero(&out_time);

if (parser->ml_parser->parser && type == FLB_ML_TYPE_TEXT) {
/* Parse incoming content */
ret = flb_parser_do(parser->ml_parser->parser, (char *) buf, size,
&out_buf, &out_size, &out_time);
if (flb_time_to_double(&out_time) == 0.0) {
flb_time_copy(&out_time, tm);
}

if (ret >= 0) {
release = FLB_TRUE;
type = FLB_ML_TYPE_MAP;
switch (type) {
case FLB_ML_TYPE_TEXT:
ret = ml_append_try_parser_type_text(parser, stream_id, &type,
tm, buf, size, map,
&out_buf, &out_size, &release,
&out_time);
if (ret < 0) {
return -1;
}
else {
out_buf = buf;
out_size = size;
break;
case FLB_ML_TYPE_MAP:
ret = ml_append_try_parser_type_map(parser, stream_id, &type,
tm, buf, size, map,
&out_buf, &out_size, &release,
&out_time);
if (ret < 0) {
return -1;
}
}
else if (type == FLB_ML_TYPE_TEXT) {
out_buf = buf;
out_size = size;
break;
case FLB_ML_TYPE_RECORD:
/* TODO */
break;

default:
flb_error("%s: unknown type=%d", type);
return -1;
}

if (flb_time_to_double(&out_time) == 0.0) {
Expand Down

0 comments on commit 5343f2e

Please sign in to comment.