-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathjson_utils.py
More file actions
272 lines (215 loc) · 9.34 KB
/
json_utils.py
File metadata and controls
272 lines (215 loc) · 9.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
import ast
import json
import sys
from typing import Any, ForwardRef, Union, get_args, get_origin
from jsonpath_ng import parse # type: ignore[import-untyped]
from pydantic import BaseModel, RootModel
def get_json_paths_by_type(model: type[BaseModel], type_name: str) -> list[str]:
"""Get JSONPath expressions for all fields that reference a specific type.
This function recursively traverses nested Pydantic models to find all paths
that lead to fields of the specified type.
Args:
model: A Pydantic model class
type_name: The name of the type to search for (e.g., "Job_attachment")
Returns:
List of JSONPath expressions using standard JSONPath syntax.
For array fields, uses [*] to indicate all array elements.
Example:
>>> schema = {
... "type": "object",
... "properties": {
... "attachment": {"$ref": "#/definitions/job-attachment"},
... "attachments": {
... "type": "array",
... "items": {"$ref": "#/definitions/job-attachment"}
... }
... },
... "definitions": {
... "job-attachment": {"type": "object", "properties": {"id": {"type": "string"}}}
... }
... }
>>> model = transform(schema)
>>> _get_json_paths_by_type(model, "Job_attachment")
['$.attachment', '$.attachments[*]']
"""
def _recursive_search(
current_model: type[BaseModel], current_path: str
) -> list[str]:
"""Recursively search for fields of the target type."""
json_paths = []
target_type = _get_target_type(current_model, type_name)
matches_type = _create_type_matcher(type_name, target_type)
for field_name, field_info in current_model.model_fields.items():
annotation = field_info.annotation
json_key = _json_key(field_name, field_info)
if current_path:
field_path = f"{current_path}.{json_key}"
else:
field_path = f"$.{json_key}"
annotation = _unwrap_optional(annotation)
origin = get_origin(annotation)
if matches_type(annotation):
json_paths.append(field_path)
continue
if origin is list:
inner_type, suffix = _unwrap_lists(annotation)
inner_path = f"{field_path}{suffix}"
if matches_type(inner_type):
json_paths.append(inner_path)
continue
if _is_pydantic_model(inner_type):
nested_paths = _recursive_search(inner_type, inner_path)
json_paths.extend(nested_paths)
continue
if _is_pydantic_model(annotation):
nested_paths = _recursive_search(annotation, field_path)
json_paths.extend(nested_paths)
return json_paths
# RootModel serializes without the "root" wrapper — e.g. RootModel[list[X]]
# dumps as [...], not {"root": [...]}. Iterating model_fields directly would
# produce wrong paths like "$.root.field". Instead we peel off the RootModel
# envelope (and any Optional/list layers) so _recursive_search only ever sees
# a plain BaseModel with correct JSONPath prefixes (e.g. "$[*].field").
if issubclass(model, RootModel):
inner = _unwrap_optional(model.model_fields["root"].annotation)
inner, suffix = _unwrap_lists(inner)
# Primitive or non-model root types can't contain nested typed fields.
if not _is_pydantic_model(inner):
return []
return _recursive_search(inner, f"${suffix}" if suffix else "")
return _recursive_search(model, "")
def extract_values_by_paths(
obj: dict[str, Any] | BaseModel, json_paths: list[str]
) -> list[Any]:
"""Extract values from an object using JSONPath expressions.
Args:
obj: The object (dict or Pydantic model) to extract values from
json_paths: List of JSONPath expressions. **Paths are assumed to be disjoint**
(non-overlapping). If paths overlap, duplicate values will be returned.
Returns:
List of all extracted values (flattened)
Example:
>>> obj = {
... "attachment": {"id": "123"},
... "attachments": [{"id": "456"}, {"id": "789"}]
... }
>>> paths = ['$.attachment', '$.attachments[*]']
>>> _extract_values_by_paths(obj, paths)
[{'id': '123'}, {'id': '456'}, {'id': '789'}]
"""
data = obj.model_dump() if isinstance(obj, BaseModel) else obj
results = []
for json_path in json_paths:
expr = parse(json_path)
matches = expr.find(data)
results.extend([match.value for match in matches])
return results
def _get_target_type(model: type[BaseModel], type_name: str) -> Any:
"""Get the target type from the model's module.
Args:
model: A Pydantic model class
type_name: The name of the type to search for
Returns:
The target type if found, None otherwise
"""
model_module = sys.modules.get(model.__module__)
if model_module and hasattr(model_module, type_name):
return getattr(model_module, type_name)
return None
def _create_type_matcher(type_name: str, target_type: Any) -> Any:
"""Create a function that checks if an annotation matches the target type.
Args:
type_name: The name of the type to match
target_type: The actual type object (can be None)
Returns:
A function that takes an annotation and returns True if it matches
"""
def matches_type(annotation: Any) -> bool:
"""Check if an annotation matches the target type name."""
if isinstance(annotation, ForwardRef):
return annotation.__forward_arg__ == type_name
if isinstance(annotation, str):
return annotation == type_name
if hasattr(annotation, "__name__") and annotation.__name__ == type_name:
return True
if target_type is not None and annotation is target_type:
return True
return False
return matches_type
def _unwrap_optional(annotation: Any) -> Any:
"""Unwrap Optional/Union types to get the underlying type.
Args:
annotation: The type annotation to unwrap
Returns:
The unwrapped type, or the original if not Optional/Union
"""
origin = get_origin(annotation)
if origin is Union:
args = get_args(annotation)
non_none_args = [arg for arg in args if arg is not type(None)]
if non_none_args:
return non_none_args[0]
return annotation
def _unwrap_lists(annotation: Any) -> tuple[Any, str]:
"""Unwrap nested list types, returning (inner_type, jsonpath_suffix).
Each list layer adds a "[*]" wildcard so the resulting suffix maps directly
to JSONPath: list[list[X]] → (X, "[*][*]").
"""
suffix = ""
while get_origin(annotation) is list:
args = get_args(annotation)
if not args:
break
annotation = args[0]
suffix += "[*]"
return annotation, suffix
def _json_key(field_name: str, field_info: Any) -> str:
"""Get the JSON property name for a field, accounting for aliases."""
return field_info.alias or field_name
def _is_pydantic_model(annotation: Any) -> bool:
return isinstance(annotation, type) and issubclass(annotation, BaseModel)
def _coerce_field(key: str, value: Any, schema: type[BaseModel] | None) -> Any:
"""Coerce a single field value, skipping str-typed fields when schema is available."""
if schema is None:
return coerce_json_strings(value)
field_info = schema.model_fields.get(key)
if field_info is None:
return coerce_json_strings(value)
annotation = _unwrap_optional(field_info.annotation)
if annotation is str:
return value
if _is_pydantic_model(annotation):
return coerce_json_strings(value, annotation)
if get_origin(annotation) is list:
item_args = get_args(annotation)
item_schema = None
if item_args and _is_pydantic_model(item_args[0]):
item_schema = item_args[0]
if isinstance(value, list):
return [coerce_json_strings(item, item_schema) for item in value]
return coerce_json_strings(value)
def coerce_json_strings(data: Any, schema: type[BaseModel] | None = None) -> Any:
"""Parse stringified dicts/lists back into Python objects.
LLMs sometimes serialize nested objects as strings instead of dicts,
either as JSON (double quotes) or Python repr (single quotes).
When a schema is provided, str-typed fields are left untouched.
"""
if isinstance(data, dict):
return {k: _coerce_field(k, v, schema) for k, v in data.items()}
if isinstance(data, list):
return [coerce_json_strings(item) for item in data]
if isinstance(data, str):
try:
parsed = json.loads(data)
if isinstance(parsed, (dict, list)):
return parsed
except (json.JSONDecodeError, TypeError):
pass
# LLMs sometimes emit Python repr (single quotes) instead of JSON
try:
parsed = ast.literal_eval(data)
if isinstance(parsed, (dict, list)):
return parsed
except (ValueError, SyntaxError):
pass
return data