diff --git a/cdc/capture/http_errors.go b/cdc/capture/http_errors.go
new file mode 100644
index 00000000000..ed58b71e108
--- /dev/null
+++ b/cdc/capture/http_errors.go
@@ -0,0 +1,51 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package capture
+
+import (
+	"strings"
+
+	"github.com/pingcap/errors"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+)
+
+// httpBadRequestError is some errors that will cause a BadRequestError in http handler
+var httpBadRequestError = []*errors.Error{
+	cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC,
+	cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible,
+	cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError,
+	cerror.ErrMySQLInvalidConfig,
+}
+
+// IsHTTPBadRequestError check if a error is a http bad request error
+func IsHTTPBadRequestError(err error) bool {
+	if err == nil {
+		return false
+	}
+	for _, e := range httpBadRequestError {
+		if e.Equal(err) {
+			return true
+		}
+
+		rfcCode, ok := cerror.RFCCode(err)
+		if ok && e.RFCCode() == rfcCode {
+			return true
+		}
+
+		if strings.Contains(err.Error(), string(e.RFCCode())) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/cdc/capture/http_errors_test.go b/cdc/capture/http_errors_test.go
new file mode 100644
index 00000000000..8437577a20d
--- /dev/null
+++ b/cdc/capture/http_errors_test.go
@@ -0,0 +1,33 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package capture
+
+import (
+	"testing"
+
+	"github.com/pingcap/errors"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestIsHTTPBadRequestError(t *testing.T) {
+	err := cerror.ErrAPIInvalidParam.GenWithStack("aa")
+	require.Equal(t, true, IsHTTPBadRequestError(err))
+	err = cerror.ErrAPIInvalidParam.Wrap(errors.New("aa"))
+	require.Equal(t, true, IsHTTPBadRequestError(err))
+	err = cerror.ErrPDEtcdAPIError.GenWithStack("aa")
+	require.Equal(t, false, IsHTTPBadRequestError(err))
+	err = nil
+	require.Equal(t, false, IsHTTPBadRequestError(err))
+}
diff --git a/cdc/capture/http_handler.go b/cdc/capture/http_handler.go
index 1b340c7477e..133e7cc9e92 100644
--- a/cdc/capture/http_handler.go
+++ b/cdc/capture/http_handler.go
@@ -78,13 +78,15 @@ func (h *HTTPHandler) ListChangefeed(c *gin.Context) {
 	// get all changefeed status
 	statuses, err := statusProvider.GetAllChangeFeedStatuses(ctx)
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 	// get all changefeed infos
 	infos, err := statusProvider.GetAllChangeFeedInfo(ctx)
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		// this call will return a parsedError generated by the error we passed in
+		// so it is no need to check the parsedError
+		_ = c.Error(err)
 		return
 	}
 
@@ -140,38 +142,25 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) {
 	ctx := c.Request.Context()
 	changefeedID := c.Param(apiOpVarChangefeedID)
 	if err := model.ValidateChangefeedID(changefeedID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
 		return
 	}
 
 	info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
 	status, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
 	processorInfos, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -197,7 +186,7 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) {
 		TaskStatus:     taskStatus,
 	}
 
-	c.JSON(http.StatusOK, changefeedDetail)
+	c.IndentedJSON(http.StatusOK, changefeedDetail)
 }
 
 // CreateChangefeed creates a changefeed
@@ -219,29 +208,25 @@ func (h *HTTPHandler) CreateChangefeed(c *gin.Context) {
 	ctx := c.Request.Context()
 	var changefeedConfig model.ChangefeedConfig
 	if err := c.BindJSON(&changefeedConfig); err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(cerror.ErrAPIInvalidParam.Wrap(err))
 		return
 	}
 
 	info, err := verifyCreateChangefeedConfig(c, changefeedConfig, h.capture)
 	if err != nil {
-		if cerror.ErrPDEtcdAPIError.Equal(err) {
-			c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
 	infoStr, err := info.Marshal()
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
 	err = h.capture.etcdClient.CreateChangefeedInfo(ctx, info, changefeedConfig.ID)
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -269,18 +254,13 @@ func (h *HTTPHandler) PauseChangefeed(c *gin.Context) {
 
 	changefeedID := c.Param(apiOpVarChangefeedID)
 	if err := model.ValidateChangefeedID(changefeedID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
 		return
 	}
 	// check if the changefeed exists
 	_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -316,18 +296,13 @@ func (h *HTTPHandler) ResumeChangefeed(c *gin.Context) {
 	ctx := c.Request.Context()
 	changefeedID := c.Param(apiOpVarChangefeedID)
 	if err := model.ValidateChangefeedID(changefeedID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
 		return
 	}
 	// check if the changefeed exists
 	_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -370,21 +345,16 @@ func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) {
 	changefeedID := c.Param(apiOpVarChangefeedID)
 
 	if err := model.ValidateChangefeedID(changefeedID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
 		return
 	}
 	info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 	if info.State != model.StateStopped {
-		c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("can only update changefeed config when it is stopped")))
+		_ = c.Error(cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("can only update changefeed config when it is stopped"))
 		return
 	}
 
@@ -392,19 +362,19 @@ func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) {
 	// filter_rules, ignore_txn_start_ts, mounter_worker_num, sink_config
 	var changefeedConfig model.ChangefeedConfig
 	if err = c.BindJSON(&changefeedConfig); err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
 	newInfo, err := verifyUpdateChangefeedConfig(ctx, changefeedConfig, info)
 	if err != nil {
-		c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
 	err = h.capture.etcdClient.SaveChangeFeedInfo(ctx, newInfo, changefeedID)
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -430,18 +400,13 @@ func (h *HTTPHandler) RemoveChangefeed(c *gin.Context) {
 	ctx := c.Request.Context()
 	changefeedID := c.Param(apiOpVarChangefeedID)
 	if err := model.ValidateChangefeedID(changefeedID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
 		return
 	}
 	// check if the changefeed exists
 	_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -478,18 +443,13 @@ func (h *HTTPHandler) RebalanceTable(c *gin.Context) {
 	changefeedID := c.Param(apiOpVarChangefeedID)
 
 	if err := model.ValidateChangefeedID(changefeedID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
 		return
 	}
 	// check if the changefeed exists
 	_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -522,18 +482,13 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) {
 	ctx := c.Request.Context()
 	changefeedID := c.Param(apiOpVarChangefeedID)
 	if err := model.ValidateChangefeedID(changefeedID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
 		return
 	}
 	// check if the changefeed exists
 	_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-			return
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -543,12 +498,12 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) {
 	}{}
 	err = c.BindJSON(&data)
 	if err != nil {
-		c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
+		_ = c.Error(cerror.ErrAPIInvalidParam.Wrap(err))
+		return
 	}
 
 	if err := model.ValidateChangefeedID(data.CaptureID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", data.CaptureID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", data.CaptureID))
 		return
 	}
 
@@ -603,42 +558,34 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
 
 	changefeedID := c.Param(apiOpVarChangefeedID)
 	if err := model.ValidateChangefeedID(changefeedID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
 		return
 	}
 
 	captureID := c.Param(apiOpVarCaptureID)
 	if err := model.ValidateChangefeedID(captureID); err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", changefeedID)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", captureID))
 		return
 	}
 
 	statuses, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 	status, exist := statuses[captureID]
 	if !exist {
-		c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID)))
+		_ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID))
 	}
 
 	positions, err := statusProvider.GetTaskPositions(ctx, changefeedID)
 	if err != nil {
-		if cerror.ErrChangeFeedNotExists.Equal(err) {
-			c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
-		}
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 	position, exist := positions[captureID]
 	if !exist {
-		c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID)))
+		_ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID))
 	}
 
 	processorDetail := &model.ProcessorDetail{CheckPointTs: position.CheckPointTs, ResolvedTs: position.ResolvedTs, Error: position.Error}
