# file: src/sync_weaviate.py
"""Sync ScientificArticle metadata from Weaviate into PostgreSQL.

Articles are read from the Weaviate collection named by WEAVIATE_CLASS, matched
to rows of the ``raw_documents`` table (by file hash first, filename second),
and the matched rows are updated with the article metadata.
"""
import os
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from datetime import datetime
from uuid import UUID

from dotenv import load_dotenv
import psycopg2
from psycopg2.extras import execute_values
from psycopg2 import sql
from weaviate import WeaviateClient
from weaviate.connect import ConnectionParams

load_dotenv()

DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME", "pdf_research")
DB_USER = os.getenv("DB_USER", "pdf_user")
DB_PASSWORD = os.getenv("DB_PASSWORD")

WEAVIATE_HOST = os.getenv("WEAVIATE_HOST", "localhost")
WEAVIATE_HTTP_PORT = int(os.getenv("WEAVIATE_HTTP_PORT", "8080"))
# gRPC port used by the v4 client next to HTTP (default unchanged: 50051).
WEAVIATE_GRPC_PORT = int(os.getenv("WEAVIATE_GRPC_PORT", "50051"))
WEAVIATE_CLASS = os.getenv("WEAVIATE_CLASS", "ScientificArticle")

# Maximum number of objects requested from Weaviate in one fetch_objects call.
WEAVIATE_FETCH_LIMIT = 10000

# Path prefixes stripped from Weaviate ``source_file`` values to obtain the
# value compared against ``raw_documents.filename``.
_PDF_DIR_PREFIXES = ("./data/pdfs/", "data/pdfs/")

# Properties requested for every ScientificArticle object.
_ARTICLE_PROPERTIES = [
    "source_file",
    "title",
    "doi",
    "journal",
    "year",
    "authors",
    "key_concepts",
    "methods_used",
    "summary_et",
    "abstract_en",
    "transport_context",
    "relevance_score",
    "processing_date",
    "file_hash",
]

# COALESCE keeps the existing column value whenever the new value is NULL.
_UPDATE_RAW_DOCUMENT_SQL = """
    UPDATE raw_documents
    SET
        weaviate_article_id = %s,
        title = COALESCE(%s, title),
        doi = COALESCE(%s, doi),
        journal = COALESCE(%s, journal),
        year = COALESCE(%s, year),
        authors = COALESCE(%s, authors),
        key_concepts = COALESCE(%s, key_concepts),
        methods_used = COALESCE(%s, methods_used),
        summary_et = COALESCE(%s, summary_et),
        abstract_en = COALESCE(%s, abstract_en),
        transport_context = COALESCE(%s::jsonb, transport_context),
        relevance_score = COALESCE(%s, relevance_score),
        processing_date = COALESCE(%s, processing_date)
    WHERE id = %s
"""


def get_db_conn():
    """Open a new psycopg2 connection using the DB_* settings."""
    return psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
    )


def get_weaviate_client() -> WeaviateClient:
    """Create and connect a Weaviate v4 client.

    Uses plain (non-TLS) HTTP on WEAVIATE_HTTP_PORT and plain gRPC on
    WEAVIATE_GRPC_PORT, both on WEAVIATE_HOST. The caller must ``close()`` it.
    """
    client = WeaviateClient(
        connection_params=ConnectionParams.from_params(
            http_host=WEAVIATE_HOST,
            http_port=WEAVIATE_HTTP_PORT,
            http_secure=False,
            grpc_host=WEAVIATE_HOST,
            grpc_port=WEAVIATE_GRPC_PORT,
            grpc_secure=False,
        )
    )
    client.connect()
    print(f"✓ Ühendatud Weaviate'ga: {WEAVIATE_HOST}:{WEAVIATE_HTTP_PORT}")
    return client


def clean_nul_bytes(value):
    """Recursively remove NUL (``\\x00``) and SOH (``\\x01``) characters.

    PostgreSQL text values cannot contain NUL. Strings are cleaned; dicts and
    lists are cleaned element-wise into new containers; any other value
    (None, numbers, datetimes, ...) is returned unchanged.
    """
    if isinstance(value, str):
        return value.replace("\x00", "").replace("\x01", "")
    elif isinstance(value, dict):
        return {k: clean_nul_bytes(v) for k, v in value.items()}
    elif isinstance(value, list):
        return [clean_nul_bytes(item) for item in value]
    return value


def _filename_from_source(source_file: str) -> str:
    """Map a Weaviate ``source_file`` to the name used in raw_documents.filename.

    A known data-directory prefix is stripped only at the start of the path
    (any deeper sub-path is kept); otherwise the last path component is used.
    """
    for prefix in _PDF_DIR_PREFIXES:
        if source_file.startswith(prefix):
            return source_file[len(prefix):]
    return Path(source_file).name


