"""Weaviate collection export/import utility.

Reusable tool for exporting Weaviate collections to JSON backup files and
importing them back. Handles UUID normalisation, int/float property types,
``doc_hash`` values, per-object error handling and batched imports.
"""
import datetime
import json
import uuid
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional, Union
from weaviate import WeaviateClient, ConnectionParams
from weaviate.classes.config import Property, DataType
from decimal import Decimal
import ijson

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class WeaviateExportImport:
    """Reusable helper for exporting and importing Weaviate collections."""

    def __init__(self,
                 src_client: Optional[WeaviateClient] = None,
                 dst_client: Optional[WeaviateClient] = None):
        """Store the clients used for export (source) and import (destination).

        Either client may be omitted; the corresponding operation then raises
        ``ValueError`` when called.
        """
        self.src_client = src_client
        self.dst_client = dst_client

    @staticmethod
    def create_client(host: str,
                      http_port: int = 9020,
                      grpc_port: int = 50051,
                      secure: bool = False) -> WeaviateClient:
        """Create a ``WeaviateClient`` for ``host`` and connect it.

        The same ``host`` and ``secure`` flag are used for both the HTTP and
        the gRPC endpoint. Returns the connected client.
        """
        client = WeaviateClient(connection_params=ConnectionParams.from_params(
            http_host=host,
            http_port=http_port,
            http_secure=secure,
            grpc_host=host,
            grpc_port=grpc_port,
            grpc_secure=secure,
        ))
        client.connect()
        logger.info(f"Ühendatud Weaviate'ga: {host}:{http_port}")
        return client

    def normalize_int_fields(self,
                             props: Dict[str, Any],
                             int_fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """Convert whole-number float/Decimal values of ``int_fields`` to ``int``.

        ``props`` is modified in place and also returned. When ``int_fields``
        is ``None`` the fields ``page_start``, ``page_end`` and ``chunk`` are
        used. A ``Decimal`` with a fractional part becomes a ``float``; a
        ``float`` with a fractional part is left unchanged.
        """
        if int_fields is None:
            int_fields = ["page_start", "page_end", "chunk"]
        for field in int_fields:
            if field not in props:
                continue
            value = props[field]
            if isinstance(value, float) and value.is_integer():
                props[field] = int(value)
            elif isinstance(value, Decimal):
                # Whole Decimals become int, everything else float.
                props[field] = int(value) if value % 1 == 0 else float(value)
        return props

    def normalize_doc_hash(self, doc_hash: Any) -> str:
        """Return ``doc_hash`` as a string without UUID dashes.

        ``uuid.UUID`` objects yield their 32-character hex form; 36-character
        strings containing ``-`` have the dashes removed; anything else is
        passed through ``str()``.
        """
        if isinstance(doc_hash, uuid.UUID):
            return doc_hash.hex
        if isinstance(doc_hash, str) and len(doc_hash) == 36 and "-" in doc_hash:
            return doc_hash.replace("-", "")
        return str(doc_hash)

    def clean_uuid(self, obj: Any) -> Any:
        """Recursively replace UUID objects inside ``obj`` with strings.

        Dicts are rebuilt, lists and tuples are both returned as lists.
        Besides ``uuid.UUID``, any object whose class name starts with
        ``uuid`` (case-insensitive) is stringified as well.
        """
        if isinstance(obj, dict):
            return {k: self.clean_uuid(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [self.clean_uuid(x) for x in obj]
        if isinstance(obj, uuid.UUID):
            return str(obj)
        if obj.__class__.__name__.lower().startswith("uuid"):
            return str(obj)
        return obj

    def process_properties(self,
                           props: Dict[str, Any],
                           int_fields: Optional[List[str]] = None,
                           hash_fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """Prepare an object's properties for export.

        Applies ``clean_uuid``, then ``normalize_int_fields``, then
        ``normalize_doc_hash`` to every field listed in ``hash_fields``
        (default ``["doc_hash"]``). Returns the cleaned copy.
        """
        if hash_fields is None:
            hash_fields = ["doc_hash"]
        props = self.clean_uuid(props)
        props = self.normalize_int_fields(props, int_fields)
        for field in hash_fields:
            # A missing hash (None) must stay None; str(None) would store
            # the literal text "None" as the hash value.
            if field in props and props[field] is not None:
                props[field] = self.normalize_doc_hash(props[field])
        return props

    def export_collection(self,
                          collection_name: str,
                          output_file: Union[str, Path],
                          int_fields: Optional[List[str]] = None,
                          hash_fields: Optional[List[str]] = None,
                          include_vectors: bool = True) -> int:
        """Stream all objects of a collection into a JSON array file.

        Each array element is ``{"uuid", "properties"}`` plus ``"vector"``
        when ``include_vectors`` is true. Objects are written one by one, so
        the whole collection is never held in memory. If iteration fails
        midway the exception propagates and the file is left incomplete.

        Returns the number of exported objects.
        Raises ``ValueError`` when no source client is set.
        """
        if not self.src_client:
            raise ValueError("Source client pole määratud")
        logger.info(f"Alustan kollektsiooni '{collection_name}' streaming eksporti...")
        collection = self.src_client.collections.get(collection_name)
        output_path = Path(output_file)
        count = 0

        def custom_json_encoder(obj):
            # datetimes are written in ISO format; any other type json
            # cannot handle falls back to its str() representation.
            if isinstance(obj, datetime.datetime):
                return obj.isoformat()
            return str(obj)

        # Write straight to the file instead of building the array in memory.
        with open(output_path, "w", encoding="utf-8") as f:
            f.write("[\n")  # open the JSON array
            first = True
            for item in collection.iterator(include_vector=include_vectors):
                props = self.process_properties(
                    dict(item.properties),
                    int_fields=int_fields,
                    hash_fields=hash_fields,
                )
                export_obj = {
                    'uuid': str(item.uuid),
                    'properties': props,
                }
                if include_vectors:
                    export_obj['vector'] = item.vector
                if not first:
                    f.write(",\n")
                json.dump(export_obj, f, ensure_ascii=False, default=custom_json_encoder)
                first = False
                count += 1
                # Progress log every 1000 objects.
                if count % 1000 == 0:
                    logger.info(f"Eksporditud: {count} objekti...")
            f.write("\n]")  # close the JSON array
        logger.info(f"Eksport valmis: {count} objekti")
        return count

    def clean_decimals(self, obj: Any) -> Any:
        """Recursively convert every ``Decimal`` in dicts/lists to ``float``."""
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, dict):
            return {k: self.clean_decimals(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self.clean_decimals(item) for item in obj]
        return obj

    def import_collection(self,
                          collection_name: str,
                          input_file: Union[str, Path],
                          int_fields: Optional[List[str]] = None,
                          batch_size: int = 100,
                          recreate_collection: bool = False) -> int:
        """Stream a JSON backup file into an existing collection.

        The file is parsed incrementally with ``ijson`` and objects are
        inserted in groups of ``batch_size``. Records that cannot be prepared
        (for example a missing ``uuid`` or ``properties`` key) are logged and
        skipped. ``int_fields`` are normalised only when given.

        ``recreate_collection`` is accepted for compatibility but has no
        effect: the collection is never deleted or created here. A warning is
        logged when it is set.

        Returns the number of objects that were actually inserted; objects
        whose insert failed or that already existed are not counted.
        Raises ``ValueError`` when no destination client is set.
        """
        if not self.dst_client:
            raise ValueError("Destination client pole määratud")
        if recreate_collection:
            logger.warning(
                "recreate_collection=True is not implemented and is ignored; "
                "importing into the existing collection '%s'",
                collection_name,
            )
        logger.info(f"Alustan kollektsiooni '{collection_name}' streaming importi...")
        collection = self.dst_client.collections.get(collection_name)
        input_path = Path(input_file)
        imported_count = 0
        batch = []
        # ijson.items reads the file piece by piece, not all at once.
        with open(input_path, 'rb') as f:
            # use_float=True asks ijson for floats instead of Decimal;
            # clean_decimals below is kept as a safety net.
            for obj in ijson.items(f, 'item', use_float=True):
                try:
                    props = self.clean_decimals(obj["properties"])
                    if int_fields:
                        props = self.normalize_int_fields(props, int_fields)
                    vector = obj.get("vector")
                    if vector is not None:
                        vector = self.clean_decimals(vector)
                    record = {
                        'uuid': str(obj["uuid"]),
                        'properties': props,
                        'vector': vector,
                    }
                except Exception as e:
                    # Best effort: one malformed record must not abort the import.
                    logger.warning(f"Import error: {e}")
                    continue
                batch.append(record)
                if len(batch) >= batch_size:
                    imported_count += self._import_batch(collection, batch)
                    logger.info(f"Imporditud: {imported_count} objekti...")
                    batch = []
        # Flush the final, partially filled batch.
        if batch:
            imported_count += self._import_batch(collection, batch)
        logger.info(f"Import lõpetatud: {imported_count} objekti")
        return imported_count

    def _import_batch(self, collection, batch) -> int:
        """Insert every item of ``batch`` and return how many succeeded.

        Insert errors never propagate. Errors whose message contains
        ``"already exists"`` are skipped silently; all others are logged as
        warnings.
        """
        inserted = 0
        for item in batch:
            try:
                collection.data.insert(
                    properties=item['properties'],
                    uuid=item['uuid'],
                    vector=item.get('vector'),
                )
            except Exception as e:
                if "already exists" not in str(e):
                    logger.warning(f"Insert error: {e}")
            else:
                inserted += 1
        return inserted

    def close_clients(self):
        """Close the source and destination clients that are set."""
        if self.src_client:
            self.src_client.close()
            logger.info("Source client suletud")
        if self.dst_client:
            self.dst_client.close()
            logger.info("Destination client suletud")