| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- # file: src/sync_weaviate.py
- import os
- import json
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- from datetime import datetime
- from dotenv import load_dotenv
- import psycopg2
- from psycopg2.extras import execute_values
- from psycopg2 import sql
- from uuid import UUID
- from weaviate import WeaviateClient
- from weaviate.connect import ConnectionParams
# Pull variables from a local .env file into the process environment before
# any of the settings below are read.
load_dotenv()

# --- PostgreSQL connection settings -------------------------------------
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "pdf_research")
DB_USER = os.environ.get("DB_USER", "pdf_user")
# No default: stays None when DB_PASSWORD is not set.
DB_PASSWORD = os.environ.get("DB_PASSWORD")

# --- Weaviate connection settings ---------------------------------------
WEAVIATE_HOST = os.environ.get("WEAVIATE_HOST", "localhost")
# Converted to int at import time; a non-numeric value raises ValueError.
WEAVIATE_HTTP_PORT = int(os.environ.get("WEAVIATE_HTTP_PORT", "8080"))
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "ScientificArticle")
def get_db_conn():
    """Open and return a new psycopg2 connection built from the DB_* settings.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection_settings = {
        "host": DB_HOST,
        "port": DB_PORT,
        "database": DB_NAME,
        "user": DB_USER,
        "password": DB_PASSWORD,
    }
    return psycopg2.connect(**connection_settings)
def get_weaviate_client() -> WeaviateClient:
    """Create a Weaviate v4 client, connect it, and return it.

    The HTTP endpoint comes from WEAVIATE_HOST / WEAVIATE_HTTP_PORT; the gRPC
    endpoint uses the same host on the fixed port 50051. Both are configured
    without TLS. The caller is responsible for calling ``close()`` on the
    returned client.
    """
    params = ConnectionParams.from_params(
        http_host=WEAVIATE_HOST,
        http_port=WEAVIATE_HTTP_PORT,
        http_secure=False,
        grpc_host=WEAVIATE_HOST,
        grpc_port=50051,
        grpc_secure=False,
    )
    weaviate_client = WeaviateClient(connection_params=params)
    weaviate_client.connect()
    print(f"✓ Ühendatud Weaviate'ga: {WEAVIATE_HOST}:{WEAVIATE_HTTP_PORT}")
    return weaviate_client
def clean_nul_bytes(value):
    """Recursively strip NUL (0x00) and SOH (0x01) characters from strings.

    Strings are returned with both characters removed. Dicts and lists are
    rebuilt with their values / items cleaned recursively (dict keys are left
    as they are). Any other type is returned unchanged.
    """
    if isinstance(value, str):
        # Mapping a code point to None deletes it in a single pass.
        return value.translate({0: None, 1: None})

    if isinstance(value, dict):
        cleaned = {}
        for key, item in value.items():
            cleaned[key] = clean_nul_bytes(item)
        return cleaned

    if isinstance(value, list):
        return list(map(clean_nul_bytes, value))

    return value
def fetch_weaviate_articles() -> List[Dict[str, Any]]:
    """Read the WEAVIATE_CLASS objects from Weaviate together with their metadata.

    Returns:
        One dict per object that has a non-empty ``source_file``. Each dict
        holds ``weaviate_id`` (the object UUID as a string), the original
        ``source_file``, a normalised ``filename``, and the fetched metadata
        properties. ``authors``, ``key_concepts`` and ``methods_used`` default
        to an empty list, and ``file_hash`` to an empty string.

    Objects whose ``source_file`` is missing, null, or blank are skipped.
    At most ``fetch_limit`` objects are requested in a single query; a warning
    is printed when that limit is reached because the result may be truncated.
    """
    print(f"Fetching articles from Weaviate (klass: {WEAVIATE_CLASS})...")

    fetch_limit = 10000
    # Leading directory prefixes removed from source_file to get the filename.
    path_prefixes = ("./data/pdfs/", "data/pdfs/")

    client = get_weaviate_client()
    try:
        collection = client.collections.get(WEAVIATE_CLASS)

        results = collection.query.fetch_objects(
            limit=fetch_limit,
            return_properties=[
                "source_file",
                "title",
                "doi",
                "journal",
                "year",
                "authors",
                "key_concepts",
                "methods_used",
                "summary_et",
                "abstract_en",
                "transport_context",
                "relevance_score",
                "processing_date",
                "file_hash",
            ],
        )

        print(f"✓ Leitud {len(results.objects)} artiklit Weaviate'st")
        if len(results.objects) >= fetch_limit:
            # Do not truncate silently: anything beyond the limit is not synced.
            print(f"⚠️ Fetch limit {fetch_limit} reached - results may be truncated")

        articles: List[Dict[str, Any]] = []
        for obj in results.objects:
            props = obj.properties or {}

            # `or ""` also covers a property that is present but null, which
            # dict.get's default argument does not.
            source_file = (props.get("source_file") or "").strip()
            if not source_file:
                continue

            # Strip only a *leading* data directory; for any other path fall
            # back to the bare file name.
            for prefix in path_prefixes:
                if source_file.startswith(prefix):
                    filename = source_file[len(prefix):]
                    break
            else:
                filename = Path(source_file).name

            articles.append(
                {
                    "weaviate_id": str(obj.uuid),
                    "source_file": source_file,
                    "filename": filename,
                    "title": props.get("title"),
                    "doi": props.get("doi"),
                    "journal": props.get("journal"),
                    "year": props.get("year"),
                    "authors": props.get("authors") or [],
                    "key_concepts": props.get("key_concepts") or [],
                    "methods_used": props.get("methods_used") or [],
                    "summary_et": props.get("summary_et"),
                    "abstract_en": props.get("abstract_en"),
                    "transport_context": props.get("transport_context"),
                    "relevance_score": props.get("relevance_score"),
                    "processing_date": props.get("processing_date"),
                    "file_hash": clean_nul_bytes(str(props.get("file_hash") or "")),
                }
            )

        return articles

    finally:
        # Always release the Weaviate connection, even when the query fails.
        client.close()
# One placeholder per column, in this exact order; the parameter tuple built in
# sync_weaviate_to_postgres must follow the same order. COALESCE keeps the
# stored value whenever the incoming parameter is NULL.
_UPDATE_RAW_DOCUMENT_SQL = """
    UPDATE raw_documents
    SET
        weaviate_article_id = %s,
        title = COALESCE(%s, title),
        doi = COALESCE(%s, doi),
        journal = COALESCE(%s, journal),
        year = COALESCE(%s, year),
        authors = COALESCE(%s, authors),
        key_concepts = COALESCE(%s, key_concepts),
        methods_used = COALESCE(%s, methods_used),
        summary_et = COALESCE(%s, summary_et),
        abstract_en = COALESCE(%s, abstract_en),
        transport_context = COALESCE(%s::jsonb, transport_context),
        relevance_score = COALESCE(%s, relevance_score),
        processing_date = COALESCE(%s, processing_date)
    WHERE id = %s
