Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple values per request-arg in a HystrixObservableCollapser #881

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

Expand Down Expand Up @@ -163,16 +163,13 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR

@Override
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return batchResponse.single().flatMap(new Func1<BatchReturnType, Observable<Void>>() {

return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
@Override
public Observable<Void> call(BatchReturnType response) {
public void call(BatchReturnType batchReturnType) {
// this is a blocking call in HystrixCollapser
self.mapResponseToRequests(response, requests);
return Observable.empty();
self.mapResponseToRequests(batchReturnType, requests);
}

});
}).ignoreElements().cast(Void.class);
}

@Override
Expand Down Expand Up @@ -507,23 +504,40 @@ public interface CollapsedRequest<ResponseType, RequestArgumentType> {
public RequestArgumentType getArgument();

/**
* When set any client thread blocking on get() will immediately be unblocked and receive the response.
*
* This corresponds in a OnNext(Response); OnCompleted pair of emissions. It represents a single-value usecase.
*
* @throws IllegalStateException
* if called more than once or after setException.
* if called more than once or after setException/setComplete.
* @param response
* ResponseType
*/
public void setResponse(ResponseType response);

/**
* When set any client thread blocking on get() will immediately be unblocked and receive the exception.
* When invoked, any Observer will be OnNexted this value
* @throws IllegalStateException
* if called after setException/setResponse/setComplete.
* @param response
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this throw an IllegalStateException if called after setResponse or setComplete? That should be a developer bug I think, and not just swallowed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

*/
public void emitResponse(ResponseType response);

/**
* When set, any Observer will be OnErrored this exception
*
* @param exception exception to set on response
* @throws IllegalStateException
* if called more than once or after setResponse.
* if called more than once or after setResponse/setComplete.
*/
public void setException(Exception exception);

/**
* When set, any Observer will have an OnCompleted emitted.
* The intent is to use if after a series of emitResponses
*
* Note that, unlike the other 3 methods above, this method does not throw an IllegalStateException.
* This allows Hystrix-core to unilaterally call it without knowing the internal state.
*/
public void setComplete();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
Expand All @@ -29,6 +31,7 @@
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
Expand Down Expand Up @@ -171,32 +174,46 @@ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchR
// index the requests by key
final Map<K, CollapsedRequest<ResponseType, RequestArgumentType>> requestsByKey = new HashMap<K, CollapsedRequest<ResponseType, RequestArgumentType>>(requests.size());
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requests) {
requestsByKey.put(requestKeySelector.call(cr.getArgument()), cr);
K requestArg = requestKeySelector.call(cr.getArgument());
requestsByKey.put(requestArg, cr);
}
final Set<K> seenKeys = new HashSet<K>();

// observe the responses and join with the requests by key
return batchResponse.flatMap(new Func1<BatchReturnType, Observable<Void>>() {

return batchResponse.doOnNext(new Action1<BatchReturnType>() {
@Override
public Observable<Void> call(BatchReturnType r) {
K responseKey = batchResponseKeySelector.call(r);
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
requestForResponse.setResponse(mapBatchTypeToResponseType.call(r));
// now remove from map so we know what wasn't set at end
requestsByKey.remove(responseKey);
return Observable.empty();
public void call(BatchReturnType batchReturnType) {
try {
K responseKey = batchResponseKeySelector.call(batchReturnType);
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
if (requestForResponse != null) {
requestForResponse.emitResponse(mapBatchTypeToResponseType.call(batchReturnType));
// now add this to seenKeys, so we can later check what was seen, and what was unseen
seenKeys.add(responseKey);
} else {
logger.warn("Batch Response contained a response key not in request batch : " + responseKey);
}
} catch (Throwable ex) {
logger.warn("Uncaught error during demultiplexing of BatchResponse", ex);
}
}

}).doOnTerminate(new Action0() {

@Override
public void call() {
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requestsByKey.values()) {
onMissingResponse(cr);
for (K key: requestsByKey.keySet()) {
CollapsedRequest<ResponseType, RequestArgumentType> collapsedReq = requestsByKey.get(key);
if (!seenKeys.contains(key)) {
try {
onMissingResponse(collapsedReq);
} catch (Throwable ex) {
collapsedReq.setException(new RuntimeException("Error in HystrixObservableCollapser.onMissingResponse handler", ex));
}
}
//then unconditionally issue an onCompleted. this ensures the downstream gets a terminal, regardless of how onMissingResponse was implemented
collapsedReq.setComplete();
}
}

});
}).ignoreElements().cast(Void.class);
}

@Override
Expand Down
Loading