Skip to content

Commit

Permalink
refactor/dynamic properties (#17)
Browse files Browse the repository at this point in the history
* refactor: migrate transform grok to dynamic properties

* refactor: migrate transform json to dynamic properties
  • Loading branch information
mgabelle authored Dec 11, 2024
1 parent aa723dd commit 406dd8b
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -1,51 +1,43 @@
package io.kestra.plugin.transform.grok;

import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;

import java.util.List;
import java.util.Map;

/**
 * Common contract shared by the Grok transform tasks.
 *
 * <p>All configuration is exposed as Kestra dynamic properties ({@link Property}),
 * so every value may contain Pebble expressions that are rendered against the
 * {@code RunContext} at execution time rather than being bound statically.
 * Implementations must provide either a single {@code pattern} or a list of
 * {@code patterns}; the remaining properties tune how captures are collected.
 */
public interface GrokInterface {

    // Single-pattern form; mutually exclusive with getPatterns() — one of the two must be set.
    @Schema(title = "The Grok pattern to match.")
    Property<String> getPattern();

    // Multi-pattern form; each entry is compiled and tried in order.
    @Schema(title = "The list of Grok patterns to match.")
    Property<List<String>> getPatterns();

    @Schema(
        title = "List of user-defined pattern directories.",
        description = "Directories must be paths relative to the working directory."
    )
    Property<List<String>> getPatternsDir();

    @Schema(
        title = "Custom pattern definitions.",
        description = "A map of pattern-name and pattern pairs defining custom patterns to be used by the current tasks. Patterns matching existing names will override the pre-existing definition. "
    )
    Property<Map<String, String>> getPatternDefinitions();

    @Schema(title = "If `true`, only store named captures from grok.")
    Property<Boolean> getNamedCapturesOnly();

    @Schema(
        title = "If `true`, break on first match.",
        description = "The first successful match by grok will result in the task being finished. Set to `false` if you want the task to try all configured patterns."
    )
    Property<Boolean> getBreakOnFirstMatch();

    @Schema(
        title = "If `true`, keep empty captures.",
        description = "When an optional field cannot be captured, the empty field is retained in the output. Set `false` if you want empty optional fields to be filtered out."
    )
    Property<Boolean> getKeepEmptyCaptures();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kestra.plugin.transform.grok;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.transform.grok.pattern.GrokMatcher;
Expand Down Expand Up @@ -30,59 +32,59 @@
@NoArgsConstructor
public abstract class Transform extends Task {

private String pattern;
private Property<String> pattern;

private List<String> patterns;
private Property<List<String>> patterns;

private List<String> patternsDir;
private Property<List<String>> patternsDir;

private Map<String, String> patternDefinitions;
private Property<Map<String, String>> patternDefinitions;

@Builder.Default
private boolean namedCapturesOnly = true;
private Property<Boolean> namedCapturesOnly = Property.of(true);

@Builder.Default
private boolean breakOnFirstMatch = true;
private Property<Boolean> breakOnFirstMatch = Property.of(true);

@Builder.Default
private boolean keepEmptyCaptures = false;
private Property<Boolean> keepEmptyCaptures = Property.of(false);

@Getter(AccessLevel.PRIVATE)
private GrokPatternCompiler compiler;

@Getter(AccessLevel.PRIVATE)
private List<GrokMatcher> grokMatchers;

public void init(final RunContext runContext) {
public void init(final RunContext runContext) throws IllegalVariableEvaluationException {

// create compiler
this.compiler = new GrokPatternCompiler(
new GrokPatternResolver(
runContext.logger(),
patternDefinitions(),
patternDefinitions(runContext),
patternsDir(runContext)
),
isNamedCapturesOnly()
runContext.render(getNamedCapturesOnly()).as(Boolean.class).orElseThrow()
);

// compile all patterns
this.grokMatchers = patterns().stream().map(compiler::compile).toList();
this.grokMatchers = patterns(runContext).stream().map(compiler::compile).toList();
}

public Map<String, Object> matches(final byte[] bytes) {
public Map<String, Object> matches(final byte[] bytes, RunContext runContext) throws IllegalVariableEvaluationException {
// match patterns
final List<Map<String, Object>> allNamedCaptured = new ArrayList<>(grokMatchers.size());
for (GrokMatcher matcher : grokMatchers) {
final Map<String, Object> captured = matcher.captures(bytes);
if (captured != null) {
allNamedCaptured.add(captured);
if (isBreakOnFirstMatch()) break;
if (runContext.render(getBreakOnFirstMatch()).as(Boolean.class).orElseThrow()) break;
}
}
// merge all named captured
Map<String, Object> mergedValues = new HashMap<>();
for (Map<String, Object> namedCaptured : allNamedCaptured) {
if (keepEmptyCaptures) {
if (runContext.render(getKeepEmptyCaptures()).as(Boolean.class).orElseThrow()) {
mergedValues.putAll(namedCaptured);
} else {
Map<String, Object> filtered = namedCaptured.entrySet()
Expand All @@ -98,27 +100,29 @@ public Map<String, Object> matches(final byte[] bytes) {
return mergedValues;
}

private Map<String, String> patternDefinitions() {
return Optional.ofNullable(patternDefinitions).orElse(Collections.emptyMap());
private Map<String, String> patternDefinitions(RunContext runContext) throws IllegalVariableEvaluationException {
return runContext.render(patternDefinitions).asMap(String.class, String.class);
}

private List<File> patternsDir(RunContext runContext) {
if (this.patternsDir == null || this.patternsDir.isEmpty()) return Collections.emptyList();
private List<File> patternsDir(RunContext runContext) throws IllegalVariableEvaluationException {
var renderedPatternsDir = runContext.render(this.patternsDir).asList(String.class);
if (renderedPatternsDir.isEmpty()) return Collections.emptyList();

return this.patternsDir
return renderedPatternsDir
.stream()
.map(dir -> runContext.workingDir().resolve(Path.of(dir)))
.map(Path::toFile)
.collect(Collectors.toList());
}

private List<String> patterns() {
if (pattern != null) return List.of(pattern);
private List<String> patterns(RunContext runContext) throws IllegalVariableEvaluationException {
if (pattern != null) return List.of(runContext.render(pattern).as(String.class).orElseThrow());

if (patterns == null || patterns.isEmpty()) {
var patternsList = runContext.render(patterns).asList(String.class);
if (patternsList.isEmpty()) {
throw new IllegalArgumentException(
"Missing required configuration, either `pattern` or `patterns` properties must not be empty.");
}
return patterns;
return patternsList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -42,7 +43,7 @@
description = """
The `TransformItems` task is similar to the famous Logstash Grok filter from the ELK stack.
It is particularly useful for transforming unstructured data such as logs into a structured, indexable, and queryable data structure.
        The `TransformItems` task ships with all the default patterns. You can find them here: https://github.com/kestra-io/plugin-transform/tree/main/plugin-transform-grok/src/main/resources/patterns.
"""
)
Expand All @@ -60,7 +61,7 @@
type: io.kestra.plugin.transform.grok.TransformItems
pattern: "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
from: "{{ trigger.uri }}"
triggers:
- id: trigger
type: io.kestra.plugin.kafka.Trigger
Expand All @@ -86,9 +87,8 @@ public class TransformItems extends Transform implements GrokInterface, Runnable
title = "The file to be transformed.",
description = "Must be a `kestra://` internal storage URI."
)
@PluginProperty(dynamic = true)
@NotNull
private String from;
private Property<String> from;

/**
* {@inheritDoc}
Expand All @@ -97,7 +97,7 @@ public class TransformItems extends Transform implements GrokInterface, Runnable
public Output run(RunContext runContext) throws Exception {
init(runContext);

String from = runContext.render(this.from);
String from = runContext.render(this.from).as(String.class).orElseThrow();

URI objectURI = new URI(from);
try (Reader reader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(objectURI)), FileSerde.BUFFER_SIZE)) {
Expand All @@ -107,7 +107,13 @@ public Output run(RunContext runContext) throws Exception {
try(Writer writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(ouputFilePath)))) {

// transform
Flux<Map<String, Object>> values = flux.map(data -> matches(data.getBytes(StandardCharsets.UTF_8)));
Flux<Map<String, Object>> values = flux.map(data -> {
try {
return matches(data.getBytes(StandardCharsets.UTF_8), runContext);
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException(e);
}
});
Long processedItemsTotal = FileSerde.writeAll(writer, values).block();
URI uri = runContext.storage().putFile(ouputFilePath.toFile());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
Expand All @@ -30,7 +30,7 @@
description = """
The `TransformValue` task is similar to the famous Logstash Grok filter from the ELK stack.
It is particularly useful for transforming unstructured data such as logs into a structured, indexable, and queryable data structure.
        The `TransformValue` task ships with all the default patterns. You can find them here: https://github.com/kestra-io/plugin-transform/tree/main/plugin-transform-grok/src/main/resources/patterns.
"""
)
Expand All @@ -48,15 +48,15 @@
type: io.kestra.plugin.transform.grok.TransformValue
pattern: "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
from: "{{ trigger.value }}"
- id: log_on_warn
type: io.kestra.plugin.core.flow.If
condition: "{{ grok.value['LOGLEVEL'] == 'ERROR' }}"
then:
- id: when_true
type: io.kestra.plugin.core.log.Log
message: "{{ outputs.transform_value.value }}"
triggers:
- id: realtime_trigger
type: io.kestra.plugin.kafka.RealtimeTrigger
Expand All @@ -67,7 +67,7 @@
schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: STRING
groupId: kafkaConsumerGroupId
groupId: kafkaConsumerGroupId
"""
)
}
Expand All @@ -77,9 +77,8 @@ public class TransformValue extends Transform implements GrokInterface, Runnable
private static final ObjectMapper ION_OBJECT_MAPPER = JacksonMapper.ofIon();

@Schema(title = "The value to parse.")
@PluginProperty(dynamic = true)
@NotNull
private String from;
private Property<String> from;

/**
* {@inheritDoc}
Expand All @@ -88,10 +87,10 @@ public class TransformValue extends Transform implements GrokInterface, Runnable
public Output run(RunContext runContext) throws Exception {
init(runContext);

String from = runContext.render(this.from);
String from = runContext.render(this.from).as(String.class).orElseThrow();

// transform
Map<String, Object> values = matches(from.getBytes(StandardCharsets.UTF_8));
Map<String, Object> values = matches(from.getBytes(StandardCharsets.UTF_8), runContext);

// output
return Output.builder().value(values).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -40,11 +41,11 @@ public void shouldTransformGivenMultipleItemsAndMultiplePatterns() throws Except
URI uri = runContext.storage().putFile(ouputFilePath.toFile());

TransformItems task = TransformItems.builder()
.patterns(List.of("%{INT}", "%{EMAILADDRESS}"))
.namedCapturesOnly(false)
.from(uri.toString())
.patternsDir(List.of("./custom-patterns"))
.breakOnFirstMatch(false)
.patterns(Property.of(List.of("%{INT}", "%{EMAILADDRESS}")))
.namedCapturesOnly(Property.of(false))
.from(Property.of(uri.toString()))
.patternsDir(Property.of(List.of("./custom-patterns")))
.breakOnFirstMatch(Property.of(false))
.build();

// When
Expand Down
Loading

0 comments on commit 406dd8b

Please sign in to comment.