Receiving Events and Persisting Them in Redis Store
Purpose:¶
This application demonstrates how to perform CRUD operations using Siddhi queries in Redis stores. The sample depicts a scenario in a sweet production factory. The sweet production details such as name of the raw material, amount used for production can be stored using insertSweetProductionStream
. The following streams can be used to search, delete, update or upsert(update or insert) the existing data in the store.
- Search -
searchSweetProductionStream
- insert -
insertSweetProductionStream
- delete -
deleteSweetProductionStream
- update -
updateSweetProductionStream
- update or insert -
updateOrInsertSweetProductionStream
- contains -
containsSweetProductionStream
(verifies whether all the attributes that enter in the stream exists in the store).
Prerequisites:¶
- Download the Redis from https://redis.io/.
- Download redis java client 'Jedis' jar (>2.7.0) from https://mvnrepository.com/artifact/redis.clients/jedis and place in
<SI_HOME>/lib
folder. - In
redis.conf
, provide the requirepass as root. - Start redis by redis-server
.
Executing the Sample:¶
- Start the Siddhi application by clicking on 'Run'.
- If the Siddhi application starts successfully, the following messages would be shown on the console.
* Store-redis.siddhi - Started Successfully!
Testing the Sample:¶
- Simulate single events. For this, click on 'Event Simulator' (double arrows on left tab) -> 'Single Simulation' -> Select 'Store-redis' as 'Siddhi App Name' -> Select 'searchSweetProductionStream' as 'Stream Name' -> Provide attribute values -> Send.
- Send at-least one event with the single event simulator, where the name matches a name value in the data we previously inserted to the SweetProductionTable. This would satisfy the 'on' condition of our join query.
- Likewise the events can be sent to the other corresponding streams to add, delete, update, insert, search events.
- After a change in the store, using the search stream the developer can see whether the operation is successful.
- Primary Key constraint SweetProductionTable is disabled, since name cannot be used as a PrimaryKey in ProductionTable. Siddhi functions can be used to create a unique id for the received events which can then be used to apply Primary Key constraint on the data store records. (http://wso2.github.io/siddhi/documentation/siddhi-4.0/#function)
Viewing the Results:¶
See the output for row materials on the console. Inserted, deleted, updated events can be checked by searchSweetProductionStream.
@App:name("Store-redis")
@App:description("Receive events via simulator and received data are persisted in store.")
define stream insertSweetProductionStream (symbol string, price float, volume long);
define stream deleteSweetProductionStream (symbol string, price float, volume long);
define stream searchSweetProductionStream(symbol string);
define stream updateOrInsertSweetProductionStream(symbol string, price float, volume long);
define stream updateSweetProductionStream (symbol string, price float, volume long);
define stream containsSweetProductionStream (symbol string, price float, volume long);
@Store(type='redis', table.name='SweetProductionTable', host= 'localhost', port='6379', password="root")
@primaryKey('symbol')
@index('price')
define table SweetProductionTable (symbol string, price float, volume long);
@sink(type='log')
define stream OutStream(symbol string, price float, volume long);
@info(name='query1')
from insertSweetProductionStream
insert into SweetProductionTable;
@info(name = 'query2')
from updateSweetProductionStream
update SweetProductionTable
set SweetProductionTable.price = price
on SweetProductionTable.symbol == symbol;
@info(name = 'query3')
from deleteSweetProductionStream
delete SweetProductionTable
on SweetProductionTable.symbol==symbol ;
@info(name = 'query4')
from updateOrInsertSweetProductionStream#window.timeBatch(1 sec)
update or insert into SweetProductionTable
on SweetProductionTable.symbol==symbol;
@info(name = 'query5')
from containsSweetProductionStream[ (symbol==SweetProductionTable.symbol) in SweetProductionTable]
insert into OutStream;
@info(name = 'query6')
from searchSweetProductionStream#window.length(1) join SweetProductionTable on searchSweetProductionStream.symbol==SweetProductionTable.symbol
select searchSweetProductionStream.symbol as symbol, SweetProductionTable.price as price,
SweetProductionTable.volume as volume
insert into OutStream;
Top