# Copyright 2020 Mario Graff Guerrero
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
from microtc.utils import load_model, Counter
from b4msa.textmodel import TextModel as TM
from microtc.weighting import TFIDF
from microtc.utils import SparseMatrix
from scipy.sparse import csr_matrix
from typing import List, Iterable, OrderedDict, Union, Dict, Any, Tuple
from text_models.utils import download_tokens, handle_day, date_range, TM_ARGS
from os.path import isfile
import re
class TextModel(TM):
    def text_transformations(self, text):
        """Normalize *text* with b4msa's pipeline and collapse every run of
        consecutive "~" (the term separator) into a single one.

        >>> tm = TextModel(**TM_ARGS)
        >>> tm.text_transformations('@user abd')
        '~abd~'
        """
        normalized = super().text_transformations(text)
        return re.sub('~+', '~', normalized)
class Vocabulary(object):
    """
    Vocabulary class is used to transform the tokens and their
    respective frequencies in a Text Model, as well as, to analyze
    the tokens obtained from tweets collected.

    This class can be used to replicate some of the Text Models
    developed for :py:class:`EvoMSA.base.EvoMSA`.

    :param data: Tokens and their frequencies
    :type data: str or list
    :param lang: Language (Ar, En, or Es)
    :type lang: str
    :param country: Two letter country code
    :type country: str
    :param states: Whether to keep the state or accumulate the information on the country
    :type states: bool

    >>> from text_models.vocabulary import Vocabulary
    >>> day = dict(year=2020, month=2, day=14)
    >>> voc = Vocabulary(day, lang="En", country="US")
    """

    def __init__(self, data, lang: str="Es",
                 country: str='nogeo', states: bool=False) -> None:
        self._lang = lang
        self._country = country
        self._states = states
        # A dict with more than three keys is already a token -> frequency
        # mapping; a dict with exactly three keys (year, month, day) is a date.
        if isinstance(data, dict) and len(data) > 3:
            self._data = data
        elif isinstance(data, str) and isfile(data):
            self.voc = load_model(data)
        else:
            self.date = data
            self._init(data)
        if not states:
            # Tokens without "~" are words; tokens containing "~" are bigrams.
            self._n_words = sum(v for k, v in self.voc.items()
                                if k.count("~") == 0)
            self._n_bigrams = sum(v for k, v in self.voc.items()
                                  if k.count("~"))

    def probability(self):
        """Transform frequency to a probability (in place); words and
        bigrams are normalized independently."""
        voc = self.voc
        for k in voc:
            den = self._n_bigrams if k.count("~") else self._n_words
            voc[k] = voc[k] / den

    def _init(self, data):
        """
        Process the :py:attr:`data` to create a :py:class:`microtc.utils.Counter`

        :param data: A day (dict) or a list of days whose tokens are downloaded.
        """

        def sum_vocs(vocs):
            # Accumulate a list of counters into a single one.
            voc = vocs[0]
            for v in vocs[1:]:
                voc = voc + v
            return voc

        if isinstance(data, list):
            vocs = [download_tokens(day, lang=self._lang, country=self._country)
                    for day in data]
            vocs = [load_model(x) for x in vocs]
            if isinstance(vocs[0], Counter):
                voc = sum_vocs(vocs)
            elif not self._states:
                # Each element is a list of (state, counter) pairs; collapse
                # the states first, then the days.
                vocs = [sum_vocs([v for _, v in i]) for i in vocs]
                voc = sum_vocs(vocs)
            else:
                # Keep per-state counters, merging the same state across days.
                voc = {k: v for k, v in vocs[0]}
                for v in vocs[1:]:
                    for k, d in v:
                        try:
                            voc[k] = voc[k] + d
                        except KeyError:
                            voc[k] = d
            self._data = voc
        else:
            self.voc = load_model(download_tokens(data, lang=self._lang,
                                                  country=self._country))

    @property
    def date(self):
        """
        Date obtained from the filename, on multiple files, this is not available.
        """
        return self._date

    @date.setter
    def date(self, d):
        if isinstance(d, list):
            # Multiple days: no single date is representative.
            self._date = None
            return
        self._date = handle_day(d)

    @property
    def weekday(self):
        """
        Weekday
        """
        return str(self.date.weekday())

    @property
    def voc(self):
        """Vocabulary, i.e., tokens and their frequencies"""
        return self._data

    @voc.setter
    def voc(self, d):
        if not isinstance(d, list):
            self._data = d
            return
        if self._states:
            # Keep the (state, counter) pairs as a mapping.
            self._data = {k: v for k, v in d}
            return
        # Accumulate all the states into a single counter.
        aggr = d[0][1]
        for _, v in d[1:]:
            aggr = aggr + v
        self._data = aggr

    def common_words(self, quantile: float=None, bigrams=True):
        """Words used frequently; these correspond to py:attr:`EvoMSA.base.EvoMSA(B4MSA=True)`

        In the case quantile is given the these words and bigrams correspond to
        the most frequent.
        """
        if quantile is None:
            from EvoMSA.utils import download
            return load_model(download("b4msa_%s.tm" % self._lang)).model.word2id

        def head(score):
            # Top tokens whose cumulative probability reaches the quantile.
            # BUG FIX: the original shadowed the counter in the slice's
            # comprehension and could IndexError when quantile >= 1.
            cum, idx = 0, 0
            while cum <= quantile and idx < len(score):
                cum += score[idx][1]
                idx += 1
            return [k for k, _ in score[:idx]]

        words_N = sum(v for k, v in self.voc.items() if k.count("~") == 0)
        score = [[k, v / words_N]
                 for k, v in self.voc.items() if k.count("~") == 0]
        score.sort(key=lambda x: x[1], reverse=True)
        output = head(score)
        if bigrams:
            bigrams_N = sum(v for k, v in self.voc.items() if k.count("~"))
            score_bi = [[k, v / bigrams_N]
                        for k, v in self.voc.items() if k.count("~")]
            score_bi.sort(key=lambda x: x[1], reverse=True)
            output += head(score_bi)
        return output

    @staticmethod
    def _co_occurrence(word: str, voc: dict) -> dict:
        """Frequency of the tokens that form a bigram with :py:attr:`word`."""
        D = dict()
        for k, v in voc.items():
            if k.count("~") == 0:
                continue
            a, b = k.split("~")
            if a != word and b != word:
                continue
            key = a if a != word else b
            D[key] = v
        return D

    def co_occurrence(self, word: str) -> dict:
        """Co-occurrence frequencies of :py:attr:`word`; grouped by state
        when :py:attr:`states` is set."""
        if self._states:
            return {k: self._co_occurrence(word, v)
                    for k, v in self.voc.items()}
        return self._co_occurrence(word, self.voc)

    def day_words(self) -> "Vocabulary":
        """Words used on the same day of different years"""
        from datetime import date, datetime
        hoy = date.today()
        # BUG FIX: the original built the datetime with day=hoy.month.
        hoy = datetime(year=hoy.year, month=hoy.month, day=hoy.day)
        L = []
        for year in range(2015, hoy.year + 1):
            try:
                curr = datetime(year=year, month=self.date.month,
                                day=self.date.day)
            except ValueError:
                # e.g., February 29 on a non-leap year.
                continue
            if (curr - self.date).days == 0:
                # Skip the day this vocabulary was built from.
                continue
            try:
                # Keep only the days whose tokens are available for download.
                download_tokens(curr, lang=self._lang, country=self._country)
            except Exception:
                continue
            L.append(curr)
        if len(L) == 0:
            return None
        return self.__class__(L if len(L) > 1 else L[0],
                              lang=self._lang,
                              country=self._country,
                              states=self._states)

    def __iter__(self):
        for x in self.voc:
            yield x

    def remove_emojis(self):
        """Remove emojis"""
        from .dataset import Dataset
        data = Dataset()
        data.add(data.load_emojis())
        keys = [(k, [x for x in data.klass(k)]) for k in self]
        keys = [(k, v) for k, v in keys if len(v)]
        for k, v in keys:
            del self.voc[k]

    def previous_day(self):
        """Previous day"""
        import datetime
        one_day = datetime.timedelta(days=1)
        r = self.date - one_day
        return self.__class__(r, lang=self._lang,
                              country=self._country,
                              states=self._states)

    def __len__(self):
        return len(self.voc)

    def __getitem__(self, key):
        return self.voc[key]

    def __contains__(self, key):
        return key in self.voc

    def get(self, data, defaultvalue=0):
        """Frequency of data"""
        return self.voc.get(data, defaultvalue)

    def items(self):
        """Items of :py:attr:`self.voc`"""
        return self.voc.items()

    def remove(self, words: dict, bigrams=True) -> None:
        """
        Remove the words from the current vocabulary

        :param words: Tokens
        :param bigrams: Also remove the bigrams containing any of the words
        """
        if not bigrams:
            voc = self.voc
            for w in words:
                try:
                    del voc[w]
                except KeyError:
                    continue
            return
        # BUG FIX: collect the keys in a set; the original list could hold
        # the same bigram twice (once per test), making the second
        # deletion raise KeyError.
        K = set()
        for k in self.voc:
            if k.count("~"):
                a, b = k.split("~")
                if a in words or b in words:
                    K.add(k)
            if k in words:
                K.add(k)
        for k in K:
            del self.voc[k]

    def histogram(self, min_elements: int=30, words: bool=False):
        """Group the tokens by frequency; each bin holds at least
        :py:attr:`min_elements` tokens.

        :param min_elements: Minimum number of tokens per bin
        :param words: Include words; otherwise only bigrams are grouped
        """
        group = defaultdict(list)
        for k, v in self.voc.items():
            if words or k.count("~"):
                group[v].append(k)
        keys = sorted(group.keys())
        lst = list()
        hist = OrderedDict()
        for k in keys:
            _ = group[k]
            if len(lst) + len(_) >= min_elements:
                hist[k] = lst + _
                lst = list()
                continue
            lst += _
        if len(lst):
            # Leftover tokens that never reached min_elements.
            hist[k] = lst
        return hist

    @staticmethod
    def available_dates(dates: List, n: int, countries: List, lang: str):
        """Retrieve the first n dates available for all the countries

        :param dates: List of dates
        :param n: Number of days; -1 retrieves all the available dates
        :param countries: List of countries or 'nogeo'
        :param lang: Language
        """
        # BUG FIX: the original signature used the typing classes as default
        # values (dates=List, n=int, ...) instead of annotations.
        missing = Counter(countries) if countries != 'nogeo' else None
        rest = []
        pending = dates[::-1]
        while len(pending) and (len(rest) < n or n == -1):
            day = pending.pop()
            flag = True
            # Try first the countries that have failed most often, so an
            # unavailable day is discarded quickly.
            order = (missing.most_common() if missing is not None
                     else [[None, None]])
            for country, _ in order:
                try:
                    download_tokens(day, lang=lang,
                                    country=country if country is not None else 'nogeo')
                except Exception:
                    flag = False
                    if missing is not None:
                        missing.update([country])
                    break
            if flag:
                rest.append(day)
        return rest
