Skip to content

Commit

Permalink
Merge pull request #55 from ojdev/dev
Browse files Browse the repository at this point in the history
v6
  • Loading branch information
ojdev authored Dec 14, 2022
2 parents c604e34 + 57c31c4 commit fa89ac1
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .github/FUNDING.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# These are supported funding model platforms

github: ojdev
8 changes: 5 additions & 3 deletions .github/workflows/dotnetcore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ on:
tags:
- v*


jobs:
build:

Expand All @@ -30,6 +29,9 @@ jobs:
- name: Build with dotnet
run: dotnet build --configuration Release src/RabbitMQ.EventBus.AspNetCore
- name: Pack
run: dotnet pack src/RabbitMQ.EventBus.AspNetCore -c Release --include-symbols --include-source -p:PackageVersion=${{steps.tag.outputs.tag}} -o artifacts/
run: dotnet pack src/RabbitMQ.EventBus.AspNetCore -c Release --include-symbols --include-source -p:PackageVersion=${{steps.tag.outputs.tag}} -o artifacts/
- name: Publish Symbols to NuGet
run: dotnet nuget push artifacts/*.symbols.nupkg --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json
run: dotnet nuget push artifacts/*.symbols.nupkg --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json
- name: Add Source Github NuGet
run: dotnet nuget add source --username ojdev --password ${{ secrets.GITHUB_TOKEN }} --store-password-in-clear-text --name github "https://nuget.pkg.github.com/OWNER/index.json"

19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[![License: MIT](https://www.rabbitmq.com/img/RabbitMQ-logo.svg)](https://www.rabbitmq.com/)
[![License: MIT](https://www.rabbitmq.com/img/RabbitMQ-logo.svg)](https://github.com/ojdev/RabbitMQ.EventBus.AspNetCore/blob/dev/LICENSE)

# RabbitMQ.EventBus.AspNetCore

# RabbitMQ.EventBus.AspNetCore           [Wiki](https://github.com/ojdev/RabbitMQ.EventBus.AspNetCore/wiki)
[![CodeFactor](https://www.codefactor.io/repository/github/ojdev/rabbitmq.eventbus.aspnetcore/badge)](https://www.codefactor.io/repository/github/ojdev/rabbitmq.eventbus.aspnetcore)
[![NuGet](https://img.shields.io/nuget/v/RabbitMQ.EventBus.AspNetCore.svg?style=popout)](https://www.nuget.org/packages/RabbitMQ.EventBus.AspNetCore)
[![NuGet](https://img.shields.io/nuget/dt/RabbitMQ.EventBus.AspNetCore.svg?style=popout)](https://www.nuget.org/packages/RabbitMQ.EventBus.AspNetCore)
Expand All @@ -11,9 +12,13 @@
[![ISSUES](https://img.shields.io/github/issues-closed/ojdev/RabbitMQ.EventBus.AspNetCore.svg)]()


## Modules
### RabbitMQ.EventBus.AspNetCore.Butterfly
该包为一个基于官方RabbitMQ.Client的二次封装包,专门针对Asp.Net Core项目进行开发,在微服务中进行消息的传递使用起来比较方便。

目前功能:

- [x] 发布/订阅
- [x] 死信队列
- [x] RPC功能(实验性)


|Name|Package|NuGet|Status|Document|
|:------|:------|:-----|:-----|:-----|
|Butterfly|[RabbitMQ.EventBus.AspNetCore.Butterfly](https://github.com/ojdev/RabbitMQ.EventBus.AspNetCore.Butterfly)|[![NuGet](https://img.shields.io/nuget/v/RabbitMQ.EventBus.AspNetCore.Butterfly.svg?style=popout)](https://www.nuget.org/packages/RabbitMQ.EventBus.AspNetCore.Butterfly)|[![NuGet](https://img.shields.io/nuget/dt/RabbitMQ.EventBus.AspNetCore.Butterfly.svg?style=popout)](https://www.nuget.org/packages/RabbitMQ.EventBus.AspNetCore.Butterfly)|[使用说明](https://github.com/ojdev/RabbitMQ.EventBus.AspNetCore/wiki/RabbitMQ.EventBus.AspNetCore.Butterfly)|
# [使用说明](https://github.com/ojdev/RabbitMQ.EventBus.AspNetCore/wiki)
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
using System;
using System.Threading.Tasks;

Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.EventBus.AspNetCore.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29409.12
# Visual Studio Version 17
VisualStudioVersion = 17.4.33205.214
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{AACDCE8A-FB3B-496A-9ADA-527265C9B334}"
EndProject
Expand Down
290 changes: 290 additions & 0 deletions WIKI.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
# [RabbitMQ.EventBus.AspNetCore](https://github.com/ojdev/RabbitMQ.EventBus.AspNetCore)

该包为一个基于官方RabbitMQ.Client的二次封装包,专门针对Asp.Net Core项目进行开发,在微服务中进行消息的传递使用起来比较方便。

目前功能:

- [x] 发布/订阅
- [x] 死信队列
- [x] RPC功能(实验性)

### 使用说明(>=6.0.0)

#### 1. 注册
~~~ csharp
public void ConfigureServices(IServiceCollection services)
{
string assemblyName = typeof(Startup).GetTypeInfo().Assembly.GetName().Name;
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
services.AddRabbitMQEventBus("localhost", 5672, "guest", "guest", "", eventBusOptionAction: eventBusOption =>
{
eventBusOption.ClientProvidedAssembly(assemblyName);
eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
eventBusOption.RetryOnFailure(TimeSpan.FromSeconds(1));
eventBusOption.MessageTTL(2000);
eventBusOption.SetBasicQos(10);
eventBusOption.DeadLetterExchangeConfig(config =>
{
config.Enabled = false;
config.ExchangeNameSuffix = "-test";
});
});

//or
//
//services.AddRabbitMQEventBus(() => "amqp://guest:guest@localhost:5672/", eventBusOptionAction: eventBusOption =>
//{
// eventBusOption.ClientProvidedAssembly(assemblyName);
// eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
// eventBusOption.RetryOnFailure(TimeSpan.FromSeconds(1));
// eventBusOption.MessageTTL(2000);
// eventBusOption.SetBasicQos(10);
// eventBusOption.DeadLetterExchangeConfig(config =>
// {
// config.Enabled = false;
// config.ExchangeNameSuffix = "-test";
// });
//});
}
~~~
#### 2. 发消息
##### 2.1 直接发送消息
~~~ csharp
[Route("api/[controller]")]
[ApiController]
public class EventBusController : ControllerBase
{
private readonly IRabbitMQEventBus _eventBus;

public EventBusController(IRabbitMQEventBus eventBus)
{
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
}

// GET api/values
[HttpGet]
public IActionResult Send()
{
_eventBus.Publish(new
{
Body = "发送消息",
Time = DateTimeOffset.Now
}, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test");
return Ok();
}
}
~~~
##### 2.1 发送消息并等待回复
~~~ csharp
[Route("api/[controller]")]
[ApiController]
public class EventBusController : ControllerBase
{
private readonly IRabbitMQEventBus _eventBus;

public EventBusController(IRabbitMQEventBus eventBus)
{
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
}

// GET api/values
[HttpGet]
public async Task<ActionResult<string>> Get()
{
Console.WriteLine($"发送消息{1}");
var body = new
{
requestId = Guid.NewGuid(),
Body = $"rabbitmq.eventbus.test=>发送消息\t{1}",
Time = DateTimeOffset.Now,
};
var r = await _eventBus.PublishAsync<string>(body, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test");
Console.WriteLine($"返回了{r}");
await Task.Delay(500);
return r;
}
}
~~~
#### 3. 订阅消息
##### 1. 订阅消息(无回复)
~~~ csharp
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test")]
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test1")]
public class MessageBody : IEvent
{
public string Body { get; set; }
public DateTimeOffset Time { get; set; }
}
public class MessageBodyHandle : IEventHandler<MessageBody>, IDisposable
{
private readonly ILogger<MessageBodyHandle> _logger;

public MessageBodyHandle(ILogger<MessageBodyHandle> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public void Dispose()
{
Console.WriteLine("释放");
}

public Task Handle(EventHandlerArgs<MessageBody1> args)
{
_logger.Information(args.Original);
_logger.Information(args.Redelivered);
_logger.Information(args.Exchange);
_logger.Information(args.RoutingKey);

_logger.Information(args.Event.Body);
return Task.CompletedTask;
}
}
~~~
##### 1. 订阅消息并回复
~~~ csharp
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test")]
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test1")]
public class MessageBody : IEvent
{
public string Body { get; set; }
public DateTimeOffset Time { get; set; }
}
public class MessageBodyHandle : IEventResponseHandler<MessageBody, string>, IDisposable
{
private Guid id;
private readonly ILogger<MessageBodyHandle> _logger;

public MessageBodyHandle(ILogger<MessageBodyHandle> logger)
{
id = Guid.NewGuid();
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public void Dispose()
{
_logger.LogInformation("MessageBodyHandle Disposable.");
}


public Task<string> HandleAsync(HandlerEventArgs<MessageBody> args)
{
return Task.FromResult("收到消息,已确认" + DateTimeOffset.Now);
}
}
~~~

### 使用说明(<=5.1.1)

#### 1. 注册
~~~ csharp
public void ConfigureServices(IServiceCollection services)
{
string assemblyName = typeof(Startup).GetTypeInfo().Assembly.GetName().Name;
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
services.AddRabbitMQEventBus(()=>"amqp://guest:[email protected]:5672/", eventBusOptionAction: eventBusOption =>
{
eventBusOption.ClientProvidedAssembly(assemblyName);
eventBusOption.EnableRetryOnFailure(true, 5000, TimeSpan.FromSeconds(30));
eventBusOption.RetryOnFailure(TimeSpan.FromMilliseconds(100));
eventBusOption.AddLogging(LogLevel.Warning);
eventBusOption.MessageTTL(2000);
eventBusOption.DeadLetterExchangeConfig(config =>
{
config.Enabled = true;
config.ExchangeNameSuffix = "-test";
});
});
services.AddButterfly(butterfly =>
{
butterfly.CollectorUrl = "http://192.168.0.252:6401";
butterfly.Service = "RabbitMQEventBusTest";
});
}
~~~
#### 2. 订阅消息
##### 2.1 自动订阅消息
~~~ csharp
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IServiceTracer tracer)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.RabbitMQEventBusAutoSubscribe();
app.UseMvc();
}
~~~
##### 2.2 手动订阅消息
~~~ csharp
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IRabbitMQEventBus eventBus)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
eventBus.Serialize<EventMessage, EventMessageHandler>();
app.UseMvc();
}
~~~
#### 3. 发消息
~~~ csharp
[Route("api/[controller]")]
[ApiController]
public class EventBusController : ControllerBase
{
private readonly IRabbitMQEventBus _eventBus;

public EventBusController(IRabbitMQEventBus eventBus)
{
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
}

// GET api/values
[HttpGet]
public IActionResult Send()
{
_eventBus.Publish(new
{
Body = "发送消息",
Time = DateTimeOffset.Now
}, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test");
return Ok();
}
}
~~~
#### 4. 订阅消息
~~~ csharp
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test")]
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test1")]
[EventBus(Exchange = "RabbitMQ.EventBus.Simple", RoutingKey = "rabbitmq.eventbus.test2")]
public class MessageBody : IEvent
{
public string Body { get; set; }
public DateTimeOffset Time { get; set; }
}
public class MessageBodyHandle : IEventHandler<MessageBody>, IDisposable
{
private readonly ILogger<MessageBodyHandle> _logger;

public MessageBodyHandle(ILogger<MessageBodyHandle> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public void Dispose()
{
Console.WriteLine("释放");
}

public Task Handle(EventHandlerArgs<MessageBody1> args)
{
_logger.Information(args.Original);
_logger.Information(args.Redelivered);
_logger.Information(args.Exchange);
_logger.Information(args.RoutingKey);

_logger.Information(args.Event.Body);
return Task.CompletedTask;
}
}
~~~
8 changes: 4 additions & 4 deletions src/RabbitMQ.EventBus.AspNetCore/DefaultRabbitMQEventBusV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ public void Subscribe(Type eventType, string type = "topic")
if (attribute is EventBusAttribute attr)
{
string queue = attr.Queue ?? (_persistentConnection.Configuration.Prefix == QueuePrefixType.ExchangeName
? $"{ attr.Exchange }.{ eventType.Name }"
: $"{GlaobalExchangeName}.{ eventType.Name }");
? $"{attr.Exchange}.{eventType.Name}"
: (eventType.FullName ?? $"{GlaobalExchangeName}.{eventType.Name}"));

var onlyKey = $"{attr.Exchange}_{queue}_{attr.RoutingKey}";
if (!subscribes.TryGetValue(onlyKey, out IModel channel))
Expand Down Expand Up @@ -158,8 +158,8 @@ public void Subscribe(Type eventType, Type responseType, string type = "topic")
if (attribute is EventBusAttribute attr)
{
string queue = attr.Queue ?? (_persistentConnection.Configuration.Prefix == QueuePrefixType.ExchangeName
? $"{ attr.Exchange }.{ eventType.Name }"
: $"{GlaobalExchangeName}.{ eventType.Name }");
? $"{attr.Exchange}.{eventType.Name}"
: (eventType.FullName ?? $"{GlaobalExchangeName}.{eventType.Name}"));

var onlyKey = $"{attr.Exchange}_{queue}_{attr.RoutingKey}";
_logger.LogWarning($"onlyKey => {onlyKey}");
Expand Down
Loading

0 comments on commit fa89ac1

Please sign in to comment.