Module lum.odinson.doc

Expand source code
from __future__ import annotations

import abc
import gzip
import json
from enum import Enum
from typing import Any, List, Literal, Optional, Sequence, Text, Tuple, Type, Union

import dateutil.parser
import pydantic
from pydantic import BaseModel, ConfigDict

from lum.odinson.typing import Tokens

__all__ = ["Document", "AnyField"]


class Fields(Text, Enum):
    # $type      ai.lum.odinson.*
    TOKENS_FIELD = "ai.lum.odinson.TokensField"
    GRAPH_FIELD = "ai.lum.odinson.GraphField"
    STRING_FIELD = "ai.lum.odinson.StringField"
    DATE_FIELD = "ai.lum.odinson.DateField"
    NUMBER_FIELD = "ai.lum.odinson.NumberField"
    NESTED_FIELD = "ai.lum.odinson.NestedField"

    def __repr__(self) -> str:
        return str.__repr__(self.value)


class Field(BaseModel):
    name: Text
    type: Fields = pydantic.Field(alias="$type", default="ai.lum.odinson.Field")
    model_config = ConfigDict(use_enum_values=True, validate_default=True)


class TokensField(Field):
    tokens: Tokens
    type: Literal[Fields.TOKENS_FIELD] = pydantic.Field(
        alias="$type", default=Fields.TOKENS_FIELD.value, frozen=True
    )

    def __hash__(self) -> int:
        return hash((self.name, self.type, tuple(self.tokens)))


class GraphField(Field):
    edges: List[Tuple[int, int, Text]]
    roots: Sequence[int]
    type: Literal[Fields.GRAPH_FIELD] = pydantic.Field(
        alias="$type", default=Fields.GRAPH_FIELD.value, frozen=True
    )

    def __hash__(self) -> int:
        roots = tuple(self.roots)
        edges = tuple(
            sorted(self.edges, key=lambda triple: (triple[0], triple[1], triple[2]))
        )
        return hash((self.name, self.type, roots, edges))


class StringField(Field):
    string: Text
    type: Literal[Fields.STRING_FIELD] = pydantic.Field(
        alias="$type", default=Fields.STRING_FIELD.value, frozen=True
    )

    def __hash__(self) -> int:
        return hash((self.name, self.type, self.string))


class DateField(Field):
    date: Text
    type: Literal[Fields.DATE_FIELD] = pydantic.Field(
        alias="$type", default=Fields.DATE_FIELD.value, frozen=True
    )

    def __hash__(self) -> int:
        return hash((self.name, self.type, self.date))


class NumberField(Field):
    value: float
    type: Literal[Fields.NUMBER_FIELD] = pydantic.Field(
        alias="$type", default=Fields.NUMBER_FIELD.value, frozen=True
    )

    def __hash__(self) -> int:
        return hash((self.name, self.type, self.value))


class NestedField(Field):
    fields: List[Type[Field]]
    type: Literal[Fields.NESTED_FIELD] = pydantic.Field(
        alias="$type", default=Fields.NESTED_FIELD.value, frozen=True
    )

    def __hash__(self) -> int:
        return hash((self.name, self.type, tuple(self.fields)))


AnyField = Union[
    TokensField, GraphField, StringField, DateField, NumberField, NestedField
]
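
# Example (illustrative): constructing a field and serializing it with the
# "$type" alias; the token values below are made up.
#
#   >>> f = TokensField(name="word", tokens=["Gonzo", "sleeps"])
#   >>> f.model_dump(by_alias=True)
#   {'name': 'word', '$type': 'ai.lum.odinson.TokensField', 'tokens': ['Gonzo', 'sleeps']}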


class Metadata:
    """Utility methods for constructing metadata"""

    @staticmethod
    def from_dict(d: dict) -> List[AnyField]:
        fields = []
        for fname, v in d.items():
            if isinstance(v, str):
                # try parsing as date
                try:
                    _ = dateutil.parser.parse(v)
                    fields.append(DateField(name=fname, date=v))
                except (ValueError, OverflowError):
                    fields.append(StringField(name=fname, string=v))
            elif isinstance(v, (int, float)):
                fields.append(NumberField(name=fname, value=float(v)))
            elif isinstance(v, list):
                # TODO: if all elems are str -> TokensField
                pass
            elif isinstance(v, dict):
                # TODO: if contains "edges" and "roots" -> GraphField
                pass
        return fields
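
    # Example (illustrative; the keys and values are made up):
    #   >>> Metadata.from_dict({"author": "Jim Henson", "year": 1955})
    #   [StringField(name='author', ...), NumberField(name='year', ..., value=1955.0)]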


class Token:
    """Convenience class to represent a single token
    and its attributes"""

    def __init__(self, **kwargs):
        for k, v in kwargs.items():
            self.__dict__[k] = v

    def __len__(self):
        return len(self.raw)

    def __str__(self):
        return self.raw

    def __hash__(self):
        return hash(frozenset(self.__dict__.items()))


class Sentence(BaseModel):
    numTokens: int
    # FIXME: figure out how to just use List[Type[Field]]
    fields: List[AnyField]
    model_config = ConfigDict(extra="allow")

    def model_post_init(self, ctx) -> None:
        """Easy access to fields"""
        tokens = [{} for _ in range(self.numTokens)]
        graph = None
        for f in self.fields:
            key = f.name
            # token attributes
            if isinstance(f, TokensField):
                # populate tokens
                for i, label in enumerate(f.tokens):
                    tokens[i][key] = label
                self.__dict__[key] = f.tokens
            # syntactic dependencies, semantic roles, etc.
            elif isinstance(f, GraphField):
                graph = f
        # create tokens
        self.__dict__["tokens"] = [Token(**d) for d in tokens]

        # TODO: create graph
        # if graph is not None:
        #   edges = []
        #   self.__dict__["edges"] =
        #   key = "edges"

    def __getitem__(self, index: int) -> Token:
        return self.tokens[index]

    def __hash__(self):
        num_tokens = [self.numTokens]
        fields = [hash(f) for f in self.fields]
        components = num_tokens + fields
        return hash(tuple(components))

    def model_dump(self, by_alias=True, **kwargs):
        return super().model_dump(by_alias=by_alias, **kwargs)

    def model_dump_json(self, by_alias=True, **kwargs):
        return super().model_dump_json(by_alias=by_alias, **kwargs)

    def dict(self, **kwargs):
        return self.model_dump(**kwargs)

    def json(self, **kwargs):
        return self.model_dump_json(**kwargs)

    def copy(self, fields: Optional[List[AnyField]] = None) -> "Sentence":
        """Convenience method for easily copying an Odinson Sentence and replacing specific attributes"""
        fields = fields if fields is not None else self.fields
        return Sentence(
            # validate and count tokens
            numTokens=Sentence._count_tokens(fields),
            fields=fields,
        )

    @staticmethod
    def validate_fields(fields: List[AnyField]) -> bool:
        # validation
        num_tokens = set()
        for f in fields:
            if isinstance(f, TokensField):
                num_tokens.add(len(f.tokens))
        # NOTE: this will also fail if no TokensField is present
        if len(num_tokens) != 1:
            raise Exception(
                f"All TokensFields for a sentence should have the same length, but found {len(num_tokens)} distinct lengths"
            )
        return True

    @staticmethod
    def _count_tokens(fields: List[AnyField]) -> int:
        """Get count of tokens based on TokensField after first validating with Sentence.validate_fields"""
        _ = Sentence.validate_fields(fields)
        for f in fields:
            if isinstance(f, TokensField):
                return len(f.tokens)

    @staticmethod
    def from_fields(fields: List[AnyField]) -> "Sentence":
        """Create an Odinson Sentence from a collection of fields"""
        return Sentence(
            # validate and count
            numTokens=Sentence._count_tokens(fields),
            fields=fields,
        )

    @staticmethod
    def from_tokens(tokens: List[Token]) -> "Sentence":
        """Create an Odinson Sentence from a collection Tokens"""
        fields_dict = dict()
        for tok in tokens:
            for k, v in tok.__dict__.items():
                value = fields_dict.get(k, [])
                value.append(v)
                fields_dict[k] = value
        num_tokens = list({len(values) for values in fields_dict.values()})
        assert len(num_tokens) == 1, "All token attributes must have the same length"
        fields = [TokensField(name=k, tokens=toks) for k, toks in fields_dict.items()]
        return Sentence(numTokens=num_tokens[0], fields=fields)
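
# Example (illustrative): building a Sentence from parallel token attributes.
# The field names ("raw", "word") mirror common Odinson conventions, but the
# values are made up.
#
#   >>> s = Sentence.from_fields([
#   ...     TokensField(name="raw", tokens=["Kermit", "sings"]),
#   ...     TokensField(name="word", tokens=["kermit", "sing"]),
#   ... ])
#   >>> s.numTokens
#   2
#   >>> s[0].raw
#   'Kermit'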


class Document(BaseModel):
    """ai.lum.odinson.Document"""

    id: Text
    metadata: List[AnyField]
    sentences: List[Sentence]

    def __hash__(self):
        metadata = [0] if len(self.metadata) == 0 else [hash(f) for f in self.metadata]
        sentences = [hash(s) for s in self.sentences]
        components = [hash(self.id)] + metadata + sentences
        return hash(tuple(components))

    def model_post_init(self, ctx) -> None:
        attributes = dict()
        for s in self.sentences:
            for f in s.fields:
                key = f.name
                # token attributes
                if isinstance(f, TokensField):
                    values = attributes.get(key, [])
                    # nested list: one sublist per sentence
                    values.append(s.__dict__[key])
                    attributes[key] = values
        for k, v in attributes.items():
            self.__dict__[k] = v

        self.__dict__["tokens"] = [s.tokens for s in self.sentences]

    def metadata_by_name(self, name: str) -> List[AnyField]:
        return [m for m in self.metadata if m.name == name]

    def metadata_by_type(self, mtype: Union[Fields, Text]) -> List[AnyField]:
        return [m for m in self.metadata if m.type == mtype]

    def model_dump(self, by_alias=True, **kwargs):
        return super().model_dump(by_alias=by_alias, **kwargs)

    def model_dump_json(self, by_alias=True, **kwargs):
        return super().model_dump_json(by_alias=by_alias, **kwargs)

    def dict(self, **kwargs):
        return self.model_dump(**kwargs)

    def json(self, **kwargs):
        return self.model_dump_json(**kwargs)

    @staticmethod
    def from_file(fp: Text) -> Document:
        if fp.lower().endswith(".gz"):
            with gzip.open(fp, "rb") as f:
                return Document(**json.loads(f.read()))
        else:
            with open(fp, "r") as f:
                return Document(**json.loads(f.read()))

    def copy(
        self,
        id: Optional[str] = None,
        metadata: Optional[List[AnyField]] = None,
        sentences: Optional[List[Sentence]] = None,
    ) -> "Document":
        """Convenience method for easily copying an Odinson Document and replacing specific attributes"""
        return Document(
            id=id or self.id,
            metadata=metadata or self.metadata,
            sentences=sentences or self.sentences,
        )
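
Example usage (an illustrative sketch; the id, field names, and values below are made up):

    from lum.odinson.doc import Document, Sentence, StringField, TokensField

    sent = Sentence.from_fields([TokensField(name="raw", tokens=["Piggy", "waves"])])
    doc = Document(
        id="doc-1",
        metadata=[StringField(name="author", string="Jim Henson")],
        sentences=[sent],
    )
    print(doc.model_dump_json())  # serialized with "$type" keys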

Classes

class Document (**data: Any)

ai.lum.odinson.Document

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Uses __pydantic_self__ instead of the more common self for the first arg to allow self as a field name.

Expand source code
class Document(BaseModel):
    """ai.lum.odinson.Document"""

    id: Text
    metadata: List[AnyField]
    sentences: List[Sentence]

    def __hash__(self):
        metadata = [0] if len(self.metadata) == 0 else [hash(f) for f in self.metadata]
        sentences = [hash(s) for s in self.sentences]
        components = [hash(self.id)] + metadata + sentences
        return hash(tuple(components))

    def model_post_init(self, ctx) -> None:
        attributes = dict()
        for s in self.sentences:
            for f in s.fields:
                key = f.name
                # token attributes
                if isinstance(f, TokensField):
                    values = attributes.get(key, [])
                    # nested list: one sublist per sentence
                    values.append(s.__dict__[key])
                    attributes[key] = values
        for k, v in attributes.items():
            self.__dict__[k] = v

        self.__dict__["tokens"] = [s.tokens for s in self.sentences]

    def metadata_by_name(self, name: str) -> List[AnyField]:
        return [m for m in self.metadata if m.name == name]

    def metadata_by_type(self, mtype: Union[Fields, Text]) -> List[AnyField]:
        return [m for m in self.metadata if m.type == mtype]

    def model_dump(self, by_alias=True, **kwargs):
        return super().model_dump(by_alias=by_alias, **kwargs)

    def model_dump_json(self, by_alias=True, **kwargs):
        return super().model_dump_json(by_alias=by_alias, **kwargs)

    def dict(self, **kwargs):
        return self.model_dump(**kwargs)

    def json(self, **kwargs):
        return self.model_dump_json(**kwargs)

    @staticmethod
    def from_file(fp: Text) -> Document:
        if fp.lower().endswith(".gz"):
            with gzip.open(fp, "rb") as f:
                return Document(**json.loads(f.read()))
        else:
            with open(fp, "r") as f:
                return Document(**json.loads(f.read()))

    def copy(
        self,
        id: Optional[str] = None,
        metadata: Optional[List[AnyField]] = None,
        sentences: Optional[List[Sentence]] = None,
    ) -> "Document":
        """Convenience method for easily copying an Odinson Document and replacing specific attributes"""
        return Document(
            id=id or self.id,
            metadata=metadata or self.metadata,
            sentences=sentences or self.sentences,
        )

Ancestors

  • pydantic.main.BaseModel

Class variables

var id : str
var metadata : List[Union[lum.odinson.doc.TokensField, lum.odinson.doc.GraphField, lum.odinson.doc.StringField, lum.odinson.doc.DateField, lum.odinson.doc.NumberField, lum.odinson.doc.NestedField]]
var model_config
var model_fields
var sentences : List[lum.odinson.doc.Sentence]

Static methods

def from_file(fp: Text) ‑> Document
Expand source code
@staticmethod
def from_file(fp: Text) -> Document:
    if fp.lower().endswith(".gz"):
        with gzip.open(fp, "rb") as f:
            return Document(**json.loads(f.read()))
    else:
        with open(fp, "r") as f:
            return Document(**json.loads(f.read()))
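
Usage (illustrative; the path is hypothetical). Gzipped and plain JSON files are both supported:

    doc = Document.from_file("data/example-doc.json.gz")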

Methods

def copy(self, id: Optional[str] = None, metadata: Optional[List[AnyField]] = None, sentences: Optional[List[Sentence]] = None) ‑> Document

Convenience method for easily copying an Odinson Document and replacing specific attributes

Expand source code
def copy(
    self,
    id: Optional[str] = None,
    metadata: Optional[List[AnyField]] = None,
    sentences: Optional[List[Sentence]] = None,
) -> "Document":
    """Convenience method for easily copying an Odinson Document and replacing specific attributes"""
    return Document(
        id=id or self.id,
        metadata=metadata or self.metadata,
        sentences=sentences or self.sentences,
    )
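
Usage (illustrative):

    renamed = doc.copy(id="doc-1-v2")  # same metadata and sentences, new id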
def dict(self, **kwargs)
Expand source code
def dict(self, **kwargs):
    return self.model_dump(**kwargs)
def json(self, **kwargs)
Expand source code
def json(self, **kwargs):
    return self.model_dump_json(**kwargs)
def metadata_by_name(self, name: str) ‑> List[Union[lum.odinson.doc.TokensField, lum.odinson.doc.GraphField, lum.odinson.doc.StringField, lum.odinson.doc.DateField, lum.odinson.doc.NumberField, lum.odinson.doc.NestedField]]
Expand source code
def metadata_by_name(self, name: str) -> List[AnyField]:
    return [m for m in self.metadata if m.name == name]
def metadata_by_type(self, mtype: Union[Fields, Text]) ‑> List[Union[lum.odinson.doc.TokensField, lum.odinson.doc.GraphField, lum.odinson.doc.StringField, lum.odinson.doc.DateField, lum.odinson.doc.NumberField, lum.odinson.doc.NestedField]]
Expand source code
def metadata_by_type(self, mtype: Union[Fields, Text]) -> List[AnyField]:
    return [m for m in self.metadata if m.type == mtype]
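
Usage (illustrative; assumes the document carries an "author" metadata field):

    authors = doc.metadata_by_name("author")
    dates = doc.metadata_by_type(Fields.DATE_FIELD)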
def model_dump(self, by_alias=True, **kwargs)

Usage docs: https://docs.pydantic.dev/dev-v2/usage/serialization/#modelmodel_dump

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

Args

mode
The mode in which to_python should run. If mode is 'json', the dictionary will only contain JSON serializable types. If mode is 'python', the dictionary may contain any Python objects.
include
A list of fields to include in the output.
exclude
A list of fields to exclude from the output.
by_alias
Whether to use the field's alias in the dictionary key if defined.
exclude_unset
Whether to exclude fields that are unset or None from the output.
exclude_defaults
Whether to exclude fields that are set to their default value from the output.
exclude_none
Whether to exclude fields that have a value of None from the output.
round_trip
Whether to enable serialization and deserialization round-trip support.
warnings
Whether to log warnings when invalid fields are encountered.

Returns

A dictionary representation of the model.

Expand source code
def model_dump(self, by_alias=True, **kwargs):
    return super().model_dump(by_alias=by_alias, **kwargs)
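
Note that by_alias defaults to True here, so dumps keep the "$type" discriminator (illustrative):

    d = doc.model_dump()
    d["metadata"][0]["$type"]  # e.g. 'ai.lum.odinson.StringField'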
def model_dump_json(self, by_alias=True, **kwargs)

Usage docs: https://docs.pydantic.dev/dev-v2/usage/serialization/#modelmodel_dump_json

Generates a JSON representation of the model using Pydantic's to_json method.

Args

indent
Indentation to use in the JSON output. If None is passed, the output will be compact.
include
Field(s) to include in the JSON output. Can take either a string or set of strings.
exclude
Field(s) to exclude from the JSON output. Can take either a string or set of strings.
by_alias
Whether to serialize using field aliases.
exclude_unset
Whether to exclude fields that have not been explicitly set.
exclude_defaults
Whether to exclude fields that have the default value.
exclude_none
Whether to exclude fields that have a value of None.
round_trip
Whether to use serialization/deserialization between JSON and class instance.
warnings
Whether to show any warnings that occurred during serialization.

Returns

A JSON string representation of the model.

Expand source code
def model_dump_json(self, by_alias=True, **kwargs):
    return super().model_dump_json(by_alias=by_alias, **kwargs)
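
Round-trip sketch (illustrative; assumes doc is a Document instance):

    import json

    same = Document(**json.loads(doc.model_dump_json()))
    assert hash(same) == hash(doc)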
def model_post_init(self, ctx) ‑> None

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Expand source code
def model_post_init(self, ctx) -> None:
    attributes = dict()
    for s in self.sentences:
        for f in s.fields:
            key = f.name
            # token attributes
            if isinstance(f, TokensField):
                values = attributes.get(key, [])
                # nested list: one sublist per sentence
                values.append(s.__dict__[key])
                attributes[key] = values
    for k, v in attributes.items():
        self.__dict__[k] = v

    self.__dict__["tokens"] = [s.tokens for s in self.sentences]