Hi, in this post I want to provide an example of how to implement the Transactional Outbox pattern using Transactional Event Queues and JMS with the new Oracle Database 23c Free – Developer Release I mentioned in my last post.
In the Transactional Outbox pattern, we have a microservice that needs to perform a database operation (like an insert) and send a message, and either both or neither of these need to happen.
Unlike other messaging providers, Transactional Event Queues is built-in to the Oracle Database and has the unique advantage of being able to expose the underlying database transaction to your application. This allows us perform database and messaging operations in the same transaction – which is exactly what we need to implement this pattern.
Prepare the database
The first thing we want to do is start up the Oracle 23c Free Database. This is very easy to do in a container using a command like this:
docker run --name free23c -d -p 1521:1521 -e ORACLE_PWD=Welcome12345 container-registry.oracle.com/database/free:latest
This will pull the image and start up the database with a listener on port 1521. It will also create a pluggable database (a database container) called “FREEPDB1” and will set the admin passwords to the password you specified on this command.
You can tail the logs to see when the database is ready to use:
docker logs -f free23c
(look for this message...)
#########################
DATABASE IS READY TO USE!
#########################
Also, grab the IP address of the container, we’ll need that to connect to the database:
docker inspect free23c | grep IPA
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.2",
"IPAMConfig": null,
"IPAddress": "172.17.0.2",
To set up the necessary permissions, you’ll need to connect to the database with a client. If you don’t have one already, I’d recommend trying the new SQLcl CLI which you can download here. Start it up and connect to the database like this (note that your IP address and password may be different):
sql sys/Welcome12345@//172.17.0.2:1521/freepdb1 as sysdba
SQLcl: Release 22.2 Production on Tue Apr 11 12:36:24 2023
Copyright (c) 1982, 2023, Oracle. All rights reserved.
Connected to:
Oracle Database 23c Free, Release 23.0.0.0.0 - Developer-Release
Version 23.2.0.0.0
SQL>
Now, run these commands to create a user called “mark” and give it the necessary privileges:
SQL> create user mark identified by Welcome12345;
User MARK created.
SQL> grant resource , connect, unlimited tablespace to mark;
Grant succeeded.
SQL> grant execute on dbms_aq to mark;
Grant succeeded.
SQL> grant execute on dbms_aqadm to mark;
Grant succeeded.
SQL> grant execute on dbms_aqin to mark;
Grant succeeded.
SQL> grant execute on dbms_aqjms_internal to mark;
Grant succeeded.
SQL> grant execute on dbms_teqk to mark;
Grant succeeded.
SQL> grant execute on DBMS_RESOURCE_MANAGER to mark;
Grant succeeded.
SQL> grant select_catalog_role to mark;
Grant succeeded.
SQL> grant select on sys.aq$_queue_shards to mark;
Grant succeeded.
SQL> grant select on user_queue_partition_assignment_table to mark;
Grant succeeded.
SQL> exec dbms_teqk.AQ$_GRANT_PRIV_FOR_REPL('MARK');
PL/SQL procedure successfully completed.
SQL> commit;
Commit complete.
SQL> quit;
Ok, we are ready to start on our Java code!
Create the Java project
If you have read my posts before, you’ll know I like to use Maven for my Java projects. Let’s create a Maven POM file (pom.xml) and add the dependencies we need for this application. I’ve also iunclude some profiles to make it easy to run the three main entry points we will create – one to create a queue, one to consume messages, and finally the transactional outbox implementation. Here’s the content for the pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>txoutbox</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>txoutbox</name>
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.oracle.database.messaging</groupId>
<artifactId>aqapi</artifactId>
<version>21.3.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>compile</scope>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
<scope>compile</scope>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>jakarta.management.j2ee</groupId>
<artifactId>jakarta.management.j2ee-api</artifactId>
<scope>compile</scope>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc11</artifactId>
<scope>compile</scope>
<version>21.3.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ucp</artifactId>
<scope>compile</scope>
<version>21.3.0.0</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>publish</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<arguments>
<argument>-Doracle.jdbc.fanEnabled=false</argument>
<argument>-classpath</argument>
<classpath/>
<argument>com.example.Publish</argument>
<argument>jack</argument>
<argument>jack@jack.com</argument>
<argument>0</argument>
</arguments>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>consume</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<arguments>
<argument>-Doracle.jdbc.fanEnabled=false</argument>
<argument>-classpath</argument>
<classpath/>
<argument>com.example.Consume</argument>
</arguments>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>createq</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<arguments>
<argument>-Doracle.jdbc.fanEnabled=false</argument>
<argument>-classpath</argument>
<classpath/>
<argument>com.example.CreateTxEventQ</argument>
</arguments>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
I won’t go into a heap of detail on this or the first two Java classes, since they are fairly standard and I have talked about very similiar things before in older posts including this one for example. I will go into detail on the transactional outbox implementation though, don’t worry!
Create a Java class to create the queue
We are going to need a queue to put messages on, so let me show you how to do that in Java. Transactional Event Queues support various types of queues and payloads. This example shows how to create a queue that uses the JMS format. Create a file called src/main/com/example/CreateTxEventQ.java with this content:
package com.example;
import java.sql.SQLException;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import oracle.AQ.AQException;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;
public class CreateTxEventQ {
private static String username = "mark";
private static String password = "Welcome12345";
private static String url = "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1";
public static void main(String[] args) throws AQException, SQLException, JMSException {
// create a topic session
PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
ds.setURL(url);
ds.setUser(username);
ds.setPassword(password);
TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
TopicConnection conn = tcf.createTopicConnection();
conn.start();
TopicSession session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// create properties
AQQueueTableProperty props = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESAGE");
props.setMultiConsumer(true);
props.setPayloadType("SYS.AQ$_JMS_TEXT_MESSAGE");
// create queue table, topic and start it
Destination myTeq = ((AQjmsSession) session).createJMSTransactionalEventQueue("my_txeventq", true);
((AQjmsDestination) myTeq).start(session, true, true);
}
}
As you read through this, you’ll see I’ve just hardcoded the username, password and url for convenience in this file (and the others in this post), of course we’d never do that in real life, would we 🙂 You should also notice that we get a connection, then create the queue table first, set the consumer type (multiple, i.e. pub/sub – so a JMS Topic) and the format (JMS) and the queue itself, and then start it up. Easy, right?
You can run this and create the queue with this command:
mvn exec:exec -Pcreateq
If you want to see the queue in the database, you can log in using that mark user you created and run a query:
$ sql mark/Welcome12345@//172.17.0.2:1521/freepdb1
SQLcl: Release 22.2 Production on Tue Apr 11 15:18:58 2023
Copyright (c) 1982, 2023, Oracle. All rights reserved.
Last Successful login time: Tue Apr 11 2023 15:18:59 -04:00
Connected to:
Oracle Database 23c Free, Release 23.0.0.0.0 - Developer-Release
Version 23.2.0.0.0
SQL> select * from USER_QUEUES ;
NAME QUEUE_TABLE QID QUEUE_TYPE MAX_RETRIES RETRY_DELAY ENQUEUE_ENABLED DEQUEUE_ENABLED RETENTION USER_COMMENT NETWORK_NAME SHARDED QUEUE_CATEGORY RECIPIENTS
______________ ______________ ________ _______________ ______________ ______________ __________________ __________________ ____________ _______________ _______________ __________ ____________________________ _____________
MY_TXEVENTQ MY_TXEVENTQ 78567 NORMAL_QUEUE 5 0 YES YES 0 TRUE Transactional Event Queue MULTIPLE
While you’re there, let’s also create a table so we have somewhere to perform the database insert operation:
create table customer ( name varchar2(256), email varchar2(256) );
Create the consumer
Let’s create the consumer next. This will be a new Java file in the same directory called Consume.java. Here’s the content:
package com.example;
import java.sql.SQLException;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import oracle.AQ.AQException;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.jms.AQjmsTextMessage;
import oracle.jms.AQjmsTopicSubscriber;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;
public class Consume {
private static String username = "mark";
private static String password = "Welcome12345";
private static String url = "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1";
private static String topicName = "my_txeventq";
public static void main(String[] args) throws AQException, SQLException, JMSException {
// create a topic session
PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
ds.setURL(url);
ds.setUser(username);
ds.setPassword(password);
// create a JMS topic connection and session
TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
TopicConnection conn = tcf.createTopicConnection();
conn.start();
TopicSession session =
(AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// create a subscriber on the topic
Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
AQjmsTopicSubscriber subscriber =
(AQjmsTopicSubscriber) session.createDurableSubscriber(topic, "my_subscriber");
System.out.println("Waiting for messages...");
// wait forever for messages to arrive and print them out
while (true) {
// the 1_000 is a one second timeout
AQjmsTextMessage message = (AQjmsTextMessage) subscriber.receive(1_000);
if (message != null) {
if (message.getText() != null) {
System.out.println(message.getText());
} else {
System.out.println();
}
}
session.commit();
}
}
}
This one is a fairly standard JMS consumer. It is going to create a subscription to that topic we just created, and wait for messages to arrive, and then just print the content on the screen. Nice and simple. You can run this with this command:
mvn exec:exec -Pconsume
Leave that running so that you see messages as they are produced. Later, when you run the transactional outbox producer, run it in a different window so that you can see what happens in the consumer.
Implement the Transactional Outbox pattern
Yay! The fun part! Here’s the code for this class, which will go into a new Java file in the same dircetory called Publish.java. I’ll walk through this code step by step.
package com.example;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import oracle.AQ.AQException;
import oracle.jms.AQjmsAgent;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.jms.AQjmsTextMessage;
import oracle.jms.AQjmsTopicPublisher;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;
public class Publish {
private static String username = "mark";
private static String password = "Welcome12345";
private static String url = "jdbc:oracle:thin:@//172.17.0.2:1521/freepdb1";
private static String topicName = "my_txeventq";
public static void main(String[] args) throws JMSException, SQLException {
AQjmsTopicPublisher publisher = null;
TopicSession session = null;
TopicConnection tconn = null;
Connection conn = null;
if (args.length != 3) {
System.err.println("""
You must provide 3 arguments - name, email and failure mode
failure mode:
0 do not fail
1 fail before insert and publish
2 fail after insert, before publish
3 fail after insert and publlsh
""");
}
String name = args[0];
String email = args[1];
int failMode = Integer.parseInt(args[2]);
try {
// create a topic session
PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
ds.setURL(url);
ds.setUser(username);
ds.setPassword(password);
// create a JMS topic connection and session
TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
tconn = tcf.createTopicConnection();
tconn.start();
// open a Transactional session
session = (AQjmsSession) tconn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// also get the JDBC connection
conn = ((AQjmsSession) session).getDBConnection();
conn.setAutoCommit(false);
// if failMode = 1, fail here
if (failMode == 1) throw new Exception();
// first, perform the database operation
PreparedStatement stmt = conn.prepareStatement("insert into customer (name, email) values (?, ?)");
stmt.setString(1, name);
stmt.setString(2, email);
stmt.executeUpdate();
System.out.println("row inserted");
// if failMode = 2, fail here
if (failMode == 2) throw new Exception();
// second, publish the message
Topic topic = ((AQjmsSession) session).getTopic(username, topicName);
publisher = (AQjmsTopicPublisher) session.createPublisher(topic);
AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage("new customer with name=" + name + " and email=" + email);
publisher.publish(message, new AQjmsAgent[] { new AQjmsAgent("my_subscription", null) });
System.out.println("message sent");
// if failMode = 3, fail here
if (failMode == 3) throw new Exception();
// we didn't fail - so commit the transaction
if (failMode == 0) session.commit();
} catch (Exception e) {
System.err.println("rolling back");
if (conn != null) conn.rollback();
} finally {
// clean up
if (publisher != null) publisher.close();
if (session != null) session.close();
if (tconn != null) tconn.close();
}
}
}
Ok, so the overall structure of the code is as follows:

First, we are going to start a transaction. Then we will perform two operations – insert a record into the customer table, and send a message on a topic. If eevrything works as expected, we will commit the transaction. Of course, if there is a failure at any point, we will rollback instead. Notice the arrows are labeled with numbers – in the code I have included failure points that correspond to each of these arrows.
At the start of the main method, we are going to check we have the expected arguments — the name and email, and the point at which to fail, i.e., which of those arrows to simulate a failure at. A “0” indicates that no failure should be simulated. So if we run the code with “mark mark@example.com 2” as the input, we expect it to fail on the “2” arrow – after it inserted the row in the table and before it sent the message on the topic.
Next we get both a JMS Connection and a JDBC Connection. This is important because it allows us to have a single transaction. Note the following lines:
// open a Transactional session
session = (AQjmsSession) tconn.createSession(true, Session.AUTO_ACKNOWLEDGE);
also get the JDBC connection
conn = ((AQjmsSession) session).getDBConnection();
conn.setAutoCommit(false);
We explicity set the “auto commit” to false on the JDBC connection – we want to control exactly if and when work is commited, we do not want any automatic commits to occur. And on the JMS session we set the “transacted” parameter to true. That’s the first parameter in the createSession() call. This tells it to use the same database transaction.
Next, you will notice that we simulate a failure if the failure point was “1”:
if (failMode == 1) throw new Exception();
If an exception is thrown at this point (or any point), we’d expect to see no new rows in the database and no messages recieved by the consumer. We can check the table with this query:
select * from customer;
And you will see output like this in the consumer window every time a message is produced, so if you do not see that output – no messages:
new customer with name=jack and email=jack@jack.com
You can also check directly in the database with this query:
select * from my_txeventq;
The next thing you will see is a standard JDBC Prepared Statement to insert a row into the customer table. Notice that I don’t commit yet.
PreparedStatement stmt = conn.prepareStatement("insert into customer (name, email) values (?, ?)");
stmt.setString(1, name);
stmt.setString(2, email);
stmt.executeUpdate();
System.out.println("row inserted");
Then you will see failure point “2”.
And next, we have the code to publish a message on the topic:
AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage("new customer with name=" + name + " and email=" + email);
publisher.publish(message, new AQjmsAgent[] { new AQjmsAgent("my_subscription", null) });
System.out.println("message sent");
Then you’ll see failure point “3” and then finally the commit!
Next, notice that the catch block contains a rollback on the database connection. You don’t have to rollback the JMS session as well – since they are in the same transaction, this one rollback call is enough to rollback all of the operations.
Run the Transactional Outbox code
Now we’re ready to run the code! First, notice in the POM file we created a profile called “publish” whic contains the following configuration:
<configuration>
<executable>java</executable>
<arguments>
<argument>-Doracle.jdbc.fanEnabled=false</argument>
<argument>-classpath</argument>
<classpath/>
<argument>com.example.Publish</argument>
<argument>jack</argument>
<argument>jack@jack.com</argument>
<argument>0</argument>
</arguments>
</configuration>
The last three arguments are the name, email and the failure point. If you go ahead and run it as is (with failure point 0, meaning no failure) then it should actually get all the way through to the commit. You should see output in the consumer window to let you know the message was produced, and you can check the table in the database to see the new record in there. Run the code like this:
mvn exec:exec -Pproduce
Of course, you’ll see a record in the table and the message.
If you now edit the POM file and change that last argument from 0 to any of the other options and run it again, you’ll notice that it rolls back and you do not get a new record in the table or a message produced on the topic.
How do I know it really worked?
If you’d like to experiment and convince yourself it really is working, try something like commenting out failure point 2 like this:
// if (failMode == 2) throw new Exception();
When you run the code again, you will now see that there is a row in the database that was not rolled back (because the failure never occured and the exception was never thrown) but the message was never sent (becuase the commit was never run due to failMode being 2, not 0).
If you tweak the failure points you can easily convince yourself that it is in fact working just as expected 🙂
So there you go, that’s the Transactional Outbox pattern implemented using Transactional Event Queues with Oracle Database 23c Free – that was pretty easy, right? Hope you enjoyed it, and see you soon!

You must be logged in to post a comment.