sync_weaviate.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. # file: src/sync_weaviate.py
  2. import os
  3. import json
  4. from pathlib import Path
  5. from typing import Any, Dict, List, Optional
  6. from datetime import datetime
  7. from dotenv import load_dotenv
  8. import psycopg2
  9. from psycopg2.extras import execute_values
  10. from psycopg2 import sql
  11. from uuid import UUID
  12. from weaviate import WeaviateClient
  13. from weaviate.connect import ConnectionParams
  # Pull settings from a local .env file (python-dotenv) into the process
  # environment before any of the values below are read.
  load_dotenv()

  # PostgreSQL connection settings (DB_PORT is kept as a string).
  DB_HOST = os.environ.get("DB_HOST", "localhost")
  DB_PORT = os.environ.get("DB_PORT", "5432")
  DB_NAME = os.environ.get("DB_NAME", "pdf_research")
  DB_USER = os.environ.get("DB_USER", "pdf_user")
  DB_PASSWORD = os.environ.get("DB_PASSWORD")  # no default: None when unset

  # Weaviate connection settings and the collection the articles are read from.
  WEAVIATE_HOST = os.environ.get("WEAVIATE_HOST", "localhost")
  WEAVIATE_HTTP_PORT = int(os.environ.get("WEAVIATE_HTTP_PORT", "8080"))
  WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "ScientificArticle")
  def get_db_conn():
      """Open a new psycopg2 connection from the module-level DB_* settings.

      The connection is returned open; committing and closing it is up to the
      caller.
      """
      settings = {
          "host": DB_HOST,
          "port": DB_PORT,
          "database": DB_NAME,
          "user": DB_USER,
          "password": DB_PASSWORD,
      }
      return psycopg2.connect(**settings)
  def get_weaviate_client() -> WeaviateClient:
      """Create a Weaviate v4 client for WEAVIATE_HOST and connect it.

      The client is configured for both HTTP (WEAVIATE_HTTP_PORT) and gRPC.
      The gRPC port is read from the WEAVIATE_GRPC_PORT environment variable
      and defaults to 50051. Neither connection uses TLS.

      Returns:
          A connected client. The caller owns it and must call ``close()``.

      Raises:
          Whatever ``client.connect()`` raises. The client is closed first so
          partially opened connections are not leaked.
      """
      grpc_port = int(os.getenv("WEAVIATE_GRPC_PORT", "50051"))
      client = WeaviateClient(
          connection_params=ConnectionParams.from_params(
              http_host=WEAVIATE_HOST,
              http_port=WEAVIATE_HTTP_PORT,
              http_secure=False,
              grpc_host=WEAVIATE_HOST,
              grpc_port=grpc_port,
              grpc_secure=False,
          )
      )
      try:
          client.connect()
      except Exception:
          # Release anything connect() opened before failing, then re-raise.
          client.close()
          raise
      print(f"✓ Ühendatud Weaviate'ga: {WEAVIATE_HOST}:{WEAVIATE_HTTP_PORT}")
      return client
  def clean_nul_bytes(value):
      """Recursively strip NUL (0x00) and SOH (0x01) characters.

      Strings are cleaned directly. Dicts are rebuilt with their values cleaned
      (keys are left untouched), and lists are rebuilt with every element
      cleaned. Any other value, including tuples and None, is returned
      unchanged.
      """
      if isinstance(value, dict):
          return {key: clean_nul_bytes(item) for key, item in value.items()}
      if isinstance(value, list):
          return list(map(clean_nul_bytes, value))
      if not isinstance(value, str):
          return value
      # Translation table mapping code points 0 and 1 to None deletes them.
      return value.translate({0: None, 1: None})
  def _filename_from_source(source_file: str) -> str:
      """Map a Weaviate ``source_file`` path to the form stored in raw_documents.filename.

      A leading ``./data/pdfs/`` or ``data/pdfs/`` is removed, and whatever
      follows it (including sub-directories) is kept. Any other path is reduced
      to its final component.
      """
      for prefix in ("./data/pdfs/", "data/pdfs/"):
          if source_file.startswith(prefix):
              # Strip only the leading prefix, not later repetitions of it.
              return source_file[len(prefix):]
      return Path(source_file).name


  def fetch_weaviate_articles() -> List[Dict[str, Any]]:
      """Read every WEAVIATE_CLASS object from Weaviate together with its metadata.

      Objects are streamed with the collection iterator, which pages through
      the whole collection. A fixed ``limit`` is not used, so large
      collections are not silently truncated. Objects without a non-blank
      ``source_file`` are skipped.

      Returns:
          One dict per article. Each dict has ``weaviate_id``, ``source_file``
          (stripped), ``filename`` (see ``_filename_from_source``) and the
          metadata properties. ``authors``, ``key_concepts`` and
          ``methods_used`` default to ``[]``. ``file_hash`` is always a str,
          cleaned of NUL/SOH characters, and ``""`` when absent.
      """
      print(f"Fetching articles from Weaviate (klass: {WEAVIATE_CLASS})...")
      client = get_weaviate_client()
      try:
          collection = client.collections.get(WEAVIATE_CLASS)
          articles: List[Dict[str, Any]] = []
          seen = 0
          for obj in collection.iterator(
              return_properties=[
                  "source_file",
                  "title",
                  "doi",
                  "journal",
                  "year",
                  "authors",
                  "key_concepts",
                  "methods_used",
                  "summary_et",
                  "abstract_en",
                  "transport_context",
                  "relevance_score",
                  "processing_date",
                  "file_hash",
              ],
          ):
              seen += 1
              props = obj.properties or {}
              # `or ""` also covers a property that is present but None.
              source_file = (props.get("source_file") or "").strip()
              if not source_file:
                  continue
              articles.append(
                  {
                      "weaviate_id": str(obj.uuid),
                      "source_file": source_file,
                      "filename": _filename_from_source(source_file),
                      "title": props.get("title"),
                      "doi": props.get("doi"),
                      "journal": props.get("journal"),
                      "year": props.get("year"),
                      "authors": props.get("authors") or [],
                      "key_concepts": props.get("key_concepts") or [],
                      "methods_used": props.get("methods_used") or [],
                      "summary_et": props.get("summary_et"),
                      "abstract_en": props.get("abstract_en"),
                      "transport_context": props.get("transport_context"),
                      "relevance_score": props.get("relevance_score"),
                      "processing_date": props.get("processing_date"),
                      "file_hash": clean_nul_bytes(str(props.get("file_hash") or "")),
                  }
              )
          print(f"✓ Leitud {seen} artiklit Weaviate'st")
          return articles
      finally:
          client.close()
  def _parse_transport_context(value: Any) -> Optional[str]:
      """Serialize a Weaviate ``transport_context`` value for the ``jsonb`` column.

      String values are parsed as JSON first; other values are used as they
      are. NUL/SOH characters are stripped from string values inside the
      structure before it is dumped, because jsonb rejects ``\\u0000``.

      Returns None, so the UPDATE keeps the stored value, when the input is
      empty, is a string that is not valid JSON, or parses to a falsy value.
      """
      if not value:
          return None
      if isinstance(value, str):
          try:
              value = json.loads(value)
          except json.JSONDecodeError:
              return None
      if not value:
          return None
      return json.dumps(clean_nul_bytes(value))


  def _parse_processing_date(value: Any) -> Any:
      """Convert a Weaviate ``processing_date`` into something bindable, or None.

      Non-string values are returned unchanged (presumably already datetimes).
      Strings are parsed with ``datetime.fromisoformat``. A trailing ``Z`` is
      first rewritten to ``+00:00``, because fromisoformat accepts ``Z`` only
      from Python 3.11 on. Empty or unparseable values yield None.
      """
      if not value:
          return None
      if not isinstance(value, str):
          return value
      if value.endswith("Z"):
          value = value[:-1] + "+00:00"
      try:
          return datetime.fromisoformat(value)
      except ValueError:
          return None


  def _load_hash_index(conn) -> Dict[str, Any]:
      """Return ``{file_hash: id}`` for every raw_documents row with a non-empty hash.

      If several rows share a hash, the last row returned by the query wins.
      """
      print("Loading PostgreSQL hash index...")
      with conn.cursor() as cur:
          cur.execute("SELECT id, file_hash FROM raw_documents WHERE file_hash IS NOT NULL")
          index = {file_hash: doc_id for doc_id, file_hash in cur.fetchall() if file_hash}
      print(f" Loaded {len(index)} documents with hashes\n")
      return index


  def _update_raw_document(cur, raw_doc_id: Any, art: Dict[str, Any]) -> None:
      """Write one Weaviate article's metadata onto the raw_documents row ``raw_doc_id``.

      ``weaviate_article_id`` is always overwritten. Every other column is
      wrapped in COALESCE, so a None value keeps what is already stored. Values
      are passed through clean_nul_bytes() because PostgreSQL text cannot
      contain NUL characters.
      """
      cur.execute(
          """
          UPDATE raw_documents
          SET
              weaviate_article_id = %s,
              title = COALESCE(%s, title),
              doi = COALESCE(%s, doi),
              journal = COALESCE(%s, journal),
              year = COALESCE(%s, year),
              authors = COALESCE(%s, authors),
              key_concepts = COALESCE(%s, key_concepts),
              methods_used = COALESCE(%s, methods_used),
              summary_et = COALESCE(%s, summary_et),
              abstract_en = COALESCE(%s, abstract_en),
              transport_context = COALESCE(%s::jsonb, transport_context),
              relevance_score = COALESCE(%s, relevance_score),
              processing_date = COALESCE(%s, processing_date)
          WHERE id = %s
          """,
          (
              art["weaviate_id"],
              clean_nul_bytes(art.get("title")),
              clean_nul_bytes(art.get("doi")),
              clean_nul_bytes(art.get("journal")),
              clean_nul_bytes(art.get("year")),
              clean_nul_bytes(art.get("authors")),
              clean_nul_bytes(art.get("key_concepts")),
              clean_nul_bytes(art.get("methods_used")),
              clean_nul_bytes(art.get("summary_et")),
              clean_nul_bytes(art.get("abstract_en")),
              _parse_transport_context(art.get("transport_context")),
              clean_nul_bytes(art.get("relevance_score")),
              _parse_processing_date(art.get("processing_date")),
              raw_doc_id,
          ),
      )


  def sync_weaviate_to_postgres():
      """Copy Weaviate article metadata onto the matching ``raw_documents`` rows.

      Matching strategy, in priority order:

      1. File-hash match: the article's ``file_hash`` equals
         ``raw_documents.file_hash``. This is the most reliable match.
      2. Filename match (fallback, may give false positives):
         ``raw_documents.filename`` equals the article's normalized
         ``filename`` as produced by ``fetch_weaviate_articles``.

      For each matched row, ``weaviate_article_id`` is overwritten. The
      following columns are updated through COALESCE, so None values keep the
      stored data: title, doi, journal, year, authors, key_concepts,
      methods_used, summary_et, abstract_en, transport_context,
      relevance_score and processing_date.

      Each article is committed on its own. A failure while processing one
      article rolls back only that article's uncommitted work (the connection
      is reopened if it broke), and processing continues. A summary is printed
      at the end.
      """
      print("Connecting to PostgreSQL...")
      conn = get_db_conn()

      synced = 0
      errors = 0
      not_found_hash = 0  # article had a hash, but no PostgreSQL row carries it
      not_found_filename = 0  # neither a hash nor a filename match was found
      stats = {
          "hash_matches": 0,
          "filename_matches": 0,
          "filename_fallback": 0,  # filename matches for articles with no hash at all
      }

      try:
          postgres_by_hash = _load_hash_index(conn)
          articles = fetch_weaviate_articles()

          for art in articles:
              filename = art["filename"]
              weaviate_hash = art.get("file_hash") or ""
              try:
                  with conn.cursor() as cur:
                      # Strategy 1: file-hash match (primary).
                      raw_doc_id = postgres_by_hash.get(weaviate_hash) if weaviate_hash else None
                      if raw_doc_id is not None:
                          stats["hash_matches"] += 1
                          print(f"✅ [HASH] {filename}")
                      else:
                          if weaviate_hash:
                              not_found_hash += 1
                          # Strategy 2: filename match (fallback).
                          cur.execute(
                              "SELECT id FROM raw_documents WHERE filename = %s",
                              (filename,),
                          )
                          row = cur.fetchone()
                          if row is None:
                              not_found_filename += 1
                              print(f"❌ [NOT FOUND] {filename}")
                              continue
                          raw_doc_id = row[0]
                          stats["filename_matches"] += 1
                          if weaviate_hash:
                              print(f"⚠️ [FALLBACK] {filename} (filehash: {weaviate_hash[:16]}...)")
                          else:
                              stats["filename_fallback"] += 1
                              print(f"⚠️ [FALLBACK] {filename} (filehash puudub)")

                      _update_raw_document(cur, raw_doc_id, art)
                  # Commit per article so that a later failure cannot undo this one.
                  conn.commit()
                  synced += 1
                  title = art.get("title") or "N/A"
                  print(f"✓ {title[:60]}... (filename={filename}, pg_id={raw_doc_id})")
              except Exception as exc:  # per-article boundary: report, recover, continue
                  errors += 1
                  print(f"❌ Viga {art.get('title') or 'N/A'} (file={filename}): {exc}")
                  if conn.closed:
                      # A nonzero `closed` means the connection is closed or broken;
                      # reopen it for the remaining articles.
                      conn = get_db_conn()
                  else:
                      # Discard only this article's uncommitted statements.
                      conn.rollback()
      finally:
          conn.close()

      print(f"\n✓ Kokkuvõte: {synced} sünkroniseeritud, {not_found_filename} ei leitud, {errors} viga")
      print(
          f"  Matches: hash={stats['hash_matches']}, filename={stats['filename_matches']} "
          f"(without hash: {stats['filename_fallback']}); hash not in PostgreSQL: {not_found_hash}"
      )
  if __name__ == "__main__":
      sync_weaviate_to_postgres()

      # Debug aid: read the articles from Weaviate once more (a second,
      # separate fetch) and report how many there are.
      article_count = len(fetch_weaviate_articles())
      print(f"Leitud {article_count} artiklit Weaviate'st")