"""


def _parse_transport_context(raw):
    """Return transport_context serialised as a JSON string, or None if absent/invalid."""
    if not raw:
        return None
    try:
        parsed = json.loads(raw) if isinstance(raw, str) else raw
    except (json.JSONDecodeError, TypeError):
        return None
    if not parsed:
        return None
    # Remove NUL characters before serialising: json.dumps would emit them as
    # \u0000 escapes, which the jsonb type does not accept.
    return json.dumps(clean_nul_bytes(parsed))


def _parse_processing_date(raw):
    """Return processing_date as a datetime (non-strings pass through), or None."""
    if not raw:
        return None
    if not isinstance(raw, str):
        return raw
    text = raw.strip()
    # datetime.fromisoformat only understands a trailing "Z" from Python 3.11
    # on; rewrite it as an explicit UTC offset so older versions parse it too.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None


def sync_weaviate_to_postgres():
    """Sync Weaviate article metadata into the raw_documents table.

    Matching strategy, in priority order:
      1. file_hash match against raw_documents.file_hash (most reliable).
      2. filename match against raw_documents.filename (fallback; may yield
         false positives).

    For every matched row this sets weaviate_article_id and updates title,
    doi, journal, year, authors, key_concepts, methods_used, summary_et,
    abstract_en, transport_context, relevance_score and processing_date.
    Values that are missing on the Weaviate side (None, or an empty list for
    the list fields) are sent as NULL so that COALESCE keeps what is already
    stored.

    Each article is committed on its own, so a failure on one article rolls
    back only that article. Progress and a final summary are printed; nothing
    is returned.
    """
    print("Connecting to PostgreSQL...")
    conn = get_db_conn()
    cur = conn.cursor()

    synced = 0
    not_found = 0
    hash_misses = 0
    errors = 0
    stats = {
        "hash_matches": 0,
        "filename_matches": 0,
        "filename_fallback": 0,
    }

    try:
        print("Loading PostgreSQL hash index...")
        cur.execute("SELECT id, filename, file_hash FROM raw_documents WHERE file_hash IS NOT NULL")
        # On duplicate hashes the last row wins.
        postgres_by_hash = {
            file_hash: (doc_id, pg_filename)
            for doc_id, pg_filename, file_hash in cur.fetchall()
            if file_hash
        }
        print(f" Loaded {len(postgres_by_hash)} documents with hashes\n")

        articles = fetch_weaviate_articles()

        for art in articles:
            try:
                filename = art["filename"]
                weaviate_hash = art.get("file_hash") or ""
                raw_doc_id = None

                # STRATEGY 1: file_hash match (primary)
                if weaviate_hash:
                    match = postgres_by_hash.get(weaviate_hash)
                    if match is not None:
                        raw_doc_id = match[0]
                        stats["hash_matches"] += 1
                        print(f"✅ [HASH] {filename}")
                    else:
                        hash_misses += 1

                # STRATEGY 2: filename match (fallback)
                if raw_doc_id is None:
                    cur.execute(
                        "SELECT id FROM raw_documents WHERE filename = %s",
                        (filename,),
                    )
                    row = cur.fetchone()
                    if row is None:
                        not_found += 1
                        print(f"❌ [NOT FOUND] {filename}")
                        continue

                    raw_doc_id = row[0]
                    stats["filename_matches"] += 1
                    if not weaviate_hash:
                        stats["filename_fallback"] += 1
                        print(f"⚠️ [FALLBACK] {filename} (filehash puudub)")
                    else:
                        print(f"⚠️ [FALLBACK] {filename} (filehash: {weaviate_hash[:16]}...)")

                # Order must match the placeholders in _UPDATE_RAW_DOCUMENT_SQL.
                params = (
                    art["weaviate_id"],
                    clean_nul_bytes(art.get("title")),
                    clean_nul_bytes(art.get("doi")),
                    clean_nul_bytes(art.get("journal")),
                    clean_nul_bytes(art.get("year")),
                    # An empty list means "no data"; send NULL so COALESCE
                    # does not wipe an existing value.
                    clean_nul_bytes(art.get("authors")) or None,
                    clean_nul_bytes(art.get("key_concepts")) or None,
                    clean_nul_bytes(art.get("methods_used")) or None,
                    clean_nul_bytes(art.get("summary_et")),
                    clean_nul_bytes(art.get("abstract_en")),
                    _parse_transport_context(art.get("transport_context")),
                    art.get("relevance_score"),
                    _parse_processing_date(art.get("processing_date")),
                    raw_doc_id,
                )
                cur.execute(_UPDATE_RAW_DOCUMENT_SQL, params)
                # Commit per article so that a later rollback cannot discard
                # this update.
                conn.commit()

                synced += 1
                title_preview = (art.get("title") or "")[:60]
                print(f"✓ {title_preview}... (filename={filename}, pg_id={raw_doc_id})")

            except Exception as e:
                errors += 1
                print(f"❌ Viga {art.get('title', 'N/A')} (file={art.get('filename')}): {e}")
                # Discard the failed article's transaction, then continue on a
                # fresh connection.
                conn.rollback()
                cur.close()
                conn.close()
                conn = get_db_conn()
                cur = conn.cursor()
    finally:
        # Release the database resources even if loading or fetching fails.
        cur.close()
        conn.close()

    print(f"\n✓ Kokkuvõte: {synced} sünkroniseeritud, {not_found} ei leitud, {errors} viga")
    print(
        f"  [HASH] {stats['hash_matches']}, [FALLBACK] {stats['filename_matches']} "
        f"(filehash puudub: {stats['filename_fallback']}), hash mismatches: {hash_misses}"
    )
if __name__ == "__main__":
    sync_weaviate_to_postgres()

    # Simple debug aid: fetch the articles once more and report how many
    # Weaviate currently holds.
    debug_articles = fetch_weaviate_articles()
    article_count = len(debug_articles)
    print(f"Leitud {article_count} artiklit Weaviate'st")

    # Uncomment to list every fetched article:
    # for art in debug_articles:
    #     print(
    #         f"WeaviateID={art['weaviate_id']} "
    #         f"filename={art['filename']} "
    #         f"doi={art.get('doi')} "
    #         f"title={art.get('title')[:80] if art.get('title') else ''}"
    #     )
|