Skip to content

Commit

Permalink
Add OSS/S3 to cluster-mode type apache#4621
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Apr 25, 2023
1 parent c8a5d52 commit 322e8e4
Show file tree
Hide file tree
Showing 22 changed files with 927 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,19 @@
<artifactId>imap-storage-file</artifactId>
<name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>

<properties>
<hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
<hadoop-aws.version>3.1.4</hadoop-aws.version>
<aws.java.sdk.version>1.11.271</aws.java.sdk.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>serializer-protobuf</artifactId>
<version>${project.version}</version>
</dependency>
<!-- hadoop jar -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
Expand All @@ -62,6 +69,27 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop-aliyun.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop-aws.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.java.sdk.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants;
import org.apache.seatunnel.engine.imap.storage.file.common.WALReader;
import org.apache.seatunnel.engine.imap.storage.file.config.AbstractConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALDisruptor;
import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALEventType;
import org.apache.seatunnel.engine.imap.storage.file.future.RequestFuture;
Expand Down Expand Up @@ -52,7 +54,6 @@
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_NAMESPACE;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.BUSINESS_KEY;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.CLUSTER_NAME;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.NAMESPACE_KEY;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY;

Expand All @@ -68,6 +69,8 @@
@Slf4j
public class IMapFileStorage implements IMapStorage {

private static final String STORAGE_TYPE_KEY = "storage.type";

public FileSystem fs;

public String namespace;
Expand Down Expand Up @@ -112,7 +115,16 @@ public class IMapFileStorage implements IMapStorage {
@Override
public void initialize(Map<String, Object> configuration) {
checkInitStorageProperties(configuration);
Configuration hadoopConf = (Configuration) configuration.get(HDFS_CONFIG_KEY);

String storageType =
String.valueOf(
configuration.getOrDefault(
STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString()));
// build configuration
AbstractConfiguration fileConfiguration =
FileConfiguration.valueOf(storageType.toUpperCase()).getConfiguration(storageType);

Configuration hadoopConf = fileConfiguration.buildConfiguration(configuration);
this.conf = hadoopConf;
this.namespace = (String) configuration.getOrDefault(NAMESPACE_KEY, DEFAULT_IMAP_NAMESPACE);
this.businessName = (String) configuration.get(BUSINESS_KEY);
Expand Down Expand Up @@ -141,7 +153,10 @@ public void initialize(Map<String, Object> configuration) {
this.serializer = new ProtoStuffSerializer();
this.walDisruptor =
new WALDisruptor(
fs, businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT, serializer);
fs,
FileConfiguration.valueOf(storageType.toUpperCase()),
businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT,
serializer);
}

@Override
Expand Down Expand Up @@ -332,8 +347,7 @@ private void checkInitStorageProperties(Map<String, Object> properties) {
if (properties == null || properties.isEmpty()) {
throw new IllegalArgumentException("init file storage properties is empty");
}
List<String> requiredProperties =
Arrays.asList(BUSINESS_KEY, CLUSTER_NAME, HDFS_CONFIG_KEY);
List<String> requiredProperties = Arrays.asList(BUSINESS_KEY, CLUSTER_NAME);
for (String requiredProperty : requiredProperties) {
if (!properties.containsKey(requiredProperty)) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@
import org.apache.seatunnel.engine.imap.storage.api.IMapStorageFactory;
import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;

import org.apache.hadoop.conf.Configuration;

import com.google.auto.service.AutoService;

import java.util.Map;

import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.FileInitProperties.HDFS_CONFIG_KEY;

@AutoService(IMapStorageFactory.class)
public class IMapFileStorageFactory implements IMapStorageFactory {
@Override
Expand All @@ -42,9 +38,6 @@ public String factoryIdentifier() {
/**
 * Creates an {@link IMapFileStorage} and initializes it with {@code initMap}.
 *
 * <p>As listed here, a fresh Hadoop {@link Configuration} is built, its {@code fs.defaultFS}
 * entry is set from the {@code "fs.defaultFS"} value of {@code initMap} (cast to
 * {@code String}), and that configuration is stored back into {@code initMap} under
 * {@code HDFS_CONFIG_KEY} before {@code initialize} is called.
 *
 * <p>NOTE(review): this listing is a diff rendering without +/- markers; the hunk header
 * (-42,9 +38,6) suggests the three {@code Configuration} lines are the ones removed by the
 * commit - confirm which version is current before relying on this description.
 *
 * @param initMap initialization properties handed to {@link IMapFileStorage#initialize}
 * @return the initialized storage instance
 * @throws IMapStorageException declared by the factory contract
 */
@Override
public IMapStorage create(Map<String, Object> initMap) throws IMapStorageException {
IMapFileStorage iMapFileStorage = new IMapFileStorage();
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", (String) initMap.get("fs.defaultFS"));
// Stores the Hadoop configuration in the caller-supplied map, i.e. the argument is mutated.
initMap.put(HDFS_CONFIG_KEY, configuration);
iMapFileStorage.initialize(initMap);
return iMapFileStorage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,36 @@
package org.apache.seatunnel.engine.imap.storage.file.common;

import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.wal.DiscoveryWalFactory;
import org.apache.seatunnel.engine.imap.storage.file.wal.writer.IWriter;
import org.apache.seatunnel.engine.serializer.api.Serializer;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;

import java.io.IOException;
import java.util.EnumSet;

public class WALWriter implements AutoCloseable {

FSDataOutputStream out;
IWriter writer;

Serializer serializer;

public WALWriter(FileSystem fs, Path parentPath, Serializer serializer) throws IOException {
Path path = new Path(parentPath, "wal.txt");
this.out = fs.create(path);
this.serializer = serializer;
public WALWriter(
FileSystem fs,
FileConfiguration fileConfiguration,
Path parentPath,
Serializer serializer)
throws IOException {
this.writer = DiscoveryWalFactory.getWriter(fileConfiguration.getName());
this.writer.initialize(fs, parentPath, serializer);
}

public void write(IMapFileData data) throws IOException {
byte[] bytes = serializer.serialize(data);
this.write(bytes);
}

public void flush() throws IOException {
// hsync to flag
if (out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) out)
.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
}
if (out.getWrappedStream() instanceof DFSOutputStream) {
((DFSOutputStream) out.getWrappedStream())
.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
} else {
out.hsync();
}
this.out.hflush();
}

private void write(byte[] bytes) throws IOException {
byte[] data = WALDataUtils.wrapperBytes(bytes);
this.out.write(data);
this.flush();
this.writer.write(data);
}

@Override
public void close() throws Exception {
out.close();
this.writer.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.seatunnel.engine.imap.storage.file.config;

import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;

import org.apache.hadoop.conf.Configuration;

import java.util.Map;

/**
 * Base class for objects that turn the IMap storage property map into a Hadoop
 * {@link Configuration}.
 *
 * <p>Besides the abstract {@link #buildConfiguration(Map)} it offers two package-private
 * helpers: one that validates required keys and one that copies prefixed options into a
 * Hadoop configuration.
 */
public abstract class AbstractConfiguration {

    /** The literal key fragment {@code "impl"}, available to subclasses. */
    protected static final String HDFS_IMPL_KEY = "impl";

    /**
     * Builds the Hadoop configuration for a concrete file system.
     *
     * @param config storage properties supplied by the caller
     * @return the Hadoop configuration derived from {@code config}
     * @throws IMapStorageException if the implementation cannot build the configuration
     */
    public abstract Configuration buildConfiguration(Map<String, Object> config)
            throws IMapStorageException;

    /**
     * Verifies that every listed key is present in {@code config} with a non-null value.
     *
     * @param config properties to inspect
     * @param keys keys that must be present and non-null
     * @throws IllegalArgumentException for the first key that is absent or mapped to
     *     {@code null}; the message is {@code "<key> is required"}
     */
    void checkConfiguration(Map<String, Object> config, String... keys) {
        for (String requiredKey : keys) {
            // get() yields null both for an absent key and for an explicit null value.
            if (config.get(requiredKey) == null) {
                throw new IllegalArgumentException(requiredKey + " is required");
            }
        }
    }

    /**
     * Copies every entry of {@code config} whose key starts with {@code prefix} into
     * {@code hadoopConf}, keeping the key unchanged and converting the value with
     * {@link String#valueOf(Object)}.
     *
     * @param hadoopConf Hadoop configuration that receives the options
     * @param config source properties
     * @param prefix key prefix selecting the entries to copy
     */
    void setExtraConfiguration(
            Configuration hadoopConf, Map<String, Object> config, String prefix) {
        for (Map.Entry<String, Object> option : config.entrySet()) {
            String optionName = option.getKey();
            if (!optionName.startsWith(prefix)) {
                continue;
            }
            hadoopConf.set(optionName, String.valueOf(option.getValue()));
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.seatunnel.engine.imap.storage.file.config;

/**
 * File system types known to the IMap file storage, each paired with the
 * {@link AbstractConfiguration} instance bound to it.
 */
public enum FileConfiguration {
    LOCAL("local", new LocalConfiguration()),
    HDFS("hdfs", new HdfsConfiguration()),
    S3("s3", new S3Configuration()),
    OSS("oss", new OssConfiguration());

    /** file system type */
    private final String name;

    /** file system configuration */
    private final AbstractConfiguration configuration;

    FileConfiguration(String name, AbstractConfiguration configuration) {
        this.name = name;
        this.configuration = configuration;
    }

    /**
     * Returns the configuration bound to this constant.
     *
     * @return the configuration instance passed to this constant's constructor
     */
    public AbstractConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * Returns the configuration bound to this constant.
     *
     * <p>The {@code name} argument is ignored: no lookup by name takes place, the result
     * depends only on the constant the method is invoked on. This overload is kept so
     * existing callers continue to compile; new code can call {@link #getConfiguration()}.
     *
     * @param name unused
     * @return the same instance as {@link #getConfiguration()}
     */
    public AbstractConfiguration getConfiguration(String name) {
        return getConfiguration();
    }

    /**
     * Returns the lower-case type name given to this constant.
     *
     * @return the file system type name, e.g. {@code "hdfs"}
     */
    public String getName() {
        return name;
    }
}
Loading

0 comments on commit 322e8e4

Please sign in to comment.