Implementing the Transactional Outbox pattern using Transactional Event Queues and JMS

Hi, in this post I want to provide an example of how to implement the Transactional Outbox pattern using Transactional Event Queues and JMS with the new Oracle Database 23c Free – Developer Release I mentioned in my last post.

In the Transactional Outbox pattern, we have a microservice that needs to perform a database operation (like an insert) and send a message, and either both or neither of these need to happen.
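To make the "both or neither" idea concrete before we touch the database, here is a toy in-memory sketch. It is not how Transactional Event Queues work internally, and the class name OutboxSketch and its methods are made up for illustration. It only shows the behavior we are after: the row and the message are staged together, and they become visible together or not at all.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (hypothetical names): one "transaction" stages a row and a
// message together, and they are committed or discarded as a unit.
final class OutboxSketch {
    final List<String> table = new ArrayList<>();   // committed rows
    final List<String> topic = new ArrayList<>();   // committed messages
    private final List<String> pendingRows = new ArrayList<>();
    private final List<String> pendingMessages = new ArrayList<>();

    void insert(String row) { pendingRows.add(row); }

    void publish(String message) { pendingMessages.add(message); }

    // Make the staged row(s) and message(s) visible together.
    void commit() {
        table.addAll(pendingRows);
        topic.addAll(pendingMessages);
        clearPending();
    }

    // Discard everything that was staged since the last commit.
    void rollback() { clearPending(); }

    private void clearPending() {
        pendingRows.clear();
        pendingMessages.clear();
    }

    public static void main(String[] args) {
        OutboxSketch tx = new OutboxSketch();
        tx.insert("jack");
        tx.publish("new customer jack");
        tx.commit();

        tx.insert("jill");   // simulated failure before the publish
        tx.rollback();

        // prints: [jack] [new customer jack]
        System.out.println(tx.table + " " + tx.topic);
    }
}
```

In the rest of this post the database plays the role of this class, so we get the same guarantee without writing any of the staging logic ourselves.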

Unlike other messaging providers, Transactional Event Queues is built into the Oracle Database and has the unique advantage of being able to expose the underlying database transaction to your application. This allows us to perform database and messaging operations in the same transaction – which is exactly what we need to implement this pattern.

Prepare the database

The first thing we want to do is start up the Oracle 23c Free Database. This is very easy to do in a container using a command like this:

docker run --name free23c -d -p 1521:1521 -e ORACLE_PWD=Welcome12345 container-registry.oracle.com/database/free:latest

This will pull the image and start up the database with a listener on port 1521. It will also create a pluggable database (a database container) called “FREEPDB1” and will set the admin passwords to the password you specified on this command.

You can tail the logs to see when the database is ready to use:

docker logs -f free23c

(look for this message...)
#########################
DATABASE IS READY TO USE!
#########################

Also, grab the IP address of the container; we’ll need that to connect to the database:

docker inspect free23c | grep IPA
            "SecondaryIPAddresses": null,
            "IPAddress": "172.17.0.2",
                    "IPAMConfig": null,
                    "IPAddress": "172.17.0.2",

To set up the necessary permissions, you’ll need to connect to the database with a client. If you don’t have one already, I’d recommend trying the new SQLcl CLI which you can download here. Start it up and connect to the database like this (note that your IP address and password may be different):

sql sys/Welcome12345@//172.17.0.2:1521/freepdb1 as sysdba


SQLcl: Release 22.2 Production on Tue Apr 11 12:36:24 2023

Copyright (c) 1982, 2023, Oracle.  All rights reserved.

Connected to:
Oracle Database 23c Free, Release 23.0.0.0.0 - Developer-Release
Version 23.2.0.0.0

SQL>

Now, run these commands to create a user called “mark” and give it the necessary privileges:


SQL> create user mark identified by Welcome12345;

User MARK created.

SQL> grant resource , connect, unlimited tablespace to mark;

Grant succeeded.

SQL> grant execute on dbms_aq to mark;

Grant succeeded.

SQL> grant execute on dbms_aqadm to mark;

Grant succeeded.

SQL> grant execute on dbms_aqin to mark;

Grant succeeded.

SQL> grant execute on dbms_aqjms_internal to mark;

Grant succeeded.

SQL> grant execute on dbms_teqk to mark;

Grant succeeded.

SQL> grant execute on DBMS_RESOURCE_MANAGER to mark;

Grant succeeded.

SQL> grant select_catalog_role to mark;

Grant succeeded.

SQL> grant select on sys.aq$_queue_shards to mark;

Grant succeeded.

SQL> grant select on user_queue_partition_assignment_table to mark;

Grant succeeded.

SQL> exec  dbms_teqk.AQ$_GRANT_PRIV_FOR_REPL('MARK');

PL/SQL procedure successfully completed.

SQL> commit;

Commit complete.

SQL> quit;

Ok, we are ready to start on our Java code!

Create the Java project

If you have read my posts before, you’ll know I like to use Maven for my Java projects. Let’s create a Maven POM file (pom.xml) and add the dependencies we need for this application. I’ve also included some profiles to make it easy to run the three main entry points we will create – one to create a queue, one to consume messages, and finally the transactional outbox implementation. Here’s the content for the pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" 
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example</groupId>
	<artifactId>txoutbox</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>txoutbox</name>

	<properties>
		<java.version>17</java.version>
		<maven.compiler.source>17</maven.compiler.source>
		<maven.compiler.target>17</maven.compiler.target>
	</properties>

    <dependencies>
        <dependency>
            <groupId>com.oracle.database.messaging</groupId>
            <artifactId>aqapi</artifactId>
            <version>21.3.0.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <scope>compile</scope>
            <version>2.14.2</version>
        </dependency>
        <dependency>
            <groupId>javax.transaction</groupId>
            <artifactId>jta</artifactId>
            <version>1.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>jakarta.jms</groupId>
            <artifactId>jakarta.jms-api</artifactId>
            <scope>compile</scope>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>jakarta.management.j2ee</groupId>
            <artifactId>jakarta.management.j2ee-api</artifactId>
            <scope>compile</scope>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc11</artifactId>
            <scope>compile</scope>
            <version>21.3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ucp</artifactId>
            <scope>compile</scope>
            <version>21.3.0.0</version>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <id>publish</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>exec-maven-plugin</artifactId>
                        <version>3.0.0</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>exec</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <executable>java</executable>
                            <arguments>
                                <argument>-Doracle.jdbc.fanEnabled=false</argument>
                                <argument>-classpath</argument>
                                <classpath/>
                                <argument>com.example.Publish</argument>
                                <argument>jack</argument>
                                <argument>jack@jack.com</argument>
                                <argument>0</argument>
                            </arguments>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>

        <profile>
            <id>consume</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>exec-maven-plugin</artifactId>
                        <version>3.0.0</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>exec</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <executable>java</executable>
                            <arguments>
                                <argument>-Doracle.jdbc.fanEnabled=false</argument>
                                <argument>-classpath</argument>
                                <classpath/>
                                <argument>com.example.Consume</argument>
                            </arguments>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>

        <profile>
            <id>createq</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>exec-maven-plugin</artifactId>
                        <version>3.0.0</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>exec</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <executable>java</executable>
                            <arguments>
                                <argument>-Doracle.jdbc.fanEnabled=false</argument>
                                <argument>-classpath</argument>
                                <classpath/>
                                <argument>com.example.CreateTxEventQ</argument>
                            </arguments>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

</project>

I won’t go into a heap of detail on this or the first two Java classes, since they are fairly standard and I have talked about very similar things before in older posts, including this one for example. I will go into detail on the transactional outbox implementation though, don’t worry!

Create a Java class to create the queue

We are going to need a queue to put messages on, so let me show you how to do that in Java. Transactional Event Queues support various types of queues and payloads. This example shows how to create a queue that uses the JMS format. Create a file called src/main/java/com/example/CreateTxEventQ.java with this content:

package com.example;

import java.sql.SQLException;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import oracle.AQ.AQException;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;

public class CreateTxEventQ {

    private static String username = "mark";
    private static String password = "Welcome12345";
    private static String url = "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1";

    public static void main(String[] args) throws AQException, SQLException, JMSException {
        
        // create a topic session
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
        ds.setURL(url);
        ds.setUser(username);
        ds.setPassword(password);

        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        TopicSession session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // create properties
        AQQueueTableProperty props = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");
        props.setMultiConsumer(true);
        props.setPayloadType("SYS.AQ$_JMS_TEXT_MESSAGE");

        // create queue table, topic and start it
        Destination myTeq = ((AQjmsSession) session).createJMSTransactionalEventQueue("my_txeventq", true);
        ((AQjmsDestination) myTeq).start(session, true, true);

    }

}

As you read through this, you’ll see I’ve just hardcoded the username, password and url for convenience in this file (and the others in this post). Of course, we’d never do that in real life, would we 🙂 You should also notice that we get a connection, set up the queue table properties with the consumer type (multiple, i.e. pub/sub – so a JMS Topic) and the format (JMS), create the queue itself, and then start it up. Easy, right?
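If you would rather not hardcode the credentials, one option is to read them from environment variables. This is only a sketch: the DbSettings class and the variable names DB_USER, DB_PASSWORD and DB_URL are my own assumptions, not something the Oracle libraries look for. The defaults for the user and URL are the values used in this post, and the password deliberately has no default.

```java
import java.util.Map;

// Hypothetical helper: collects connection settings from a map of environment
// variables. The variable names are assumptions made for this sketch.
final class DbSettings {
    final String username;
    final String password;
    final String url;

    DbSettings(String username, String password, String url) {
        this.username = username;
        this.password = password;
        this.url = url;
    }

    static DbSettings fromEnv(Map<String, String> env) {
        // The password has no fallback, so a missing value fails fast.
        String password = env.get("DB_PASSWORD");
        if (password == null || password.isEmpty()) {
            throw new IllegalStateException("DB_PASSWORD is not set");
        }
        return new DbSettings(
            env.getOrDefault("DB_USER", "mark"),
            password,
            env.getOrDefault("DB_URL", "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1"));
    }
}
```

In any of the classes in this post you could then call DbSettings.fromEnv(System.getenv()) and pass the three fields to ds.setUser(), ds.setPassword() and ds.setURL().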

You can run this and create the queue with this command:

mvn exec:exec -Pcreateq

If you want to see the queue in the database, you can log in using that mark user you created and run a query:

$ sql mark/Welcome12345@//172.17.0.2:1521/freepdb1

SQLcl: Release 22.2 Production on Tue Apr 11 15:18:58 2023

Copyright (c) 1982, 2023, Oracle.  All rights reserved.


Last Successful login time: Tue Apr 11 2023 15:18:59 -04:00

Connected to:
Oracle Database 23c Free, Release 23.0.0.0.0 - Developer-Release
Version 23.2.0.0.0

SQL> select * from USER_QUEUES ;

NAME           QUEUE_TABLE         QID QUEUE_TYPE         MAX_RETRIES    RETRY_DELAY ENQUEUE_ENABLED    DEQUEUE_ENABLED    RETENTION    USER_COMMENT    NETWORK_NAME    SHARDED    QUEUE_CATEGORY               RECIPIENTS
______________ ______________ ________ _______________ ______________ ______________ __________________ __________________ ____________ _______________ _______________ __________ ____________________________ _____________
MY_TXEVENTQ    MY_TXEVENTQ       78567 NORMAL_QUEUE                 5              0   YES                YES              0                                            TRUE       Transactional Event Queue    MULTIPLE

While you’re there, let’s also create a table so we have somewhere to perform the database insert operation:

create table customer ( name varchar2(256), email varchar2(256) ); 

Create the consumer

Let’s create the consumer next. This will be a new Java file in the same directory called Consume.java. Here’s the content:

package com.example;

import java.sql.SQLException;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import oracle.AQ.AQException;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.jms.AQjmsTextMessage;
import oracle.jms.AQjmsTopicSubscriber;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;

public class Consume {

    private static String username = "mark";
    private static String password = "Welcome12345";
    private static String url = "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1";
    private static String topicName = "my_txeventq";

    public static void main(String[] args) throws AQException, SQLException, JMSException {

        // create a topic session
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
        ds.setURL(url);
        ds.setUser(username);
        ds.setPassword(password);

        // create a JMS topic connection and session
        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        TopicSession session = 
           (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // create a subscriber on the topic
        Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
        AQjmsTopicSubscriber subscriber = 
           (AQjmsTopicSubscriber) session.createDurableSubscriber(topic, "my_subscription");

        System.out.println("Waiting for messages...");

        // wait forever for messages to arrive and print them out
        while (true) {

            // the 1_000 is a one second timeout
            AQjmsTextMessage message = (AQjmsTextMessage) subscriber.receive(1_000); 
            if (message != null) {
                if (message.getText() != null) {
                    System.out.println(message.getText());
                } else {
                    System.out.println();
                }
            }
            session.commit();
        }
    }

}

This one is a fairly standard JMS consumer. It is going to create a subscription to that topic we just created, and wait for messages to arrive, and then just print the content on the screen. Nice and simple. You can run this with this command:

mvn exec:exec -Pconsume

Leave that running so that you see messages as they are produced. Later, when you run the transactional outbox producer, run it in a different window so that you can see what happens in the consumer.

Implement the Transactional Outbox pattern

Yay! The fun part! Here’s the code for this class, which will go into a new Java file in the same directory called Publish.java. I’ll walk through this code step by step.

package com.example;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import oracle.AQ.AQException;
import oracle.jms.AQjmsAgent;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.jms.AQjmsTextMessage;
import oracle.jms.AQjmsTopicPublisher;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;

public class Publish {

    private static String username = "mark";
    private static String password = "Welcome12345";
    private static String url = "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1";
    private static String topicName = "my_txeventq";

    public static void main(String[] args) throws JMSException, SQLException {

        AQjmsTopicPublisher publisher = null;
        TopicSession session = null;
        TopicConnection tconn = null;
        Connection conn = null;

        if (args.length != 3) {
            System.err.println("""
                You must provide 3 arguments - name, email and failure mode
                failure mode:
                  0    do not fail
                  1    fail before insert and publish
                  2    fail after insert, before publish
                  3    fail after insert and publish
            """);
            // stop here; otherwise reading args[0] below would throw
            System.exit(1);
        }
        String name = args[0];
        String email = args[1];
        int failMode = Integer.parseInt(args[2]);

        try {
            // create a topic session
            PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
            ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
            ds.setURL(url);
            ds.setUser(username);
            ds.setPassword(password);

            // create a JMS topic connection and session
            TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
            tconn = tcf.createTopicConnection();
            tconn.start();

            // open a Transactional session
            session = (AQjmsSession) tconn.createSession(true, Session.AUTO_ACKNOWLEDGE);

            // also get the JDBC connection
            conn = ((AQjmsSession) session).getDBConnection();
            conn.setAutoCommit(false);

            // if failMode = 1, fail here
            if (failMode == 1) throw new Exception();

            // first, perform the database operation
            PreparedStatement stmt = conn.prepareStatement("insert into customer (name, email) values (?, ?)");
            stmt.setString(1,  name);
            stmt.setString(2, email);
            stmt.executeUpdate();
            System.out.println("row inserted");

            // if failMode = 2, fail here
            if (failMode == 2) throw new Exception();

            // second, publish the message
            Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
            publisher = (AQjmsTopicPublisher) session.createPublisher(topic);

            AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage("new customer with name=" + name + " and email=" + email);
            publisher.publish(message, new AQjmsAgent[] { new AQjmsAgent("my_subscription", null) });
            System.out.println("message sent");

            // if failMode = 3, fail here
            if (failMode == 3) throw new Exception();        

            // we didn't fail - so commit the transaction
            if (failMode == 0) session.commit();

        } catch (Exception e) {
            System.err.println("rolling back");
            if (conn != null) conn.rollback();
        } finally {
            // clean up
            if (publisher != null) publisher.close();
            if (session != null) session.close();
            if (tconn != null) tconn.close();
        }
    }

}

Ok, so the overall structure of the code is as follows:

First, we are going to start a transaction. Then we will perform two operations – insert a record into the customer table, and send a message on a topic. If everything works as expected, we will commit the transaction. Of course, if there is a failure at any point, we will roll back instead. Notice the arrows are labeled with numbers – in the code I have included failure points that correspond to each of these arrows.

At the start of the main method, we are going to check we have the expected arguments — the name and email, and the point at which to fail, i.e., which of those arrows to simulate a failure at. A “0” indicates that no failure should be simulated. So if we run the code with “mark mark@example.com 2” as the input, we expect it to fail on the “2” arrow – after it inserted the row in the table and before it sent the message on the topic.
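If you want the argument handling to be a bit stricter, something like the following could work. This is a sketch, not part of the Publish class above. The PublishArgs class name is hypothetical, and I am assuming that only failure modes 0 to 3 are meaningful, since those are the ones the usage message lists.

```java
// Hypothetical helper: validates the three command line arguments
// (name, email, failure mode) before any database work starts.
final class PublishArgs {
    final String name;
    final String email;
    final int failMode;

    PublishArgs(String name, String email, int failMode) {
        this.name = name;
        this.email = email;
        this.failMode = failMode;
    }

    static PublishArgs parse(String[] args) {
        if (args.length != 3) {
            throw new IllegalArgumentException(
                "expected 3 arguments: name, email and failure mode");
        }
        int failMode;
        try {
            failMode = Integer.parseInt(args[2]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("failure mode must be a number from 0 to 3", e);
        }
        // Assumption: modes outside 0..3 are rejected rather than silently ignored.
        if (failMode < 0 || failMode > 3) {
            throw new IllegalArgumentException("failure mode must be a number from 0 to 3");
        }
        return new PublishArgs(args[0], args[1], failMode);
    }
}
```

Rejecting unknown modes up front matters a little more than it looks: in the code above, a mode like 7 would not trigger any failure point, but it would also skip the commit, because the commit only runs when the mode is exactly 0.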

Next we get both a JMS Connection and a JDBC Connection. This is important because it allows us to have a single transaction. Note the following lines:

// open a Transactional session
session = (AQjmsSession) tconn.createSession(true, Session.AUTO_ACKNOWLEDGE);

// also get the JDBC connection
conn = ((AQjmsSession) session).getDBConnection();
conn.setAutoCommit(false);

We explicitly set “auto commit” to false on the JDBC connection – we want to control exactly if and when work is committed, and we do not want any automatic commits to occur. And on the JMS session we set the “transacted” parameter to true. That’s the first parameter in the createSession() call. This tells it to use the same database transaction.

Next, you will notice that we simulate a failure if the failure point was “1”:

if (failMode == 1) throw new Exception();

If an exception is thrown at this point (or any point), we’d expect to see no new rows in the database and no messages received by the consumer. We can check the table with this query:

select * from customer;

And you will see output like this in the consumer window every time a message is produced, so if you do not see that output – no messages:

new customer with name=jack and email=jack@jack.com

You can also check directly in the database with this query:

select * from my_txeventq;

The next thing you will see is a standard JDBC Prepared Statement to insert a row into the customer table. Notice that I don’t commit yet.

PreparedStatement stmt = conn.prepareStatement("insert into customer (name, email) values (?, ?)");
stmt.setString(1,  name);
stmt.setString(2, email);
stmt.executeUpdate();
System.out.println("row inserted");

Then you will see failure point “2”.

And next, we have the code to publish a message on the topic:

AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage("new customer with name=" + name + " and email=" + email);
publisher.publish(message, new AQjmsAgent[] { new AQjmsAgent("my_subscription", null) });
System.out.println("message sent");

Then you’ll see failure point “3” and then finally the commit!

Next, notice that the catch block contains a rollback on the database connection. You don’t have to roll back the JMS session as well – since they are in the same transaction, this one rollback call is enough to roll back all of the operations.
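The try, commit and catch, rollback shape is easy to pull out into a small helper if you find yourself repeating it. Here is one way that could look. The Atomically class and its Step interface are hypothetical names for this sketch, and the helper knows nothing about JMS or JDBC; it just runs whatever commit and rollback actions you hand it.

```java
// Hypothetical helper, not part of the post's code: runs the work, then the
// commit action, and runs the rollback action if either of them throws.
final class Atomically {

    @FunctionalInterface
    interface Step {
        void run() throws Exception;
    }

    // Returns true if the work was committed, false if it was rolled back.
    static boolean run(Step work, Step commit, Step rollback) throws Exception {
        try {
            work.run();
            commit.run();
            return true;
        } catch (Exception e) {
            rollback.run();
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        boolean committed = run(
            () -> System.out.println("insert row and publish message"),
            () -> System.out.println("commit"),
            () -> System.out.println("rollback"));
        System.out.println("committed: " + committed);
    }
}
```

With the Publish class above, the commit action would be session::commit and the rollback action would be conn::rollback, matching what the try and catch blocks already do.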

Run the Transactional Outbox code

Now we’re ready to run the code! First, notice in the POM file we created a profile called “publish” which contains the following configuration:

<configuration>
  <executable>java</executable>
  <arguments>
    <argument>-Doracle.jdbc.fanEnabled=false</argument>
    <argument>-classpath</argument>
    <classpath/>
    <argument>com.example.Publish</argument>
    <argument>jack</argument>
    <argument>jack@jack.com</argument>
    <argument>0</argument>
  </arguments>
</configuration>

The last three arguments are the name, email and the failure point. If you go ahead and run it as is (with failure point 0, meaning no failure) then it should actually get all the way through to the commit. You should see output in the consumer window to let you know the message was produced, and you can check the table in the database to see the new record in there. Run the code like this:

mvn exec:exec -Ppublish

Of course, you’ll see a record in the table and the message.

If you now edit the POM file and change that last argument from 0 to any of the other options and run it again, you’ll notice that it rolls back and you do not get a new record in the table or a message produced on the topic.

How do I know it really worked?

If you’d like to experiment and convince yourself it really is working, try something like commenting out failure point 2 like this:

// if (failMode == 2) throw new Exception();

When you run the code again, you will now see that there is a row in the database that was not rolled back (because the failure never occurred and the exception was never thrown) but the message was never sent (because the commit was never run due to failMode being 2, not 0).

If you tweak the failure points you can easily convince yourself that it is in fact working just as expected 🙂

So there you go, that’s the Transactional Outbox pattern implemented using Transactional Event Queues with Oracle Database 23c Free – that was pretty easy, right? Hope you enjoyed it, and see you soon!

About Mark Nelson

Mark Nelson is a Developer Evangelist at Oracle, focusing on microservices and AI. Mark has served as a Section Leader in Stanford's Code in Place program that has introduced tens of thousands of people to the joy of programming, he is a published author, a reviewer and contributor, a content creator and a lifelong learner. He enjoys traveling, meeting people and learning about foods and cultures of the world. Mark has worked at Oracle since 2006 and before that at IBM since 1994.