| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- """
- Weaviate Collection Export/Import Utility
- Korduvkasutatav tööriist Weaviate kollektsioonide eksportimiseks ja importimiseks JSON backup failide kaudu.
- Toetab UUID-de normaliseerimist, int/float tüüpide taastamist, doc_hash väljade normaliseerimist, veakäsitlust ja batch-operatsioone.
- """
- import datetime
- import json
- import uuid
- import logging
- from pathlib import Path
- from typing import Dict, List, Any, Optional, Union
- from weaviate import WeaviateClient, ConnectionParams
- from weaviate.classes.config import Property, DataType
- from decimal import Decimal
- import ijson
# Root logging is configured once, at import time: INFO and above,
# each record prefixed with a timestamp and level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-scoped logger used by the export/import helpers below.
logger = logging.getLogger(name=__name__)
class WeaviateExportImport:
    """Reusable helper for exporting and importing Weaviate collections.

    Export streams every object of a collection into a JSON array file whose
    entries look like ``{"uuid": str, "properties": dict, "vector": ...}``
    (``vector`` is present only when exported with ``include_vectors=True``).
    Import streams such a file back and inserts the objects one by one.

    The instance can be used as a context manager; leaving the ``with`` block
    calls :meth:`close_clients`.

    Attributes:
        src_client: Client read by :meth:`export_collection`.
        dst_client: Client written to by :meth:`import_collection`.
    """

    def __init__(self, src_client: Optional["WeaviateClient"] = None,
                 dst_client: Optional["WeaviateClient"] = None):
        self.src_client = src_client
        self.dst_client = dst_client

    def __enter__(self) -> "WeaviateExportImport":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Always release connections; returning None lets any exception propagate.
        self.close_clients()

    @staticmethod
    def create_client(host: str, http_port: int = 9020, grpc_port: int = 50051,
                      secure: bool = False) -> "WeaviateClient":
        """Create and connect a client that uses ``host`` for both HTTP and gRPC.

        Args:
            host: Hostname for both the HTTP and the gRPC endpoint.
            http_port: HTTP (REST) port.
            grpc_port: gRPC port.
            secure: Use TLS for both endpoints.

        Returns:
            A connected client; the caller is responsible for closing it.
        """
        client = WeaviateClient(connection_params=ConnectionParams.from_params(
            http_host=host,
            http_port=http_port,
            http_secure=secure,
            grpc_host=host,
            grpc_port=grpc_port,
            grpc_secure=secure,
        ))
        client.connect()
        logger.info(f"Ühendatud Weaviate'ga: {host}:{http_port}")
        return client

    def normalize_int_fields(self, props: Dict[str, Any],
                             int_fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """Coerce whole-number values of integer fields back to ``int``, in place.

        Floats such as ``3.0`` become ``3``. A ``Decimal`` becomes ``int`` when it is
        finite and whole, otherwise ``float`` (this includes NaN and +/-Infinity).
        Other value types are left unchanged.

        Args:
            props: Property dict; modified in place.
            int_fields: Field names to normalize. Defaults to
                ``["page_start", "page_end", "chunk"]``.

        Returns:
            The same ``props`` dict.
        """
        if int_fields is None:
            int_fields = ["page_start", "page_end", "chunk"]
        for field in int_fields:
            if field not in props:
                continue
            value = props[field]
            if isinstance(value, float) and value.is_integer():
                props[field] = int(value)
            elif isinstance(value, Decimal):
                # is_finite() guards against `Infinity % 1`, which raises InvalidOperation.
                if value.is_finite() and value == value.to_integral_value():
                    props[field] = int(value)
                else:
                    props[field] = float(value)
        return props

    def normalize_doc_hash(self, doc_hash: Any) -> str:
        """Return ``doc_hash`` as a string, stripping dashes from UUID values.

        A ``uuid.UUID`` becomes its 32-character hex form. A 36-character string
        containing dashes has its dashes removed. Anything else goes through ``str()``.
        """
        if isinstance(doc_hash, uuid.UUID):
            return doc_hash.hex
        if isinstance(doc_hash, str) and len(doc_hash) == 36 and "-" in doc_hash:
            return doc_hash.replace("-", "")
        return str(doc_hash)

    def clean_uuid(self, obj: Any) -> Any:
        """Recursively convert UUID objects in ``obj`` to their string form.

        Dicts are rebuilt with converted values. Lists and tuples become lists.
        Both ``uuid.UUID`` and any object whose class name starts with "uuid"
        (case-insensitive) are stringified. All other values are returned as-is.
        """
        if isinstance(obj, dict):
            return {k: self.clean_uuid(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [self.clean_uuid(x) for x in obj]
        if isinstance(obj, uuid.UUID):
            return str(obj)
        # Catches client-specific UUID wrapper types that are not uuid.UUID subclasses.
        if type(obj).__name__.lower().startswith("uuid"):
            return str(obj)
        return obj

    def process_properties(self, props: Dict[str, Any], int_fields: Optional[List[str]] = None,
                           hash_fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """Prepare an object's properties for JSON export.

        Applies :meth:`clean_uuid`, then :meth:`normalize_int_fields`, then
        :meth:`normalize_doc_hash` on each hash field.

        Args:
            props: Raw property dict (not modified; a cleaned copy is returned).
            int_fields: Forwarded to :meth:`normalize_int_fields`.
            hash_fields: Fields to normalize as hashes. Defaults to ``["doc_hash"]``.

        Returns:
            The cleaned property dict.
        """
        if hash_fields is None:
            hash_fields = ["doc_hash"]
        props = self.clean_uuid(props)
        props = self.normalize_int_fields(props, int_fields)
        for field in hash_fields:
            # Keep a missing hash as None instead of exporting the literal string "None".
            if props.get(field) is not None:
                props[field] = self.normalize_doc_hash(props[field])
        return props

    @staticmethod
    def _json_default(obj: Any) -> str:
        """Fallback for ``json.dump``: ISO-8601 for datetimes, ``str()`` for anything else."""
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        return str(obj)

    def export_collection(self, collection_name: str, output_file: Union[str, Path],
                          int_fields: Optional[List[str]] = None,
                          hash_fields: Optional[List[str]] = None,
                          include_vectors: bool = True) -> int:
        """Stream every object of a source collection into a JSON array file.

        Objects are written one at a time, so memory use does not grow with the
        collection size. Output goes to a temporary sibling file
        (``<name>.tmp``). That file is moved over ``output_file`` only once the
        array is complete, so an interrupted export never leaves a truncated
        file at the target path.

        Args:
            collection_name: Name of the collection on ``src_client``.
            output_file: Destination path of the JSON backup.
            int_fields: Forwarded to :meth:`process_properties`.
            hash_fields: Forwarded to :meth:`process_properties`.
            include_vectors: Fetch each object's ``vector`` and store it.

        Returns:
            Number of exported objects.

        Raises:
            ValueError: If ``src_client`` is not set.
        """
        if not self.src_client:
            raise ValueError("Source client pole määratud")

        logger.info(f"Alustan kollektsiooni '{collection_name}' streaming eksporti...")
        collection = self.src_client.collections.get(collection_name)
        output_path = Path(output_file)
        tmp_path = output_path.with_name(output_path.name + ".tmp")
        count = 0

        try:
            # Write straight to disk, never accumulating objects in memory.
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write("[\n")  # open the JSON array
                for item in collection.iterator(include_vector=include_vectors):
                    props = self.process_properties(dict(item.properties),
                                                    int_fields=int_fields,
                                                    hash_fields=hash_fields)
                    export_obj = {
                        'uuid': str(item.uuid),
                        'properties': props,
                    }
                    if include_vectors:
                        export_obj['vector'] = item.vector

                    if count:  # separator before every element except the first
                        f.write(",\n")
                    json.dump(export_obj, f, ensure_ascii=False, default=self._json_default)
                    count += 1

                    if count % 1000 == 0:
                        logger.info(f"Eksporditud: {count} objekti...")

                f.write("\n]")  # close the JSON array
            tmp_path.replace(output_path)
        except BaseException:
            # Do not leave a half-written temp file behind; re-raise the original error.
            tmp_path.unlink(missing_ok=True)
            raise

        logger.info(f"Eksport valmis: {count} objekti")
        return count

    def clean_decimals(self, obj: Any) -> Any:
        """Recursively convert every ``Decimal`` in dicts/lists to ``float``."""
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, dict):
            return {k: self.clean_decimals(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self.clean_decimals(item) for item in obj]
        return obj

    def _prepare_import_record(self, obj: Dict[str, Any],
                               int_fields: Optional[List[str]]) -> Dict[str, Any]:
        """Turn one parsed backup entry into an insertable ``{'uuid', 'properties', 'vector'}`` dict.

        Raises:
            KeyError: If ``properties`` or ``uuid`` is missing.
            TypeError / AttributeError: If the entry is not shaped like a dict record.
        """
        props = self.clean_decimals(obj["properties"])
        if int_fields:
            props = self.normalize_int_fields(props, int_fields)

        vector = obj.get("vector")
        if vector is not None:
            vector = self.clean_decimals(vector)

        return {
            'uuid': str(obj["uuid"]),
            'properties': props,
            'vector': vector,
        }

    def import_collection(self, collection_name: str, input_file: Union[str, Path],
                          int_fields: Optional[List[str]] = None, batch_size: int = 100,
                          recreate_collection: bool = False) -> int:
        """Stream a JSON backup file into a destination collection.

        The file is parsed incrementally with ``ijson``, so it is never fully
        loaded into memory. Entries that cannot be turned into an insertable
        record (missing ``uuid``/``properties``, wrong shape) are logged and
        skipped.

        Args:
            collection_name: Target collection on ``dst_client``.
            input_file: JSON backup produced by :meth:`export_collection`.
            int_fields: If given, these property fields are coerced back to
                ``int`` via :meth:`normalize_int_fields`. If ``None`` or empty,
                no int normalization happens.
            batch_size: How many records are buffered before they are inserted.
            recreate_collection: Not implemented. Passing ``True`` only logs a
                warning, and the existing collection is used as-is.

        Returns:
            Number of objects actually inserted. Objects that already existed,
            or that failed to insert, are not counted.

        Raises:
            ValueError: If ``dst_client`` is not set.
        """
        if not self.dst_client:
            raise ValueError("Destination client pole määratud")
        if recreate_collection:
            # Previously this flag was silently ignored; make that visible.
            logger.warning("recreate_collection=True pole toetatud ja seda eiratakse")

        logger.info(f"Alustan kollektsiooni '{collection_name}' streaming importi...")
        collection = self.dst_client.collections.get(collection_name)
        input_path = Path(input_file)

        imported_count = 0
        skipped = 0
        batch: List[Dict[str, Any]] = []

        # ijson.items reads the file piece by piece instead of loading it whole.
        with open(input_path, 'rb') as f:
            # use_float=True: ijson yields float (not Decimal) for non-integer numbers.
            for obj in ijson.items(f, 'item', use_float=True):
                try:
                    record = self._prepare_import_record(obj, int_fields)
                except (KeyError, TypeError, AttributeError, ValueError) as e:
                    skipped += 1
                    logger.warning(f"Import error: {e}")
                    continue

                batch.append(record)
                # Flush outside the try so a failed flush can never re-send a batch.
                if len(batch) >= batch_size:
                    imported_count += self._import_batch(collection, batch)
                    logger.info(f"Imporditud: {imported_count} objekti...")
                    batch = []

        # Insert whatever is left in the final partial batch.
        if batch:
            imported_count += self._import_batch(collection, batch)

        if skipped:
            logger.warning(f"Vahele jäetud vigaseid kirjeid: {skipped}")
        logger.info(f"Import lõpetatud: {imported_count} objekti")
        return imported_count

    def _import_batch(self, collection, batch: List[Dict[str, Any]]) -> int:
        """Insert prepared records one by one and return how many were inserted.

        ``collection`` is the handle returned by ``dst_client.collections.get``.
        Insert errors whose message contains "already exists" are skipped
        silently. Other insert errors are logged and skipped (best effort).
        """
        inserted = 0
        for item in batch:
            try:
                collection.data.insert(
                    properties=item['properties'],
                    uuid=item['uuid'],
                    vector=item.get('vector'),
                )
            except Exception as e:
                if "already exists" not in str(e):
                    logger.warning(f"Insert error: {e}")
            else:
                inserted += 1
        return inserted

    def close_clients(self):
        """Close the source and destination clients, if set.

        The destination client is closed even if closing the source raises. A
        client shared by both roles is closed only once.
        """
        try:
            if self.src_client is not None:
                self.src_client.close()
                logger.info("Source client suletud")
        finally:
            if self.dst_client is not None and self.dst_client is not self.src_client:
                self.dst_client.close()
                logger.info("Destination client suletud")
|