Skip to content
This repository was archived by the owner on Apr 24, 2023. It is now read-only.

Commit 7535e6b

Browse files
committed
working mirrored share and quota from postgres
1 parent 7ff828a commit 7535e6b

10 files changed

Lines changed: 499 additions & 167 deletions

File tree

scheduler/src/cook/caches.clj

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
[cook.cache :as ccache]
44
[cook.config :as config]
55
[mount.core :as mount])
6-
(:import (com.google.common.cache Cache CacheBuilder)
6+
(:import (com.google.common.cache Cache CacheLoader CacheBuilder)
77
(java.util.concurrent TimeUnit)))
88

99
(defn new-cache [config]
@@ -23,14 +23,6 @@
2323
(.expireAfterAccess (get-in config [:settings :passport :job-cache-expiry-time-hours]) TimeUnit/HOURS)
2424
(.build)))
2525

26-
;(defn resource-limit-cache [config]
27-
; "Build a new cache for resource limits"
28-
; (-> (CacheBuilder/newBuilder)
29-
; (.maximumSize (or (get-in config [:settings :pg-config :resource-limit-cache-size]) 10000))
30-
; (.expireAfterAccess (or (get-in config [:settings :pg-config :resource-limit-cache-expiry-time-seconds]) 30) TimeUnit/SECONDS)
31-
; (.refreshAfterWrite (or (get-in config [:settings :pg-config :resource-limit-cache-refresh-time-seconds]) 10) TimeUnit/SECONDS)
32-
; (.build)))
33-
3426
(defn lookup-cache-datomic-entity!
3527
"Specialized function for caching where datomic entities are the key.
3628
Extracts :db/id so that we don't keep the entity alive in the cache."
@@ -54,5 +46,4 @@
5446
(mount/defstate ^Cache pool-name->db-id-cache :start (new-cache config/config))
5547
(mount/defstate ^Cache user-and-pool-name->quota :start (new-cache config/config))
5648
(mount/defstate ^Cache instance-uuid->job-uuid :start (passport-cache config/config))
57-
(mount/defstate ^Cache job-uuid->job-map :start (passport-cache config/config))
58-
;(mount/defstate ^Cache resource-limits :start (resource-limit-cache config/config))
49+
(mount/defstate ^Cache job-uuid->job-map :start (passport-cache config/config))

scheduler/src/cook/quota_pg.clj

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@
8888
(def ratelimit-quota-fields #{:resource.type/launch-rate-saved
8989
:resource.type/launch-rate-per-minute})
9090

91-
9291
(defn maybe-flush-ratelimit
9392
"Look at the quota being updated (or retracted) and flush the rate limit
9493
if it's a rate-limit related quota"
@@ -102,7 +101,7 @@
102101

103102
(defn retract-quota!
104103
[conn user pool-name reason]
105-
(resource-limit/retract-quota! (defaultify-pool pool-name) user))
104+
(resource-limit/retract-quota! (defaultify-pool pool-name) user reason))
106105

107106
(defn set-quota!
108107
"Set the quota for a user. Note that the type of resource must be in the
@@ -122,14 +121,11 @@
122121
and returns the `default-user` value if a user is not returned.
123122
This is useful if the application will go over ALL users during processing"
124123
[db pool-name]
125-
; TODO: Cache should be a global cache we refresh every minute and use. See text in cook.quotsshare.
126-
(let [cache (resource-limit/sql-result->quotamap (resource-limit/get-all-resource-limits))]
127-
(fn [user] (resource-limit/get-quota-pool-user-from-cache cache (defaultify-pool pool-name) user))))
124+
(let [defaulted-pool-name (defaultify-pool pool-name)]
125+
(fn [user] (resource-limit/get-quota-pool-user defaulted-pool-name user))))
128126

