Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update vpc flow with flint-s3 based DDL assets and dashboard #1691

Merged
merged 18 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions opensearch_dashboards.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"id": "observabilityDashboards",
"version": "3.0.0.0",
"opensearchDashboardsVersion": "3.0.0",
"version": "2.13.0.0",
"opensearchDashboardsVersion": "2.13.0",
"server": true,
"ui": true,
"requiredPlugins": [
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "observability-dashboards",
"version": "3.0.0.0",
"version": "2.13.0.0",
"main": "index.ts",
"license": "Apache-2.0",
"scripts": {
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,30 +0,0 @@
CREATE MATERIALIZED VIEW {table_name}__mview AS
SELECT
CAST(FROM_UNIXTIME(start) AS TIMESTAMP) as `@timestamp`,
version as `aws.vpc.version`,
account_id as `aws.vpc.account-id`,
interface_id as `aws.vpc.interface-id`,
srcaddr as `aws.vpc.srcaddr`,
dstaddr as `aws.vpc.dstaddr`,
CAST(srcport AS LONG) as `aws.vpc.srcport`,
CAST(dstport AS LONG) as `aws.vpc.dstport`,
protocol as `aws.vpc.protocol`,
CAST(packets AS LONG) as `aws.vpc.packets`,
CAST(bytes AS LONG) as `aws.vpc.bytes`,
CAST(FROM_UNIXTIME(start) AS TIMESTAMP) as `aws.vpc.start`,
CAST(FROM_UNIXTIME(end) AS TIMESTAMP) as `aws.vpc.end`,
action as `aws.vpc.action`,
log_status as `aws.vpc.log-status`,
CASE
WHEN regexp(dstaddr, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\.*)')
THEN 'ingress'
ELSE 'egress'
END AS `aws.vpc.flow-direction`
FROM
{table_name}
WITH (
auto_refresh = 'true',
checkpoint_location = '{s3_checkpoint_location}',
watermark_delay = '1 Minute',
extra_options = '{ "{table_name}": { "maxFilesPerTrigger": "10" }}'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} (
cloud STRUCT<
account_uid: STRING,
region: STRING,
zone: STRING,
provider: STRING
>,
src_endpoint STRUCT<
port: INT,
svc_name: STRING,
ip: STRING,
intermediate_ips: ARRAY<STRING>,
interface_uid: STRING,
vpc_uid: STRING,
instance_uid: STRING,
subnet_uid: STRING
>,
dst_endpoint STRUCT<
port: INT,
svc_name: STRING,
ip: STRING,
intermediate_ips: ARRAY<STRING>,
interface_uid: STRING,
vpc_uid: STRING,
instance_uid: STRING,
subnet_uid: STRING
>,
connection_info STRUCT<
protocol_num: INT,
tcp_flags: INT,
protocol_ver: STRING,
boundary_id: INT,
boundary: STRING,
direction_id: INT,
direction: STRING
>,
traffic STRUCT<
packets: BIGINT,
bytes: BIGINT
>,
time BIGINT,
start_time BIGINT,
end_time BIGINT,
status_code STRING,
severity_id INT,
severity STRING,
class_name STRING,
class_uid INT,
category_name STRING,
category_uid INT,
activity_name STRING,
activity_id INT,
disposition STRING,
disposition_id INT,
type_uid INT,
type_name STRING,
region STRING,
accountid STRING,
eventday STRING
)
USING json
LOCATION '{s3_bucket_location}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__agg_30_min_connections_mview AS
SELECT
CAST(from_unixtime(CAST((start_time / 1000) AS BIGINT) DIV 1800 * 1800) AS TIMESTAMP) AS interval_start_time,
CAST(from_unixtime((CAST((start_time / 1000) AS BIGINT) DIV 1800 * 1800) + 1799) AS TIMESTAMP) AS interval_end_time,

status_code as `aws.vpc.status_code`,
CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,
CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,

accountid as `aws.vpc.account-id`,
region as `aws.vpc.region`,

COUNT(*) AS total_connections,
SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes,
SUM(CAST(IFNULL(traffic.packets, 0) AS LONG)) AS total_packets
FROM
{table_name}
GROUP BY
CAST((start_time / 1000) AS BIGINT) DIV 1800 * 1800,
region,
accountid,
status_code,
src_endpoint.svc_name,
dst_endpoint.svc_name,
connection_info['direction']
ORDER BY
interval_start_time
WITH (
auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REFRESH MATERIALIZED VIEW {table_name}__agg_30_min_connections_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__agg_60_min_connections_mview AS
SELECT
date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time,
date_trunc('hour', from_unixtime(start_time / 1000)) + INTERVAL 1 HOUR AS interval_end_time,

status_code as `aws.vpc.status_code`,
CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,
CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,

accountid as `aws.vpc.account-id`,
region as `aws.vpc.region`,

COUNT(*) AS total_connections,
SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes,
SUM(CAST(IFNULL(traffic.packets, 0) AS LONG)) AS total_packets
FROM
{table_name}
GROUP BY
date_trunc('hour', from_unixtime(start_time / 1000)),
region,
accountid,
status_code,
src_endpoint.svc_name,
dst_endpoint.svc_name,
connection_info['direction']
ORDER BY
interval_start_time
WITH (
auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REFRESH MATERIALIZED VIEW {table_name}__agg_60_min_connections_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
CREATE MATERIALIZED VIEW {table_name}__week_live_mview AS
SELECT
cloud.account_uid AS `aws.vpc.cloud_account_uid`,
cloud.region AS `aws.vpc.cloud_region`,
cloud.zone AS `aws.vpc.cloud_zone`,
cloud.provider AS `aws.vpc.cloud_provider`,

CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,

CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-interface_uid`,
CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-vpc_uid`,
CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-instance_uid`,
CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-subnet_uid`,
CASE
WHEN regexp(dst_endpoint.ip, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\.*)')
THEN 'ingress'
ELSE 'egress'
END AS `aws.vpc.flow-direction`,

CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING) AS `aws.vpc.connection.protocol_ver`,
CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING) AS `aws.vpc.connection.boundary`,
CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,

CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,

CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
status_code AS `aws.vpc.status_code`,

severity AS `aws.vpc.severity`,
class_name AS `aws.vpc.class_name`,
category_name AS `aws.vpc.category_name`,
activity_name AS `aws.vpc.activity_name`,
disposition AS `aws.vpc.disposition`,
type_name AS `aws.vpc.type_name`,

region AS `aws.vpc.region`,
accountid AS `aws.vpc.account-id`
FROM
{table_name}
WITH (
auto_refresh = true,
refresh_interval = '1 Minute',
checkpoint_location = '{s3_checkpoint_location}',
watermark_delay = '10 Second',
extra_options = '{ "{table_name}": { "maxFilesPerTrigger": "10" }}'
)

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
CREATE MATERIALIZED VIEW {table_name}__week_live_mview AS
SELECT
cloud.account_uid AS `aws.vpc.cloud_account_uid`,
cloud.region AS `aws.vpc.cloud_region`,
cloud.zone AS `aws.vpc.cloud_zone`,
cloud.provider AS `aws.vpc.cloud_provider`,

CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,

CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-interface_uid`,
CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-vpc_uid`,
CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-instance_uid`,
CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-subnet_uid`,
CASE
WHEN regexp(dst_endpoint.ip, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\.*)')
THEN 'ingress'
ELSE 'egress'
END AS `aws.vpc.flow-direction`,

CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING) AS `aws.vpc.connection.protocol_ver`,
CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING) AS `aws.vpc.connection.boundary`,
CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,

CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,

CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
status_code AS `aws.vpc.status_code`,

severity AS `aws.vpc.severity`,
class_name AS `aws.vpc.class_name`,
category_name AS `aws.vpc.category_name`,
activity_name AS `aws.vpc.activity_name`,
disposition AS `aws.vpc.disposition`,
type_name AS `aws.vpc.type_name`,

region AS `aws.vpc.region`,
accountid AS `aws.vpc.account-id`
FROM
{table_name},
(SELECT MAX(CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP)) AS max_start_time FROM {table_name}) AS latest
WHERE
CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) >= DATE_SUB(latest.max_start_time, 7)
WITH (
auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REFRESH MATERIALIZED VIEW {table_name}__week_live_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__window_agg_60_min_network_ip_bytes_mview AS
WITH hourly_buckets AS (
SELECT
date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time,
CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS dstaddr,
SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes
FROM
{table_name}
GROUP BY
interval_start_time,
dstaddr
),
ranked_addresses AS (
SELECT
CAST(interval_start_time AS TIMESTAMP),
dstaddr,
total_bytes,
RANK() OVER (PARTITION BY interval_start_time ORDER BY total_bytes DESC) AS bytes_rank
FROM
hourly_buckets
)
SELECT
CAST(interval_start_time AS TIMESTAMP),
dstaddr,
total_bytes
FROM
ranked_addresses
WHERE
bytes_rank <= 50
ORDER BY
interval_start_time ASC,
bytes_rank ASC
WITH (
auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REFRESH MATERIALIZED VIEW {table_name}__window_agg_60_min_network_ip_bytes_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__window_agg_60_min_network_ip_cardinality_mview AS
WITH hourly_buckets AS (
SELECT
date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time,
CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS dstaddr,
COUNT(*) AS total_count
FROM
{table_name}
GROUP BY
interval_start_time,
dstaddr
),
ranked_addresses AS (
SELECT
CAST(interval_start_time AS TIMESTAMP),
dstaddr,
total_count,
RANK() OVER (PARTITION BY interval_start_time ORDER BY total_count DESC) AS addr_rank
FROM
hourly_buckets
)
SELECT
CAST(interval_start_time AS TIMESTAMP),
dstaddr,
total_count
FROM
ranked_addresses
WHERE
addr_rank <= 50
ORDER BY
interval_start_time ASC,
addr_rank ASC
WITH (
auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REFRESH MATERIALIZED VIEW {table_name}__window_agg_60_min_network_ip_cardinality_mview
Loading
Loading