Skip to content

Commit fc4714c

Browse files
Steffen Sieringmelchiormoulin
Steffen Siering
authored andcommitted
Init winlog input (elastic#19622)
1 parent e61a5c0 commit fc4714c

File tree

3 files changed

+160
-3
lines changed

3 files changed

+160
-3
lines changed

filebeat/input/default-inputs/inputs_windows.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package inputs
2020
import (
2121
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
2222
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
23+
"github.com/elastic/beats/v7/filebeat/input/winlog"
2324
"github.com/elastic/beats/v7/libbeat/beat"
2425
"github.com/elastic/beats/v7/libbeat/logp"
2526
)
@@ -30,7 +31,6 @@ type osComponents interface {
3031

3132
func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
3233
return []v2.Plugin{
33-
// windows events logs are not available yet
34-
// winlog.Plugin(log, components),
34+
winlog.Plugin(log, components),
3535
}
3636
}

filebeat/input/winlog/input.go

+157
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package winlog
19+
20+
import (
21+
"fmt"
22+
"io"
23+
"time"
24+
25+
input "github.com/elastic/beats/v7/filebeat/input/v2"
26+
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
27+
"github.com/elastic/beats/v7/libbeat/common"
28+
"github.com/elastic/beats/v7/libbeat/feature"
29+
"github.com/elastic/beats/v7/libbeat/logp"
30+
"github.com/elastic/go-concert/ctxtool"
31+
"github.com/elastic/go-concert/timed"
32+
33+
"github.com/elastic/beats/v7/winlogbeat/checkpoint"
34+
"github.com/elastic/beats/v7/winlogbeat/eventlog"
35+
)
36+
37+
type eventlogRunner struct{}
38+
39+
const pluginName = "winlog"
40+
41+
// Plugin create a stateful input Plugin collecting logs from Windows Event Logs.
42+
func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin {
43+
return input.Plugin{
44+
Name: pluginName,
45+
Stability: feature.Beta,
46+
Deprecated: false,
47+
Info: "Windows Event Logs",
48+
Doc: "The winlog input collects logs from the local windows event log service",
49+
Manager: &cursor.InputManager{
50+
Logger: log,
51+
StateStore: store,
52+
Type: pluginName,
53+
Configure: configure,
54+
},
55+
}
56+
}
57+
58+
func configure(cfg *common.Config) ([]cursor.Source, cursor.Input, error) {
59+
// TODO: do we want to allow to read multiple eventLogs using a single config
60+
// as is common for other inputs?
61+
eventLog, err := eventlog.New(cfg)
62+
if err != nil {
63+
return nil, nil, fmt.Errorf("Failed to create new event log. %v", err)
64+
}
65+
66+
sources := []cursor.Source{eventLog}
67+
return sources, eventlogRunner{}, nil
68+
}
69+
70+
func (eventlogRunner) Name() string { return pluginName }
71+
72+
func (eventlogRunner) Test(source cursor.Source, ctx input.TestContext) error {
73+
api := source.(eventlog.EventLog)
74+
err := api.Open(checkpoint.EventLogState{})
75+
if err != nil {
76+
return fmt.Errorf("Failed to open '%v': %v", api.Name(), err)
77+
}
78+
return api.Close()
79+
}
80+
81+
func (eventlogRunner) Run(
82+
ctx input.Context,
83+
source cursor.Source,
84+
cursor cursor.Cursor,
85+
publisher cursor.Publisher,
86+
) error {
87+
log := ctx.Logger.With("eventlog", source.Name())
88+
checkpoint := initCheckpoint(log, cursor)
89+
90+
api := source.(eventlog.EventLog)
91+
92+
err := api.Open(checkpoint)
93+
if err != nil {
94+
return fmt.Errorf("failed to open windows event log: %v", err)
95+
}
96+
97+
log.Debugf("Windows Event Log '%s' opened successfully", source.Name())
98+
99+
// setup closing the API if either the run function is signaled asynchronously
100+
// to shut down or when returning after io.EOF
101+
cancelCtx, cancelFn := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
102+
if err := api.Close(); err != nil {
103+
log.Errorf("Error while closing Windows Eventlog Access: %v", err)
104+
}
105+
})
106+
defer cancelFn()
107+
108+
// read loop
109+
for cancelCtx.Err() == nil {
110+
records, err := api.Read()
111+
switch err {
112+
case nil:
113+
break
114+
case io.EOF:
115+
log.Debugf("End of Winlog event stream reached: %v", err)
116+
return nil
117+
default:
118+
// only log error if we are not shutting down
119+
if cancelCtx.Err() != nil {
120+
return nil
121+
}
122+
123+
log.Errorf("Error occured while reading from Windows Event Log '%v': %v", source.Name(), err)
124+
return err
125+
}
126+
127+
if len(records) == 0 {
128+
timed.Wait(cancelCtx, time.Second)
129+
continue
130+
}
131+
132+
for _, record := range records {
133+
event := record.ToEvent()
134+
if err := publisher.Publish(event, record.Offset); err != nil {
135+
// Publisher indicates disconnect when returning an error.
136+
// stop trying to publish records and quit
137+
return err
138+
}
139+
}
140+
}
141+
142+
return nil
143+
}
144+
145+
func initCheckpoint(log *logp.Logger, cursor cursor.Cursor) checkpoint.EventLogState {
146+
var cp checkpoint.EventLogState
147+
if cursor.IsNew() {
148+
return cp
149+
}
150+
151+
if err := cursor.Unpack(&cp); err != nil {
152+
log.Errorf("Reset winlog position. Failed to read checkpoint from registry: %v", err)
153+
return checkpoint.EventLogState{}
154+
}
155+
156+
return cp
157+
}

winlogbeat/eventlog/factory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"github.com/elastic/beats/v7/libbeat/common"
2828
)
2929

30-
var commonConfigKeys = []string{"api", "name", "fields", "fields_under_root",
30+
var commonConfigKeys = []string{"type", "api", "name", "fields", "fields_under_root",
3131
"tags", "processors", "index"}
3232

3333
// ConfigCommon is the common configuration data used to instantiate a new

0 commit comments

Comments
 (0)