using Books.Api.EventFlow.Infrastructure; using Books.Api.EventFlow.ReadModels; using Books.Api.EventFlow.Repositories; using Books.Api.Tests.Helpers; using Books.Api.Tests.Infrastructure; using Dapper; using AwesomeAssertions; using Microsoft.Extensions.DependencyInjection; using Npgsql; namespace Books.Api.Tests.Integration; /// /// Integration tests for the read model auto-repair/repopulation system. /// Tests verify that corrupt or out-of-sync read models can be repaired /// by replaying events from the event store. /// [Trait("Category", "Integration")] public class ReadModelRepopulationTests(TestWebApplicationFactory factory) : IntegrationTestBase(factory) { [Fact] public async Task RepopulateReadModel_FixesCorruptedStatus() { // Arrange: Create a draft through normal flow var graphqlClient = new GraphQLTestClient(Client); var companyId = await CreateCompanyAsync(graphqlClient, "Repopulation Test Company"); var draftId = await CreateDraftAsync(graphqlClient, companyId, "Test Draft for Repopulation"); // Wait for read model to be created var originalDraft = await Eventually.GetAsync(async () => { var repo = GetService(); return await repo.GetByIdAsync(draftId); }); originalDraft.Should().NotBeNull(); originalDraft!.Status.Should().Be("active"); // Corrupt the read model: Set status to something wrong await CorruptReadModelStatusAsync(draftId, "posted"); // Verify corruption var corruptedDraft = await GetService().GetByIdAsync(draftId); corruptedDraft!.Status.Should().Be("posted"); // Corrupted! // Act: Repopulate the read model var response = await graphqlClient.MutateAsync(""" mutation Repopulate($aggregateId: String!, $readModelType: String!) { repopulateReadModel(aggregateId: $aggregateId, readModelType: $readModelType) } """, new { aggregateId = draftId, readModelType = "JournalEntryDraftReadModel" }); // Assert: Repopulation succeeded response.EnsureNoErrors(); response.Data!.RepopulateReadModel.Should().BeTrue(); // Verify read model is now fixed var fixedDraft = await GetService().GetByIdAsync(draftId); fixedDraft!.Status.Should().Be("active"); // Fixed! } [Fact] public async Task RepopulateReadModel_FixesMissingName() { // Arrange var graphqlClient = new GraphQLTestClient(Client); var companyId = await CreateCompanyAsync(graphqlClient, "Missing Name Test Company"); var draftId = await CreateDraftAsync(graphqlClient, companyId, "Original Draft Name"); await Eventually.GetAsync(async () => { var repo = GetService(); return await repo.GetByIdAsync(draftId); }); // Corrupt: Clear the name await CorruptReadModelNameAsync(draftId, ""); var corruptedDraft = await GetService().GetByIdAsync(draftId); corruptedDraft!.Name.Should().BeEmpty(); // Act var response = await graphqlClient.MutateAsync(""" mutation Repopulate($aggregateId: String!, $readModelType: String!) { repopulateReadModel(aggregateId: $aggregateId, readModelType: $readModelType) } """, new { aggregateId = draftId, readModelType = "JournalEntryDraftReadModel" }); // Assert response.EnsureNoErrors(); var fixedDraft = await GetService().GetByIdAsync(draftId); fixedDraft!.Name.Should().Be("Original Draft Name"); } [Fact] public async Task RepopulateReadModel_RestoresUpdatedData() { // Arrange: Create and update a draft var graphqlClient = new GraphQLTestClient(Client); var companyId = await CreateCompanyAsync(graphqlClient, "Updated Data Test Company"); var draftId = await CreateDraftAsync(graphqlClient, companyId, "Initial Name"); await Eventually.GetAsync(async () => { var repo = GetService(); return await repo.GetByIdAsync(draftId); }); // Update the draft with new data await graphqlClient.MutateAsync(""" mutation UpdateDraft($input: UpdateJournalEntryDraftInput!) { updateJournalEntryDraft(input: $input) { id name description } } """, new { input = new { id = draftId, name = "Updated Name", description = "Test Description", documentDate = "2025-01-15", lines = Array.Empty() } }); // Wait for update await Eventually.GetAsync(async () => { var repo = GetService(); var draft = await repo.GetByIdAsync(draftId); return draft?.Name == "Updated Name" ? draft : null; }); // Corrupt: Revert to old data await CorruptReadModelNameAsync(draftId, "Wrong Name"); await CorruptReadModelDescriptionAsync(draftId, "Wrong Description"); var corruptedDraft = await GetService().GetByIdAsync(draftId); corruptedDraft!.Name.Should().Be("Wrong Name"); // Act var response = await graphqlClient.MutateAsync(""" mutation Repopulate($aggregateId: String!, $readModelType: String!) { repopulateReadModel(aggregateId: $aggregateId, readModelType: $readModelType) } """, new { aggregateId = draftId, readModelType = "JournalEntryDraftReadModel" }); // Assert response.EnsureNoErrors(); var fixedDraft = await GetService().GetByIdAsync(draftId); fixedDraft!.Name.Should().Be("Updated Name"); fixedDraft.Description.Should().Be("Test Description"); } [Fact] public async Task RepopulateReadModel_FixesOutOfSyncSequenceNumber() { // Arrange var graphqlClient = new GraphQLTestClient(Client); var companyId = await CreateCompanyAsync(graphqlClient, "Sequence Number Test Company"); var draftId = await CreateDraftAsync(graphqlClient, companyId, "Sequence Test Draft"); await Eventually.GetAsync(async () => { var repo = GetService(); return await repo.GetByIdAsync(draftId); }); // Update draft multiple times to increase sequence number for (int i = 0; i < 3; i++) { await graphqlClient.MutateAsync(""" mutation UpdateDraft($input: UpdateJournalEntryDraftInput!) { updateJournalEntryDraft(input: $input) { id } } """, new { input = new { id = draftId, name = $"Update {i + 1}", lines = Array.Empty() } }); await Task.Delay(100); } await Eventually.GetAsync(async () => { var repo = GetService(); var draft = await repo.GetByIdAsync(draftId); return draft?.Name == "Update 3" ? draft : null; }); // Corrupt: Set sequence number to 1 (old) await CorruptReadModelSequenceNumberAsync(draftId, 1); await CorruptReadModelNameAsync(draftId, "Old Name"); // Act var response = await graphqlClient.MutateAsync(""" mutation Repopulate($aggregateId: String!, $readModelType: String!) { repopulateReadModel(aggregateId: $aggregateId, readModelType: $readModelType) } """, new { aggregateId = draftId, readModelType = "JournalEntryDraftReadModel" }); // Assert response.EnsureNoErrors(); var fixedDraft = await GetService().GetByIdAsync(draftId); fixedDraft!.Name.Should().Be("Update 3"); // Latest name from events } [Fact] public async Task RepopulateReadModel_FailsForNonExistentAggregate() { // Arrange var graphqlClient = new GraphQLTestClient(Client); var companyId = await CreateCompanyAsync(graphqlClient, "Non-existent Aggregate Company"); var nonExistentId = $"journalentrydraft-{Guid.NewGuid():D}"; // Act var response = await graphqlClient.MutateAsync(""" mutation Repopulate($aggregateId: String!, $readModelType: String!) { repopulateReadModel(aggregateId: $aggregateId, readModelType: $readModelType) } """, new { aggregateId = nonExistentId, readModelType = "JournalEntryDraftReadModel" }); // Assert: Should fail or succeed without effect (no events to replay) // The behavior depends on implementation - either error or silent success // Since there are no events, it should complete but read model won't exist if (!response.HasErrors) { var draft = await GetService().GetByIdAsync(nonExistentId); draft.Should().BeNull(); } } [Fact] public async Task RepopulateReadModel_FailsForInvalidReadModelType() { // Arrange var graphqlClient = new GraphQLTestClient(Client); var companyId = await CreateCompanyAsync(graphqlClient, "Invalid Type Company"); var draftId = await CreateDraftAsync(graphqlClient, companyId, "Invalid Type Test"); await Eventually.GetAsync(async () => { var repo = GetService(); return await repo.GetByIdAsync(draftId); }); // Act var response = await graphqlClient.MutateAsync(""" mutation Repopulate($aggregateId: String!, $readModelType: String!) { repopulateReadModel(aggregateId: $aggregateId, readModelType: $readModelType) } """, new { aggregateId = draftId, readModelType = "NonExistentReadModel" }); // Assert response.HasErrors.Should().BeTrue(); var errorDetails = response.Errors! .SelectMany(e => new[] { e.Message, e.Extensions?.GetValueOrDefault("details")?.ToString() ?? "" }); var errorInfo = string.Join(" ", errorDetails); errorInfo.Should().Contain("Unknown read model type"); } [Fact] public async Task ListReadModelTypes_ReturnsAvailableTypes() { // Arrange var graphqlClient = new GraphQLTestClient(Client); var companyId = await CreateCompanyAsync(graphqlClient, "List Types Company"); // Act - listReadModelTypes is a mutation field (admin endpoint) var response = await graphqlClient.MutateAsync(""" mutation { listReadModelTypes } """); // Assert response.EnsureNoErrors(); response.Data!.ListReadModelTypes.Should().Contain("JournalEntryDraftReadModel"); response.Data.ListReadModelTypes.Should().Contain("AccountReadModel"); response.Data.ListReadModelTypes.Should().Contain("FiscalYearReadModel"); response.Data.ListReadModelTypes.Should().Contain("CompanyReadModel"); } #region Database Corruption Helpers private async Task CorruptReadModelStatusAsync(string aggregateId, string newStatus) { var dataSource = GetService(); await using var conn = await dataSource.OpenConnectionAsync(); await conn.ExecuteAsync( "UPDATE journal_entry_draft_read_models SET status = @status WHERE aggregate_id = @id", new { status = newStatus, id = aggregateId }); } private async Task CorruptReadModelNameAsync(string aggregateId, string newName) { var dataSource = GetService(); await using var conn = await dataSource.OpenConnectionAsync(); await conn.ExecuteAsync( "UPDATE journal_entry_draft_read_models SET name = @name WHERE aggregate_id = @id", new { name = newName, id = aggregateId }); } private async Task CorruptReadModelDescriptionAsync(string aggregateId, string newDescription) { var dataSource = GetService(); await using var conn = await dataSource.OpenConnectionAsync(); await conn.ExecuteAsync( "UPDATE journal_entry_draft_read_models SET description = @description WHERE aggregate_id = @id", new { description = newDescription, id = aggregateId }); } private async Task CorruptReadModelSequenceNumberAsync(string aggregateId, int newSequenceNumber) { var dataSource = GetService(); await using var conn = await dataSource.OpenConnectionAsync(); await conn.ExecuteAsync( "UPDATE journal_entry_draft_read_models SET last_aggregate_sequence_number = @seq WHERE aggregate_id = @id", new { seq = newSequenceNumber, id = aggregateId }); } #endregion #region Helper Methods private async Task CreateCompanyAsync(GraphQLTestClient client, string name) { var response = await client.MutateAsync(""" mutation CreateCompany($input: CreateCompanyInput!) { createCompany(input: $input) { id } } """, new { input = new { name } }); response.EnsureNoErrors(); var companyId = response.Data!.CreateCompany!.Id; client.SetCompanyId(companyId); return companyId; } private async Task CreateDraftAsync(GraphQLTestClient client, string companyId, string name) { var response = await client.MutateAsync(""" mutation CreateDraft($input: CreateJournalEntryDraftInput!) { createJournalEntryDraft(input: $input) { id } } """, new { input = new { companyId, name } }); response.EnsureNoErrors(); return response.Data!.CreateJournalEntryDraft!.Id; } #endregion #region Response DTOs private class RepopulateResponse { public bool RepopulateReadModel { get; set; } } private class RepopulateNullableResponse { public bool? RepopulateReadModel { get; set; } } private class ListTypesResponse { public List ListReadModelTypes { get; set; } = []; } private class CreateCompanyResponse { public CompanyDto? CreateCompany { get; set; } } private class CreateDraftResponse { public DraftDto? CreateJournalEntryDraft { get; set; } } private class UpdateDraftResponse { public DraftDto? UpdateJournalEntryDraft { get; set; } } private class CompanyDto { public string Id { get; set; } = string.Empty; } private class DraftDto { public string Id { get; set; } = string.Empty; public string Name { get; set; } = string.Empty; public string? Description { get; set; } public string Status { get; set; } = string.Empty; } #endregion }