diff --git a/pom.xml b/pom.xml index 726327b142c..7204e9738e4 100644 --- a/pom.xml +++ b/pom.xml @@ -568,6 +568,7 @@ ${maven.compiler.source} ${maven.compiler.target} + true diff --git a/seatunnel-engine/pom.xml b/seatunnel-engine/pom.xml index 61c4513e38d..0d07baf5883 100644 --- a/seatunnel-engine/pom.xml +++ b/seatunnel-engine/pom.xml @@ -28,6 +28,7 @@ 5.1 + 3.4.4 @@ -46,6 +47,11 @@ hazelcast ${hazelcast.version} + + com.lmax + disruptor + ${disruptor.version} + diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/pom.xml b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/pom.xml new file mode 100644 index 00000000000..7781fa15793 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/pom.xml @@ -0,0 +1,45 @@ + + + + + seatunnel-engine-storage + org.apache.seatunnel + ${revision} + + 4.0.0 + + imap-storage-api + + + + io.protostuff + protostuff-core + + + + io.protostuff + protostuff-runtime + + + + \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java new file mode 100644 index 00000000000..8ac55310ec0 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/IMapStorage.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Contract for a key-value store used to hold IMap data.
 *
 * <p>Single-entry operations report success as a boolean; bulk operations return the
 * keys that could not be processed, so an empty set means complete success.
 */
public interface IMapStorage {

    /**
     * Initializes the storage before any other method is used.
     *
     * @param properties implementation-specific initialization properties
     */
    void initialize(Map<String, Object> properties);

    /**
     * Stores a single key-value pair.
     *
     * @param key   storage key
     * @param value storage value
     * @return {@code true} if the pair was stored, {@code false} otherwise
     */
    boolean store(Object key, Object value);

    /**
     * Stores every key-value pair of the given map.
     *
     * @param map key-value pairs to store
     * @return the keys whose pairs could not be stored; an empty set if all pairs were stored
     */
    Set<Object> storeAll(Map<Object, Object> map);

    /**
     * Deletes a single key.
     *
     * @param key storage key
     * @return {@code true} if the delete succeeded, {@code false} otherwise
     */
    boolean delete(Object key);

    /**
     * Deletes a collection of keys.
     *
     * @param keys keys to delete
     * @return the keys that could not be deleted; an empty set if all deletes succeeded
     */
    Set<Object> deleteAll(Collection<Object> keys);

    /**
     * Loads all stored key-value pairs.
     *
     * @return the stored entries
     * @throws IOException if the underlying storage cannot be read
     */
    Map<Object, Object> loadAll() throws IOException;

    /**
     * Loads all stored keys.
     *
     * @return the stored keys
     */
    Set<Object> loadAllKeys();

    /**
     * Destroys the storage. No other method should be called afterwards.
     */
    void destroy();
}
+ * + */ + +package org.apache.seatunnel.engine.imap.storage.api.common; + +import io.protostuff.LinkedBuffer; +import io.protostuff.ProtostuffIOUtil; +import io.protostuff.Schema; +import io.protostuff.runtime.RuntimeSchema; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Todo: move to common module + */ +@Slf4j +public class ProtoStuffSerializer implements Serializer { + + /** + * At the moment it looks like we only have one Schema. + */ + private static final Map, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>(); + + @SuppressWarnings("unchecked") + private static Schema getSchema(Class clazz) { + return (Schema) SCHEMA_CACHE.computeIfAbsent(clazz, RuntimeSchema::createFrom); + } + + private static final Set> WRAPPERS = new HashSet<>(); + + private static final Class WRAPPER_CLASS = SerializerDeserializerWrapper.class; + + private static final Schema WRAPPER_SCHEMA = getSchema(WRAPPER_CLASS); + + static { + WRAPPERS.add(Boolean.class); + WRAPPERS.add(Byte.class); + WRAPPERS.add(Character.class); + WRAPPERS.add(Short.class); + WRAPPERS.add(Integer.class); + WRAPPERS.add(Long.class); + WRAPPERS.add(Float.class); + WRAPPERS.add(Double.class); + WRAPPERS.add(String.class); + WRAPPERS.add(Void.class); + WRAPPERS.add(List.class); + WRAPPERS.add(ArrayList.class); + WRAPPERS.add(Map.class); + WRAPPERS.add(HashMap.class); + WRAPPERS.add(TreeMap.class); + WRAPPERS.add(Hashtable.class); + WRAPPERS.add(SortedMap.class); + } + + @Override + public byte[] serialize(T obj) { + Class clazz = (Class) obj.getClass(); + LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); + Schema schema = WRAPPER_SCHEMA; + if (WRAPPERS.contains(clazz)) { + obj = (T) 
SerializerDeserializerWrapper.of(obj); + } else { + schema = getSchema(clazz); + } + + byte[] data; + try { + data = ProtostuffIOUtil.toByteArray(obj, schema, buffer); + } finally { + buffer.clear(); + } + return data; + } + + @Override + public T deserialize(byte[] data, Class clz) { + + if (!WRAPPERS.contains(clz)) { + Schema schema = getSchema(clz); + T message = schema.newMessage(); + ProtostuffIOUtil.mergeFrom(data, message, schema); + return message; + } + SerializerDeserializerWrapper wrapper = new SerializerDeserializerWrapper<>(); + ProtostuffIOUtil.mergeFrom(data, wrapper, WRAPPER_SCHEMA); + return wrapper.getObj(); + } + + public static class SerializerDeserializerWrapper { + private T obj; + + public static SerializerDeserializerWrapper of(T obj) { + SerializerDeserializerWrapper wrapper = new SerializerDeserializerWrapper<>(); + wrapper.setObj(obj); + return wrapper; + } + + public T getObj() { + return obj; + } + + public void setObj(T obj) { + this.obj = obj; + } + } + +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/common/Serializer.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/common/Serializer.java new file mode 100644 index 00000000000..15ed06c92c4 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/common/Serializer.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Converts objects to and from byte arrays.
 *
 * <p>Todo: move to common module
 */
