数据处理之从代码行号反向爬取代码片段
在修复代码 bug 的 Agent check_list 策略中,一个基本的三步方法如下:
- LLM 阅读给定代码块,根据给定的参考错误列表找到于 bug 描述相对应的有问题的“代码行号”。
- 根据有问题的“代码片段”,判断代码片段是否确实违反代码规范,以
0(正确)和1(错误)表示。 - 对于错误值为
1的代码片段,进行 bug 修复。
很明显可以看到在阶段 1 和阶段 2 之间需要运行某一个脚本,来根据“代码行号”反向爬取代码块中的“代码片段”。
这么做的原因是在阶段 1 直接让模型输出“代码片段”的策略可能存在大量错误,因为模型的评估标准较为宽松,并不能保证准确无误地找到确切的代码片段。
以下脚本可以完成给定数据的处理工作。
此外,为了方便模型微调,还对于有问题的代码片段增加了一个随机的 snippet_id,使用 uuid.uuid4().hex 方法生成。
import json
import re
import uuid
with open("config/config.json", encoding="utf-8")as conf:
con = json.loads(conf.read())
def extract_code_snippets_from_jsonl(input_jsonl, output_file):
# 从代码行号提取出代码片段区间,批量处理
line_count = 0
with open(input_jsonl, 'r', encoding='utf-8') as f_in:
with open(output_file, 'w', encoding='utf-8') as f_out:
for line in f_in:
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
output_rules_str = data["output"]
json_match = re.search(r'```json\s*\n(.*?)```', output_rules_str, re.DOTALL)
all_rules = json.loads(json_match.group(1) if json_match else json.loads(output_rules_str))
code_block = data["input"]
code_match = re.search(r'```c\s*\n(.*?)```', code_block, re.DOTALL)
if not code_match:
continue
raw_code_lines = code_match.group(1).split('\n')
line_rules = []
for rule in all_rules:
if not rule.get("output"):
continue
code_snippets = []
for code_range in rule["output"]:
start = code_range["start_line"] - 1
end = code_range.get("end_line", code_range["start_line"]) - 1
if 0 <= start < len(raw_code_lines) and 0 <= end < len(raw_code_lines):
snippet_lines = []
for line in raw_code_lines[start:end+1]:
cleaned_line = re.sub(r'^\s*\d+[: ]\s*', '', line)
snippet_lines.append(cleaned_line.rstrip())
snippet_text = '\n'.join(snippet_lines).strip()
if snippet_text:
code_snippets.append({
"snippet_id": uuid.uuid4().hex,
"content": snippet_text
})
if code_snippets:
line_rules.append({
"rule": rule["rule"],
"code": code_snippets
})
line_count += 1
if line_rules:
line_rules_str = json.dumps(line_rules, ensure_ascii=False)
output_data = {
"output": line_rules_str
}
# 将当前行的所有规则写入一行(不换行)
f_out.write(json.dumps(output_data, ensure_ascii=False) + '\n')
except json.JSONDecodeError:
continue # 跳过无效的 JSON 行
print(f"Processed {line_count} rules")