Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQL Query receiver - support timestamp in tracking column #2

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion receiver/sqlqueryreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ over and over again, unless there's an external actor removing the old rows from
To prevent reading the same rows on every collection interval, use a parameterized query like `select * from my_logs where id_column > ?`,
together with the `tracking_start_value` configuration property that specifies the initial value for the parameter.
The receiver will use the configured `tracking_start_value` as the value for the query parameter when running the query for the first time.
On each query run, the receiver will retrieve the highest value from the `tracking_column` from the result set and use it as the value for the query parameter on next collection interval.
On each query run, the receiver will take the value of the `tracking_column` from the last row of the result set and use it as the value for the query parameter on the next collection interval. To avoid reading the same logs more than once, make sure the query sorts its results in ascending order by the `tracking_column` value.

Note that the notation for the parameter depends on the database backend. For example, in MySQL this is `?`, in PostgreSQL this is `$1`, and in Oracle this is any string identifier starting with a colon `:`, for example `:my_parameter`.

Expand Down
2 changes: 1 addition & 1 deletion receiver/sqlqueryreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Query struct {
Metrics []MetricCfg `mapstructure:"metrics"`
Logs []LogsCfg `mapstructure:"logs"`
TrackingColumn string `mapstructure:"tracking_column"`
TrackingStartValue int `mapstructure:"tracking_start_value"`
TrackingStartValue string `mapstructure:"tracking_start_value"`
}