def fetch_weaviate_articles() -> List[Dict[str, Any]]:
    """Read ScientificArticle objects and their metadata from Weaviate.

    Objects whose ``source_file`` is missing or blank are skipped. At most
    WEAVIATE_FETCH_LIMIT objects are fetched; a warning is printed if that
    limit is reached, since the result may then be incomplete.

    Returns:
        One dict per article with keys ``weaviate_id``, ``source_file``,
        ``filename`` plus the properties in ``_ARTICLE_PROPERTIES``. List-valued
        properties default to ``[]`` and ``file_hash`` defaults to ``""``.
    """
    print(f"Fetching articles from Weaviate (klass: {WEAVIATE_CLASS})...")
    client = get_weaviate_client()
    try:
        collection = client.collections.get(WEAVIATE_CLASS)
        results = collection.query.fetch_objects(
            limit=WEAVIATE_FETCH_LIMIT,
            return_properties=list(_ARTICLE_PROPERTIES),
        )
        print(f"✓ Leitud {len(results.objects)} artiklit Weaviate'st")
        if len(results.objects) >= WEAVIATE_FETCH_LIMIT:
            print(
                f"⚠️ Fetch limit {WEAVIATE_FETCH_LIMIT} reached - "
                "some articles may not have been loaded"
            )

        articles: List[Dict[str, Any]] = []
        for obj in results.objects:
            props = obj.properties or {}
            # `or ""` also covers a property that is present but None.
            source_file = (props.get("source_file") or "").strip()
            if not source_file:
                continue

            file_hash = props.get("file_hash")
            articles.append(
                {
                    "weaviate_id": str(obj.uuid),
                    "source_file": source_file,
                    "filename": _filename_from_source(source_file),
                    "title": props.get("title"),
                    "doi": props.get("doi"),
                    "journal": props.get("journal"),
                    "year": props.get("year"),
                    "authors": props.get("authors") or [],
                    "key_concepts": props.get("key_concepts") or [],
                    "methods_used": props.get("methods_used") or [],
                    "summary_et": props.get("summary_et"),
                    "abstract_en": props.get("abstract_en"),
                    "transport_context": props.get("transport_context"),
                    "relevance_score": props.get("relevance_score"),
                    "processing_date": props.get("processing_date"),
                    "file_hash": clean_nul_bytes(str(file_hash) if file_hash else ""),
                }
            )
        return articles
    finally:
        client.close()


def _load_hash_index(cur) -> Dict[str, tuple]:
    """Return ``{file_hash: (id, filename)}`` for raw_documents rows with a hash."""
    cur.execute(
        "SELECT id, filename, file_hash FROM raw_documents WHERE file_hash IS NOT NULL"
    )
    index: Dict[str, tuple] = {}
    for doc_id, filename, file_hash in cur.fetchall():
        if file_hash:  # also skips empty-string hashes
            index[file_hash] = (doc_id, filename)
    return index


def _match_raw_document(
    cur,
    art: Dict[str, Any],
    postgres_by_hash: Dict[str, tuple],
    stats: Dict[str, int],
) -> Optional[Any]:
    """Return the raw_documents id matching *art*, or None if nothing matches.

    Strategy 1: exact ``file_hash`` lookup in *postgres_by_hash*.
    Strategy 2 (fallback): first row whose ``filename`` equals
    ``art["filename"]`` - can be a false positive when names collide.
    Increments the relevant counters in *stats*.
    """
    filename = art["filename"]
    weaviate_hash = art.get("file_hash", "")

    if weaviate_hash:
        hit = postgres_by_hash.get(weaviate_hash)
        if hit:
            stats["hash_matches"] += 1
            print(f"✅ [HASH] {filename}")
            return hit[0]
        stats["not_found_hash"] += 1

    cur.execute("SELECT id FROM raw_documents WHERE filename = %s", (filename,))
    row = cur.fetchone()
    if not row:
        stats["not_found_filename"] += 1
        print(f"❌ [NOT FOUND] {filename}")
        return None

    stats["filename_matches"] += 1
    if not weaviate_hash:
        stats["filename_fallback"] += 1
        print(f"⚠️ [FALLBACK] {filename} (filehash puudub)")
    else:
        print(f"⚠️ [FALLBACK] {filename} (filehash: {weaviate_hash[:16]}...)")
    return str(row[0])


def _parse_transport_context(raw: Any) -> Optional[str]:
    """Return *raw* as a NUL-free JSON string for a jsonb column, or None.

    A string is parsed as JSON (invalid JSON yields None); any other value is
    used as-is. Empty/falsy results yield None so COALESCE keeps the old value.
    """
    if not raw:
        return None
    if isinstance(raw, str):
        try:
            parsed = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return None
    else:
        parsed = raw
    if not parsed:
        return None
    # json.dumps would emit NUL as "\u0000", which PostgreSQL jsonb rejects.
    return json.dumps(clean_nul_bytes(parsed))


