Expected Behavior
Messages that have an orderingKey attribute should be published.
Actual Behaviour
If the task has messages with orderingKey, the execution fails. Here is the error log:
2024-03-23 12:31:27.869 Cannot publish a message with an ordering key when message ordering is not enabled in the Publisher client. Please create a Publisher client with setEnableMessageOrdering(true) in the builder.
2024-03-23 12:31:27.869 java.lang.IllegalStateException: Cannot publish a message with an ordering key when message ordering is not enabled in the Publisher client. Please create a Publisher client with setEnableMessageOrdering(true) in the builder.
at com.google.common.base.Preconditions.checkState(Preconditions.java:512)
at com.google.cloud.pubsub.v1.Publisher.publish(Publisher.java:255)
at io.kestra.plugin.gcp.pubsub.Publish.lambda$buildFlowable$1(Publish.java:116)
at io.kestra.core.utils.Rethrow.lambda$throwFunction$4(Rethrow.java:90)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.MonoReduce$ReduceSubscriber.request(MonoReduce.java:222)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:53)
at reactor.core.publisher.MonoReduce$ReduceSubscriber.onSubscribe(MonoReduce.java:98)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
at reactor.core.publisher.Mono.block(Mono.java:1727)
at io.kestra.plugin.gcp.pubsub.Publish.run(Publish.java:97)
at io.kestra.plugin.gcp.pubsub.Publish.run(Publish.java:28)
at io.kestra.core.runners.Worker$WorkerThread.run(Worker.java:710)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)
at reactor.core.publisher.Mono.block(Mono.java:1728)
... 3 more
It looks like we should enable message ordering on the publisher (setEnableMessageOrdering(true) in the builder) when any of the messages contains an orderingKey, or simply enable it by default.
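A minimal sketch of the first option, assuming the task can inspect its messages before building the publisher. The `Message` record and the `needsOrdering` helper are hypothetical stand-ins, not the plugin's actual classes; only `setEnableMessageOrdering` comes from the error message above.

```java
import java.util.List;

public class OrderingCheck {
    // Hypothetical stand-in for the plugin's message model; only the relevant fields.
    record Message(String data, String orderingKey) {}

    // Returns true if at least one message carries a non-empty ordering key.
    static boolean needsOrdering(List<Message> messages) {
        return messages.stream()
            .anyMatch(m -> m.orderingKey() != null && !m.orderingKey().isEmpty());
    }

    public static void main(String[] args) {
        List<Message> messages = List.of(
            new Message("a", null),
            new Message("b", "customer-1"));

        boolean enableOrdering = needsOrdering(messages);

        // With the real Pub/Sub client, the flag would be passed to the builder, e.g.:
        //   Publisher.newBuilder(topicName)
        //       .setEnableMessageOrdering(enableOrdering)
        //       .build();
        System.out.println("enableMessageOrdering = " + enableOrdering);
    }
}
```

Enabling ordering unconditionally would avoid the pre-scan, at the cost of changing the publisher's behaviour for flows that never set an orderingKey.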
Steps To Reproduce
Take the example flow and set correct values (ensure orderingKey is set for at least one of the messages).
Execute the flow.
Environment Information
Kestra Version: 0.15.5
Plugin version: 0.15.5
Operating System (OS / Docker / Kubernetes): Docker