Skip to content

Commit

Permalink
Docs on SQS interop
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Sep 18, 2023
1 parent b1d3462 commit 09b55a8
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 5 deletions.
26 changes: 24 additions & 2 deletions docs/guide/messaging/transports/sqs/interoperability.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,26 @@
# Interoperability

The Amazon SQS transport is not yet supporting configurable interoperability with non-Wolverine applications, but you
can track [the backlog issue for that work here](https://github.com/JasperFx/wolverine/issues/402).
Hey, it's a complicated world and Wolverine is a relative newcomer, so it's somewhat likely you'll find yourself needing to make a Wolverine application talk via AWS SQS to
a non-Wolverine application. Not to worry (too much), Wolverine has you covered with the ability to customize Wolverine to Amazon SQS mapping.

## Receive Raw JSON

If you need to receive raw JSON from an upstream system *and* you can expect only one message type for the current
queue, you can do that with this option:

snippet: sample_receive_raw_json_in_sqs

Likewise, to send raw JSON to external systems, you have this option:

snippet: sample_publish_raw_json_in_sqs

## Advanced Interoperability

For any kind of advanced interoperability between Wolverine and any other kind of application communicating with your
Wolverine application using SQS, you can build custom implementations of the `ISqsEnvelopeMapper` like this one:

snippet: sample_custom_sqs_mapper

And apply this to any or all of your SQS endpoints with the configuration fluent interface as shown in this sample:

snippet: sample_apply_custom_sqs_mapping
103 changes: 102 additions & 1 deletion src/Transports/AWS/Wolverine.AmazonSqs.Tests/Samples/Bootstrapping.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System.Text;
using System.Text.Json;
using Amazon.Runtime;
using Amazon.SQS;
using Amazon.SQS.Model;
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using TestMessages;
Expand Down Expand Up @@ -197,4 +200,102 @@ public async Task overriding_dead_letter_queueing()

#endregion
}
}

public async Task receive_raw_json()
{
#region sample_receive_raw_json_in_sqs

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAmazonSqsTransport();

opts.ListenToSqsQueue("incoming").ReceiveRawJsonMessage(
// Specify the single message type for this queue
typeof(Message1),

// Optionally customize System.Text.Json configuration
o =>
{
o.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
});
}).StartAsync();

#endregion
}

public async Task publish_raw_json()
{
#region sample_publish_raw_json_in_sqs

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAmazonSqsTransport();

opts.PublishAllMessages().ToSqsQueue("outgoing").SendRawJsonMessage(
// Specify the single message type for this queue
typeof(Message1),

// Optionally customize System.Text.Json configuration
o =>
{
o.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
});
}).StartAsync();

#endregion
}

[Fact]
public async Task customize_mappers()
{
#region sample_apply_custom_sqs_mapping

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAmazonSqsTransport()
.UseConventionalRouting()

.ConfigureListeners(l => l.InteropWith(new CustomSqsMapper()))

.ConfigureSenders(s => s.InteropWith(new CustomSqsMapper()));

}).StartAsync();

#endregion
}
}

#region sample_custom_sqs_mapper

public class CustomSqsMapper : ISqsEnvelopeMapper
{
public string BuildMessageBody(Envelope envelope)
{
// Serialized data from the Wolverine message
return Encoding.Default.GetString(envelope.Data);
}

// Specify header values for the SQS message from the Wolverine envelope
public IEnumerable<KeyValuePair<string, MessageAttributeValue>> ToAttributes(Envelope envelope)
{
if (envelope.TenantId.IsNotEmpty())
{
yield return new KeyValuePair<string, MessageAttributeValue>("tenant-id", new MessageAttributeValue{StringValue = envelope.TenantId});
}
}

public void ReadEnvelopeData(Envelope envelope, string messageBody, IDictionary<string, MessageAttributeValue> attributes)
{
envelope.Data = Encoding.Default.GetBytes(messageBody);

if (attributes.TryGetValue("tenant-id", out var att))
{
envelope.TenantId = att.StringValue;
}
}
}

#endregion
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public async Task InitializeAsync()
.UseAmazonSqsTransportLocally()
.ConfigureListeners(listeners =>
{
listeners.ReceiveNativeJsonMessage(typeof(MyNativeJsonMessage));
listeners.ReceiveRawJsonMessage(typeof(MyNativeJsonMessage));
})
.AutoProvision()
.AutoPurgeOnStartup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ public AmazonSqsListenerConfiguration ConfigureQueueCreation(Action<CreateQueueR
return this;
}

public AmazonSqsListenerConfiguration ReceiveNativeJsonMessage(
/// <summary>
/// Configure this listener to receive raw JSON of an expected message type from
/// an external system
/// </summary>
/// <param name="messageType"></param>
/// <param name="configure"></param>
/// <returns></returns>
public AmazonSqsListenerConfiguration ReceiveRawJsonMessage(
Type messageType,
Action<JsonSerializerOptions>? configure = null)
{
Expand All @@ -91,4 +98,15 @@ public AmazonSqsListenerConfiguration ReceiveNativeJsonMessage(

return this;
}

/// <summary>
/// Utilize custom envelope mapping for SQS interoperability with external non-Wolverine systems
/// </summary>
/// <param name="mapper"></param>
/// <returns></returns>
public AmazonSqsListenerConfiguration InteropWith(ISqsEnvelopeMapper mapper)
{
add(e => e.Mapper = mapper);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text.Json;
using Amazon.SQS.Model;
using Wolverine.AmazonSqs.Internal;
using Wolverine.Configuration;
Expand All @@ -21,4 +22,29 @@ public AmazonSqsSubscriberConfiguration ConfigureQueueCreation(Action<CreateQueu
add(e => configure(e.Configuration));
return this;
}

/// Opt to send messages as raw JSON without any Wolverine metadata
/// </summary>
/// <param name="defaultMessageType">Optional. If both sending and receiving from this queue, you will want to specify a default message type</param>
/// <param name="configure">Optional configuration of System.Text.Json for this endpoint</param>
/// <returns></returns>
public AmazonSqsSubscriberConfiguration SendRawJsonMessage(Type? defaultMessageType = null, Action<JsonSerializerOptions>? configure = null)
{
var options = new JsonSerializerOptions();
configure?.Invoke(options);
add(e => e.Mapper = new RawJsonSqsEnvelopeMapper(defaultMessageType ?? typeof(object), options));

return this;
}

/// <summary>
/// Utilize custom envelope mapping for SQS interoperability with external non-Wolverine systems
/// </summary>
/// <param name="mapper"></param>
/// <returns></returns>
public AmazonSqsSubscriberConfiguration InteropWith(ISqsEnvelopeMapper mapper)
{
add(e => e.Mapper = mapper);
return this;
}
}

0 comments on commit 09b55a8

Please sign in to comment.