Skip to content

Commit 04082c0

Browse files
committed
Stream tarball downloads and file uploads to avoid memory buffering
- Stream tarball downloads to disk via Store.get_to_file instead of loading full binary into memory - Stream file uploads via put_file! to avoid File.read! on every doc - Add TmpDir GenServer for process-based temp file cleanup - Add put_file! to Store behaviour and backends (GS, S3, Local) - Fix flaky debouncer test grace time - Fix unsafe paths test to assert on error logs
1 parent 060c8af commit 04082c0

18 files changed

Lines changed: 450 additions & 169 deletions

lib/hexdocs/application.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule Hexdocs.Application do
1212
Logger.info("Running Cowboy with #{inspect(cowboy_options)}")
1313

1414
children = [
15+
Hexdocs.TmpDir,
1516
{Task.Supervisor, name: Hexdocs.Tasks},
1617
{Hexdocs.Debouncer, name: Hexdocs.Debouncer},
1718
goth_spec(),

lib/hexdocs/bucket.ex

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ defmodule Hexdocs.Bucket do
2323
meta: [{"surrogate-key", key}]
2424
]
2525

26-
Logger.info("Uploading docs_public_bucket #{path}")
27-
2826
case Hexdocs.Store.put(:docs_public_bucket, path, content, opts) do
2927
{:ok, 200, _headers, _body} ->
3028
:ok
@@ -39,10 +37,10 @@ defmodule Hexdocs.Bucket do
3937
purge([key])
4038
end
4139

42-
def upload(repository, package, version, all_versions, files) do
40+
def upload(repository, package, version, all_versions, dir, files) do
4341
latest_version? = Hexdocs.Utils.latest_version?(package, version, all_versions)
4442
upload_type = upload_type(latest_version?)
45-
upload_files = list_upload_files(repository, package, version, files, upload_type)
43+
upload_files = list_upload_files(repository, package, version, dir, files, upload_type)
4644
paths = MapSet.new(upload_files, &elem(&1, 0))
4745

4846
upload_new_files(upload_files)
@@ -53,7 +51,7 @@ defmodule Hexdocs.Bucket do
5351
{:docs_config, repository, package},
5452
@gcs_put_debounce,
5553
fn ->
56-
docs_config = build_docs_config(repository, package, version, all_versions, files)
54+
docs_config = build_docs_config(repository, package, version, all_versions, dir, files)
5755
upload_new_files([docs_config])
5856
end
5957
)
@@ -63,17 +61,24 @@ defmodule Hexdocs.Bucket do
6361
end
6462

6563
# For Elixir and Hex we use the docs_config.js included in the tarball
66-
defp build_docs_config(repository, package, _version, _all_versions, files)
64+
defp build_docs_config(repository, package, _version, _all_versions, dir, files)
6765
when package in @special_package_names do
6866
path = "docs_config.js"
6967
unversioned_path = repository_path(repository, Path.join([package, path]))
7068
cdn_key = docs_config_cdn_key(repository, package)
71-
{"docs_config.js", data} = List.keyfind(files, "docs_config.js", 0)
69+
70+
data =
71+
if "docs_config.js" in files do
72+
File.read!(Path.join(dir, "docs_config.js"))
73+
else
74+
""
75+
end
76+
7277
{unversioned_path, cdn_key, data, public?(repository)}
7378
end
7479