class Tokenize(object):
    """ Tokenize transforms a text into a sequence, where
    each number identifies a particular token; the q-grams
    that are not found in the text are ignored.

    >>> from text_models import Tokenize
    >>> tok = Tokenize().fit(["hi~mario", "mario"])
    >>> tok.transform("good morning mario")
    [1]
    """

    def __init__(self, tm_args: Dict[str, Any]=None):
        # BUG FIX: TM_ARGS is a module-level mutable dict and must not be
        # used directly as a default argument; None stands for TM_ARGS.
        self._head = dict()
        self._vocabulary = dict()
        self._tag = "__end__"
        self._textmodel = TextModel(**(TM_ARGS if tm_args is None else tm_args))

    @property
    def vocabulary(self) -> Dict[str, int]:
        """Vocabulary used"""
        return self._vocabulary

    @property
    def textModel(self):
        """Text model, i.e., :py:class::`b4msa.text_model.TextModel`
        """
        return self._textmodel

    @textModel.setter
    def textModel(self, v):
        self._textmodel = v

    def fit(self, tokens: List[str]) -> 'Tokenize':
        """Train the tokenizer.

        :param tokens: Vocabulary as a list of tokens
        :type tokens: List[str]
        """
        voc = self.vocabulary
        head = self._head
        tag = self._tag
        for word in tokens:
            if word in voc:
                continue
            # Insert the word, character by character, into the trie;
            # the tag marks the node where a token ends.
            current = head
            for char in word:
                current = current.setdefault(char, dict())
            cnt = len(voc)
            voc[word] = cnt
            current[tag] = cnt
        return self

    def transform(self, texts: Union[str, List[str]]) -> Union[List[int], List[List[int]]]:
        """Transform a text (or a list of texts), normalized with
        :py:attr:`textModel`, into a sequence of token identifiers.

        NOTE(review): this public entry point is exercised by the class
        docstring and by :py:class:`BagOfWords` but was missing from the
        code under review; the implementation follows that documented
        usage -- confirm against the upstream source.
        """
        norm = self.textModel.text_transformations
        if isinstance(texts, (list, tuple)):
            return [self._transform(norm(x)) for x in texts]
        return self._transform(norm(texts))

    def _transform(self, text: str) -> List[int]:
        # Greedy left-to-right longest-match tokenization; characters that
        # do not start a known token are skipped.
        L = []
        i = 0
        while i < len(text):
            wordid, pos = self.find(text, i=i)
            if wordid == -1:
                i += 1
                continue
            i = pos
            L.append(wordid)
        return L

    def find(self, text: str, i: int=0) -> Tuple[int, int]:
        """Longest vocabulary token starting at position i.

        :return: (token identifier or -1 when nothing matches,
                  position right after the match)
        """
        end = i
        head = self._head
        current = head
        tag = self._tag
        wordid = -1
        while i < len(text):
            char = text[i]
            try:
                current = current[char]
                i += 1
                try:
                    # Remember the longest match seen so far.
                    wordid = current[tag]
                    end = i
                except KeyError:
                    pass
            except KeyError:
                break
        return wordid, end

    def id2word(self, id: int) -> str:
        """Token associated with id

        :param id: Identifier
        :type id: int
        """
        try:
            id2w = self._id2w
        except AttributeError:
            # Lazily build (and cache) the inverse mapping.
            id2w = {v: k for k, v in self.vocabulary.items()}
            self._id2w = id2w
        return id2w[id]
