From 52eb6c51df2d28ffb517eecff5dbe559c3e2caed Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 20 Nov 2024 09:28:00 -0600 Subject: [PATCH 1/5] Added ReceiverWithRules for later --- .../Transports/ReceiverWithRulesTests.cs | 61 +++++++++++++++++++ src/Wolverine/Transports/IReceiver.cs | 45 ++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 src/Testing/CoreTests/Transports/ReceiverWithRulesTests.cs diff --git a/src/Testing/CoreTests/Transports/ReceiverWithRulesTests.cs b/src/Testing/CoreTests/Transports/ReceiverWithRulesTests.cs new file mode 100644 index 000000000..15a0c7720 --- /dev/null +++ b/src/Testing/CoreTests/Transports/ReceiverWithRulesTests.cs @@ -0,0 +1,61 @@ +using NSubstitute; +using Wolverine.ComplianceTests; +using Wolverine.Transports; +using Xunit; + +namespace CoreTests.Transports; + +public class ReceiverWithRulesTests +{ + private readonly IReceiver theInner = Substitute.For(); + private readonly IEnvelopeRule rule1 = Substitute.For(); + private readonly IEnvelopeRule rule2 = Substitute.For(); + private readonly ReceiverWithRules theReceiver; + private readonly IListener theListener = Substitute.For(); + + public ReceiverWithRulesTests() + { + theReceiver = new ReceiverWithRules(theInner, [rule1, rule2]); + } + + [Fact] + public void dispose_delegates() + { + theReceiver.Dispose(); + theInner.Received().Dispose(); + } + + [Fact] + public async Task receive_a_single_envelope() + { + var envelope = ObjectMother.Envelope(); + + await theReceiver.ReceivedAsync(theListener, envelope); + rule1.Received().Modify(envelope); + rule2.Received().Modify(envelope); + + await theInner.Received().ReceivedAsync(theListener, envelope); + } + + [Fact] + public async Task receive_multiple_envelopes() + { + var envelope1 = ObjectMother.Envelope(); + var envelope2 = ObjectMother.Envelope(); + var envelope3 = ObjectMother.Envelope(); + + var envelopes = new Envelope[] { envelope1, envelope2, envelope3 }; + + await theReceiver.ReceivedAsync(theListener, envelopes); + rule1.Received().Modify(envelope1); + rule2.Received().Modify(envelope1); + + rule1.Received().Modify(envelope2); + rule2.Received().Modify(envelope2); + + rule1.Received().Modify(envelope3); + rule2.Received().Modify(envelope3); + + await theInner.Received().ReceivedAsync(theListener, envelopes); + } +} \ No newline at end of file diff --git a/src/Wolverine/Transports/IReceiver.cs b/src/Wolverine/Transports/IReceiver.cs index c27d18124..11ac9b7fb 100644 --- a/src/Wolverine/Transports/IReceiver.cs +++ b/src/Wolverine/Transports/IReceiver.cs @@ -6,4 +6,49 @@ public interface IReceiver : IDisposable ValueTask ReceivedAsync(IListener listener, Envelope envelope); ValueTask DrainAsync(); +} + +internal class ReceiverWithRules : IReceiver +{ + private readonly IReceiver _inner; + private readonly IEnvelopeRule[] _rules; + + public ReceiverWithRules(IReceiver inner, IEnumerable rules) + { + _inner = inner; + _rules = rules.ToArray(); + } + + public void Dispose() + { + _inner.Dispose(); + } + + public ValueTask ReceivedAsync(IListener listener, Envelope[] messages) + { + foreach (var envelope in messages) + { + foreach (var rule in _rules) + { + rule.Modify(envelope); + } + } + + return _inner.ReceivedAsync(listener, messages); + } + + public ValueTask ReceivedAsync(IListener listener, Envelope envelope) + { + foreach (var rule in _rules) + { + rule.Modify(envelope); + } + + return _inner.ReceivedAsync(listener, envelope); + } + + public async ValueTask DrainAsync() + { + throw new NotImplementedException(); + } } \ No newline at end of file From 4440092af0c8107997ac953a4f8d4a753942e7e4 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 20 Nov 2024 09:47:16 -0600 Subject: [PATCH 2/5] Adding listening rules to Endpoint, special rules for tenant id and message type name --- .../CoreTests/Configuration/EndpointTests.cs | 41 +++++++++++++++++ src/Wolverine/Configuration/Endpoint.cs | 26 +++++++++++ src/Wolverine/IEnvelopeRule.cs | 44 +++++++++++++++++++ 3 files changed, 111 insertions(+) diff --git a/src/Testing/CoreTests/Configuration/EndpointTests.cs b/src/Testing/CoreTests/Configuration/EndpointTests.cs index d0a12f24e..2264672cf 100644 --- a/src/Testing/CoreTests/Configuration/EndpointTests.cs +++ b/src/Testing/CoreTests/Configuration/EndpointTests.cs @@ -1,7 +1,10 @@ +using Wolverine.ComplianceTests; +using Wolverine.ComplianceTests.Compliance; using Wolverine.Configuration; using Wolverine.Runtime; using Wolverine.Transports; using Wolverine.Transports.Sending; +using Wolverine.Util; using Xunit; namespace CoreTests.Configuration; @@ -38,6 +41,44 @@ public void should_auto_start_as_listener(bool isListener, ListenerScope scope, endpoint.ShouldAutoStartAsListener(settings).ShouldBe(shouldStart); } + + [Fact] + public void return_message_type_rule() + { + var endpoint = new TestEndpoint(EndpointRole.System); + endpoint.RulesForIncoming().Any().ShouldBeFalse(); + + endpoint.MessageType = typeof(Message1); + + var rules = endpoint.RulesForIncoming().ToArray(); + + var envelope = ObjectMother.Envelope(); + foreach (var rule in rules) + { + rule.Modify(envelope); + } + + envelope.MessageType.ShouldBe(typeof(Message1).ToMessageTypeName()); + } + + [Fact] + public void return_tenant_id_rule() + { + var endpoint = new TestEndpoint(EndpointRole.System); + endpoint.RulesForIncoming().Any().ShouldBeFalse(); + + endpoint.TenantId = "one"; + + var rules = endpoint.RulesForIncoming().ToArray(); + + var envelope = ObjectMother.Envelope(); + foreach (var rule in rules) + { + rule.Modify(envelope); + } + + envelope.TenantId.ShouldBe("one"); + } } public class TestEndpoint : Endpoint diff --git a/src/Wolverine/Configuration/Endpoint.cs b/src/Wolverine/Configuration/Endpoint.cs index 8fb27b4b4..32daa7d33 100644 --- a/src/Wolverine/Configuration/Endpoint.cs +++ b/src/Wolverine/Configuration/Endpoint.cs @@ -216,6 +216,32 @@ public string EndpointName public bool IsUsedForReplies { get; set; } public IList OutgoingRules { get; } = new List(); + public IList IncomingRules { get; } = new List(); + + /// + /// In some cases, you may want to tell Wolverine that any message + /// coming into this endpoint are automatically tagged to a certain + /// tenant id + /// + public virtual string? TenantId { get; set; } + + internal IEnumerable RulesForIncoming() + { + foreach (var rule in IncomingRules) + { + yield return rule; + } + + if (MessageType != null) + { + yield return new MessageTypeRule(MessageType); + } + + if (TenantId.IsNotEmpty()) + { + yield return new TenantIdRule(TenantId); + } + } internal ISendingAgent? Agent { get; set; } diff --git a/src/Wolverine/IEnvelopeRule.cs b/src/Wolverine/IEnvelopeRule.cs index cb09f4de3..3782db33f 100644 --- a/src/Wolverine/IEnvelopeRule.cs +++ b/src/Wolverine/IEnvelopeRule.cs @@ -1,3 +1,5 @@ +using Wolverine.Util; + namespace Wolverine; /// @@ -9,6 +11,48 @@ public interface IEnvelopeRule void Modify(Envelope envelope); } +internal class MessageTypeRule : IEnvelopeRule +{ + private readonly Type _messageType; + private readonly string _messageTypeName; + + public MessageTypeRule(Type messageType) + { + _messageType = messageType; + _messageTypeName = messageType.ToMessageTypeName(); + } + + public void Modify(Envelope envelope) + { + envelope.MessageType = _messageTypeName; + } + + public override string ToString() + { + return $"{nameof(_messageType)} is {_messageType}, with MessageTypeName: {_messageTypeName}"; + } +} + +internal class TenantIdRule : IEnvelopeRule +{ + public string TenantId { get; } + + public TenantIdRule(string tenantId) + { + TenantId = tenantId; + } + + public void Modify(Envelope envelope) + { + envelope.TenantId = TenantId; + } + + public override string ToString() + { + return $"{nameof(TenantId)}: {TenantId}"; + } +} + internal class DeliverWithinRule : IEnvelopeRule { public DeliverWithinRule(TimeSpan time) From 7a7255b4620ebb9bcc789be5a7a11d2092d43214 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 20 Nov 2024 13:13:13 -0600 Subject: [PATCH 3/5] Little more logic on Endpoint to maybe wrap a receiver --- .../CoreTests/Configuration/EndpointTests.cs | 29 +++++++++++++++++++ src/Wolverine/Configuration/Endpoint.cs | 6 ++++ src/Wolverine/Transports/IReceiver.cs | 21 +++++++------- 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/src/Testing/CoreTests/Configuration/EndpointTests.cs b/src/Testing/CoreTests/Configuration/EndpointTests.cs index 2264672cf..9845d5529 100644 --- a/src/Testing/CoreTests/Configuration/EndpointTests.cs +++ b/src/Testing/CoreTests/Configuration/EndpointTests.cs @@ -1,3 +1,4 @@ +using NSubstitute; using Wolverine.ComplianceTests; using Wolverine.ComplianceTests.Compliance; using Wolverine.Configuration; @@ -79,6 +80,34 @@ public void return_tenant_id_rule() envelope.TenantId.ShouldBe("one"); } + + [Fact] + public void maybe_wrap_receiver_no_rules() + { + var endpoint = new TestEndpoint(EndpointRole.System); + endpoint.RulesForIncoming().Any().ShouldBeFalse(); + + var inner = Substitute.For(); + + endpoint.MaybeWrapReceiver(inner) + .ShouldBeSameAs(inner); + } + + [Fact] + public void maybe_wrap_receiver_with_rules() + { + var endpoint = new TestEndpoint(EndpointRole.System); + endpoint.TenantId = "one"; + + var inner = Substitute.For(); + + var wrapped = endpoint.MaybeWrapReceiver(inner) + .ShouldBeOfType(); + + wrapped.Inner.ShouldBeSameAs(inner); + wrapped.Rules.Single().ShouldBeOfType(); + + } } public class TestEndpoint : Endpoint diff --git a/src/Wolverine/Configuration/Endpoint.cs b/src/Wolverine/Configuration/Endpoint.cs index 32daa7d33..29431a155 100644 --- a/src/Wolverine/Configuration/Endpoint.cs +++ b/src/Wolverine/Configuration/Endpoint.cs @@ -387,6 +387,12 @@ public void RegisterSerializer(IMessageSerializer serializer) /// public abstract ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver); + internal IReceiver MaybeWrapReceiver(IReceiver inner) + { + var rules = RulesForIncoming().ToArray(); + return rules.Any() ? new ReceiverWithRules(inner, rules) : inner; + } + /// /// Create new sending agent for this /// diff --git a/src/Wolverine/Transports/IReceiver.cs b/src/Wolverine/Transports/IReceiver.cs index 11ac9b7fb..cb123e755 100644 --- a/src/Wolverine/Transports/IReceiver.cs +++ b/src/Wolverine/Transports/IReceiver.cs @@ -10,41 +10,42 @@ public interface IReceiver : IDisposable internal class ReceiverWithRules : IReceiver { - private readonly IReceiver _inner; - private readonly IEnvelopeRule[] _rules; - public ReceiverWithRules(IReceiver inner, IEnumerable rules) { - _inner = inner; - _rules = rules.ToArray(); + Inner = inner; + Rules = rules.ToArray(); } + public IReceiver Inner { get; } + + public IEnvelopeRule[] Rules { get; } + public void Dispose() { - _inner.Dispose(); + Inner.Dispose(); } public ValueTask ReceivedAsync(IListener listener, Envelope[] messages) { foreach (var envelope in messages) { - foreach (var rule in _rules) + foreach (var rule in Rules) { rule.Modify(envelope); } } - return _inner.ReceivedAsync(listener, messages); + return Inner.ReceivedAsync(listener, messages); } public ValueTask ReceivedAsync(IListener listener, Envelope envelope) { - foreach (var rule in _rules) + foreach (var rule in Rules) { rule.Modify(envelope); } - return _inner.ReceivedAsync(listener, envelope); + return Inner.ReceivedAsync(listener, envelope); } public async ValueTask DrainAsync() From 599f9c456bcd26a6211db34acf7f0d249a992de5 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 20 Nov 2024 13:17:31 -0600 Subject: [PATCH 4/5] Potentially using the wrapped receiver for envelope rules on listeners --- src/Wolverine/Transports/ListeningAgent.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index 983cdef40..d26177027 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -164,7 +164,7 @@ public async ValueTask StartAsync() return; } - _receiver ??= await buildReceiverAsync(); + _receiver ??= Endpoint.MaybeWrapReceiver(await buildReceiverAsync()); if (Endpoint.ListenerCount > 1) From 3f55540ae9e4c00cc557308ab9a207bfaafb4202 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Thu, 21 Nov 2024 07:56:13 -0600 Subject: [PATCH 5/5] listening agent potentially uses any listening rules. Closes GH-1138 --- src/Wolverine/Transports/ListeningAgent.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index d26177027..5065abb47 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -165,8 +165,7 @@ public async ValueTask StartAsync() } _receiver ??= Endpoint.MaybeWrapReceiver(await buildReceiverAsync()); - - + if (Endpoint.ListenerCount > 1) { var listeners = new List(Endpoint.ListenerCount);