Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listening rules on endpoints to modify envelopes to tag tenant id or message type name #1143

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/Testing/CoreTests/Configuration/EndpointTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
using NSubstitute;
using Wolverine.ComplianceTests;
using Wolverine.ComplianceTests.Compliance;
using Wolverine.Configuration;
using Wolverine.Runtime;
using Wolverine.Transports;
using Wolverine.Transports.Sending;
using Wolverine.Util;
using Xunit;

namespace CoreTests.Configuration;
Expand Down Expand Up @@ -38,6 +42,72 @@ public void should_auto_start_as_listener(bool isListener, ListenerScope scope,

endpoint.ShouldAutoStartAsListener(settings).ShouldBe(shouldStart);
}

[Fact]
public void return_message_type_rule()
{
var endpoint = new TestEndpoint(EndpointRole.System);
endpoint.RulesForIncoming().Any().ShouldBeFalse();

endpoint.MessageType = typeof(Message1);

var rules = endpoint.RulesForIncoming().ToArray();

var envelope = ObjectMother.Envelope();
foreach (var rule in rules)
{
rule.Modify(envelope);
}

envelope.MessageType.ShouldBe(typeof(Message1).ToMessageTypeName());
}

[Fact]
public void return_tenant_id_rule()
{
var endpoint = new TestEndpoint(EndpointRole.System);
endpoint.RulesForIncoming().Any().ShouldBeFalse();

endpoint.TenantId = "one";

var rules = endpoint.RulesForIncoming().ToArray();

var envelope = ObjectMother.Envelope();
foreach (var rule in rules)
{
rule.Modify(envelope);
}

envelope.TenantId.ShouldBe("one");
}

[Fact]
public void maybe_wrap_receiver_no_rules()
{
var endpoint = new TestEndpoint(EndpointRole.System);
endpoint.RulesForIncoming().Any().ShouldBeFalse();

var inner = Substitute.For<IReceiver>();

endpoint.MaybeWrapReceiver(inner)
.ShouldBeSameAs(inner);
}

[Fact]
public void maybe_wrap_receiver_with_rules()
{
var endpoint = new TestEndpoint(EndpointRole.System);
endpoint.TenantId = "one";

var inner = Substitute.For<IReceiver>();

var wrapped = endpoint.MaybeWrapReceiver(inner)
.ShouldBeOfType<ReceiverWithRules>();

wrapped.Inner.ShouldBeSameAs(inner);
wrapped.Rules.Single().ShouldBeOfType<TenantIdRule>();

}
}

public class TestEndpoint : Endpoint
Expand Down
61 changes: 61 additions & 0 deletions src/Testing/CoreTests/Transports/ReceiverWithRulesTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using NSubstitute;
using Wolverine.ComplianceTests;
using Wolverine.Transports;
using Xunit;

namespace CoreTests.Transports;

public class ReceiverWithRulesTests
{
private readonly IReceiver theInner = Substitute.For<IReceiver>();
private readonly IEnvelopeRule rule1 = Substitute.For<IEnvelopeRule>();
private readonly IEnvelopeRule rule2 = Substitute.For<IEnvelopeRule>();
private readonly ReceiverWithRules theReceiver;
private readonly IListener theListener = Substitute.For<IListener>();

public ReceiverWithRulesTests()
{
theReceiver = new ReceiverWithRules(theInner, [rule1, rule2]);
}

[Fact]
public void dispose_delegates()
{
theReceiver.Dispose();
theInner.Received().Dispose();
}

[Fact]
public async Task receive_a_single_envelope()
{
var envelope = ObjectMother.Envelope();

await theReceiver.ReceivedAsync(theListener, envelope);
rule1.Received().Modify(envelope);
rule2.Received().Modify(envelope);

await theInner.Received().ReceivedAsync(theListener, envelope);
}

[Fact]
public async Task receive_multiple_envelopes()
{
var envelope1 = ObjectMother.Envelope();
var envelope2 = ObjectMother.Envelope();
var envelope3 = ObjectMother.Envelope();

var envelopes = new Envelope[] { envelope1, envelope2, envelope3 };

await theReceiver.ReceivedAsync(theListener, envelopes);
rule1.Received().Modify(envelope1);
rule2.Received().Modify(envelope1);

rule1.Received().Modify(envelope2);
rule2.Received().Modify(envelope2);

rule1.Received().Modify(envelope3);
rule2.Received().Modify(envelope3);

await theInner.Received().ReceivedAsync(theListener, envelopes);
}
}
32 changes: 32 additions & 0 deletions src/Wolverine/Configuration/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,32 @@ public string EndpointName
public bool IsUsedForReplies { get; set; }

