Code

fixed nasty sorting bug that was lowercasing properties
[roundup.git] / test / test_db.py
1 #
2 # Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
3 # This module is free software, and you may redistribute it and/or modify
4 # under the same terms as Python, so long as this copyright message and
5 # disclaimer are retained in their original form.
6 #
7 # IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
8 # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
9 # OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
10 # POSSIBILITY OF SUCH DAMAGE.
11 #
12 # BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
13 # BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
14 # FOR A PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
15 # BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
17
18 # $Id: test_db.py,v 1.62 2002-11-06 11:45:25 richard Exp $ 
20 import unittest, os, shutil, time
22 from roundup.hyperdb import String, Password, Link, Multilink, Date, \
23     Interval, DatabaseError, Boolean, Number
24 from roundup import date, password
25 from roundup.indexer import Indexer
def setupSchema(db, create, module):
    '''Define the test schema on db using the classes from module.

    db is an open database and module the backend module that provides
    Class, FileClass and IssueClass.  When create is true the initial
    "admin" user and the four status nodes are created as well.  The
    database is committed before returning.
    '''
    status_cls = module.Class(db, "status", name=String())
    status_cls.setkey("name")

    user_cls = module.Class(db, "user", username=String(),
        password=Password(), assignable=Boolean(), age=Number(),
        roles=String())
    user_cls.setkey("username")

    # the remaining classes are only declared here; the local names just
    # keep the objects referenced until the function returns
    file_cls = module.FileClass(db, "file", name=String(), type=String(),
        comment=String(indexme="yes"), fooz=Password())
    issue_cls = module.IssueClass(db, "issue",
        title=String(indexme="yes"), status=Link("status"),
        nosy=Multilink("user"), deadline=Date(), foo=Interval(),
        files=Multilink("file"), assignedto=Link('user'))

    # a class with journalling switched off, used by the destroy tests
    session_cls = module.Class(db, 'session', title=String())
    session_cls.disableJournalling()

    db.post_init()
    if create:
        user_cls.create(username="admin", roles='Admin')
        for status_name in ("unread", "in-progress", "testing", "resolved"):
            status_cls.create(name=status_name)
    db.commit()
class MyTestCase(unittest.TestCase):
    '''Common base class: cleans up after a test that opened self.db
    (and optionally self.db2) on the '_test_dir' tracker directory.
    '''
    def tearDown(self):
        '''Close the open database(s) and remove the test directory.

        The steps are chained with try/finally so that an error raised
        while closing one database still closes the other one and still
        removes '_test_dir'.  The first error raised is not swallowed;
        it propagates once the cleanup has run.
        '''
        try:
            self.db.close()
        finally:
            try:
                # not every test case opens a second connection
                if hasattr(self, 'db2'):
                    self.db2.close()
            finally:
                if os.path.exists('_test_dir'):
                    shutil.rmtree('_test_dir')
class config:
    # Stand-in configuration object handed to the backend Database
    # constructors by the test cases in this module.

    # directory the backends store their data in; removed and re-created
    # by every setUp
    DATABASE = '_test_dir'

    MAILHOST = 'localhost'
    MAIL_DOMAIN = 'fill.me.in.'
    TRACKER_NAME = 'Roundup issue tracker'
    TRACKER_EMAIL = 'issue_tracker@' + MAIL_DOMAIN
    TRACKER_WEB = 'http://some.useful.url/'
    ADMIN_EMAIL = 'roundup-admin@' + MAIL_DOMAIN

    # one of 'top', 'bottom', 'top and bottom'
    FILTER_POSITION = 'bottom'
    # either 'deny' or 'allow'
    ANONYMOUS_ACCESS = 'deny'
    # either 'deny' or 'allow'
    ANONYMOUS_REGISTER = 'deny'
    # either 'yes' or 'no'
    MESSAGES_TO_AUTHOR = 'no'
    EMAIL_SIGNATURE_POSITION = 'bottom'
