summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 67517de)
raw | patch | inline | side by side (parent: 67517de)
author | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Sat, 25 Oct 2003 22:53:26 +0000 (22:53 +0000) | ||
committer | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Sat, 25 Oct 2003 22:53:26 +0000 (22:53 +0000) |
coverage analysis
- all RDMBS backends now have indexes on several columns
- added testing of schema mutation, fixed rdbms backends handling of a
couple of cases
- !BETA! added postgresql backend, needs work !BETA!
git-svn-id: http://svn.roundup-tracker.org/svnroot/roundup/trunk@1942 57a73879-2fb5-44c3-a270-3262357dd7e2
- all RDMBS backends now have indexes on several columns
- added testing of schema mutation, fixed rdbms backends handling of a
couple of cases
- !BETA! added postgresql backend, needs work !BETA!
git-svn-id: http://svn.roundup-tracker.org/svnroot/roundup/trunk@1942 57a73879-2fb5-44c3-a270-3262357dd7e2
27 files changed:
diff --git a/CHANGES.txt b/CHANGES.txt
index 400fabd9d337100f3fa8c69a39be05e15e83953f..cc5356431e2f43d09f1da057a694979cef77088f 100644 (file)
--- a/CHANGES.txt
+++ b/CHANGES.txt
Feature:
- support confirming registration by replying to the email (sf bug 763668)
- support setgid and running on port < 1024 (sf patch 777528)
+- using Zope3's test runner now, allowing GC checks, nicer controls and
+ coverage analysis
+- !BETA! added postgresql backend, needs work !BETA!
+- all RDMBS backends now have indexes on several columns
Fixed:
- mysql documentation fixed to note requirement of 4.0+ and InnoDB
+- added testing of schema mutation, fixed rdbms backends handling of a
+ couple of cases
2003-10-?? 0.6.3
diff --git a/doc/postgresql.txt b/doc/postgresql.txt
--- /dev/null
+++ b/doc/postgresql.txt
@@ -0,0 +1,48 @@
+==========================
+PostgreSQL/psycopg Backend
+==========================
+
+This is notes about PostreSQL backend based on the psycopg adapter for
+roundup issue tracker.
+
+
+Prerequisites
+=============
+
+To use PostgreSQL as backend for storing roundup data, you should
+additionally install:
+
+ 1. PostgreSQL 7.x - http://www.postgresql.org/
+
+ 2. The psycopg python interface to PostgreSQL -
+ http://initd.org/software/initd/psycopg
+
+
+Additional configuration
+========================
+
+To initialise and use PostgreSQL database roundup's configuration file
+(config.py in the tracker's home directory) should be appended with the
+following constants (substituting real values, obviously):
+
+ POSTGRESQL_DBHOST = 'localhost'
+ POSTGRESQL_DBUSER = 'roundup'
+ POSTGRESQL_DBPASSWORD = 'roundup'
+ POSTGRESQL_DBNAME = 'roundup'
+ POSTGRESQL_PORT = 5432
+ POSTGRESQL_DATABASE = {'host':MYSQL_DBHOST, 'port':POSTGRESQL_PORT,
+ 'user':MYSQL_DBUSER, 'password':MYSQL_DBPASSWORD,
+ 'database':MYSQL_DBNAME}
+
+Also note that you can leave some values out of POSTGRESQL_DATABASE: 'host' and
+'port' are not necessary when connecting to a local database and 'password'
+is optional if postgres trusts local connections. The user specified in
+POSTGRESQL_DBUSER must have rights to create a new database and to connect to
+the "template1" database, used while initializing roundup.
+
+
+ Have fun with psycopg,
+ Federico Di Gregorio <fog@initd.org>
+
+
+vim: et tw=80
index d2253015458cbd13a1600dea7db62c387b7c018d..15ae2ac9248784b897072630bf46903bd50ff09e 100644 (file)
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: __init__.py,v 1.24 2003-09-14 18:55:37 jlgijsbers Exp $
+# $Id: __init__.py,v 1.25 2003-10-25 22:53:26 richard Exp $
''' Container for the hyperdb storage backend implementations.
__all__ = []
for backend in ['anydbm', ('mysql', 'MySQLdb'), 'bsddb', 'bsddb3', 'sqlite',
- 'metakit']:
+ 'metakit', ('postgresql', 'psycopg')]:
if len(backend) == 2:
backend, backend_module = backend
else:
backend_module = backend
try:
- globals()[backend] = __import__('back_%s' % backend, globals())
+ globals()[backend] = __import__('back_%s'%backend, globals())
__all__.append(backend)
except ImportError, e:
- if not str(e).startswith('No module named %s' % backend_module):
+ if not str(e).startswith('No module named %s'%backend_module):
raise
# vim: set filetype=python ts=4 sw=4 et si
index 130327bc45921dc92a771bbee17fe5bc0dde4783..05f2de33a685c859e45ce134254fcc5e5bbe6e9c 100644 (file)
import os, shutil
from MySQLdb.constants import ER
-class Maintenance:
- """ Database maintenance functions """
- def db_nuke(self, config):
- """Clear all database contents and drop database itself"""
- db = Database(config, 'admin')
+# Database maintenance functions
+def db_nuke(config):
+ """Clear all database contents and drop database itself"""
+ db = Database(config, 'admin')
+ try:
db.sql_commit()
db.sql("DROP DATABASE %s" % config.MYSQL_DBNAME)
db.sql("CREATE DATABASE %s" % config.MYSQL_DBNAME)
- if os.path.exists(config.DATABASE):
- shutil.rmtree(config.DATABASE)
-
- def db_exists(self, config):
- """Check if database already exists"""
- # Yes, this is a hack, but we must must open connection without
- # selecting a database to prevent creation of some tables
- config.MYSQL_DATABASE = (config.MYSQL_DBHOST, config.MYSQL_DBUSER,
- config.MYSQL_DBPASSWORD)
- db = Database(config, 'admin')
+ finally:
+ db.close()
+ if os.path.exists(config.DATABASE):
+ shutil.rmtree(config.DATABASE)
+
+def db_exists(config):
+ """Check if database already exists"""
+ # Yes, this is a hack, but we must must open connection without
+ # selecting a database to prevent creation of some tables
+ config.MYSQL_DATABASE = (config.MYSQL_DBHOST, config.MYSQL_DBUSER,
+ config.MYSQL_DBPASSWORD)
+ db = Database(config, 'admin')
+ try:
db.conn.select_db(config.MYSQL_DBNAME)
config.MYSQL_DATABASE = (config.MYSQL_DBHOST, config.MYSQL_DBUSER,
config.MYSQL_DBPASSWORD, config.MYSQL_DBNAME)
db.sql("SHOW TABLES")
tables = db.sql_fetchall()
- if tables or os.path.exists(config.DATABASE):
- return 1
- return 0
+ finally:
+ db.close()
+ if tables or os.path.exists(config.DATABASE):
+ return 1
+ return 0
class Database(Database):
arg = '%s'
print >>hyperdb.DEBUG, 'create_class', (self, sql)
self.cursor.execute(sql)
- # Static methods
- nuke = Maintenance().db_nuke
- exists = Maintenance().db_exists
-
class MysqlClass:
# we're overriding this method for ONE missing bit of functionality.
# look for "I can't believe it's not a toy RDBMS" below
diff --git a/roundup/backends/back_postgresql.py b/roundup/backends/back_postgresql.py
--- /dev/null
@@ -0,0 +1,219 @@
+#
+# Copyright (c) 2003 Martynas Sklyzmantas, Andrey Lebedev <andrey@micro.lt>
+#
+# This module is free software, and you may redistribute it and/or modify
+# under the same terms as Python, so long as this copyright message and
+# disclaimer are retained in their original form.
+#
+# psycopg backend for roundup
+#
+
+from roundup.backends.rdbms_common import *
+from roundup.backends import rdbms_common
+import psycopg
+import os, shutil
+
+class Maintenance:
+ """ Database maintenance functions """
+ def db_nuke(self, config):
+ """Clear all database contents and drop database itself"""
+ config.POSTGRESQL_DATABASE['database'] = 'template1'
+ db = Database(config, 'admin')
+ db.conn.set_isolation_level(0)
+ db.sql("DROP DATABASE %s" % config.POSTGRESQL_DBNAME)
+ db.sql("CREATE DATABASE %s" % config.POSTGRESQL_DBNAME)
+ if os.path.exists(config.DATABASE):
+ shutil.rmtree(config.DATABASE)
+ config.POSTGRESQL_DATABASE['database'] = config.POSTGRESQL_DBNAME
+
+ def db_exists(self, config):
+ """Check if database already exists"""
+ try:
+ db = Database(config, 'admin')
+ return 1
+ except:
+ return 0
+
+class Database(Database):
+ arg = '%s'
+
+ def open_connection(self):
+ db = getattr(self.config, 'POSTGRESQL_DATABASE')
+ try:
+ self.conn = psycopg.connect(**db)
+ except psycopg.OperationalError, message:
+ raise DatabaseError, message
+
+ self.cursor = self.conn.cursor()
+
+ try:
+ self.database_schema = self.load_dbschema()
+ except:
+ self.rollback()
+ self.database_schema = {}
+ self.sql("CREATE TABLE schema (schema TEXT)")
+ self.sql("CREATE TABLE ids (name VARCHAR(255), num INT4)")
+
+ def close(self):
+ self.conn.close()
+
+ def __repr__(self):
+ return '<psycopgroundsql 0x%x>' % id(self)
+
+ def sql_fetchone(self):
+ return self.cursor.fetchone()
+
+ def sql_fetchall(self):
+ return self.cursor.fetchall()
+
+ def sql_stringquote(self, value):
+ return psycopg.QuotedString(str(value))
+
+ def save_dbschema(self, schema):
+ s = repr(self.database_schema)
+ self.sql('INSERT INTO schema VALUES (%s)', (s,))
+
+ def load_dbschema(self):
+ self.cursor.execute('SELECT schema FROM schema')
+ schema = self.cursor.fetchone()
+ if schema:
+ return eval(schema[0])
+
+ def save_journal(self, classname, cols, nodeid, journaldate,
+ journaltag, action, params):
+ params = repr(params)
+ entry = (nodeid, journaldate, journaltag, action, params)
+
+ a = self.arg
+ sql = 'INSERT INTO %s__journal (%s) values (%s, %s, %s, %s, %s)'%(
+ classname, cols, a, a, a, a, a)
+
+ if __debug__:
+ print >>hyperdb.DEBUG, 'addjournal', (self, sql, entry)
+
+ self.cursor.execute(sql, entry)
+
+ def load_journal(self, classname, cols, nodeid):
+ sql = 'SELECT %s FROM %s__journal WHERE nodeid = %s' % (
+ cols, classname, self.arg)
+
+ if __debug__:
+ print >>hyperdb.DEBUG, 'getjournal', (self, sql, nodeid)
+
+ self.cursor.execute(sql, (nodeid,))
+ res = []
+ for nodeid, date_stamp, user, action, params in self.cursor.fetchall():
+ params = eval(params)
+ res.append((nodeid, date.Date(date_stamp), user, action, params))
+ return res
+
+ def create_class_table(self, spec):
+ cols, mls = self.determine_columns(spec.properties.items())
+ cols.append('id')
+ cols.append('__retired__')
+ scols = ',' . join(['"%s" VARCHAR(255)' % x for x in cols])
+ sql = 'CREATE TABLE "_%s" (%s)' % (spec.classname, scols)
+
+ if __debug__:
+ print >>hyperdb.DEBUG, 'create_class', (self, sql)
+
+ self.cursor.execute(sql)
+ return cols, mls
+
+ def create_journal_table(self, spec):
+ cols = ',' . join(['"%s" VARCHAR(255)' % x
+ for x in 'nodeid date tag action params' . split()])
+ sql = 'CREATE TABLE "%s__journal" (%s)'%(spec.classname, cols)
+
+ if __debug__:
+ print >>hyperdb.DEBUG, 'create_class', (self, sql)
+
+ self.cursor.execute(sql)
+
+ def create_multilink_table(self, spec, ml):
+ sql = '''CREATE TABLE "%s_%s" (linkid VARCHAR(255),
+ nodeid VARCHAR(255))''' % (spec.classname, ml)
+
+ if __debug__:
+ print >>hyperdb.DEBUG, 'create_class', (self, sql)
+
+ self.cursor.execute(sql)
+
+ # Static methods
+ nuke = Maintenance().db_nuke
+ exists = Maintenance().db_exists
+
+class PsycopgClass:
+ def find(self, **propspec):
+ """Get the ids of nodes in this class which link to the given nodes."""
+
+ if __debug__:
+ print >>hyperdb.DEBUG, 'find', (self, propspec)
+
+ # shortcut
+ if not propspec:
+ return []
+
+ # validate the args
+ props = self.getprops()
+ propspec = propspec.items()
+ for propname, nodeids in propspec:
+ # check the prop is OK
+ prop = props[propname]
+ if not isinstance(prop, Link) and not isinstance(prop, Multilink):
+ raise TypeError, "'%s' not a Link/Multilink property"%propname
+
+ # first, links
+ l = []
+ where = []
+ allvalues = ()
+ a = self.db.arg
+ for prop, values in propspec:
+ if not isinstance(props[prop], hyperdb.Link):
+ continue
+ if type(values) is type(''):
+ allvalues += (values,)
+ where.append('_%s = %s' % (prop, a))
+ else:
+ allvalues += tuple(values.keys())
+ where.append('_%s in (%s)' % (prop, ','.join([a]*len(values))))
+ tables = []
+ if where:
+ self.db.sql('SELECT id AS nodeid FROM _%s WHERE %s' % (
+ self.classname, ' and '.join(where)), allvalues)
+ l += [x[0] for x in self.db.sql_fetchall()]
+
+ # now multilinks
+ for prop, values in propspec:
+ vals = ()
+ if not isinstance(props[prop], hyperdb.Multilink):
+ continue
+ if type(values) is type(''):
+ vals = (values,)
+ s = a
+ else:
+ vals = tuple(values.keys())
+ s = ','.join([a]*len(values))
+ query = 'SELECT nodeid FROM %s_%s WHERE linkid IN (%s)'%(
+ self.classname, prop, s)
+ self.db.sql(query, vals)
+ l += [x[0] for x in self.db.sql_fetchall()]
+
+ if __debug__:
+ print >>hyperdb.DEBUG, 'find ... ', l
+
+ # Remove duplicated ids
+ d = {}
+ for k in l:
+ d[k] = 1
+ return d.keys()
+
+ return l
+
+class Class(PsycopgClass, rdbms_common.Class):
+ pass
+class IssueClass(PsycopgClass, rdbms_common.IssueClass):
+ pass
+class FileClass(PsycopgClass, rdbms_common.FileClass):
+ pass
+
index 00921016ff1c185a6e67cd47970b7dc2b2e370ef..4c5760cdc56d4f3cb4594cb3c8a6ea0395460fc4 100644 (file)
-# $Id: rdbms_common.py,v 1.65 2003-10-07 11:58:58 anthonybaxter Exp $
+# $Id: rdbms_common.py,v 1.66 2003-10-25 22:53:26 richard Exp $
''' Relational database (SQL) backend common code.
Basics:
self.database_schema[classname] = spec.schema()
save = 1
- for classname in self.database_schema.keys():
+ for classname, spec in self.database_schema.items():
if not self.classes.has_key(classname):
- self.drop_class(classname)
+ self.drop_class(classname, spec)
+ del self.database_schema[classname]
+ save = 1
# update the database version of the schema
if save:
database version of the spec, and update where necessary.
If 'force' is true, update the database anyway.
'''
- new_spec = spec
- new_has = new_spec.properties.has_key
-
- new_spec = new_spec.schema()
+ new_has = spec.properties.has_key
+ new_spec = spec.schema()
new_spec[1].sort()
old_spec[1].sort()
if not force and new_spec == old_spec:
if __debug__:
print >>hyperdb.DEBUG, 'update_class FIRING'
- # key property changed?
- if force or old_spec[0] != new_spec[0]:
- if __debug__:
- print >>hyperdb.DEBUG, 'update_class setting keyprop', `spec[0]`
- # XXX turn on indexing for the key property.
- index_sql = 'drop index _%s_%s_idx'%(
- spec.classname, old_spec[0])
- if __debug__:
- print >>hyperdb.DEBUG, 'drop_index', (self, index_sql)
- try:
- self.cursor.execute(index_sql1)
- except:
- # Hackage. Until we update the schema to include some
- # backend-specific knowledge, assume that this might fail.
- pass
-
- index_sql = 'create index _%s_%s_idx on _%s(%s)'%(
- spec.classname, new_spec[0],
- spec.classname, new_spec[0])
- if __debug__:
- print >>hyperdb.DEBUG, 'create_index', (self, index_sql)
- self.cursor.execute(index_sql1)
-
# detect multilinks that have been removed, and drop their table
old_has = {}
for name,prop in old_spec[1]:
self.cursor.execute(sql)
olddata = self.cursor.fetchall()
- # drop the old table - indexes first
+ # drop the old table indexes first
index_sqls = [ 'drop index _%s_id_idx'%cn,
'drop index _%s_retired_idx'%cn ]
+ if old_spec[0]:
+ index_sqls.append('drop index _%s_%s_idx'%(cn, old_spec[0]))
for index_sql in index_sqls:
if __debug__:
print >>hyperdb.DEBUG, 'drop_index', (self, index_sql)
# The database may not actually have any indexes.
# assume the worst.
pass
+
+ # drop the old table
self.cursor.execute('drop table _%s'%cn)
+
# create the new table
self.create_class_table(spec)
if __debug__:
print >>hyperdb.DEBUG, 'create_class', (self, sql)
self.cursor.execute(sql)
+
+ # create id index
index_sql1 = 'create index _%s_id_idx on _%s(id)'%(
spec.classname, spec.classname)
if __debug__:
print >>hyperdb.DEBUG, 'create_index', (self, index_sql1)
self.cursor.execute(index_sql1)
+
+ # create __retired__ index
index_sql2 = 'create index _%s_retired_idx on _%s(__retired__)'%(
spec.classname, spec.classname)
if __debug__:
print >>hyperdb.DEBUG, 'create_index', (self, index_sql2)
self.cursor.execute(index_sql2)
+ # create index for key property
+ if spec.key:
+ if __debug__:
+ print >>hyperdb.DEBUG, 'update_class setting keyprop %r'% \
+ spec.key
+ index_sql3 = 'create index _%s_%s_idx on _%s(_%s)'%(
+ spec.classname, spec.key,
+ spec.classname, spec.key)
+ if __debug__:
+ print >>hyperdb.DEBUG, 'create_index', (self, index_sql3)
+ self.cursor.execute(index_sql3)
+
return cols, mls
def create_journal_table(self, spec):
if __debug__:
print >>hyperdb.DEBUG, 'create_class', (self, sql)
self.cursor.execute(sql)
+
+ # index on nodeid
index_sql = 'create index %s_journ_idx on %s__journal(nodeid)'%(
spec.classname, spec.classname)
if __debug__:
''' Create a multilink table for the "ml" property of the class
given by the spec
'''
+ # create the table
sql = 'create table %s_%s (linkid varchar, nodeid varchar)'%(
spec.classname, ml)
if __debug__:
print >>hyperdb.DEBUG, 'create_class', (self, sql)
self.cursor.execute(sql)
+
+ # create index on linkid
index_sql = 'create index %s_%s_l_idx on %s_%s(linkid)'%(
spec.classname, ml, spec.classname, ml)
if __debug__:
print >>hyperdb.DEBUG, 'create_index', (self, index_sql)
self.cursor.execute(index_sql)
+
+ # create index on nodeid
index_sql = 'create index %s_%s_n_idx on %s_%s(nodeid)'%(
spec.classname, ml, spec.classname, ml)
if __debug__:
print >>hyperdb.DEBUG, 'create_class', (self, sql, vals)
self.cursor.execute(sql, vals)
- def drop_class(self, spec):
+ def drop_class(self, cn, spec):
''' Drop the given table from the database.
Drop the journal and multilink tables too.
'''
+ properties = spec[1]
# figure the multilinks
mls = []
- for col, prop in spec.properties.items():
+ for propanme, prop in properties:
if isinstance(prop, Multilink):
- mls.append(col)
+ mls.append(propname)
index_sqls = [ 'drop index _%s_id_idx'%cn,
'drop index _%s_retired_idx'%cn,
'drop index %s_journ_idx'%cn ]
+ if spec[0]:
+ index_sqls.append('drop index _%s_%s_idx'%(cn, spec[0]))
for index_sql in index_sqls:
if __debug__:
print >>hyperdb.DEBUG, 'drop_index', (self, index_sql)
# assume the worst.
pass
- sql = 'drop table _%s'%spec.classname
+ sql = 'drop table _%s'%cn
if __debug__:
print >>hyperdb.DEBUG, 'drop_class', (self, sql)
self.cursor.execute(sql)
- sql = 'drop table %s__journal'%spec.classname
+
+ sql = 'drop table %s__journal'%cn
if __debug__:
print >>hyperdb.DEBUG, 'drop_class', (self, sql)
self.cursor.execute(sql)
for ml in mls:
index_sqls = [
- 'drop index %s_%s_n_idx'%(spec.classname, ml),
- 'drop index %s_%s_l_idx'%(spec.classname, ml),
+ 'drop index %s_%s_n_idx'%(cn, ml),
+ 'drop index %s_%s_l_idx'%(cn, ml),
]
for index_sql in index_sqls:
if __debug__:
diff --git a/run_tests.py b/run_tests.py
--- /dev/null
+++ b/run_tests.py
@@ -0,0 +1,890 @@
+#! /usr/bin/env python2.2
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+test.py [-aBbcdDfgGhLmprtTuv] [modfilter [testfilter]]
+
+Test harness.
+
+-a level
+--all
+ Run the tests at the given level. Any test at a level at or below this is
+ run, any test at a level above this is not run. Level 0 runs all tests.
+ The default is to run tests at level 1. --all is a shortcut for -a 0.
+
+-b
+--build
+ Run "python setup.py build" before running tests, where "python"
+ is the version of python used to run test.py. Highly recommended.
+ Tests will be run from the build directory. (Note: In Python < 2.3
+ the -q flag is added to the setup.py command line.)
+
+-B
+ Run "python setup.py build_ext -i" before running tests. Tests will be
+ run from the source directory.
+
+-c use pychecker
+
+-d
+ Instead of the normal test harness, run a debug version which
+ doesn't catch any exceptions. This is occasionally handy when the
+ unittest code catching the exception doesn't work right.
+ Unfortunately, the debug harness doesn't print the name of the
+ test, so Use With Care.
+
+--dir directory
+ Option to limit where tests are searched for. This is
+ important when you *really* want to limit the code that gets run.
+ For example, if refactoring interfaces, you don't want to see the way
+ you have broken setups for tests in other packages. You *just* want to
+ run the interface tests.
+
+-D
+ Works like -d, except that it loads pdb when an exception occurs.
+
+-f
+ Run functional tests instead of unit tests.
+
+-g threshold
+ Set the garbage collector generation0 threshold. This can be used to
+ stress memory and gc correctness. Some crashes are only reproducible when
+ the threshold is set to 1 (agressive garbage collection). Do "-g 0" to
+ disable garbage collection altogether.
+
+-G gc_option
+ Set the garbage collection debugging flags. The argument must be one
+ of the DEBUG_ flags defined bythe Python gc module. Multiple options
+ can be specified by using "-G OPTION1 -G OPTION2."
+
+--libdir test_root
+ Search for tests starting in the specified start directory
+ (useful for testing components being developed outside the main
+ "src" or "build" trees).
+
+--keepbytecode
+ Do not delete all stale bytecode before running tests
+
+-L
+ Keep running the selected tests in a loop. You may experience
+ memory leakage.
+
+-t
+ Time the individual tests and print a list of the top 50, sorted from
+ longest to shortest.
+
+-p
+ Show running progress. It can be combined with -v or -vv.
+
+-r
+ Look for refcount problems.
+ This requires that Python was built --with-pydebug.
+
+-T
+ Use the trace module from Python for code coverage. XXX This only works
+ if trace.py is explicitly added to PYTHONPATH. The current utility writes
+ coverage files to a directory named `coverage' that is parallel to
+ `build'. It also prints a summary to stdout.
+
+-v
+ Verbose output. With one -v, unittest prints a dot (".") for each test
+ run. With -vv, unittest prints the name of each test (for some definition
+ of "name" ...). With no -v, unittest is silent until the end of the run,
+ except when errors occur.
+
+ When -p is also specified, the meaning of -v is sligtly changed. With
+ -p and no -v only the percent indicator is displayed. With -p and -v
+ the test name of the current test is shown to the right of the percent
+ indicator. With -p and -vv the test name is not truncated to fit into
+ 80 columns and it is not cleared after the test finishes.
+
+-u
+-m
+ Use the PyUnit GUI instead of output to the command line. The GUI imports
+ tests on its own, taking care to reload all dependencies on each run. The
+ debug (-d), verbose (-v), progress (-p), and Loop (-L) options will be
+ ignored. The testfilter filter is also not applied.
+
+ -m starts the gui minimized. Double-clicking the progress bar will start
+ the import and run all tests.
+
+
+modfilter
+testfilter
+ Case-sensitive regexps to limit which tests are run, used in search
+ (not match) mode.
+ In an extension of Python regexp notation, a leading "!" is stripped
+ and causes the sense of the remaining regexp to be negated (so "!bc"
+ matches any string that does not match "bc", and vice versa).
+ By default these act like ".", i.e. nothing is excluded.
+
+ modfilter is applied to a test file's path, starting at "build" and
+ including (OS-dependent) path separators.
+
+ testfilter is applied to the (method) name of the unittest methods
+ contained in the test files whose paths modfilter matched.
+
+Extreme (yet useful) examples:
+
+ test.py -vvb . "^testWriteClient$"
+
+ Builds the project silently, then runs unittest in verbose mode on all
+ tests whose names are precisely "testWriteClient". Useful when
+ debugging a specific test.
+
+ test.py -vvb . "!^testWriteClient$"
+
+ As before, but runs all tests whose names aren't precisely
+ "testWriteClient". Useful to avoid a specific failing test you don't
+ want to deal with just yet.
+
+ test.py -m . "!^testWriteClient$"
+
+ As before, but now opens up a minimized PyUnit GUI window (only showing
+ the progress bar). Useful for refactoring runs where you continually want
+ to make sure all tests still pass.
+"""
+
+import gc
+import os
+import re
+import pdb
+import sys
+import threading # just to get at Thread objects created by tests
+import time
+import traceback
+import unittest
+import warnings
+
+from distutils.util import get_platform
+
+PLAT_SPEC = "%s-%s" % (get_platform(), sys.version[0:3])
+
+class ImmediateTestResult(unittest._TextTestResult):
+
+ __super_init = unittest._TextTestResult.__init__
+ __super_startTest = unittest._TextTestResult.startTest
+ __super_printErrors = unittest._TextTestResult.printErrors
+
+ def __init__(self, stream, descriptions, verbosity, debug=False,
+ count=None, progress=False):
+ self.__super_init(stream, descriptions, verbosity)
+ self._debug = debug
+ self._progress = progress
+ self._progressWithNames = False
+ self._count = count
+ self._testtimes = {}
+ if progress and verbosity == 1:
+ self.dots = False
+ self._progressWithNames = True
+ self._lastWidth = 0
+ self._maxWidth = 80
+ try:
+ import curses
+ except ImportError:
+ pass
+ else:
+ curses.setupterm()
+ self._maxWidth = curses.tigetnum('cols')
+ self._maxWidth -= len("xxxx/xxxx (xxx.x%): ") + 1
+
+ def stopTest(self, test):
+ self._testtimes[test] = time.time() - self._testtimes[test]
+ if gc.garbage:
+ print "The following test left garbage:"
+ print test
+ print gc.garbage
+ # eat the garbage here, so that the garbage isn't
+ # printed for every subsequent test.
+ gc.garbage[:] = []
+
+ # Did the test leave any new threads behind?
+ new_threads = [t for t in threading.enumerate()
+ if (t.isAlive()
+ and
+ t not in self._threads)]
+ if new_threads:
+ print "The following test left new threads behind:"
+ print test
+ print "New thread(s):", new_threads
+
+ def print_times(self, stream, count=None):
+ results = self._testtimes.items()
+ results.sort(lambda x, y: cmp(y[1], x[1]))
+ if count:
+ n = min(count, len(results))
+ if n:
+ print >>stream, "Top %d longest tests:" % n
+ else:
+ n = len(results)
+ if not n:
+ return
+ for i in range(n):
+ print >>stream, "%6dms" % int(results[i][1] * 1000), results[i][0]
+
+ def _print_traceback(self, msg, err, test, errlist):
+ if self.showAll or self.dots or self._progress:
+ self.stream.writeln("\n")
+ self._lastWidth = 0
+
+ tb = "".join(traceback.format_exception(*err))
+ self.stream.writeln(msg)
+ self.stream.writeln(tb)
+ errlist.append((test, tb))
+
+ def startTest(self, test):
+ if self._progress:
+ self.stream.write("\r%4d" % (self.testsRun + 1))
+ if self._count:
+ self.stream.write("/%d (%5.1f%%)" % (self._count,
+ (self.testsRun + 1) * 100.0 / self._count))
+ if self.showAll:
+ self.stream.write(": ")
+ elif self._progressWithNames:
+ # XXX will break with multibyte strings
+ name = self.getShortDescription(test)
+ width = len(name)
+ if width < self._lastWidth:
+ name += " " * (self._lastWidth - width)
+ self.stream.write(": %s" % name)
+ self._lastWidth = width
+ self.stream.flush()
+ self._threads = threading.enumerate()
+ self.__super_startTest(test)
+ self._testtimes[test] = time.time()
+
+ def getShortDescription(self, test):
+ s = self.getDescription(test)
+ if len(s) > self._maxWidth:
+ pos = s.find(" (")
+ if pos >= 0:
+ w = self._maxWidth - (pos + 5)
+ if w < 1:
+ # first portion (test method name) is too long
+ s = s[:self._maxWidth-3] + "..."
+ else:
+ pre = s[:pos+2]
+ post = s[-w:]
+ s = "%s...%s" % (pre, post)
+ return s[:self._maxWidth]
+
+ def addError(self, test, err):
+ if self._progress:
+ self.stream.write("\r")
+ if self._debug:
+ raise err[0], err[1], err[2]
+ self._print_traceback("Error in test %s" % test, err,
+ test, self.errors)
+
+ def addFailure(self, test, err):
+ if self._progress:
+ self.stream.write("\r")
+ if self._debug:
+ raise err[0], err[1], err[2]
+ self._print_traceback("Failure in test %s" % test, err,
+ test, self.failures)
+
+ def printErrors(self):
+ if self._progress and not (self.dots or self.showAll):
+ self.stream.writeln()
+ self.__super_printErrors()
+
+ def printErrorList(self, flavor, errors):
+ for test, err in errors:
+ self.stream.writeln(self.separator1)
+ self.stream.writeln("%s: %s" % (flavor, self.getDescription(test)))
+ self.stream.writeln(self.separator2)
+ self.stream.writeln(err)
+
+
+class ImmediateTestRunner(unittest.TextTestRunner):
+
+ __super_init = unittest.TextTestRunner.__init__
+
+ def __init__(self, **kwarg):
+ debug = kwarg.get("debug")
+ if debug is not None:
+ del kwarg["debug"]
+ progress = kwarg.get("progress")
+ if progress is not None:
+ del kwarg["progress"]
+ self.__super_init(**kwarg)
+ self._debug = debug
+ self._progress = progress
+
+ def _makeResult(self):
+ return ImmediateTestResult(self.stream, self.descriptions,
+ self.verbosity, debug=self._debug,
+ count=self._count, progress=self._progress)
+
+ def run(self, test):
+ self._count = test.countTestCases()
+ return unittest.TextTestRunner.run(self, test)
+
+# setup list of directories to put on the path
+class PathInit:
+ def __init__(self, build, build_inplace, libdir=None):
+ self.inplace = None
+ # Figure out if we should test in-place or test in-build. If the -b
+ # or -B option was given, test in the place we were told to build in.
+ # Otherwise, we'll look for a build directory and if we find one,
+ # we'll test there, otherwise we'll test in-place.
+ if build:
+ self.inplace = build_inplace
+ if self.inplace is None:
+ # Need to figure it out
+ if os.path.isdir(os.path.join("build", "lib.%s" % PLAT_SPEC)):
+ self.inplace = False
+ else:
+ self.inplace = True
+ # Calculate which directories we're going to add to sys.path, and cd
+ # to the appropriate working directory
+ org_cwd = os.getcwd()
+ if self.inplace:
+ self.libdir = "src"
+ else:
+ self.libdir = "lib.%s" % PLAT_SPEC
+ os.chdir("build")
+ # Hack sys.path
+ self.cwd = os.getcwd()
+ sys.path.insert(0, os.path.join(self.cwd, self.libdir))
+ # Hack again for external products.
+ global functional
+ kind = functional and "functional" or "unit"
+ if libdir:
+ extra = os.path.join(org_cwd, libdir)
+ print "Running %s tests from %s" % (kind, extra)
+ self.libdir = extra
+ sys.path.insert(0, extra)
+ else:
+ print "Running %s tests from %s" % (kind, self.cwd)
+ # Make sure functional tests find ftesting.zcml
+ if functional:
+ config_file = 'ftesting.zcml'
+ if not self.inplace:
+ # We chdired into build, so ftesting.zcml is in the
+ # parent directory
+ config_file = os.path.join('..', 'ftesting.zcml')
+ print "Parsing %s" % config_file
+ from zope.testing.functional import FunctionalTestSetup
+ FunctionalTestSetup(config_file)
+
+def match(rx, s):
+ if not rx:
+ return True
+ if rx[0] == "!":
+ return re.search(rx[1:], s) is None
+ else:
+ return re.search(rx, s) is not None
+
+class TestFileFinder:
+ def __init__(self, prefix):
+ self.files = []
+ self._plen = len(prefix)
+ if not prefix.endswith(os.sep):
+ self._plen += 1
+ global functional
+ if functional:
+ self.dirname = "ftest"
+ else:
+ self.dirname = "test"
+
+ def visit(self, rx, dir, files):
+ if os.path.split(dir)[1] != self.dirname:
+ return
+ # ignore tests that aren't in packages
+ if not "__init__.py" in files:
+ if not files or files == ["CVS"]:
+ return
+ print "not a package", dir
+ return
+
+ # Put matching files in matches. If matches is non-empty,
+ # then make sure that the package is importable.
+ matches = []
+ for file in files:
+ if file.startswith('test') and os.path.splitext(file)[-1] == '.py':
+ path = os.path.join(dir, file)
+ if match(rx, path):
+ matches.append(path)
+
+ # ignore tests when the package can't be imported, possibly due to
+ # dependency failures.
+ pkg = dir[self._plen:].replace(os.sep, '.')
+ try:
+ __import__(pkg)
+ # We specifically do not want to catch ImportError since that's useful
+ # information to know when running the tests.
+ except RuntimeError, e:
+ if VERBOSE:
+ print "skipping %s because: %s" % (pkg, e)
+ return
+ else:
+ self.files.extend(matches)
+
+ def module_from_path(self, path):
+ """Return the Python package name indicated by the filesystem path."""
+ assert path.endswith(".py")
+ path = path[self._plen:-3]
+ mod = path.replace(os.sep, ".")
+ return mod
+
+def walk_with_symlinks(top, func, arg):
+ """Like os.path.walk, but follows symlinks on POSIX systems.
+
+ This could theoreticaly result in an infinite loop, if you create symlink
+ cycles in your Zope sandbox, so don't do that.
+ """
+ try:
+ names = os.listdir(top)
+ except os.error:
+ return
+ func(arg, top, names)
+ exceptions = ('.', '..')
+ for name in names:
+ if name not in exceptions:
+ name = os.path.join(top, name)
+ if os.path.isdir(name):
+ walk_with_symlinks(name, func, arg)
+
+
+def check_test_dir():
+ global test_dir
+ if test_dir and not os.path.exists(test_dir):
+ d = pathinit.libdir
+ d = os.path.join(d, test_dir)
+ if os.path.exists(d):
+ if not os.path.isdir(d):
+ raise ValueError(
+ "%s does not exist and %s is not a directory"
+ % (test_dir, d)
+ )
+ test_dir = d
+ else:
+ raise ValueError("%s does not exist!" % test_dir)
+
+
+def find_tests(rx):
+ global finder
+ finder = TestFileFinder(pathinit.libdir)
+
+ check_test_dir()
+ walkdir = test_dir or pathinit.libdir
+ walk_with_symlinks(walkdir, finder.visit, rx)
+ return finder.files
+
+def package_import(modname):
+ mod = __import__(modname)
+ for part in modname.split(".")[1:]:
+ mod = getattr(mod, part)
+ return mod
+
+def get_suite(file):
+ modname = finder.module_from_path(file)
+ try:
+ mod = package_import(modname)
+ except ImportError, err:
+ # print traceback
+ print "Error importing %s\n%s" % (modname, err)
+ traceback.print_exc()
+ if debug:
+ raise
+ return None
+ try:
+ suite_func = mod.test_suite
+ except AttributeError:
+ print "No test_suite() in %s" % file
+ return None
+ return suite_func()
+
+def filter_testcases(s, rx):
+ new = unittest.TestSuite()
+ for test in s._tests:
+ # See if the levels match
+ dolevel = (level == 0) or level >= getattr(test, "level", 0)
+ if not dolevel:
+ continue
+ if isinstance(test, unittest.TestCase):
+ name = test.id() # Full test name: package.module.class.method
+ name = name[1 + name.rfind("."):] # extract method name
+ if not rx or match(rx, name):
+ new.addTest(test)
+ else:
+ filtered = filter_testcases(test, rx)
+ if filtered:
+ new.addTest(filtered)
+ return new
+
+def gui_runner(files, test_filter):
+ if build_inplace:
+ utildir = os.path.join(os.getcwd(), "utilities")
+ else:
+ utildir = os.path.join(os.getcwd(), "..", "utilities")
+ sys.path.append(utildir)
+ import unittestgui
+ suites = []
+ for file in files:
+ suites.append(finder.module_from_path(file) + ".test_suite")
+
+ suites = ", ".join(suites)
+ minimal = (GUI == "minimal")
+ unittestgui.main(suites, minimal)
+
+class TrackRefs:
+ """Object to track reference counts across test runs."""
+
+ def __init__(self):
+ self.type2count = {}
+ self.type2all = {}
+
+ def update(self):
+ obs = sys.getobjects(0)
+ type2count = {}
+ type2all = {}
+ for o in obs:
+ all = sys.getrefcount(o)
+ t = type(o)
+ if t in type2count:
+ type2count[t] += 1
+ type2all[t] += all
+ else:
+ type2count[t] = 1
+ type2all[t] = all
+
+ ct = [(type2count[t] - self.type2count.get(t, 0),
+ type2all[t] - self.type2all.get(t, 0),
+ t)
+ for t in type2count.iterkeys()]
+ ct.sort()
+ ct.reverse()
+ for delta1, delta2, t in ct:
+ if delta1 or delta2:
+ print "%-55s %8d %8d" % (t, delta1, delta2)
+
+ self.type2count = type2count
+ self.type2all = type2all
+
+def runner(files, test_filter, debug):
+ runner = ImmediateTestRunner(verbosity=VERBOSE, debug=debug,
+ progress=progress)
+ suite = unittest.TestSuite()
+ for file in files:
+ s = get_suite(file)
+ # See if the levels match
+ dolevel = (level == 0) or level >= getattr(s, "level", 0)
+ if s is not None and dolevel:
+ s = filter_testcases(s, test_filter)
+ suite.addTest(s)
+ try:
+ r = runner.run(suite)
+ if timesfn:
+ r.print_times(open(timesfn, "w"))
+ if VERBOSE:
+ print "Wrote timing data to", timesfn
+ if timetests:
+ r.print_times(sys.stdout, timetests)
+ except:
+ if debugger:
+ print "%s:" % (sys.exc_info()[0], )
+ print sys.exc_info()[1]
+ pdb.post_mortem(sys.exc_info()[2])
+ else:
+ raise
+
+def remove_stale_bytecode(arg, dirname, names):
+ names = map(os.path.normcase, names)
+ for name in names:
+ if name.endswith(".pyc") or name.endswith(".pyo"):
+ srcname = name[:-1]
+ if srcname not in names:
+ fullname = os.path.join(dirname, name)
+ print "Removing stale bytecode file", fullname
+ os.unlink(fullname)
+
+def main(module_filter, test_filter, libdir):
+ if not keepStaleBytecode:
+ os.path.walk(os.curdir, remove_stale_bytecode, None)
+
+ # Get the log.ini file from the current directory instead of possibly
+ # buried in the build directory. XXX This isn't perfect because if
+ # log.ini specifies a log file, it'll be relative to the build directory.
+ # Hmm...
+ logini = os.path.abspath("log.ini")
+
+ # Initialize the path and cwd
+ global pathinit
+ pathinit = PathInit(build, build_inplace, libdir)
+
+ # Initialize the logging module.
+
+ import logging.config
+ logging.basicConfig()
+
+ level = os.getenv("LOGGING")
+ if level:
+ level = int(level)
+ else:
+ level = logging.CRITICAL
+ logging.root.setLevel(level)
+
+ if os.path.exists(logini):
+ logging.config.fileConfig(logini)
+
+ files = find_tests(module_filter)
+ files.sort()
+
+ if GUI:
+ gui_runner(files, test_filter)
+ elif LOOP:
+ if REFCOUNT:
+ rc = sys.gettotalrefcount()
+ track = TrackRefs()
+ while True:
+ runner(files, test_filter, debug)
+ gc.collect()
+ if gc.garbage:
+ print "GARBAGE:", len(gc.garbage), gc.garbage
+ return
+ if REFCOUNT:
+ prev = rc
+ rc = sys.gettotalrefcount()
+ print "totalrefcount=%-8d change=%-6d" % (rc, rc - prev)
+ track.update()
+ else:
+ runner(files, test_filter, debug)
+
+
+def process_args(argv=None):
+ import getopt
+ global module_filter
+ global test_filter
+ global VERBOSE
+ global LOOP
+ global GUI
+ global TRACE
+ global REFCOUNT
+ global debug
+ global debugger
+ global build
+ global level
+ global libdir
+ global timesfn
+ global timetests
+ global progress
+ global build_inplace
+ global keepStaleBytecode
+ global functional
+ global test_dir
+
+ if argv is None:
+ argv = sys.argv
+
+ module_filter = None
+ test_filter = None
+ VERBOSE = 1
+ LOOP = False
+ GUI = False
+ TRACE = False
+ REFCOUNT = False
+ debug = False # Don't collect test results; simply let tests crash
+ debugger = False
+ build = False
+ build_inplace = False
+ gcthresh = None
+ gcdebug = 0
+ gcflags = []
+ level = 1
+ libdir = '.'
+ progress = False
+ timesfn = None
+ timetests = 0
+ keepStaleBytecode = 0
+ functional = False
+ test_dir = None
+
+ try:
+ opts, args = getopt.getopt(argv[1:], "a:bBcdDfg:G:hLmprtTuv",
+ ["all", "help", "libdir=", "times=",
+ "keepbytecode", "dir=", "build"])
+ except getopt.error, msg:
+ print msg
+ print "Try `python %s -h' for more information." % argv[0]
+ sys.exit(2)
+
+ for k, v in opts:
+ if k == "-a":
+ level = int(v)
+ elif k == "--all":
+ level = 0
+ os.environ["COMPLAIN_IF_TESTS_MISSED"]='1'
+ elif k in ("-b", "--build"):
+ build = True
+ elif k == "-B":
+ build = build_inplace = True
+ elif k == "-c":
+ # make sure you have a recent version of pychecker
+ if not os.environ.get("PYCHECKER"):
+ os.environ["PYCHECKER"] = "-q"
+ import pychecker.checker
+ elif k == "-d":
+ debug = True
+ elif k == "-D":
+ debug = True
+ debugger = True
+ elif k == "-f":
+ functional = True
+ elif k in ("-h", "--help"):
+ print __doc__
+ sys.exit(0)
+ elif k == "-g":
+ gcthresh = int(v)
+ elif k == "-G":
+ if not v.startswith("DEBUG_"):
+ print "-G argument must be DEBUG_ flag, not", repr(v)
+ sys.exit(1)
+ gcflags.append(v)
+ elif k == '--keepbytecode':
+ keepStaleBytecode = 1
+ elif k == '--libdir':
+ libdir = v
+ elif k == "-L":
+ LOOP = 1
+ elif k == "-m":
+ GUI = "minimal"
+ elif k == "-p":
+ progress = True
+ elif k == "-r":
+ if hasattr(sys, "gettotalrefcount"):
+ REFCOUNT = True
+ else:
+ print "-r ignored, because it needs a debug build of Python"
+ elif k == "-T":
+ TRACE = True
+ elif k == "-t":
+ if not timetests:
+ timetests = 50
+ elif k == "-u":
+ GUI = 1
+ elif k == "-v":
+ VERBOSE += 1
+ elif k == "--times":
+ try:
+ timetests = int(v)
+ except ValueError:
+ # must be a filename to write
+ timesfn = v
+ elif k == '--dir':
+ test_dir = v
+
+ if sys.version_info < ( 2,2,3 ):
+ print """\
+ ERROR: Your python version is not supported by Zope3.
+ Zope3 needs either Python2.3 or Python2.2.3 or greater.
+ In particular, Zope3 on Python2.2.2 is a recipe for
+ pain. You are running:""" + sys.version
+ sys.exit(1)
+
+ if gcthresh is not None:
+ if gcthresh == 0:
+ gc.disable()
+ print "gc disabled"
+ else:
+ gc.set_threshold(gcthresh)
+ print "gc threshold:", gc.get_threshold()
+
+ if gcflags:
+ val = 0
+ for flag in gcflags:
+ v = getattr(gc, flag, None)
+ if v is None:
+ print "Unknown gc flag", repr(flag)
+ print gc.set_debug.__doc__
+ sys.exit(1)
+ val |= v
+ gcdebug |= v
+
+ if gcdebug:
+ gc.set_debug(gcdebug)
+
+ if build:
+ # Python 2.3 is more sane in its non -q output
+ if sys.hexversion >= 0x02030000:
+ qflag = ""
+ else:
+ qflag = "-q"
+ cmd = sys.executable + " setup.py " + qflag + " build"
+ if build_inplace:
+ cmd += "_ext -i"
+ if VERBOSE:
+ print cmd
+ sts = os.system(cmd)
+ if sts:
+ print "Build failed", hex(sts)
+ sys.exit(1)
+
+ if VERBOSE:
+ kind = functional and "functional" or "unit"
+ if level == 0:
+ print "Running %s tests at all levels" % kind
+ else:
+ print "Running %s tests at level %d" % (kind, level)
+
+ # XXX We want to change *visible* warnings into errors. The next
+ # line changes all warnings into errors, including warnings we
+ # normally never see. In particular, test_datetime does some
+ # short-integer arithmetic that overflows to long ints, and, by
+ # default, Python doesn't display the overflow warning that can
+ # be enabled when this happens. The next line turns that into an
+ # error instead. Guido suggests that a better to get what we're
+ # after is to replace warnings.showwarning() with our own thing
+ # that raises an error.
+## warnings.filterwarnings("error")
+ warnings.filterwarnings("ignore", module="logging")
+
+ if args:
+ if len(args) > 1:
+ test_filter = args[1]
+ module_filter = args[0]
+ try:
+ if TRACE:
+ # if the trace module is used, then we don't exit with
+ # status if on a false return value from main.
+ coverdir = os.path.join(os.getcwd(), "coverage")
+ import trace
+ ignoremods = ["os", "posixpath", "stat"]
+ tracer = trace.Trace(ignoredirs=[sys.prefix, sys.exec_prefix],
+ ignoremods=ignoremods,
+ trace=False, count=True)
+
+ tracer.runctx("main(module_filter, test_filter, libdir)",
+ globals=globals(), locals=vars())
+ r = tracer.results()
+ path = "/tmp/trace.%s" % os.getpid()
+ import cPickle
+ f = open(path, "wb")
+ cPickle.dump(r, f)
+ f.close()
+ print path
+ r.write_results(show_missing=True, summary=True, coverdir=coverdir)
+ else:
+ bad = main(module_filter, test_filter, libdir)
+ if bad:
+ sys.exit(1)
+ except ImportError, err:
+ print err
+ print sys.path
+ raise
+
+
+if __name__ == "__main__":
+ process_args()
diff --git a/setup.py b/setup.py
index 70918726f4c8c7e3ad35adf796ceaf8f929476a7..df99ff30455b174ed7d19ef22727ead8abcf8ac5 100644 (file)
--- a/setup.py
+++ b/setup.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: setup.py,v 1.56 2003-08-18 06:31:59 richard Exp $
+# $Id: setup.py,v 1.57 2003-10-25 22:53:26 richard Exp $
from distutils.core import setup, Extension
from distutils.util import get_platform
setup(
name = "roundup",
version = __version__,
- description = "Roundup issue tracking system.",
+ description = "A simple-to-use and -install issue-tracking system"
+ " with command-line, web and e-mail interfaces. Highly"
+ " customisable.",
long_description =
'''Roundup is a simple-to-use and -install issue-tracking system with
command-line, web and e-mail interfaces. It is based on the winning design
diff --git a/test/__init__.py b/test/__init__.py
index 90d0f01d66674425a8546563022f96702181c9b9..07060e54b13a0c53849292443be99d633af6077d 100644 (file)
--- a/test/__init__.py
+++ b/test/__init__.py
-#
-# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
-# This module is free software, and you may redistribute it and/or modify
-# under the same terms as Python, so long as this copyright message and
-# disclaimer are retained in their original form.
-#
-# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
-# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
-# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-#
-# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
-# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
-# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
-# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
-#
-# $Id: __init__.py,v 1.19 2003-09-07 20:37:33 jlgijsbers Exp $
-
-import os, tempfile, unittest, shutil, errno
-import roundup.roundupdb
-roundup.roundupdb.SENDMAILDEBUG=os.environ['SENDMAILDEBUG']=tempfile.mktemp()
-
-from roundup import init
-
-# figure all the modules available
-dir = os.path.split(__file__)[0]
-test_mods = {}
-for file in os.listdir(dir):
- if file.startswith('test_') and file.endswith('.py'):
- name = file[5:-3]
- test_mods[name] = __import__(file[:-3], globals(), locals(), [])
-all_tests = test_mods.keys()
-
-dirname = '_empty_instance'
-def create_empty_instance():
- remove_empty_instance()
- init.install(dirname, 'templates/classic')
- init.write_select_db(dirname, 'anydbm')
- init.initialise(dirname, 'sekrit')
-
-def remove_empty_instance():
- try:
- shutil.rmtree(dirname)
- except OSError, error:
- if error.errno not in (errno.ENOENT, errno.ESRCH): raise
-
-def go(tests=all_tests):
- try:
- l = []
- needs_instance = 0
- for name in tests:
- mod = test_mods[name]
- if hasattr(mod, 'NEEDS_INSTANCE'):
- needs_instance = 1
- l.append(test_mods[name].suite())
-
- if needs_instance:
- create_empty_instance()
-
- suite = unittest.TestSuite(l)
- runner = unittest.TextTestRunner()
- runner.run(suite)
- finally:
- remove_empty_instance()
-
-# vim: set filetype=python ts=4 sw=4 et si
+# make this dir a package
diff --git a/test/db_test_base.py b/test/db_test_base.py
--- /dev/null
+++ b/test/db_test_base.py
@@ -0,0 +1,988 @@
+#
+# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
+# This module is free software, and you may redistribute it and/or modify
+# under the same terms as Python, so long as this copyright message and
+# disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
+# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
+# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
+# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
+# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+#
+# $Id: db_test_base.py,v 1.1 2003-10-25 22:53:26 richard Exp $
+
+import unittest, os, shutil, errno, imp, sys, time
+
+from roundup.hyperdb import String, Password, Link, Multilink, Date, \
+ Interval, DatabaseError, Boolean, Number, Node
+from roundup import date, password
+from roundup import init
+from roundup.indexer import Indexer
+
+def setupSchema(db, create, module):
+ status = module.Class(db, "status", name=String())
+ status.setkey("name")
+ user = module.Class(db, "user", username=String(), password=Password(),
+ assignable=Boolean(), age=Number(), roles=String())
+ user.setkey("username")
+ file = module.FileClass(db, "file", name=String(), type=String(),
+ comment=String(indexme="yes"), fooz=Password())
+ issue = module.IssueClass(db, "issue", title=String(indexme="yes"),
+ status=Link("status"), nosy=Multilink("user"), deadline=Date(),
+ foo=Interval(), files=Multilink("file"), assignedto=Link('user'))
+ session = module.Class(db, 'session', title=String())
+ session.disableJournalling()
+ db.post_init()
+ if create:
+ user.create(username="admin", roles='Admin')
+ status.create(name="unread")
+ status.create(name="in-progress")
+ status.create(name="testing")
+ status.create(name="resolved")
+ db.commit()
+
+class MyTestCase(unittest.TestCase):
+ def tearDown(self):
+ if hasattr(self, 'db'):
+ self.db.close()
+ if os.path.exists('_test_dir'):
+ shutil.rmtree('_test_dir')
+
+class config:
+ DATABASE='_test_dir'
+ MAILHOST = 'localhost'
+ MAIL_DOMAIN = 'fill.me.in.'
+ TRACKER_NAME = 'Roundup issue tracker'
+ TRACKER_EMAIL = 'issue_tracker@%s'%MAIL_DOMAIN
+ TRACKER_WEB = 'http://some.useful.url/'
+ ADMIN_EMAIL = 'roundup-admin@%s'%MAIL_DOMAIN
+ FILTER_POSITION = 'bottom' # one of 'top', 'bottom', 'top and bottom'
+ ANONYMOUS_ACCESS = 'deny' # either 'deny' or 'allow'
+ ANONYMOUS_REGISTER = 'deny' # either 'deny' or 'allow'
+ MESSAGES_TO_AUTHOR = 'no' # either 'yes' or 'no'
+ EMAIL_SIGNATURE_POSITION = 'bottom'
+
+ # Mysql connection data
+ MYSQL_DBHOST = 'localhost'
+ MYSQL_DBUSER = 'rounduptest'
+ MYSQL_DBPASSWORD = 'rounduptest'
+ MYSQL_DBNAME = 'rounduptest'
+ MYSQL_DATABASE = (MYSQL_DBHOST, MYSQL_DBUSER, MYSQL_DBPASSWORD, MYSQL_DBNAME)
+
+ # Postgresql connection data
+ POSTGRESQL_DBHOST = 'localhost'
+ POSTGRESQL_DBUSER = 'rounduptest'
+ POSTGRESQL_DBPASSWORD = 'rounduptest'
+ POSTGRESQL_DBNAME = 'rounduptest'
+ POSTGRESQL_PORT = 5432
+ POSTGRESQL_DATABASE = {'host': POSTGRESQL_DBHOST, 'port': POSTGRESQL_PORT,
+ 'user': POSTGRESQL_DBUSER, 'password': POSTGRESQL_DBPASSWORD,
+ 'database': POSTGRESQL_DBNAME}
+
+class nodbconfig(config):
+ MYSQL_DATABASE = (config.MYSQL_DBHOST, config.MYSQL_DBUSER, config.MYSQL_DBPASSWORD)
+
+class DBTest(MyTestCase):
+ def setUp(self):
+ # remove previous test, ignore errors
+ if os.path.exists(config.DATABASE):
+ shutil.rmtree(config.DATABASE)
+ os.makedirs(config.DATABASE + '/files')
+ self.db = self.module.Database(config, 'admin')
+ setupSchema(self.db, 1, self.module)
+
+ #
+ # schema mutation
+ #
+ def testAddProperty(self):
+ self.db.issue.create(title="spam", status='1')
+ self.db.commit()
+
+ self.db.issue.addprop(fixer=Link("user"))
+ # force any post-init stuff to happen
+ self.db.post_init()
+ props = self.db.issue.getprops()
+ keys = props.keys()
+ keys.sort()
+ self.assertEqual(keys, ['activity', 'assignedto', 'creation',
+ 'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
+ 'nosy', 'status', 'superseder', 'title'])
+ self.assertEqual(self.db.issue.get('1', "fixer"), None)
+
+ def testRemoveProperty(self):
+ self.db.issue.create(title="spam", status='1')
+ self.db.commit()
+
+ del self.db.issue.properties['title']
+ self.db.post_init()
+ props = self.db.issue.getprops()
+ keys = props.keys()
+ keys.sort()
+ self.assertEqual(keys, ['activity', 'assignedto', 'creation',
+ 'creator', 'deadline', 'files', 'foo', 'id', 'messages',
+ 'nosy', 'status', 'superseder'])
+ self.assertEqual(self.db.issue.list(), ['1'])
+
+ def testAddRemoveProperty(self):
+ self.db.issue.create(title="spam", status='1')
+ self.db.commit()
+
+ self.db.issue.addprop(fixer=Link("user"))
+ del self.db.issue.properties['title']
+ self.db.post_init()
+ props = self.db.issue.getprops()
+ keys = props.keys()
+ keys.sort()
+ self.assertEqual(keys, ['activity', 'assignedto', 'creation',
+ 'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
+ 'nosy', 'status', 'superseder'])
+ self.assertEqual(self.db.issue.list(), ['1'])
+
+ #
+ # basic operations
+ #
+ def testIDGeneration(self):
+ id1 = self.db.issue.create(title="spam", status='1')
+ id2 = self.db.issue.create(title="eggs", status='2')
+ self.assertNotEqual(id1, id2)
+
+ def testStringChange(self):
+ for commit in (0,1):
+ # test set & retrieve
+ nid = self.db.issue.create(title="spam", status='1')
+ self.assertEqual(self.db.issue.get(nid, 'title'), 'spam')
+
+ # change and make sure we retrieve the correct value
+ self.db.issue.set(nid, title='eggs')
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, 'title'), 'eggs')
+
+ def testStringUnset(self):
+ for commit in (0,1):
+ nid = self.db.issue.create(title="spam", status='1')
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, 'title'), 'spam')
+ # make sure we can unset
+ self.db.issue.set(nid, title=None)
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, "title"), None)
+
+ def testLinkChange(self):
+ for commit in (0,1):
+ nid = self.db.issue.create(title="spam", status='1')
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, "status"), '1')
+ self.db.issue.set(nid, status='2')
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, "status"), '2')
+
+ def testLinkUnset(self):
+ for commit in (0,1):
+ nid = self.db.issue.create(title="spam", status='1')
+ if commit: self.db.commit()
+ self.db.issue.set(nid, status=None)
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, "status"), None)
+
+ def testMultilinkChange(self):
+ for commit in (0,1):
+ u1 = self.db.user.create(username='foo%s'%commit)
+ u2 = self.db.user.create(username='bar%s'%commit)
+ nid = self.db.issue.create(title="spam", nosy=[u1])
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, "nosy"), [u1])
+ self.db.issue.set(nid, nosy=[])
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, "nosy"), [])
+ self.db.issue.set(nid, nosy=[u1,u2])
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, "nosy"), [u1,u2])
+
+ def testDateChange(self):
+ for commit in (0,1):
+ nid = self.db.issue.create(title="spam", status='1')
+ a = self.db.issue.get(nid, "deadline")
+ if commit: self.db.commit()
+ self.db.issue.set(nid, deadline=date.Date())
+ b = self.db.issue.get(nid, "deadline")
+ if commit: self.db.commit()
+ self.assertNotEqual(a, b)
+ self.assertNotEqual(b, date.Date('1970-1-1 00:00:00'))
+
+ def testDateUnset(self):
+ for commit in (0,1):
+ nid = self.db.issue.create(title="spam", status='1')
+ self.db.issue.set(nid, deadline=date.Date())
+ if commit: self.db.commit()
+ self.assertNotEqual(self.db.issue.get(nid, "deadline"), None)
+ self.db.issue.set(nid, deadline=None)
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, "deadline"), None)
+
+ def testIntervalChange(self):
+ for commit in (0,1):
+ nid = self.db.issue.create(title="spam", status='1')
+ if commit: self.db.commit()
+ a = self.db.issue.get(nid, "foo")
+ i = date.Interval('-1d')
+ self.db.issue.set(nid, foo=i)
+ if commit: self.db.commit()
+ self.assertNotEqual(self.db.issue.get(nid, "foo"), a)
+ self.assertEqual(i, self.db.issue.get(nid, "foo"))
+ j = date.Interval('1y')
+ self.db.issue.set(nid, foo=j)
+ if commit: self.db.commit()
+ self.assertNotEqual(self.db.issue.get(nid, "foo"), i)
+ self.assertEqual(j, self.db.issue.get(nid, "foo"))
+
+ def testIntervalUnset(self):
+ for commit in (0,1):
+ nid = self.db.issue.create(title="spam", status='1')
+ self.db.issue.set(nid, foo=date.Interval('-1d'))
+ if commit: self.db.commit()
+ self.assertNotEqual(self.db.issue.get(nid, "foo"), None)
+ self.db.issue.set(nid, foo=None)
+ if commit: self.db.commit()
+ self.assertEqual(self.db.issue.get(nid, "foo"), None)
+
+ def testBooleanChange(self):
+ userid = self.db.user.create(username='foo', assignable=1)
+ self.assertEqual(1, self.db.user.get(userid, 'assignable'))
+ self.db.user.set(userid, assignable=0)
+ self.assertEqual(self.db.user.get(userid, 'assignable'), 0)
+
+ def testBooleanUnset(self):
+ nid = self.db.user.create(username='foo', assignable=1)
+ self.db.user.set(nid, assignable=None)
+ self.assertEqual(self.db.user.get(nid, "assignable"), None)
+
+ def testNumberChange(self):
+ nid = self.db.user.create(username='foo', age=1)
+ self.assertEqual(1, self.db.user.get(nid, 'age'))
+ self.db.user.set(nid, age=3)
+ self.assertNotEqual(self.db.user.get(nid, 'age'), 1)
+ self.db.user.set(nid, age=1.0)
+ self.assertEqual(self.db.user.get(nid, 'age'), 1)
+ self.db.user.set(nid, age=0)
+ self.assertEqual(self.db.user.get(nid, 'age'), 0)
+
+ nid = self.db.user.create(username='bar', age=0)
+ self.assertEqual(self.db.user.get(nid, 'age'), 0)
+
+ def testNumberUnset(self):
+ nid = self.db.user.create(username='foo', age=1)
+ self.db.user.set(nid, age=None)
+ self.assertEqual(self.db.user.get(nid, "age"), None)
+
+ def testKeyValue(self):
+ newid = self.db.user.create(username="spam")
+ self.assertEqual(self.db.user.lookup('spam'), newid)
+ self.db.commit()
+ self.assertEqual(self.db.user.lookup('spam'), newid)
+ self.db.user.retire(newid)
+ self.assertRaises(KeyError, self.db.user.lookup, 'spam')
+
+ # use the key again now that the old is retired
+ newid2 = self.db.user.create(username="spam")
+ self.assertNotEqual(newid, newid2)
+ # try to restore old node. this shouldn't succeed!
+ self.assertRaises(KeyError, self.db.user.restore, newid)
+
+ def testRetire(self):
+ self.db.issue.create(title="spam", status='1')
+ b = self.db.status.get('1', 'name')
+ a = self.db.status.list()
+ self.db.status.retire('1')
+ # make sure the list is different
+ self.assertNotEqual(a, self.db.status.list())
+ # can still access the node if necessary
+ self.assertEqual(self.db.status.get('1', 'name'), b)
+ self.db.commit()
+ self.assertEqual(self.db.status.get('1', 'name'), b)
+ self.assertNotEqual(a, self.db.status.list())
+ # try to restore retired node
+ self.db.status.restore('1')
+
+ def testCacheCreateSet(self):
+ self.db.issue.create(title="spam", status='1')
+ a = self.db.issue.get('1', 'title')
+ self.assertEqual(a, 'spam')
+ self.db.issue.set('1', title='ham')
+ b = self.db.issue.get('1', 'title')
+ self.assertEqual(b, 'ham')
+
+ def testSerialisation(self):
+ nid = self.db.issue.create(title="spam", status='1',
+ deadline=date.Date(), foo=date.Interval('-1d'))
+ self.db.commit()
+ assert isinstance(self.db.issue.get(nid, 'deadline'), date.Date)
+ assert isinstance(self.db.issue.get(nid, 'foo'), date.Interval)
+ uid = self.db.user.create(username="fozzy",
+ password=password.Password('t. bear'))
+ self.db.commit()
+ assert isinstance(self.db.user.get(uid, 'password'), password.Password)
+
+ def testTransactions(self):
+ # remember the number of items we started
+ num_issues = len(self.db.issue.list())
+ num_files = self.db.numfiles()
+ self.db.issue.create(title="don't commit me!", status='1')
+ self.assertNotEqual(num_issues, len(self.db.issue.list()))
+ self.db.rollback()
+ self.assertEqual(num_issues, len(self.db.issue.list()))
+ self.db.issue.create(title="please commit me!", status='1')
+ self.assertNotEqual(num_issues, len(self.db.issue.list()))
+ self.db.commit()
+ self.assertNotEqual(num_issues, len(self.db.issue.list()))
+ self.db.rollback()
+ self.assertNotEqual(num_issues, len(self.db.issue.list()))
+ self.db.file.create(name="test", type="text/plain", content="hi")
+ self.db.rollback()
+ self.assertEqual(num_files, self.db.numfiles())
+ for i in range(10):
+ self.db.file.create(name="test", type="text/plain",
+ content="hi %d"%(i))
+ self.db.commit()
+ num_files2 = self.db.numfiles()
+ self.assertNotEqual(num_files, num_files2)
+ self.db.file.create(name="test", type="text/plain", content="hi")
+ self.db.rollback()
+ self.assertNotEqual(num_files, self.db.numfiles())
+ self.assertEqual(num_files2, self.db.numfiles())
+
+ # rollback / cache interaction
+ name1 = self.db.user.get('1', 'username')
+ self.db.user.set('1', username = name1+name1)
+ # get the prop so the info's forced into the cache (if there is one)
+ self.db.user.get('1', 'username')
+ self.db.rollback()
+ name2 = self.db.user.get('1', 'username')
+ self.assertEqual(name1, name2)
+
+ def testDestroyNoJournalling(self):
+ self.innerTestDestroy(klass=self.db.session)
+
+ def testDestroyJournalling(self):
+ self.innerTestDestroy(klass=self.db.issue)
+
+ def innerTestDestroy(self, klass):
+ newid = klass.create(title='Mr Friendly')
+ n = len(klass.list())
+ self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
+ klass.destroy(newid)
+ self.assertRaises(IndexError, klass.get, newid, 'title')
+ self.assertNotEqual(len(klass.list()), n)
+ if klass.do_journal:
+ self.assertRaises(IndexError, klass.history, newid)
+
+ # now with a commit
+ newid = klass.create(title='Mr Friendly')
+ n = len(klass.list())
+ self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
+ self.db.commit()
+ klass.destroy(newid)
+ self.assertRaises(IndexError, klass.get, newid, 'title')
+ self.db.commit()
+ self.assertRaises(IndexError, klass.get, newid, 'title')
+ self.assertNotEqual(len(klass.list()), n)
+ if klass.do_journal:
+ self.assertRaises(IndexError, klass.history, newid)
+
+ # now with a rollback
+ newid = klass.create(title='Mr Friendly')
+ n = len(klass.list())
+ self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
+ self.db.commit()
+ klass.destroy(newid)
+ self.assertNotEqual(len(klass.list()), n)
+ self.assertRaises(IndexError, klass.get, newid, 'title')
+ self.db.rollback()
+ self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
+ self.assertEqual(len(klass.list()), n)
+ if klass.do_journal:
+ self.assertNotEqual(klass.history(newid), [])
+
+ def testExceptions(self):
+ # this tests the exceptions that should be raised
+ ar = self.assertRaises
+
+ #
+ # class create
+ #
+ # string property
+ ar(TypeError, self.db.status.create, name=1)
+ # invalid property name
+ ar(KeyError, self.db.status.create, foo='foo')
+ # key name clash
+ ar(ValueError, self.db.status.create, name='unread')
+ # invalid link index
+ ar(IndexError, self.db.issue.create, title='foo', status='bar')
+ # invalid link value
+ ar(ValueError, self.db.issue.create, title='foo', status=1)
+ # invalid multilink type
+ ar(TypeError, self.db.issue.create, title='foo', status='1',
+ nosy='hello')
+ # invalid multilink index type
+ ar(ValueError, self.db.issue.create, title='foo', status='1',
+ nosy=[1])
+ # invalid multilink index
+ ar(IndexError, self.db.issue.create, title='foo', status='1',
+ nosy=['10'])
+
+ #
+ # key property
+ #
+ # key must be a String
+ ar(TypeError, self.db.file.setkey, 'fooz')
+ # key must exist
+ ar(KeyError, self.db.file.setkey, 'fubar')
+
+ #
+ # class get
+ #
+ # invalid node id
+ ar(IndexError, self.db.issue.get, '99', 'title')
+ # invalid property name
+ ar(KeyError, self.db.status.get, '2', 'foo')
+
+ #
+ # class set
+ #
+ # invalid node id
+ ar(IndexError, self.db.issue.set, '99', title='foo')
+ # invalid property name
+ ar(KeyError, self.db.status.set, '1', foo='foo')
+ # string property
+ ar(TypeError, self.db.status.set, '1', name=1)
+ # key name clash
+ ar(ValueError, self.db.status.set, '2', name='unread')
+ # set up a valid issue for me to work on
+ id = self.db.issue.create(title="spam", status='1')
+ # invalid link index
+ ar(IndexError, self.db.issue.set, id, title='foo', status='bar')
+ # invalid link value
+ ar(ValueError, self.db.issue.set, id, title='foo', status=1)
+ # invalid multilink type
+ ar(TypeError, self.db.issue.set, id, title='foo', status='1',
+ nosy='hello')
+ # invalid multilink index type
+ ar(ValueError, self.db.issue.set, id, title='foo', status='1',
+ nosy=[1])
+ # invalid multilink index
+ ar(IndexError, self.db.issue.set, id, title='foo', status='1',
+ nosy=['10'])
+ # NOTE: the following increment the username to avoid problems
+ # within metakit's backend (it creates the node, and then sets the
+ # info, so the create (and by a fluke the username set) go through
+ # before the age/assignable/etc. set, which raises the exception)
+ # invalid number value
+ ar(TypeError, self.db.user.create, username='foo', age='a')
+ # invalid boolean value
+ ar(TypeError, self.db.user.create, username='foo2', assignable='true')
+ nid = self.db.user.create(username='foo3')
+ # invalid number value
+ ar(TypeError, self.db.user.set, nid, age='a')
+ # invalid boolean value
+ ar(TypeError, self.db.user.set, nid, assignable='true')
+
+ def testJournals(self):
+ self.db.user.create(username="mary")
+ self.db.user.create(username="pete")
+ self.db.issue.create(title="spam", status='1')
+ self.db.commit()
+
+ # journal entry for issue create
+ journal = self.db.getjournal('issue', '1')
+ self.assertEqual(1, len(journal))
+ (nodeid, date_stamp, journaltag, action, params) = journal[0]
+ self.assertEqual(nodeid, '1')
+ self.assertEqual(journaltag, self.db.user.lookup('admin'))
+ self.assertEqual(action, 'create')
+ keys = params.keys()
+ keys.sort()
+ self.assertEqual(keys, [])
+
+ # journal entry for link
+ journal = self.db.getjournal('user', '1')
+ self.assertEqual(1, len(journal))
+ self.db.issue.set('1', assignedto='1')
+ self.db.commit()
+ journal = self.db.getjournal('user', '1')
+ self.assertEqual(2, len(journal))
+ (nodeid, date_stamp, journaltag, action, params) = journal[1]
+ self.assertEqual('1', nodeid)
+ self.assertEqual('1', journaltag)
+ self.assertEqual('link', action)
+ self.assertEqual(('issue', '1', 'assignedto'), params)
+
+ # journal entry for unlink
+ self.db.issue.set('1', assignedto='2')
+ self.db.commit()
+ journal = self.db.getjournal('user', '1')
+ self.assertEqual(3, len(journal))
+ (nodeid, date_stamp, journaltag, action, params) = journal[2]
+ self.assertEqual('1', nodeid)
+ self.assertEqual('1', journaltag)
+ self.assertEqual('unlink', action)
+ self.assertEqual(('issue', '1', 'assignedto'), params)
+
+ # test disabling journalling
+ # ... get the last entry
+ time.sleep(1)
+ entry = self.db.getjournal('issue', '1')[-1]
+ (x, date_stamp, x, x, x) = entry
+ self.db.issue.disableJournalling()
+ self.db.issue.set('1', title='hello world')
+ self.db.commit()
+ entry = self.db.getjournal('issue', '1')[-1]
+ (x, date_stamp2, x, x, x) = entry
+ # see if the change was journalled when it shouldn't have been
+ self.assertEqual(date_stamp, date_stamp2)
+ time.sleep(1)
+ self.db.issue.enableJournalling()
+ self.db.issue.set('1', title='hello world 2')
+ self.db.commit()
+ entry = self.db.getjournal('issue', '1')[-1]
+ (x, date_stamp2, x, x, x) = entry
+ # see if the change was journalled
+ self.assertNotEqual(date_stamp, date_stamp2)
+
+ def testJournalPreCommit(self):
+ id = self.db.user.create(username="mary")
+ self.assertEqual(len(self.db.getjournal('user', id)), 1)
+ self.db.commit()
+
+ def testPack(self):
+ id = self.db.issue.create(title="spam", status='1')
+ self.db.commit()
+ self.db.issue.set(id, status='2')
+ self.db.commit()
+
+ # sleep for at least a second, then get a date to pack at
+ time.sleep(1)
+ pack_before = date.Date('.')
+
+ # wait another second and add one more entry
+ time.sleep(1)
+ self.db.issue.set(id, status='3')
+ self.db.commit()
+ jlen = len(self.db.getjournal('issue', id))
+
+ # pack
+ self.db.pack(pack_before)
+
+ # we should have the create and last set entries now
+ self.assertEqual(jlen-1, len(self.db.getjournal('issue', id)))
+
+ def testSearching(self):
+ self.db.file.create(content='hello', type="text/plain")
+ self.db.file.create(content='world', type="text/frozz",
+ comment='blah blah')
+ self.db.issue.create(files=['1', '2'], title="flebble plop")
+ self.db.issue.create(title="flebble frooz")
+ self.db.commit()
+ self.assertEquals(self.db.indexer.search(['hello'], self.db.issue),
+ {'1': {'files': ['1']}})
+ self.assertEquals(self.db.indexer.search(['world'], self.db.issue), {})
+ self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
+ {'2': {}})
+ self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
+ {'2': {}, '1': {}})
+
+ def testReindexing(self):
+ self.db.issue.create(title="frooz")
+ self.db.commit()
+ self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
+ {'1': {}})
+ self.db.issue.set('1', title="dooble")
+ self.db.commit()
+ self.assertEquals(self.db.indexer.search(['dooble'], self.db.issue),
+ {'1': {}})
+ self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue), {})
+
+ def testForcedReindexing(self):
+ self.db.issue.create(title="flebble frooz")
+ self.db.commit()
+ self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
+ {'1': {}})
+ self.db.indexer.quiet = 1
+ self.db.indexer.force_reindex()
+ self.db.post_init()
+ self.db.indexer.quiet = 9
+ self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
+ {'1': {}})
+
+ #
+ # searching tests follow
+ #
+ def testFind(self):
+ self.db.user.create(username='test')
+ ids = []
+ ids.append(self.db.issue.create(status="1", nosy=['1']))
+ oddid = self.db.issue.create(status="2", nosy=['2'], assignedto='2')
+ ids.append(self.db.issue.create(status="1", nosy=['1','2']))
+ self.db.issue.create(status="3", nosy=['1'], assignedto='1')
+ ids.sort()
+
+ # should match first and third
+ got = self.db.issue.find(status='1')
+ got.sort()
+ self.assertEqual(got, ids)
+
+ # none
+ self.assertEqual(self.db.issue.find(status='4'), [])
+
+ # should match first and third
+ got = self.db.issue.find(assignedto=None)
+ got.sort()
+ self.assertEqual(got, ids)
+
+ # should match first three
+ got = self.db.issue.find(status='1', nosy='2')
+ got.sort()
+ ids.append(oddid)
+ ids.sort()
+ self.assertEqual(got, ids)
+
+ # none
+ self.assertEqual(self.db.issue.find(status='4', nosy='3'), [])
+
+ def testStringFind(self):
+ ids = []
+ ids.append(self.db.issue.create(title="spam"))
+ self.db.issue.create(title="not spam")
+ ids.append(self.db.issue.create(title="spam"))
+ ids.sort()
+ got = self.db.issue.stringFind(title='spam')
+ got.sort()
+ self.assertEqual(got, ids)
+ self.assertEqual(self.db.issue.stringFind(title='fubar'), [])
+
+ def filteringSetup(self):
+ for user in (
+ {'username': 'bleep'},
+ {'username': 'blop'},
+ {'username': 'blorp'}):
+ self.db.user.create(**user)
+ iss = self.db.issue
+ for issue in (
+ {'title': 'issue one', 'status': '2',
+ 'foo': date.Interval('1:10'),
+ 'deadline': date.Date('2003-01-01.00:00')},
+ {'title': 'issue two', 'status': '1',
+ 'foo': date.Interval('1d'),
+ 'deadline': date.Date('2003-02-16.22:50')},
+ {'title': 'issue three', 'status': '1',
+ 'nosy': ['1','2'], 'deadline': date.Date('2003-02-18')},
+ {'title': 'non four', 'status': '3',
+ 'foo': date.Interval('0:10'),
+ 'nosy': ['1'], 'deadline': date.Date('2004-03-08')}):
+ self.db.issue.create(**issue)
+ self.db.commit()
+ return self.assertEqual, self.db.issue.filter
+
+ def testFilteringID(self):
+ ae, filt = self.filteringSetup()
+ ae(filt(None, {'id': '1'}, ('+','id'), (None,None)), ['1'])
+
+ def testFilteringString(self):
+ ae, filt = self.filteringSetup()
+ ae(filt(None, {'title': ['one']}, ('+','id'), (None,None)), ['1'])
+ ae(filt(None, {'title': ['issue']}, ('+','id'), (None,None)),
+ ['1','2','3'])
+ ae(filt(None, {'title': ['one', 'two']}, ('+','id'), (None,None)),
+ ['1', '2'])
+
+ def testFilteringLink(self):
+ ae, filt = self.filteringSetup()
+ ae(filt(None, {'status': '1'}, ('+','id'), (None,None)), ['2','3'])
+
+ def testFilteringRetired(self):
+ ae, filt = self.filteringSetup()
+ self.db.issue.retire('2')
+ ae(filt(None, {'status': '1'}, ('+','id'), (None,None)), ['3'])
+
+ def testFilteringMultilink(self):
+ ae, filt = self.filteringSetup()
+ ae(filt(None, {'nosy': '2'}, ('+','id'), (None,None)), ['3'])
+ ae(filt(None, {'nosy': '-1'}, ('+','id'), (None,None)), ['1', '2'])
+
+ def testFilteringMany(self):
+ ae, filt = self.filteringSetup()
+ ae(filt(None, {'nosy': '2', 'status': '1'}, ('+','id'), (None,None)),
+ ['3'])
+
+ def testFilteringRange(self):
+ ae, filt = self.filteringSetup()
+ # Date ranges
+ ae(filt(None, {'deadline': 'from 2003-02-10 to 2003-02-23'}), ['2','3'])
+ ae(filt(None, {'deadline': '2003-02-10; 2003-02-23'}), ['2','3'])
+ ae(filt(None, {'deadline': '; 2003-02-16'}), ['1'])
+ # Lets assume people won't invent a time machine, otherwise this test
+ # may fail :)
+ ae(filt(None, {'deadline': 'from 2003-02-16'}), ['2', '3', '4'])
+ ae(filt(None, {'deadline': '2003-02-16;'}), ['2', '3', '4'])
+ # year and month granularity
+ ae(filt(None, {'deadline': '2002'}), [])
+ ae(filt(None, {'deadline': '2003'}), ['1', '2', '3'])
+ ae(filt(None, {'deadline': '2004'}), ['4'])
+ ae(filt(None, {'deadline': '2003-02'}), ['2', '3'])
+ ae(filt(None, {'deadline': '2003-03'}), [])
+ ae(filt(None, {'deadline': '2003-02-16'}), ['2'])
+ ae(filt(None, {'deadline': '2003-02-17'}), [])
+ # Interval ranges
+ ae(filt(None, {'foo': 'from 0:50 to 2:00'}), ['1'])
+ ae(filt(None, {'foo': 'from 0:50 to 1d 2:00'}), ['1', '2'])
+ ae(filt(None, {'foo': 'from 5:50'}), ['2'])
+ ae(filt(None, {'foo': 'to 0:05'}), [])
+
+ def testFilteringIntervalSort(self):
+ ae, filt = self.filteringSetup()
+ # ascending should sort None, 1:10, 1d
+ ae(filt(None, {}, ('+','foo'), (None,None)), ['3', '4', '1', '2'])
+ # descending should sort 1d, 1:10, None
+ ae(filt(None, {}, ('-','foo'), (None,None)), ['2', '1', '4', '3'])
+
+# XXX add sorting tests for other types
+# XXX test auditors and reactors
+
+
+class ROTest(MyTestCase):
+ def setUp(self):
+ # remove previous test, ignore errors
+ if os.path.exists(config.DATABASE):
+ shutil.rmtree(config.DATABASE)
+ os.makedirs(config.DATABASE + '/files')
+ self.db = self.module.Database(config, 'admin')
+ setupSchema(self.db, 1, self.module)
+ self.db.close()
+
+ self.db = self.module.Database(config)
+ setupSchema(self.db, 0, self.module)
+
+ def testExceptions(self):
+ # this tests the exceptions that should be raised
+ ar = self.assertRaises
+
+ # this tests the exceptions that should be raised
+ ar(DatabaseError, self.db.status.create, name="foo")
+ ar(DatabaseError, self.db.status.set, '1', name="foo")
+ ar(DatabaseError, self.db.status.retire, '1')
+
+
+class SchemaTest(MyTestCase):
+ def setUp(self):
+ # remove previous test, ignore errors
+ if os.path.exists(config.DATABASE):
+ shutil.rmtree(config.DATABASE)
+ os.makedirs(config.DATABASE + '/files')
+
+ def init_a(self):
+ self.db = self.module.Database(config, 'admin')
+ a = self.module.Class(self.db, "a", name=String())
+ a.setkey("name")
+ self.db.post_init()
+
+ def init_ab(self):
+ self.db = self.module.Database(config, 'admin')
+ a = self.module.Class(self.db, "a", name=String())
+ a.setkey("name")
+ b = self.module.Class(self.db, "b", name=String())
+ b.setkey("name")
+ self.db.post_init()
+
+ def test_addNewClass(self):
+ self.init_a()
+ aid = self.db.a.create(name='apple')
+ self.db.commit(); self.db.close()
+
+ # add a new class to the schema and check creation of new items
+ # (and existence of old ones)
+ self.init_ab()
+ bid = self.db.b.create(name='bear')
+ self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
+ self.db.commit()
+ self.db.close()
+
+ # now check we can recall the added class' items
+ self.init_ab()
+ self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
+ self.assertEqual(self.db.a.lookup('apple'), aid)
+ self.assertEqual(self.db.b.get(bid, 'name'), 'bear')
+ self.assertEqual(self.db.b.lookup('bear'), bid)
+
+ def init_amod(self):
+ self.db = self.module.Database(config, 'admin')
+ a = self.module.Class(self.db, "a", name=String(), fooz=String())
+ a.setkey("name")
+ b = self.module.Class(self.db, "b", name=String())
+ b.setkey("name")
+ self.db.post_init()
+
+ def test_modifyClass(self):
+ self.init_ab()
+
+ # add item to user and issue class
+ aid = self.db.a.create(name='apple')
+ bid = self.db.b.create(name='bear')
+ self.db.commit(); self.db.close()
+
+ # modify "a" schema
+ self.init_amod()
+ self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
+ self.assertEqual(self.db.a.get(aid, 'fooz'), None)
+ self.assertEqual(self.db.b.get(aid, 'name'), 'bear')
+ aid2 = self.db.a.create(name='aardvark', fooz='booz')
+ self.db.commit(); self.db.close()
+
+ # test
+ self.init_amod()
+ self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
+ self.assertEqual(self.db.a.get(aid, 'fooz'), None)
+ self.assertEqual(self.db.b.get(aid, 'name'), 'bear')
+ self.assertEqual(self.db.a.get(aid2, 'name'), 'aardvark')
+ self.assertEqual(self.db.a.get(aid2, 'fooz'), 'booz')
+
+ def init_amodkey(self):
+ self.db = self.module.Database(config, 'admin')
+ a = self.module.Class(self.db, "a", name=String(), fooz=String())
+ a.setkey("fooz")
+ b = self.module.Class(self.db, "b", name=String())
+ b.setkey("name")
+ self.db.post_init()
+
+ def test_changeClassKey(self):
+ self.init_amod()
+ aid = self.db.a.create(name='apple')
+ self.assertEqual(self.db.a.lookup('apple'), aid)
+ self.db.commit(); self.db.close()
+
+ # change the key to fooz
+ self.init_amodkey()
+ self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
+ self.assertEqual(self.db.a.get(aid, 'fooz'), None)
+ self.assertRaises(KeyError, self.db.a.lookup, 'apple')
+ aid2 = self.db.a.create(name='aardvark', fooz='booz')
+ self.db.commit(); self.db.close()
+
+ # check
+ self.init_amodkey()
+ self.assertEqual(self.db.a.lookup('booz'), aid2)
+
+ def init_ml(self):
+ self.db = self.module.Database(config, 'admin')
+ a = self.module.Class(self.db, "a", name=String())
+ a.setkey('name')
+ b = self.module.Class(self.db, "b", name=String(),
+ fooz=Multilink('a'))
+ b.setkey("name")
+ self.db.post_init()
+
+ def test_makeNewMultilink(self):
+ self.init_a()
+ aid = self.db.a.create(name='apple')
+ self.assertEqual(self.db.a.lookup('apple'), aid)
+ self.db.commit(); self.db.close()
+
+ # add a multilink prop
+ self.init_ml()
+ bid = self.db.b.create(name='bear', fooz=[aid])
+ self.assertEqual(self.db.b.find(fooz=aid), [bid])
+ self.assertEqual(self.db.a.lookup('apple'), aid)
+ self.db.commit(); self.db.close()
+
+ # check
+ self.init_ml()
+ self.assertEqual(self.db.b.find(fooz=aid), [bid])
+ self.assertEqual(self.db.a.lookup('apple'), aid)
+ self.assertEqual(self.db.b.lookup('bear'), bid)
+
+ def test_removeMultilink(self):
+ # add a multilink prop
+ self.init_ml()
+ aid = self.db.a.create(name='apple')
+ bid = self.db.b.create(name='bear', fooz=[aid])
+ self.assertEqual(self.db.b.find(fooz=aid), [bid])
+ self.assertEqual(self.db.a.lookup('apple'), aid)
+ self.assertEqual(self.db.b.lookup('bear'), bid)
+ self.db.commit(); self.db.close()
+
+ # remove the multilink
+ self.init_ab()
+ self.assertEqual(self.db.a.lookup('apple'), aid)
+ self.assertEqual(self.db.b.lookup('bear'), bid)
+
+ def test_removeClass(self):
+ self.init_ml()
+ aid = self.db.a.create(name='apple')
+ bid = self.db.b.create(name='bear', fooz=[aid])
+ self.db.commit(); self.db.close()
+
+ # drop the b class
+ self.init_a()
+ self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
+ self.assertEqual(self.db.a.lookup('apple'), aid)
+ self.db.commit(); self.db.close()
+
+ # now check we can recall the added class' items
+ self.init_a()
+ self.assertEqual(self.db.a.get(aid, 'name'), 'apple')
+ self.assertEqual(self.db.a.lookup('apple'), aid)
+
+
+class ClassicInitTest(unittest.TestCase):
+ count = 0
+ db = None
+
+ def setUp(self):
+ ClassicInitTest.count = ClassicInitTest.count + 1
+ self.dirname = '_test_init_%s'%self.count
+ try:
+ shutil.rmtree(self.dirname)
+ except OSError, error:
+ if error.errno not in (errno.ENOENT, errno.ESRCH): raise
+
+ def testCreation(self):
+ ae = self.assertEqual
+
+ # create the instance
+ init.install(self.dirname, 'templates/classic')
+ init.write_select_db(self.dirname, self.backend)
+ init.initialise(self.dirname, 'sekrit')
+
+ # check we can load the package
+ instance = imp.load_package(self.dirname, self.dirname)
+
+ # and open the database
+ db = self.db = instance.open()
+
+ # check the basics of the schema and initial data set
+ l = db.priority.list()
+ ae(l, ['1', '2', '3', '4', '5'])
+ l = db.status.list()
+ ae(l, ['1', '2', '3', '4', '5', '6', '7', '8'])
+ l = db.keyword.list()
+ ae(l, [])
+ l = db.user.list()
+ ae(l, ['1', '2'])
+ l = db.msg.list()
+ ae(l, [])
+ l = db.file.list()
+ ae(l, [])
+ l = db.issue.list()
+ ae(l, [])
+
+ def tearDown(self):
+ if self.db is not None:
+ self.db.close()
+ try:
+ shutil.rmtree(self.dirname)
+ except OSError, error:
+ if error.errno not in (errno.ENOENT, errno.ESRCH): raise
+
diff --git a/test/test_anydbm.py b/test/test_anydbm.py
--- /dev/null
+++ b/test/test_anydbm.py
@@ -0,0 +1,53 @@
+#
+# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
+# This module is free software, and you may redistribute it and/or modify
+# under the same terms as Python, so long as this copyright message and
+# disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
+# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
+# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
+# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
+# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+#
+# $Id: test_anydbm.py,v 1.1 2003-10-25 22:53:26 richard Exp $
+
+import unittest, os, shutil, time
+
+from db_test_base import DBTest, ROTest, SchemaTest, ClassicInitTest
+
+class anydbmOpener:
+ from roundup.backends import anydbm as module
+
+class anydbmDBTest(anydbmOpener, DBTest):
+ pass
+
+class anydbmROTest(anydbmOpener, ROTest):
+ pass
+
+class anydbmSchemaTest(anydbmOpener, SchemaTest):
+ pass
+
+class anydbmClassicInitTest(ClassicInitTest):
+ backend = 'anydbm'
+
+def test_suite():
+ suite = unittest.TestSuite()
+ print 'Including anydbm tests'
+ suite.addTest(unittest.makeSuite(anydbmDBTest))
+ suite.addTest(unittest.makeSuite(anydbmROTest))
+ suite.addTest(unittest.makeSuite(anydbmSchemaTest))
+ suite.addTest(unittest.makeSuite(anydbmClassicInitTest))
+ return suite
+
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
+
+
+# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_bsddb.py b/test/test_bsddb.py
--- /dev/null
+++ b/test/test_bsddb.py
@@ -0,0 +1,57 @@
+#
+# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
+# This module is free software, and you may redistribute it and/or modify
+# under the same terms as Python, so long as this copyright message and
+# disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
+# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
+# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
+# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
+# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+#
+# $Id: test_bsddb.py,v 1.1 2003-10-25 22:53:26 richard Exp $
+
+import unittest, os, shutil, time
+
+from db_test_base import DBTest, ROTest, SchemaTest, \
+ ClassicInitTest
+from roundup import backends
+
+class bsddbOpener:
+ if hasattr(backends, 'bsddb'):
+ from roundup.backends import bsddb as module
+
+class bsddbDBTest(bsddbOpener, DBTest):
+ pass
+
+class bsddbROTest(bsddbOpener, ROTest):
+ pass
+
+class bsddbSchemaTest(bsddbOpener, SchemaTest):
+ pass
+
+class bsddbClassicInitTest(ClassicInitTest):
+ backend = 'bsddb'
+
+def test_suite():
+ suite = unittest.TestSuite()
+ if not hasattr(backends, 'bsddb'):
+ print 'Skipping bsddb tests'
+ return suite
+ print 'Including bsddb tests'
+ suite.addTest(unittest.makeSuite(bsddbDBTest))
+ suite.addTest(unittest.makeSuite(bsddbROTest))
+ suite.addTest(unittest.makeSuite(bsddbSchemaTest))
+ suite.addTest(unittest.makeSuite(bsddbClassicInitTest))
+ return suite
+
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
+
diff --git a/test/test_bsddb3.py b/test/test_bsddb3.py
--- /dev/null
+++ b/test/test_bsddb3.py
@@ -0,0 +1,57 @@
+#
+# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
+# This module is free software, and you may redistribute it and/or modify
+# under the same terms as Python, so long as this copyright message and
+# disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
+# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
+# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
+# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
+# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+#
+# $Id: test_bsddb3.py,v 1.1 2003-10-25 22:53:26 richard Exp $
+
+import unittest, os, shutil, time
+
+from db_test_base import DBTest, ROTest, SchemaTest, \
+ ClassicInitTest
+from roundup import backends
+
+class bsddb3Opener:
+ if hasattr(backends, 'bsddb3'):
+ from roundup.backends import bsddb3 as module
+
+class bsddb3DBTest(bsddb3Opener, DBTest):
+ pass
+
+class bsddb3ROTest(bsddb3Opener, ROTest):
+ pass
+
+class bsddb3SchemaTest(bsddb3Opener, SchemaTest):
+ pass
+
+class bsddb3ClassicInitTest(ClassicInitTest):
+ backend = 'bsddb3'
+
+def test_suite():
+ suite = unittest.TestSuite()
+ if not hasattr(backends, 'bsddb3'):
+ print 'Skipping bsddb3 tests'
+ return suite
+ print 'Including bsddb3 tests'
+ suite.addTest(unittest.makeSuite(bsddb3DBTest))
+ suite.addTest(unittest.makeSuite(bsddb3ROTest))
+ suite.addTest(unittest.makeSuite(bsddb3SchemaTest))
+ suite.addTest(unittest.makeSuite(bsddb3ClassicInitTest))
+ return suite
+
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
+
diff --git a/test/test_cgi.py b/test/test_cgi.py
index 9e40feb8f1f802938db774d734ac67293adc54a6..e4f26bb5f6385a5e5e34e2be2b4462347a7790b5 100644 (file)
--- a/test/test_cgi.py
+++ b/test/test_cgi.py
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
-# $Id: test_cgi.py,v 1.20 2003-09-24 14:54:23 jlgijsbers Exp $
+# $Id: test_cgi.py,v 1.21 2003-10-25 22:53:26 richard Exp $
import unittest, os, shutil, errno, sys, difflib, cgi, re
except OSError, error:
if error.errno not in (errno.ENOENT, errno.ESRCH): raise
# create the instance
- shutil.copytree('_empty_instance', self.dirname)
+ init.install(self.dirname, 'templates/classic')
+ init.write_select_db(self.dirname, 'anydbm')
+ init.initialise(self.dirname, 'sekrit')
# check we can load the package
self.instance = instance.open(self.dirname)
'name': 'foo.txt', 'type': 'text/plain'}},
[('issue', None, 'files', [('file', '-1')])]))
-def suite():
- l = [
- unittest.makeSuite(FormTestCase),
- unittest.makeSuite(MessageTestCase),
- ]
- return unittest.TestSuite(l)
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(FormTestCase))
+ suite.addTest(unittest.makeSuite(MessageTestCase))
+ return suite
-def run():
+if __name__ == '__main__':
runner = unittest.TextTestRunner()
unittest.main(testRunner=runner)
-if __name__ == '__main__':
- run()
-
# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_dates.py b/test/test_dates.py
index 69c1f4e9263c1dcfc740fbc2a023ca02b5bd3ee5..b15abaff9239d6251117686f2a3b7f15e043adc8 100644 (file)
--- a/test/test_dates.py
+++ b/test/test_dates.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: test_dates.py,v 1.24 2003-04-22 20:53:54 kedder Exp $
+# $Id: test_dates.py,v 1.25 2003-10-25 22:53:26 richard Exp $
import unittest, time
ae(str(Interval('+1w', add_granularity=1)), '+ 14d')
ae(str(Interval('-2m 3w', add_granularity=1)), '- 2m 14d')
-def suite():
- return unittest.makeSuite(DateTestCase, 'test')
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(DateTestCase))
+ return suite
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_indexer.py b/test/test_indexer.py
index b170f9a514676e0635c678ce3e361a64a67686ae..18e56f971e81690ae753f155a2d72f31a7000a50 100644 (file)
--- a/test/test_indexer.py
+++ b/test/test_indexer.py
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
-# $Id: test_indexer.py,v 1.2 2002-09-10 00:19:54 richard Exp $
+# $Id: test_indexer.py,v 1.3 2003-10-25 22:53:26 richard Exp $
import os, unittest, shutil
def tearDown(self):
shutil.rmtree('test-index')
-def suite():
- return unittest.makeSuite(IndexerTest)
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(IndexerTest))
+ return suite
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_locking.py b/test/test_locking.py
index 40756c7f5931e94add86a48ac0b00c62e02ef124..b5945731fba242cd726ec92409f8a91e84528ed2 100644 (file)
--- a/test/test_locking.py
+++ b/test/test_locking.py
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
-# $Id: test_locking.py,v 1.3 2002-12-09 02:51:46 richard Exp $
+# $Id: test_locking.py,v 1.4 2003-10-25 22:53:26 richard Exp $
import os, unittest, tempfile
def tearDown(self):
os.remove(self.path)
-def suite():
- return unittest.makeSuite(LockingTest)
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(LockingTest))
+ return suite
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_mailgw.py b/test/test_mailgw.py
index 5f0ca242234836978f1d1666d6c712dde4ea9c2e..ea5adde2dc93d70213e78737bc0895211c5595fc 100644 (file)
--- a/test/test_mailgw.py
+++ b/test/test_mailgw.py
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
-# $Id: test_mailgw.py,v 1.56 2003-10-25 12:02:36 jlgijsbers Exp $
+# $Id: test_mailgw.py,v 1.57 2003-10-25 22:53:26 richard Exp $
import unittest, tempfile, os, shutil, errno, imp, sys, difflib, rfc822
from cStringIO import StringIO
+if not os.environ.has_key('SENDMAILDEBUG'):
+ os.environ['SENDMAILDEBUG'] = 'mail-test.log'
+SENDMAILDEBUG = os.environ['SENDMAILDEBUG']
+
from roundup.mailgw import MailGW, Unauthorized, uidFromAddress, parseContent
from roundup import init, instance, rfc2822
-NEEDS_INSTANCE = 1
class Message(rfc822.Message):
"""String-based Message class with equivalence test."""
except OSError, error:
if error.errno not in (errno.ENOENT, errno.ESRCH): raise
# create the instance
- shutil.copytree('_empty_instance', self.dirname)
+ init.install(self.dirname, 'templates/classic')
+ init.write_select_db(self.dirname, 'anydbm')
+ init.initialise(self.dirname, 'sekrit')
# check we can load the package
self.instance = instance.open(self.dirname)
+
# and open the database
self.db = self.instance.open('admin')
self.db.user.create(username='Chef', address='chef@bork.bork.bork',
realname='John Doe')
def tearDown(self):
- if os.path.exists(os.environ['SENDMAILDEBUG']):
- os.remove(os.environ['SENDMAILDEBUG'])
+ if os.path.exists(SENDMAILDEBUG):
+ os.remove(SENDMAILDEBUG)
self.db.close()
try:
shutil.rmtree(self.dirname)
except OSError, error:
if error.errno not in (errno.ENOENT, errno.ESRCH): raise
+ def _get_mail(self):
+ f = open(SENDMAILDEBUG)
+ try:
+ return f.read()
+ finally:
+ f.close()
+
def testEmptyMessage(self):
message = StringIO('''Content-Type: text/plain;
charset="iso-8859-1"
handler = self.instance.MailGW(self.instance, self.db)
handler.trapExceptions = 0
nodeid = handler.main(message)
- if os.path.exists(os.environ['SENDMAILDEBUG']):
- error = open(os.environ['SENDMAILDEBUG']).read()
- self.assertEqual('no error', error)
+ assert not os.path.exists(SENDMAILDEBUG)
self.assertEqual(self.db.issue.get(nodeid, 'title'), 'Testing...')
def doNewIssue(self):
handler = self.instance.MailGW(self.instance, self.db)
handler.trapExceptions = 0
nodeid = handler.main(message)
- if os.path.exists(os.environ['SENDMAILDEBUG']):
- error = open(os.environ['SENDMAILDEBUG']).read()
- self.assertEqual('no error', error)
+ assert not os.path.exists(SENDMAILDEBUG)
l = self.db.issue.get(nodeid, 'nosy')
l.sort()
self.assertEqual(l, ['3', '4'])
handler = self.instance.MailGW(self.instance, self.db)
handler.trapExceptions = 0
nodeid = handler.main(message)
- if os.path.exists(os.environ['SENDMAILDEBUG']):
- error = open(os.environ['SENDMAILDEBUG']).read()
- self.assertEqual('no error', error)
+ assert not os.path.exists(SENDMAILDEBUG)
l = self.db.issue.get(nodeid, 'nosy')
l.sort()
self.assertEqual(l, ['3', '4'])
handler = self.instance.MailGW(self.instance, self.db)
handler.trapExceptions = 0
handler.main(message)
- if os.path.exists(os.environ['SENDMAILDEBUG']):
- error = open(os.environ['SENDMAILDEBUG']).read()
- self.assertEqual('no error', error)
+ assert not os.path.exists(SENDMAILDEBUG)
self.assertEqual(userlist, self.db.user.list(),
"user created when it shouldn't have been")
handler = self.instance.MailGW(self.instance, self.db)
handler.trapExceptions = 0
handler.main(message)
- if os.path.exists(os.environ['SENDMAILDEBUG']):
- error = open(os.environ['SENDMAILDEBUG']).read()
- self.assertEqual('no error', error)
+ assert not os.path.exists(SENDMAILDEBUG)
def testNewIssueAuthMsg(self):
message = StringIO('''Content-Type: text/plain;
self.db.config.MESSAGES_TO_AUTHOR = 'yes'
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork, mary@test, richard@test
Content-Type: text/plain; charset=utf-8
handler = self.instance.MailGW(self.instance, self.db)
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork, richard@test
Content-Type: text/plain; charset=utf-8
l.sort()
self.assertEqual(l, ['3', '4', '5', '6'])
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork, john@test, mary@test
Content-Type: text/plain; charset=utf-8
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork, john@test, mary@test
Content-Type: text/plain; charset=utf-8
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork, richard@test
Content-Type: text/plain; charset=utf-8
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork
Content-Type: text/plain; charset=utf-8
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork, john@test, richard@test
Content-Type: text/plain; charset=utf-8
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork, richard@test
Content-Type: text/plain; charset=utf-8
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork
Content-Type: text/plain; charset=utf-8
self.assertEqual(l, ['3', '4', '5', '6'])
# should be no file created (ie. no message)
- assert not os.path.exists(os.environ['SENDMAILDEBUG'])
+ assert not os.path.exists(SENDMAILDEBUG)
def testNosyRemove(self):
self.doNewIssue()
self.assertEqual(l, ['3'])
# NO NOSY MESSAGE SHOULD BE SENT!
- self.assert_(not os.path.exists(os.environ['SENDMAILDEBUG']))
+ assert not os.path.exists(SENDMAILDEBUG)
def testNewUserAuthor(self):
# first without the permission
handler = self.instance.MailGW(self.instance, self.db)
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork, richard@test
Content-Type: text/plain; charset=utf-8
handler = self.instance.MailGW(self.instance, self.db)
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork, richard@test
Content-Type: text/plain; charset=utf-8
handler.trapExceptions = 0
handler.main(message)
- self.compareMessages(open(os.environ['SENDMAILDEBUG']).read(),
+ self.compareMessages(self._get_mail(),
'''FROM: roundup-admin@your.tracker.email.domain.example
TO: chef@bork.bork.bork
Content-Type: text/plain; charset=utf-8
self.db.user.lookup('johannes')
-def suite():
- l = [unittest.makeSuite(MailgwTestCase)]
- return unittest.TestSuite(l)
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(MailgwTestCase))
+ return suite
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_mailsplit.py b/test/test_mailsplit.py
index a50e2d4c1254ac3f75d9ba0c3006051dbaddac77..9c960df4357255621095bb4ea79a3a214ac781b1 100644 (file)
--- a/test/test_mailsplit.py
+++ b/test/test_mailsplit.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: test_mailsplit.py,v 1.14 2003-10-25 12:02:37 jlgijsbers Exp $
+# $Id: test_mailsplit.py,v 1.15 2003-10-25 22:53:26 richard Exp $
import unittest, cStringIO
summary, content = parseContent(body, 1, 0)
self.assertEqual(body, content)
-def suite():
- return unittest.makeSuite(MailsplitTestCase, 'test')
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(MailsplitTestCase))
+ return suite
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_metakit.py b/test/test_metakit.py
--- /dev/null
+++ b/test/test_metakit.py
@@ -0,0 +1,100 @@
+#
+# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
+# This module is free software, and you may redistribute it and/or modify
+# under the same terms as Python, so long as this copyright message and
+# disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
+# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
+# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
+# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
+# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+#
+# $Id: test_metakit.py,v 1.1 2003-10-25 22:53:26 richard Exp $
+
+import unittest, os, shutil, time, weakref
+
+from db_test_base import DBTest, ROTest, SchemaTest, \
+ ClassicInitTest
+
+from roundup import backends
+
+class metakitOpener:
+ if hasattr(backends, 'metakit'):
+ from roundup.backends import metakit as module
+ module._instances = weakref.WeakValueDictionary()
+
+class metakitDBTest(metakitOpener, DBTest):
+ def testTransactions(self):
+ # remember the number of items we started
+ num_issues = len(self.db.issue.list())
+ self.db.issue.create(title="don't commit me!", status='1')
+ self.assertNotEqual(num_issues, len(self.db.issue.list()))
+ self.db.rollback()
+ self.assertEqual(num_issues, len(self.db.issue.list()))
+ self.db.issue.create(title="please commit me!", status='1')
+ self.assertNotEqual(num_issues, len(self.db.issue.list()))
+ self.db.commit()
+ self.assertNotEqual(num_issues, len(self.db.issue.list()))
+ self.db.rollback()
+ self.assertNotEqual(num_issues, len(self.db.issue.list()))
+ self.db.file.create(name="test", type="text/plain", content="hi")
+ self.db.rollback()
+ num_files = len(self.db.file.list())
+ for i in range(10):
+ self.db.file.create(name="test", type="text/plain",
+ content="hi %d"%(i))
+ self.db.commit()
+ # TODO: would be good to be able to ensure the file is not on disk after
+ # a rollback...
+ num_files2 = len(self.db.file.list())
+ self.assertNotEqual(num_files, num_files2)
+ self.db.file.create(name="test", type="text/plain", content="hi")
+ num_rfiles = len(os.listdir(self.db.config.DATABASE + '/files/file/0'))
+ self.db.rollback()
+ num_rfiles2 = len(os.listdir(self.db.config.DATABASE + '/files/file/0'))
+ self.assertEqual(num_files2, len(self.db.file.list()))
+ self.assertEqual(num_rfiles2, num_rfiles-1)
+
+ def testBooleanUnset(self):
+ # XXX: metakit can't unset Booleans :(
+ nid = self.db.user.create(username='foo', assignable=1)
+ self.db.user.set(nid, assignable=None)
+ self.assertEqual(self.db.user.get(nid, "assignable"), 0)
+
+ def testNumberUnset(self):
+ # XXX: metakit can't unset Numbers :(
+ nid = self.db.user.create(username='foo', age=1)
+ self.db.user.set(nid, age=None)
+ self.assertEqual(self.db.user.get(nid, "age"), 0)
+
+class metakitROTest(metakitOpener, ROTest):
+ pass
+
+class metakitSchemaTest(metakitOpener, SchemaTest):
+ pass
+
+class metakitClassicInitTest(ClassicInitTest):
+ backend = 'metakit'
+
+def test_suite():
+ suite = unittest.TestSuite()
+ if not hasattr(backends, 'metakit'):
+ print 'Skipping metakit tests'
+ return suite
+ print 'Including metakit tests'
+ suite.addTest(unittest.makeSuite(metakitDBTest))
+ suite.addTest(unittest.makeSuite(metakitROTest))
+ suite.addTest(unittest.makeSuite(metakitSchemaTest))
+ suite.addTest(unittest.makeSuite(metakitClassicInitTest))
+ return suite
+
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
+
diff --git a/test/test_multipart.py b/test/test_multipart.py
index e58362929ed527166a95c837ee98522a262dd737..9022d8674ee9b6b176f75eff58862af73703fca0 100644 (file)
--- a/test/test_multipart.py
+++ b/test/test_multipart.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: test_multipart.py,v 1.5 2002-09-10 00:19:55 richard Exp $
+# $Id: test_multipart.py,v 1.6 2003-10-25 22:53:26 richard Exp $
import unittest, cStringIO
p = m.getPart()
self.assert_(p is None)
-def suite():
- return unittest.makeSuite(MultipartTestCase, 'test')
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(MultipartTestCase))
+ return suite
+
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_mysql.py b/test/test_mysql.py
--- /dev/null
+++ b/test/test_mysql.py
@@ -0,0 +1,128 @@
+#
+# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
+# This module is free software, and you may redistribute it and/or modify
+# under the same terms as Python, so long as this copyright message and
+# disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
+# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
+# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
+# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
+# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+#
+# $Id: test_mysql.py,v 1.1 2003-10-25 22:53:26 richard Exp $
+
+import unittest, os, shutil, time, imp
+
+from roundup.hyperdb import DatabaseError
+from roundup import init
+
+from db_test_base import DBTest, ROTest, config, SchemaTest, nodbconfig, \
+ ClassicInitTest
+
+class mysqlOpener:
+ from roundup.backends import mysql as module
+
+ def tearDown(self):
+ self.db.close()
+ self.module.db_nuke(config)
+
+class mysqlDBTest(mysqlOpener, DBTest):
+ pass
+
+class mysqlROTest(mysqlOpener, ROTest):
+ pass
+
+class mysqlSchemaTest(mysqlOpener, SchemaTest):
+ pass
+
+class mysqlClassicInitTest(ClassicInitTest):
+ backend = 'mysql'
+
+ def testCreation(self):
+ ae = self.assertEqual
+
+ # create the instance
+ init.install(self.dirname, 'templates/classic')
+ init.write_select_db(self.dirname, self.backend)
+ f = open(os.path.join(self.dirname, 'config.py'), 'a')
+ try:
+ f.write('''
+MYSQL_DBHOST = 'localhost'
+MYSQL_DBUSER = 'rounduptest'
+MYSQL_DBPASSWORD = 'rounduptest'
+MYSQL_DBNAME = 'rounduptest'
+MYSQL_DATABASE = (MYSQL_DBHOST, MYSQL_DBUSER, MYSQL_DBPASSWORD, MYSQL_DBNAME)
+ ''')
+ finally:
+ f.close()
+ init.initialise(self.dirname, 'sekrit')
+
+ # check we can load the package
+ instance = imp.load_package(self.dirname, self.dirname)
+
+ # and open the database
+ db = instance.open()
+
+ # check the basics of the schema and initial data set
+ l = db.priority.list()
+ ae(l, ['1', '2', '3', '4', '5'])
+ l = db.status.list()
+ ae(l, ['1', '2', '3', '4', '5', '6', '7', '8'])
+ l = db.keyword.list()
+ ae(l, [])
+ l = db.user.list()
+ ae(l, ['1', '2'])
+ l = db.msg.list()
+ ae(l, [])
+ l = db.file.list()
+ ae(l, [])
+ l = db.issue.list()
+ ae(l, [])
+
+ from roundup.backends import mysql as module
+ def tearDown(self):
+ ClassicInitTest.tearDown(self)
+ self.module.db_nuke(config)
+
+def test_suite():
+ suite = unittest.TestSuite()
+
+ from roundup import backends
+ if not hasattr(backends, 'mysql'):
+ return suite
+
+ from roundup.backends import mysql
+ try:
+ # Check if we can run mysql tests
+ import MySQLdb
+ db = mysql.Database(nodbconfig, 'admin')
+ db.conn.select_db(config.MYSQL_DBNAME)
+ db.sql("SHOW TABLES");
+ tables = db.sql_fetchall()
+ if 0: #tables:
+ # Database should be empty. We don't dare to delete any data
+ raise DatabaseError, "Database %s contains tables"%\
+ config.MYSQL_DBNAME
+ db.sql("DROP DATABASE %s" % config.MYSQL_DBNAME)
+ db.sql("CREATE DATABASE %s" % config.MYSQL_DBNAME)
+ db.close()
+ except (MySQLdb.ProgrammingError, DatabaseError), msg:
+ print "Skipping mysql tests (%s)"%msg
+ else:
+ print 'Including mysql tests'
+ suite.addTest(unittest.makeSuite(mysqlDBTest))
+ suite.addTest(unittest.makeSuite(mysqlROTest))
+ suite.addTest(unittest.makeSuite(mysqlSchemaTest))
+ suite.addTest(unittest.makeSuite(mysqlClassicInitTest))
+ return suite
+
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
+
diff --git a/test/test_postgresql.py b/test/test_postgresql.py
--- /dev/null
+++ b/test/test_postgresql.py
@@ -0,0 +1,78 @@
+#
+# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
+# This module is free software, and you may redistribute it and/or modify
+# under the same terms as Python, so long as this copyright message and
+# disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
+# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
+# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
+# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
+# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+#
+# $Id: test_postgresql.py,v 1.1 2003-10-25 22:53:26 richard Exp $
+
+import unittest, os, shutil, time
+
+from roundup.hyperdb import DatabaseError
+
+from db_test_base import DBTest, ROTest, config, SchemaTest, nodbconfig, \
+ ClassicInitTest
+
+from roundup import backends
+
+class postgresqlOpener:
+ if hasattr(backends, 'metakit'):
+ from roundup.backends import postgresql as module
+
+ def tearDown(self):
+ self.db.close()
+ self.module.Database.nuke(config)
+
+class postgresqlDBTest(postgresqlOpener, DBTest):
+ pass
+
+class postgresqlROTest(postgresqlOpener, ROTest):
+ pass
+
+class postgresqlSchemaTest(postgresqlOpener, SchemaTest):
+ pass
+
+class postgresqlClassicInitTest(ClassicInitTest):
+ backend = 'postgresql'
+
+def test_suite():
+ suite = unittest.TestSuite()
+ if not hasattr(backends, 'postgresql'):
+ return suite
+
+ from roundup.backends import postgresql
+ try:
+ # Check if we can run postgresql tests
+ import psycopg
+ db = psycopg.Database(nodbconfig, 'admin')
+ db.conn.select_db(config.POSTGRESQL_DBNAME)
+ db.sql("SHOW TABLES");
+ tables = db.sql_fetchall()
+ if tables:
+ # Database should be empty. We don't dare to delete any data
+ raise DatabaseError, "(Database %s contains tables)"%\
+ config.POSTGRESQL_DBNAME
+ db.sql("DROP DATABASE %s" % config.POSTGRESQL_DBNAME)
+ db.sql("CREATE DATABASE %s" % config.POSTGRESQL_DBNAME)
+ db.close()
+ except (MySQLdb.ProgrammingError, DatabaseError), msg:
+ print "Skipping postgresql tests (%s)"%msg
+ else:
+ print 'Including postgresql tests'
+ suite.addTest(unittest.makeSuite(postgresqlDBTest))
+ suite.addTest(unittest.makeSuite(postgresqlROTest))
+ suite.addTest(unittest.makeSuite(postgresqlSchemaTest))
+ suite.addTest(unittest.makeSuite(postgresqlClassicInitTest))
+ return suite
+
diff --git a/test/test_schema.py b/test/test_schema.py
index 051b07d9d562f8c649bc2daf878a71f4d73943fb..d6c9e6455057ceeb3f8b684664af5f34499b301e 100644 (file)
--- a/test/test_schema.py
+++ b/test/test_schema.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: test_schema.py,v 1.12 2002-09-20 05:08:00 richard Exp $
+# $Id: test_schema.py,v 1.13 2003-10-25 22:53:26 richard Exp $
import unittest, os, shutil
user.setkey("username")
-def suite():
- return unittest.makeSuite(SchemaTestCase, 'test')
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(SchemaTestCase))
+ return suite
+
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_security.py b/test/test_security.py
index 97e759f7a58e3da873a4395062c9c6fb0ae97fc5..00a8a24223094deb78fe7ff2315a77856abaa152 100644 (file)
--- a/test/test_security.py
+++ b/test/test_security.py
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
-# $Id: test_security.py,v 1.5 2002-09-20 05:08:00 richard Exp $
+# $Id: test_security.py,v 1.6 2003-10-25 22:53:26 richard Exp $
import os, unittest, shutil
from roundup.password import Password
-from test_db import setupSchema, MyTestCase, config
+from db_test_base import setupSchema, MyTestCase, config
class PermissionTest(MyTestCase):
def setUp(self):
self.assertEquals(self.db.security.hasNodePermission('issue',
issueid, nosy=userid), 1)
-def suite():
- return unittest.makeSuite(PermissionTest)
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(PermissionTest))
+ return suite
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
# vim: set filetype=python ts=4 sw=4 et si
diff --git a/test/test_sqlite.py b/test/test_sqlite.py
--- /dev/null
+++ b/test/test_sqlite.py
@@ -0,0 +1,56 @@
+#
+# Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
+# This module is free software, and you may redistribute it and/or modify
+# under the same terms as Python, so long as this copyright message and
+# disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
+# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
+# OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
+# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+# FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
+# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+#
+# $Id: test_sqlite.py,v 1.1 2003-10-25 22:53:26 richard Exp $
+
+import unittest, os, shutil, time
+
+from db_test_base import DBTest, ROTest, SchemaTest, \
+ ClassicInitTest
+
+class sqliteOpener:
+ from roundup.backends import sqlite as module
+
+class sqliteDBTest(sqliteOpener, DBTest):
+ pass
+
+class sqliteROTest(sqliteOpener, ROTest):
+ pass
+
+class sqliteSchemaTest(sqliteOpener, SchemaTest):
+ pass
+
+class sqliteClassicInitTest(ClassicInitTest):
+ backend = 'sqlite'
+
+def test_suite():
+ suite = unittest.TestSuite()
+ from roundup import backends
+ if not hasattr(backends, 'sqlite'):
+ print 'Skipping sqlite tests'
+ return suite
+ print 'Including sqlite tests'
+ suite.addTest(unittest.makeSuite(sqliteDBTest))
+ suite.addTest(unittest.makeSuite(sqliteROTest))
+ suite.addTest(unittest.makeSuite(sqliteSchemaTest))
+ suite.addTest(unittest.makeSuite(sqliteClassicInitTest))
+ return suite
+
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
+
diff --git a/test/test_token.py b/test/test_token.py
index 7318a0c6e29b5643d311ed2e219601360b09ae42..505a0c816a79012b6dfeeac975090216803b977a 100644 (file)
--- a/test/test_token.py
+++ b/test/test_token.py
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
-# $Id: test_token.py,v 1.2 2002-09-10 00:19:55 richard Exp $
+# $Id: test_token.py,v 1.3 2003-10-25 22:53:26 richard Exp $
import unittest, time
self.assertRaises(ValueError, token_split, '"hello world')
self.assertRaises(ValueError, token_split, "Roch'e Compaan")
-def suite():
- return unittest.makeSuite(TokenTestCase, 'test')
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TokenTestCase))
+ return suite
+if __name__ == '__main__':
+ runner = unittest.TextTestRunner()
+ unittest.main(testRunner=runner)
# vim: set filetype=python ts=4 sw=4 et si