Skip to content

Kafka Connector Example

Given below is a sample scenario that demonstrates how to send messages to a Kafka broker via Kafka topics. The publishMessages operation allows you to publish messages to the Kafka brokers via Kafka topics.

What you'll build

Given below is a sample API that illustrates how you can connect to a Kakfa broker with the init operation and then use the publishMessages operation to publish messages via the topic. It exposes Kakfa functionalities as a RESTful service. Users can invoke the API using HTTP/HTTPs with the required information.

API has the context /publishMessages. It will publish messages via the topic to the Kafka server.

The following diagram illustrates all the required functionality of the Kafka service that you are going to build.

KafkaConnector

If you do not want to configure this yourself, you can simply get the project and run it.

Configure the connector in WSO2 Integration Studio

Follow these steps to set up the Integration Project and the Connector Exporter Project.

  1. Open WSO2 Integration Studio and create an Integration Project. Creating a new Integration Project

  2. Right click on the project that you created and click on Add or Remove Connector -> Add Connector. You will get directed to the WSO2 Connector Store.

  3. Search for the specific connector and download it to the workspace. In this case, Salesforce is used as an example, but you can do this for the connector you require for your integration scenario.
    Search Connector in the Connector Store

  4. Click Finish, and your Integration Project is ready. The downloaded connector is displayed on the left side palette with its operations.

  5. You can drag and drop the operations to the design canvas and build your integration logic. Drag connector operations

  6. Right click on the created Integration Project and select New -> Rest API to create the REST API.

  7. Specify the API name as KafkaTransport and API context as /publishMessages. You can go to the source view of the XML configuration file of the API and copy the following configuration (source view).

    <?xml version="1.0" encoding="UTF-8"?>
    <api context="/publishMessages" name="KafkaTransport" xmlns="http://ws.apache.org/ns/synapse">
        <resource methods="POST">
           <inSequence>
                <kafkaTransport.init>
                   <bootstrapServers>localhost:9092</bootstrapServers>
                   <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                   <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                   <maxPoolSize>100</maxPoolSize>
                </kafkaTransport.init>
                <kafkaTransport.publishMessages>
                    <topic>test</topic>
                </kafkaTransport.publishMessages>
            </inSequence>
            <outSequence/>
            <faultSequence/>
           </resource>
    </api>
    Now we can export the imported connector and the API into a single CAR application. The CAR application needs to be deployed during server runtime.

Get the project

You can download the ZIP file and extract the contents to get the project code.

Download ZIP

Deployment

Follow these steps to deploy the exported CApp in the Enterprise Integrator Runtime.

Deploying on WSO2 Enterprise Integrator 7

You can copy the composite application to the /repository/deployment/server/carbonapps folder and start the server. Micro Integrator will be started and the composite application will be deployed. You can further refer to the application deployed through the CLI tool. You can download the CLI tool from here from the Other Resources section. Make sure you first export the PATH as below.

  $ export PATH=/path/to/mi/cli/directory/bin:$PATH
  1. Log in to Micro Integrator using the following command.

    ./mi remote login
  2. Provide default credentials admin for both username and password.

  3. In order to view the APIs deployed, execute the following command.

    ./mi api show
Click here for instructions on deploying on WSO2 Enterprise Integrator 6
  1. You can copy the composite application to the /repository/deployment/server/carbonapps folder and start the server.

  2. WSO2 EI server starts and you can login to the Management Console https://localhost:9443/carbon/ URL. Provide login credentials. The default credentials will be admin/admin.

  3. You can see that the API is deployed under the API section.

Testing

Create a topic:

Let’s create a topic named “test” with a single partition and only one replica. Navigate to the and run following command.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test     
Sample Request:

Send a message to the Kafka broker using a CURL command or sample client.

curl -v POST -d '{"name":"sample"}' "http://localhost:8290/services/KafkaTransport" -H "Content-Type:application/json"
Expected Response:

Navigate to the and run the following command to verify the messages:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
See the following message content:
{"name":"sample"}
This demonstrates how the Kafka connector publishes messages to the Kafka brokers.

What's next

Top