public interface Serializer {

    /**
     * Serializes an object to bytes.
     *
     * @param obj object to serialize
     * @param <T> type of the object
     * @return the serialized form of {@code obj}
     * @throws IOException if serialization fails
     */
    <T> byte[] serialize(T obj) throws IOException;

    /**
     * Deserializes bytes back into an object of the requested class.
     *
     * @param data serialized bytes
     * @param clz  class of the object to produce
     * @param <T>  type of the object
     * @return the deserialized object
     * @throws IOException if deserialization fails
     */
    <T> T deserialize(byte[] data, Class<T> clz) throws IOException;
}
/**
 * Unchecked exception raised by IMap storage implementations.
 */
public class IMapStorageException extends RuntimeException {

    /**
     * @param message description of the failure
     */
    public IMapStorageException(String message) {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param cause   underlying cause
     */
    public IMapStorageException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause underlying cause
     */
    public IMapStorageException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception whose message is built with {@link String#format(String, Object...)}.
     *
     * <p>The pattern uses {@code %s}-style specifiers, not SLF4J {@code {}} placeholders.
     * If the pattern is null or cannot be formatted, the raw pattern is used as the
     * message, so that building the exception never hides {@code cause}.
     *
     * @param cause   underlying cause
     * @param message {@code String.format} pattern
     * @param data    arguments for the pattern
     */
    public IMapStorageException(Throwable cause, String message, Object... data) {
        super(formatSafely(message, data), cause);
    }

    /**
     * Formats {@code message} with {@code data}, falling back to the unformatted message
     * when formatting is impossible.
     */
    private static String formatSafely(String message, Object... data) {
        if (message == null) {
            return null;
        }
        try {
            return String.format(message, data);
        } catch (java.util.IllegalFormatException ignored) {
            // A broken pattern must not replace the real failure with a formatting error.
            return message;
        }
    }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file; + +import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_FILE_PATH_SPLIT; +import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_NAMESPACE; +import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.BUSINESS_KEY; +import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.CLUSTER_NAME; +import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY; +import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.NAMESPACE_KEY; + +import org.apache.seatunnel.engine.imap.storage.api.IMapStorage; +import org.apache.seatunnel.engine.imap.storage.api.common.ProtoStuffSerializer; +import org.apache.seatunnel.engine.imap.storage.api.common.Serializer; +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants; +import org.apache.seatunnel.engine.imap.storage.file.common.WALReader; +import 
org.apache.seatunnel.engine.imap.storage.file.disruptor.WALDisruptor; +import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALEventType; +import org.apache.seatunnel.engine.imap.storage.file.future.RequestFuture; +import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache; + +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * IMapFileStorage + * Please notice : + * Only applicable to big data (kv) storage. Otherwise, there may be a lot of fragmented files + * This is not suitable for frequently updated scenarios because all data is stored as an appended file. + * There is no guarantee that all files will be up-to-date when a query is made, and this delay depends on the archive cycle. + * If you write large amounts of data in batches, it is best to archive immediately. + * Some design detail: + * base on file, use orc file to store data + * use disruptor to write data to file + * use orc reader to read data from file + * use wal to ensure data consistency + * use request future to ensure data consistency + */ +@Slf4j +public class IMapFileStorage implements IMapStorage { + + public FileSystem fs; + + public String namespace; + + /** + * virtual region, Randomly generate a region name + */ + public String region; + + /** + * like OSS bucket name + * It is used to distinguish data storage locations of different business. 
+ */ + public String businessName; + + /** + * This parameter is primarily used for cluster isolation + * we can use this to distinguish different cluster, like cluster1, cluster2 + * and this is also used to distinguish different business + */ + public String clusterName; + + /** + * We used disruptor to implement the asynchronous write. + */ + WALDisruptor walDisruptor; + + /** + * serializer, default is ProtoStuffSerializer + */ + Serializer serializer; + + private String businessRootPath = null; + + public static final int DEFAULT_ARCHIVE_WAIT_TIME_MILLISECONDS = 1000 * 60; + + public static final int DEFAULT_QUERY_LIST_SIZE = 256; + + public static final int DEFAULT_QUERY_DATA_TIMEOUT_MILLISECONDS = 100; + + private Configuration conf; + + /** + * @param configuration configuration + * @see FileConstants.FileInitProperties + */ + @Override + public void initialize(Map configuration) { + checkInitStorageProperties(configuration); + Configuration hadoopConf = (Configuration) configuration.get(HDFS_CONFIG_KEY); + this.conf = hadoopConf; + this.namespace = (String) configuration.getOrDefault(NAMESPACE_KEY, DEFAULT_IMAP_NAMESPACE); + this.businessName = (String) configuration.get(BUSINESS_KEY); + + this.clusterName = (String) configuration.get(CLUSTER_NAME); + + this.region = String.valueOf(System.nanoTime()); + this.businessRootPath = namespace + DEFAULT_IMAP_FILE_PATH_SPLIT + clusterName + DEFAULT_IMAP_FILE_PATH_SPLIT + businessName + DEFAULT_IMAP_FILE_PATH_SPLIT; + try { + this.fs = FileSystem.get(hadoopConf); + } catch (IOException e) { + throw new IMapStorageException("Failed to get file system", e); + } + this.serializer = new ProtoStuffSerializer(); + this.walDisruptor = new WALDisruptor(fs, businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT, serializer); + } + + @Override + public boolean store(Object key, Object value) { + IMapFileData data; + try { + data = parseToIMapFileData(key, value); + } catch (IOException e) { + log.error("parse to 
IMapFileData error, key is {}, value is {}", key, value, e); + return false; + } + + long requestId = sendToDisruptorQueue(data, WALEventType.APPEND); + return queryExecuteStatus(requestId); + } + + @Override + public Set storeAll(Map map) { + Map requestMap = new HashMap<>(map.size()); + Set failures = new HashSet<>(); + map.forEach((key, value) -> { + try { + IMapFileData data = parseToIMapFileData(key, value); + long requestId = sendToDisruptorQueue(data, WALEventType.APPEND); + requestMap.put(requestId, key); + } catch (IOException e) { + log.error("parse to IMapFileData error", e); + failures.add(key); + } + }); + return batchQueryExecuteFailsStatus(requestMap, failures); + } + + @Override + public boolean delete(Object key) { + IMapFileData data; + try { + data = buildDeleteIMapFileData(key); + } catch (IOException e) { + log.error("parse to IMapFileData error, key is {} ", key, e); + return false; + } + long requestId = sendToDisruptorQueue(data, WALEventType.APPEND); + return queryExecuteStatus(requestId); + } + + @Override + public Set deleteAll(Collection keys) { + Map requestMap = new HashMap<>(keys.size()); + Set failures = new HashSet<>(); + keys.forEach(key -> { + try { + IMapFileData data = buildDeleteIMapFileData(key); + long requestId = sendToDisruptorQueue(data, WALEventType.APPEND); + walDisruptor.tryAppendPublish(data, requestId); + requestMap.put(requestId, data); + } catch (IOException e) { + log.error("parse to IMapFileData error", e); + failures.add(key); + } + }); + return batchQueryExecuteFailsStatus(requestMap, failures); + } + + @Override + public Map loadAll() { + try { + WALReader reader = new WALReader(fs, serializer); + return reader.loadAllData(new Path(businessRootPath), new HashSet<>()); + } catch (IOException e) { + throw new IMapStorageException("load all data error", e); + } + } + + @Override + public Set loadAllKeys() { + try { + WALReader reader = new WALReader(fs, serializer); + return reader.loadAllKeys(new 
Path(businessRootPath)); + } catch (IOException e) { + throw new IMapStorageException(e, "load all keys error parent path is {}", e, businessRootPath); + } + } + + @Override + public void destroy() { + log.info("start destroy IMapFileStorage, businessName is {}, cluster name is {}", businessName, region); + /** + * 1. close current disruptor + * 2. delete all files + * notice: we can not delete the files in the middle of the write, so some current file may be not deleted + */ + try { + walDisruptor.close(); + } catch (IOException e) { + log.error("close walDisruptor error", e); + } + // delete all files + String parentPath = businessRootPath; + + try { + fs.delete(new Path(parentPath), true); + } catch (IOException e) { + log.error("destroy IMapFileStorage error,businessName is {}, cluster name is {}", businessName, region, e); + } + + } + + private IMapFileData parseToIMapFileData(Object key, Object value) throws IOException { + return IMapFileData.builder() + .key(serializer.serialize(key)) + .keyClassName(key.getClass().getName()) + .value(serializer.serialize(value)) + .valueClassName(value.getClass().getName()) + .timestamp(System.currentTimeMillis()) + .deleted(false) + .build(); + } + + private IMapFileData buildDeleteIMapFileData(Object key) throws IOException { + return IMapFileData.builder() + .key(serializer.serialize(key)) + .keyClassName(key.getClass().getName()) + .timestamp(System.currentTimeMillis()) + .deleted(true) + .build(); + } + + private long sendToDisruptorQueue(IMapFileData data, WALEventType type) { + long requestId = RequestFutureCache.getRequestId(); + RequestFuture requestFuture = new RequestFuture(); + RequestFutureCache.put(requestId, requestFuture); + walDisruptor.tryPublish(data, type, requestId); + return requestId; + } + + private boolean queryExecuteStatus(long requestId) { + return queryExecuteStatus(requestId, DEFAULT_QUERY_DATA_TIMEOUT_MILLISECONDS); + } + + private boolean queryExecuteStatus(long requestId, long timeout) { + 
RequestFuture requestFuture = RequestFutureCache.get(requestId); + try { + if (requestFuture.isDone() || Boolean.TRUE.equals(requestFuture.get(timeout, TimeUnit.MILLISECONDS))) { + return true; + } + } catch (Exception e) { + log.error("wait for write status error", e); + } finally { + RequestFutureCache.remove(requestId); + } + return false; + } + + private Set batchQueryExecuteFailsStatus(Map requestMap, Set failures) { + for (Map.Entry entry : requestMap.entrySet()) { + boolean success = false; + RequestFuture requestFuture = RequestFutureCache.get(entry.getKey()); + try { + if (requestFuture.isDone() || Boolean.TRUE.equals(requestFuture.get())) { + success = true; + } + } catch (Exception e) { + log.error("wait for write status error", e); + } finally { + RequestFutureCache.remove(entry.getKey()); + } + if (!success) { + failures.add(entry.getValue()); + } + } + return failures; + } + + private void checkInitStorageProperties(Map properties) { + if (properties == null || properties.isEmpty()) { + throw new IllegalArgumentException("init file storage properties is empty"); + } + List requiredProperties = Arrays.asList(BUSINESS_KEY, CLUSTER_NAME, HDFS_CONFIG_KEY); + for (String requiredProperty : requiredProperties) { + if (!properties.containsKey(requiredProperty)) { + throw new IllegalArgumentException("init file storage properties is not contains " + requiredProperty); + } + } + } + +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/bean/IMapData.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/bean/IMapData.java new file mode 100644 index 00000000000..fa2a8731c24 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/bean/IMapData.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.bean; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class IMapData implements Serializable, Comparable { + + private boolean deleted; + + private byte[] key; + + private String keyClassName; + + private byte[] value; + + private String valueClassName; + + private long timestamp; + + @Override + public int compareTo(IMapData o) { + return o.timestamp - this.timestamp > 0 ? 1 : -1; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/bean/IMapFileData.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/bean/IMapFileData.java new file mode 100644 index 00000000000..5b9d8863f5c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/bean/IMapFileData.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.bean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class IMapFileData implements Serializable, Comparable { + private boolean deleted; + + private byte[] key; + + private String keyClassName; + + private byte[] value; + + private String valueClassName; + + private long timestamp; + + @Override + public int compareTo(IMapFileData o) { + return o.timestamp - this.timestamp > 0 ? 1 : -1; + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java new file mode 100644 index 00000000000..92708d24fff --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/FileConstants.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Constants used by the file based IMap storage.
 */
public final class FileConstants {

    /**
     * Namespace used when the configuration does not provide one.
     */
    public static final String DEFAULT_IMAP_NAMESPACE = "/seatunnel-imap";

    /**
     * Separator placed between path segments.
     */
    public static final String DEFAULT_IMAP_FILE_PATH_SPLIT = "/";

    /**
     * Byte value 28, the ASCII file-separator control character.
     */
    public static final byte FILE_DATA_DELIMITER = 28;

    private FileConstants() {
        // constants holder, not meant to be instantiated
    }

    /**
     * Keys of the properties used to initialize the file storage.
     */
    public interface FileInitProperties {

        /**
         * Root namespace of the storage.
         * Type: String
         */
        String NAMESPACE_KEY = "namespace";

        /**
         * Like an OSS bucket name.
         * It is used to distinguish data storage locations of different businesses.
         * Type: String
         */
        String BUSINESS_KEY = "businessName";

        /**
         * This parameter is primarily used for cluster isolation;
         * it distinguishes different clusters, like cluster1, cluster2,
         * and is also used to distinguish different businesses.
         * <br/>
         * Type: String
         */
        String CLUSTER_NAME = "clusterName";

        /**
         * The HDFS API is used to read and write files,
         * so using this storage requires an HDFS configuration.
         * <br/>
         * Type: Hadoop {@code Configuration}
         *
         * @see org.apache.hadoop.conf.Configuration
         */
        String HDFS_CONFIG_KEY = "hdfsConfig";

    }
}
// CHECKSTYLE:OFF
/**
 * Helpers for framing WAL records.
 *
 * <p>A framed record is a {@link #WAL_DATA_METADATA_LENGTH}-byte metadata header followed by
 * the payload. The first four header bytes hold the payload length as a little-endian int;
 * the remaining header bytes are left as zero by {@link #wrapperBytes(byte[])}.
 */
public final class WALDataUtils {

    /**
     * Size in bytes of the metadata header that precedes every payload.
     */
    public static final int WAL_DATA_METADATA_LENGTH = 12;

    private WALDataUtils() {
        // static helpers only
    }

    /**
     * Prepends the metadata header to {@code bytes}.
     *
     * @param bytes payload, must not be null
     * @return a new array of {@code bytes.length + WAL_DATA_METADATA_LENGTH} bytes: the
     *         little-endian payload length, zero padding up to the header size, then the payload
     */
    public static byte[] wrapperBytes(byte[] bytes) {
        byte[] length = intToByteArray(bytes.length);
        // A fresh array is zero-filled, so the unused header bytes need no explicit write.
        byte[] result = new byte[bytes.length + WAL_DATA_METADATA_LENGTH];
        System.arraycopy(length, 0, result, 0, length.length);
        System.arraycopy(bytes, 0, result, WAL_DATA_METADATA_LENGTH, bytes.length);
        return result;
    }

    /**
     * Decodes a little-endian int from the first four bytes of {@code encodedValue}.
     *
     * @param encodedValue array with at least four bytes; bytes after the fourth are ignored
     * @return the decoded value
     */
    public static int byteArrayToInt(byte[] encodedValue) {
        // The most significant byte keeps its sign; the others are masked to avoid sign extension.
        int value = (encodedValue[3] << (Byte.SIZE * 3));
        value |= (encodedValue[2] & 0xFF) << (Byte.SIZE * 2);
        value |= (encodedValue[1] & 0xFF) << (Byte.SIZE);
        value |= (encodedValue[0] & 0xFF);
        return value;
    }

    /**
     * Encodes {@code value} as four bytes, least significant byte first.
     *
     * @param value value to encode
     * @return a new four-byte array
     */
    public static byte[] intToByteArray(int value) {
        byte[] encodedValue = new byte[Integer.SIZE / Byte.SIZE];
        encodedValue[3] = (byte) (value >> Byte.SIZE * 3);
        encodedValue[2] = (byte) (value >> Byte.SIZE * 2);
        encodedValue[1] = (byte) (value >> Byte.SIZE);
        encodedValue[0] = (byte) value;
        return encodedValue;
    }

}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.common; + +import static org.apache.seatunnel.engine.imap.storage.file.common.WALDataUtils.WAL_DATA_METADATA_LENGTH; + +import org.apache.seatunnel.engine.imap.storage.api.common.Serializer; +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.ClassUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class WALReader { + private static final int DEFAULT_QUERY_LIST_SIZE = 1024; + private FileSystem fs; + private final Serializer serializer; + + public WALReader(FileSystem fs, Serializer serializer) throws IOException { + this.serializer = serializer; + this.fs = fs; + } + + private List readAllData(Path parentPath) throws IOException { + List fileNames = getFileNames(parentPath); + if (CollectionUtils.isEmpty(fileNames)) { + return new 
ArrayList<>(); + } + List result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE); + for (String fileName : fileNames) { + result.addAll(readData(new Path(parentPath, fileName))); + } + return result; + } + + public Set loadAllKeys(Path parentPath) throws IOException { + List allData = readAllData(parentPath); + if (CollectionUtils.isEmpty(allData)) { + return new HashSet<>(); + } + Collections.sort(allData); + Set result = new HashSet<>(allData.size()); + Map deleteMap = new HashMap<>(); + for (IMapFileData data : allData) { + Object key = deserializeData(data.getKey(), data.getKeyClassName()); + if (deleteMap.containsKey(key)) { + continue; + } + if (data.isDeleted()) { + deleteMap.put(key, data.getTimestamp()); + continue; + } + if (result.contains(key)) { + continue; + } + result.add(key); + } + return result; + } + + public Map loadAllData(Path parentPath, Set searchKeys) throws IOException { + List allData = readAllData(parentPath); + if (CollectionUtils.isEmpty(allData)) { + return new HashMap<>(); + } + Collections.sort(allData); + Map result = new HashMap<>(allData.size()); + Map deleteMap = new HashMap<>(); + boolean searchByKeys = CollectionUtils.isNotEmpty(searchKeys); + for (IMapFileData data : allData) { + Object key = deserializeData(data.getKey(), data.getKeyClassName()); + if (searchByKeys && !searchKeys.contains(data.getKey())) { + continue; + } + if (deleteMap.containsKey(key)) { + continue; + } + if (data.isDeleted()) { + deleteMap.put(key, data.getTimestamp()); + continue; + } + if (result.containsKey(key)) { + continue; + } + Object value = deserializeData(data.getValue(), data.getValueClassName()); + result.put(key, value); + } + return result; + } + + private List readData(Path path) throws IOException { + List result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE); + long length = fs.getFileStatus(path).getLen(); + try (FSDataInputStream in = fs.open(path)) { + byte[] datas = new byte[(int) length]; + in.readFully(datas); + int startIndex = 0; + while 
(startIndex + WAL_DATA_METADATA_LENGTH < datas.length) { + + byte[] metadata = new byte[WAL_DATA_METADATA_LENGTH]; + System.arraycopy(datas, startIndex, metadata, 0, WAL_DATA_METADATA_LENGTH); + int dataLength = WALDataUtils.byteArrayToInt(metadata); + startIndex += WAL_DATA_METADATA_LENGTH; + if (startIndex + dataLength > datas.length) { + break; + } + byte[] data = new byte[dataLength]; + System.arraycopy(datas, startIndex, data, 0, data.length); + IMapFileData fileData = serializer.deserialize(data, IMapFileData.class); + result.add(fileData); + startIndex += data.length; + } + } + return result; + } + + private List getFileNames(Path parentPath) { + try { + + RemoteIterator fileStatusRemoteIterator = fs.listFiles(parentPath, true); + List fileNames = new ArrayList<>(); + while (fileStatusRemoteIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusRemoteIterator.next(); + if (fileStatus.getPath().getName().endsWith("wal.txt")) { + fileNames.add(fileStatus.getPath().toString()); + } + } + return fileNames; + } catch (IOException e) { + throw new IMapStorageException(e, "get file names error,path is s%", parentPath); + } + } + + private Object deserializeData(byte[] data, String className) { + try { + Class clazz = ClassUtils.getClass(className); + try { + return serializer.deserialize(data, clazz); + } catch (IOException e) { + //log.error("deserialize data error, data is {}, className is {}", data, className, e); + throw new IMapStorageException(e, "deserialize data error: data is s%, className is s%", data, className); + } + } catch (ClassNotFoundException e) { + // log.error("deserialize data error, class name is {}", className, e); + throw new IMapStorageException(e, "deserialize data error, class name is {}", className); + } + } +} + + diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java new file mode 100644 index 00000000000..c516d4481cb --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.seatunnel.engine.imap.storage.file.common; + +import org.apache.seatunnel.engine.imap.storage.api.common.Serializer; +import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; + +import java.io.IOException; +import java.util.EnumSet; + +public class WALWriter implements AutoCloseable { + + FSDataOutputStream out; + + Serializer serializer; + + public WALWriter(FileSystem fs, Path parentPath, Serializer serializer) throws IOException { + Path path = new Path(parentPath, "wal.txt"); + this.out = fs.create(path); + this.serializer = serializer; + } + + public void write(IMapFileData data) throws IOException { + byte[] bytes = serializer.serialize(data); + this.write(bytes); + } + + public void flush() throws IOException { + // hsync to flag + if (out instanceof HdfsDataOutputStream) { + ((HdfsDataOutputStream) out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); + } + if (out.getWrappedStream() instanceof DFSOutputStream) { + ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); + } else { + out.hsync(); + } + this.out.hflush(); + + } + + private void write(byte[] bytes) throws IOException { + byte[] data = WALDataUtils.wrapperBytes(bytes); + this.out.write(data); + this.flush(); + } + + @Override + public void close() throws Exception { + out.close(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/FileWALEvent.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/FileWALEvent.java new file mode 100644 index 00000000000..e3fe1416a4d --- 
/dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/FileWALEvent.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.disruptor; + +import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; + +import com.lmax.disruptor.EventFactory; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class FileWALEvent { + + private IMapFileData data; + + private WALEventType type; + + private long requestId; + + public static final EventFactory FACTORY = FileWALEvent::new; +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java new file mode 100644 index 00000000000..b0481484fba --- /dev/null +++ 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.disruptor; + +import org.apache.seatunnel.engine.imap.storage.api.common.Serializer; +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventTranslatorThreeArg; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.util.DaemonThreadFactory; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.fs.FileSystem; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class WALDisruptor implements Closeable { + + private volatile Disruptor disruptor; + + private static final int DEFAULT_RING_BUFFER_SIZE = 256 * 1024; + + private static final int 
DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5; + + private boolean isClosed = false; + + private static final EventTranslatorThreeArg TRANSLATOR = + (event, sequence, data, walEventStatus, requestId) -> { + event.setData(data); + event.setType(walEventStatus); + event.setRequestId(requestId); + }; + + public WALDisruptor(FileSystem fs, String parentPath, Serializer serializer) { + //todo should support multi thread producer + ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE; + this.disruptor = new Disruptor<>(FileWALEvent.FACTORY, + DEFAULT_RING_BUFFER_SIZE, threadFactory, + ProducerType.SINGLE, + new BlockingWaitStrategy()); + + disruptor.handleEventsWithWorkerPool(new WALWorkHandler(fs, parentPath, serializer)); + + disruptor.start(); + } + + public boolean tryPublish(IMapFileData message, WALEventType status, Long requestId) { + if (isClosed()) { + return false; + } + disruptor.getRingBuffer().publishEvent(TRANSLATOR, message, status, requestId); + return true; + } + + public boolean tryAppendPublish(IMapFileData message, long requestId) { + return this.tryPublish(message, WALEventType.APPEND, requestId); + } + + public boolean isClosed() { + return isClosed; + } + + @Override + public void close() throws IOException { + //we can wait for 5 seconds, so that backlog can be committed + try { + tryPublish(null, WALEventType.CLOSED, 0L); + isClosed = true; + disruptor.shutdown(DEFAULT_CLOSE_WAIT_TIME_SECONDS, TimeUnit.SECONDS); + } catch (TimeoutException e) { + log.error("WALDisruptor close timeout error", e); + throw new IMapStorageException("WALDisruptor close timeout error", e); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALEventType.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALEventType.java new file mode 100644 index 00000000000..83f7c52ce4c --- /dev/null 
/**
 * Kinds of events that can be published to the WAL disruptor.
 */
public enum WALEventType {
    /**
     * Write data to the WAL file.
     */
    APPEND,
    /**
     * Delete all WAL files in this namespace.
     * NOTE(review): WALWorkHandler as shown here only reacts to APPEND and CLOSED;
     * confirm where CLEAR is consumed.
     */
    CLEAR,
    /**
     * Close the WAL file.
     */
    CLOSED
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.disruptor; + +import org.apache.seatunnel.engine.imap.storage.api.common.Serializer; +import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; +import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.common.WALWriter; +import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache; + +import com.lmax.disruptor.WorkHandler; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +/** + * NOTICE: + * Single thread to write data to orc file. 
+ */ +@Slf4j +public class WALWorkHandler implements WorkHandler { + + private WALWriter writer; + + public WALWorkHandler(FileSystem fs, String parentPath, Serializer serializer) { + try { + writer = new WALWriter(fs, new Path(parentPath), serializer); + } catch (IOException e) { + throw new IMapStorageException(e, "create new current writer failed, parent path is %s", parentPath); + } + } + + @Override + public void onEvent(FileWALEvent fileWALEvent) throws Exception { + log.debug("write data to orc file"); + walEvent(fileWALEvent.getData(), fileWALEvent.getType(), fileWALEvent.getRequestId()); + } + + private void walEvent(IMapFileData iMapFileData, WALEventType type, long requestId) throws Exception { + if (type == WALEventType.APPEND) { + boolean writeSuccess = true; + // write to current writer + try { + writer.write(iMapFileData); + } catch (IOException e) { + writeSuccess = false; + log.error("write orc file error, walEventBean is {} ", iMapFileData, e); + } + // return the result to the client + executeResponse(requestId, writeSuccess); + return; + } + + if (type == WALEventType.CLOSED) { + //close writer and archive + writer.close(); + } + } + + private void executeResponse(long requestId, boolean success) { + try { + RequestFutureCache.get(requestId).done(success); + } catch (RuntimeException e) { + log.error("response error, requestId is {} ", requestId, e); + } + } + +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/future/RequestFuture.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/future/RequestFuture.java new file mode 100644 index 00000000000..640bf93318a --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/future/RequestFuture.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation 
/**
 * Future for the outcome of a single WAL request, completed through {@link #done(boolean)}.
 *
 * <p>Deviations from the general {@link Future} contract, kept for existing callers:
 * <ul>
 *   <li>{@link #get()} waits at most one second and then returns {@code false};</li>
 *   <li>{@link #get(long, TimeUnit)} returns {@code false} on timeout instead of throwing;</li>
 *   <li>cancellation is not supported.</li>
 * </ul>
 */
public class RequestFuture implements Future<Boolean> {

    private final CountDownLatch latch = new CountDownLatch(1);

    /** Written by the completing thread and read by waiters, hence volatile. */
    private volatile boolean success = false;

    /** Cancellation is not supported. @return always {@code false} */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    /** @return always {@code false} */
    @Override
    public boolean isCancelled() {
        return false;
    }

    /**
     * @return {@code true} once {@link #done(boolean)} has been called, whether the request
     *         succeeded or failed
     */
    @Override
    public boolean isDone() {
        return latch.getCount() == 0;
    }

    /**
     * Waits up to one second for completion.
     *
     * @return {@code true} only if the request completed successfully in time
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Boolean get() throws InterruptedException {
        if (success) {
            return true;
        }
        latch.await(1, TimeUnit.SECONDS);
        return success;
    }

    /**
     * Waits up to the given timeout for completion.
     *
     * @return {@code true} only if the request completed successfully in time
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Boolean get(long timeout, TimeUnit unit) throws InterruptedException {
        if (success) {
            return true;
        }
        latch.await(timeout, unit);
        return success;
    }

    /**
     * Records the outcome and releases all waiters.
     *
     * @param success whether the request succeeded
     */
    public void done(boolean success) {
        this.success = success;
        latch.countDown();
    }

}
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/future/RequestFutureCache.java new file mode 100644 index 00000000000..cfc16a335f8 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/future/RequestFutureCache.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.seatunnel.engine.imap.storage.file.future; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public class RequestFutureCache { + + private RequestFutureCache() { + throw new IllegalStateException("Utility class"); + } + + private static AtomicLong REQUEST_ID_GEN = new AtomicLong(0); + + private static ConcurrentHashMap REQUEST_MAP = new ConcurrentHashMap<>(); + + public static void put(long requestId, RequestFuture requestFuture) { + REQUEST_MAP.put(requestId, requestFuture); + } + + public static RequestFuture get(Long requestId) { + return REQUEST_MAP.get(requestId); + } + + public static void remove(Long requestId) { + REQUEST_MAP.remove(requestId); + } + + public static long getRequestId() { + return REQUEST_ID_GEN.incrementAndGet(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/scheduler/SchedulerTaskInfo.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/scheduler/SchedulerTaskInfo.java new file mode 100644 index 00000000000..0fc0451d09f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/scheduler/SchedulerTaskInfo.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Value holder describing a scheduled task; accessors and a builder are generated by Lombok.
 */
@Data
@Builder
public class SchedulerTaskInfo {

    // NOTE(review): presumably the time the task is scheduled for; unit not visible here, confirm
    private long scheduledTime;
    // NOTE(review): presumably the time of the most recent run; unit not visible here, confirm
    private long latestTime;
}
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.condition.OS.LINUX; +import static org.junit.jupiter.api.condition.OS.MAC; + +import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledOnOs; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@EnabledOnOs({LINUX, MAC}) +public class IMapFileStorageTest { + + private static final Configuration CONF; + + static { + CONF = new Configuration(); + CONF.set("fs.defaultFS", "file:///"); + CONF.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"); + } + + @Test + void testAll() { + IMapFileStorage storage = new IMapFileStorage(); + Map properties = new HashMap<>(); + properties.put(FileConstants.FileInitProperties.BUSINESS_KEY, "random"); + properties.put(FileConstants.FileInitProperties.NAMESPACE_KEY, "/tmp/imap-kris-test/2"); + properties.put(FileConstants.FileInitProperties.CLUSTER_NAME, "test-one"); + properties.put(FileConstants.FileInitProperties.HDFS_CONFIG_KEY, CONF); + + storage.initialize(properties); + + String key1Index = "key1"; + String key2Index = "key2"; + String key50Index = "key50"; + + AtomicInteger dataSize = new AtomicInteger(); + Long keyValue = 123456789L; + for (int i = 0; i < 100; i++) { + String key = "key" + i; + Long value = System.currentTimeMillis(); + + if (i == 50) { + // delete + storage.delete(key1Index); + //update + storage.store(key2Index, keyValue); + 
value = keyValue; + new Thread(() -> dataSize.set(storage.loadAll().size())).start(); + } + storage.store(key, value); + storage.delete(key1Index); + } + + await().atMost(1, TimeUnit.SECONDS).until(dataSize::get, size -> size > 0); + Map loadAllDatas = storage.loadAll(); + Assertions.assertTrue(dataSize.get() >= 50); + Assertions.assertEquals(keyValue, loadAllDatas.get(key50Index)); + Assertions.assertEquals(keyValue, loadAllDatas.get(key2Index)); + Assertions.assertNull(loadAllDatas.get(key1Index)); + storage.destroy(); + } + + @AfterAll + static void afterAll() throws IOException { + FileSystem.get(CONF).delete(new Path("/tmp/imap-kris-test/2"), true); + } +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java new file mode 100644 index 00000000000..cc7663babfa --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file.common;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.condition.OS.LINUX;
+import static org.junit.jupiter.api.condition.OS.MAC;
+
+import org.apache.seatunnel.engine.imap.storage.api.common.ProtoStuffSerializer;
+import org.apache.seatunnel.engine.imap.storage.api.common.Serializer;
+import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledOnOs;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * Round-trip test for the write-ahead log: records are written through {@link WALWriter}
+ * and the map rebuilt by {@link WALReader} from the same directory is checked.
+ * Only enabled on Linux and macOS.
+ */
+@EnabledOnOs({LINUX, MAC})
+public class WALReaderAndWriterTest {
+
+    private static FileSystem FS;
+    private static final Path PARENT_PATH = new Path("/tmp/9/");
+    private static final Serializer SERIALIZER = new ProtoStuffSerializer();
+
+    /**
+     * Creates the local Hadoop file system shared by the test.
+     *
+     * @throws IOException if the local file system cannot be created
+     */
+    @BeforeAll
+    public static void init() throws IOException {
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", "file:///");
+        // The default FS uses the file:// scheme, so the implementation class is
+        // registered under fs.file.impl (an fs.hdfs.impl entry is never consulted here).
+        conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+        FS = FileSystem.getLocal(conf);
+    }
+
+    /**
+     * Writes 1024 records, then one update and one delete, and verifies what is read back.
+     *
+     * <p>Even {@code i} produces a delete record keyed by the Integer {@code i}; odd {@code i}
+     * produces a live record keyed by the String {@code "key" + i}. That leaves 512 live
+     * String keys; {@code key511} is then overwritten and {@code key519} deleted, so the
+     * test expects 511 entries.
+     *
+     * @throws Exception if writing, waiting or reading fails
+     */
+    @Test
+    public void testWriterAndReader() throws Exception {
+        WALWriter writer = new WALWriter(FS, PARENT_PATH, SERIALIZER);
+        IMapFileData data;
+        for (int i = 0; i < 1024; i++) {
+            data = IMapFileData.builder()
+                .key(SERIALIZER.serialize("key" + i))
+                .keyClassName(String.class.getName())
+                .value(SERIALIZER.serialize("value" + i))
+                // The value is serialized from a String, so record String as its class.
+                .valueClassName(String.class.getName())
+                .timestamp(System.nanoTime())
+                .build();
+            boolean isDelete = i % 2 == 0;
+            if (isDelete) {
+                // Delete records use an Integer key, which never matches the String keys.
+                data.setKey(SERIALIZER.serialize(i));
+                data.setKeyClassName(Integer.class.getName());
+            }
+            data.setDeleted(isDelete);
+
+            writer.write(data);
+        }
+        // update key 511
+        data = IMapFileData.builder()
+            .key(SERIALIZER.serialize("key" + 511))
+            .keyClassName(String.class.getName())
+            .value(SERIALIZER.serialize("Kristen"))
+            .valueClassName(String.class.getName())
+            .deleted(false)
+            .timestamp(System.nanoTime())
+            .build();
+        writer.write(data);
+        // delete key 519
+        data = IMapFileData.builder()
+            .key(SERIALIZER.serialize("key" + 519))
+            .keyClassName(String.class.getName())
+            .deleted(true)
+            .timestamp(System.nanoTime())
+            .build();
+
+        writer.write(data);
+        writer.close();
+        // ConditionFactory.await() merely returns the factory and never blocks, so wait on a
+        // real condition instead: the directory that is read back below must exist.
+        await().atMost(10, java.util.concurrent.TimeUnit.SECONDS).until(() -> FS.exists(PARENT_PATH));
+
+        WALReader reader = new WALReader(FS, new ProtoStuffSerializer());
+        Map result = reader.loadAllData(PARENT_PATH, new HashSet<>());
+        Assertions.assertEquals("Kristen", result.get("key511"));
+        Assertions.assertEquals(511, result.size());
+        Assertions.assertNull(result.get("key519"));
+    }
+
+    /**
+     * Recursively deletes the test directory and closes the shared file system.
+     *
+     * @throws IOException if the delete or the close fails
+     */
+    @AfterAll
+    public static void close() throws IOException {
+        FS.delete(PARENT_PATH, true);
+        FS.close();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java
new file mode 100644
index 00000000000..11df7b3ca3b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file.disruptor;
+
+import static org.junit.jupiter.api.condition.OS.LINUX;
+import static org.junit.jupiter.api.condition.OS.MAC;
+
+import org.apache.seatunnel.engine.imap.storage.api.common.ProtoStuffSerializer;
+import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
+import org.apache.seatunnel.engine.imap.storage.file.future.RequestFuture;
+import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledOnOs;
+
+import java.io.IOException;
+
+/**
+ * Exercises the publish/close path of {@link WALDisruptor} against a local Hadoop file system.
+ * Only enabled on Linux and macOS.
+ */
+@EnabledOnOs({LINUX, MAC})
+public class WALDisruptorTest {
+
+    private static final String FILEPATH = "/tmp/WALDisruptorTest/";
+
+    private static WALDisruptor DISRUPTOR;
+
+    private static FileSystem FS;
+
+    private static final Configuration CONF;
+
+    static {
+        CONF = new Configuration();
+        CONF.set("fs.defaultFS", "file:///");
+        CONF.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+    }
+
+    /**
+     * Publishes 100 non-deleted records, registering a {@link RequestFuture} under each
+     * request id before publishing, and then closes the disruptor.
+     *
+     * @throws IOException if the file system cannot be obtained or the disruptor fails to close
+     */
+    @Test
+    void testProducerAndConsumer() throws IOException {
+        FS = FileSystem.get(CONF);
+        DISRUPTOR = new WALDisruptor(FS, FILEPATH, new ProtoStuffSerializer());
+        try {
+            for (int i = 0; i < 100; i++) {
+                // Encode with an explicit charset so the bytes do not depend on the platform default.
+                IMapFileData data = IMapFileData.builder()
+                    .deleted(false)
+                    .key(("key" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8))
+                    .keyClassName(String.class.getName())
+                    .value(("value" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8))
+                    .valueClassName(String.class.getName())
+                    .timestamp(System.nanoTime())
+                    .build();
+                long requestId = RequestFutureCache.getRequestId();
+                RequestFutureCache.put(requestId, new RequestFuture());
+                DISRUPTOR.tryAppendPublish(data, requestId);
+            }
+        } finally {
+            // Close even when publishing throws, so the disruptor is not left open.
+            DISRUPTOR.close();
+        }
+    }
+
+    /**
+     * Recursively deletes {@code FILEPATH} and asserts that the delete reported success.
+     *
+     * @throws IOException if the delete fails
+     */
+    @AfterAll
+    public static void afterAll() throws IOException {
+        // FS is only assigned inside the test; skip cleanup rather than mask an
+        // earlier failure with a NullPointerException.
+        if (FS == null) {
+            return;
+        }
+        Assertions.assertTrue(FS.delete(new Path(FILEPATH), true));
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-storage/pom.xml b/seatunnel-engine/seatunnel-engine-storage/pom.xml
index 1fd242e8a17..80c98a5069d 100644
--- a/seatunnel-engine/seatunnel-engine-storage/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/pom.xml
@@ -33,6 +33,8 @@
 checkpoint-storage-api
 checkpoint-storage-plugins
+ imap-storage-api
+ imap-storage-file
\ No newline at end of file
diff --git a/tools/checkstyle/checkStyle.xml b/tools/checkstyle/checkStyle.xml
index 82c33518969..253d83477f3 100755
--- a/tools/checkstyle/checkStyle.xml
+++ b/tools/checkstyle/checkStyle.xml
@@ -104,6 +104,8 @@
+
+