7580
# TODO: don't include retired versions?
76-
defp build_docs_config(repository, package, version, all_versions, _files) do
81+
defp build_docs_config(repository, package, version, all_versions, _dir, _files) do
7782
versions =
7883
if version in all_versions do
7984
all_versions
@@ -135,22 +140,32 @@ defmodule Hexdocs.Bucket do
135140
cond do
136141
deleting_latest_version? && new_latest_version ->
137142
key = build_key(repository, package, new_latest_version)
138-
body = Hexdocs.Store.get(:repo_bucket, key)
139-
140-
case Hexdocs.Tar.unpack(body, repository: repository, package: package, version: version) do
141-
{:ok, files} ->
142-
upload_files =
143-
list_upload_files(repository, package, new_latest_version, files, :both)
144-
145-
paths = MapSet.new(upload_files, &elem(&1, 0))
146-
update_versions = [version, new_latest_version]
147-
148-
upload_new_files(upload_files)
149-
delete_old_docs(repository, package, update_versions, paths, :both)
150-
purge_hexdocs_cache(repository, package, update_versions, :both)
151-
152-
{:error, reason} ->
153-
Logger.error("Failed unpack #{repository}/#{package} #{version}: #{reason}")
143+
tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball")
144+
145+
case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do
146+
:ok ->
147+
case Hexdocs.Tar.unpack_to_dir({:file, tarball_path},
148+
repository: repository,
149+
package: package,
150+
version: version
151+
) do
152+
{:ok, dir, files} ->
153+
upload_files =
154+
list_upload_files(repository, package, new_latest_version, dir, files, :both)
155+
156+
paths = MapSet.new(upload_files, &elem(&1, 0))
157+
update_versions = [version, new_latest_version]
158+
159+
upload_new_files(upload_files)
160+
delete_old_docs(repository, package, update_versions, paths, :both)
161+
purge_hexdocs_cache(repository, package, update_versions, :both)
162+
163+
{:error, reason} ->
164+
Logger.error("Failed unpack #{repository}/#{package} #{version}: #{reason}")
165+
end
166+
167+
nil ->
168+
Logger.error("Failed to get tarball #{repository}/#{package} #{new_latest_version}")
154169
end
155170

156171
deleting_latest_version? ->
@@ -171,21 +186,23 @@ defmodule Hexdocs.Bucket do
171186
Path.join(["repos", repository, "docs", "#{package}-#{version}.tar.gz"])
172187
end
173188

174-
defp list_upload_files(repository, package, version, files, upload_type) do
189+
defp list_upload_files(repository, package, version, dir, files, upload_type) do
175190
Enum.flat_map(files, fn
176-
{"docs_config.js", _data} ->
191+
"docs_config.js" ->
177192
[]
178193

179-
{path, data} ->
194+
path ->
195+
source = Path.join(dir, path)
196+
180197
versioned_path =
181198
repository_path(repository, Path.join([package, to_string(version), path]))
182199

183200
cdn_key = docspage_versioned_cdn_key(repository, package, version)
184-
versioned = {versioned_path, cdn_key, data, public?(repository)}
201+
versioned = {versioned_path, cdn_key, {:file, source}, public?(repository)}
185202

186203
unversioned_path = repository_path(repository, Path.join([package, path]))
187204
cdn_key = docspage_unversioned_cdn_key(repository, package)
188-
unversioned = {unversioned_path, cdn_key, data, public?(repository)}
205+
unversioned = {unversioned_path, cdn_key, {:file, source}, public?(repository)}
189206

190207
case upload_type do
191208
:both -> [versioned, unversioned]
@@ -210,8 +227,12 @@ defmodule Hexdocs.Bucket do
210227
{bucket(public?), store_key, data, opts}
211228
end)
212229
|> Task.async_stream(
213-
fn {bucket, key, data, opts} ->
214-
put(bucket, key, data, opts)
230+
fn
231+
{bucket, key, {:file, source}, opts} ->
232+
put_file(bucket, key, source, opts)
233+
234+
{bucket, key, data, opts} ->
235+
put(bucket, key, data, opts)
215236
end,
216237
max_concurrency: 10,
217238
timeout: 60_000
@@ -239,10 +260,6 @@ defmodule Hexdocs.Bucket do
239260
&delete_key?(&1, paths, repository, package, versions, upload_type)
240261
)
241262

242-
Enum.each(keys_to_delete, fn key ->
243-
Logger.info("Deleting #{bucket} #{key}")
244-
end)
245-
246263
Hexdocs.Store.delete_many(bucket, keys_to_delete)
247264
end
248265

@@ -332,7 +349,10 @@ defmodule Hexdocs.Bucket do
332349
end
333350

334351
defp put(bucket, key, data, opts) do
335-
Logger.info("Uploading #{bucket} #{key}")
336352
Hexdocs.Store.put!(bucket, key, data, opts)
337353
end
354+
355+
defp put_file(bucket, key, source, opts) do
356+
Hexdocs.Store.put_file!(bucket, key, source, opts)
357+
end
338358
end

