-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path13.mq-ddl.sql
More file actions
134 lines (107 loc) · 3.76 KB
/
13.mq-ddl.sql
File metadata and controls
134 lines (107 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
-- =====================================================
-- CREATE SCHEMA AND QUEUE TABLE
-- Demonstrates: Messaging layer foundation
-- =====================================================
-- Namespace for the messaging demo objects.
CREATE SCHEMA IF NOT EXISTS mq;

-- Recreate the queue table from scratch so the script can be re-run.
DROP TABLE IF EXISTS mq.queue;

-- One row per message.
--   id          sequence-assigned surrogate key
--   event_type  free-text event name
--   payload     message body as JSONB
--   created_at  enqueue time; timestamptz so the stored instant does not
--               depend on the inserting session's TimeZone setting
--   processed   FALSE until a consumer claims the row; NOT NULL because the
--               consumers filter on processed = FALSE, which never matches NULL
CREATE TABLE mq.queue
(
    id         BIGSERIAL   PRIMARY KEY,
    event_type TEXT        NOT NULL,
    payload    JSONB       NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    processed  BOOLEAN     NOT NULL DEFAULT FALSE
);
-- =====================================================
-- GENERATE RANDOM EVENTS (BATCH PRODUCER)
-- Demonstrates: Simulating a real event stream
-- =====================================================
-- Enqueue a batch of synthetic sensor events, one per generate_series row.
-- Every random() call is evaluated per row, so each event gets its own values:
--   event_type   one of the three names below, picked by a random index 1..3
--   sensor_id    integer 1..200
--   cpu          0..100, rounded to two decimals
--   temperature  20..35, rounded to two decimals
--   status       'FAIL' when random() exceeds 0.95, otherwise 'OK'
--   ts           a random instant within the hour before now()
-- event_type and status are drawn independently, so a 'sensor_failure'
-- event can carry status 'OK'.
INSERT INTO mq.queue (event_type, payload)
SELECT
    (ARRAY['sensor_reading', 'sensor_failure', 'sensor_recovery'])
        [CAST(1 + floor(random() * 3) AS int)],
    jsonb_build_object(
        'sensor_id',   CAST(1 + floor(random() * 200) AS int),
        'cpu',         round(CAST(random() * 100 AS numeric), 2),
        'temperature', round(CAST(20 + random() * 15 AS numeric), 2),
        'status',      CASE
                           WHEN random() <= 0.95 THEN 'OK'
                           ELSE 'FAIL'
                       END,
        'ts',          now() - random() * INTERVAL '1 hour'
    )
FROM generate_series(1, 50) AS n;  -- CHANGE VOLUME HERE: upper bound = number of events
-- =====================================================
-- BASIC QUEUE CONSUMPTION
-- Demonstrates: Pull-based message retrieval
-- =====================================================
-- Peek at the ten oldest unprocessed messages without claiming them.
-- created_at defaults to now(), which is fixed for the whole transaction, so
-- every row from one batch INSERT shares the same timestamp; id is the
-- tie-breaker that makes the order (and therefore the LIMIT) deterministic.
SELECT
    id,
    event_type,
    payload,
    created_at,
    processed
FROM mq.queue
WHERE processed = FALSE
ORDER BY created_at, id
LIMIT 10;
-- =====================================================
-- SAFE CONCURRENT CONSUMPTION
-- Demonstrates: Multi-consumer processing
-- =====================================================
-- Claim up to ten unprocessed messages and mark them processed in one statement.
-- FOR UPDATE SKIP LOCKED locks the selected rows and skips rows already locked
-- by another transaction, so concurrent consumers get disjoint batches instead
-- of blocking on each other.
-- Ordering by (created_at, id) keeps the claim order deterministic: rows from
-- one batch INSERT share the same created_at, so created_at alone leaves the
-- choice of which ten rows to claim arbitrary.
-- NOTE(review): rows are flagged processed at claim time; if the consumer fails
-- after this statement commits, those messages are not retried. Confirm that
-- this delivery behavior is intended.
WITH claimed AS (
    SELECT id
    FROM mq.queue
    WHERE processed = FALSE
    ORDER BY created_at, id
    LIMIT 10
    FOR UPDATE SKIP LOCKED
)
UPDATE mq.queue AS q
SET processed = TRUE
WHERE q.id IN (SELECT id FROM claimed)
RETURNING
    q.id,
    q.event_type,
    q.payload,
    q.created_at,
    q.processed;
-- =====================================================
-- LISTEN FOR EVENTS (RUN IN SEPARATE SESSION)
-- =====================================================
-- Subscribe the current session to the 'mq_channel' notification channel.
-- This is the channel name that mq.notify_queue_event() passes to pg_notify,
-- so a listening session receives one notification per row inserted into
-- mq.queue once the trigger is installed.
LISTEN mq_channel;
-- =====================================================
-- TRIGGER FUNCTION WITH DYNAMIC PAYLOAD
-- Demonstrates: Real-time event notification
-- =====================================================
-- Trigger function: publishes a notification on 'mq_channel' for the row
-- being inserted. The payload is a JSON text object carrying the row's id and
-- event_type only; the message body itself is not sent.
-- Returns NEW. When fired from a row-level AFTER trigger (as trg_notify_queue
-- is), PostgreSQL ignores the return value.
CREATE OR REPLACE FUNCTION mq.notify_queue_event()
    RETURNS trigger
    LANGUAGE plpgsql
AS
$$
DECLARE
    -- JSON text sent as the notification payload.
    notification text;
BEGIN
    notification := json_build_object(
        'id', NEW.id,
        'event_type', NEW.event_type
    )::text;

    PERFORM pg_notify('mq_channel', notification);

    RETURN NEW;
END;
$$;
-- =====================================================
-- TRIGGER ON INSERT
-- =====================================================
-- Drop any previous definition first so the script can be re-run.
DROP TRIGGER IF EXISTS trg_notify_queue ON mq.queue;

-- Call mq.notify_queue_event() once for every row inserted into mq.queue,
-- after the insert of that row has happened.
CREATE TRIGGER trg_notify_queue
    AFTER INSERT ON mq.queue
    FOR EACH ROW
EXECUTE FUNCTION mq.notify_queue_event();
-- =====================================================
-- INSERT RANDOM EVENTS (REAL-TIME STREAM)
-- Demonstrates: Continuous event generation
-- =====================================================
-- Enqueue ten synthetic sensor events. With trg_notify_queue installed, each
-- inserted row also produces a notification on 'mq_channel'.
-- Values are generated per row as in the batch producer, except:
--   status  is 'FAIL' only when random() exceeds 0.97
--   ts      is now() itself, so all rows of this statement share one timestamp
INSERT INTO mq.queue (event_type, payload)
SELECT
    (ARRAY['sensor_reading', 'sensor_failure', 'sensor_recovery'])
        [CAST(1 + floor(random() * 3) AS int)],
    jsonb_build_object(
        'sensor_id',   CAST(1 + floor(random() * 200) AS int),
        'cpu',         round(CAST(random() * 100 AS numeric), 2),
        'temperature', round(CAST(20 + random() * 15 AS numeric), 2),
        'status',      CASE
                           WHEN random() <= 0.97 THEN 'OK'
                           ELSE 'FAIL'
                       END,
        'ts',          now()
    )
FROM generate_series(1, 10) AS n;