Source code for kwx.utils

"""
utils
-----

Utility functions for data loading, cleaning, output formatting, and user interaction.

Contents:
    load_data,
    _combine_texts_to_str,
    _remove_unwanted,
    _lemmatize,
    clean,
    prepare_data,
    _prepare_corpus_path,
    translate_output,
    organize_by_pos,
    prompt_for_word_removal
"""

import warnings

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    from collections import defaultdict

import gc
import os
import random
import string
from multiprocessing import Pool

import emoji
import gensim
import pandas as pd
import spacy
from googletrans import Translator
from nltk.stem.snowball import SnowballStemmer
from stopwordsiso import stopwords
from tqdm.auto import tqdm

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=UserWarning)
    from gensim.models import Phrases

from kwx import languages


def load_data(data, target_cols=None):
    """
    Loads data from a path or pandas object and formats it into a pandas df.

    Parameters
    ----------
        data : pd.DataFrame, pd.Series or csv/xlsx path
            The data in df, series or path form. A series is treated as a
            single row whose index labels are the column names.

        target_cols : str or list (default=None)
            The columns in the csv/xlsx or dataframe that contain the text
            data to be modeled. All columns are kept if None.

    Returns
    -------
        df_texts : pd.DataFrame
            The texts as a df restricted to ``target_cols``.

    Raises
    ------
        ValueError
            If ``data`` is a string that does not end in csv or xlsx, or is
            not a string, dataframe or series.
    """
    if isinstance(data, str):
        if data.endswith("xlsx"):
            df_texts = pd.read_excel(io=data)

        elif data.endswith("csv"):
            df_texts = pd.read_csv(filepath_or_buffer=data)

        else:
            raise ValueError("Strings passed should be paths to csv or xlsx files.")

    elif isinstance(data, pd.DataFrame):
        df_texts = data

    elif isinstance(data, pd.Series):
        # The series index holds the column names, so the series becomes one
        # row of the resulting frame.
        df_texts = data.to_frame().T.reset_index(drop=True)

    else:
        raise ValueError(
            "The 'data' argument should be either the name of a csv/xlsx file or a pandas dataframe."
        )

    if target_cols is None:
        target_cols = df_texts.columns
    elif isinstance(target_cols, str):
        target_cols = [target_cols]

    df_texts = df_texts[target_cols]

    return df_texts
def _combine_texts_to_str(text_corpus, ignore_words=None):
    """
    Combines texts into one string.

    Parameters
    ----------
        text_corpus : str or list or list of lists
            The texts to be combined. A single string is treated as one text,
            and a list of lists is flattened one level first. Only the first
            element is inspected to decide whether the corpus is nested.

        ignore_words : str or list (default=None)
            Strings that should be removed from the text body.

    Returns
    -------
        texts_str : str
            A string of the full text with unwanted words removed. Empty
            input gives an empty string.
    """
    if isinstance(ignore_words, str):
        words_to_ignore = [ignore_words]
    elif isinstance(ignore_words, (list, tuple, set, frozenset)):
        words_to_ignore = ignore_words
    else:
        words_to_ignore = []

    # A bare string would otherwise be iterated character by character.
    if isinstance(text_corpus, str):
        text_corpus = [text_corpus]

    if len(text_corpus) == 0:
        return ""

    if isinstance(text_corpus[0], list):
        texts = [text for sublist in text_corpus for text in sublist]
    else:
        texts = text_corpus

    return " ".join(
        token
        for subtext in texts
        for token in subtext.split(" ")
        if token not in words_to_ignore
    )
def _remove_unwanted(args):
    """
    Drops ignored words and stopwords from a tokenized text and lower cases
    the tokens that remain.

    Parameters
    ----------
        args : tuple
            The following arguments packed together (a single argument so
            that the function can be mapped over a pool).

            text : list
                The tokens to filter.

            words_to_ignore : str or list
                Strings that should be removed from the text body.

            stop_words : str or list
                Stopwords for the given language.

    Returns
    -------
        text_words_removed : list
            The lower cased tokens that were in neither collection. The
            membership tests use each token as given, before lower casing.
    """
    text, words_to_ignore, stop_words = args

    text_words_removed = []
    for token in text:
        if token in words_to_ignore or token in stop_words:
            continue
        text_words_removed.append(token.lower())

    return text_words_removed
