Develop a Connector

The flowmate.io platform supports Node.js programming language for building connectors.

To help you create a component in Node.js we have created a simple My SaaS connector which connects to the described below API and demonstrates multiple features of the platform.

My SaaS Rest API

To help you create a component in Node.js, we have prepared a simple MySaaS component that connects to the MySaaS API and demonstrates various features of the platform.

MySaaS Component

Let's take a look at the structure of the MySaaS Component in Node.js:

mysaas-component-nodejs
β”œβ”€β”€ component.json                                          (1)
β”œβ”€β”€ lib
β”‚   β”œβ”€β”€ actions                                             (2)
β”‚   β”‚   β”œβ”€β”€ createContact.js
β”‚   β”œβ”€β”€ lookups                                             (3)
β”‚   β”‚   β”œβ”€β”€ lookup.js
β”‚   β”œβ”€β”€ schemas                                             (4)
β”‚   β”‚   β”œβ”€β”€ createContact.in.json
β”‚   β”‚   β”œβ”€β”€ createContact.out.json
β”‚   β”‚   └── getAllContacts.in.json
β”‚   β”‚   └── getAllContacts.out.json
β”‚   └── triggers                                            (5)
β”‚       β”œβ”€β”€ getAllContacts.js
β”œβ”€β”€ package.json                                            (6)

The Flowmate.io platform utilizes Node.js components built using the NPM run-script. This script first examines the package.json (6) configuration file to set up the node and npm versions and then initiates the build process. It subsequently downloads and constructs all necessary dependencies. For all Node.js components, the following dependency is mandatory:

"dependencies": {
      "@openintegrationhub/ferryman": "2.4.4",
}

*Ferryman version 2.4.4 is the most current at the time of writing the documentation. Please always use the latest version.

Ferryman serves as the Node.js SDK for the Flowmate.io platform, facilitating component integration by providing a straightforward programming model and ensuring effective communication with the platform.

The component.json file (1) acts as the descriptor for the component, used by the platform to collect all necessary information for displaying it in the user interface (UI). This descriptor exclusively lists the component’s functionality, including triggers and actions.

The lib directory contains subdirectories actions(2), lookups(3), schemas(4), and triggers(5), as defined in the component.json file. The source files for Node.js are located in lib/actions and lib/triggers, while the JSON schemas that define the component’s metadata are stored in lib/schemas.

Component Descriptor

The component.json file acts as the component descriptor, which the platform uses to gather all necessary information about the component. Let's examine the descriptor for the MySaaS component:

{
  "title": "My SaaS Connector",                                            (1)
  "description": "Connector for the MySaaS API",                           (2)
  "triggers": {                                                            (3)
    ...
  },
  "actions": {                                                             (4)
    ...
  }
}

In this descriptor, the component title (1) and description (2) are specified.

The triggers (3) and actions (4) properties outline the component's available triggers and actions.

Triggers

Let's delve into how triggers are defined. The example below illustrates the triggers section from the component.json descriptor file:

"triggers": {
	"lookup": {                                                         (1)
      "main": "./lib/lookups/lookup.js",
      "title": "Function to run a lookup",
      "description": "Function to call an internal trigger"
    },
  "getAllContacts": {                                                 (2)
    "main": "./lib/triggers/getAllContacts.js",                       (3)
    "title": "Get All Contacts",                                      (4)
    "metadata": {                                                     (5)
	    "in": "./lib/schemas/getAllContacts.in.json"                    (6)
      "out": "./lib/schemas/getAllContacts.out.json"                  (7)
    }
  }
}

In this description lookup (1) trigger describes file for calling internal trigger in fields with Lookup selector type.

In this example, the trigger identified by getallContacts (2) is executed by the function specified in getAllContacts.js (3). Metadata (5) described input params and schema of response. Input metadata contains path to json schema with input parameters and is defined in the getAllContacts.in.json file(6). Output metadata is defined in the getAllContacts.out.json file (7).

Lookup trigger

Such kind of trigger gathers all other triggers and execute if on lookup selector field .

