summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 15a5ffa)
raw | patch | inline | side by side (parent: 15a5ffa)
author | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Sun, 14 Jul 2002 02:16:29 +0000 (02:16 +0000) | ||
committer | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Sun, 14 Jul 2002 02:16:29 +0000 (02:16 +0000) |
a special case for it in testing)
git-svn-id: http://svn.roundup-tracker.org/svnroot/roundup/trunk@871 57a73879-2fb5-44c3-a270-3262357dd7e2
git-svn-id: http://svn.roundup-tracker.org/svnroot/roundup/trunk@871 57a73879-2fb5-44c3-a270-3262357dd7e2
roundup/backends/back_metakit.py | patch | blob | history | |
test/test_db.py | patch | blob | history |
index 49028519361c754d4902b792cb4029c9554b6164..6e51507fa7fe7f0b8496fce37d6f1bca9ed8976d 100755 (executable)
self.get(nodeid, 'content'), mimetype)
# Yuck - c&p to avoid getting hyperdb.Class
-class IssueClass(Class):
+class IssueClass(Class, roundupdb.IssueClass):
# Overridden methods:
properties['superseder'] = hyperdb.Multilink(classname)
Class.__init__(self, db, classname, **properties)
- # New methods:
-
- def addmessage(self, nodeid, summary, text):
- """Add a message to an issue's mail spool.
-
- A new "msg" node is constructed using the current date, the user that
- owns the database connection as the author, and the specified summary
- text.
-
- The "files" and "recipients" fields are left empty.
-
- The given text is saved as the body of the message and the node is
- appended to the "messages" field of the specified issue.
- """
-
- def nosymessage(self, nodeid, msgid, oldvalues):
- """Send a message to the members of an issue's nosy list.
-
- The message is sent only to users on the nosy list who are not
- already on the "recipients" list for the message.
-
- These users are then added to the message's "recipients" list.
- """
- users = self.db.user
- messages = self.db.msg
-
- # figure the recipient ids
- sendto = []
- r = {}
- recipients = messages.get(msgid, 'recipients')
- for recipid in messages.get(msgid, 'recipients'):
- r[recipid] = 1
-
- # figure the author's id, and indicate they've received the message
- authid = messages.get(msgid, 'author')
-
- # possibly send the message to the author, as long as they aren't
- # anonymous
- if (self.db.config.MESSAGES_TO_AUTHOR == 'yes' and
- users.get(authid, 'username') != 'anonymous'):
- sendto.append(authid)
- r[authid] = 1
-
- # now figure the nosy people who weren't recipients
- nosy = self.get(nodeid, 'nosy')
- for nosyid in nosy:
- # Don't send nosy mail to the anonymous user (that user
- # shouldn't appear in the nosy list, but just in case they
- # do...)
- if users.get(nosyid, 'username') == 'anonymous':
- continue
- # make sure they haven't seen the message already
- if not r.has_key(nosyid):
- # send it to them
- sendto.append(nosyid)
- recipients.append(nosyid)
-
- # generate a change note
- if oldvalues:
- note = self.generateChangeNote(nodeid, oldvalues)
- else:
- note = self.generateCreateNote(nodeid)
-
- # we have new recipients
- if sendto:
- # map userids to addresses
- sendto = [users.get(i, 'address') for i in sendto]
-
- # update the message's recipients list
- messages.set(msgid, recipients=recipients)
-
- # send the message
- self.send_message(nodeid, msgid, note, sendto)
-
- # XXX backwards compatibility - don't remove
- sendmessage = nosymessage
-
- def send_message(self, nodeid, msgid, note, sendto):
- '''Actually send the nominated message from this node to the sendto
- recipients, with the note appended.
- '''
- users = self.db.user
- messages = self.db.msg
- files = self.db.file
-
- # determine the messageid and inreplyto of the message
- inreplyto = messages.get(msgid, 'inreplyto')
- messageid = messages.get(msgid, 'messageid')
-
- # make up a messageid if there isn't one (web edit)
- if not messageid:
- # this is an old message that didn't get a messageid, so
- # create one
- messageid = "<%s.%s.%s%s@%s>"%(time.time(), random.random(),
- self.classname, nodeid, self.db.config.MAIL_DOMAIN)
- messages.set(msgid, messageid=messageid)
-
- # send an email to the people who missed out
- cn = self.classname
- title = self.get(nodeid, 'title') or '%s message copy'%cn
- # figure author information
- authid = messages.get(msgid, 'author')
- authname = users.get(authid, 'realname')
- if not authname:
- authname = users.get(authid, 'username')
- authaddr = users.get(authid, 'address')
- if authaddr:
- authaddr = ' <%s>'%authaddr
- else:
- authaddr = ''
-
- # make the message body
- m = ['']
-
- # put in roundup's signature
- if self.db.config.EMAIL_SIGNATURE_POSITION == 'top':
- m.append(self.email_signature(nodeid, msgid))
-
- # add author information
- if len(self.get(nodeid,'messages')) == 1:
- m.append("New submission from %s%s:"%(authname, authaddr))
- else:
- m.append("%s%s added the comment:"%(authname, authaddr))
- m.append('')
-
- # add the content
- m.append(messages.get(msgid, 'content'))
-
- # add the change note
- if note:
- m.append(note)
-
- # put in roundup's signature
- if self.db.config.EMAIL_SIGNATURE_POSITION == 'bottom':
- m.append(self.email_signature(nodeid, msgid))
-
- # encode the content as quoted-printable
- content = cStringIO.StringIO('\n'.join(m))
- content_encoded = cStringIO.StringIO()
- quopri.encode(content, content_encoded, 0)
- content_encoded = content_encoded.getvalue()
-
- # get the files for this message
- message_files = messages.get(msgid, 'files')
-
- # make sure the To line is always the same (for testing mostly)
- sendto.sort()
-
- # create the message
- message = cStringIO.StringIO()
- writer = MimeWriter.MimeWriter(message)
- writer.addheader('Subject', '[%s%s] %s'%(cn, nodeid, title))
- writer.addheader('To', ', '.join(sendto))
- writer.addheader('From', '%s <%s>'%(authname,
- self.db.config.ISSUE_TRACKER_EMAIL))
- writer.addheader('Reply-To', '%s <%s>'%(self.db.config.INSTANCE_NAME,
- self.db.config.ISSUE_TRACKER_EMAIL))
- writer.addheader('MIME-Version', '1.0')
- if messageid:
- writer.addheader('Message-Id', messageid)
- if inreplyto:
- writer.addheader('In-Reply-To', inreplyto)
-
- # add a uniquely Roundup header to help filtering
- writer.addheader('X-Roundup-Name', self.db.config.INSTANCE_NAME)
-
- # attach files
- if message_files:
- part = writer.startmultipartbody('mixed')
- part = writer.nextpart()
- part.addheader('Content-Transfer-Encoding', 'quoted-printable')
- body = part.startbody('text/plain')
- body.write(content_encoded)
- for fileid in message_files:
- name = files.get(fileid, 'name')
- mime_type = files.get(fileid, 'type')
- content = files.get(fileid, 'content')
- part = writer.nextpart()
- if mime_type == 'text/plain':
- part.addheader('Content-Disposition',
- 'attachment;\n filename="%s"'%name)
- part.addheader('Content-Transfer-Encoding', '7bit')
- body = part.startbody('text/plain')
- body.write(content)
- else:
- # some other type, so encode it
- if not mime_type:
- # this should have been done when the file was saved
- mime_type = mimetypes.guess_type(name)[0]
- if mime_type is None:
- mime_type = 'application/octet-stream'
- part.addheader('Content-Disposition',
- 'attachment;\n filename="%s"'%name)
- part.addheader('Content-Transfer-Encoding', 'base64')
- body = part.startbody(mime_type)
- body.write(base64.encodestring(content))
- writer.lastpart()
- else:
- writer.addheader('Content-Transfer-Encoding', 'quoted-printable')
- body = writer.startbody('text/plain')
- body.write(content_encoded)
-
- # now try to send the message
- if SENDMAILDEBUG:
- open(SENDMAILDEBUG, 'w').write('FROM: %s\nTO: %s\n%s\n'%(
- self.db.config.ADMIN_EMAIL,
- ', '.join(sendto),message.getvalue()))
- else:
- try:
- # send the message as admin so bounces are sent there
- # instead of to roundup
- smtp = smtplib.SMTP(self.db.config.MAILHOST)
- smtp.sendmail(self.db.config.ADMIN_EMAIL, sendto,
- message.getvalue())
- except socket.error, value:
- raise MessageSendError, \
- "Couldn't send confirmation email: mailhost %s"%value
- except smtplib.SMTPException, value:
- raise MessageSendError, \
- "Couldn't send confirmation email: %s"%value
-
- def email_signature(self, nodeid, msgid):
- ''' Add a signature to the e-mail with some useful information
- '''
- web = self.db.config.ISSUE_TRACKER_WEB + 'issue'+ nodeid
- email = '"%s" <%s>'%(self.db.config.INSTANCE_NAME,
- self.db.config.ISSUE_TRACKER_EMAIL)
- line = '_' * max(len(web), len(email))
- return '%s\n%s\n%s\n%s'%(line, email, web, line)
-
- def generateCreateNote(self, nodeid):
- """Generate a create note that lists initial property values
- """
- cn = self.classname
- cl = self.db.classes[cn]
- props = cl.getprops(protected=0)
-
- # list the values
- m = []
- l = props.items()
- l.sort()
- for propname, prop in l:
- value = cl.get(nodeid, propname, None)
- # skip boring entries
- if not value:
- continue
- if isinstance(prop, hyperdb.Link):
- link = self.db.classes[prop.classname]
- if value:
- key = link.labelprop(default_to_id=1)
- if key:
- value = link.get(value, key)
- else:
- value = ''
- elif isinstance(prop, hyperdb.Multilink):
- if value is None: value = []
- l = []
- link = self.db.classes[prop.classname]
- key = link.labelprop(default_to_id=1)
- if key:
- value = [link.get(entry, key) for entry in value]
- value.sort()
- value = ', '.join(value)
- m.append('%s: %s'%(propname, value))
- m.insert(0, '----------')
- m.insert(0, '')
- return '\n'.join(m)
-
- def generateChangeNote(self, nodeid, oldvalues):
- """Generate a change note that lists property changes
- """
- cn = self.classname
- cl = self.db.classes[cn]
- changed = {}
- props = cl.getprops(protected=0)
-
- # determine what changed
- for key in oldvalues.keys():
- if key in ['files','messages']: continue
- new_value = cl.get(nodeid, key)
- # the old value might be non existent
- try:
- old_value = oldvalues[key]
- if type(new_value) is type([]):
- new_value.sort()
- old_value.sort()
- if new_value != old_value:
- changed[key] = old_value
- except:
- changed[key] = new_value
-
- # list the changes
- m = []
- l = changed.items()
- l.sort()
- for propname, oldvalue in l:
- prop = props[propname]
- value = cl.get(nodeid, propname, None)
- if isinstance(prop, hyperdb.Link):
- link = self.db.classes[prop.classname]
- key = link.labelprop(default_to_id=1)
- if key:
- if value:
- value = link.get(value, key)
- else:
- value = ''
- if oldvalue:
- oldvalue = link.get(oldvalue, key)
- else:
- oldvalue = ''
- change = '%s -> %s'%(oldvalue, value)
- elif isinstance(prop, hyperdb.Multilink):
- change = ''
- if value is None: value = []
- if oldvalue is None: oldvalue = []
- l = []
- link = self.db.classes[prop.classname]
- key = link.labelprop(default_to_id=1)
- # check for additions
- for entry in value:
- if entry in oldvalue: continue
- if key:
- l.append(link.get(entry, key))
- else:
- l.append(entry)
- if l:
- change = '+%s'%(', '.join(l))
- l = []
- # check for removals
- for entry in oldvalue:
- if entry in value: continue
- if key:
- l.append(link.get(entry, key))
- else:
- l.append(entry)
- if l:
- change += ' -%s'%(', '.join(l))
- else:
- change = '%s -> %s'%(oldvalue, value)
- m.append('%s: %s'%(propname, change))
- if m:
- m.insert(0, '----------')
- m.insert(0, '')
- return '\n'.join(m)
diff --git a/test/test_db.py b/test/test_db.py
index e8a5b6b198e6426de01e7bb1c9e8754a312529b7..1bfc15ed7a2004a5b5b422d0d8714071f4a4ab8b 100644 (file)
--- a/test/test_db.py
+++ b/test/test_db.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: test_db.py,v 1.27 2002-07-14 02:05:54 richard Exp $
+# $Id: test_db.py,v 1.28 2002-07-14 02:16:29 richard Exp $
import unittest, os, shutil
self.db.file.create(name="test", type="text/plain", content="hi")
self.db.rollback()
- def testNewProperty(self):
- ' make sure a new property is added ok '
- self.db.issue.create(title="spam", status='1')
- self.db.issue.addprop(fixer=Link("user"))
- props = self.db.issue.getprops()
- keys = props.keys()
- keys.sort()
- # metakit _base_ Class has the activity, creation and creator too
- self.assertEqual(keys, ['activity', 'creation', 'creator',
- 'deadline', 'files', 'fixer', 'foo', 'id', 'nosy', 'status',
- 'title'])
- self.assertEqual(self.db.issue.get('1', "fixer"), None)
-
class metakitReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
def setUp(self):
from roundup.backends import metakit
#
# $Log: not supported by cvs2svn $
+# Revision 1.27 2002/07/14 02:05:54 richard
+# . all storage-specific code (ie. backend) is now implemented by the backends
+#
# Revision 1.26 2002/07/11 01:11:03 richard
# Added metakit backend to the db tests and fixed the more easily fixable test
# failures.