Handling Errors

WSO2 Streaming Integrator allows you to gracefully handle errors that occur while it processes streaming data. This section explains the different types of errors that can occur and how they can be handled.

Types of errors

Runtime errors

Description These are errors identified based on Siddhi logic when processing events.

To specify how the system should handle errors that occur at runtime, you need to add an @OnError annotation to a stream definition as shown below.

@OnError(action='on_error_action')
define stream <stream name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );
Events with such errors are collected and stored only if the @OnError(action='STORE') annotation is added to a stream or if the `on.error='STORE'` parameter is set within a sink annotation.
Supported actions
  • LOG
  • STREAM
  • STORE

For more information about these on-error actions, see [Supported on-error actions](#supported-on-error-actions).
Example In the following Siddhi application, the sink annotation specifies STORE as the on-error action. If you send the `{"event": {"name": "Cake2", "amount": 20.222}}` event to the `http://localhost:8007/testUnavailableEP` endpoint, the application tries to publish it to `http://localhost:8090/unavailableEndpoint`. Because that endpoint does not exist, the event is collected and stored in the error store.

@App:name("SinkTransportErrorTest")

@sink(type = 'http', on.error='STORE', blocking.io='true', publisher.url = "http://localhost:8090/unavailableEndpoint", method = "POST", @map(type = 'json'))
define stream TestPublisherStream (name string, amount double);

@Source(type = 'http', receiver.url='http://localhost:8007/testUnavailableEP', basic.auth.enabled='false', @map(type='json', enclosing.element='$.event', @attributes(name='name', amount='amount')))
define stream TestInput(name string, amount double);

from TestInput
select name, amount
insert into TestPublisherStream;
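
The application above sets the on-error action on a sink. For the stream-level @OnError form introduced at the start of this section, a minimal sketch could look like the following. It assumes that the error store is enabled, and it borrows the illustrative custom:fault() extension from the STREAM example later on this page as a stand-in for any logic that can fail at runtime. The application and stream names are hypothetical.

```siddhi
@App:name("RuntimeErrorStoreSketch")

-- Hypothetical input stream. Events that cause an error while the queries
-- below process them are sent to the error store.
@OnError(action='STORE')
define stream OrderStream (symbol string, volume long);

@sink(type='log', prefix='Processed: ')
define stream ProcessedOrderStream (symbol string, volume long);

-- custom:fault() is a placeholder for an extension that may raise an error.
from OrderStream[custom:fault() > volume]
select symbol, volume
insert into ProcessedOrderStream;
```

Changing the action to LOG or STREAM in the annotation switches the behavior without touching the query.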

Publishing errors

Description These errors occur at the time of publishing streaming data. The errors can be identified based on the Siddhi logic defined by the Siddhi extensions via which the events are published, or they may occur due to connection errors.

To specify the error handling methods for errors that occur at the time the output is published, you can include the on.error parameter in the sink configuration as shown below.

@sink(type='sink_type', on.error='on.error.action')

define stream <stream name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );
Supported actions
  • LOG
  • STREAM
  • WAIT
  • STORE


For more information about these on-error actions, see [Supported on-error actions](#supported-on-error-actions).
Example The following is a sink annotation of the tcp type whose on-error action is WAIT. If an error occurs while publishing, the system keeps retrying to send the events until the connection is re-established.

@Sink(type = 'tcp', url='tcp://localhost:8080/abc', sync='true', on.error='WAIT',
@map(type='binary'))
define stream SalesStream (productName string, amount int);
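
If you prefer to handle failed events within the application rather than wait, the STREAM action can be set on a sink in the same way. The following is a sketch only. It adapts the TestPublisherStream sink from the earlier example, the FailedPublishStream name is hypothetical, and it adds @OnError(action='STREAM') on the assumption that the !TestPublisherStream error stream must be enabled on the stream before the sink can route events to it.

```siddhi
@App:name("SinkErrorStreamSketch")

-- Assumption: @OnError(action='STREAM') enables the !TestPublisherStream error stream,
-- and on.error='STREAM' routes events that could not be published into it.
@OnError(action='STREAM')
@sink(type = 'http', on.error='STREAM', publisher.url = "http://localhost:8090/unavailableEndpoint", method = "POST", @map(type = 'json'))
define stream TestPublisherStream (name string, amount double);

-- Log each event that failed to publish and keep the error details.
from !TestPublisherStream#log("Failed to publish")
select name, amount, _error
insert into FailedPublishStream;
```

As in the earlier example, a query that inserts events into TestPublisherStream is still needed to drive the sink.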

Mapping errors
Description A mapping error occurs when the payload of an event is missing a value for one or more attributes of the event schema (as per the stream definition). Events with such errors are collected when the error store is enabled. For more information, see the STORE on-error type described under [Supported on-error actions](#supported-on-error-actions).
Supported actions STORE

For more information, see [Supported on-error actions](#supported-on-error-actions).
Example The source annotation in the following Siddhi application defines a mapping with two attributes named `name` and `amount` for JSON events. If you send the `{"foo": "Cake", "amount": 20.02}` event to the `http://localhost:8006/productionStream` endpoint, a mapping error occurs because the event includes an attribute named `foo` instead of `name`.

@App:name("MappingErrorTest")

@Source(type = 'http', receiver.url='http://localhost:8006/productionStream', basic.auth.enabled='false', @map(type='json', @attributes(name='name', amount='amount')))
define stream InvalidMappingCaller(name string, amount double);

@sink(type='log', prefix='Successful mapping: ')
define stream LogStream(name string, amount double);

from InvalidMappingCaller
select *
insert into LogStream;

Supported on-error actions

The supported error actions are as follows:

LOG

This logs the event with details of the error and then drops the event. This is the default on-error action if you do not specify one via the @OnError annotation (for streams) or via the on.error parameter (for sinks).
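
Because LOG is the default, stating it explicitly does not change the behavior, but it can make the intent clearer to readers of the application. A short sketch, reusing the SalesStream definition from the publishing example (placing both annotations on the same stream is an illustrative choice):

```siddhi
-- LOG set explicitly for runtime errors (stream) and publishing errors (sink).
@OnError(action='LOG')
@Sink(type = 'tcp', url='tcp://localhost:8080/abc', sync='true', on.error='LOG',
      @map(type='binary'))
define stream SalesStream (productName string, amount int);
```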

STREAM

This automatically creates an error stream for the base stream. The definition of the error stream includes all the attributes of the base stream as well as an additional attribute named _error. The events are inserted into the error stream during a failure. The error identified is captured as the value for the _error attribute.

e.g., The following is a Siddhi application that includes the @OnError annotation to handle failures during runtime.

@OnError(action='STREAM')

define stream StreamA (symbol string, volume long);

from StreamA[custom:fault() > volume]
insert into StreamB;

from !StreamA#log("Error Occured")
select symbol, volume, _error
insert into tempStream;

Here, an error stream named !StreamA is automatically created for the base stream named StreamA, and events that cause errors are inserted into it. The base stream has two attributes named symbol and volume. Therefore, !StreamA has the same two attributes and, in addition, another attribute named _error.

The first Siddhi query uses the custom:fault() extension, which generates an error based on the specified condition (i.e., if the volume is less than a specified amount). If no error is detected, the output is inserted into the StreamB stream. However, if an error is detected, the second query logs the event with the Error Occured text and inserts it into a stream named tempStream. The error details are available via the _error attribute, which is automatically included in the !StreamA error stream and carried into tempStream, whose definition is inferred from the query.

WAIT

This on-error type is only applicable to publishing errors. Here, the publishing thread waits in a back-off and retry state, and resumes sending events once the connection is re-established.

STORE

This stores the events with errors in the error store. Before using this on-error action, you need to enable the error store in the <SI_HOME>/conf/server/deployment.yaml file by adding the following configuration.

error.store:
  enabled: true
  bufferSize: 1024
  dropWhenBufferFull: true
  errorStore: org.wso2.carbon.streaming.integrator.core.siddhi.error.handler.DBErrorStore
  config:
    datasource: ERROR_STORE_DB
    table: ERROR_STORE_TABLE
  • bufferSize denotes the size of the ring buffer that is used in the disruptor when publishing events to the ErrorStore. This must be a power of two; otherwise, an exception is thrown during initialization. The default buffer size is 1024.
  • If dropWhenBufferFull is set to true, events are dropped when the ring buffer does not have enough capacity.