NOTIFY is the “publish” command (writer)LISTEN is the “subscribe” command (reader)NOTIFY sends a message on a channelLISTENing on that channel receives the messageNOTIFY IDLISTENs, reads messages from tableTable to store messages
Subscriber listens
Publisher writes message…
… and notifies that it has been written (transactionally)
Subscriber receives notification…
'1'
… and reads message
| id | message |
|---|---|
| 1 | hi |
Table to store messages
CREATE TABLE a_queue (
id SERIAL PRIMARY KEY,
message TEXT,
processed BOOLEAN DEFAULT FALSE -- new column
)Subscriber listens
Publisher writes message…
… and notifies that it has been written (transactionally)
Example config blob
| data |
|---|
| {‘profile_id’: 1, ‘config_key’: ‘config_val’} |
Insert blob
INSERT INTO profile_config (modified_at, config)
VALUES (
now(),
json_build_object(
'profile_id', 1,
'config_key', 'config_val'
)
)Notify config_update channel with contents of blob
Subscriber LISTENs (before data was written)
Receives
{‘profile_id’: 1, ‘config_key’: ‘config_val’}
… there’s no SELECT!
Cool!
But what if the newly written config blob is big?
SELECT pg_notify(
'config_update',
json_build_object(
'profile_id', 1,
'config_key', REPEAT('a', 10000)
)::text
)--------------------------------------------------------------------------- InvalidParameterValue Traceback (most recent call last) Cell In[14], line 1 ----> 1 run_sql(""" 2 SELECT pg_notify( 3 'config_update', 4 json_build_object( 5 'profile_id', 1, 6 'config_key', REPEAT('a', 10000) 7 )::text 8 ) 9 """, print_sql=True, print_output=False) Cell In[1], line 40, in run_sql(sql, print_sql, print_output, client) 38 cursor = conn.cursor() 39 cursor.execute("SET schema 'pubsub'") ---> 40 cursor.execute(sql) 41 if print_output: 42 fetch_and_print_md_table(cursor) InvalidParameterValue: payload string too long
NOTIFY payloads have a hard 8kb limitNOTIFY instructions about what to read, not the data itself
LISTENing to postgres
Rather than NOTIFYing the ID, we NOTIFY for the user:
🟢 Pattern 🟢: NOTIFY instructions about what to read, not the data itself
LISTEN, query for the whole state
Add a column representing items that were removed from the cart
User removes from cart
🟢 Pattern 🟢: Query for the whole state, let the database manage the state
Stock price changes a lot
INSERT INTO stock_price (ticker, timestamp, price)
VALUES ('AAPL', '2025-01-01 8:30:00', 100.00),
('AAPL', '2025-01-01 8:30:01', 100.01),
('AAPL', '2025-01-01 8:30:02', 100.02)SELECT pg_notify('stock_price', 'AAPL');
SELECT pg_notify('stock_price', 'AAPL');
SELECT pg_notify('stock_price', 'AAPL')'AAPL'
'AAPL'
'AAPL'
On LISTEN, we do this aggregation query 3 times
LISTENLISTEN thread can get backed upLISTEN threadLISTENer doesn’t need to care about every messageINSERT INTO stock_price (ticker, timestamp, price)
VALUES ('AAPL', '2025-01-01 8:30:00', 100.00),
('AAPL', '2025-01-01 8:30:01', 100.01),
('AAPL', '2025-01-01 8:30:02', 100.02)Only notify once for the whole batch
LISTENer doesn’t need to care about every rowWe’ve seen all this before…
| ticker |
|---|
| AAPL |
0 to indicate it hasn’t received any messagesBut what if we missed multiple messages?
| id | ticker |
|---|---|
| 1 | AAPL |
| 2 | NVDA |
For this use case, we only care about the last message of the ones that we missed
| id | ticker |
|---|---|
| 2 | NVDA |
LISTENer doesn’t need to care about every messageNOTIFY payloads have a hard 8kb limitNOTIFY instructions about what to read, not the data itselfLISTENer doesn’t need to care about every messageLISTEN, query for the whole state, let the database manage the stateLISTEN thread can get backed upLISTEN threadQuestions?