def _lemmatize(tokens, nlp=None, verbose=True):
    """
    Lemmatizes tokenized texts, keeping only selected parts of speech.

    Parameters
    ----------
        tokens : list of lists
            Tokenized texts to be lemmatized. Each text is joined back into a
            string before being passed to the language model.

        nlp : spacy.load object
            A spacy language model. It is called once per text, so it must be
            provided whenever ``tokens`` is not empty.

        verbose : bool (default=True)
            Whether to show a tqdm progress bar for the query.

    Returns
    -------
        base_tokens : list of lists
            For each text, the lemmas of the tokens tagged as noun, proper
            noun, adjective, adverb or verb.
    """
    allowed_pos_tags = ["NOUN", "PROPN", "ADJ", "ADV", "VERB"]

    texts_with_progress = tqdm(
        tokens,
        total=len(tokens),
        desc="Texts lemmatized",
        unit="texts",
        disable=not verbose,
    )

    return [
        [
            parsed.lemma_
            for parsed in nlp(_combine_texts_to_str(text_corpus=text_tokens))
            if parsed.pos_ in allowed_pos_tags
        ]
        for text_tokens in texts_with_progress
    ]
def clean(
    texts,
    input_language=None,
    min_token_freq=2,
    min_token_len=3,
    min_tokens=0,
    max_token_index=-1,
    min_ngram_count=3,
    remove_stopwords=True,
    ignore_words=None,
    sample_size=1,
    verbose=True,
):
    """
    Cleans and tokenizes a text body to prepare it for analysis.

    Parameters
    ----------
        texts : str or list
            The texts to be cleaned and tokenized.

        input_language : str (default=None)
            The English name of the language in which the texts are found.
            Must be provided, as it is lower cased unconditionally.

        min_token_freq : int (default=2)
            The minimum number of texts a word must appear in to be kept.

        min_token_len : int (default=3)
            The smallest allowable length of a word.

        min_tokens : int (default=0)
            The minimum allowable length of a tokenized text.
            Accepted for interface compatibility; not used by this function.

        max_token_index : int (default=-1)
            The maximum allowable length of a tokenized text.
            Accepted for interface compatibility; not used by this function.

        min_ngram_count : int (default=3)
            The minimum occurrences for an n-gram to be included.

        remove_stopwords : bool (default=True)
            Whether to remove stopwords.

        ignore_words : str or list
            Strings that should be removed from the text body.

        sample_size : float (default=1)
            The share of the cleaned texts to sample. Any value other than 1
            draws ``int(sample_size * n)`` texts with ``random.choices``,
            i.e. with replacement.

        verbose : bool (default=True)
            Whether to show a tqdm progress bar for the query.

    Returns
    -------
        text_corpus : list
            The cleaned texts, each joined back into a single string.
    """
    input_language = input_language.lower()

    # Select abbreviation for the lemmatizer, if it's available.
    if input_language in languages.lem_abbr_dict().keys():
        input_language = languages.lem_abbr_dict()[input_language]

    if isinstance(texts, str):
        texts = [texts]

    if isinstance(ignore_words, str):
        words_to_ignore = [ignore_words]
    elif ignore_words is None:
        words_to_ignore = []
    else:
        words_to_ignore = ignore_words

    stop_words = []
    if remove_stopwords:
        if stopwords(input_language) != set():  # the input language has stopwords
            stop_words = stopwords(input_language)

        # Stemming and normal stopwords are still full language names.
        elif input_language in languages.stem_abbr_dict().keys():
            stop_words = stopwords(languages.stem_abbr_dict()[input_language])

        elif input_language in languages.sw_abbr_dict().keys():
            stop_words = stopwords(languages.sw_abbr_dict()[input_language])

    pbar = tqdm(
        desc="Cleaning steps complete", total=7, unit="step", disable=not verbose
    )

    # Remove spaces that are greater that one in length.
    texts_no_large_spaces = []
    for r in texts:
        for i in range(
            25, 0, -1
        ):  # loop backwards to assure that smaller spaces aren't made
            large_space = str(i * " ")
            if large_space in r:
                r = r.replace(large_space, " ")

        texts_no_large_spaces.append(r)

    texts_no_random_punctuation = []
    # Prevent words from being combined when a user types word/word or word-word.
    for r in texts_no_large_spaces:
        r = r.replace("/", " ")
        r = r.replace("-", " ")
        if input_language == "fr":
            # Get rid of the 'of' abbreviation for French.
            r = r.replace("d'", "")

        texts_no_random_punctuation.append(r)

    punctuation_table = str.maketrans("", "", string.punctuation + "–" + "’")
    texts_no_punctuation = [
        r.translate(punctuation_table) for r in texts_no_random_punctuation
    ]

    gc.collect()
    pbar.update()

    texts_no_emojis = [
        emoji.replace_emoji(response, replace="") for response in texts_no_punctuation
    ]

    tokenized_texts = [
        [token for token in text.lower().split() if not token.isnumeric()]
        for text in texts_no_emojis
    ]
    tokenized_texts = [t for t in tokenized_texts if t != []]

    gc.collect()
    pbar.update()

    # Add bigrams and trigrams.
    # Use half the normal threshold.
    # The stopword keyword of Phrases was renamed in gensim 4. Parse the whole
    # major version so that multi-digit majors are compared correctly.
    gensim_major_version = int(gensim.__version__.split(".")[0])
    if gensim_major_version >= 4:
        stopword_kwargs = {"connector_words": stop_words}
    else:
        stopword_kwargs = {"common_terms": stop_words}

    bigrams = Phrases(
        sentences=tokenized_texts,
        min_count=min_ngram_count,
        threshold=5.0,
        **stopword_kwargs,
    )
    trigrams = Phrases(
        sentences=bigrams[tokenized_texts],
        min_count=min_ngram_count,
        threshold=5.0,
        **stopword_kwargs,
    )

    tokens_with_ngrams = []
    for text in tqdm(
        tokenized_texts,
        total=len(tokenized_texts),
        desc="n-grams generated",
        unit="texts",
        disable=not verbose,
    ):
        for token in bigrams[text]:
            if token.count("_") == 1:
                # Token is a bigram, so add it to the tokens.
                text.insert(0, token)

        for token in trigrams[bigrams[text]]:
            if token.count("_") == 2:
                # Token is a trigram, so add it to the tokens.
                text.insert(0, token)

        tokens_with_ngrams.append(text)

    gc.collect()
    pbar.update()

    args = zip(
        tokens_with_ngrams,
        [words_to_ignore] * len(tokens_with_ngrams),
        [stop_words] * len(tokens_with_ngrams),
    )

    num_cores = os.cpu_count()
    if __name__ == "kwx.utils":
        with Pool(processes=num_cores) as pool:
            tokens_remove_unwanted = list(
                tqdm(
                    pool.imap(_remove_unwanted, args),
                    total=len(tokens_with_ngrams),
                    desc="Unwanted words removed",
                    unit="texts",
                    disable=not verbose,
                )
            )
    else:
        # The module is loaded under another name, so skip the worker pool
        # and filter in-process rather than leave the result undefined.
        tokens_remove_unwanted = [
            _remove_unwanted(a)
            for a in tqdm(
                args,
                total=len(tokens_with_ngrams),
                desc="Unwanted words removed",
                unit="texts",
                disable=not verbose,
            )
        ]

    gc.collect()
    pbar.update()

    # Lemmatize or stem words (try the former first, then the latter).
    nlp = None
    try:
        nlp = spacy.load(input_language)
        base_tokens = _lemmatize(
            tokens=tokens_remove_unwanted, nlp=nlp, verbose=verbose
        )

    except OSError:
        try:
            # The model is not installed, so try to download it once.
            os.system("python -m spacy download {}".format(input_language))
            nlp = spacy.load(input_language)
            base_tokens = _lemmatize(
                tokens=tokens_remove_unwanted, nlp=nlp, verbose=verbose
            )

        except OSError:
            nlp = None

    if nlp is None:
        # Lemmatization failed, so try stemming.
        stemmer = None
        if input_language in SnowballStemmer.languages:
            stemmer = SnowballStemmer(input_language)

        # Correct if the abbreviations were put in.
        elif input_language == "ar":
            stemmer = SnowballStemmer("arabic")

        elif input_language == "fi":
            stemmer = SnowballStemmer("finnish")

        elif input_language == "hu":
            stemmer = SnowballStemmer("hungarian")

        elif input_language == "sv":
            stemmer = SnowballStemmer("swedish")

        if stemmer is None:
            # We cannot lemmatize or stem.
            base_tokens = tokens_remove_unwanted

        else:
            # Stemming instead of lemmatization.
            base_tokens = []  # still call it lemmatized for consistency.
            for tokens in tqdm(
                tokens_remove_unwanted,
                total=len(tokens_remove_unwanted),
                desc="Texts stemmed",
                unit="texts",
                disable=not verbose,
            ):
                stemmed_tokens = [stemmer.stem(t) for t in tokens]
                base_tokens.append(stemmed_tokens)

    gc.collect()
    pbar.update()

    # Remove words that don't appear enough or are too small.
    # Frequencies count the number of texts a token occurs in.
    token_frequencies = defaultdict(int)
    for tokens in base_tokens:
        for t in set(tokens):
            token_frequencies[t] += 1

    if min_token_len is None or min_token_len == False:
        min_token_len = 0
    if min_token_freq is None or min_token_freq == False:
        min_token_freq = 0

    assert isinstance(
        min_token_len, int
    ), "The 'min_token_len' argument must be an integer if used."
    assert isinstance(
        min_token_freq, int
    ), "The 'min_token_freq' argument must be an integer if used."

    min_len_freq_tokens = [
        [
            t
            for t in tokens
            if len(t) >= min_token_len and token_frequencies[t] >= min_token_freq
        ]
        for tokens in base_tokens
    ]

    gc.collect()
    pbar.update()

    # Derive those texts that still have valid words.
    text_corpus = [tokens for tokens in min_len_freq_tokens if tokens != []]

    # Sample words, if necessary.
    if sample_size == 1:
        selected_idxs = list(range(len(text_corpus)))
    else:
        selected_idxs = random.choices(
            range(len(text_corpus)), k=int(sample_size * len(text_corpus))
        )

    text_corpus = [
        _combine_texts_to_str(text_corpus=text_corpus[i]) for i in selected_idxs
    ]

    gc.collect()
    pbar.update()

    return text_corpus
