Skip to content

Commit

Permalink
[Manual Backport 2.x] update vpc flow with flint-s3 based DDL assets …
Browse files Browse the repository at this point in the history
…and dashboard (#1721)

* update vpc flow with flint-s3 based DDL assets and dashboard (#1691)

* update vpc flow with flint-s3 based DDL assets and dashboard

Signed-off-by: YANGDB <[email protected]>

* update MV to use auto sync refresh

Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_agg_30min_connections_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_window-agg_60min_dest_ip_total-bytes_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_window-agg_60min_dest_ip_cardinality_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_live_week_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_agg_60min_connections_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* remove comments from MV queries

Signed-off-by: YANGDB <[email protected]>

* update to remove HIVE as not supported yet in current EMR version

Signed-off-by: YANGDB <[email protected]>

* update refresh sync rate

Signed-off-by: YANGDB <[email protected]>

* update watermark_delay

Signed-off-by: YANGDB <[email protected]>

* add live only workflow & dashboard
add pre-agg + live workflow & dashboard
add refresh-workflow for the pre-agg queries

Signed-off-by: YANGDB <[email protected]>

* add live only workflow & dashboard
add pre-agg + live workflow & dashboard
add refresh-workflow for the pre-agg queries

Signed-off-by: YANGDB <[email protected]>

* add live all only workflow & dashboard

Signed-off-by: YANGDB <[email protected]>

* correct table name hard coded issue

Signed-off-by: YANGDB <[email protected]>

* add vpc table creation based on CSV format

Signed-off-by: YANGDB <[email protected]>

---------

Signed-off-by: YANGDB <[email protected]>
Co-authored-by: Simeon Widdis <[email protected]>

* Change the vpc flow's refresh interval to 15mins and watermark delay to 1 min

Signed-off-by: Ryan Liang <[email protected]>

---------

Signed-off-by: YANGDB <[email protected]>
Signed-off-by: Ryan Liang <[email protected]>
Co-authored-by: YANGDB <[email protected]>
Co-authored-by: Simeon Widdis <[email protected]>
  • Loading branch information
3 people authored Apr 18, 2024
1 parent f1e2ce1 commit e97df5f
Show file tree
Hide file tree
Showing 18 changed files with 1,108 additions and 35 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,30 +0,0 @@
/*
 * Auto-refreshing materialized view that projects the raw VPC flow-log
 * columns into the `aws.vpc.*` field namespace.
 *
 * - `start` / `end` are converted with FROM_UNIXTIME and cast to TIMESTAMP;
 *   `start` also populates `@timestamp`.
 * - Ports, packets and bytes are cast to LONG.
 * - `aws.vpc.flow-direction` is 'ingress' when dstaddr begins with an
 *   RFC 1918 private prefix (10., 192.168., 172.16. - 172.31.), otherwise
 *   'egress'. The pattern is anchored with ^ because regexp() succeeds on a
 *   match anywhere in the string; without the anchor, addresses such as
 *   110.x.x.x or x.x.10.x were also reported as ingress.
 */
CREATE MATERIALIZED VIEW {table_name}__mview AS
SELECT
    CAST(FROM_UNIXTIME(start) AS TIMESTAMP) AS `@timestamp`,
    version AS `aws.vpc.version`,
    account_id AS `aws.vpc.account-id`,
    interface_id AS `aws.vpc.interface-id`,
    srcaddr AS `aws.vpc.srcaddr`,
    dstaddr AS `aws.vpc.dstaddr`,
    CAST(srcport AS LONG) AS `aws.vpc.srcport`,
    CAST(dstport AS LONG) AS `aws.vpc.dstport`,
    protocol AS `aws.vpc.protocol`,
    CAST(packets AS LONG) AS `aws.vpc.packets`,
    CAST(bytes AS LONG) AS `aws.vpc.bytes`,
    CAST(FROM_UNIXTIME(start) AS TIMESTAMP) AS `aws.vpc.start`,
    CAST(FROM_UNIXTIME(end) AS TIMESTAMP) AS `aws.vpc.end`,
    action AS `aws.vpc.action`,
    log_status AS `aws.vpc.log-status`,
    CASE
        WHEN regexp(dstaddr, '^(10\\.|192\\.168\\.|172\\.(1[6-9]|2[0-9]|3[0-1])\\.)')
            THEN 'ingress'
        ELSE 'egress'
    END AS `aws.vpc.flow-direction`
FROM
    {table_name}
WITH (
    auto_refresh = 'true',
    checkpoint_location = '{s3_checkpoint_location}',
    watermark_delay = '1 Minute',
    extra_options = '{ "{table_name}": { "maxFilesPerTrigger": "10" }}'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
 * External table over JSON flow-log records stored at {s3_bucket_location}.
 * The statement is a no-op when the table already exists.
 *
 * NOTE(review): the column layout (class_uid / category_uid / type_uid,
 * src_endpoint / dst_endpoint structs) looks like an OCSF network-activity
 * record - confirm against the producer of the data.
 * The materialized views defined alongside this table divide time,
 * start_time and end_time by 1000 before FROM_UNIXTIME, i.e. they treat
 * these BIGINT columns as epoch milliseconds.
 */
CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} (
    cloud STRUCT<
        account_uid: STRING,
        region: STRING,
        zone: STRING,
        provider: STRING
    >,
    src_endpoint STRUCT<
        port: INT,
        svc_name: STRING,
        ip: STRING,
        intermediate_ips: ARRAY<STRING>,
        interface_uid: STRING,
        vpc_uid: STRING,
        instance_uid: STRING,
        subnet_uid: STRING
    >,
    dst_endpoint STRUCT<
        port: INT,
        svc_name: STRING,
        ip: STRING,
        intermediate_ips: ARRAY<STRING>,
        interface_uid: STRING,
        vpc_uid: STRING,
        instance_uid: STRING,
        subnet_uid: STRING
    >,
    connection_info STRUCT<
        protocol_num: INT,
        tcp_flags: INT,
        protocol_ver: STRING,
        boundary_id: INT,
        boundary: STRING,
        direction_id: INT,
        direction: STRING
    >,
    traffic STRUCT<
        packets: BIGINT,
        bytes: BIGINT
    >,
    time            BIGINT,
    start_time      BIGINT,
    end_time        BIGINT,
    status_code     STRING,
    severity_id     INT,
    severity        STRING,
    class_name      STRING,
    class_uid       INT,
    category_name   STRING,
    category_uid    INT,
    activity_name   STRING,
    activity_id     INT,
    disposition     STRING,
    disposition_id  INT,
    type_uid        INT,
    type_name       STRING,
    region          STRING,
    accountid       STRING,
    eventday        STRING
)
USING json
LOCATION '{s3_bucket_location}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
 * Pre-aggregated connection statistics in 30-minute buckets.
 *
 * start_time is divided by 1000 and floored to a multiple of 1800 seconds to
 * form the bucket key; interval_end_time is the bucket start plus 1799
 * seconds. Each row holds the connection count and the summed bytes/packets
 * for one (bucket, region, account, status code, source service,
 * destination service, direction) combination. Null service names and
 * direction are reported as 'Unknown'; null byte/packet counts count as 0.
 *
 * auto_refresh is disabled, so the view is populated only by an explicit
 * REFRESH MATERIALIZED VIEW.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__agg_30_min_connections_mview AS
SELECT
    CAST(FROM_UNIXTIME(CAST((start_time / 1000) AS BIGINT) DIV 1800 * 1800) AS TIMESTAMP) AS interval_start_time,
    CAST(FROM_UNIXTIME((CAST((start_time / 1000) AS BIGINT) DIV 1800 * 1800) + 1799) AS TIMESTAMP) AS interval_end_time,

    status_code AS `aws.vpc.status_code`,
    CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,
    CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
    CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,

    accountid AS `aws.vpc.account-id`,
    region AS `aws.vpc.region`,

    COUNT(*) AS total_connections,
    SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes,
    SUM(CAST(IFNULL(traffic.packets, 0) AS LONG)) AS total_packets
FROM
    {table_name}
GROUP BY
    CAST((start_time / 1000) AS BIGINT) DIV 1800 * 1800,
    region,
    accountid,
    status_code,
    src_endpoint.svc_name,
    dst_endpoint.svc_name,
    connection_info['direction']
ORDER BY
    interval_start_time
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/*
 * Manual refresh of the 30-minute connection aggregation view. Its definition
 * sets auto_refresh = false, so presumably the view only picks up new source
 * data when this statement is executed - confirm the scheduling with the
 * caller.
 */
REFRESH MATERIALIZED VIEW {table_name}__agg_30_min_connections_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
 * Pre-aggregated connection statistics in hourly buckets.
 *
 * start_time is divided by 1000, converted with from_unixtime and truncated
 * to the hour to form the bucket; interval_end_time is the bucket start plus
 * one hour. Each row holds the connection count and the summed bytes/packets
 * for one (hour, region, account, status code, source service, destination
 * service, direction) combination. Null service names and direction are
 * reported as 'Unknown'; null byte/packet counts count as 0.
 *
 * auto_refresh is disabled, so the view is populated only by an explicit
 * REFRESH MATERIALIZED VIEW.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__agg_60_min_connections_mview AS
SELECT
    DATE_TRUNC('hour', FROM_UNIXTIME(start_time / 1000)) AS interval_start_time,
    DATE_TRUNC('hour', FROM_UNIXTIME(start_time / 1000)) + INTERVAL 1 HOUR AS interval_end_time,

    status_code AS `aws.vpc.status_code`,
    CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,
    CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
    CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,

    accountid AS `aws.vpc.account-id`,
    region AS `aws.vpc.region`,

    COUNT(*) AS total_connections,
    SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes,
    SUM(CAST(IFNULL(traffic.packets, 0) AS LONG)) AS total_packets
FROM
    {table_name}
GROUP BY
    DATE_TRUNC('hour', FROM_UNIXTIME(start_time / 1000)),
    region,
    accountid,
    status_code,
    src_endpoint.svc_name,
    dst_endpoint.svc_name,
    connection_info['direction']
ORDER BY
    interval_start_time
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/*
 * Manual refresh of the 60-minute connection aggregation view. Its definition
 * sets auto_refresh = false, so presumably the view only picks up new source
 * data when this statement is executed - confirm the scheduling with the
 * caller.
 */
REFRESH MATERIALIZED VIEW {table_name}__agg_60_min_connections_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
 * Auto-refreshing materialized view that flattens the nested flow-log record
 * into flat `aws.vpc.*` fields. Endpoint, connection and traffic values fall
 * back to defaults ('Unknown', '0.0.0.0', 0 / '0') when they are null.
 *
 * `aws.vpc.flow-direction` is 'ingress' when dst_endpoint.ip begins with an
 * RFC 1918 private prefix (10., 192.168., 172.16. - 172.31.), otherwise
 * 'egress' (a null IP also yields 'egress'). The pattern is anchored with ^
 * because regexp() succeeds on a match anywhere in the string; without the
 * anchor, addresses such as 110.x.x.x or x.x.10.x were also reported as
 * ingress.
 *
 * time, start_time and end_time are divided by 1000 before FROM_UNIXTIME,
 * i.e. they are treated as epoch milliseconds. start_time is exposed twice,
 * as `start_time` and as `interval_start_time`.
 *
 * Refresh options: every 15 minutes, 1 minute watermark delay, at most 10
 * source files per trigger.
 */
CREATE MATERIALIZED VIEW {table_name}__week_live_mview AS
SELECT
    cloud.account_uid AS `aws.vpc.cloud_account_uid`,
    cloud.region AS `aws.vpc.cloud_region`,
    cloud.zone AS `aws.vpc.cloud_zone`,
    cloud.provider AS `aws.vpc.cloud_provider`,

    CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
    CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
    CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
    CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
    CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
    CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
    CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,

    CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
    CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
    CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
    CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-interface_uid`,
    CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-vpc_uid`,
    CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-instance_uid`,
    CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-subnet_uid`,
    CASE
        WHEN regexp(dst_endpoint.ip, '^(10\\.|192\\.168\\.|172\\.(1[6-9]|2[0-9]|3[0-1])\\.)')
            THEN 'ingress'
        ELSE 'egress'
    END AS `aws.vpc.flow-direction`,

    CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
    CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
    CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING) AS `aws.vpc.connection.protocol_ver`,
    CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING) AS `aws.vpc.connection.boundary`,
    CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,

    CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
    CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,

    CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
    CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
    status_code AS `aws.vpc.status_code`,

    severity AS `aws.vpc.severity`,
    class_name AS `aws.vpc.class_name`,
    category_name AS `aws.vpc.category_name`,
    activity_name AS `aws.vpc.activity_name`,
    disposition AS `aws.vpc.disposition`,
    type_name AS `aws.vpc.type_name`,

    region AS `aws.vpc.region`,
    accountid AS `aws.vpc.account-id`