This is what it looks like on the platform:

When you open mapping setup page, choose Lookup selector field type and specify other required fields

In almost all cases you can use the standard code:

const logger = require("@openintegrationhub/ferryman/lib/logging");

async function processAction(req, res, _, actionParams) {
  const { secretId, data } = actionParams;
  const { ferryman } = req;
  const { operationId, parameters, cfg } = data;
  const { process: triggerProcess } = require(`../triggers/${operationId}`);

  const msg = { data: parameters || {}, metadata: {} };

  const snapshot = {},
    incomingMessageHeaders = {};
  const tokenData = { function: operationId };

  // only when the secretId parameter is provided
  if (secretId) {
    const { authorization } = req.headers;
    const splittedAuthorization = authorization.split(" ");
    const token = splittedAuthorization[1];

    try {
      const secret = await ferryman.fetchSecret(secretId, token);
      Object.assign(cfg, secret);
    } catch (error) {
      logger.error("Error getting the secret", error);
    }
  }

  // to not fail the lookup function when they call to the emit function
  globalThis.emit = () => {};
  const context = {
    logger,
    emit() {},
  };
  const dataResponse = await triggerProcess.call(
    context,
    msg,
    cfg,
    snapshot,
    incomingMessageHeaders,
    tokenData
  );

  return res.send(dataResponse);
}

module.exports.process = processAction;

Trigger getAllContacts

Trigger function process has 3 arguments:

async function process(msg, cfg, snapshot = {}) {...}

Context

Function process called with context which contain 2 main options: logger and emit

you can access logger like:

  this.logger.info('Get All Contacts trigger started'); 

This logger based on bunyan logger and contains different levels like info, debug, trace , etc.

Emitter emit allows execute next operation:

emit data:

      this.emit('data', { data: {...}, metadata: {...} });

data object must contain data and metadat objects.

emit snapshot (see above).

Argument msg

It is incoming message. This data you can set on Mapping section on platform UI:

When you push pensil button you see fields from object which described at input metadata schema, for getAllContacts trigger it is /lib/schemas/getAllContacts.in.json.

All specified data stored in msg object like:

{ data: { name: 'Hello' } }

Argument cfg

It is incoming configuration. At Options section you see the same fields like in Mapping . This data stored in cfg object like:

{ triggerParams: { name: 'Hello' } }

But cfg object also contains service information. In nodeSettings object you can find following options:

  • skipSnapshot - if this options is true - trigger should return array of first found items. This options is true when trigger used in Lookup selector

  • continueOnError - if this option enabled, and during trigger execution caused error - trigger should emit empty message like:

this.emit('data', { data: {}, metadata: {} });

In the UI this is checkbox:

  • initialSnapshot - if this option enabled, snapshot will contain date when flow created. This option prevent initial import

In the UI it is checkbox:

Argument snapshot

It is object where we need to store information between flow execution. The most common case - save here lastUpdated timestamp and on the next execution filter items which we already emited on previous executions. If API contains pagination functionality it is possible to save next token in snapshot.

When we retrieve items and emit them, it is needed to emit new version of snapshot like:

    this.emit('snapshot', {lastUpdated: '2024-07-03T12:57:36.216Z'});

Actions

Let's delve into how actions are defined. The example below illustrates the actions section from the component.json descriptor file:

"actions": {
  "createContact": {                                                 (1)
    "main": "./lib/triggers/createContact.js",                       (2)
    "title": "Create Contact",                                       (3)
    "metadata": {                                                    (4)
	    "in": "./lib/schemas/createContact.in.json"                    (5)
      "out": "./lib/schemas/createContact.out.json"                  (6)
    }
  }
}

In this example, the action identified by createContact (1) is executed by the function specified in createContact.js (2). Title for this action described at (3). Metadata (4) described input params and schema of response. Input metadata contains path to json schema with input parameters and is defined in the createContact.in.json file(5). Output metadata is defined in the createContact.out.json file (6).