71 class anydbmDBTestCase(MyTestCase):
72     def setUp(self):
73         from roundup.backends import anydbm
74         # remove previous test, ignore errors
75         if os.path.exists(config.DATABASE):
76             shutil.rmtree(config.DATABASE)
77         os.makedirs(config.DATABASE + '/files')
78         self.db = anydbm.Database(config, 'admin')
79         setupSchema(self.db, 1, anydbm)
80         self.db2 = anydbm.Database(config, 'admin')
81         setupSchema(self.db2, 0, anydbm)
83     def testStringChange(self):
84         for commit in (0,1):
85             # test set & retrieve
86             nid = self.db.issue.create(title="spam", status='1')
87             self.assertEqual(self.db.issue.get(nid, 'title'), 'spam')
89             # change and make sure we retrieve the correct value
90             self.db.issue.set(nid, title='eggs')
91             if commit: self.db.commit()
92             self.assertEqual(self.db.issue.get(nid, 'title'), 'eggs')
94     def testStringUnset(self):
95         for commit in (0,1):
96             nid = self.db.issue.create(title="spam", status='1')
97             if commit: self.db.commit()
98             self.assertEqual(self.db.issue.get(nid, 'title'), 'spam')
99             # make sure we can unset
100             self.db.issue.set(nid, title=None)
101             if commit: self.db.commit()
102             self.assertEqual(self.db.issue.get(nid, "title"), None)
104     def testLinkChange(self):
105         for commit in (0,1):
106             nid = self.db.issue.create(title="spam", status='1')
107             if commit: self.db.commit()
108             self.assertEqual(self.db.issue.get(nid, "status"), '1')
109             self.db.issue.set(nid, status='2')
110             if commit: self.db.commit()
111             self.assertEqual(self.db.issue.get(nid, "status"), '2')
113     def testLinkUnset(self):
114         for commit in (0,1):
115             nid = self.db.issue.create(title="spam", status='1')
116             if commit: self.db.commit()
117             self.db.issue.set(nid, status=None)
118             if commit: self.db.commit()
119             self.assertEqual(self.db.issue.get(nid, "status"), None)
121     def testMultilinkChange(self):
122         for commit in (0,1):
123             u1 = self.db.user.create(username='foo%s'%commit)
124             u2 = self.db.user.create(username='bar%s'%commit)
125             nid = self.db.issue.create(title="spam", nosy=[u1])
126             if commit: self.db.commit()
127             self.assertEqual(self.db.issue.get(nid, "nosy"), [u1])
128             self.db.issue.set(nid, nosy=[])
129             if commit: self.db.commit()
130             self.assertEqual(self.db.issue.get(nid, "nosy"), [])
131             self.db.issue.set(nid, nosy=[u1,u2])
132             if commit: self.db.commit()
133             self.assertEqual(self.db.issue.get(nid, "nosy"), [u1,u2])
135     def testDateChange(self):
136         for commit in (0,1):
137             nid = self.db.issue.create(title="spam", status='1')
138             a = self.db.issue.get(nid, "deadline")
139             if commit: self.db.commit()
140             self.db.issue.set(nid, deadline=date.Date())
141             b = self.db.issue.get(nid, "deadline")
142             if commit: self.db.commit()
143             self.assertNotEqual(a, b)
144             self.assertNotEqual(b, date.Date('1970-1-1 00:00:00'))
146     def testDateUnset(self):
147         for commit in (0,1):
148             nid = self.db.issue.create(title="spam", status='1')
149             self.db.issue.set(nid, deadline=date.Date())
150             if commit: self.db.commit()
151             self.assertNotEqual(self.db.issue.get(nid, "deadline"), None)
152             self.db.issue.set(nid, deadline=None)
153             if commit: self.db.commit()
154             self.assertEqual(self.db.issue.get(nid, "deadline"), None)
156     def testIntervalChange(self):
157         for commit in (0,1):
158             nid = self.db.issue.create(title="spam", status='1')
159             if commit: self.db.commit()
160             a = self.db.issue.get(nid, "foo")
161             i = date.Interval('-1d')
162             self.db.issue.set(nid, foo=i)
163             if commit: self.db.commit()
164             self.assertNotEqual(self.db.issue.get(nid, "foo"), a)
165             self.assertEqual(i, self.db.issue.get(nid, "foo"))
166             j = date.Interval('1y')
167             self.db.issue.set(nid, foo=j)
168             if commit: self.db.commit()
169             self.assertNotEqual(self.db.issue.get(nid, "foo"), i)
170             self.assertEqual(j, self.db.issue.get(nid, "foo"))
172     def testIntervalUnset(self):
173         for commit in (0,1):
174             nid = self.db.issue.create(title="spam", status='1')
175             self.db.issue.set(nid, foo=date.Interval('-1d'))
176             if commit: self.db.commit()
177             self.assertNotEqual(self.db.issue.get(nid, "foo"), None)
178             self.db.issue.set(nid, foo=None)
179             if commit: self.db.commit()
180             self.assertEqual(self.db.issue.get(nid, "foo"), None)
182     def testBooleanChange(self):
183         userid = self.db.user.create(username='foo', assignable=1)
184         self.assertEqual(1, self.db.user.get(userid, 'assignable'))
185         self.db.user.set(userid, assignable=0)
186         self.assertEqual(self.db.user.get(userid, 'assignable'), 0)
187         self.db.user.set(userid, assignable=None)
188         self.assertEqual(self.db.user.get('1', "assignable"), None)
190     def testNumberChange(self):
191         nid = self.db.user.create(username='foo', age=1)
192         self.assertEqual(1, self.db.user.get(nid, 'age'))
193         self.db.user.set('1', age=3)
194         self.assertNotEqual(self.db.user.get('1', 'age'), 1)
195         self.db.user.set('1', age=1.0)
196         self.db.user.set('1', age=None)
197         self.assertEqual(self.db.user.get('1', "age"), None)
199     def testKeyValue(self):
200         newid = self.db.user.create(username="spam")
201         self.assertEqual(self.db.user.lookup('spam'), newid)
202         self.db.commit()
203         self.assertEqual(self.db.user.lookup('spam'), newid)
204         self.db.user.retire(newid)
205         self.assertRaises(KeyError, self.db.user.lookup, 'spam')
207     def testNewProperty(self):
208         self.db.issue.create(title="spam", status='1')
209         self.db.issue.addprop(fixer=Link("user"))
210         # force any post-init stuff to happen
211         self.db.post_init()
212         props = self.db.issue.getprops()
213         keys = props.keys()
214         keys.sort()
215         self.assertEqual(keys, ['activity', 'assignedto', 'creation',
216             'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
217             'nosy', 'status', 'superseder', 'title'])
218         self.assertEqual(self.db.issue.get('1', "fixer"), None)
220     def testRetire(self):
221         self.db.issue.create(title="spam", status='1')
222         b = self.db.status.get('1', 'name')
223         a = self.db.status.list()
224         self.db.status.retire('1')
225         # make sure the list is different 
226         self.assertNotEqual(a, self.db.status.list())
227         # can still access the node if necessary
228         self.assertEqual(self.db.status.get('1', 'name'), b)
229         self.db.commit()
230         self.assertEqual(self.db.status.get('1', 'name'), b)
231         self.assertNotEqual(a, self.db.status.list())
233     def testSerialisation(self):
234         nid = self.db.issue.create(title="spam", status='1',
235             deadline=date.Date(), foo=date.Interval('-1d'))
236         self.db.commit()
237         assert isinstance(self.db.issue.get(nid, 'deadline'), date.Date)
238         assert isinstance(self.db.issue.get(nid, 'foo'), date.Interval)
239         uid = self.db.user.create(username="fozzy",
240             password=password.Password('t. bear'))
241         self.db.commit()
242         assert isinstance(self.db.user.get(uid, 'password'), password.Password)
244     def testTransactions(self):
245         # remember the number of items we started
246         num_issues = len(self.db.issue.list())
247         num_files = self.db.numfiles()
248         self.db.issue.create(title="don't commit me!", status='1')
249         self.assertNotEqual(num_issues, len(self.db.issue.list()))
250         self.db.rollback()
251         self.assertEqual(num_issues, len(self.db.issue.list()))
252         self.db.issue.create(title="please commit me!", status='1')
253         self.assertNotEqual(num_issues, len(self.db.issue.list()))
254         self.db.commit()
255         self.assertNotEqual(num_issues, len(self.db.issue.list()))
256         self.db.rollback()
257         self.assertNotEqual(num_issues, len(self.db.issue.list()))
258         self.db.file.create(name="test", type="text/plain", content="hi")
259         self.db.rollback()
260         self.assertEqual(num_files, self.db.numfiles())
261         for i in range(10):
262             self.db.file.create(name="test", type="text/plain", 
263                     content="hi %d"%(i))
264             self.db.commit()
265         num_files2 = self.db.numfiles()
266         self.assertNotEqual(num_files, num_files2)
267         self.db.file.create(name="test", type="text/plain", content="hi")
268         self.db.rollback()
269         self.assertNotEqual(num_files, self.db.numfiles())
270         self.assertEqual(num_files2, self.db.numfiles())
272     def testDestroyNoJournalling(self):
273         self.innerTestDestroy(klass=self.db.session)
275     def testDestroyJournalling(self):
276         self.innerTestDestroy(klass=self.db.issue)
278     def innerTestDestroy(self, klass):
279         newid = klass.create(title='Mr Friendly')
280         n = len(klass.list())
281         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
282         klass.destroy(newid)
283         self.assertRaises(IndexError, klass.get, newid, 'title')
284         self.assertNotEqual(len(klass.list()), n)
285         if klass.do_journal:
286             self.assertRaises(IndexError, klass.history, newid)
288         # now with a commit
289         newid = klass.create(title='Mr Friendly')
290         n = len(klass.list())
291         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
292         self.db.commit()
293         klass.destroy(newid)
294         self.assertRaises(IndexError, klass.get, newid, 'title')
295         self.db.commit()
296         self.assertRaises(IndexError, klass.get, newid, 'title')
297         self.assertNotEqual(len(klass.list()), n)
298         if klass.do_journal:
299             self.assertRaises(IndexError, klass.history, newid)
301         # now with a rollback
302         newid = klass.create(title='Mr Friendly')
303         n = len(klass.list())
304         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
305         self.db.commit()
306         klass.destroy(newid)
307         self.assertNotEqual(len(klass.list()), n)
308         self.assertRaises(IndexError, klass.get, newid, 'title')
309         self.db.rollback()
310         self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
311         self.assertEqual(len(klass.list()), n)
312         if klass.do_journal:
313             self.assertNotEqual(klass.history(newid), [])
315     def testExceptions(self):
316         # this tests the exceptions that should be raised
317         ar = self.assertRaises
319         #
320         # class create
321         #
322         # string property
323         ar(TypeError, self.db.status.create, name=1)
324         # invalid property name
325         ar(KeyError, self.db.status.create, foo='foo')
326         # key name clash
327         ar(ValueError, self.db.status.create, name='unread')
328         # invalid link index
329         ar(IndexError, self.db.issue.create, title='foo', status='bar')
330         # invalid link value
331         ar(ValueError, self.db.issue.create, title='foo', status=1)
332         # invalid multilink type
333         ar(TypeError, self.db.issue.create, title='foo', status='1',
334             nosy='hello')
335         # invalid multilink index type
336         ar(ValueError, self.db.issue.create, title='foo', status='1',
337             nosy=[1])
338         # invalid multilink index
339         ar(IndexError, self.db.issue.create, title='foo', status='1',
340             nosy=['10'])
342         #
343         # key property
344         # 
345         # key must be a String
346         ar(TypeError, self.db.file.setkey, 'fooz')
347         # key must exist
348         ar(KeyError, self.db.file.setkey, 'fubar')
350         #
351         # class get
352         #
353         # invalid node id
354         ar(IndexError, self.db.issue.get, '99', 'title')
355         # invalid property name
356         ar(KeyError, self.db.status.get, '2', 'foo')
358         #
359         # class set
360         #
361         # invalid node id
362         ar(IndexError, self.db.issue.set, '99', title='foo')
363         # invalid property name
364         ar(KeyError, self.db.status.set, '1', foo='foo')
365         # string property
366         ar(TypeError, self.db.status.set, '1', name=1)
367         # key name clash
368         ar(ValueError, self.db.status.set, '2', name='unread')
369         # set up a valid issue for me to work on
370         id = self.db.issue.create(title="spam", status='1')
371         # invalid link index
372         ar(IndexError, self.db.issue.set, id, title='foo', status='bar')
373         # invalid link value
374         ar(ValueError, self.db.issue.set, id, title='foo', status=1)
375         # invalid multilink type
376         ar(TypeError, self.db.issue.set, id, title='foo', status='1',
377             nosy='hello')
378         # invalid multilink index type
379         ar(ValueError, self.db.issue.set, id, title='foo', status='1',
380             nosy=[1])
381         # invalid multilink index
382         ar(IndexError, self.db.issue.set, id, title='foo', status='1',
383             nosy=['10'])
384         # invalid number value
385         ar(TypeError, self.db.user.create, username='foo', age='a')
386         # invalid boolean value
387         ar(TypeError, self.db.user.create, username='foo', assignable='true')
388         nid = self.db.user.create(username='foo')
389         # invalid number value
390         ar(TypeError, self.db.user.set, nid, username='foo', age='a')
391         # invalid boolean value
392         ar(TypeError, self.db.user.set, nid, username='foo', assignable='true')
394     def testJournals(self):
395         self.db.user.create(username="mary")
396         self.db.user.create(username="pete")
397         self.db.issue.create(title="spam", status='1')
398         self.db.commit()
400         # journal entry for issue create
401         journal = self.db.getjournal('issue', '1')
402         self.assertEqual(1, len(journal))
403         (nodeid, date_stamp, journaltag, action, params) = journal[0]
404         self.assertEqual(nodeid, '1')
405         self.assertEqual(journaltag, self.db.user.lookup('admin'))
406         self.assertEqual(action, 'create')
407         keys = params.keys()
408         keys.sort()
409         self.assertEqual(keys, [])
411         # journal entry for link
412         journal = self.db.getjournal('user', '1')
413         self.assertEqual(1, len(journal))
414         self.db.issue.set('1', assignedto='1')
415         self.db.commit()
416         journal = self.db.getjournal('user', '1')
417         self.assertEqual(2, len(journal))
418         (nodeid, date_stamp, journaltag, action, params) = journal[1]
419         self.assertEqual('1', nodeid)
420         self.assertEqual('1', journaltag)
421         self.assertEqual('link', action)
422         self.assertEqual(('issue', '1', 'assignedto'), params)
424         # journal entry for unlink
425         self.db.issue.set('1', assignedto='2')
426         self.db.commit()
427         journal = self.db.getjournal('user', '1')
428         self.assertEqual(3, len(journal))
429         (nodeid, date_stamp, journaltag, action, params) = journal[2]
430         self.assertEqual('1', nodeid)
431         self.assertEqual('1', journaltag)
432         self.assertEqual('unlink', action)
433         self.assertEqual(('issue', '1', 'assignedto'), params)
435         # test disabling journalling
436         # ... get the last entry
437         time.sleep(1)
438         entry = self.db.getjournal('issue', '1')[-1]
439         (x, date_stamp, x, x, x) = entry
440         self.db.issue.disableJournalling()
441         self.db.issue.set('1', title='hello world')
442         self.db.commit()
443         entry = self.db.getjournal('issue', '1')[-1]
444         (x, date_stamp2, x, x, x) = entry
445         # see if the change was journalled when it shouldn't have been
446         self.assertEqual(date_stamp, date_stamp2)
447         time.sleep(1)
448         self.db.issue.enableJournalling()
449         self.db.issue.set('1', title='hello world 2')
450         self.db.commit()
451         entry = self.db.getjournal('issue', '1')[-1]
452         (x, date_stamp2, x, x, x) = entry
453         # see if the change was journalled
454         self.assertNotEqual(date_stamp, date_stamp2)
456     def testPack(self):
457         id = self.db.issue.create(title="spam", status='1')
458         self.db.commit()
459         self.db.issue.set(id, status='2')
460         self.db.commit()
462         # sleep for at least a second, then get a date to pack at
463         time.sleep(1)
464         pack_before = date.Date('.')
466         # wait another second and add one more entry
467         time.sleep(1)
468         self.db.issue.set(id, status='3')
469         self.db.commit()
470         jlen = len(self.db.getjournal('issue', id))
472         # pack
473         self.db.pack(pack_before)
475         # we should have the create and last set entries now
476         self.assertEqual(jlen-1, len(self.db.getjournal('issue', id)))
478     def testIDGeneration(self):
479         id1 = self.db.issue.create(title="spam", status='1')
480         id2 = self.db2.issue.create(title="eggs", status='2')
481         self.assertNotEqual(id1, id2)
483     def testSearching(self):
484         self.db.file.create(content='hello', type="text/plain")
485         self.db.file.create(content='world', type="text/frozz",
486             comment='blah blah')
487         self.db.issue.create(files=['1', '2'], title="flebble plop")
488         self.db.issue.create(title="flebble frooz")
489         self.db.commit()
490         self.assertEquals(self.db.indexer.search(['hello'], self.db.issue),
491             {'1': {'files': ['1']}})
492         self.assertEquals(self.db.indexer.search(['world'], self.db.issue), {})
493         self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
494             {'2': {}})
495         self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
496             {'2': {}, '1': {}})
498     def testReindexing(self):
499         self.db.issue.create(title="frooz")
500         self.db.commit()
501         self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue),
502             {'1': {}})
503         self.db.issue.set('1', title="dooble")
504         self.db.commit()
505         self.assertEquals(self.db.indexer.search(['dooble'], self.db.issue),
506             {'1': {}})
507         self.assertEquals(self.db.indexer.search(['frooz'], self.db.issue), {})
509     def testForcedReindexing(self):
510         self.db.issue.create(title="flebble frooz")
511         self.db.commit()
512         self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
513             {'1': {}})
514         self.db.indexer.quiet = 1
515         self.db.indexer.force_reindex()
516         self.db.post_init()
517         self.db.indexer.quiet = 9
518         self.assertEquals(self.db.indexer.search(['flebble'], self.db.issue),
519             {'1': {}})
521     #
522     # searching tests follow
523     #
524     def testFind(self):
525         self.db.user.create(username='test')
526         ids = []
527         ids.append(self.db.issue.create(status="1", nosy=['1']))
528         oddid = self.db.issue.create(status="2", nosy=['2'])
529         ids.append(self.db.issue.create(status="1", nosy=['1','2']))
530         self.db.issue.create(status="3", nosy=['1'])
531         ids.sort()
533         # should match first and third
534         got = self.db.issue.find(status='1')
535         got.sort()
536         self.assertEqual(got, ids)
538         # none
539         self.assertEqual(self.db.issue.find(status='4'), [])
541         # should match first three
542         got = self.db.issue.find(status='1', nosy='2')
543         got.sort()
544         ids.append(oddid)
545         ids.sort()
546         self.assertEqual(got, ids)
548         # none
549         self.assertEqual(self.db.issue.find(status='4', nosy='3'), [])
551     def testStringFind(self):
552         ids = []
553         ids.append(self.db.issue.create(title="spam"))
554         self.db.issue.create(title="not spam")
555         ids.append(self.db.issue.create(title="spam"))
556         ids.sort()
557         got = self.db.issue.stringFind(title='spam')
558         got.sort()
559         self.assertEqual(got, ids)
560         self.assertEqual(self.db.issue.stringFind(title='fubar'), [])
562     def filteringSetup(self):
563         for user in (
564                 {'username': 'bleep'},
565                 {'username': 'blop'},
566                 {'username': 'blorp'}):
567             self.db.user.create(**user)
568         iss = self.db.issue
569         for issue in (
570                 {'title': 'issue one', 'status': '2'},
571                 {'title': 'issue two', 'status': '1'},
572                 {'title': 'issue three', 'status': '1', 'nosy': ['1','2']}):
573             self.db.issue.create(**issue)
574         self.db.commit()
575         return self.assertEqual, self.db.issue.filter
577     def testFilteringString(self):
578         ae, filt = self.filteringSetup()
579         ae(filt(None, {'title': 'issue one'}, ('+','id'), (None,None)), ['1'])
580         ae(filt(None, {'title': 'issue'}, ('+','id'), (None,None)),
581             ['1','2','3'])
583     def testFilteringLink(self):
584         ae, filt = self.filteringSetup()
585         ae(filt(None, {'status': '1'}, ('+','id'), (None,None)), ['2','3'])
587     def testFilteringMultilink(self):
588         ae, filt = self.filteringSetup()
589         ae(filt(None, {'nosy': '2'}, ('+','id'), (None,None)), ['3'])
591     def testFilteringMany(self):
592         ae, filt = self.filteringSetup()
593         ae(filt(None, {'nosy': '2', 'status': '1'}, ('+','id'), (None,None)),
594             ['3'])
class anydbmReadOnlyDBTestCase(MyTestCase):
    # Tests against an anydbm database opened without a user name
    # (self.db); writes through it must raise DatabaseError.
    def setUp(self):
        '''Populate a fresh tracker, then open it without and with a
        user name as self.db and self.db2 respectively.'''
        from roundup.backends import anydbm as backend
        home = config.DATABASE
        # throw away whatever an earlier test left behind
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # an admin connection creates the schema and the initial nodes
        admin_db = backend.Database(config, 'admin')
        setupSchema(admin_db, 1, backend)
        # the connection under test is opened without a user name
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
        self.db2 = backend.Database(config, 'admin')
        setupSchema(self.db2, 0, backend)

    def testExceptions(self):
        # every modifying operation on self.db must be refused
        raises = self.assertRaises
        status = self.db.status
        raises(DatabaseError, status.create, name="foo")
        raises(DatabaseError, status.set, '1', name="foo")
        raises(DatabaseError, status.retire, '1')
