Skip to content

Commit

Permalink
Ability to chain delivery option helper extension methods for cascadi…
Browse files Browse the repository at this point in the history
…ng. Closes GH-1142
  • Loading branch information
jeremydmiller committed Nov 21, 2024
1 parent 65ca00c commit dce6b58
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 13 deletions.
66 changes: 66 additions & 0 deletions src/Testing/CoreTests/ConfiguredMessageExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,44 @@ public void delayed_for()
configured.Message.ShouldBe(inner);
}

[Fact]
public void chain_delayed_for()
{
var delay = 5.Minutes();
var inner = new Message1();

var configured = inner.WithTenantId("one")
.DelayedFor(5.Minutes());

configured.Options.ScheduleDelay.ShouldBe(delay);
configured.Message.ShouldBe(inner);
configured.Options.TenantId.ShouldBe("one");
}

[Fact]
public void chain_with_tenant_id()
{
var delay = 5.Minutes();
var inner = new Message1();

var configured = inner
.DelayedFor(delay).WithTenantId("one");

configured.Options.ScheduleDelay.ShouldBe(delay);
configured.Options.TenantId.ShouldBe("one");
}

[Fact]
public void chain_with_group_id()
{
var inner = new Message1();
var configured = inner.WithTenantId("one")
.WithGroupId("g1");

configured.Options.TenantId.ShouldBe("one");
configured.Options.GroupId.ShouldBe("g1");
}

[Fact]
public void scheduled_at()
{
Expand All @@ -30,6 +68,19 @@ public void scheduled_at()
configured.Options.ScheduledTime.ShouldBe(time);
configured.Message.ShouldBe(inner);
}

[Fact]
public void chain_scheduled_at()
{
var time = (DateTimeOffset)DateTime.Today;
var inner = new Message1();

var configured = inner.WithTenantId("one").ScheduledAt(time);

configured.Options.ScheduledTime.ShouldBe(time);
configured.Message.ShouldBe(inner);
configured.Options.TenantId.ShouldBe("one");
}

[Fact]
public void to_endpoint()
Expand Down Expand Up @@ -67,4 +118,19 @@ public async Task to_topic()

await bus.Received().BroadcastToTopicAsync("blue", inner);
}

[Fact]
public async Task chaining_to_topic()
{
var inner = new Message1();
var message = inner.WithTenantId("one").ToTopic("blue");
message.Message.ShouldBeSameAs(inner);
message.Topic.ShouldBe("blue");
message.Options.TenantId.ShouldBe("one");

var bus = Substitute.For<IMessageContext>();
await message.As<ISendMyself>().ApplyAsync(bus);

await bus.Received().BroadcastToTopicAsync("blue", inner, message.Options);
}
}
100 changes: 87 additions & 13 deletions src/Wolverine/ISendMyself.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ public static DeliveryMessage<T> WithTenantId<T>(this T message, string tenantId
return new DeliveryMessage<T>(message, new DeliveryOptions { TenantId = tenantId });
}

/// <summary>
/// Create a cascading message tagged to a specific tenant id
/// </summary>
/// <param name="message"></param>
/// <param name="tenantId"></param>
/// <returns></returns>
public static DeliveryMessage<T> WithTenantId<T>(this DeliveryMessage<T> message, string tenantId)
{
message.Options.TenantId = tenantId;
return message;
}

/// <summary>
/// Create a cascading message tagged to a specific group id
/// </summary>
Expand All @@ -49,6 +61,19 @@ public static DeliveryMessage<T> WithGroupId<T>(this T message, string groupId)
{
return new DeliveryMessage<T>(message, new DeliveryOptions { GroupId = groupId });
}

/// <summary>
/// Create a cascading message tagged to a specific group id
/// </summary>
/// <param name="message"></param>
/// <param name="groupId"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static DeliveryMessage<T> WithGroupId<T>(this DeliveryMessage<T> message, string groupId)
{
message.Options.GroupId = groupId;
return message;
}

