diff --git a/jsonrpc/handler.go b/jsonrpc/handler.go
index 6a1f301940..68b839e58a 100644
--- a/jsonrpc/handler.go
+++ b/jsonrpc/handler.go
@@ -6,7 +6,7 @@ import (
 	"net/http"
 	"reflect"
 	"strings"
-	"sync"
+	"sync/atomic"
 	"unicode"
 
 	"github.com/0xPolygonHermez/zkevm-node/jsonrpc/types"
@@ -73,23 +73,18 @@ func newJSONRpcHandler() *Handler {
 	return handler
 }
 
-var connectionCounter = 0
-var connectionCounterMutex sync.Mutex
+var connectionCounter int64 = 0
 
 // Handle is the function that knows which and how a function should
 // be executed when a JSON RPC request is received
 func (h *Handler) Handle(req handleRequest) types.Response {
 	log := log.WithFields("method", req.Method, "requestId", req.ID)
-	connectionCounterMutex.Lock()
-	connectionCounter++
-	connectionCounterMutex.Unlock()
+	atomic.AddInt64(&connectionCounter, 1)
 	defer func() {
-		connectionCounterMutex.Lock()
-		connectionCounter--
-		connectionCounterMutex.Unlock()
-		log.Debugf("Current open connections %d", connectionCounter)
+		atomic.AddInt64(&connectionCounter, -1)
+		log.Debugf("Current open connections %d", atomic.LoadInt64(&connectionCounter))
 	}()
-	log.Debugf("Current open connections %d", connectionCounter)
+	log.Debugf("Current open connections %d", atomic.LoadInt64(&connectionCounter))
 	log.Debugf("request params %v", string(req.Params))
 
 	service, fd, err := h.getFnHandler(req.Request)
diff --git a/jsonrpc/server_test.go b/jsonrpc/server_test.go
index 4cce938e20..84bcb7110d 100644
--- a/jsonrpc/server_test.go
+++ b/jsonrpc/server_test.go
@@ -7,6 +7,8 @@ import (
 	"io"
 	"math/big"
 	"net/http"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -439,3 +441,92 @@ func TestRequestValidation(t *testing.T) {
 		})
 	}
 }
+
+func TestMaxRequestPerIPPerSec(t *testing.T) {
+	// this is the number of requests the test will execute
+	// it's important to keep this number at an amount of
+	// requests that the machine running this test is able
+	// to execute in a single second
+	const numberOfRequests = 100
+	// the number of workers is the number of goroutines
+	// the machine is able to run at the same time without
+	// consuming all the resources and making the goroutines
+	// affect each other's performance, this number may vary
+	// depending on the spec of the machine running the test.
+	// a good value is generally a number close to
+	// the number of cores or threads provided by the CPU.
+	const workers = 12
+	// it's important to keep this limit smaller than the
+	// number of requests the test is going to perform, so
+	// the test can have some requests rejected.
+	const maxRequestsPerIPAndSecond = 20
+
+	cfg := getSequencerDefaultConfig()
+	cfg.MaxRequestsPerIPAndSecond = maxRequestsPerIPAndSecond
+	s, m, _ := newMockedServerWithCustomConfig(t, cfg)
+	defer s.Stop()
+
+	// since the limit is applied per second,
+	// the test waits 1 sec before starting because requests are made during the
+	// server creation to check its availability. Waiting this second means
+	// we have a fresh second without any other requests made.
+	time.Sleep(time.Second)
+
+	// create a wait group to wait for all the requests to return
+	wg := sync.WaitGroup{}
+	wg.Add(numberOfRequests)
+
+	// prepare mocks with the specific number of times they can be called
+	// this makes sure the code calls these methods only for
+	// allowed requests
+	times := int(cfg.MaxRequestsPerIPAndSecond)
+	m.DbTx.On("Commit", context.Background()).Return(nil).Times(times)
+	m.State.On("BeginStateTransaction", context.Background()).Return(m.DbTx, nil).Times(times)
+	m.State.On("GetLastL2BlockNumber", context.Background(), m.DbTx).Return(uint64(1), nil).Times(times)
+
+	// prepare the workers to process the requests as long as a job is available
+	requestsLimitedCount := uint64(0)
+	jobs := make(chan int, numberOfRequests)
+	// put each worker to work
+	for i := 0; i < workers; i++ {
+		// each worker runs in a goroutine to be able to have many
+		// workers working concurrently
+		go func() {
+			// a worker keeps working indefinitely looking for new jobs
+			for {
+				// wait until a job is available
+				<-jobs
+				// send the request
+				_, err := s.JSONRPCCall("eth_blockNumber")
+				// if the request succeeds or gets rejected due to the max requests per sec limit, it's ok
+				// otherwise we stop the test and log the error.
+				if err != nil {
+					if err.Error() == "429 - You have reached maximum request limit." {
+						atomic.AddUint64(&requestsLimitedCount, 1)
+					} else {
+						require.NoError(t, err)
+					}
+				}
+
+				// register in the wait group that a request was executed and has returned
+				wg.Done()
+			}
+		}()
+	}
+
+	// add jobs to notify workers according to the number
+	// of requests the test wants to send to the server
+	for i := 0; i < numberOfRequests; i++ {
+		jobs <- i
+	}
+
+	// wait for all the requests to return
+	wg.Wait()
+
+	// check that all the exceeding requests were limited
+	assert.Equal(t, uint64(numberOfRequests-maxRequestsPerIPAndSecond), requestsLimitedCount)
+
+	// wait for the server to process the last requests without breaking the
+	// connection abruptly
+	time.Sleep(time.Second)
+}
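For illustration only, outside the patch itself: a minimal, self-contained sketch of the counter pattern the handler.go hunk switches to, i.e. a package-level int64 incremented, decremented, and read through sync/atomic instead of being guarded by a sync.Mutex. The names used here (openConnections, handle) are hypothetical and not part of the diff.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// openConnections mirrors the connectionCounter pattern in handler.go:
// a package-level int64 updated with sync/atomic rather than a mutex.
var openConnections int64

func handle(wg *sync.WaitGroup) {
	defer wg.Done()

	// increment on entry, decrement when the handler returns
	atomic.AddInt64(&openConnections, 1)
	defer atomic.AddInt64(&openConnections, -1)

	// reads must also go through atomic to stay race-free
	fmt.Println("open connections:", atomic.LoadInt64(&openConnections))
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go handle(&wg)
	}
	wg.Wait()
	fmt.Println("final:", atomic.LoadInt64(&openConnections))
}
```

Note that the atomic loads matter as much as the atomic adds: reading the plain variable while other goroutines call atomic.AddInt64 on it would still be reported as a data race under `go test -race`.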