Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to chain delivery option helper extension methods for cascadi… #1144

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/Testing/CoreTests/ConfiguredMessageExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,44 @@ public void delayed_for()
configured.Message.ShouldBe(inner);
}

[Fact]
public void chain_delayed_for()
{
var delay = 5.Minutes();
var inner = new Message1();

var configured = inner.WithTenantId("one")
.DelayedFor(5.Minutes());

configured.Options.ScheduleDelay.ShouldBe(delay);
configured.Message.ShouldBe(inner);
configured.Options.TenantId.ShouldBe("one");
}

[Fact]
public void chain_with_tenant_id()
{
var delay = 5.Minutes();
var inner = new Message1();

var configured = inner
.DelayedFor(delay).WithTenantId("one");

configured.Options.ScheduleDelay.ShouldBe(delay);
configured.Options.TenantId.ShouldBe("one");
}

[Fact]
public void chain_with_group_id()
{
var inner = new Message1();
var configured = inner.WithTenantId("one")
.WithGroupId("g1");

configured.Options.TenantId.ShouldBe("one");
configured.Options.GroupId.ShouldBe("g1");
}

[Fact]
public void scheduled_at()
{
Expand All @@ -30,6 +68,19 @@ public void scheduled_at()
configured.Options.ScheduledTime.ShouldBe(time);
configured.Message.ShouldBe(inner);
}

[Fact]
public void chain_scheduled_at()
{
var time = (DateTimeOffset)DateTime.Today;
var inner = new Message1();

var configured = inner.WithTenantId("one").ScheduledAt(time);

configured.Options.ScheduledTime.ShouldBe(time);
configured.Message.ShouldBe(inner);
configured.Options.TenantId.ShouldBe("one");
}

[Fact]
public void to_endpoint()
Expand Down Expand Up @@ -67,4 +118,19 @@ public async Task to_topic()

await bus.Received().BroadcastToTopicAsync("blue", inner);
}

[Fact]
public async Task chaining_to_topic()
{
var inner = new Message1();
var message = inner.WithTenantId("one").ToTopic("blue");
message.Message.ShouldBeSameAs(inner);
message.Topic.ShouldBe("blue");
message.Options.TenantId.ShouldBe("one");

var bus = Substitute.For<IMessageContext>();
await message.As<ISendMyself>().ApplyAsync(bus);

await bus.Received().BroadcastToTopicAsync("blue", inner, message.Options);
}
}
100 changes: 87 additions & 13 deletions src/Wolverine/ISendMyself.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ public static DeliveryMessage<T> WithTenantId<T>(this T message, string tenantId
return new DeliveryMessage<T>(message, new DeliveryOptions { TenantId = tenantId });
}

/// <summary>
/// Create a cascading message tagged to a specific tenant id
/// </summary>
/// <param name="message"></param>
/// <param name="tenantId"></param>
/// <returns></returns>
public static DeliveryMessage<T> WithTenantId<T>(this DeliveryMessage<T> message, string tenantId)
{
message.Options.TenantId = tenantId;
return message;
}

/// <summary>
/// Create a cascading message tagged to a specific group id
/// </summary>
Expand All @@ -49,6 +61,19 @@ public static DeliveryMessage<T> WithGroupId<T>(this T message, string groupId)
{
return new DeliveryMessage<T>(message, new DeliveryOptions { GroupId = groupId });
}

/// <summary>
/// Create a cascading message tagged to a specific group id
/// </summary>
/// <param name="message"></param>
/// <param name="groupId"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static DeliveryMessage<T> WithGroupId<T>(this DeliveryMessage<T> message, string groupId)
{
message.Options.GroupId = groupId;
return message;
}

