Skip to content

Commit

Permalink
fix(datafactory): rollback duration to v1 properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Oct 23, 2024
1 parent 5b8a385 commit 02ec3f3
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.property.Property;
Expand Down Expand Up @@ -84,8 +85,6 @@
public class CreateRun extends AbstractAzureIdentityConnection implements RunnableTask<CreateRun.Output> {
private static final String PIPELINE_SUCCEEDED_STATUS = "Succeeded";
private static final List<String> PIPELINE_FAILED_STATUS = List.of("Failed", "Canceling", "Cancelled");
private static final Duration WAIT_UNTIL_COMPLETION = Duration.ofHours(1);
private static final Duration COMPLETION_CHECK_INTERVAL = Duration.ofSeconds(5);

@Schema(title = "Subscription ID")
@NotNull
Expand Down Expand Up @@ -113,6 +112,22 @@ public class CreateRun extends AbstractAzureIdentityConnection implements Runnab
@Builder.Default
private Property<Boolean> wait = Property.of(Boolean.TRUE);

@Schema(
title = "Wait until completion duration",
description = "Maximum duration to wait for the pipeline to resolve. After this time the task will time out"
)
@PluginProperty
@Builder.Default
private Duration waitUntilCompletion = Duration.ofHours(1L);

@Schema(
title = "Completion check interval",
description = "The frequency with which the task checks whether the pipeline has completed."
)
@Builder.Default
@PluginProperty
private final Duration completionCheckInterval = Duration.ofSeconds(5L);

@Override
public CreateRun.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
Expand Down Expand Up @@ -163,7 +178,7 @@ public CreateRun.Output run(RunContext runContext) throws Exception {
}

return PIPELINE_SUCCEEDED_STATUS.equals(runStatus);
}, COMPLETION_CHECK_INTERVAL, WAIT_UNTIL_COMPLETION);
}, completionCheckInterval, waitUntilCompletion);
} catch (TimeoutException | RuntimeException e) {
logger.error("Pipeline '{}' with runId '{} finished with status '{}'", pipelineName, runId, runningPipelineResponse.get().status());
throw new RuntimeException(runningPipelineResponse.get().message());
Expand Down

0 comments on commit 02ec3f3

Please sign in to comment.