Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add doPrivilege blocks for socket connect ops in repository-hdfs #22793

Merged
merged 8 commits into from
Jan 27, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.elasticsearch.repositories.hdfs;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
Expand All @@ -36,6 +38,9 @@
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
Expand All @@ -56,12 +61,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public boolean blobExists(String blobName) {
try {
return store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.util().exists(new Path(path, blobName));
}
});
return store.execute(fileContext -> fileContext.util().exists(new Path(path, blobName)));
} catch (Exception e) {
return false;
}
Expand All @@ -73,22 +73,14 @@ public void deleteBlob(String blobName) throws IOException {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}

store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.delete(new Path(path, blobName), true);
}
});
store.execute(fileContext -> fileContext.delete(new Path(path, blobName), true));
}

@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
store.execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
return null;
}
store.execute((Operation<Void>) fileContext -> {
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
return null;
});
}

Expand All @@ -98,11 +90,20 @@ public InputStream readBlob(String blobName) throws IOException {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
// FSDataInputStream does buffering internally
return store.execute(new Operation<InputStream>() {
@Override
public InputStream run(FileContext fileContext) throws IOException {
return fileContext.open(new Path(path, blobName), bufferSize);
}
return store.execute((Operation<InputStream>) fileContext -> {
FSDataInputStream is = fileContext.open(new Path(path, blobName), bufferSize);
return new InputStream() {
@Override
public int read() throws IOException {
try {
SpecialPermission.check();
// FSDataInputStream can open connection on read()
return AccessController.doPrivileged((PrivilegedExceptionAction<Integer>) is::read);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we maybe try to trigger this differently? I mean can we for instance try to call #available() or can we maybe read the first byte on open and wrap in a BufferedInputStream and then do this:

InputStream stream = is.isMarkSupported() ? is : new BufferedInputStream(is);
// do the following in doPrivileged?
stream.mark(1);
stream.skip(1);
stream.reset();
return stream;

} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
};
});
}

Expand All @@ -111,45 +112,33 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t
if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
}
store.execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
// NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
// that should be fixed there, no need to bring truncation into this, give the user an error.
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) };
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
// For safety we also hsync each write as well, because of its docs:
// SYNC_BLOCK - to force closed blocks to the disk device
// "In addition Syncable.hsync() should be called after each write,
// if true synchronous behavior is required"
stream.hsync();
}
store.execute((Operation<Void>) fileContext -> {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
// NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
// that should be fixed there, no need to bring truncation into this, give the user an error.
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) };
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
int bytesRead;
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
// For safety we also hsync each write as well, because of its docs:
// SYNC_BLOCK - to force closed blocks to the disk device
// "In addition Syncable.hsync() should be called after each write,
// if true synchronous behavior is required"
stream.hsync();
}
return null;
}
return null;
});
}

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
FileStatus[] files = store.execute(new Operation<FileStatus[]>() {
@Override
public FileStatus[] run(FileContext fileContext) throws IOException {
return (fileContext.util().listStatus(path, new PathFilter() {
@Override
public boolean accept(Path path) {
return prefix == null || path.getName().startsWith(prefix);
}
}));
}
});
FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,
path -> prefix == null || path.getName().startsWith(prefix))));
Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
for (FileStatus file : files) {
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.lang.reflect.ReflectPermission;
import java.net.SocketPermission;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
Expand All @@ -46,12 +47,7 @@ final class HdfsBlobStore implements BlobStore {
HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException {
this.fileContext = fileContext;
this.bufferSize = bufferSize;
this.root = execute(new Operation<Path>() {
@Override
public Path run(FileContext fileContext) throws IOException {
return fileContext.makeQualified(new Path(path));
}
});
this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path)));
try {
mkdirs(root);
} catch (FileAlreadyExistsException ok) {
Expand All @@ -60,23 +56,17 @@ public Path run(FileContext fileContext) throws IOException {
}

private void mkdirs(Path path) throws IOException {
execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
fileContext.mkdir(path, null, true);
return null;
}
execute((Operation<Void>) fileContext -> {
fileContext.mkdir(path, null, true);
return null;
});
}

@Override
public void delete(BlobPath path) throws IOException {
execute(new Operation<Void>() {
@Override
public Void run(FileContext fc) throws IOException {
fc.delete(translateToHdfsPath(path), true);
return null;
}
execute((Operation<Void>) fc -> {
fc.delete(translateToHdfsPath(path), true);
return null;
});
}

Expand Down Expand Up @@ -128,7 +118,7 @@ <V> V execute(Operation<V> operation) throws IOException {
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<V>)
() -> operation.run(fileContext), null, new ReflectPermission("suppressAccessChecks"),
new AuthPermission("modifyPrivateCredentials"));
new AuthPermission("modifyPrivateCredentials"), new SocketPermission("*", "connect"));
} catch (PrivilegedActionException pae) {
throw (IOException) pae.getException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,12 @@ private static FileContext createContext(URI uri, Settings repositorySettings)
cfg.setBoolean("fs.hdfs.impl.disable.cache", true);

// create the filecontext with our user
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
try {
AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
try {
AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,25 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;

public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {

@Override
protected BlobStore newBlobStore() throws IOException {
return AccessController.doPrivileged(
new PrivilegedAction<HdfsBlobStore>() {
@Override
public HdfsBlobStore run() {
try {
FileContext fileContext = createContext(new URI("hdfs:///"));
return new HdfsBlobStore(fileContext, "temp", 1024);
} catch (IOException | URISyntaxException e) {
throw new RuntimeException(e);
}
}
});
FileContext fileContext;
try {
fileContext = AccessController.doPrivileged((PrivilegedExceptionAction<FileContext>)
() -> createContext(new URI("hdfs:///")));
} catch (PrivilegedActionException e) {
throw new RuntimeException(e.getCause());
}
return new HdfsBlobStore(fileContext, "temp", 1024);
}

@SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
Expand Down Expand Up @@ -90,15 +87,12 @@ private FileContext createContext(URI uri) {
cfg.set("fs.AbstractFileSystem." + uri.getScheme() + ".impl", TestingFs.class.getName());

// create the FileContext with our user
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
try {
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
try {
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
});
}
Expand Down