FROM
    {table_name}
WITH (
    auto_refresh = true,
    refresh_interval = '15 Minute',
    checkpoint_location = '{s3_checkpoint_location}',
    watermark_delay = '1 Minute',
    extra_options = '{ "{table_name}": { "maxFilesPerTrigger": "10" }}'
)

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
 * Manually refreshed materialized view that flattens the nested flow-log
 * record into flat `aws.vpc.*` fields, restricted to the trailing week of
 * data. Endpoint, connection and traffic values fall back to defaults
 * ('Unknown', '0.0.0.0', 0 / '0') when they are null.
 *
 * `aws.vpc.flow-direction` is 'ingress' when dst_endpoint.ip begins with an
 * RFC 1918 private prefix (10., 192.168., 172.16. - 172.31.), otherwise
 * 'egress' (a null IP also yields 'egress'). The pattern is anchored with ^
 * because regexp() succeeds on a match anywhere in the string; without the
 * anchor, addresses such as 110.x.x.x or x.x.10.x were also reported as
 * ingress.
 *
 * Week filter: `latest` is a single-row subquery holding the newest
 * start_time in the table; it is cross-joined to every row so the WHERE
 * clause can keep rows no older than DATE_SUB(max_start_time, 7). The window
 * is relative to the newest record, not to the current time.
 * NOTE(review): DATE_SUB works at date granularity, so the cut-off is
 * presumably midnight of the day seven days earlier - confirm this is the
 * intended boundary.
 *
 * time, start_time and end_time are divided by 1000 before FROM_UNIXTIME,
 * i.e. they are treated as epoch milliseconds.
 */
