Kafka Inbound
Introduction
The Kafka inbound endpoint provides access to the Kafka messaging system. Kafka maintains feeds of messages in topics. Producers write data to topics, and consumers read from topics. The Kafka inbound endpoint acts as a message consumer: it creates a connection to ZooKeeper and requests messages for a single topic, multiple topics, or topic filters.
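There are three ways to choose what to consume: one topic, several topics, or a topic filter. The Python sketch below models how the `topics`, `topic.filter`, and `filter.from.allowlist` properties described under Required Properties could select topics from those available on a cluster. It is a hypothetical illustration, not the endpoint's code. The function `selected_topics` is invented for this example. It assumes the filter is a regular expression that must match the whole topic name, with commas treated as alternatives, loosely modeled on the legacy Kafka consumer's allowlist and denylist filters.

```python
import re

# Kafka's internal topic for committed offsets (used when offsets.storage=kafka).
INTERNAL_TOPICS = {"__consumer_offsets"}


def selected_topics(available, topics=None, topic_filter=None,
                    from_allowlist=True, exclude_internal=True):
    """Return the topics from `available` that the consumer would read.

    Hypothetical sketch. Parameters mirror the inbound properties:
    topics: comma-separated topic names, as in the `topics` property.
    topic_filter: a regular expression (an assumption about `topic.filter`);
        commas are treated as "|" alternatives.
    from_allowlist: True keeps matching topics (allowlist),
        False keeps non-matching topics (denylist).
    exclude_internal: skip internal topics, as with exclude.internal.topics.
    """
    if topics:
        wanted = {name.strip() for name in topics.split(",") if name.strip()}
        return [t for t in available if t in wanted]
    if not topic_filter:
        return []
    pattern = re.compile(topic_filter.replace(",", "|").replace(" ", ""))
    selected = []
    for topic in available:
        if exclude_internal and topic in INTERNAL_TOPICS:
            continue
        matched = pattern.fullmatch(topic) is not None
        # Allowlist keeps matches; denylist keeps everything else.
        if matched == from_allowlist:
            selected.append(topic)
    return selected


if __name__ == "__main__":
    available = ["orders-eu", "orders-us", "payments", "__consumer_offsets"]
    print(selected_topics(available, topics="payments, orders-eu"))
    # ['orders-eu', 'payments']
    print(selected_topics(available, topic_filter="orders.*"))
    # ['orders-eu', 'orders-us']
    print(selected_topics(available, topic_filter="orders.*", from_allowlist=False))
    # ['payments']
```

Matching the whole name with `re.fullmatch` rather than searching inside it means a filter such as `orders.*` does not also select a topic named `legacy-orders`.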
Properties
Listed below are the properties used for creating a Kafka inbound endpoint.
Required Properties
The following properties are required when creating a Kafka inbound endpoint.
Property | Description |
---|---|
zookeeper.connect | The host and port of a ZooKeeper server (hostname:port). |
consumer.type | The consumer configuration type. This can be either simple or highlevel. |
interval | The polling interval for the inbound endpoint to poll the messages. |
coordination | This property applies only in a clustered environment, where an inbound endpoint is executed only in worker nodes. If set to true, the inbound endpoint runs in only a single worker node; if that worker goes down, the inbound endpoint starts on another available worker in the cluster. The default value is true. |
sequential | Whether the messages are polled and injected sequentially. The default value is true. Set this property to false to run the Kafka inbound endpoint in non-sequential mode, which gives better performance than sequential mode. |
topics | The topics (categories, or feed names) from which messages are consumed. A high-level Kafka configuration can have more than one topic; specify multiple topic names as comma-separated values. |
content.type | The content type of the messages. Possible values are application/xml and application/json. |
group.id | The ID of the consumer group that this consumer belongs to. If all consumer instances use the same consumer group, the group works as a traditional queue that balances the load over the consumers. |
topic.filter | The name of the topic filter. |
filter.from.allowlist | If set to true, the topic filter acts as an allowlist (include), and messages are consumed from the topics it matches. If set to false, the filter acts as a denylist (exclude), and messages are consumed from the topics it does not match. |
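Because several required properties accept only a fixed set of values, a configuration can be sanity-checked before deployment. The Python sketch below checks a parameter map (property names mapped to string values, as they would appear in the endpoint configuration) against the table above. The helper `check_inbound_params` is hypothetical, and two of its rules are assumptions rather than documented behavior: that either `topics` or `topic.filter` must be present, and that `interval` and `consumer.timeout.ms` (see Optional Properties) use the same unit, milliseconds, so the advice to keep the timeout below the interval can be checked numerically.

```python
"""Hypothetical pre-deployment check for a Kafka inbound parameter map."""

# Properties the table marks as required and that have no stated default.
ALWAYS_REQUIRED = ("zookeeper.connect", "consumer.type", "interval",
                   "content.type", "group.id")
# Properties whose values are restricted to a fixed set.
ALLOWED_VALUES = {
    "consumer.type": {"simple", "highlevel"},
    "content.type": {"application/xml", "application/json"},
}
# Documented defaults for the two boolean flags.
DEFAULTS = {"coordination": "true", "sequential": "true"}


def check_inbound_params(params):
    """Return (effective_params, problems) for a dict of string values."""
    effective = {**DEFAULTS, **params}
    problems = [f"missing {key}" for key in ALWAYS_REQUIRED if key not in params]

    # Assumption: topics are selected either by name or by a topic filter.
    if "topics" not in params and "topic.filter" not in params:
        problems.append("missing topics or topic.filter")

    for key, allowed in ALLOWED_VALUES.items():
        if key in params and params[key] not in allowed:
            problems.append(f"{key} must be one of {sorted(allowed)}")

    for key in ("coordination", "sequential", "filter.from.allowlist"):
        if key in effective and effective[key] not in ("true", "false"):
            problems.append(f"{key} must be true or false")

    # Assumption: interval and consumer.timeout.ms are both in milliseconds.
    interval = effective.get("interval", "")
    timeout = effective.get("consumer.timeout.ms", "")
    if interval.isdigit() and timeout.isdigit() and int(timeout) >= int(interval):
        problems.append("consumer.timeout.ms should be lower than interval")

    return effective, problems


if __name__ == "__main__":
    _, issues = check_inbound_params({
        "zookeeper.connect": "localhost:2181",
        "consumer.type": "highlevel",
        "interval": "1000",
        "topics": "orders",
        "content.type": "application/json",
        "group.id": "orders-group",
        "consumer.timeout.ms": "2000",
    })
    print(issues)  # ['consumer.timeout.ms should be lower than interval']
```

Returning a list of problems instead of raising on the first one lets every mistake in a configuration be reported in a single pass.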
Optional Properties
The following optional properties can be configured when creating a Kafka inbound endpoint.
Property | Description |
---|---|
thread.count | The number of threads. The default value is 1. |
consumer.id | The ID of the consumer. The default value is null. |
socket.timeout.ms | The socket timeout for network requests, in milliseconds. The default value is 30 * 1000 (30000 ms). |
socket.receive.buffer.bytes | The socket receive buffer size for network requests, in bytes. The default value is 64 * 1024 (65536 bytes). |
fetch.message.max.bytes | The number of bytes of messages that the consumer attempts to fetch for each topic-partition in each fetch request. The default value is 1024 * 1024 (1048576 bytes). |
num.consumer.fetchers | The number of fetcher threads used to fetch data. The default value is 1. |
auto.commit.enable | If set to true, the consumer periodically commits to ZooKeeper the offsets of messages it has already fetched. If the process fails, the new consumer begins from this committed offset. The default value is true. |
auto.commit.interval.ms | The frequency, in milliseconds, at which consumer offsets are committed to ZooKeeper. The default value is 60 * 1000 (60000 ms). |
queued.max.message.chunks | The maximum number of message chunks buffered for consumption. Each chunk can be up to the size specified in fetch.message.max.bytes. The default value is 2. |
rebalance.max.retries | When the set of consumers in a group changes, the consumers rebalance the partitions among themselves. This is the maximum number of rebalance attempts before giving up. The default value is 4. |
fetch.min.bytes | The minimum amount of data the server should return for a fetch request. The default value is 1. |
fetch.wait.max.ms | The maximum amount of time, in milliseconds, that the server blocks before responding to a fetch request when there is not enough data to immediately satisfy fetch.min.bytes. The default value is 100. |
rebalance.backoff.ms | The backoff time, in milliseconds, between retries during a rebalance. The default value is 2000. |
refresh.leader.backoff.ms | The backoff time, in milliseconds, to wait before trying to determine the leader of a partition that has just lost its leader. The default value is 200. |
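auto.offset.reset | What to do when there is no initial offset in ZooKeeper or when an offset is out of range. Set this to one of the following values: smallest (automatically reset the offset to the smallest offset), largest (automatically reset the offset to the largest offset), or any other value (throw an exception to the consumer). |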
consumer.timeout.ms | The timeout interval after which a timeout exception is to be thrown to the consumer if no message is available for consumption. It is a good practice to set this value lower than the interval of the Kafka inbound endpoint. The default value is 2000. |
exclude.internal.topics | If set to true, messages from internal topics, such as the offsets topic, are not exposed to the consumer. The default value is true. |
partition.assignment.strategy | The partition assignment strategy used when assigning partitions to consumer streams. Possible values are range and roundrobin. The default value is range. |
client.id | The user specified string sent in each request to help trace calls. |
zookeeper.session.timeout.ms | The ZooKeeper session timeout, in milliseconds. The default value is 6000. |
zookeeper.connection.timeout.ms | The maximum time in milliseconds that the client should wait while establishing a connection to ZooKeeper. The default value is 6000. |
zookeeper.sync.time.ms | How far, in milliseconds, a ZooKeeper follower can be behind a ZooKeeper leader. The default value is 2000. |
offsets.storage | The location where offsets are stored. Possible values are zookeeper and kafka. The default value is zookeeper. |
offsets.channel.backoff.ms | The backoff period in milliseconds when reconnecting the offsets channel or retrying failed offset fetch/commit requests. Default value is 1000. |
offsets.channel.socket.timeout.ms | The socket timeout in milliseconds when reading responses for offset fetch/commit requests. The default value is 10000. |
offsets.commit.max.retries | The maximum number of times a failed offset commit is retried. If a consumer metadata request fails for any reason, it is retried, but those retries do not count toward this limit. The default value is 5. |
dual.commit.enabled | If offsets.storage is set to kafka, offsets can additionally be committed to ZooKeeper. Set this to true when migrating from ZooKeeper-based offset storage to Kafka-based offset storage. The default value is true. |
simple.topic | The topic from which messages are consumed. |
simple.brokers | The name of the Kafka broker. |
simple.port | The port number of the Kafka broker. |
simple.partition | The partition of the topic to consume from. |
simple.max.messages.to.read | The maximum number of messages to retrieve. |
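The two values of partition.assignment.strategy differ in how partitions are spread across consumer streams, and the Python sketch below contrasts them. The range version follows the legacy Kafka consumer's range assignor for a single topic: partitions and consumer IDs are sorted, each consumer receives a contiguous block, and when the division is uneven the first consumers receive one extra partition each. The round-robin version is simplified. It deals partitions from all topics to the consumers in turn, assumes every consumer subscribes to the same topics, and orders partitions with a plain sort, whereas Kafka's implementation uses a different ordering. The function names are hypothetical.

```python
from itertools import cycle


def range_assign(partitions, consumers):
    """Range strategy for ONE topic: contiguous blocks of partitions.

    partitions: partition numbers of the topic.
    consumers: IDs of the consumer streams subscribed to the topic.
    """
    parts = sorted(partitions)
    members = sorted(consumers)
    per_consumer, extra = divmod(len(parts), len(members))
    assignment = {}
    for pos, member in enumerate(members):
        # The first `extra` consumers each take one additional partition.
        start = per_consumer * pos + min(pos, extra)
        count = per_consumer + (1 if pos < extra else 0)
        assignment[member] = parts[start:start + count]
    return assignment


def round_robin_assign(topic_partitions, consumers):
    """Simplified round-robin strategy across ALL subscribed topics.

    topic_partitions: (topic, partition) pairs.
    Assumes every consumer subscribes to the same topics; uses a plain sort
    for ordering, unlike Kafka's own implementation.
    """
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    # Deal the sorted partitions to the consumers one at a time, in turn.
    for tp, member in zip(sorted(topic_partitions), cycle(members)):
        assignment[member].append(tp)
    return assignment


if __name__ == "__main__":
    print(range_assign([0, 1, 2], ["c2", "c1"]))
    # {'c1': [0, 1], 'c2': [2]}

    # Two single-partition topics and two consumers.
    consumers = ["c1", "c2"]
    by_range = {t: range_assign([0], consumers) for t in ("t1", "t2")}
    print(by_range)
    # {'t1': {'c1': [0], 'c2': []}, 't2': {'c1': [0], 'c2': []}}
    print(round_robin_assign([("t1", 0), ("t2", 0)], consumers))
    # {'c1': [('t1', 0)], 'c2': [('t2', 0)]}
```

The last two calls show the practical difference: because range assigns each topic independently, c1 receives the only partition of both topics and c2 stays idle, while round-robin gives each consumer one partition.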