from __future__ import annotations
# typing imports
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..config import Config
from logging import Logger
from typing import Any
from ..sparql import RequestHandler
from ..enums import EndpointType, DecisionQuery
from ..data_models import Taxonomy
from tqdm import tqdm
from uuid import uuid4
import json
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
def binarize(taxonomy: Taxonomy, labels: list[str]) -> list[int]:
"""
This function maps the provided labels onto a multilabel binarized vector over the taxonomy's depth-1 labels.
:param taxonomy: taxonomy whose depth-1 labels define the position of each output value
:param labels: labels that should be marked as present
:return: list of 0 or 1 values based on the provided labels
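Example (hypothetical taxonomy whose depth-1 labels are ["economy", "housing", "mobility"]):
>>> binarize(taxonomy, labels=["housing"])
[0, 1, 0]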
"""
_tmp = {label: 0 for label in taxonomy.get_labels(max_depth=1)}
for k, v in _tmp.items():
if k in labels:
_tmp[k] = 1
return list(_tmp.values())
class DatasetBuilder:
"""
The builder class is mainly used to control the creation and loading of datasets.
During the creation of a new dataset, behaviour can be tweaked by setting specific values in the config.
You have control over the usage of the predefined train-test split, the split size, ... more info can be found in the config module.
In general there are two main ways to load a dataset:
1. Loading a dataset from SPARQL
>>> dataset_builder = DatasetBuilder.from_sparql(...)
>>> train_dataset = dataset_builder.train_dataset
More information can be found at the classmethod from_sparql.
2. Loading a dataset from a local checkpoint
>>> dataset_builder = DatasetBuilder.from_checkpoint(...)
>>> train_dataset = dataset_builder.train_dataset
More information can be found at the classmethod from_checkpoint.
"""
def __init__(
self,
config: Config,
logger: Logger,
train_dataset: list[dict[str, str]],
test_dataset: list[dict[str, str]],
taxonomy: Taxonomy
):
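"""
:param config: the general Config object
:param logger: logger used throughout the project
:param train_dataset: list of training records
:param test_dataset: list of test records (None when no train-test split was made)
:param taxonomy: the taxonomy the datasets were built against
"""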
self.config = config
self.logger = logger
self.train_dataset = train_dataset
self.test_dataset = test_dataset
self.taxonomy = taxonomy
def _dump_json(self, file_path: str, dictionary: dict | list[dict[Any, Any]]) -> None:
"""
This function dumps the provided dictionary (or list of dictionaries) as JSON to the provided file path.
:param file_path: path to dump the JSON to
:param dictionary: dictionary that will be saved to the JSON file
:return: None
"""
self.logger.debug(f"Dumping content to file: {file_path}")
with open(file_path, "w") as f:
json.dump(dictionary, f)
def create_checkpoint(self, checkpoint_folder: str) -> str:
"""
This function saves all relevant information (the train and test datasets plus the taxonomy) to a checkpoint.
Checkpoints created this way can be loaded back one-to-one with the from_checkpoint classmethod.
Example usage:
>>> dataset_builder = DatasetBuilder(...)
>>> dataset_builder.create_checkpoint(checkpoint_folder="...")
:param checkpoint_folder: folder to save checkpoint to
:return: the unique checkpoint subfolder the artifacts were saved to
"""
checkpoint_sub_folder = os.path.join(checkpoint_folder, uuid4().hex)
os.makedirs(checkpoint_sub_folder)
train_dataset_path = os.path.join(checkpoint_sub_folder, "train_dataset.json")
test_dataset_path = os.path.join(checkpoint_sub_folder, "test_dataset.json")
taxonomy_path = os.path.join(checkpoint_sub_folder, "taxonomy.json")
self._dump_json(train_dataset_path, self.train_dataset)
self._dump_json(test_dataset_path, self.test_dataset)
self._dump_json(taxonomy_path, self.taxonomy.todict(with_children=True))
return checkpoint_sub_folder
@classmethod
def from_sparql(
cls,
config: Config,
logger: Logger,
request_handler: RequestHandler,
taxonomy_uri: str,
query_type: DecisionQuery,
do_train_test_split: bool = True,
**kwargs
):
"""
Classmethod for initializing the class from SPARQL.
When provided with a taxonomy uri, it creates a new dataset based on the annotated decisions that can be
found in the SPARQL database.
Example usage:
>>> dataset_builder = DatasetBuilder.from_sparql(
config=Config(),
logger=logging.getLogger(__name__),
request_handler=RequestHandler(...),
taxonomy_uri="...",
query_type=DecisionQuery.ANNOTATED
)
:param config: the general Config object
:param logger: logger object that can be used for logging
:param request_handler: the request wrapper used for SPARQL requests
:param taxonomy_uri: which taxonomy to pull when using the annotated-dataset query_type
:param query_type: which type of query will be executed
:param do_train_test_split: whether or not to execute the train-test split (not set via the config; check the code for clarity)
:return: an instance of the DatasetBuilder class
"""
# imported locally to avoid circular imports
from ..data_models import Decision, TaxonomyTypes
# Loading dataset
dataset = []
decision_query = DecisionQuery.match(config.data_models, query_type).format(
taxonomy_uri=taxonomy_uri
)
logger.debug(f"Decision query: {decision_query}")
# filter only for most recent annotations (done by date)
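# _memory maps each decision uri to [annotation date, annotation uri]; newer annotations overwrite older ones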
_memory = dict()
decision_response = request_handler.post2json(decision_query)
for response in decision_response:
decision_uri = response.get("_besluit")
# remember the annotation uri so it can be used while pulling the data;
# overwrite with the most recent annotation (when two share a date, the later response wins)
value = int(response.get("date", 0))
if _memory.get(decision_uri, [0])[0] <= value:
_memory[decision_uri] = [value, response.get("anno", None)]
# Limit the amount of information to process
if limit := os.getenv("DATA_MAX_LIMIT", False):
print(f"Limiting {len(_memory)} to {limit}")
_memory = dict(list(_memory.items())[:int(limit)])
# create cleaned list of decisions and pull all relevant information
annotated_decisions = [
dict(
decision=k,
date=v[0],
annotation=v[1]
) for k, v in _memory.items()
]
logger.info(f"Creating dataset from sparql")
for response in tqdm(annotated_decisions, desc="pulling data from endpoint"):
decision = Decision.from_sparql(
config=config.data_models,
logger=logger,
decision_uri=response.get("decision"),
request_handler=request_handler,
annotation_uri=response.get("annotation", None)
)
try:
dataset.append(decision.train_record)
except AttributeError:
pass  # skip faulty database records that have no train_record
except Exception:
raise  # re-raise anything unexpected without losing the original traceback
taxonomy = TaxonomyTypes.from_sparql(
config=config.data_models,
logger=logger,
request_handler=request_handler,
endpoint=EndpointType.TAXONOMY
)
selected_taxonomy = taxonomy.get(taxonomy_uri)
df = pd.DataFrame(dataset)
assert len(df) != 0, "Length of dataset is 0"
logger.debug(f"dataframe: {df}")
if do_train_test_split:
# create top labels
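# map every annotated label to its depth-1 entry in the taxonomy; these top-level labels feed the binarized class counts below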
df["top_labels"] = df.labels.apply(
lambda x: list(set([selected_taxonomy.find(label).get(1, {}).get("label") for label in x]))
)
train_dataset = []
test_dataset = []
if config.run.dataset.use_predefined_split:
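# records whose uri is in the predefined list form the test set; everything else goes to the train set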
for record in dataset:
if record.get("uri") in config.run.dataset.predefined_uris:
test_dataset.append(record)
else:
train_dataset.append(record)
logger.debug(f"Train examples: {len(train_dataset)}, Test examples: {len(test_dataset)}")
y_test = [
binarize(
taxonomy=selected_taxonomy,
labels=label
) for label in df.top_labels.tolist()
]
logger.info(f"Current class split count: {np.asarray(y_test).sum(axis=0)}")
else:
X = np.asarray(df.uri.tolist())
y = np.asarray([
binarize(
taxonomy=selected_taxonomy,
labels=label
) for label in df.top_labels.tolist()
])
# specifically splitting on uri -> this way we can select original dataset results later
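# random_state is fixed so the same split is reproduced across runs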
_, test_uris, _, y_test = train_test_split(
X,
y,
test_size=config.run.dataset.train_test_split,
random_state=42
)
test_uri_set = set(test_uris.tolist())
for record in dataset:
if record.get("uri") in test_uri_set:
test_dataset.append(record)
else:
train_dataset.append(record)
logger.info(f"Current class split count: {y_test.sum(axis=0)}")
else:
train_dataset, test_dataset = dataset, None
logger.info(f"Training records: {len(train_dataset)} test records: {len(test_dataset) if test_dataset else 0}")
selected_taxonomy.label = taxonomy_uri.split("/")[-1]
selected_taxonomy.uri = taxonomy_uri
return cls(
config=config,
logger=logger,
train_dataset=train_dataset,
test_dataset=test_dataset,
taxonomy=selected_taxonomy
)
@classmethod
def from_checkpoint(
cls,
config: Config,
logger: Logger,
checkpoint_folder: str
):
"""
Classmethod to create an instance of the DatasetBuilder class from a checkpoint.
Such checkpoints are created with the 'create_checkpoint' method.
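Example usage (assuming a Config instance and a standard logging.Logger):
>>> dataset_builder = DatasetBuilder.from_checkpoint(
config=Config(),
logger=logging.getLogger(__name__),
checkpoint_folder="..."
)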
:param config: general config provided
:param logger: logging object that is used throughout the project
:param checkpoint_folder: folder to load the checkpoint artifacts from
:return: an instance of the DatasetBuilder object
"""
# Loading dataset
with open(os.path.join(checkpoint_folder, "train_dataset.json"), "r") as f:
train_dataset = json.load(f)
with open(os.path.join(checkpoint_folder, "test_dataset.json"), "r") as f:
test_dataset = json.load(f)
# loading taxonomy
taxonomy = Taxonomy.from_checkpoint(
config=config.data_models,
logger=logger,
checkpoint_folder=checkpoint_folder
)
return cls(
config=config,
logger=logger,
train_dataset=train_dataset,
test_dataset=test_dataset,
taxonomy=taxonomy
)