Skip to content

Commit

Permalink
fix(datafactory): rollback duration to v1 properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Sep 30, 2024
1 parent 8f3d9e7 commit 100db1e
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.property.Property;
Expand Down Expand Up @@ -79,8 +80,6 @@
public class CreateRun extends AbstractDataFactoryConnection implements RunnableTask<CreateRun.Output> {
private static final String PIPELINE_SUCCEEDED_STATUS = "Succeeded";
private static final List<String> PIPELINE_FAILED_STATUS = List.of("Failed", "Canceling", "Cancelled");
private static final Duration WAIT_UNTIL_COMPLETION = Duration.ofHours(1);
private static final Duration COMPLETION_CHECK_INTERVAL = Duration.ofSeconds(5);

@Schema(title = "Factory name")
private Property<String> factoryName;
Expand All @@ -104,6 +103,21 @@ public class CreateRun extends AbstractDataFactoryConnection implements Runnable
@Builder.Default
private Property<Boolean> wait = Property.of(Boolean.TRUE);

@Schema(
title = "Wait until completion duration",
description = "Maximum duration to wait for the pipeline to resolve. After this time the task will time out"
)
@PluginProperty
private Duration waitUntilCompletion = Duration.ofHours(1L);

@Schema(
title = "Completion check interval",
description = "The frequency with which the task checks whether the pipeline has completed."
)
@Builder.Default
@PluginProperty
private final Duration completionCheckInterval = Duration.ofSeconds(5L);

@Override
public CreateRun.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
Expand Down Expand Up @@ -154,7 +168,7 @@ public CreateRun.Output run(RunContext runContext) throws Exception {
}

return PIPELINE_SUCCEEDED_STATUS.equals(runStatus);
}, COMPLETION_CHECK_INTERVAL, WAIT_UNTIL_COMPLETION);
}, completionCheckInterval, waitUntilCompletion);
} catch (TimeoutException | RuntimeException e) {
logger.error("Pipeline '{}' with runId '{} finished with status '{}'", pipelineName, runId, runningPipelineResponse.get().status());
throw new RuntimeException(runningPipelineResponse.get().message());
Expand Down

0 comments on commit 100db1e

Please sign in to comment.