diff --git a/src/Testing/CoreTests/Configuration/EndpointTests.cs b/src/Testing/CoreTests/Configuration/EndpointTests.cs index d0a12f24e..9845d5529 100644 --- a/src/Testing/CoreTests/Configuration/EndpointTests.cs +++ b/src/Testing/CoreTests/Configuration/EndpointTests.cs @@ -1,7 +1,11 @@ +using NSubstitute; +using Wolverine.ComplianceTests; +using Wolverine.ComplianceTests.Compliance; using Wolverine.Configuration; using Wolverine.Runtime; using Wolverine.Transports; using Wolverine.Transports.Sending; +using Wolverine.Util; using Xunit; namespace CoreTests.Configuration; @@ -38,6 +42,72 @@ public void should_auto_start_as_listener(bool isListener, ListenerScope scope, endpoint.ShouldAutoStartAsListener(settings).ShouldBe(shouldStart); } + + [Fact] + public void return_message_type_rule() + { + var endpoint = new TestEndpoint(EndpointRole.System); + endpoint.RulesForIncoming().Any().ShouldBeFalse(); + + endpoint.MessageType = typeof(Message1); + + var rules = endpoint.RulesForIncoming().ToArray(); + + var envelope = ObjectMother.Envelope(); + foreach (var rule in rules) + { + rule.Modify(envelope); + } + + envelope.MessageType.ShouldBe(typeof(Message1).ToMessageTypeName()); + } + + [Fact] + public void return_tenant_id_rule() + { + var endpoint = new TestEndpoint(EndpointRole.System); + endpoint.RulesForIncoming().Any().ShouldBeFalse(); + + endpoint.TenantId = "one"; + + var rules = endpoint.RulesForIncoming().ToArray(); + + var envelope = ObjectMother.Envelope(); + foreach (var rule in rules) + { + rule.Modify(envelope); + } + + envelope.TenantId.ShouldBe("one"); + } + + [Fact] + public void maybe_wrap_receiver_no_rules() + { + var endpoint = new TestEndpoint(EndpointRole.System); + endpoint.RulesForIncoming().Any().ShouldBeFalse(); + + var inner = Substitute.For(); + + endpoint.MaybeWrapReceiver(inner) + .ShouldBeSameAs(inner); + } + + [Fact] + public void maybe_wrap_receiver_with_rules() + { + var endpoint = new TestEndpoint(EndpointRole.System); + endpoint.TenantId = "one"; + + var inner = Substitute.For(); + + var wrapped = endpoint.MaybeWrapReceiver(inner) + .ShouldBeOfType(); + + wrapped.Inner.ShouldBeSameAs(inner); + wrapped.Rules.Single().ShouldBeOfType(); + + } } public class TestEndpoint : Endpoint diff --git a/src/Testing/CoreTests/Transports/ReceiverWithRulesTests.cs b/src/Testing/CoreTests/Transports/ReceiverWithRulesTests.cs new file mode 100644 index 000000000..15a0c7720 --- /dev/null +++ b/src/Testing/CoreTests/Transports/ReceiverWithRulesTests.cs @@ -0,0 +1,61 @@ +using NSubstitute; +using Wolverine.ComplianceTests; +using Wolverine.Transports; +using Xunit; + +namespace CoreTests.Transports; + +public class ReceiverWithRulesTests +{ + private readonly IReceiver theInner = Substitute.For(); + private readonly IEnvelopeRule rule1 = Substitute.For(); + private readonly IEnvelopeRule rule2 = Substitute.For(); + private readonly ReceiverWithRules theReceiver; + private readonly IListener theListener = Substitute.For(); + + public ReceiverWithRulesTests() + { + theReceiver = new ReceiverWithRules(theInner, [rule1, rule2]); + } + + [Fact] + public void dispose_delegates() + { + theReceiver.Dispose(); + theInner.Received().Dispose(); + } + + [Fact] + public async Task receive_a_single_envelope() + { + var envelope = ObjectMother.Envelope(); + + await theReceiver.ReceivedAsync(theListener, envelope); + rule1.Received().Modify(envelope); + rule2.Received().Modify(envelope); + + await theInner.Received().ReceivedAsync(theListener, envelope); + } + + [Fact] + public async Task receive_multiple_envelopes() + { + var envelope1 = ObjectMother.Envelope(); + var envelope2 = ObjectMother.Envelope(); + var envelope3 = ObjectMother.Envelope(); + + var envelopes = new Envelope[] { envelope1, envelope2, envelope3 }; + + await theReceiver.ReceivedAsync(theListener, envelopes); + rule1.Received().Modify(envelope1); + rule2.Received().Modify(envelope1); + + rule1.Received().Modify(envelope2); + rule2.Received().Modify(envelope2); + + rule1.Received().Modify(envelope3); + rule2.Received().Modify(envelope3); + + await theInner.Received().ReceivedAsync(theListener, envelopes); + } +} \ No newline at end of file diff --git a/src/Wolverine/Configuration/Endpoint.cs b/src/Wolverine/Configuration/Endpoint.cs index 8fb27b4b4..29431a155 100644 --- a/src/Wolverine/Configuration/Endpoint.cs +++ b/src/Wolverine/Configuration/Endpoint.cs @@ -216,6 +216,32 @@ public string EndpointName public bool IsUsedForReplies { get; set; } public IList OutgoingRules { get; } = new List(); + public IList IncomingRules { get; } = new List(); + + /// + /// In some cases, you may want to tell Wolverine that any message + /// coming into this endpoint are automatically tagged to a certain + /// tenant id + /// + public virtual string? TenantId { get; set; } + + internal IEnumerable RulesForIncoming() + { + foreach (var rule in IncomingRules) + { + yield return rule; + } + + if (MessageType != null) + { + yield return new MessageTypeRule(MessageType); + } + + if (TenantId.IsNotEmpty()) + { + yield return new TenantIdRule(TenantId); + } + } internal ISendingAgent? Agent { get; set; } @@ -361,6 +387,12 @@ public void RegisterSerializer(IMessageSerializer serializer) /// public abstract ValueTask BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver); + internal IReceiver MaybeWrapReceiver(IReceiver inner) + { + var rules = RulesForIncoming().ToArray(); + return rules.Any() ? new ReceiverWithRules(inner, rules) : inner; + } + /// /// Create new sending agent for this /// diff --git a/src/Wolverine/IEnvelopeRule.cs b/src/Wolverine/IEnvelopeRule.cs index cb09f4de3..3782db33f 100644 --- a/src/Wolverine/IEnvelopeRule.cs +++ b/src/Wolverine/IEnvelopeRule.cs @@ -1,3 +1,5 @@ +using Wolverine.Util; + namespace Wolverine; /// @@ -9,6 +11,48 @@ public interface IEnvelopeRule void Modify(Envelope envelope); } +internal class MessageTypeRule : IEnvelopeRule +{ + private readonly Type _messageType; + private readonly string _messageTypeName; + + public MessageTypeRule(Type messageType) + { + _messageType = messageType; + _messageTypeName = messageType.ToMessageTypeName(); + } + + public void Modify(Envelope envelope) + { + envelope.MessageType = _messageTypeName; + } + + public override string ToString() + { + return $"{nameof(_messageType)} is {_messageType}, with MessageTypeName: {_messageTypeName}"; + } +} + +internal class TenantIdRule : IEnvelopeRule +{ + public string TenantId { get; } + + public TenantIdRule(string tenantId) + { + TenantId = tenantId; + } + + public void Modify(Envelope envelope) + { + envelope.TenantId = TenantId; + } + + public override string ToString() + { + return $"{nameof(TenantId)}: {TenantId}"; + } +} + internal class DeliverWithinRule : IEnvelopeRule { public DeliverWithinRule(TimeSpan time) diff --git a/src/Wolverine/Transports/IReceiver.cs b/src/Wolverine/Transports/IReceiver.cs index c27d18124..cb123e755 100644 --- a/src/Wolverine/Transports/IReceiver.cs +++ b/src/Wolverine/Transports/IReceiver.cs @@ -6,4 +6,50 @@ public interface IReceiver : IDisposable ValueTask ReceivedAsync(IListener listener, Envelope envelope); ValueTask DrainAsync(); +} + +internal class ReceiverWithRules : IReceiver +{ + public ReceiverWithRules(IReceiver inner, IEnumerable rules) + { + Inner = inner; + Rules = rules.ToArray(); + } + + public IReceiver Inner { get; } + + public IEnvelopeRule[] Rules { get; } + + public void Dispose() + { + Inner.Dispose(); + } + + public ValueTask ReceivedAsync(IListener listener, Envelope[] messages) + { + foreach (var envelope in messages) + { + foreach (var rule in Rules) + { + rule.Modify(envelope); + } + } + + return Inner.ReceivedAsync(listener, messages); + } + + public ValueTask ReceivedAsync(IListener listener, Envelope envelope) + { + foreach (var rule in Rules) + { + rule.Modify(envelope); + } + + return Inner.ReceivedAsync(listener, envelope); + } + + public async ValueTask DrainAsync() + { + throw new NotImplementedException(); + } } \ No newline at end of file diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index 983cdef40..5065abb47 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -164,9 +164,8 @@ public async ValueTask StartAsync() return; } - _receiver ??= await buildReceiverAsync(); - - + _receiver ??= Endpoint.MaybeWrapReceiver(await buildReceiverAsync()); + if (Endpoint.ListenerCount > 1) { var listeners = new List(Endpoint.ListenerCount);