Skip to content

Commit

Permalink
Merge pull request #890 from mattrjacobs/multiple-responses-per-colla…
Browse files Browse the repository at this point in the history
…pser-arg

Multiple responses per collapser arg
  • Loading branch information
mattrjacobs committed Sep 10, 2015
2 parents b21f8ca + f88bb26 commit 0e18402
Show file tree
Hide file tree
Showing 5 changed files with 728 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static class CollapserState {
@Param({"1", "10", "100", "1000"})
int numToCollapse;

@Param({"1"}) //until bugfix for https://github.com/Netflix/Hystrix/issues/865, 1 is only value that works as expected
@Param({"1", "10", "100"})
int numResponsesPerArg;

@Param({"1", "1000", "1000000"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

Expand Down Expand Up @@ -163,16 +163,13 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR

@Override
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return batchResponse.single().flatMap(new Func1<BatchReturnType, Observable<Void>>() {

return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
@Override
public Observable<Void> call(BatchReturnType response) {
public void call(BatchReturnType batchReturnType) {
// this is a blocking call in HystrixCollapser
self.mapResponseToRequests(response, requests);
return Observable.empty();
self.mapResponseToRequests(batchReturnType, requests);
}

});
}).ignoreElements().cast(Void.class);
}

@Override
Expand Down Expand Up @@ -509,23 +506,40 @@ public interface CollapsedRequest<ResponseType, RequestArgumentType> {
public RequestArgumentType getArgument();

/**
* When set any client thread blocking on get() will immediately be unblocked and receive the response.
*
* This corresponds in a OnNext(Response); OnCompleted pair of emissions. It represents a single-value usecase.
*
* @throws IllegalStateException
* if called more than once or after setException.
* if called more than once or after setException/setComplete.
* @param response
* ResponseType
*/
public void setResponse(ResponseType response);

/**
* When set any client thread blocking on get() will immediately be unblocked and receive the exception.
* When invoked, any Observer will be OnNexted this value
* @throws IllegalStateException
* if called after setException/setResponse/setComplete.
* @param response
*/
public void emitResponse(ResponseType response);

/**
* When set, any Observer will be OnErrored this exception
*
* @param exception exception to set on response
* @throws IllegalStateException
* if called more than once or after setResponse.
* if called more than once or after setResponse/setComplete.
*/
public void setException(Exception exception);

/**
* When set, any Observer will have an OnCompleted emitted.
* The intent is to use if after a series of emitResponses
*
* Note that, unlike the other 3 methods above, this method does not throw an IllegalStateException.
* This allows Hystrix-core to unilaterally call it without knowing the internal state.
*/
public void setComplete();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
Expand All @@ -29,6 +31,7 @@
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
Expand Down Expand Up @@ -171,32 +174,46 @@ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchR
// index the requests by key
final Map<K, CollapsedRequest<ResponseType, RequestArgumentType>> requestsByKey = new HashMap<K, CollapsedRequest<ResponseType, RequestArgumentType>>(requests.size());
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requests) {
requestsByKey.put(requestKeySelector.call(cr.getArgument()), cr);
K requestArg = requestKeySelector.call(cr.getArgument());
requestsByKey.put(requestArg, cr);
}
final Set<K> seenKeys = new HashSet<K>();

// observe the responses and join with the requests by key
return batchResponse.flatMap(new Func1<BatchReturnType, Observable<Void>>() {

return batchResponse.doOnNext(new Action1<BatchReturnType>() {
@Override
public Observable<Void> call(BatchReturnType r) {
K responseKey = batchResponseKeySelector.call(r);
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
requestForResponse.setResponse(mapBatchTypeToResponseType.call(r));
// now remove from map so we know what wasn't set at end
requestsByKey.remove(responseKey);
return Observable.empty();
public void call(BatchReturnType batchReturnType) {
try {
K responseKey = batchResponseKeySelector.call(batchReturnType);
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
if (requestForResponse != null) {
requestForResponse.emitResponse(mapBatchTypeToResponseType.call(batchReturnType));
// now add this to seenKeys, so we can later check what was seen, and what was unseen
seenKeys.add(responseKey);
} else {
logger.warn("Batch Response contained a response key not in request batch : " + responseKey);
}
} catch (Throwable ex) {
logger.warn("Uncaught error during demultiplexing of BatchResponse", ex);
}
}

}).doOnTerminate(new Action0() {

@Override
public void call() {
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requestsByKey.values()) {
onMissingResponse(cr);
for (K key: requestsByKey.keySet()) {
CollapsedRequest<ResponseType, RequestArgumentType> collapsedReq = requestsByKey.get(key);
if (!seenKeys.contains(key)) {
try {
onMissingResponse(collapsedReq);
} catch (Throwable ex) {
collapsedReq.setException(new RuntimeException("Error in HystrixObservableCollapser.onMissingResponse handler", ex));
}
}
//then unconditionally issue an onCompleted. this ensures the downstream gets a terminal, regardless of how onMissingResponse was implemented
collapsedReq.setComplete();
}
}

});
}).ignoreElements().cast(Void.class);
}

@Override
Expand Down
Loading

0 comments on commit 0e18402

Please sign in to comment.