import hashlib
import json
import logging
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import weaviate
from weaviate.classes.config import Configure, DataType, Property, VectorDistances
from weaviate.classes.query import Filter, MetadataQuery

from .config import config

logger = logging.getLogger(__name__)

# Search-result summaries longer than this are truncated and suffixed with "...".
_SUMMARY_PREVIEW_CHARS = 500
# gRPC port used by the explicit connection path (the value previously hard-coded inline).
_DEFAULT_GRPC_PORT = 50051


class WeaviateClient:
    """Wrapper around the Weaviate v4 Python client (written against 4.19.0).

    On construction it connects to ``config.weaviate_url`` and makes sure the
    ``ScientificArticle`` collection exists. It then offers helpers to store
    article records and to run BM25 keyword searches over them. Instances can
    be used as context managers so the connection is closed on exit.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.client = self._connect_to_weaviate()
        self.class_name = "ScientificArticle"
        self._setup_schema()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release the connection; never suppress the in-flight exception.
        self.close()

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_url(url: str) -> str:
        """Return *url* with ``http://`` prepended when it carries no scheme.

        A ``"://"`` check is used instead of ``urlparse(url).scheme`` because
        ``urlparse("localhost:8080")`` reports ``"localhost"`` as the scheme
        and yields no hostname.
        """
        return url if "://" in url else f"http://{url}"

    @staticmethod
    def _api_key_credentials():
        """Return API-key credentials from config, or None when no key is set."""
        if config.weaviate_api_key:
            return weaviate.auth.AuthApiKey(config.weaviate_api_key)
        return None

    @staticmethod
    def _close_quietly(client) -> None:
        """Close *client*, logging (not raising) any error."""
        try:
            client.close()
        except Exception as e:
            logger.warning("Error while closing Weaviate client: %s", e)

    def _connect_to_weaviate(self):
        """Connect with explicit HTTP/gRPC parameters derived from ``config.weaviate_url``.

        Any failure is logged and delegated to ``_connect_fallback``.

        Returns:
            A connected ``weaviate.WeaviateClient``.

        Raises:
            ConnectionError: if the fallback connection also fails.
        """
        client = None
        try:
            url = config.weaviate_url
            self.logger.info(f"Ühendan Weaviate: {url}")

            parsed_url = urlparse(self._normalize_url(url))
            host = parsed_url.hostname
            secure = parsed_url.scheme == 'https'
            port = parsed_url.port or (443 if secure else 80)
            self.logger.info(f"Parsitud: host={host}, port={port}, secure={secure}")

            client = weaviate.WeaviateClient(
                # ConnectionParams.from_params only describes the endpoints;
                # credentials belong on the client (auth_client_secret).
                connection_params=weaviate.connect.ConnectionParams.from_params(
                    http_host=host,
                    http_port=port,
                    http_secure=secure,
                    grpc_host=host,
                    grpc_port=_DEFAULT_GRPC_PORT,
                    grpc_secure=secure,
                ),
                auth_client_secret=self._api_key_credentials(),
            )
            client.connect()
            self.logger.info(f"Ühendatud Weaviate'iga: {host}:{port}")
            return client
        except Exception as e:
            self.logger.error(f"Viga Weaviate'iga ühendumisel: {str(e)}")
            if client is not None:
                # Release the half-open client before trying another route.
                self._close_quietly(client)
            self.logger.info("Proovin alternatiivset ühendusviisi...")
            return self._connect_fallback()

    def _connect_fallback(self):
        """Connect through the client library's ``connect_to_*`` helper functions.

        ``localhost`` / ``127.0.0.1`` hosts use ``connect_to_local`` (default
        port 8080). Any other URL is treated as a Weaviate Cloud cluster URL.
        The configured API key, if any, is passed in both cases.

        Returns:
            A connected Weaviate client.

        Raises:
            ConnectionError: if the connection cannot be established.
        """
        try:
            url = config.weaviate_url
            self.logger.info(f"Alternatiivne ühendus: {url}")

            url = self._normalize_url(url)
            parsed = urlparse(url)
            auth_credentials = self._api_key_credentials()

            if parsed.hostname in ("localhost", "127.0.0.1"):
                client = weaviate.connect_to_local(
                    host=parsed.hostname,
                    port=parsed.port or 8080,
                    auth_credentials=auth_credentials,
                )
            else:
                # The v4 client has no `weaviate.connect_to_weaviate`;
                # connect_to_weaviate_cloud is the helper taking a cluster_url.
                client = weaviate.connect_to_weaviate_cloud(
                    cluster_url=url,
                    auth_credentials=auth_credentials,
                )

            self.logger.info(f"Alternatiivne ühendus õnnestus: {url}")
            return client
        except Exception as e:
            self.logger.error(f"Mõlemad ühendusviisid ebaõnnestusid: {str(e)}")
            raise ConnectionError(f"Ei saanud Weaviate'iga ühendust: {str(e)}") from e

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    def _setup_schema(self):
        """Create the article collection if it does not exist yet.

        Best-effort: errors are logged and swallowed, so construction never
        fails here.
        """
        try:
            if self.client.collections.exists(self.class_name):
                self.logger.info(f"Klass {self.class_name} on juba olemas")
                return

            self.client.collections.create(
                name=self.class_name,
                # Vectors are supplied by the caller (own embeddings), not generated server-side.
                # NOTE(review): confirm `Configure.Vector.none()` exists in the pinned client
                # version; 4.16+ documents `Configure.Vectors.self_provided()` for this.
                vector_config=Configure.Vector.none(),
                properties=[
                    Property(name="article_id", data_type=DataType.TEXT, description="Artikli unikaalne ID"),
                    Property(name="title", data_type=DataType.TEXT, description="Artikli pealkiri"),
                    Property(name="authors", data_type=DataType.TEXT_ARRAY, description="Artikli autorid"),
                    Property(name="year", data_type=DataType.INT, description="Avaldamisaasta"),
                    Property(name="journal", data_type=DataType.TEXT, description="Žurnaal"),
                    Property(name="doi", data_type=DataType.TEXT, description="DOI identifikaator"),
                    Property(name="abstract_en", data_type=DataType.TEXT, description="Inglise keelne abstrakt"),
                    Property(name="summary_et", data_type=DataType.TEXT, description="Eesti keelne kokkuvõte"),
                    Property(name="key_concepts", data_type=DataType.TEXT_ARRAY, description="Võtmesõnad ja mõisted"),
                    Property(name="methods_used", data_type=DataType.TEXT_ARRAY, description="Kasutatud meetodid"),
                    Property(name="transport_context", data_type=DataType.TEXT, description="Transpordi konteksti analüüs"),
                    Property(name="relevance_score", data_type=DataType.INT, description="Relevantsus skoor 1-10"),
                    Property(name="processing_date", data_type=DataType.TEXT, description="Töötlemise kuupäev"),
                    Property(name="source_file", data_type=DataType.TEXT, description="Algne PDF fail"),
                    Property(name="file_hash", data_type=DataType.TEXT, description="Faili hash duplikaatide kontrolliks"),
                ],
            )
            self.logger.info(f"Loodi klass: {self.class_name}")
        except Exception as e:
            # Possibly created concurrently by another process; keep best-effort behavior.
            self.logger.error(f"Viga skeemi seadistamisel: {str(e)}")

    # ------------------------------------------------------------------
    # Articles
    # ------------------------------------------------------------------

    def generate_article_id(self, article_data: Dict) -> str:
        """Return a deterministic article ID (MD5 hex digest).

        The hashed key is, in order of preference: the DOI, the file hash, or
        the title joined with up to the first three authors.
        """
        if article_data.get('doi'):
            unique_string = article_data['doi']
        elif article_data.get('file_hash'):
            unique_string = article_data['file_hash']
        else:
            title = article_data.get('title', '')
            # `or []` also covers an explicit None; str() tolerates non-string entries.
            authors = article_data.get('authors') or []
            unique_string = f"{title}_{'_'.join(str(author) for author in authors[:3])}"

        # MD5 is used as a stable identifier here, not for security.
        return hashlib.md5(str(unique_string).encode()).hexdigest()

    def article_exists(self, article_id: str) -> bool:
        """Return True if an object with *article_id* is stored; False on lookup error."""
        try:
            collection = self.client.collections.get(self.class_name)
            response = collection.query.fetch_objects(
                filters=Filter.by_property("article_id").equal(article_id),
                limit=1,
            )
            return len(response.objects) > 0
        except Exception as e:
            self.logger.error(f"Viga artikli olemasolu kontrollimisel: {str(e)}")
            return False

    @staticmethod
    def _replace_double_quotes(text: str) -> str:
        """Replace escaped (``\\"``) and bare (``"``) double quotes with single quotes."""
        # Escaped quotes must go first; otherwise the bare pass turns \" into \'.
        return text.replace('\\"', "'").replace('"', "'")

    @classmethod
    def _serialize_transport_context(cls, transport_context: Any) -> str:
        """Serialize *transport_context* to a string with double quotes replaced.

        A dict has its string values cleaned and is then JSON-encoded (non-ASCII
        kept as-is). ``None`` becomes an empty string. Any other value is
        converted with ``str()`` and cleaned.
        """
        if transport_context is None:
            return ""
        if isinstance(transport_context, dict):
            cleaned_context = {
                key: cls._replace_double_quotes(value) if isinstance(value, str) else value
                for key, value in transport_context.items()
            }
            return json.dumps(cleaned_context, ensure_ascii=False)
        return cls._replace_double_quotes(str(transport_context))

    @staticmethod
    def _parse_year(value: Any) -> int:
        """Return *value* as an int, or 0 unless it is a (whitespace-padded) run of decimal digits."""
        text = str(value).strip()
        return int(text) if text.isdecimal() else 0

    @staticmethod
    def _to_int(value: Any, default: int) -> int:
        """Return ``int(value)``, or *default* when it cannot be converted."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def save_article(self, article_data: Dict, embeddings: Dict) -> bool:
        """Insert an article into the collection unless it is already stored.

        Args:
            article_data: Article fields keyed by the collection's property names.
            embeddings: Embedding vectors. The ``'summary'`` entry, when present
                and non-empty, is stored as the object's vector.

        Returns:
            True if the article was inserted. False if it already existed or the
            insert failed (the error is logged).
        """
        try:
            article_id = self.generate_article_id(article_data)

            if self.article_exists(article_id):
                self.logger.info(f"Artikkel {article_id} on juba olemas, jätan vahele")
                return False

            properties = {
                "article_id": article_id,
                "title": article_data.get('title', ''),
                "authors": article_data.get('authors', []),
                "year": self._parse_year(article_data.get('year', 0)),
                "journal": article_data.get('journal', ''),
                "doi": article_data.get('doi', ''),
                "abstract_en": article_data.get('abstract_en', ''),
                "summary_et": article_data.get('summary_et', ''),
                "key_concepts": article_data.get('key_concepts', []),
                "methods_used": article_data.get('methods_used', []),
                "transport_context": self._serialize_transport_context(
                    article_data.get('transport_context', {})
                ),
                # The schema declares INT; coerce values such as "8" instead of failing the insert.
                "relevance_score": self._to_int(article_data.get('relevance_score', 5), default=5),
                "processing_date": article_data.get('processing_date', ''),
                "source_file": article_data.get('source_file', ''),
                "file_hash": article_data.get('file_hash', ''),
            }

            # The summary embedding is the object's vector. An empty or missing one
            # is sent as None (no vector) rather than an empty list. len() is used
            # instead of truthiness so array-like vectors are handled too.
            vector = embeddings.get('summary') if embeddings else None
            if vector is not None and len(vector) == 0:
                vector = None

            collection = self.client.collections.get(self.class_name)
            collection.data.insert(properties=properties, vector=vector)

            self.logger.info(f"Artikkel salvestatud: {article_id}")
            return True
        except Exception as e:
            self.logger.error(f"Viga artikli salvestamisel: {str(e)}")
            return False

    def search_articles(self, query: str, limit: int = 10) -> List[Dict]:
        """Run a BM25 keyword search and return simplified result dicts.

        Args:
            query: Search text matched against title, summary, abstract and key concepts.
            limit: Maximum number of results.

        Returns:
            One dict per hit with ``article_id``, ``title``, ``authors``, ``year``,
            ``summary``, ``relevance_score`` and the BM25 ``score``. The summary
            is truncated and suffixed with "..." when longer than the preview
            limit. An empty list is returned on error (the error is logged).
        """
        try:
            collection = self.client.collections.get(self.class_name)
            response = collection.query.bm25(
                query=query,
                query_properties=["title", "summary_et", "abstract_en", "key_concepts"],
                limit=limit,
                # Without this the BM25 score is not returned and metadata.score stays None.
                return_metadata=MetadataQuery(score=True),
            )

            results = []
            for obj in response.objects:
                props = obj.properties
                # A missing summary must not abort the whole search.
                summary = props.get('summary_et') or ''
                if len(summary) > _SUMMARY_PREVIEW_CHARS:
                    summary = summary[:_SUMMARY_PREVIEW_CHARS] + '...'
                results.append({
                    'article_id': props.get('article_id'),
                    'title': props.get('title'),
                    'authors': props.get('authors', []),
                    'year': props.get('year'),
                    'summary': summary,
                    'relevance_score': props.get('relevance_score'),
                    'score': obj.metadata.score,
                })
            return results
        except Exception as e:
            self.logger.error(f"Viga otsingul: {str(e)}")
            return []

    def close(self):
        """Close the Weaviate connection if one was opened; close errors are logged, not raised."""
        client = getattr(self, 'client', None)
        if client:
            self._close_quietly(client)