YMJake.Aspire.Hosting.RocketMQ
1.0.2
dotnet add package YMJake.Aspire.Hosting.RocketMQ --version 1.0.2
NuGet\Install-Package YMJake.Aspire.Hosting.RocketMQ -Version 1.0.2
<PackageReference Include="YMJake.Aspire.Hosting.RocketMQ" Version="1.0.2" />
<PackageVersion Include="YMJake.Aspire.Hosting.RocketMQ" Version="1.0.2" />
<PackageReference Include="YMJake.Aspire.Hosting.RocketMQ" />
paket add YMJake.Aspire.Hosting.RocketMQ --version 1.0.2
#r "nuget: YMJake.Aspire.Hosting.RocketMQ, 1.0.2"
#:package YMJake.Aspire.Hosting.RocketMQ@1.0.2
#addin nuget:?package=YMJake.Aspire.Hosting.RocketMQ&version=1.0.2
#tool nuget:?package=YMJake.Aspire.Hosting.RocketMQ&version=1.0.2
Aspire RocketMQ libraries
Aspire helpers for RocketMQ hosting and client wiring.
Packages
YMJake.Aspire.Hosting.RocketMQ: AppHost extensions for running a RocketMQ NameServer-led cluster, broker, and optional dashboard in .NET Aspire.YMJake.Aspire.Apache.RocketMQ: app-side helpers for registering RocketMQ producers and consumers from Aspire connection strings.YMJake.AspNetCore.HealthChecks.RocketMQ: ASP.NET Core health checks for probing RocketMQ connectivity from application code.
Hosting
The hosting package is designed around the RocketMQ NameServer-led deployment path:
Note:
AddRocketMQNameServer("rocketmq-nameserver")usesrocketmq-nameserveras the root Aspire resource name and connection-string key. That root resource represents the RocketMQ NameServer-led cluster entry point, with broker and proxy wired underneath it.
Why this shape? RocketMQ clients first resolve routes through NameServer, brokers register themselves there, and proxy depends on the broker/name server pair. Making NameServer the root resource keeps the Aspire graph aligned with RocketMQ's startup and routing model.
using Aspire.Hosting.RocketMQ;
using Projects;
var builder = DistributedApplication.CreateBuilder(args);
var brokerConfigPath = Path.Combine("..", "rocketmq", "conf", "broker.conf");
var proxy = builder.AddRocketMQProxy(brokerConfigPath);
var rocketmq = builder.AddRocketMQNameServer("rocketmq-nameserver")
.WithProxy(proxy)
.WithDataBindMount("../.containers/rocketmq")
.WithJvmArgs(
nameServerArgs: "-Xms256m -Xmx512m",
brokerArgs: "-Xms256m -Xmx512m");
builder.AddProject<Aspire_RocketMQ_AspNet_Demo>("api")
.WaitFor(proxy)
.WithReference(rocketmq);
builder.Build().Run();
WithProxy(...) is intentionally the only proxy wiring API. We do not expose a separate WithBrokerConfig(...) entry point because RocketMQ local mode depends on a single broker broadcast address and fixed networking assumptions that do not fit Aspire's dynamic port model.
Client
The client package now uses a simple, test-friendly flow:
using Aspire.RocketMQ.Client;
using RocketMQ.Client.Core;
using System.Text;
using System.Collections.Generic;
var builder = WebApplication.CreateBuilder(args);
builder.AddRocketMQProducer(
"rocketmq",
settings =>
{
settings.EnableSsl = false;
settings.MaxStartupAttempts = 10;
},
producer => producer.SetTopics("AspireRocketMQSmoke"));
builder.AddRocketMQConsumer(
"rocketmq",
settings =>
{
settings.EnableSsl = false;
settings.MaxStartupAttempts = 10;
},
consumer => consumer
.SetConsumerGroup("aspire-consumer-group")
.SetSubscriptionExpression(new Dictionary<string, FilterExpression>
{
["AspireRocketMQSmoke"] = new FilterExpression("*")
})
.SetAwaitDuration(TimeSpan.FromSeconds(3)));
var app = builder.Build();
app.MapGet("/rocketmq", (RocketMQProducerSettings rocketmq) => new
{
rocketmq.Endpoints,
rocketmq.RequestTimeout,
rocketmq.EnableSsl,
rocketmq.Namespace,
rocketmq.MaxStartupAttempts
});
app.MapPost("/rocketmq/send", async (Producer producer) =>
{
var message = new Message.Builder()
.SetTopic("AspireRocketMQSmoke")
.SetKeys(Guid.NewGuid().ToString("N"))
.SetBody(Encoding.UTF8.GetBytes("hello from aspire"))
.Build();
var receipt = await producer.Send(message);
return Results.Ok(new { receipt.MessageId, message.Topic });
});
app.MapPost("/rocketmq/consume", async (SimpleConsumer consumer, int? maxMessageNum, int? invisibleDurationMs) =>
{
var safeInvisibleDurationMs = Math.Max(invisibleDurationMs.GetValueOrDefault(10_000), 10_000);
var messages = await consumer.Receive(
maxMessageNum.GetValueOrDefault(1),
TimeSpan.FromMilliseconds(safeInvisibleDurationMs));
return Results.Ok(messages.Select(message => new
{
message.MessageId,
message.Topic,
message.Keys,
Body = Encoding.UTF8.GetString(message.Body)
}));
});
app.Run();
Client registrations
AddRocketMQProducer(...)AddRocketMQConsumer(...)AddRocketMQPushConsumer(...)AddRocketMQLitePushConsumer(...)
Keyed overloads are available for multi-connection scenarios.
Configuration
ConnectionStrings:{name}Aspire:Apache:RocketMQ:Producer:{name}:RequestTimeoutAspire:Apache:RocketMQ:Producer:{name}:EnableSslAspire:Apache:RocketMQ:Producer:{name}:DisableHealthChecksAspire:Apache:RocketMQ:Producer:{name}:DisableTracingAspire:Apache:RocketMQ:Producer:{name}:DisableMetricsAspire:Apache:RocketMQ:Producer:{name}:DisableLoggingAspire:Apache:RocketMQ:Producer:{name}:NamespaceAspire:Apache:RocketMQ:Producer:{name}:MaxStartupAttemptsAspire:Apache:RocketMQ:Consumer:{name}:RequestTimeoutAspire:Apache:RocketMQ:Consumer:{name}:EnableSslAspire:Apache:RocketMQ:Consumer:{name}:DisableHealthChecksAspire:Apache:RocketMQ:Consumer:{name}:DisableTracingAspire:Apache:RocketMQ:Consumer:{name}:DisableMetricsAspire:Apache:RocketMQ:Consumer:{name}:DisableLoggingAspire:Apache:RocketMQ:Consumer:{name}:NamespaceAspire:Apache:RocketMQ:Consumer:{name}:MaxStartupAttempts
Demo flow
POST /rocketmq/sendpublishes a message.POST /rocketmq/consume?maxMessageNum=1pulls messages.invisibleDurationMsmust be at least10000.
Notes
- The underlying RocketMQ client fetches topic routes during startup, so
MaxStartupAttemptsis surfaced as a first-class setting. - The Aspire client package automatically wires RocketMQ health checks, tracing, and metrics unless disabled in settings. The underlying health check and OpenTelemetry packages are still used internally.
Deployment plan
- The current hosting health checks use
127.0.0.1because they are designed for local Aspire/AppHost development. - For Docker or Kubernetes deployments, the broker and proxy probe hosts should be made configurable so they can target service DNS names or in-cluster addresses instead of loopback.
- The next release should keep the local defaults but expose an override path for non-local deployments.
Health checks
The health check package is application-facing and probes RocketMQ by starting a short-lived client instance. By default it only verifies connectivity and route fetch, not consumer subscription state. It accepts a direct ConnectionString or a ConnectionStringFactory, so it works both with and without Aspire. It is separate from hosting readiness checks:
YMJake.Aspire.Hosting.RocketMQcovers container readiness and startup ordering.YMJake.AspNetCore.HealthChecks.RocketMQcovers app-level RocketMQ dependency checks.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Aspire.Hosting.AppHost (>= 13.2.1)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 10.0.5)
- Microsoft.Extensions.Http.Resilience (>= 10.4.0)
- YMJake.RocketMQ.Client (>= 5.3.8)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.