lib/hexdocs/http.ex

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,24 @@ defmodule Hexdocs.HTTP do
7070
end
7171
end
7272

73+
def put_file(url, headers, path) do
74+
body = File.stream!(path, 65_536)
75+
76+
case Req.put(url,
77+
headers: headers,
78+
body: body,
79+
retry: false,
80+
decode_body: false,
81+
receive_timeout: @receive_timeout
82+
) do
83+
{:ok, response} ->
84+
{:ok, response.status, normalize_headers(response.headers), response.body}
85+
86+
{:error, reason} ->
87+
{:error, reason}
88+
end
89+
end
90+
7391
def post(url, headers, body, opts \\ []) do
7492
timeout = Keyword.get(opts, :receive_timeout, @receive_timeout)
7593

lib/hexdocs/queue.ex

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,26 @@ defmodule Hexdocs.Queue do
5858

5959
case key_components(key) do
6060
{:ok, repository, package, version} ->
61-
body = Hexdocs.Store.get(:repo_bucket, key)
62-
63-
case Hexdocs.Tar.unpack(body, repository: repository, package: package, version: version) do
64-
{:ok, files} ->
65-
update_index_sitemap(repository, key)
66-
update_package_sitemap(repository, key, package, files)
67-
Logger.info("#{key}: done")
68-
69-
{:error, reason} ->
70-
Logger.error("Failed unpack #{repository}/#{package} #{version}: #{reason}")
61+
tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball")
62+
63+
case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do
64+
:ok ->
65+
case Hexdocs.Tar.unpack_to_dir({:file, tarball_path},
66+
repository: repository,
67+
package: package,
68+
version: version
69+
) do
70+
{:ok, _dir, files} ->
71+
update_index_sitemap(repository, key)
72+
update_package_sitemap(repository, key, package, files)
73+
Logger.info("#{key}: done")
74+
75+
{:error, reason} ->
76+
Logger.error("Failed unpack #{repository}/#{package} #{version}: #{reason}")
77+
end
78+
79+
nil ->
80+
Logger.error("#{key}: package not found in store")
7181
end
7282

7383
:error ->
@@ -104,26 +114,28 @@ defmodule Hexdocs.Queue do
104114
version: version
105115
})
106116

107-
body = Hexdocs.Store.get(:repo_bucket, key)
117+
tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball")
108118

109-
if body do
110-
case type do
111-
:upload ->
112-
process_upload(key, repository, package, version, body, start)
119+
case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do
120+
:ok ->
121+
case type do
122+
:upload ->
123+
process_upload(key, repository, package, version, {:file, tarball_path}, start)
113124

114-
:search ->
115-
process_search(key, repository, package, version, body, start)
116-
end
117-
else
118-
Logger.error("#{log_prefix} #{key}: package not found in store")
125+
:search ->
126+
process_search(key, repository, package, version, {:file, tarball_path}, start)
127+
end
128+
129+
nil ->
130+
Logger.error("#{log_prefix} #{key}: package not found in store")
119131
end
120132

121133
:error ->
122134
Logger.info("#{key}: skip")
123135
end
124136
end
125137

126-
defp process_upload(key, repository, package, version, body, start) do
138+
defp process_upload(key, repository, package, version, input, start) do
127139
{version, all_versions} =
128140
if package in @special_package_names do
129141
version =
@@ -144,15 +156,20 @@ defmodule Hexdocs.Queue do
144156
{version, all_versions}
145157
end
146158

147-
case Hexdocs.Tar.unpack(body, repository: repository, package: package, version: version) do
148-
{:ok, files} ->
149-
files = rewrite_files(files)
159+
case Hexdocs.Tar.unpack_to_dir(input,
160+
repository: repository,
161+
package: package,
162+
version: version
163+
) do
164+
{:ok, dir, files} ->
165+
rewrite_files(dir, files)
150166

151167
Hexdocs.Bucket.upload(
152168
repository,
153169
package,
154170
version,
155171
all_versions,
172+
dir,
156173
files
157174
)
158175