def prepare_data(
    data=None,
    target_cols=None,
    input_language=None,
    min_token_freq=2,
    min_token_len=3,
    min_tokens=0,
    max_token_index=-1,
    min_ngram_count=3,
    remove_stopwords=True,
    ignore_words=None,
    sample_size=1,
    verbose=True,
):
    """
    Prepares input data for analysis from a pandas.DataFrame or path.

    Parameters
    ----------
        data : pd.DataFrame or csv/xlsx path
            The data in df or path form.

        target_cols : str or list (default=None)
            The columns in the csv/xlsx or dataframe that contain the text
            data to be modeled. All columns are used if None.

        input_language : str (default=None)
            The English name of the language in which the texts are found.
            Must be provided, as it is lower cased unconditionally.

        min_token_freq : int (default=2)
            The minimum allowable frequency of a word inside the text corpus.

        min_token_len : int (default=3)
            The smallest allowable length of a word.

        min_tokens : int (default=0)
            The minimum allowable length of a tokenized text.

        max_token_index : int (default=-1)
            The maximum allowable length of a tokenized text.

        min_ngram_count : int (default=3)
            The minimum occurrences for an n-gram to be included.

        remove_stopwords : bool (default=True)
            Whether to remove stopwords.

        ignore_words : str or list
            Strings that should be removed from the text body.

        sample_size : float (default=1)
            The amount of data to be randomly sampled.

        verbose : bool (default=True)
            Whether to show a tqdm progress bar for the query.

    Returns
    -------
        text_corpus : list
            The value returned by ``clean`` for the combined row texts.
    """
    input_language = input_language.lower()

    # Select abbreviation for the lemmatizer, if it's available.
    if input_language in languages.lem_abbr_dict().keys():
        input_language = languages.lem_abbr_dict()[input_language]

    if isinstance(target_cols, str):
        target_cols = [target_cols]

    df_texts = load_data(data)

    # Without an explicit selection every column contributes text.
    if target_cols is None:
        target_cols = list(df_texts.columns)

    # Select columns from which texts should come.
    # Each row becomes one text: its string cells joined by single spaces,
    # with non-string cells (e.g. NaN) skipped.
    raw_texts = [
        " ".join(
            df_texts.loc[i, c]
            for c in target_cols
            if isinstance(df_texts.loc[i, c], str)
        )
        for i in df_texts.index
    ]

    return clean(
        texts=raw_texts,
        input_language=input_language,
        min_token_freq=min_token_freq,
        min_token_len=min_token_len,
        min_tokens=min_tokens,
        max_token_index=max_token_index,
        min_ngram_count=min_ngram_count,
        remove_stopwords=remove_stopwords,
        ignore_words=ignore_words,
        sample_size=sample_size,
        verbose=verbose,
    )
