# -*- encoding: utf-8 -*-
# from_yaml(cls, loader, node)
import codecs
import re
import string
import sys
from itertools import product, combinations
from optparse import make_option
from optparse import OptionParser
from random import choice
from xml.dom import minidom as _dom

from django.core.management.base import BaseCommand, CommandError
# from django import db

from smadrill.models import Tagname, Tagset, Form

# # # # #
# Questions stuff
# # # # #

# Some XML shortcuts


def _elements(e, x):
    """Return every element below node *e* whose tag name is *x*."""
    return e.getElementsByTagName(x)


def _attribute(e, x):
    """Return the value of attribute *x* on element *e*."""
    return e.getAttribute(x)


def _data(e):
    """Return the data of the first child of *e*.

    Returns ``False`` when the lookup raises ``AttributeError`` (for
    example when *e* is ``None`` or has no children).
    """
    try:
        first_child = e.firstChild
        return first_child.data
    except AttributeError:
        return False


def _firstelement(e, x):
    """Return the first element below *e* with tag name *x*, or ``None``."""
    matches = _elements(e, x)
    try:
        return matches[0]
    except IndexError:
        return None


def _boolify(v):
    """Return ``True`` when *v*, lower-cased, is 'yes', 'true' or 'y'."""
    return v.lower() in ['yes', 'true', 'y']


# TODO: for now i assume these don't change throughout the course of the
# operation, but they might.

# TODO: tags install-- need to cat all paradigms together

TAGS = Tagname.objects.all().values_list('tagname', flat=True)
TAGSETS = Tagset.objects.all().values_list('tagset', flat=True)

# Maps each tagset name to the tag names that belong to it.
_T = dict(
    (tagset.tagset,
     tagset.tagname_set.all().values_list('tagname', flat=True))
    for tagset in Tagset.objects.all()
)

# List of tags that agree in. Key-value pairs. Key in head, means agree
# tag must contain one of the items in the list.

# Languages with Subject agreement are easy, languages with object AGREEMENT
# or some other kind of agreement will need to have more key-value
# pairs. or perhaps a slightly different arrangement here...
AGREEMENT = {
    'Sg': ['Sg3'],
    'Du': ['Du3'],
    'Pl': ['Pl3'],

    'Sg1': ['Sg1'],
    'Sg2': ['Sg2'],
    'Sg3': ['Sg3'],

    'Du1': ['Du1'],
    'Du2': ['Du2'],
    'Du3': ['Du3'],

    'Pl1': ['Pl1'],
    'Pl2': ['Pl2'],
    'Pl3': ['Pl3'],

    '': '',
}

# TODO: Cleaning code thoughts
# SUBJ = elements_d.get('SUBJ') or False
# MAINV = elements_d.get('MAINV') or False

# Types treated as "a single string" when inspecting query values.
# ``basestring`` only exists on Python 2.
try:
    _STRING_TYPES = (basestring,)
except NameError:
    _STRING_TYPES = (str,)


def _as_tag_list(tags):
    """Return *tags* as a list, wrapping a single tag string in a list.

    Query values start out as lists of candidates but are replaced by the
    single chosen string once ``QObj.selectItems`` has run, so code that
    may see either shape normalises through this helper.
    """
    if not tags:
        return []
    if isinstance(tags, _STRING_TYPES):
        return [tags]
    return list(tags)


