Skip to content

Commit

Permalink
Merge pull request #16 from kestra-io/feat/multiple_file_editor_storage
Browse files Browse the repository at this point in the history
feat: multi file editor
  • Loading branch information
Martin authored Oct 23, 2023
2 parents c33c286 + fa56a5c commit 77616bf
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 210 deletions.
9 changes: 5 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ dependencies {
api 'software.amazon.awssdk:s3-transfer-manager'
api 'software.amazon.awssdk.crt:aws-crt:0.24.0'
api 'software.amazon.awssdk:apache-client'
api 'software.amazon.awssdk:sts'
}


Expand Down Expand Up @@ -89,19 +90,19 @@ dependencies {

// test deps needed only to have a runner
testImplementation group: "io.kestra", name: "core", version: kestraVersion
testImplementation group: "io.kestra", name: "core", version: kestraVersion, classifier: 'tests'
testImplementation group: "io.kestra", name: "repository-memory", version: kestraVersion
testImplementation group: "io.kestra", name: "runner-memory", version: kestraVersion
testImplementation group: "io.kestra", name: "storage-local", version: kestraVersion

// test
testImplementation "org.junit.jupiter:junit-jupiter-engine"
testImplementation "org.hamcrest:hamcrest:2.2"
testImplementation "org.hamcrest:hamcrest-library:2.2"

// containers
testImplementation "org.testcontainers:junit-jupiter:1.19.0"
testImplementation "org.testcontainers:testcontainers:1.19.0"
testImplementation "org.testcontainers:localstack:1.19.0"
testImplementation "org.testcontainers:junit-jupiter:1.19.1"
testImplementation "org.testcontainers:testcontainers:1.19.1"
testImplementation "org.testcontainers:localstack:1.19.1"
testImplementation "com.amazonaws:aws-java-sdk-s3:$s3Version"
}

Expand Down
46 changes: 46 additions & 0 deletions src/main/java/io/kestra/storage/s3/S3FileAttributes.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.kestra.storage.s3;

import io.kestra.core.storages.FileAttributes;
import lombok.Builder;
import lombok.Value;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

/**
 * {@link FileAttributes} implementation for an S3 object, derived from the response of a
 * {@code HeadObject} call plus the file name and a directory hint supplied by the caller.
 */
@Value
@Builder
public class S3FileAttributes implements FileAttributes {

    String fileName;
    HeadObjectResponse head;
    boolean isDirectory;

    /**
     * @return the object's last modified date, expressed in seconds since the epoch
     */
    // NOTE(review): this is epoch seconds, not milliseconds — confirm that matches what
    // FileAttributes consumers expect.
    @Override
    public long getLastModifiedTime() {
        return head.lastModified().getEpochSecond();
    }

    /**
     * https://docs.aws.amazon.com/AmazonS3/latest/userguide/intro-lifecycle-rules.html
     * Amazon S3 maintains only the last modified date for each object. For example, the Amazon S3 console shows
     * the Last Modified date in the object Properties pane. When you initially create a new object, this date reflects
     * the date the object is created. If you replace the object, the date changes accordingly. So when we use the term
     * creation date, it is synonymous with the term last modified date.
     *
     * @return the same value as {@link #getLastModifiedTime()}, in seconds since the epoch
     */
    @Override
    public long getCreationTime() {
        return head.lastModified().getEpochSecond();
    }

    /**
     * Reports the entry as a directory when the caller flagged it as one, when its name ends
     * with {@code "/"}, or when its content type is {@code application/x-directory}.
     *
     * @return {@link FileType#Directory} or {@link FileType#File}
     */
    @Override
    public FileType getType() {
        // Constant-first comparison: an object stored without a Content-Type yields a null
        // contentType(), which must be treated as "not a directory" rather than throwing.
        if (isDirectory || fileName.endsWith("/") || "application/x-directory".equals(head.contentType())) {
            return FileType.Directory;
        }
        return FileType.File;
    }

    /**
     * @return the content length reported by the HEAD response
     */
    @Override
    public long getSize() {
        return head.contentLength();
    }
}
195 changes: 172 additions & 23 deletions src/main/java/io/kestra/storage/s3/S3Storage.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package io.kestra.storage.s3;

import io.kestra.core.storages.FileAttributes;
import io.kestra.core.storages.StorageInterface;
import io.micronaut.core.annotation.Introspected;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
Expand All @@ -20,12 +22,16 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

import static io.kestra.core.utils.Rethrow.throwFunction;

@Singleton
@Introspected
@S3StorageEnabled
Expand All @@ -42,24 +48,27 @@ public S3Storage(S3Config s3Config) {
this.s3AsyncClient = S3ClientFactory.getAsyncS3Client(s3Config);
}