129127
(defn create-pool->user->quota-fn
130128
"Creates a function that takes a pool name, and returns an equivalent of user->quota-fn for each pool"
131129
[db]
132-
; TODO: Cache should be a global cache we refresh every minute and use. See text in cook.quotsshare.
133-
(let [cache (resource-limit/sql-result->quotamap (resource-limit/get-all-resource-limits))]
134-
(fn [pool-name]
135-
(fn [user] (resource-limit/get-quota-pool-user-from-cache cache (defaultify-pool pool-name) user)))))
130+
(fn [pool-name]
131+
(fn [user] (resource-limit/get-quota-pool-user (defaultify-pool pool-name) user))))

scheduler/src/cook/resource_limit.clj

Lines changed: 127 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
[clojure.tools.logging :as log]
1919
[clojure.walk :as walk]
2020
[cook.cache :as ccache]
21-
[cook.caches :as caches]
2221
[cook.config :as config]
2322
[cook.datomic]
2423
[cook.pool :as pool]
@@ -29,44 +28,100 @@
2928
[datomic.api :as d]
3029
[mount.core :as mount]
3130
[next.jdbc :as sql]
32-
[plumbing.core :as pc]))
31+
[plumbing.core :as pc])
32+
(:import (com.google.common.cache CacheLoader CacheBuilder LoadingCache)
33+
(java.util.concurrent TimeUnit)))
3334

3435
(def default-user "default")
3536

