Streamkap Set Up

January 9, 2025
To connect Streamkap to AWS RDS PostgreSQL, we need to ensure the database is configured to accept traffic from Streamkap by safelisting Streamkap’s IP addresses in the instance’s security group.
Note: If your AWS RDS PostgreSQL instance already accepts traffic from anywhere in the world, you can skip ahead to the “Configuring AWS RDS PostgreSQL Database for Streamkap Integration” section.
Safelisting Streamkap’s IP Address
Streamkap’s dedicated IP addresses depend on your region. When signing up, Oregon (us-west-2) is the default region; let us know if you need a different one. For more information about our IP addresses, please visit this link.
- To safelist one of our IPs, open your RDS PostgreSQL instance’s security group (the rule with type “CIDR/IP - Inbound”) and click “Edit inbound rules”.
- Click “Add rule”, choose “PostgreSQL” as the type and “Custom” as the source, then paste the relevant IP address followed by “/32”. For instance, to add the Oregon IP address, the custom box would read “52.32.238.100/32”, as shown in the image below.

Configuring AWS RDS PostgreSQL Database for Streamkap Integration
Streamkap recommends creating a dedicated user and role within your PostgreSQL instance to facilitate secure and controlled data streaming. This approach ensures granular access management and follows best practices for database security.
Access your PostgreSQL database via a tool like DBeaver and run the following statements.
Step 1: Create a dedicated user and role for Streamkap
```sql
CREATE USER Streamkap_user PASSWORD '{password}';
CREATE ROLE Streamkap_role;
```
Note: Replace {password} with a strong password of your own.
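Because the identifiers above are unquoted, PostgreSQL folds them to lowercase. If you want to double-check that the user and role were created, a quick query such as the following should return two rows:
```sql
-- Optional check: unquoted identifiers are stored in lowercase
SELECT rolname, rolcanlogin
FROM pg_roles
WHERE rolname IN ('streamkap_user', 'streamkap_role');
```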
Step 2: Attach role to user and grant replication privileges to the role
```sql
GRANT Streamkap_role TO Streamkap_user;
GRANT rds_replication TO Streamkap_role;
```
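Optionally, you can confirm the memberships took effect; streamkap_user should be a member of streamkap_role, and streamkap_role a member of rds_replication:
```sql
-- Optional check: list role memberships for the Streamkap user and role
SELECT m.rolname AS member, r.rolname AS granted_role
FROM pg_auth_members am
JOIN pg_roles r ON r.oid = am.roleid
JOIN pg_roles m ON m.oid = am.member
WHERE m.rolname IN ('streamkap_user', 'streamkap_role');
```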
Step 3: Grant permissions on the database
```sql
GRANT CONNECT ON DATABASE "{database}" TO Streamkap_role;
```
Note: In this guide, the database at the time of configuration was “source_database”. Replace {database} with your own database name if it differs.
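To verify the grant, has_database_privilege should return true for the Streamkap user (substitute your own database name if it differs from the guide’s source_database):
```sql
-- Should return true once CONNECT has been granted via Streamkap_role
SELECT has_database_privilege('streamkap_user', 'source_database', 'CONNECT');
```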
Step 4: Grant permissions on schema(s), table(s), and future table(s)
```sql
GRANT CREATE, USAGE ON SCHEMA "{schema}" TO Streamkap_role;
```
Note: In this guide, the schema at the time of configuration was “public”. Replace {schema} with your own schema name if it differs.
```sql
GRANT SELECT ON ALL TABLES IN SCHEMA "{schema}" TO Streamkap_role;
ALTER DEFAULT PRIVILEGES IN SCHEMA "{schema}" GRANT SELECT ON TABLES TO Streamkap_role;
```
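As a quick check, has_schema_privilege should now report both USAGE and CREATE for the role on your schema (shown here for the guide’s public schema):
```sql
-- Optional check: schema-level privileges for the Streamkap role
SELECT has_schema_privilege('streamkap_role', 'public', 'USAGE')  AS has_usage,
       has_schema_privilege('streamkap_role', 'public', 'CREATE') AS has_create;
```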
Step 5: Enabling Streamkap signal schema and table
Streamkap will use the following schema and table to manage snapshot collections.
```sql
CREATE SCHEMA Streamkap;
SET search_path TO Streamkap;
CREATE TABLE Streamkap_signal (
    id   VARCHAR(255) PRIMARY KEY,
    type VARCHAR(32)  NOT NULL,
    data VARCHAR(2000) NULL
);
GRANT SELECT, UPDATE, INSERT ON TABLE Streamkap_signal TO Streamkap_role;
GRANT USAGE, CREATE ON SCHEMA Streamkap TO Streamkap_role;
GRANT SELECT, INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER ON ALL TABLES IN SCHEMA Streamkap TO Streamkap_role;
ALTER DEFAULT PRIVILEGES IN SCHEMA Streamkap
GRANT SELECT, INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER ON TABLES TO Streamkap_role;
-- Note: PostgreSQL has no grantable ALTER privilege on tables; only the table
-- owner (or a member of the owning role) can run ALTER TABLE, so the grants
-- above are the full set of grantable table privileges.
```
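Streamkap writes to this signal table itself when it triggers snapshots, so you do not need to insert anything manually. Purely for illustration, and assuming Debezium-style signalling (which Streamkap is built on), a snapshot signal row might look like this:
```sql
-- Illustrative only: format assumed from Debezium-style incremental snapshot signals
INSERT INTO Streamkap.Streamkap_signal (id, type, data)
VALUES ('ad-hoc-snapshot-1', 'execute-snapshot',
        '{"data-collections": ["public.orders"], "type": "incremental"}');
```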
Step 6: Creating sample table
Let’s create a sample table from which Streamkap will stream data. If you already have existing tables you would like to stream from, you can skip this step.
```sql
SET search_path TO public;
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
customer_name VARCHAR(100) NOT NULL,
order_date DATE NOT NULL,
total_amount NUMERIC(10, 2) NOT NULL,
product_name VARCHAR(100) NOT NULL
);
INSERT INTO orders (customer_name, order_date, total_amount, product_name)
VALUES
('Alice Johnson', '2024-12-10', 120.50, 'Wireless Headphones'),
('Bob Smith', '2024-12-11', 75.00, 'Bluetooth Speaker'),
('Catherine Lee', '2024-12-12', 200.00, 'Smartwatch');
```
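A quick SELECT confirms the sample rows are in place:
```sql
-- Verify the sample data
SELECT * FROM orders ORDER BY order_id;
```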
Step 7: Create publication(s), and replication slot
We can create a publication for all tables, for only the sample orders table created a moment ago, or for a select group of tables:
```sql
-- Option 1: All Tables
CREATE PUBLICATION Streamkap_pub FOR ALL TABLES;
-- Option 2: Sample orders table
CREATE PUBLICATION Streamkap_pub FOR TABLE orders;
-- Option 3: Group of Tables
CREATE PUBLICATION Streamkap_pub FOR TABLE table1, table2, table3, ...;
```
```sql
-- Create a logical replication slot
SELECT pg_create_logical_replication_slot('Streamkap_pgoutput_slot', 'pgoutput');
-- Verify the table(s) to replicate were added to the publication
SELECT * FROM pg_publication_tables;
-- Log in as the Streamkap user and verify it can read the replication slot by running the following command
SELECT count(*) FROM pg_logical_slot_peek_binary_changes('Streamkap_pgoutput_slot', null, null, 'proto_version', '1', 'publication_names', 'Streamkap_pub');
```
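Logical replication slots require wal_level to be 'logical', which on RDS is controlled by the rds.logical_replication parameter in your DB parameter group. If slot creation fails or no changes stream, these checks help pinpoint the issue:
```sql
-- Should return 'logical'; if not, set rds.logical_replication = 1 in the
-- RDS parameter group and reboot the instance
SHOW wal_level;

-- Inspect the slot Streamkap will read from
SELECT slot_name, plugin, active, restart_lsn
FROM pg_replication_slots;
```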
Adding a Destination Connector
To add Databricks as a destination connector, click “Connectors” on the left side, then click “Destinations” and press the “+ Add” button.

You will land on the Databricks destination connection configuration page, as shown below.

Plug in your desired settings:
- Name
- Ingestion mode (“append” or “insert”)
- The personal access token and JDBC URL retrieved in the Databricks Warehouse Set Up section of this guide
- The destination schema of your choice
Press “Save” at the bottom right of the screen.
If all goes well, your Databricks destination will be created with the status “Active”, as shown below.

Adding a Pipeline
Click “Pipelines” on the left side of the screen, click “+ Create”, then choose your relevant Postgres source under “Select a Source” and your Databricks destination under “Select a Destination”, as depicted below.

Select the "Orders" table and the "Public" schema if you followed the guide. Otherwise, make the appropriate selections and click "Save."

Upon successful creation, the pipeline should look similar to the screenshot below, with status “Active”.

Go to your Databricks warehouse and query the “orders” table to see sub-second-latency streaming in action; the result is shown in the screenshot below.
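For example, a query along these lines in the Databricks SQL editor returns the streamed rows (the schema name below is a placeholder for the destination schema you configured on the connector):
```sql
-- Replace your_destination_schema with the schema configured on the destination connector
SELECT *
FROM your_destination_schema.orders
ORDER BY order_id;
```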

By design, the “orders” table in the destination will have the following meta columns in addition to the regular columns.
