Skip to content

Receiving Events and Persisting Them in Redis Store

Purpose:

This application demonstrates how to perform CRUD operations using Siddhi queries in Redis stores. The sample depicts a scenario in a sweet production factory. The sweet production details such as name of the raw material, amount used for production can be stored using insertSweetProductionStream. The following streams can be used to search, delete, update or upsert(update or insert) the existing data in the store.

  • Search - searchSweetProductionStream
  • insert - insertSweetProductionStream
  • delete - deleteSweetProductionStream
  • update - updateSweetProductionStream
  • update or insert - updateOrInsertSweetProductionStream
  • contains - containsSweetProductionStream (verifies whether all the attributes that enter in the stream exists in the store).

Prerequisites:

  1. Download the Redis from https://redis.io/.
  2. Download redis java client 'Jedis' jar (>2.7.0) from https://mvnrepository.com/artifact/redis.clients/jedis and place in <SI_HOME>/lib folder.
  3. In redis.conf, provide the requirepass as root.
  4. Start redis by redis-server .

Executing the Sample:

  1. Start the Siddhi application by clicking on 'Run'.
  2. If the Siddhi application starts successfully, the following messages would be shown on the console.
    * Store-redis.siddhi - Started Successfully!

Testing the Sample:

  1. Simulate single events. For this, click on 'Event Simulator' (double arrows on left tab) -> 'Single Simulation' -> Select 'Store-redis' as 'Siddhi App Name' -> Select 'searchSweetProductionStream' as 'Stream Name' -> Provide attribute values -> Send.
  2. Send at-least one event with the single event simulator, where the name matches a name value in the data we previously inserted to the SweetProductionTable. This would satisfy the 'on' condition of our join query.
  3. Likewise the events can be sent to the other corresponding streams to add, delete, update, insert, search events.
  4. After a change in the store, using the search stream the developer can see whether the operation is successful.
  5. Primary Key constraint SweetProductionTable is disabled, since name cannot be used as a PrimaryKey in ProductionTable. Siddhi functions can be used to create a unique id for the received events which can then be used to apply Primary Key constraint on the data store records. (http://wso2.github.io/siddhi/documentation/siddhi-4.0/#function)

Viewing the Results:

See the output for row materials on the console. Inserted, deleted, updated events can be checked by searchSweetProductionStream.

@App:name("Store-redis")
@App:description("Receive events via simulator and received data are persisted in store.")


define stream insertSweetProductionStream (symbol string, price float, volume long);
define stream deleteSweetProductionStream (symbol string, price float, volume long);
define stream searchSweetProductionStream(symbol string);
define stream updateOrInsertSweetProductionStream(symbol string, price float, volume long);
define stream updateSweetProductionStream (symbol string, price float, volume long);
define stream containsSweetProductionStream (symbol string, price float, volume long);

@Store(type='redis', table.name='SweetProductionTable', host= 'localhost', port='6379', password="root")
@primaryKey('symbol')
@index('price')
define table SweetProductionTable (symbol string, price float, volume long);

@sink(type='log')
define stream OutStream(symbol string, price float, volume long);

@info(name='query1')
from insertSweetProductionStream
insert into SweetProductionTable;

@info(name = 'query2')
from updateSweetProductionStream
update SweetProductionTable
set SweetProductionTable.price = price
on SweetProductionTable.symbol == symbol;

@info(name = 'query3')
from deleteSweetProductionStream
delete SweetProductionTable
on SweetProductionTable.symbol==symbol ;

@info(name = 'query4')
from updateOrInsertSweetProductionStream#window.timeBatch(1 sec)
update or insert into SweetProductionTable
on SweetProductionTable.symbol==symbol;

@info(name = 'query5')
from containsSweetProductionStream[ (symbol==SweetProductionTable.symbol) in SweetProductionTable]
insert into OutStream;

@info(name = 'query6')
from searchSweetProductionStream#window.length(1) join SweetProductionTable on searchSweetProductionStream.symbol==SweetProductionTable.symbol
select searchSweetProductionStream.symbol as symbol, SweetProductionTable.price as price,
SweetProductionTable.volume as volume
insert into OutStream;
Top