def _prepare_corpus_path(
    text_corpus=None,
    clean_texts=None,
    target_cols=None,
    input_language=None,
    min_token_freq=2,
    min_token_len=3,
    min_tokens=0,
    max_token_index=-1,
    min_ngram_count=3,
    remove_stopwords=True,
    ignore_words=None,
    sample_size=1,
    verbose=True,
):
    """
    Checks a text corpus to see if it's a path, and prepares the data if so.

    Parameters
    ----------
        text_corpus : str or list or list of lists
            A path or text corpus over which analysis should be done.

        clean_texts : str
            The texts formatted for analysis as strings.
            Accepted for interface compatibility; not used by this function.

        target_cols : str or list (default=None)
            The columns in the csv/xlsx or dataframe that contain the text
            data to be modeled.

        input_language : str (default=None)
            The English name of the language in which the texts are found.

        min_token_freq : int (default=2)
            The minimum allowable frequency of a word inside the text corpus.

        min_token_len : int (default=3)
            The smallest allowable length of a word.

        min_tokens : int (default=0)
            The minimum allowable length of a tokenized text.

        max_token_index : int (default=-1)
            The maximum allowable length of a tokenized text.

        min_ngram_count : int (default=3)
            The minimum occurrences for an n-gram to be included.

        remove_stopwords : bool (default=True)
            Whether to remove stopwords.

        ignore_words : str or list
            Strings that should be removed from the text body.

        sample_size : float (default=1)
            The amount of data to be randomly sampled.

        verbose : bool (default=True)
            Whether to show a tqdm progress bar for the query.

    Returns
    -------
        text_corpus : list or list of lists
            A prepared text corpus if ``text_corpus`` is a string naming an
            existing path; otherwise the argument unchanged.
    """
    # Only a string that names something on disk is treated as a path.
    if not isinstance(text_corpus, str) or not os.path.exists(text_corpus):
        return text_corpus

    try:
        return prepare_data(
            data=text_corpus,
            target_cols=target_cols,
            input_language=input_language,
            min_token_freq=min_token_freq,
            min_token_len=min_token_len,
            min_tokens=min_tokens,
            max_token_index=max_token_index,
            min_ngram_count=min_ngram_count,
            remove_stopwords=remove_stopwords,
            ignore_words=ignore_words,
            sample_size=sample_size,
            verbose=verbose,
        )

    except OSError:
        # The path exists but could not be read, so hand the input back.
        return text_corpus
