Change Replication Overview

In Integrator, it is possible to create a flow that tracks changes in a database or API and loads the data into another database or API. Another name for change replication is data synchronization.

Integrator supports unidirectional change replication: from the source to the destination. To replicate changes in the opposite direction (from the destination to the source), create a separate flow in which the source and the destination swap places.

Use Cases

Change replication can occur:

Resolving change conflicts

When a record is modified in both the source and the destination, Integrator applies the source changes to the destination, and any changes made in the destination are lost.
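A minimal sketch of this "source wins" behavior, modelling both sides as Python dictionaries keyed by a record ID. The function name and record layout are hypothetical and only illustrate the outcome, not how Integrator implements it:

```python
def apply_source_changes(source: dict, destination: dict) -> dict:
    """Source-wins replication: each changed source record replaces the
    destination record with the same key, discarding destination edits."""
    result = dict(destination)      # start from the current destination state
    for key, record in source.items():
        result[key] = dict(record)  # overwrite unconditionally, no conflict check
    return result


destination = {1: {"status": "edited in destination"}}
source = {1: {"status": "edited in source"}}
print(apply_source_changes(source, destination))
# {1: {'status': 'edited in source'}}
```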

High watermark

A high watermark is the highest value that a field has reached so far.

For example:

Suppose there is a table audit_trail in a source PostgreSQL database. This table is updated each time a user logs into the system or executes a specific function. We would like to track changes in this table and load them into an online data warehouse (which is also a database).

Suppose that the table audit_trail has a field Last_modified TIMESTAMP NOT NULL, which is updated each time a record is inserted or updated.

We would use Last_modified as the high watermark field to track changes in the audit_trail table.
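To make the definition concrete, here is a small sketch that computes the high watermark for the example table. It assumes SQLite as a stand-in for PostgreSQL and stores timestamps as ISO-8601 text, which sorts in chronological order; the sample rows are made up:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE audit_trail (id INTEGER PRIMARY KEY, action TEXT, last_modified TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO audit_trail (action, last_modified) VALUES (?, ?)",
    [
        ("login", "2024-01-01 09:00:00"),
        ("export", "2024-01-01 09:05:00"),
        ("logout", "2024-01-01 09:30:00"),
    ],
)

# The high watermark is the largest value the field has reached so far.
(high_watermark,) = conn.execute("SELECT MAX(last_modified) FROM audit_trail").fetchone()
print(high_watermark)  # 2024-01-01 09:30:00
```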

How change replication works

  1. Integrator always tracks which records have been extracted and which records have been loaded during a typical ETL process.

  2. It is possible to set up a high watermark field for a particular transformation.

  3. When a high watermark field is set, Integrator calculates the maximum value of that field (the high watermark) for the last successful load.

  4. Integrator stores this value with the metadata.

  5. The next time the ETL process is executed, Integrator uses the previously calculated high watermark to select only the records that are newer than it.

  6. This technique works equally well for databases, files and APIs.

  7. If the change replication is from a database, Integrator modifies the WHERE clause on the fly to select records that are newer than the high watermark: select * from audit_trail where last_modified > ? -- where ? is the high watermark value.

  8. If the change replication is from an API or file, Integrator automatically adds a JavaScript filter that skips all records that are older than the high watermark.
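The database path of the steps above can be sketched as follows. This is an illustration under stated assumptions, not Integrator's internals: SQLite stands in for the source, a plain dict (with the hypothetical key "audit_trail.last_modified") plays the role of the stored metadata, and a list plays the destination:

```python
import sqlite3

EXTRACT_ALL = "SELECT id, action, last_modified FROM audit_trail"
EXTRACT_CHANGES = EXTRACT_ALL + " WHERE last_modified > ?"


def run_flow(conn, metadata, load):
    """One ETL run: extract, load, then advance the high watermark."""
    watermark = metadata.get("audit_trail.last_modified")
    if watermark is None:
        rows = conn.execute(EXTRACT_ALL).fetchall()  # no watermark yet: full extract
    else:
        rows = conn.execute(EXTRACT_CHANGES, (watermark,)).fetchall()
    load(rows)  # if the load raises, the watermark below is not advanced
    if rows:
        metadata["audit_trail.last_modified"] = max(r[2] for r in rows)
    return rows


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE audit_trail (id INTEGER PRIMARY KEY, action TEXT, last_modified TEXT NOT NULL)"
)
conn.execute("INSERT INTO audit_trail VALUES (1, 'login', '2024-01-01 09:00:00')")

metadata, destination = {}, []
run_flow(conn, metadata, destination.extend)  # first run loads row 1
conn.execute("INSERT INTO audit_trail VALUES (2, 'export', '2024-01-01 09:05:00')")
run_flow(conn, metadata, destination.extend)  # second run loads only row 2
print([r[0] for r in destination], metadata)
# [1, 2] {'audit_trail.last_modified': '2024-01-01 09:05:00'}
```

Note that the strict `>` comparison skips any row that arrives later with a value equal to the stored watermark, which is one reason the field should reliably identify recent changes.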

Which fields could become a high watermark field?

Basically, any field can be a high watermark field as long as it has the data type TIMESTAMP, DATE or NUMERIC and can be used to uniquely identify recent changes.

For a database, it is recommended to create an index on the high watermark field.
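A sketch of checking a candidate field and indexing it, assuming SQLite (its PRAGMA table_info returns the declared column type). The helper and index names are hypothetical; in other databases you would query the catalog and create the index with that database's own tools:

```python
import sqlite3

ALLOWED_TYPES = {"TIMESTAMP", "DATE", "NUMERIC"}


def prepare_watermark_field(conn, table, column):
    """Validate a candidate high watermark column and index it.
    Table and column names are trusted identifiers, not user input."""
    declared = {
        row[1]: row[2].upper().split("(")[0].strip()  # 'NUMERIC(10,2)' -> 'NUMERIC'
        for row in conn.execute(f"PRAGMA table_info({table})")
    }
    if declared.get(column) not in ALLOWED_TYPES:
        raise ValueError(f"{table}.{column} must be TIMESTAMP, DATE or NUMERIC")
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{table}_{column} ON {table} ({column})")


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE audit_trail (id INTEGER PRIMARY KEY, action TEXT, last_modified TIMESTAMP NOT NULL)"
)
prepare_watermark_field(conn, "audit_trail", "last_modified")
print([row[1] for row in conn.execute("PRAGMA index_list(audit_trail)")])
# ['idx_audit_trail_last_modified']
```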

Is it possible to set two or more fields as a high watermark for a single transformation?

There can only be one high watermark field.

Tracking changes in real-time

To track changes in (almost) real-time you will need to schedule a change replication flow to be executed as often as possible. The smallest period allowed between flow runs is 30 seconds.
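The schedule itself is configured in Integrator; the hypothetical helper below only illustrates the arithmetic of clamping a requested period to the 30-second minimum:

```python
from datetime import datetime, timedelta

MIN_INTERVAL = timedelta(seconds=30)


def schedule(start, interval, runs):
    """Return the start times of the next `runs` executions, never
    closer together than the 30-second minimum."""
    step = max(interval, MIN_INTERVAL)
    return [start + i * step for i in range(runs)]


times = schedule(datetime(2024, 1, 1, 9, 0, 0), timedelta(seconds=10), 3)
print([t.strftime("%H:%M:%S") for t in times])  # ['09:00:00', '09:00:30', '09:01:00']
```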

If a third-party application supports sending HTTP requests to Integrator, it can push changes to Integrator using the HTTP listener API.

Configuring Change Replication Flows

Change replication between two databases

Important: since we will be using a high watermark field in the WHERE clause, it is important to have an index for this field (unless your table is very small).

Step 1. Create a source and destination database connection.

Step 2. Start creating a change replication flow in the Flow Builder by clicking the + button and typing in change replication:

Change Replication Flow Type

Step 3. Continue by defining flow transformations, mapping and parameters as you would for any other flow where:

Step 4. When configuring parameters for transformations, set the High watermark field and enable change replication:

High watermark field

Step 5. Optionally, define the Source Query.

Important: typically, the extract SQL used for change replication is generated automatically. If you have entered custom SQL in the Source Query field, Integrator automatically modifies it by adding a predicate (a WHERE clause) for the high watermark field.

Step 6. If needed, you can configure Integrator to perform MERGE (UPSERT) on the destination database (the default is INSERT):

  1. On the Parameters screen, set the Action to MERGE. MERGE is supported for Oracle, MS SQL Server, PostgreSQL, MySQL, DB2, Informix and Sybase databases. This action can also be conditional, which requires entering conditions, using JavaScript, in the Action Conditions field.

  2. Set the Lookup Fields: a comma-separated list of fields used to uniquely identify a record for the MERGE action. Important: for PostgreSQL and MySQL, there must be a unique index that includes all lookup fields.

  3. If needed, specify Action Conditions. When a conditional action is selected, use this field to enter a JavaScript expression that determines which action (INSERT, UPDATE, DELETE, MERGE) is actually used. You can reference field values from the current source record as {field name}, as in this example: {Types}.equals('U') ? 'update' : 'insert';
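The MERGE semantics can be illustrated with SQLite's upsert syntax (INSERT ... ON CONFLICT, available since SQLite 3.24). This is a sketch of the behavior only: Integrator generates its own MERGE statement for each supported database, and the table audit_copy and the choose_action helper are hypothetical. Like PostgreSQL, SQLite needs a unique index (here the primary key) covering the lookup fields:

```python
import sqlite3


def merge(conn, rows):
    """Upsert rows into the destination, using `id` as the lookup field."""
    conn.executemany(
        "INSERT INTO audit_copy (id, action, last_modified) VALUES (?, ?, ?) "
        "ON CONFLICT(id) DO UPDATE SET action = excluded.action, "
        "last_modified = excluded.last_modified",
        rows,
    )


def choose_action(record):
    """Python equivalent of {Types}.equals('U') ? 'update' : 'insert'."""
    return "update" if record["Types"] == "U" else "insert"


dest = sqlite3.connect(":memory:")
dest.execute("CREATE TABLE audit_copy (id INTEGER PRIMARY KEY, action TEXT, last_modified TEXT)")
merge(dest, [(1, "login", "2024-01-01 09:00:00")])
# Row 1 already exists, so it is updated; row 2 is new, so it is inserted.
merge(dest, [(1, "login-retry", "2024-01-01 09:10:00"), (2, "export", "2024-01-01 09:05:00")])
print(dest.execute("SELECT id, action FROM audit_copy ORDER BY id").fetchall())
# [(1, 'login-retry'), (2, 'export')]
```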

Change replication from a database to anywhere

Important: since we will be using a high watermark field in the WHERE clause, it is important to have an index for this field (unless your table is very small).

Step 1. Create a source database connection.

Step 2. Create a destination connection, which can be anything supported by Integrator.

Step 3. Start creating a flow in the Flow Builder by clicking the + button and typing in database. Select any flow where the source is a database:

Database Flows

Step 4. Continue by defining flow transformations, mapping and parameters as you would for any other flow when:

Step 5. When configuring the parameters for the transformations, set the High watermark field and enable change replication:

High watermark field

Step 6. Optionally, define the Source Query.

Important: typically, the extract SQL used for change replication is generated automatically. If you have entered custom SQL in the Source Query field, Integrator automatically modifies it by adding a predicate (a WHERE clause) for the high watermark field.
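One way to add a high watermark predicate to a custom Source Query, without parsing it, is to wrap the query in a subselect. The sketch below does that and writes the changed rows to a CSV destination. It is a hypothetical illustration: Integrator may instead inject the predicate into the existing WHERE clause, and SQLite stands in for the source database:

```python
import csv
import io
import sqlite3


def add_watermark_predicate(custom_sql, field):
    """Wrap a custom Source Query in a subselect so a high watermark
    predicate can be added without parsing the original SQL."""
    return f"SELECT * FROM ({custom_sql}) AS src WHERE src.{field} > ? ORDER BY src.{field}"


def export_changes(conn, custom_sql, field, watermark, out):
    """Write changed rows to a CSV destination; return the new watermark."""
    cursor = conn.execute(add_watermark_predicate(custom_sql, field), (watermark,))
    names = [col[0] for col in cursor.description]
    position = names.index(field)
    writer = csv.writer(out)
    writer.writerow(names)
    for row in cursor:
        writer.writerow(row)
        watermark = row[position]  # rows are sorted, so the last one is the maximum
    return watermark


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE audit_trail (id INTEGER PRIMARY KEY, action TEXT, last_modified TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO audit_trail VALUES (?, ?, ?)",
    [
        (1, "login", "2024-01-01 09:00:00"),
        (2, "export", "2024-01-01 09:05:00"),
        (3, "logout", "2024-01-01 09:30:00"),
    ],
)

out = io.StringIO()
custom = "SELECT id, action, last_modified FROM audit_trail WHERE action <> 'logout'"
new_watermark = export_changes(conn, custom, "last_modified", "2024-01-01 09:00:00", out)
print(out.getvalue().splitlines())  # ['id,action,last_modified', '2,export,2024-01-01 09:05:00']
print(new_watermark)                # 2024-01-01 09:05:00
```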

Change replication from anywhere to anywhere

If your source is not a database, you can still set up a change replication flow that loads only the records that have changed since the last successful load.

All you need to do is set the High watermark field and enable change replication for the transformation:

High watermark field

Important: make sure that the high watermark field is TIMESTAMP or DATE (or can be converted to TIMESTAMP or DATE), or NUMERIC. When configuring a specific format in Integrator to be used for data exchange, you can always set the TIMESTAMP and DATE formats:

JSON

It is also possible to configure Integrator to parse all strings and automatically detect dates and timestamps. To enable automatic date parsing, check Parse Dates.
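For a non-database source, Integrator applies the filter in JavaScript; the Python sketch below only illustrates the idea. It parses strings that match a configured timestamp format (similar in spirit to Parse Dates) and keeps records newer than the high watermark. The format string, field names and payload are hypothetical:

```python
import json
from datetime import datetime

# Hypothetical timestamp format configured for the source; adjust to your data.
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S"


def parse_value(value):
    """Strings matching the configured format become datetime objects;
    every other value is returned unchanged."""
    if isinstance(value, str):
        try:
            return datetime.strptime(value, TIMESTAMP_FORMAT)
        except ValueError:
            pass
    return value


def filter_changes(payload, field, watermark):
    """Keep only records whose high watermark field is newer than `watermark`."""
    records = [{k: parse_value(v) for k, v in item.items()} for item in json.loads(payload)]
    changed = [r for r in records if r[field] > watermark]
    new_watermark = max((r[field] for r in changed), default=watermark)
    return changed, new_watermark


payload = '[{"id": 1, "updated": "2024-01-01T09:00:00"}, {"id": 2, "updated": "2024-01-01T09:45:00"}]'
changed, watermark = filter_changes(payload, "updated", datetime(2024, 1, 1, 9, 30))
print([r["id"] for r in changed], watermark)  # [2] 2024-01-01 09:45:00
```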

Using a fully-qualified field name

If a database is the source for the change replication, it is possible to use a fully-qualified field name as the value for the high watermark field, as in the example below:

public.audit_trail.partition_date

It makes sense to enter a fully-qualified field name if the Source Query which drives the change replication includes JOINs, subselects and table aliases, so that field-name collisions can be avoided.
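The collision problem is easy to reproduce. In this sketch (SQLite stands in for PostgreSQL, so the schema prefix is omitted and only the table name is used), both joined tables have a last_modified column: the unqualified predicate is rejected as ambiguous, while the qualified name works:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE audit_trail (id INTEGER PRIMARY KEY, user_id INTEGER, last_modified TEXT);
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, last_modified TEXT);
INSERT INTO users VALUES (1, 'alice', '2023-12-31 00:00:00');
INSERT INTO audit_trail VALUES (10, 1, '2024-01-01 09:05:00');
""")

QUERY = (
    "SELECT audit_trail.id, users.name FROM audit_trail "
    "JOIN users ON users.id = audit_trail.user_id"
)

error = None
try:
    conn.execute(QUERY + " WHERE last_modified > ?", ("2024-01-01 00:00:00",))
except sqlite3.OperationalError as err:
    error = str(err)
print(error)  # ambiguous column name: last_modified

rows = conn.execute(
    QUERY + " WHERE audit_trail.last_modified > ?", ("2024-01-01 00:00:00",)
).fetchall()
print(rows)  # [(10, 'alice')]
```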

Resetting the high watermark field

Once change replication is enabled, Integrator updates the value of the high watermark field each time the flow is executed (manually or by the scheduler).

From time to time, you might need to reset the high watermark. For example, if a new column is added to the table, you will need to run a full extraction again to include the new column.

To reset a high watermark:

Step 1. Open the flow in the Flow Builder, select the Transformation and disable change replication. Save the flow.

Disable change replication

Step 2. Run the flow manually.

Step 3. Once again, enable change replication and save the flow.
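The effect of the reset procedure can be sketched with a hypothetical run function in which disabling change replication simply ignores the stored watermark. The assumption that the full run still records the new maximum is ours, not documented behavior:

```python
def run(rows, metadata, replication_enabled):
    """rows: (id, last_modified) tuples from a hypothetical source.
    Returns the rows this run would extract and load."""
    watermark = metadata.get("last_modified") if replication_enabled else None
    selected = [r for r in rows if watermark is None or r[1] > watermark]
    if selected:
        # Assumption: the new maximum is recorded even on a full run,
        # so the next incremental run continues from there.
        metadata["last_modified"] = max(r[1] for r in selected)
    return selected


rows = [(1, "2024-01-01 09:00:00"), (2, "2024-01-01 09:05:00")]
metadata = {"last_modified": "2024-01-01 09:05:00"}  # already caught up

print(len(run(rows, metadata, replication_enabled=True)))   # 0: nothing new
print(len(run(rows, metadata, replication_enabled=False)))  # 2: full extraction (Step 2)
print(len(run(rows, metadata, replication_enabled=True)))   # 0: incremental again (Step 3)
```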