CREATE MATERIALIZED VIEW {table_name}__week_live_mview AS
SELECT
    cloud.account_uid AS `aws.vpc.cloud_account_uid`,
    cloud.region AS `aws.vpc.cloud_region`,
    cloud.zone AS `aws.vpc.cloud_zone`,
    cloud.provider AS `aws.vpc.cloud_provider`,

    CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
    CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
    CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
    CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
    CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
    CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
    CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,

    CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
    CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
    CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
    CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-interface_uid`,
    CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-vpc_uid`,
    CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-instance_uid`,
    CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-subnet_uid`,
    CASE
        WHEN regexp(dst_endpoint.ip, '^(10\\.|192\\.168\\.|172\\.(1[6-9]|2[0-9]|3[0-1])\\.)')
            THEN 'ingress'
        ELSE 'egress'
    END AS `aws.vpc.flow-direction`,

    CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
    CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
    CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING) AS `aws.vpc.connection.protocol_ver`,
    CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING) AS `aws.vpc.connection.boundary`,
    CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,

    CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
    CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,

    CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
    CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
    status_code AS `aws.vpc.status_code`,

    severity AS `aws.vpc.severity`,
    class_name AS `aws.vpc.class_name`,
    category_name AS `aws.vpc.category_name`,
    activity_name AS `aws.vpc.activity_name`,
    disposition AS `aws.vpc.disposition`,
    type_name AS `aws.vpc.type_name`,

    region AS `aws.vpc.region`,
    accountid AS `aws.vpc.account-id`
FROM
    {table_name}
CROSS JOIN
    (SELECT MAX(CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP)) AS max_start_time FROM {table_name}) AS latest
WHERE
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) >= DATE_SUB(latest.max_start_time, 7)
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/*
 * Manual refresh of the week-live view. Presumably paired with the variant
 * of the view that is created with auto_refresh = false (the auto-refreshing
 * variant shares the same name) - confirm which workflow issues this.
 */
REFRESH MATERIALIZED VIEW {table_name}__week_live_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
 * Top destination addresses by transferred bytes, per hour.
 *
 * hourly_buckets   - sums traffic.bytes per (hour, destination IP); a null
 *                    IP is reported as '0.0.0.0' and null bytes count as 0.
 * ranked_addresses - ranks the addresses inside each hour by total_bytes,
 *                    highest first.
 * final SELECT     - keeps ranks 1..50. RANK() gives tied addresses the same
 *                    rank, so an hour can return more than 50 rows.
 *
 * The CAST projections carry an explicit interval_start_time alias so the
 * output column name does not depend on the engine's implicit naming of an
 * unaliased expression.
 *
 * auto_refresh is disabled, so the view is populated only by an explicit
 * REFRESH MATERIALIZED VIEW.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__window_agg_60_min_network_ip_bytes_mview AS
