Postgres Async Triggers

Run asynchronous side-effects on database events - reliably stream Postgres table changes to external systems (queues, webhooks, etc.) with automatic failover and zero event loss.

How Does It Work?

  1. You manage subscriptions directly in the database (the async_triggers.subscriptions table)
  2. Subscription changes automatically create database triggers on the target tables
  3. Application writes fire the managed triggers, which insert into a partitioned events table
  4. The daemon streams events via Postgres logical replication and delivers them to your sink
flowchart LR
  subgraph Postgres
    A[subscriptions<br/>table] -->|creates<br/>triggers| B[trigger<br/>on users]
    B -->|insert<br/>into| C[events<br/>partitioned<br/>20250112..]
    C -->|logical<br/>replication| D[replication<br/>slot]
  end
  D -->|streams| E[daemon<br/>binary]
  E -->|delivers| F[sink<br/>queue/http]

  style Postgres fill:#e1f5ff,stroke:#0066cc,stroke-width:2px
  style A fill:#fff,stroke:#333,stroke-width:1px
  style B fill:#fff,stroke:#333,stroke-width:1px
  style C fill:#fff,stroke:#333,stroke-width:1px
  style D fill:#fff,stroke:#333,stroke-width:1px
  style E fill:#ffe1e1,stroke:#cc0000,stroke-width:2px
  style F fill:#e1ffe1,stroke:#00cc00,stroke-width:2px
Example of a generated trigger function (simplified for readability)
-- Auto-generated when you insert into the subscriptions table
create or replace function async_triggers._publish_after_insert_on_users()
returns trigger as $$
declare
  v_jsonb_output jsonb := '[]'::jsonb;
  v_base_payload jsonb := jsonb_build_object(
    'tg_op', tg_op,
    'tg_table_name', tg_table_name,
    'tg_table_schema', tg_table_schema,
    'timestamp', (extract(epoch from now()) * 1000)::bigint
  );
begin
  -- Check subscription "user-signup" condition
  if new.email_verified = true then
    v_jsonb_output := v_jsonb_output || (jsonb_build_object(
      'tg_name', 'user-signup',
      'new', jsonb_build_object('id', new.id, 'email', new.email),
      'old', null
    ) || v_base_payload || '{}'::jsonb); -- payload_extensions would go here
  end if;

  -- Write to events table if any subscriptions matched
  if jsonb_array_length(v_jsonb_output) > 0 then
    insert into async_triggers.events (payload, stream_id)
    select elem, 1 from jsonb_array_elements(v_jsonb_output) as t(elem);
  end if;

  return new;
end;
$$ language plpgsql set search_path = '' security definer;

-- Trigger that calls the function
create constraint trigger async_triggers_insert
after insert on public.users
deferrable initially deferred
for each row
when ((new.email_verified = true))
execute procedure async_triggers._publish_after_insert_on_users();

Note: The actual generated code handles multiple subscriptions per table, merges when clauses with OR logic, and includes all payload_extensions. This example shows the structure for a single subscription.

Benefits

  • Single binary - No complex infrastructure or high-availability destinations required
  • Postgres-native durability - Events are stored in the database, WAL can be released immediately
  • Automatic failover - Daemon queries the events table to replay missed events after recovery
  • No data loss - As long as downtime is less than partition retention (7 days by default)

Drawbacks

  • Small overhead - Additional INSERT into events table on every subscribed operation
  • Partition management - Need to monitor partition growth if event volume is very high
  • Not for dynamic subscriptions - Each subscription change recreates database triggers (expensive operation)

Note

I'm working on a similar but separate project that supports dynamic subscriptions, but this version solves an immediate need at my company, so I decided to publish it first.

Using It

1. Requirements

  • Postgres 15+ with wal_level=logical
  • User with the replication privilege (quick check below)
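You can verify both requirements from psql before starting the daemon. A minimal sketch, using an illustrative role name daemon_user for the daemon's connection:

-- Must return 'logical'; otherwise set wal_level = logical and restart Postgres
show wal_level;

-- Give the daemon's role the replication privilege (daemon_user is a placeholder)
alter role daemon_user with replication;

-- Confirm the privilege is set
select rolname, rolreplication from pg_roles where rolname = 'daemon_user';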

2. Configuration

Create config.yaml:

stream:
  id: 1  # Unique stream ID

pg_connection:
  host: localhost
  port: 5432
  name: mydb
  username: postgres
  password: postgres
  tls:
    enabled: false

batch:
  max_size: 1000    # Events per batch
  max_fill_secs: 5  # Max time to fill batch

sink:
  type: memory  # Built-in test sink

3. Run the Daemon

# Start the daemon
postgres-async-triggers

# Or with Docker
docker run -v $(pwd)/config.yaml:/config.yaml postgres-async-triggers

4. Create Subscriptions

Subscriptions define which events to capture. Simply insert into the subscriptions table:

-- Subscribe to verified user signups
insert into async_triggers.subscriptions (
  key, stream_id, operation, schema_name, table_name,
  when_clause, column_names, payload_extensions
) values (
  'user-signup',                       -- Unique identifier
  1,                                   -- Stream ID from config
  'INSERT',                            -- Operation: INSERT, UPDATE, or DELETE
  'public',                            -- Schema name
  'users',                             -- Table name
  'new.email_verified = true',         -- Optional filter (SQL expression)
  array['id', 'email', 'created_at'],  -- Columns to include in payload
  '[]'::jsonb                          -- Payload extensions (see below)
);

This automatically creates the trigger on public.users. Now when you insert a verified user, the event is captured.
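You can also confirm the capture directly in the database before wiring up a real sink. A minimal sketch, assuming the payload and stream_id columns shown in the generated trigger function above:

-- Inspect the most recently captured events for this stream
select payload
from async_triggers.events
where stream_id = 1
order by (payload->>'timestamp')::bigint desc
limit 5;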

To reduce no-op trigger recreation (important for production), use a MERGE-based helper that updates a subscription only when its values actually change:

Click to show merge-based helper function
-- Helper function that only recreates triggers when a subscription actually changes
create or replace function set_subscriptions(
  p_stream_id bigint,
  p_subscriptions async_triggers.subscriptions[]
) returns void
language plpgsql
security definer
set search_path to ''
as $$
begin
  create temporary table temp_subscriptions as
  select * from unnest(p_subscriptions);

  -- Only update if values actually changed (avoids trigger recreation)
  merge into async_triggers.subscriptions as target
  using temp_subscriptions as source
  on (target.key = source.key and target.stream_id = p_stream_id)
  when matched and (
    target.operation is distinct from source.operation
    or target.schema_name is distinct from source.schema_name
    or target.table_name is distinct from source.table_name
    or target.when_clause is distinct from source.when_clause
    or target.column_names is distinct from source.column_names
    or target.metadata is distinct from source.metadata
    or target.payload_extensions is distinct from source.payload_extensions
  ) then update set
    operation = source.operation,
    schema_name = source.schema_name,
    table_name = source.table_name,
    when_clause = source.when_clause,
    column_names = source.column_names,
    metadata = source.metadata,
    payload_extensions = source.payload_extensions
  when not matched then insert (
    key, stream_id, operation, schema_name, table_name,
    when_clause, column_names, metadata, payload_extensions
  ) values (
    source.key, p_stream_id, source.operation, source.schema_name, source.table_name,
    source.when_clause, source.column_names, source.metadata, source.payload_extensions
  );

  -- Remove subscriptions not in input
  delete from async_triggers.subscriptions
  where stream_id = p_stream_id
    and not exists (
      select 1 from temp_subscriptions
      where async_triggers.subscriptions.key = temp_subscriptions.key
    );

  drop table temp_subscriptions;
end;
$$;

5. Testing Your Subscription

Now, when a user signs up with a verified email:

insert into users (email, email_verified) values ('[email protected]', true);

The sink receives:

{"tg_name": "user-signup", "tg_op": "INSERT", "tg_table_name": "users", "tg_table_schema": "public", "timestamp": 1703001234567, "new":{"id": 123, "email": "[email protected]", "created_at": "2024-12-12T10:30:00Z" }, "old": null }

6. Add Metadata with Payload Extensions

Add dynamic routing information or static metadata using payload_extensions:

insert into async_triggers.subscriptions (
  key, stream_id, operation, schema_name, table_name,
  when_clause, column_names, payload_extensions
) values (
  'order-notification',
  1,
  'INSERT',
  'public',
  'orders',
  null,
  array['id', 'user_id', 'total'],
  '[
    {"json_path": "group_id", "expression": "new.user_id::text"},
    {"json_path": "queue_name", "expression": "''orders-high-priority''"}
  ]'::jsonb
);

Result:

{"tg_name": "order-notification", "new":{"id": 456, "user_id": 123, "total": 99.99}, "group_id": "123", "queue_name": "orders-high-priority" }

Common use cases (a combined sketch follows this list):

  • Routing: Add group_id for partitioning in Kafka/queues
  • Static metadata: Add queue_name, priority, tenant_id
  • Dynamic values: Add computed fields like new.created_at::date
  • Context info: Add auth.user_id(), current_setting('app.tenant_id')
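Putting a few of these together, here is a sketch of a subscription that adds both a static queue name and a tenant id read from a session setting (the invoices table and the app.tenant_id setting are illustrative):

insert into async_triggers.subscriptions (
  key, stream_id, operation, schema_name, table_name,
  when_clause, column_names, payload_extensions
) values (
  'invoice-created',   -- Illustrative subscription key
  1,
  'INSERT',
  'public',
  'invoices',          -- Assumes an invoices table exists
  null,
  array['id', 'total'],
  '[
    {"json_path": "queue_name", "expression": "''invoices''"},
    {"json_path": "tenant_id", "expression": "current_setting(''app.tenant_id'', true)"}
  ]'::jsonb
);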

How Failover Works

Scenario: Your message queue goes down for 2 hours.

10:00 - Queue fails, event #1000 can't be delivered
10:00 - Daemon saves checkpoint: event #1000 ID in database
10:00 - Daemon continues consuming replication stream (new events still written to events table)
12:00 - Queue recovers
12:00 - Daemon tries checkpoint event again → succeeds
12:00 - Daemon uses COPY to stream all events from table between checkpoint and current batch
12:00 - Replays all missed events in order
12:01 - Returns to normal streaming mode
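While the destination is down, you can gauge how much is buffered in Postgres. A rough sketch using the tables described elsewhere in this README:

-- Events currently buffered for the stream
select count(*) as buffered_events
from async_triggers.events
where stream_id = 1;

-- Stream state, including failover status (see Monitoring & Operations below)
select * from async_triggers.streams where id = 1;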

Guarantees:

  • No events lost (as long as downtime is shorter than the partition retention, 7 days by default)
  • Events delivered at least once (may retry on failure)
  • Order preserved within daily partitions
  • No WAL retention required (events are in table, not WAL)

Automated Partition Management

The daemon automatically manages daily partitions in the background (a size check is sketched after this list):

Retention policy:

  • Creates partitions 3 days in advance (today, tomorrow, day after)
  • Drops partitions older than 7 days
  • Runs on startup and then daily
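If event volume is high, keep an eye on partition sizes as well as names. A minimal sketch using the standard Postgres catalogs:

-- Per-partition size of the events table
select tablename,
       pg_size_pretty(pg_total_relation_size(format('async_triggers.%I', tablename))) as total_size
from pg_tables
where schemaname = 'async_triggers'
  and tablename like 'events_%'
order by tablename;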

Monitoring & Operations

Logs:

# Normal operation
RUST_LOG=info postgres-async-triggers

# Debug mode
RUST_LOG=debug postgres-async-triggers

Check stream state:

select * from async_triggers.streams where id = 1;
-- Shows: failover status, next maintenance time

Verify subscriptions:

select key, table_name, operation, when_clause
from async_triggers.subscriptions
where stream_id = 1;

Check triggers were created:

select tgname, tgrelid::regclass
from pg_trigger
where tgname like 'async_triggers_%';

Partition health:

select tablename
from pg_tables
where schemaname = 'async_triggers'
  and tablename like 'events_%'
order by tablename;

Implementing Custom Sinks

Create custom sinks to deliver events to any destination (HTTP, Kafka, RabbitMQ, etc.).

Important: Sink dependencies and implementations should be behind feature flags to avoid bloating the binary. Users should only compile the sinks they actually use.

1. Add Dependencies with Feature Flags

Update Cargo.toml:

[dependencies]
# Existing dependencies...

# Optional sink dependencies
reqwest = { version = "0.11", features = ["json"], optional = true }
rdkafka = { version = "0.36", optional = true }

[features]
# Sink feature flags
sink-http = ["dep:reqwest"]
sink-kafka = ["dep:rdkafka"]

2. Create the Sink Implementation

Create a new file src/sink/http.rs:

use etl::error::EtlResult;
use reqwest::Client;
use serde::Deserialize;
use tracing::info;

use crate::sink::Sink;
use crate::types::TriggeredEvent;

#[derive(Clone, Debug, Deserialize)]
pub struct HttpSinkConfig {
    pub url: String,
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
}

#[derive(Clone)]
pub struct HttpSink {
    config: HttpSinkConfig,
    client: Client,
}

impl HttpSink {
    pub fn new(config: HttpSinkConfig) -> Self {
        let client = Client::new();
        Self { config, client }
    }
}

impl Sink for HttpSink {
    fn name() -> &'static str {
        "http"
    }

    async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
        for event in events {
            let mut request = self.client.post(&self.config.url).json(&event.payload);

            // Add custom headers
            for (key, value) in &self.config.headers {
                request = request.header(key, value);
            }

            let response = request.send().await?;
            if !response.status().is_success() {
                return Err(etl::etl_error!(
                    etl::error::ErrorKind::Network,
                    "HTTP request failed: {}",
                    response.status()
                ));
            }

            info!("published event {} to {}", event.id.id, self.config.url);
        }
        Ok(())
    }
}

3. Register in src/sink/mod.rs

mod base;

pub mod memory;

#[cfg(feature = "sink-http")]
pub mod http;

pub use base::Sink;

4. Add to SinkConfig Enum

Update src/config/sink.rs:

use serde::Deserialize;

#[cfg(feature = "sink-http")]
use crate::sink::http::HttpSinkConfig;

#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum SinkConfig {
    Memory,
    #[cfg(feature = "sink-http")]
    Http(HttpSinkConfig),
}

5. Wire Up in Core

Update src/core.rs:

#[cfg(feature = "sink-http")]usecrate::sink::http::HttpSink;// In start_pipeline_with_config():let sink = match&config.sink{SinkConfig::Memory => MemorySink::new(),#[cfg(feature = "sink-http")]SinkConfig::Http(cfg) => HttpSink::new(cfg.clone()),};

6. Build and Use

Build with the HTTP sink feature:

cargo build --release --features sink-http

Use in config.yaml:

sink:
  type: http
  url: https://webhook.example.com/events
  headers:
    Authorization: Bearer token123
    X-Custom-Header: value
