Source Extensions (Plugins)
- Overview
- Quickstart
- Project Structure
- Source Extension Interface
- Source Config
- Catalog of Streams
- Pulling data
- Descriptor
- Testing
- Adding to Jitsu
- Advanced
Overview#
Jitsu Source Plugins allow anyone to implement a new source type for Jitsu and publish it to make it available for all users of Jitsu.
Source Plugins are designed to pull data from third-party services. Each source may have multiple types of data streams, e.g.: users and events.
Quickstart#
We need to use Jitsu SDK's CLI tool to bootstrap a project for new source plugin:
npx jitsu-cli extension create --type source
nodejs and npx must be installed on your system.
jitsu-cli creates a project for a source plugin. All parts are working, but they are placeholder implementations that don't do anything meaningful: the sample source project simply returns one data row with run_number and configuration parameters.
If you are an experienced developer, you can start replacing placeholder logic with your own right away.
Project Structure#
jitsu-cli generates a project directory structure with a set of files typical for a Typescript node.js project:
├── package.json
├── tsconfig.json
├── src
│   └── index.ts
└── __test__
- package.json - contains meta-information about the npm project, including name and version
- src/index.ts - contains the main logic along with the plugin descriptor, config objects, and source stream catalog
- __test__/ - directory for tests
- tsconfig.json - settings for the Typescript compiler. No need to change that
Source Extension Interface#
A Jitsu Source Extension must export the following symbols:
- descriptor - instance of ExtensionDescriptor, describes the extension: name, icon, config params
- validator - instance of ConfigValidator, runs validation against the provided source config
- sourceCatalog - instance of SourceCatalog, describes the data streams the source provides
- streamReader - instance of StreamReader, pulls the data and sends it to Jitsu
For more on these types, please check the files with full type descriptions, or check out the @jitsu/types npm package.
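To see how these pieces fit together, here is a minimal, hypothetical skeleton of src/index.ts. The import paths inside @jitsu/types and the config types below are assumptions for illustration; jitsu-cli generates a working template for you.

//a minimal sketch of src/index.ts; import paths inside @jitsu/types are assumptions
//and may differ between SDK versions
import { ExtensionDescriptor } from "@jitsu/types/extension";
import { ConfigValidationResult, SourceCatalog, StreamReader } from "@jitsu/types/sources";

//hypothetical config types; a real source defines its own (see below)
export interface MyConfig {
  apiKey: string;
}
export interface MyStreamConfig {
  objectId: string;
}

const descriptor: ExtensionDescriptor = {
  id: "my-source",
  displayName: "My Source",
  description: "Example source",
  configurationParameters: [{ id: "apiKey", displayName: "API Key", required: true }],
};

async function validator(config: MyConfig): Promise<ConfigValidationResult> {
  //check here that config.apiKey actually grants access to the API
  return true;
}

const sourceCatalog: SourceCatalog<MyConfig, MyStreamConfig> = async config => [
  {
    type: "object",
    supportedModes: ["full_sync"],
    params: [{ id: "objectId", displayName: "Object Id", required: true }],
  },
];

const streamReader: StreamReader<MyConfig, MyStreamConfig> = async (
  sourceConfig,
  streamType,
  streamConfiguration,
  streamSink
) => {
  streamSink.addRecord({ $id: "1", hello: "world" });
};

export { descriptor, validator, sourceCatalog, streamReader };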
To make a concrete implementation of these types, we need to define our own types for the Source config and Stream config.
Source Config#
The Source Config must contain the properties required to connect to the third-party service, e.g. userId, password, or apiKey, or for OAuth 2.0: clientId, clientSecret, accessToken, etc.
It may also contain configuration parameters of data streams that are common for all of the source's stream types, e.g. start_date, refresh_window.
Starting from here we will use Airtable source implementation for examples:
export interface AirtableConfig {
//API Key. Read on how to get API key here: https://support.airtable.com/hc/en-us/articles/219046777-How-do-I-get-my-API-key-
apiKey: string;
//Base ID. Read how to get Base ID: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs
baseId: string;
}
If you're using Jitsu Configurator, those parameters will be displayed properly in the UI.
Validating Config#
Once we have the source config, we implement the validator.
The validator is optional, but highly recommended.
In the validator we need to check whether the provided config grants access to the API.
async function validator(config: AirtableConfig): Promise<ConfigValidationResult> {
console.log(`Will connect to airtable with apiKey=${config.apiKey} and baseId=${config.baseId}`);
let res = await fetch("https://api.airtable.com/v0/meta/bases/" + config.baseId + "/tables", {
method: "get",
headers: [["Authorization", "Bearer " + config.apiKey]],
});
  if (res.headers.get("Content-Type")?.includes("application/json")) {
let json = await res.json();
if (json.error) {
return { ok: false, message: `${json.error?.type || json.error} ${json.error?.message || ""}`.trim() };
} else {
return true
}
} else {
return { ok: false, message: `Error Code: ${res.status} msg: ${await res.text()}` };
}
}
Now we can test our validator with the validate-config action that jitsu-cli has already added to the project:
yarn build && yarn validate-config -c '{"apiKey":"keyMy", "baseId":"app1234"}'
Alternatively, the config may be stored in a JSON file:
yarn build && yarn validate-config -c config.json
If everything is fine, we should get the following output:
[info ] - Validating configuration {"apiKey":"keyMy", "baseId":"app1234"}
Will connect to airtable with apiKey=keyMy and baseId=app1234
[info ] - Config is valid. Hooray!
[info ] - Done
Catalog of Streams#
The source extension needs a way to tell Jitsu what types of streams it supports, and how those streams should be configured.
Think of streams as tables in a database. In fact, each stream will be represented as a table in the destination database.
First we need to declare a StreamConfig type:
export interface TableStreamConfig {
//Table Id. Read how to get table id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs
tableId: string;
//View Id (Optional). Read how to get view id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs
viewId?: string;
//Fields. Comma separated list of field names. If empty or undefined - all fields will be downloaded
fields?: string;
}
And then declare the sourceCatalog function that returns an array of StreamPrototype objects:
const sourceCatalog: SourceCatalog<AirtableConfig, TableStreamConfig> = async (config: AirtableConfig) => {
return [
{
type: "table",
supportedModes: ["full_sync"],
params: [
{
id: "tableId",
displayName: "Table Id",
documentation:
"Read how to get table id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs",
required: true,
},
{
id: "viewId",
displayName: "View Id",
documentation:
"Read how to get view id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs",
required: false,
},
{
id: "fields",
displayName: "Fields",
documentation: "Comma separated list of field names. If empty or undefined - all fields will be downloaded",
required: false,
},
],
},
];
};
In this example, the source exports one stream type (called table), which is configurable with the tableId, viewId, and fields parameters. A user can configure multiple instances of one stream type. In the case of Airtable, each stream will represent a view of a table, optionally with a subset of fields (see the fields parameter).
Pulling data#
The main logic happens in the streamReader function. This function pulls data from the third-party API and sends it to the Jitsu Server as messages through the StreamSink interface (see the detailed description of all message types below).
//these imports are assumed based on the airtable npm package layout and may differ between versions
import Airtable from "airtable";
import { QueryParams } from "airtable/lib/query_params";

const streamReader: StreamReader<AirtableConfig, TableStreamConfig> = async (
sourceConfig: AirtableConfig,
streamType: string,
streamConfiguration: StreamConfiguration<TableStreamConfig>,
streamSink: StreamSink,
services: { state: StateService }
) => {
const airtable = new Airtable({ apiKey: sourceConfig.apiKey });
let table = airtable.base(sourceConfig.baseId).table(streamConfiguration.parameters.tableId);
const selectedFields = streamConfiguration.parameters.fields
? streamConfiguration.parameters.fields.split(",").map(f => f.trim())
: [];
let selectParams: QueryParams<any> = {};
if (selectedFields.length > 0) {
streamSink.log("INFO", "Fields filter: " + JSON.stringify(selectedFields));
selectParams.fields = selectedFields;
}
if (streamConfiguration.parameters.viewId) {
selectParams.view = streamConfiguration.parameters.viewId;
}
let allRecords = await table.select(selectParams).all();
allRecords.forEach(r => {
const { id, createdTime, fields } = r._rawJson;
//add record message
streamSink.addRecord({
$id: id,
$recordTimestamp: new Date(createdTime),
__sql_type_created: "timestamp with time zone",
...fields,
});
});
};
Descriptor#
The descriptor object is used by Jitsu to build the Source UI form with proper descriptions and documentation:
const descriptor: ExtensionDescriptor = {
id: "airtable",
displayName: "Airtable Source",
icon: "<svg xmlns=\"http://www.w3.org/2000/svg\" viewBox=\"0 0 64 64\" width=\"100%\" height=\"100%\"><path d=\"M28.578 5.906L4.717 15.78c-1.327.55-1.313 2.434.022 2.963l23.96 9.502a8.89 8.89 0 0 0 6.555 0l23.96-9.502c1.335-.53 1.35-2.414.022-2.963l-23.86-9.873a8.89 8.89 0 0 0-6.799 0\" fill=\"#fc0\"/><path d=\"M34.103 33.433V57.17a1.6 1.6 0 0 0 2.188 1.486l26.7-10.364A1.6 1.6 0 0 0 64 46.806V23.07a1.6 1.6 0 0 0-2.188-1.486l-26.7 10.364a1.6 1.6 0 0 0-1.009 1.486\" fill=\"#31c2f2\" /> <path d=\"M27.87 34.658l-8.728 4.215-16.727 8.015c-1.06.512-2.414-.26-2.414-1.44V23.17c0-.426.218-.794.512-1.07a1.82 1.82 0 0 1 .405-.304c.4-.24.97-.304 1.455-.112l25.365 10.05c1.3.512 1.4 2.318.133 2.925\" fill=\"#ed3049\" /><path d=\"M27.87 34.658l-7.924 3.826L.512 22.098a1.82 1.82 0 0 1 .405-.304c.4-.24.97-.304 1.455-.112l25.365 10.05c1.3.512 1.4 2.318.133 2.925\" fill=\"#c62842\" /></svg>",
description: "This source pulls data from Airtable base",
configurationParameters: [
{
id: "apiKey",
displayName: "API Key",
documentation:
"Read on how to get API key here: https://support.airtable.com/hc/en-us/articles/219046777-How-do-I-get-my-API-key-",
required: true,
},
{
id: "baseId",
displayName: "Base Id",
documentation:
"Read how to get Base ID: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs",
required: true,
},
],
};
Testing#
We need a Jitsu Server to run the Source extension, but jitsu-cli created our project with the execute action that allows executing the source in command-line mode with the provided configs.
Pass the Source Config JSON with the -c parameter and the StreamConfig with -s.
If the source supports multiple types of streams, the stream type must be passed with the stream parameter of the StreamConfig object:
yarn execute -c '{apiKey: "keyMy", baseId:"app1234"}' -s '{stream: "table", tableId:"tblMy"}'
The result will look like this:
[info ] - Getting available streams...
[info ] - Stream: table. Parameters: tableId - Table Id, viewId - View Id, fields - Fields
[info ] - [record] {"$id":"rec55g5x7GyRkz0Ex","$recordTimestamp":"2020-06-11T01:30:22.000Z","Client":["recNvBXPsvaiu5QaH"],"Due date":"2020-11-01","Project lead":{"id":"usrSdmrY5yGdbcZzg","email":"katherineduh+collab23@demo.htable.com","name":"Leslie Walker"},"Category":"Technology design","Name":"Lemon headband","Project team":[{"id":"usr6hWARwVNgmt3WW","email":"katherineduh+collab35@demo.htable.com","name":"Sam Epps"},{"id":"usru7j5m2lcNhriKv","email":"katherineduh+collab7@demo.htable.com","name":"Cameron Toth"},{"id":"usrSdmrY5yGdbcZzg","email":"katherineduh+collab23@demo.htable.com","name":"Leslie Walker"}],"Tasks":["recLLYKORkbdzQV1g","recUyxiAcW4x56HBH"],"Kickoff date":"2020-10-18"}
[info ] - Result data:
[
  {
    "$id": "rec55g5x7GyRkz0Ex",
    "$recordTimestamp": "2020-06-11T01:30:22.000Z",
    "Client": "[\"recNvBXPsvaiu5QaH\"]",
    "Due date": "2020-11-01",
    "Project lead": { "id": "usrSdmrY5yGdbcZzg", "email": "katherineduh+collab23@demo.htable.com", "name": "Leslie Walker" },
    "Category": "Technology design",
    "Name": "Lemon headband",
    "Project team": "[{\"id\":\"usr6hWARwVNgmt3WW\",\"email\":\"katherineduh+collab35@demo.htable.com\",\"name\":\"Sam Epps\"},{\"id\":\"usru7j5m2lcNhriKv\",\"email\":\"katherineduh+collab7@demo.htable.com\",\"name\":\"Cameron Toth\"},{\"id\":\"usrSdmrY5yGdbcZzg\",\"email\":\"katherineduh+collab23@demo.htable.com\",\"name\":\"Leslie Walker\"}]",
    "Tasks": "[\"recLLYKORkbdzQV1g\",\"recUyxiAcW4x56HBH\"]",
    "Kickoff date": "2020-10-18"
  }
]
Special column types:
[info ] - Done
First, jitsu-cli prints all messages received from the source. Then it prints the array of all received objects.
Adding to Jitsu#
To add our plugin to Jitsu, we need to build and publish it.
To build the plugin code, use:
yarn build
Publishing to NPM Repository#
Publish the plugin to the public npm repository to make it available for other users. You need to have an account on https://www.npmjs.com.
The following commands in the project directory will publish the package to the npmjs repository:
npm login
npm publish
npm will ask you to provide some additional details to complete the publishing.
Setting up Jitsu Server#
Users of a standalone Jitsu Server can set up a source based on a plugin since version 1.42.
Add a new source of type sdk_source, along with information about the plugin package and version and its config, to the eventnative.yaml file:
sources:
  ...
  myairtable:
    type: sdk_source
    destinations:
      - target_destination
    collections:
      - name: table
        type: table
        table_name: myairtable_table
        schedule: '@daily'
        parameters:
          tableId: tblMy
    schedule: '@daily'
    config:
      apiKey: keyMy
      baseId: app1234
    package_name: jitsu-airtable-source
    package_version: ^0.7.2
Setting up Jitsu Joint Image or Configurator UI#
The Configurator doesn't support adding custom sources yet. To make your source plugin appear in the Jitsu Configurator UI, please create a new ticket or pull request in the jitsu repository.
More Details#
In the previous sections we reviewed the process of creating a source, using the Airtable source as an example. But this source implementation doesn't cover all the features and modes that a Source Extension can use. Here we give more details on stream sink modes, data type conversions, persistent state management, and more.
Message types#
A Source Extension communicates with the Jitsu Server by means of the StreamSink object and messages of the JitsuDataMessage type.
Jitsu supports the following message types:
Type | StreamSink helper method | Mode | Payload | Description |
---|---|---|---|---|
record | addRecord | both | DataRecord | The main message that sends a single data object to the stream. |
clear_stream | clearStream | incremental | no | Used in case an incremental Source needs to clear all previously synced data. Truncates the table associated with the stream in target destinations. When used, the clear_stream message must be the first message in the transaction and can be used only once per stream. |
state | changeState | both | any object | Stores provided object in persistent storage. |
new_transaction | newTransaction | incremental | no | Starts a new transaction. Previous transaction will be committed to the destination. All consequent messages will be processed in the new transaction. First transaction is started implicitly even in the absence of new_transaction message. |
delete_records | deleteRecords | incremental | DeleteRecords | Deletes records that belong to a certain time period from destination storages. The time period is controlled with DeleteRecords's partitionTimestamp and granularity parameters. Requires DataRecord objects to have $recordTimestamp set. delete_records must precede any record messages in the transaction. |
log | log | both | LogRecord | Prints log message to the Jitsu server task log. |
schema | schema | both | StreamSchema | Defines stream schema. Not implemented yet |
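To illustrate how these messages are emitted in practice, here is a hedged sketch of a streamReader body that uses the StreamSink helper methods from the table above. The helper names come from the table; exact signatures may vary between SDK versions.

//a hedged sketch: emitting StreamSink messages from inside streamReader
const streamReader: StreamReader<AirtableConfig, TableStreamConfig> = async (
  sourceConfig,
  streamType,
  streamConfiguration,
  streamSink
) => {
  //incremental source that re-syncs everything this run: drop previously synced data first
  //(clear_stream must be the first message and may be used only once per stream)
  streamSink.clearStream();
  streamSink.log("INFO", `Syncing stream ${streamType}`);
  const rows = [{ id: "1", name: "example" }]; //stand-in for data fetched from the API
  for (const row of rows) {
    streamSink.addRecord({ $id: row.id, name: row.name });
  }
  //persist any custom object for the next run
  streamSink.changeState({ lastSyncedId: "1" });
};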
Sync modes#
Each stream type can be declared to support full_sync mode, incremental mode, or both.
If both modes are supported, the mode parameter in the StreamConfiguration object must be set to full_sync or incremental.
Supported modes are declared in the supportedModes property of the StreamPrototype objects returned by sourceCatalog.
Full Sync
In Full Sync mode, a Source Extension must download all data for the stream on each run.
Full Sync mode is recommended for most Sources. Exceptions are sources with a big amount of data that takes a long time to process, or services that charge money for pulling the data. For such cases, Incremental mode may be a good choice despite being a little harder to implement.
Incremental
In Incremental mode, a Source Extension downloads only fresh data on each run.
Incremental mode usually requires the Source to save some information between runs, e.g. the ID or _timestamp of the last loaded data record.
To save data between runs, use the state message. See the Persistent State section for more details.
Refresh window
Some third-party services may update data during the day, and other services may even update data for longer periods, e.g. 30 days. It is still possible to use Incremental mode with such services, but we need to make sure that we delete all previously loaded data for the current refresh window before loading it again. For that we need to:
- Set $recordTimestamp for all record objects that we send to Jitsu.
- Process stream data using date chunks (partitions). Choose granularity: DAY will probably suit most cases.
- Start a new_transaction for each new date chunk processed.
- Add a delete_records message at the beginning of each new transaction (including the first one).
You can use Jitsu Google-Analytics source as an example of Refresh Window implementation: https://github.com/jitsucom/jitsu-sdk/blob/main/source-connectors/google-analytics
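Below is a hedged sketch of that pattern (not the actual google-analytics implementation), assuming DAY granularity, a hypothetical fetchRecordsForDay helper, and that the newTransaction and deleteRecords helper methods follow the message payloads described above; exact signatures may differ between SDK versions.

//hypothetical helper: pulls one day of data from the third-party API
async function fetchRecordsForDay(day: Date): Promise<{ id: string; fields: Record<string, any> }[]> {
  return []; //replace with a real API call
}

const streamReader: StreamReader<AirtableConfig, TableStreamConfig> = async (
  sourceConfig,
  streamType,
  streamConfiguration,
  streamSink
) => {
  const refreshWindowDays = 30;
  const now = new Date();
  for (let i = refreshWindowDays - 1; i >= 0; i--) {
    const day = new Date(now.getTime() - i * 24 * 60 * 60 * 1000);
    if (i < refreshWindowDays - 1) {
      //commit the previous day chunk and start a new transaction
      streamSink.newTransaction();
    }
    //drop previously loaded data for this day before loading it again
    //(DeleteRecords payload: partitionTimestamp + granularity; exact helper signature assumed)
    streamSink.deleteRecords(day, "DAY");
    for (const row of await fetchRecordsForDay(day)) {
      streamSink.addRecord({ $id: row.id, $recordTimestamp: day, ...row.fields });
    }
  }
};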
Persistent State#
It may be useful to make some information from a previous source run available to subsequent runs.
For that purpose, the Source Extension may send any custom object as a payload with the state message.
The Jitsu Server persists that object in meta storage and makes it available to the streamReader via the StateService.
We recommend using the StateService for modifying state too, instead of sending the state message directly.
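A minimal sketch of how streamReader might use it, assuming StateService exposes get and set methods (the exact method names and signatures may differ between SDK versions):

const streamReader: StreamReader<AirtableConfig, TableStreamConfig> = async (
  sourceConfig,
  streamType,
  streamConfiguration,
  streamSink,
  services: { state: StateService }
) => {
  //read the marker stored by a previous run (undefined on the very first run); method names assumed
  const lastTimestamp = services.state.get("last_record_timestamp");
  //...pull only records newer than lastTimestamp and send them with streamSink.addRecord...
  //remember where this run stopped, for the next run
  services.state.set("last_record_timestamp", new Date().toISOString());
};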
To test that your source works correctly with state, you can pass the path of a file containing the state object to the execute script using the -t parameter:
yarn execute -c '{apiKey: "keyMy", baseId:"app1234"}' -s '{stream: "table", tableId:"tblMy"}' -t ./state.json
Record $id#
It is required to set the $id property for each record.
Usually the Source data has some ID associated with its entities.
When this is not the case, you can use the helper function buildSignatureId from @jitsu/jlib/lib/sources-lib to generate an id as a hash of the object data.
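A hedged sketch of both cases, assuming buildSignatureId accepts the record's data object and returns a hash string (streamSink comes from the streamReader arguments):

import { buildSignatureId } from "@jitsu/jlib/lib/sources-lib";

const row = { id: "rec123", fields: { name: "Lemon headband" } };
//the entity already has a natural id
streamSink.addRecord({ $id: row.id, ...row.fields });
//no natural id: derive one from the data itself (assumed signature)
streamSink.addRecord({ $id: buildSignatureId(row.fields), ...row.fields });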
Flattening#
A Source Extension is allowed to use objects with nested objects as a DataRecord.
If the target destination is a SQL-based warehouse, Jitsu will automatically convert the record to a flat structure.
Otherwise, the object structure will be left intact.
If you want to force a flat structure for all warehouses, you can use the flatten function from the @jitsu/jlib module.
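A hedged sketch, assuming flatten is a named export of @jitsu/jlib (the exact key separator it uses is also an assumption):

import { flatten } from "@jitsu/jlib";

const record = { $id: "1", project: { lead: { name: "Leslie Walker" } } };
//send a flat structure to all destinations, SQL-based or not
streamSink.addRecord(flatten(record));
//nested keys are joined into flat column names, e.g. something like project_lead_name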
Data types mapping#
Jitsu assumes the types of data record properties in the destination based on their original types, with one exception: dates represented as strings in the simplified extended ISO format YYYY-MM-DDTHH:mm:ss.sssZ will be converted to the Date type in the destination storage.
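For example (the mappings in the comments follow the rule above; the exact column types depend on the destination):

streamSink.addRecord({
  $id: "1",
  name: "Lemon headband", //string stays a string column
  amount: 42, //number becomes a numeric column
  created: "2020-06-11T01:30:22.000Z", //ISO-formatted string is converted to the Date type
  note: "week 42", //not in ISO format, remains a plain string
});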
Date timezone#
Jitsu runs Sources with the default timezone set to UTC.