Skip to content

Commit

Permalink
reduce biFunction scope
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Mar 29, 2024
1 parent 5d9c18c commit b5bff50
Showing 1 changed file with 83 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,39 +221,46 @@ protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
// define the result object early to gain statefulFunction feature.
Map<K, CacheGetResult<V>> resultMap = new HashMap<>();
StringBinaryCommands readCommands = (StringBinaryCommands) readCommands();
ArrayList<K> keyList = new ArrayList<K>(keys);

return this.<StringBinaryCommands, StringPipelineBinaryCommands, List<byte[]>, MultiGetResult<K, V>>doWithPipeline(
readCommands,
false,
(pipeline) -> this.getAllRequestHandle(readCommands, pipeline, keyList),
(results, ex) -> this.getAllResultHandle(results, ex, keyList, resultMap)
);
}

return this.<StringBinaryCommands, StringPipelineBinaryCommands, MultiGetResult<K, V>>doWithPipeline(readCommands, false, (pipeline, ex) -> {
if (ex != null) {
logError("GET_ALL", "keys(" + keys.size() + ")", ex);
if (!resultMap.isEmpty()) {
return new MultiGetResult<K, V>(CacheResultCode.PART_SUCCESS, ex.toString(), resultMap);
} else {
return new MultiGetResult<K, V>(ex);
}
private List<byte[]> getAllRequestHandle(StringBinaryCommands readCommands, StringPipelineBinaryCommands pipeline, ArrayList<K> keyList) {
byte[][] newKeys = keyList.stream().map(this::buildKey).toArray(byte[][]::new);
List<byte[]> results;
if (pipeline != null) {
List<Response<byte[]>> responseList = new ArrayList<>();
for (byte[] newKey : newKeys) {
Response<byte[]> response = pipeline.get(newKey);
responseList.add(response);
}
ArrayList<K> keyList = new ArrayList<K>(keys);
byte[][] newKeys = keyList.stream().map(this::buildKey).toArray(byte[][]::new);
List<byte[]> results;
if (pipeline != null) {
List<Response<byte[]>> responseList = new ArrayList<>();
// Which is faster between pipeline.get or Jedis.mget()?
for (byte[] newKey : newKeys) {
Response<byte[]> response = pipeline.get(newKey);
responseList.add(response);
}

sync(pipeline);
sync(pipeline);

results = responseList.stream().map(Response::get).collect(Collectors.toList());
} else {
results = readCommands.mget(newKeys);
}
return results;
}


results = responseList.stream().map(Response::get).collect(Collectors.toList());
private MultiGetResult<K, V> getAllResultHandle(List<byte[]> results, Exception ex, List<? extends K> keyList, Map<K, CacheGetResult<V>> resultMap) {
if (ex != null) {
logError("GET_ALL", "keys(" + keyList.size() + ")", ex);
if (!resultMap.isEmpty()) {
return new MultiGetResult<K, V>(CacheResultCode.PART_SUCCESS, ex.toString(), resultMap);
} else {
results = readCommands.mget(newKeys);
return new MultiGetResult<K, V>(ex);
}
}

return this.getAllResultAssemble(keyList, results, resultMap);
});
}

private MultiGetResult<K, V> getAllResultAssemble(List<? extends K> keyList, List<byte[]> results, Map<K, CacheGetResult<V>> resultMap) {
for (int i = 0; i < results.size(); i++) {
Object value = results.get(i);
K key = keyList.get(i);
Expand Down Expand Up @@ -300,17 +307,24 @@ protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireA
return CacheResult.SUCCESS_WITHOUT_MSG;
}
StringBinaryCommands writeCommands = (StringBinaryCommands) writeCommands();
return this.<StringBinaryCommands, StringPipelineBinaryCommands, CacheResult>doWithPipeline(writeCommands, true, (pipeline, ex) -> {
if (ex != null) {
logError("PUT_ALL", "map(" + map.size() + ")", ex);
return new CacheResult(ex);
}
return pipelinePutAll(pipeline, map, expireAfterWrite, timeUnit);
});
return this.<StringBinaryCommands, StringPipelineBinaryCommands, Integer, CacheResult>doWithPipeline(
writeCommands,
true,
(pipeline) -> this.putAllRequestHandle(pipeline, map, expireAfterWrite, timeUnit),
(res, ex) -> this.putAllResultHandle(res, ex, map)
);
}

private CacheResult putAllResultHandle(Integer res, Exception ex, Map<? extends K, ? extends V> map) {
if (ex != null) {
logError("PUT_ALL", "map(" + map.size() + ")", ex);
return new CacheResult(ex);
}
return res == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
res == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;
}

private CacheResult pipelinePutAll(StringPipelineBinaryCommands pipeline, Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
private int putAllRequestHandle(StringPipelineBinaryCommands pipeline, Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
int failCount = 0;
List<Response<String>> responses = new ArrayList<>();
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
Expand All @@ -326,8 +340,8 @@ private CacheResult pipelinePutAll(StringPipelineBinaryCommands pipeline, Map<?
failCount++;
}
}
return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
failCount == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;

return failCount;
}

@Override
Expand Down Expand Up @@ -364,30 +378,40 @@ protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
}
KeyBinaryCommands writeCommands = (KeyBinaryCommands) writeCommands();
AtomicLong x = new AtomicLong();
return this.<KeyBinaryCommands, KeyPipelineBinaryCommands, CacheResult>doWithPipeline(writeCommands, false, (pipeline, ex) -> {
if (ex != null) {
logError("REMOVE_ALL", "keys(" + keys.size() + ")", ex);
if (x.get() > 0) {
return new CacheResult(CacheResultCode.PART_SUCCESS, ex.toString());
} else {
return new CacheResult(ex);
}
byte[][] newKeys = keys.stream().map((k) -> buildKey(k)).toArray((len) -> new byte[keys.size()][]);
return this.<KeyBinaryCommands, KeyPipelineBinaryCommands, CacheResult, CacheResult>doWithPipeline(
writeCommands,
false,
(pipeline) -> this.removeAllRequest(writeCommands, pipeline, newKeys, x),
(res, ex) -> this.removeAllHandle(res, ex, x, keys.size())
);
}

private CacheResult removeAllRequest(KeyBinaryCommands writeCommands, KeyPipelineBinaryCommands pipeline, byte[][] newKeys, AtomicLong x) {
if (pipeline != null) {
for (byte[] newKey : newKeys) {
pipeline.del(newKey);
x.getAndIncrement();
}
byte[][] newKeys = keys.stream().map((k) -> buildKey(k)).toArray((len) -> new byte[keys.size()][]);

if (pipeline != null) {
for (byte[] newKey : newKeys) {
pipeline.del(newKey);
x.getAndIncrement();
}
sync(pipeline);
} else {
writeCommands.del(newKeys);
}
return CacheResult.SUCCESS_WITHOUT_MSG;
}

sync(pipeline);

private CacheResult removeAllHandle(CacheResult res, Exception ex, AtomicLong x, int keySize) {
if (ex != null) {
logError("REMOVE_ALL", "keys(" + keySize + ")", ex);
if (x.get() > 0) {
return new CacheResult(CacheResultCode.PART_SUCCESS, ex.toString());
} else {
writeCommands.del(newKeys);
return new CacheResult(ex);
}

return CacheResult.SUCCESS_WITHOUT_MSG;
});
}
return res;
}

@Override
Expand Down Expand Up @@ -431,7 +455,7 @@ protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, Ti
* @return result
*/
@SuppressWarnings("unchecked")
private <C, P, R> R doWithPipeline(C client, boolean pipelineFirst, BiFunction<P, Exception, R> biFunction) {
private <C, P, T, R> R doWithPipeline(C client, boolean pipelineFirst, Function<P, T> operator, BiFunction<T, Exception, R> biFunction) {
C commands = null;
Closeable closeable = null;
try {
Expand All @@ -453,7 +477,9 @@ private <C, P, R> R doWithPipeline(C client, boolean pipelineFirst, BiFunction<P
pipeline = null;
}

return biFunction.apply(pipeline, null);

T result = operator.apply(pipeline);
return biFunction.apply(result, null);
} catch (Exception ex) {
return biFunction.apply(null, ex);
} finally {
Expand Down

0 comments on commit b5bff50

Please sign in to comment.