-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathweaviate_integration.py
More file actions
153 lines (125 loc) · 5.74 KB
/
weaviate_integration.py
File metadata and controls
153 lines (125 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
Weaviate integration module for ChatDash.
This module provides a singleton class for managing Weaviate connections
and checking literature collection availability.
"""
import os
import logging
from typing import Dict, Optional, Tuple
import weaviate
from weaviate.config import AdditionalConfig, Timeout
from dotenv import load_dotenv
from pathlib import Path
from contextlib import contextmanager
# Populate os.environ from the .env file that sits next to this module, so
# later os.getenv() lookups can see the values defined there.
project_root = Path(__file__).parent
dotenv_path = project_root.joinpath('.env')
load_dotenv(dotenv_path=dotenv_path)
class WeaviateConnection:
    """Singleton that hands out short-lived Weaviate client connections.

    Every call to ``WeaviateConnection()`` returns the same instance; the
    configuration is read once, on the first construction.  No connection is
    kept open between calls: each operation opens its own client through
    :meth:`get_client` and closes it when done.
    """

    _instance = None

    # Timeout, in seconds, applied to query and insert requests.
    _TIMEOUT_SECONDS = 120

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # __init__ runs on every WeaviateConnection() call; this flag lets
            # it skip re-initialisation after the first one.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Read the configuration once; later constructions are no-ops."""
        if self._initialized:
            return

        self._initialized = True
        self.client = None
        self.logger = logging.getLogger(__name__)

        # Get OpenAI configuration from environment
        self.openai_api_key = os.getenv('OPENAI_API_KEY', '')
        self.openai_base_url = os.getenv('OPENAI_BASE_URL', 'https://api.openai.com')

        # Get Weaviate configuration
        self.weaviate_host = "weaviate.kbase.us"
        self.weaviate_http_port = 443
        self.weaviate_grpc_host = "weaviate-grpc.kbase.us"
        self.weaviate_grpc_port = 443

    def _create_client(self):
        """Open and return a new Weaviate client for the configured hosts.

        The caller owns the returned client and must close it; prefer
        :meth:`get_client`, which does that automatically.
        """
        return weaviate.connect_to_custom(
            http_host=self.weaviate_host,
            http_port=self.weaviate_http_port,
            http_secure=True,
            grpc_host=self.weaviate_grpc_host,
            grpc_port=self.weaviate_grpc_port,
            grpc_secure=True,
            headers={
                "X-OpenAI-Api-Key": self.openai_api_key,
            },
            # AdditionalConfig takes its timeouts through the ``timeout``
            # field as a Timeout object.
            additional_config=AdditionalConfig(
                timeout=Timeout(
                    query=self._TIMEOUT_SECONDS,
                    insert=self._TIMEOUT_SECONDS,
                ),
            ),
            # Startup checks are skipped here; connect() probes readiness
            # explicitly instead.
            skip_init_checks=True,
        )

    @contextmanager
    def get_client(self):
        """Yield a freshly opened client and close it on exit.

        The client is closed even when the ``with`` body raises.  If creating
        the client fails, the exception propagates and there is nothing to
        close.
        """
        client = None
        try:
            client = self._create_client()
            yield client
        finally:
            if client is not None:
                client.close()

    def connect(self) -> Tuple[bool, str]:
        """Check that a connection to Weaviate can be established.

        Returns:
            ``(True, "Connected")`` when a client could be opened and the
            server reports ready; otherwise ``(False, reason)``.  Exceptions
            are logged and reported through the tuple, never raised.
        """
        if not self.openai_api_key:
            return False, "OpenAI API key not found in environment"

        try:
            with self.get_client() as client:
                # Building the client skips its startup checks, so ask the
                # server explicitly before reporting success.
                if not client.is_ready():
                    return False, "Weaviate is not ready"
                return True, "Connected"
        except Exception as e:
            self.logger.error("Connection error: %s", e)
            return False, str(e)

    def check_literature_collections(self) -> Tuple[bool, str]:
        """Check that every managed literature collection exists and is usable.

        Returns:
            ``(True, "Literature collections available")`` when all names in
            ``MANAGED_COLLECTIONS`` exist and each has a ``vectorizer``
            attribute and at least one property; otherwise ``(False, reason)``.
            Exceptions are logged and reported through the tuple.
        """
        try:
            # Get managed collections from settings
            from weaviate_manager.config.settings import MANAGED_COLLECTIONS

            with self.get_client() as client:
                try:
                    # Full (non-simple) listing: maps collection name to its
                    # complete configuration, so no per-collection fetch is
                    # needed below.
                    collections = client.collections.list_all(simple=False)

                    # Check if all required collections exist
                    missing = [
                        name for name in MANAGED_COLLECTIONS
                        if name not in collections
                    ]
                    if missing:
                        return False, f"Missing collections: {', '.join(missing)}"

                    # Verify each collection has the expected configuration
                    for collection_name in MANAGED_COLLECTIONS:
                        config = collections[collection_name]

                        # NOTE(review): hasattr only catches a config object
                        # that lacks the attribute entirely; a vectorizer that
                        # is present but None still passes - confirm intent.
                        if not hasattr(config, 'vectorizer'):
                            return False, f"Collection {collection_name} missing vectorizer configuration"

                        # Check if collection has required properties
                        if not getattr(config, 'properties', None):
                            return False, f"Collection {collection_name} has no properties"

                    return True, "Literature collections available"

                except Exception as e:
                    self.logger.error("Error listing collections: %s", e)
                    return False, f"Error listing collections: {str(e)}"

        except Exception as e:
            # Reached when the settings import or opening the client fails.
            self.logger.error("Collection check error: %s", e)
            return False, str(e)

    def get_status(self) -> Dict[str, Dict[str, str]]:
        """Report connection and collection status as a nested dict.

        Returns:
            A dict with keys ``'connection'`` and ``'collections'``, each
            holding a ``'status'`` and a ``'message'``.  Collections are only
            checked when the connection check succeeded.
        """
        connected, conn_msg = self.connect()
        if connected:
            collections_ok, coll_msg = self.check_literature_collections()
        else:
            collections_ok, coll_msg = False, "Not connected"

        return {
            'connection': {
                'status': 'connected' if connected else 'error',
                'message': conn_msg
            },
            'collections': {
                'status': 'available' if collections_ok else 'error',
                'message': coll_msg
            }
        }