Correlating Data¶
The streaming integrator can correlate data in order to detect patterns and trends in streaming data. Correlating can be done via patterns as well as sequences.
The difference between patterns and sequence is that sequences require all the matching events to arrive consecutively to match the sequence condition, whereas patterns identify events that match the pattern condition irrespective of the order in which they arrive.
Correlating events to find a pattern¶
This section explains how you can use Siddhi patterns to detect trends and patterns. There are two types of Siddhi patterns as follows:
- Counting Patterns: These count the number of instances that match the given pattern condition.
- Logical Patterns: These identify logical relationships between events.
Count and match multiple events for a given pattern condition¶
To understand how to count and match multiple events that match a specific condition, consider the example where a store wants to check the frequency with which a specific product needs to be repaired within two months after it is purchased. If the buyer brings back a specific product for repairs more than five times within two months, the manager of purchases needs to be notified via a mail. To do this, create a Siddhi application as follows.
-
Start creating a new Siddhi application. You can name it
DefectDetectionApp
For instructions, see Creating a Siddhi Application.@App:name("DefectDetectionApp")
-
Define the input streams into which the events compared are received.
- To capture information about purchases, define a stream as follows.
define stream PurchasesStream (productName string, custID string);
- To capture information about repairs, define a stream as follows.
define stream RepairsStream (productName string, custID string);
- To capture information about purchases, define a stream as follows.
-
To notify the purchase manager that the threshold is reached define an output sink with an email sink attached as follows:
@sink(type='email', address='[email protected]', username='storemanager', password='secret_password', subject='Defective Product Alert', to='[email protected]', @map(type = 'text', @payload("Hello,The product {{productName}} is identified as defective.\n\nThis message was generated automatically."))) define stream DefectiveProductsStream (productName string);
-
To count occurrences where a product is brought back for repairs withing two months following its purchase, and identify products where the treshold for such occurrences is reached, create a query as follows.
-
To specify the input streams from which the input events to be analyzed for pattern detection are taken, add a
from
clause as follows.from every (e1=PurchasesStream) -> e2=RepairsStream[e1.productName==e2.productName and e1.custID==e2.custID]<5:> within 2 months
Info
Note the following about the above
from
clause: - The input is derived from two streams. Therefore, first, both streams considered are specified and a unique reference is assigned to each stream. ThePurchasesStream
is referred to ase1
and theRepairsStream
is referred to ase2
. - The matching condition to be met is that both streams should have an event where the values for bothproductName
andcustID
attributes are the same. - The event in thePurchasesStream
stream need to arrive before the matching event in theRepairsStream
stream. - The matching event in theRepairsStream
stream should arrive within two months after the arrival of the event in thePurchasesStream
stream. -<5:>
indicates that an output is generated only when the matching condition is met five times. - A time window of2 months
is added to consider only a period of two months in a sliding manner when counting the number of times the matching condition for the pattern is met. For more information about time windows, see Summarizing Data - Calculate and store clock-time based aggregate values -
To specify how the value for each attribute in the
DefectiveProductsStream
output stream is defined, add theselect
clause as follows.select e1.productName
The output should only consist of the product identified to be possibly defective. Therefore, only the
productName
attribute is selected.- To specify that the output has to directed to the
DefectiveProductsStream
, add theinsert into
clause as follows.insert into DefectiveProductsStream
-
The completed Siddhi application is as follows.
@App:name("DefectDetectionApp")
define stream PurchasesStream (productName string, custID string);
define stream RepairsStream (productName string, custID string);
@sink(type='email', address='[email protected]', username='storemanager', password='secret_password', subject='Defective Product Alert', to='[email protected]', @map(type = 'text', @payload("Hello,The product {{productName}} is identified as defective.\n\nThis message was generated automatically.")))
define stream DefectiveProductsStream (productName string);
from every (e1=PurchasesStream) -> e2=RepairsStream[e1.productName==e2.productName and e1.custID==e2.custID]<5:> within 2 months
select e1.productName
insert into DefectiveProductsStream
Info
For more information, see Siddhi Query Guide - Counting Patterns.
Combine several patterns logically and match events¶
To understand how to combine several patterns logically and match events, consider an example of a factory foreman who needs to observe the factory output, identify any production decreases and check whether those decreases have reached maximum threshold which requires him to take action. To do this, you can create a Siddhi application as follows:
-
Start creating a new Siddhi application. You can name it
ProductionDecreaseDetectionApp
For instructions, see Creating a Siddhi Application.@App:name("ProductionDecreaseDetectionApp")
-
Define an input stream as follows to capture the factory output.
define stream ProductionStream(productName string, factoryBranch string, productionAmount long);
-
Now define an output stream as follows to present the observed production trend after applying the logical pattern.
The output directed to this stream is published via a sink of the@sink(type='log', prefix='Decrease in production detected:') define stream ProductionDecreaseAlertStream (productName string, originalAmount long, laterAmount long, factoryBranch string);
log
type. For more information about publishing data via sinks, see the Publishing Data guide. -
To apply the pattern so that the production trend can be observed, add the
from
clause as follows.from every (e1=ProductionStream) -> e2=ProductionStream[e1.productName == e2.productName and e1.productionAmount - e2.productionAmount > 10] within 10 min
Info
Note the following about the
from
clause:-
Here, two events from the same stream are compared to identify whether the production has decreased. The unique reference for the first event is
e1
, and the unique reference for the second event ise2
. -
e2
arrives aftere1
, but it is not necessarily the event that arrives immediately aftere1
. -
The condition that should be met for
e1
ande2
to be compared ise1.productName == e2.productName and e1.productionAmount - e2.productionAmount > 10
. This means, both the events should report the production of the same product, and there should be a decrease in production that is greater than 10 between thee1
ande2
events. -
A
10 min
time window is included to indicate that an output event is generated only if the decrease in production by 10 or more units takes place every ten minutes in a sliding manner. For more information about time windows, see Calculate and store clock time-based aggregate values.
-
-
To present the required output by deriving values for the attributes of the
ProductionDecreaseAlertStream
output stream you created, add theselect
clause as follows.select e1.productName, e1.productionAmount as originalAmount, e2.productionAmount as laterAmount, e1.factoryBranch
Here, the production amount of the first event is presented as
originalAmount
, and the amount of the second event is presented aslaterAmount
. -
To insert the output into the
ProductionDecreaseAlertStream
output stream, add theinsert into
clause as follows.insert into ProductionDecreaseAlertStream;
The completed Siddhi application is as follows.
@App:name("ProductionDecreaseDetectionApp")
define stream ProductionStream(productName string, factoryBranch string, productionAmount long);
@sink(type='log', prefix='Decrease in production detected:')
define stream ProductionDecreaseAlertStream (productName string, originalAmount long, laterAmount long, factoryBranch string);
from every (e1=ProductionStream) -> e2=ProductionStream[e1.productName == e2.productName and e1.productionAmount - e2.productionAmount > 10] within 10 min
select e1.productName, e1.productionAmount as originalAmount, e2.productionAmount as laterAmount, e1.factoryBranch
insert into ProductionDecreaseAlertStream;
Find non-occurrence of events¶
This section explains how to analyze data by observing scenarios where events do not occur. To understand how this is done, consider a taxi service company that tracks the movements of the taxis it runs and wants to be notified of unexpected delays. Consider a specific scenario where the manager needs to contact the driver if the taxi has not reached either of two specified locations withi 15 minutes. For this, you can create a Siddhi application as follows:
-
Start creating a new Siddhi application. You can name it
DelayDetectionApp
For instructions, see Creating a Siddhi Application.@App:name("DelayDetectionApp")
-
To receive information about the location of taxis, define an input stream as follows.
define stream LocationStream (taxiID string, driverID string, latitude double, longitude double);
-
To publish delay notification as a message, define an output stream as follows.
@sink(type='http', publisher.url='http://headoffice:8080/endpoint', @map(type = 'json')) define stream AlertStream (taxiID string, driverID string, message string);
The output directed to this stream is published via a sink of thehttp
type. For more information about publishing data via sinks, see the Publishing Data guide. -
To specify the pattern to be used to detect the delays, add the
from
clause as follows.from not LocationStream[latitude == 44.0096 and longitude == 81.2735] for 15 minutes or not LocationStream[latitude == 43.0096 and longitude == 81.2737] for 15 minutes
Info
Note the following about this
from
clause:-
The
not
keyword is added to indicate that the SI should look for instances where an event has not occurred when the given conditions are met. -
Two conditions are given. The alert is generated when either of the two conditions has not occured. To indicate this, the
or
keyword is used between the two conditions. -
The given conditions indicate that the taxi should have reached either the
latitude == 44.0096 and longitude == 81.2735
location or thelatitude == 43.0096 and longitude == 81.2737
location. Either of the locations should be reached within 15 minutes. Therefore, each location is specified as a separate condition and a time window of 15 minutes is applied to each condition in a sliding manner. For more information about time windows, see the Siddhi Query Guide - Calculate and store clock time-based aggregate values.
-
-
To derive the information relating to the delay to be published as the output, add the
select
clause as follows.select LocationStream.taxiID, LocationStream.driverID, 'Unexpected Delay' as message
The alert message is a standard message that is assigned as a static value to the message
attribute.
- To insert the results into the
AlertStream
so that the message about the delay can be published, add theinsert into
clause as follows.insert into AlertStream;
The completed Siddhi application is as follows.
@App:name("DelayDetectionApp")
define stream LocationStream (taxiID string, driverID string, latitude double, longitude double);
@sink(type='http', publisher.url='http://headoffice:8080/endpoint', @map(type = 'json'))
define stream AlertStream (taxiID string, driverID string, message string);
from not LocationStream[latitude == 44.0096 and longitude == 81.2735] for 15 minutes or not LocationStream[latitude == 43.0096 and longitude == 81.2737] for 15 minutes
select LocationStream.taxiID, LocationStream.driverID, 'Unexpected Delay' as message
insert into AlertStream;
For the complete list of methods in which you can apply Siddhi patterns to detect non occuring events, see Siddhi Query Guide - Detecting Non-Occurring Events.
Correlating events to find a trend(sequence)¶
This section explains how you can use Siddhi sequences to detect trends in events that arrive in a specific order. There are two types of Siddhi sequences as follows:
- Counting Sequences: These count the number of instances that match the given sequence condition.
- Logical Sequences: These identify logical relationships between events.
Count and match multiple events for a given trend¶
Counting and matching multiple events over a given period is done via sequences when you need to identify trends in events that occur in a specific order. To understand how this is done, consider a scenario where the temperature is read from a sensor and you need to identify the peaks in temperature. If an event (i.e., a single reading) is a peak, it should report a temperaature greater than that reported by the event that occured immediately before it as well as the event that occurred immediately after it. Therefore, to identify the peaks, follow the procedure below:
-
Start creating a new Siddhi application. You can name it
TemperaturePeaksApp
For instructions, see Creating a Siddhi Application.@App:name("TemperaturePeaksApp")
-
To capture the temperature readings, define an input stream as follows.
define stream TempStream(deviceID long, roomNo int, temp double);
-
To report the peaks once they are identified, define an output stream as follows.
@sink(type='log', prefix='TemperaturePeak]:') define stream PeakTempStream(initialTemp double, peakTemp double);
The output directed to this stream is published via a sink of thelog
type. For more information about publishing data via sinks, see the Publishing Data guide. -
To specify how to identify the peaks, add a
from
clause as follows.from every e1=TempStream, e2=TempStream[e1.temp <= temp]+, e3=TempStream[e2[last].temp > temp]
Info
Note the following about the
from
clause:-
every
indicates that all the events in theTempStream
must be checked for the given conditions. -
Here,
e2
is the reference for the event identified as the peak temperature. Thee2=TempStream[e1.temp <= temp]+
condition specifies that to be identified as an event reporting a peak temperature, an event should have one or more preceding events that reports a lower or an equal temperature. -
The
e3=TempStream[e2[last].temp > temp]
condition specifies a condition fore3
which is the event that followse2
. It indicates thate2
, the peak temperature event should be the last event beforee3
, and that the temperature reported bye2
must be greater than the temperature reported bye3
.
-
-
To specify how to derive the values for the attributes in the
PeakTempStream
output stream are derived, add aselect
clause as follows.select e1.temp as initialTemp, e2[last].temp as peakTemp
Here, the temperature reported by
e2
event is selected to be output aspeakTemp
because it is greater than the temperatures reported by events occuring before and aftere2
. The temperature reported by the event immediately beforee2
is selected asinitialTemp
. -
To insert the output generated into the
PeakTempStream
output stream, add aninsert into
clause as follows.insert into PeakTempStream;
The completed Siddhi application is as follows.
@App:name("TemperaturePeaksApp")
define stream TempStream(deviceID long, roomNo int, temp double);
@sink(type='log', prefix='TemperaturePeak]:')
define stream PeakTempStream(initialTemp double, peakTemp double);
from every e1=TempStream, e2=TempStream[e1.temp <= temp]+, e3=TempStream[e2[last].temp > temp]
select e1.temp as initialTemp, e2[last].temp as peakTemp
insert into PeakTempStream;
Combine several trends logically and match events¶
Logical sequences are used to identify logical relationships between events that occur in a specific order. To understand this consider a scenario where an application is able to notify the state only when the event that notifies that the regulator is switched on is immediately followed by two other events to report the temperature and humidity. To create such a Siddhi application, follow the procedure below.
-
Start creating a new Siddhi application. You can name it
RoomStateApp
For instructions, see Creating a Siddhi Application.@App:name("RoomStateApp")
-
You need three input streams to capture information about the state of the regulator, the temperature, and humidity.
-
Define the inpt stream that captures the state of the regulator as follows.
define stream RegulatorStream(deviceID long, isOn bool);
-
Define the input stream that captures the temperature as follows.
define stream TempStream(deviceID long, temp double);
-
Define the input stream that captures the humidity as follows.
@sink(type='log', prefix='RoomState]:') define stream StateNotificationStream(temp double, humid double);
The output directed to this stream is published via a sink of thelog
type. For more information about publishing data via sinks, see the Publishing Data guide.
-
-
Now let's define an output stream to publish the temperature and humidity.
define stream StateNotificationStream(temp double, humid double)
-
To apply the logical sequence to derive the output, add the
from
clause as follows.from every e1=RegulatorStream, e2=TempStream and e3=HumidStream
Here, the unique references
e1
,e2
, ande3
are assigned to the first, second, and thid events respectively.e1
must arrive at theRegulatorStream
stream,e2
must arrive at theTempStream
stream, ande3
must arrive at theHumidStream
stream in that order. The output event is generated only after all three of these input events have arrived. -
To derive values for the attributes of the
StateNotificationStream
output stream, add aselect
clause as follows.select e2.temp, e3.humid
To generate the output event, the value for the temp
attribute must be taken from the e2
(second) event, and the value for the humid
attribute must be taken from the e3
(third) event.
- To direct the output to the
StateNotificationStream
output stream so that it can be logged, add aninsert into
clause as follows.insert into StateNotificationStream;
The completed Siddhi application is as follows.
@App:name("RoomStateApp")
define stream TempStream(deviceID long, temp double);
define stream HumidStream(deviceID long, humid double);
define stream RegulatorStream(deviceID long, isOn bool);
@sink(type='log', prefix='RoomState]:')
define stream StateNotificationStream(temp double, humid double);
from every e1=RegulatorStream, e2=TempStream and e3=HumidStream
select e2.temp, e3.humid
insert into StateNotificationStream;
Correlating two streams of data and unify¶
For a detailed explanation, see Enriching Data - Enrich data by connecting with another stream of data
Correlate a stream and a static datasource to enrich¶
For a detailed explanation, see Enriching Data - Enrich data by connecting with a data store
Top