-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinterface.rb
More file actions
148 lines (125 loc) · 5.38 KB
/
interface.rb
File metadata and controls
148 lines (125 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Creates a connection to the RabbitMQ server (in this deployment, the Docker container 'rabbitmq')
require 'bunny'
require 'net/http'
require 'uri'
require 'json'
HOST = 'rabbitmq'
# A failed TCP connection is retried (presumably because the broker container may still be
# starting): one initial attempt plus MAX_CONNECTION_RETRIES retries, RETRY_DELAY_SECONDS apart.
MAX_CONNECTION_RETRIES = 10
RETRY_DELAY_SECONDS = 10

connection = Bunny.new(host: HOST, port: 5672, user: ENV['RABBIT_USER'], pass: ENV['RABBIT_PASSWORD'], vhost: '/')
attempts = 0
begin
  connection.start
rescue Bunny::TCPConnectionFailedForAllHosts
  if attempts.zero?
    puts 'Connection failed: TCPConnectionFailedForAllHosts'
  else
    puts "Connection retry ##{attempts} failed: TCPConnectionFailedForAllHosts"
  end
  # Everything after this point needs a live connection, so give up with a non-zero exit status
  # instead of falling through and crashing later on connection.create_channel.
  abort "Connection attempts failed #{attempts + 1} times, stopping" if attempts >= MAX_CONNECTION_RETRIES

  attempts += 1
  sleep RETRY_DELAY_SECONDS
  retry
end
# With the broker connection up, prepare an HTTP client for Jena's SPARQL *update* endpoint
# (Net::HTTP.new does not open a socket yet; each request connects on demand). Then open a
# channel and declare the 'sdbm' queue that the consumer below reads change messages from.
uri = URI("http://jena:3030/sdbm/update")
http = Net::HTTP.new(uri.host, uri.port)
channel = connection.create_channel
queue = channel.queue("sdbm")
# Next, consume change messages from the RabbitMQ 'sdbm' queue and mirror each one into Jena as a
# SPARQL update. A message carries an 'action' ("destroy" or "update"), the entity's 'model_class'
# and 'id', a 'response_id' used to report the outcome on the 'sdbm_status' queue, and, for
# updates, a 'fields' hash of field name => new value.
#
# NOTE(review): model_class, id, field names and values are interpolated into SPARQL verbatim.
# New values are inserted as-is as the triple's object, so the publisher presumably sends them
# already formatted as SPARQL terms (quoted/typed literals or <IRIs>) - confirm. Nothing here
# escapes input, so only a trusted publisher may write to this queue.

# Base IRI of every SDBM entity and class in the triple store.
SDBM_IRI_BASE = 'https://sdbm.library.upenn.edu/'
# rdf:type spelled out in full because the prologue declares no rdf: prefix.
RDF_TYPE = '<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>'
# Prefix declarations shared by every request; they stay in scope for all ';'-separated
# operations that follow them in the same request.
SPARQL_PROLOGUE = <<~SPARQL.freeze
  PREFIX sdbm: <https://sdbm.library.upenn.edu/>
  PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
SPARQL

# Returns the IRI term <https://sdbm.library.upenn.edu/MODEL_CLASS/ID> for the message's entity.
def sdbm_subject(message)
  "<#{SDBM_IRI_BASE}#{message['model_class']}/#{message['id']}>"
end

# Builds the "destroy" request: delete every triple whose subject is the message's entity.
def build_destroy_query(message)
  SPARQL_PROLOGUE + <<~SPARQL
    DELETE { ?subject ?predicate ?object }
    WHERE {
      BIND (#{sdbm_subject(message)} as ?subject) .
      OPTIONAL { ?subject ?predicate ?object }
    }
  SPARQL
end

# Builds one ';'-terminated operation that replaces +predicate+ on the message's entity: the
# existing triple(s) are deleted and, unless +new_value+ is blank, a triple with +new_value+ as
# its object is inserted.
def build_replace_operation(message, predicate, new_value)
  operation = +"DELETE { ?subject #{predicate} ?object }\n"
  # A blank value only clears the old triple; inserting it would produce a triple with no object.
  operation << "INSERT { ?subject #{predicate} #{new_value} }\n" unless new_value.to_s.empty?
  # OPTIONAL lets WHERE yield a row even when no old triple exists, so the INSERT still runs.
  operation << <<~SPARQL
    WHERE {
      BIND (#{sdbm_subject(message)} as ?subject) .
      OPTIONAL { ?subject #{predicate} ?object }
    };
  SPARQL
end

# Builds the "update" request: one replace operation per entry in message['fields'] (predicate
# sdbm:MODELCLASS_FIELD), followed by one that (re)asserts the entity's rdf:type.
# A missing 'fields' entry is treated as no field changes.
def build_update_query(message)
  model_class = message['model_class']
  query = SPARQL_PROLOGUE.dup
  (message['fields'] || {}).each do |field, new_value|
    query << build_replace_operation(message, "sdbm:#{model_class}_#{field}", new_value)
  end
  query << build_replace_operation(message, RDF_TYPE, "<#{SDBM_IRI_BASE}#{model_class}>")
end

# Posts +query+ to the Jena update endpoint and publishes the outcome to +status_queue+ as JSON
# {id: response_id, code:, message:}. A non-200 response is also logged to stdout. Errors raised
# while sending (e.g. connection refused, timeouts) are published with code "404" and the error
# text. Success here only means the update was sent: Jena takes an unknown amount of time to
# process it, and the verify_jena rake/cron task checks the results daily.
def post_update_and_report(http, uri, status_queue, message, query)
  request = Net::HTTP::Post.new(uri.request_uri)
  request.set_form_data({ 'update' => query })
  request.basic_auth('admin', ENV['ADMIN_PASSWORD'])
  response = http.request(request)
  puts "PROBLEM: #{response}: code #{response.code.inspect} \n #{query}" if response.code.to_i != 200
  status_queue.publish({ id: message['response_id'], code: response.code, message: response.message }.to_json)
rescue StandardError => err
  # StandardError rather than Exception, so signals, exit and fatal VM errors still propagate.
  status_queue.publish({ id: message['response_id'], code: "404", message: err.to_s }.to_json)
end

# Declared once up front instead of on every delivery and on every error.
status_queue = channel.queue("sdbm_status")

begin
  puts 'Waiting for messages. Q: CTRL-C'
  queue.subscribe(block: true) do |_delivery_info, _properties, body|
    begin
      message = JSON.parse(body)
    rescue JSON::ParserError => err
      puts "UNPARSEABLE: #{err.message}"
      next
    end

    case message['action']
    when "destroy"
      post_update_and_report(http, uri, status_queue, message, build_destroy_query(message))
    when "update"
      post_update_and_report(http, uri, status_queue, message, build_update_query(message))
    else
      puts "OTHER: #{message}"
    end
  end
rescue Interrupt
  connection.close
  exit(0)
end
# And that's it!