from django.core.management.base import BaseCommand, CommandError
# -*- encoding: utf-8 -*-
# from_yaml(cls, loader, node)
from optparse import make_option
import sys
# # #
#
# Questions stuff
#
# # #
from xml.dom import minidom as _dom
from optparse import OptionParser
# from django import db
import sys
import re
import string
import codecs
from itertools import product, combinations
from random import choice
# Some XML shortcuts
def _elements(e, x):
    """Return every descendant of node ``e`` whose tag name is ``x``."""
    return e.getElementsByTagName(x)


def _attribute(e, x):
    """Return the value of attribute ``x`` on element ``e``."""
    return e.getAttribute(x)
def _data(e):
    """Return the ``data`` of the first child of ``e``.

    Returns False when ``e`` has no ``firstChild`` attribute, when there is
    no first child, or when that child carries no ``data`` attribute.
    """
    first_child = getattr(e, 'firstChild', None)
    return getattr(first_child, 'data', False)
def _firstelement(e, x):
    """Return the first descendant of ``e`` tagged ``x``, or None if absent."""
    matches = e.getElementsByTagName(x)
    if len(matches) == 0:
        return None
    return matches[0]
from drill.models import Tagname, Tagset, Form
def _boolify(v):
    """Return True when ``v`` is 'yes', 'true' or 'y' (case-insensitive)."""
    return v.lower() in ['yes', 'true', 'y']
# TODO: These lookups are loaded once, on the assumption that the tag tables
#       do not change while the command runs -- but they might.
# TODO: Tag installation: all paradigms need to be concatenated together.
# Flat sequence of every tag name.
TAGS = Tagname.objects.all().values_list('tagname', flat=True)
# Flat sequence of every tag-set name.
TAGSETS = Tagset.objects.all().values_list('tagset', flat=True)
# Tag-set name -> the tag names that belong to that set.
_T = dict(
    (tagset.tagset, tagset.tagname_set.all().values_list('tagname', flat=True))
    for tagset in Tagset.objects.all()
)
# Agreement table, as key-value pairs. The key is an agreement tag found on
# the head; the agreeing tag must then contain one of the items in the list.
# Languages with subject agreement only are easy; languages with object
# agreement or some other kind of agreement will need more key-value pairs,
# or perhaps a slightly different arrangement here...
AGREEMENT = {
    # Bare number on the head maps to the third-person tag of that number.
    'Sg': ['Sg3'], 'Pl': ['Pl3'],
    # TODO: Can agreement be moved out to XML or YAML configuration somehow,
    #       and then be set in questions as something generic?
    # Person-number on the head maps to itself.
    'Sg1': ['Sg1'], 'Sg2': ['Sg2'], 'Sg3': ['Sg3'],
    'Pl1': ['Pl1'], 'Pl2': ['Pl2'], 'Pl3': ['Pl3'],
    # Dual entries are currently disabled:
    #   'Du': ['Du3'], 'Du1': ['Du1'], 'Du2': ['Du2'], 'Du3': ['Du3'],
    # No agreement tag on the head: nothing to match against.
    '': '',
}
# TODO: Cleaning code thoughts
# SUBJ = elements_d.get('SUBJ') or False
# MAINV = elements_d.get('MAINV') or False
class GrammarDefaults(object):
    """Default tags and lemmas for question elements, read from an XML tree.

    After construction, ``grammar_definitions`` maps the ``id`` attribute of
    each ``<element>`` under ``<definitions>/<tags>`` to a dict that may hold:

    * ``'tags'``   -- flat list of expanded tag strings, and
    * ``'lemmas'`` -- a one-item list with the text of the element's ``<id>``.
    """

    def expandTags(self, tags):
        """Expand tag strings whose ``+``-separated parts may name tag sets.

        A part found in ``TAGS`` stands for itself; a part found in
        ``TAGSETS`` stands for every member listed for it in ``_T``. For
        each input tag, every combination of the alternatives is joined
        back together with ``+``. A tag containing a part that is neither a
        known tag nor a known tag set expands to an empty list.

        Returns a list of lists, one inner list per item of ``tags``.
        """
        all_tags = []
        for tag in tags:
            alternatives = []
            for part in tag.split('+'):
                options = []
                if part in TAGS:
                    options.append(part)
                if part in TAGSETS:
                    options.extend(_T[part])
                alternatives.append(options)
            all_tags.append(
                ['+'.join(combo) for combo in product(*alternatives)])
        return all_tags

    def __init__(self, defaults_node):
        """Read the grammar definitions out of ``defaults_node``.

        Raises ValueError when the tree has no ``<definitions>`` element or
        no ``<tags>`` element inside it.
        """
        self.node = defaults_node

        definitions = _firstelement(self.node, 'definitions')
        if definitions is None:
            raise ValueError(
                'Grammar defaults contain no <definitions> element.')
        tags = _firstelement(definitions, 'tags')
        if tags is None:
            raise ValueError(
                'Grammar defaults contain no <tags> element.')

        grammar_definitions = {}
        for element in _elements(tags, 'element'):
            elem_id = _attribute(element, 'id')
            definition = grammar_definitions[elem_id] = {}

            # Collect the expansions of every <grammar tag="..."> child.
            tag_list = []
            for grammar in _elements(element, 'grammar'):
                for expanded in self.expandTags([_attribute(grammar, 'tag')]):
                    tag_list.extend(expanded)
            if tag_list:
                definition['tags'] = tag_list

            # _data() yields False when there is no <id> child or it is empty.
            word_id = _data(_firstelement(element, 'id'))
            if word_id and word_id.strip():
                definition['lemmas'] = [word_id]

        self.grammar_definitions = grammar_definitions
class QObj(GrammarDefaults):
    """ Contains methods necessary for testing questions for Morfa-C.

    Eventually, this may be used to assemble Question objects and
    store them in the database, and then also used to create the
    questions in the actual game.

    If this is used to store database info, it seems like it would
    almost be better to store tags and semantic types instead of creating
    a ton of WordQElements, because just as much sorting would need to be
    done to either read from WordQElements as it would be to sort through
    forms-- or at least this would be worth testing.

    Constructing an instance runs the whole pipeline: it sets ``qid`` and
    ``qtype``, builds ``question_elements`` / ``question_text``, and fills
    ``answers``. Problems found on the way are collected in ``errors``
    (step name -> list of message lines).
    """

    # Question-Answer agreement
    QAPN = {'Sg': 'Sg',       # Dïhte? Dïhte.
            'Pl': 'Pl',       # Dah? Dah.
            'Sg1': 'Sg2',     # Manne? Datne.
            'Sg2': 'Sg1',     # Datne? Manne.
            'Sg3': 'Sg3',     # Dïhte? Dïhte.
            # 'Du1':'Du2',    # Månnoeh? Dåtnoeh.
            # 'Du2':'Du1',    # Dåtnoeh? Månnoeh.
            # 'Du3':'Du3',    # Dah guaktah? Dah guaktah.
            'Pl1': 'Pl2',     # Mijjieh? Dijjieh.
            'Pl2': 'Pl1',     # Dijjieh? Mijjieh.
            'Pl3': 'Pl3'}     # Dah? Dah.

    # Both text types of Python 2 (str, unicode); collapses to str on 3.
    _STRING_TYPES = (str, type(u''))

    def handleMeta(self):
        """ Assign qtypes and question IDs.
        """
        qtypes = [_data(q) for q in _elements(self.node, 'qtype')]
        # _data() returns False for an empty <qtype/>; joining it would fail.
        self.qtype = ','.join([q for q in qtypes if q])
        self.qid = _attribute(self.node, 'id')

    def parseElements(self, elements):
        """ Turn <element> nodes into query descriptions.

        Returns a list of ``(element id, data)`` tuples. ``data['meta']``
        holds ``id``, ``task``, ``game`` and optionally ``content``;
        ``data['query']`` may hold ``semtypes``, ``pos``, ``tags``,
        ``lemma`` and ``hid``. Lemmas and tags fall back to the grammar
        defaults registered for the element id.
        """
        element_queries = []
        for element in elements:
            elem_q = {'query': {}}
            query = elem_q['query']

            elem_id = _attribute(element, "id")
            elem_q['meta'] = {
                'id': elem_id,
                'task': _boolify(_attribute(element, "task")),
                'game': _attribute(element, "game"),
            }
            content = _attribute(element, "content")
            if content:
                elem_q['meta']['content'] = content

            sem = [_attribute(s, 'class') for s in _elements(element, 'sem')]
            if sem:
                query['semtypes'] = sem

            # Both defaults are reset for every element. default_tags used
            # to be left unbound (or stale from the previous element) when
            # the element id had no entry in the defaults.
            defaults = self.defaults.get(elem_id) or {}
            default_lemma = defaults.get('lemmas', False)
            default_tags = defaults.get('tags', False)

            grammar = _elements(element, 'grammar')
            if grammar:
                g_pos = _attribute(grammar[0], 'pos') or False
                if g_pos:
                    query['pos'] = g_pos

                tags = [_attribute(c, 'tag') for c in grammar]
                tags = [a for a in tags if a.strip()]
                if not tags and default_tags:
                    tags = default_tags

                expanded_tags = []
                for group in self.expandTags(tags):
                    expanded_tags.extend(group)

                if expanded_tags:
                    if g_pos:
                        t_match = g_pos + '+'
                        expanded_tags = [t for t in expanded_tags
                                         if t_match in t]
                    query['tags'] = expanded_tags

                # Grammar tag specified, but grammar PoS not: take the PoS
                # from the first part of the first tag.
                if tags and not g_pos:
                    query['pos'] = tags[0].partition('+')[0]

            word_lemma = _firstelement(element, 'id')
            if default_lemma:
                query['lemma'] = default_lemma
            elif word_lemma is not None:
                lemma = _data(word_lemma)
                hid = _attribute(word_lemma, 'hid')
                if lemma:
                    query['lemma'] = lemma
                if hid:
                    query['hid'] = int(hid)

            element_queries.append((elem_id, elem_q))
        return element_queries

    def elementizeText(self, text, elements):
        """ Pair every space-separated token of ``text`` with its element.

        For the text "Mika SUBJ MAINV" and the elements
        [('SUBJ', {}), ('MAINV', {})] the result is
        [('Mika', None), ('SUBJ', {}), ('MAINV', {})].

        A missing text (``_data`` returned False) gives an empty list.
        """
        if not text:
            return []
        elements_d = dict(elements)
        return [(token, elements_d.get(token)) for token in text.split(' ')]

    def _reportEmptyQuery(self, item, qkwargs):
        """ Record a failed query, with counts for every sub-query.

        Every non-empty combination of the query arguments is counted on
        its own, which shows which constraint empties the result. Lines are
        appended under ``errors['self.queryElements']``.
        """
        errormsg = 'Query failed\n'
        errormsg += 'Question ID: %s\n' % self.qid
        errormsg += 'Question element: %s\n' % repr(item)
        errormsg += 'Query arguments: %s\n' % repr(qkwargs)
        errormsg += 'Zero forms found.\n'

        pairs = list(qkwargs.items())
        for size in range(1, len(pairs) + 1):
            for combo in combinations(pairs, size):
                subquery = dict(combo)
                count = Form.objects.filter(**subquery).count()
                errormsg += ' Subquery: \n'
                for partk, partv in subquery.items():
                    errormsg += ' - %s: %s\n' % (partk, partv)
                errormsg += ' => Object count: %d\n' % count

        # Extend rather than overwrite, so that one failing element does
        # not hide the report of an earlier one.
        self.errors.setdefault('self.queryElements', []).extend(
            errormsg.splitlines())

    def queryElements(self, elements):
        """ Run the database query of every element and pick one form.

        Adds ``qkwargs``, ``wordforms`` and, if a form was found,
        ``selected`` to each element's data (in place). Elements flagged
        ``copy`` reuse the word forms of the question element with the same
        name instead of querying. Returns ``elements``.
        """
        element_to_query = {
            'tags': 'tag__string',
            'semtypes': 'word__semtype__semtype',
            'pos': 'word__pos',
            'lemma': 'word__lemma',
            'hid': 'word__hid',
        }
        for item, data in elements:
            if not data:
                continue

            qkwargs = {}
            if 'query' in data:
                for k, v in data['query'].items():
                    if isinstance(v, list):
                        v = choice(v) if v else False
                    elif isinstance(v, self._STRING_TYPES) and not v.strip():
                        # The original compared type(v) with the `string`
                        # module, so blank strings were never filtered.
                        v = False
                    if v:
                        qkwargs[element_to_query[k]] = v
                data['qkwargs'] = qkwargs

            if data.get('copy') is True:
                copies = dict(self.question_elements)[item]
                data['wordforms'] = copies['wordforms']
                if 'selected' in data:
                    data['selected'] = copies['selected']
                else:
                    data['selected'] = item
                continue

            data['wordforms'] = wfs = Form.objects.filter(**qkwargs)
            wfs = wfs.order_by('?')
            try:
                data['selected'] = wfs[0]
            except IndexError:
                # Empty result set. Other failures (e.g. database errors)
                # are allowed to propagate instead of being reported as
                # "zero forms".
                if not self.NO_ERRORS:
                    self._reportEmptyQuery(item, qkwargs)
        return elements

    def elementsToSentence(self, elements, blanks=False):
        """ Expects list of tuples, element data with ['wordforms'].

        Each element with a selected ``Form`` contributes its ``fullform``;
        everything else contributes the token itself. With ``blanks=True``
        the selected form of a task element is replaced by '__'.

        NOTE: ``blanks`` used to be ignored and task elements were always
        blanked, which made the full and the blanked answer text identical.
        """
        # TODO: should just append fullform to data, instead.
        #       For testing now this is good.
        sentence = []
        for item, data in elements:
            wf = None
            if data and 'wordforms' in data:
                wf = data.get('selected')
            if isinstance(wf, Form):
                is_task = (data.get('meta') or {}).get('task')
                if blanks and is_task:
                    sentence.append('__')
                else:
                    sentence.append(wf.fullform)
            else:
                sentence.append(item)
        return ' '.join(sentence)

    def personQA(self, tag):
        """ Swap the person of ``tag`` from question to answer (see QAPN).

        Parts of the ``+``-separated tag that are not in QAPN are kept.
        """
        return '+'.join([self.QAPN.get(part, part)
                         for part in tag.split('+')])

    def _swapPerson(self, elem, pos):
        """ Apply Question-Answer person agreement to a copied element.

        Only acts on elements flagged ``copy`` whose query PoS equals
        ``pos``. The cached selection is dropped so that queryElements
        fetches new forms for the swapped tags.
        """
        if not elem or not elem.get('copy'):
            return
        query = elem.get('query') or {}
        if query.get('pos') != pos:
            return

        tags = query.get('tags')
        if not tags:
            # Something is wrong with tags.txt or grammar_defaults; tags
            # need to be corrected and reinstalled.
            self.errors.setdefault('self.checkSyntax', []).append(
                'No tags found for copied %s element.' % pos)
            return

        # Pop these items so that queryElements gets new forms.
        for stale in ('wordforms', 'copy', 'selected'):
            elem.pop(stale, None)

        if isinstance(tags, list):
            query['tags'] = [self.personQA(t) for t in tags]
        else:
            query['tags'] = [self.personQA(tags)]

    def checkSyntax(self, elements):
        """ Mark subject-verb agreement and apply Q-A person changes.

        When both SUBJ and MAINV occur, MAINV's meta gets
        ``agreement = 'SUBJ'``. A copied SUBJ pronoun and a copied MAINV
        verb have their person swapped as in QAPN. Returns the elements in
        their original order.
        """
        elements_d = dict(elements)
        subj = elements_d.get('SUBJ')
        mainv = elements_d.get('MAINV')

        if 'SUBJ' in elements_d and 'MAINV' in elements_d:
            # MAINV may be a plain token without data; nothing to mark then.
            if mainv and mainv.get('meta'):
                mainv['meta']['agreement'] = 'SUBJ'

        # Check for Question-Answer person agreement (see QAPN)
        self._swapPerson(subj, 'Pron')
        self._swapPerson(mainv, 'V')

        return [(token, elements_d[token]) for token, _ in elements]

    def selectItems(self, elements):
        """ Settle agreement, then pick one value for each list in a query.

        If an element's meta names an agreement head, one tag is chosen for
        the head and the agreeing element keeps only the tags allowed by
        AGREEMENT. Afterwards every non-empty list in every query is
        replaced by a random member. Returns the elements in their original
        order.
        """
        elements_d = dict(elements)

        # Find agreement
        agreement = False
        for elem_id, elem_data in elements_d.items():
            if elem_data and 'agreement' in (elem_data.get('meta') or {}):
                # e.g. (SUBJ, MAINV)
                agreement = (elem_data['meta']['agreement'], elem_id)

        # If there's agreement, strip non-agreeing tags.
        if agreement:
            agreement_head, agreeing_item = agreement
            # The head is missing when the question had a SUBJ element but
            # the answer does not contain it.
            head = elements_d.get(agreement_head, False)
            agree = elements_d[agreeing_item]

            head_tag = ''
            if head and 'tags' in head.get('query', {}):
                head_tag = head['query']['tags']
                # A head copied from the question already holds one chosen
                # tag string; choice() on it would pick a single character.
                if isinstance(head_tag, list):
                    head_tag = choice(head_tag) if head_tag else ''
                head['query']['tags'] = head_tag

            # First agreement-bearing part of the head tag ('' if none).
            head_agr = next(
                (part for part in head_tag.split('+') if part in AGREEMENT),
                '')

            # TODO: if a question is part of grammar_defaults but ends
            #       up without tags, it passes through here untouched.
            #       This is something that should be added to error logging.
            if 'tags' in agree.get('query', {}):
                agr_match = AGREEMENT[head_agr]
                candidates = agree['query']['tags']
                if not isinstance(candidates, list):
                    candidates = [candidates]
                agree['query']['tags'] = [
                    a for a in candidates
                    if any(b in a for b in agr_match)]

        # Choose random tag
        for elem_id, elem_data in list(elements_d.items()):
            if not elem_data:
                continue
            # Shallow copy: the nested 'query' dict is still shared with
            # the original element data.
            e_data = elem_data.copy()
            if 'query' in e_data:
                for k, v in list(e_data['query'].items()):
                    if isinstance(v, list) and v:
                        k_s = k.replace('__in', '')
                        e_data['query'][k_s] = choice(v)
                        if k_s != k:
                            e_data['query'][k] = ''
            elements_d[elem_id] = e_data

        return [(token, elements_d[token]) for token, _ in elements]

    def handleQuestions(self):
        """ Build ``question_elements`` and ``question_text``.
        """
        question = _firstelement(self.node, 'question')
        text = _data(_firstelement(question, 'text'))
        pelements = self.parseElements(_elements(question, 'element'))

        # TODO: Is this where we have to stop in order to use this class to
        #       fill the database? Would need to create QElement and
        #       WordQElements of all possible elements, so they can't be
        #       trimmed or reduced to reflect element selections and
        #       agreement. Skip syntax and trimming steps, then query; which
        #       should return all possible elements, then can begin creating
        #       Question objects.
        text_with_elements = self.elementizeText(text, pelements)

        # Check for agreement
        syntax_text = self.checkSyntax(text_with_elements)
        query_elements = self.selectItems(syntax_text)
        queried_elements = self.queryElements(query_elements)

        self.question_elements = queried_elements
        self.question_text = self.elementsToSentence(queried_elements) + '?'

    def copyQuestion(self, aelements):
        """ Fill answer tokens without data from the question's elements.

        A token without data of its own takes the data of the question
        element with the same name (flagged ``copy = True``); tokens with
        their own data are flagged ``copy = False``.

        NOTE(review): the copied data is the question element's own dict,
        not a duplicate, so later changes to it are shared between answers
        -- confirm this is intended when a question has several answers.
        """
        question_d = dict(self.question_elements)
        copy_elements = {}
        for k, v in dict(aelements).items():
            if not v:
                copied = question_d.get(k)
                if copied:
                    copied['copy'] = True
            else:
                copied = v
                copied['copy'] = False
            copy_elements[k] = copied
        return [(token, copy_elements[token]) for token, _ in aelements]

    def selectTask(self, elements):
        """ Takes a list of elements, and returns the task element.

        This should occur after the queries phase. Returns a one-item dict
        ``{element id: element data}`` for the first element whose meta has
        a true ``task``, or False if there is none.
        """
        for element_id, element_data in elements:
            if element_data and (element_data.get('meta') or {}).get('task'):
                return {element_id: element_data}
        return False

    def handleAnswers(self):
        """ Build one Answer object per <answer> node into ``answers``.

        Every Answer has ``task``, ``answer_elements``,
        ``answer_full_text`` and ``answer_text_blank``.
        """
        class Answer(object):
            pass

        self.answers = []
        for answer_node in _elements(self.node, 'answer'):
            text = _data(_firstelement(answer_node, 'text'))
            pelements = self.parseElements(_elements(answer_node, 'element'))
            text_with_elements = self.elementizeText(text, pelements)
            answer_elements = self.copyQuestion(text_with_elements)

            # Is this where we have to stop in order to use this class to
            # fill the database? Would need to create QElement and
            # WordQElements of all possible elements, so they can't be
            # trimmed or reduced to reflect element selections and agreement

            # Check for agreement, and also Q-A person changes
            syntax_text = self.checkSyntax(answer_elements)
            query_elements = self.selectItems(syntax_text)
            queried_elements = self.queryElements(query_elements)

            # TODO: this keeps only the elements of the last answer.
            self.question_query_elements = queried_elements

            answer = Answer()
            answer.task = self.selectTask(queried_elements)
            answer.answer_elements = queried_elements
            answer.answer_full_text = (
                self.elementsToSentence(queried_elements) + '.')
            answer.answer_text_blank = (
                self.elementsToSentence(queried_elements, blanks=True) + '.')
            self.answers.append(answer)

    def reselect(self):
        """ Selects a new iteration of the same question.
        """
        # TODO: handleAnswers needs to set attributes for all steps,
        #       uff.
        pass

    def requery(self):
        """ Reruns the queries, and selects a new iteration.
        """
        pass

    def __init__(self, q_node, grammar_defaults=False):
        """ Parse question node ``q_node`` and generate one realisation.

        ``grammar_defaults`` is a GrammarDefaults instance; when omitted the
        defaults are read from 'data_sma/meta/grammar_defaults.xml'.
        """
        self.errors = {}
        self.NO_ERRORS = False

        if grammar_defaults:
            self.defaults = grammar_defaults.grammar_definitions
        else:
            # Given a file name, minidom opens and closes the file itself.
            defaults_tree = _dom.parse('data_sma/meta/grammar_defaults.xml')
            self.defaults = GrammarDefaults(defaults_tree).grammar_definitions

        self.node = q_node

        self.handleMeta()
        self.handleQuestions()
        self.handleAnswers()
# # #
#
# Command class
#
# # #
class FileLog(object):
    """Write log lines to a file, or keep them in memory, and echo them.

    With a file name, every line is written UTF-8 encoded to that file.
    Without one, lines are collected in ``loglines``. Either way each line
    is also written to a stream.
    """

    def __init__(self, fname):
        """Open ``fname`` for writing; a false ``fname`` logs to memory."""
        self.loglines = []
        self.fname = fname
        # Always defined, so that log() can test it when there is no file.
        self.logfile = None
        if fname:
            # Binary mode, because log() writes encoded bytes.
            self.logfile = open(fname, 'wb')

    def log(self, string, pipe=False):
        """Record ``string`` and echo it to ``pipe`` (default: stderr).

        A trailing newline is added when missing.
        """
        if not string.endswith('\n'):
            string += '\n'

        try:
            encoded = string.encode('utf-8')
        except UnicodeError:
            # A Python 2 byte string with non-ASCII content raises
            # UnicodeDecodeError on .encode(); it is already encoded.
            encoded = string

        # What a text stream accepts: the encoded byte string on Python 2,
        # the untouched text where str and bytes are different types.
        if isinstance(encoded, str):
            native = encoded
        else:
            native = string

        if self.logfile:
            self.logfile.write(encoded)
        else:
            self.loglines.append(native)

        if not pipe:
            pipe = sys.stderr
        pipe.write(native.rstrip('\n') + '\n')
        return

    def close(self):
        """Close the log file, if one was opened."""
        if self.logfile:
            self.logfile.close()
            self.logfile = None
class Command(BaseCommand):
    """Management command that prints test sentences for question XML."""

    args = '--grammarfile FILE --questionfile FILE --qid QID'
    help = """
Runs through a question XML file and produces test sentences.
Errors are printed to stderr, so that the rest can be filtered out.
Example command:
./manage.py testquestions --grammarfile grammar_defaults.xml \\
--questionfile noun_questions.xml \\
--logfile accusative_errors.log \\
--iterations 3 \\
--qid acc#
"""
    option_list = BaseCommand.option_list + (
        make_option("-g", "--grammarfile", dest="grammarfile", default=False,
                    help="XML-file for grammar defaults for questions"),
        make_option("-q", "--questionfile", dest="questionfile",
                    default=False,
                    help="XML-file that contains questions"),
        make_option("--qid", dest="qid", default=False,
                    help="Specify a list of IDs to test with commas and no "
                         "spaces, or specify a partial part of an id to "
                         "filter questions by, e.g. ill1,ill2 OR ill#; note "
                         "the wildcard symbol."),
        make_option("--iterations", dest="itercount", default=5,
                    help="The count of iterations for each question"),
        make_option("--logfile", dest="logfile", default=False,
                    help="Store all output to a file in addition to stdout."),
    )

    def _matchesQid(self, qid, patterns):
        """Tell whether question id ``qid`` matches any of ``patterns``.

        A pattern containing '#' matches when the rest of it (with every
        '#' removed) occurs anywhere in ``qid``; other patterns must equal
        ``qid``.
        """
        for tid in patterns:
            if '#' in tid:
                if tid.replace('#', '') in qid:
                    return True
            elif tid == qid:
                return True
        return False

    def _taskForm(self, answer):
        """Return the selected form of the answer's task element.

        Raises when the answer has no task (``answer.task`` is False) or
        the task element has nothing selected; callers catch that.
        """
        return list(answer.task.values())[0]['selected']

    def handle(self, *args, **options):
        """Generate and log test sentences for the selected questions.

        Exits with status 2 when the question file or the grammar file is
        not given.
        """
        qpath = options['questionfile']
        gpath = options['grammarfile']
        iterations = int(options['itercount'])
        test_qid = options['qid']

        log = FileLog(options['logfile'] or None)

        if not qpath:
            sys.stdout.write('Question file required.\n')
        if not gpath:
            sys.stdout.write('Grammar file required.\n')
        # Both files are needed; the old `and` let a single missing path
        # through to the parser.
        if not qpath or not gpath:
            sys.exit(2)

        # Given a file name, minidom opens and closes the file itself.
        defaults = GrammarDefaults(_dom.parse(gpath))
        tree = _elements(_dom.parse(qpath), 'q')

        if test_qid:
            wanted = test_qid.split(',')
            # Each question is kept once, even if several patterns match.
            tree = [node for node in tree
                    if self._matchesQid(_attribute(node, 'id'), wanted)]

        _OUT = sys.stdout
        _ERR = sys.stderr

        try:
            for q_node in tree:
                q = QObj(q_node, grammar_defaults=defaults)
                log.log(' == QUESTION: %s ==' % q.qid, _OUT)

                for iteration in range(iterations):
                    if iteration:
                        # Fresh random selection for every further round.
                        q = QObj(q_node, grammar_defaults=defaults)
                    log.log(' - %d' % (iteration + 1), _OUT)

                    baseform = False
                    for answer in q.answers:
                        try:
                            qword = self._taskForm(answer).getBaseform()
                            qword = qword.fullform
                        except Form.DoesNotExist:
                            qword = 'NO FORM'
                            baseform = self._taskForm(answer).word
                        except Exception:
                            # No task, or nothing usable was selected.
                            qword = 'TASK'

                        log.log(' Q: ' + u'%s (%s)'
                                % (q.question_text, qword), _OUT)
                        log.log(' A: ' + u'%s'
                                % answer.answer_text_blank, _OUT)

                        try:
                            aword = self._taskForm(answer).fullform
                        except Exception:
                            aword = 'TASK'
                        log.log(' - %s' % aword, _OUT)

                    # Problems are reported after every round. The old
                    # `error` flag was set in `finally` clauses and was
                    # therefore always true.
                    if baseform:
                        log.log(' *** Baseform does not exist for <%s>'
                                % baseform.lemma)
                    for k, v in q.errors.items():
                        log.log(' *** Error in %s' % k, _ERR)
                        indent = ' '
                        log.log(''.join([indent + a + '\n' for a in v]), _ERR)
        finally:
            # Release the log file even when a question fails to build.
            logfile = getattr(log, 'logfile', None)
            if logfile:
                logfile.close()