@@ -647,7 +594,7 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) {
 		tables = append(tables, tableID)
 	}
 	processorDetail.Tables = tables
-	c.JSON(http.StatusOK, processorDetail)
+	c.IndentedJSON(http.StatusOK, processorDetail)
 }
 
 // ListProcessor lists all processors in the TiCDC cluster
@@ -669,7 +616,7 @@ func (h *HTTPHandler) ListProcessor(c *gin.Context) {
 	ctx := c.Request.Context()
 	infos, err := statusProvider.GetProcessors(ctx)
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 	resps := make([]*model.ProcessorCommonInfo, len(infos))
@@ -699,7 +646,7 @@ func (h *HTTPHandler) ListCapture(c *gin.Context) {
 	ctx := c.Request.Context()
 	captureInfos, err := statusProvider.GetCaptures(ctx)
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -781,14 +728,13 @@ func SetLogLevel(c *gin.Context) {
 	}{}
 	err := c.BindJSON(&data)
 	if err != nil {
-		c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid log level: %s", err.Error()))
 		return
 	}
 
 	err = logutil.SetLogLevel(data.Level)
 	if err != nil {
-		c.IndentedJSON(http.StatusBadRequest,
-			model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", err)))
+		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", data.Level))
 		return
 	}
 	log.Warn("log level changed", zap.String("level", data.Level))
@@ -800,7 +746,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
 	ctx := c.Request.Context()
 	// every request can only forward to owner one time
 	if len(c.GetHeader(forWardFromCapture)) != 0 {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(cerror.ErrRequestForwardErr.FastGenByArgs()))
+		_ = c.Error(cerror.ErrRequestForwardErr.FastGenByArgs())
 		return
 	}
 	c.Header(forWardFromCapture, h.capture.Info().ID)
@@ -817,13 +763,13 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
 		return nil
 	}, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime))
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
 	tslConfig, err := config.GetGlobalServerConfig().Security.ToTLSConfigWithVerify()
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -845,7 +791,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
 	cli := httputil.NewClient(tslConfig)
 	resp, err := cli.Do(req)
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 
@@ -863,7 +809,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
 	defer resp.Body.Close()
 	_, err = bufio.NewReader(resp.Body).WriteTo(c.Writer)
 	if err != nil {
-		c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+		_ = c.Error(err)
 		return
 	}
 }
