Skip to content

Commit

Permalink
refactor: dynamic properties (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Dec 9, 2024
1 parent fba83dd commit 098e9a8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -36,16 +37,14 @@ public abstract class AbstractFivetranConnection extends Task {
@Schema(
title = "API key"
)
@PluginProperty(dynamic = true)
@NotNull
String apiKey;
Property<String> apiKey;

@Schema(
title = "API secret"
)
@PluginProperty(dynamic = true)
@NotNull
String apiSecret;
Property<String> apiSecret;

private static final NettyHttpClientFactory FACTORY = new NettyHttpClientFactory();

Expand All @@ -63,7 +62,7 @@ protected <REQ, RES> HttpResponse<RES> request(RunContext runContext, MutableHtt
request = request
.contentType(MediaType.APPLICATION_JSON)
.accept("application/json;version=2")
.basicAuth(runContext.render(this.apiKey), runContext.render(this.apiSecret));
.basicAuth(runContext.render(this.apiKey).as(String.class).orElseThrow(), runContext.render(this.apiSecret).as(String.class).orElseThrow());

try (HttpClient client = this.client(runContext)) {
return client.toBlocking().exchange(request, argument);
Expand Down
23 changes: 10 additions & 13 deletions src/main/java/io/kestra/plugin/fivetran/connectors/Sync.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -59,33 +60,29 @@ public class Sync extends AbstractFivetranConnection implements RunnableTask<Voi
@Schema(
title = "The connector id to sync."
)
@PluginProperty(dynamic = true)
@NotNull
private String connectorId;
private Property<String> connectorId;

@Schema(
title = "Force with running sync.",
description = "If `force` is true and the connector is currently syncing, it will stop the sync and re-run it. " +
"If force is `false`, the connector will sync only if it isn't currently syncing."
)
@PluginProperty(dynamic = false)
@Builder.Default
Boolean force = false;
Property<Boolean> force = Property.of(false);

@Schema(
title = "Wait for the end of the job.",
description = "Allowing to capture job status & logs."
)
@PluginProperty(dynamic = false)
@Builder.Default
Boolean wait = true;
Property<Boolean> wait = Property.of(true);

@Schema(
title = "The max total wait duration."
)
@PluginProperty(dynamic = false)
@Builder.Default
Duration maxDuration = Duration.ofMinutes(60);
Property<Duration> maxDuration = Property.of(Duration.ofMinutes(60));

@Builder.Default
@Getter(AccessLevel.NONE)
Expand All @@ -94,7 +91,7 @@ public class Sync extends AbstractFivetranConnection implements RunnableTask<Voi
@Override
public VoidOutput run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
String connectorId = runContext.render(this.connectorId);
String connectorId = runContext.render(this.connectorId).as(String.class).orElseThrow();

// previous sync
Connector previousConnector = fetchConnector(runContext);
Expand All @@ -109,15 +106,15 @@ public VoidOutput run(RunContext runContext) throws Exception {
.of("/v2/connectors/{connectorId}/sync")
.expand(Map.of("connectorId", connectorId))
)
.body(Map.of("force", this.force)),
.body(Map.of("force", runContext.render(this.force).as(Boolean.class).orElseThrow())),
Argument.of(SyncResponse.class)
);

SyncResponse syncResponse = syncHttpResponse.getBody().orElseThrow(() -> new IllegalStateException("Missing body on trigger"));

logger.info("Job status {} with response: {}", syncHttpResponse.getStatus(), syncResponse);

if (!this.wait) {
if (!runContext.render(this.wait).as(Boolean.class).orElseThrow()) {
return null;
}

Expand All @@ -133,7 +130,7 @@ public VoidOutput run(RunContext runContext) throws Exception {
return null;
}),
Duration.ofSeconds(1),
this.maxDuration
runContext.render(this.maxDuration).as(Duration.class).orElseThrow()
);

if (finalConnector.getFailedAt() != null) {
Expand All @@ -151,7 +148,7 @@ private Connector fetchConnector(RunContext runContext) throws IllegalVariableEv
HttpMethod.GET,
UriTemplate
.of("/v2/connectors/{connectorId}")
.expand(Map.of("connectorId", runContext.render(this.connectorId)))
.expand(Map.of("connectorId", runContext.render(this.connectorId).as(String.class).orElseThrow()))
),
Argument.of(ConnectorResponse.class)
);
Expand Down

0 comments on commit 098e9a8

Please sign in to comment.