class BagOfWords(SparseMatrix):
    """Bag of word model using TFIDF and
    :py:class:`text_models.vocabulary.Tokenize`

    :param tokens: Language (Ar|En|Es) or list of tokens
    :type tokens: [str|List]

    >>> from EvoMSA.tests.test_base import TWEETS
    >>> from microtc.utils import tweet_iterator
    >>> from text_models.vocabulary import BagOfWords
    >>> tw = list(tweet_iterator(TWEETS))
    >>> BoW = BagOfWords().fit(tw)
    >>> BoW['hola mundo']
    [(758, 0.7193757438600711), (887, 0.6946211479258095)]
    """

    def __init__(self, tokens: Union[str, List[str]]="Es"):
        from microtc.utils import load_model
        from EvoMSA.utils import download

        def collapse(cdn):
            # Drop the empty pieces produced by a leading/trailing "~".
            return "~".join([piece for piece in cdn.split("~") if len(piece)])

        tok = Tokenize()
        if isinstance(tokens, list):
            vocab = tokens
        else:
            # A language code: use the corresponding pre-trained b4msa model.
            textModel = load_model(download("b4msa_%s.tm" % tokens))
            vocab = list(textModel.model.word2id.keys())
            tok.textModel = textModel
        bigrams = [collapse(k) for k in vocab
                   if k.count("~") and k[:2] != "q:"]
        tok.fit(bigrams)
        unigrams = [collapse(k) for k in vocab
                    if k.count("~") == 0 and k[:2] != "q:"]
        tok.fit(unigrams)
        # q-grams are identified by the "q:" prefix.
        qgrams = [collapse(k[2:]) for k in vocab if k[:2] == "q:"]
        tok.fit([q for q in qgrams if q.count("~") == 0 and len(q) >= 2])
        self._tokenize = tok
        self._text = "text"

    @property
    def tokenize(self) -> Tokenize:
        """
        :py:class:`text_models.vocabulary.Tokenize` instance
        """
        return self._tokenize

    def fit(self, X: List[Union[str, dict]]) -> 'BagOfWords':
        """ Train the Bag of words model"""
        from microtc.utils import Counter
        freq = Counter()
        for seq in self.tokenize.transform(list(X)):
            freq.update(seq)
        self._tfidf = TFIDF.counter(freq)
        return self

    @property
    def tfidf(self) -> TFIDF:
        """:py:class:`microtc.weighting.TFIDF` instance"""
        return self._tfidf

    def id2word(self, id: int) -> str:
        """Token associated with id

        :param id: Identifier
        :type id: int
        """
        if not hasattr(self, "_w_id2w"):
            # Lazily build the inverse of the TFIDF word mapping.
            self._w_id2w = {v: k for k, v in self.tfidf.word2id.items()}
        return self.tokenize.id2word(self._w_id2w[id])

    @property
    def num_terms(self):
        """Number of tokens in the vocabulary"""
        return len(self.tokenize.vocabulary)

    def __getitem__(self, data: str):
        """Bag-of-words (TFIDF) representation of a text or list of texts."""
        if isinstance(data, (list, tuple)):
            tokens = []
            for txt in data:
                tokens.extend(self.tokenize.transform(txt))
        else:
            tokens = self.tokenize.transform(data)
        return self.tfidf[tokens]
class TopicDetection(object):
    """
    TopicDetection Class is used to visualize the topics of interest for
    a specified date based on the tweets from Twitter for that day

    :param date: Date provided in format dict(year=yyyy, month=mm, day=dd)
    :param lang: Language (Ar, En, or Es)
    :type lang: str
    :param country: Two letter country code
    :type country: str
    :param window: Half-width (in days) of the window used by
        :py:meth:`similar_date`
    :type window: int
    """

    def __init__(self, date, lang: str="En", country: str="US",
                 window: int=2):
        self._window = window
        self.date = handle_day(date)
        self._lang = lang
        self._country = country
        self._voc = Vocabulary(date, lang=self._lang, country=self._country)
        self._prev_date = self.similar_date()
        self._prev_voc = Vocabulary(self._prev_date, lang=self._lang,
                                    country=self._country)

    @property
    def window(self):
        """Half-width (in days) of the search window"""
        return self._window

    @property
    def prev_date(self):
        """Most similar date of the previous year (see :py:meth:`similar_date`)"""
        return self._prev_date

    @property
    def voc(self):
        """Vocabulary of the date of interest"""
        return self._voc

    @voc.setter
    def voc(self, new_voc):
        # BUG FIX: the original called isinstance(new_voc, dict()), which
        # always raises TypeError (the second argument must be a type, not
        # a dict instance). Non-dict values are silently ignored, as the
        # original code intended.
        if not isinstance(new_voc, dict):
            return
        self._voc = new_voc

    @property
    def prev_voc(self):
        """Vocabulary of :py:attr:`prev_date`"""
        return self._prev_voc

    def similar_date(self):
        """
        Use cosine similarity to return the most similar date from
        around the same date in the previous year.
        """
        from sklearn.metrics.pairwise import cosine_similarity
        from datetime import timedelta
        import numpy as np
        date = self.date
        voc = self._voc
        # Vocabularies of the same window of days, one year earlier.
        prev_voc = []
        w = self.window
        prev_days = date_range(date - timedelta(days=w),
                               date + timedelta(days=w + 1))
        for day in prev_days:
            _ = dict(year=day.year - 1, month=day.month, day=day.day)
            prev_voc.append(Vocabulary(_, lang=self._lang,
                                       country=self._country))
        # Set containing all unique tokens of every vocabulary.
        words = set()
        for word in voc:
            words.update([word])
        for word in prev_voc:
            words.update(word)
        # Map every unique token to a column index.
        w2id = {word: index for index, word in enumerate(words)}
        # Frequency vector of the day of interest.
        vec = np.zeros(len(w2id))
        for word, num in voc.items():
            vec[w2id[word]] = num
        # Matrix with one row per previous-year day.
        vec_matrix = np.zeros((len(prev_voc), len(w2id)))
        for row, vocab in enumerate(prev_voc):
            for word, num in vocab.items():
                vec_matrix[row, w2id[word]] = num
        # Previous-year day most similar to the day of interest.
        cs_matrix = cosine_similarity(np.atleast_2d(vec), vec_matrix)
        prev_day = prev_days[cs_matrix[0].argmax()]
        return dict(year=prev_day.year - 1, month=prev_day.month,
                    day=prev_day.day)

    def topic_wordcloud(self, figname: str="wordcloud"):
        """
        Uses WordCloud library to display the topic

        Use Laplace Smoothing and compare the vocabs from
        the date of interest with vocab from the date of
        the year before
        """
        from wordcloud import WordCloud as WC
        import matplotlib.pyplot as plt
        prev_yr_voc = self._prev_voc
        prev_day_voc = self._voc.previous_day()
        # NOTE(review): iterating a Vocabulary yields its tokens, so this
        # relies on the underlying counter's update counting elements
        # (collections.Counter semantics) instead of merging frequencies --
        # confirm this is the intended behavior.
        prev_yr_voc.voc.update(prev_day_voc)
        self._voc = self.laplace_smoothing(self._voc, prev_yr_voc)
        # Create wordcloud
        wc = WC().generate_from_frequencies(self._voc)
        plt.imshow(wc)
        plt.axis('off')
        plt.tight_layout()
        plt.savefig(figname)

    @staticmethod
    def probability(voc) -> dict:
        """
        Calculate the probability of each word appearing in vocab

        :param voc: Mapping of token -> frequency
        """
        # FIX: the return annotation was dict() -- a dict instance --
        # instead of the type dict.
        N = sum(voc.values())
        return {k: v / N for k, v in voc.items()}

    @staticmethod
    def laplace_smoothing(voc1, voc2) -> dict:
        """
        Uses Laplace smoothing to handle words that appear in
        voc1 but not in voc2

        :return: Mapping of each voc1 token to the ratio between its
            probability and the (smoothed) probability in voc2
        """
        voc1_prob = TopicDetection.probability(voc1.voc)
        voc2_prob = TopicDetection.probability(voc2.voc)
        N = sum(voc2.voc.values())
        V = len(voc2.items())
        # Probability assigned to tokens missing from voc2.
        prob = 1 / (N + V)
        updated_voc = dict()
        for word, freq in voc1_prob.items():
            if word in voc2_prob:
                updated_voc[word] = freq / voc2_prob[word]
            else:
                updated_voc[word] = freq / prob
        return updated_voc