Skip to content

Commit

Permalink
Fix [Broker Interceptor] Pulsar didn't respond error messages when hi…
Browse files Browse the repository at this point in the history
…tting the resource limit (#17)
  • Loading branch information
Technoboy- committed Aug 12, 2021
1 parent 84241d4 commit 4b44911
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.pulsar.common.intercept.InterceptException;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;

/**
* Exception handler for handle exception.
*/
public class ExceptionHandler {

public void handle(ServletResponse response, Exception ex) throws IOException {
if (ex instanceof InterceptException) {
String reason = ex.getMessage();
byte[] content = reason.getBytes(StandardCharsets.UTF_8);
MetaData.Response info = new MetaData.Response();
info.setHttpVersion(HttpVersion.HTTP_1_1);
info.setReason(reason);
info.setStatus(((InterceptException) ex).getErrorCode());
info.setContentLength(content.length);
if (response instanceof org.eclipse.jetty.server.Response) {
((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info,
ByteBuffer.wrap(content), true);
} else {
((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(),
ex.getMessage());
}
} else {
((HttpServletResponse) response).sendError(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
ex.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -38,8 +37,11 @@ public class PreInterceptFilter implements Filter {

private final BrokerInterceptor interceptor;

public PreInterceptFilter(BrokerInterceptor interceptor) {
private final ExceptionHandler exceptionHandler;

public PreInterceptFilter(BrokerInterceptor interceptor, ExceptionHandler exceptionHandler) {
this.interceptor = interceptor;
this.exceptionHandler = exceptionHandler;
}

@Override
Expand Down Expand Up @@ -67,7 +69,7 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
interceptor.onWebserviceRequest(requestWrapper);
filterChain.doFilter(requestWrapper, servletResponse);
} catch (InterceptException e) {
((HttpServletResponse) servletResponse).sendError(e.getErrorCode(), e.getMessage());
exceptionHandler.handle(servletResponse, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,12 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
context.setAttribute(key, value);
});
}
ExceptionHandler handler = new ExceptionHandler();

if (!pulsar.getConfig().getBrokerInterceptors().isEmpty()
|| !pulsar.getConfig().isDisableBrokerInterceptors()) {
// Enable PreInterceptFilter only when interceptors are enabled
context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor())),
context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
context.addFilter(new FilterHolder(new ProcessHandlerFilter(pulsar)),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.ExceptionHandler;
import org.apache.pulsar.broker.web.PreInterceptFilter;
import org.apache.pulsar.broker.web.ProcessHandlerFilter;
import org.apache.pulsar.broker.web.ResponseHandlerFilter;
Expand Down Expand Up @@ -61,7 +62,8 @@ public class InterceptFilterOutTest {
@Test
public void testFilterOutForPreInterceptFilter() throws Exception {
CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
PreInterceptFilter filter = new PreInterceptFilter(interceptor);
ExceptionHandler handler = new ExceptionHandler();
PreInterceptFilter filter = new PreInterceptFilter(interceptor, handler);

HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import lombok.SneakyThrows;
import org.apache.pulsar.common.intercept.InterceptException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Response;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import javax.servlet.http.HttpServletResponse;

/**
* Unit test for ExceptionHandler.
*/
@Test(groups = "broker")
public class ExceptionHandlerTest {

@Test
@SneakyThrows
public void testHandle() {
ExceptionHandler handler = new ExceptionHandler();
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
handler.handle(response, new InterceptException(HttpStatus.PRECONDITION_FAILED_412, "Reach the max tenants [5] restriction"));
Mockito.verify(response).sendError(Mockito.anyInt(), Mockito.anyString());
handler.handle(response, new InterceptException(HttpStatus.INTERNAL_SERVER_ERROR_500, "internal exception"));
Mockito.verify(response, Mockito.times(2)).sendError(Mockito.anyInt(), Mockito.anyString());
handler.handle(response, new IllegalArgumentException("illegal argument exception "));
Mockito.verify(response, Mockito.times(3)).sendError(Mockito.anyInt(), Mockito.anyString());
Response response2 = Mockito.mock(Response.class);
HttpChannel httpChannel = Mockito.mock(HttpChannel.class);
Mockito.when(response2.getHttpChannel()).thenReturn(httpChannel);
handler.handle(response2, new InterceptException(HttpStatus.PRECONDITION_FAILED_412, "Reach the max tenants [5] restriction"));
Mockito.verify(httpChannel).sendResponse(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,22 @@ public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callb
} else {
ClientResponse jerseyResponse =
new ClientResponse(Status.fromStatusCode(response.getStatusCode()), jerseyRequest);
jerseyResponse.setStatusInfo(new javax.ws.rs.core.Response.StatusType() {
@Override
public int getStatusCode() {
return response.getStatusCode();
}

@Override
public Status.Family getFamily() {
return Status.Family.familyOf(response.getStatusCode());
}

@Override
public String getReasonPhrase() {
return response.getStatusText();
}
});
response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
if (response.hasResponseBody()) {
jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
Expand Down

0 comments on commit 4b44911

Please sign in to comment.