
Commit

refactor: optimize file import
zhou-hao committed Dec 20, 2023
1 parent 7f02888 commit 27ff051
Showing 9 changed files with 322 additions and 52 deletions.
@@ -1,12 +1,27 @@
package org.jetlinks.community.utils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;

import org.springframework.util.unit.DataSize;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

import javax.annotation.Nonnull;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;

public class ObjectMappers {
@@ -96,4 +111,143 @@ public static <T> T parseJsonArray(String data, Class<T> type) {
return JSON_MAPPER.readerForListOf(type).readValue(data);
}

/**
* Convert a stream of data buffers into a stream of JSON objects.
*
* @param stream data buffer stream
* @param type JSON object type
* @param <T> JSON object type
* @return stream of JSON objects
*/
public static <T> Flux<T> parseJsonStream(Flux<DataBuffer> stream,
Class<T> type) {
return parseJsonStream(stream, type, JSON_MAPPER);
}

/**
* Convert a stream of data buffers into a stream of JSON objects.
*
* @param stream data buffer stream
* @param type JSON object type
* @param mapper JSON object mapper
* @param <T> JSON object type
* @return stream of JSON objects
*/
public static <T> Flux<T> parseJsonStream(Flux<DataBuffer> stream,
Class<T> type,
ObjectMapper mapper) {
Jackson2JsonDecoder decoder = new Jackson2JsonDecoder(mapper);
decoder.setMaxInMemorySize((int) DataSize.ofMegabytes(8).toBytes());
return decoder
.decode(stream, ResolvableType.forType(type), null, null)
.cast(type);
}
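
For illustration, a minimal usage sketch (not part of this commit): decoding a JSON array file into a typed object stream. DeviceInfo and the file name are hypothetical; the Flux<DataBuffer> is produced with Spring's standard DataBufferUtils, FileSystemResource and DefaultDataBufferFactory.

Flux<DataBuffer> buffers = DataBufferUtils.read(
    new FileSystemResource("devices.json"),      //hypothetical JSON array file
    new DefaultDataBufferFactory(),
    4096);
ObjectMappers
    .parseJsonStream(buffers, DeviceInfo.class)  //DeviceInfo is a made-up DTO
    .doOnNext(device -> System.out.println("decoded: " + device))
    .subscribe();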

/**
* Convert an object stream into a stream of JSON bytes.
*
* @param objectStream object stream
* @return JSON byte stream
*/
public static Flux<byte[]> toJsonStream(Flux<?> objectStream) {
return toJsonStream(objectStream, JSON_MAPPER);
}

/**
* Convert an object stream into a stream of JSON bytes.
*
* @param objectStream object stream
* @param mapper JSON object mapper
* @return JSON byte stream
*/
public static Flux<byte[]> toJsonStream(Flux<?> objectStream, ObjectMapper mapper) {
return Flux.create(sink -> {
OutputStream stream = createStream(sink, 8096);

JsonGenerator generator = createJsonGenerator(mapper, stream);
try {
generator.writeStartArray();
} catch (IOException e) {
sink.error(e);
return;
}
@SuppressWarnings("all")
Disposable writer = objectStream
.publishOn(Schedulers.single(Schedulers.boundedElastic()))
.subscribe(
next -> writeObject(generator, next),
sink::error,
() -> {
try {
generator.writeEndArray();
} catch (IOException e) {
sink.error(e);
return;
}
safeClose(generator);
sink.complete();
},
Context.of(sink.contextView()));

sink.onDispose(() -> {
safeClose(generator);
writer.dispose();
});
});
}
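
For illustration, a minimal usage sketch (the sample data is made up): the three maps below are serialized incrementally as byte[] chunks that together form one JSON array.

Flux<byte[]> json = ObjectMappers.toJsonStream(
    Flux.range(1, 3).map(i -> java.util.Collections.singletonMap("id", i)));
json.reduce(new java.io.ByteArrayOutputStream(),
        (out, chunk) -> { out.write(chunk, 0, chunk.length); return out; })
    .map(out -> new String(out.toByteArray(), java.nio.charset.StandardCharsets.UTF_8))
    .subscribe(System.out::println);             //prints [{"id":1},{"id":2},{"id":3}]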

private static OutputStream createStream(FluxSink<byte[]> sink, int bufferSize) {
return new BufferedOutputStream(new OutputStream() {

@Override
public void write(@Nonnull byte[] b, int off, int len) {
if (len == b.length) {
sink.next(b);
} else {
sink.next(Arrays.copyOfRange(b, off, off + len));
}
}

@Override
public void write(@Nonnull byte[] b) {
sink.next(b);
}

@Override
public void write(int b) {
sink.next(new byte[]{(byte) b});
}
}, bufferSize) {
@Override
public void close() throws IOException {
try {
super.close();
} finally {
sink.complete();
}
}
};
}

@SneakyThrows
private static JsonGenerator createJsonGenerator(ObjectMapper mapper, OutputStream stream) {
return mapper.createGenerator(stream);
}

@SneakyThrows
private static void writeObject(JsonGenerator generator, Object data) {
generator.writePOJO(data);
}

private static void safeClose(JsonGenerator closeable) {
if (closeable.isClosed()) {
return;
}
try {
closeable.close();
} catch (IOException ignore) {

}
}

}
@@ -8,6 +8,7 @@
import org.jetlinks.community.io.file.FileManager;
import org.jetlinks.community.io.file.FileOption;
import org.jetlinks.community.io.utils.FileUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -16,6 +17,8 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import static org.jetlinks.community.io.excel.ImportHelper.FORMAT_JSON;

/**
* Abstract data import service: imports data, emits real-time per-record results, and returns an import result file once the import finishes.
*
@@ -27,9 +30,9 @@
@AllArgsConstructor
public abstract class AbstractImporter<T> {

private final FileManager fileManager;
protected final FileManager fileManager;

private final WebClient client;
protected final WebClient client;

protected abstract Mono<Void> handleData(Flux<T> data);

@@ -39,25 +42,48 @@ protected void customImport(ImportHelper<T> helper) {

}

public Flux<ImportResult<T>> doImport(String fileUrl) {
String format = FileUtils.getExtension(fileUrl);
/**
* Write out the file containing the detailed import results.
*
* @param fileUrl original import file URL
* @param format original import file format
* @param buffer result file data stream
* @return import result information
*/
protected Mono<ImportResult<T>> writeDetailFile(String fileUrl, String format, Flux<DataBuffer> buffer) {
return fileManager
.saveFile(getResultFileName(fileUrl, format), buffer, FileOption.tempFile)
.map(ImportResult::of);
}

public Flux<ImportResult<T>> doImport(String fileUrl, String format) {
ImportHelper<T> importHelper = new ImportHelper<>(this::newInstance, this::handleData);

customImport(importHelper);

//import JSON
if (FORMAT_JSON.equalsIgnoreCase(format)) {
return importHelper
.doImportJson(
FileUtils.readDataBuffer(client, fileUrl),
ImportResult::of,
buf -> writeDetailFile(fileUrl, format, buf));
}
//import Excel
return this
.getInputStream(fileUrl)
.flatMapMany(stream -> importHelper
.doImport(stream, format, ImportResult::of,
buf -> fileManager
.saveFile(getResultFileName(fileUrl, format), buf, FileOption.tempFile)
.map(ImportResult::<T>of)));
buf -> writeDetailFile(fileUrl, format, buf)));
}

protected String getResultFileName(String sourceFileUrl, String format) {
public Flux<ImportResult<T>> doImport(String fileUrl) {
return doImport(fileUrl, FileUtils.getExtension(fileUrl));
}

return "导入结果_" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss")) + "." + format;
protected String getResultFileName(String sourceFileUrl, String format) {
return "导入结果_" + LocalDateTime
.now()
.format(DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss")) + "." + format;
}

public enum ImportResultType {
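To show how this abstract importer is meant to be extended, here is a hypothetical sketch of a concrete subclass (DeviceInfo, DeviceImporter and saveDevice are made up, and it assumes newInstance() and handleData() are the members a subclass must provide):

class DeviceImporter extends AbstractImporter<DeviceInfo> {

    DeviceImporter(FileManager fileManager, WebClient client) {
        super(fileManager, client);
    }

    @Override
    protected DeviceInfo newInstance() {
        //one fresh instance per imported row
        return new DeviceInfo();
    }

    @Override
    protected Mono<Void> handleData(Flux<DeviceInfo> data) {
        //persist each parsed row; saveDevice is a hypothetical repository call
        return data.flatMap(this::saveDevice).then();
    }

    private Mono<Void> saveDevice(DeviceInfo device) {
        return Mono.empty();
    }
}

//usage sketch: the file extension selects JSON or Excel parsing; the URL is made up,
//and fileManager/client would normally come from the Spring context
new DeviceImporter(fileManager, client)
    .doImport("http://example.org/files/devices.xlsx")
    .subscribe(result -> System.out.println("row result: " + result));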
@@ -72,8 +72,7 @@ public Mono<InputStream> getInputStream(String fileUrl) {
.get()
.uri(fileUrl)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.exchange()
.flatMap(clientResponse -> clientResponse.bodyToMono(Resource.class))
.exchangeToMono(clientResponse -> clientResponse.bodyToMono(Resource.class))
.flatMap(resource -> Mono.fromCallable(resource::getInputStream));
} else {
return Mono.fromCallable(() -> new FileInputStream(fileUrl));
@@ -21,6 +21,7 @@
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.function.Function3;
@@ -106,8 +107,10 @@ public static <T> Flux<T> read(Supplier<T> supplier,
.wrapper(ReactorExcel.mapWrapper())
.readAndClose(inputStream, options)
.map(map -> {

map = transformValue(map, keyAndHeader, ConverterExcelOption::convertForRead);
//filter out empty values, then convert the data
map = transformValue(Maps.filterValues(map, val -> !ObjectUtils.isEmpty(val)),
keyAndHeader,
ConverterExcelOption::convertForRead);

T data = supplier.get();

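To illustrate the empty-value filter added above (the row contents are made up): Guava's Maps.filterValues keeps only the entries whose value Spring's ObjectUtils.isEmpty does not consider empty, so blank Excel cells never reach the converters.

Map<String, Object> row = new java.util.HashMap<>();
row.put("name", "sensor-1");
row.put("description", "");   //blank cell read from Excel
row.put("tags", null);
Map<String, Object> filtered = Maps.filterValues(row, val -> !ObjectUtils.isEmpty(val));
//filtered now only contains {name=sensor-1}; the empty string and the null were dropped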
