"""zymb.testing -- unit tests for a bunch of zymb modules. To run these tests, start an interactive Python shell and type: import zymb.testing zymb.testing.run() Or, from a Unix shell: python -c 'import zymb.testing; zymb.testing.run()' """ import sys import re import unittest import logging import sched import tcp, xmlagent, xmldata import jabber.client import jabber.interface import jabber.rpcdata import jabber.rpc import jabber.discodata import jabber.dataform import xml.parsers.expat class AgentHandler(logging.Handler): def __init__(self, logbuf): logging.Handler.__init__(self) self.agent = logbuf def emit(self, record): msg = self.format(record) self.agent.handle(msg) class LoggingBuffer: alsoprint = False def __init__(self): rootlogger = logging.getLogger('') for han in rootlogger.handlers: rootlogger.removeHandler(han) rootlogger.setLevel(logging.DEBUG) roothandler = AgentHandler(self) rootform = logging.Formatter('%(levelname)-8s: (%(name)s) %(message)s') roothandler.setFormatter(rootform) rootlogger.addHandler(roothandler) self.log = [] rootlogger.info('-' * 55) self.errors = [] def clear(self): self.log = [] def handle(self, msg): if (self.alsoprint): print msg self.log.append(msg) def handleerror(self, ex, ag): self.errors.append(ex) def getagentstates(self, ag): ls = [] agname = str(ag) regex = re.compile('in state "([^"]*)"') for msg in self.log: if (agname in msg): match = regex.search(msg) if (match): ls.append(match.group(1)) return ls def getdologs(self): ls = [] for msg in self.log: pos = msg.find('DOLOG ') if (pos >= 0): ls.append(msg[pos+6:]) return ls def geterrors(self): return self.errors def schedloop(): while 1: res = sched.process(None) if (not res): break def dolog(agent, msg, *args): agent.log.info('DOLOG '+msg, *args) def assertnoaccumulate(agent, stanza): if (agent.xmlparse.result.getcontents()): raise Exception('nodes are accumulating') def waitforquit(listenagent, cliagent, msg): if (msg == 'stop'): cliagent.stop() return if (msg == 'quit'): cliagent.stop() listenagent.stop() return if (msg == 'stopall'): sched.stopall() return def acceptwaitquit(listenag, sock, host, port): cl = tcp.TCP(host, port, sock) cl.addhandler('handle', waitforquit, listenag, cl) cl.start() def acceptreflect(sock, host, port): cl = tcp.TCP(host, port, sock) cl.addhandler('handle', cl.send) cl.start() def acceptreflectmost(sock, host, port): def sendback(cl, dat): if (len(dat) > 10): dat = str(len(dat)) cl.send(dat) cl = tcp.TCP(host, port, sock) cl.addhandler('handle', sendback, cl) cl.start() class Pocket: def __init__(self): self.agent = None def set(self, agent): self.agent = agent agent.addhandler('closed', dolog, agent, 'closedpocket') def stop(self): self.agent.stop() def acceptpocket(pocket, sock, host, port): cl = tcp.TCP(host, port, sock) pocket.set(cl) cl.start() def acceptbriefly(listenag, sock, host, port): cl = tcp.TCP(host, port, sock) cl.start() cl.addtimer(cl.stop, delay=0.1) cl.addhandler('end', listenag.stop) class TestSelfJumperAgent(sched.Agent): def __init__(self): sched.Agent.__init__(self) self.addhandler('start', self.starthandler) self.addhandler('s1', self.handler1) self.addhandler('s2', self.handler2) def starthandler(self): self.jump('s1') def handler1(self): self.jump('s2') def handler2(self): self.jump('s3') class TestAssertAgent(sched.Agent): def __init__(self): sched.Agent.__init__(self) self.addhandler('start', self.starthandler) def starthandler(self): assert 0, 'assertion' class TestTimerAgent(sched.Agent): def __init__(self): sched.Agent.__init__(self) self.counter = 0 self.addhandler('start', self.starthandler) def starthandler(self): self.addtimer(self.increment, delay=0.05) def increment(self): self.counter += 1 self.jump('s'+str(self.counter)) if (self.counter < 4): self.addtimer(self.increment, delay=0.05) class TestPeriodicTimerAgent(sched.Agent): def __init__(self, testtype): sched.Agent.__init__(self) self.counter = 0 self.timeraction = None self.testtype = testtype self.addhandler('start', self.starthandler) def starthandler(self): if (self.testtype): ac = sched.PeriodicAction(0.05, self.increment) self.timeraction = self.addtimer(ac) else: self.timeraction = self.addtimer(self.increment, delay=0.05, periodic=True) def increment(self): self.counter += 1 self.jump('s'+str(self.counter)) if (self.counter >= 4): self.timeraction.remove() class TestStringListAgent(sched.Agent): def __init__(self, ls, thenend=False): sched.Agent.__init__(self) self.list = ls[:] self.thenend = thenend self.addhandler('start', self.starthandler) def starthandler(self): for st in self.list: self.perform('handle', st) if (self.thenend): self.stop() # ---- class TestSched(unittest.TestCase): def test_startstop(self): logbuf = LoggingBuffer() schedloop() def test_startagent(self): logbuf = LoggingBuffer() ag = sched.Agent() ag.addhandler('error', logbuf.handleerror) ag.start() schedloop() ls = logbuf.getagentstates(ag) self.assertEqual(ls, ['start']) self.assert_(not logbuf.geterrors()) def test_actionargs(self): logbuf = LoggingBuffer() ag = sched.Agent() ag.addhandler('error', logbuf.handleerror) ag.addhandler('log', dolog, ag) ag.addhandler('logfix', dolog, ag, 'logfixer') ag.addhandler('logplus', dolog, ag, 'logplus-%s') ag.addhandler('middle', dolog, ag, 'inmiddle') ag.addhandler('late', dolog, ag, 'inlate %s %s', 'one') ag.start() ag.perform('log', 'hello') ag.perform('log', 'percent %s %d', 'cheese', 5) ag.perform('logfix') ag.perform('logplus', 'more') ag.jump('middle') ag.jump('late', 'two') schedloop() ls = logbuf.getagentstates(ag) self.assertEqual(ls, ['start', 'middle', 'late']) ls = logbuf.getdologs() self.assertEqual(ls, [ 'hello', 'percent cheese 5', 'logfixer', 'logplus-more', 'inmiddle', 'inlate one two']) self.assert_(not logbuf.geterrors()) def test_handlerremoval(self): logbuf = LoggingBuffer() ag = sched.Agent() ag.addhandler('error', logbuf.handleerror) ac1 = ag.addhandler('log', dolog, ag, 'one %s') ac2 = ag.addhandler('log', dolog, ag, 'two %s') ag.start() ag.perform('log', 'A') ag.perform('log', 'B') schedloop() ac1.remove() ag.perform('log', 'C') schedloop() ac2.remove() ag.perform('log', 'D') schedloop() ac3 = ag.addhandler('log', dolog, ag, 'three %s') ag.perform('log', 'E') ac3.remove() schedloop() ls = logbuf.getdologs() self.assertEqual(ls, ['two A', 'one A', 'two B', 'one B', 'two C']) self.assert_(not logbuf.geterrors()) def test_selfjumperagent(self): logbuf = LoggingBuffer() ag = TestSelfJumperAgent() ag.addhandler('error', logbuf.handleerror) ag.start() schedloop() ls = logbuf.getagentstates(ag) self.assertEqual(ls, ['start', 's1', 's2', 's3']) self.assert_(not logbuf.geterrors()) def test_timeragent(self): logbuf = LoggingBuffer() ag = TestTimerAgent() ag.addhandler('error', logbuf.handleerror) ag.start() schedloop() ls = logbuf.getagentstates(ag) self.assertEqual(ls, ['start', 's1', 's2', 's3', 's4']) self.assert_(not logbuf.geterrors()) def test_assertagent(self): logbuf = LoggingBuffer() ag = TestAssertAgent() ag.addhandler('error', logbuf.handleerror) ag.start() schedloop() self.assert_(not logbuf.geterrors()) def test_periodictimeragent(self): logbuf = LoggingBuffer() ag = TestPeriodicTimerAgent(False) ag.addhandler('error', logbuf.handleerror) ag2 = TestPeriodicTimerAgent(True) ag2.addhandler('error', logbuf.handleerror) ag.start() ag2.start() schedloop() ls = logbuf.getagentstates(ag) self.assertEqual(ls, ['start', 's1', 's2', 's3', 's4']) ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 's1', 's2', 's3', 's4']) self.assert_(not logbuf.geterrors()) def test_precedence(self): logbuf = LoggingBuffer() ag = sched.Agent() ag.addhandler('error', logbuf.handleerror) def logactioncount(ag, msg, count): ag.queueaction(dolog, ag, msg+'-'+str(count)) if (count > 1): ag.queueaction(logactioncount, ag, msg, count-1) def gotactivity(ag): ag.queueaction(logactioncount, ag, 'gamma', 2) ag.queueaction(logactioncount, ag, 'delta', 2) pollaction.remove() pollaction = ag.registerpoll(gotactivity, ag, interval=0.05) ag.start() ag.addtimer(logactioncount, ag, 'alpha', 2, delay=0.0) ag.addtimer(logactioncount, ag, 'beta', 2, delay=0.0) schedloop() ls = logbuf.getdologs() self.assertEqual(ls, ['gamma-2', 'gamma-1', 'delta-2', 'delta-1', 'alpha-2', 'alpha-1', 'beta-2', 'beta-1']) self.assert_(not logbuf.geterrors()) def test_deferred(self): logbuf = LoggingBuffer() ag = sched.Agent() ag.addhandler('error', logbuf.handleerror) def defer2string(st): defer = sched.Deferred(deferstring, st+'e') ac = ag.queueaction(defer) defer.addaction(ac) raise defer def deferstring(st): defer = sched.Deferred(raisestring, st+'s') ac = ag.queueaction(defer) defer.addaction(ac) raise defer def raisestring(st): raise Exception, st def defwrapper(tup, ag): try: res = sched.Deferred.extract(tup) except Exception, ex: dolog(ag, 'caught-'+str(ex)) def exceptwrapper(ag, func, *args): try: func(*args) except sched.Deferred, ex: ex.addcontext(defwrapper, ag) except Exception, ex: dolog(ag, 'caught-'+str(ex)) ag.addhandler('start', exceptwrapper, ag, raisestring, 'bell') ag.addhandler('start', exceptwrapper, ag, deferstring, 'book') ag.addhandler('start', exceptwrapper, ag, defer2string, 'candl') ag.start() schedloop() self.assert_(not logbuf.geterrors()) ls = logbuf.getdologs() self.assertEqual(ls, ['caught-candles', 'caught-books', 'caught-bell']) class TestTCP(unittest.TestCase): def test_tcplisten(self): logbuf = LoggingBuffer() ag = tcp.TCPListen(4201) ag2 = tcp.TCP('localhost', 4201) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag.addhandler('accept', acceptwaitquit, ag) ag2.addhandler('connected', ag2.send, 'quit') ag.start() ag2.start() schedloop() ls = logbuf.getagentstates(ag) self.assertEqual(ls, ['start', 'listening', 'end']) ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'connected', 'end']) self.assert_(not logbuf.geterrors()) def test_tcplisten2(self): logbuf = LoggingBuffer() ag = tcp.TCPListen(4201) ag2 = tcp.TCP('localhost', 4201) ag3 = tcp.TCP('localhost', 4201) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag3.addhandler('error', logbuf.handleerror) ag.addhandler('accept', acceptwaitquit, ag) ag.addhandler('accept', dolog, ag, 'accepting %s %s %s') ag.start() ag2.start() ag3.start() ag.addtimer(sched.stopall, delay=0.1) schedloop() ls = logbuf.getagentstates(ag) self.assertEqual(ls, ['start', 'listening', 'end']) ls = logbuf.getdologs() ls = [ st[:9] for st in ls ] self.assertEqual(ls, ['accepting', 'accepting']) self.assert_(not logbuf.geterrors()) def test_tcplistenreflect(self): logbuf = LoggingBuffer() ag = tcp.TCPListen(4201) ag2 = tcp.TCP('localhost', 4201) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag.addhandler('accept', acceptreflect) ag2.addhandler('connected', ag2.send, 'quit') ag2.addhandler('connected', ag2.send, 'begin') ag2.addhandler('handle', waitforquit, ag, ag2) ag2.addhandler('handle', dolog, ag2) ag.start() ag2.start() schedloop() ls = logbuf.getdologs() self.assertEqual(ls, ['begin', 'quit']) self.assert_(not logbuf.geterrors()) def test_tcpsendbig(self): logbuf = LoggingBuffer() ag = tcp.TCPListen(4201) ag2 = tcp.TCP('localhost', 4201) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag.addhandler('accept', acceptreflectmost) def sendone(self): if (self.seqsendlist): val = self.seqsendlist.pop(0) res = self.send(val) if (res != len(val)): self.send('badlen') def seqsender(self): self.seqsendlist = ['begin', ('x'*100000), 'quit'] self.addtimer(sendone, self, delay=0.1, periodic=True) ag2.addhandler('connected', seqsender, ag2) ag2.addhandler('handle', waitforquit, ag, ag2) ag2.addhandler('handle', dolog, ag2) ag.start() ag2.start() schedloop() ls = logbuf.getdologs() self.assertEqual(ls, ['begin', '100000', 'quit']) self.assert_(not logbuf.geterrors()) def test_tcpsendbignonblock(self): logbuf = LoggingBuffer() ag = tcp.TCPListen(4201) ag2 = tcp.TCP('localhost', 4201) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag.addhandler('accept', acceptreflectmost) def sendone(self): if (self.seqsendlist): val = self.seqsendlist.pop(0) res = self.sendnb(val) val = val[res:] if (val): self.seqsendlist.insert(0, val) def seqsender(self): self.seqsendlist = ['begin', ('x'*100000), 'quit'] self.addtimer(sendone, self, delay=0.1, periodic=True) ag2.addhandler('connected', seqsender, ag2) ag2.addhandler('handle', waitforquit, ag, ag2) ag2.addhandler('handle', dolog, ag2) ag.start() ag2.start() schedloop() ls = logbuf.getdologs() self.assertNotEqual(ls, ['begin', 'len100000', 'quit']) self.assertEqual(ls[0], 'begin') self.assertEqual(ls[-1], 'quit') sum = 0 for val in ls[1:-1]: sum += int(val) self.assertEqual(sum, 100000) self.assert_(not logbuf.geterrors()) def test_tcplistencloseserv(self): logbuf = LoggingBuffer() ag = tcp.TCPListen(4201) ag2 = tcp.TCP('localhost', 4201) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag.addhandler('accept', acceptbriefly, ag) ag.addhandler('closed', dolog, ag, 'closedlisten') ag2.addhandler('closed', dolog, ag2, 'closedcli') ag.start() ag2.start() schedloop() ls = logbuf.getdologs() self.assertEqual(ls, ['closedcli']) self.assert_(not logbuf.geterrors()) def test_tcplistencloseserv2(self): logbuf = LoggingBuffer() ag = tcp.TCPListen(4201) ag2 = tcp.TCP('localhost', 4201) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) pocket = Pocket() ag.addhandler('accept', acceptpocket, pocket) ag.addhandler('closed', dolog, ag, 'closedlisten') ag2.addhandler('closed', dolog, ag2, 'closedcli') ag.start() ag2.start() ag.addtimer(pocket.stop, delay=0.1) ag.addtimer(ag.stop, delay=0.1) schedloop() ls = logbuf.getdologs() self.assertEqual(ls, ['closedcli']) self.assert_(not logbuf.geterrors()) def test_tcplistenclosecli(self): logbuf = LoggingBuffer() ag = tcp.TCPListen(4201) ag2 = tcp.TCP('localhost', 4201) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) pocket = Pocket() ag.addhandler('accept', acceptpocket, pocket) ag.addhandler('closed', dolog, ag, 'closedlisten') ag2.addhandler('closed', dolog, ag2, 'closedcli') ag.start() ag2.start() ag2.addtimer(ag2.stop, delay=0.1) ag.addtimer(ag.stop, delay=0.1) schedloop() ls = logbuf.getdologs() self.assertEqual(ls, ['closedpocket']) self.assert_(not logbuf.geterrors()) class TestXML(unittest.TestCase): def test_basicxml(self): xmldat = [ "", "", "cheese", "" ] logbuf = LoggingBuffer() ag = TestStringListAgent(xmldat) ag2 = xmlagent.XML(ag) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('body', dolog, ag2, 'body %s %s %s') ag2.addhandler('stanza', dolog, ag2, 'stanza %s') ag2.addhandler('stanza', assertnoaccumulate, ag2) ag2.start() ag.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'body', 'end']) self.assertEqual(ag2.docattrs['key1'], 'val') self.assertEqual(ag2.docattrs['xml:lang'], 'en') ls = logbuf.getdologs() st = ls[0] self.assert_(re.match("body toplevel name:space {[^}]*}", st)) self.assertEqual(ls[1:], [ 'stanza ', 'stanza cheese' ]) self.assert_(not logbuf.geterrors()) def test_unicodexml(self): xmldat = [ "", "\xe2\xa8\xb8", # utf8-encoded: "che\xc3\xa8se", # utf8-encoded: chese "" ] logbuf = LoggingBuffer() stanzas = [] ag = TestStringListAgent(xmldat) ag2 = xmlagent.XML(ag) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('stanza', stanzas.append) ag2.start() ag.start() schedloop() self.assertEqual(len(stanzas), 2) self.assertEqual(stanzas[0].getdata(), u'\u2a38') self.assertEqual(stanzas[1].getdata(), u'che\xe8se') self.assert_(not logbuf.geterrors()) def test_basicxmlended(self): xmldat = [ "", "", "cheese" ] logbuf = LoggingBuffer() ag = TestStringListAgent(xmldat, True) ag2 = xmlagent.XML(ag) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('body', dolog, ag2, 'body %s %s %s') ag2.addhandler('stanza', dolog, ag2, 'stanza %s') ag2.start() ag.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'body', 'end']) self.assertEqual(ag2.docattrs['key1'], 'val') self.assertEqual(ag2.docattrs['xml:lang'], 'en') ls = logbuf.getdologs() st = ls[0] self.assert_(re.match("body toplevel name:space {[^}]*}", st)) self.assertEqual(ls[1:], [ 'stanza ', 'stanza cheese' ]) self.assert_(not logbuf.geterrors()) def test_basicxmlnoclose(self): xmldat = [ "", "", "cheese" ] logbuf = LoggingBuffer() ag = TestStringListAgent(xmldat) ag2 = xmlagent.XML(ag) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('body', dolog, ag2, 'body %s %s %s') ag2.addhandler('stanza', dolog, ag2, 'stanza %s') ag2.start() ag.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'body']) self.assertEqual(ag2.docattrs['key1'], 'val') self.assertEqual(ag2.docattrs['xml:lang'], 'en') ls = logbuf.getdologs() st = ls[0] self.assert_(re.match("body toplevel name:space {[^}]*}", st)) self.assertEqual(ls[1:], [ 'stanza ', 'stanza cheese' ]) self.assert_(not logbuf.geterrors()) def test_malformedxml(self): xmldat = [ "", "", "", "cheese", "" ] logbuf = LoggingBuffer() ag = TestStringListAgent(xmldat) ag2 = xmlagent.XML(ag) ag.addhandler('error', logbuf.handleerror) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('body', dolog, ag2, 'body %s %s %s') ag2.addhandler('stanza', dolog, ag2, 'stanza %s') ag2.start() ag.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'body', 'end']) self.assertEqual(ag2.docattrs['key1'], 'val') self.assertEqual(ag2.docattrs['xml:lang'], 'en') ls = logbuf.getdologs() st = ls[0] self.assert_(re.match("body toplevel name:space {[^}]*}", st)) self.assertEqual(ls[1:], [ 'stanza ' ]) ls = logbuf.geterrors() self.assertEqual(len(ls), 1) self.assert_(isinstance(ls[0], xml.parsers.expat.ExpatError)) class TestParrotAgent(tcp.TCP): def __init__(self, host, port, socket, dic): tcp.TCP.__init__(self, host, port, socket) self.dic = dic self.addhandler('handle', self.grabstring) def grabstring(self, st): if (self.dic.has_key(st)): res = self.dic[st] if (type(res) != tuple): res = ( res, ) for val in res: if (val == None): self.stop() return wrote = self.send(val) assert (wrote == len(val)) class TestListenParrotAgent(tcp.TCPListen): def __init__(self, port, pocket, dic): tcp.TCPListen.__init__(self, port) self.pocket = pocket self.dic = dic self.addhandler('accept', self.acceptone) def acceptone(self, socket, host, port): cl = TestParrotAgent(host, port, socket, self.dic) cl.start() self.pocket.set(cl) self.stop() class TestJabber(unittest.TestCase): def test_connectandstop(self): dic = { '' : ( "", ""), 'QUIT': None } logbuf = LoggingBuffer() pocket = Pocket() ag = TestListenParrotAgent(4201, pocket, dic) ag2 = jabber.client.JabberConnect('erkyrath@localhost/test', 4201) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('connected', ag2.conn.send, 'QUIT') ag.start() ag2.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'gotheader', 'streaming', 'connected', 'end']) self.assert_(not logbuf.geterrors()) def test_malformedheader(self): dic = { '' : ( "", ""), 'QUIT': None } logbuf = LoggingBuffer() pocket = Pocket() ag = TestListenParrotAgent(4201, pocket, dic) ag2 = jabber.client.JabberConnect('erkyrath@localhost/test', 4201) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('connected', ag2.conn.send, 'QUIT') ag.start() ag2.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'end']) ls = logbuf.geterrors() self.assertEqual(len(ls), 1) self.assert_(isinstance(ls[0], xml.parsers.expat.ExpatError)) def test_wrongtagheader(self): dic = { '' : ( "", ""), 'QUIT': None } logbuf = LoggingBuffer() pocket = Pocket() ag = TestListenParrotAgent(4201, pocket, dic) ag2 = jabber.client.JabberConnect('erkyrath@localhost/test', 4201) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('connected', ag2.conn.send, 'QUIT') ag.start() ag2.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'end']) ls = logbuf.geterrors() self.assertEqual(len(ls), 1) self.assert_(isinstance(ls[0], jabber.interface.StreamLevelError)) (nam, text) = ls[0] self.assert_(nam, 'invalid-xml') def test_wrongtagnamespace(self): dic = { '' : ( "", ""), 'QUIT': None } logbuf = LoggingBuffer() pocket = Pocket() ag = TestListenParrotAgent(4201, pocket, dic) ag2 = jabber.client.JabberConnect('erkyrath@localhost/test', 4201) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('connected', ag2.conn.send, 'QUIT') ag.start() ag2.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'end']) ls = logbuf.geterrors() self.assertEqual(len(ls), 1) self.assert_(isinstance(ls[0], jabber.interface.StreamLevelError)) (nam, text) = ls[0] self.assert_(nam, 'invalid-namespace') def test_malformedstanza(self): dic = { '' : ( "", ""), 'QUIT': None } logbuf = LoggingBuffer() pocket = Pocket() ag = TestListenParrotAgent(4201, pocket, dic) ag2 = jabber.client.JabberConnect('erkyrath@localhost/test', 4201) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('connected', ag2.conn.send, 'QUIT') ag.start() ag2.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'gotheader', 'end']) ls = logbuf.geterrors() self.assertEqual(len(ls), 1) self.assert_(isinstance(ls[0], xml.parsers.expat.ExpatError)) def test_streamerror(self): dic = { '' : ( "", ""), 'QUIT': None } logbuf = LoggingBuffer() pocket = Pocket() ag = TestListenParrotAgent(4201, pocket, dic) ag2 = jabber.client.JabberConnect('erkyrath@localhost/test', 4201) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('connected', ag2.conn.send, 'QUIT') ag.start() ag2.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'gotheader', 'end']) ls = logbuf.geterrors() self.assertEqual(len(ls), 1) self.assert_(isinstance(ls[0], jabber.interface.StreamLevelError)) (nam, text) = ls[0] self.assert_(nam, 'xml-not-well-formed') def test_secondaryqueue(self): dic = { '' : ( "", ""), 'M1': ('alpha1', 'beta1', 'alpha2beta2' 'omega') } def handlestanza(ag, stanza): val = stanza.getdata() dolog(ag, val) if (val.startswith('alpha')): ag.queueaction(dolog, ag, 'startgame') if (val.startswith('beta')): dolog(ag, 'turn') if (val == 'omega'): ag.queueaction(ag.stop) logbuf = LoggingBuffer() pocket = Pocket() ag = TestListenParrotAgent(4201, pocket, dic) ag2 = jabber.client.JabberConnect('erkyrath@localhost/test', 4201) ag2.addhandler('error', logbuf.handleerror) ag2.addhandler('connected', ag2.conn.send, 'M1') ag2.adddispatcher(handlestanza, ag2, name='message', accept=True) ag.start() ag2.start() schedloop() ls = logbuf.getagentstates(ag2) self.assertEqual(ls, ['start', 'gotheader', 'streaming', 'connected', 'end']) self.assert_(not logbuf.geterrors()) ls = logbuf.getdologs() self.assertEqual(ls, ['alpha1', 'startgame', 'beta1', 'turn', 'alpha2', 'startgame', 'beta2', 'turn', 'omega', 'closedpocket']) # ---- testlist = [ ('sched', TestSched), ('tcp', TestTCP), ('xml', TestXML), ('xmldata', xmldata.TestXMLData), ('jabber', TestJabber), ('interface', jabber.interface.TestInterface), ('rpcdata', jabber.rpcdata.TestRpcData), ('rpc', jabber.rpc.TestRpc), ('discodata', jabber.discodata.TestDiscoData), ('dataform', jabber.dataform.TestDataForm), ] def run(arglist=[]): if (type(arglist) == str): arglist = arglist.split() if ('-D' in arglist): arglist.remove('-D') LoggingBuffer.alsoprint = True if (not arglist): tests = testlist else: tests = [(key, case) for (key, case) in testlist if key in arglist] ls = [ case for (key, case) in tests ] print 'Running:', (' '.join([ key for (key, case) in tests ])) suitels = [ unittest.makeSuite(case) for case in ls ] suite = unittest.TestSuite(suitels) unittest.TextTestRunner().run(suite) if __name__ == '__main__': run(sys.argv[1:])