summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: b2f7d91)
raw | patch | inline | side by side (parent: b2f7d91)
author | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Fri, 5 Dec 2003 09:47:46 +0000 (09:47 +0000) | ||
committer | richard <richard@57a73879-2fb5-44c3-a270-3262357dd7e2> | |
Fri, 5 Dec 2003 09:47:46 +0000 (09:47 +0000) |
git-svn-id: http://svn.roundup-tracker.org/svnroot/roundup/trunk@2018 57a73879-2fb5-44c3-a270-3262357dd7e2
roundup/backends/back_anydbm.py | patch | blob | history | |
roundup/backends/rdbms_common.py | patch | blob | history | |
test/db_test_base.py | patch | blob | history |
index d6aebc43da3d022ad9d1a4cc8123de59a2cccdb8..090965e846f791b3ac845766f45f242e10d3e1d6 100644 (file)
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-#$Id: back_anydbm.py,v 1.132 2003-11-16 18:41:40 jlgijsbers Exp $
+#$Id: back_anydbm.py,v 1.133 2003-12-05 09:47:46 richard Exp $
'''
This module defines a backend that saves the hyperdatabase in a database
chosen by anydbm. It is guaranteed to always be available in python
if db is None:
db = self.getclassdb(classname)
if not db.has_key(nodeid):
- # try the cache - might be a brand-new node
- cache_dict = self.cache.setdefault(classname, {})
- if cache_dict.has_key(nodeid):
- if __debug__:
- print >>hyperdb.TRACE, 'get %s %s cached'%(classname,
- nodeid)
- return cache_dict[nodeid]
raise IndexError, "no such %s %s"%(classname, nodeid)
# check the uncommitted, destroyed nodes
'''
for propname in requirements.keys():
prop = self.properties[propname]
- if isinstance(not prop, String):
+ if not isinstance(prop, String):
raise TypeError, "'%s' not a String property"%propname
requirements[propname] = requirements[propname].lower()
l = []
index bdb60ac3493c1c9b369649b5e97d8bc36b3e73dd..45078c44b0f3237e3bdcf31e4c4c89fdbcf53f94 100644 (file)
-# $Id: rdbms_common.py,v 1.71 2003-11-16 18:41:40 jlgijsbers Exp $
+# $Id: rdbms_common.py,v 1.72 2003-12-05 09:47:46 richard Exp $
''' Relational database (SQL) backend common code.
Basics:
self.conn.commit()
def refresh_database(self):
- # now detect changes in the schema
- for classname, spec in self.classes.items():
- dbspec = self.database_schema[classname]
- self.update_class(spec, dbspec, force=1)
- self.database_schema[classname] = spec.schema()
- # update the database version of the schema
- self.sql('delete from schema')
- self.save_dbschema(self.database_schema)
- # reindex the db
- self.reindex()
- # commit
- self.conn.commit()
-
+ self.post_init()
def reindex(self):
for klass in self.classes.values():
raise TypeError, "'%s' not a Link/Multilink property"%propname
# first, links
- where = []
- allvalues = ()
+ where = ['__retired__ = %s']
+ allvalues = (0,)
a = self.db.arg
for prop, values in propspec:
if not isinstance(props[prop], hyperdb.Link):
args = []
for propname in requirements.keys():
prop = self.properties[propname]
- if isinstance(not prop, String):
+ if not isinstance(prop, String):
raise TypeError, "'%s' not a String property"%propname
where.append(propname)
args.append(requirements[propname].lower())
# generate the where clause
s = ' and '.join(['lower(_%s)=%s'%(col, self.db.arg) for col in where])
- sql = 'select id from _%s where %s'%(self.classname, s)
+ sql = 'select id from _%s where %s and __retired__=%s'%(self.classname,
+ s, self.db.arg)
+ args.append(0)
self.db.sql(sql, tuple(args))
l = [x[0] for x in self.db.sql_fetchall()]
if __debug__:
diff --git a/test/db_test_base.py b/test/db_test_base.py
index d7b65be8fc51622e9c2a49592d73f8a53809145b..681cf4f37d2c430b8c488c824a55916648ef9da5 100644 (file)
--- a/test/db_test_base.py
+++ b/test/db_test_base.py
# BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
-# $Id: db_test_base.py,v 1.10 2003-11-16 22:56:46 jlgijsbers Exp $
+# $Id: db_test_base.py,v 1.11 2003-12-05 09:47:46 richard Exp $
import unittest, os, shutil, errno, imp, sys, time, pprint
issue = module.IssueClass(db, "issue", title=String(indexme="yes"),
status=Link("status"), nosy=Multilink("user"), deadline=Date(),
foo=Interval(), files=Multilink("file"), assignedto=Link('user'))
+ stuff = module.Class(db, "stuff", stuff=String())
session = module.Class(db, 'session', title=String())
session.disableJournalling()
db.post_init()
if create:
- user.create(username="admin", roles='Admin')
+ user.create(username="admin", roles='Admin',
+ password=password.Password('sekrit'))
status.create(name="unread")
status.create(name="in-progress")
status.create(name="testing")
self.db = self.module.Database(config, 'admin')
setupSchema(self.db, 1, self.module)
- #
- # schema mutation
- #
- def testAddProperty(self):
- self.db.issue.create(title="spam", status='1')
- self.db.commit()
-
- self.db.issue.addprop(fixer=Link("user"))
- # force any post-init stuff to happen
- self.db.post_init()
- props = self.db.issue.getprops()
- keys = props.keys()
- keys.sort()
- self.assertEqual(keys, ['activity', 'assignedto', 'creation',
- 'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
- 'nosy', 'status', 'superseder', 'title'])
- self.assertEqual(self.db.issue.get('1', "fixer"), None)
-
- def testRemoveProperty(self):
- self.db.issue.create(title="spam", status='1')
- self.db.commit()
-
- del self.db.issue.properties['title']
- self.db.post_init()
- props = self.db.issue.getprops()
- keys = props.keys()
- keys.sort()
- self.assertEqual(keys, ['activity', 'assignedto', 'creation',
- 'creator', 'deadline', 'files', 'foo', 'id', 'messages',
- 'nosy', 'status', 'superseder'])
- self.assertEqual(self.db.issue.list(), ['1'])
-
- def testAddRemoveProperty(self):
- self.db.issue.create(title="spam", status='1')
- self.db.commit()
-
- self.db.issue.addprop(fixer=Link("user"))
- del self.db.issue.properties['title']
- self.db.post_init()
- props = self.db.issue.getprops()
- keys = props.keys()
- keys.sort()
- self.assertEqual(keys, ['activity', 'assignedto', 'creation',
- 'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
- 'nosy', 'status', 'superseder'])
- self.assertEqual(self.db.issue.list(), ['1'])
+ def testRefresh(self):
+ self.db.refresh_database()
#
# basic operations
id2 = self.db.issue.create(title="eggs", status='2')
self.assertNotEqual(id1, id2)
+ def testEmptySet(self):
+ id1 = self.db.issue.create(title="spam", status='1')
+ self.db.issue.set(id1)
+
def testStringChange(self):
for commit in (0,1):
# test set & retrieve
self.assertEqual(self.db.issue.get(nid, "title"), None)
def testLinkChange(self):
+ self.assertRaises(IndexError, self.db.issue.create, title="spam",
+ status='100')
for commit in (0,1):
nid = self.db.issue.create(title="spam", status='1')
if commit: self.db.commit()
def testMultilinkChange(self):
for commit in (0,1):
+ self.assertRaises(IndexError, self.db.issue.create, title="spam",
+ nosy=['foo%s'%commit])
u1 = self.db.user.create(username='foo%s'%commit)
u2 = self.db.user.create(username='bar%s'%commit)
nid = self.db.issue.create(title="spam", nosy=[u1])
self.assertEqual(self.db.issue.get(nid, "nosy"), [u1,u2])
def testDateChange(self):
+ self.assertRaises(TypeError, self.db.issue.create,
+ title='spam', deadline=1)
for commit in (0,1):
nid = self.db.issue.create(title="spam", status='1')
+ self.assertRaises(TypeError, self.db.issue.set, nid, deadline=1)
a = self.db.issue.get(nid, "deadline")
if commit: self.db.commit()
self.db.issue.set(nid, deadline=date.Date())
self.assertEqual(self.db.issue.get(nid, "deadline"), None)
def testIntervalChange(self):
+ self.assertRaises(TypeError, self.db.issue.create,
+ title='spam', foo=1)
for commit in (0,1):
nid = self.db.issue.create(title="spam", status='1')
+ self.assertRaises(TypeError, self.db.issue.set, nid, foo=1)
if commit: self.db.commit()
a = self.db.issue.get(nid, "foo")
i = date.Interval('-1d')
self.db.user.set(nid, age=None)
self.assertEqual(self.db.user.get(nid, "age"), None)
+ def testPasswordChange(self):
+ x = password.Password('x')
+ userid = self.db.user.create(username='foo', password=x)
+ self.assertEqual(x, self.db.user.get(userid, 'password'))
+ self.assertEqual(self.db.user.get(userid, 'password'), 'x')
+ y = password.Password('y')
+ self.db.user.set(userid, password=y)
+ self.assertEqual(self.db.user.get(userid, 'password'), 'y')
+ self.assertRaises(TypeError, self.db.user.create, userid,
+ username='bar', password='x')
+ self.assertRaises(TypeError, self.db.user.set, userid, password='x')
+
+ def testPasswordUnset(self):
+ x = password.Password('x')
+ nid = self.db.user.create(username='foo', password=x)
+ self.db.user.set(nid, assignable=None)
+ self.assertEqual(self.db.user.get(nid, "assignable"), None)
+
def testKeyValue(self):
+ self.assertRaises(ValueError, self.db.user.create)
+
newid = self.db.user.create(username="spam")
self.assertEqual(self.db.user.lookup('spam'), newid)
self.db.commit()
# try to restore old node. this shouldn't succeed!
self.assertRaises(KeyError, self.db.user.restore, newid)
+ self.assertRaises(TypeError, self.db.issue.lookup, 'fubar')
+
+ def testLabelProp(self):
+ # key prop
+ self.assertEqual(self.db.status.labelprop(), 'name')
+ self.assertEqual(self.db.user.labelprop(), 'username')
+ # title
+ self.assertEqual(self.db.issue.labelprop(), 'title')
+ # name
+ self.assertEqual(self.db.file.labelprop(), 'name')
+ # id
+ self.assertEqual(self.db.stuff.labelprop(default_to_id=1), 'id')
+
def testRetire(self):
self.db.issue.create(title="spam", status='1')
b = self.db.status.get('1', 'name')
self.assertNotEqual(a, self.db.status.list())
# can still access the node if necessary
self.assertEqual(self.db.status.get('1', 'name'), b)
+ self.assertRaises(IndexError, self.db.status.set, '1', name='hello')
self.db.commit()
self.assertEqual(self.db.status.get('1', 'name'), b)
self.assertNotEqual(a, self.db.status.list())
newid = klass.create(title='Mr Friendly')
n = len(klass.list())
self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
+ count = klass.count()
klass.destroy(newid)
+ self.assertNotEqual(count, klass.count())
self.assertRaises(IndexError, klass.get, newid, 'title')
self.assertNotEqual(len(klass.list()), n)
if klass.do_journal:
n = len(klass.list())
self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
self.db.commit()
+ count = klass.count()
klass.destroy(newid)
+ self.assertNotEqual(count, klass.count())
self.assertRaises(IndexError, klass.get, newid, 'title')
self.db.commit()
self.assertRaises(IndexError, klass.get, newid, 'title')
n = len(klass.list())
self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
self.db.commit()
+ count = klass.count()
klass.destroy(newid)
self.assertNotEqual(len(klass.list()), n)
self.assertRaises(IndexError, klass.get, newid, 'title')
self.db.rollback()
+ self.assertEqual(count, klass.count())
self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
self.assertEqual(len(klass.list()), n)
if klass.do_journal:
# this tests the exceptions that should be raised
ar = self.assertRaises
+ ar(KeyError, self.db.getclass, 'fubar')
+
#
# class create
#
# string property
ar(TypeError, self.db.status.create, name=1)
+ # id, creation, creator and activity properties are reserved
+ ar(KeyError, self.db.status.create, id=1)
+ ar(KeyError, self.db.status.create, creation=1)
+ ar(KeyError, self.db.status.create, creator=1)
+ ar(KeyError, self.db.status.create, activity=1)
# invalid property name
ar(KeyError, self.db.status.create, foo='foo')
# key name clash
# searching tests follow
#
def testFind(self):
+ self.assertRaises(TypeError, self.db.issue.find, title='fubar')
+
self.db.user.create(username='test')
ids = []
ids.append(self.db.issue.create(status="1", nosy=['1']))
self.assertEqual(self.db.issue.find(status='4', nosy='3'), [])
self.assertEqual(self.db.issue.find(status={'4':1}, nosy={'3':1}), [])
+ # test retiring a node
+ self.db.issue.retire(ids[0])
+ self.assertEqual(len(self.db.issue.find(status='1', nosy='2')), 2)
+
def testStringFind(self):
+ self.assertRaises(TypeError, self.db.issue.stringFind, status='1')
+
ids = []
ids.append(self.db.issue.create(title="spam"))
self.db.issue.create(title="not spam")
self.assertEqual(got, ids)
self.assertEqual(self.db.issue.stringFind(title='fubar'), [])
+ # test retiring a node
+ self.db.issue.retire(ids[0])
+ self.assertEqual(len(self.db.issue.stringFind(title='spam')), 1)
+
def filteringSetup(self):
for user in (
{'username': 'bleep'},
for cn, items in export.items():
klass = self.db.classes[cn]
names = items[0]
+ maxid = 1
for itemprops in items[1:]:
- klass.import_list(names, itemprops)
+ maxid = max(maxid, int(klass.import_list(names, itemprops)))
+ self.db.setid(cn, str(maxid+1))
- # grab snapshot of the current database
+ # compare with snapshot of the database
for cn, items in orig.items():
klass = self.db.classes[cn]
# ensure retired items are retired :)
ae(self.db.user.get('3', 'username'), 'blop')
ae(self.db.issue.get('2', 'title'), 'issue two')
+ # make sure id counters are set correctly
+ maxid = max([int(id) for id in self.db.user.list()])
+ newid = self.db.user.create(username='testing')
+ assert newid > maxid
+
def testSafeGet(self):
# existent nodeid, existent property
self.assertEqual(self.db.user.safeget('1', 'username'), 'admin')
# different default
self.assertEqual(self.db.issue.safeget('999', 'nosy', []), [])
+ def testAddProperty(self):
+ self.db.issue.create(title="spam", status='1')
+ self.db.commit()
+
+ self.db.issue.addprop(fixer=Link("user"))
+ # force any post-init stuff to happen
+ self.db.post_init()
+ props = self.db.issue.getprops()
+ keys = props.keys()
+ keys.sort()
+ self.assertEqual(keys, ['activity', 'assignedto', 'creation',
+ 'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
+ 'nosy', 'status', 'superseder', 'title'])
+ self.assertEqual(self.db.issue.get('1', "fixer"), None)
+
+ def testRemoveProperty(self):
+ self.db.issue.create(title="spam", status='1')
+ self.db.commit()
+
+ del self.db.issue.properties['title']
+ self.db.post_init()
+ props = self.db.issue.getprops()
+ keys = props.keys()
+ keys.sort()
+ self.assertEqual(keys, ['activity', 'assignedto', 'creation',
+ 'creator', 'deadline', 'files', 'foo', 'id', 'messages',
+ 'nosy', 'status', 'superseder'])
+ self.assertEqual(self.db.issue.list(), ['1'])
+
+ def testAddRemoveProperty(self):
+ self.db.issue.create(title="spam", status='1')
+ self.db.commit()
+
+ self.db.issue.addprop(fixer=Link("user"))
+ del self.db.issue.properties['title']
+ self.db.post_init()
+ props = self.db.issue.getprops()
+ keys = props.keys()
+ keys.sort()
+ self.assertEqual(keys, ['activity', 'assignedto', 'creation',
+ 'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
+ 'nosy', 'status', 'superseder'])
+ self.assertEqual(self.db.issue.list(), ['1'])
+
class ROTest(MyTestCase):
def setUp(self):
# remove previous test, ignore errors
shutil.rmtree(config.DATABASE)
os.makedirs(config.DATABASE + '/files')
+ def test_reservedProperties(self):
+ self.db = self.module.Database(config, 'admin')
+ self.assertRaises(ValueError, self.module.Class, self.db, "a",
+ creation=String())
+ self.assertRaises(ValueError, self.module.Class, self.db, "a",
+ activity=String())
+ self.assertRaises(ValueError, self.module.Class, self.db, "a",
+ creator=String())
+
def init_a(self):
self.db = self.module.Database(config, 'admin')
a = self.module.Class(self.db, "a", name=String())
def test_addNewClass(self):
self.init_a()
+
+ self.assertRaises(ValueError, self.module.Class, self.db, "a",
+ name=String())
+
aid = self.db.a.create(name='apple')
self.db.commit(); self.db.close()
# confirm journal's ok
self.db.getjournal('a', aid)
+class RDBMSTest:
+ ''' tests specific to RDBMS backends '''
+ def test_indexTest(self):
+ self.assertEqual(self.db.sql_index_exists('_issue', '_issue_id_idx'), 1)
+ self.assertEqual(self.db.sql_index_exists('_issue', '_issue_x_idx'), 0)
+
class ClassicInitTest(unittest.TestCase):
count = 0