forked from Tencent/CodeAnalysis
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquickscan.py
More file actions
373 lines (331 loc) · 15.5 KB
/
quickscan.py
File metadata and controls
373 lines (331 loc) · 15.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# -*- encoding: utf-8 -*-
# Copyright (c) 2021-2025 Tencent
#
# This source code file is made available under MIT License
# See LICENSE for details
# ==============================================================================
"""
tca quick scan
"""
import os
import json
import logging
from node.app import settings
from node.localtask.status import StatusType
from util.logutil import LogPrinter
from util.pathfilter import FilterPathUtil
from util.pathlib import PathMgr
from node.quicktask.params import JobParams
from util.errcode import E_NODE_TASK_CONFIG
from util.exceptions import NodeError
logger = logging.getLogger(__name__)
class QuickScan(object):
@staticmethod
def is_quick_scan():
"""
判断是否快速分析模式
:return:
"""
if os.getenv("TCA_QUICK_SCAN") == "True":
return True
else:
return False
@staticmethod
def get_task_json_files(config_dir, languages):
"""
从对应标签的配置目录中读取需要的任务参数json文件
目录结构:
|- label_name
|- tool_name.json 适用于所有语言的任务
|- language_name 语言类型名
|- tool_name.json 适用于指定语言的任务
"""
task_json_paths = []
for f_name in os.listdir(config_dir):
f_path = os.path.join(config_dir, f_name)
if os.path.isdir(f_path): # 如果是个目录,则目录名为语言类型,如果在需要扫描的语言列表中,则添加到list
if not languages or (languages and f_name in languages):
for json_name in os.listdir(f_path):
json_path = os.path.join(f_path, json_name)
task_json_paths.append(json_path)
else: # 如果是文件,表示适用于所有语言,直接添加到list
task_json_paths.append(f_path)
# logger.info(f">> task_json_paths: {task_json_paths}")
return task_json_paths
@staticmethod
def get_scan_tasks(languages, labels, input_params):
"""获取工具任务参数"""
tasks = []
# 使用标签指定工具规则类型(标签优先使用命令行参数,其次从 input_params 中获取)
if not labels:
labels = input_params.get("labels", [])
if labels:
for label in labels:
config_dir = os.path.join(settings.TOOL_BASE_DIR, f"quickscan/tasks/{label}")
if not os.path.exists(config_dir):
raise NodeError(code=E_NODE_TASK_CONFIG,
msg=f"config dir({config_dir}) not exist, please init tca.")
new_task_json_files = QuickScan.get_task_json_files(config_dir, languages)
new_tasks = []
for file_path in new_task_json_files:
if not file_path.endswith(".json"):
continue
with open(file_path, "r") as rf:
task_config = json.load(rf)
new_tasks.append(task_config)
# 不同的标签可能会包含同样的工具,这里通过merge合并相同工具
tasks = QuickScan.merge_tasks(tasks, new_tasks)
else:
raise NodeError(code=E_NODE_TASK_CONFIG, msg=f"no label param, please specify lable by --label.")
# LogPrinter.info(f"----->>> tasks: {[task['task_name'] for task in tasks]}")
return tasks
@staticmethod
def get_proj_config(scm_url, languages, labels, input_params):
# logger.info(f"===>>>> scm_url: {scm_url}")
# logger.info(f"===>>>> languages: {languages}")
JobParams.job_context["scm_url"] = scm_url
tasks = QuickScan.get_scan_tasks(languages, labels, input_params)
project_config = {
"job_context": JobParams.job_context,
"tasks": tasks
}
return project_config
@staticmethod
def merge_tasks(task_list, new_tasks):
"""将新task列表添加到原有task参数列表中,合并去重,一个工具只保留一个task"""
task_dict = {}
for task in task_list:
task_name = task["task_name"]
task_dict[task_name] = task
for task in new_tasks:
task_name = task["task_name"]
if task_name in task_dict:
new_rule_list = task["task_params"]["rule_list"]
rule_list = task_dict[task_name]["task_params"]["rule_list"]
task_dict[task_name]["task_params"]["rule_list"] = QuickScan.merge_rule_list(rule_list, new_rule_list)
new_rules = task["task_params"]["rules"]
rules = task_dict[task_name]["task_params"]["rules"]
rules.extend(new_rules)
task_dict[task_name]["task_params"]["rules"] = list(set(rules))
else:
task_dict[task_name] = task
return list(task_dict.values())
@staticmethod
def merge_rule_list(rule_list, new_rule_list):
"""合并rule list, 如果规则相同,使用新规则参数替换原有规则参数"""
rule_dict = {}
for rule_info in rule_list:
name = rule_info["name"]
rule_dict[name] = rule_info
for new_rule_info in new_rule_list:
name = new_rule_info["name"]
rule_dict[name] = new_rule_info
return list(rule_dict.values())
@staticmethod
def get_scan_files_in_dir(source_dir, sub_dir, exclude_regex):
dir_path = os.path.join(source_dir, sub_dir)
if not dir_path.startswith(source_dir):
LogPrinter.error(f"Wrong dir: {sub_dir}, skip!")
return []
file_paths = PathMgr().get_dir_files(dir_path)
relpos = len(source_dir) + 1
task_params = {
"path_filters": {
"inclusion": [],
"exclusion": exclude_regex
}
}
filtered_paths = FilterPathUtil(task_params).get_include_files(file_paths, relpos)
rel_paths = [path[relpos:] for path in filtered_paths]
# logger.info(f"{len(rel_paths)} files in {sub_dir} to scan: {rel_paths}")
return rel_paths
@staticmethod
def get_path_filters(input_params=None):
"""获取过滤路径"""
if not input_params:
input_params = QuickScan.get_input_params()
regex_include_paths = []
regex_exclude_paths = []
scan_paths = input_params.get("scan_path", [])
for path_info in scan_paths:
if path_info["type"] == "file":
regex_include_paths.append(path_info["path"])
elif path_info["type"] == "dir":
path = path_info["path"]
if path == "": # 空字符串的目录,表示代码根目录
include_path = ".*"
else:
include_path = path_info["path"].rstrip('/')
if not include_path:
LogPrinter.error(f"Skip wrong dir: {path_info['path']}")
continue
include_path = f"{include_path}/.*"
regex_include_paths.append(include_path)
exclude_regex = path_info.get("exclude_regex", [])
if exclude_regex:
regex_exclude_paths.extend(path_info["exclude_regex"])
return {
"inclusion": [],
"exclusion": [],
"re_inclusion": regex_include_paths,
"re_exclusion": regex_exclude_paths
}
@staticmethod
def get_input_params(scan_files=None):
"""
从json文件中获取输入参数
"""
if scan_files: # 如果命令行指定了扫描文件路径,直接使用
scan_rel_paths = []
for rel_path in scan_files:
scan_rel_paths.append({
"path": rel_path,
"type": "file"
})
return {
"scan_path": scan_rel_paths
}
params = {}
input_env_key = "TCA_QUICK_SCAN_INPUT"
input_file_env = os.getenv(input_env_key)
if input_file_env:
input_file_path = os.path.abspath(input_file_env)
if os.path.exists(input_file_path):
with open(input_file_path, "r") as rf:
params = json.load(rf)
else:
LogPrinter.warning(f"env {input_env_key} not exists, scan the source dir.")
# 未指定扫描路径,扫描整个代码仓库目录
params = {
"scan_path": [
{
"path": "", # 空字符串的目录,表示代码根目录
"type": "dir"
}
]
}
return params
@staticmethod
def get_scan_paths(params, source_dir):
"""获取需要扫描的文件列表(相对路径)"""
path_info_dict = {}
scan_rel_paths = []
scan_paths = params.get("scan_path", [])
for path_info in scan_paths:
if path_info["type"] == "file":
path = path_info.get("path")
lines = path_info.get("lines", [])
if path:
scan_rel_paths.append(path)
path_info_dict[path] = lines # 指定文件时,可以指定改动的代码行,供过滤结果使用
elif path_info["type"] == "dir":
exclude_regex = path_info.get("exclude_regex", [])
rel_paths = QuickScan.get_scan_files_in_dir(source_dir, path_info["path"], exclude_regex)
for path in rel_paths:
if path not in scan_rel_paths:
scan_rel_paths.append(path)
path_info_dict[path] = [] # 指定目录时,不支持指定改动代码行,为空,表示扫描整个文件
return scan_rel_paths, path_info_dict
@staticmethod
def get_result(proj_scan_succ, error_code, error_msg):
if proj_scan_succ:
status = StatusType.SUCCESS
text = "分析成功"
else:
status = StatusType.ERROR
text = "分析异常"
scan_result = {
"status": status,
"error_code": error_code,
"text": text,
"description": error_msg,
}
return scan_result
def generate_qucik_scan_report(self, scan_path_info, result, task_result_paths, task_rules):
"""
从各个taskdir中读取本地分析结果issues(暂时只读取代码检查结果)
代码度量结果(除了圈复杂度是在issue_dir目录,其他都在单独一个json里):
1. 本次分析的代码行数据:local_task_88/codeline_data.json
2. 重复代码结果(cpd工具):local_task_89/task_response.json
3. 圈复杂度结果(lizard工具):local_task_90/workdir/issue_dir/
4. 代码统计结果(codecount工具):local_task_91/task_response.json
代码检查结果(都在issue_dir目录):
1. 代码检查-customfilescan工具结果:local_task_92/workdir/issue_dir
2. 代码检查-regexscan工具结果:local_task_93/workdir/issue_dir
:return:
"""
rule_severity = ["fatal", "error", "warning", "info"]
report_path = os.getenv("TCA_QUICK_SCAN_OUTPUT", "tca_quick_scan_report.json")
report_path = os.path.abspath(report_path)
total_path_issues = {}
total_issues_cnt = 0
security_count = {
"fatal": 0,
"error": 0,
"warning": 0,
"info": 0
}
for task_name, result_path in task_result_paths.items():
if QuickScan.is_quick_scan():
# 输出debug log,方便问题定位
logger.debug("*" * 100)
task_log = os.path.join(os.path.dirname(result_path), "task.log")
with open(task_log, 'r') as rf:
logger.debug(rf.read())
logger.debug("*" * 100)
with open(result_path, "r") as rf:
response_data = json.load(rf)
file_issues = response_data["result"]
if file_issues:
# 收集当前task用到的规则信息
rule_info_dict = {}
rule_list = task_rules[task_name]
for rule_params in rule_list:
rule_info_dict[rule_params["name"]] = rule_params
for file_issue in file_issues:
path = file_issue.get("path")
path = path.replace(os.sep, '/') # windows分隔符先转换成unix分隔符,再比较
issues = file_issue.get("issues", [])
for issue in issues:
# 检查行号和列号,统一为int类型
issue["line"] = int(issue["line"])
issue["column"] = int(issue.get("column", 1))
if path not in scan_path_info: # 不在需要扫描的文件中,过滤掉
if issue["rule"] != "FilesNotFound": # 该规则是检查文件不存在,是个例外
logger.info(f"skip filtered path: {path}")
continue
if path in scan_path_info:
lines = scan_path_info[path]
if lines:
lineno = issue["line"]
if lineno not in lines: # 如果指定了代码行,过滤不在代码行列表中的问题
# logger.info(f"skip filtered code, line number: {lineno} ({path})")
continue
if issue.get("resolution") == 8: # 通过注释忽略的问题,过滤掉
LogPrinter.info(f"comment igonre: {path}, {issue}")
continue
# 补充规则信息
rule_info = rule_info_dict.get(issue["rule"], {})
rule_severity_int = rule_info.get("severity")
if rule_severity_int in [1, 2, 3, 4]:
issue["severity"] = rule_severity[rule_severity_int - 1]
else:
LogPrinter.warning(
f"Wrong rule severity, change to Error: \n{json.dumps(issue, indent=2)}")
issue["severity"] = "Error"
security_count[issue["severity"]] += 1
# issue["tool_name"] = rule_info.get("tool_name") # issue中已经有checker字段
issue["rule_title"] = rule_info.get("rule_title")
total_issues_cnt += 1
if path in total_path_issues:
total_path_issues[path].append(issue)
else:
total_path_issues[path] = [issue]
result["issue_count"] = total_issues_cnt
result["security_count"] = security_count
result["issue_detail"] = total_path_issues
if "code_line" in result:
result.pop("code_line")
with open(report_path, "wb") as status_obj:
status_obj.write(str.encode(json.dumps(result, indent=2, ensure_ascii=False)))
LogPrinter.info(f"tca qucik scan report path: {report_path}")