3637
(defn- net-map
3738
"A helper function for turning a list of tuples into a set of layered maps.
3839
Given a list of maps, a function for extracting a key from the maps, group the map
3940
by that key, then call another function on the underlying map."
40-
[amap key-fn sub-fn]
41-
(->> amap
41+
[maps key-fn sub-fn]
42+
(->> maps
4243
(group-by key-fn)
4344
(pc/map-vals sub-fn)))
4445

45-
(defn- sql-result->split-quotshares
46-
"Returns a map from 'quotssharetype -> pool -> user -> resource -> amount.
47-
Convenience debugging method to show an entire resource limit map"
48-
[sql-result]
49-
;; BROKEN: NEEDS KEYWORDS HERE.
50-
(let [split-by-resource-name (fn [key] (net-map key :resource_name #(-> % first :amount)))
51-
split-by-user-name (fn [key] (net-map key :user_name split-by-resource-name))
52-
split-by-pool-name (fn [key] (net-map key :pool_name split-by-user-name))
53-
split-by-resource-limit-type (fn [key] (net-map key :resource_limit_type split-by-pool-name))]
54-
(split-by-resource-limit-type sql-result)))
55-
56-
(let [miss-fn
57-
(fn [_]
58-
(sql/execute! (pg/pg-db) ["SELECT resource_limit_type, pool_name, user_name, resource_name, amount from resource_limits"]))]
59-
(defn get-all-resource-limits
60-
"Get everything in resource_limits. Values might be cached."
61-
[]
62-
;(ccache/lookup-cache! caches/resource-limits identity miss-fn :all-resource-limits)
63-
))
64-
65-
(defn query-quota-pool-user
66-
"Do a query for the quota for a specific user and pool in resource_limits"
67-
[pool user]
68-
(sql/execute! (pg/pg-db) ["SELECT resource_limit_type, pool_name, user_name, resource_name, amount from resource_limits WHERE resource_limit_type = 'quota' AND pool_name = ? and user_name = ?" pool user]))
46+
(defn make-resource-limit-map-from-sql-row
47+
"Given a :resource_name and :amount keys in a sql result, map them into the resource limit keywords (:count, :gpus, etc.) and assoc onto the result."
48+
[result {:keys [:resource_limits/resource_name :resource_limits/amount]}]
49+
(assoc result
50+
(case resource_name
51+
"count" :count
52+
"cpus" :cpus
53+
"mem" :mem
54+
"gpus" :gpus
55+
"launch-rate-saved" :launch-rate-saved
56+
"launch-rate-per-minute" :launch-rate-per-minute)
57+
(if (= resource_name "count")
58+
(int amount)
59+
amount)))
6960

61+
(defn sql-result->resource-limits-map
62+
"Take a sql result of querying resource_limits and turn into a map
63+
resource-limit-type -> pool -> user -> {:cpus ... :mem ... :count ... ...}"
64+
[sql-rows]
65+
"Returns a map from 'resource-limit-type -> pool -> user -> resource-limit-map"
66+
(let [split-by-resource-name (fn [sql-rows] (reduce make-resource-limit-map-from-sql-row {} sql-rows))
67+
split-by-user-name (fn [sql-rows] (net-map sql-rows :resource_limits/user_name split-by-resource-name))
68+
split-by-pool-name (fn [sql-rows] (net-map sql-rows :resource_limits/pool_name split-by-user-name))
69+
split-by-resource-limit-type (fn [sql-rows] (net-map sql-rows :resource_limits/resource_limit_type split-by-pool-name))]
70+
(split-by-resource-limit-type sql-rows)))
71+
72+
(defn query-all-resource-limits
73+
"Get everything in resource_limits"
74+
[]
75+
(sql/execute! (pg/pg-db) ["SELECT resource_limit_type, pool_name, user_name, resource_name, amount from resource_limits"]))
76+
77+
(defn make-full-resource-limits-map
78+
"Query the database to make a full map of all resource limits - including for all resource limit types, pools, and users."
79+
[]
80+
(sql-result->resource-limits-map (query-all-resource-limits)))
81+
82+
(defn make-resource-limits-cache []
83+
"Build a new cache for resource limits"
84+
(-> (CacheBuilder/newBuilder)
85+
(.maximumSize (or (get-in config/config [:settings :pg-config :resource-limit-cache-size]) 10000))
86+
(.expireAfterAccess (or (get-in config/config [:settings :pg-config :resource-limit-cache-expiry-time-seconds]) 30) TimeUnit/SECONDS)
87+
(.refreshAfterWrite (or (get-in config/config [:settings :pg-config :resource-limit-cache-refresh-time-seconds]) 10) TimeUnit/SECONDS)
88+
(.build (proxy [CacheLoader] []
89+
(load [_]
90+
(log/info "Loading resource-limits using CacheLoader")
91+
(make-full-resource-limits-map))))))
92+
93+
(def resource-limits-cache-atom (atom nil))
94+
(def initialization-promise-atom (atom nil))
95+
96+
(defn initialize-resource-limits-cache!
97+
"Initialize the resource-limits-cache exactly once"
98+
[]
99+
(let [p (promise)]
100+
(if (compare-and-set! initialization-promise-atom nil p)
101+
(let [^LoadingCache resource-limits-cache (make-resource-limits-cache)]
102+
(reset! resource-limits-cache-atom resource-limits-cache)
103+
(deliver p resource-limits-cache)
104+
resource-limits-cache)
105+
(deref @initialization-promise-atom 5000 nil))))
106+
107+
(defn ^LoadingCache get-resource-limits-cache
108+
"Get the resource-limits-cache. This method ensures it exists."
109+
[]
110+
(or @resource-limits-cache-atom (initialize-resource-limits-cache!)))
111+
112+
(defn invalidate-resource-limits-cache!
113+
"Invalidate the resource-limits-cache"
114+
[]
115+
(.invalidateAll (get-resource-limits-cache)))
116+
117+
(defn get-resource-limits-map
118+
"Get everything in resource_limits. Values might be cached."
119+
[]
120+
(ccache/lookup-cache! (get-resource-limits-cache) identity
121+
(fn [_]
122+
(log/info "Loading resource-limits because of a cache miss")
123+
(make-full-resource-limits-map))
124+
:all-resource-limits))
70125

71126
; Some defaults to be effectively infinity if you don't configure quotas explicitly.
72127
; 10M jobs and 10k/sec sustained seems to have a lot of headroom. Don't want to go into the billions
@@ -80,94 +135,69 @@
80135
; Set of all quota-relevant resource types (and default values)
81136
(def all-quota-resource-types (merge {:count Integer/MAX_VALUE :launch-rate-saved default-launch-rate-saved :launch-rate-per-minute default-launch-rate-per-minute} all-mesos-resource-types))
82137

83-
(defn make-quotadict-from-val
84-
"Given a :resource_name and :amount keys in a sql result, map them into the quota keywords (:count, :gpus, etc.) and assoc onto the result."
85-
[result {:keys [:resource_limits/resource_name :resource_limits/amount] :as tuple}]
86-
(assoc result
87-
(case resource_name
88-
"count" :count
89-
"cpus" :cpus
90-
"mem" :mem
91-
"gpus" :gpus
92-
"launch-rate-saved" :launch-rate-saved
93-
"launch-rate-per-minute" :launch-rate-per-minute)
94-
(if (= resource_name "count")
95-
(int amount)
96-
amount)))
97-
98-
(defn split-one-resource-type
99-
"Take either a sql result of quota or share and turn into a map
100-
pool -> user -> {:cpu ... :mem ... :count ... ...}"
101-
[quota-subset-sql-result]
102-
"Returns a map from 'pool -> user -> quota-map"
103-
(let [split-by-resource-name (fn [key] (reduce make-quotadict-from-val {} key))
104-
split-by-user-name (fn [key] (net-map key :resource_limits/user_name split-by-resource-name))
105-
split-by-pool-name (fn [key] (net-map key :resource_limits/pool_name split-by-user-name))]
106-
(split-by-pool-name quota-subset-sql-result)))
107-
108-
(defn sql-result->quotamap
109-
"Given a sql result, extract just the 'quota' fields and turn into a map:
110-
pool -> user -> {:cpu ... :mem ... :count ... ...}"
111-
[sql-result]
112-
(let [split-by-type (group-by :resource_limits/resource_limit_type sql-result)
113-
sqlresult-quota (get split-by-type "quota")]
114-
; TODO: This should just cache all of the quota maps and refresh every 30 seconds into a global var for quota and share.
115-
(split-one-resource-type sqlresult-quota)))
116-
117-
118-
; TODO: This shouldn't exist. We should just cache all of the quota maps and refresh every 30 seconds. This then just delegates to the cache case with the global cache.
119-
(defn get-quota-dict-pool-user
120-
[pool-name user]
121-
(-> (query-quota-pool-user pool-name user)
122-
sql-result->quotamap
123-
(get-in [pool-name user])))
124-
125-
; TODO: This shouldn't exist. We should just cache all of the quota maps and refresh every 30 seconds. This then just delegates to the cache case with the global cache.
126138
(defn get-quota-pool-user
127139
[pool-name user]
128140
(assert pool-name)
129-
(merge
130-
all-quota-resource-types
131-
(or (get-quota-dict-pool-user pool-name default-user) {})
132-
(or (get-quota-dict-pool-user pool-name user) {})))
133-
134-
(defn get-quota-dict-pool-user-from-cache
135-
[cache pool-name user]
136-
(get-in cache [pool-name user]))
137-
138-
(defn get-quota-pool-user-from-cache
139-
"Cache is a quota map, as returned from get-all-resource-limits and fed through sql-result->quotamap"
140-
[cache pool-name user]
141+
(let [user->quota (get-in (get-resource-limits-map) ["quota" pool-name])]
142+
(merge
143+
all-quota-resource-types
144+
(or (get user->quota default-user) {})
145+
(or (get user->quota user) {}))))
146+
147+
(defn get-share-pool-user
148+
[pool-name user]
141149
(assert pool-name)
142-
(merge
143-
all-quota-resource-types
144-
(or (get-quota-dict-pool-user-from-cache cache pool-name default-user) {})
145-
(or (get-quota-dict-pool-user-from-cache cache pool-name user) {})))
150+
(let [user->quota (get-in (get-resource-limits-map) ["share" pool-name])]
151+
(merge
152+
all-mesos-resource-types
153+
(or (get user->quota default-user) {})
154+
(or (get user->quota user) {}))))
155+
156+
(defn retract-resource-limit!
157+
"Retract a resource limit."
158+
[resource-limit-type pool user reason]
159+
; TODO: mark reason when we are not just deleting
160+
; FIXME: why don't we need COMMIT?
161+
(sql/execute! (pg/pg-db) ["DELETE FROM resource_limits WHERE resource_limit_type = ? AND pool_name = ? and user_name = ?;" resource-limit-type pool user]))
146162

147163
(defn retract-quota!
148164
"Retract quota."
149-
[pool user]
150-
(sql/execute! (pg/pg-db) ["DELETE FROM resource_limits WHERE resource_limit_type = 'quota' AND pool_name = ? and user_name = ?;" pool user]))
165+
[pool user reason]
166+
(retract-resource-limit! "quota" pool user reason))
151167

152-
(defn quota-key-to-sql-key
153-
"Convert from quota keyword notation to sql resource_type"
168+
(defn retract-share!
169+
"Retract share."
170+
[pool user reason]
171+
(retract-resource-limit! "share" pool user reason))
172+
173+
(defn resource-key-to-sql-key
174+
"Convert from resource keyword notation to sql resource_type"
154175
[keyword]
155176
(case keyword
156177
:count "count"
157178
:cpus "cpus"
158179
:mem "mem"
159180
:gpus "gpus"
160181
:launch-rate-saved "launch-rate-saved"
161-
:launch-rate-per-minute "launch-rate-per-minute" ))
182+
:launch-rate-per-minute "launch-rate-per-minute"))
162183

