# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""CSV data processor: ingests CSV files and prepares them for the OPEA knowledge base."""

import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List

import pandas as pd

logger = logging.getLogger(__name__)


class CSVProcessor:
    """Process CSV files for inventory data."""

    def __init__(self, data_dir: str = "/data"):
        self.data_dir = Path(data_dir)
        self.processed_data = {}

    def load_all_csv_files(self) -> Dict[str, pd.DataFrame]:
        """Load all CSV files from the data directory."""
        csv_files = list(self.data_dir.glob("*.csv"))
        logger.info(f"Found {len(csv_files)} CSV files")

        dataframes = {}
        for csv_file in csv_files:
            try:
                df = pd.read_csv(csv_file)
                dataframes[csv_file.stem] = df
                logger.info(f"Loaded {csv_file.name}: {len(df)} rows")
            except Exception as e:
                logger.error(f"Error loading {csv_file.name}: {e}")

        return dataframes
    def prepare_for_embedding(self, dataframes: Dict[str, pd.DataFrame]) -> List[Dict[str, Any]]:
        """Prepare data for OPEA embedding service."""
        documents = []
        for name, df in dataframes.items():
            for idx, row in df.iterrows():
                # Create a text representation of the row
                text_parts = []
                for col in df.columns:
                    value = row[col]
                    if pd.notna(value):
                        text_parts.append(f"{col}: {value}")
                doc_text = " | ".join(text_parts)

                documents.append(
                    {
                        "id": f"{name}_{idx}",
                        "source": name,
                        "text": doc_text,
                        "metadata": row.to_dict(),
                    }
                )

        logger.info(f"Prepared {len(documents)} documents for embedding")
        return documents
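    # A sketch of the document shape emitted above. Column names and values here
    # are hypothetical, chosen for illustration; the real keys come from whatever
    # CSV files are loaded:
    #
    #     {
    #         "id": "inventory_0",
    #         "source": "inventory",
    #         "text": "product: Widget A | quantity: 42 | category: Tools",
    #         "metadata": {"product": "Widget A", "quantity": 42, "category": "Tools"},
    #     }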
    def extract_inventory_data(self, dataframes: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Extract structured inventory data."""
        inventory_data = {
            "products": [],
            "categories": set(),
            "warehouses": set(),
            "total_items": 0,
        }

        for name, df in dataframes.items():
            # Try to identify inventory-related columns
            if any(col in df.columns for col in ["product", "Product", "item", "Item"]):
                for _, row in df.iterrows():
                    product_info = {"source": name, "data": row.to_dict()}
                    inventory_data["products"].append(product_info)
                    inventory_data["total_items"] += 1

                    # Extract categories if available
                    if "category" in df.columns or "Category" in df.columns:
                        cat_col = "category" if "category" in df.columns else "Category"
                        if pd.notna(row[cat_col]):
                            inventory_data["categories"].add(str(row[cat_col]))

        # Convert sets to lists for JSON serialization. Note that no warehouse
        # column is parsed yet, so "warehouses" is always empty.
        inventory_data["categories"] = list(inventory_data["categories"])
        inventory_data["warehouses"] = list(inventory_data["warehouses"])
        return inventory_data
    def create_knowledge_base(self) -> Dict[str, Any]:
        """Create a complete knowledge base from CSV data."""
        dataframes = self.load_all_csv_files()

        knowledge_base = {
            "documents": self.prepare_for_embedding(dataframes),
            "inventory": self.extract_inventory_data(dataframes),
            "metadata": {
                "total_files": len(dataframes),
                "total_documents": sum(len(df) for df in dataframes.values()),
                "files": list(dataframes.keys()),
            },
        }

        # Save processed data, creating the directory tree if it is missing
        output_dir = self.data_dir / "processed"
        output_dir.mkdir(parents=True, exist_ok=True)
        with open(output_dir / "knowledge_base.json", "w") as f:
            json.dump(knowledge_base, f, indent=2, default=str)

        logger.info(f"Knowledge base created with {len(knowledge_base['documents'])} documents")
        return knowledge_base
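    # The persisted knowledge_base.json then looks roughly like this
    # (file names and counts below are illustrative only):
    #
    #     {
    #         "documents": [...],
    #         "inventory": {"products": [...], "categories": [...], "warehouses": [], "total_items": 12},
    #         "metadata": {"total_files": 2, "total_documents": 12, "files": ["inventory", "orders"]}
    #     }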
    def get_product_summary(self) -> Dict[str, Any]:
        """Get a summary of products for quick access."""
        dataframes = self.load_all_csv_files()
        summary = {"total_files": len(dataframes), "file_details": []}

        for name, df in dataframes.items():
            file_info = {
                "name": name,
                "rows": len(df),
                "columns": list(df.columns),
                "sample": df.head(3).to_dict("records") if len(df) > 0 else [],
            }
            summary["file_details"].append(file_info)

        return summary


# Global CSV processor instance
csv_processor = CSVProcessor(data_dir=os.getenv("CSV_DATA_DIR", "../data"))
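

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: assumes a few CSV
    # files exist under CSV_DATA_DIR (or the ../data fallback above).
    logging.basicConfig(level=logging.INFO)

    summary = csv_processor.get_product_summary()
    print(f"Files found: {summary['total_files']}")

    kb = csv_processor.create_knowledge_base()
    print(f"Documents prepared: {len(kb['documents'])}")
    print(f"Inventory items: {kb['inventory']['total_items']}")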