-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathjira_pr_check.py
More file actions
executable file
·381 lines (328 loc) · 14.6 KB
/
jira_pr_check.py
File metadata and controls
executable file
·381 lines (328 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
#!/bin/env python3.11
import argparse
import os
import re
import subprocess
import sys
from jira import JIRA
from release_config import jira_field_map, release_map
# Regex for CVE identifiers: "CVE-", a 4-digit year, then a 4-7 digit sequence number.
CVE_PATTERN = r"CVE-\d{4}-\d{4,7}"

# Inverted view of jira_field_map: human-readable field name -> custom field ID.
jira_field_reverse = {field_name: field_id for field_id, field_name in jira_field_map.items()}
def restore_git_branch(original_branch, kernel_src_tree):
    """Check out ``original_branch`` again inside ``kernel_src_tree``.

    Runs ``git checkout <original_branch>`` with the tree as working
    directory. If git reports a failure, an error line containing git's
    stderr is printed and the process exits with status 1.
    """
    checkout_cmd = ["git", "checkout", original_branch]
    try:
        subprocess.run(checkout_cmd, cwd=kernel_src_tree, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as err:
        print(f"ERROR: Failed to restore original branch {original_branch}: {err.stderr}")
        sys.exit(1)
def _run_git(git_args, kernel_src_tree):
    """Run ``git <git_args>`` inside ``kernel_src_tree`` and return the completed process."""
    return subprocess.run(["git", *git_args], cwd=kernel_src_tree, check=True, capture_output=True, text=True)


def _parse_commit_message(commit_msg):
    """Split a commit message into ``(summary, header, vuln_tickets, commit_cves)``.

    The summary is the first line. The header is the run of lines between the
    first and second blank line; only header lines are scanned for
    ``jira VULN-...`` and ``cve`` / ``cve-bf`` / ``cve-pre`` tags.
    """
    lines = commit_msg.split("\n")
    summary = lines[0]
    header_lines = []
    vuln_tickets = []
    commit_cves = []
    in_header = False
    for line in lines[1:]:  # The summary line is never part of the header
        stripped = line.strip()
        if not stripped:
            if in_header:  # Second blank line, end of header
                break
            in_header = True  # First blank line, start of header
            continue
        if not in_header:
            continue
        header_lines.append(line)
        lowered = stripped.lower()
        # "jira VULN-123 [VULN-456 ...]": the first token is the keyword, the rest are candidates
        if lowered.startswith("jira ") and "vuln-" in lowered:
            vuln_tickets.extend(
                part.upper() for part in stripped.split()[1:] if part.upper().startswith("VULN-")
            )
        # "cve CVE-YYYY-NNNN", "cve-bf CVE-YYYY-NNNN" or "cve-pre CVE-YYYY-NNNN"
        if lowered.startswith(("cve ", "cve-bf ", "cve-pre ")):
            commit_cves.extend(
                part.upper() for part in stripped.split()[1:] if part.upper().startswith("CVE-")
            )
    return summary, "\n".join(header_lines), vuln_tickets, commit_cves


def _extract_ticket_cves(ticket_cve_field):
    """Return the set of upper-cased CVE IDs found in a JIRA CVE field value."""
    if not ticket_cve_field:
        return set()
    # The field may be a string, a list (only its string items are scanned), or
    # some other object, which is searched through its str() form.
    if isinstance(ticket_cve_field, str):
        texts = [ticket_cve_field]
    elif isinstance(ticket_cve_field, list):
        texts = [item for item in ticket_cve_field if isinstance(item, str)]
    else:
        texts = [str(ticket_cve_field)]
    found = set()
    for text in texts:
        found.update(cve.upper() for cve in re.findall(CVE_PATTERN, text, re.IGNORECASE))
    return found


def _check_vuln_ticket(jira, vuln_id, commit_cves, merge_target):
    """Validate one VULN ticket against a commit and return ``(issues, lts_match)``.

    ``issues`` is a list of ``{"type", "vuln_id", "message"}`` dicts.
    ``lts_match`` is True/False when the ticket's LTS product is present in
    ``release_map`` and None when that comparison could not be made. Any
    exception raised while querying the ticket is reported as an error issue;
    issues collected before the exception are kept.
    """
    issues = []
    lts_match = None

    def report(kind, message):
        issues.append({"type": kind, "vuln_id": vuln_id, "message": message})

    try:
        issue = jira.issue(vuln_id)

        # Get LTS product (select-style fields expose .value, others are stringified)
        lts_product_field = issue.get_field(jira_field_reverse["LTS Product"])
        if hasattr(lts_product_field, "value"):
            lts_product = lts_product_field.value
        else:
            lts_product = str(lts_product_field) if lts_product_field else None

        # Compare CVEs between commit and JIRA ticket
        ticket_cves = _extract_ticket_cves(issue.get_field(jira_field_reverse["CVE"]))
        if commit_cves and ticket_cves:
            commit_cves_set = set(commit_cves)
            missing_in_ticket = commit_cves_set - ticket_cves
            if missing_in_ticket:
                report(
                    "error",
                    f"CVE mismatch - Commit has {', '.join(sorted(missing_in_ticket))} but VULN ticket does not",
                )
            missing_in_commit = ticket_cves - commit_cves_set
            if missing_in_commit:
                report("warning", f"VULN ticket has {', '.join(sorted(missing_in_commit))} but commit does not")
        elif commit_cves:
            report("warning", f"Commit has CVEs {', '.join(sorted(commit_cves))} but VULN ticket has no CVEs")
        elif ticket_cves:
            report("warning", f"VULN ticket has CVEs {', '.join(sorted(ticket_cves))} but commit has no CVEs")

        # Check ticket status
        status = issue.fields.status.name
        if status != "In Progress":
            report("error", f"Status is '{status}', expected 'In Progress'")

        # Check if time is logged (None and 0 both count as "nothing logged")
        if not issue.fields.timespent:
            report("warning", "No time logged - please log time manually")

        # Check if LTS product matches merge target branch
        if lts_product and lts_product in release_map:
            expected_branch = release_map[lts_product]["src_git_branch"]
            lts_match = expected_branch == merge_target
            if not lts_match:
                report(
                    "error",
                    f"LTS product '{lts_product}' expects branch '{expected_branch}', "
                    f"but merge target is '{merge_target}'",
                )
        else:
            report("error", f"LTS product '{lts_product}' not found in release_map")
    except Exception as e:
        report("error", f"Failed to retrieve ticket: {e}")
    return issues, lts_match


def _print_report(commits_data):
    """Print the Markdown report for ``commits_data``; return True if any error-type issue exists."""
    print("\n## JIRA PR Check Results\n")
    commits_with_issues = [c for c in commits_data if c["issues"]]
    has_errors = False
    if commits_with_issues:
        print(f"**{len(commits_with_issues)} commit(s) with issues found:**\n")
        for commit in commits_with_issues:
            print(f"### Commit `{commit['sha'][:12]}`")
            print(f"**Summary:** {commit['summary']}\n")
            # Group issues by type
            errors = [i for i in commit["issues"] if i["type"] == "error"]
            warnings = [i for i in commit["issues"] if i["type"] == "warning"]
            if errors:
                has_errors = True
                print("**❌ Errors:**")
                for issue in errors:
                    print(f"- **{issue['vuln_id']}**: {issue['message']}")
                print()
            if warnings:
                print("**⚠️ Warnings:**")
                for issue in warnings:
                    print(f"- **{issue['vuln_id']}**: {issue['message']}")
                print()
    else:
        print("✅ **No issues found!**\n")
    print(f"\n---\n**Summary:** Checked {len(commits_data)} commit(s) total.")
    return has_errors


def main():
    """Validate the commits of a PR branch against their JIRA VULN tickets.

    Parses the command line, connects to JIRA (credentials come from the
    ``--jira-*`` options or the ``JIRA_URL`` / ``JIRA_API_USER`` /
    ``JIRA_API_TOKEN`` environment variables), checks out the merge target
    and then the PR branch in the kernel source tree, and inspects every
    commit in ``merge_target..pr_branch``. For each VULN ticket named in a
    commit header the ticket's CVEs, status, logged time and LTS product are
    checked, and a Markdown report is printed.

    The git ref that was checked out on entry is restored on every exit path
    once it has been determined, including failures while listing or reading
    commits.

    Returns:
        A ``(jira, commits_data)`` tuple: the JIRA client and one dict per
        commit with the keys ``sha``, ``summary``, ``header``,
        ``full_message``, ``vuln_tickets``, ``lts_match`` and ``issues``.

    Raises:
        SystemExit: with status 1 on invalid input, JIRA/git failures, or
            when any commit has an error-type issue.
    """
    parser = argparse.ArgumentParser(
        description="Validate PR Commits against JIRA VULN Tickets",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # This is a necessary requirement at the moment to allow multiple different JIRA creds from the same user
    parser.add_argument(
        "--jira-url",
        required=False,
        help="JIRA server URL.",
    )
    parser.add_argument(
        "--jira-user",
        required=False,
        help="JIRA user email",
    )
    parser.add_argument(
        "--jira-key",
        required=False,
        help="JIRA API Key (or set JIRA_API_TOKEN environment variable)",
    )
    # End JIRA creds section
    parser.add_argument(
        "--kernel-src-tree",
        required=True,
        help="Path to kernel source tree repository",
    )
    parser.add_argument(
        "--merge-target",
        required=True,
        help="Merge target branch",
    )
    parser.add_argument(
        "--pr-branch",
        required=True,
        help="PR branch to checkout",
    )
    args = parser.parse_args()
    tree = args.kernel_src_tree

    # Verify kernel source tree path exists
    if not os.path.isdir(tree):
        print(f"ERROR: Kernel source tree path does not exist: {args.kernel_src_tree}")
        sys.exit(1)

    jira_url = args.jira_url or os.environ.get("JIRA_URL")
    jira_user = args.jira_user or os.environ.get("JIRA_API_USER")
    jira_key = args.jira_key or os.environ.get("JIRA_API_TOKEN")
    if not all([jira_url, jira_user, jira_key]):
        print("ERROR: JIRA credentials not provided. Set via --jira-* args or environment variables.")
        sys.exit(1)

    # Connect to JIRA
    try:
        jira = JIRA(server=jira_url, basic_auth=(jira_user, jira_key))
    except Exception as e:
        print(f"ERROR: Failed to connect to JIRA: {e}")
        sys.exit(1)

    # Remember what is checked out now so it can be restored afterwards
    try:
        original_branch = _run_git(["branch", "--show-current"], tree).stdout.strip()
        if not original_branch:
            # `git branch --show-current` prints nothing on a detached HEAD;
            # fall back to the commit id so the restore step has a valid ref.
            original_branch = _run_git(["rev-parse", "HEAD"], tree).stdout.strip()
    except subprocess.CalledProcessError as e:
        print(f"ERROR: Failed to get current git branch: {e.stderr}")
        sys.exit(1)

    commits_data = []
    try:
        # Checkout the merge target branch first to ensure it exists
        try:
            _run_git(["checkout", args.merge_target], tree)
        except subprocess.CalledProcessError as e:
            print(f"ERROR: Failed to checkout merge target branch {args.merge_target}: {e.stderr}")
            sys.exit(1)

        # Checkout the PR branch
        try:
            _run_git(["checkout", args.pr_branch], tree)
        except subprocess.CalledProcessError as e:
            print(f"ERROR: Failed to checkout PR branch {args.pr_branch}: {e.stderr}")
            sys.exit(1)

        # Get commits from merge_target to PR branch
        try:
            log_result = _run_git(["log", "--format=%H", f"{args.merge_target}..{args.pr_branch}"], tree)
        except subprocess.CalledProcessError as e:
            print(f"ERROR: failed to get commits: {e.stderr}")
            sys.exit(1)
        commit_shas = [sha for sha in log_result.stdout.splitlines() if sha]

        for sha in commit_shas:
            # Get full commit message
            try:
                commit_msg = _run_git(["show", "--format=%B", "--no-patch", sha], tree).stdout.strip()
            except subprocess.CalledProcessError as e:
                print(f"ERROR: Failed to get commits: {e.stderr}")
                sys.exit(1)

            summary, header, vuln_tickets, commit_cves = _parse_commit_message(commit_msg)

            # Check VULN tickets against merge target
            lts_match = None
            issues_list = []
            for vuln_id in vuln_tickets:
                ticket_issues, ticket_match = _check_vuln_ticket(jira, vuln_id, commit_cves, args.merge_target)
                issues_list.extend(ticket_issues)
                # A ticket that yields no verdict leaves the previous ticket's result in place
                if ticket_match is not None:
                    lts_match = ticket_match

            commits_data.append(
                {
                    "sha": sha,
                    "summary": summary,
                    "header": header,
                    "full_message": commit_msg,
                    "vuln_tickets": vuln_tickets,
                    "lts_match": lts_match,
                    "issues": issues_list,
                }
            )

        # Exit with error code if any errors were found
        if _print_report(commits_data):
            sys.exit(1)
    finally:
        # Runs on success, on sys.exit() and on unexpected exceptions alike, so
        # the tree is never left on the merge target or PR branch.
        restore_git_branch(original_branch, kernel_src_tree=tree)

    return jira, commits_data
# Run the check only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()