Cleansing Data¶
Introduction¶
When you receive input data via the Streaming Integrator, it may include data that is not needed to generate the required output, null values for certain attributes, etc. Cleansing data refers to refining the input data received by filtering out the data that is not required, modifying or removing attributes, and handling attributes with null values.
Filtering data based on conditions¶
To understand the different ways you can filter the specific data you need to transform and enrich in order to generate the required output, follow the procedures below:
- Filtering based on exact match of attribute:
    - Open the Streaming Integrator Tooling and start creating a new Siddhi application. For more information, see Creating a Siddhi Application.
    - Enter a name for the Siddhi application via the @App:name annotation. In this example, let's name it TemperatureApp.
    - Define an input stream to specify the schema based on which events are selected to the Streaming Integration flow.

        define stream TempStream (deviceID long, roomNo string, temp double);

        Info: For more information about defining input streams to receive events, see the Consuming Data guide.
    - Add a query to generate filtered temperature readings as follows. For this example, let's assume that you want to filter only temperature readings for a specific room number (e.g., room no 2233).
        - Add the from clause and enter TempStream as the input stream from which the input data is taken. However, because you only need to extract readings for room no 2233, include a filter in the from clause as shown below:

            from TempStream [roomNo=='2233']

        - Add the select clause with * to indicate that all the attributes should be selected without any changes.

            from TempStream [roomNo=='2233']
            select *

        - Add the insert into clause and direct the output to a stream named Room2233AnalysisStream.

            Info: Note that Room2233AnalysisStream is not defined in the Siddhi application. It is inferred by specifying it as an output stream.

            from TempStream [roomNo=='2233']
            select *
            insert into Room2233AnalysisStream;

            Tip: As a best practice, name your queries using the @info annotation. In this example, you can name the query Filtering2233 as follows.

            @info(name = 'Filtering2233')
            from TempStream [roomNo=='2233']
            select *
            insert into Room2233AnalysisStream;

    The saved Siddhi application is as follows:

        @App:name("TemperatureApp")
        @App:description("Description of the plan")

        define stream TempStream (deviceID long, roomNo string, temp double);

        @info(name = 'Filtering2233')
        from TempStream [roomNo=='2233']
        select *
        insert into Room2233AnalysisStream;
- Filtering based on regex pattern:

    You can filter events by providing a condition where only events that match a specific regex pattern are taken for further processing.

    For this purpose, you can use the TemperatureApp Siddhi application that you created in the previous example. However, instead of filtering the readings for a specific room number, you can filter the readings for all the rooms of which the room number matches a specific regex pattern.

    Assume that you want to filter the temperature readings for a specific range of rooms located in the Southern wing and used for purpose B. Also assume that this can be derived from the room number because the first three characters of the room number represent the wing, and the eighth character represents the purpose. e.g., in room no SOU5438B765, the first three characters SOU represent the Southern wing, and the eighth character B represents purpose B.

    To filter events as described, follow the procedure below.

    - Open the TemperatureApp Siddhi application.
    - Create a new query named FilteredRoomRange as follows:
        - Add a from clause as follows to get the required events from the TempStream stream. To take only the events of which the room number matches the pattern, include the regex:find() function as a filter. This function returns true when the given regex pattern is found in the given attribute. The pattern ^SOU.{4}B matches room numbers that start with SOU and have B as the eighth character.

            from TempStream [regex:find('^SOU.{4}B', roomNo)]

        - Add the select clause as follows to select the attributes without any changes:

            select deviceID, roomNo, temp

        - Add the insert into clause as follows to insert the results into a stream named FilteredResultsStream.

            insert into FilteredResultsStream;

        The completed query is as follows.

            @info(name = 'FilteredRoomRange')
            from TempStream [regex:find('^SOU.{4}B', roomNo)]
            select deviceID, roomNo, temp
            insert into FilteredResultsStream;

    - Save the Siddhi application. The completed Siddhi application looks as follows.

        @App:name("TemperatureApp")
        @App:description("Description of the plan")

        define stream TempStream (deviceID long, roomNo string, temp double);

        @info(name = 'FilteredRoomRange')
        from TempStream [regex:find('^SOU.{4}B', roomNo)]
        select deviceID, roomNo, temp
        insert into FilteredResultsStream;
- Filtering based on multiple criteria:

    For this purpose, you can use the TemperatureApp Siddhi application that you created in the example under the Filtering based on exact match of attribute section. However, instead of filtering only readings for room no 2233, assume that you need to filter the readings for a range of rooms (e.g., room numbers from 100 up to, but not including, 210) where the temperature is greater than 40. For this, you can update the filter as follows.

        [(roomNo >= 100 and roomNo < 210) and temp > 40]

    Here, the and logical expression is used to indicate that all the filter conditions provided need to be met. Note that comparing roomNo with numbers in this way assumes that roomNo holds a numeric value. Because TempStream defines roomNo as a string, the attribute needs to be defined as (or converted to) a numeric type such as int for this filter to work.
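The multiple-criteria filter can be sketched as a complete query as follows. This is a minimal sketch and not part of the TemperatureApp application as saved earlier: it assumes that roomNo is declared as int so that it can be compared with numbers, and the names RoomRangeTempStream, FilteringRoomRange and HighTempRoomRangeStream are hypothetical names chosen for illustration.

```siddhi
-- Assumption: roomNo is an int here, unlike the string roomNo of TempStream.
define stream RoomRangeTempStream (deviceID long, roomNo int, temp double);

-- Keep only readings from rooms 100 to 209 where the temperature exceeds 40.
@info(name = 'FilteringRoomRange')
from RoomRangeTempStream [(roomNo >= 100 and roomNo < 210) and temp > 40]
select *
insert into HighTempRoomRangeStream;
```

An event is passed on only when every condition joined with and evaluates to true, so a reading from room 150 with a temperature of 35 would be dropped.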
Modifying, removing and replacing attributes¶
The input data may include attributes that are not needed to generate the required output, or attributes with values that need to be updated or replaced before further processing.
Assume that in the previous example, you do not need the device ID for further processing, and you need to remove some unnecessary white spaces from the roomNo attribute before sending the input data for further processing. To do this, follow the procedure below:
- Open the TemperatureApp Siddhi application that you previously created in the Filtering data based on conditions section and start adding a new query. You can name it CleaningData as shown below.

    @info(name = 'CleaningData')

- Add the from clause and enter FilteredResultsStream as the input stream from which the input data is taken.

    from FilteredResultsStream

- Let's create the select clause as follows.
    - To select only the roomNo and temp attributes for further processing and remove the deviceID attribute, add them as follows.

        select roomNo, temp

    - To remove the unnecessary leading and trailing white spaces from the room number, add the str:trim() function as shown below. Because the result of a function needs to be given a name, use the as keyword to keep roomNo as the attribute name.

        str:trim(roomNo) as roomNo

    Now the completed select clause is as follows.

        select str:trim(roomNo) as roomNo, temp

- Insert the results into an output stream as follows.

    insert into CleansedDataStream;

The completed query is as follows:

    @info(name = 'CleaningData')
    from FilteredResultsStream
    select str:trim(roomNo) as roomNo, temp
    insert into CleansedDataStream;

Modifying and replacing attributes are also demonstrated in the Enriching Data and Transforming Data guides.
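To check the cleansed events while developing, one option is to define the output stream explicitly and attach a log sink to it, as sketched below. This is a minimal fragment to be added to an application in which FilteredResultsStream already exists with roomNo as a string attribute; it uses str:trim() from the Siddhi string extension, and the prefix text is an arbitrary choice made for illustration.

```siddhi
-- Declaring the output stream explicitly (instead of letting it be inferred)
-- makes it possible to attach a sink to it.
@sink(type = 'log', prefix = 'Cleansed reading')
define stream CleansedDataStream (roomNo string, temp double);

-- Drop deviceID and trim the white spaces around roomNo.
@info(name = 'CleaningData')
from FilteredResultsStream
select str:trim(roomNo) as roomNo, temp
insert into CleansedDataStream;
```

Each event inserted into CleansedDataStream is then written to the log, which makes it easy to confirm that deviceID is gone and that roomNo no longer carries surrounding white spaces.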
Handling attributes with null values¶
To understand this section, you can reuse the TemperatureApp Siddhi application that you created in the Filtering data based on conditions section.
Assume that some events arrive with null values for the deviceID attribute, and you want to assign the value UNKNOWN in such scenarios.
To do this, follow the procedure below:
- Start adding a new query to the TemperatureApp Siddhi application. You can name it AddingMissingValues as follows.

    @info(name = 'AddingMissingValues')

- Add the from clause and enter FilteredResultsStream as the input stream from which the input data is taken.

    from FilteredResultsStream

    Note: Here, we are using the inferred output stream of the previous query as the input stream for this query. As a result, the changes made via this query are applied to the filtered data.

- Add the select clause. To assign UNKNOWN as the value for the deviceID attribute when it has a null value, you need to use the ifThenElse function as shown below. Because deviceID is of the long type and both the values that ifThenElse can return need to be of the same type, the convert function is used to return the existing device ID as a string.

    ifThenElse(deviceID is null, "UNKNOWN", convert(deviceID, 'string')) as deviceID

    The roomNo and temp attributes can be selected without any changes. The select clause now looks as follows.

    select ifThenElse(deviceID is null, "UNKNOWN", convert(deviceID, 'string')) as deviceID, roomNo, temp

- Insert the results into an output stream as follows.

    insert into CleansedDataStream;

    The completed query now looks as follows.

    @info(name = 'AddingMissingValues')
    from FilteredResultsStream
    select ifThenElse(deviceID is null, "UNKNOWN", convert(deviceID, 'string')) as deviceID, roomNo, temp
    insert into CleansedDataStream;

- Save the Siddhi application. The completed version looks as follows.

    @App:name("TemperatureApp")
    @App:description("Description of the plan")

    define stream TempStream (deviceID long, roomNo string, temp double);

    @info(name = 'Filtering')
    from TempStream [roomNo=='2233']
    select *
    insert into FilteredResultsStream;

    @info(name = 'AddingMissingValues')
    from FilteredResultsStream
    select ifThenElse(deviceID is null, "UNKNOWN", convert(deviceID, 'string')) as deviceID, roomNo, temp
    insert into CleansedDataStream;
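Two other ways of dealing with null values can be sketched as follows: dropping the incomplete events with a filter, or substituting a fallback value with the built-in default() function. This is a minimal sketch under stated assumptions: the query names and the output stream names CompleteReadingsStream and DefaultedReadingsStream are hypothetical, and the fallback value 0.0 is an arbitrary choice made for illustration.

```siddhi
define stream TempStream (deviceID long, roomNo string, temp double);

-- Option 1: drop the events that arrive without a device ID.
@info(name = 'DroppingNullDeviceIDs')
from TempStream [not (deviceID is null)]
select *
insert into CompleteReadingsStream;

-- Option 2: keep every event, but replace a null temp with a fallback value.
-- The fallback needs to be of the same type as the attribute (double here).
@info(name = 'DefaultingNullTemps')
from TempStream
select deviceID, roomNo, default(temp, 0.0) as temp
insert into DefaultedReadingsStream;
```

Dropping events suits cases where a reading is unusable without the missing attribute, whereas a fallback value keeps the event in the flow at the cost of introducing a placeholder that later queries need to be aware of.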