class GrammarDefaults(object):
    """
    Default tags and lemmas per element id, read from a parsed grammar
    defaults XML document.

    After construction ``self.grammar_definitions`` maps each element id
    to a dict that may contain 'tags' (list of expanded tag strings) and
    'lemmas' (single-item list holding the text of the element's <id>
    child).
    """

    def expandTags(self, tags):
        """
        Expand every '+'-separated tag string in *tags*.

        Each piece that is in TAGS stands for itself; each piece that is
        in TAGSETS is replaced by every tag name of that set. Returns one
        list per input string containing all resulting combinations. A
        piece that is in neither collection has no alternatives, so that
        input string expands to an empty list.
        """
        all_tags = []
        for tag in tags:
            alternatives = []
            for piece in tag.split('+'):
                options = []
                if piece in TAGS:
                    options.append(piece)
                if piece in TAGSETS:
                    options.extend(_T[piece])
                alternatives.append(options)
            all_tags.append(
                ['+'.join(combo) for combo in product(*alternatives)])
        return all_tags

    def __init__(self, defaults_node):
        """
        Read <definitions>/<tags>/<element> nodes from *defaults_node*
        and store the result in ``self.grammar_definitions``.
        """
        self.node = defaults_node

        definitions = _firstelement(self.node, 'definitions')
        tags = _firstelement(definitions, 'tags')

        grammar_definitions = {}
        for element in _elements(tags, 'element'):
            elem_id = _attribute(element, 'id')
            definition = grammar_definitions[elem_id] = {}

            word_id = _data(_firstelement(element, 'id'))

            tag_list = []
            for grammar in _elements(element, 'grammar'):
                tag = _attribute(grammar, 'tag')
                tag_list.extend(sum(self.expandTags([tag]), []))

            if tag_list:
                definition['tags'] = tag_list

            if word_id and word_id.strip():
                definition['lemmas'] = [word_id]

        self.grammar_definitions = grammar_definitions