@@ -170,7 +187,7 @@ defmodule Hexdocs.Queue do
170187
end
171188
end
172189

173-
defp process_search(key, repository, package, version, body, start) do
190+
defp process_search(key, repository, package, version, input, start) do
174191
if repository != "hexpm" do
175192
Logger.warning("SKIPPING SEARCH INDEX #{key} (repository is not hexpm)")
176193
else
@@ -180,9 +197,9 @@ defmodule Hexdocs.Queue do
180197
:error when package in @special_package_names -> version
181198
end
182199

183-
case Hexdocs.Tar.unpack(body, package: package, version: version) do
184-
{:ok, files} ->
185-
update_search_index(key, package, version, files)
200+
case Hexdocs.Tar.unpack_to_dir(input, package: package, version: version) do
201+
{:ok, dir, files} ->
202+
update_search_index(key, package, version, dir, files)
186203
elapsed = System.os_time(:millisecond) - start
187204
Logger.info("FINISHED INDEXING DOCS #{key} #{elapsed}ms")
188205

@@ -279,9 +296,12 @@ defmodule Hexdocs.Queue do
279296
{package, version}
280297
end
281298

282-
defp rewrite_files(files) do
283-
Enum.map(files, fn {path, content} ->
284-
{path, Hexdocs.FileRewriter.run(path, content)}
299+
defp rewrite_files(dir, files) do
300+
Enum.each(files, fn path ->
301+
full_path = Path.join(dir, path)
302+
content = File.read!(full_path)
303+
rewritten = Hexdocs.FileRewriter.run(path, content)
304+
File.write!(full_path, rewritten)
285305
end)
286306
end
287307

@@ -314,7 +334,7 @@ defmodule Hexdocs.Queue do
314334
defp update_package_sitemap("hexpm", key, package, files) do
315335
Logger.info("UPDATING PACKAGE SITEMAP #{key}")
316336

317-
pages = for {path, _content} <- files, Path.extname(path) == ".html", do: path
337+
pages = for path <- files, Path.extname(path) == ".html", do: path
318338
body = Hexdocs.PackageSitemap.render(package, pages, DateTime.utc_now())
319339
Hexdocs.Bucket.upload_package_sitemap(package, body)
320340

@@ -344,8 +364,13 @@ defmodule Hexdocs.Queue do
344364
:ok
345365
end
346366

347-
defp update_search_index(key, package, version, files) do
348-
case Hexdocs.Search.find_search_items(package, version, files) do
367+
defp update_search_index(key, package, version, dir, files) do
368+
files_with_content =
369+
Enum.map(files, fn path ->
370+
{path, File.read!(Path.join(dir, path))}
371+
end)
372+
373+
case Hexdocs.Search.find_search_items(package, version, files_with_content) do
349374
{proglang, items} ->
350375
Logger.info("DELETING SEARCH INDEX #{key}")
351376
Hexdocs.Search.delete(package, version)

lib/hexdocs/store/gs.ex

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ defmodule Hexdocs.Store.GS do
5252
end
5353

5454
def put!(bucket, key, blob, opts) do
55+
upload(bucket, key, opts, fn url, headers ->
56+
Hexdocs.HTTP.put(url, headers, blob)
57+
end)
58+
end
59+
60+
def put_file!(bucket, key, source, opts) do
61+
upload(bucket, key, opts, fn url, headers ->
62+
Hexdocs.HTTP.put_file(url, headers, source)
63+
end)
64+
end
65+
66+
defp upload(bucket, key, opts, fun) do
5567
headers =
5668
headers() ++
5769
meta_headers(Keyword.fetch!(opts, :meta)) ++
@@ -64,7 +76,7 @@ defmodule Hexdocs.Store.GS do
6476
headers = filter_nil_values(headers)
6577

6678
{:ok, 200, _headers, _body} =
67-
Hexdocs.HTTP.retry("gs", url, fn -> Hexdocs.HTTP.put(url, headers, blob) end)
79+
Hexdocs.HTTP.retry("gs", url, fn -> fun.(url, headers) end)
6880

6981
:ok
7082
end

0 commit comments

Comments
 (0)