Actions has the same context and incoming arguments, but snapshot argument in most cases is unused.

πŸ” Authorization Types

This page documents the four supported authorization types. The actual credentials are stored in the cfg field. You can use these values to authorize API requests in your integration code.


1. API_KEY

Structure:

{
  "cfg": {
    "key": "API_KEY_HERE",
    "headerName": "X-API-Key"
  }
}

If headerName is specified, use it to set a custom header:

{ [headerName]: key }

If headerName is not provided (empty), use the default:

Authorization: key

Example usage:

const axios = require('axios');

class Client {
  constructor(context, cfg, baseURL) {
    this.logger = context.logger;
    this.cfg = cfg;
    this.baseURL = baseURL;
  }

  async apiRequest(options, axiosInstance = axios) {
    const headers = {
      ...(options.headers || {})
    };

    if (this.cfg.headerName) {
      headers[this.cfg.headerName] = this.cfg.key;
    } else {
      headers['Authorization'] = this.cfg.key;
    }

    const callOptions = {
      ...options,
      baseURL: this.baseURL,
      headers
    };

    try {
      return await axiosInstance.request(callOptions);
    } catch (err) {
      this.logger.error(`API request failed: ${err.message}`);
      throw err;
    }
  }
}

module.exports = Client;

2. OA2_AUTHORIZATION_CODE

Structure:

{
  "cfg": {
    "authClientId": "xxx",
    "refreshToken": "xxx",
    "accessToken": "xxx",
    "expires": "ISO_DATE",
    "externalId": "user-id"
  }
}

Example usage:

const axios = require('axios');

class Client {
  constructor(context, cfg, baseURL) {
    this.logger = context.logger;
    this.cfg = cfg;
    this.baseURL = baseURL;
  }

  async apiRequest(options, axiosInstance = axios) {
    const callOptions = {
      ...options,
      baseURL: this.baseURL,
      headers: {
        ...(options.headers || {}),
        Authorization: `Bearer ${this.cfg.accessToken}`
      }
    };

    try {
      return await axiosInstance.request(callOptions);
    } catch (err) {
      this.logger.error(`API request failed: ${err.message}`);
      throw err;
    }
  }
}

module.exports = Client;

3. SIMPLE (Username & Password)

Structure:

{
  "cfg": {
    "username": "user@example.com",
    "passphrase": "password123"
  }
}

Example usage:

const axios = require('axios');

class Client {
  constructor(context, cfg, baseURL) {
    this.logger = context.logger;
    this.cfg = cfg;
    this.baseURL = baseURL;
  }

  async apiRequest(options, axiosInstance = axios) {
    const auth = {
      username: this.cfg.username,
      password: this.cfg.passphrase
    };

    const callOptions = {
      ...options,
      baseURL: this.baseURL,
      auth
    };

    try {
      return await axiosInstance.request(callOptions);
    } catch (err) {
      this.logger.error(`API request failed: ${err.message}`);
      throw err;
    }
  }
}

module.exports = Client;

4. SESSION_AUTH

Structure:

{
  "cfg": {
    "authClientId": "xxx",
    "accessToken": "xxx",
    "inputFields": {
      "username": "user@example.com",
      "password": "password123"
    },
    "expires": null}
}

This method assumes a session-based authentication, where credentials are used to obtain an access token. The token is reused until expiration.

Example usage:

const axios = require('axios');

class Client {
  constructor(context, cfg, baseURL) {
    this.logger = context.logger;
    this.cfg = cfg;
    this.baseURL = baseURL;
  }

  async apiRequest(options, axiosInstance = axios) {
    const callOptions = {
      ...options,
      baseURL: this.baseURL,
      headers: {
        ...(options.headers || {}),
        Authorization: `Bearer ${this.cfg.accessToken}`
      }
    };

    try {
      return await axiosInstance.request(callOptions);
    } catch (err) {
      this.logger.error(`API request failed: ${err.message}`);
      throw err;
    }
  }
}

module.exports = Client

Summary

You can see example of connector here.

Last updated