Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for elastic / kafka export #70

Merged
merged 5 commits into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.15

require (
github.com/Knetic/govaluate v3.0.0+incompatible // indirect
github.com/Shopify/sarama v1.30.0
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/elazarl/goproxy v0.0.0-20210110162100-a92cc753f88e
Expand All @@ -20,5 +21,5 @@ require (
github.com/projectdiscovery/tinydns v0.0.1
github.com/rs/xid v1.3.0
github.com/spaolacci/murmur3 v1.1.0 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf
)
69 changes: 64 additions & 5 deletions go.sum

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions internal/runner/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/projectdiscovery/gologger"
"github.com/projectdiscovery/gologger/formatter"
"github.com/projectdiscovery/gologger/levels"
"github.com/projectdiscovery/proxify/pkg/logger/elastic"
"github.com/projectdiscovery/proxify/pkg/logger/kafka"
"github.com/projectdiscovery/proxify/pkg/types"
)

Expand Down Expand Up @@ -35,6 +37,8 @@ type Options struct {
DumpResponse bool // Dump responses in separate files
Deny types.CustomList // Deny ip/cidr
Allow types.CustomList // Allow ip/cidr
Elastic elastic.Options
Kafka kafka.Options
}

func ParseOptions() *Options {
Expand All @@ -55,6 +59,20 @@ func ParseOptions() *Options {
flagSet.BoolVar(&options.DumpResponse, "dump-resp", false, "Dump only HTTP responses to output file"),
)

createGroup(flagSet, "elasticsearch", "export logs to elasticsearch",
flagSet.StringVar(&options.Elastic.Addr, "elastic-address", "", "elasticsearch address (ip:port)"),
flagSet.BoolVar(&options.Elastic.SSL, "elastic-ssl", false, "enable elasticsearch ssl"),
flagSet.BoolVar(&options.Elastic.SSLVerification, "elastic-ssl-verification", false, "enable elasticsearch ssl verification"),
flagSet.StringVar(&options.Elastic.Username, "elastic-username", "", "elasticsearch username"),
flagSet.StringVar(&options.Elastic.Password, "elastic-password", "", "elasticsearch password"),
flagSet.StringVar(&options.Elastic.IndexName, "elastic-index", "proxify", "elasticsearch index name"),
)

createGroup(flagSet, "kafka", "export logs to kafka",
flagSet.StringVar(&options.Kafka.Addr, "kafka-address", "", "address of kafka broker (ip:port)"),
flagSet.StringVar(&options.Kafka.Topic, "kafka-topic", "proxify", "kafka topic to publish messages on"),
)

createGroup(flagSet, "filter", "Filter",
flagSet.StringVarP(&options.RequestDSL, "request-dsl", "req-fd", "", "Request Filter DSL"),
flagSet.StringVarP(&options.ResponseDSL, "response-dsl", "resp-fd", "", "Response Filter DSL"),
Expand Down
8 changes: 8 additions & 0 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func NewRunner(options *Options) (*Runner, error) {
ResponseMatchReplaceDSL: options.ResponseMatchReplaceDSL,
DumpRequest: options.DumpRequest,
DumpResponse: options.DumpResponse,
Elastic: &options.Elastic,
Kafka: &options.Kafka,
})
if err != nil {
return nil, err
Expand All @@ -54,6 +56,12 @@ func (r *Runner) Run() error {
if r.options.OutputDirectory != "" {
gologger.Print().Msgf("Saving traffic to %s\n", r.options.OutputDirectory)
}
if r.options.Kafka.Addr != "" {
gologger.Print().Msgf("Sending traffic to Kafka at %s\n", r.options.Kafka.Addr)
}
if r.options.Elastic.Addr != "" {
gologger.Print().Msgf("Sending traffic to Elasticsearch at %s\n", r.options.Elastic.Addr)
}

if r.options.UpstreamHTTPProxy != "" {
gologger.Print().Msgf("Using upstream HTTP proxy: %s\n", r.options.UpstreamHTTPProxy)
Expand Down
152 changes: 0 additions & 152 deletions logger.go

This file was deleted.

123 changes: 123 additions & 0 deletions pkg/logger/elastic/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package elastic

import (
"bytes"
"crypto/tls"
"fmt"
"io/ioutil"
"net/http"
"time"

"encoding/base64"
"encoding/json"

"github.com/pkg/errors"
"github.com/projectdiscovery/proxify/pkg/types"
)

// Options contains necessary options required for elasticsearch communicaiton
type Options struct {
// Address for elasticsearch instance
Addr string `yaml:"addr"`
// SSL enables ssl for elasticsearch connection
SSL bool `yaml:"ssl"`
// SSLVerification disables SSL verification for elasticsearch
SSLVerification bool `yaml:"ssl-verification"`
// Username for the elasticsearch instance
Username string `yaml:"username"`
// Password is the password for elasticsearch instance
Password string `yaml:"password"`
// IndexName is the name of the elasticsearch index
IndexName string `yaml:"index-name"`
}

// Client type for elasticsearch
type Client struct {
url string
authentication string
httpClient *http.Client
}

// New creates and returns a new client for elasticsearch
func New(option *Options) (*Client, error) {

httpClient := &http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10,
TLSClientConfig: &tls.Config{InsecureSkipVerify: option.SSLVerification},
},
}
// preparing url for elasticsearch
scheme := "http://"
if option.SSL {
scheme = "https://"
}
// if authentication is required
var authentication string
if len(option.Username) > 0 && len(option.Password) > 0 {
auth := base64.StdEncoding.EncodeToString([]byte(option.Username + ":" + option.Password))
auth = "Basic " + auth
authentication = auth
}
url := fmt.Sprintf("%s%s/%s/_update/", scheme, option.Addr, option.IndexName)

ei := &Client{
url: url,
authentication: authentication,
httpClient: httpClient,
}
return ei, nil
}

// Store stores a passed log event in elasticsearch
func (c *Client) Store(data types.OutputData) error {
req, err := http.NewRequest(http.MethodPost, c.url+data.Name, nil)
parrasajad marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "could not make request")
}
if len(c.authentication) > 0 {
req.Header.Add("Authorization", c.authentication)
}
req.Header.Add("Content-Type", "application/json")
var d map[string]interface{}
parrasajad marked this conversation as resolved.
Show resolved Hide resolved
if data.Userdata.HasResponse {
d = map[string]interface{}{
"response": data.DataString,
"timestamp": time.Now().Format(time.RFC3339),
}
} else {
d = map[string]interface{}{
"request": data.DataString,
"timestamp": time.Now().Format(time.RFC3339),
}
}

b, err := json.Marshal(&map[string]interface{}{
parrasajad marked this conversation as resolved.
Show resolved Hide resolved
"doc": d,
"doc_as_upsert": true,
})
if err != nil {
return err
}
req.Body = ioutil.NopCloser(bytes.NewReader(b))
res, err := c.httpClient.Do(req)
if err != nil {
return err
}

b, err = ioutil.ReadAll(res.Body)
if err != nil {
return errors.New(err.Error() + "error thrown by elasticsearch " + string(b))
parrasajad marked this conversation as resolved.
Show resolved Hide resolved
}
if res.StatusCode >= 300 {
return errors.New("elasticsearch responded with an error: " + string(b))
}
return nil
}

// Close closes the exporter after operation
func (c *Client) Close() error {
return nil
parrasajad marked this conversation as resolved.
Show resolved Hide resolved
}
Loading