-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: TCP Input #6266
Feature: TCP Input #6266
Conversation
Waiting for CI to switch it to |
filebeat/input/tcp/client.go
Outdated
} | ||
r.Reset() | ||
// TODO(ph), should we keep the byte array instead of allocating a string? | ||
c.forwarder.Send(c.createEvent(scanner.Text())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a legitimate question, I am not sure how the remaining part of the code will work if I keep the message as a byte array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would ignore the overhead for now if it adds code complexity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For me on my side there is no overhead, it just a matter of calling scanner.Bytes()
instead of scanner.Text()
, I am just unaware of the downstream consequence for processor If I keep the bytes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I just read the doc, It's better to do an allocation because the original bytes can be overridden.
// Bytes returns the most recent token generated by a call to Scan.
// The underlying array may point to data that will be overwritten
// by a subsequent call to Scan. It does no allocation.
func (s *Scanner) Bytes() []byte {
return s.token
}
@@ -0,0 +1,72 @@ | |||
package tcp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've keep theses methods public, because I might move it to common.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should only move it to common if we also use it in metricbeat
for example, which could happen.
Working on fixing the test suite, simple environment difference with how I assert the tests, I will make it more robust. thanks Travis. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only skimmed through it and left some minor comments. Looks really good to me. Happy to dive deeper when it's ready.
BTW: +1 on having TSL in a separate PR.
#host: "localhost:8080" | ||
|
||
# Character used to split new message | ||
#line_delimiter: "\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this to be configurable? I would expect that same behaviour that supports also \r
etc. like the log prospector / harvester.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it needs to be configurable; this input is more than just syslog entries.
If you look at the code by default, it will use the ScanLines function to deal with that case. Anything else will use our own implementation of the split.
The default method will detect \n,
but it will also strip \r
, I think this is the most common use case with plain sockets protocol. So I think we should be fine(tm)?
The custom delimiter is OK for more esoteric implementation. Maybe in a future revision, we could allow multiple patterns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The feature got requested already several times for the log harvester ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't look at the log harvester how it does the splitting... maybe we can reuse it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did a quick look at the log harvester, I believe we could use the same scanner/split.
filebeat/filebeat.reference.yml
Outdated
@@ -507,6 +507,16 @@ filebeat.prospectors: | |||
# Maximum size of the message received over UDP | |||
#max_message_size: 10240 | |||
|
|||
#------------------------------ Tcp prospector -------------------------------- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/Tcp/TCP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for that change, I've went with Tcp
even if golang style would complain because of existing code, I wanted consistency with our Udp
input. I guess I can make a PR on top of UDP
to be all caps. :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, saw the UDP one when I reviewed one of your other PR's and was thinking the same.
filebeat/input/tcp/client.go
Outdated
} | ||
r.Reset() | ||
// TODO(ph), should we keep the byte array instead of allocating a string? | ||
c.forwarder.Send(c.createEvent(scanner.Text())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would ignore the overhead for now if it adds code complexity.
filebeat/input/tcp/client.go
Outdated
|
||
// Handle is reading message from the specified TCP socket. | ||
func (c *Client) Handle() error { | ||
defer c.conn.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this defer
should really be here or outside where the connection is opened or Handle
is called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, I will that change when you are done with the review.
filebeat/input/tcp/client.go
Outdated
return data | ||
} | ||
|
||
// GetRemoveHosts take the IP address of the client and try to resolve the name, if it fails we |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/getRemoveHosts/getRemoteHosts
filebeat/input/tcp/client.go
Outdated
|
||
func (c *Client) cacheMetadata() { | ||
c.metadata = common.MapStr{ | ||
"hostnames": c.getRemoteHosts(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we report all hosts? Should we exclude some?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure what is the best behavior here:
- taking the first one could be wrong.
- taking a subset could also be wrong.
- They are set in the metadata field for now, so users are free to use them or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chance you could provide some example json output docs in the comment of the PR? I think that makes it easier to understand how the end event will look like.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also wonder if we want to opt-out from resolving the hostname.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right it could be costly or slow.
@@ -0,0 +1,72 @@ | |||
package tcp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should only move it to common if we also use it in metricbeat
for example, which could happen.
import socket | ||
|
||
|
||
class Test(BaseTest): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice tests. I already head in mind to ask for this, especially seeing if the shut down works 👍
libbeat/common/scan.go
Outdated
|
||
// ScanDelimiter return a function to split line using a custom delimiter, the delimiter | ||
// is stripped from the returned value. | ||
func ScanDelimiter(delimiter []byte) bufio.SplitFunc { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not put functions into common which we only use in one beat. We should have at least 2 use cases.
@ruflin You can go ahead with the full review, I was just marking it as in progress to wait for CI. Since I was testing some goroutine interaction I wanted to make it was solid on the CI. I haven't made your initial changes to this PR yet. But go a full review and I will make them in batch. Thanks for checking it out. |
jenkins test this please |
#host: "localhost:8080" | ||
|
||
# Character used to split new message | ||
#line_delimiter: "\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The feature got requested already several times for the log harvester ...
|
||
[float] | ||
[[max-read-buffer]] | ||
==== `max_read_buffer` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have max_message_size
in udp. We should probably use the same config name. Also we can change the udp one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will rename max_read_buffer
to max_message_size
, after looking back at the implementation they are a bit different but the underlying concept is the same, making sure we don't OOM on incoming messages
filebeat/filebeat.reference.yml
Outdated
#- type: tcp | ||
#enabled: false | ||
# The host and port to receive the new event | ||
#host: "localhost:8080" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you test to have multiple syslog inputs configured? I assume the following should work:
- type: tcp
host: "localhost:8080"
- type: tcp
host: "localhost:8081"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't tested it, but it should work if every input have their own port.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If both port are free:
2018-02-07T11:47:40.997-0500 INFO tcp/harvester.go:58 Started listening for incoming TCP connection on: localhost:8080`
2018-02-07T11:47:40.997-0500 INFO input/input.go:87 Starting input of type: tcp; ID: 14733937925171989371
2018-02-07T11:47:40.997-0500 DEBUG [processors] processors/processor.go:49 Processors:
2018-02-07T11:47:40.997-0500 INFO tcp/input.go:50 Starting TCP input
2018-02-07T11:47:40.997-0500 INFO tcp/harvester.go:58 Started listening for incoming TCP connection on: localhost:8081
2018-02-07T11:47:40.998-0500 DEBUG [input] log/config.go:178 recursive glob enabled
if one port is already taken:
2018-02-07T11:48:52.928-0500 ERROR instance/beat.go:674 Exiting: Error in initing input: listen tcp 127.0.0.1:8080: bind: address already in use
Exiting: Error in initing input: listen tcp 127.0.0.1:8080: bind: address already in use
FB exits
filebeat/input/tcp/client.go
Outdated
|
||
func (c *Client) cacheMetadata() { | ||
c.metadata = common.MapStr{ | ||
"hostnames": c.getRemoteHosts(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chance you could provide some example json output docs in the comment of the PR? I think that makes it easier to understand how the end event will look like.
filebeat/input/tcp/config.go
Outdated
Type: "tcp", | ||
}, | ||
LineDelimiter: "\n", | ||
Host: "localhost:9000", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is 9000 a "common" default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ruflin Not sure about that, I think it's has good as the default we have in UDP. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
filebeat/input/tcp/config.go
Outdated
}, | ||
LineDelimiter: "\n", | ||
Host: "localhost:9000", | ||
Timeout: 5 * 60, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you will have to define the unit here (time.Second)
filebeat/input/tcp/conn.go
Outdated
"github.com/pkg/errors" | ||
) | ||
|
||
// ErrMaxReadBuffer returns when too much byte was read on the io.Reader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
too many bytes?
filebeat/input/tcp/conn.go
Outdated
byteRead int64 | ||
} | ||
|
||
// NewMeteredReader returns a Meteredread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a MeteredReader
filebeat/input/tcp/harvester.go
Outdated
func (h *Harvester) Run() error { | ||
logp.Info("Started listening for incoming TCP connection on: %s:", h.config.Host) | ||
for { | ||
conn, err := h.server.Accept() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expect that the prospector/input would be the server and then each harvester would be a client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like we discussed in the input refactoring, I am not sure the wording harvester work in the context of UDP and TCP, I would support renaming harvester
to server
and use client
for the actual remote connection. Would you be OK with that?
My reasoning is harvester
is more active process to fetch new things (pull), which on the other hand a TCP / UDP is more passive (push).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not really concerned here about not using the name harvester but why input != server
. From my point of view 1 input = 1 server, so if I start an input that should mean the server is started. Instead of having input
, server
, client
I think we only need input=server
, client
and can remove one abstraction layer.
I'm not too concerned about not having the name harvester anywhere as this is not something exposed to the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ph Now looking at my own UDP code I see that I used also the harvester as the server :-( Leading by bad example? :-(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not, you lead by good example you see the problem :)
@ph I suggest we get this PR in rather sooner then later. The internal code structure I brought up above can be discussed and refactored later especially as I realised now the changes would also apply to UDP. Can you go through the other comments above again and see what needs addressing, rebase and squash and ping me for an other review and merge? |
@ruflin Thanks for answering my comments will do the changes, I wanted to bulk them. :) |
This is an example of the event format from a received events with this input. {
"@timestamp": "2018-02-12T19:12:29.000Z",
"@metadata": {
"beat": "filebeat",
"type": "doc",
"version": "7.0.0-alpha1",
"hostnames": [
"localhost"
],
"ip_address": "127.0.0.1:56576"
},
"beat": {
"hostname": "sashimi",
"version": "7.0.0-alpha1",
"name": "sashimi"
},
"message": "Hello world",
"input": {
"type": "tcp"
},
"prospector": {
"type": "tcp"
}
}
|
7f9ecdf
to
558d422
Compare
filebeat/input/tcp/conn.go
Outdated
} | ||
|
||
// NewDeadlineReader returns a new deadlineReader | ||
func NewDeadlineReader(c net.Conn, timeout time.Duration) *deadlineReader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported func NewDeadlineReader returns unexported type *tcp.deadlineReader, which can be annoying to use
filebeat/input/tcp/conn.go
Outdated
} | ||
|
||
// NewMeteredReader returns a new MeteredReader | ||
func NewMeteredReader(reader io.Reader, maxReadBuffer int64) *meteredReader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported func NewMeteredReader returns unexported type *tcp.meteredReader, which can be annoying to use
d2f4e36
to
c0b79db
Compare
I am fixing the flaky tests, oh network services. |
c0b79db
to
8028c8d
Compare
Thanks for the json output. We should probably also add an example to the docs. I'm a bit on the fence about the |
338edb7
to
435069e
Compare
@ruflin Let's do a following PR for the JSON example in the doc, not sure where it would be the best place to insert it, inputs doesn't dedicated pages yet, Any idea @dedemorton where I could place this information ? |
CHANGELOG.asciidoc
Outdated
@@ -46,8 +46,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di | |||
- Refactor the Stdin prospector to use the input interface {pull}6116[6116] | |||
- Refactor the usage of prospector to input in the YAML reference and | |||
the system test. {pull}6121[6121] | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Client is a remote client. | ||
type Client struct { | ||
conn net.Conn | ||
forwarder *harvester.Forwarder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we replace *harvester.Forwarder
by an interface or function type? Some more isolation from harvester.Forwarder (I hope to remove Forwarder and Outlet in the future).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Knowing that it make sense to change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to #6439 ?:)
I was planning to do it in second part but I can do it right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's keep the PR as small as possible and do the interface when we have a second implementation.
data := util.NewData() | ||
data.Event = beat.Event{ | ||
Timestamp: time.Now(), | ||
Meta: c.metadata, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The metadata
field behaves like @metadata
in Logstash. If one sets index
or id
or pipeline
, this will be configure some paramters in the Elasticsearch output. This on purpose?
When indexing into ES, the @metadata
is dropped. That is, when sending directly to ES, this information can never be retrieved by users (no rename processor).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did it on purpose, I am not sure how valuable that information it is. but I could make it public field and store in the mapping. WDYT?
filebeat/input/tcp/client.go
Outdated
|
||
// GetRemoteHosts take the IP address of the client and try to resolve the name, if it fails we | ||
// fallback to the IP, IP can resolve to multiple hostname. | ||
func (c *Client) getRemoteHosts() []string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need a method on Client for this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the need to make it a simple function or if It can be reused elsewhere, it will called once and we cache the result.
filebeat/input/tcp/client.go
Outdated
return hosts | ||
} | ||
|
||
func (c *Client) cacheMetadata() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cacheMetadata
is quite short and only executed once by the constructor. How about moving the body into the constructor and removing the function.
Alternatively have it be a function returning a common.MapStr
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will go with the common.MapStr
.
) | ||
|
||
// Harvester represent a TCP server | ||
type Harvester struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us not export the Harvester
. It's some internal detail specific to the input. How about renaming to server
?
Do we actually need a special Harvester/server
type? The Input
implementations seems to mostly wrap the server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filebeat/input/tcp/harvester.go
Outdated
conn.RemoteAddr(), | ||
h.clientsCount(), | ||
) | ||
h.wg.Done() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better put defer h.wg.Done()
at the top of the go-func. Also try to capture panics and print an error. If something goes really wrong with one client, the other clients can still be served.
Same for h.unregisterClient()
.
e.g.
go func() {
defer logp.Recover()
defer h.wg.Done()
defer conn.Close()
h.registerClient()
defer h.unregisterClient()
err := client.Handle()
...
}()
@ph Can you ping us when this needs an other look? |
a0a3349
to
7b920dd
Compare
Allow to receive new events via TCP, this will create a new event per line and add some useful information about the connected client to the evant. This input is marked as **experimental**. This input expose the following settings: - `line_delimiter`: The characters used to split incoming events, by default it will split on `\n` and will also strip both `\n` and `\r`. You can also use any characters like `;` and you can also used multiple characters delimiter like `<END>`, the delimiter tokens will always be removed from the string. - `max_message_size`: This is a number of bytes that a client can buffer in memory before finding a new message, if the limit is reached the connection is killed and the event is logged. This is to prevent rogue clients to DoS attack by consuming all the available memory. The default values is 20971520. - `timeout`: The server will close any client that reach this inactivity timeout. Ref: #5862
I have rebased this PR and included your latest comments. I also migrated the documentation into his own file. Open questions: Should we extract the TCP logic into his own class in this PR, like I did in refactor of #6439? What do we do about I would like to move this forward or at least have a plan to unblock #6433. |
@ruflin @urso Don't check it out yet, I will make some changes I've learned from @andrewkroh |
I think it make sense to reopen a new one, since structure has changes locally to follow the udp refactor. |
Allow to receive new events via TCP, this will create a new event per
line and add some useful information about the connected client to the
evant. This input is marked as experimental.
This input expose the following settings:
line_delimiter
: The characters used to split incoming events, bydefault it will split on
\n
and will also strip both\n
and\r
.You can also use any characters like
;
and you can also used multiplecharacters delimiter like
<END>
, the delimiter tokens will always beremoved from the string.
max_read_buffer
: This is a number of bytes that a client can bufferin memory before finding a new message, if the limit is reached the
connection is killed and the event is logged. This is to prevent rogue
clients to DoS attack by consuming all the available memory. The default
values is 20971520.
timeout
: The server will close any client that reach this inactivitytimeout.
TODO
I've an almost TLS support done, but this PR was getting pretty big and I still have the tests to write so I've excluded it.
Ref: #5862