def translate_output(outputs, input_language, output_language):
    """
    Translates model outputs using https://github.com/ssut/py-googletrans.

    Parameters
    ----------
        outputs : list or list of lists
            Output keywords of a model. Only the first element is inspected
            to decide whether the keywords are nested.

        input_language : str
            The English name of the language in which the texts are found.

        output_language : str
            The English name of the desired language for outputs.

    Returns
    -------
        translated_outputs : list or list of lists
            The keywords translated to the given output_language, in the same
            nesting as ``outputs``. Empty input gives an empty list.

    Raises
    ------
        TypeError
            If the first element of ``outputs`` is neither a list nor a str.
    """
    # Nothing to translate, so no translator needs to be created.
    if len(outputs) == 0:
        return []

    translator = Translator()

    def _translate(keyword):
        return translator.translate(
            text=keyword, src=input_language, dest=output_language
        ).text

    if isinstance(outputs[0], list):
        return [[_translate(o) for o in sub_output] for sub_output in outputs]

    if isinstance(outputs[0], str):
        return [_translate(o) for o in outputs]

    raise TypeError(
        "The 'outputs' argument should be a list of strings or a list of lists of strings."
    )
def organize_by_pos(outputs, output_language):
    """
    Groups a keyword output by the part of speech of the words.

    Parameters
    ----------
        outputs : list
            The keywords that have been extracted.

        output_language : str
            The spoken language in which the results should be given, either
            as a key or as a value of ``languages.lem_abbr_dict()``.

    Returns
    -------
        ordered_outputs : dict or list
            If the language resolves to a value of ``languages.lem_abbr_dict()``,
            a dict with the keys 'Nouns:', 'Adjectives:', 'Adverbs:', 'Verbs:'
            and 'Other:' (empty groups left out). The part-of-speech groups
            hold the first spacy token of each keyword, and 'Other:' holds the
            original keywords whose text matches none of those tokens.
            Otherwise ``outputs`` is returned unchanged.
    """
    if output_language in languages.lem_abbr_dict().keys():
        output_language = languages.lem_abbr_dict()[output_language]

    # Without a spacy model for the language there is nothing to organize.
    if output_language not in languages.lem_abbr_dict().values():
        return outputs

    nlp = spacy.load(output_language)
    # Only the first token of each keyword is tagged.
    first_tokens = [nlp(keyword)[0] for keyword in outputs]

    # Those parts of speech to be considered (others go to an 'Other' category).
    nouns, proper_nouns, adjectives, adverbs, verbs = (
        [tok for tok in first_tokens if tok.pos_ == tag]
        for tag in ("NOUN", "PROPN", "ADJ", "ADV", "VERB")
    )

    categorized = [
        str(tok)
        for group in (nouns, proper_nouns, adjectives, adverbs, verbs)
        for tok in group
    ]
    other = [keyword for keyword in outputs if keyword not in categorized]

    grouped = {
        "Nouns:": nouns + proper_nouns,  # proper nouns put in nouns
        "Adjectives:": adjectives,
        "Adverbs:": adverbs,
        "Verbs:": verbs,
        "Other:": other,
    }

    # remove if no entries
    return {label: words for label, words in grouped.items() if words != []}
