diff --git a/test/test_db.py b/test/test_db.py
index 6d5cabc0cd6cbc9bafe2c7101cba30e14c5a55cc..315ab2fd3e71bd55bc39fe8c24b2c59513f3c6e0 100644 (file)
--- a/test/test_db.py
+++ b/test/test_db.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: test_db.py,v 1.49 2002-09-18 07:04:39 richard Exp $
+# $Id: test_db.py,v 1.55 2002-09-24 01:59:44 richard Exp $
import unittest, os, shutil, time
assignable=Boolean(), age=Number(), roles=String())
user.setkey("username")
file = module.FileClass(db, "file", name=String(), type=String(),
- comment=String(indexme="yes"))
+ comment=String(indexme="yes"), fooz=Password())
issue = module.IssueClass(db, "issue", title=String(indexme="yes"),
status=Link("status"), nosy=Multilink("user"), deadline=Date(),
foo=Interval(), files=Multilink("file"), assignedto=Link('user'))
session.disableJournalling()
db.post_init()
if create:
+ user.create(username="admin", roles='Admin')
status.create(name="unread")
status.create(name="in-progress")
status.create(name="testing")
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- self.db = anydbm.Database(config, 'test')
+ self.db = anydbm.Database(config, 'admin')
setupSchema(self.db, 1, anydbm)
- self.db2 = anydbm.Database(config, 'test')
+ self.db2 = anydbm.Database(config, 'admin')
setupSchema(self.db2, 0, anydbm)
def testStringChange(self):
self.assertEqual(self.db.user.get('1', "assignable"), None)
def testNumberChange(self):
- self.db.user.create(username='foo', age=1)
- self.assertEqual(1, self.db.user.get('1', 'age'))
+ nid = self.db.user.create(username='foo', age=1)
+ self.assertEqual(1, self.db.user.get(nid, 'age'))
self.db.user.set('1', age=3)
self.assertNotEqual(self.db.user.get('1', 'age'), 1)
self.db.user.set('1', age=1.0)
self.db.user.set('1', age=None)
self.assertEqual(self.db.user.get('1', "age"), None)
+ def testKeyValue(self):
+ newid = self.db.user.create(username="spam")
+ self.assertEqual(self.db.user.lookup('spam'), newid)
+ self.db.commit()
+ self.assertEqual(self.db.user.lookup('spam'), newid)
+ self.db.user.retire(newid)
+ self.assertRaises(KeyError, self.db.user.lookup, 'spam')
+
def testNewProperty(self):
self.db.issue.create(title="spam", status='1')
self.db.issue.addprop(fixer=Link("user"))
# key property
#
# key must be a String
- ar(TypeError, self.db.user.setkey, 'password')
+ ar(TypeError, self.db.file.setkey, 'fooz')
# key must exist
- ar(KeyError, self.db.user.setkey, 'fubar')
+ ar(KeyError, self.db.file.setkey, 'fubar')
#
# class get
#
# invalid node id
- ar(IndexError, self.db.issue.get, '1', 'title')
+ ar(IndexError, self.db.issue.get, '99', 'title')
# invalid property name
ar(KeyError, self.db.status.get, '2', 'foo')
# class set
#
# invalid node id
- ar(IndexError, self.db.issue.set, '1', title='foo')
+ ar(IndexError, self.db.issue.set, '99', title='foo')
# invalid property name
ar(KeyError, self.db.status.set, '1', foo='foo')
# string property
# key name clash
ar(ValueError, self.db.status.set, '2', name='unread')
# set up a valid issue for me to work on
- self.db.issue.create(title="spam", status='1')
+ id = self.db.issue.create(title="spam", status='1')
# invalid link index
- ar(IndexError, self.db.issue.set, '6', title='foo', status='bar')
+ ar(IndexError, self.db.issue.set, id, title='foo', status='bar')
# invalid link value
- ar(ValueError, self.db.issue.set, '6', title='foo', status=1)
+ ar(ValueError, self.db.issue.set, id, title='foo', status=1)
# invalid multilink type
- ar(TypeError, self.db.issue.set, '6', title='foo', status='1',
+ ar(TypeError, self.db.issue.set, id, title='foo', status='1',
nosy='hello')
# invalid multilink index type
- ar(ValueError, self.db.issue.set, '6', title='foo', status='1',
+ ar(ValueError, self.db.issue.set, id, title='foo', status='1',
nosy=[1])
# invalid multilink index
- ar(IndexError, self.db.issue.set, '6', title='foo', status='1',
+ ar(IndexError, self.db.issue.set, id, title='foo', status='1',
nosy=['10'])
# invalid number value
ar(TypeError, self.db.user.create, username='foo', age='a')
# invalid boolean value
ar(TypeError, self.db.user.create, username='foo', assignable='true')
- self.db.user.create(username='foo')
+ nid = self.db.user.create(username='foo')
# invalid number value
- ar(TypeError, self.db.user.set, '3', username='foo', age='a')
+ ar(TypeError, self.db.user.set, nid, username='foo', age='a')
# invalid boolean value
- ar(TypeError, self.db.user.set, '3', username='foo', assignable='true')
+ ar(TypeError, self.db.user.set, nid, username='foo', assignable='true')
def testJournals(self):
self.db.user.create(username="mary")
self.assertEqual(1, len(journal))
(nodeid, date_stamp, journaltag, action, params) = journal[0]
self.assertEqual(nodeid, '1')
- self.assertEqual(journaltag, 'test')
+ self.assertEqual(journaltag, self.db.user.lookup('admin'))
self.assertEqual(action, 'create')
keys = params.keys()
keys.sort()
self.assertEqual(2, len(journal))
(nodeid, date_stamp, journaltag, action, params) = journal[1]
self.assertEqual('1', nodeid)
- self.assertEqual('test', journaltag)
+ self.assertEqual('1', journaltag)
self.assertEqual('link', action)
self.assertEqual(('issue', '1', 'assignedto'), params)
self.assertEqual(3, len(journal))
(nodeid, date_stamp, journaltag, action, params) = journal[2]
self.assertEqual('1', nodeid)
- self.assertEqual('test', journaltag)
+ self.assertEqual('1', journaltag)
self.assertEqual('unlink', action)
self.assertEqual(('issue', '1', 'assignedto'), params)
(x, date_stamp2, x, x, x) = entry
# see if the change was journalled when it shouldn't have been
self.assertEqual(date_stamp, date_stamp2)
+ time.sleep(1)
self.db.issue.enableJournalling()
self.db.issue.set('1', title='hello world 2')
self.db.commit()
self.assertNotEqual(date_stamp, date_stamp2)
def testPack(self):
- self.db.issue.create(title="spam", status='1')
+ id = self.db.issue.create(title="spam", status='1')
self.db.commit()
- self.db.issue.set('1', status='2')
+ self.db.issue.set(id, status='2')
self.db.commit()
# sleep for at least a second, then get a date to pack at
- time.sleep(1)
+ time.sleep(2)
pack_before = date.Date('.')
+ time.sleep(2)
# one more entry
- self.db.issue.set('1', status='3')
+ self.db.issue.set(id, status='3')
self.db.commit()
# pack
self.db.pack(pack_before)
- journal = self.db.getjournal('issue', '1')
+ journal = self.db.getjournal('issue', id)
# we should have the create and last set entries now
self.assertEqual(2, len(journal))
self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
{'1': {}})
+ def testStringFind(self):
+ ids = []
+ ids.append(self.db.issue.create(title="spam"))
+ self.db.issue.create(title="not spam")
+ ids.append(self.db.issue.create(title="spam"))
+ ids.sort()
+ got = self.db.issue.stringFind(title='spam')
+ got.sort()
+ self.assertEqual(got, ids)
+ self.assertEqual(self.db.issue.stringFind(title='fubar'), [])
+
+ def filteringSetup(self):
+ for user in (
+ {'username': 'bleep'},
+ {'username': 'blop'},
+ {'username': 'blorp'}):
+ self.db.user.create(**user)
+ iss = self.db.issue
+ for issue in (
+ {'title': 'issue one', 'status': '2'},
+ {'title': 'issue two', 'status': '1'},
+ {'title': 'issue three', 'status': '1', 'nosy': ['1','2']}):
+ self.db.issue.create(**issue)
+ self.db.commit()
+ return self.assertEqual, self.db.issue.filter
+
+ def testFilteringString(self):
+ ae, filt = self.filteringSetup()
+ ae(filt(None, {'title': 'issue one'}, ('+','id'), (None,None)), ['1'])
+ ae(filt(None, {'title': 'issue'}, ('+','id'), (None,None)),
+ ['1','2','3'])
+
+ def testFilteringLink(self):
+ ae, filt = self.filteringSetup()
+ ae(filt(None, {'status': '1'}, ('+','id'), (None,None)), ['2','3'])
+
+ def testFilteringMultilink(self):
+ ae, filt = self.filteringSetup()
+ ae(filt(None, {'nosy': '2'}, ('+','id'), (None,None)), ['3'])
+
+ def testFilteringMany(self):
+ ae, filt = self.filteringSetup()
+ ae(filt(None, {'nosy': '2', 'status': '1'}, ('+','id'), (None,None)),
+ ['3'])
+
class anydbmReadOnlyDBTestCase(MyTestCase):
def setUp(self):
from roundup.backends import anydbm
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- db = anydbm.Database(config, 'test')
+ db = anydbm.Database(config, 'admin')
setupSchema(db, 1, anydbm)
self.db = anydbm.Database(config)
setupSchema(self.db, 0, anydbm)
- self.db2 = anydbm.Database(config, 'test')
+ self.db2 = anydbm.Database(config, 'admin')
setupSchema(self.db2, 0, anydbm)
def testExceptions(self):
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- self.db = bsddb.Database(config, 'test')
+ self.db = bsddb.Database(config, 'admin')
setupSchema(self.db, 1, bsddb)
- self.db2 = bsddb.Database(config, 'test')
+ self.db2 = bsddb.Database(config, 'admin')
setupSchema(self.db2, 0, bsddb)
class bsddbReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- db = bsddb.Database(config, 'test')
+ db = bsddb.Database(config, 'admin')
setupSchema(db, 1, bsddb)
self.db = bsddb.Database(config)
setupSchema(self.db, 0, bsddb)
- self.db2 = bsddb.Database(config, 'test')
+ self.db2 = bsddb.Database(config, 'admin')
setupSchema(self.db2, 0, bsddb)
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- self.db = bsddb3.Database(config, 'test')
+ self.db = bsddb3.Database(config, 'admin')
setupSchema(self.db, 1, bsddb3)
- self.db2 = bsddb3.Database(config, 'test')
+ self.db2 = bsddb3.Database(config, 'admin')
setupSchema(self.db2, 0, bsddb3)
class bsddb3ReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- db = bsddb3.Database(config, 'test')
+ db = bsddb3.Database(config, 'admin')
setupSchema(db, 1, bsddb3)
self.db = bsddb3.Database(config)
setupSchema(self.db, 0, bsddb3)
- self.db2 = bsddb3.Database(config, 'test')
+ self.db2 = bsddb3.Database(config, 'admin')
setupSchema(self.db2, 0, bsddb3)
shutil.rmtree(config.DATABASE)
config.GADFLY_DATABASE = ('test', config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- self.db = gadfly.Database(config, 'test')
+ self.db = gadfly.Database(config, 'admin')
setupSchema(self.db, 1, gadfly)
def testIDGeneration(self):
shutil.rmtree(config.DATABASE)
config.GADFLY_DATABASE = ('test', config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- db = gadfly.Database(config, 'test')
+ db = gadfly.Database(config, 'admin')
setupSchema(db, 1, gadfly)
self.db = gadfly.Database(config)
setupSchema(self.db, 0, gadfly)
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- self.db = sqlite.Database(config, 'test')
+ self.db = sqlite.Database(config, 'admin')
setupSchema(self.db, 1, sqlite)
def testIDGeneration(self):
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- db = sqlite.Database(config, 'test')
+ db = sqlite.Database(config, 'admin')
setupSchema(db, 1, sqlite)
self.db = sqlite.Database(config)
setupSchema(self.db, 0, sqlite)
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- self.db = metakit.Database(config, 'test')
+ self.db = metakit.Database(config, 'admin')
setupSchema(self.db, 1, metakit)
- #self.db2 = metakit.Database(config, 'test')
- #setupSchema(self.db2, 0, metakit)
def testIDGeneration(self):
id1 = self.db.issue.create(title="spam", status='1')
if os.path.exists(config.DATABASE):
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
- db = metakit.Database(config, 'test')
+ db = metakit.Database(config, 'admin')
setupSchema(db, 1, metakit)
self.db = metakit.Database(config)
setupSchema(self.db, 0, metakit)
-# self.db2 = metakit.Database(config, 'test')
-# setupSchema(self.db2, 0, metakit)
def suite():
l = [
]
# return unittest.TestSuite(l)
- try:
- import bsddb
- l.append(unittest.makeSuite(bsddbDBTestCase, 'test'))
- l.append(unittest.makeSuite(bsddbReadOnlyDBTestCase, 'test'))
- except:
- print 'bsddb module not found, skipping bsddb DBTestCase'
-
- try:
- import bsddb3
- l.append(unittest.makeSuite(bsddb3DBTestCase, 'test'))
- l.append(unittest.makeSuite(bsddb3ReadOnlyDBTestCase, 'test'))
- except:
- print 'bsddb3 module not found, skipping bsddb3 DBTestCase'
-
- try:
- import gadfly
+ from roundup import backends
+ if hasattr(backends, 'gadfly'):
l.append(unittest.makeSuite(gadflyDBTestCase, 'test'))
l.append(unittest.makeSuite(gadflyReadOnlyDBTestCase, 'test'))
- except:
- print 'gadfly module not found, skipping gadfly DBTestCase'
- try:
- import sqlite
+ if hasattr(backends, 'sqlite'):
l.append(unittest.makeSuite(sqliteDBTestCase, 'test'))
l.append(unittest.makeSuite(sqliteReadOnlyDBTestCase, 'test'))
- except:
- print 'sqlite module not found, skipping gadfly DBTestCase'
- try:
- import metakit
+ if hasattr(backends, 'bsddb'):
+ l.append(unittest.makeSuite(bsddbDBTestCase, 'test'))
+ l.append(unittest.makeSuite(bsddbReadOnlyDBTestCase, 'test'))
+
+ if hasattr(backends, 'bsddb3'):
+ l.append(unittest.makeSuite(bsddb3DBTestCase, 'test'))
+ l.append(unittest.makeSuite(bsddb3ReadOnlyDBTestCase, 'test'))
+
+ if hasattr(backends, 'metakit'):
l.append(unittest.makeSuite(metakitDBTestCase, 'test'))
l.append(unittest.makeSuite(metakitReadOnlyDBTestCase, 'test'))
- except:
- print 'metakit module not found, skipping metakit DBTestCase'
return unittest.TestSuite(l)