/// <summary>
/// Create a cascading message tagged to a specific group id and scheduled for a set time
Expand Down Expand Up @@ -99,16 +124,40 @@ public static ScheduledMessage<T> ScheduledAt<T>(this T message, DateTimeOffset
{
return new ScheduledMessage<T>(message, time);
}

/// <summary>
/// Schedule the inner outgoing message to be sent at the specified time
/// </summary>
/// <param name="message"></param>
/// <param name="time"></param>
/// <returns></returns>
public static DeliveryMessage<T> ScheduledAt<T>(this DeliveryMessage<T> message, DateTimeOffset time)
{
message.Options.ScheduledTime = time;
return message;
}

/// <summary>
/// Schedule the inner outgoing message to be sent after the specified delay
/// </summary>
/// <param name="message"></param>
/// <param name="delay"></param>
/// <returns></returns>
public static DelayedMessage<T> DelayedFor<T>(this T message, TimeSpan delay)
public static DeliveryMessage<T> DelayedFor<T>(this T message, TimeSpan delay)
{
return new DelayedMessage<T>(message, delay);
return new DeliveryMessage<T>(message, new DeliveryOptions{ScheduleDelay = delay});
}

/// <summary>
/// Schedule the inner outgoing message to be sent after the specified delay
/// </summary>
/// <param name="message"></param>
/// <param name="delay"></param>
/// <returns></returns>
public static DeliveryMessage<T> DelayedFor<T>(this DeliveryMessage<T> message, TimeSpan delay)
{
message.Options.ScheduleDelay = delay;
return message;
}

/// <summary>
Expand All @@ -121,7 +170,18 @@ public static RoutedToEndpointMessage<T> ToEndpoint<T>(this T message, string en
{
return new RoutedToEndpointMessage<T>(endpointName, message, options);
}


/// <summary>
/// Send a message directly to the named endpoint as a cascading message
/// </summary>
/// <param name="message"></param>
/// <param name="endpointName"></param>
/// <returns></returns>
public static RoutedToEndpointMessage<T> ToEndpoint<T>(this DeliveryMessage<T> message, string endpointName)
{
return new RoutedToEndpointMessage<T>(endpointName, message.Message, message.Options);
}

/// <summary>
/// Send a message directly to the specific destination as a cascading message
/// </summary>
Expand All @@ -133,6 +193,17 @@ public static RoutedToEndpointMessage<T> ToDestination<T>(this T message, Uri de
return new RoutedToEndpointMessage<T>(destination, message, options);
}

/// <summary>
/// Send a message directly to the specific destination as a cascading message
/// </summary>
/// <param name="message"></param>
/// <param name="destination"></param>
/// <returns></returns>
public static RoutedToEndpointMessage<T> ToDestination<T>(this DeliveryMessage<T> message, Uri destination)
{
return new RoutedToEndpointMessage<T>(destination, message.Message, message.Options);
}

/// <summary>
/// Send a message to the supplied topic
/// </summary>
Expand All @@ -145,6 +216,19 @@ public static TopicMessage<T> ToTopic<T>(this T message, string topic, DeliveryO
{
return new TopicMessage<T>(message, topic, options);
}

/// <summary>
/// Send a message to the supplied topic
/// </summary>
/// <param name="message"></param>
/// <param name="topic">The topic name for the underlying message broker</param>
/// <param name="options">Optional delivery options</param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static TopicMessage<T> ToTopic<T>(this DeliveryMessage<T> message, string topic)
{
return new TopicMessage<T>(message.Message, topic, message.Options);
}
}

public record TopicMessage<T>(T Message, string Topic, DeliveryOptions? Options) : ISendMyself
Expand All @@ -155,16 +239,6 @@ ValueTask ISendMyself.ApplyAsync(IMessageContext context)
}
}

/// <summary>
/// Wrapper for a cascading message that has delayed delivery
/// </summary>
public class DelayedMessage<T> : DeliveryMessage<T>
{
public DelayedMessage(T message, TimeSpan delay) : base(message, new DeliveryOptions { ScheduleDelay = delay })
{
}
}

public class ScheduledMessage<T> : DeliveryMessage<T>
{
public ScheduledMessage(T message, DateTimeOffset time) : base(message,
Expand Down
Loading