class QObj(GrammarDefaults):
    """ Contains methods necessary for testing questions for Morfa-C.
        Eventually, this may be used to assemble Question objects and
        store them in the database, and then also used to create the
        questions in the actual game.

        If this is used to store database info, it seems like it would
        almost be better to store tags and semantic types instead of
        creating a ton of WordQElements, because just as much sorting
        would need to be done to either read from WordQElements as it
        would be to sort through forms-- or at least this would be worth
        testing.

        Constructing an instance runs the whole pipeline and sets
        ``qtype``, ``qid``, ``question_elements``, ``question_text``,
        ``answer_set`` and ``errors``.
    """

    # Question-Answer agreement
    QAPN = {
        'Sg': 'Sg',       # Dïhte? Dïhte.
        'Pl': 'Pl',       # Dah? Dah.

        'Sg1': 'Sg2',     # Manne? Datne.
        'Sg2': 'Sg1',     # Datne? Manne.
        'Sg3': 'Sg3',     # Dïhte? Dïhte.

        'Du1': 'Du2',     # Månnoeh? Dåtnoeh.
        'Du2': 'Du1',     # Dåtnoeh? Månnoeh.
        'Du3': 'Du3',     # Dah guaktah? Dah guaktah.

        'Pl1': 'Pl2',     # Mijjieh? Dijjieh.
        'Pl2': 'Pl1',     # Dijjieh? Mijjieh.
        'Pl3': 'Pl3',     # Dah? Dah.
    }

    def handleMeta(self):
        """ Assign qtypes and question IDs from the question node. """
        # _data() returns False for an empty <qtype/>; skip those so the
        # join cannot fail on a non-string.
        qtypes = [_data(q) for q in _elements(self.node, 'qtype')]
        self.qtype = ','.join([q for q in qtypes if q])
        self.qid = _attribute(self.node, 'id')

    def parseElements(self, elements):
        """
        Turn <element> nodes into ``(element id, data)`` tuples.

        ``data`` always has 'meta' (id, task, game and optionally
        content) and 'query', which may hold 'semtypes', 'pos', 'tags',
        'lemma' and 'hid'. Tags and lemmas fall back to the grammar
        defaults registered for the element id.
        """
        element_queries = []

        for element in elements:
            elem_q = {'query': {}}
            query = elem_q['query']

            elem_id = _attribute(element, "id")
            task = _boolify(_attribute(element, "task"))
            game = _attribute(element, "game")
            content = _attribute(element, "content")

            elem_q['meta'] = {
                'id': elem_id,
                'task': task,
                'game': game,
            }
            if content:
                elem_q['meta']['content'] = content

            sem = [_attribute(s, 'class') for s in _elements(element, 'sem')]
            if sem:
                query['semtypes'] = sem

            # Reset both defaults for every element; otherwise an element
            # without defaults inherits (or trips over) the values left
            # behind by the previous loop iteration.
            element_defaults = self.defaults.get(elem_id, {})
            default_lemma = element_defaults.get('lemmas', False)
            default_tags = element_defaults.get('tags', False)

            grammar = _elements(element, 'grammar')
            g_pos = False
            if grammar:
                g_pos = _attribute(grammar[0], 'pos') or False
                if g_pos:
                    query['pos'] = g_pos

            tags = [_attribute(c, 'tag') for c in grammar]
            tags = [a for a in tags if a.strip()]
            if not tags and default_tags:
                tags = default_tags

            expanded_tags = sum(self.expandTags(tags), [])
            if expanded_tags:
                if g_pos:
                    # Keep only the expansions for the requested PoS.
                    t_match = g_pos + '+'
                    expanded_tags = [t for t in expanded_tags
                                     if t_match in t]
                query['tags'] = expanded_tags

            # grammar tag specified, but grammar pos not.
            if tags and not g_pos:
                g_pos = tags[0].partition('+')[0]
                query['pos'] = g_pos

            word_lemma = _firstelement(element, 'id')
            if default_lemma:
                query['lemma'] = default_lemma
            elif word_lemma:
                lemma = _data(word_lemma)
                hid = _attribute(word_lemma, 'hid')
                if lemma:
                    query['lemma'] = lemma
                if hid:
                    query['hid'] = int(hid)

            element_queries.append((elem_id, elem_q))

        return element_queries

    def elementizeText(self, text, elements):
        """
        Split *text* on single spaces and pair every token with its
        element data, or with ``None`` when the token is not an element
        id.

        Example: with text "Mika SUBJ MAINV" and elements
        [('SUBJ', {}), ('MAINV', {})] the result is
        [('Mika', None), ('SUBJ', {}), ('MAINV', {})].
        """
        elements_d = dict(elements)
        return [(token, elements_d.get(token)) for token in text.split(' ')]

    def _buildQueryKwargs(self, query):
        """Translate an element 'query' dict into Form filter kwargs."""
        element_to_query = {
            'tags': 'tag__string',
            'semtypes': 'word__semtype__semtype',
            'pos': 'word__pos',
            'lemma': 'word__lemma',
            'hid': 'word__hid',
        }

        qkwargs = {}
        for k, v in query.items():
            if isinstance(v, list):
                v = choice(v) if v else False
            elif isinstance(v, _STRING_TYPES):
                # Blank strings must not become filter arguments.
                if not v.strip():
                    v = False
            if v:
                qkwargs[element_to_query[k]] = v
        return qkwargs

    def _reportQueryFailure(self, item, qkwargs):
        """Record a report for a query that matched no forms.

        Every combination of the filter arguments is counted separately
        so the argument responsible for the empty result can be spotted.
        """
        errormsg = 'Query failed\n'
        errormsg += 'Question ID: %s\n' % self.qid
        errormsg += 'Question element: %s\n' % repr(item)
        errormsg += 'Query arguments: %s\n' % repr(qkwargs)
        errormsg += 'Zero forms found.\n'

        qkw_tup = list(qkwargs.items())
        for size in range(1, len(qkw_tup) + 1):
            for combo in combinations(qkw_tup, r=size):
                kp = dict(combo)
                count = Form.objects.filter(**kp).count()
                errormsg += ' Subquery: \n'
                for partk, partv in kp.items():
                    errormsg += ' - %s: %s\n' % (partk, partv)
                errormsg += ' => Object count: %d\n' % count

        # Append rather than assign so that a later failing element does
        # not erase the report of an earlier one.
        report = self.errors.setdefault('self.queryElements', [])
        report.extend(errormsg.splitlines())

    def queryElements(self, elements):
        """
        Attach 'wordforms' and 'selected' to every element with data.

        Elements flagged with a true 'copy' reuse what was selected for
        the question (``self.question_elements``). All others are looked
        up with ``Form.objects.filter`` and a random form is selected.
        When nothing matches, 'selected' is left unset and, unless
        ``self.NO_ERRORS`` is set, a report is stored in ``self.errors``.

        The element dicts are modified in place; *elements* is returned.
        """
        for item, data in elements:
            if not data:
                continue

            qkwargs = {}
            if 'query' in data:
                qkwargs = self._buildQueryKwargs(data['query'])

            if data.get('copy') == True:
                copies = dict(self.question_elements)[item]
                data['wordforms'] = copies['wordforms']
                if 'selected' in data:
                    data['selected'] = copies['selected']
                else:
                    data['selected'] = item
                continue

            data['wordforms'] = wfs = Form.objects.filter(**qkwargs)
            wfs = wfs.order_by('?')
            try:
                data['selected'] = wfs[0]
            except IndexError:
                if not self.NO_ERRORS:
                    self._reportQueryFailure(item, qkwargs)

        return elements

    def elementsToSentence(self, elements, blanks=False):
        """ Expects list of tuples, element data with ['wordforms']

            Joins the elements into a sentence. An element with a
            selected Form contributes its fullform; everything else
            contributes its own token. With ``blanks=True`` the element
            marked as task is written as '__' instead of its fullform.
        """
        # TODO: should just append fullform to data, instead.
        # For testing now this is good.
        sentence = []

        for item, data in elements:
            wf = data.get('selected') if data else None
            if not isinstance(wf, Form):
                # Plain text token, or nothing could be selected.
                sentence.append(item)
                continue

            is_task = data.get('meta', {}).get('task')
            if blanks and is_task:
                sentence.append('__')
            else:
                sentence.append(wf.fullform)

        return ' '.join(sentence)

    def personQA(self, tag):
        """
        Swap the person-number pieces of a '+'-separated tag string
        according to QAPN (e.g. Sg1 <-> Sg2) and return the new string.
        """
        return '+'.join([self.QAPN.get(elem, elem)
                         for elem in tag.split('+')])

    def checkSyntax(self, elements):
        """
        Mark agreement and apply Question-Answer person changes.

        When both SUBJ and MAINV are present, MAINV's meta gets
        'agreement' = 'SUBJ'. When SUBJ was copied from the question and
        is a pronoun, its tags are swapped through personQA and its
        'wordforms', 'copy' and 'selected' entries are dropped so that
        queryElements fetches a new form.

        Returns the elements in their original order.
        """
        elements_d = dict(elements)
        subj = elements_d.get('SUBJ')
        mainv = elements_d.get('MAINV')

        # SUBJ or MAINV may occur in the text without element data
        # (None), so test the values and not just the keys.
        if 'SUBJ' in elements_d and 'MAINV' in elements_d:
            if mainv and mainv.get('meta'):
                mainv['meta']['agreement'] = 'SUBJ'

        # Check for Question-Answer person agreement (see QAPN)
        if subj and subj.get('copy'):
            query = subj.get('query', {})
            # TODO: error handling - If the tags are missing, there's
            # something wrong with tags.txt or grammar_defaults, tags
            # need to be corrected and reinstalled
            if query.get('pos') == 'Pron' and query.get('tags'):
                subj_tags = _as_tag_list(query['tags'])

                # Pop these items so that queryElements gets new forms.
                for stale in ('wordforms', 'copy', 'selected'):
                    subj.pop(stale, None)

                query['tags'] = [self.personQA(t) for t in subj_tags]

        return [(a, elements_d[a]) for a, v in elements]

    def selectItems(self, elements):
        """
        Enforce agreement and reduce list-valued query items to one
        randomly chosen value.

        If an element's meta names an agreement head, one tag is chosen
        for the head and the agreeing element keeps only the tags
        allowed by AGREEMENT for the head's person-number. Returns the
        elements in their original order.
        """
        elements_d = dict(elements)

        agreement = False

        # Find agreement
        for elem_id, elem_data in elements_d.items():
            if elem_data and 'agreement' in elem_data.get('meta', {}):
                # SUBJ, MAINV
                agreement = (elem_data['meta']['agreement'], elem_id)

        # If there's agreement, strip non-agreeing tags.
        if agreement:
            agreement_head, agreeing_item = agreement

            # The head may be missing: the question had a SUBJ element,
            # but the answer does not contain this element.
            head = elements_d.get(agreement_head)
            agree = elements_d[agreeing_item]

            head_agr = None
            if head:
                # The head's tags may already be a single string if they
                # were chosen while the question was processed.
                head_tags = _as_tag_list(head.get('query', {}).get('tags'))
                if head_tags:
                    head_tag = choice(head_tags)
                    head['query']['tags'] = head_tag
                    head_agr = ''.join([a for a in head_tag.split('+')
                                        if a in AGREEMENT])

            # TODO: if a question is part of grammar_defaults but ends
            # up without tags, nothing can be filtered here. This is
            # something that should be added to error logging.
            if head_agr is not None and 'tags' in agree.get('query', {}):
                agr_match = AGREEMENT.get(head_agr, '')
                agree['query']['tags'] = [
                    a for a in _as_tag_list(agree['query']['tags'])
                    if any(b in a for b in agr_match)
                ]

        # Choose random tag
        for elem_id, elem_data in list(elements_d.items()):
            if not elem_data:
                continue
            e_data = elem_data.copy()
            if 'query' in e_data:
                for k, v in list(e_data['query'].items()):
                    if isinstance(v, list) and v:
                        k_s = k.replace('__in', '')
                        e_data['query'][k_s] = choice(v)
                        if k_s != k:
                            e_data['query'][k] = ''
            elements_d[elem_id] = e_data

        return [(a, elements_d[a]) for a, v in elements]

    def handleQuestions(self):
        """
        Build the question: sets ``self.question_elements`` and
        ``self.question_text`` from the <question> node.
        """
        question = _firstelement(self.node, 'question')
        text = _data(_firstelement(question, 'text'))

        elements = _elements(question, 'element')
        pelements = self.parseElements(elements)

        # TODO: Is this where we have to stop in order to use this class to
        # fill the database? Would need to create QElement and
        # WordQElements of all possible elements, so they can't be
        # trimmed or reduced to reflect element selections and agreement

        # Skip syntax and trimming steps, then query; which should
        # return all possible elements, then can begin creating Question
        # objects

        text_with_elements = self.elementizeText(text, pelements)

        # Check for agreement
        syntax_text = self.checkSyntax(text_with_elements)
        query_elements = self.selectItems(syntax_text)

        queried_elements = self.queryElements(query_elements)

        self.question_elements = queried_elements
        self.question_text = self.elementsToSentence(queried_elements) + '?'

    def copyQuestion(self, aelements):
        """
        Fill answer tokens that have no element data of their own with
        the data of the question element of the same name.

        Copied elements get 'copy' = True, the answer's own elements get
        'copy' = False. Tokens unknown to the question keep their empty
        data. Returns the elements in their original order.
        """
        question_d = dict(self.question_elements)

        copy_elements = {}
        for k, v in dict(aelements).items():
            if v:
                v['copy'] = False
                copy_elements[k] = v
                continue

            copied = question_d.get(k)
            if copied:
                # Work on a copy: checkSyntax rewrites tags and pops keys
                # on copied elements, which must not leak back into the
                # question or into the next answer.
                copied = dict(copied)
                for section in ('query', 'meta'):
                    if section in copied:
                        copied[section] = dict(copied[section])
                copied['copy'] = True
            copy_elements[k] = copied

        return [(a, copy_elements[a]) for a, v in aelements]

    def selectTask(self, elements):
        """ Takes a list of elements and returns the first one whose
            meta marks it as task, as a one-item dict keyed by element
            id; returns False if there is none.
            This should occur after the queries phase.
        """
        for element_id, element_data in elements:
            if element_data and element_data.get('meta', {}).get('task'):
                return {element_id: element_data}
        return False

    def handleAnswers(self):
        """
        Build ``self.answer_set``: one object per <answer> node with the
        attributes ``task``, ``answer_elements``, ``answer_full_text``
        and ``answer_text_blank``.
        """
        class Answer(object):
            pass

        self.answer_set = []

        for answer_node in _elements(self.node, 'answer'):
            text = _data(_firstelement(answer_node, 'text'))
            elements = _elements(answer_node, 'element')
            pelements = self.parseElements(elements)

            text_with_elements = self.elementizeText(text, pelements)
            answer_elements = self.copyQuestion(text_with_elements)

            # Is this where we have to stop in order to use this class to
            # fill the database? Would need to create QElement and
            # WordQElements of all possible elements, so they can't be
            # trimmed or reduced to reflect element selections and agreement

            # Check for agreement, and also Q-A person changes
            syntax_text = self.checkSyntax(answer_elements)
            query_elements = self.selectItems(syntax_text)
            queried_elements = self.queryElements(query_elements)

            sentence_text = self.elementsToSentence(queried_elements)
            sentence_text_blank = self.elementsToSentence(queried_elements,
                                                          blanks=True)

            answer = Answer()
            answer.task = self.selectTask(queried_elements)
            answer.answer_elements = queried_elements
            answer.answer_full_text = sentence_text + '.'
            answer.answer_text_blank = sentence_text_blank + '.'

            self.answer_set.append(answer)

    def reselect(self):
        """ Selects a new iteration of the same question.
        """
        # TODO: handleAnswers needs to set attributes for all steps,
        # uff.
        pass

    def requery(self):
        """ Reruns the queries, and selects a new iteration.
        """
        pass

    def __init__(self, q_node, grammar_defaults=False):
        """
        Process the question node *q_node* completely.

        *grammar_defaults* is a GrammarDefaults instance; when it is not
        given, the defaults are read from
        'data_sma/meta/grammar_defaults.xml'.
        """
        self.errors = {}
        # When True, failed queries are not reported in self.errors.
        self.NO_ERRORS = False

        if grammar_defaults:
            self.defaults = grammar_defaults.grammar_definitions
        else:
            with open('data_sma/meta/grammar_defaults.xml') as defaults_file:
                defaults_tree = _dom.parse(defaults_file)
            self.defaults = GrammarDefaults(defaults_tree).grammar_definitions

        self.node = q_node
        self.handleMeta()
        self.handleQuestions()
        self.handleAnswers()