public IList<IEnvelopeRule> OutgoingRules { get; } = new List<IEnvelopeRule>();
public IList<IEnvelopeRule> IncomingRules { get; } = new List<IEnvelopeRule>();

/// <summary>
/// In some cases, you may want to tell Wolverine that any message
/// coming into this endpoint are automatically tagged to a certain
/// tenant id
/// </summary>
public virtual string? TenantId { get; set; }

internal IEnumerable<IEnvelopeRule> RulesForIncoming()
{
foreach (var rule in IncomingRules)
{
yield return rule;
}

if (MessageType != null)
{
yield return new MessageTypeRule(MessageType);
}

if (TenantId.IsNotEmpty())
{
yield return new TenantIdRule(TenantId);
}
}

internal ISendingAgent? Agent { get; set; }

Expand Down Expand Up @@ -361,6 +387,12 @@ public void RegisterSerializer(IMessageSerializer serializer)
/// <returns></returns>
public abstract ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver);

internal IReceiver MaybeWrapReceiver(IReceiver inner)
{
var rules = RulesForIncoming().ToArray();
return rules.Any() ? new ReceiverWithRules(inner, rules) : inner;
}

/// <summary>
/// Create new sending agent for this
/// </summary>
Expand Down
44 changes: 44 additions & 0 deletions src/Wolverine/IEnvelopeRule.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Wolverine.Util;

namespace Wolverine;

/// <summary>
Expand All @@ -9,6 +11,48 @@ public interface IEnvelopeRule
void Modify(Envelope envelope);
}

internal class MessageTypeRule : IEnvelopeRule
{
private readonly Type _messageType;
private readonly string _messageTypeName;

public MessageTypeRule(Type messageType)
{
_messageType = messageType;
_messageTypeName = messageType.ToMessageTypeName();
}

public void Modify(Envelope envelope)
{
envelope.MessageType = _messageTypeName;
}

public override string ToString()
{
return $"{nameof(_messageType)} is {_messageType}, with MessageTypeName: {_messageTypeName}";
}
}

internal class TenantIdRule : IEnvelopeRule
{
public string TenantId { get; }

public TenantIdRule(string tenantId)
{
TenantId = tenantId;
}

public void Modify(Envelope envelope)
{
envelope.TenantId = TenantId;
}

public override string ToString()
{
return $"{nameof(TenantId)}: {TenantId}";
}
}

internal class DeliverWithinRule : IEnvelopeRule
{
public DeliverWithinRule(TimeSpan time)
Expand Down
46 changes: 46 additions & 0 deletions src/Wolverine/Transports/IReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,50 @@ public interface IReceiver : IDisposable
ValueTask ReceivedAsync(IListener listener, Envelope envelope);

ValueTask DrainAsync();
}

internal class ReceiverWithRules : IReceiver
{
public ReceiverWithRules(IReceiver inner, IEnumerable<IEnvelopeRule> rules)
{
Inner = inner;
Rules = rules.ToArray();
}

public IReceiver Inner { get; }

public IEnvelopeRule[] Rules { get; }

public void Dispose()
{
Inner.Dispose();
}

public ValueTask ReceivedAsync(IListener listener, Envelope[] messages)
{
foreach (var envelope in messages)
{
foreach (var rule in Rules)
{
rule.Modify(envelope);
}
}

return Inner.ReceivedAsync(listener, messages);
}

public ValueTask ReceivedAsync(IListener listener, Envelope envelope)
{
foreach (var rule in Rules)
{
rule.Modify(envelope);
}

return Inner.ReceivedAsync(listener, envelope);
}

public async ValueTask DrainAsync()
{
throw new NotImplementedException();
}
}
5 changes: 2 additions & 3 deletions src/Wolverine/Transports/ListeningAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,8 @@ public async ValueTask StartAsync()
return;
}

_receiver ??= await buildReceiverAsync();


_receiver ??= Endpoint.MaybeWrapReceiver(await buildReceiverAsync());

if (Endpoint.ListenerCount > 1)
{
var listeners = new List<IListener>(Endpoint.ListenerCount);
Expand Down
Loading