Skip to content

Commit

Permalink
Batch processing working *with* external transports. Closes GH-1076
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Oct 16, 2024
1 parent 0815b75 commit 2e02283
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 1 deletion.
18 changes: 18 additions & 0 deletions src/Transports/Kafka/BatchMessaging/BatchMessaging.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<TargetFrameworks>net8.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.7"/>
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Wolverine.Kafka\Wolverine.Kafka.csproj" />
</ItemGroup>

</Project>
6 changes: 6 additions & 0 deletions src/Transports/Kafka/BatchMessaging/BatchMessaging.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
@BatchMessaging_HostAddress = http://localhost:5089

GET {{BatchMessaging_HostAddress}}/weatherforecast/
Accept: application/json

###
52 changes: 52 additions & 0 deletions src/Transports/Kafka/BatchMessaging/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using Confluent.Kafka;
using Oakton;
using Wolverine;
using Wolverine.Kafka;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

builder.Host.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();

opts.PublishAllMessages().ToKafkaTopic("topic_0");

opts.BatchMessagesOf<TestMessage>();
opts.ListenToKafkaTopic("topic_0");
});

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}

app.MapPost("/test", async (IMessageBus bus) =>
{
var message = new TestMessage();
await bus.PublishAsync(message);
await bus.PublishAsync(message);
// results in:
// No known handler for TestMessage#08dced0c-3834-b4c6-54d7-e075bf020000 from kafka://topic/topic_0
})
.WithOpenApi();

return await app.RunOaktonCommands(args);

public partial class Program {}


public record TestMessage;

public class TestMessagesHandler
{
public void Handle(TestMessage[] messages)
{
Console.WriteLine("Messages received");
}
}
41 changes: 41 additions & 0 deletions src/Transports/Kafka/BatchMessaging/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"$schema": "http://json.schemastore.org/launchsettings.json",
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:3391",
"sslPort": 44303
}
},
"profiles": {
"http": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"launchUrl": "swagger",
"applicationUrl": "http://localhost:5089",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"https": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"launchUrl": "swagger",
"applicationUrl": "https://localhost:7028;http://localhost:5089",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "swagger",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
9 changes: 9 additions & 0 deletions src/Transports/Kafka/BatchMessaging/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<PropertyGroup>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
<TargetFrameworks>net8.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
Expand All @@ -21,6 +22,7 @@

<ItemGroup>
<ProjectReference Include="..\..\..\Testing\Wolverine.ComplianceTests\Wolverine.ComplianceTests.csproj" />
<ProjectReference Include="..\BatchMessaging\BatchMessaging.csproj" />
<ProjectReference Include="..\Wolverine.Kafka\Wolverine.Kafka.csproj"/>
</ItemGroup>

Expand All @@ -30,4 +32,8 @@
</Compile>
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
<PackageReference Include="Alba" Version="8.0.0" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Alba;
using Oakton;
using Shouldly;
using Wolverine.Tracking;

namespace Wolverine.Kafka.Tests;

public class batch_processing_with_kafka
{
[Fact]
public async Task end_to_end()
{
OaktonEnvironment.AutoStartHost = true;

await using var host = await AlbaHost.For<Program>(_ => {});

IScenarioResult result = null!;

Func<IMessageContext, Task> execute = async _ =>
{
result = await host.Scenario(x => { x.Post.Url("/test"); });
};

var tracked = await host
.TrackActivity()
.WaitForMessageToBeReceivedAt<TestMessage[]>(host)
.ExecuteAndWaitAsync(execute);

tracked.FindSingleTrackedMessageOfType<TestMessage[]>()
.Length.ShouldBe(2);
}
}
1 change: 0 additions & 1 deletion src/Wolverine/Runtime/Handlers/HandlerGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ namespace Wolverine.Runtime.Handlers;

public partial class HandlerGraph : ICodeFileCollectionWithServices, IWithFailurePolicies
{
public static readonly string Context = "context";
private readonly List<HandlerCall> _calls = new();
private readonly object _compilingLock = new();

Expand Down
3 changes: 3 additions & 0 deletions src/Wolverine/WolverineOptions.Batching.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public LocalQueueConfiguration BatchMessagesOf(Type elementType, Action<Batching
{
throw new ArgumentNullException(nameof(elementType));
}

// GH-1076
HandlerGraph.RegisterMessageType(elementType);

var options = new BatchingOptions(elementType);
var localQueue = Transports.GetOrCreate<LocalTransport>().FindQueueForMessageType(elementType);
Expand Down
7 changes: 7 additions & 0 deletions wolverine.sln
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.RavenDb", "src\Pe
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RavenDbTests", "src\Persistence\RavenDbTests\RavenDbTests.csproj", "{71B152DD-7A0B-4935-B8B1-1060E674D23D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BatchMessaging", "src\Transports\Kafka\BatchMessaging\BatchMessaging.csproj", "{B035801D-E786-4AAA-858A-0770D88116D6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -610,6 +612,10 @@ Global
{71B152DD-7A0B-4935-B8B1-1060E674D23D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{71B152DD-7A0B-4935-B8B1-1060E674D23D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{71B152DD-7A0B-4935-B8B1-1060E674D23D}.Release|Any CPU.Build.0 = Release|Any CPU
{B035801D-E786-4AAA-858A-0770D88116D6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B035801D-E786-4AAA-858A-0770D88116D6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B035801D-E786-4AAA-858A-0770D88116D6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B035801D-E786-4AAA-858A-0770D88116D6}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{24497E6A-D6B1-4C80-ABFB-57FFAD5070C4} = {96119B5E-B5F0-400A-9580-B342EBE26212}
Expand Down Expand Up @@ -717,5 +723,6 @@ Global
{1A34A78B-F6AB-41A9-8B90-384C6E1DBC63} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{AAFFC067-D110-45FF-9FA0-8E02F77D9D14} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{71B152DD-7A0B-4935-B8B1-1060E674D23D} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{B035801D-E786-4AAA-858A-0770D88116D6} = {63E9B289-95E8-4F2B-A064-156971A6853C}
EndGlobalSection
EndGlobal

0 comments on commit 2e02283

Please sign in to comment.