-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdepsdev.py
More file actions
105 lines (93 loc) · 4.2 KB
/
Copy pathdepsdev.py
File metadata and controls
105 lines (93 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from __future__ import annotations
import json
from typing import Any
import httpx
from config import get_http_client
DEPSDEV_API = "https://api.deps.dev/v3"
async def get_resolved_dependencies(system: str, package: str, version: str) -> dict:
try:
url = f"{DEPSDEV_API}/systems/{system}/packages/{package}/versions/{version}:dependencies"
c = get_http_client()
r = await c.get(url, timeout=15)
if r.status_code != 200:
return {"success": False, "error": f"deps.dev: HTTP {r.status_code}"}
data = r.json()
nodes: list[dict] = []
for n in (data.get("nodes", []) or []):
vk = n.get("versionKey", {}) or {}
pk = n.get("packageKey", {}) or {}
nodes.append({
"package": vk.get("name", pk.get("name", "")),
"version": vk.get("version", ""),
"system": vk.get("system", pk.get("system", system)),
"errors": n.get("errors", []),
"relation": n.get("relation", ""),
})
edges: list[dict] = []
for e in (data.get("edges", []) or []):
edges.append({
"fromNode": e.get("fromNode", 0),
"toNode": e.get("toNode", 0),
"requirement": e.get("requirement", ""),
})
return {
"success": True,
"nodes": nodes,
"edges": edges,
"total_dependencies": len(nodes),
}
except (httpx.HTTPError, ValueError, KeyError, json.JSONDecodeError) as e:
return {"success": False, "error": f"deps.dev: {e}"}
async def get_package_info(system: str, package: str) -> dict:
try:
url = f"{DEPSDEV_API}/systems/{system}/packages/{package}"
c = get_http_client()
r = await c.get(url, timeout=10)
if r.status_code != 200:
return {"success": False, "error": f"deps.dev: HTTP {r.status_code}"}
data = r.json()
versions = []
for v in (data.get("versions", []) or [])[:50]:
vk = v.get("versionKey", {})
versions.append({
"version": vk.get("version", ""),
"publishedAt": v.get("publishedAt", ""),
"isDefault": v.get("isDefault", False),
"isDeprecated": v.get("isDeprecated", False),
})
advisory_keys = data.get("advisoryKeys", [])
licenses = data.get("licenses", [])
return {
"success": True,
"package": package,
"system": system,
"versions": versions,
"total_versions": len(versions),
"advisory_keys": advisory_keys[:20],
"licenses": licenses[:5],
}
except (httpx.HTTPError, ValueError, KeyError, json.JSONDecodeError) as e:
return {"success": False, "error": f"deps.dev: {e}"}
async def get_advisory(advisory_id: str) -> dict:
try:
c = get_http_client()
r = await c.get(f"{DEPSDEV_API.replace('/v3','/v3alpha')}/advisories/{advisory_id}", timeout=10)
if r.status_code != 200: return {"success": False, "error": f"advisory: {r.status_code}"}
d = r.json()
return {"success": True, "id": d.get("id",""), "summary": d.get("summary",""),
"aliases": d.get("aliases",[]), "severity": d.get("severity",""),
"affected": d.get("affected",[]), "references": d.get("references",[])}
except (httpx.HTTPError, ValueError, json.JSONDecodeError) as e:
return {"success": False, "error": f"advisory: {e}"}
async def query_by_hash(hash_type: str, hash_value: str) -> dict:
try:
c = get_http_client()
r = await c.get(f"{DEPSDEV_API.replace('/v3','/v3alpha')}/query",
params={"hash.type": hash_type, "hash.value": hash_value}, timeout=10)
if r.status_code != 200: return {"success": False, "error": f"query: {r.status_code}"}
d = r.json()
versions = [{"packageKey": v.get("packageKey",{}), "version": v.get("version","")}
for v in (d.get("versions",[]) or [])]
return {"success": True, "versions": versions}
except (httpx.HTTPError, ValueError, json.JSONDecodeError) as e:
return {"success": False, "error": f"query: {e}"}