-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathingestion.py
More file actions
111 lines (101 loc) · 5.88 KB
/
ingestion.py
File metadata and controls
111 lines (101 loc) · 5.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import os
import sys
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from pathlib import Path
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SimpleFileNodeParser
from llama_index.readers.file import FlatReader
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import ImageNode
from llama_index.core import SimpleDirectoryReader
from llms.vl_embedding import VL_Embedding
class Ingestion:
def __init__(self, dataset_dir,input_prefix='ppocr',output_prefix='bge_ingestion',embed_model_name='BAAI/bge-m3'):
self.dataset_dir = dataset_dir
self.input_dir = os.path.join(dataset_dir, input_prefix)
self.output_dir = os.path.join(dataset_dir, output_prefix)
self.output_file_format = 'document'
self.chunk_size = 1024
self.overlap_size = 0
self.workers = 5
self.reader = FlatReader()
self.embed_model_name = embed_model_name
# colqwen/colpali/visrag(openbmb)
if 'vidore' in embed_model_name or 'openbmb' in embed_model_name:
if input_prefix == 'img':
self.reader = SimpleDirectoryReader(input_dir = self.input_dir)
self.pipeline = IngestionPipeline(transformations=[
SimpleFileNodeParser(),
VL_Embedding(model=embed_model_name,mode='image')
])
else:
self.pipeline = IngestionPipeline(
transformations=[
SimpleFileNodeParser(),
SentenceSplitter(
include_metadata=True, include_prev_next_rel=True,
chunk_size=self.chunk_size,
chunk_overlap=self.overlap_size,
separator=' ',
paragraph_separator='\n\n\n', secondary_chunking_regex='[^,.;。?!]+[,.;。?!]?'),
VL_Embedding(model=embed_model_name,mode='text')
],
)
else:
self.pipeline = IngestionPipeline(
transformations=[
SimpleFileNodeParser(),
SentenceSplitter(
include_metadata=True, include_prev_next_rel=True,
chunk_size=self.chunk_size,
chunk_overlap=self.overlap_size,
separator=' ',
paragraph_separator='\n\n\n', secondary_chunking_regex='[^,.;。?!]+[,.;。?!]?'),
HuggingFaceEmbedding(model_name=self.embed_model_name,trust_remote_code=True)
],
)
def ingestion_example(self, input_file, output_file):
# image
if input_file.endswith('.jpg') or input_file.endswith('.png'):
documents = self.reader.load_file(Path(input_file),self.reader.file_metadata,self.reader.file_extractor)
nodes = self.pipeline.run(documents=documents,num_workers=1, show_progress=False)
else: # txt
documents=self.reader.load_data(Path(input_file))
nodes = self.pipeline.run(documents=documents, show_progress=False)
nodes_json = [node.to_dict() for node in nodes]
with open(output_file, 'w') as json_file:
json.dump(nodes_json, json_file, indent=2, ensure_ascii=False)
return True
def ingestion_multi_session(self):
os.makedirs(self.output_dir, exist_ok=True)
file_to_process = []
for file in os.listdir(self.input_dir):
file_prefix,_ = os.path.splitext(file)
input_file = os.path.join(self.input_dir, file)
output_file = os.path.join(self.output_dir, file_prefix) + '.node'
if not os.path.exists(output_file):
file_to_process.append((input_file, output_file))
if self.workers == 1:
for input_file, output_file in tqdm(file_to_process):
self.ingestion_example(input_file, output_file)
else:
with ThreadPoolExecutor(max_workers=self.workers) as executor:
future_to_file = {executor.submit(self.ingestion_example, input_file, output_file): (input_file, output_file) for input_file, output_file in file_to_process}
for future in tqdm(as_completed(future_to_file), total=len(file_to_process), desc='Processing files'):
result_type = future.result()
if __name__ == '__main__':
root_path = './data'
datasets = ['ExampleDataset', 'SlideVQA']
for dataset in datasets:
dataset_dir = os.path.join(root_path, dataset)
# select a embedding model
ingestion = Ingestion(dataset_dir,input_prefix='img',output_prefix='colqwen_ingestion',embed_model_name='vidore/colqwen2-v1.0') # colqwen2
# ingestion = Ingestion(dataset_dir,input_prefix='img',output_prefix='colpali_ingestion',embed_model_name='vidore/colpali-v1.2') # colpali
# ingestion = Ingestion(dataset_dir,input_prefix='img',output_prefix='visrag_ingestion',embed_model_name='openbmb/VisRAG-Ret') # visrag
# ingestion = Ingestion(dataset_dir,input_prefix='ppocr',output_prefix='nv_ingestion',embed_model_name='nvidia/NV-Embed-v2') # nv-embed
# ingestion = Ingestion(dataset_dir,input_prefix='ppocr',output_prefix='bge_ingestion',embed_model_name='BAAI/bge-m3') # bge-m3
# run
ingestion.ingestion_multi_session()