<document> <header> <issuecode /> <articlecode /> <zone /> <title>Implementing the Outbox Pattern with Kafka and C#</title> <authors /> <copyright>CODE Magazine</copyright> <owner>CODE Magazine</owner> </header> <body> <p id="0">The outbox pattern is a proven and scalable software design pattern often used in a distributed environment to publish events reliably and enforce data consistency. This article presents an overview of the outbox pattern and examines how it can be implemented using Kafka and C# in ASP.NET Core applications.</p> <p id="1">If you’re to work with the code examples discussed in this article, you need the following installed in your system:</p> <list type="bulleted"> <bulletedlist>Visual Studio 2022 </bulletedlist> <bulletedlist>.NET 8.0</bulletedlist> <bulletedlist>ASP.NET Core 8.0 Runtime</bulletedlist> </list> <p id="2">If you don’t already have Visual Studio 2022 installed on your computer, you can download it from here: https://visualstudio.microsoft.com/downloads/.</p> <p id="3">In this article, I'll examine the following points:</p> <list type="bulleted"> <bulletedlist>An overview of the outbox pattern and its benefits and downsides</bulletedlist> <bulletedlist>An introduction to Kafka as a message broker</bulletedlist> <bulletedlist>How the transaction outbox pattern works</bulletedlist> <bulletedlist>How to implement the outbox pattern in ASP.NET Core, Kafka, and C#</bulletedlist> <bulletedlist>How to implement the consumer application to consume messages stored in Kafka</bulletedlist> </list> <h2>A Real-World Use Case</h2> <p id="4">Typically, microservices-based applications maintain their own data store to persist and retrieve data. This strategy helps improve the application’s scalability and promotes agility in the deployment process. Additionally, these applications take advantage of messaging to facilitate asynchronous communication between the services. 
There are several messaging solutions to choose from, such as Kafka, Azure Service Bus, and RabbitMQ.</p> <h3>Understanding the Problem</h3> <p id="5">Consider the flow of a typical Order Processing application at a glance:</p> <list type="numbered"> <numberedlist>The client application sends a request to the API to persist the data to the underlying database.</numberedlist> <numberedlist>The API validates the incoming request, executes the necessary business logic, and persists the data in the database, as shown in <b>Figure 1</b>.</numberedlist> </list> <figure id="1" src="image1.png"> <b>Figure 1</b> <b>:</b> The flow of a typical Order Processing application </figure> <list type="numbered"> <numberedlist>Once the data is stored in the database, the API calls a message broker to publish the event so that other services can consume it if needed, as shown in <b>Figure 2</b>.</numberedlist> </list> <figure id="2" src="image2.png"> <b>Figure 2</b> <b>:</b> The API calls the message broker to publish the event. </figure> <p id="6">What’s wrong with this? Although the flow may appear straightforward, complications can arise. For instance, the message broker might be unavailable, publishing the message might fail, or a runtime exception might occur in the application after the data has been saved. In each of these cases, as shown in <b>Figure 3</b>, the order is stored in the database but the event never reaches the other services, so the ordering service and the rest of the system become inconsistent. </p> <figure id="3" src="image3.png"> <b>Figure 3</b> <b>:</b> An error in publishing messages</figure> <p id="7">Addressing this issue can be challenging because the code needed to handle it adds responsibilities to the OrderService, which may violate the Single Responsibility Principle. Here’s where the transactional outbox pattern comes to the rescue.</p> <h2>An Overview of the Outbox Pattern</h2> <p id="8">The outbox pattern is a technique used in microservices architectures to guarantee reliable communication between services. 
For this purpose, a temporary storage area, often referred to as an <b>outbox table</b>, is created in the database of a microservice, and the outgoing data changes are recorded in it. The data is serialized before being sent from the outbox table to other external systems or services. In distributed systems, the dual-write problem arises when an operation must both update a local database and notify other services, because one of the two writes can fail after the other has succeeded. </p> <p id="9">The outbox design solves this problem by recording the messages/events in the same database transaction that updates the local state, which makes both actions atomic: either both succeed or neither does. In the outbox pattern, you don’t publish an event to the message broker directly. Instead, a record is inserted into a special database table named Outbox, which is stored in the same database as the application’s data. Then an asynchronous process executes in the background and periodically checks the Outbox table for unpublished messages. 
When it finds one, it publishes the message to a message broker (e.g., Apache Kafka or RabbitMQ), as shown in <b>Figure 4</b>.</p> <figure id="4" src="image4.png"> <b>Figure 4</b> <b>:</b> Demonstrating the Outbox Pattern</figure> <h2>Components of the Outbox Pattern</h2> <p id="10">The outbox pattern comprises the following key components: </p> <list type="bulleted"> <bulletedlist> <b>Events:</b> At runtime, an application generates events that are stored in the Outbox table.</bulletedlist> <bulletedlist> <b>Outbox table:</b> This database table stores the messages until they are processed by the publisher.</bulletedlist> <bulletedlist> <b>Publisher:</b> This component reads messages from the outbox table and dispatches them to the appropriate destinations.</bulletedlist> </list> <h2>Benefits and Downsides</h2> <p id="11">The outbox pattern is a proven technique widely used for enhancing data consistency and reliability in microservices architectures. However, like any architectural pattern, it has its benefits and downsides.</p> <h3>Benefits</h3> <p id="12">The following are the benefits of the outbox pattern:</p> <list type="bulleted"> <bulletedlist> <b>Transactional consistency:</b> The outbox pattern ensures that changes to the local database and the recording of the corresponding events/messages are part of the same database transaction. This prevents data inconsistencies that might occur when an operation fails after the database changes are committed but before the events are published. By storing the messages as part of the same transaction, the outbox pattern maintains atomicity and consistency. 
Moreover, the outbox pattern enables events to be retried until they have been successfully processed.</bulletedlist> <bulletedlist> <b>Reliable message publishing:</b> The outbox pattern ensures reliable message sending by taking advantage of a persistent storage mechanism: the messages or events are stored in the database, which prevents message loss due to network issues or failures in the event publishing mechanism. Persistent storage coupled with asynchronous sending also decouples message dispatch from the application's response, which improves resiliency.</bulletedlist> <bulletedlist> <b>Scalability:</b> The outbox pattern enhances the scalability of an application by separating the concerns of event creation from event distribution. It enables an application to focus on processing incoming requests without interruption by offloading message transmission to a different process or service.</bulletedlist> <bulletedlist> <b>Performance:</b> The outbox pattern can improve responsiveness and throughput because messages are sent asynchronously, so a request doesn’t have to wait for the message broker. Storing the events in the same database transaction as the business operations also keeps the total number of transactions low.</bulletedlist> </list> <h3>Downsides</h3> <p id="13">Despite the benefits of the outbox pattern stated earlier, there are certain downsides as well:</p> <list type="bulleted"> <bulletedlist> <b>Increased complexity:</b> The outbox pattern can introduce additional complexity to your architecture, application design, and development. 
When you implement this pattern, you need to manage the outbox processor and verify that the messages are recorded correctly, which can make your application more difficult to manage, maintain, and debug.</bulletedlist> <bulletedlist> <b>Latency:</b> When using the outbox pattern, there might be a delay between the time an event occurs and the time it’s published to other services, because the outbox processor usually publishes events in batches at a regular interval. This delay makes the pattern a poor fit for situations where immediate or real-time communication is needed.</bulletedlist> <bulletedlist> <b>Database coupling:</b> Because the outbox pattern depends on the database to store events before they’re published, there's an inherent coupling with the database, thereby making future changes more challenging.</bulletedlist> <bulletedlist> <b>Scalability:</b> Although the outbox pattern ensures that microservices are not tightly coupled and can be independently scaled, scaling your application may still be challenging. If an outbox processor isn’t working as expected or there’s too much traffic, you might have to consider scaling your infrastructure for processing outboxes.</bulletedlist> <bulletedlist> <b>Resiliency:</b> Although the outbox pattern ensures that events aren’t lost, there’s no built-in mechanism to handle failures while processing the events. You need to write your own code to handle duplicate events, idempotency, retries, and failure recovery.</bulletedlist> <bulletedlist> <b>Operational overhead:</b> Monitoring the outbox storage, ensuring that the messages are delivered, and dealing with errors and retries all add operational overhead.</bulletedlist> <bulletedlist> <b>Integration challenges:</b> You must take the necessary steps to ensure reliable message delivery. 
This may involve implementing additional configurations and dependencies when connecting to external systems like RabbitMQ and Apache Kafka.</bulletedlist> </list> <h2>How Does the Outbox Pattern Work?</h2> <p id="14">Assume an order processing application in which an order service is used to handle orders placed by the customer. As soon as a new order is placed, the application must update the order database table and also notify the inventory service that the stock should be updated. The following steps illustrate the complete flow of a typical order processing use case:</p> <list type="numbered"> <numberedlist> <b>Create Order:</b> A customer uses the application to place a new order. Next, the Order Service picks up this order and processes it.</numberedlist> <numberedlist> <b>Update Order database table:</b> The Order Service stores the new order in the Order database table with information that includes the order details and the customer details together with the Order status.</numberedlist> <numberedlist> <b>Create Stock Update message:</b> Besides updating the Order database table, the Order Service creates a message to indicate that the Stock database table needs to be updated. 
This message is persisted in the Outbox database table within the context of the same transaction.</numberedlist> <numberedlist> <b>Commit transaction:</b> Next, the transaction that comprises updating the Order database table and insertion of the message into the Outbox database table is committed.</numberedlist> <numberedlist> <b>Check Outbox database table periodically:</b> A process named Message Relay executes in the background to check the Outbox database table for new messages.</numberedlist> <numberedlist> <b>Dispatch message to Stock service:</b> When the Message Relay detects a new message in the Outbox database table, it forwards the message to the Stock service for processing via the Message queue.</numberedlist> <numberedlist> <b>Stock update:</b> Upon receipt of a new message, the Stock service updates the Stock database table accordingly.</numberedlist> </list> <p id="15">In the outbox pattern, when an event occurs, instead of a message being sent to a message broker such as Kafka, a new record that contains the details of the event is stored in the Outbox database table. It should be noted that typically the Outbox table is part of the same database and this all happens as part of the same transaction. <b>Figure 5</b> illustrates how this all works together.</p> <figure id="5" src="image5.png"> <b>Figure 5</b> <b>:</b> The Outbox Pattern in action </figure> <h2>Key Terminologies</h2> <p id="16">Here are the key terminologies you often come across when exploring the Outbox pattern:</p> <list type="bulleted"> <bulletedlist> <b>Event log or the Outbox:</b> Each microservice maintains an event log or an Outbox where the events that have to be published are first recorded to ensure that the events are never lost. 
Essentially, an Outbox is a database table that stores events or messages before they are published. </bulletedlist> <bulletedlist> <b>Transactional operation:</b> To ensure that only successful business operations are captured in the event log, events are added to the log as part of the same transaction as the business operations that generated them. </bulletedlist> <bulletedlist> <b>Outbox processor:</b> An outbox processor runs as a separate background task and scans the event log at regular intervals. It collects the unpublished events and publishes them to the event bus or message broker without processing them itself. </bulletedlist> <bulletedlist> <b>Message broker or event bus:</b> A message broker or an event bus receives the events that have been published. The message broker or event bus provides durability and reliability and distributes these events to the subscribers or the other microservices.</bulletedlist> </list> <h2>Introduction to Kafka as a Message Broker</h2> <p id="17">Apache Kafka is a distributed, open source, scalable, high-performance stream-processing platform. It’s written in Java and Scala and has become popular for building systems that handle massive volumes of data. Kafka is a messaging platform based on the publish/subscribe model. It comes with built-in features for replication, partitioning, and fault tolerance, and it’s designed for high throughput. </p> <p id="18">Kafka is a good option for applications that need vast data processing capabilities. The main use of Kafka is in creating real-time streaming data pipelines. For this reason, Kafka includes stream processing and fault-tolerant storage functionalities, which enable storing and analyzing historical and real-time data. 
<b>Figure 6</b> shows how Kafka fits in a typical implementation of the outbox pattern.</p> <figure id="6" src="image6.png"> <b>Figure 6</b> <b>:</b> Using Kafka as the message broker in a typical Outbox pattern implementation</figure> <h2>Getting Started with Apache Kafka</h2> <p id="19">To get started with Apache Kafka, follow the steps outlined in the next few sections.</p> <h3>Download and Extract Kafka</h3> <p id="20">You can download Apache Kafka from here: https://kafka.apache.org/downloads.</p> <p id="21">Alternatively, you can run the following command at the terminal to download Kafka to your computer:</p> <codesnippet>wget»https://archive.apache.org/</codesnippet> <codesnippet>dist/kafka/3.7.0/</codesnippet> <codesnippet>kafka_2.13-3.7.0.tgz</codesnippet> <p id="22">Once Kafka has been downloaded, extract the .tgz archive by running the following command at the terminal window:</p> <codesnippet>tar»-xzf»kafka_2.13-3.7.0.tgz</codesnippet> <h3>Start ZooKeeper</h3> <p id="23">In this setup, Kafka relies on ZooKeeper, so ZooKeeper must be started first. Once you've downloaded and extracted Apache Kafka to your computer, start ZooKeeper by running the following commands at the terminal window:</p> <codesnippet>cd»kafka_2.13-3.7.0</codesnippet> <codesnippet>.\bin\windows\</codesnippet> <codesnippet>zookeeper-server-start.bat»</codesnippet> <codesnippet>.\config\zookeeper.properties</codesnippet> <h3>Create a Topic</h3> <p id="24">Before you create a topic, the Kafka broker itself must be running: open a second terminal window in the same folder and run the kafka-server-start.bat script from bin\windows, passing config\server.properties as its argument. Now that ZooKeeper and Kafka are up and running on your computer, you can start creating topics. 
You can create a new topic by running the following command at the terminal window:</p> <codesnippet>.\bin\windows\</codesnippet> <codesnippet>kafka-topics.bat»--create»</codesnippet> <codesnippet>--topic»mynewtopic»</codesnippet> <codesnippet>--bootstrap-server»</codesnippet> <codesnippet>localhost:9092</codesnippet> <h3>Display All Kafka Messages in a Topic</h3> <p id="25">To display all messages in a particular topic, use the following command:</p> <codesnippet>.\bin\windows\</codesnippet> <codesnippet>kafka-console-consumer.bat»</codesnippet> <codesnippet>--bootstrap-server»</codesnippet> <codesnippet>localhost:9092»--topic»</codesnippet> <codesnippet>mynewtopic»--from-beginning</codesnippet> <h3>Display All Topics</h3> <p id="26">To list all topics, you can use the following command:</p> <codesnippet>.\bin\windows\kafka-topics.bat»</codesnippet> <codesnippet>--list»--bootstrap-server»</codesnippet> <codesnippet>localhost:9092</codesnippet> <h3>Start the Producer</h3> <p id="27">You can run the following command at the terminal window to start a producer:</p> <codesnippet>.\bin\windows\</codesnippet> <codesnippet>kafka-console-producer.bat»</codesnippet> <codesnippet>--topic»mynewtopic»</codesnippet> <codesnippet>--bootstrap-server»</codesnippet> <codesnippet>localhost:9092</codesnippet> <h3>Start the Consumer</h3> <p id="28">In Kafka, you need a consumer to read the messages produced by a producer. 
You can run the following command at the terminal window to read all messages sent by the producer, as shown in <b>Figure 7</b>.</p> <codesnippet>.\bin\windows\</codesnippet> <codesnippet>kafka-console-consumer.bat»</codesnippet> <codesnippet>--topic»mynewtopic»</codesnippet> <codesnippet>--from-beginning»</codesnippet> <codesnippet>--bootstrap-server»</codesnippet> <codesnippet>localhost:9092</codesnippet> <figure id="7" src="image7.png"> <b>Figure 7</b> <b>:</b> The Kafka consumer in action</figure> <h3>Shut Down ZooKeeper and Kafka</h3> <p id="29">To shut down ZooKeeper, use the zookeeper-server-stop.bat script, as shown below:</p> <codesnippet>.\bin\windows\</codesnippet> <codesnippet>zookeeper-server-stop.bat</codesnippet> <p id="30">To shut down Kafka, use the kafka-server-stop.bat script, as shown below:</p> <codesnippet>.\bin\windows\</codesnippet> <codesnippet>kafka-server-stop.bat</codesnippet> <h2>Implementing the Outbox Pattern in ASP.NET Core, Kafka, and C#</h2> <p id="31">In this section, I’ll examine how to implement the outbox pattern in an ASP.NET Core application using Kafka. In this example, you’ll use the following interfaces and classes.</p> <list type="bulleted"> <bulletedlist> <b>OutboxMessage</b> <b>:</b> This is the model class that represents a message stored in the Outbox database table.</bulletedlist> <bulletedlist> <b>IKafkaProducer</b> <b>:</b> This is the interface for the KafkaProducer class.</bulletedlist> <bulletedlist> <b>IKafkaConsumer</b> <b>:</b> This is the interface for the KafkaConsumer class.</bulletedlist> <bulletedlist> <b>IOrderService:</b> This interface contains the declaration of methods supported by the OrderService.</bulletedlist> <bulletedlist> <b>IOutboxMessageRepository</b> <b>:</b> This represents the interface for the OutboxMessageRepository class.</bulletedlist> <bulletedlist> <b>KafkaProducer</b> <b>:</b> This class is used to send messages to Kafka. 
</bulletedlist> <bulletedlist> <b>KafkaConsumer</b> <b>:</b> This class is used to consume messages in Kafka.</bulletedlist> <bulletedlist> <b>OrderService</b> <b>:</b> The OrderService class implements the IOrderService interface and encapsulates the logic for invoking the appropriate methods of the OrderMessageRepository.</bulletedlist> <bulletedlist> <b>KafkaMessageProcessor</b> <b>:</b> This is a background service that calls KafkaConsumer at regular intervals of time to consume messages residing in Kafka.</bulletedlist> <bulletedlist> <b>OutboxMessageProcessor</b> <b>:</b> This is yet another background service used to invoke KafkaProducer to transmit messages to Kafka.</bulletedlist> <bulletedlist> <b>OutboxMessageRepository</b>: This class contains methods to retrieve messages from the Outbox database table and also update a message as needed.</bulletedlist> <bulletedlist> <b>ApplicationDbContext</b> <b>:</b> This class represents the data context that acts as a gateway to connect to the underlying database being used by the application. </bulletedlist> <bulletedlist> <b>OrderController</b> <b>:</b> This represents the Order API that can be consumed by clients to retrieve existing orders or create new orders.</bulletedlist> </list> <h3>Create a New ASP.NET Core 8 Project in Visual Studio 2022</h3> <p id="32">You can create a project in Visual Studio 2022 in several ways such as, from the Visual Studio 2022 Developer Command Prompt or by launching the Visual Studio 2022 IDE. When you launch Visual Studio 2022, you'll see the Start window. You can choose "Continue without code" to launch the main screen of the Visual Studio 2022 IDE.</p> <p id="33">Now that you know the basics, let’s start setting up the project. 
To create a new ASP.NET Core 8 Project in Visual Studio 2022:</p> <list type="numbered"> <numberedlist>Start the Visual Studio 2022 IDE.</numberedlist> <numberedlist>In the "Create a new project" window, select "ASP.NET Core Web API" and click Next to move on.</numberedlist> <numberedlist>Specify the project name as Outbox_Pattern_Demo and the path where it should be created in the "Configure your new project" window.</numberedlist> <numberedlist>If you want the solution file and project to be created in the same directory, you can optionally check the "Place solution and project in the same directory" checkbox. Click Next to move on.</numberedlist> <numberedlist>In the next screen, specify the target framework and authentication type as well. Ensure that the "Configure for HTTPS," "Enable Docker Support," "Do not use top-level statements", and the "Enable OpenAPI support" checkboxes are unchecked because you won’t use any of these in this example.</numberedlist> <numberedlist>Remember to leave the <b>Use controllers </b>checkbox checked because you won’t use minimal API in this example.</numberedlist> <numberedlist>Click Create to complete the process.</numberedlist> </list> <p id="34">A new ASP.NET Core Web API project is created. You’ll use this project to implement the outbox pattern using Kafka in ASP.NET Core and C#.</p> <h3>Create the Outbox Database Table</h3> <p id="35">First off, create the Outbox database table to store messages. 
To do this, create a new database called Outbox_Pattern_Demo using the database script given below:</p> <codesnippet>CREATE»DATABASE»Outbox_Pattern_Demo;</codesnippet> <p id="36">Next, create a new database table called OutboxMessage using the following database script.</p> <codesnippet> <font color="Blue">CREATE</font>»<font color="Blue">TABLE</font>»OutboxMessage</codesnippet> <codesnippet>(</codesnippet> <codesnippet>»»»Event_Id»<font color="#A31515">BIGINT</font>»<font color="Blue">IDENTITY</font>»<font color="Blue">PRIMARY</font>»KEY,</codesnippet> <codesnippet>»»»Event_Payload»NVARCHAR(MAX)»<font color="Blue">NOT</font>»<font color="Blue">NULL</font>,</codesnippet> <codesnippet>»»»Event_Date»DATETIME»<font color="Blue">NOT</font>»<font color="Blue">NULL</font></codesnippet> <codesnippet>»»»<font color="Blue">DEFAULT</font>»<font color="Blue">CURRENT_TIMESTAMP</font>,</codesnippet> <codesnippet>»»»IsMessageDispatched»BIT»<font color="Blue">NOT</font>»<font color="Blue">NULL</font></codesnippet> <codesnippet>);</codesnippet> <p id="37">Create another database table named Order in the same database using the following script.</p> <codesnippet>CREATE»TABLE»[Order]</codesnippet> <codesnippet>(</codesnippet> <codesnippet>»»»»Order_Id»<font color="#A31515">BIGINT</font>»IDENTITY»PRIMARY»KEY,</codesnippet> <codesnippet>»»»»Customer_Id»<font color="#A31515">INT</font>»NOT»NULL,</codesnippet> <codesnippet>»»»»Order_Date»DATETIME,</codesnippet> <codesnippet>»»»»Amount»MONEY»NOT»NULL</codesnippet> <codesnippet>);</codesnippet> <h3>Install NuGet Package(s)</h3> <p id="38">In this example, you’ll take advantage of Entity Framework Core to interact with the database and perform CRUD (Create, Read, Update, Delete) operations. You’ll also need a Kafka client package to produce and consume messages. To work with Apache Kafka, you’ll use the Confluent.Kafka NuGet package. 
To install the required packages into your project, right-click on the solution and then select Manage NuGet Packages for Solution…. Now search for the Microsoft.EntityFrameworkCore.SqlServer and Confluent.Kafka packages in the search box and install them. Alternatively, you can type the commands shown below at the NuGet Package Manager Command Prompt:</p> <codesnippet>PM>»Install-Package</codesnippet> <codesnippet>Microsoft.EntityFrameworkCore.SqlServer</codesnippet> <codesnippet>PM>»Install-Package»Confluent.Kafka</codesnippet> <p id="39">You can also install these packages by executing the following commands at the Windows Shell:</p> <codesnippet>dotnet»add»package</codesnippet> <codesnippet>Microsoft.EntityFrameworkCore.SqlServer</codesnippet> <codesnippet>dotnet»add»package»Confluent.Kafka</codesnippet> <h3>Create the OutboxMessage Class</h3> <codesnippet> <font color="Blue">public</font>»<font color="Blue">class</font>»OutboxMessage</codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»<font color="Blue">public</font>»<font color="Blue">long</font>»Event_Id»{»<font color="Blue">get</font>;»<font color="Blue">set</font>;»}</codesnippet> <codesnippet>»»<font color="Blue">public</font>»<font color="Blue">string</font>»Event_Payload»{»<font color="Blue">get</font>;»<font color="Blue">set</font>;»}</codesnippet> <codesnippet>»»<font color="Blue">public</font>»DateTime»Event_Date»{»<font color="Blue">get</font>;»<font color="Blue">set</font>;»}</codesnippet> <codesnippet>»»<font color="Blue">public</font>»<font color="Blue">bool</font>»IsMessageDispatched»{»<font color="Blue">get</font>;»<font color="Blue">set</font>;»}</codesnippet> <codesnippet>}</codesnippet> <h3>Create the Order Model Class</h3> <codesnippet>public»class»<font color="#A31515">Order</font></codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»public»long»Order_Id»{»get;»set;»}</codesnippet> <codesnippet>»»»public»int»Customer_Id»{»get;»set;»}</codesnippet> 
<codesnippet>»»»public»DateTime»Order_Date»{»get;»set;»}</codesnippet> <codesnippet>»»»public»decimal»Amount»{»get;»set;»}</codesnippet> <codesnippet>}</codesnippet> <codesnippet> <font color="Blue">public</font>»<font color="Blue">class</font>»ApplicationDbContext»:»</codesnippet> <codesnippet>»»»»»»»»»»»»»DbContext</codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»»<font color="Blue">public</font>»ApplicationDbContext</codesnippet> <codesnippet>»»»(DbContextOptions</codesnippet> <codesnippet>»»»<ApplicationDbContext>»</codesnippet> <codesnippet>»»»»options)»:»base(options)</codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»}</codesnippet> <codesnippet>»»»»<font color="Blue">protected</font>»<font color="Blue">override</font>»<font color="Blue">void</font>»</codesnippet> <codesnippet>»»»»OnModelCreating</codesnippet> <codesnippet>»»»»(ModelBuilder»modelBuilder)</codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»»»»»<font color="Blue">base</font>.OnModelCreating(modelBuilder);</codesnippet> <codesnippet>»»»»}</codesnippet> <codesnippet>»»»»<font color="Blue">public</font>»DbSet<OutboxMessage>»</codesnippet> <codesnippet>»»»»OutboxMessages»{»<font color="Blue">get</font>;»<font color="Blue">set</font>;»}</codesnippet> <codesnippet>»»»»<font color="Blue">public</font>»DbSet<Order>»</codesnippet> <codesnippet>»»»»Orders»{»<font color="Blue">get</font>;»<font color="Blue">set</font>;»}</codesnippet> <codesnippet>}</codesnippet> <codesnippet>protected»override»void»</codesnippet> <codesnippet> <font color="#A31515">OnModelCreating</font> </codesnippet> <codesnippet>(<font color="#A31515">ModelBuilder</font>»modelBuilder)</codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»»modelBuilder.Entity<<font color="#A31515">Order</font>>(entity»=></codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»»»»»entity.<font color="#A31515">ToTable</font>(<font color="#A31515">"Order"</font>);</codesnippet> 
<codesnippet>»»»»»»»»entity.<font color="#A31515">HasKey</font>(e»=>»e.Order_Id);</codesnippet> <codesnippet>»»»»});</codesnippet> <codesnippet>»»»»modelBuilder.Entity<<font color="#A31515">OutboxMessage</font>></codesnippet> <codesnippet>»»»»(entity»=></codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»»»»»entity.<font color="#A31515">ToTable</font>(<font color="#A31515">"OutboxMessage"</font>);</codesnippet> <codesnippet>»»»»»»»»entity.<font color="#A31515">HasKey</font>(e»=>»e.Event_Id);</codesnippet> <codesnippet>»»»»});</codesnippet> <codesnippet>»»»»base.<font color="#A31515">OnModelCreating</font>(modelBuilder);</codesnippet> <codesnippet>}</codesnippet> <codesnippet>public»interface»<font color="#A31515">IOrderService</font></codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»public»Task<List<Order>>»</codesnippet> <codesnippet>»»»<font color="#A31515">GetAllOrdersAsync</font>();</codesnippet> <codesnippet>»»»public»Task<Order>»</codesnippet> <codesnippet>»»»<font color="#A31515">GetOrderAsync</font>(<font color="#A31515">int</font>»Id);</codesnippet> <codesnippet>»»»public»Task»</codesnippet> <codesnippet>»»»<font color="#A31515">CreateOrderAsync</font>(Order»order);</codesnippet> <codesnippet>}</codesnippet> <codesnippet>public»async»Task»</codesnippet> <codesnippet> <font color="#A31515">CreateOrderAsync</font>(Order»order)</codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»»//»The»order»and»its»outbox»message</codesnippet> <codesnippet>»»»»//»share»one»database»transaction.</codesnippet> <codesnippet>»»»»await»using»var»transaction»=</codesnippet> <codesnippet>»»»»»»»»await»_context.Database</codesnippet> <codesnippet>»»»»»»»».BeginTransactionAsync();</codesnippet> <codesnippet>»»»»try</codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»»»»»_context.Orders.Add(order);</codesnippet> <codesnippet>»»»»»»»»await»_context.SaveChangesAsync();</codesnippet> 
<codesnippet>»»»»»»»»var»outboxMessage»=»new»OutboxMessage</codesnippet> <codesnippet>»»»»»»»»{</codesnippet> <codesnippet>»»»»»»»»»»»»Event_Payload»=</codesnippet> <codesnippet>»»»»»»»»»»»»JsonSerializer.Serialize(order),</codesnippet> <codesnippet>»»»»»»»»»»»»Event_Date»=»DateTime.UtcNow,</codesnippet> <codesnippet>»»»»»»»»»»»»IsMessageDispatched»=»<font color="#A31515">false</font></codesnippet> <codesnippet>»»»»»»»»};</codesnippet> <codesnippet>»»»»»»»»_context.OutboxMessages.</codesnippet> <codesnippet>»»»»»»»»Add(outboxMessage);</codesnippet> <codesnippet>»»»»»»»»await»_context.SaveChangesAsync();</codesnippet> <codesnippet>»»»»»»»»await»transaction.CommitAsync();</codesnippet> <codesnippet>»»»»}</codesnippet> <codesnippet>»»»»catch</codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»»»»»await»transaction.RollbackAsync();</codesnippet> <codesnippet>»»»»»»»»throw;</codesnippet> <codesnippet>»»»»}</codesnippet> <codesnippet>}</codesnippet> <h3>Create the OutboxMessageProcessor Background Service</h3> <codesnippet>public»interface»<font color="#A31515">IHostedService</font></codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»»Task»<font color="#A31515">StartAsync</font></codesnippet> <codesnippet>»»»»(CancellationToken»cancellationToken);</codesnippet> <codesnippet>»»»»Task»<font color="#A31515">StopAsync</font></codesnippet> <codesnippet>»»»»(CancellationToken»cancellationToken);</codesnippet> <codesnippet>}</codesnippet> <codesnippet>public»abstract»class»</codesnippet> <codesnippet> <font color="#A31515">BackgroundService</font>»:»»</codesnippet> <codesnippet> <font color="#A31515">IHostedService</font>,»</codesnippet> <codesnippet> <font color="#A31515">IDisposable</font> </codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»»public»virtual»void»<font 
color="#A31515">Dispose</font>();</codesnippet> <codesnippet>»»»»public»virtual»Task»</codesnippet> <codesnippet>»»»»<font color="#A31515">StartAsync</font>(CancellationToken»</codesnippet> <codesnippet>»»»»cancellationToken);</codesnippet> <codesnippet>»»»»public»virtual»Task»</codesnippet> <codesnippet>»»»»<font color="#A31515">StopAsync</font>(CancellationToken»</codesnippet> <codesnippet>»»»»cancellationToken);</codesnippet> <codesnippet>»»»»protected»abstract»Task»</codesnippet> <codesnippet>»»»»<font color="#A31515">ExecuteAsync</font>(CancellationToken»</codesnippet> <codesnippet>»»»»stoppingToken);</codesnippet> <codesnippet>}</codesnippet> <codesnippet> <font color="Blue">public</font>»<font color="Blue">class</font>»OutboxMessageProcessor»:»</codesnippet> <codesnippet>BackgroundService</codesnippet> <codesnippet>{»»»»»»»»</codesnippet> <codesnippet>»»»<font color="Blue">protected</font>»<font color="Blue">override</font>»<font color="Blue">async</font>»Task»</codesnippet> <codesnippet>»»»ExecuteAsync(CancellationToken»</codesnippet> <codesnippet>»»»cancellationToken)</codesnippet> <codesnippet>»»»{</codesnippet> <codesnippet>»»»»»»»»»»»»</codesnippet> <codesnippet>»»»}»»»»»»»»</codesnippet> <codesnippet>}</codesnippet> <codesnippet>private»async»Task»</codesnippet> <codesnippet> <font color="#A31515">PublishOutboxMessagesAsync</font> </codesnippet> <codesnippet>(CancellationToken»cancellationToken)</codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»»try</codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»»»»»using»var»scope»=»</codesnippet> <codesnippet>»»»»»»»_scopeFactory.CreateScope();</codesnippet> <codesnippet>»»»»»»»»await»using»var»_dbContext»=»</codesnippet> <codesnippet>»»»»»»»»scope.ServiceProvider.</codesnippet> <codesnippet>»»»»»»»»GetRequiredService</codesnippet> <codesnippet>»»»»»»»»<ApplicationDbContext>();</codesnippet> <codesnippet>»»»»»»»»List<OutboxMessage>»messages»=»</codesnippet> 
<codesnippet>»»»»»»»»_dbContext.OutboxMessages.Where(om»=>»</codesnippet> <codesnippet>»»»»»»»»om.IsMessageDispatched»</codesnippet> <codesnippet>»»»»»»»»!=»<font color="#A31515">true</font>).ToList();</codesnippet> <codesnippet>»»»»»»»»foreach»(OutboxMessage»outboxMessage»</codesnippet> <codesnippet>»»»»»»»»in»messages)</codesnippet> <codesnippet>»»»»»»»»{</codesnippet> <codesnippet>»»»»»»»»»»»»try</codesnippet> <codesnippet>»»»»»»»»»»»»{</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»await»_producer.</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»SendMessageToKafkaAsync</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»(outboxMessage);</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»outboxMessage.</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»IsMessageDispatched»=»<font color="#A31515">true</font>;</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»outboxMessage.Event_Date»=»</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»DateTime.UtcNow;</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»_dbContext.OutboxMessages.</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»Update(outboxMessage);</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»await»_dbContext.</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»SaveChangesAsync();</codesnippet> <codesnippet>»»»»»»»»»»»»}</codesnippet> <codesnippet>»»»»»»»»»»»»catch»(Exception»e)</codesnippet> <codesnippet>»»»»»»»»»»»»{</codesnippet> <codesnippet>»»»»»»»»»»»»»»»»Console.WriteLine(e);</codesnippet> <codesnippet>»»»»»»»»»»»»}</codesnippet> <codesnippet>»»»»»»»»}</codesnippet> <codesnippet>»»»»}</codesnippet> <codesnippet>»»»»catch</codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»»»»»throw;</codesnippet> <codesnippet>»»»»}</codesnippet> <codesnippet>»»»»await»Task.Delay</codesnippet> <codesnippet>»»»»(TimeSpan.FromSeconds(5),»</codesnippet> <codesnippet>»»»»cancellationToken);</codesnippet> <codesnippet>}</codesnippet> <h3>Create the OutboxMessageRepository</h3> <codesnippet> <font color="Blue">namespace</font>»Outbox_Pattern_Demo</codesnippet> <codesnippet>{</codesnippet> 
<codesnippet>»»»»<font color="Blue">public</font>»<font color="Blue">interface</font>»</codesnippet> <codesnippet>»»»»IOutboxMessageRepository</codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»»»»»Task<IReadOnlyCollection</codesnippet> <codesnippet>»»»»»»»»<OutboxMessage>>»</codesnippet> <codesnippet>»»»»»»»»GetUnsentMessagesAsync();</codesnippet> <codesnippet>»»»»»»»»Task<IReadOnlyCollection</codesnippet> <codesnippet>»»»»»»»»<OutboxMessage>></codesnippet> <codesnippet>»»»»»»»»GetMessagesByIdsAsync</codesnippet> <codesnippet>»»»»»»»»(IEnumerable<<font color="Blue">int</font>>»ids);</codesnippet> <codesnippet>»»»»»»»»Task»UpdateAsync</codesnippet> <codesnippet>»»»»»»»»(OutboxMessage»message,»</codesnippet> <codesnippet>»»»»»»»»<font color="Blue">bool</font>»status);</codesnippet> <codesnippet>»»»»}</codesnippet> <codesnippet>}</codesnippet> <p id="40">The complete source code of the OutboxMessageRepository class is given in <b>Listing 4</b>.</p> <p id="41">The CreateOrder action method of the OrderController class, given in full in <b>Listing 5</b>, is shown below:</p> <codesnippet> <font color="#2B91AF">[HttpPost(</font>"CreateOrder"<font color="#2B91AF">)]</font></codesnippet> <codesnippet>public»async»Task<IActionResult>»</codesnippet> <codesnippet> <font color="Blue">CreateOrder</font>([FromBody]»Order»order)</codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»»await»_orderService.</codesnippet> <codesnippet>»»»»<font color="Blue">CreateOrderAsync</font>(order);</codesnippet> <codesnippet>»»»»return»<font color="Blue">Ok</font>();</codesnippet> <codesnippet>}</codesnippet> <h3>Send Messages to Apache Kafka</h3> <p id="42">Create an interface named IKafkaProducer in a file named IKafkaProducer.cs and write the following code in there:</p> <codesnippet> <font color="Blue">public»interface»IKafkaProducer</font> </codesnippet> <codesnippet> <font color="Blue">{</font> </codesnippet> <codesnippet> <font color="Blue">»»»Task»SendMessageToKafkaAsync</font> </codesnippet> <codesnippet> <font color="Blue">»»»(OutboxMessage»message);»</font> 
</codesnippet> <codesnippet> <font color="Blue">}</font> </codesnippet> <p id="43">Next, create a new class named KafkaProducer in a file named KafkaProducer.cs and write the code given in <b>Listing 6</b> in there.</p> <p id="44">In addition, you should register IOutboxMessageRepository and IKafkaProducer with the service collection, as shown below: </p> <codesnippet>builder.Services.AddScoped</codesnippet> <codesnippet><IOutboxMessageRepository,»</codesnippet> <codesnippet>OutboxMessageRepository>();</codesnippet> <codesnippet>builder.Services.AddScoped</codesnippet> <codesnippet><IKafkaProducer,»KafkaProducer>();</codesnippet> <p id="45">You should also register the hosted service in the Program.cs file, as shown in the code snippet given below:</p> <codesnippet>builder.Services.AddSingleton</codesnippet> <codesnippet><IHostedService,»</codesnippet> <codesnippet>OutboxMessageProcessor>();</codesnippet> <h3>Sequence Diagram of the Complete Flow</h3> <p id="46">The sequence diagram of the complete flow is shown in <b>Figure 8</b>.</p> <figure id="8" src="image8.png"> <b>Figure </b> <b>8</b> <b>:</b> A sequence diagram of the complete flow</figure> <h2>Create the Message Consumer Application</h2> <p id="47">Let’s create a message consumer to consume the messages stored in Kafka. To do this, create another ASP.NET Core application by following the steps mentioned earlier in this article. 
Create a new interface named IKafkaConsumer and write the following code in there:</p> <codesnippet> <font color="Blue">namespace</font>»Outbox_Pattern_Demo</codesnippet> <codesnippet>{</codesnippet> <codesnippet>»»»»<font color="Blue">public</font>»<font color="Blue">interface</font>»IKafkaConsumer</codesnippet> <codesnippet>»»»»{</codesnippet> <codesnippet>»»»»»»»»<font color="Blue">public</font>»Task»ConsumeMessagesAsync</codesnippet> <codesnippet>»»»»»»»»(CancellationToken»cancellationToken);</codesnippet> <codesnippet>»»»»}</codesnippet> <codesnippet>}</codesnippet> <p id="48">Next, create a new class named KafkaConsumer that implements the IKafkaConsumer interface, as shown in <b>Listing 8</b>. You now need a service that runs in the background and consumes the messages residing in Kafka. To do this, create a class named KafkaMessageProcessor that extends the BackgroundService class, as shown in <b>Listing 9</b>.</p> <p id="49">Register the instance of type IKafkaConsumer as a scoped service and the instance of KafkaMessageProcessor as a singleton hosted service.</p> <codesnippet>builder.Services.AddScoped</codesnippet> <codesnippet><IKafkaConsumer,»KafkaConsumer>();</codesnippet> <codesnippet>builder.Services.AddSingleton</codesnippet> <codesnippet><IHostedService,»KafkaMessageProcessor>();</codesnippet> <p id="50"> <b>Figure </b> <b>9</b> illustrates the complete flow visually. </p> <figure id="9" src="image9.png"> <b>Figure </b> <b>9</b> <b>:</b> <b> </b>The complete flow of the consumer application</figure> <p id="51"> <b>Listing</b> <b> </b> <b>10</b> shows the Program.cs file of the consumer application.</p> <h2>Execute the Application</h2> <p id="52">Run the producer and the consumer applications separately because they’re part of different solutions. You should also set appropriate breakpoints in the source code of both applications so that you can debug them. 
To run the applications, follow the steps outlined below:</p> <list type="numbered"> <numberedlist>Start ZooKeeper in a terminal window.</numberedlist> <numberedlist>Start Kafka in another terminal window.</numberedlist> <numberedlist>Execute the producer application.</numberedlist> <numberedlist>Execute the consumer application.</numberedlist> <numberedlist>Launch Postman.</numberedlist> <numberedlist>Send an HTTP POST request to the api/Order/CreateOrder endpoint of the producer API using Postman.</numberedlist> </list> <p id="53"> <b>Figure </b> <b>10 </b>shows how you can invoke the API endpoint in Postman.</p> <figure id="10" src="image10.png"> <b>Figure </b> <b>10</b> <b>:</b> Invoking the API endpoint in Postman</figure> <h2>Real-World Use Cases for the Outbox Pattern</h2> <p id="54">Two common real-world use cases for the outbox pattern are real-time notifications and data synchronization. The next two sections discuss these. </p> <h3>Real-Time Notifications</h3> <p id="55">You might often want real-time notifications in your application to send instant alerts, messages, etc. With the outbox pattern, each notification is written to the outbox table as an event in the same transaction as the change that triggers it. These events can then be processed asynchronously, thereby triggering real-time notifications to the logged-in users of the application.</p> <h3>Data Synchronization</h3> <p id="56">Real-time data synchronization is yet another use case where the outbox pattern fits in. For example, in an e-commerce application, when a customer places an order, both the order and the stock database tables are updated in the same transaction. 
Additionally, the order, product, and cart data must be synchronized asynchronously across all of the devices the customer uses.</p> <h2>Alternatives to the Transactional Outbox Pattern</h2> <p id="57">Although the outbox pattern is a popular strategy for handling distributed transactions efficiently and ensuring reliable and consistent communication between microservices, there are certain downsides to using it as well. I’ve discussed them earlier in this article. The Two-Phase Commit and the Saga Pattern are two popular alternatives to this strategy.</p> <h3>Two-Phase Commit</h3> <p id="58">This approach performs an atomic transaction across multiple resources in two phases. In the first (prepare) phase, the transaction coordinator asks every participating resource whether it's ready to commit. In the second (commit) phase, it instructs all of the resources to commit or, if any of them couldn't prepare, to roll back. The major drawback of this approach is that if any resource fails or there are network issues, the whole transaction stalls while the other resources keep holding their locks.</p> <h3>Saga Pattern</h3> <p id="59">This approach splits a transaction into multiple steps and completes each step as a separate local transaction. This particular design is best suited for long-running transactions or those that take place between microservices. However, because each step commits independently, a failure can't simply be rolled back; you must run compensating transactions that undo the steps already completed. Deciding which steps to compensate can be difficult because the timing and sequence of the steps are hard to reason about.</p> <h2>Where Do We Go from Here?</h2> <p id="60">The outbox pattern, a reliable pattern for consistent event publishing in microservices-based applications, is a great way to make your microservices apps more stable and scalable. It decouples the services that exchange events while ensuring that those events are delivered, and it isolates business operations from event publishing. 
However, it also has certain downsides, so you should understand these constraints before deciding on your application's design. I’ll discuss more on the outbox pattern in a future article.</p> </body> <sidebars> <sidebar title="Sponsored Sidebar " /> <sidebar title="Ready to Modernize a Legacy App?"> <p id="61">Need advice on migrating yesterday’s <b>legacy applications</b> to today’s modern platforms? </p> <p id="62">Take advantage of <b>CODE Consulting’s</b> years of experience and contact us today to schedule a <b>FREE</b> consulting call to discuss your options. </p> <p id="63"> <b>No strings. No commitment. </b> </p> <p id="64"> <b>For more information</b> <b>,</b> see www.codemag.com/consulting or email us at info@codemag.com.</p> </sidebar> </sidebars> <tables /> <codelistings> <codelisting id="1" header="Listing 1: The ApplicationDbContext class"> <code> <font color="Blue">using</font>»Microsoft.EntityFrameworkCore;</code> <code /> <code> <font color="Blue">namespace</font>»<font color="#A31515">Outbox_Pattern_Demo</font></code> <code>{</code> <code>»»»»<font color="Blue">public</font>»<font color="Blue">class</font>»<font color="#A31515">ApplicationDbContext</font>»:»<font color="#A31515">DbContext</font></code> <code>»»»»{</code> <code>»»»»»»»»<font color="Blue">public</font>»<font color="#A31515">ApplicationDbContext</font>(DbContextOptions</code> <code>»»»»»»»»<ApplicationDbContext>»options)»:»<font color="#A31515">base</font>(options)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">protected</font>»<font color="Blue">override</font>»<font color="Blue">void</font>»<font color="#A31515">OnModelCreating</font></code> <code>»»»»»»»»(ModelBuilder»modelBuilder)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»modelBuilder.Entity<Order>(entity»=></code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»entity.ToTable(<font color="#A31515">"Order"</font>);</code> 
<code>»»»»»»»»»»»»»»»»entity.HasKey(e»=>»e.Order_Id);</code> <code>»»»»»»»»»»»»});</code> <code /> <code>»»»»»»»»»»»»modelBuilder.Entity</code> <code>»»»»»»»»»»»»<OutboxMessage>(entity»=></code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»entity.ToTable(<font color="#A31515">"OutboxMessage"</font>);</code> <code>»»»»»»»»»»»»»»»»entity.HasKey(e»=>»e.Event_Id);</code> <code>»»»»»»»»»»»»});</code> <code /> <code>»»»»»»»»»»»»<font color="Blue">base</font>.OnModelCreating(modelBuilder);</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»DbSet<OutboxMessage>»</code> <code>»»»»»»»»OutboxMessages»{»<font color="Blue">get</font>;»<font color="Blue">set</font>;»}</code> <code>»»»»»»»»<font color="Blue">public</font>»DbSet<Order>»</code> <code>»»»»»»»»Orders»{»<font color="Blue">get</font>;»<font color="Blue">set</font>;»}</code> <code>»»»»}</code> <code>}</code> </codelisting> <codelisting id="2" header="Listing 2: The OrderService Class"> <code> <font color="Blue">namespace</font>»Outbox_Pattern_Demo</code> <code>{</code> <code>»»»<font color="Blue">using</font>»System.Text.Json;</code> <code /> <code>»»»»<font color="Blue">public</font>»<font color="Blue">class</font>»OrderService»:»IOrderService</code> <code>»»»»{</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»ApplicationDbContext»_context;</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»OrderService(ApplicationDbContext»context)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»_context»=»context;</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task<List<Order>>»GetAllOrdersAsync()</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">return</font>»<font color="Blue">await</font>»Task.FromResult</code> <code>»»»»»»»»»»»»(_context.Orders.</code> <code>»»»»»»»»»»»»ToList<Order>());</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font 
color="Blue">public</font>»<font color="Blue">async</font>»Task<Order>»</code> <code>»»»»»»»»GetOrderAsync(<font color="Blue">int</font>»Id)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">return</font>»<font color="Blue">await</font>»Task.FromResult</code> <code>»»»»»»»»»»»»(_context.Orders.FirstOrDefault</code> <code>»»»»»»»»»»»»(x»=>»x.Order_Id»==»Id));</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task»CreateOrderAsync(Order»order)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">using</font>»<font color="Blue">var</font>»transaction»=»</code> <code>»»»»»»»»»»»»_context.Database.BeginTransaction();</code> <code /> <code>»»»»»»»»»»»»<font color="Blue">try</font></code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»_context.Orders.Add(order);</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">await</font>»_context.SaveChangesAsync();</code> <code /> <code>»»»»»»»»»»»»»»»»<font color="Blue">var</font>»outboxMessage»=»<font color="Blue">new</font>»OutboxMessage</code> <code>»»»»»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»»»»»Event_Payload»=»</code> <code>»»»»»»»»»»»»»»»»»»»»JsonSerializer.Serialize(order),</code> <code>»»»»»»»»»»»»»»»»»»»»Event_Date»=»DateTime.Now,</code> <code>»»»»»»»»»»»»»»»»»»»»IsMessageDispatched»=»false</code> <code>»»»»»»»»»»»»»»»»<font>};</font></code> <code /> <code> <font>»»»»»»»»»»»»»»»»_context.OutboxMessages.</font> </code> <code> <font>»»»»»»»»»»»»»»»»Add(outboxMessage);</font> </code> <code /> <code> <font>»»»»»»»»»»»»»»»»</font> <font color="Blue">await</font> <font>»_context.SaveChangesAsync();</font> </code> <code> <font>»»»»»»»»»»»»»»»»</font> <font color="Blue">await</font> <font>»transaction.CommitAsync();</font> </code> <code> <font>»»»»»»»»»»»»}</font> </code> <code> <font>»»»»»»»»»»»»</font> <font color="Blue">catch</font> </code> <code> <font>»»»»»»»»»»»»{</font> </code> <code> <font>»»»»»»»»»»»»»»»»</font> <font 
color="Blue">await</font> <font>»transaction.RollbackAsync();</font> </code> <code> <font>»»»»»»»»»»»»»»»»</font> <font color="Blue">throw</font>;</code> <code>»»»»»»»»»»»»}</code> <code>»»»»»»»»}</code> <code>»»»»}</code> <code>}</code> </codelisting> <codelisting id="3" header="Listing 3: The OutboxMessageProcessor class"> <code> <font color="Blue">namespace</font>»Outbox_Pattern_Demo</code> <code>{</code> <code>»»»»<font color="Blue">public</font>»<font color="Blue">class</font>»OutboxMessageProcessor»:»BackgroundService</code> <code>»»»»{</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»IServiceScopeFactory»_scopeFactory;</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»IKafkaProducer»_producer;</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»OutboxMessageProcessor</code> <code>»»»»»»»»(IServiceScopeFactory»scopeFactory,»</code> <code>»»»»»»»»IKafkaProducer»producer)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»_scopeFactory»=»scopeFactory;</code> <code>»»»»»»»»»»»»_producer»=»producer;</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">protected</font>»<font color="Blue">override</font>»<font color="Blue">async</font>»Task»ExecuteAsync</code> <code>»»»»»»»»(CancellationToken»cancellationToken)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">while</font>»(!cancellationToken.IsCancellationRequested)</code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">await</font>»PublishOutboxMessagesAsync</code> <code>»»»»»»»»»»»»»»»»(cancellationToken);</code> <code>»»»»»»»»»»»»}</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">async</font>»Task»PublishOutboxMessagesAsync</code> <code>»»»»»»»»(CancellationToken»cancellationToken)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">try</font></code> <code>»»»»»»»»»»»»{</code> 
<code>»»»»»»»»»»»»»»»»<font color="Blue">using</font>»<font color="Blue">var</font>»scope»=»_scopeFactory.CreateScope();</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">await</font>»<font color="Blue">using</font>»<font color="Blue">var</font>»_dbContext»=»</code> <code>»»»»»»»»»»»»»»»»scope.ServiceProvider.GetRequiredService</code> <code>»»»»»»»»»»»»»»»»<ApplicationDbContext>();</code> <code /> <code>»»»»»»»»»»»»»»»»List<OutboxMessage>»messages»=»</code> <code>»»»»»»»»»»»»»»»»»_dbContext.OutboxMessages.Where</code> <code>»»»»»»»»»»»»»»»»(om»=>»om.IsMessageDispatched»!=»true).ToList();</code> <code /> <code>»»»»»»»»»»»»»»»»<font color="Blue">foreach</font>»(OutboxMessage»outboxMessage»<font color="Blue">in</font>»messages)</code> <code>»»»»»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»»»»»<font color="Blue">try</font></code> <code>»»»»»»»»»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»<font color="Blue">await</font>»_producer.</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»SendMessageToKafkaAsync(outboxMessage);</code> <code /> <code>»»»»»»»»»»»»»»»»»»»»»»»»outboxMessage.IsMessageDispatched»=»true;</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»outboxMessage.Event_Date»=»DateTime.UtcNow;</code> <code /> <code>»»»»»»»»»»»»»»»»»»»»»»»»_dbContext.OutboxMessages.</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»Update(outboxMessage);</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»<font color="Blue">await</font>»_dbContext.SaveChangesAsync();</code> <code>»»»»»»»»»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»»»»»»»»»<font color="Blue">catch</font>»(Exception»e)</code> <code>»»»»»»»»»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»Console.WriteLine(e);</code> <code>»»»»»»»»»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»<font color="Blue">catch</font></code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">throw</font>;</code> <code>»»»»»»»»»»»»}</code> <code /> <code>»»»»»»»»»»»»<font color="Blue">await</font>»Task.Delay</code> 
<code>»»»»»»»»»»»»(TimeSpan.FromSeconds(5),»</code> <code>»»»»»»»»»»»»cancellationToken);</code> <code>»»»»»»»»}</code> <code>»»»»}</code> <code>}</code> </codelisting> <codelisting id="4" header="Listing 4: The OutboxMessageRepository class"> <code> <font color="Blue">using</font>»System.Collections.ObjectModel;</code> <code> <font color="Blue">using</font>»System.Data;</code> <code /> <code> <font color="Blue">namespace</font>»<font color="#A31515">Outbox_Pattern_Demo</font></code> <code>{</code> <code>»»»»<font color="Blue">public</font>»<font color="Blue">class</font>»<font color="#A31515">OutboxMessageRepository</font>»:»</code> <code>»»»»<font color="#A31515">IOutboxMessageRepository</font></code> <code>»»»»{</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»ApplicationDbContext»_context;</code> <code>»»»»»»»»<font color="Blue">private</font>»IReadOnlyCollection</code> <code>»»»»»»»»<OutboxMessage>»_outboxMessages;</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»<font color="#A31515">OutboxMessageRepository</font></code> <code>»»»»»»»»(ApplicationDbContext»context)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»_context»=»context;</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»IReadOnlyCollection</code> <code>»»»»»»»»<OutboxMessage>»OutboxMessages</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">get</font></code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">return</font>»_outboxMessages»??»</code> <code>»»»»»»»»»»»»»»»»(_outboxMessages»=»<font color="Blue">new</font>»</code> <code>»»»»»»»»»»»»»»»»ReadOnlyCollection<OutboxMessage></code> <code>»»»»»»»»»»»»»»»»(_context.OutboxMessages.ToList()));</code> <code>»»»»»»»»»»»»}</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task<IReadOnlyCollection</code> 
<code>»»»»»»»»<OutboxMessage>>»GetUnsentMessagesAsync()</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»List<OutboxMessage>?»unsentMessages»=</code> <code>»»»»»»»»»»»»_context.OutboxMessages.Where</code> <code>»»»»»»»»»»»»(e»=>»e.IsMessageDispatched»!=»<font color="#A31515">true</font>).ToList();</code> <code>»»»»»»»»»»»»ReadOnlyCollection<OutboxMessage>?»</code> <code>»»»»»»»»»»»»result»=»<font color="Blue">new</font>»ReadOnlyCollection</code> <code>»»»»»»»»»»»»<OutboxMessage>(unsentMessages);</code> <code>»»»»»»»»»»»»<font color="Blue">return</font>»result;</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task<IReadOnlyCollection</code> <code>»»»»»»»»<OutboxMessage>>»GetMessagesByIdsAsync</code> <code>»»»»»»»»(IEnumerable<<font color="Blue">int</font>>»ids)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»List<OutboxMessage>?»</code> <code>»»»»»»»»»»»»orders»=»_context.OutboxMessages</code> <code>»»»»»»»»»»»».Where(o»=>»ids.Contains(o.Event_Id)).ToList();</code> <code>»»»»»»»»»»»»<font color="Blue">var</font>»readOnlyOrders»=»</code> <code>»»»»»»»»»»»»<font color="Blue">new</font>»ReadOnlyCollection</code> <code>»»»»»»»»»»»»<OutboxMessage>(orders);</code> <code>»»»»»»»»»»»»<font color="Blue">return</font>»readOnlyOrders;</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task»<font color="#A31515">UpdateAsync</font></code> <code>»»»»»»»»(OutboxMessage»message,»<font color="Blue">bool</font>»status)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">var</font>»entity»=»_context.OutboxMessages.</code> <code>»»»»»»»»»»»»FirstOrDefault(o»=>»o.Event_Id»==»message.Event_Id);</code> <code /> <code>»»»»»»»»»»»»<font color="Blue">if</font>»(entity»!=»<font color="#A31515">null</font>)</code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»entity.Event_Id»=»message.Event_Id;</code> 
<code>»»»»»»»»»»»»»»»»<font>entity.Event_Date»=»message.Event_Date;</font></code> <code> <font>»»»»»»»»»»»»»»»»entity.Event_Payload»=»message.Event_Payload;</font> </code> <code> <font>»»»»»»»»»»»»»»»»entity.IsMessageDispatched»=</font> </code> <code> <font>»»»»»»»»»»»»»»»»message.IsMessageDispatched;</font> </code> <code> <font>»»»»»»»»»»»»»»»»</font> <font color="Blue">await</font> <font>»_context.SaveChangesAsync();</font> </code> <code> <font>»»»»»»»»»»»»}</font> </code> <code> <font>»»»»»»»»}</font> </code> <code> <font>»»»»</font>};</code> <code>}</code> </codelisting> <codelisting id="5" header="Listing 5: The OrderController Class"> <code> <font color="Blue">using</font>»Microsoft.AspNetCore.Mvc;</code> <code /> <code> <font color="Blue">namespace</font>»<font color="#A31515">Outbox_Pattern_Demo.Controllers</font></code> <code>{</code> <code>»»»»[<font color="#2B91AF">Route(</font><font color="#A31515">"api/[controller]"</font><font color="#2B91AF">)</font>]</code> <code>»»»»[<font color="#2B91AF">ApiController</font>]</code> <code>»»»»<font color="Blue">public</font>»<font color="Blue">class</font>»<font color="#A31515">OrderController</font>»:»<font color="#A31515">ControllerBase</font></code> <code>»»»»{</code> <code>»»»»»»»»<font color="Blue">private</font>»IOrderService»_orderService;</code> <code>»»»»»»»»<font color="Blue">public</font>»<font color="#A31515">OrderController</font></code> <code>»»»»»»»»(IOrderService»orderService)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»_orderService»=»orderService;</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»[<font color="#2B91AF">HttpGet(</font><font color="#A31515">"GetOrders"</font><font color="#2B91AF">)</font>]</code> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task<List<Order>>»GetOrders()</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">return</font>»<font color="Blue">await</font>»_orderService.</code> 
<code>»»»»»»»»»»»»GetAllOrdersAsync();</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»[<font color="#2B91AF">HttpGet(</font><font color="#A31515">"{id}"</font><font color="#2B91AF">)</font>]</code> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task<Order>»<font color="#A31515">GetOrder</font>(<font color="Blue">int</font>»id)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">return</font>»<font color="Blue">await</font>»_orderService.</code> <code>»»»»»»»»»»»»GetOrderAsync(id);</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»[<font color="#2B91AF">HttpPost(</font><font color="#A31515">"CreateOrder"</font><font color="#2B91AF">)</font>]</code> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task<IActionResult>»</code> <code>»»»»»»»»<font color="#A31515">CreateOrder</font>([FromBody]»Order»order)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">await</font>»_orderService.</code> <code>»»»»»»»»»»»»CreateOrderAsync(order);</code> <code>»»»»»»»»»»»»<font color="Blue">return</font>»Ok();</code> <code>»»»»»»»»}</code> <code>»»»»}</code> <code>}</code> </codelisting> <codelisting id="6" header="Listing 6: The KafkaProducer class"> <code> <font color="Blue">using</font>»Confluent.Kafka;</code> <code> <font color="Blue">using</font>»System.Net;</code> <code /> <code> <font color="Blue">namespace</font>»<font color="#A31515">Outbox_Pattern_Demo</font></code> <code>{</code> <code>»»»»<font color="Blue">public</font>»<font color="Blue">class</font>»<font color="#A31515">KafkaProducer</font>»:»<font color="#A31515">IKafkaProducer</font></code> <code>»»»»{</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»ProducerConfig»_producerConfig;</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»IOutboxMessageRepository»_outboxRepository;</code> <code>»»»»»»»»<font color="Blue">private</font>»<font 
color="Blue">readonly</font>»<font color="Blue">string</font>»topic»=»<font color="#A31515">"test"</font>;</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»<font color="#A31515">KafkaProducer</font>(IOutboxMessageRepository»outboxRepository)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»_outboxRepository»=»outboxRepository;</code> <code /> <code>»»»»»»»»»»»»_producerConfig»=»<font color="Blue">new</font>»ProducerConfig</code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»BootstrapServers»=»<font color="#A31515">"localhost:9092"</font>,</code> <code>»»»»»»»»»»»»»»»»ClientId»=»Dns.GetHostName()</code> <code>»»»»»»»»»»»»};</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task»<font color="#A31515">SendMessageToKafkaAsync</font></code> <code>»»»»»»»»(OutboxMessage»message)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">if</font>»(message»==»<font color="#A31515">null</font>)</code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">throw</font>»<font color="Blue">new</font>»ArgumentNullException</code> <code>»»»»»»»»»»»»»»»»(<font color="Blue">nameof</font>(message));</code> <code>»»»»»»»»»»»»}</code> <code /> <code>»»»»»»»»»»»»<font color="Blue">using</font>»<font color="Blue">var</font>»producer»=»</code> <code>»»»»»»»»»»»»<font color="Blue">new</font>»ProducerBuilder<Null,»</code> <code>»»»»»»»»»»»»<font color="Blue">string</font>>(_producerConfig).Build();</code> <code /> <code>»»»»»»»»»»»»<font color="Blue">try</font></code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">var</font>»result»=»<font color="Blue">await</font>»producer.ProduceAsync</code> <code>»»»»»»»»»»»»»»»»»»»»(topic,»<font color="Blue">new</font>»Message<Null,»<font color="Blue">string</font>></code> <code>»»»»»»»»»»»»»»»»»»»»{</code> 
<code>»»»»»»»»»»»»»»»»»»»»»»»»Value»=»message.EventPayload</code> <code>»»»»»»»»»»»»»»»»»»»»});</code> <code /> <code>»»»»»»»»»»»»»»»»<font color="Blue">if</font>»(result.Status»==»PersistenceStatus.Persisted)</code> <code>»»»»»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»»»»»<font color="Blue">await</font>»_outboxRepository.UpdateAsync</code> <code>»»»»»»»»»»»»»»»»»»»»(message,»OutboxMessageStatus.Sent);</code> <code>»»»»»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»<font color="Blue">catch</font>»(Exception)</code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">await</font>»_outboxRepository.UpdateAsync</code> <code>»»»»»»»»»»»»»»»»(message,»OutboxMessageStatus.New);</code> <code>»»»»»»»»»»»»}</code> <code>»»»»»»»»}</code> <code>»»»»};</code> <code>}</code> </codelisting> <codelisting id="7" header="Listing 7: The Program.cs file (Producer Application)"> <code> <font color="Blue">using</font>»Microsoft.EntityFrameworkCore;</code> <code> <font color="Blue">using</font>»Outbox_Pattern_Demo;</code> <code /> <code> <font color="Blue">var</font>»builder»=»WebApplication.CreateBuilder(args);</code> <code /> <code> <font color="Green">//»Add»services»to»the»container.</font> </code> <code /> <code>builder.Services.AddControllers();</code> <code>builder.Services.AddScoped</code> <code><IOrderService,»OrderService>();</code> <code>builder.Services.AddScoped</code> <code><IOutboxMessageRepository,»</code> <code>OutboxMessageRepository>();</code> <code>builder.Services.AddScoped</code> <code><IKafkaProducer,»KafkaProducer>();</code> <code /> <code>builder.Services.AddSingleton</code> <code><IHostedService,»OutboxMessageProcessor>();</code> <code /> <code>builder.Services.AddDbContext</code> <code><ApplicationDbContext>(options»=></code> <code>options.UseSqlServer</code> <code>(<font color="#A31515">@"Data»Source=JOYDIP;</font></code> <code> <font color="#A31515">Initial»Catalog=Outbox_Pattern_Demo;</font> </code> <code> <font 
color="#A31515">Trusted_Connection=True;»</font> </code> <code> <font color="#A31515">TrustServerCertificate=True;</font> </code> <code> <font color="#A31515">Integrated»Security=True;"</font>));</code> <code /> <code> <font color="Blue">var</font>»app»=»builder.Build();</code> <code /> <code> <font color="Green">//»Configure»the»HTTP»request»pipeline.</font> </code> <code /> <code>app.UseAuthorization();</code> <code>app.MapControllers();</code> <code>app.Run();</code> </codelisting> <codelisting id="8" header="Listing 8: The KafkaConsumer class"> <code> <font color="Blue">using</font>»Confluent.Kafka;</code> <code> <font color="Blue">using</font>»System.Text.Json;</code> <code /> <code> <font color="Blue">namespace</font>»<font color="#A31515">Outbox_Pattern_Demo</font></code> <code>{</code> <code>»»»»<font color="Blue">public</font>»<font color="Blue">class</font>»<font color="#A31515">KafkaConsumer</font>»:»<font color="#A31515">IKafkaConsumer</font></code> <code>»»»»{</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»<font color="Blue">string</font>»topic»=»<font color="#A31515">"test"</font>;</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»<font color="Blue">string</font>»groupId»=»<font color="#A31515">"test_group"</font>;</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»<font color="Blue">string</font>»bootstrapServers»=»</code> <code>»»»»»»»»<font color="#A31515">"localhost:9092"</font>;</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»<font color="Blue">async</font>»Task»<font color="#A31515">ConsumeMessagesAsync</font></code> <code>»»»»»»»(CancellationToken»cancellationToken)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">var</font>»config»=»<font color="Blue">new</font>»ConsumerConfig</code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»GroupId»=»groupId,</code> 
<code>»»»»»»»»»»»»»»»»BootstrapServers»=»bootstrapServers,</code> <code>»»»»»»»»»»»»»»»»AutoOffsetReset»=»AutoOffsetReset.Earliest</code> <code>»»»»»»»»»»»»};</code> <code /> <code>»»»»»»»»»»»»<font color="Blue">try</font></code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">using</font>»(<font color="Blue">var</font>»consumerBuilder»=»<font color="Blue">new</font>»ConsumerBuilder</code> <code>»»»»»»»»»»»»»»»»<Ignore,»<font color="Blue">string</font>>(config).Build())</code> <code>»»»»»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»»»»»consumerBuilder.Subscribe(topic);</code> <code /> <code>»»»»»»»»»»»»»»»»»»»»<font color="Blue">try</font></code> <code>»»»»»»»»»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»<font color="Blue">while</font>»(<font color="#A31515">true</font>)</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»»»»»<font color="Green">//»Pass»the»caller's»token»so»host»shutdown»ends»the»loop.</font></code> <code>»»»»»»»»»»»»»»»»»»»»»»»»»»»»<font color="Blue">var</font>»consumer»=»consumerBuilder.Consume</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»(cancellationToken);</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»»»»»<font color="Blue">var</font>»order»=»JsonSerializer.Deserialize</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»»<Order>(consumer.Message.Value);</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»»»»»»»»»<font color="Blue">catch</font>»(OperationCanceledException)</code> <code>»»»»»»»»»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»»»»»»»»»consumerBuilder.Close();</code> <code>»»»»»»»»»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»}</code> <code>»»»»»»»»»»»»<font color="Blue">catch</font></code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">throw</font>;</code> <code>»»»»»»»»»»»»}</code> <code>»»»»»»»»}</code> <code>»»»»}</code> <code>}</code> </codelisting> <codelisting id="9" header="Listing 9: The 
KafkaMessageProcessor class"> <code> <font color="Blue">namespace</font>»Outbox_Pattern_Demo</code> <code>{</code> <code>»»»»<font color="Blue">public</font>»<font color="Blue">class</font>»KafkaMessageProcessor»:»BackgroundService</code> <code>»»»»{</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»IServiceScopeFactory»_scopeFactory;</code> <code>»»»»»»»»<font color="Blue">private</font>»<font color="Blue">readonly</font>»IKafkaConsumer»_consumer;</code> <code /> <code>»»»»»»»»<font color="Blue">public</font>»KafkaMessageProcessor</code> <code>»»»»»»»»(IServiceScopeFactory»scopeFactory,»</code> <code>»»»»»»»»IKafkaConsumer»consumer)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»_scopeFactory»=»scopeFactory;</code> <code>»»»»»»»»»»»»_consumer»=»consumer;</code> <code>»»»»»»»»}</code> <code /> <code>»»»»»»»»<font color="Blue">protected</font>»<font color="Blue">override</font>»<font color="Blue">async</font>»Task»ExecuteAsync</code> <code>»»»»»»»»(CancellationToken»cancellationToken)</code> <code>»»»»»»»»{</code> <code>»»»»»»»»»»»»<font color="Blue">while</font>»(!cancellationToken.IsCancellationRequested)</code> <code>»»»»»»»»»»»»{</code> <code>»»»»»»»»»»»»»»»»<font color="Blue">await</font>»_consumer.ConsumeMessagesAsync</code> <code>»»»»»»»»»»»»»»»»(cancellationToken);</code> <code>»»»»»»»»»»»»}</code> <code>»»»»»»»»}</code> <code>»»»»}</code> <code>}</code> </codelisting> <codelisting id="10" header="Listing 10: The Program.cs file (Consumer Application)"> <code> <font color="Blue">using</font>»Microsoft.EntityFrameworkCore;</code> <code> <font color="Blue">using</font>»Outbox_Pattern_Client_Demo;</code> <code /> <code> <font color="Blue">var</font>»builder»=»WebApplication.CreateBuilder(args);</code> <code /> <code> <font color="Green">//»Add»services»to»the»container.</font> </code> <code /> <code>builder.Services.AddControllers();</code> <code>builder.Services.AddSingleton</code> 
<code><IKafkaConsumer,»KafkaConsumer>();</code> <code>builder.Services.AddSingleton</code> <code><IHostedService,»</code> <code>KafkaMessageProcessor>();</code> <code /> <code> <font color="Blue">var</font>»app»=»builder.Build();</code> <code /> <code> <font color="Green">//»Configure»the»HTTP»request»pipeline.</font> </code> <code /> <code>app.UseAuthorization();</code> <code>app.MapControllers();</code> <code>app.Run();</code> </codelisting> </codelistings> </document>