Skip to content

Commit

Permalink
chore: update to use new storage interface with tenant (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin authored Oct 12, 2023
1 parent c00a2b1 commit 9f9ad6a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 34 deletions.
31 changes: 17 additions & 14 deletions src/main/java/io/kestra/storage/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ public String createBucket(String bucketName) throws IOException {
}

@Override
public InputStream get(URI uri) throws IOException {
public InputStream get(String tenantId, URI uri) throws IOException {
try {
GetObjectRequest request = GetObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();
return s3Client.getObject(request);
} catch (NoSuchKeyException exception) {
Expand All @@ -70,11 +70,11 @@ public InputStream get(URI uri) throws IOException {
}

@Override
public Long size(URI uri) throws IOException {
public Long size(String tenantId, URI uri) throws IOException {
try {
HeadObjectRequest headObjectRequest = HeadObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();
return s3Client.headObject(headObjectRequest).contentLength();
} catch (NoSuchKeyException exception) {
Expand All @@ -85,11 +85,11 @@ public Long size(URI uri) throws IOException {
}

@Override
public Long lastModifiedTime(URI uri) throws IOException {
public Long lastModifiedTime(String tenantId, URI uri) throws IOException {
try {
HeadObjectRequest headObjectRequest = HeadObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();
return s3Client.headObject(headObjectRequest).lastModified().getEpochSecond();
} catch (NoSuchKeyException exception) {
Expand All @@ -100,13 +100,13 @@ public Long lastModifiedTime(URI uri) throws IOException {
}

@Override
public URI put(URI uri, InputStream data) throws IOException {
public URI put(String tenantId, URI uri, InputStream data) throws IOException {
try {
int length = data.available();

PutObjectRequest request = PutObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();

Optional<Upload> upload;
Expand All @@ -132,17 +132,17 @@ public URI put(URI uri, InputStream data) throws IOException {
}

@Override
public boolean delete(URI uri) throws IOException {
public boolean delete(String tenantId, URI uri) throws IOException {
try {
try {
lastModifiedTime(uri);
lastModifiedTime(tenantId, uri);
} catch (FileNotFoundException exception) {
return false;
}

DeleteObjectRequest request = DeleteObjectRequest.builder()
.bucket(s3Config.getBucket())
.key(uri.getPath())
.key(getPath(tenantId, uri))
.build();

return s3Client.deleteObject(request).sdkHttpResponse().isSuccessful();
Expand All @@ -154,10 +154,10 @@ public boolean delete(URI uri) throws IOException {
}

@Override
public List<URI> deleteByPrefix(URI storagePrefix) throws IOException {
public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException {
ListObjectsRequest listRequest = ListObjectsRequest.builder()
.bucket(s3Config.getBucket())
.prefix(storagePrefix.getPath())
.prefix(getPath(tenantId, storagePrefix))
.build();
ListObjectsResponse objectListing = s3Client.listObjects(listRequest);

Expand All @@ -183,13 +183,16 @@ public List<URI> deleteByPrefix(URI storagePrefix) throws IOException {

return result.deleted().stream()
.map(DeletedObject::key)
.map(S3Storage::createUri)
.map(key -> createUri(key.replace(tenantId + "/", "")))
.toList();
} catch (AwsServiceException exception) {
throw new IOException(exception);
}
}

private String getPath(String tenantId, URI uri) {
return "/" + tenantId + uri.getPath();
}
private static URI createUri(String key) {
return URI.create("kestra://%s".formatted(key));
}
Expand Down
47 changes: 27 additions & 20 deletions src/test/java/io/kestra/storage/s3/S3StorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ static void stopContainers() {
}
}

private URI putFile(URL resource, String path) throws Exception {
private URI putFile(String tenantId, URL resource, String path) throws Exception {

return storageInterface.put(
tenantId,
new URI(path),
new FileInputStream(Objects.requireNonNull(resource).getFile())
);
Expand All @@ -80,66 +82,70 @@ private URI putFile(URL resource, String path) throws Exception {
@Test
void get() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

URL resource = S3StorageTest.class.getClassLoader().getResource("application.yml");
String content = CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile())));

this.putFile(resource, "/" + prefix + "/storage/get.yml");
this.putFile(tenantId, resource, "/" + prefix + "/storage/get.yml");

URI item = new URI("/" + prefix + "/storage/get.yml");
InputStream get = storageInterface.get(item);
InputStream get = storageInterface.get(tenantId, item);
assertThat(CharStreams.toString(new InputStreamReader(get)), is(content));
assertTrue(storageInterface.exists(item));
assertThat(storageInterface.size(item), is((long) content.length()));
assertThat(storageInterface.lastModifiedTime(item), notNullValue());
assertTrue(storageInterface.exists(tenantId, item));
assertThat(storageInterface.size(tenantId, item), is((long) content.length()));
assertThat(storageInterface.lastModifiedTime(tenantId, item), notNullValue());

InputStream getScheme = storageInterface.get(new URI("kestra:///" + prefix + "/storage/get.yml"));
InputStream getScheme = storageInterface.get(tenantId, new URI("kestra:///" + prefix + "/storage/get.yml"));
assertThat(CharStreams.toString(new InputStreamReader(getScheme)), is(content));
}

@Test
void missing() {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(new URI("/" + prefix + "/storage/missing.yml"));
storageInterface.get(tenantId, new URI("/" + prefix + "/storage/missing.yml"));
});
}

@Test
void put() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

URL resource = S3StorageTest.class.getClassLoader().getResource("application.yml");
URI put = this.putFile(resource, "/" + prefix + "/storage/put.yml");
InputStream get = storageInterface.get(new URI("/" + prefix + "/storage/put.yml"));
URI put = this.putFile(tenantId, resource, "/" + prefix + "/storage/put.yml");
InputStream get = storageInterface.get(tenantId, new URI("/" + prefix + "/storage/put.yml"));

assertThat(put.toString(), is(new URI("kestra:///" + prefix + "/storage/put.yml").toString()));
assertThat(
CharStreams.toString(new InputStreamReader(get)),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile()))))
);

assertThat(storageInterface.size(new URI("/" + prefix + "/storage/put.yml")), is(74L));
assertThat(storageInterface.size(tenantId, new URI("/" + prefix + "/storage/put.yml")), is(74L));

assertThrows(FileNotFoundException.class, () -> {
assertThat(storageInterface.size(new URI("/" + prefix + "/storage/muissing.yml")), is(74L));
assertThat(storageInterface.size(tenantId, new URI("/" + prefix + "/storage/muissing.yml")), is(74L));
});

boolean delete = storageInterface.delete(put);
boolean delete = storageInterface.delete(tenantId, put);
assertThat(delete, is(true));

delete = storageInterface.delete(put);
delete = storageInterface.delete(tenantId, put);
assertThat(delete, is(false));

assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(new URI("/" + prefix + "/storage/put.yml"));
storageInterface.get(tenantId, new URI("/" + prefix + "/storage/put.yml"));
});
}

@Test
void deleteByPrefix() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

URL resource = S3StorageTest.class.getClassLoader().getResource("application.yml");

Expand All @@ -149,29 +155,30 @@ void deleteByPrefix() throws Exception {
"/" + prefix + "/storage/level1/level2/1.yml"
);

path.forEach(throwConsumer(s -> this.putFile(resource, s)));
path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s)));

List<URI> deleted = storageInterface.deleteByPrefix(new URI("/" + prefix + "/storage/"));
List<URI> deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/"));

assertThat(deleted, containsInAnyOrder(path.stream().map(s -> URI.create("kestra://" + s)).toArray()));

assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(new URI("/" + prefix + "/storage/"));
storageInterface.get(tenantId, new URI("/" + prefix + "/storage/"));
});

path
.forEach(s -> {
assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(new URI(s));
storageInterface.get(tenantId, new URI(s));
});
});
}

@Test
void deleteByPrefixNoResult() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

List<URI> deleted = storageInterface.deleteByPrefix(new URI("/" + prefix + "/storage/"));
List<URI> deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/"));
assertThat(deleted.size(), is(0));
}
}

0 comments on commit 9f9ad6a

Please sign in to comment.