AI Industrial Summit - SOFIA, BULGARIA - A High-Speed Data Ingestion Microservice in Java Using MQTT, AMQP, and STOMP

juarezjunior, 52 slides, Mar 09, 2025


Slide Content

14 | SEP | 2024 AI Industrial Summit 2024
THANKS FOR
SUPPORTING US TODAY
Platinum Sponsor Platinum Sponsor
Silver Sponsor Partner

A High-Speed Data Ingestion Microservice
in Java Using MQTT, AMQP, and STOMP
AI Industrial Summit 2024
Juarez Barbosa Junior @juarezjunior
Senior Principal Java Developer Evangelist
ORACLE
Copyright © 2024, Oracle and/or its affiliates

Juarez Barbosa Junior
Senior Principal Java Developer Evangelist @ Oracle
•Speaking from Dublin, Ireland
•28 years of experience in SW Engineering & DevRel
•Oracle, Microsoft, IBM, Nokia, Unisys, Accenture, startups
•Microsoft Azure Developer Relations Lead
•IBM Watson Tech Evangelist & Cloud Rockstar
•IBM Mobile Tech Evangelist & Global Thought Leader
•Nokia Developers Global Champion
•Java, Python, Cloud, DevOps, SRE, Cloud-native, IoT, AI, Blockchain, Rust
•Speaker at conferences
•Oracle CloudWorld, Oracle Code, Microsoft Ignite & TechX, jPrime,
JCON, DevConf.cz, GeeCon, DevOpsDays, DeveloperWeek, DevOps
Institute, CloudLand, DWX, The Developer’s Conference (TDC),
Sec4Dev, JSNation, NodeConf, Conf42, Shift Conf, Global Azure,
Open-Source Lisbon, CodeFrenzy, Mêlée Numérique, React Summit,
Test.js Summit, Porto TechHubConf, Pyjamas, MiTechCon, Data
Science Summit, OpenSourceNorth, WeAreDevelopers, Global
Software Architecture Summit, JavaForumNord, JavaCro, JUGs,
OUGs, GDGs, meetups, hackathons, and customer engagements.
@juarezjunior

Agenda
•Java App Dev with the Oracle Database
•Oracle JDBC - Support for the Latest Java Versions
•Overview of Oracle DB Access with Java
•Oracle JDBC - Sync and Async
•Classic Java Platform Threads
•Project Loom - Virtual Threads
•Virtual Threads - JEPs 425, 436, 444
•Demo #1: Virtual vs Platform Threads
•Reactive JDBC - Synchronous vs Asynchronous JDBC
•Reactive Streams Ingestion (RSI)
•Demo #2: Reactive Streams Ingestion (RSI)
•From Sync to Reactive JDBC: Oracle R2DBC
•Demo #3: Oracle R2DBC with Project Reactor
•Pipelined Database Operations, Oracle AI Vector Search
•Demo #4: Structured Concurrency + Pipelined Database Operations + AI Vector Search
•LiveLabs / Free Oracle Cloud Account / Oracle ACE Program

Java App Dev with Oracle Database

Overview of Oracle DB Access with Java

Oracle JDBC - Support for the Latest Java Versions
•Java 11: native support (the driver is compiled with it)
•Java 17: certified
•JDBC standards: 4.2 and 4.3
•GraalVM: native image instrumentation
•Reactive Streams: Java Flow API support
•Project Loom: Virtual Threads support
•Data access is crucial in mission-critical apps

Oracle JDBC Reactive Extensions (Flow API)
SQL Execution (OraclePreparedStatement):
Publisher<Boolean> executeAsyncOracle()
Publisher<Long> executeUpdateAsyncOracle()
Publisher<Long> executeBatchAsyncOracle()
Publisher<OracleResultSet> executeQueryAsyncOracle()
Row Fetching (OracleResultSet):
Publisher<T> publisherOracle(Function<OracleRow, T> f)
LOB I/O (OracleBlob):
Publisher<byte[]> publisherOracle(long position)
Subscriber<byte[]> subscriberOracle(long position)
LOB I/O (OracleClob):
Publisher<String> publisherOracle(long position)
Subscriber<String> subscriberOracle(long position)
Connection Closing (OracleConnection):
Publisher<Success> closeAsyncOracle()
Transaction Closing (OracleConnection):
Publisher<Success> commitAsyncOracle()
Publisher<Success> rollbackAsyncOracle()
Connection Creation (OracleConnectionBuilder):
Publisher<OracleConnection> buildConnectionPublisherOracle()
Introduction to JDBC Reactive Extensions with the Oracle Database 23c Free - Developer Release
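The methods above return java.util.concurrent.Flow publishers, so any Flow.Subscriber can consume them. The sketch below is a minimal, JDK-only illustration of that contract, assuming a subscriber that requests one item at a time. A SubmissionPublisher stands in for a driver publisher such as the Publisher<Long> returned by executeUpdateAsyncOracle(), because running against the real driver needs ojdbc and a live connection. CollectingSubscriber and collectUpdateCounts are hypothetical names, not part of the Oracle API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowConsumerSketch {

    /** Collects every item from a Flow.Publisher, requesting one item at a time. */
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> items = new CopyOnWriteArrayList<>();
        private final CompletableFuture<List<T>> done = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // backpressure: ask for the first item only
        }

        @Override
        public void onNext(T item) {
            items.add(item);
            subscription.request(1); // ask for the next item once this one is handled
        }

        @Override
        public void onError(Throwable t) {
            done.completeExceptionally(t);
        }

        @Override
        public void onComplete() {
            done.complete(List.copyOf(items));
        }

        CompletableFuture<List<T>> result() {
            return done;
        }
    }

    /** Hypothetical stand-in: publishes the given values (e.g. update counts) and collects them. */
    static List<Long> collectUpdateCounts(long... counts) {
        CollectingSubscriber<Long> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<Long> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (long c : counts) {
                publisher.submit(c);
            }
        } // close() signals onComplete once the submitted items have been delivered
        return subscriber.result().join();
    }

    public static void main(String[] args) {
        System.out.println(collectUpdateCounts(1L, 1L, 3L)); // [1, 1, 3]
    }
}
```

With the real driver, the same subscriber would be passed to the subscribe method of a publisher returned by, for example, executeUpdateAsyncOracle() on an OraclePreparedStatement.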

Oracle JDBC - Async and Sync
•Project Loom - Virtual Threads
•Synchronous database access with lightweight threads
•Standard JDBC + Virtual Threads
•Client application call stack may use blocking, synchronous code
•Libraries must be compatible with Virtual Threads
•Oracle instrumented the Oracle JDBC driver to support Virtual Threads
•Reactive Programming
•Asynchronous database access with non-blocking network I/O
•Oracle Reactive Streams Ingestion (RSI) + Oracle R2DBC + Oracle JDBC Reactive Extensions (Flow)
•Reactive Streams libraries: Reactor, RxJava, Akka, Vert.x

Classic Java Platform Threads
•Database access with blocking threads
•A JDBC call can block a thread for hundreds of milliseconds (thread-per-request style)
•Thread count increases linearly with the number of JDBC calls
•Performance degrades as the number of threads increases
•Typically 1 MB of stack memory per thread
•Scheduling many platform threads is expensive
•Preemptive scheduling vs time-slicing

Virtual Threads - JEPs 425, 436, 444
•Virtual Threads - JEPs (JDK Enhancement Proposals)
•Lightweight (user-mode) threads that dramatically reduce the effort of writing, maintaining, and observing high-throughput concurrent applications
•Enable applications written in the simple thread-per-request style to scale with near-optimal hardware utilization
•Enable easy troubleshooting, debugging, and profiling of virtual threads with existing JDK tools
•Do not remove the traditional implementation of threads
•Do not alter the basic concurrency model of Java
•Enable existing code that uses Java threads (java.lang.Thread) to adopt virtual threads with minimal change

Virtual Threads - JEPs 425, 436, 444
•Virtual Threads - JEPs (JDK Enhancement Proposals)
•Virtual threads typically employ a small set of platform threads used as carrier threads
•Code executing in a virtual thread is not aware of the underlying carrier thread
•The currentThread() method always returns the Thread object for the virtual thread
•Virtual threads do not have a thread name by default; getName() returns the empty string if a thread name is not set
•Virtual threads have a fixed thread priority (Thread.NORM_PRIORITY) that cannot be changed
•Virtual threads are daemon threads, so they do not prevent the shutdown sequence from beginning
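The properties listed above can be observed directly from inside a virtual thread. A minimal sketch, assuming JDK 21 or later; VirtualThreadProperties and Props are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

public class VirtualThreadProperties {

    /** Properties observed from inside a virtual thread. */
    record Props(boolean virtual, String name, boolean daemon, int priority) {}

    static Props inspect() throws InterruptedException {
        AtomicReference<Props> seen = new AtomicReference<>();
        Thread vt = Thread.startVirtualThread(() -> {
            Thread self = Thread.currentThread();  // the virtual thread itself, never its carrier
            self.setPriority(Thread.MAX_PRIORITY); // ignored: virtual threads keep NORM_PRIORITY
            seen.set(new Props(self.isVirtual(), self.getName(), self.isDaemon(), self.getPriority()));
        });
        vt.join(); // wait so the snapshot is complete before reading it
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Props p = inspect();
        // virtual=true, name='', daemon=true, priority=5
        System.out.println("virtual=" + p.virtual() + ", name='" + p.name() + "', daemon="
                + p.daemon() + ", priority=" + p.priority());
    }
}
```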

Virtual Threads - JEPs 425, 436, 444
•JDK Enhancement Proposals
•JEP 425: Virtual Threads (Preview)
•JEP 436: Virtual Threads (Second Preview)
•JEP 444: Virtual Threads
•java.lang.Thread
•final boolean isVirtual()
•static Thread.Builder.OfVirtual ofVirtual()
•static Thread.Builder.OfPlatform ofPlatform()
•static Thread startVirtualThread(Runnable task)
•java.lang.Thread.Builder
•java.util.concurrent.Executors
•static ExecutorService newVirtualThreadPerTaskExecutor()
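The executor listed above creates one new virtual thread per submitted task. The sketch below, assuming JDK 21 or later, runs many blocking tasks that way; Thread.sleep stands in for a blocking JDBC call, and VirtualThreadPerTask and runBlockingTasks are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadPerTask {

    /** Runs `tasks` blocking tasks, each on its own new virtual thread, and sums their results. */
    static long runBlockingTasks(int tasks, Duration blockFor)
            throws InterruptedException, ExecutionException {
        List<Future<Integer>> futures = new ArrayList<>(tasks);
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> {
                    Thread.sleep(blockFor); // stands in for a blocking JDBC call
                    return 1;
                }));
            }
        } // close() waits for all submitted tasks to finish
        long total = 0;
        for (Future<Integer> f : futures) {
            total += f.get();
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        long done = runBlockingTasks(10_000, Duration.ofMillis(100));
        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.println(done + " tasks finished in " + ms + " ms");
    }
}
```

Each sleeping virtual thread unmounts from its carrier, so 10,000 concurrent blocking tasks do not need 10,000 OS threads. Measure on your own hardware rather than relying on any particular timing.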

Virtual Threads - JEPs 425, 436, 444
•java.lang.Thread.Builder
•Thread defines a Thread.Builder API for creating and starting both platform and virtual threads. The following are examples that use the builder:
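The builder examples themselves are not in this transcript. As a stand-in, here is a minimal sketch of common Thread.Builder usage on JDK 21 or later: a named platform thread, virtual threads with a counter-based name, an unstarted virtual thread, and a ThreadFactory. The thread names and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadFactory;

public class ThreadBuilderExamples {

    /** Starts threads with several Thread.Builder variants and returns what each one logged. */
    static List<String> run() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        // 1. A named platform thread, started immediately.
        Thread platform = Thread.ofPlatform()
                .name("platform-worker")
                .start(() -> log.add(Thread.currentThread().getName()
                        + " virtual=" + Thread.currentThread().isVirtual()));

        // 2. Virtual threads named from a prefix and counter: "vt-0", "vt-1", ...
        Thread.Builder virtualBuilder = Thread.ofVirtual().name("vt-", 0);
        Thread v0 = virtualBuilder.start(() -> log.add(Thread.currentThread().getName()));
        Thread v1 = virtualBuilder.start(() -> log.add(Thread.currentThread().getName()));

        // 3. An unstarted virtual thread, started explicitly later.
        Thread later = Thread.ofVirtual().unstarted(() -> log.add("unstarted-then-started"));
        later.start();

        // 4. A ThreadFactory that creates virtual threads, for APIs that expect a factory.
        ThreadFactory factory = Thread.ofVirtual().name("factory-", 0).factory();
        Thread fromFactory = factory.newThread(() -> log.add(Thread.currentThread().getName()));
        fromFactory.start();

        for (Thread t : List.of(platform, v0, v1, later, fromFactory)) {
            t.join();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // entry order varies between runs
    }
}
```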

Demo #1: Virtual vs Platform Threads
•Virtual Threads vs Platform Threads
•JEP 425: Virtual Threads (Preview)
•JEP 436: Virtual Threads (Second Preview)
•JEP 444: Virtual Threads (JDK 21)
•Oracle JDBC Driver instrumented to support Virtual Threads
•Intrinsic locks replaced with ReentrantLock to avoid thread pinning
•Verifiable comparison of OS/HW resource consumption (Platform Threads versus Virtual Threads)
•Introduction to Oracle JDBC Driver Support for Virtual Threads
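The ReentrantLock change above matters because, on JDK 21 through 23, a virtual thread that blocks inside a synchronized block or method is pinned to its carrier thread (JDK 24 removed most of this pinning via JEP 491). A minimal sketch of the same idea in application code, assuming JDK 21 or later; the class and method names are hypothetical, and Thread.sleep stands in for blocking I/O performed while the lock is held.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class LockInsteadOfSynchronized {

    private final ReentrantLock lock = new ReentrantLock(); // explicit lock instead of synchronized
    private long counter;

    /** Increments the counter while doing blocking work under the lock. */
    void incrementWithBlockingWork() throws InterruptedException {
        lock.lock();
        try {
            // Blocking while holding a lock: inside synchronized this would pin the virtual
            // thread on JDK 21-23; with ReentrantLock the virtual thread can unmount while it waits.
            Thread.sleep(1);
            counter++;
        } finally {
            lock.unlock();
        }
    }

    long counter() {
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }

    /** Runs `tasks` increments, each on its own virtual thread, and returns the final count. */
    static long runWith(int tasks) {
        LockInsteadOfSynchronized shared = new LockInsteadOfSynchronized();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    shared.incrementWithBlockingWork();
                    return null; // Callable form, so the checked InterruptedException is allowed
                });
            }
        } // close() waits for all submitted tasks
        return shared.counter();
    }

    public static void main(String[] args) {
        System.out.println(runWith(200)); // 200
    }
}
```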

Reactive JDBC - Sync vs Async JDBC
[Diagram: Synchronous JDBC vs Reactive JDBC. Synchronous JDBC: each database call goes through Setup, Blocking, Handle Result. Reactive JDBC: each database call goes through Setup, Non-Blocking, Handle Result.]

Reactive Streams Ingestion (RSI)
•Java library for Reactive Streams Ingestion
•Streaming capability: ingest data in a non-blocking, reactive way from a large group of clients
•Group records through RAC (Real Application Clusters) and Shard affinity using native UCP (Universal Connection Pool)
•Optimize CPU allocation while decoupling record processing from I/O
•Fastest insert method for the Oracle Database through Direct Path Insert, bypassing SQL processing and writing directly into the database files
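RSI's own API is not shown in these slides, so the following is not RSI code. It is a JDK-only sketch of two ideas from the list above, assuming java.util.concurrent.Flow as the reactive contract: records are grouped into fixed-size batches, and backpressure (requesting one batch worth of records at a time) decouples record production from the batch write. The sink is a hypothetical placeholder for where a batch insert would happen, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

public class BatchingIngestionSketch {

    /** Groups incoming records into fixed-size batches and hands each batch to a sink. */
    static final class BatchingSubscriber<T> implements Flow.Subscriber<T> {
        private final int batchSize;
        private final Consumer<List<T>> sink; // hypothetical: where a batch insert would run
        private final CompletableFuture<Void> done = new CompletableFuture<>();
        private List<T> buffer;
        private Flow.Subscription subscription;

        BatchingSubscriber(int batchSize, Consumer<List<T>> sink) {
            this.batchSize = batchSize;
            this.sink = sink;
            this.buffer = new ArrayList<>(batchSize);
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(batchSize); // backpressure: at most one batch of records outstanding
        }

        @Override
        public void onNext(T record) {
            buffer.add(record);
            if (buffer.size() == batchSize) {
                flush();
                subscription.request(batchSize); // request more only after this batch is written
            }
        }

        @Override
        public void onError(Throwable t) {
            done.completeExceptionally(t);
        }

        @Override
        public void onComplete() {
            if (!buffer.isEmpty()) {
                flush(); // write the final, partial batch
            }
            done.complete(null);
        }

        private void flush() {
            sink.accept(List.copyOf(buffer));
            buffer = new ArrayList<>(batchSize);
        }

        CompletableFuture<Void> completion() {
            return done;
        }
    }

    /** Streams records through a BatchingSubscriber and returns the batches it produced, in order. */
    static List<List<String>> ingest(List<String> records, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        BatchingSubscriber<String> subscriber = new BatchingSubscriber<>(batchSize, batches::add);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            records.forEach(publisher::submit); // producer side: submit blocks if the buffer is full
        }
        subscriber.completion().join(); // wait until the last batch has been flushed
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(ingest(List.of("r1", "r2", "r3", "r4", "r5"), 2)); // [[r1, r2], [r3, r4], [r5]]
    }
}
```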

Demo #2: RSI - Architecture

Demo #2: RSI - Project Structure

Demo #2: Reactive Streams Ingestion (RSI)
[Diagram: deployment architecture on ORACLE CLOUD INFRASTRUCTURE]
•Sources: Microservices, Files / Logs, IoT Devices / Apps
•Record streaming over multiple protocols: MQTT, gRPC, AMQP, HTTP / REST
•gRPC / AMQP / MQTT engines and an HTTP / REST engine over Helidon
•RSI Runtime: non-blocking, optimized library for streaming data through Direct Path, with Shard & RAC/FAN support
•JDBC Direct Path INSERT into ATP, ADW, ATP-D, AFDW-D, with Memoptimized Rowstore
•CI/CD toolchain (Container Pipelines, Jenkins, etc.): define the build; build, test, and push Docker images to the Cloud Infrastructure Registry
•Container Engine for Kubernetes: pull images from the Registry and deploy them to production Kubernetes worker nodes, where containers run the microservices deployed over Kubernetes

Virtual Threads or Reactive?
•Oracle JDBC supports both!
•Want Virtual Threads?
•The Oracle JDBC driver is Virtual Thread compatible
•Pipelined Database Operations
•Want Reactive?
•Oracle R2DBC, Oracle RSI, Oracle JDBC Reactive Extensions
•Pipelined Database Operations

Virtual Threads or Reactive?
•Benefits of Virtual Threads:
•Easier to read and write
•Easier to debug
•Integration with JDK tools
•Do not alter the basic concurrency model of Java
•Now available in JDK 21 / 22
•Limitations of Virtual Threads:
•Some libraries/frameworks are not compatible yet
•Benefits of Reactive:
•Reactive libraries (Reactor, RxJava, Akka, Vert.x)
•Stream-like API with a functional style
•Low-level concurrency is handled for you (locks, atomics, queues)
•Limitations of Reactive:
•Steep learning curve
•Harder to read and write
•Harder to debug

From Sync to Reactive JDBC: Oracle R2DBC
•Oracle Reactive Relational Database Connectivity (R2DBC)
•The Oracle R2DBC Driver is a Java library that supports reactive programming with the Oracle Database
•It implements the R2DBC Service Provider Interface (SPI) as specified by the Reactive Relational Database Connectivity (R2DBC) spec
•The R2DBC SPI exposes Reactive Streams as an abstraction for remote database operations
•The sample code uses Project Reactor. It could use RxJava, Akka, or any Reactive Streams library
•Runs on Java 11+
•Oracle R2DBC 1.2.0, Apache v2.0 (OSS)
R2DBC - Reactive Relational DB Connectivity

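Demo #3: Oracle R2DBC
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.NoSuchElementException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

// Synchronous JDBC: blocks the calling thread and returns the first column of the first row.
static String queryJdbc(java.sql.Connection connection) throws SQLException {
  try (java.sql.Statement statement = connection.createStatement()) {
    ResultSet resultSet =
      statement.executeQuery("SELECT * FROM CUSTOMERS");
    if (resultSet.next())
      return resultSet.getString(1); // JDBC column indexes start at 1
    else
      throw new NoSuchElementException("Query returned zero rows");
  }
}

// Reactive R2DBC: returns a Publisher immediately; the query runs when a subscriber subscribes.
// It emits the first column of every row (R2DBC column indexes start at 0).
static Publisher<String> queryR2dbc(io.r2dbc.spi.Connection connection) {
  return Flux.from(connection.createStatement(
      "SELECT * FROM CUSTOMERS")
    .execute())
    .flatMap(result ->
      result.map(row -> row.get(0, String.class)))
    .switchIfEmpty(Flux.error(
      new NoSuchElementException("Query returned zero rows")));
}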

Pipelined Database Operations
Before Pipelined Database Operations
•The Java app sends a request, the server sends the response, and the Java app receives the response
•The server must finish processing a request and sending its response before accepting another request
•The client and server have idle time

Pipelined Database Operations
With Pipelining: Non-Blocking Requests and Responses
•Send multiple requests from client to server without waiting for responses
•The application can asynchronously perform other operations
•The server processes the requests in order and sends the responses back to the client when ready, in the same order they were received
Benefits
•Improved response time and throughput
•Particularly useful when network latency is high
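A simplified latency model shows why the benefit grows with network latency. It assumes N independent requests, a network round-trip time RTT, a per-request server processing time s, and negligible client and transmission overhead. Without pipelining every request pays a full round trip; with pipelining the round trip is paid roughly once and the server works through the queue back to back.

```latex
\begin{aligned}
T_{\text{seq}}  &= N\,(\mathit{RTT} + s) \\
T_{\text{pipe}} &\approx \mathit{RTT} + N\,s \\
\frac{T_{\text{seq}}}{T_{\text{pipe}}} &\approx \frac{N\,(\mathit{RTT} + s)}{\mathit{RTT} + N\,s}
  \;\longrightarrow\; N \quad \text{when } \mathit{RTT} \gg N\,s
\end{aligned}
```

With illustrative numbers (not the measured results quoted in this deck): for N = 100 and s = 0.1 ms, a slow network with RTT = 10 ms gives 100 × 10.1 = 1010 ms versus about 10 + 100 × 0.1 = 20 ms, roughly 50x; a fast network with RTT = 0.2 ms gives 100 × 0.3 = 30 ms versus about 0.2 + 10 = 10.2 ms, roughly 2.9x.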

Pipelined Database Operations
Performance improvements depend on the CPU load, the workload profiles on the server side, and the network latency between client and server:
•Up to 90% improvement for a query-only workload over a fast network
•Up to 90x improvement for a query-only workload over a slow network
•Up to 24% improvement for an OLTP workload over a fast network
•Up to 7x improvement for an OLTP workload over a slow network
•Up to 100+x improvement for a DML workload
The Oracle JDBC Driver 23ai supports Pipelined Database Operations through:
•The Oracle JDBC Reactive Extensions
•Reactive Streams libraries
•Java Virtual Threads
•Structured Concurrency
•The Standard JDBC Batching API

JDBC Reactive Extensions and Pipelined DB Operations

R2DBC and Pipelined Database Operations

Structured Concurrency and Java Virtual Threads
Structured concurrency treats groups of related tasks running in different threads as a single unit of work, thereby streamlining error handling and cancellation, improving reliability, and enhancing observability.
•The StructuredTaskScope class enables you to coordinate a group of concurrent subtasks as a unit. With a StructuredTaskScope instance, you fork each subtask, which runs it in its own thread, and then join them as a unit.
•As a result, the StructuredTaskScope ensures that the subtasks are completed before the main task continues.
•Alternatively, you can specify that the application continues when one subtask succeeds.
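StructuredTaskScope is still a preview API (JEP 480 in JDK 23), needs --enable-preview, and its shape has changed between previews. As a hedged, non-preview approximation of the two policies described above, the sketch below uses ExecutorService.invokeAll (wait for every subtask) and invokeAny (continue with the first success, cancel the rest) on virtual threads, assuming JDK 21 or later. Unlike StructuredTaskScope with a shutdown-on-failure policy, invokeAll does not cancel the remaining subtasks when one fails. GroupedSubtasks, allOf, and anyOf are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class GroupedSubtasks {

    /** Waits for every subtask; the caller continues only after all have completed. */
    static List<String> allOf(List<Callable<String>> subtasks)
            throws InterruptedException, ExecutionException {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = executor.invokeAll(subtasks); // returns when all are done
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // throws ExecutionException if this subtask failed
            }
            return results;
        }
    }

    /** Continues with the first subtask that succeeds; the others are cancelled. */
    static String anyOf(List<Callable<String>> subtasks)
            throws InterruptedException, ExecutionException {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            return executor.invokeAny(subtasks);
        }
    }

    public static void main(String[] args) throws Exception {
        Callable<String> fetchA = () -> "row A";
        Callable<String> fetchB = () -> "row B";
        Callable<String> slow = () -> { Thread.sleep(2_000); return "slow replica"; };
        Callable<String> fast = () -> "fast replica";
        System.out.println(allOf(List.of(fetchA, fetchB))); // [row A, row B]
        System.out.println(anyOf(List.of(slow, fast)));     // fast replica (the slow one is still sleeping)
    }
}
```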

New | Oracle Database 23ai
Next-Generation Converged Database Services
Over 300 major new features plus thousands of enhancements
Available on OCI and Azure
Major focus on
•AI for Data features
•Developer/Analyst features
•Mission-Critical features
Addresses data management pain points that have frustrated customers forever

Oracle Database 23ai - AI for Data
•Algorithmic AI
•AI Vector Search
•Augmented Generative AI (LLMs)
•Distributed AI
•AI Storage
•AI Developer Tools

Oracle Database 23ai - Dev for Data
•JSON Relational Duality Views
•Property Graph Views
•JavaScript Stored Procedures
•Data Intent Language
•Lock-free Consistent Updates, Long-running Transactions
•True Cache

Oracle Database 23ai - Mission-Critical Data
•RAFT Replication for Globally Distributed Database
•In-Database Firewall
•Real-Time SQL Plan Management
•RAC, Exadata, Data Guard Simplicity and Scalability
•Analytic SQL Simplicity and Scalability
•Priority Transactions

Free | Oracle Database 23ai For Developers
•Oracle Always Free ADB: available on OCI
•ADB Free Container Image: available for download
•Oracle Database Free: available as RPM, Docker Image, VBox VM
Very Low Cost Supported Developer Edition of ADB, BaseDB, ExaDB-D, and ExaDB-C@C

LLMs, Oracle DB 23ai, Oracle AI Vector Search and Vector Embeddings
1 - Gather and pre-process the target dataset (texts/documents, images, audio)
2 - Feed the LLM with the pre-processed data
3 - The LLM learns patterns and relationships within the data
4 - The LLM generates embedding vectors
LLMs: GPT-4, Cohere Command-R, Llama 3, others

LLMs, Oracle DB 23ai, Oracle AI Vector Search and Vector Embeddings
5 - Embedding vectors are stored in the Oracle Database along with the original texts

Demo #4: Structured Concurrency + Pipelined Database Operations + Oracle AI Vector Search
Combining Structured Concurrency + Pipelined Database Operations + Oracle AI Vector Search
•Create a database table
•Populate the database
•Update the database table with vector embeddings
•Perform vector similarity searches

Demo #4: Structured Concurrency + Pipelined Database Operations + Oracle AI Vector Search
1 - Stream text from a book about animals into a table
2 - Update the text table with the corresponding vector embeddings from Oracle Cloud's Generative AI service
3 - Perform similarity searches for search terms
Oracle Database 23ai FREE: table with text + vector embeddings
Demo code @ https://tinyurl.com/nbjmczjr

Demo #4: Structured Concurrency + Pipelined Database Operations + AI Vector Search
Create and populate the DB table
loadTable(Connection): loads a database table with text data.
Stream text from the book URL
Concurrent batch inserts with StructuredTaskScope
Pipeline batch inserts of (id, text) into the table
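The loadTable steps above (split the text, insert batches concurrently) can be sketched without a database, assuming JDK 21 or later. Each batch of (id, text) rows is written on its own virtual thread through a hypothetical BatchWriter; in the demo, that is where the pipelined JDBC batch insert would run, which this sketch does not reproduce. It also uses newVirtualThreadPerTaskExecutor rather than StructuredTaskScope to avoid the preview flag. Row, BatchWriter, toBatches, and load are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentBatchLoad {

    /** One row to insert: the (id, text) pair from the slide. */
    record Row(long id, String text) {}

    /** Hypothetical writer: in the demo, a pipelined JDBC batch insert would run here. */
    @FunctionalInterface
    interface BatchWriter {
        int write(List<Row> batch) throws Exception; // returns the number of rows written
    }

    /** Splits lines into batches of at most batchSize rows, with ids 1..n in input order. */
    static List<List<Row>> toBatches(List<String> lines, int batchSize) {
        List<List<Row>> batches = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += batchSize) {
            int end = Math.min(start + batchSize, lines.size());
            List<Row> batch = new ArrayList<>(end - start);
            for (int i = start; i < end; i++) {
                batch.add(new Row(i + 1L, lines.get(i)));
            }
            batches.add(batch);
        }
        return batches;
    }

    /** Writes every batch on its own virtual thread and returns the total number of rows written. */
    static int load(List<String> lines, int batchSize, BatchWriter writer)
            throws InterruptedException, ExecutionException {
        List<Future<Integer>> results = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (List<Row> batch : toBatches(lines, batchSize)) {
                results.add(executor.submit(() -> writer.write(batch)));
            }
        } // close() waits until every batch has been written
        int total = 0;
        for (Future<Integer> f : results) {
            total += f.get(); // rethrows a writer failure as ExecutionException
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = List.of("The cat hunts at night.", "Bears live in forests.", "Dogs like mild weather.");
        System.out.println(load(lines, 2, batch -> batch.size())); // 3
    }
}
```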

Demo #4: Structured Concurrency + Pipelined Database Operations + AI Vector Search
Update the Table with Embeddings
updateTable(Connection): stores vector embeddings for the text data.
Pipeline row fetches of (id, text) from the table where the embedding IS NULL
Request embeddings from Oracle Cloud's Generative AI service
Pipeline batch updates with the embeddings

Demo #4: Structured Concurrency + Pipelined Database Operations + AI Vector Search
Perform Similarity Searches (1/2)
Perform vector similarity searches with the following prompts:
•Predatory behavior of cats
•Location of bears
•Best climate for dogs
•Animals in ancient times
•Beautiful birds
•Deadly fish
•Where the wild things are

Demo #4: Structured Concurrency + Pipelined Database Operations + AI Vector Search
Perform Similarity Searches (2/2)
searchTableText(Connection, List): performs a similarity search against vector embeddings
Convert search terms to vector embeddings
Pipeline queries with VECTOR_DISTANCE on Virtual Threads, which allows many statements to make concurrent progress over a single JDBC connection
SELECT text
FROM example
ORDER BY VECTOR_DISTANCE(embedding, ?, COSINE)
FETCH APPROX FIRST 1 ROW ONLY
Return similarity search results
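VECTOR_DISTANCE(..., COSINE) ranks rows by cosine distance, that is 1 minus the cosine similarity of the two vectors. The sketch below is a minimal, exact (brute-force) Java equivalent of ORDER BY distance FETCH FIRST 1 ROW ONLY over in-memory float arrays. The query above uses FETCH APPROX, which lets the database answer from an approximate vector index instead, so its results can differ from an exact scan. CosineSearchSketch, cosineDistance, and nearest are hypothetical names.

```java
import java.util.List;

public class CosineSearchSketch {

    /** Cosine distance = 1 - cosine similarity: 0 for the same direction, 1 for orthogonal, 2 for opposite. */
    static double cosineDistance(float[] a, float[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("vectors must have the same dimension");
        }
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += (double) a[i] * b[i];
            normA += (double) a[i] * a[i];
            normB += (double) b[i] * b[i];
        }
        // Undefined (NaN) for a zero vector, since its norm is 0.
        return 1.0 - dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    /** Exact top-1 search: index of the stored embedding closest to the query, or -1 if none. */
    static int nearest(List<float[]> embeddings, float[] query) {
        int best = -1;
        double bestDistance = Double.POSITIVE_INFINITY;
        for (int i = 0; i < embeddings.size(); i++) {
            double d = cosineDistance(embeddings.get(i), query);
            if (d < bestDistance) {
                bestDistance = d;
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<float[]> stored = List.of(new float[] {1f, 0f}, new float[] {0f, 1f}, new float[] {0.7f, 0.7f});
        System.out.println(nearest(stored, new float[] {0.1f, 1f})); // 1
    }
}
```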

Technical References
•JDK 22
•The Arrival of JDK 22 - https://blogs.oracle.com/java/post/the-arrival-of-java-22
•Virtual Threads
•JEP 444 Virtual Threads - https://openjdk.org/jeps/444
•JEP 480 Structured Concurrency (Third Preview) - https://openjdk.org/jeps/480
•Introduction to Oracle JDBC Driver Support for Virtual Threads - https://bit.ly/3UlNJWP
•Reactive Streams Ingestion Library
•Getting Started with the Java library for Reactive Streams Ingestion (RSI) - https://bit.ly/3rEiRnC
•A High-Speed Data Ingestion Solution in Java Using MQTT, AMQP, and STOMP - https://medium.com/oracledevs/a-high-speed-data-ingestion-microservice-in-java-using-mqtt-amqp-and-stomp-135724223ae1
•R2DBC
•Getting Started with Reactive Relational Database Connectivity and the Oracle R2DBC Driver - http://rb.gy/95u5f8
•Pipelined Database Operations - https://rb.gy/k57g3r
•Generative AI Service - Oracle Cloud Infrastructure (OCI) - https://rb.gy/a9k69e
•What’s in Oracle Database 23ai for Java Developers? - https://www.oracle.com/a/tech/docs/database/whats-in-oracledb23ai-for-java-developers.pdf

Oracle LiveLabs
Showcasing how Oracle’s solutions can solve your business problems
500+ free workshops, available or in development
3.5 million people have already visited LiveLabs
600+ events run using LiveLabs workshops
developer.oracle.com/livelabs
Learn something new … at your pace!

3 membership tiers
Connect: @oracleace
500+ technical experts & community leaders helping peers globally
The Oracle ACE Program recognizes & rewards individuals for their technical & community contributions to the Oracle community
Nominate yourself or a candidate: ace.oracle.com/nominate
Learn more: ace.oracle.com
blogs.oracle.com/ace

Create your FREE Cloud Account
•Go to https://signup.cloud.oracle.com/

Juarez Junior
@juarezjunior

AI Industrial Summit 2024
NEXT CONFERENCE
Data Saturday Sofia 2024
05 | October | 2024
Free Ticket
Labs building, Sofia Tech Park