/// <summary>
/// Create a cascading message tagged to a specific group id and scheduled for a set time
Expand Down Expand Up @@ -99,16 +124,40 @@ public static ScheduledMessage<T> ScheduledAt<T>(this T message, DateTimeOffset
{
return new ScheduledMessage<T>(message, time);
}

/// <summary>
/// Schedule the inner outgoing message to be sent at the specified time
/// </summary>
/// <param name="message"></param>
/// <param name="time"></param>
/// <returns></returns>
public static DeliveryMessage<T> ScheduledAt<T>(this DeliveryMessage<T> message, DateTimeOffset time)
{
message.Options.ScheduledTime = time;
return message;
}

/// <summary>
/// Schedule the inner outgoing message to be sent after the specified delay
/// </summary>
/// <param name="message"></param>
/// <param name="delay"></param>
/// <returns></returns>
public static DelayedMessage<T> DelayedFor<T>(this T message, TimeSpan delay)
public static DeliveryMessage<T> DelayedFor<T>(this T message, TimeSpan delay)
{
return new DelayedMessage<T>(message, delay);
return new DeliveryMessage<T>(message, new DeliveryOptions{ScheduleDelay = delay});
}

/// <summary>
/// Schedule the inner outgoing message to be sent after the specified delay
/// </summary>
/// <param name="message"></param>
/// <param name="delay"></param>
/// <returns></returns>
public static DeliveryMessage<T> DelayedFor<T>(this DeliveryMessage<T> message, TimeSpan delay)
{
message.Options.ScheduleDelay = delay;
return message;
}

/// <summary>
Expand All @@ -121,7 +170,18 @@ public static RoutedToEndpointMessage<T> ToEndpoint<T>(this T message, string en
{
return new RoutedToEndpointMessage<T>(endpointName, message, options);
}


/// <summary>
/// Send a message directly to the named endpoint as a cascading message
/// </summary>
/// <param name="message"></param>
/// <param name="endpointName"></param>
/// <returns></returns>
public static RoutedToEndpointMessage<T> ToEndpoint<T>(this DeliveryMessage<T> message, string endpointName)
{
return new RoutedToEndpointMessage<T>(endpointName, message.Message, message.Options);
}

/// <summary>
/// Send a message directly to the specific destination as a cascading message
/// </summary>
Expand All @@ -133,6 +193,17 @@ public static RoutedToEndpointMessage<T> ToDestination<T>(this T message, Uri de
return new RoutedToEndpointMessage<T>(destination, message, options);
}

/// <summary>
/// Send a message directly to the specific destination as a cascading message
/// </summary>
/// <param name="message"></param>
/// <param name="destination"></param>
/// <returns></returns>
public static RoutedToEndpointMessage<T> ToDestination<T>(this DeliveryMessage<T> message, Uri destination)
{
return new RoutedToEndpointMessage<T>(destination, message.Message, message.Options);
}

/// <summary>
/// Send a message to the supplied topic
/// </summary>
Expand All @@ -145,6 +216,19 @@ public static TopicMessage<T> ToTopic<T>(this T message, string topic, DeliveryO
{
return new TopicMessage<T>(message, topic, options);
}

/// <summary>
/// Send a message to the supplied topic
/// </summary>
/// <param name="message"></param>
/// <param name="topic">The topic name for the underlying message broker</param>
/// <param name="options">Optional delivery options</param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static TopicMessage<T> ToTopic<T>(this DeliveryMessage<T> message, string topic)
{
return new TopicMessage<T>(message.Message, topic, message.Options);
}
}

public record TopicMessage<T>(T Message, string Topic, DeliveryOptions? Options) : ISendMyself
Expand All @@ -155,16 +239,6 @@ ValueTask ISendMyself.ApplyAsync(IMessageContext context)
}
}

/// <summary>
/// Wrapper for a cascading message that has delayed delivery
/// </summary>
public class DelayedMessage<T> : DeliveryMessage<T>
{
public DelayedMessage(T message, TimeSpan delay) : base(message, new DeliveryOptions { ScheduleDelay = delay })
{
}
}

public class ScheduledMessage<T> : DeliveryMessage<T>
{
public ScheduledMessage(T message, DateTimeOffset time) : base(message,
Expand Down

0 comments on commit dce6b58

Please sign in to comment.