We just published the new 23c version of the Kafka-compatible Java APIs for Transactional Event Queues in Maven Central, and I wanted to show you how to use them! If you are not familiar with these APIs, they let you use the standard Kafka Java API with Transactional Event Queues acting as the Kafka broker. The only things you need to change are the broker address and the use of the Oracle versions of KafkaProducer and KafkaConsumer; other than that, your existing Kafka Java code should just work!
We also published updated source and sink Kafka connectors for Transactional Event Queues – but I’ll cover those in a separate post.
Let’s build a Kafka producer and consumer using the updated Kafka-compatible APIs.
Prepare the database
The first thing we want to do is start up Oracle Database 23c Free. This is very easy to do in a container using a command like this:
docker run --name free23c -d -p 1521:1521 -e ORACLE_PWD=Welcome12345 container-registry.oracle.com/database/free:latest
This will pull the image and start up the database with a listener on port 1521. It will also create a pluggable database (PDB) called “FREEPDB1” and set the administrative passwords to the password you specified in the command.
You can tail the logs to see when the database is ready to use:
docker logs -f free23c
(look for this message...)
#########################
DATABASE IS READY TO USE!
#########################
Also, grab the IP address of the container; we’ll need it to connect to the database:
docker inspect free23c | grep IPA
"SecondaryIPAddresses": null,
"IPAddress": "172.17.0.2",
"IPAMConfig": null,
"IPAddress": "172.17.0.2",
To set up the necessary permissions, you’ll need to connect to the database with a client. If you don’t have one already, I’d recommend trying the new SQLcl CLI which you can download here. Start it up and connect to the database like this (note that your IP address and password may be different):
sql sys/Welcome12345@//172.17.0.2:1521/freepdb1 as sysdba
SQLcl: Release 22.2 Production on Tue Apr 11 12:36:24 2023
Copyright (c) 1982, 2023, Oracle. All rights reserved.
Connected to:
Oracle Database 23c Free, Release 23.0.0.0.0 - Developer-Release
Version 23.2.0.0.0
SQL>
Now, run these commands to create a user called “mark” and give it the necessary privileges:
SQL> create user mark identified by Welcome12345;
User MARK created.
SQL> grant resource, connect, unlimited tablespace to mark;
Grant succeeded.
SQL> grant execute on dbms_aq to mark;
Grant succeeded.
SQL> grant execute on dbms_aqadm to mark;
Grant succeeded.
SQL> grant execute on dbms_aqin to mark;
Grant succeeded.
SQL> grant execute on dbms_aqjms_internal to mark;
Grant succeeded.
SQL> grant execute on dbms_teqk to mark;
Grant succeeded.
SQL> grant execute on DBMS_RESOURCE_MANAGER to mark;
Grant succeeded.
SQL> grant select_catalog_role to mark;
Grant succeeded.
SQL> grant select on sys.aq$_queue_shards to mark;
Grant succeeded.
SQL> grant select on user_queue_partition_assignment_table to mark;
Grant succeeded.
SQL> exec dbms_teqk.AQ$_GRANT_PRIV_FOR_REPL('MARK');
PL/SQL procedure successfully completed.
SQL> commit;
Commit complete.
SQL> quit;
Create a Kafka topic and consumer group using these statements. Note that you could also do this from Java code (there’s a sketch of that a little further down), or using the Kafka-compatible Transactional Event Queues REST API (which I wrote about in this post):
begin
    -- Creates a topic named TEQ with 5 partitions and 7 days of retention time
    dbms_teqk.aq$_create_kafka_topic('TEQ', 5);
    -- Creates a Consumer Group CG1 for Topic TEQ
    dbms_aqadm.add_subscriber('TEQ', subscriber => sys.aq$_agent('CG1', null, null));
end;
/
You should note that the dbms_teqk package is likely to be renamed in the GA release of Oracle Database 23c, but for the Oracle Database 23c Free – Developer Release you can use it.
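If you would rather create the topic from Java, here is a rough sketch of what that might look like using the Admin client that ships with the okafka library. Treat it as an unverified example: I am assuming the org.oracle.okafka.clients.admin.AdminClient class and the same Oracle-specific connection properties we use later in this post, and I have not run this against the 23.2.0.0 artifact.
package com.example;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.oracle.okafka.clients.admin.AdminClient;

public class CreateTopicOKafka {
    public static void main(String[] args) throws Exception {
        // connection properties - same style as the producer/consumer shown later (assumed, not verified)
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.17.0.2:1521");
        props.put("oracle.service.name", "freepdb1");
        props.put("oracle.net.tns_admin", ".");
        props.put("security.protocol", "PLAINTEXT");

        // the Oracle AdminClient implements the standard Kafka Admin interface
        try (Admin admin = AdminClient.create(props)) {
            // create topic TEQ with 5 partitions (the replication factor is not meaningful for the database)
            admin.createTopics(Collections.singletonList(new NewTopic("TEQ", 5, (short) 1)))
                 .all()
                 .get();
            System.out.println("Topic TEQ created");
        }
    }
}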
Ok, we are ready to start on our Java code!
Create a Java project
Let’s create a Maven POM file (pom.xml) and add the dependencies we need for this application. I’ve also included some profiles to make it easy to run the two main entry points we will create – the producer and the consumer. Here’s the content for the pom.xml. Note that I have excluded the osdt_core and osdt_cert transitive dependencies, since we are not using a wallet or SSL in this example, so we do not need those libraries:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>okafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>okafka-demo</name>
    <description>OKafka demo</description>

    <properties>
        <java.version>17</java.version>
        <maven.compiler.target>17</maven.compiler.target>
        <maven.compiler.source>17</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.oracle.database.messaging</groupId>
            <artifactId>okafka</artifactId>
            <version>23.2.0.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>osdt_core</artifactId>
                    <groupId>com.oracle.database.security</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>osdt_cert</artifactId>
                    <groupId>com.oracle.database.security</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <id>consumer</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>exec-maven-plugin</artifactId>
                        <version>3.0.0</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>exec</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <executable>java</executable>
                            <arguments>
                                <argument>-Doracle.jdbc.fanEnabled=false</argument>
                                <argument>-classpath</argument>
                                <classpath/>
                                <argument>com.example.SimpleConsumerOKafka</argument>
                            </arguments>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
        <profile>
            <id>producer</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>exec-maven-plugin</artifactId>
                        <version>3.0.0</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>exec</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <executable>java</executable>
                            <arguments>
                                <argument>-Doracle.jdbc.fanEnabled=false</argument>
                                <argument>-classpath</argument>
                                <classpath/>
                                <argument>com.example.SimpleProducerOKafka</argument>
                            </arguments>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
</project>
This is a pretty straightforward POM. I just set the project’s coordinates, declared my one dependency, and then created the two profiles so I can run the code easily.
Next, we are going to need a file called ojdbc.properties in the same directory as the POM with this content:
user=mark
password=Welcome12345
The KafkaProducer and KafkaConsumer will use this to connect to the database.
Create the consumer
Ok, now let’s create our consumer. In a directory called src/main/java/com/example, create a new Java file called SimpleConsumerOKafka.java with the following content:
package com.example;

import java.util.Properties;
import java.time.Duration;
import java.util.Arrays;

import org.oracle.okafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumerOKafka {

    public static void main(String[] args) {
        // set the required properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.17.0.2:1521");
        props.put("group.id", "CG1");
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", 100);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("oracle.service.name", "freepdb1");
        props.put("oracle.net.tns_admin", ".");
        props.put("security.protocol", "PLAINTEXT");

        // create the consumer
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("TEQ"));

        int expectedMsgCnt = 4000;
        int msgCnt = 0;
        long startTime = 0;

        // consume messages
        try {
            startTime = System.currentTimeMillis();
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n ",
                                record.partition(), record.offset(), record.key(), record.value());
                        for (Header h : record.headers()) {
                            System.out.println("Header: " + h.toString());
                        }
                    }
                    // commit the records we received
                    if (records != null && records.count() > 0) {
                        msgCnt += records.count();
                        System.out.println("Committing records " + records.count());
                        try {
                            consumer.commitSync();
                        } catch (Exception e) {
                            System.out.println("Exception in commit " + e.getMessage());
                            continue;
                        }
                        // if we got all the messages we expected, then exit
                        if (msgCnt >= expectedMsgCnt) {
                            System.out.println("Received " + msgCnt + ". Expected " + expectedMsgCnt + ". Exiting Now.");
                            break;
                        }
                    } else {
                        System.out.println("No records fetched. Retrying...");
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    System.out.println("Inner Exception " + e.getMessage());
                    throw e;
                }
            }
        } catch (Exception e) {
            System.out.println("Exception from consumer " + e);
            e.printStackTrace();
        } finally {
            long runDuration = System.currentTimeMillis() - startTime;
            System.out.println("Application closing Consumer. Run duration " + runDuration + " ms");
            consumer.close();
        }
    }
}
Let’s walk through this code together.
The first thing we do is prepare the properties for the KafkaConsumer. This is fairly standard, though notice that the bootstrap.servers property contains the address of your database listener:
Properties props = new Properties();
props.put("bootstrap.servers", "172.17.0.2:1521");
props.put("group.id" , "CG1");
props.put("enable.auto.commit","false");
props.put("max.poll.records", 100);
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
Then, we add some Oracle-specific properties: oracle.service.name is the name of the database service we are connecting to, in our case freepdb1; oracle.net.tns_admin needs to point to the directory where we put our ojdbc.properties file; and security.protocol controls whether we are using SSL – in this case we are not, so we use PLAINTEXT:
props.put("oracle.service.name", "freepdb1");
props.put("oracle.net.tns_admin", ".");
props.put("security.protocol","PLAINTEXT");
With that done, we can create the KafkaConsumer and subscribe to a topic. Note that we use the Oracle version of KafkaConsumer, which is basically just a wrapper that understands those extra Oracle-specific properties:
import org.oracle.okafka.clients.consumer.KafkaConsumer;
// ...
Consumer<String , String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("TEQ"));
The rest of the code is standard Kafka code that polls for records, prints out any it finds, commits them, and then loops until it has received the number of records it expected and then exits.
Run the consumer
We can build and run the consumer with this command:
mvn exec:exec -P consumer
It will connect to the database and start polling for records. Of course, there won’t be any yet, because we have not created the producer. It should output a message like this about every ten seconds:
No records fetched. Retrying...
Let’s write that producer!
Create the producer
In a directory called src/main/java/com/example, create a new Java file called SimpleProducerOKafka.java with the following content:
package com.example;

import org.oracle.okafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.util.Properties;
import java.util.concurrent.Future;

public class SimpleProducerOKafka {

    public static void main(String[] args) {
        long startTime = 0;
        try {
            // set the required properties
            Properties props = new Properties();
            props.put("bootstrap.servers", "172.17.0.2:1521");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("batch.size", "5000");
            props.put("linger.ms", "500");
            props.put("oracle.service.name", "freepdb1");
            props.put("oracle.net.tns_admin", ".");
            props.put("security.protocol", "PLAINTEXT");

            // create the producer
            Producer<String, String> producer = new KafkaProducer<String, String>(props);

            Future<RecordMetadata> lastFuture = null;
            int msgCnt = 4000;
            startTime = System.currentTimeMillis();

            // send the messages
            for (int i = 0; i < msgCnt; i++) {
                RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes());
                RecordHeader rH2 = new RecordHeader("REPLY_TO", "TOPIC_M5".getBytes());
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<String, String>("TEQ", String.valueOf(i), "Test message " + i);
                producerRecord.headers().add(rH1).add(rH2);
                lastFuture = producer.send(producerRecord);
            }

            // wait for the last one to finish
            lastFuture.get();

            // print summary
            long runTime = System.currentTimeMillis() - startTime;
            System.out.println("Produced " + msgCnt + " messages in " + runTime + "ms.");
            producer.close();
        } catch (Exception e) {
            System.out.println("Caught exception: " + e);
            e.printStackTrace();
        }
    }
}
This code is quite similar to the consumer. We first set up the Kafka properties, including the Oracle-specific ones. Then we create a KafkaProducer, again using the Oracle version which understands those extra properties. After that we just loop and produce the desired number of records.
Make sure your consumer is still running (or restart it) and then build and run the producer with this command:
mvn exec:exec -P producer
When you do this, it will run for a short time and then print a message like this to let you know it is done:
Produced 4000 messages in 1955ms.
Now take a look at the output in the consumer window. You should see quite a lot of output there. Here’s a short snippet from the end:
partition = 0, offset = 23047, key = 3998, value = Test message 3998
Header: RecordHeader(key = CLIENT_ID, value = [70, 73, 82, 83, 84, 95, 67, 76, 73, 69, 78, 84])
Header: RecordHeader(key = REPLY_TO, value = [84, 79, 80, 73, 67, 95, 77, 53])
Committing records 27
Received 4000. Expected 4000. Exiting Now.
Application closing Consumer. Run duration 510201 ms
It prints out a message for each record it finds, including the partition ID, the offset, and the key and value. It then prints out the headers. You will also see commit messages, and at the end it prints out how many records it found and how long it was running. I left mine running while I got the producer ready to go, so it shows a fairly long duration 🙂 If you run the consumer again and start the producer immediately afterwards, you will see a much shorter run duration.
Well, there you go! That’s a Kafka producer and consumer using the new updated 23c version of the Kafka-compatible Java API for Transactional Event Queues. Stay tuned for more!