163-
(defn set-quota!
164-
[pool user kvs reason]
184+
(defn set-resource-limit!
185+
[resource-type pool user kvs reason]
165186
(doseq [[key val] kvs]
166187
; This is a bit overcomplicated in order to handle upsert logic: insert or update.
167-
(sql/execute! (pg/pg-db) ["insert into resource_limits as r (resource_limit_type,pool_name,user_name,resource_name,amount,reason) VALUES (?,?,?,?,?,?) ON CONFLICT (resource_limit_type,pool_name,user_name,resource_name) DO UPDATE set amount=excluded.amount, reason=excluded.reason where r.resource_limit_type = excluded.resource_limit_type AND r.pool_name = excluded.pool_name and r.user_name=excluded.user_name and r.resource_name = excluded.resource_name;" "quota" pool user (quota-key-to-sql-key key) val reason]))
188+
(sql/execute! (pg/pg-db) ["insert into resource_limits as r (resource_limit_type,pool_name,user_name,resource_name,amount,reason) VALUES (?,?,?,?,?,?) ON CONFLICT (resource_limit_type,pool_name,user_name,resource_name) DO UPDATE set amount=excluded.amount, reason=excluded.reason where r.resource_limit_type = excluded.resource_limit_type AND r.pool_name = excluded.pool_name and r.user_name=excluded.user_name and r.resource_name = excluded.resource_name;" resource-type pool user (resource-key-to-sql-key key) val reason]))
189+
; FIXME: does COMMIT do anything?
168190
(sql/execute! (pg/pg-db) ["COMMIT;"]))
169191

192+
(defn set-quota!
193+
[pool user kvs reason]
194+
(set-resource-limit! "quota" pool user kvs reason))
195+
196+
(defn set-share!
197+
[pool user kvs reason]
198+
(set-resource-limit! "share" pool user kvs reason))
199+
170200
(defn truncate!
171-
"Reset the quota table between unit tests"
201+
"Reset the resource_limits table between unit tests"
172202
[]
173-
(sql/execute! (pg/pg-db) ["delete from resource_limits where true;"]))
203+
(sql/execute! (pg/pg-db) ["delete from resource_limits where true;"]))

0 commit comments

Comments
 (0)