Spring Cloud Stream for seamless transition from Oracle AQ to a modern messaging platform

Tõnis Ojandu, Senior Software Developer, March 25, 2021

Have you ever needed to use the event-based design abstractions provided by Spring Cloud Stream in a system that relies on a legacy messaging implementation (Oracle Advanced Queues)? Well, we have, and we came up with a solution that eventually lets you use these abstractions to migrate to a more modern and widely used messaging platform (RabbitMQ) without much additional hassle.

Two years ago, we were looking to break up a monolithic information system into an event-driven, microservices-based architecture. To this end, we needed a messaging platform that would drive the event communication between the microservices. After some research and deliberation, Kafka came out on top as the platform we planned to go with. However, soon after we presented our plans to our client, we hit a roadblock. Regulations prevented them from using cloud-provided solutions when the servers providing the service reside abroad.

Thus, self-hosting was a known requirement, but our client saw that they lacked the in-house operational knowledge to host Kafka or any other message broker. They promised to start hiring and building this knowledge, but it left us in a bit of a bind since development teams had already been lined up and were looking to start the actual work.

We were asked to use Oracle Database Advanced Queues (AQ) for messaging instead since it had been used for limited tasks in the existing monolith. This was to be the alternative until the client had built its operational capabilities for a more modern messaging system. Our solution had to be designed to create a sufficient abstraction layer between the messaging and the rest of the implementation so that, down the line, we would be able to switch the message broker technology seamlessly.

Oracle Advanced Queues

Oracle AQ is a queueing system built into Oracle Database and operated through dedicated PL/SQL packages. Typically, you would create your queues in your DDL code, and the application itself would then connect over the Oracle JDBC driver and enqueue and dequeue messages as needed. The first block below creates and starts a queue; the second enqueues a JSON text message.

-- Create the queue table, create a queue on top of it, and start the queue.
BEGIN
    SYS.DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table => 'DEMO_SCHEMA.DEMO_TABLE',
        multiple_consumers => FALSE,
        queue_payload_type => 'SYS.AQ$_JMS_MESSAGE'
    );
    SYS.DBMS_AQADM.CREATE_QUEUE(
        queue_name => 'DEMO_SCHEMA.DEMO_QUEUE',
        queue_table => 'DEMO_SCHEMA.DEMO_TABLE',
        queue_type => SYS.DBMS_AQADM.NORMAL_QUEUE
    );
    SYS.DBMS_AQADM.START_QUEUE(
        queue_name => 'DEMO_SCHEMA.DEMO_QUEUE',
        enqueue => TRUE,
        dequeue => TRUE
    );
END;
/

-- Enqueue a JMS text message carrying a JSON payload.
DECLARE
    id RAW(16);
    queue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
    msg_props DBMS_AQ.MESSAGE_PROPERTIES_T;
    msg SYS.AQ$_JMS_MESSAGE;
BEGIN
    msg := SYS.AQ$_JMS_MESSAGE.construct(DBMS_AQJMS.JMS_TEXT_MESSAGE);
    msg.set_text(to_clob('{
  "field1" : "data1",
  "field2" : "data2"}'));
    DBMS_AQ.ENQUEUE(
        queue_name => 'DEMO_SCHEMA.DEMO_QUEUE',
        enqueue_options => queue_options,
        message_properties => msg_props,
        payload => msg,
        msgid => id
    );
    -- With the default enqueue options the message becomes visible on commit.
    COMMIT;
END;
/

Spring Cloud Stream

Spring Cloud Stream is part of the Spring Cloud toolkit and provides event-driven design abstractions for Spring Boot applications. It comes with an opinionated design for connecting your events to a message broker. By default, Spring Cloud Stream provisions all the messaging objects (topics, queues, etc.) itself, mostly behind the scenes. Setup is easy and handled by Spring Boot auto-configurations picked up from the classpath. The build file, application class and configuration below show a minimal RabbitMQ setup.

build.gradle:

plugins {
  id 'org.springframework.boot' version '2.4.2'
  id 'io.spring.dependency-management' version '1.0.11.RELEASE'
  id 'java'
}

group = 'demo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
  mavenCentral()
}

ext {
  set('springCloudVersion', "2020.0.1")
}

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-amqp'
  implementation 'org.springframework.cloud:spring-cloud-stream'
  implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

dependencyManagement {
  imports {
    mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
  }
}

test {
  useJUnitPlatform()
}

 

DemoApplication.java:

package demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.function.Function;

@SpringBootApplication
public class DemoApplication {

  private static final Logger log = LoggerFactory.getLogger(DemoApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }

  @Bean
  public Function<DemoEventPayload, DemoEventPayload> demoStreamFunction() {
    return this::service;
  }

  private DemoEventPayload service(DemoEventPayload payload) {
    log.info("Servicing payload: {}", payload.getDemoField());
    return new DemoEventPayload(payload.getDemoField() + "-serviced");
  }

  public static class DemoEventPayload {
    private String demoField;

    public DemoEventPayload(String demoField) {
      this.demoField = demoField;
    }

    public DemoEventPayload() {}

    public String getDemoField() {
      return demoField;
    }

    public void setDemoField(String demoField) {
      this.demoField = demoField;
    }
  }
}

application.yml:

spring:
  application:
    name: demo-service
  rabbitmq:
    addresses: localhost:5672
    username: guest
    password: guest
  cloud:
    stream:
      binders.binderName.type: rabbit
      default:
        group: ${spring.application.name}
      bindings:
        demoStreamFunction-in-0:
          destination: demo-in-v1
          contentType: application/json
        demoStreamFunction-out-0:
          destination: demo-out-v1
          contentType: application/json

Oracle Advanced Queues via Spring Cloud Stream

We decided to build our solution on top of Spring Cloud Stream because it provided all the event patterns we were looking for. Spring Cloud Stream is broker-agnostic by design, which promised that we would later be able to switch to Kafka without much hassle. We only had to find a suitable binder implementation for Oracle AQ or write one ourselves. Unfortunately, we could not find anything out of the box for Oracle AQ, but we did discover a JMS-based binder provided by Spring itself: https://github.com/spring-cloud/spring-cloud-stream-binder-jms . It only ships drivers for the JMS implementations of ActiveMQ, Solace and IBM MQ, none of which is suitable for Oracle AQ. Still, the shared JMS portion of the binder was reasonably reusable for our purposes. Since Oracle itself provides a JMS driver for Oracle AQ, this seemed a perfect match. The plan was to connect the two, giving us a working alternative until we could switch.

Problems (and solutions)

Queue Ownership

The first problem we ran into was that by our design the messaging had to be a separate entity from both message consumers and producers. We did not want to create a boot order for our microservices. Therefore, the Oracle AQ tables and queues should not reside on either side. We did not want to make an explicit requirement that all the microservices would require Oracle Database just to provide messaging. The solution was to isolate the Oracle AQ binder JDBC connection and its properties from the rest of the application. Now that neither consumer nor producer owned the underlying database objects, our binder implementation had to be intelligent enough to dynamically create the required queues when the first connection was established and not make any duplicate objects. To this end, we wrote a simple database procedure that the resulting binder was able to invoke.
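The provisioning step can be sketched roughly as follows. This is not our binder code: the procedure name DEMO_SCHEMA.ENSURE_QUEUE is hypothetical and is assumed to create the queue table and queue only when they are missing, so that calling it repeatedly (or from several services at once) is harmless. The in-memory set is an extra shortcut added for illustration to avoid repeated round trips from the same process.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class QueueProvisioner {

  // Hypothetical procedure; assumed to be idempotent on the database side.
  private static final String ENSURE_QUEUE_CALL = "{call DEMO_SCHEMA.ENSURE_QUEUE(?)}";

  // Queues this process has already asked the database to provision.
  private final Set<String> provisioned = ConcurrentHashMap.newKeySet();

  /** Returns true if the procedure was invoked, false if the queue was already known. */
  boolean ensureQueue(Connection connection, String queueName) {
    if (provisioned.contains(queueName)) {
      return false;
    }
    try (CallableStatement call = connection.prepareCall(ENSURE_QUEUE_CALL)) {
      call.setString(1, queueName);
      call.execute();
    } catch (SQLException e) {
      throw new IllegalStateException("Could not provision queue " + queueName, e);
    }
    // Two threads may both reach this point; that is fine because the
    // database procedure is assumed to tolerate duplicate calls.
    provisioned.add(queueName);
    return true;
  }
}
```

The important part is that the "create if missing" decision lives in the database, since neither producers nor consumers can know who connects first.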

Message Destination Naming

Another interesting problem was that Oracle AQ queues are, in fact, Oracle Database objects, so the same naming restrictions apply. We did not want this to dictate our eventual destination naming design. Since we were looking to move away from Oracle AQ eventually, the solution we chose was not a perfect one but one we could live with until we could migrate. To comply with the Oracle Database object naming restrictions, we had to map the desired destination names into something more suitable. If the destination name contained any restricted characters, we mapped them to underscores. If the destination name was too long for Oracle, we trimmed it. Essentially, we created a one-way function that both consumers and producers could use. This function carries a real risk of collisions, because distinct destination names can map to the same queue name, but we accepted this as a temporary issue and planned to manage the risk until we switched to the other broker technology.
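A minimal sketch of such a mapping function is shown below. It is an illustration under assumptions, not our production code: the class and method names are made up, the set of allowed characters (letters, digits, underscore) is a simplification, upper-casing is added because Oracle folds unquoted identifiers to upper case, and the maximum length is passed in because the applicable limit depends on the database version.

```java
import java.util.Locale;

final class OracleQueueNames {

  private OracleQueueNames() {}

  /**
   * Maps a Spring Cloud Stream destination name to a queue name that fits
   * the assumed naming rules: restricted characters become underscores and
   * names longer than maxLength are trimmed.
   */
  static String toQueueName(String destination, int maxLength) {
    if (destination == null || destination.isEmpty()) {
      throw new IllegalArgumentException("destination must not be empty");
    }
    if (maxLength < 1) {
      throw new IllegalArgumentException("maxLength must be positive");
    }
    // Upper-case first, then replace everything outside A-Z, 0-9 and '_'.
    String mapped = destination.toUpperCase(Locale.ROOT).replaceAll("[^A-Z0-9_]", "_");
    // Trimming is what makes the function one-way and collision-prone.
    return mapped.length() > maxLength ? mapped.substring(0, maxLength) : mapped;
  }
}
```

With this sketch, the destination demo-in-v1 maps to DEMO_IN_V1, while demo.in and demo-in both map to DEMO_IN, which is exactly the kind of collision that has to be managed by convention.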

Connection Number Issues

After deploying this solution, we found that it was creating too many connections. This was the result of how the different pieces we had sewn together worked. These were the components that were involved in the call stack when creating a connection and interacting with the AQ resources:

[Figure: component call stack, ending at Oracle Database]

All the custom logic we provided was in the setup stage. spring-cloud-stream-jms just needed to know the connection factory it had to use. This meant we had less control over how everything worked together. spring-cloud-stream-jms simply assumed it should create a new JMS connection any time any communication took place. The OracleDataSource provided in ojdbc.jar neither pools nor throttles database connections: for each new communication with the database server, a new connection is created. Under intensive load this becomes a problem, since the number of available connections is capped. Even more frustrating was that we were unable to use an industry-standard wrapper such as Hikari DataSource or Tomcat JDBC Connection Pool, because the JMS connection factories provided in aq.jar strictly require direct access to an Oracle DataSource. As a solution, we chose to use the PoolDataSource provided by Oracle Universal Connection Pool (ucp.jar) instead.
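To illustrate why pooling helps here, the following is a generic, self-contained sketch of a bounded pool. It is not the Oracle UCP API and not our configuration; the class name and the timeout-based borrow method are made up for illustration. It shows the two properties that matter: the number of underlying resources never exceeds the configured size, and callers that arrive when everything is borrowed have to wait and may time out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class BoundedPool<T> {

  private final BlockingQueue<T> idle;

  /** Creates all resources up front, so the cap is fixed at construction time. */
  BoundedPool(int size, Supplier<T> factory) {
    idle = new ArrayBlockingQueue<>(size);
    for (int i = 0; i < size; i++) {
      idle.add(factory.get());
    }
  }

  /** Waits up to timeoutMillis for a free resource and fails if none is returned in time. */
  T borrow(long timeoutMillis) {
    try {
      T resource = idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
      if (resource == null) {
        throw new IllegalStateException("Pool exhausted after " + timeoutMillis + " ms");
      }
      return resource;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a pooled resource", e);
    }
  }

  /** Hands a borrowed resource back so that other callers can reuse it. */
  void release(T resource) {
    idle.offer(resource);
  }
}
```

A real connection pool adds validation, eviction and growth policies on top of this, but the trade-off is the same: a pool that is too small for the load turns connection storms into waiting callers and timeouts.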

For added stability, we included -Doracle.jms.useEmulatedXA=false, since the Oracle AQ library attempts to initialize emulated XA by default and fails with a hidden exception. This exception causes the AQ JMS driver to close the connection. The failure repeats several times during startup until a certain retry count is reached. Because we did not use XA, we opted to disable it.

Conclusion

With these corrections, we were able to get a reasonably stable solution that worked in production, but there remain issues we have not addressed. The largest problem is that our setup makes it impossible to use the multi-consumer pattern with different consumer groups, at least without heavy redesign and refactoring in spring-cloud-stream-jms. Also, Oracle Universal Connection Pool works well for pooling and reusing connections, but we have started to see issues where the pool is exhausted when it is not large enough and communication increases.

I would not recommend this exact solution if you are looking for a long-term, stable system that mixes Spring Cloud Stream and legacy technologies like Oracle Advanced Queues. spring-cloud-stream-jms itself had already been largely abandoned by the Spring community in favor of newer technologies when we started. For a more long-term and stable solution, I would recommend skipping spring-cloud-stream-jms and the Oracle-provided JMS drivers and implementing the Spring Cloud Stream binder completely yourself. This will give you more control over the resulting solution.

 

[Figure: diagram showing Oracle Database and RabbitMQ]

Considering the issues we faced, and some we continue to face, we have considered several times doing just that and implementing the whole binder directly on top of a more traditional JDBC DataSource wrapper like Hikari. We never decided to do so because the migration onto a more modern platform was always around the corner. As often happens with a temporary solution, this one has managed to stick around longer than intended, but it has proven to be mostly stable. We have now opted to go with RabbitMQ instead of Kafka because of the same operational problems mentioned at the beginning. At this moment, we are already running all our testing environments on RabbitMQ while still supporting Oracle AQ in production. The production migration itself is planned to take place soon. Spring Cloud Stream has proven flexible enough to give us the seamless migration that we were initially looking for.

The solution is very client-specific. Therefore, we do not have a Git repository that is suitable for publishing. But if you are interested in the code, want to ask for advice, or have more specific questions, feel free to reach out to our Head of Engineering, Priit Liivak: Priit.Liivak@nortal.com.

Tõnis Ojandu

Senior Software Developer at Nortal