class bsddbDBTestCase(anydbmDBTestCase):
    # Runs the inherited anydbm tests against the bsddb backend.
    def setUp(self):
        '''Create an empty tracker directory and open two admin
        connections (self.db and self.db2) using the bsddb backend.'''
        from roundup.backends import bsddb as backend
        home = config.DATABASE
        # throw away whatever an earlier test left behind
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # the first connection also creates the initial nodes
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)
        self.db2 = backend.Database(config, 'admin')
        setupSchema(self.db2, 0, backend)
class bsddbReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    # Runs the inherited read-only tests against the bsddb backend.
    def setUp(self):
        '''Populate a fresh tracker, then open it without and with a
        user name as self.db and self.db2 respectively.'''
        from roundup.backends import bsddb as backend
        home = config.DATABASE
        # throw away whatever an earlier test left behind
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # an admin connection creates the schema and the initial nodes
        admin_db = backend.Database(config, 'admin')
        setupSchema(admin_db, 1, backend)
        # the connection under test is opened without a user name
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
        self.db2 = backend.Database(config, 'admin')
        setupSchema(self.db2, 0, backend)
class bsddb3DBTestCase(anydbmDBTestCase):
    # Runs the inherited anydbm tests against the bsddb3 backend.
    def setUp(self):
        '''Create an empty tracker directory and open two admin
        connections (self.db and self.db2) using the bsddb3 backend.'''
        from roundup.backends import bsddb3 as backend
        home = config.DATABASE
        # throw away whatever an earlier test left behind
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # the first connection also creates the initial nodes
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)
        self.db2 = backend.Database(config, 'admin')
        setupSchema(self.db2, 0, backend)
