Storage Integration¶
Introduction¶
This section explains how to integrate data stores into Streaming Integration flows. The Streaming integrator can consume data from stores in a streaming manner as well as publish data into them. You can perform CRUD operations and Change Data Capture (CDC).
Configuring stores¶
You can store data in-memory or in a physical data store. In a production environment, it is recommended to store data in a physical store because data stored in-memory can be lost if a system failure occurs.
If you want to share a database across multiple Siddhi applications, you must define a data store in the <SI_HOME>/conf/server/deployment.yaml
file. If you want a single connection pool, you can define a store as a ref in the ref
section of the <SI_HOME>/conf/server/deployment.yaml
file. If you want to define a unique data store for your Siddhi application, you can define it inline.
To understand all three methods consider an example where a factory manager wants to store the purchase records of raw material supplies to be stored in order to able to check the availability of materials at any given time.
Storing data in existing data sources¶
To understand how to store data in existing data sources, follow the procedure below:
Info
You need to create a database and a table, and then connect it to the Streaming Integrator via a data source. For this example, you can create a database named FactoryMaterialDB
as follows:
-
Download and install MySQL Server.
-
Download the MySQL JDBC driver.
-
Unzip the downloaded MySQL driver zipped archive, and copy the MySQL JDBC driver JAR (mysql-connector-java-x.x.xx-bin.jar) into the
/lib directory. -
Enter the following command in a terminal/command window, where username is the username you want to use to access the databases.
mysql -u username -p
-
When prompted, specify the password you are using to access the databases with the username you specified.
-
Add the following configuration under the
Data Sources Configuration
section of the<SI_HOME>/conf/server/deployment.yaml
file.Info
You need to change the values for the username and password parameters to the username and password that you are using to access the MySQL database.
- name: FactoryMaterialDB description: Datasource used for Factory Supply Records jndiConfig: name: jdbc/FactoryMaterialDB useJndiReference: true definition: type: RDBMS configuration: jdbcUrl: 'jdbc:mysql://localhost:3306/FactoryMaterialDB' username: root password: root driverClassName: com.mysql.jdbc.Driver maxPoolSize: 50 idleTimeout: 60000 connectionTestQuery: SELECT 1 validationTimeout: 30000 isAutoCommit: false
-
To create a database table named
FactoryMaterialDB
, issue the following commands from the terminal.mysql> create database FactoryMaterialDB;
mysql> use FactoryMaterialDB;
mysql> source <SI_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
mysql> grant all on FactoryMaterialDB.* TO username@localhost identified by "password";
-
Start creating a new Siddhi application. You can name it
ShipmentHistoryApp
For instructions, see Creating a Siddhi Application.@App:name('ShipmentHistory');
-
First, define an input stream to capture the data that you need to store as follows.
@source(type = 'http', @map(type = 'json')) define stream RawMaterialStream(transRef long, material string, supplier string, amount double);
Here, each event representing a material shipment transaction is received via the
http
transport and in JSON format. This is specified via the source annotation. For more information about consuming data via sources, see the Consuming Data guide. -
Now let's define the table in which you are storing the data as follows.
define table ShipmentDetails(transRef long, material string, supplier string, amount double);
-
To save the data in the required database, connect the table you defined to the datasource that you previously created.
@Store(type='rdbms', datasource='FactoryMaterialDB')@PrimaryKey("transRef") define table ShipmentDetails(transRef long, material string, supplier string, amount double);
-
To store the information arriving at the
RawMaterialStream
stream to theShipmentDetails
table, write a Siddhi query as follows:-
To specify that events to be stored are taken from the
RawMaterialStream
input stream, add thefrom
clause as follows.from RawMaterialStream
-
To derive the exact attribute names with which the records are stored in the table, add the
select
statement as follows.select transRef, material, supplier
-
If a record from the stream already exists in a the
ShipmentDetails
table, it should be updated with the latest information from the stream. If it does not already exist, it must be inserted as a new record. To specify this, add anupdate and insert
clause as follows.update or insert into ShipmentDetails on ShipmentDetails.transRef == transRef;
Each record is identified by the transaction reference which is the primary key. Therefore, the condition
on ShipmentDetails.transRef == transRef
means that if a record with the same value for thetransRef
attribute exists in the table, the record from the stream updates it. If not, the record from the stream is inserted as a new record. -
Referring to externally defined stores¶
To understand how to refer to am externally defined store, follow the procedure below:
Before you begin:
You need to create a database and a table, and then define an external store referring to it as follows.
-
Download and install MySQL Server.
-
Download the MySQL JDBC driver.
-
Unzip the downloaded MySQL driver zipped archive, and copy the MySQL JDBC driver JAR (mysql-connector-java-x.x.xx-bin.jar) into the
<SI_HOME>/lib
directory. -
Enter the following command in a terminal/command window, where username is the username you want to use to access the databases.
mysql -u username -p
-
When prompted, specify the password you are using to access the databases with the username you specified.
-
In the
<SI_HOME>/conf/server/deployment.yaml
file, add a subsection for refs and connect to the database you created as shown below.siddhi: refs: - ref: name: 'FactoryMaterial' type: 'rdbms' properties: jdbc.url: ''jdbc:mysql://localhost:3306/FactoryMaterialDB'' username: '<YOUR_USERNAME>' password: '<YOUR_PASSWORD>' field.length='currentTime:100' jdbc.driver.name: 'com.mysql.jdbc.Driver'
-
To create a database table named
FactoryMaterialDB
, issue the following commands from the terminal.mysql> create database FactoryMaterialDB;
mysql> use FactoryMaterialDB;
mysql> source <SI_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
mysql> grant all on FactoryMaterialDB.* TO username@localhost identified by "password";
-
Start creating a new Siddhi application. You can name it
ShipmentHistoryApp
For instructions, see Creating a Siddhi Application.@App:name('ShipmentHistory');
-
First, define an input stream to capture the data that you need to store as follows.
@source(type = 'http', @map(type = 'json')) define stream RawMaterialStream(transRef long, material string, supplier string, amount double);
Here, each event representing a material shipment transaction is received via the
http
transport and in JSON format. This is specified via the source annotation. For more information about consuming data via sources, see the Consuming Data guide. -
Now let's define the table in which you are storing the data as follows.
define table ShipmentDetails(transRef long, material string, supplier string, amount double);
-
To save the data in the required database, connect the table you defined to the ref that you previously created.
@Store(ref='FactoryMaterial') @PrimaryKey('transRef') define table ShipmentDetails(transRef long, material string, supplier string, amount double);
Here, you are referring to theFactoryMaterial
external store you configured and specifying thetransRef
attribute as the primary key. -
To store the information arriving at the
RawMaterialStream
stream to theShipmentDetails
table, write a Siddhi query as follows:-
To specify that events to be stored are taken from the
RawMaterialStream
input stream, add thefrom
clause as follows.from RawMaterialStream
-
To derive the exact attribute names with which the records are stored in the table, add the
select
statement as follows.select transRef, material, supplier
-
If a record from the stream already exists in a the
ShipmentDetails
table, it should be updated with the latest information from the stream. If it does not already exist, it must be inserted as a new record. To specify this, add anupdate and insert
clause as follows.update or insert into ShipmentDetails on ShipmentDetails.transRef == transRef;
Each record is identified by the transaction reference which is the primary key. Therefore, the condition
on ShipmentDetails.transRef == transRef
means that if a record with the same value for thetransRef
attribute exists in the table, the record from the stream updates it. If not, the record from the stream is inserted as a new record. -
Configuring data stores inline¶
To understand how to define a store inline, follow the procedure below:
Before you begin
You need to create a database and a table, and then define an external store referring to it as follows.
-
Download and install MySQL Server.
-
Download the MySQL JDBC driver.
-
Unzip the downloaded MySQL driver zipped archive, and copy the MySQL JDBC driver JAR (mysql-connector-java-x.x.xx-bin.jar) into the
/lib directory. -
Enter the following command in a terminal/command window, where username is the username you want to use to access the databases.
mysql -u username -p
-
When prompted, specify the password you are using to access the databases with the username you specified.
-
To create a database table named
FactoryMaterialDB
, issue the following commands from the terminal.mysql> create database FactoryMaterialDB;
mysql> use FactoryMaterialDB;
mysql> source <SI_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
mysql> grant all on FactoryMaterialDB.* TO username@localhost identified by "password";
-
Start creating a new Siddhi application. You can name it
ShipmentHistoryApp
For instructions, see Creating a Siddhi Application.@App:name('ShipmentHistory');
-
First, define an input stream to capture the data that you need to store as follows.
@source(type = 'http', @map(type = 'json')) define stream RawMaterialStream(transRef long, material string, supplier string, amount double);
Here, each event representing a material shipment transaction is received via the
http
transport and in JSON format. This is specified via the source annotation. For more information about consuming data via sources, see the Consuming Data guide. -
Now let's define the table in which you are storing the data as follows.
define table ShipmentDetails(transRef long, material string, supplier string, amount double);
-
To save the data in the required database, connect an inline store definition to the table definition via the
@store
annotation as follows@primaryKey('transRef') @index('supplier') @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/FactoryMaterialDB, username="<YOUR_USERNAME>", password="<YOUR_PASSWORD>" , jdbc.driver.name="com.mysql.jdbc.Driver") define table ShipmentDetails(transRef long, material string, supplier string, amount double);
Here, thejdbc.url
parameter connects theShipmentDetails
table to theFactoryMaterialDB
database that you previously created. The value for thetransRef
parameter is unique for each purchase event. Therefore, it is specified as the primary key for the table. The records are indexed by the supplier. -
To store the information arriving at the
RawMaterialStream
stream to theShipmentDetails
table, write a Siddhi query as follows:-
To specify that events to be stored are taken from the
RawMaterialStream
input stream, add thefrom
clause as follows.from RawMaterialStream
-
To derive the exact attribute names with which the records are stored in the table, add the
select
statement as follows.select transRef, material, supplier
-
If a record from the stream already exists in a the
ShipmentDetails
table, it should be updated with the latest information from the stream. If it does not already exist, it must be inserted as a new record. To specify this, add anupdate and insert
clause as follows.update or insert into ShipmentDetails on ShipmentDetails.transRef == transRef;
Each record is identified by the transaction reference which is the primary key. Therefore, the condition
on ShipmentDetails.transRef == transRef
means that if a record with the same value for thetransRef
attribute exists in the table, the record from the stream updates it. If not, the record from the stream is inserted as a new record. -
Performing CRUD operations¶
Performing CRUD operations via streams¶
If you tried out the example in the previous sections of this guide, you have already performed an update or insert operation
via streams when you updated or inserted a record relating to the purchase records into the ShipmentDetails
table. In
this section, you can try more CRUD operations via streams as follows:
- If you want to insert all the records from the
RawMaterialStream
stream to theShipmentDetails
table, you can write the Siddhi query as follows.from RawMaterialStream select * insert into ShipmentDetails
-
If you want to search for all the records by a specific supplier, you can write the Siddhi query as follows.
from RawMaterialStream on supplier == 'Lakspur landing' in ShipmentDetails select * insert into SearchResultsStream
Here, you search for records by entering the search condition in the
from
clause and directing the filtered results into an output stream namedSearchResultsStream
. -
If you want to delete all the records for a specific material, you can write the Siddhi query as follows:
from DeleteMaterialStream on material == 'honey' in ShipmentDetails select * delete ShipmentDeails on ShipmentDetails.material == matarial;
Performing CRUD operations via REST API¶
This section explains how to use REST API to perform the same CRUD operations that you previously performed via streams.
Before you begin:
The Siddhi store query endpoint can be configured as follows:
- In the siddhi.stores.query.api: section of the
<SI_HOME>/conf/server/deployment.yaml
file, configure the following properties. The following is a sample configuration with default values.siddhi.stores.query.api: transportProperties: - name: "server.bootstrap.socket.timeout" value: 60 - name: "client.bootstrap.socket.timeout" value: 60 - name: "latency.metrics.enabled" value: true listenerConfigurations: - id: "default" host: "0.0.0.0" port: 7070 - id: "msf4j-https" host: "0.0.0.0" port: 7443 scheme: https keyStoreFile: "${carbon.home}/resources/security/wso2carbon.jks" keyStorePassword: wso2carbon certPass: wso2carbon
- transport properties
server.bootstrap.socket.timeout
: The number of seconds after which the connection socket of the bootstrap server times out.client.bootstrap.socket.timeout
: The number of seconds after which the connection socket of the bootstrap server times out.latency.metrics.enabled
: If this is set totrue
, the latency metrics are enabled and logged for the HTTP transport.- listenerConfigurations: Multiple listeners can be configured as shown in the above sample.
id
: A unique ID for the listener.host
: The host of the listener.port
: The port of the listener.scheme
: This specifies whether the transport scheme is HTTP or HTTPS.keyStoreFile
: If the transport scheme is HTTPS, this parameter specifies the path to the key store file.keyStorePassword
: If the transport scheme is HTTPS, this parameter specifies the key store password.
- In the
siddhi.stores.query.api:
section of the<SI_HOME>/conf/server/deployment.yaml
file, the following properties are configured by default:
The same parameter descriptions provided for thesiddhi.stores.query.api: transportProperties: - name: "server.bootstrap.socket.timeout" value: 60 - name: "client.bootstrap.socket.timeout" value: 60 - name: "latency.metrics.enabled" value: true listenerConfigurations: - id: "default" host: "0.0.0.0" port: 7370
<SI_HOME>/conf/server/deployment.yaml
file apply to this configuration.
Inserting records¶
This allows you to insert a new record to the table with the attribute values you define in the select
section.
Syntax
select <attribute name>, <attribute name>, ...
insert into <table>;
Sample CURL command
The following CURL command submits a query that inserts a new record with the specified attribute values to a table named RoomOccupancyTable
.
-
For Streaming Integrator server:
curl -X POST https://localhost:7443/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "RoomService", "query" : "select 10 as roomNo, 2 as people insert into RoomOccupancyTable;" }' -k
-
For Streaming Integrator Tooling:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "RoomService", "query" : "select 10 as roomNo, 2 as people insert into RoomOccupancyTable;" }' -k
Retrieving records¶
This is the store query to retrieve records from a table or a window.
Retrieving records from tables and windows¶
Syntax
from <table/window>
<on condition>?
select <attribute name>, <attribute name>, ...
<group by>?
<having>?
<order by>?
<limit>?
Sample CURL command
The following CURL command submits a query that retrieves room numbers and types of the rooms starting from room no 10
, from a table named roomTypeTable
.
The roomTypeTable table
must be defined in the RoomService
Siddhi application.
-
For Streaming Integrator server:
curl -X POST https://localhost:7443/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "RoomService", "query" : "from roomTypeTable on roomNo >= 10 select roomNo, type; " }' -k
-
For Streaming Integrator Tooling:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "RoomService", "query" : "from roomTypeTable on roomNo >= 10 select roomNo, type; " }' -k
Sample response
The following is a sample response to the sample CURL command given above.
{"records":[
[10,"single"],
[11, "triple"],
[12, "double"]
]}
Retrieving records from aggregations¶
This is the store query to retrieve records from an aggregation.
Syntax
from <aggregation>
<on condition>?
within <time range>
per <time granularity>
select <attribute name>, <attribute name>, ...
<groupby>?
<having>?
<order by>?
<limit>?
Sample CURL command
The following CURL command submits a query that retrieves average price of a stock.
- For Streaming Integrator server:
curl -X POST https://localhost:7443/stores/query -H "content-type: application/json" -u "admin:admin"
-d '{"appName" : "StockAggregationAnalysis", "query" : "from TradeAggregation on symbol=='FB' within '2018-**-** +05:00' per 'hours' select AGG_TIMESTAMP, symbol, total, avgPrice" }' -k
- For Streaming Integrator Tooling:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin"
-d '{"appName" : "StockAggregationAnalysis", "query" : "from TradeAggregation on symbol=='FB' within '2018-**-** +05:00' per 'hours' select AGG_TIMESTAMP, symbol, total, avgPrice" }' -k
Sample response
The following is a sample response to the sample CURL command given above.
{"records":[
[1531180800, 'FB', 10000.0, 250.0],
[1531184400, 'FB', 11000.0, 260.0],
[1531188000, 'FB', 9000.0, 240.0]
]}
Updating records¶
This store query updates selected attributes stored in a specific table based on a given condition.
Syntax
select <attribute name>, <attribute name>, ...?
update <table>
set <table>.<attribute name> = (<attribute name>|<expression>)?, <table>.<attribute name> = (<attribute name>|<expression>)?, ...
on <condition>
Sample CURL command
The following CURL command updates the room occupancy for selected records in the table named RoomOccupancyTable
. The records that are updated are ones of which the roon number is greater than 10
. The room occupancy is updated by adding 1
to the existing value of the people
attribute.
- For Streaming Integrator server:
curl -X POST https://localhost:7443/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "RoomService", "query" : "select 10 as roomNumber, 1 as arrival update RoomTypeTable set RoomTypeTable.people = RoomTypeTable.people + arrival on RoomTypeTable.roomNo == roomNumber;" }' -k
- For Streaming Integrator Tooling:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "RoomService", "query" : "select 10 as roomNumber, 1 as arrival update RoomTypeTable set RoomTypeTable.people = RoomTypeTable.people + arrival on RoomTypeTable.roomNo == roomNumber;" }' -k
Deleting records¶
This store query deletes selected records from a specified table.
Syntax
<select>?
delete <table>
on <conditional expresssion>
Sample CURL command
The following CURL command submits a query that deletes a record in the table named RoomTypeTable
if it has value for the roomNo
attribute that matches the value for the roomNumber
attribute of the selection that has 10
as the actual value.
- For Streaming Integrator server:
curl -X POST https://localhost:7443/stores/query -H "content-type: application/json" -u "admin:admin"
-d '{"appName" : "RoomService", "query" : "select 10 as roomNumber
delete RoomTypeTable on RoomTypeTable.roomNo == roomNumber;;" }' -k
- For Streaming Integrator Tooling:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin"
-d '{"appName" : "RoomService", "query" : "select 10 as roomNumber
delete RoomTypeTable on RoomTypeTable.roomNo == roomNumber;;" }' -k
Inserting/updating records¶
Syntax
select <attribute name>, <attribute name>, ...
update or insert into <table>
set <table>.<attribute name> = <expression>, <table>.<attribute name> = <expression>, ...
on <condition>
Sample CURL command
The following CURL command submits a query that attempts to update selected records in the RoomAssigneeTable
table. The records that are selected to be updated are ones with room numbers that match the numbers specified in the select
clause. If matching records are not found, it inserts a new record with the values provided in the select
clause.
- For Streaming Integrator server:
curl -X POST https://localhost:7443/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "RoomService", "query" : "select 10 as roomNo, "single" as type, "abc" as assignee update or insert into RoomAssigneeTable set RoomAssigneeTable.assignee = assignee on RoomAssigneeTable.roomNo == roomNo;" }' -k
- For Streaming Integrator Tooling:
curl -X POST http://localhost:7370/stores/query -H "content-type: application/json" -u "admin:admin" -d '{"appName" : "RoomService", "query" : "select 10 as roomNo, "single" as type, "abc" as assignee update or insert into RoomAssigneeTable set RoomAssigneeTable.assignee = assignee on RoomAssigneeTable.roomNo == roomNo;" }' -k
For more information, see Siddhi Query Guide - Store Query.
Manupulating data in multiple tables¶
The Streaming Integrator allows you to perform CRUD operations (i.e., inserting updating and deleting data) and retrieval queries for multiple normalized tables within a single data store. This is supported via the siddhi-store-rdbms extension.
Performing CRUD operations for multiple tables
In order to perform CRUD operations, the system parameter named perform.CRUD.operations
needs to be set to true
in the deployment.yaml
file.
The syntax for a Siddhi query to perform a CRUD operation in multiple tables is as follows.
from TriggerStream#rdbms:cud(<STRING> datasource.name, <STRING> query)
select numRecords
insert into OutputStream;
e.g., If you need to change the details of a customer in customer details table connected to a data source named, SAMPLE_DB
a Siddhi query can be written as follows.
from Trigger Stream#rdbms:cud('SAMPLE_DB', 'UPDATE Customers ON CUSTOMERS SET ContactName='Alfred Schmidt', City='Frankfurt' WHERE CustomerID=1;')
select numRecords
insert into OutputStream
Retrieving data from multiple tables
In order to retrieve information from multiple tables in a data store, the syntax is as follows:
from TriggerStream#rdbms:query(<STRING> dataource.name, <STRING> query, <STRING> stream definition)
select <attributes>
insert into OutputStream
e.g., If you need to find matching records in both customer and orders table based on orderId
and customerId
in the SAMPLE_DB
database, a Siddhi query can be written as follows.
from TriggerStream#rdbms:query('SAMPLE_DB', 'SELECT Orders.OrderID, Customers.CustomerName, Orders.OrderDate FROM Orders INNER JOIN Customers ON Orders.CustomerID=Customers.CustomerID', 'orderId string, customerName string, orderDate string')
select orderId, customerName, orderData
insert into OutputStream;
Top