def _parse_processing_date(raw: Any) -> Optional[Any]:
    """Return *raw* as a datetime (strings are ISO-8601 parsed), or None.

    Non-string truthy values are passed through unchanged; unparsable strings
    yield None.
    """
    if not raw:
        return None
    if not isinstance(raw, str):
        return raw
    text = raw
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    try:
        return datetime.fromisoformat(text)
    except (ValueError, TypeError):
        return None


def _update_raw_document(cur, raw_doc_id: Any, art: Dict[str, Any]) -> None:
    """Write the metadata of *art* into the raw_documents row *raw_doc_id*."""
    cur.execute(
        _UPDATE_RAW_DOCUMENT_SQL,
        (
            art["weaviate_id"],
            clean_nul_bytes(art.get("title")),
            clean_nul_bytes(art.get("doi")),
            clean_nul_bytes(art.get("journal")),
            clean_nul_bytes(art.get("year")),
            clean_nul_bytes(art.get("authors")),
            clean_nul_bytes(art.get("key_concepts")),
            clean_nul_bytes(art.get("methods_used")),
            clean_nul_bytes(art.get("summary_et")),
            clean_nul_bytes(art.get("abstract_en")),
            _parse_transport_context(art.get("transport_context")),
            clean_nul_bytes(art.get("relevance_score")),
            _parse_processing_date(art.get("processing_date")),
            raw_doc_id,
        ),
    )


def _recover_connection(conn, cur):
    """Discard the failed transaction; reconnect if the connection is unusable.

    Returns the (connection, cursor) pair to continue with.
    """
    try:
        conn.rollback()
        return conn, cur
    except psycopg2.Error:
        # The connection itself is broken (e.g. dropped by the server):
        # close it best-effort and start over with a fresh one.
        for closeable in (cur, conn):
            try:
                closeable.close()
            except psycopg2.Error:
                pass
        new_conn = get_db_conn()
        return new_conn, new_conn.cursor()


def sync_weaviate_to_postgres():
    """Sync Weaviate ScientificArticle metadata into the raw_documents table.

    Matching strategy, in priority order:
      1. file_hash match (most reliable).
      2. filename match (fallback; may give false positives):
         raw_documents.filename == the article's normalised source_file.

    Updated columns: weaviate_article_id, title, doi, journal, year, authors,
    key_concepts, methods_used, summary_et, abstract_en, transport_context,
    relevance_score, processing_date. NULL values keep the existing column
    value (COALESCE).

    Each successful update is committed immediately, so an error on one
    article only discards that article's update.
    """
    print("Connecting to PostgreSQL...")
    conn = get_db_conn()
    cur = conn.cursor()

    synced = 0
    errors = 0
    stats = {
        "hash_matches": 0,
        "filename_matches": 0,
        "filename_fallback": 0,
        "not_found_hash": 0,
        "not_found_filename": 0,
    }

    try:
        print("Loading PostgreSQL hash index...")
        postgres_by_hash = _load_hash_index(cur)
        print(f"  Loaded {len(postgres_by_hash)} documents with hashes\n")

        articles = fetch_weaviate_articles()

        for art in articles:
            try:
                raw_doc_id = _match_raw_document(cur, art, postgres_by_hash, stats)
                if raw_doc_id is None:
                    continue

                _update_raw_document(cur, raw_doc_id, art)
                # Commit per article: a later failure's rollback must not undo
                # updates already counted and reported as synced.
                conn.commit()
                synced += 1

                title = art.get("title") or ""
                print(f"✓ {title[:60]}... (filename={art['filename']}, pg_id={raw_doc_id})")
            except Exception as e:  # per-article boundary: log and continue
                errors += 1
                print(
                    f"❌ Viga {art.get('title') or 'N/A'} "
                    f"(file={art.get('filename')}): {e}"
                )
                conn, cur = _recover_connection(conn, cur)

        conn.commit()
    finally:
        cur.close()
        conn.close()

    not_found = stats["not_found_filename"]
    print(f"\n✓ Kokkuvõte: {synced} sünkroniseeritud, {not_found} ei leitud, {errors} viga")
    print(
        f"  Match stats: hash={stats['hash_matches']}, "
        f"filename={stats['filename_matches']} "
        f"(without hash: {stats['filename_fallback']}), "
        f"hash misses={stats['not_found_hash']}"
    )


if __name__ == "__main__":
    sync_weaviate_to_postgres()

    # Simple debug: report how many articles Weaviate currently holds.
    articles = fetch_weaviate_articles()
    print(f"Leitud {len(articles)} artiklit Weaviate'st")