PostgreSQL is an open-source database renowned for its advanced features, reliability, and performance. It enables the development of robust, scalable applications.
Our connectors support the most recent versions of PostgreSQL. You may choose different strategies to sync your data:
This is a standard connector that performs queries against the source database to sync data. It is the simplest approach suitable for most use cases and allows for time-stamp based CDC replication.
They are all configured in the same way and have an advanced mode.
Their basic configuration is also part of the Tutorial - Loading Data with Database Extractor.
This connector uses the Debezium connector
under the hood. The connector captures row-level changes in the schemas of a PostgreSQL database. It uses the pgoutput
logical decoding output plug-in available in PostgreSQL 10+.
Public Beta Warning:
This feature is currently in public beta. Please provide feedback using the feedback button in your project.
This connector uses the Debezium connector
under the hood. The connector captures row-level changes in the schemas of a PostgreSQL database. It uses the pgoutput
logical decoding output plug-in available in PostgreSQL 10+. It is maintained by the PostgreSQL community, and
used by PostgreSQL itself for logical replication. This plug-in is always present so no additional libraries need to be
installed.
The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to a PostgreSQL database.
Note: The component abstracts the underlying Debezium connector configuration and provides a simplified interface for the user. This means that only a subset of the Debezium connector capabilities are exposed to the user.
When the connector starts for the first time, it performs an initial consistent snapshot of your database. This snapshot establishes a baseline for the database’s current state.
The connector completes a series of tasks during the snapshot process, which vary depending on the selected snapshot mode and the table-locking policy in effect for the database.
You can choose from different snapshot modes in the Sync Options
> Replication Mode
configuration setting.
For more technical details on how snapshots work, see the official Debezium documentation.
The connector seamlessly manages schema changes in the source database, such as ADD
and DROP
columns.
Schema changes are handled as follows:
Each result table includes the following system columns:
Name | Base Type | Note |
---|---|---|
KBC__OPERATION | STRING | Event type, e.g., r - read on init sync; c - INSERT; u - UPDATE; d - DELETE |
KBC__EVENT_TIMESTAMP_MS | TIMESTAMP | Source database transaction timestamp. Represents milliseconds since epoch if native types are not enabled. |
KBC__DELETED | BOOLEAN | True when the event is a delete event (indicating the record is deleted). |
KBC__LSN | INTEGER | Log Sequence Number (LSN) of the transaction. |
KBC__BATCH_EVENT_ORDER | INTEGER | Order of the events in the current batch (extraction). Can be used with KBC__EVENT_TIMESTAMP_MS to track the latest event per record (ID). |
MySQL datatypes are mapped to Keboola Base Types as follows:
Based on the selected JSON file, the base_type
column in the table will be updated accordingly:
source_type | base_type | note |
---|---|---|
INTEGER | INTEGER | |
SMALLINT | INTEGER | |
INTEGER | INTEGER | |
INTEGER | INTEGER | |
BIGINT | INTEGER | |
DECIMAL | NUMERIC | |
NUMERIC | NUMERIC | |
FLOAT | FLOAT | |
REAL | FLOAT | |
DOUBLE PRECISION | FLOAT | |
SMALLSERIAL | INTEGER | |
SERIAL | INTEGER | |
BIGSERIAL | INTEGER | |
MONEY | NUMERIC | |
CHARACTER | STRING | |
CHAR | STRING | |
CHARACTER VARYING | STRING | |
VARCHAR | STRING | |
TEXT | STRING | |
BYTEA | STRING | |
TIMESTAMP | TIMESTAMP | |
TIMESTAMP WITH TIME ZONE | TIMESTAMP | |
DATE | DATE | |
TIME | STRING | HH:MM:SS format |
TIME WITH TIME ZONE | STRING | HH:MM:SS+TZ format |
INTERVAL | STRING | |
BOOLEAN | BOOLEAN | |
POINT | STRING | |
CIDR | STRING | |
INET | STRING | |
MACADDR | STRING | |
MACADDR8 | STRING | |
BIT | STRING | |
BIT VARYING | STRING | |
UUID | STRING | |
XML | STRING | |
JSON | STRING | |
JSONB | STRING | |
INTEGER[] | STRING | |
INT4RANGE | STRING | |
LTREE | STRING | Contains the string representation of a PostgreSQL LTREE value. |
CITEXT | STRING |
Other types are not supported, and such columns will be skipped from syncing.
The connector requires a user with the rds_replication
role.
To enable a user account other than the master account to initiate logical replication,
you must grant the account the rds_replication
role. For example,
grant rds_replication to <my_user>
There are several options for determining how publications are created. In general, it is best to manually create publications for the tables you want to capture before setting up the connector.
This is the recommended approach. Set the connector’s Publication Auto Create Mode
to disabled
.
This way, the connector will not attempt to create a publication. A database administrator or
the user configured for replication should create the publication before running the connector.
You can create a publication manually for a specific table, use the following SQL command:
CREATE PUBLICATION my_publication FOR TABLE my_table;
Or, to create a publication for all tables in a schema:
CREATE PUBLICATION my_publication FOR ALL TABLES;
Then set the Publication Name
in the connector configuration to my_publication
.
Note: To create a PostgreSQL publication, the connector must run as a user with the following privileges:
CREATE
privileges on the database to add publications.SELECT
privileges on the tables to copy the initial data. Table owners automatically have SELECT
permission.Set the connector’s Publication Auto Create Mode
to one of the following options:
all_tables
In this mode, the user must be a superuser to add all tables to the publication.
If a publication does not exist, the connector creates a publication for all tables in the database from which the connector captures changes,
using the following SQL command: CREATE PUBLICATION <publication_name> FOR ALL TABLES;
filtered
To add tables to a publication, the user must be the table owner. If the source table already exists, you need a mechanism to share ownership with the original owner. To enable shared ownership, create a PostgreSQL replication group and add both the original table owner and the replication user to the group.
Procedure
CREATE ROLE `<replication_group>`;
GRANT REPLICATION_GROUP TO <original_owner>;
GRANT REPLICATION_GROUP TO <replication_user>;
ALTER TABLE <table_name> OWNER TO REPLICATION_GROUP;
Connector Publication Creation Process
Datasource
connector configuration. For
example:
CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>
ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>
Note that each configuration of the connector creates a new publication with a unique name. The publication name contains configuration_id and, alternatively, branch_id if it’s a branch configuration. The publication name is generated as follows:
kbc_publication_{config_id}_prod
” for production configurationkbc_publication_{config_id}_dev_{branch_id}
” for branch configuration.Note: be careful when running configurations in a Development branch. Once the branch is deleted, the assigned publication still exists and it’s not deleted automatically. It is recommended to clean up any unused dev publications manually or using a script.
Note that each configuration of the connector creates a new slot with a unique name. The slot name contains configuration_id and alternatively branch id if it’s a branch configuration. The slot name is generated as follows:
slot_kbc_{config_id}_prod
” for production configurationslot_kbc_{config_id}_dev_{branch_id}
” for branch configuration.Note: Be cautious when running configurations in a development branch. Once the branch is deleted, any created slots will still exist and are not deleted automatically. This can lead to increased WAL log size. It’s recommended to clean up any unused slots manually or with a script.
Having multiple publications (connector configurations) can have performance implications. Each publication will have its own set of triggers and other replication mechanisms, which can increase the load on the database server. However, this can also be beneficial if different publications have different performance requirements or if they need to be replicated to different types of subscribers with different capabilities.
In certain cases, it is possible for PostgreSQL disk space consumed by WAL files to spike or increase out of the usual proportions. There are several possible reasons for this situation:
The LSN up to which the connector has received data is available in the confirmed_flush_lsn
column of the server’s pg_replication_slots
view. Data that is older than this LSN is no longer available, and the database is responsible for reclaiming the disk space.
pg_replication_slots
view, the restart_lsn
column contains the LSN of the oldest WAL that the connector might require. If the value for confirmed_flush_lsn regularly increases and the value of restart_lsn lags, the database needs to reclaim the space.There are many updates in a database being tracked, but only a tiny number of updates are related to the table(s) and schema(s) for which the connector is capturing changes. This situation can be easily solved with periodic heartbeat events. Set the heartbeat.interval.ms connector configuration property.
Note:** For the connector to detect and process events from a heartbeat table, you must add the table to the PostgreSQL publication created by the connector. **You can do that by selecting the heartbeat table in the Datasource > Tables to sync
configuration property.
heartbeat > interval.ms
connector configuration property.A separate process would then periodically update the table by either inserting a new row or repeatedly updating the same row. PostgreSQL then invokes Debezium, which confirms the latest LSN and allows the database to reclaim the WAL space. This task can be automated by means of theheart beat > action query
connector configuration property.
TIP: For users on AWS RDS with PostgreSQL, a situation similar to the high-traffic/low-traffic scenario can occur in an idle environment. AWS RDS causes writes to its own system tables to be invisible to clients on a frequent basis (5 minutes). Again, regularly emitting events solves the problem.
Prerequisites
Before enabling the Heartbeat signals, the Heartbeat table must be created in the source database. Recommended heartbeat table schema:
CREATE SCHEMA IF NOT EXISTS kbc;
CREATE TABLE kbc.heartbeat (id SERIAL PRIMARY KEY, last_heartbeat TIMESTAMP NOT NULL DEFAULT NOW());
INSERT INTO kbc.heartbeat (last_heartbeat) VALUES (NOW());
The connector will then perform an UPDATE query on that table in the selected interval. It is recommended that you use UPDATE query to avoid table bloat.
Enable the heartbeat signals:
heartbeat > Heartbeat interval [ms]
connector configuration property to the desired interval in milliseconds.heartbeat > Action query
connector configuration property to the desired query that will be executed on the heartbeat table.
UPDATE kbc.heartbeat SET last_heartbeat = NOW()
Datasource > Tables to sync
configuration property to track the heartbeat table and make sure it is contained in the publication.This connector uses the Debezium connector under the hood. The following instructions are partially taken from there.
This connector currently uses the native pgoutput
logical replication stream support that is available only
in PostgreSQL 10+
.
Currently, lower versions are not supported, but it is theoretically possible (please submit a feature request).
When not running in read_only
mode, the connector requires access to a signaling table in the source database. This signaling table is used by the connector to store various signal events and incremental snapshot watermarks.
You create a signaling table by submitting a standard SQL DDL query to the source database.
Prerequisites
You have sufficient access privileges to create a table on the source database.
Procedure
Submit an SQL query to the source database to create a table that is consistent with the required structure, as shown in the following example:
The following example shows a CREATE TABLE command that creates a three-column debezium_signal table:
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data TEXT NULL);
For this connector to work, you must enable a replication slot and configure a user with sufficient privileges to perform the replication.
It is possible to capture changes in a PostgreSQL database that is running in link: Amazon RDS. To do this:
rds.logical_replication
to 1
.wal_level
parameter is set to logical
by running the query SHOW wal_level
as the database RDS
master user.
This might not be the case in multi-zone replication setups.
You cannot set this option manually.
It is the
link: automatically changed
when the rds.logical_replication
parameter is set to 1
.
If the wal_level
is not set to logical
after you make the preceding change, it is probably because the instance
has to be restarted after the parameter group change.
Restarts occur during your maintenance window, or you can initiate a restart manually.plugin.name
parameter to pgoutput
.rds_replication
role.
The role grants permissions to manage logical slots and to stream data using logical slots.
By default, only the master user account on AWS has the rds_replication
role on Amazon RDS.
To enable a user account other than the master account to initiate logical replication, you must grant the account
the rds_replication
role.
For example, grant rds_replication to _<my_user>_
. You must have superuser
access to grant the rds_replication
role to a user.
To enable accounts other than the master account to create an initial snapshot, you must grant SELECT
permission to
the accounts on the tables to be captured.
For more information about security for PostgreSQL logical replication, see the
link: PostgreSQL documentation.It is possible to use {prodname} with Azure Database for PostgreSQL,
which has support for the pgoutput
logical decoding.
Set the Azure replication support to logical
. You can use the
link: Azure CLI or
the Azure Portal to configure
this. For example, to use the Azure CLI, here are
the: az postgres server
commands that
you need to execute:
az postgres server configuration set --resource-group mygroup --server-name myserver --name azure.replication_support --value logical
az postgres server restart --resource-group mygroup --name myserver
It is possible to use {prodname} with CrunchyBridge; logical replication is already
turned on. The pgoutput
plugin is available. You will have to create a replication user and provide the correct
privileges.
. To configure the replication slot regardless of the decoder being used, specify the following in the postgresql.conf
file:
# REPLICATION
wal_level = logical // Instructs the server to use logical decoding with the write-ahead log.
Depending on your requirements, you may have to set other PostgreSQL streaming replication parameters when using
{prodname}.
Examples include max_wal_senders
and max_replication_slots
for increasing the number of connectors that can access
the sending server concurrently and wal_keep_size
for limiting the maximum WAL size which a replication slot will
retain.
For more information about configuring streaming replication, see the
link:https://www.postgresql.org/docs/current/runtime-config-replication.html#RUNTIME-CONFIG-REPLICATION-SENDER[PostgreSQL
documentation].
Debezium uses PostgreSQL’s logical decoding, which uses replication slots. Replication slots are guaranteed to retain all WAL segments required for Debezium even during Debezium outages. For this reason, it is important to closely monitor replication slots to avoid excessive disk consumption and other conditions that can happen, such as catalog bloat if a replication slot stays unused for too long. For more information, see the PostgreSQL streaming replication documentation.
If you are working with a synchronous_commit
setting other than on
,
the recommendation is to set wal_writer_delay
to a value such as 10 milliseconds to achieve a low latency of change
events.
Otherwise, its default value is applied, which adds a latency of about 200 milliseconds.
TIP: Reading and understanding PostgreSQL documentation about the mechanics and configuration of the PostgreSQL write-ahead log is strongly recommended. endif::community[]
The connector requires appropriate permissions to:
rds_replication
The connector requires a user with the rds_replication
role.
To enable a user account other than the master account to initiate logical replication,
you must grant the account the rds_replication role. For example, grant rds_replication to <my_user>
.
Setting up a PostgreSQL server to run a Debezium connector requires a database user who can perform replications. Replication can be performed only by a database user with appropriate permissions and only for a configured number of hosts.
Although superusers have the necessary REPLICATION
and LOGIN
roles by default, it is best not to provide the
Keboola replication user with elevated privileges.
Instead, create a Keboola user with the minimum required privileges.
PostgreSQL administrative permissions.
To provide a user with replication permissions, define a PostgreSQL role that has at least the REPLICATION
and LOGIN
permissions, and then grant that role to the user.
For example:
CREATE ROLE __<name>__ REPLICATION LOGIN;
Setting privileges to enable Keboola to create PostgreSQL publications when you use pgoutput
:
Keboola(Debezium) streams change events for PostgreSQL source tables from publications that are created for the tables. Publications contain a filtered set of change events that are generated from one or more tables. The data in each publication is filtered based on the publication specification. The specification can be created by the PostgreSQL database administrator or by the {prodname} connector. To permit the {prodname} PostgreSQL connector to create publications and specify the data to replicate to them, the connector must operate with specific privileges in the database.
There are several options for determining how publications are created. In general, it is best to manually create publications for the tables that you want to capture, before you set up the connector. However, you can configure your environment to permit the Keboola connector to create publications automatically, and to specify the data that is added to them.
The Keboola connector uses include-list and exclude-list properties to specify how data is inserted in the publication.
For the Keboola connector to create a PostgreSQL publication, it must run as a user that has the following privileges:
CREATE
privileges on the database to add publications.SELECT
privileges on the tables to copy the initial table data. Table owners automatically have SELECT
permission
for the table.To add tables to a publication, the user must be the owner of the table. However, because the source table already exists, you need a mechanism to share ownership with the original owner. To enable shared ownership, you create a PostgreSQL replication group and then add the existing table owner and the replication user to the group.
CREATE ROLE _<replication_group>_;
GRANT REPLICATION_GROUP TO __<original_owner>__;
GRANT REPLICATION_GROUP TO __<replication_user>__;
<replication_group>
.ALTER TABLE __<table_name>__ OWNER TO REPLICATION_GROUP;
You may opt to use an SSH Tunnel to secure your connection. The [developer documentation](https://developers.keboola.com/integrate/database/ provides detailed instructions for setting up an SSH tunnel. While setting up an SSH tunnel requires some work, it is the most reliable and secure option for connecting to your database server.
The column filters are used to specify which columns should be included in the extraction. The list can be defined as a
comma-separated list of
fully-qualified names, i.e., in the form schemaName.tableName.columnName
.
To match the name of a column, the connector applies the regular expression that you specify as an anchored regular expression. The expression is used to match the entire name string of the column; it does not match substrings that might be present in a column name.
TIP: To test your regex expressions, you can use online tools such as this one.
None
: No filter applied, all columns in all tables will be extracted.Include List
: The columns to be included in the CDC.Exclude List
: The columns to be excluded from the CDC.Column masks are used to mask sensitive data in the extraction.
Specify a comma-separated list of fully qualified column names in the format schemaName.tableName.columnName
.
The connector applies the specified expression as an anchored regular expression to match the full column name, not partial substrings that may appear in the name.
There are two types of masks available:
This mask replaces the length of string columns in the output data with a specified number of *
characters.
For more details, refer to the Debezium documentation.
This mask hashes string columns in the output data using a selected algorithm and salt.
You may choose from various hashing algorithms, such as SHA-256
, SHA-512
, MD5
, and SHA-1
.
Based on the hash function used, referential integrity is maintained while column values are replaced with pseudonyms.
Supported hash functions are described in the MessageDigest section of the Java Cryptography Architecture Standard Algorithm Name Documentation.
Note: Hashing strategy version 2 is used to ensure consistency across job runs and configurations.
For more details, refer to the Debezium documentation.
Standard
: The connector performs an initial consistent snapshot of each of your databases and reads
the binlog from the point at which the snapshot was made.Changes only
: The connector reads the changes from the binlog immediately, skipping the initial load.Base64
: represents binary data as a base64-encoded String.Base64-url-safe
: represents binary data as a base64-url-safe-encoded String.Hex
: represents binary data as a hex-encoded (base16) String.Bytes
: represents binary data as a byte array.10240
.Enable heartbeat signals to prevent the consumption of WAL disk space. The connector will periodically emit a heartbeat signal to the selected table.
Note: The heartbeat signal must also be selected in the Datasource > Tables to sync
configuration property. For more information, see the WAL disk space consumption section.
3000
(3 s).The destination is a mandatory configuration option that specifies how the data is loaded into the destination tables.
The Load Type
configuration option specifies how the data is loaded into the destination tables.
The following options are available:
Incremental Load - Deduplicated
: The connector upserts records into the destination table. The connector uses the
primary key to perform an upsert. The connector does not delete records from the destination table.Incremental Load - Append
: The connector produces no primary key. The order of the events will be given by
the KBC__EVENT_TIMESTAMP_MS
column + helper KBC__BATCH_EVENT_ORDER
column which contains the order in one batch.Full Load - Deduplicated
: The destination table data will be replaced with the current batch and deduplicated by the
primary key.Full Load - Append
: The destination table data will be replaced with the current batch, which will not be deduplicated.