def prompt_for_word_removal(words_to_ignore=None):
    """
    Prompts the user for words that should be ignored in keyword extraction.

    Parameters
    ----------
        words_to_ignore : str or list (default=None)
            Words that should not be included in the output. The passed list
            is not modified; a new list is returned.

    Returns
    -------
        ignore_words, words_added : list, bool
            A new list of words to ignore and a boolean indicating if words
            have been added.
    """
    if words_to_ignore is None:
        words_to_ignore = []
    elif isinstance(words_to_ignore, str):
        words_to_ignore = [words_to_ignore]

    words_to_ignore = [w.replace("'", "") for w in words_to_ignore]

    words_added = False  # whether to run the models again
    while True:
        answer = input("\nShould words be removed [y/n]? ")
        if answer == "y":
            typed_words = input("Type or copy word(s) to be removed: ")
            # Drop apostrophes and treat commas as separators, so that
            # 'a,b', 'a, b' and 'a b' all give the same two words and no
            # empty strings are added.
            new_words_to_ignore = (
                typed_words.replace("'", "").replace(",", " ").split()
            )
            if new_words_to_ignore:
                words_to_ignore += new_words_to_ignore
                words_added = True  # we need to run the models again
            break

        if answer == "n":
            break

        print("Invalid input")

    return words_to_ignore, words_added