WITH hourly_buckets AS (
    SELECT
        date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time,
        CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS dstaddr,
        SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes
    FROM
        {table_name}
    GROUP BY
        interval_start_time,
        dstaddr
),
ranked_addresses AS (
    SELECT
        CAST(interval_start_time AS TIMESTAMP) AS interval_start_time,
        dstaddr,
        total_bytes,
        RANK() OVER (PARTITION BY interval_start_time ORDER BY total_bytes DESC) AS bytes_rank
    FROM
        hourly_buckets
)
SELECT
    CAST(interval_start_time AS TIMESTAMP) AS interval_start_time,
    dstaddr,
    total_bytes
FROM
    ranked_addresses
WHERE
    bytes_rank <= 50
ORDER BY
    interval_start_time ASC,
    bytes_rank ASC
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/*
 * Manual refresh of the hourly top-destinations-by-bytes view. Its definition
 * sets auto_refresh = false, so presumably the view only picks up new source
 * data when this statement is executed - confirm the scheduling with the
 * caller.
 */
REFRESH MATERIALIZED VIEW {table_name}__window_agg_60_min_network_ip_bytes_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
 * Top destination addresses by number of flow records, per hour.
 *
 * hourly_buckets   - counts rows per (hour, destination IP); a null IP is
 *                    reported as '0.0.0.0'.
 * ranked_addresses - ranks the addresses inside each hour by total_count,
 *                    highest first.
 * final SELECT     - keeps ranks 1..50. RANK() gives tied addresses the same
 *                    rank, so an hour can return more than 50 rows.
 *
 * The CAST projections carry an explicit interval_start_time alias so the
 * output column name does not depend on the engine's implicit naming of an
 * unaliased expression.
 *
 * auto_refresh is disabled, so the view is populated only by an explicit
 * REFRESH MATERIALIZED VIEW.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__window_agg_60_min_network_ip_cardinality_mview AS
WITH hourly_buckets AS (
    SELECT
        date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time,
        CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS dstaddr,
        COUNT(*) AS total_count
    FROM
        {table_name}
    GROUP BY
        interval_start_time,
        dstaddr
),
ranked_addresses AS (
    SELECT
        CAST(interval_start_time AS TIMESTAMP) AS interval_start_time,
        dstaddr,
        total_count,
        RANK() OVER (PARTITION BY interval_start_time ORDER BY total_count DESC) AS addr_rank
    FROM
        hourly_buckets
)
SELECT
    CAST(interval_start_time AS TIMESTAMP) AS interval_start_time,
    dstaddr,
    total_count
FROM
    ranked_addresses
WHERE
    addr_rank <= 50
ORDER BY
    interval_start_time ASC,
    addr_rank ASC
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/*
 * Manual refresh of the hourly top-destinations-by-record-count view. Its
 * definition sets auto_refresh = false, so presumably the view only picks up
 * new source data when this statement is executed - confirm the scheduling
 * with the caller.
 */
REFRESH MATERIALIZED VIEW {table_name}__window_agg_60_min_network_ip_cardinality_mview
Loading

0 comments on commit e97df5f

Please sign in to comment.