Skip to content

Salesforce to MySQL Database

Ballerina is an open-source programming language that empowers developers to integrate their system easily with the support of connectors. In this guide, we are mainly focusing on using batch processing to synchronize Salesforce data with a MySQL Database.

The Salesforce connector allows you to perform CRUD operations for SObjects, query using SOQL, search using SOSL, and describe SObjects and organizational data through the Salesforce REST API. Also it supports insert, upsert, update, query, and delete operations for CSV, JSON, and XML data types that are available in Salesforce bulk API. It handles OAuth 2.0 authentication.

Ballerina provides standardized interface for accessing any relational database via JDBC. This allows you to run diverse SQL operations on a database, including Select, Insert, Update, and Delete.

You can find other integration modules from wso2-ballerina GitHub organization.

What you'll build

This application queries Salesforce for new or updated contacts at a regular interval. Then it processes SOQL records one at a time. It queries the database using JDBC client to check whether the Salesforce contact exists in the database. If it currently exists in the database, update the existing account in the database. Alternately, insert the new contact into the database. After the process is completed for the entire Salesforce accounts batch, a success message is logged.

sfdc to mysql database

Before you begin

Get the code

Pull the module from Ballerina Central using the following command.

ballerina pull wso2/salesforce_to_mysql

Alternately, you can download the ZIP file and extract the contents to get the code.

Download ZIP

Implementation

A Ballerina project needs to be created for the integration use case explained above. Please follow the steps given below to create the project and modules. You can learn about the Ballerina project and modules in this guide.

1. Create a new project.

$ ballerina new salesforce-to-mysql-db

2. Create a module.

$ ballerina add salesforce_to_mysql

The project structure is created as indicated below.

salesforce-to-mysql-db
├── Ballerina.toml
└── src
    └── salesforce_to_mysql
        ├── Module.md
        ├── main.bal
        ├── resources
        └── tests
            └── resources

3. Set up credentials for accessing Salesforce.

  • Visit Salesforce and create a Salesforce account.

  • Create a connected app and obtain the following credentials:

    • Base URL (Endpoint)
    • Access Token
    • Client ID
    • Client Secret
    • Refresh Token
    • Refresh Token URL
  • Note: When you are setting up the connected app, select the following scopes under Selected OAuth Scopes:

    • Access and manage your data (api)
    • Perform requests on your behalf at any time (refresh_token, offline_access)
    • Provide access to your data via the Web (web)
  • Provide the client ID and client secret to obtain the refresh token and access token. For more information on obtaining OAuth2 credentials, see the Salesforce documentation.

4. Create a database and set up credentials

  • If you have not installed MySQL in your computer, Please install MySql on your local computer. Visit here to download and install MySQL. After installing configure configure a MySQL user and obtain username and password.

  • Create a new database and create a new contacts table. You can use following SQL script to create the table and insert a data row in to the table.

    USE sf_company;
    CREATE TABLE IF NOT EXISTS contacts (
        email varchar(255) NOT NULL,
        first_name varchar(255) NOT NULL,
        last_name varchar(255) NOT NULL,
        last_modified timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        PRIMARY KEY (email)
    );
    INSERT INTO contacts VALUES ("johndoe@wso2.com", "John", "Doe", CURRENT_TIMESTAMP);

5. Add JDBC client connector

Since we are using JDBC client for Database operations we need to create new directory called lib in the project root directory and add mysql-connector-java.jar to the newly created lib directory. You can install mysql-connector-java.jar from here. After that you should edit your Ballerina.toml file and mentioned the path to mysql-connector-java.jar as follows.

[project]
org-name= "wso2"
version= "0.1.0"

[dependencies]

[platform]
target = "java8"

  [[platform.libraries]]
  module = "salesforce_to_mysql"
  path = "./lib/mysql-connector-java.jar"

6. Add project configurations file

Add the project configuration file by creating a ballerina.conf file under the root path of the project structure. This file should have following configurations. Add the obtained Salesforce configurations and Database configurations to the file.

SF_BASE_URL="<Salesforce base url (eg: https://ap15.salesforce.com)>"
SF_ACCESS_TOKEN="<Salesforce access token>"
SF_CLIENT_ID="<Salesforce client ID>"
SF_CLIENT_SECRET="<Salesforce client secret>"
SF_REFRESH_URL="<Salesforce refresh url (eg: https://login.salesforce.com/services/oauth2/token)>"
SF_REFRESH_TOKEN="<Salesforce refresh token>"
JDBC_URL="<JDBC URL (eg: jdbc:mysql://localhost:3306/sf_company)>"
DB_USERNAME="<MySQL database username>"
DB_PASSWORD="<MySQL database password>"
SCHEDULER_INTERVAL_IN_MILLIS=<Scheduler interval in milli-seconds (eg: 60000)>

7. Write the integration

Open the project with VS Code. The integration implementation is written in the src/sfdc_to_mysql_db/main.bal file.

import ballerina/config;
import ballerina/log;
import ballerina/task;
import ballerinax/java.jdbc;
import wso2/sfdc46;