func (q Query) Validate() error {
Expand Down
2 changes: 1 addition & 1 deletion receiver/sqlqueryreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestLoadConfig(t *testing.T) {
{
SQL: "select * from test_logs where log_id > ?",
TrackingColumn: "log_id",
TrackingStartValue: 10,
TrackingStartValue: "10",
Logs: []LogsCfg{
{
BodyColumn: "log_body",
Expand Down
49 changes: 37 additions & 12 deletions receiver/sqlqueryreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,19 @@ func TestPostgresIntegration(t *testing.T) {
},
},
{
SQL: "select * from simple_logs where id >= $1",
SQL: "select * from simple_logs where id > $1",
TrackingColumn: "id",
TrackingStartValue: 3,
TrackingStartValue: "3",
Logs: []LogsCfg{
{
BodyColumn: "body",
},
},
},
{
SQL: "select * from simple_logs where insert_time > $1",
TrackingColumn: "insert_time",
TrackingStartValue: "2022-06-03 21:59:28+00",
Logs: []LogsCfg{
{
BodyColumn: "body",
Expand Down Expand Up @@ -185,11 +195,11 @@ func TestPostgresIntegration(t *testing.T) {
require.Eventuallyf(
t,
func() bool {
return logsConsumer.LogRecordCount() > 0
return logsConsumer.LogRecordCount() > 2
},
2*time.Minute,
1*time.Second,
"failed to receive more than 0 logs",
"failed to receive more than 2 logs",
)
testSimpleLogs(t, logsConsumer.AllLogs())
}
Expand Down Expand Up @@ -249,9 +259,19 @@ func TestOracleDBIntegration(t *testing.T) {
},
},
{
SQL: "select * from sys.simple_logs where id >= :id",
SQL: "select * from sys.simple_logs where id > :id",
TrackingColumn: "id",
TrackingStartValue: 3,
TrackingStartValue: "3",
Logs: []LogsCfg{
{
BodyColumn: "BODY",
},
},
},
{
SQL: "select * from sys.simple_logs where insert_time > :insert_time",
TrackingColumn: "insert_time",
TrackingStartValue: "03-JUN-22 09.59.29.000000000 PM +00:00",
Logs: []LogsCfg{
{
BodyColumn: "BODY",
Expand Down Expand Up @@ -295,11 +315,11 @@ func TestOracleDBIntegration(t *testing.T) {
require.Eventuallyf(
t,
func() bool {
return logsConsumer.LogRecordCount() > 0
return logsConsumer.LogRecordCount() > 2
},
5*time.Minute,
1*time.Second,
"failed to receive more than 0 logs",
"failed to receive more than 2 logs",
)
testSimpleLogs(t, logsConsumer.AllLogs())
}
Expand Down Expand Up @@ -383,15 +403,20 @@ func assertDoubleGaugeEquals(t *testing.T, expected float64, metric pmetric.Metr

// testSimpleLogs checks the collected logs: it expects exactly one plog.Logs
// batch and validates the ScopeLogs of its resource logs through
// testScopeLogsSLiceFromSimpleLogs.
//
// NOTE(review): this span looks like it was taken from a diff view in which
// removed and added lines are interleaved (the ResourceLogs length is asserted
// as both 1 and 2 below) — confirm against the merged file which assertions
// are actually live.
func testSimpleLogs(t *testing.T, logs []plog.Logs) {
// All records are expected to arrive in a single batch.
assert.Equal(t, 1, len(logs))
assert.Equal(t, 1, logs[0].ResourceLogs().Len())
assert.Equal(t, 1, logs[0].ResourceLogs().At(0).ScopeLogs().Len())
assert.Equal(t, 2, logs[0].ResourceLogs().Len())
// Run the same scope-logs checks on the resource logs at index 0 and 1.
testScopeLogsSLiceFromSimpleLogs(t, logs[0].ResourceLogs().At(0).ScopeLogs())
testScopeLogsSLiceFromSimpleLogs(t, logs[0].ResourceLogs().At(1).ScopeLogs())
}

// testScopeLogsSLiceFromSimpleLogs asserts that scopeLogsSlice holds exactly
// one ScopeLogs entry and that the bodies of its log records equal
// expectedEntries, in the same order.
//
// NOTE(review): the assertions that read from `logs` refer to a name that is
// not a parameter of this function; they appear to be removed lines carried
// over from a diff view — confirm against the merged file.
func testScopeLogsSLiceFromSimpleLogs(t *testing.T, scopeLogsSlice plog.ScopeLogsSlice) {
assert.Equal(t, 1, scopeLogsSlice.Len())
// Expected record bodies, in the order the records should appear.
expectedEntries := []string{
"- - - [03/Jun/2022:21:59:29 +0000] \"GET /api/health HTTP/1.1\" 200 6233 4 \"-\" \"-\" 579e8362d3185b61 -",
"- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6207 5 \"-\" \"-\" 8c6ac61ae66e509f -",
"- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6200 4 \"-\" \"-\" c163495861e873d8 -",
}
// The record count must match before bodies are compared index by index.
assert.Equal(t, len(expectedEntries), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len())
assert.Equal(t, len(expectedEntries), scopeLogsSlice.At(0).LogRecords().Len())
for i, _ := range expectedEntries {
assert.Equal(t, expectedEntries[i], logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Body().Str())
assert.Equal(t, expectedEntries[i], scopeLogsSlice.At(0).LogRecords().At(i).Body().Str())
}
}
12 changes: 2 additions & 10 deletions receiver/sqlqueryreceiver/logs_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"database/sql"
"fmt"
"strconv"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -170,7 +169,7 @@ type logsQueryReceiver struct {

db *sql.DB
client dbClient
trackingValue int
trackingValue string
}

func newLogsQueryReceiver(
Expand Down Expand Up @@ -241,14 +240,7 @@ func (queryReceiver *logsQueryReceiver) storeTrackingValue(row stringMap) {
return
}
currentTrackingColumnValueString := row[queryReceiver.query.TrackingColumn]
currentTrackingColumnValue, err := strconv.Atoi(currentTrackingColumnValueString)
if err != nil {
queryReceiver.logger.Error("tracking column value is not integer", zap.String("tracking_column_value", currentTrackingColumnValueString))
return
}
if currentTrackingColumnValue > queryReceiver.trackingValue {
queryReceiver.trackingValue = currentTrackingColumnValue
}
queryReceiver.trackingValue = currentTrackingColumnValueString
}

func rowToLog(row stringMap, config LogsCfg, logRecord plog.LogRecord) error {
Expand Down
13 changes: 7 additions & 6 deletions receiver/sqlqueryreceiver/testdata/integration/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ grant select on movie to otel;
create table simple_logs
(
id integer primary key,
insert_time timestamp,
body text
);
grant select on simple_logs to otel;

insert into simple_logs (id, body) values
(1, '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'),
(2, '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'),
(3, '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'),
(4, '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'),
(5, '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -');
insert into simple_logs (id, insert_time, body) values
(1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'),
(2, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'),
(3, '2022-06-03 21:59:29+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'),
(4, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'),
(5, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -');

21 changes: 11 additions & 10 deletions receiver/sqlqueryreceiver/testdata/integration/initOracleDB.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@ GRANT ALL ON movie TO OTEL;
create table simple_logs
(
id number primary key,
insert_time timestamp with time zone,
body varchar2(4000)
);
grant select on simple_logs to otel;

insert into simple_logs (id, body) values
(1, '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -');
insert into simple_logs (id, body) values
(2, '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -');
insert into simple_logs (id, body) values
(3, '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -');
insert into simple_logs (id, body) values
(4, '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -');
insert into simple_logs (id, body) values
(5, '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -');
insert into simple_logs (id, insert_time, body) values
(1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -');
insert into simple_logs (id, insert_time, body) values
(2, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -');
insert into simple_logs (id, insert_time, body) values
(3, TIMESTAMP '2022-06-03 21:59:29 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -');
insert into simple_logs (id, insert_time, body) values
(4, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -');
insert into simple_logs (id, insert_time, body) values
(5, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -');