books/backend/Books.Api/EventFlow/Infrastructure/DispatchToSubscriberResilienceStrategy.cs

239 lines
8.4 KiB
C#
Raw Normal View History

using System.Reflection;
using Books.Api.Infrastructure;
using EventFlow.Aggregates;
using EventFlow.EventStores;
using EventFlow.Subscribers;
using Hangfire;
namespace Books.Api.EventFlow.Infrastructure;
#pragma warning disable CS9113 // Parameter is unread
/// <summary>
/// Subscriber resilience strategy that logs a failed subscriber invocation and schedules a
/// <see cref="SubscriberRetryJob"/> so the event can be dispatched to that subscriber again.
/// All other hooks are no-ops.
/// </summary>
/// <remarks>
/// <c>serviceProvider</c> is not read by this class; the surrounding
/// <c>#pragma warning disable CS9113</c> suppresses the resulting warning.
/// </remarks>
public class DispatchToSubscriberResilienceStrategy(
    ILogger<DispatchToSubscriberResilienceStrategy> logger,
    IScheduler scheduler,
    IServiceProvider serviceProvider,
    IEventJsonSerializer eventJsonSerializer) : IDispatchToSubscriberResilienceStrategy
#pragma warning restore CS9113
{
    // Time span handed to IScheduler.EnqueueJob together with the retry job.
    // NOTE(review): presumably the delay before the job first runs - confirm against IScheduler.
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(30);

    /// <summary>No-op hook invoked before a subscriber handles an event.</summary>
    public Task BeforeHandleEventAsync(
        ISubscribe subscriberTo,
        IDomainEvent domainEvent,
        CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Logs the subscriber failure and enqueues a <see cref="SubscriberRetryJob"/> carrying the
    /// serialized event, so the subscriber can be invoked again later.
    /// </summary>
    /// <param name="subscriberTo">The subscriber (possibly a decorator) that failed.</param>
    /// <param name="domainEvent">The event the subscriber failed to handle.</param>
    /// <param name="exception">The exception raised by the subscriber.</param>
    /// <param name="swallowException">Not used by this implementation.</param>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>A completed task; failures while scheduling are logged, not rethrown.</returns>
    /// <exception cref="InvalidOperationException">
    /// A decorator's <see cref="ISubscribeDecorator.InnerInstance"/> is not an <see cref="ISubscribe"/>.
    /// </exception>
    public Task HandleEventFailedAsync(
        ISubscribe subscriberTo,
        IDomainEvent domainEvent,
        Exception exception,
        bool swallowException,
        CancellationToken cancellationToken)
    {
        var subscriberType = GetSubscriberType(subscriberTo);
        var eventType = domainEvent.EventType.Name;
        var aggregateId = domainEvent.GetIdentity()?.Value ?? "unknown";

        logger.LogError(exception,
            "[RESILIENCE] Subscriber {SubscriberType} failed to handle event {EventType} for aggregate {AggregateId}",
            subscriberType.Name,
            eventType,
            aggregateId);

        try
        {
            // The closed IDomainEvent<,,> interface name is passed along to the retry job.
            var domainEventInterface = domainEvent.GetType()
                .GetInterfaces()
                .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IDomainEvent<,,>));

            if (domainEventInterface is null)
            {
                // Previously this case fell through silently; make the dropped retry visible.
                logger.LogError(
                    "[RESILIENCE] Event {EventType} does not implement IDomainEvent<,,>; no retry scheduled for {SubscriberType} / {AggregateId}. " +
                    "This event will NOT be retried and may cause data inconsistency.",
                    eventType,
                    subscriberType.Name,
                    aggregateId);
                return Task.CompletedTask;
            }

            var serializedEvent = eventJsonSerializer.Serialize(domainEvent);

            scheduler.EnqueueJob<SubscriberRetryJob>(
                job => job.RetryEventDispatchAsync(
                    subscriberType.AssemblyQualifiedName!,
                    domainEventInterface.AssemblyQualifiedName!,
                    serializedEvent.SerializedData,
                    serializedEvent.SerializedMetadata),
                RetryDelay);

            logger.LogWarning(
                "[RESILIENCE] Scheduled retry job for {SubscriberType} / {EventType} / {AggregateId}",
                subscriberType.Name,
                eventType,
                aggregateId);
        }
        catch (Exception e)
        {
            // Best effort: a scheduling failure must not escape the failure handler.
            logger.LogError(e,
                "[RESILIENCE] Failed to schedule retry for {SubscriberType} / {EventType}. " +
                "This event will NOT be retried and may cause data inconsistency.",
                subscriberType.Name,
                eventType);
        }

        return Task.CompletedTask;
    }

    /// <summary>No-op hook invoked after a subscriber handled an event successfully.</summary>
    public Task HandleEventSucceededAsync(
        ISubscribe subscriberTo,
        IDomainEvent domainEvent,
        CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op hook invoked before an event is dispatched to its subscribers.</summary>
    public Task BeforeDispatchToSubscribersAsync(
        IDomainEvent domainEvent,
        IReadOnlyCollection<IDomainEvent> domainEvents,
        CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op hook invoked after dispatching to subscribers succeeded.</summary>
    public Task DispatchToSubscribersSucceededAsync(
        IDomainEvent domainEvent,
        IReadOnlyCollection<IDomainEvent> domainEvents,
        CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Invoked when dispatching to subscribers failed; always returns <c>true</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably <c>true</c> tells EventFlow to swallow the exception - confirm
    /// against the <see cref="IDispatchToSubscriberResilienceStrategy"/> contract.
    /// </remarks>
    public Task<bool> HandleDispatchToSubscribersFailedAsync(
        IDomainEvent domainEvent,
        IReadOnlyCollection<IDomainEvent> domainEvents,
        Exception exception,
        CancellationToken cancellationToken)
    {
        return Task.FromResult(true);
    }

    /// <summary>
    /// Returns the runtime type of the subscriber, unwrapping any chain of
    /// <see cref="ISubscribeDecorator"/> instances first.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// A decorator's inner instance is not an <see cref="ISubscribe"/>.
    /// </exception>
    private static Type GetSubscriberType(ISubscribe subscriberTo)
    {
        var current = subscriberTo;
        while (current is ISubscribeDecorator decorator)
        {
            current = decorator.InnerInstance as ISubscribe
                ?? throw new InvalidOperationException("InnerInstance is not ISubscribe");
        }

        return current.GetType();
    }
}
/// <summary>
/// Implemented by subscriber decorators so the wrapped instance can be reached.
/// <c>DispatchToSubscriberResilienceStrategy.GetSubscriberType</c> uses it to find the
/// type of the underlying subscriber.
/// </summary>
public interface ISubscribeDecorator
{
    /// <summary>The instance wrapped by this decorator.</summary>
    object InnerInstance { get; }
}
/// <summary>
/// Background job that deserializes a previously failed domain event, resolves the target
/// subscriber from the service provider and invokes its <c>HandleAsync</c> method again.
/// </summary>
public class SubscriberRetryJob(
    ILogger<SubscriberRetryJob> logger,
    IServiceProvider serviceProvider,
    IEventJsonSerializer eventJsonSerializer)
{
    // Name of the handler method declared by the subscriber interfaces.
    private const string HandleMethodName = "HandleAsync";

    /// <summary>
    /// Re-dispatches a serialized domain event to a single subscriber.
    /// </summary>
    /// <param name="subscriberTypeName">Assembly-qualified name of the concrete subscriber type.</param>
    /// <param name="domainEventTypeName">Type name of the domain event; only used in log output.</param>
    /// <param name="serializedData">Serialized event payload.</param>
    /// <param name="serializedMetadata">Serialized event metadata.</param>
    /// <remarks>
    /// Unresolvable subscriber types, instances or handler methods are logged and the job returns
    /// normally. Any exception (including one thrown by the subscriber) is logged and rethrown.
    /// </remarks>
    [AutomaticRetry(Attempts = 3, DelaysInSeconds = [30, 60, 120])]
    public async Task RetryEventDispatchAsync(
        string subscriberTypeName,
        string domainEventTypeName,
        string serializedData,
        string serializedMetadata)
    {
        logger.LogInformation(
            "[RETRY] Retrying event dispatch to {SubscriberType}",
            subscriberTypeName);

        try
        {
            var domainEvent = eventJsonSerializer.Deserialize(serializedData, serializedMetadata);

            var subscriberType = Type.GetType(subscriberTypeName);
            if (subscriberType is null)
            {
                logger.LogError("[RETRY] Could not resolve subscriber type: {SubscriberType}", subscriberTypeName);
                return;
            }

            var subscriberInterface = ExtractSubscriberInterface(subscriberType, domainEvent);
            if (subscriberInterface is null ||
                FindSubscriberInstance(subscriberInterface, subscriberType) is not { } subscriber)
            {
                logger.LogError("[RETRY] Could not find subscriber instance for {SubscriberType}", subscriberTypeName);
                return;
            }

            // Resolve the handler from the interface rather than the concrete type so explicit
            // interface implementations and decorators are both covered.
            var handleAsyncMethod = subscriberInterface.GetMethod(HandleMethodName);
            if (handleAsyncMethod is null)
            {
                logger.LogError(
                    "[RETRY] No matching HandleAsync method found for {SubscriberType} and {DomainEventType}",
                    subscriberTypeName,
                    domainEventTypeName);
                return;
            }

            await (Task)handleAsyncMethod.Invoke(subscriber, [domainEvent, CancellationToken.None])!;
            logger.LogInformation("[RETRY] Successfully retried event dispatch to {SubscriberType}", subscriberTypeName);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[RETRY] Failed to retry event dispatch to {SubscriberType}", subscriberTypeName);
            throw;
        }
    }

    /// <summary>
    /// Resolves all registrations of <paramref name="subscriberInterface"/> and returns the first
    /// one that either is of <paramref name="subscriberType"/> or wraps an instance of it.
    /// </summary>
    private object? FindSubscriberInstance(Type subscriberInterface, Type subscriberType)
    {
        return serviceProvider
            .GetServices(subscriberInterface)
            .FirstOrDefault(candidate =>
                candidate?.GetType() == subscriberType ||
                HasInnerType(candidate, subscriberType));
    }

    /// <summary>
    /// Finds the closed <c>ISubscribeSynchronousTo&lt;,,&gt;</c> or <c>ISubscribeAsynchronousTo&lt;,,&gt;</c>
    /// interface on <paramref name="subscriberType"/> whose third generic argument equals the third
    /// generic argument of the runtime type of <paramref name="domainEvent"/>.
    /// </summary>
    /// <returns>The matching interface, or <c>null</c> when there is none.</returns>
    private static Type? ExtractSubscriberInterface(Type subscriberType, object domainEvent)
    {
        var eventGenericArgs = domainEvent.GetType().GenericTypeArguments;
        if (eventGenericArgs.Length < 3)
        {
            return null;
        }

        var aggregateEventType = eventGenericArgs[2];

        return subscriberType.GetInterfaces().FirstOrDefault(candidate =>
        {
            if (!candidate.IsGenericType)
            {
                return false;
            }

            var definition = candidate.GetGenericTypeDefinition();
            var isSubscriberInterface =
                definition == typeof(ISubscribeAsynchronousTo<,,>) ||
                definition == typeof(ISubscribeSynchronousTo<,,>);

            return isSubscriberInterface && candidate.GetGenericArguments()[2] == aggregateEventType;
        });
    }

    /// <summary>
    /// Determines whether <paramref name="obj"/> is a decorator around an instance of
    /// <paramref name="concreteType"/>.
    /// </summary>
    /// <remarks>
    /// The explicit <see cref="ISubscribeDecorator"/> contract is followed first (through nested
    /// decorators). As a fallback, the first non-public instance field and property whose name
    /// contains "inner" are inspected, which covers decorators that do not implement the contract.
    /// </remarks>
    private static bool HasInnerType(object? obj, Type concreteType)
    {
        if (obj is null)
        {
            return false;
        }

        // Preferred path: the decorator contract declared in this file.
        var current = obj;
        while (current is ISubscribeDecorator decorator)
        {
            var inner = decorator.InnerInstance;
            if (inner is null || ReferenceEquals(inner, current))
            {
                break; // nothing further to unwrap; also guards against self-referencing decorators
            }

            if (inner.GetType() == concreteType)
            {
                return true;
            }

            current = inner;
        }

        // Fallback heuristic for decorators without the contract.
        const BindingFlags hiddenInstanceMembers = BindingFlags.NonPublic | BindingFlags.Instance;
        var decoratorType = obj.GetType();

        var innerField = decoratorType.GetFields(hiddenInstanceMembers)
            .FirstOrDefault(f => f.Name.Contains("inner", StringComparison.OrdinalIgnoreCase));
        if (innerField?.GetValue(obj)?.GetType() == concreteType)
        {
            return true;
        }

        var innerProperty = decoratorType.GetProperties(hiddenInstanceMembers)
            .FirstOrDefault(p => p.Name.Contains("inner", StringComparison.OrdinalIgnoreCase));
        return innerProperty?.GetValue(obj)?.GetType() == concreteType;
    }
}