# wordlist.py
# -*- coding: utf-8 -*-

"""This module provides the WordList class, which serves to look up
form analyses produced by the Morpheus tool.
"""

from collections import defaultdict
import os
import subprocess
from typing import Dict, List, Set, Union

from sqlalchemy import and_, or_
from sqlalchemy.orm import sessionmaker

from . import postags
from .config import MACRONS_FILE, MORPHEUS_DIR, POPULATE_DATABASE
from .db import SESSION_FACTORY, FormAnalysis


def clean_lemma(lemma: str) -> str:
    """Return `lemma` with marker characters removed.

    The characters ``#``, ``1``, ``-``, ``^`` and ``_`` are dropped and
    every space is turned into ``+``.

    NOTE(review): presumably these are Morpheus homonym and vowel
    quantity markers -- confirm against the Morpheus documentation.

    :param lemma: The lemma as produced by the parser.
    :return: The cleaned lemma.
    """
    # One translation pass: map ' ' to '+', delete the marker characters.
    table = str.maketrans(" ", "+", "#1-^_")
    return lemma.translate(table)


class WordList:
    """Mapping from forms to Morpheus analyses of the forms.

    A WordList caches FormAnalysis objects in the `form_analyses`
    attribute (a mapping from form to a set of analyses) and remembers
    forms that are unknown to Morpheus in the `unknown_forms` set.

    Use :meth:`get_morphtags`, :meth:`get_lemmas`,
    :meth:`get_accenteds` and the more general :meth:`analyze` to look
    up information about a form. Use :meth:`populate_database` to
    initially populate the database.
    """

    def __init__(
            self,
            form_analyses: Union[Dict[str, Set[FormAnalysis]], None] = None,
            unknown_forms: Union[Set[str], None] = None,
            session_factory: sessionmaker = SESSION_FACTORY,
            populate_database: bool = POPULATE_DATABASE) -> None:
        """Initialize a WordList.

        :param form_analyses: Mapping of forms to form analyses. A
            plain dict works as well as a defaultdict.
        :param unknown_forms: Words unknown to morpheus.
        :param session_factory: The sqlalchemy sessionmaker.
        :param populate_database: Whether to call
            :meth:`populate_database` right away.
        """
        # Compare against None rather than relying on truthiness so an
        # empty container handed in by the caller is kept (and shared)
        # instead of being silently replaced by a fresh one.
        self.form_analyses = (defaultdict(set) if form_analyses is None
                              else form_analyses)
        self.unknown_forms = set() if unknown_forms is None else unknown_forms
        self.session_factory = session_factory
        self._session = self.session_factory()
        if populate_database:
            self.populate_database()

    def _cached_analyses(self, form: str) -> Set[FormAnalysis]:
        """Return the cached analysis set of `form`, creating it if needed.

        `setdefault` behaves like indexing a ``defaultdict(set)`` but
        also works when `form_analyses` is a plain dict.
        """
        return self.form_analyses.setdefault(form, set())

    def get_morphtags(self, form: str) -> Set[str]:
        """Get the morphtags of a form.

        :param form: The form that is to be analyzed.
        :return: The morphtags of the form (empty if there are none).
        """
        return {a.morphtag for a in self.analyze(form)}

    def get_lemmas(self, form: str) -> Set[str]:
        """Get the lemmas of a form.

        :param form: The form that is to be analyzed.
        :return: The lemmas of the form (empty if there are none).
        """
        return {a.lemma for a in self.analyze(form)}

    def get_accenteds(self, form: str) -> Set[str]:
        """Get the accented versions of a form.

        :param form: The form that is to be analyzed.
        :return: The accented versions of the form (empty if there
            are none).
        """
        return {a.accented for a in self.analyze(form)}

    def populate_database(self, macrons_file: str = MACRONS_FILE) -> None:
        """Populate database with form analyses from `macrons_file`.

        `macrons_file` has to consist of lines that are either

            1. a form, a morphtag, a lemma and an accented version
               separated by whitespace,
            2. a comment starting with a number sign (#) or
            3. blank (blank lines are skipped).

        All analyses are added to the session and committed once at
        the end.

        :param macrons_file: A text file containing the analysis
            info.
        :raises ValueError: If a data line does not consist of exactly
            four fields.
        """
        with open(macrons_file) as f:
            for lineno, line in enumerate(f, start=1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                fields = line.split()
                if len(fields) != 4:
                    raise ValueError(
                        '{}:{}: expected 4 whitespace-separated fields, '
                        'got {}: {!r}'.format(macrons_file, lineno,
                                              len(fields), line)
                    )
                form, morphtag, lemma, accented = fields
                self._session.add(
                    FormAnalysis(form=form, morphtag=morphtag,
                                 lemma=lemma, accented=accented))
        self._session.commit()

    def analyze(self, form: str) -> Set[FormAnalysis]:
        """Get the set of analyses for `form`.

        The function first attempts to get the analyses from the saved
        analyses, then attempts to load them from the database, then
        attempts to analyze it by giving it to the Morpheus cruncher.
        If that yields nothing and the form starts with an uppercase
        letter, the lowercased form is looked up as a fallback.

        :param form: The form that is to be analyzed.
        :return: The analyses or an empty set if the form is unknown.
        """
        if not form:
            # There is nothing to analyze, and the capitalization check
            # below needs at least one character.
            return set()

        if form not in self.form_analyses:
            if form in self.unknown_forms:
                return set()
            if not self.load_from_db(form):
                morpheus_analyses = self.analyze_with_morpheus([form])
                if morpheus_analyses:
                    self.cache_analyses(morpheus_analyses)
                else:
                    self.unknown_forms.add(form)

        if not self._cached_analyses(form) and form[0].isupper():
            # Try to look up the non-capitalized version of the form.
            lowered = form.lower()
            # Recursing with an unchanged string would never terminate.
            if lowered != form:
                analyses = self.analyze(lowered)
                if analyses:
                    self.cache_analyses({form: analyses})

        return self._cached_analyses(form)

    def load_from_db(self, form: str) -> Set[FormAnalysis]:
        """Load analyses of `form` from the database and cache them.

        :param form: The form that is to be analyzed.
        :return: The analyses or an empty set if the database has no
            row for the form.
        """
        analyses = set(self._session.query(FormAnalysis)
                       .filter_by(form=form).all())
        if analyses:
            self.cache_analyses({form: analyses})
        return analyses

    def load_all_from_db(self) -> Set[FormAnalysis]:
        """Load all analyses from the database into the cache.

        :return: The analyses in the database
        """
        analyses = set(self._session.query(FormAnalysis).all())
        for analysis in analyses:
            self._cached_analyses(analysis.form).add(analysis)
        return analyses

    def analyze_with_morpheus(self, forms: Union[List[str], str],
                              update_db: bool = True,
                              morpheus_dir: str = MORPHEUS_DIR) -> Dict[
                                  str, Set[FormAnalysis]]:
        """Start a morpheus process to analyze several forms.

        Forms for which the cruncher prints no analysis are added to
        `self.unknown_forms`. With `update_db`, found analyses and
        unknown forms are written to the database and duplicate rows
        are removed afterwards.

        :param forms: The forms that are to be analyzed; a single form
            may be given as a plain string.
        :param update_db: Whether to update the database with the
            analyses.
        :param morpheus_dir: Directory where the Morpheus tool is
            installed
        :return: The analyses or an empty dict if all forms are unknown.
        :raises RuntimeError: If the cruncher exits with a non-zero
            return code.
        """
        if isinstance(forms, str):
            # '\n'.join() on a bare string would put every character on
            # its own line.
            forms = [forms]

        out_lines = self._run_cruncher(forms, morpheus_dir)
        analyzed_forms, unknown_forms = self._split_cruncher_output(out_lines)

        final_analyses = defaultdict(set)
        for form, nls in analyzed_forms.items():
            best_accenteds = self._best_accenteds(form, nls)
            if not best_accenteds:
                print('Unexpected place')
                continue

            for (lemma, tag), accented in best_accenteds.items():
                analysis = FormAnalysis(form=form, morphtag=tag,
                                        lemma=lemma, accented=accented)
                final_analyses[form].add(analysis)
                if update_db:
                    self._session.add(analysis)

            if update_db:
                # Committed per form, as before, so analyses of earlier
                # forms are stored even if a later form fails.
                self._session.commit()

        self.unknown_forms.update(unknown_forms)
        if update_db:
            # Unknown forms are stored as rows that carry only the form.
            for form in unknown_forms:
                self._session.add(FormAnalysis(form=form))
            self._session.commit()
            self._delete_duplicates_from_db()
        return final_analyses

    @staticmethod
    def _run_cruncher(forms: List[str], morpheus_dir: str) -> List[str]:
        """Run the cruncher on `forms` and return its stdout lines."""
        env = os.environ.copy()
        env['MORPHLIB'] = os.path.join(morpheus_dir, 'stemlib')
        args = [os.path.join(morpheus_dir, 'bin', 'cruncher'), '-L']
        proc = subprocess.run(args, env=env, universal_newlines=True,
                              input='\n'.join(forms), stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
        if proc.returncode != 0:
            raise RuntimeError(
                'Failed executing morpheus with these args: {}\nStderr: "{}"'
                .format(args, proc.stderr)
            )
        return proc.stdout.split('\n')

    @staticmethod
    def _split_cruncher_output(out_lines: List[str]):
        """Split output lines into analyzed and unknown forms.

        :return: A pair of (dict mapping each form to its concatenated
            ``<NL>`` text, set of forms without any ``<NL>`` line).
        """
        analyzed_forms = {}
        unknown_forms = set()
        # Pair every line with its successor; the last line gets ''.
        for form, next_line in zip(out_lines, out_lines[1:] + ['']):
            if next_line.startswith('<NL>'):
                # Next line has NL analyses, collect them.
                analyzed_forms[form] = (analyzed_forms.get(form, '')
                                        + next_line.strip())
            elif form.strip() and not form.startswith('<NL>'):
                # form is actually a word form, but since the next line
                # does not start with <NL>, form must be unknown to
                # morpheus. Blank lines (e.g. the one after the final
                # newline of the output) are not forms and are skipped.
                unknown_forms.add(form)
        return analyzed_forms, unknown_forms

    @staticmethod
    def _best_accenteds(form: str, nls: str):
        """Map each (lemma, tag) found in `nls` to one accented form.

        :param form: The analyzed form.
        :param nls: Concatenated ``<NL>...</NL>`` text for the form.
        :return: Dict from (lemma, tag) to the chosen accented form;
            empty if no parse could be extracted.
        """
        parses = []
        for nl in nls.split('<NL>'):
            nl = nl.replace('</NL>', '')
            if nl.split():
                parses += postags.morpheus_to_parses(form, nl)

        candidates = defaultdict(list)
        for parse in parses:
            lemma = clean_lemma(parse[postags.LEMMA])
            # The cleaned lemma is written back before the tag is built.
            parse[postags.LEMMA] = lemma
            tag = postags.parse_to_ldt(parse)
            candidates[(lemma, tag)].append(parse[postags.ACCENTEDFORM])

        # Sometimes there are multiple accented forms; prefer the one
        # with the most 'v' ('volvit' over 'voluit'). On a tie the
        # stable sort makes the last candidate win.
        # NOTE(review): the previous comment said 'Ju_lius' should be
        # kept as well as 'I^u_lius', but only one accented form per
        # (lemma, tag) survives here -- confirm the intent.
        return {
            key: sorted(accenteds, key=lambda acc: acc.count('v'))[-1]
            for key, accenteds in candidates.items()
        }

    def _delete_duplicates_from_db(self) -> None:
        """Delete duplicate lines from the database.

        A row is deleted if another row with a smaller id agrees with
        it in form, morphtag, lemma and accented, where two NULL values
        count as equal for the last three columns.
        """
        fa1 = FormAnalysis
        fa2 = self._session.query(FormAnalysis).subquery('fa2')
        tbd = (self._session.query(fa1)
               .filter(fa1.form == fa2.c.form)
               .filter(or_(fa1.morphtag == fa2.c.morphtag,
                           and_(fa1.morphtag.is_(None),
                                fa2.c.morphtag.is_(None))))
               .filter(or_(fa1.lemma == fa2.c.lemma,
                           and_(fa1.lemma.is_(None), fa2.c.lemma.is_(None))))
               .filter(or_(fa1.accented == fa2.c.accented,
                           and_(fa1.accented.is_(None),
                                fa2.c.accented.is_(None))))
               .filter(fa1.id > fa2.c.id)
               .all())
        for a in tbd:
            self._session.delete(a)
        self._session.commit()

    def depopulate_database(self) -> None:
        """Delete all form analyses from the database."""
        self._session.query(FormAnalysis).delete()
        self._session.commit()

    def cache_analyses(self, analyses: Dict[str, Set[FormAnalysis]]) -> None:
        """Store some form analyses in self.form_analyses.

        Forms that receive at least one analysis are also removed from
        `self.unknown_forms`.

        :param analyses: The analyses to cache.
        """
        for form, ana_set in analyses.items():
            self._cached_analyses(form).update(ana_set)
            if ana_set:
                self.unknown_forms.discard(form)