Skip to content

Commit

Permalink
Fix [Broker Interceptor] Pulsar didn't respond error messages when th…
Browse files Browse the repository at this point in the history
…row InterceptException (#17)
  • Loading branch information
Technoboy- committed Aug 12, 2021
1 parent 84241d4 commit 9e65a8c
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.pulsar.common.intercept.InterceptException;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;

/**
* Exception handler for handle exception.
*/
public class ExceptionHandler {

public void handle(ServletResponse response, Exception ex) throws IOException {
if (ex instanceof InterceptException) {
String reason = ex.getMessage();
byte[] content = reason.getBytes(StandardCharsets.UTF_8);
MetaData.Response info = new MetaData.Response();
info.setHttpVersion(HttpVersion.HTTP_1_1);
info.setReason(reason);
info.setStatus(((InterceptException) ex).getErrorCode());
info.setContentLength(content.length);
if (response instanceof org.eclipse.jetty.server.Response) {
((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info,
ByteBuffer.wrap(content), true);
} else {
((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(),
ex.getMessage());
}
} else {
((HttpServletResponse) response).sendError(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
ex.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -38,8 +37,11 @@ public class PreInterceptFilter implements Filter {

private final BrokerInterceptor interceptor;

public PreInterceptFilter(BrokerInterceptor interceptor) {
private final ExceptionHandler exceptionHandler;

public PreInterceptFilter(BrokerInterceptor interceptor, ExceptionHandler exceptionHandler) {
this.interceptor = interceptor;
this.exceptionHandler = exceptionHandler;
}

@Override
Expand Down Expand Up @@ -67,7 +69,7 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
interceptor.onWebserviceRequest(requestWrapper);
filterChain.doFilter(requestWrapper, servletResponse);
} catch (InterceptException e) {
((HttpServletResponse) servletResponse).sendError(e.getErrorCode(), e.getMessage());
exceptionHandler.handle(servletResponse, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,12 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
context.setAttribute(key, value);
});
}
ExceptionHandler handler = new ExceptionHandler();

if (!pulsar.getConfig().getBrokerInterceptors().isEmpty()
|| !pulsar.getConfig().isDisableBrokerInterceptors()) {
// Enable PreInterceptFilter only when interceptors are enabled
context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor())),
context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
context.addFilter(new FilterHolder(new ProcessHandlerFilter(pulsar)),
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
Expand Down
9 changes: 9 additions & 0 deletions pulsar-broker/src/main/resources/admin_interceptor.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"properties" : {
"maxTenants" : 5,
"maxNamespaces" : 2,
"maxTopics" : 5,
"maxBundlesPerNamespace" : 1,
"resourceCacheRefreshInternalInMillis": 1000
}
}
53 changes: 53 additions & 0 deletions pulsar-broker/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration status="WARN" monitorInterval="30">
<Properties>
<!--自定义一些常量,之后使用${变量名}引用-->
<Property name="logFilePath">/Users/tboy/logs</Property>
<Property name="logFileName">test.log</Property>
</Properties>
<appenders>
<console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c{50}#%M:%L] : %m%n"/>
</console>

<RollingFile name="RollingFileWarn" fileName="${logFilePath}/warn.log"
filePattern="${logFilePath}/warn-%d{yyyy-MM-dd}-%i.log">
<ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="100 MB"/>
</Policies>
</RollingFile>
</appenders>

<loggers>
<logger name="org.apache.zookeeper" level="warn"/>
<logger name="org.eclipse.jetty" level="info"/>
<logger name="org.asynchttpclient" level="info"/>
<logger name="org.apache.bookkeeper.proto" level="info"/>
<root level="INFO">
<appender-ref ref="Console"/>
<!-- <appender-ref ref="RollingFileWarn"/>-->
</root>
</loggers>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.ExceptionHandler;
import org.apache.pulsar.broker.web.PreInterceptFilter;
import org.apache.pulsar.broker.web.ProcessHandlerFilter;
import org.apache.pulsar.broker.web.ResponseHandlerFilter;
Expand Down Expand Up @@ -61,7 +62,8 @@ public class InterceptFilterOutTest {
@Test
public void testFilterOutForPreInterceptFilter() throws Exception {
CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
PreInterceptFilter filter = new PreInterceptFilter(interceptor);
ExceptionHandler handler = new ExceptionHandler();
PreInterceptFilter filter = new PreInterceptFilter(interceptor, handler);

HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import lombok.SneakyThrows;
import org.apache.pulsar.common.intercept.InterceptException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Response;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import javax.servlet.http.HttpServletResponse;

/**
* Unit test for ExceptionHandler.
*/
@Test(groups = "broker")
public class ExceptionHandlerTest {

@Test
@SneakyThrows
public void testHandle() {
ExceptionHandler handler = new ExceptionHandler();
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
handler.handle(response, new InterceptException(HttpStatus.PRECONDITION_FAILED_412, "Reach the max tenants [5] restriction"));
Mockito.verify(response).sendError(Mockito.anyInt(), Mockito.anyString());
handler.handle(response, new InterceptException(HttpStatus.INTERNAL_SERVER_ERROR_500, "internal exception"));
Mockito.verify(response, Mockito.times(2)).sendError(Mockito.anyInt(), Mockito.anyString());
handler.handle(response, new IllegalArgumentException("illegal argument exception "));
Mockito.verify(response, Mockito.times(3)).sendError(Mockito.anyInt(), Mockito.anyString());
Response response2 = Mockito.mock(Response.class);
HttpChannel httpChannel = Mockito.mock(HttpChannel.class);
Mockito.when(response2.getHttpChannel()).thenReturn(httpChannel);
handler.handle(response2, new InterceptException(HttpStatus.PRECONDITION_FAILED_412, "Reach the max tenants [5] restriction"));
Mockito.verify(httpChannel).sendResponse(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,22 @@ public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callb
} else {
ClientResponse jerseyResponse =
new ClientResponse(Status.fromStatusCode(response.getStatusCode()), jerseyRequest);
jerseyResponse.setStatusInfo(new javax.ws.rs.core.Response.StatusType() {
@Override
public int getStatusCode() {
return response.getStatusCode();
}

@Override
public Status.Family getFamily() {
return Status.Family.familyOf(response.getStatusCode());
}

@Override
public String getReasonPhrase() {
return response.getStatusText();
}
});
response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
if (response.hasResponseBody()) {
jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
Expand Down

0 comments on commit 9e65a8c

Please sign in to comment.