class bsddb3ReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    # Runs the inherited read-only tests against the bsddb3 backend.
    def setUp(self):
        '''Populate a fresh tracker, then open it without and with a
        user name as self.db and self.db2 respectively.'''
        from roundup.backends import bsddb3 as backend
        home = config.DATABASE
        # throw away whatever an earlier test left behind
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # an admin connection creates the schema and the initial nodes
        admin_db = backend.Database(config, 'admin')
        setupSchema(admin_db, 1, backend)
        # the connection under test is opened without a user name
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
        self.db2 = backend.Database(config, 'admin')
        setupSchema(self.db2, 0, backend)
class gadflyDBTestCase(anydbmDBTestCase):
    ''' Gadfly doesn't support multiple connections to the one local
        database, so only self.db is opened and the tests that need
        more are overridden.
    '''
    def setUp(self):
        '''Create an empty tracker directory and open a single admin
        connection (self.db) using the gadfly backend.'''
        from roundup.backends import gadfly as backend
        home = config.DATABASE
        # throw away whatever an earlier test left behind
        if os.path.exists(home):
            shutil.rmtree(home)
        config.GADFLY_DATABASE = ('test', home)
        os.makedirs(home + '/files')
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)

    def testIDGeneration(self):
        # both issues are created through the one connection
        first = self.db.issue.create(title="spam", status='1')
        second = self.db.issue.create(title="eggs", status='2')
        self.assertNotEqual(first, second)

    def testFilteringString(self):
        # only the exact-title filter is checked for this backend
        check, issue_filter = self.filteringSetup()
        found = issue_filter(None, {'title': 'issue one'}, ('+','id'),
            (None,None))
        check(found, ['1'])
        # XXX gadfly can't do substring LIKE searches
        #ae(filt(None, {'title': 'issue'}, ('+','id'), (None,None)),
        #    ['1','2','3'])
class gadflyReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    # Runs the inherited read-only tests against the gadfly backend;
    # no self.db2 is opened here.
    def setUp(self):
        '''Populate a fresh tracker through an admin connection, then
        open it again without a user name as self.db.'''
        from roundup.backends import gadfly as backend
        home = config.DATABASE
        # throw away whatever an earlier test left behind
        if os.path.exists(home):
            shutil.rmtree(home)
        config.GADFLY_DATABASE = ('test', home)
        os.makedirs(home + '/files')
        # an admin connection creates the schema and the initial nodes
        admin_db = backend.Database(config, 'admin')
        setupSchema(admin_db, 1, backend)
        # the connection under test is opened without a user name
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
class sqliteDBTestCase(anydbmDBTestCase):
    # Runs the inherited anydbm tests against the sqlite backend with
    # a single connection.
    def setUp(self):
        '''Create an empty tracker directory and open a single admin
        connection (self.db) using the sqlite backend.'''
        from roundup.backends import sqlite as backend
        home = config.DATABASE
        # throw away whatever an earlier test left behind
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        self.db = backend.Database(config, 'admin')
        setupSchema(self.db, 1, backend)

    def testIDGeneration(self):
        # deliberately empty: the inherited version uses self.db2,
        # which this setUp does not open
        pass
class sqliteReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    # Runs the inherited read-only tests against the sqlite backend;
    # no self.db2 is opened here.
    def setUp(self):
        '''Populate a fresh tracker through an admin connection, then
        open it again without a user name as self.db.'''
        from roundup.backends import sqlite as backend
        home = config.DATABASE
        # throw away whatever an earlier test left behind
        if os.path.exists(home):
            shutil.rmtree(home)
        os.makedirs(home + '/files')
        # an admin connection creates the schema and the initial nodes
        admin_db = backend.Database(config, 'admin')
        setupSchema(admin_db, 1, backend)
        # the connection under test is opened without a user name
        self.db = backend.Database(config)
        setupSchema(self.db, 0, backend)
class metakitDBTestCase(anydbmDBTestCase):
    """Runs the tests inherited from anydbmDBTestCase on the metakit backend."""

    def setUp(self):
        """Create a fresh metakit database, opened as 'admin', with the schema."""
        from roundup.backends import metakit
        import weakref
        # swap in an empty instance registry on the metakit module
        metakit._instances = weakref.WeakValueDictionary()
        db_home = config.DATABASE
        # start clean: drop whatever a previous test left behind
        if os.path.exists(db_home):
            shutil.rmtree(db_home)
        os.makedirs(db_home + '/files')
        self.db = metakit.Database(config, 'admin')
        setupSchema(self.db, 1, metakit)

    def testIDGeneration(self):
        # two issues created back to back must be handed different ids
        spam_id = self.db.issue.create(title="spam", status='1')
        eggs_id = self.db.issue.create(title="eggs", status='2')
        self.assertNotEqual(spam_id, eggs_id)

    def testTransactions(self):
        db = self.db

        # issue count before anything is created in this test
        baseline = len(db.issue.list())

        # an uncommitted issue shows up in the list, and is gone again
        # after a rollback
        db.issue.create(title="don't commit me!", status='1')
        self.assertNotEqual(baseline, len(db.issue.list()))
        db.rollback()
        self.assertEqual(baseline, len(db.issue.list()))

        # once committed, the new issue stays even if we roll back afterwards
        db.issue.create(title="please commit me!", status='1')
        self.assertNotEqual(baseline, len(db.issue.list()))
        db.commit()
        self.assertNotEqual(baseline, len(db.issue.list()))
        db.rollback()
        self.assertNotEqual(baseline, len(db.issue.list()))

        # create one file and roll it back, then take the reference count
        db.file.create(name="test", type="text/plain", content="hi")
        db.rollback()
        files_before = len(db.file.list())

        # create ten files, committing after each one
        for n in range(10):
            db.file.create(name="test", type="text/plain",
                content="hi %d" % n)
            db.commit()
        # TODO: would be good to be able to ensure the file is not on disk after
        # a rollback...
        files_after = len(db.file.list())
        self.assertNotEqual(files_before, files_after)

        # an uncommitted file adds one directory entry on disk; rolling back
        # must remove that entry and leave the listed file count unchanged
        db.file.create(name="test", type="text/plain", content="hi")
        store_dir = db.config.DATABASE + '/files/file/0'
        entries_before = len(os.listdir(store_dir))
        db.rollback()
        entries_after = len(os.listdir(store_dir))
        self.assertEqual(files_after, len(db.file.list()))
        self.assertEqual(entries_after, entries_before-1)
class metakitReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    """Runs the tests inherited from anydbmReadOnlyDBTestCase on metakit."""

    def setUp(self):
        """Rebuild the metakit test database and reopen it for the tests.

        The schema is first created through a connection opened as
        'admin'; self.db is then a second connection opened with no
        user name, on which the schema is set up again with the create
        flag off.
        """
        from roundup.backends import metakit
        import weakref
        # swap in an empty instance registry on the metakit module
        metakit._instances = weakref.WeakValueDictionary()
        db_home = config.DATABASE
        # start clean: drop whatever a previous test left behind
        if os.path.exists(db_home):
            shutil.rmtree(db_home)
        os.makedirs(db_home + '/files')
        # first pass: create the schema as 'admin'
        admin_db = metakit.Database(config, 'admin')
        setupSchema(admin_db, 1, metakit)
        # second pass: the connection the tests actually use
        self.db = metakit.Database(config)
        setupSchema(self.db, 0, metakit)
def suite():
    """Build the TestSuite covering every database backend that is present.

    The anydbm cases are always included.  The gadfly, sqlite, bsddb,
    bsddb3 and metakit cases are added, in that order, only when
    roundup.backends has an attribute of that name.
    """
    build = unittest.makeSuite
    suites = [
        build(anydbmDBTestCase, 'test'),
        build(anydbmReadOnlyDBTestCase, 'test'),
    ]
    # NOTE: returning unittest.TestSuite(suites) right here restricts the
    # run to the anydbm cases only

    from roundup import backends

    if hasattr(backends, 'gadfly'):
        suites.extend([
            build(gadflyDBTestCase, 'test'),
            build(gadflyReadOnlyDBTestCase, 'test'),
        ])

    if hasattr(backends, 'sqlite'):
        suites.extend([
            build(sqliteDBTestCase, 'test'),
            build(sqliteReadOnlyDBTestCase, 'test'),
        ])

    if hasattr(backends, 'bsddb'):
        suites.extend([
            build(bsddbDBTestCase, 'test'),
            build(bsddbReadOnlyDBTestCase, 'test'),
        ])

    if hasattr(backends, 'bsddb3'):
        suites.extend([
            build(bsddb3DBTestCase, 'test'),
            build(bsddb3ReadOnlyDBTestCase, 'test'),
        ])

    if hasattr(backends, 'metakit'):
        suites.extend([
            build(metakitDBTestCase, 'test'),
            build(metakitReadOnlyDBTestCase, 'test'),
        ])

    return unittest.TestSuite(suites)
# vim: set filetype=python ts=4 sw=4 et si: