Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions etc/scripts/dataset_pipeline/build_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# Extracts required phrases (the {{ ... }} markers) from .RULE files and
# writes a JSONL dataset (train/val/test) for NER model training.
import hashlib
import json
import re
import unicodedata
from collections import Counter
from pathlib import Path
import click

from licensedcode.models import Rule
from licensedcode.models import rules_data_dir as default_rules_data_dir
from licensedcode.required_phrases import get_required_phrase_verbatim
from licensedcode.tokenize import required_phrase_splitter


def normalize_phrase(phrase):
    """Clean a raw marker phrase for training.

    The phrase is processed in this order:

    1. the HTML entities ``&lt;``, ``&gt;``, ``&quot;`` and ``&amp;`` are
       decoded;
    2. ``<...>`` tags are removed, except those whose content starts with a
       URL scheme (for instance ``<https://example.com>``);
    3. markdown backticks are removed;
    4. runs of whitespace are collapsed to a single space;
    5. leading and trailing ``. , ; : < >`` characters and spaces are
       stripped.

    Return the cleaned string, which may be empty.
    """
    result = phrase
    # Decode ``&amp;`` last so that an escaped entity such as ``&amp;lt;`` is
    # decoded exactly once (to ``&lt;``) rather than twice (to ``<``).
    entities = (('&lt;', '<'), ('&gt;', '>'), ('&quot;', '"'), ('&amp;', '&'))
    for entity, char in entities:
        result = result.replace(entity, char)
    # strip xml tags like <name>, </license> but keep urls in angle brackets
    result = re.sub(r'<(?![a-zA-Z]+://)[^>]+>', '', result)
    # remove markdown backticks
    result = result.replace('`', '')
    # collapse whitespace and trim
    result = re.sub(r'\s+', ' ', result).strip()
    # Strip leading/trailing punctuation that is not meaningful. The space is
    # part of the set so that input like "MIT License ." does not end up with
    # a dangling trailing space once the final "." is removed.
    return result.strip('.,;:<> ')


def get_rule_type(rule):
    """Return the name of the first truthy ``is_*`` flag found on ``rule``.

    Flags are checked in a fixed order; a missing attribute counts as unset.
    Return 'unknown' when none of the flags is set.
    """
    flags_in_priority_order = (
        'is_license_text',
        'is_license_notice',
        'is_license_reference',
        'is_license_tag',
        'is_license_intro',
        'is_license_clue',
        'is_false_positive',
    )
    set_flags = (
        name for name in flags_in_priority_order
        if getattr(rule, name, False)
    )
    return next(set_flags, 'unknown')


def tag_tokens(text):
    """Tag each word token of ``text`` with a BIOES label.

    ``text`` is split with ``required_phrase_splitter``. The ``{{`` and ``}}``
    marker tokens are consumed; every other token is kept and labelled:

    - 'O' outside of a ``{{ ... }}`` span;
    - 'S-REQ' for the only token of a one-token span;
    - 'B-REQ', 'I-REQ' ..., 'E-REQ' for the first, inner and last tokens of a
      longer span.

    Only spans that are properly closed are labelled. Tokens that follow a
    ``{{`` that is never closed, or that is re-opened by another ``{{`` before
    a ``}}``, are labelled 'O' so that the output is always a valid BIOES
    sequence (no 'B-REQ'/'I-REQ' without a closing 'E-REQ').
    NOTE(review): this assumes an unmatched opener should be ignored; confirm
    that this agrees with how get_required_phrase_verbatim treats it.

    Return a ``(tokens, labels)`` tuple of two parallel lists.
    """
    tokens = []
    labels = []
    # Index in ``labels`` of the first token of the currently open span, or
    # None when no span is open.
    span_start = None

    for tok in required_phrase_splitter(text):
        if tok == '{{':
            if span_start is not None:
                # The previous opener was never closed: undo its labels.
                labels[span_start:] = ['O'] * (len(labels) - span_start)
            span_start = len(labels)
            continue

        if tok == '}}':
            if span_start is not None:
                span_len = len(labels) - span_start
                # An empty span ("{{}}") has no token to relabel.
                if span_len == 1:
                    labels[-1] = 'S-REQ'
                elif span_len > 1:
                    labels[-1] = 'E-REQ'
            span_start = None
            continue

        tokens.append(tok)
        if span_start is None:
            labels.append('O')
        elif len(labels) == span_start:
            labels.append('B-REQ')
        else:
            labels.append('I-REQ')

    if span_start is not None:
        # The text ended inside an unclosed span: undo its labels.
        labels[span_start:] = ['O'] * (len(labels) - span_start)

    return tokens, labels


def assign_splits(results, threshold=50):
    """Plan how license expressions are spread over train/val/test.

    ``results`` is a list of mappings with a 'license_expression' key.

    Return a ``(heavy, assignment)`` tuple:

    - ``heavy`` is the set of expressions used by at least ``threshold``
      entries; the caller is expected to split these per rule;
    - ``assignment`` maps every other expression to 'train', 'val' or 'test'
      so that all of its entries stay in one split, aiming at 80/10/10.
    """
    per_expression = Counter(entry['license_expression'] for entry in results)

    heavy = set()
    rare = []
    for expression, n_rules in per_expression.items():
        if n_rules >= threshold:
            heavy.add(expression)
        else:
            rare.append(expression)

    # Place the largest rare expressions first; break ties by name so the
    # outcome does not depend on input order.
    rare.sort(key=lambda expression: (-per_expression[expression], expression))

    total = sum(per_expression[expression] for expression in rare)
    targets = {'train': 0.8 * total, 'val': 0.1 * total, 'test': 0.1 * total}
    filled = dict.fromkeys(targets, 0)

    def fill_ratio(split):
        # Targets below 1 are clamped to 1 to keep the ratio bounded.
        return filled[split] / max(targets[split], 1)

    assignment = {}
    for expression in rare:
        # Greedy: hand the expression to the split that is least full
        # relative to its target.
        emptiest = min(targets, key=fill_ratio)
        assignment[expression] = emptiest
        filled[emptiest] += per_expression[expression]

    return heavy, assignment


def _split_for_identifier(identifier):
    """Return 'train', 'val' or 'test' for a rule identifier (80/10/10)."""
    # md5 is used only as a hash that is stable across runs and machines,
    # not for security.
    digest = hashlib.md5(identifier.encode('utf-8')).hexdigest()
    bucket = int(digest, 16) % 100
    if bucket < 80:
        return 'train'
    if bucket < 90:
        return 'val'
    return 'test'


def _build_record(rule):
    """Return the dataset record for ``rule``, or None if it has nothing to
    contribute (flagged as required phrase, empty text, or no {{ }} phrase)."""
    # is_required_phrase rules don't need {{ }}: the flag covers them
    if getattr(rule, 'is_required_phrase', False):
        return None

    text = rule.text or ''
    if not text:
        return None

    # normalize line endings and unicode
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    text = unicodedata.normalize('NFKC', text)

    phrases = list(get_required_phrase_verbatim(text))
    if not phrases:
        return None

    # word tokens + BIOES labels must be computed while the markers are
    # still present in the text
    tokens, bioes_labels = tag_tokens(text)

    return {
        'identifier': rule.identifier,
        'license_expression': rule.license_expression or '',
        'rule_type': get_rule_type(rule),
        # the stored text has the {{ }} markers stripped out
        'text': text.replace('{{', '').replace('}}', ''),
        'tokens': tokens,
        'bioes_labels': bioes_labels,
        'required_phrases': [
            {'phrase': p, 'phrase_normalized': normalize_phrase(p)}
            for p in phrases
        ],
    }


# Command-line entry point: load every *.RULE file found in the rules
# directory, build one record per rule that carries {{ }} required phrases,
# split the records into train/val/test and write one JSONL file per split
# into the output directory.
# The docstring of main() is kept to one line on purpose: click displays it
# as the --help text of the command.
@click.command()
@click.option('--rules-dir', type=click.Path(exists=True), default=None,
              help='Path to rules directory (defaults to repo rules dir)')
@click.option('--output-dir', default='dataset-output',
              help='Output directory for train/val/test JSONL files')
def main(rules_dir, output_dir):
    """Extract required phrases from rule files for NER training"""
    if not rules_dir:
        # Prefer the rules of the checkout this script lives in and fall back
        # to the rules directory of the imported licensedcode package.
        repo_rules = Path(__file__).resolve().parents[3] / 'src' / 'licensedcode' / 'data' / 'rules'
        rules_dir = str(repo_rules) if repo_rules.is_dir() else default_rules_data_dir

    rules_path = Path(rules_dir)
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    total_rules = 0
    load_failures = 0
    results = []

    click.echo(f'scanning rules from: {rules_path}')
    # sorted() keeps the processing and output order stable across runs
    for rf in sorted(rules_path.glob('*.RULE')):
        try:
            rule = Rule.from_file(rule_file=str(rf))
        except Exception as exc:
            # Best effort: one broken rule file must not abort the whole run,
            # but it is reported instead of being dropped silently.
            load_failures += 1
            click.echo(f'warning: skipping {rf}: {exc!r}', err=True)
            continue
        total_rules += 1

        record = _build_record(rule)
        if record is not None:
            results.append(record)

    annotated = len(results)
    total_phrases = sum(len(entry['required_phrases']) for entry in results)

    # split by license expression and write
    heavy, assignment = assign_splits(results)
    splits = {'train': [], 'val': [], 'test': []}
    for entry in results:
        expr = entry['license_expression']
        if expr in heavy:
            # common expressions: hash rule name for 80/10/10
            split_name = _split_for_identifier(entry['identifier'])
        else:
            # rare expressions: all their rules stay in the same split
            split_name = assignment[expr]
        splits[split_name].append(entry)

    for name, records in splits.items():
        path = out_dir / f'{name}.jsonl'
        with open(path, 'w', encoding='utf-8') as f:
            for entry in records:
                f.write(json.dumps(entry, ensure_ascii=False) + '\n')

    click.echo('\ndone')
    click.echo(f' rules scanned: {total_rules}')
    if load_failures:
        click.echo(f' failed to load: {load_failures}')
    click.echo(f' annotated: {annotated}')
    click.echo(f' phrases extracted: {total_phrases}')
    click.echo(f' train: {len(splits["train"])} val: {len(splits["val"])} test: {len(splits["test"])}')
    click.echo(f' output: {out_dir}')

# TODO: add tests for this script (planned for a follow-up commit).
# Run the command only when this file is executed as a script.
if __name__ == '__main__':
    main()