Part6 – (19) RabbitMQ简介
RabbitMQ的基本概念
1、集成事件是服务器间的通信,所以必须借助于第三方服务器作为事件总线。常用的消息中间件有Redis、RabbitMQ、Kafka、ActiveMQ等。
2、RabbitMQ的基本概念:
1)信道(Channel):信道是消息的生产者、消费者和服务器进行通信的虚拟连接。TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟的信道。我们尽量重复使用TCP连接,而信道则是可以用完了就关闭。
2)队列(Queue):用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取数据。
3)交换机(exchange):把消息路由到一个或者多个队列中。
RabbitMQ的routing模式
生产者把消息发布到交换机中,消息携带一个routingKey属性,交换机会根据routingKey的值把消息发送到一个或者多个队列;消费者会从队列中获取消息;交换机和队列都位于RabbitMQ服务器内部。优点:即使消费者不在线,消费者相关的消息也会被保存到队列中,当消费者上线之后,消费者就可以获取到离线期间错过的消息。
Part6 – (20) .NET中RabbitMQ的基本使用
基本用法
1、安装RabbitMQ服务器。
2、分别创建发送消息的项目和接收消息的控制台项目,这两个项目都安装NuGet包RabbitMQ.Client。
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
factory.DispatchConsumersAsync = true;
string exchangeName = "exchange1";//交换机的名字
string eventName = "myEvent";// routingKey的值
using var conn = factory.CreateConnection();
while(true)
{
string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息
using (var channel = conn.CreateModel())//创建信道
{
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");//声明交换机
byte[] body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: exchangeName,routingKey: eventName, mandatory: true,basicProperties: properties,body: body);//发布消息
}
Console.WriteLine("发布了消息:" + msg);
Thread.Sleep(1000);
}
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.DispatchConsumersAsync = true;
string exchangeName = "exchange1";
string eventName = "var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(queue: queueName, autoAck: false,consumer: consumer);
Console.ReadLine();
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
try
{
var bytes = args.Body.ToArray();
string msg = Encoding.UTF8.GetString(bytes);
Console.WriteLine(DateTime.Now + "收到了消息" + msg);
channel.BasicAck(args.DeliveryTag, multiple: false);
await Task.Delay(800);
}
catch (Exception ex)
{
channel.BasicReject(args.DeliveryTag, true);//失败重发
Console.WriteLine("处理收到的消息出错"+ex);
}
}";
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();
string queueName = "queue1";
channel.ExchangeDeclare(exchange: exchangeName,type: "direct");
channel.QueueDeclare(queue: queueName,durable: true, exclusive: false,autoDelete: false,arguments: null);
channel.QueueBind(queue: queueName, exchange: exchangeName,routingKey: eventName);//将routingKey绑定到Queue
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(queue: queueName, autoAck: false,consumer: consumer);
Console.ReadLine();
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
try
{
var bytes = args.Body.ToArray();
string msg = Encoding.UTF8.GetString(bytes);
Console.WriteLine(DateTime.Now + "收到了消息" + msg);
//Delivery Tag就是消息的编号
channel.BasicAck(args.DeliveryTag, multiple: false);
await Task.Delay(800);
}
catch (Exception ex)
{
channel.BasicReject(args.DeliveryTag, true);//失败重发
Console.WriteLine("处理收到的消息出错"+ex);
}
}
Part6 – (21) .NET中简化集成事件的框架
Zack.EventBus使用
1、每次都使用RabbitMQ原始代码太麻烦。参考并改进了微软开源的eShopOnContainers,开发了简化领域事件编程的开发包Zack.EventBus,并且简化了以后迁移到其他MQ服务器的工作量。
2、使用步骤:
1)创建两个ASP.NET Core Web API项目,它们分别是发布集成事件的项目和消费集成事件的项目,然后我们为这两个项目都安装NuGet包Zack.EventBus。
2)在两个项目中的Program.cs文件中的builder.Build()上面增加对IntegrationEventRabbitMQOptions进行配置的代码以及对AddEventBus的调用,然后还要在builder.Build()下面调用
3)在需要发布领域事件的类中注入IEventBus服务,然后调用IEventBus的Publish方法发布消息。
4)创造一个实现了IIntegrationEventHandler接口的类,这个类用来处理收到的事件。通过[EventName(“UserAdded”)]设定类监听的事件。
3、JsonIntegrationEventHandler和DynamicIntegrationEventHandler。
4、RabbitMQ等消息中间件的消息发布和消费的过程是异步的,也就是消息发布者将消息放入消息中间件就返回了,并不会等待消息的消费过程,因此集成事件不仅能够降低微服务之间的耦合度,也还能够起到削峰填谷的作用,避免一个微服务中的突发请求导致其他微服务雪崩的情况出现,而且消息中间件的失败重发机制可以提高消息处理的成功率,从而保证事务的最终一致性。
5、最终一致性的事务:需要开发人员对流程进行精细的设计,甚至有时候需要引入人工补偿操作。不像强一致性事务那样是纯技术方案。
6、其他类似开源项目:CAP
using System.Reflection;
using Zack.EventBus;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Zack.EventBus;
namespace 发送集成事件.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class DemoController : ControllerBase
{
private IEventBus eventBus;
public DemoController(IEventBus eventBus)
{
this.eventBus = eventBus;
}
[HttpPost]
public string Publish()
{
eventBus.Publish("UserAdded", new { UserName = "zcq", Age = 18 });
return "ok";
}
}
}
using System.Reflection;
using Zack.EventBus;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q2", Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
using Zack.EventBus;
[EventName("UserAdded")]
public class UserAddesEventHandler : IIntegrationEventHandler
{
private readonly ILogger<UserAddesEventHandler> logger;
public UserAddesEventHandler(ILogger<UserAddesEventHandler> logger)
{
this.logger = logger;
}
public Task Handle(string eventName, string eventData)
{
logger.LogInformation("新建了用户:" + eventData);
return Task.CompletedTask;
}
}
using Zack.EventBus;
[EventName("UserAdded")]
public class UserAddesEventHandler2 : DynamicIntegrationEventHandler
{
private readonly ILogger<UserAddesEventHandler2> logger;
public UserAddesEventHandler2(ILogger<UserAddesEventHandler2> logger)
{
this.logger = logger;
}
public override Task HandleDynamic(string eventName, dynamic eventData)
{
logger.LogInformation($"Dynamic:{eventData.UserName}");
return Task.CompletedTask;
}
}
using Zack.EventBus;
[EventName("UserAdded")]
public class UserAddesEventHandler3 : JsonIntegrationEventHandler<UserData>
{
private readonly ILogger<UserAddesEventHandler3> logger;
public UserAddesEventHandler3(ILogger<UserAddesEventHandler3> logger)
{
this.logger = logger;
}
public override Task HandleJson(string eventName, UserData eventData)
{
logger.LogInformation($"Json:{eventData.UserName}");
return Task.CompletedTask;
}
}
public record UserData(string UserName, int Age);
Part6 – (22) Zack.EventBus源代码讲解
源代码结构
1、YouZack-Vnext/Zack.EventBus
2、RabbitMQConnection类提供的是RabbitMQ连接的失败重连机制
3、SubscriptionsManager类提供的是事件处理的注册和事件的分发机制,从而使得同样一个领域事件可以被微服务内多个事件处理者收到,SubscriptionsManager使用Dictionary来记录注册的事件处理者,其中的AddSubscription(string eventName, Type eventHandlerType)方法用来供把eventHandlerType指定的事件处理类注册为eventName事件的处理类
4、ServicesCollectionExtensions类中的AddEventBus方法用来把集成事件处理类注册到SubscriptionsManager中,它会扫描指定程序集中所有实现了IIntegrationEventHandler接口的类,然后读取类上标注的所有[EventName],把指定监听的事件注册到SubscriptionsManager中;
5、RabbitMQEventBus类用来进行事件的注册和分发
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
namespace Zack.EventBus
{
class RabbitMQConnection
{
private readonly IConnectionFactory _connectionFactory;
private IConnection _connection;
private bool _disposed;
private readonly object sync_root = new object();
public RabbitMQConnection(IConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
}
public bool IsConnected
{
get
{
return _connection != null && _connection.IsOpen && !_disposed;
}
}
public IModel CreateModel()
{
if (!IsConnected)
{
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
}
return _connection.CreateModel();
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_connection.Dispose();
}
public bool TryConnect()
{
lock (sync_root)
{
_connection = _connectionFactory.CreateConnection();
if (IsConnected)
{
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
return true;
}
else
{
return false;
}
}
}
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
{
if (_disposed) return;
TryConnect();
}
void OnCallbackException(object sender, CallbackExceptionEventArgs e)
{
if (_disposed) return;
TryConnect();
}
void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
{
if (_disposed) return;
TryConnect();
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
namespace Zack.EventBus
{
class SubscriptionsManager
{
//key是eventName,值是监听这个事件的实现了IIntegrationEventHandler接口的类型
private readonly Dictionary<string, List<Type>> _handlers = new Dictionary<string, List<Type>>();
public event EventHandler<string> OnEventRemoved;
public bool IsEmpty => !_handlers.Keys.Any();
public void Clear() => _handlers.Clear();
/// <summary>
/// 把eventHandlerType类型(实现了eventHandlerType接口)注册为监听了eventName事件
/// </summary>
/// <param name="eventName"></param>
/// <param name="eventHandlerType"></param>
public void AddSubscription(string eventName, Type eventHandlerType)
{
if (!HasSubscriptionsForEvent(eventName))
{
_handlers.Add(eventName, new List<Type>());
}
//如果已经注册过,则报错
if (_handlers[eventName].Contains(eventHandlerType))
{
throw new ArgumentException($"Handler Type {eventHandlerType} already registered for '{eventName}'", nameof(eventHandlerType));
}
_handlers[eventName].Add(eventHandlerType);
}
public void RemoveSubscription(string eventName, Type handlerType)
{
_handlers[eventName].Remove(handlerType);
if (!_handlers[eventName].Any())
{
_handlers.Remove(eventName);
OnEventRemoved?.Invoke(this, eventName);
}
}
/// <summary>
/// 得到名字为eventName的所有监听者
/// </summary>
/// <param name="eventName"></param>
/// <returns></returns>
public IEnumerable<Type> GetHandlersForEvent(string eventName) => _handlers[eventName];
/// <summary>
/// 是否有类型监听eventName这个事件
/// </summary>
/// <param name="eventName"></param>
/// <returns></returns>
public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
namespace Zack.EventBus
{
public static class ServicesCollectionExtensions
{
public static IServiceCollection AddEventBus(this IServiceCollection services, string queueName,
params Assembly[] assemblies)
{
return AddEventBus(services, queueName, assemblies.ToList());
}
public static IServiceCollection AddEventBus(this IServiceCollection services, string queueName,
IEnumerable<Assembly> assemblies)
{
List<Type> eventHandlers = new List<Type>();
foreach (var asm in assemblies)
{
//用GetTypes(),这样非public类也能注册
var types = asm.GetTypes().Where(t => t.IsAbstract == false && t.IsAssignableTo(typeof(IIntegrationEventHandler)));
eventHandlers.AddRange(types);
}
return AddEventBus(services, queueName, eventHandlers);
}
/// <summary>
///
/// </summary>
/// <param name="services"></param>
/// <param name="queueName">如果多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。为了确保一个应用监听到所有的领域事件,所以不同前端项目的queueName需要不一样。
/// 因此,对于同一个应用,这个queueName需要保证在多个集群实例和多次运行保持一致,这样可以保证应用重启后仍然能收到没来得及处理的消息。而且这样同一个应用的多个集群实例只有一个能收到一条消息,不会同一条消息被一个应用的多个实例处理。这样消息的处理就被平摊到多个实例中。
///</param>
/// <param name="eventHandlerTypes"></param>
/// <returns></returns>
public static IServiceCollection AddEventBus(this IServiceCollection services, string queueName, IEnumerable<Type> eventHandlerTypes)
{
foreach (Type type in eventHandlerTypes)
{
services.AddScoped(type, type);
}
services.AddSingleton<IEventBus>(sp =>
{
//如果注册服务的时候就要读取配置,那么可以用AddSingleton的Func<IServiceProvider, TService> 这个重载,
//因为可以拿到IServiceProvider,省得自己构建IServiceProvider
var optionMQ = sp.GetRequiredService<IOptions<IntegrationEventRabbitMQOptions>>().Value;
var factory = new ConnectionFactory()
{
HostName = optionMQ.HostName,
DispatchConsumersAsync = true
};
//eventBus归DI管理,释放的时候会调用Dispose
//eventbus的Dispose中会销毁RabbitMQConnection
RabbitMQConnection mqConnection = new RabbitMQConnection(factory);
var serviceScopeFactory = sp.GetRequiredService<IServiceScopeFactory>();
var eventBus = new RabbitMQEventBus(mqConnection, serviceScopeFactory, optionMQ.ExchangeName, queueName);
//遍历所有实现了IIntegrationEventHandler接口的类,然后把它们批量注册到eventBus
foreach (Type type in eventHandlerTypes)
{
//获取类上标注的EventNameAttribute,EventNameAttribute的Name为要监听的事件的名字
//允许监听多个事件,但是不能为空
var eventNameAttrs = type.GetCustomAttributes<EventNameAttribute>();
if (eventNameAttrs.Any() == false)
{
throw new ApplicationException($"There shoule be at least one EventNameAttribute on {type}");
}
foreach (var eventNameAttr in eventNameAttrs)
{
eventBus.Subscribe(eventNameAttr.Name, type);
}
}
return eventBus;
});
return services;
}
}
}
using Microsoft.AspNetCore.Builder;
using System;
namespace Zack.EventBus
{
public static class ApplicationBuilderExtensions
{
public static IApplicationBuilder UseEventBus(this IApplicationBuilder appBuilder)
{
//获得IEventBus一次,就会立即加载IEventBus,这样扫描所有的EventHandler,保证消息及时消费
object? eventBus = appBuilder.ApplicationServices.GetService(typeof(IEventBus));
if (eventBus == null)
{
throw new ApplicationException("找不到IEventBus实例");
}
return appBuilder;
}
}
}
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Diagnostics;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace Zack.EventBus;
class RabbitMQEventBus : IEventBus, IDisposable
{
private IModel _consumerChannel;
private readonly string _exchangeName;
private string _queueName;
private readonly RabbitMQConnection _persistentConnection;
private readonly SubscriptionsManager _subsManager;
private readonly IServiceProvider _serviceProvider;
private readonly IServiceScope serviceScope;
public RabbitMQEventBus(RabbitMQConnection persistentConnection,
IServiceScopeFactory serviceProviderFactory, string exchangeName, string queueName)
{
this._persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
this._subsManager = new SubscriptionsManager();
this._exchangeName = exchangeName;
this._queueName = queueName;
//因为RabbitMQEventBus是Singleton对象,而它创建的IIntegrationEventHandler以及用到的IIntegrationEventHandler用到的服务
//大部分是Scoped,因此必须这样显式创建一个scope,否则在getservice的时候会报错:Cannot resolvefrom root provider because it requires scoped service
this.serviceScope = serviceProviderFactory.CreateScope();
this._serviceProvider = serviceScope.ServiceProvider;
this._consumerChannel = CreateConsumerChannel();
this._subsManager.OnEventRemoved += SubsManager_OnEventRemoved; ;
}
private void SubsManager_OnEventRemoved(object? sender, string eventName)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueUnbind(queue: _queueName,
exchange: _exchangeName,
routingKey: eventName);
if (_subsManager.IsEmpty)
{
_queueName = string.Empty;
_consumerChannel.Close();
}
}
}
public void Publish(string eventName, object? eventData)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
//Channel 是建立在 Connection 上的虚拟连接
//创建和销毁 TCP 连接的代价非常高,
//Connection 可以创建多个 Channel ,Channel 不是线程安全的所以不能在线程间共享。
using (var channel = _persistentConnection.CreateModel())
{
channel.ExchangeDeclare(exchange: _exchangeName, type: "direct");
byte[] body;
if (eventData == null)
{
body = new byte[0];
}
else
{
JsonSerializerOptions options = new JsonSerializerOptions
{
WriteIndented = true
};
body = JsonSerializer.SerializeToUtf8Bytes(eventData, eventData.GetType(), options);
}
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // persistent
channel.BasicPublish(
exchange: _exchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: body);
}
}
public void Subscribe(string eventName, Type handlerType)
{
CheckHandlerType(handlerType);
DoInternalSubscription(eventName);
_subsManager.AddSubscription(eventName, handlerType);
StartBasicConsume();
}
private void DoInternalSubscription(string eventName)
{
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
if (!containsKey)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
_consumerChannel.QueueBind(queue: _queueName,
exchange: _exchangeName,
routingKey: eventName);
}
}
private void CheckHandlerType(Type handlerType)
{
if (!typeof(IIntegrationEventHandler).IsAssignableFrom(handlerType))
{
throw new ArgumentException($"{handlerType} doesn't inherit from IIntegrationEventHandler", nameof(handlerType));
}
}
public void Unsubscribe(string eventName, Type handlerType)
{
CheckHandlerType(handlerType);
_subsManager.RemoveSubscription(eventName, handlerType);
}
public void Dispose()
{
if (_consumerChannel != null)
{
_consumerChannel.Dispose();
}
_subsManager.Clear();
this._persistentConnection.Dispose();
this.serviceScope.Dispose();
}
private void StartBasicConsume()
{
if (_consumerChannel != null)
{
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
consumer.Received += Consumer_Received;
_consumerChannel.BasicConsume(
queue: _queueName,
autoAck: false,
consumer: consumer);
}
}
private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
var eventName = eventArgs.RoutingKey;//这个框架中,就是用eventName当RoutingKey
var message = Encoding.UTF8.GetString(eventArgs.Body.Span);//框架要求所有的消息都是字符串的json
try
{
await ProcessEvent(eventName, message);
//如果在获取消息时采用不自动应答,但是获取消息后不调用basicAck,
//RabbitMQ会认为消息没有投递成功,不仅所有的消息都会保留到内存中,
//而且在客户重新连接后,会将消息重新投递一遍。这种情况无法完全避免,因此EventHandler的代码需要幂等
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
//multiple:批量确认标志。如果值为true,则执行批量确认,此deliveryTag之前收到的消息全部进行确认; 如果值为false,则只对当前收到的消息进行确认
}
catch (Exception ex)
{
//requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
//_consumerChannel.BasicReject(eventArgs.DeliveryTag, true);
Debug.Fail(ex.ToString());
}
}
private IModel CreateConsumerChannel()
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
var channel = _persistentConnection.CreateModel();
channel.ExchangeDeclare(exchange: _exchangeName,
type: "direct");
channel.QueueDeclare(queue: _queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.CallbackException += (sender, ea) =>
{
/*
_consumerChannel.Dispose();
_consumerChannel = CreateConsumerChannel();
StartBasicConsume();*/
Debug.Fail(ea.ToString());
};
return channel;
}
private async Task ProcessEvent(string eventName, string message)
{
if (_subsManager.HasSubscriptionsForEvent(eventName))
{
var subscriptions = _subsManager.GetHandlersForEvent(eventName);
foreach (var subscription in subscriptions)
{
//各自在不同的Scope中,避免DbContext等的共享造成如下问题:
//The instance of entity type cannot be tracked because another instance
using var scope = this._serviceProvider.CreateScope();
IIntegrationEventHandler? handler = scope.ServiceProvider.GetService(subscription) as IIntegrationEventHandler;
if (handler == null)
{
throw new ApplicationException($"无法创建{subscription}类型的服务");
}
await handler.Handle(eventName, message);
}
}
else
{
string entryAsm = Assembly.GetEntryAssembly().GetName().Name;
Debug.WriteLine($"找不到可以处理eventName={eventName}的处理程序,entryAsm:{entryAsm}");
}
}
}
using System.Text.Json;
using System.Threading.Tasks;
namespace Zack.EventBus
{
public abstract class JsonIntegrationEventHandler<T> : IIntegrationEventHandler
{
public Task Handle(string eventName, string json)
{
T? eventData = JsonSerializer.Deserialize<T>(json);
return HandleJson(eventName, eventData);
}
public abstract Task HandleJson(string eventName, T? eventData);
}
}
namespace Zack.EventBus
{
public class IntegrationEventRabbitMQOptions
{
public string HostName { get; set; }
public string ExchangeName { get; set; }
}
}
using System.Threading.Tasks;
namespace Zack.EventBus
{
public interface IIntegrationEventHandler
{
//因为消息可能会重复发送,因此Handle内的实现需要是幂等的
Task Handle(string eventName, string eventData);
}
}
using System;
namespace Zack.EventBus
{
public interface IEventBus
{
void Publish(string eventName, object? eventData);
void Subscribe(string eventName, Type handlerType);
void Unsubscribe(string eventName, Type handlerType);
}
}
using System;
namespace Zack.EventBus
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public class EventNameAttribute : Attribute
{
public EventNameAttribute(string name)
{
this.Name = name;
}
public string Name { get; init; }
}
}
using Dynamic.Json;
using System.Threading.Tasks;
namespace Zack.EventBus
{
public abstract class DynamicIntegrationEventHandler : IIntegrationEventHandler
{
public Task Handle(string eventName, string eventData)
{
//https://github.com/dotnet/runtime/issues/53195
//https://github.com/dotnet/core/issues/6444
//.NET 6目前不支持把json反序列化为dynamic,本来preview 4支持,但是在preview 7又去掉了
//所以暂时用Dynamic.Json来实现。
dynamic dynamicEventData = DJson.Parse(eventData);
return HandleDynamic(eventName, dynamicEventData);
}
public abstract Task HandleDynamic(string eventName, dynamic eventData);
}
}
本文版权归个人技术分享站点所有,发布者:chaoqiang,转转请注明出处:https://www.zhengchaoqiang.com/1669.html