# # # # #
# Command class
# # # # #

class FileLog(object):
    """
    Writes messages to a stream and keeps a copy of them, either in the
    file *fname* or, when no file name is given, in ``self.loglines``.
    """

    def __init__(self, fname):
        self.loglines = []
        self.fname = fname
        # Always define the attribute; log() tests it even when no file
        # was requested.
        self.logfile = open(fname, 'w') if fname else None

    def log(self, string, pipe=False):
        """
        Record *string* (a newline is added if missing) and write it to
        *pipe*, which defaults to sys.stderr.
        """
        if not string.endswith('\n'):
            string += '\n'

        try:
            string = string.encode('utf-8')
        except UnicodeError:
            # Covers UnicodeDecodeError too, which Python 2 raises when
            # encoding a byte string that holds non-ASCII data.
            pass

        if self.logfile:
            self.logfile.write(string)
        else:
            self.loglines.append(string)

        if not pipe:
            pipe = sys.stderr

        pipe.write(string.rstrip('\n') + '\n')

        return


class Command(BaseCommand):
    args = '--grammarfile FILE --questionfile FILE --qid QID'

    help = """
    Runs through a question XML file and produces test sentences.
    Errors are printed to stderr, so that the rest can be filtered out.

    Example command:
        ./manage.py testquestions --grammarfile grammar_defaults.xml \\
                                  --questionfile noun_questions.xml \\
                                  --logfile accusative_errors.log \\
                                  --iterations 3 \\
                                  --qid acc#
    """

    option_list = BaseCommand.option_list + (
        make_option("-g", "--grammarfile", dest="grammarfile",
                    default=False,
                    help="XML-file for grammar defaults for questions"),
        make_option("-q", "--questionfile", dest="questionfile",
                    default=False,
                    help="XML-file that contains questions"),
        make_option("--qid", dest="qid", default=False,
                    help="Specify a list of IDs to test with commas and "
                         "no spaces, or specify a partial part of an id "
                         "to filter questions by, e.g. ill1,ill2 OR "
                         "ill#; note the wildcard symbol."),
        make_option("--iterations", dest="itercount", default=5,
                    help="The count of iterations for each question"),
        make_option("--logfile", dest="logfile", default=False,
                    help="Store all output to a file in addition to "
                         "stdout."),
        # TODO: question iterations count
    )

    def handle(self, *args, **options):
        """
        Generate and log test sentences for every selected question.

        Exits with status 2 when the question file or the grammar file
        is missing from the options.
        """
        qpath = options['questionfile']
        gpath = options['grammarfile']
        iterations = int(options['itercount'])
        test_qid = options['qid']

        log = FileLog(options['logfile'] or None)

        if not qpath:
            print('Question file required.')
        if not gpath:
            print('Grammar file required.')
        # Both files are opened below, so either one missing is fatal.
        if not qpath or not gpath:
            sys.exit(2)

        with open(gpath) as defaults_file:
            defaults = GrammarDefaults(_dom.parse(defaults_file))

        with open(qpath) as questionfile:
            tree = _elements(_dom.parse(questionfile), 'q')

        if test_qid:
            test_nodes = []
            for node in tree:
                qid = _attribute(node, 'id')
                for tid in test_qid.split(','):
                    if tid.find('#') > -1:
                        # '#' is the wildcard: match on the remaining
                        # part of the id.
                        matched = tid.replace('#', '') in qid
                    else:
                        matched = tid == qid
                    if matched:
                        test_nodes.append(node)
                        # One match is enough; do not test a node twice.
                        break
            tree = test_nodes

        _OUT = sys.stdout
        _ERR = sys.stderr

        for q_node in tree:
            q = QObj(q_node, grammar_defaults=defaults)
            log.log(' == QUESTION: %s ==' % q.qid, _OUT)

            for iteration in range(iterations):
                log.log(' - %d' % (iteration + 1), _OUT)

                baseform = False

                for answer in q.answer_set:
                    # answer.task is False when the answer has no task
                    # element, and 'selected' is missing or a plain
                    # string when the query found nothing; any of these
                    # falls back to the placeholder 'TASK'.
                    try:
                        task = list(answer.task.values())[0]
                        qword = task['selected'].getBaseform().fullform
                    except Form.DoesNotExist:
                        qword = 'NO FORM'
                        task = list(answer.task.values())[0]
                        baseform = task['selected'].word
                    except Exception:
                        qword = 'TASK'

                    log.log(' Q: ' + u'%s (%s)' % (q.question_text, qword),
                            _OUT)
                    log.log(' A: ' + u'%s' % answer.answer_text_blank, _OUT)

                    try:
                        task = list(answer.task.values())[0]
                        aword = task['selected'].fullform
                    except Exception:
                        aword = 'TASK'

                    log.log(' - %s' % aword, _OUT)

                    if baseform:
                        log.log(' *** Baseform does not exist for <%s>'
                                % baseform.lemma)

                if q.errors:
                    for k, v in q.errors.items():
                        log.log(' *** Error in %s' % k, _ERR)
                        indent = ' '
                        log.log(''.join([indent + a + '\n' for a in v]),
                                _ERR)

                # Build the question again so that the next iteration
                # shows a new random selection.
                q = QObj(q_node, grammar_defaults=defaults)