// Salesforce configuration.
sfdc46:SalesforceConfiguration sfConfig = {
    baseUrl: config:getAsString("SF_BASE_URL"),
    clientConfig: {
        accessToken: config:getAsString("SF_ACCESS_TOKEN"),
        refreshConfig: {
            clientId: config:getAsString("SF_CLIENT_ID"),
            clientSecret: config:getAsString("SF_CLIENT_SECRET"),
            refreshToken: config:getAsString("SF_REFRESH_TOKEN"),
            refreshUrl: config:getAsString("SF_REFRESH_URL")
        }
    }
};

// MySQL configuration.
jdbc:Client contactsDB = new ({
    url: config:getAsString("JDBC_URL"),
    username: config:getAsString("DB_USERNAME"),
    password: config:getAsString("DB_PASSWORD"),
    poolOptions: {maximumPoolSize: 5},
    dbOptions: {useSSL: false}
});

// Represents `contacts` table.
type Contact record {
    string email;
    string first_name;
    string last_name;
};

// Create salesforce client.
sfdc46:Client salesforceClient = new (sfConfig);

public function main() returns error? {
    task:Scheduler timer = new ({
        intervalInMillis: config:getAsInt("SCHEDULER_INTERVAL_IN_MILLIS"),
        initialDelayInMillis: 0
    });

    // Attach the service to the scheduler.
    var attachResult = timer.attach(sfdcToMysqlService);
    if (attachResult is error) {
        log:printError("Error attaching the sfdcToMysqlService.", attachResult);
        return;
    }

    // Start the scheduler.
    var startResult = timer.start();
    if (startResult is error) {
        log:printError("Starting the task is failed.", startResult);
        return;
    }
}

service sfdcToMysqlService = service {
    resource function onTrigger() returns error? {
        log:printInfo("service started...");
        sfdc46:SoqlRecord[] newSfContacts = check getNewSfContacts();
        if (updateDb(newSfContacts)) {
            log:printInfo("Batch job SFDC -> MySQL has been completed.");
        } else {
            log:printError("Batch job SFDC -> MySQL has been failed!");
        }
    }
};

function getNewSfContacts() returns @tainted sfdc46:SoqlRecord[]|error {
    string q = "SELECT Email, FirstName, LastName, LastModifiedDate FROM Contact WHERE LastModifiedDate > YESTERDAY";
    sfdc46:SoqlResult queryResult = check salesforceClient->getQueryResult(q);

    if (queryResult.done == true) {
        return queryResult.records;
    } else {
        return error("Query failed!");
    }
}

function updateDb(sfdc46:SoqlRecord[] newSfContacts) returns boolean {
    foreach sfdc46:SoqlRecord newSfContact in newSfContacts {
        // Query DB and select contacts using the `email`.
        string q = "SELECT first_name, last_name, email FROM contacts WHERE email=?";
        table<Contact> | error dbRes = contactsDB->select(<@untainted> q, Contact, newSfContact["Email"].toString());

        if (dbRes is table<Contact>) {
            if (dbRes.hasNext()) {
                // Update the contact.
                if (updateContact(newSfContact)) {
                    log:printDebug("DB contact updated successfully!");
                } else {
                    log:printError("DB update failed, newSfContact:" + newSfContact.toString());
                }
            } else {
                // Inserting to the DB, since this is a new salesforce account.
                if (insertToDb(newSfContact)) {
                    log:printDebug("New salesforce contact inserted to DB successfully!");
                } else {
                    log:printError("DB Insertion failed, newSfContact:" + newSfContact.toString());
                }
            }
            dbRes.close();
        } else {
            log:printError("Select data from student table failed: " + <string> dbRes.detail()["message"], dbRes);
            return false;
        }
    }
    return true;
}

function insertToDb(sfdc46:SoqlRecord c) returns boolean {
    string q = "INSERT INTO contacts (first_name, last_name, email) VALUES (?,?,?)";
    return handleUpdate(q, c);
}

function updateContact(sfdc46:SoqlRecord c) returns boolean {
    string q = "UPDATE contacts SET first_name=?, last_name=? WHERE email=?";
    return handleUpdate(q, c);
}

function handleUpdate(string q, sfdc46:SoqlRecord c) returns boolean {
    jdbc:UpdateResult | jdbc:Error updateResult =
    contactsDB->update(q, c["FirstName"].toString(), c["LastName"].toString(), c["Email"].toString());

    if (updateResult is jdbc:UpdateResult) {
        return updateResult.updatedRowCount == 1;
    } else {
        log:printError("DB update failed: " + <string> updateResult.detail()["message"], updateResult);
        return false;
    }
}

Here we are running sfdcToMysqlService using a task scheduler. You can set the Scheduler interval in the ballerina.conf file. When the sfdcToMysqlService service's onTrigger function is triggered, it will retrieve newly modified Salesforce contacts and update the database using them.

Testing

First let’s build the module. Navigate to the project root directory and execute the following command.

$ ballerina build salesforce_to_mysql

This creates the executables. Now run the salesforce_to_mysql.jar file created in the above step.

$ java -jar target/bin/salesforce_to_mysql.jar

You will see the following log after successfully updating the database.

2019-09-26 17:41:27,708 INFO  [wso2/sfdc_to_mysql_db] - service started...
2019-09-26 17:41:32,094 INFO  [wso2/sfdc_to_mysql_db] - Batch job SFDC -> MySQL has been completed.
Top