How To Implement a Source
This page describes new source implementation process. If you need to sync a source that hasn't been implemented yet, you can implement it in go language in about 8 hours! At first, please read How it works section about source synchronization process.
Packages#
All sources implementations are in Jitsu repository server/drivers package where 1 source - 1 sub-package. A new package name should be equal to source name.
Implementation#
Collections#
Jitsu considers every external (third-party) platform or system as a plenty of collections
. Collection - is a piece of API or a configurable report.
For instance Google Play has 2 collections: sales and earnings.
sales
and earnings
are Google pre-configured reports, that can be downloaded.
At the same time Google Analytics has 1 collection - configurable report, where consumers can request certain metrics and dimensions.
Interface#
Every source must implement Driver interface. Every driver implementation should support syncing all collection types.
Every driver instance is in charge to sync a certain collection. For example GooglePlayDriver
implementation supports syncing both sales
and earnings
.
For syncing N (2 or more) collections there should be created N GooglePlayDriver
instances (1 per collection).
//Driver interface must be implemented by every source type
type Driver interface {
io.Closer
//GetAllAvailableIntervals return all the available time intervals for data loading. It means, that if you want
//your driver to load for the last year by month chunks, you need to return 12 time intervals, each covering one
//month. There is drivers/granularity.ALL for data sources that store data which may not be split by date.
GetAllAvailableIntervals() ([]*TimeInterval, error)
//GetObjectsFor returns slice of objects per time interval. Each slice element is one object from the data source.
GetObjectsFor(interval *TimeInterval) ([]map[string]interface{}, error)
//Type returns string type of driver. Should be unique among drivers
Type() string
//GetCollectionTable returns table name
GetCollectionTable() string
//GetCollectionMetaKey returns key for storing signature in meta.Storage
GetCollectionMetaKey() string
}
Every driver implementation should have:
- constructor func:
func NewGooglePlay(ctx context.Context, config *SourceConfig, collection *Collection) (Driver, error)
- test connection func:
func TestGooglePlay(sourceConfig *base.SourceConfig) error
- package init func for initialization constructor and test connection funcs by source type:
func init() {
base.RegisterDriver(base.GooglePlayType, NewGooglePlay)
base.RegisterTestConnectionFunc(base.GooglePlayType, TestGooglePlay)
}
Driver loading#
For driver loading you should add explicit import with newly created driver implementation in server/drivers/factory.go
import (
_ "github.com/jitsucom/jitsu/server/drivers/<YOUR DRIVER PACKAGE>"
)
Supported intervals#
According to How it works section every driver loads data with time interval chunks.
Granularity of chunks depends on the source type (API specification). For instance Google Play earnings and sales reports are stored per month =>
GooglePlayDriver supports intervals with MONTH
granularity. At the same time Google Analytics API returns data per day => GoogleAnalyticsDriver supports
intervals with DAY
granularity.
see drivers/base/time_interval.go
Funcs description#
GetAllAvailableIntervals
func returns all available intervals with supported granularity.
func (gp *GooglePlay) GetAllAvailableIntervals() ([]*TimeInterval, error)
GetObjectsFor
func downloads and returns objects from input interval.
func (gp *GooglePlay) GetObjectsFor(interval *TimeInterval) ([]map[string]interface{}, error)
Type
func returns source type constant (e.g. google_play
)
func (gp *GooglePlay) Type() string
GetCollectionTable
func proxies call to gp.collection.GetTableName()
and returns table name for driver's underlying collection.
func (gp *GooglePlay) GetCollectionTable() string
GetCollectionMetaKey
func returns meta storage key for storing current synchronization state (signature) for preventing
re-syncing old data. It usually has the same implementation in every driver:
func (gp *GooglePlay) GetCollectionMetaKey() string {
return gp.collection.Name + "_" + gp.GetCollectionTable()
}
Tests#
Jitsu has semi-auto test that uses JSON config from OS var and loads and writes first interval data to output file in server/drivers/test_output dir (see driver_test.go
).
Use it for debugging. You should run the test locally with TEST_DRIVER_CONFIG
OS var that should contain source JSON configuration:
Redis source configuration example:
{
"source_id": "test_redis",
"type": "redis",
"collections": [
{
"name": "currencies",
"table_name": "my_currencies",
"parameters": {
"redis_key": "currencies:*"
}
}
],
"config": {
"host": "redis_host",
"password": "redis_password",
"port": 6379
}
}
After running driver test Jitsu will write output in server/drivers/test_output/my_currencies.log
as a table structure that represents
what data structure will be in the datawarehouse.
Troubleshooting#
If you've noticed Unknown source type
error - please make sure that you've added:
- required funcs from Interface section to newly created driver package
- explicit driver loading from Driver loading section to
server/drivers/factory.go
imports