public String createBucket(String bucketName) throws IOException {
public String createBucket() throws IOException {
try {
CreateBucketRequest request = CreateBucketRequest.builder().bucket(bucketName).build();
CreateBucketRequest request = CreateBucketRequest.builder().bucket(s3Config.getBucket()).build();
s3Client.createBucket(request);
return bucketName;
} catch (BucketAlreadyExistsException exception) {
throw new IOException(exception);
return s3Config.getBucket();
} catch (AwsServiceException exception) {
throw new IOException(exception);
}
}

/**
 * Opens the object stored for {@code uri} within the given tenant.
 *
 * @param tenantId tenant identifier, may be {@code null}
 * @param uri      storage URI of the object to read
 * @return a stream over the object's content
 * @throws IOException when the object cannot be read
 */
@Override
public InputStream get(String tenantId, URI uri) throws IOException {
    // Resolve the storage URI to its bucket key, then delegate to the key-based overload.
    return get(getPath(tenantId, uri));
}

private ResponseInputStream<GetObjectResponse> get(String path) throws IOException {
try {
GetObjectRequest request = GetObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(getPath(tenantId, uri))
.key(path)
.build();
return s3Client.getObject(request);
} catch (NoSuchKeyException exception) {
Expand All @@ -69,6 +78,38 @@ public InputStream get(String tenantId, URI uri) throws IOException {
}
}

/**
 * Lists the direct children of the directory designated by {@code uri}.
 *
 * <p>Every key under the directory prefix is fetched from the bucket, nested keys and the
 * directory's own key are filtered out, and each remaining key is resolved to its attributes.
 *
 * @param tenantId tenant identifier, may be {@code null}
 * @param uri      storage URI of the directory to list
 * @return the attributes of each direct child; never empty
 * @throws FileNotFoundException when nothing is found directly under the directory
 * @throws IOException           when S3 reports a service error
 */
@Override
public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
    String path = getPath(tenantId, uri);
    String prefix = (path.endsWith("/")) ? path : path + "/";
    try {
        ListObjectsV2Request request = ListObjectsV2Request.builder()
            .bucket(s3Config.getBucket())
            .prefix(prefix)
            .build();
        // A single listObjectsV2 call returns one page only. The listing is recursive, so
        // nested keys can fill that page and hide direct children; the paginator walks every page.
        List<FileAttributes> list = s3Client.listObjectsV2Paginator(request).contents().stream()
            .map(S3Object::key)
            .filter(key -> {
                String relative = key.substring(prefix.length());
                // Remove recursive result and requested dir
                return !relative.isEmpty()
                    && !Objects.equals(relative, prefix)
                    && Path.of(relative).getParent() == null;
            })
            .map(throwFunction(this::getFileAttributes))
            .toList();
        if (list.isEmpty()) {
            // S3 has no real directories: a prefix that matches nothing is indistinguishable
            // from a missing directory, so it is reported as not found. An "empty directory"
            // cannot be told apart from a missing one here either.
            throw new FileNotFoundException(uri + " (Not Found)");
        }
        return list;
    } catch (NoSuchKeyException exception) {
        throw new FileNotFoundException(uri + " (Not Found)");
    } catch (AwsServiceException exception) {
        throw new IOException(exception);
    }
}

@Override
public Long size(String tenantId, URI uri) throws IOException {
try {
Expand Down Expand Up @@ -99,14 +140,50 @@ public Long lastModifiedTime(String tenantId, URI uri) throws IOException {
}
}

/**
 * Resolves the attributes of the file or directory designated by {@code uri}.
 *
 * <p>The exact key is tried first. When it does not exist and does not already end with
 * {@code "/"}, the same key with a trailing {@code "/"} is tried as a directory.
 *
 * @param tenantId tenant identifier, may be {@code null}
 * @param uri      storage URI to inspect
 * @return the attributes of the matching object
 * @throws IOException when neither form of the key exists or S3 reports an error
 */
@Override
public FileAttributes getAttributes(String tenantId, URI uri) throws IOException {
    final String key = getPath(tenantId, uri);
    if (key.endsWith("/")) {
        // Already a directory key: there is no alternative form to fall back to.
        return getFileAttributes(key);
    }
    try {
        return getFileAttributes(key);
    } catch (FileNotFoundException notAFile) {
        return getFileAttributes(key + "/");
    }
}

/**
 * Builds the attributes of the object stored under {@code path} from a HEAD request.
 *
 * @param path bucket key of the object; a trailing {@code "/"} marks it as a directory
 * @return the attributes of the object
 * @throws FileNotFoundException when the key does not exist
 * @throws IOException           when S3 reports any other service error
 */
private FileAttributes getFileAttributes(String path) throws IOException {
    try {
        HeadObjectRequest headRequest = HeadObjectRequest.builder()
            .bucket(s3Config.getBucket())
            .key(path)
            .build();
        // Last path segment only; the name is derived before the HEAD request is issued.
        String name = Path.of(path).getFileName().toString();
        HeadObjectResponse headResponse = s3Client.headObject(headRequest);
        return S3FileAttributes.builder()
            .fileName(name)
            .head(headResponse)
            .isDirectory(path.endsWith("/"))
            .build();
    } catch (NoSuchKeyException exception) {
        throw new FileNotFoundException();
    } catch (AwsServiceException exception) {
        throw new IOException(exception);
    }
}

@Override
public URI put(String tenantId, URI uri, InputStream data) throws IOException {
try {
int length = data.available();

String path = getPath(tenantId, uri);
mkdirs(path);
PutObjectRequest request = PutObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(getPath(tenantId, uri))
.key(path)
.build();

Optional<Upload> upload;
Expand All @@ -123,7 +200,7 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException {
}

PutObjectResponse response = upload.orElseThrow(IOException::new).completionFuture().get().response();
return createUri(uri.getPath());
return createUri(tenantId, uri.getPath());
} catch (AwsServiceException exception) {
throw new IOException(exception);
} catch (ExecutionException | InterruptedException exception) {
Expand All @@ -133,26 +210,88 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException {

/**
 * Deletes whatever is stored under {@code uri} by delegating to {@code deleteByPrefix}.
 *
 * @param tenantId tenant identifier, may be {@code null}
 * @param uri      storage URI to delete
 * @return {@code true} when at least one object was deleted
 * @throws IOException when the deletion fails
 */
@Override
public boolean delete(String tenantId, URI uri) throws IOException {
    List<URI> removed = deleteByPrefix(tenantId, uri);
    return !removed.isEmpty();
}

/**
 * Creates a directory by storing an empty object whose key ends with {@code "/"}.
 *
 * @param tenantId tenant identifier, may be {@code null}
 * @param uri      storage URI of the directory to create
 * @return the {@code kestra://} URI of the created directory
 * @throws IOException when S3 rejects the request or the client fails to send it
 */
@Override
public URI createDirectory(String tenantId, URI uri) throws IOException {
    String path = getPath(tenantId, uri);
    if (!StringUtils.endsWith(path, "/")) {
        path += "/";
    }
    mkdirs(path);
    try {
        PutObjectRequest putRequest = PutObjectRequest.builder()
            .bucket(s3Config.getBucket())
            .key(path)
            .build();
        s3Client.putObject(putRequest, RequestBody.empty());
    } catch (AwsServiceException | SdkClientException exception) {
        // Translate SDK runtime exceptions into the IOException this method declares.
        throw new IOException(exception);
    }
    return createUri(tenantId, uri.getPath());
}

private void mkdirs(String path) throws IOException {
try {
try {
lastModifiedTime(tenantId, uri);
} catch (FileNotFoundException exception) {
return false;
if (!path.endsWith("/") && path.lastIndexOf('/') > 0) {
path = path.substring(0, path.lastIndexOf('/') + 1);
}

DeleteObjectRequest request = DeleteObjectRequest.builder()
PutObjectRequest putRequest = PutObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(getPath(tenantId, uri))
.key(path)
.build();

return s3Client.deleteObject(request).sdkHttpResponse().isSuccessful();
} catch (AwsServiceException exception) {
s3Client.putObject(putRequest, RequestBody.empty());
} catch (AwsServiceException | SdkClientException exception) {
throw new IOException(exception);
} catch (SdkClientException exception) {
}
}

/**
 * Moves a file, or a directory with everything below it, from one storage URI to another.
 *
 * <p>Each object is copied to its new key and then deleted from its old key, so the move
 * is not atomic.
 *
 * @param tenantId tenant identifier, may be {@code null}
 * @param from     storage URI of the existing file or directory
 * @param to       storage URI of the destination
 * @return the {@code kestra://} URI of the destination
 * @throws FileNotFoundException when {@code from} does not exist or a directory has no objects
 * @throws IOException           when S3 rejects a request or the client fails to send it
 */
@Override
public URI move(String tenantId, URI from, URI to) throws IOException {
    String source = getPath(tenantId, from);
    String dest = getPath(tenantId, to);
    try {
        FileAttributes attributes = getAttributes(tenantId, from);
        if (attributes.getType() == FileAttributes.FileType.Directory) {
            // Anchor both prefixes on a trailing "/" so that moving "/a/dir" cannot also
            // capture sibling keys such as "/a/dir2/..." and so that keys are re-rooted
            // without doubled or missing separators.
            String sourcePrefix = source.endsWith("/") ? source : source + "/";
            String destPrefix = dest.endsWith("/") ? dest : dest + "/";

            ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
                .bucket(s3Config.getBucket())
                .prefix(sourcePrefix)
                .build();

            // Walk every result page and snapshot the keys before any object is copied
            // or deleted, so the listing is not affected by the move itself.
            List<String> keys = s3Client.listObjectsV2Paginator(listRequest).contents().stream()
                .map(S3Object::key)
                .toList();
            if (keys.isEmpty()) {
                throw new FileNotFoundException(from + " (Not Found)");
            }

            for (String key : keys) {
                move(key, destPrefix + key.substring(sourcePrefix.length()));
            }
        } else {
            move(source, dest);
        }

        return createUri(tenantId, to.getPath());
    } catch (AwsServiceException | SdkClientException exception) {
        throw new IOException(exception);
    }
}

/**
 * Moves a single object inside the configured bucket: the object is copied to
 * {@code newKey}, then the object at {@code oldKey} is deleted.
 *
 * @param oldKey current key of the object
 * @param newKey key the object should have afterwards
 */
private void move(String oldKey, String newKey) {
    // Copy first; the delete below is only reached when the copy did not throw.
    s3Client.copyObject(
        CopyObjectRequest.builder()
            .sourceBucket(s3Config.getBucket())
            .sourceKey(oldKey)
            .destinationBucket(s3Config.getBucket())
            .destinationKey(newKey)
            .build()
    );

    s3Client.deleteObject(
        DeleteObjectRequest.builder()
            .bucket(s3Config.getBucket())
            .key(oldKey)
            .build()
    );
}

@Override
public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException {
ListObjectsRequest listRequest = ListObjectsRequest.builder()
Expand Down Expand Up @@ -183,20 +322,30 @@ public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc

return result.deleted().stream()
.map(DeletedObject::key)
.map(key -> createUri(key.replace(tenantId + "/", "")))
.map(k -> (k.endsWith("/")) ? k.substring(0, k.length() - 1) : k)
.map(key -> createUri(tenantId, key))
.toList();
} catch (AwsServiceException exception) {
throw new IOException(exception);
}
}

/**
 * Converts a storage URI into the bucket key used for it.
 *
 * @param tenantId tenant identifier; when {@code null} the URI path is used as is
 * @param uri      storage URI, checked by {@code parentTraversalGuard} first
 * @return the URI path, prefixed with {@code "/" + tenantId} when a tenant is given
 */
private String getPath(String tenantId, URI uri) {
    parentTraversalGuard(uri);
    final String uriPath = uri.getPath();
    return tenantId == null ? uriPath : "/" + tenantId + uriPath;
}
private static URI createUri(String key) {
return URI.create("kestra://%s".formatted(key));

/**
 * Rejects URIs whose path contains a parent-directory ({@code ".."}) segment.
 *
 * <p>Traversal does not work with S3, it would just return empty objects, so throwing is
 * more explicit. The check runs on the decoded path split on {@code "/"}: percent-encoded
 * segments such as {@code %2e%2e} are caught, while names that merely contain two dots
 * (for example {@code file..txt}) are accepted.
 *
 * @param uri storage URI to validate
 * @throws IllegalArgumentException when a path segment is exactly {@code ".."}
 */
private void parentTraversalGuard(URI uri) {
    // getPath() is the decoded form that is later used as the key; fall back to the raw
    // string for URIs that have no path component.
    String decodedPath = uri.getPath();
    String candidate = decodedPath != null ? decodedPath : uri.toString();
    for (String segment : candidate.split("/")) {
        if ("..".equals(segment)) {
            throw new IllegalArgumentException("File should be accessed with their full path and not using relative '..' path.");
        }
    }
}

/**
 * Builds the {@code kestra://} URI exposed to callers for a bucket key or URI path.
 *
 * <p>Only a leading tenant segment ({@code "/<tenantId>/"} or {@code "<tenantId>/"}) is
 * stripped. Other occurrences of the tenant identifier inside the path are left untouched,
 * and nothing is stripped when {@code tenantId} is {@code null}.
 *
 * @param tenantId tenant identifier, may be {@code null}
 * @param key      bucket key or URI path, with or without the tenant prefix
 * @return the {@code kestra://} URI without the tenant segment
 */
private static URI createUri(String tenantId, String key) {
    String path = key;
    if (tenantId != null) {
        String rootedPrefix = "/" + tenantId + "/";
        String barePrefix = tenantId + "/";
        if (path.startsWith(rootedPrefix)) {
            // Keep the leading "/" so the result stays an absolute path.
            path = "/" + path.substring(rootedPrefix.length());
        } else if (path.startsWith(barePrefix)) {
            path = path.substring(barePrefix.length());
        }
    }
    return URI.create("kestra://%s".formatted(path));
}
}
Loading

0 comments on commit 77616bf

Please sign in to comment.