|
|
@@ -0,0 +1,219 @@
|
|
|
+"""
|
|
|
+Weaviate Collection Export/Import Utility
|
|
|
+
|
|
|
+Korduvkasutatav tööriist Weaviate kollektsioonide eksportimiseks ja importimiseks JSON backup failide kaudu.
|
|
|
+Toetab UUID normaliseerimist, int/float tüüpe, doc_hash, vigu ja batch operatsioone.
|
|
|
+"""
|
|
|
+import datetime
|
|
|
+import json
|
|
|
+import uuid
|
|
|
+import logging
|
|
|
+from pathlib import Path
|
|
|
+from typing import Dict, List, Any, Optional, Union
|
|
|
+from weaviate import WeaviateClient, ConnectionParams
|
|
|
+from weaviate.classes.config import Property, DataType
|
|
|
+from decimal import Decimal
|
|
|
+import ijson
|
|
|
+
|
|
|
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+class WeaviateExportImport:
|
|
|
+ """Korduvkasutatav klass Weaviate kollektsioonide eksportimiseks ja importimiseks."""
|
|
|
+
|
|
|
+ def __init__(self, src_client: Optional[WeaviateClient] = None, dst_client: Optional[WeaviateClient] = None):
|
|
|
+ self.src_client = src_client
|
|
|
+ self.dst_client = dst_client
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def create_client(host: str, http_port: int = 9020, grpc_port: int = 50051, secure: bool = False) -> WeaviateClient:
|
|
|
+ client = WeaviateClient(connection_params=ConnectionParams.from_params(
|
|
|
+ http_host=host,
|
|
|
+ http_port=http_port,
|
|
|
+ http_secure=secure,
|
|
|
+ grpc_host=host,
|
|
|
+ grpc_port=grpc_port,
|
|
|
+ grpc_secure=secure,
|
|
|
+ ))
|
|
|
+ client.connect()
|
|
|
+ logger.info(f"Ühendatud Weaviate'ga: {host}:{http_port}")
|
|
|
+ return client
|
|
|
+
|
|
|
+
|
|
|
+ def normalize_int_fields(self, props: Dict[str, Any], int_fields: List[str] = None) -> Dict[str, Any]:
|
|
|
+ if int_fields is None:
|
|
|
+ int_fields = ["page_start", "page_end", "chunk"]
|
|
|
+
|
|
|
+ for field in int_fields:
|
|
|
+ if field in props:
|
|
|
+ value = props[field]
|
|
|
+ # Kui väärtus on float, Decimal või int-tüüp
|
|
|
+ if isinstance(value, float) and value.is_integer():
|
|
|
+ props[field] = int(value)
|
|
|
+ elif isinstance(value, Decimal):
|
|
|
+ # kasuta kas int() või float(), olenevalt kontekstist
|
|
|
+ props[field] = int(value) if value % 1 == 0 else float(value)
|
|
|
+ return props
|
|
|
+
|
|
|
+
|
|
|
+ def normalize_doc_hash(self, doc_hash: Any) -> str:
|
|
|
+ if isinstance(doc_hash, uuid.UUID):
|
|
|
+ return doc_hash.hex
|
|
|
+ if isinstance(doc_hash, str) and len(doc_hash) == 36 and "-" in doc_hash:
|
|
|
+ return doc_hash.replace("-", "")
|
|
|
+ return str(doc_hash)
|
|
|
+
|
|
|
+ def clean_uuid(self, obj: Any) -> Any:
|
|
|
+ if isinstance(obj, dict):
|
|
|
+ return {k: self.clean_uuid(v) for k, v in obj.items()}
|
|
|
+ if isinstance(obj, (list, tuple)):
|
|
|
+ return [self.clean_uuid(x) for x in obj]
|
|
|
+ if isinstance(obj, uuid.UUID):
|
|
|
+ return str(obj)
|
|
|
+ if hasattr(obj, "__str__") and obj.__class__.__name__.lower().startswith("uuid"):
|
|
|
+ return str(obj)
|
|
|
+ return obj
|
|
|
+
|
|
|
+ def process_properties(self, props: Dict[str, Any], int_fields: List[str] = None, hash_fields: List[str] = None) -> Dict[str, Any]:
|
|
|
+ if hash_fields is None:
|
|
|
+ hash_fields = ["doc_hash"]
|
|
|
+ props = self.clean_uuid(props)
|
|
|
+ props = self.normalize_int_fields(props, int_fields)
|
|
|
+ for field in hash_fields:
|
|
|
+ if field in props:
|
|
|
+ props[field] = self.normalize_doc_hash(props[field])
|
|
|
+ return props
|
|
|
+
|
|
|
+ def export_collection(self, collection_name: str, output_file: Union[str, Path],
|
|
|
+ int_fields: List[str] = None, hash_fields: List[str] = None,
|
|
|
+ include_vectors: bool = True) -> int:
|
|
|
+ if not self.src_client:
|
|
|
+ raise ValueError("Source client pole määratud")
|
|
|
+
|
|
|
+ logger.info(f"Alustan kollektsiooni '{collection_name}' streaming eksporti...")
|
|
|
+ collection = self.src_client.collections.get(collection_name)
|
|
|
+ output_path = Path(output_file)
|
|
|
+ count = 0
|
|
|
+
|
|
|
+ def custom_json_encoder(obj):
|
|
|
+ if isinstance(obj, datetime.datetime):
|
|
|
+ return obj.isoformat()
|
|
|
+ return str(obj)
|
|
|
+
|
|
|
+ # Kirjuta otse faili, mitte mällu
|
|
|
+ with open(output_path, "w", encoding="utf-8") as f:
|
|
|
+ f.write("[\n") # alusta JSON array
|
|
|
+ first = True
|
|
|
+
|
|
|
+ for item in collection.iterator(include_vector=include_vectors):
|
|
|
+ props = self.process_properties(dict(item.properties),
|
|
|
+ int_fields=int_fields,
|
|
|
+ hash_fields=hash_fields)
|
|
|
+ export_obj = {
|
|
|
+ 'uuid': str(item.uuid),
|
|
|
+ 'properties': props,
|
|
|
+ }
|
|
|
+
|
|
|
+ if include_vectors:
|
|
|
+ export_obj['vector'] = item.vector
|
|
|
+
|
|
|
+ # Kirjuta objekt otse faili
|
|
|
+ if not first:
|
|
|
+ f.write(",\n")
|
|
|
+ json.dump(export_obj, f, ensure_ascii=False, default=custom_json_encoder)
|
|
|
+ first = False
|
|
|
+ count += 1
|
|
|
+
|
|
|
+ # Progress log iga 1000 objekti järel
|
|
|
+ if count % 1000 == 0:
|
|
|
+ logger.info(f"Eksporditud: {count} objekti...")
|
|
|
+
|
|
|
+ f.write("\n]") # lõpeta JSON array
|
|
|
+
|
|
|
+ logger.info(f"Eksport valmis: {count} objekti")
|
|
|
+ return count
|
|
|
+
|
|
|
+ def clean_decimals(self, obj: Any) -> Any:
|
|
|
+ '''Teisenda kõik Decimal objektid float-ideks'''
|
|
|
+ if isinstance(obj, Decimal):
|
|
|
+ return float(obj)
|
|
|
+ if isinstance(obj, dict):
|
|
|
+ return {k: self.clean_decimals(v) for k, v in obj.items()}
|
|
|
+ if isinstance(obj, list):
|
|
|
+ return [self.clean_decimals(item) for item in obj]
|
|
|
+ return obj
|
|
|
+
|
|
|
+ def import_collection(self, collection_name: str, input_file: Union[str, Path],
|
|
|
+ int_fields: List[str] = None, batch_size: int = 100,
|
|
|
+ recreate_collection: bool = False) -> int:
|
|
|
+ if not self.dst_client:
|
|
|
+ raise ValueError("Destination client pole määratud")
|
|
|
+
|
|
|
+ logger.info(f"Alustan kollektsiooni '{collection_name}' streaming importi...")
|
|
|
+ collection = self.dst_client.collections.get(collection_name)
|
|
|
+ input_path = Path(input_file)
|
|
|
+
|
|
|
+ imported_count = 0
|
|
|
+ batch = []
|
|
|
+
|
|
|
+ # ijson.items loeb faili osade kaupa, mitte kogu faili mällu
|
|
|
+ with open(input_path, 'rb') as f:
|
|
|
+ # ✅ PARANDUS 1: Lisa use_decimal=False
|
|
|
+ for obj in ijson.items(f, 'item', use_float=True):
|
|
|
+ try:
|
|
|
+ props = obj["properties"]
|
|
|
+ props = self.clean_decimals(props)
|
|
|
+ if int_fields:
|
|
|
+ props = self.normalize_int_fields(props, int_fields)
|
|
|
+
|
|
|
+ # ✅ PARANDUS 2: clean_decimals ka vectorile
|
|
|
+ vector = obj.get("vector")
|
|
|
+ if vector is not None:
|
|
|
+ vector = self.clean_decimals(vector)
|
|
|
+
|
|
|
+ batch.append({
|
|
|
+ 'uuid': str(obj["uuid"]),
|
|
|
+ 'properties': props,
|
|
|
+ 'vector': vector
|
|
|
+ })
|
|
|
+
|
|
|
+ # Kui batch täis, importi
|
|
|
+ if len(batch) >= batch_size:
|
|
|
+ self._import_batch(collection, batch)
|
|
|
+ imported_count += len(batch)
|
|
|
+ logger.info(f"Imporditud: {imported_count} objekti...")
|
|
|
+ batch = []
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"Import error: {e}")
|
|
|
+
|
|
|
+ # Importi viimane batch
|
|
|
+ if batch:
|
|
|
+ self._import_batch(collection, batch)
|
|
|
+ imported_count += len(batch)
|
|
|
+
|
|
|
+ logger.info(f"Import lõpetatud: {imported_count} objekti")
|
|
|
+ return imported_count
|
|
|
+
|
|
|
+
|
|
|
+ def _import_batch(self, collection, batch):
|
|
|
+ '''Batch import helper'''
|
|
|
+ for item in batch:
|
|
|
+ try:
|
|
|
+ collection.data.insert(
|
|
|
+ properties=item['properties'],
|
|
|
+ uuid=item['uuid'],
|
|
|
+ vector=item.get('vector')
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ if "already exists" not in str(e):
|
|
|
+ logger.warning(f"Insert error: {e}")
|
|
|
+
|
|
|
+
|
|
|
+ def close_clients(self):
|
|
|
+ if self.src_client:
|
|
|
+ self.src_client.close()
|
|
|
+ logger.info("Source client suletud")
|
|
|
+ if self.dst_client:
|
|
|
+ self.dst_client.close()
|
|
|
+ logger.info("Destination client suletud")
|