The Kafka inbound endpoint provides the functionality of the Kafka messaging system. Kafka maintains feeds of messages in topics: producers write data to topics, and consumers read from them. The Kafka inbound endpoint acts as a message consumer by creating a connection to ZooKeeper and requesting messages for a topic, a set of topics, or a topic filter.
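The producer, topic, and consumer roles described above can be illustrated with a toy in-memory model. This is only a sketch: `TopicLog` and `PollingConsumer` are hypothetical names invented for the example, and nothing here talks to Kafka or ZooKeeper or reflects how the inbound endpoint is implemented.

```python
from collections import defaultdict


class TopicLog:
    """Toy stand-in for a broker: one append-only feed (list) per topic."""

    def __init__(self):
        self._feeds = defaultdict(list)

    def produce(self, topic, message):
        # Producers append to the end of a topic's feed.
        self._feeds[topic].append(message)

    def read(self, topic, offset, max_messages=10):
        # Consumers read forward from an offset that they track themselves.
        return self._feeds[topic][offset:offset + max_messages]


class PollingConsumer:
    """Hypothetical consumer that polls a fixed set of topics."""

    def __init__(self, log, topics):
        self._log = log
        self._offsets = {topic: 0 for topic in topics}

    def poll(self):
        # Fetch whatever is new on each subscribed topic and advance the offset.
        received = []
        for topic in list(self._offsets):
            batch = self._log.read(topic, self._offsets[topic])
            self._offsets[topic] += len(batch)
            received.extend((topic, message) for message in batch)
        return received
```

Calling `poll()` repeatedly returns only messages produced since the previous call, which is roughly the behaviour a polling consumer relies on.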
Listed below are the properties used for creating a Kafka inbound endpoint.
The following properties are required when creating a Kafka inbound endpoint.
|zookeeper.connect||The host and port of a ZooKeeper server (
|consumer.type||The consumer configuration type. This can either be
|interval||The interval at which the inbound endpoint polls for messages.|
|coordination||If set to
|sequential||The behaviour when executing the given sequence. The property is set to
|topics||The category to which messages are fed. A high-level Kafka configuration can have more than one topic. You can specify multiple topic names as comma-separated values.|
|content.type||The content of the message. The possible values are as follows:
If all the consumer instances have the same consumer group, this works as a traditional queue balancing the load over the consumers.
|topic.filter||The name of the topic filter.|
|filter.from.whitelist||If this is set to
If this is set to
This optional property is only applicable in a cluster environment. In a clustered environment, an inbound endpoint will only be executed in worker nodes. If set to
|sequential||Whether the messages need to be polled and injected sequentially or not.|
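As a rough illustration of how the required properties fit together, the sketch below checks a parameter dictionary for the property names listed above and splits the comma-separated `topics` value. The helper names `missing_required` and `split_topics` are hypothetical, and the rule that an endpoint subscribes either through `topics` or through `topic.filter` plus `filter.from.whitelist` is an assumption made for the example, not something the tables state outright.

```python
# Property names taken from the required-properties tables above.
COMMON_KEYS = (
    "zookeeper.connect",
    "consumer.type",
    "interval",
    "coordination",
    "sequential",
    "content.type",
)


def missing_required(params):
    """Return the required property names that are absent from params."""
    missing = [key for key in COMMON_KEYS if key not in params]
    # Assumption: a subscription is given either as topics or as a topic filter.
    if "topics" not in params and "topic.filter" not in params:
        missing.append("topics or topic.filter")
    elif "topic.filter" in params and "filter.from.whitelist" not in params:
        missing.append("filter.from.whitelist")
    return missing


def split_topics(value):
    """Split the comma-separated topics value into a list of topic names."""
    return [name.strip() for name in value.split(",") if name.strip()]
```

For example, `split_topics("orders, payments")` yields `["orders", "payments"]`, and `missing_required` returns an empty list once every expected name is present.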
The following optional properties can be configured when creating a Kafka inbound endpoint.
|thread.count||The number of threads. The default value is 1.|
|consumer.id||The id of the consumer. The default value is
|socket.timeout.ms||The socket timeout for network requests. The default value is
|socket.receive.buffer.bytes||The socket receive buffer for network requests. The default value is
|fetch.message.max.bytes||The number of bytes of messages that the system should attempt to fetch for each topic-partition in each fetch request. The default value is
|num.consumer.fetchers||The number of fetcher threads used to fetch data. The default value is 1.|
|auto.commit.enable||If enabled, the offset of messages already fetched by the consumer is committed periodically. The committed offset is used as the position from which a new consumer begins when the process fails. The default value is
|auto.commit.interval.ms||The frequency (in milliseconds) at which the consumer offsets are committed to ZooKeeper. The default value is
|queued.max.message.chunks||The maximum number of message chunks buffered for consumption. Each chunk can go up to the value specified in
|rebalance.max.retries||The maximum number of rebalance attempts before giving up. The default value is 4.|
|fetch.min.bytes||The minimum amount of data the server should return for a fetch request. The default value is 1.|
|fetch.wait.max.ms||The maximum amount of time the server will stay blocked before responding to the fetch request when sufficient data is not available to immediately serve
|rebalance.backoff.ms||The backoff time between retries during rebalance. The default value is 2000.|
|refresh.leader.backoff.ms||The backoff time to wait before trying to determine the leader of a partition that has just lost its leader. The default value is 200.|
Set this to one of the following values based on what you need to do when there is no initial offset in ZooKeeper or if an offset is out of range.
|consumer.timeout.ms||The interval after which a timeout exception is thrown to the consumer if no message is available for consumption. It is good practice to set this value lower than the interval of the Kafka inbound endpoint. The default value is 2000.|
|partition.assignment.strategy||The partition assignment strategy to be used when assigning partitions to consumer streams. Possible values are
|client.id||The user-specified string sent in each request to help trace calls.|
|The ZooKeeper session timeout value in milliseconds. The default value is 6000.|
|zookeeper.connection.timeout.ms||The maximum time in milliseconds that the client should wait while establishing a connection to ZooKeeper. The default value is 6000.|
|The time difference in milliseconds that a ZooKeeper follower can be behind a ZooKeeper leader. The default value is 2000.|
|offsets.storage||The offsets storage location. Possible values are
|offsets.channel.backoff.ms||The backoff period in milliseconds when reconnecting the offsets channel or retrying failed offset fetch/commit requests. The default value is 1000.|
|offsets.channel.socket.timeout.ms||The socket timeout in milliseconds when reading responses for offset fetch/commit requests. The default value is 10000.|
|offsets.commit.max.retries||The maximum number of retries allowed for a failed offset commit. If a consumer metadata request fails for any reason, it is retried, but that retry does not count toward this limit. The default value is 5.|
|simple.topic||The category to which messages are fed.|
|simple.brokers||The specific Kafka broker name.|
|simple.port||The specific Kafka server port number.|
|simple.partition||The partition of the topic.|
The maximum number of messages to retrieve.
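One recommendation from the optional properties, keeping `consumer.timeout.ms` lower than the endpoint's `interval`, lends itself to a quick sanity check. The sketch below is a minimal example under two assumptions: both values are expressed in the same unit, and the hypothetical helper `timeout_warning` receives the properties as a plain dictionary of strings or numbers.

```python
# Default for consumer.timeout.ms as stated in the optional properties.
DEFAULT_CONSUMER_TIMEOUT_MS = 2000


def timeout_warning(params):
    """Return a warning string if consumer.timeout.ms is not below interval.

    Returns None when the recommendation is satisfied.
    Assumes both values use the same unit (an assumption of this sketch).
    """
    interval = int(params["interval"])
    timeout = int(params.get("consumer.timeout.ms", DEFAULT_CONSUMER_TIMEOUT_MS))
    if timeout >= interval:
        return (
            f"consumer.timeout.ms ({timeout}) should be lower than "
            f"interval ({interval})"
        )
    return None
```

When `consumer.timeout.ms` is omitted, the check falls back to the documented default of 2000, so an `interval` of 2000 or less is flagged.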