From 5343f2ed18306996ccd4fcb8b0cc9e48629b2b74 Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Sat, 27 Nov 2021 21:51:25 +0900 Subject: [PATCH] ml: support FLB_ML_TYPE_MAP to parse(#4034) Signed-off-by: Takahiro Yamashita --- src/multiline/flb_ml.c | 146 ++++++++++++++++++++++++++++++++--------- 1 file changed, 115 insertions(+), 31 deletions(-) diff --git a/src/multiline/flb_ml.c b/src/multiline/flb_ml.c index b9b0f71392b..47570b0f271 100644 --- a/src/multiline/flb_ml.c +++ b/src/multiline/flb_ml.c @@ -438,21 +438,14 @@ static int process_append(struct flb_ml_parser_ins *parser_i, unpacked = FLB_TRUE; } else if (type == FLB_ML_TYPE_MAP) { - full_map = obj; - if (!full_map) { - off = 0; - msgpack_unpacked_init(&result); - ret = msgpack_unpack_next(&result, buf, size, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { - return -1; - } - full_map = &result.data; - unpacked = FLB_TRUE; - } - else if (full_map->type != MSGPACK_OBJECT_MAP) { - msgpack_unpacked_destroy(&result); + off = 0; + msgpack_unpacked_init(&result); + ret = msgpack_unpack_next(&result, buf, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { return -1; } + full_map = &result.data; + unpacked = FLB_TRUE; } /* Lookup for key_content entry */ @@ -504,6 +497,92 @@ static int process_append(struct flb_ml_parser_ins *parser_i, return 0; } +static int ml_append_try_parser_type_text(struct flb_ml_parser_ins *parser, + uint64_t stream_id, + int *type, + struct flb_time *tm, void *buf, size_t size, + msgpack_object *map, + void **out_buf, size_t *out_size, int *out_release, + struct flb_time *out_time) +{ + int ret; + + if (parser->ml_parser->parser) { + /* Parse incoming content */ + ret = flb_parser_do(parser->ml_parser->parser, (char *) buf, size, + out_buf, out_size, out_time); + if (flb_time_to_double(out_time) == 0.0) { + flb_time_copy(out_time, tm); + } + if (ret >= 0) { + *out_release = FLB_TRUE; + *type = FLB_ML_TYPE_MAP; + } + else { + *out_buf = buf; + *out_size = size; + return -1; + } + } + else { + *out_buf = buf; + *out_size = size; + } + return 0; +} + +static int ml_append_try_parser_type_map(struct flb_ml_parser_ins *parser, + uint64_t stream_id, + int *type, + struct flb_time *tm, void *buf, size_t size, + msgpack_object *map, + void **out_buf, size_t *out_size, int *out_release, + struct flb_time *out_time) +{ + int map_size; + int i; + int len; + msgpack_object key; + msgpack_object val; + + if (map == NULL || map->type != MSGPACK_OBJECT_MAP) { + flb_error("%s:invalid map", __FUNCTION__); + return -1; + } + + if (parser->ml_parser->parser) { + /* lookup key_content */ + + len = flb_sds_len(parser->key_content); + map_size = map->via.map.size; + for(i = 0; i < map_size; i++) { + key = map->via.map.ptr[i].key; + val = map->via.map.ptr[i].val; + if (key.type == MSGPACK_OBJECT_STR && + parser->key_content && + key.via.str.size == len && + strncmp(key.via.str.ptr, parser->key_content, len) == 0) { + /* key_content found */ + if (val.type == MSGPACK_OBJECT_STR) { + /* try parse the value of key_content e*/ + return ml_append_try_parser_type_text(parser, stream_id, type, + tm, (void*) val.via.str.ptr, + val.via.str.size, + map, + out_buf, out_size, out_release, + out_time); + } else { + return -1; + } + } + } + } + else { + *out_buf = buf; + *out_size = size; + } + return 0; +} static int ml_append_try_parser(struct flb_ml_parser_ins *parser, uint64_t stream_id, @@ -520,27 +599,32 @@ static int ml_append_try_parser(struct flb_ml_parser_ins *parser, flb_time_zero(&out_time); - if (parser->ml_parser->parser && type == FLB_ML_TYPE_TEXT) { - /* Parse incoming content */ - ret = flb_parser_do(parser->ml_parser->parser, (char *) buf, size, - &out_buf, &out_size, &out_time); - if (flb_time_to_double(&out_time) == 0.0) { - flb_time_copy(&out_time, tm); - } - - if (ret >= 0) { - release = FLB_TRUE; - type = FLB_ML_TYPE_MAP; + switch (type) { + case FLB_ML_TYPE_TEXT: + ret = ml_append_try_parser_type_text(parser, stream_id, &type, + tm, buf, size, map, + &out_buf, &out_size, &release, + &out_time); + if (ret < 0) { + return -1; } - else { - out_buf = buf; - out_size = size; + break; + case FLB_ML_TYPE_MAP: + ret = ml_append_try_parser_type_map(parser, stream_id, &type, + tm, buf, size, map, + &out_buf, &out_size, &release, + &out_time); + if (ret < 0) { return -1; } - } - else if (type == FLB_ML_TYPE_TEXT) { - out_buf = buf; - out_size = size; + break; + case FLB_ML_TYPE_RECORD: + /* TODO */ + break; + + default: + flb_error("%s: unknown type=%d", type); + return -1; } if (flb_time_to_double(&out_time) == 0.0) {