Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/dynamic properties #17

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,51 +1,43 @@
package io.kestra.plugin.transform.grok;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;

import java.util.List;
import java.util.Map;

public interface GrokInterface {

@PluginProperty
@Schema(title = "The Grok pattern to match.")
String getPattern();
Property<String> getPattern();

@PluginProperty
@Schema(title = "The list of Grok patterns to match.")
List<String> getPatterns();
Property<List<String>> getPatterns();

@PluginProperty
@Schema(
title = "List of user-defined pattern directories.",
description = "Directories must be paths relative to the working directory."
)
List<String> getPatternsDir();
Property<List<String>> getPatternsDir();

@PluginProperty
@Schema(
title = "Custom pattern definitions.",
description = "A map of pattern-name and pattern pairs defining custom patterns to be used by the current tasks. Patterns matching existing names will override the pre-existing definition. "
)
Map<String, String> getPatternDefinitions();
Property<Map<String, String>> getPatternDefinitions();

@PluginProperty
@Schema(title = "If `true`, only store named captures from grok.")
boolean isNamedCapturesOnly();
Property<Boolean> getNamedCapturesOnly();

@PluginProperty
@Schema(
title = "If `true`, break on first match.",
description = "The first successful match by grok will result in the task being finished. Set to `false` if you want the task to try all configured patterns."
)
boolean isBreakOnFirstMatch();
Property<Boolean> getBreakOnFirstMatch();

@PluginProperty
@Schema(
title = "If `true`, keep empty captures.",
description = "When an optional field cannot be captured, the empty field is retained in the output. Set `false` if you want empty optional fields to be filtered out."
)
boolean isKeepEmptyCaptures();
Property<Boolean> getKeepEmptyCaptures();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kestra.plugin.transform.grok;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.transform.grok.pattern.GrokMatcher;
Expand Down Expand Up @@ -30,59 +32,59 @@
@NoArgsConstructor
public abstract class Transform extends Task {

private String pattern;
private Property<String> pattern;

private List<String> patterns;
private Property<List<String>> patterns;

private List<String> patternsDir;
private Property<List<String>> patternsDir;

private Map<String, String> patternDefinitions;
private Property<Map<String, String>> patternDefinitions;

@Builder.Default
private boolean namedCapturesOnly = true;
private Property<Boolean> namedCapturesOnly = Property.of(true);

@Builder.Default
private boolean breakOnFirstMatch = true;
private Property<Boolean> breakOnFirstMatch = Property.of(true);

@Builder.Default
private boolean keepEmptyCaptures = false;
private Property<Boolean> keepEmptyCaptures = Property.of(false);

@Getter(AccessLevel.PRIVATE)
private GrokPatternCompiler compiler;

@Getter(AccessLevel.PRIVATE)
private List<GrokMatcher> grokMatchers;

public void init(final RunContext runContext) {
public void init(final RunContext runContext) throws IllegalVariableEvaluationException {

// create compiler
this.compiler = new GrokPatternCompiler(
new GrokPatternResolver(
runContext.logger(),
patternDefinitions(),
patternDefinitions(runContext),
patternsDir(runContext)
),
isNamedCapturesOnly()
runContext.render(getNamedCapturesOnly()).as(Boolean.class).orElseThrow()
);

// compile all patterns
this.grokMatchers = patterns().stream().map(compiler::compile).toList();
this.grokMatchers = patterns(runContext).stream().map(compiler::compile).toList();
}

public Map<String, Object> matches(final byte[] bytes) {
public Map<String, Object> matches(final byte[] bytes, RunContext runContext) throws IllegalVariableEvaluationException {
// match patterns
final List<Map<String, Object>> allNamedCaptured = new ArrayList<>(grokMatchers.size());
for (GrokMatcher matcher : grokMatchers) {
final Map<String, Object> captured = matcher.captures(bytes);
if (captured != null) {
allNamedCaptured.add(captured);
if (isBreakOnFirstMatch()) break;
if (runContext.render(getBreakOnFirstMatch()).as(Boolean.class).orElseThrow()) break;
}
}
// merge all named captured
Map<String, Object> mergedValues = new HashMap<>();
for (Map<String, Object> namedCaptured : allNamedCaptured) {
if (keepEmptyCaptures) {
if (runContext.render(getKeepEmptyCaptures()).as(Boolean.class).orElseThrow()) {
mergedValues.putAll(namedCaptured);
} else {
Map<String, Object> filtered = namedCaptured.entrySet()
Expand All @@ -98,27 +100,29 @@ public Map<String, Object> matches(final byte[] bytes) {
return mergedValues;
}

private Map<String, String> patternDefinitions() {
return Optional.ofNullable(patternDefinitions).orElse(Collections.emptyMap());
private Map<String, String> patternDefinitions(RunContext runContext) throws IllegalVariableEvaluationException {
return runContext.render(patternDefinitions).asMap(String.class, String.class);
}

private List<File> patternsDir(RunContext runContext) {
if (this.patternsDir == null || this.patternsDir.isEmpty()) return Collections.emptyList();
private List<File> patternsDir(RunContext runContext) throws IllegalVariableEvaluationException {
var renderedPatternsDir = runContext.render(this.patternsDir).asList(String.class);
if (renderedPatternsDir.isEmpty()) return Collections.emptyList();

return this.patternsDir
return renderedPatternsDir
.stream()
.map(dir -> runContext.workingDir().resolve(Path.of(dir)))
.map(Path::toFile)
.collect(Collectors.toList());
}

private List<String> patterns() {
if (pattern != null) return List.of(pattern);
private List<String> patterns(RunContext runContext) throws IllegalVariableEvaluationException {
if (pattern != null) return List.of(runContext.render(pattern).as(String.class).orElseThrow());

if (patterns == null || patterns.isEmpty()) {
var patternsList = runContext.render(patterns).asList(String.class);
if (patternsList.isEmpty()) {
throw new IllegalArgumentException(
"Missing required configuration, either `pattern` or `patterns` properties must not be empty.");
}
return patterns;
return patternsList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -42,7 +43,7 @@
description = """
The `TransformItems` task is similar to the famous Logstash Grok filter from the ELK stack.
It is particularly useful for transforming unstructured data such as logs into a structured, indexable, and queryable data structure.

The `TransformItems` task ships with all the default patterns, which you can find here: https://github.com/kestra-io/plugin-transform/tree/main/plugin-transform-grok/src/main/resources/patterns.
"""
)
Expand All @@ -60,7 +61,7 @@
type: io.kestra.plugin.transform.grok.TransformItems
pattern: "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
from: "{{ trigger.uri }}"

triggers:
- id: trigger
type: io.kestra.plugin.kafka.Trigger
Expand All @@ -86,9 +87,8 @@ public class TransformItems extends Transform implements GrokInterface, Runnable
title = "The file to be transformed.",
description = "Must be a `kestra://` internal storage URI."
)
@PluginProperty(dynamic = true)
@NotNull
private String from;
private Property<String> from;

/**
* {@inheritDoc}
Expand All @@ -97,7 +97,7 @@ public class TransformItems extends Transform implements GrokInterface, Runnable
public Output run(RunContext runContext) throws Exception {
init(runContext);

String from = runContext.render(this.from);
String from = runContext.render(this.from).as(String.class).orElseThrow();

URI objectURI = new URI(from);
try (Reader reader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(objectURI)), FileSerde.BUFFER_SIZE)) {
Expand All @@ -107,7 +107,13 @@ public Output run(RunContext runContext) throws Exception {
try(Writer writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(ouputFilePath)))) {

// transform
Flux<Map<String, Object>> values = flux.map(data -> matches(data.getBytes(StandardCharsets.UTF_8)));
Flux<Map<String, Object>> values = flux.map(data -> {
try {
return matches(data.getBytes(StandardCharsets.UTF_8), runContext);
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException(e);
}
});
Long processedItemsTotal = FileSerde.writeAll(writer, values).block();
URI uri = runContext.storage().putFile(ouputFilePath.toFile());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
Expand All @@ -30,7 +30,7 @@
description = """
The `TransformValue` task is similar to the famous Logstash Grok filter from the ELK stack.
It is particularly useful for transforming unstructured data such as logs into a structured, indexable, and queryable data structure.

The `TransformValue` task ships with all the default patterns, which you can find here: https://github.com/kestra-io/plugin-transform/tree/main/plugin-transform-grok/src/main/resources/patterns.
"""
)
Expand All @@ -48,15 +48,15 @@
type: io.kestra.plugin.transform.grok.TransformValue
pattern: "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
from: "{{ trigger.value }}"

- id: log_on_warn
type: io.kestra.plugin.core.flow.If
condition: "{{ outputs.transform_value.value['loglevel'] == 'ERROR' }}"
then:
- id: when_true
type: io.kestra.plugin.core.log.Log
message: "{{ outputs.transform_value.value }}"

triggers:
- id: realtime_trigger
type: io.kestra.plugin.kafka.RealtimeTrigger
Expand All @@ -67,7 +67,7 @@
schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: STRING
groupId: kafkaConsumerGroupId
groupId: kafkaConsumerGroupId
"""
)
}
Expand All @@ -77,9 +77,8 @@ public class TransformValue extends Transform implements GrokInterface, Runnable
private static final ObjectMapper ION_OBJECT_MAPPER = JacksonMapper.ofIon();

@Schema(title = "The value to parse.")
@PluginProperty(dynamic = true)
@NotNull
private String from;
private Property<String> from;

/**
* {@inheritDoc}
Expand All @@ -88,10 +87,10 @@ public class TransformValue extends Transform implements GrokInterface, Runnable
public Output run(RunContext runContext) throws Exception {
init(runContext);

String from = runContext.render(this.from);
String from = runContext.render(this.from).as(String.class).orElseThrow();

// transform
Map<String, Object> values = matches(from.getBytes(StandardCharsets.UTF_8));
Map<String, Object> values = matches(from.getBytes(StandardCharsets.UTF_8), runContext);

// output
return Output.builder().value(values).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -40,11 +41,11 @@ public void shouldTransformGivenMultipleItemsAndMultiplePatterns() throws Except
URI uri = runContext.storage().putFile(ouputFilePath.toFile());

TransformItems task = TransformItems.builder()
.patterns(List.of("%{INT}", "%{EMAILADDRESS}"))
.namedCapturesOnly(false)
.from(uri.toString())
.patternsDir(List.of("./custom-patterns"))
.breakOnFirstMatch(false)
.patterns(Property.of(List.of("%{INT}", "%{EMAILADDRESS}")))
.namedCapturesOnly(Property.of(false))
.from(Property.of(uri.toString()))
.patternsDir(Property.of(List.of("./custom-patterns")))
.breakOnFirstMatch(Property.of(false))
.build();

// When
Expand Down
Loading
Loading