Commits (30)
31c5a64
feat: add config and operator node types
ChenZiHong-Gavin Dec 3, 2025
8bcbe51
refactor: refactor readers with ray data
ChenZiHong-Gavin Dec 3, 2025
246348f
fix: delete param parallelism for readers
ChenZiHong-Gavin Dec 3, 2025
319e1e7
fix: fix import error
ChenZiHong-Gavin Dec 3, 2025
42fcb09
refactor read and chunk operators with no side effects
ChenZiHong-Gavin Dec 4, 2025
b458e48
fix: fix import error
ChenZiHong-Gavin Dec 4, 2025
95c4783
fix: fix return logic
ChenZiHong-Gavin Dec 4, 2025
c844d65
refactor: rename operator split to chunk
ChenZiHong-Gavin Dec 4, 2025
c447936
refactor: refactor build_kg to accommodate ray data
ChenZiHong-Gavin Dec 4, 2025
3edbb81
feat: add StorageFactory & global params
ChenZiHong-Gavin Dec 4, 2025
ee0639d
refactor: refactor quiz to accommodate ray data engine
ChenZiHong-Gavin Dec 5, 2025
157f0b0
fix: reload graph before quizzing
ChenZiHong-Gavin Dec 5, 2025
99a6e5f
Merge branch 'main' of https://github.com/open-sciencelab/GraphGen in…
ChenZiHong-Gavin Dec 5, 2025
ec2033b
Potential fix for pull request finding 'Unreachable code'
ChenZiHong-Gavin Dec 5, 2025
bc07222
fix: fix quiz params
ChenZiHong-Gavin Dec 5, 2025
c9435d7
refactor: refactor quiz&judge to ray actors
ChenZiHong-Gavin Dec 10, 2025
c55fc09
Merge branch 'refactor/refactor-with-ray-data' of https://github.com/…
ChenZiHong-Gavin Dec 10, 2025
d7d6c2a
fix: fix transferring quizzed data to JudgeService
ChenZiHong-Gavin Dec 10, 2025
a6aedaf
refactor: refactor partition to accommodate ray data
ChenZiHong-Gavin Dec 10, 2025
ea1603b
fix: fix lint problem
ChenZiHong-Gavin Dec 10, 2025
244deb4
refactor: refactor op generate
ChenZiHong-Gavin Dec 11, 2025
d460a2a
feat: write results in output folder
ChenZiHong-Gavin Dec 11, 2025
cd011ad
fix: raise error when no dataset is created
ChenZiHong-Gavin Dec 11, 2025
aab7438
fix: return generator in ece_partitioner
ChenZiHong-Gavin Dec 11, 2025
7643b9f
fix: return generator in ece_partitioner
ChenZiHong-Gavin Dec 11, 2025
c42b604
refactor: refactor data format to support multi-modal input
ChenZiHong-Gavin Dec 11, 2025
42dc73e
fix: delete fetching schema to avoid ray's duplicate execution
ChenZiHong-Gavin Dec 11, 2025
73f70a5
fix: fix operators' registry
ChenZiHong-Gavin Dec 11, 2025
37cbfcf
feat: refactor schema_guided_extraction & add examples
ChenZiHong-Gavin Dec 11, 2025
b400d2e
feat: separate ray logs and service logs
ChenZiHong-Gavin Dec 12, 2025
2 changes: 1 addition & 1 deletion graphgen/bases/__init__.py
@@ -13,4 +13,4 @@
     StorageNameSpace,
 )
 from .base_tokenizer import BaseTokenizer
-from .datatypes import Chunk, QAPair, Token
+from .datatypes import Chunk, Config, Node, QAPair, Token
95 changes: 55 additions & 40 deletions graphgen/bases/base_reader.py
@@ -1,8 +1,10 @@
 import os
 from abc import ABC, abstractmethod
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Union
 
+import pandas as pd
 import requests
+from ray.data import Dataset
 
 
 class BaseReader(ABC):
@@ -14,52 +16,65 @@ def __init__(self, text_column: str = "content"):
         self.text_column = text_column
 
     @abstractmethod
-    def read(self, file_path: str) -> List[Dict[str, Any]]:
+    def read(self, input_path: Union[str, List[str]]) -> Dataset:
         """
         Read data from the specified file path.
 
-        :param file_path: Path to the input file.
-        :return: List of dictionaries containing the data.
+        :param input_path: Path to the input file or list of file paths.
+        :return: Ray Dataset containing the read data.
         """
 
-    @staticmethod
-    def filter(data: List[dict]) -> List[dict]:
+    def _should_keep_item(self, item: Dict[str, Any]) -> bool:
         """
-        Filter out entries with empty or missing text in the specified column.
+        Determine whether to keep the given item based on the text column.
 
-        :param data: List of dictionaries containing the data.
-        :return: Filtered list of dictionaries.
+        :param item: Dictionary representing a data entry.
+        :return: True if the item should be kept, False otherwise.
         """
+        item_type = item.get("type")
+        assert item_type in [
+            "text",
+            "image",
+            "table",
+            "equation",
+            "protein",
+        ], f"Unsupported item type: {item_type}"
+        if item_type == "text":
+            content = item.get(self.text_column, "").strip()
+            return bool(content)
+        return True
 
-        def _image_exists(path_or_url: str, timeout: int = 3) -> bool:
-            """
-            Check if an image exists at the given local path or URL.
-            :param path_or_url: Local file path or remote URL of the image.
-            :param timeout: Timeout for remote URL requests in seconds.
-            :return: True if the image exists, False otherwise.
-            """
-            if not path_or_url:
-                return False
-            if not path_or_url.startswith(("http://", "https://", "ftp://")):
-                path = path_or_url.replace("file://", "", 1)
-                path = os.path.abspath(path)
-                return os.path.isfile(path)
-            try:
-                resp = requests.head(path_or_url, allow_redirects=True, timeout=timeout)
-                return resp.status_code == 200
-            except requests.RequestException:
-                return False
+    def _validate_batch(self, batch: pd.DataFrame) -> pd.DataFrame:
+        """
+        Validate data format.
+        """
+        if "type" not in batch.columns:
+            raise ValueError(f"Missing 'type' column. Found: {list(batch.columns)}")
 
-        filtered_data = []
-        for item in data:
-            if item.get("type") == "text":
-                content = item.get("content", "").strip()
-                if content:
-                    filtered_data.append(item)
-            elif item.get("type") in ("image", "table", "equation"):
-                img_path = item.get("img_path")
-                if _image_exists(img_path):
-                    filtered_data.append(item)
-            else:
-                filtered_data.append(item)
-        return filtered_data
+        if "text" in batch["type"].values:
+            if self.text_column not in batch.columns:
+                raise ValueError(
+                    f"Missing '{self.text_column}' column for text documents"
+                )
+
+        return batch
+
+    @staticmethod
+    def _image_exists(path_or_url: str, timeout: int = 3) -> bool:
+        """
+        Check if an image exists at the given local path or URL.
+        :param path_or_url: Local file path or remote URL of the image.
+        :param timeout: Timeout for remote URL requests in seconds.
+        :return: True if the image exists, False otherwise.
+        """
+        if not path_or_url:
+            return False
+        if not path_or_url.startswith(("http://", "https://", "ftp://")):
+            path = path_or_url.replace("file://", "", 1)
+            path = os.path.abspath(path)
+            return os.path.isfile(path)
+        try:
+            resp = requests.head(path_or_url, allow_redirects=True, timeout=timeout)
+            return resp.status_code == 200
+        except requests.RequestException:
+            return False
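Example (not part of the diff): a minimal sketch of how a concrete reader plugs into the refactored base class. TextFileReader and its read_text-based schema mapping are hypothetical, written only to illustrate the new contract: read() returns a lazy Ray Dataset, and the base-class checks run on it as Ray transformations.

from typing import List, Union

import ray
from ray.data import Dataset

from graphgen.bases.base_reader import BaseReader


class TextFileReader(BaseReader):
    # Hypothetical reader: one "text"-typed row per input line.
    def read(self, input_path: Union[str, List[str]]) -> Dataset:
        col = self.text_column  # keep the Ray UDF closure small
        # read_text yields one row per line under a "text" column; remap it
        # to the {"type", text_column} schema the base-class filter expects.
        ds = ray.data.read_text(input_path)
        ds = ds.map(lambda row: {"type": "text", col: row["text"]})
        return ds.filter(self._should_keep_item)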
37 changes: 37 additions & 0 deletions graphgen/bases/datatypes.py
@@ -2,6 +2,8 @@
 from dataclasses import dataclass, field
 from typing import List, Union
 
+from pydantic import BaseModel, Field, field_validator
+
 
 @dataclass
 class Chunk:
@@ -48,3 +50,38 @@ class Community:
     nodes: List[str] = field(default_factory=list)
     edges: List[tuple] = field(default_factory=list)
     metadata: dict = field(default_factory=dict)
+
+
+class Node(BaseModel):
+    id: str = Field(..., description="unique node id")
+    op_name: str = Field(..., description="operator name")
+    type: str = Field(
+        ..., description="task type, e.g., map, filter, flatmap, aggregate, map_batch"
+    )
+    params: dict = Field(default_factory=dict, description="operator parameters")
+    dependencies: List[str] = Field(
+        default_factory=list, description="list of dependent node ids"
+    )
+
+    @field_validator("type")
+    @classmethod
+    def validate_type(cls, v: str) -> str:
+        valid_types = {"map", "filter", "flatmap", "aggregate", "map_batch"}
+        if v not in valid_types:
+            raise ValueError(f"Invalid node type: {v}. Must be one of {valid_types}.")
+        return v
+
+
+class Config(BaseModel):
+    nodes: List[Node] = Field(
+        ..., min_length=1, description="list of nodes in the computation graph"
+    )
+
+    @field_validator("nodes")
+    @classmethod
+    def validate_unique_ids(cls, v: List[Node]) -> List[Node]:
+        ids = [node.id for node in v]
+        if len(ids) != len(set(ids)):
+            duplicates = {id_ for id_ in ids if ids.count(id_) > 1}
+            raise ValueError(f"Duplicate node ids found: {duplicates}")
+        return v
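Example (not part of the diff): a quick, illustrative check of the new models; the ids and op_name values below are made up. Node rejects task types outside the allowed set, and Config rejects duplicate node ids, so a malformed pipeline config fails fast at parse time.

from graphgen.bases.datatypes import Config, Node

config = Config(
    nodes=[
        Node(id="read", op_name="read", type="map_batch"),
        Node(id="chunk", op_name="chunk", type="flatmap", dependencies=["read"]),
    ]
)

# Node(id="bad", op_name="x", type="reduce")  -> ValidationError (invalid type)
# Config(nodes=[node_a, node_a])              -> ValidationError (duplicate ids)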
1 change: 0 additions & 1 deletion graphgen/models/__init__.py
@@ -18,7 +18,6 @@
 )
 from .reader import (
     CSVReader,
-    JSONLReader,
     JSONReader,
     ParquetReader,
     PDFReader,
1 change: 0 additions & 1 deletion graphgen/models/reader/__init__.py
@@ -1,6 +1,5 @@
 from .csv_reader import CSVReader
 from .json_reader import JSONReader
-from .jsonl_reader import JSONLReader
 from .parquet_reader import ParquetReader
 from .pdf_reader import PDFReader
 from .pickle_reader import PickleReader
25 changes: 14 additions & 11 deletions graphgen/models/reader/csv_reader.py
@@ -1,6 +1,7 @@
-from typing import Any, Dict, List
+from typing import List, Union
 
-import pandas as pd
+import ray
+from ray.data import Dataset
 
 from graphgen.bases.base_reader import BaseReader
 
@@ -13,13 +14,15 @@ class CSVReader(BaseReader):
     - if type is "text", "content" column must be present.
     """
 
-    def read(self, file_path: str) -> List[Dict[str, Any]]:
-        df = pd.read_csv(file_path)
-        for _, row in df.iterrows():
-            assert "type" in row, f"Missing 'type' column in document: {row.to_dict()}"
-            if row["type"] == "text" and self.text_column not in row:
-                raise ValueError(
-                    f"Missing '{self.text_column}' in document: {row.to_dict()}"
-                )
-        return self.filter(df.to_dict(orient="records"))
+    def read(self, input_path: Union[str, List[str]]) -> Dataset:
+        """
+        Read CSV files and return Ray Dataset.
+
+        :param input_path: Path to CSV file or list of CSV files.
+        :return: Ray Dataset containing validated and filtered data.
+        """
+
+        ds = ray.data.read_csv(input_path)
+        ds = ds.map_batches(self._validate_batch, batch_format="pandas")
+        ds = ds.filter(self._should_keep_item)
+        return ds
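Example (not part of the diff): hypothetical usage of the refactored reader; the path is illustrative. Reading now returns a lazy Ray Dataset instead of an eager list of dicts, so validation and filtering only execute when the dataset is consumed. The same pattern applies to the JSON and Parquet readers below.

from graphgen.models import CSVReader

reader = CSVReader(text_column="content")
ds = reader.read("data/docs.csv")  # ray.data.Dataset; no rows materialized yet
print(ds.take(2))                  # triggers _validate_batch + _should_keep_item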
31 changes: 16 additions & 15 deletions graphgen/models/reader/json_reader.py
@@ -1,26 +1,27 @@
-import json
-from typing import Any, Dict, List
+from typing import List, Union
+
+import ray
+from ray.data import Dataset
 
 from graphgen.bases.base_reader import BaseReader
 
 
 class JSONReader(BaseReader):
     """
-    Reader for JSON files.
+    Reader for JSON and JSONL files.
     Columns:
     - type: The type of the document (e.g., "text", "image", etc.)
    - if type is "text", "content" column must be present.
     """
 
-    def read(self, file_path: str) -> List[Dict[str, Any]]:
-        with open(file_path, "r", encoding="utf-8") as f:
-            data = json.load(f)
-        if isinstance(data, list):
-            for doc in data:
-                assert "type" in doc, f"Missing 'type' in document: {doc}"
-                if doc.get("type") == "text" and self.text_column not in doc:
-                    raise ValueError(
-                        f"Missing '{self.text_column}' in document: {doc}"
-                    )
-            return self.filter(data)
-        raise ValueError("JSON file must contain a list of documents.")
+    def read(self, input_path: Union[str, List[str]]) -> Dataset:
+        """
+        Read JSON file and return Ray Dataset.
+        :param input_path: Path to JSON/JSONL file or list of JSON/JSONL files.
+        :return: Ray Dataset containing validated and filtered data.
+        """
+
+        ds = ray.data.read_json(input_path)
+        ds = ds.map_batches(self._validate_batch, batch_format="pandas")
+        ds = ds.filter(self._should_keep_item)
+        return ds
30 changes: 0 additions & 30 deletions graphgen/models/reader/jsonl_reader.py

This file was deleted.
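(JSONL input is now handled by JSONReader above: ray.data.read_json reads newline-delimited .jsonl as well as plain .json files.)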

26 changes: 16 additions & 10 deletions graphgen/models/reader/parquet_reader.py
@@ -1,6 +1,7 @@
-from typing import Any, Dict, List
+from typing import List, Union
 
-import pandas as pd
+import ray
+from ray.data import Dataset
 
 from graphgen.bases.base_reader import BaseReader
 
@@ -13,12 +14,17 @@ class ParquetReader(BaseReader):
     - if type is "text", "content" column must be present.
     """
 
-    def read(self, file_path: str) -> List[Dict[str, Any]]:
-        df = pd.read_parquet(file_path)
-        data: List[Dict[str, Any]] = df.to_dict(orient="records")
-
-        for doc in data:
-            assert "type" in doc, f"Missing 'type' in document: {doc}"
-            if doc.get("type") == "text" and self.text_column not in doc:
-                raise ValueError(f"Missing '{self.text_column}' in document: {doc}")
-        return self.filter(data)
+    def read(self, input_path: Union[str, List[str]]) -> Dataset:
+        """
+        Read Parquet files using Ray Data.
+
+        :param input_path: Path to Parquet file or list of Parquet files.
+        :return: Ray Dataset containing validated documents.
+        """
+        if not ray.is_initialized():
+            ray.init()
+
+        ds = ray.data.read_parquet(input_path)
+        ds = ds.map_batches(self._validate_batch, batch_format="pandas")
+        ds = ds.filter(self._should_keep_item)
+        return ds