diff --git a/cdc/capture/http_validator.go b/cdc/capture/http_validator.go
index 0281a23d63f..029cfd68368 100644
--- a/cdc/capture/http_validator.go
+++ b/cdc/capture/http_validator.go
@@ -139,7 +139,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
 
 	tz, err := util.GetTimezone(changefeedConfig.TimeZone)
 	if err != nil {
-		return nil, errors.Annotate(err, "invalid timezone:"+changefeedConfig.TimeZone)
+		return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
 	}
 	ctx = util.PutTimezoneInCtx(ctx, tz)
 	if err := sink.Validate(ctx, info.SinkURI, info.Config, info.Opts); err != nil {
diff --git a/cdc/http_router.go b/cdc/http_router.go
index 51019609629..dd75deef268 100644
--- a/cdc/http_router.go
+++ b/cdc/http_router.go
@@ -21,23 +21,29 @@ import (
 	"time"
 
 	"github.com/gin-gonic/gin"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/capture"
+	"github.com/pingcap/tiflow/cdc/model"
 	swaggerFiles "github.com/swaggo/files"
 	ginSwagger "github.com/swaggo/gin-swagger"
+	"go.uber.org/zap"
 
 	// use for OpenAPI online docs
 	_ "github.com/pingcap/tiflow/docs/api"
 )
 
 // newRouter create a router for OpenAPI
+
 func newRouter(captureHandler capture.HTTPHandler) *gin.Engine {
 	// discard gin log output
 	gin.DefaultWriter = io.Discard
 
 	router := gin.New()
 
+	router.Use(logMiddleware())
 	// request will timeout after 10 second
 	router.Use(timeoutMiddleware(time.Second * 10))
+	router.Use(errorHandleMiddleware())
 
 	// OpenAPI online docs
 	router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
@@ -96,7 +102,7 @@ func newRouter(captureHandler capture.HTTPHandler) *gin.Engine {
 }
 
 // timeoutMiddleware wraps the request context with a timeout
-func timeoutMiddleware(timeout time.Duration) func(c *gin.Context) {
+func timeoutMiddleware(timeout time.Duration) gin.HandlerFunc {
 	return func(c *gin.Context) {
 		// wrap the request context with a timeout
 		ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
@@ -119,3 +125,51 @@ func timeoutMiddleware(timeout time.Duration) func(c *gin.Context) {
 		c.Next()
 	}
 }
+
+func logMiddleware() gin.HandlerFunc {
+	return func(c *gin.Context) {
+		start := time.Now()
+		path := c.Request.URL.Path
+		query := c.Request.URL.RawQuery
+		c.Next()
+
+		cost := time.Since(start)
+
+		err := c.Errors.Last()
+		var stdErr error
+		if err != nil {
+			stdErr = err.Err
+		}
+
+		log.Info(path,
+			zap.Int("status", c.Writer.Status()),
+			zap.String("method", c.Request.Method),
+			zap.String("path", path),
+			zap.String("query", query),
+			zap.String("ip", c.ClientIP()),
+			zap.String("user-agent", c.Request.UserAgent()),
+			zap.Error(stdErr),
+			zap.Duration("cost", cost),
+		)
+	}
+}
+
+func errorHandleMiddleware() gin.HandlerFunc {
+	return func(c *gin.Context) {
+		c.Next()
+		// because we will return immediately after an error occurs in http_handler
+		// there wil be only one error in c.Errors
+		lastError := c.Errors.Last()
+		if lastError != nil {
+			err := lastError.Err
+			// put the error into response
+			if capture.IsHTTPBadRequestError(err) {
+				c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
+			} else {
+				c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
+			}
+			c.Abort()
+			return
+		}
+	}
+}
diff --git a/cdc/http_router_test.go b/cdc/http_router_test.go
index 47ebb4245b9..4f779c4adec 100644
--- a/cdc/http_router_test.go
+++ b/cdc/http_router_test.go
@@ -23,9 +23,8 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestPProfRouter(t *testing.T) {
+func TestPProfPath(t *testing.T) {
 	t.Parallel()
-
 	router := newRouter(capture.NewHTTPHandler(nil))
 
 	apis := []*openAPI{
diff --git a/cdc/http_status.go b/cdc/http_status.go
index 7b534b502ec..4b3890b3238 100644
--- a/cdc/http_status.go
+++ b/cdc/http_status.go
@@ -39,6 +39,7 @@ import (
 )
 
 func (s *Server) startStatusHTTP() error {
+	conf := config.GetGlobalServerConfig()
 	router := newRouter(capture.NewHTTPHandler(s.capture))
 
 	router.GET("/status", gin.WrapF(s.handleStatus))
@@ -58,7 +59,6 @@ func (s *Server) startStatusHTTP() error {
 	prometheus.DefaultGatherer = registry
 	router.Any("/metrics", gin.WrapH(promhttp.Handler()))
 
-	conf := config.GetGlobalServerConfig()
 	err := conf.Security.AddSelfCommonName()
 	if err != nil {
 		log.Error("status server set tls config failed", zap.Error(err))
diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go
index d0704ba8a92..aeab222697d 100644
--- a/cdc/sink/mysql.go
+++ b/cdc/sink/mysql.go
@@ -334,7 +334,7 @@ func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultVal
 	err := db.QueryRowContext(ctx, querySQL).Scan(&name, &value)
 	if err != nil && err != sql.ErrNoRows {
 		errMsg := "fail to query session variable " + variableName
-		return "", errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg)
+		return "", cerror.ErrMySQLQueryError.Wrap(err).GenWithStack(errMsg)
 	}
 	// session variable works, use given default value
 	if err == nil {
@@ -438,13 +438,12 @@ func parseSinkURI(ctx context.Context, sinkURI *url.URL, opts map[string]string)
 		}
 		tlsCfg, err := credential.ToTLSConfig()
 		if err != nil {
-			return nil, errors.Annotate(err, "fail to open MySQL connection")
+			return nil, errors.Trace(err)
 		}
 		name := "cdc_mysql_tls" + params.changefeedID
 		err = dmysql.RegisterTLSConfig(name, tlsCfg)
 		if err != nil {
-			return nil, errors.Annotate(
-				cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection")
+			return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
 		}
 		params.tls = "?tls=" + name
 	}
@@ -513,8 +512,7 @@ var GetDBConnImpl = getDBConn
 func getDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) {
 	db, err := sql.Open("mysql", dsnStr)
 	if err != nil {
-		return nil, errors.Annotate(
-			cerror.WrapError(cerror.ErrMySQLConnectionError, err), "Open database connection failed")
+		return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
 	}
 	err = db.PingContext(ctx)
 	if err != nil {
@@ -522,8 +520,7 @@ func getDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) {
 		if closeErr := db.Close(); closeErr != nil {
 			log.Warn("close db failed", zap.Error(err))
 		}
-		return nil, errors.Annotate(
-			cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection")
+		return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
 	}
 	return db, nil
 }
@@ -1361,12 +1358,12 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (S
 		}
 		tlsCfg, err := credential.ToTLSConfig()
 		if err != nil {
-			return nil, errors.Annotate(err, "fail to open MySQL connection")
+			return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
 		}
 		name := "cdc_mysql_tls" + "syncpoint" + id
 		err = dmysql.RegisterTLSConfig(name, tlsCfg)
 		if err != nil {
-			return nil, errors.Annotate(err, "fail to open MySQL connection")
+			return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
 		}
 		tlsParam = "?tls=" + name
 	}
@@ -1406,8 +1403,7 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (S
 	}
 	testDB, err := sql.Open("mysql", dsn.FormatDSN())
 	if err != nil {
-		return nil, errors.Annotate(
-			cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection when configuring sink")
+		return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection when configuring sink")
 	}
 	defer testDB.Close()
 	dsnStr, err = configureSinkURI(ctx, dsn, params, testDB)
@@ -1416,11 +1412,11 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (S
 	}
 	syncDB, err = sql.Open("mysql", dsnStr)
 	if err != nil {
-		return nil, errors.Annotate(err, "Open database connection failed")
+		return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
 	}
 	err = syncDB.PingContext(ctx)
 	if err != nil {
-		return nil, errors.Annotate(err, "fail to open MySQL connection")
+		return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
 	}
 
 	log.Info("Start mysql syncpoint sink")
diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go
index 2bb46c14bb9..4bfd5abf7ae 100644
--- a/cdc/sink/simple_mysql_tester.go
+++ b/cdc/sink/simple_mysql_tester.go
@@ -85,7 +85,7 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re
 	db, err = sql.Open("mysql", dsnStr)
 	if err != nil {
 		return nil, errors.Annotate(
-			cerror.WrapError(cerror.ErrMySQLConnectionError, err), "Open database connection failed")
+			cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection")
 	}
 	err = db.PingContext(ctx)
 	if err != nil {
diff --git a/cdc/sink/sink_test.go b/cdc/sink/sink_test.go
index ae68c9f3984..63acc486138 100644
--- a/cdc/sink/sink_test.go
+++ b/cdc/sink/sink_test.go
@@ -35,7 +35,7 @@ func TestValidateSink(t *testing.T) {
 	sinkURI := "mysql://root:111@127.0.0.1:3306/"
 	err := Validate(ctx, sinkURI, replicateConfig, opts)
 	require.NotNil(t, err)
-	require.Regexp(t, "fail to open MySQL connection.*ErrMySQLConnectionError.*", err)
+	require.Contains(t, err.Error(), "fail to open MySQL connection")
 
 	// test sink uri right
 	sinkURI = "blackhole://"
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 05f523b33b7..09430135d3a 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -69,6 +69,7 @@ func TestReplicaConfigOutDated(t *testing.T) {
 
 func TestServerConfigMarshal(t *testing.T) {
 	t.Parallel()
+
 	rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","log":{"file":{"max-size":300,"max-days":0,"max-backups":0}},"data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":10485760,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}`
 
 	conf := GetDefaultServerConfig()