1 #
2 # Copyright (c) 2001 Bizar Software Pty Ltd (http://www.bizarsoftware.com.au/)
3 # This module is free software, and you may redistribute it and/or modify
4 # under the same terms as Python, so long as this copyright message and
5 # disclaimer are retained in their original form.
6 #
7 # IN NO EVENT SHALL BIZAR SOFTWARE PTY LTD BE LIABLE TO ANY PARTY FOR
8 # DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING
9 # OUT OF THE USE OF THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE
10 # POSSIBILITY OF SUCH DAMAGE.
11 #
12 # BIZAR SOFTWARE PTY LTD SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
13 # BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
14 # FOR A PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS"
15 # BASIS, AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
17 #
18 # $Id: test_db.py,v 1.61 2002-10-10 08:04:46 richard Exp $
20 import unittest, os, shutil, time
22 from roundup.hyperdb import String, Password, Link, Multilink, Date, \
23 Interval, DatabaseError, Boolean, Number
24 from roundup import date, password
25 from roundup.indexer import Indexer
def setupSchema(db, create, module):
    '''Declare the test schema on ``db`` using the classes from ``module``.

    db -- database handle the classes are declared against
    create -- when true, also create the "admin" user and the four
              status nodes
    module -- backend module providing Class, FileClass and IssueClass
    '''
    status_class = module.Class(db, "status", name=String())
    status_class.setkey("name")

    user_class = module.Class(db, "user",
        username=String(),
        password=Password(),
        assignable=Boolean(),
        age=Number(),
        roles=String())
    user_class.setkey("username")

    # the file and issue classes are reached through db by the tests
    # (db.file, db.issue), so the returned objects are not kept here
    module.FileClass(db, "file",
        name=String(),
        type=String(),
        comment=String(indexme="yes"),
        fooz=Password())
    module.IssueClass(db, "issue",
        title=String(indexme="yes"),
        status=Link("status"),
        nosy=Multilink("user"),
        deadline=Date(),
        foo=Interval(),
        files=Multilink("file"),
        assignedto=Link('user'))

    session_class = module.Class(db, 'session', title=String())
    session_class.disableJournalling()
    db.post_init()

    if create:
        user_class.create(username="admin", roles='Admin')
        for status_name in ("unread", "in-progress", "testing", "resolved"):
            status_class.create(name=status_name)
    # NOTE(review): the flattened source does not show whether this commit
    # sits inside the "if create" branch -- confirm against the original
    db.commit()
class MyTestCase(unittest.TestCase):
    '''Common base for the database test cases; provides tearDown.'''

    def tearDown(self):
        '''Close the database handle(s) and delete the test directory.

        ``self.db`` must have been set by the subclass' setUp;
        ``self.db2`` is closed too when it exists.  The '_test_dir'
        directory is removed even if one of the close() calls raises.
        '''
        try:
            try:
                self.db.close()
            finally:
                # a failing first close must not leave the second handle open
                if hasattr(self, 'db2'):
                    self.db2.close()
        finally:
            # always clear the on-disk state, whatever happened above
            if os.path.exists('_test_dir'):
                shutil.rmtree('_test_dir')
class config:
    '''Stand-in tracker configuration passed to the backend Database
    constructors by the test cases in this module.'''

    # directory the tests create, fill and remove again
    DATABASE = '_test_dir'

    # mail settings
    MAILHOST = 'localhost'
    MAIL_DOMAIN = 'fill.me.in.'

    # tracker identity; both addresses are built from MAIL_DOMAIN
    TRACKER_NAME = 'Roundup issue tracker'
    TRACKER_EMAIL = 'issue_tracker@%s' % (MAIL_DOMAIN,)
    TRACKER_WEB = 'http://some.useful.url/'
    ADMIN_EMAIL = 'roundup-admin@%s' % (MAIL_DOMAIN,)

    # one of 'top', 'bottom', 'top and bottom'
    FILTER_POSITION = 'bottom'
    # either 'deny' or 'allow'
    ANONYMOUS_ACCESS = 'deny'
    # either 'deny' or 'allow'
    ANONYMOUS_REGISTER = 'deny'
    # either 'yes' or 'no'
    MESSAGES_TO_AUTHOR = 'no'
    EMAIL_SIGNATURE_POSITION = 'bottom'
71 class anydbmDBTestCase(MyTestCase):
def setUp(self):
    '''Build a fresh anydbm database and open two handles onto it.'''
    from roundup.backends import anydbm

    # remove previous test, ignore errors
    db_dir = config.DATABASE
    if os.path.exists(db_dir):
        shutil.rmtree(db_dir)
    os.makedirs(db_dir + '/files')

    # the first handle declares the schema and creates the initial nodes
    self.db = anydbm.Database(config, 'admin')
    setupSchema(self.db, 1, anydbm)

    # the second handle only declares the schema
    self.db2 = anydbm.Database(config, 'admin')
    setupSchema(self.db2, 0, anydbm)
def testStringChange(self):
    # A String property can be changed; checked once without and once
    # with a commit between the change and the read-back.
    issues = self.db.issue
    for do_commit in (0, 1):
        # test set & retrieve
        nodeid = issues.create(title="spam", status='1')
        self.assertEqual(issues.get(nodeid, 'title'), 'spam')

        # change and make sure we retrieve the correct value
        issues.set(nodeid, title='eggs')
        if do_commit:
            self.db.commit()
        self.assertEqual(issues.get(nodeid, 'title'), 'eggs')
def testStringUnset(self):
    # A String property can be reset to None, with and without commits.
    issues = self.db.issue
    for do_commit in (0, 1):
        nodeid = issues.create(title="spam", status='1')
        if do_commit:
            self.db.commit()
        self.assertEqual(issues.get(nodeid, 'title'), 'spam')

        # make sure we can unset
        issues.set(nodeid, title=None)
        if do_commit:
            self.db.commit()
        self.assertEqual(issues.get(nodeid, "title"), None)
def testLinkChange(self):
    # A Link property can be pointed at a different node, with and
    # without commits.
    issues = self.db.issue
    for do_commit in (0, 1):
        nodeid = issues.create(title="spam", status='1')
        if do_commit:
            self.db.commit()
        self.assertEqual(issues.get(nodeid, "status"), '1')

        issues.set(nodeid, status='2')
        if do_commit:
            self.db.commit()
        self.assertEqual(issues.get(nodeid, "status"), '2')
def testLinkUnset(self):
    # A Link property can be reset to None, with and without commits.
    issues = self.db.issue
    for do_commit in (0, 1):
        nodeid = issues.create(title="spam", status='1')
        if do_commit:
            self.db.commit()

        issues.set(nodeid, status=None)
        if do_commit:
            self.db.commit()
        self.assertEqual(issues.get(nodeid, "status"), None)
def testMultilinkChange(self):
    # A Multilink can be emptied and refilled, with and without commits.
    issues = self.db.issue
    for do_commit in (0, 1):
        # username is the key of the user class, so each pass needs its
        # own names
        first = self.db.user.create(username='foo%s'%do_commit)
        second = self.db.user.create(username='bar%s'%do_commit)

        nodeid = issues.create(title="spam", nosy=[first])
        if do_commit:
            self.db.commit()
        self.assertEqual(issues.get(nodeid, "nosy"), [first])

        # empty the list, then put both users on it; a fresh list is
        # built for the call and for the comparison each time
        for members in ((), (first, second)):
            issues.set(nodeid, nosy=list(members))
            if do_commit:
                self.db.commit()
            self.assertEqual(issues.get(nodeid, "nosy"), list(members))
def testDateChange(self):
    # Setting a Date property changes the stored value, and the stored
    # value is not the 1970-1-1 date.
    issues = self.db.issue
    for do_commit in (0, 1):
        nodeid = issues.create(title="spam", status='1')
        before = issues.get(nodeid, "deadline")
        if do_commit:
            self.db.commit()

        issues.set(nodeid, deadline=date.Date())
        after = issues.get(nodeid, "deadline")
        if do_commit:
            self.db.commit()

        self.assertNotEqual(before, after)
        self.assertNotEqual(after, date.Date('1970-1-1 00:00:00'))
def testDateUnset(self):
    # A Date property that has been set can be reset to None, with and
    # without commits.
    issues = self.db.issue
    for do_commit in (0, 1):
        nodeid = issues.create(title="spam", status='1')

        issues.set(nodeid, deadline=date.Date())
        if do_commit:
            self.db.commit()
        self.assertNotEqual(issues.get(nodeid, "deadline"), None)

        issues.set(nodeid, deadline=None)
        if do_commit:
            self.db.commit()
        self.assertEqual(issues.get(nodeid, "deadline"), None)
def testIntervalChange(self):
    # An Interval property can be set and then replaced; after each
    # change the stored value equals the new interval and differs from
    # the value that was there before.
    issues = self.db.issue
    for do_commit in (0, 1):
        nodeid = issues.create(title="spam", status='1')
        if do_commit:
            self.db.commit()

        previous = issues.get(nodeid, "foo")
        for spec in ('-1d', '1y'):
            interval = date.Interval(spec)
            issues.set(nodeid, foo=interval)
            if do_commit:
                self.db.commit()
            self.assertNotEqual(issues.get(nodeid, "foo"), previous)
            self.assertEqual(interval, issues.get(nodeid, "foo"))
            previous = interval
def testIntervalUnset(self):
    # An Interval property that has been set can be reset to None, with
    # and without commits.
    issues = self.db.issue
    for do_commit in (0, 1):
        nodeid = issues.create(title="spam", status='1')

        issues.set(nodeid, foo=date.Interval('-1d'))
        if do_commit:
            self.db.commit()
        self.assertNotEqual(issues.get(nodeid, "foo"), None)

        issues.set(nodeid, foo=None)
        if do_commit:
            self.db.commit()
        self.assertEqual(issues.get(nodeid, "foo"), None)
def testBooleanChange(self):
    # A Boolean property can be set true, set false and unset again.
    users = self.db.user
    userid = users.create(username='foo', assignable=1)
    self.assertEqual(1, users.get(userid, 'assignable'))

    users.set(userid, assignable=0)
    self.assertEqual(users.get(userid, 'assignable'), 0)

    users.set(userid, assignable=None)
    # read back the node that was just modified; the check used to look
    # at user '1', which this test never touches
    self.assertEqual(users.get(userid, "assignable"), None)
def testNumberChange(self):
    # A Number property can be set, changed and unset again.  All the
    # changes are made to the user created here rather than to the
    # pre-existing user '1'.
    users = self.db.user
    nid = users.create(username='foo', age=1)
    self.assertEqual(1, users.get(nid, 'age'))

    users.set(nid, age=3)
    self.assertNotEqual(users.get(nid, 'age'), 1)

    # a float value must be accepted too
    users.set(nid, age=1.0)

    users.set(nid, age=None)
    self.assertEqual(users.get(nid, "age"), None)
def testKeyValue(self):
    # lookup() finds a node by key value before and after a commit, and
    # raises KeyError once the node has been retired.
    users = self.db.user
    spam_id = users.create(username="spam")
    self.assertEqual(users.lookup('spam'), spam_id)

    self.db.commit()
    self.assertEqual(users.lookup('spam'), spam_id)

    users.retire(spam_id)
    self.assertRaises(KeyError, users.lookup, 'spam')
def testNewProperty(self):
    # A property added to a class after a node was created is listed by
    # getprops() and reads as None on that node.
    issues = self.db.issue
    issues.create(title="spam", status='1')
    issues.addprop(fixer=Link("user"))
    # force any post-init stuff to happen
    self.db.post_init()

    # sorted() instead of keys().sort(): on Python 3 keys() returns a
    # view that has no sort() method
    prop_names = sorted(issues.getprops().keys())
    self.assertEqual(prop_names, ['activity', 'assignedto', 'creation',
        'creator', 'deadline', 'files', 'fixer', 'foo', 'id', 'messages',
        'nosy', 'status', 'superseder', 'title'])
    self.assertEqual(issues.get('1', "fixer"), None)
def testRetire(self):
    # Retiring a node changes what list() returns while get() keeps
    # working on it, both before and after the commit.
    statuses = self.db.status
    self.db.issue.create(title="spam", status='1')
    name_before = statuses.get('1', 'name')
    listing_before = statuses.list()

    statuses.retire('1')
    # make sure the list is different
    self.assertNotEqual(listing_before, statuses.list())
    # can still access the node if necessary
    self.assertEqual(statuses.get('1', 'name'), name_before)

    self.db.commit()
    self.assertEqual(statuses.get('1', 'name'), name_before)
    self.assertNotEqual(listing_before, statuses.list())
def testSerialisation(self):
    # Date, Interval and Password values read back after a commit must
    # have their proper types.  unittest assertions are used instead of
    # bare assert statements so the checks still run under "python -O".
    nid = self.db.issue.create(title="spam", status='1',
        deadline=date.Date(), foo=date.Interval('-1d'))
    self.db.commit()
    self.assertTrue(isinstance(self.db.issue.get(nid, 'deadline'),
        date.Date))
    self.assertTrue(isinstance(self.db.issue.get(nid, 'foo'),
        date.Interval))

    uid = self.db.user.create(username="fozzy",
        password=password.Password('t. bear'))
    self.db.commit()
    self.assertTrue(isinstance(self.db.user.get(uid, 'password'),
        password.Password))
def testTransactions(self):
    # Checks the issue list length and db.numfiles() around commit()
    # and rollback() calls.
    issues = self.db.issue
    files = self.db.file

    # remember the number of items we started
    start_issues = len(issues.list())
    start_files = self.db.numfiles()

    # a rolled-back issue does not stay in the list
    issues.create(title="don't commit me!", status='1')
    self.assertNotEqual(start_issues, len(issues.list()))
    self.db.rollback()
    self.assertEqual(start_issues, len(issues.list()))

    # a committed issue stays, also across a later rollback
    issues.create(title="please commit me!", status='1')
    self.assertNotEqual(start_issues, len(issues.list()))
    self.db.commit()
    self.assertNotEqual(start_issues, len(issues.list()))
    self.db.rollback()
    self.assertNotEqual(start_issues, len(issues.list()))

    # a rolled-back file is not counted
    files.create(name="test", type="text/plain", content="hi")
    self.db.rollback()
    self.assertEqual(start_files, self.db.numfiles())

    # committed files are counted
    for index in range(10):
        files.create(name="test", type="text/plain",
            content="hi %d"%(index))
        # NOTE(review): the flattened source does not show whether this
        # commit is inside the loop or directly after it -- confirm
        self.db.commit()
    committed_files = self.db.numfiles()
    self.assertNotEqual(start_files, committed_files)

    # one more rolled-back file leaves the committed count unchanged
    files.create(name="test", type="text/plain", content="hi")
    self.db.rollback()
    self.assertNotEqual(start_files, self.db.numfiles())
    self.assertEqual(committed_files, self.db.numfiles())
def testDestroyNoJournalling(self):
    # run the shared destroy checks against the session class
    session_class = self.db.session
    self.innerTestDestroy(klass=session_class)
def testDestroyJournalling(self):
    # run the shared destroy checks against the issue class
    issue_class = self.db.issue
    self.innerTestDestroy(klass=issue_class)
def innerTestDestroy(self, klass):
    '''Check destroy() on ``klass`` uncommitted, committed and rolled back.

    klass -- a database class with a ``title`` property; its history is
             only checked when ``klass.do_journal`` is true
    '''
    raises = self.assertRaises

    # destroy without any commit
    newid = klass.create(title='Mr Friendly')
    count = len(klass.list())
    self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
    klass.destroy(newid)
    raises(IndexError, klass.get, newid, 'title')
    self.assertNotEqual(len(klass.list()), count)
    if klass.do_journal:
        raises(IndexError, klass.history, newid)

    # now with a commit
    newid = klass.create(title='Mr Friendly')
    count = len(klass.list())
    self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
    self.db.commit()
    klass.destroy(newid)
    raises(IndexError, klass.get, newid, 'title')
    self.db.commit()
    raises(IndexError, klass.get, newid, 'title')
    self.assertNotEqual(len(klass.list()), count)
    if klass.do_journal:
        raises(IndexError, klass.history, newid)

    # now with a rollback
    newid = klass.create(title='Mr Friendly')
    count = len(klass.list())
    self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
    self.db.commit()
    klass.destroy(newid)
    self.assertNotEqual(len(klass.list()), count)
    raises(IndexError, klass.get, newid, 'title')
    self.db.rollback()
    # the rollback brings the node, the count and the history back
    self.assertEqual(klass.get(newid, 'title'), 'Mr Friendly')
    self.assertEqual(len(klass.list()), count)
    if klass.do_journal:
        self.assertNotEqual(klass.history(newid), [])
def testExceptions(self):
    # this tests the exceptions that should be raised
    raises = self.assertRaises
    statuses = self.db.status
    issues = self.db.issue
    users = self.db.user
    files = self.db.file

    # invalid link / multilink values, used for both create and set
    bad_link_cases = (
        # invalid link index
        (IndexError, {'status': 'bar'}),
        # invalid link value
        (ValueError, {'status': 1}),
        # invalid multilink type
        (TypeError, {'status': '1', 'nosy': 'hello'}),
        # invalid multilink index type
        (ValueError, {'status': '1', 'nosy': [1]}),
        # invalid multilink index
        (IndexError, {'status': '1', 'nosy': ['10']}),
    )

    #
    # class create
    #
    # string property
    raises(TypeError, statuses.create, name=1)
    # invalid property name
    raises(KeyError, statuses.create, foo='foo')
    # key name clash
    raises(ValueError, statuses.create, name='unread')
    for expected, props in bad_link_cases:
        raises(expected, issues.create, title='foo', **props)

    #
    # key property
    #
    # key must be a String
    raises(TypeError, files.setkey, 'fooz')
    # key must exist
    raises(KeyError, files.setkey, 'fubar')

    #
    # class get
    #
    # invalid node id
    raises(IndexError, issues.get, '99', 'title')
    # invalid property name
    raises(KeyError, statuses.get, '2', 'foo')

    #
    # class set
    #
    # invalid node id
    raises(IndexError, issues.set, '99', title='foo')
    # invalid property name
    raises(KeyError, statuses.set, '1', foo='foo')
    # string property
    raises(TypeError, statuses.set, '1', name=1)
    # key name clash
    raises(ValueError, statuses.set, '2', name='unread')
    # set up a valid issue for me to work on
    issue_id = issues.create(title="spam", status='1')
    for expected, props in bad_link_cases:
        raises(expected, issues.set, issue_id, title='foo', **props)

    #
    # number and boolean properties
    #
    # invalid number value
    raises(TypeError, users.create, username='foo', age='a')
    # invalid boolean value
    raises(TypeError, users.create, username='foo', assignable='true')
    user_id = users.create(username='foo')
    # invalid number value
    raises(TypeError, users.set, user_id, username='foo', age='a')
    # invalid boolean value
    raises(TypeError, users.set, user_id, username='foo',
        assignable='true')
def testJournals(self):
    # Checks the journal entries written for create, link and unlink,
    # and that disableJournalling()/enableJournalling() switch the
    # journalling of changes off and on.
    eq = self.assertEqual
    self.db.user.create(username="mary")
    self.db.user.create(username="pete")
    self.db.issue.create(title="spam", status='1')
    self.db.commit()

    # journal entry for issue create
    journal = self.db.getjournal('issue', '1')
    eq(1, len(journal))
    (nodeid, date_stamp, journaltag, action, params) = journal[0]
    eq(nodeid, '1')
    eq(journaltag, self.db.user.lookup('admin'))
    eq(action, 'create')
    # sorted() instead of keys().sort(): on Python 3 keys() returns a
    # view that has no sort() method
    keys = sorted(params.keys())
    eq(keys, ['assignedto', 'deadline', 'files',
        'foo', 'messages', 'nosy', 'status', 'superseder', 'title'])
    eq(None, params['deadline'])
    eq(None, params['foo'])
    eq([], params['nosy'])
    eq('1', params['status'])
    eq('spam', params['title'])

    # journal entry for link
    journal = self.db.getjournal('user', '1')
    eq(1, len(journal))
    self.db.issue.set('1', assignedto='1')
    self.db.commit()
    journal = self.db.getjournal('user', '1')
    eq(2, len(journal))
    (nodeid, date_stamp, journaltag, action, params) = journal[1]
    eq('1', nodeid)
    eq('1', journaltag)
    eq('link', action)
    eq(('issue', '1', 'assignedto'), params)

    # journal entry for unlink
    self.db.issue.set('1', assignedto='2')
    self.db.commit()
    journal = self.db.getjournal('user', '1')
    eq(3, len(journal))
    (nodeid, date_stamp, journaltag, action, params) = journal[2]
    eq('1', nodeid)
    eq('1', journaltag)
    eq('unlink', action)
    eq(('issue', '1', 'assignedto'), params)

    # test disabling journalling
    # ... get the last entry
    time.sleep(1)
    entry = self.db.getjournal('issue', '1')[-1]
    (_, date_stamp, _, _, _) = entry
    self.db.issue.disableJournalling()
    self.db.issue.set('1', title='hello world')
    self.db.commit()
    entry = self.db.getjournal('issue', '1')[-1]
    (_, date_stamp2, _, _, _) = entry
    # see if the change was journalled when it shouldn't have been
    eq(date_stamp, date_stamp2)

    time.sleep(1)
    self.db.issue.enableJournalling()
    self.db.issue.set('1', title='hello world 2')
    self.db.commit()
    entry = self.db.getjournal('issue', '1')[-1]
    (_, date_stamp2, _, _, _) = entry
    # see if the change was journalled
    self.assertNotEqual(date_stamp, date_stamp2)
def testPack(self):
    # pack() with a date taken between two changes must shorten the
    # issue's journal by exactly one entry.
    issues = self.db.issue
    issue_id = issues.create(title="spam", status='1')
    self.db.commit()
    issues.set(issue_id, status='2')
    self.db.commit()

    # sleep for at least a second, then get a date to pack at
    time.sleep(1)
    pack_before = date.Date('.')

    # wait another second and add one more entry
    time.sleep(1)
    issues.set(issue_id, status='3')
    self.db.commit()
    entries_before = len(self.db.getjournal('issue', issue_id))

    # pack
    self.db.pack(pack_before)

    # we should have the create and last set entries now
    entries_after = len(self.db.getjournal('issue', issue_id))
    self.assertEqual(entries_before-1, entries_after)
def testIDGeneration(self):
    # issues created through the two separate handles must get
    # different ids
    from_first = self.db.issue.create(title="spam", status='1')
    from_second = self.db2.issue.create(title="eggs", status='2')
    self.assertNotEqual(from_first, from_second)
def testSearching(self):
    # Checks indexer.search() results for words in file content, file
    # comments and issue titles.  Uses assertEqual: the assertEquals
    # alias is deprecated and no longer exists in Python 3.12.
    eq = self.assertEqual
    self.db.file.create(content='hello', type="text/plain")
    self.db.file.create(content='world', type="text/frozz",
        comment='blah blah')
    self.db.issue.create(files=['1', '2'], title="flebble plop")
    self.db.issue.create(title="flebble frooz")
    self.db.commit()

    eq(self.db.indexer.search(['hello'], self.db.issue),
        {'1': {'files': ['1']}})
    eq(self.db.indexer.search(['world'], self.db.issue), {})
    eq(self.db.indexer.search(['frooz'], self.db.issue),
        {'2': {}})
    eq(self.db.indexer.search(['flebble'], self.db.issue),
        {'2': {}, '1': {}})
def testReindexing(self):
    # After a title change the new word is found and the old one is not.
    # Uses assertEqual: the assertEquals alias is deprecated and no
    # longer exists in Python 3.12.
    eq = self.assertEqual
    self.db.issue.create(title="frooz")
    self.db.commit()
    eq(self.db.indexer.search(['frooz'], self.db.issue),
        {'1': {}})

    self.db.issue.set('1', title="dooble")
    self.db.commit()
    eq(self.db.indexer.search(['dooble'], self.db.issue),
        {'1': {}})
    eq(self.db.indexer.search(['frooz'], self.db.issue), {})
def testForcedReindexing(self):
    # The same search result must come back after force_reindex()
    # followed by post_init().  Uses assertEqual: the assertEquals alias
    # is deprecated and no longer exists in Python 3.12.
    eq = self.assertEqual
    self.db.issue.create(title="flebble frooz")
    self.db.commit()
    eq(self.db.indexer.search(['flebble'], self.db.issue),
        {'1': {}})

    # NOTE(review): quiet is set to 1 for the reindex and to 9 (not 0)
    # afterwards, exactly as before -- confirm the intended values
    self.db.indexer.quiet = 1
    self.db.indexer.force_reindex()
    self.db.post_init()
    self.db.indexer.quiet = 9
    eq(self.db.indexer.search(['flebble'], self.db.issue),
        {'1': {}})
527 #
528 # searching tests follow
529 #
def testFind(self):
    # Checks find() with one and with two property arguments.
    issues = self.db.issue
    self.db.user.create(username='test')
    first = issues.create(status="1", nosy=['1'])
    oddid = issues.create(status="2", nosy=['2'])
    third = issues.create(status="1", nosy=['1','2'])
    issues.create(status="3", nosy=['1'])
    expected = [first, third]
    expected.sort()

    # should match first and third
    found = issues.find(status='1')
    found.sort()
    self.assertEqual(found, expected)

    # none
    self.assertEqual(issues.find(status='4'), [])

    # should match first three
    found = issues.find(status='1', nosy='2')
    found.sort()
    expected.append(oddid)
    expected.sort()
    self.assertEqual(found, expected)

    # none
    self.assertEqual(issues.find(status='4', nosy='3'), [])
def testStringFind(self):
    # stringFind(title='spam') must return the two issues titled "spam"
    # and not the one titled "not spam"; an unused title gives [].
    issues = self.db.issue
    first = issues.create(title="spam")
    issues.create(title="not spam")
    second = issues.create(title="spam")
    expected = [first, second]
    expected.sort()

    found = issues.stringFind(title='spam')
    found.sort()
    self.assertEqual(found, expected)
    self.assertEqual(issues.stringFind(title='fubar'), [])
def filteringSetup(self):
    '''Create the users and issues used by the testFiltering* tests.

    Creates three users and three issues, commits, and returns the pair
    (self.assertEqual, self.db.issue.filter) for the caller to use.
    '''
    for username in ('bleep', 'blop', 'blorp'):
        self.db.user.create(username=username)

    for props in (
            {'title': 'issue one', 'status': '2'},
            {'title': 'issue two', 'status': '1'},
            {'title': 'issue three', 'status': '1', 'nosy': ['1','2']}):
        self.db.issue.create(**props)
    # NOTE(review): the flattened source does not show whether this
    # commit is inside the loop or directly after it -- confirm
    self.db.commit()
    return self.assertEqual, self.db.issue.filter
def testFilteringString(self):
    # filter on the title: the full title matches one issue, the
    # shared word "issue" matches all three
    check, run_filter = self.filteringSetup()

    matches = run_filter(None, {'title': 'issue one'}, ('+','id'),
        (None,None))
    check(matches, ['1'])

    matches = run_filter(None, {'title': 'issue'}, ('+','id'),
        (None,None))
    check(matches, ['1','2','3'])
def testFilteringLink(self):
    # filter on the status link: issues two and three have status '1'
    check, run_filter = self.filteringSetup()
    matches = run_filter(None, {'status': '1'}, ('+','id'), (None,None))
    check(matches, ['2','3'])
def testFilteringMultilink(self):
    # filter on the nosy multilink: only issue three lists user '2'
    check, run_filter = self.filteringSetup()
    matches = run_filter(None, {'nosy': '2'}, ('+','id'), (None,None))
    check(matches, ['3'])
def testFilteringMany(self):
    # filter on nosy and status together: only issue three has both
    check, run_filter = self.filteringSetup()
    filterspec = {'nosy': '2', 'status': '1'}
    matches = run_filter(None, filterspec, ('+','id'), (None,None))
    check(matches, ['3'])
class anydbmReadOnlyDBTestCase(MyTestCase):
    '''Checks that an anydbm database opened without a user name
    refuses modifications.'''

    def setUp(self):
        '''Populate a fresh database, then open the handles under test.'''
        from roundup.backends import anydbm

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')

        # an 'admin' handle declares the schema and creates the nodes
        writer = anydbm.Database(config, 'admin')
        setupSchema(writer, 1, anydbm)

        # the handle the test uses is opened without the user argument
        self.db = anydbm.Database(config)
        setupSchema(self.db, 0, anydbm)

        self.db2 = anydbm.Database(config, 'admin')
        setupSchema(self.db2, 0, anydbm)

    def testExceptions(self):
        # this tests the exceptions that should be raised
        raises = self.assertRaises
        statuses = self.db.status

        raises(DatabaseError, statuses.create, name="foo")
        raises(DatabaseError, statuses.set, '1', name="foo")
        raises(DatabaseError, statuses.retire, '1')
class bsddbDBTestCase(anydbmDBTestCase):
    '''Runs the anydbmDBTestCase tests against the bsddb backend.'''

    def setUp(self):
        '''Build a fresh bsddb database and open two handles onto it.'''
        from roundup.backends import bsddb

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')

        # the first handle also creates the initial nodes
        self.db = bsddb.Database(config, 'admin')
        setupSchema(self.db, 1, bsddb)

        self.db2 = bsddb.Database(config, 'admin')
        setupSchema(self.db2, 0, bsddb)
class bsddbReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''Runs the anydbmReadOnlyDBTestCase tests against the bsddb
    backend.'''

    def setUp(self):
        '''Populate a fresh database, then open the handles under test.'''
        from roundup.backends import bsddb

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')

        # an 'admin' handle declares the schema and creates the nodes
        writer = bsddb.Database(config, 'admin')
        setupSchema(writer, 1, bsddb)

        # the handle the test uses is opened without the user argument
        self.db = bsddb.Database(config)
        setupSchema(self.db, 0, bsddb)

        self.db2 = bsddb.Database(config, 'admin')
        setupSchema(self.db2, 0, bsddb)
class bsddb3DBTestCase(anydbmDBTestCase):
    '''Runs the anydbmDBTestCase tests against the bsddb3 backend.'''

    def setUp(self):
        '''Build a fresh bsddb3 database and open two handles onto it.'''
        from roundup.backends import bsddb3

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')

        # the first handle also creates the initial nodes
        self.db = bsddb3.Database(config, 'admin')
        setupSchema(self.db, 1, bsddb3)

        self.db2 = bsddb3.Database(config, 'admin')
        setupSchema(self.db2, 0, bsddb3)
class bsddb3ReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''Runs the anydbmReadOnlyDBTestCase tests against the bsddb3
    backend.'''

    def setUp(self):
        '''Populate a fresh database, then open the handles under test.'''
        from roundup.backends import bsddb3

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')

        # an 'admin' handle declares the schema and creates the nodes
        writer = bsddb3.Database(config, 'admin')
        setupSchema(writer, 1, bsddb3)

        # the handle the test uses is opened without the user argument
        self.db = bsddb3.Database(config)
        setupSchema(self.db, 0, bsddb3)

        self.db2 = bsddb3.Database(config, 'admin')
        setupSchema(self.db2, 0, bsddb3)
class gadflyDBTestCase(anydbmDBTestCase):
    ''' Gadfly doesn't support multiple connections to the one local
        database, so these tests run with a single handle.
    '''

    def setUp(self):
        '''Build a fresh gadfly database with a single handle.'''
        from roundup.backends import gadfly

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        config.GADFLY_DATABASE = ('test', db_dir)
        os.makedirs(db_dir + '/files')

        self.db = gadfly.Database(config, 'admin')
        setupSchema(self.db, 1, gadfly)

    def testIDGeneration(self):
        # both issues are created through the one handle
        first = self.db.issue.create(title="spam", status='1')
        second = self.db.issue.create(title="eggs", status='2')
        self.assertNotEqual(first, second)

    def testFilteringString(self):
        check, run_filter = self.filteringSetup()
        matches = run_filter(None, {'title': 'issue one'}, ('+','id'),
            (None,None))
        check(matches, ['1'])
        # XXX gadfly can't do substring LIKE searches
        #ae(filt(None, {'title': 'issue'}, ('+','id'), (None,None)),
        #    ['1','2','3'])
class gadflyReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''Runs the anydbmReadOnlyDBTestCase tests against the gadfly
    backend.'''

    def setUp(self):
        '''Populate a fresh database, then open the handle under test.'''
        from roundup.backends import gadfly

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        config.GADFLY_DATABASE = ('test', db_dir)
        os.makedirs(db_dir + '/files')

        # an 'admin' handle declares the schema and creates the nodes
        writer = gadfly.Database(config, 'admin')
        setupSchema(writer, 1, gadfly)

        # the handle the test uses is opened without the user argument
        self.db = gadfly.Database(config)
        setupSchema(self.db, 0, gadfly)
class sqliteDBTestCase(anydbmDBTestCase):
    '''Runs the anydbmDBTestCase tests against the sqlite backend.'''

    def setUp(self):
        '''Build a fresh sqlite database with a single handle.'''
        from roundup.backends import sqlite

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')

        self.db = sqlite.Database(config, 'admin')
        setupSchema(self.db, 1, sqlite)

    def testIDGeneration(self):
        # overridden to do nothing: the inherited test uses self.db2,
        # which this setUp does not create
        pass
class sqliteReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''Runs the anydbmReadOnlyDBTestCase tests against the sqlite
    backend.'''

    def setUp(self):
        '''Populate a fresh database, then open the handle under test.'''
        from roundup.backends import sqlite

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')

        # an 'admin' handle declares the schema and creates the nodes
        writer = sqlite.Database(config, 'admin')
        setupSchema(writer, 1, sqlite)

        # the handle the test uses is opened without the user argument
        self.db = sqlite.Database(config)
        setupSchema(self.db, 0, sqlite)
class metakitDBTestCase(anydbmDBTestCase):
    '''Runs the anydbmDBTestCase tests against the metakit backend,
    with a single handle and its own transaction test.'''

    def setUp(self):
        '''Build a fresh metakit database with a single handle.'''
        from roundup.backends import metakit
        import weakref

        # replace the module's _instances mapping with an empty one
        metakit._instances = weakref.WeakValueDictionary()

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')

        self.db = metakit.Database(config, 'admin')
        setupSchema(self.db, 1, metakit)

    def testIDGeneration(self):
        # both issues are created through the one handle
        first = self.db.issue.create(title="spam", status='1')
        second = self.db.issue.create(title="eggs", status='2')
        self.assertNotEqual(first, second)

    def testTransactions(self):
        # Like the inherited test, but counts files with file.list() and
        # also looks at the directory listing under files/file/0.
        issues = self.db.issue
        files = self.db.file

        # remember the number of items we started
        start_issues = len(issues.list())

        # a rolled-back issue does not stay in the list
        issues.create(title="don't commit me!", status='1')
        self.assertNotEqual(start_issues, len(issues.list()))
        self.db.rollback()
        self.assertEqual(start_issues, len(issues.list()))

        # a committed issue stays, also across a later rollback
        issues.create(title="please commit me!", status='1')
        self.assertNotEqual(start_issues, len(issues.list()))
        self.db.commit()
        self.assertNotEqual(start_issues, len(issues.list()))
        self.db.rollback()
        self.assertNotEqual(start_issues, len(issues.list()))

        files.create(name="test", type="text/plain", content="hi")
        self.db.rollback()
        start_files = len(files.list())
        for index in range(10):
            files.create(name="test", type="text/plain",
                content="hi %d"%(index))
            # NOTE(review): the flattened source does not show whether
            # this commit is inside the loop or directly after it --
            # confirm
            self.db.commit()
        # TODO: would be good to be able to ensure the file is not on disk after
        # a rollback...
        committed_files = len(files.list())
        self.assertNotEqual(start_files, committed_files)

        # one more file, rolled back: the list is unchanged and the
        # directory holds one entry fewer than before the rollback
        files.create(name="test", type="text/plain", content="hi")
        file_dir = self.db.config.DATABASE + '/files/file/0'
        on_disk_before = len(os.listdir(file_dir))
        self.db.rollback()
        on_disk_after = len(os.listdir(file_dir))
        self.assertEqual(committed_files, len(files.list()))
        self.assertEqual(on_disk_after, on_disk_before-1)
class metakitReadOnlyDBTestCase(anydbmReadOnlyDBTestCase):
    '''Runs the anydbmReadOnlyDBTestCase tests against the metakit
    backend.'''

    def setUp(self):
        '''Populate a fresh database, then open the handle under test.'''
        from roundup.backends import metakit
        import weakref

        # replace the module's _instances mapping with an empty one
        metakit._instances = weakref.WeakValueDictionary()

        # remove previous test, ignore errors
        db_dir = config.DATABASE
        if os.path.exists(db_dir):
            shutil.rmtree(db_dir)
        os.makedirs(db_dir + '/files')

        # an 'admin' handle declares the schema and creates the nodes
        writer = metakit.Database(config, 'admin')
        setupSchema(writer, 1, metakit)

        # the handle the test uses is opened without the user argument
        self.db = metakit.Database(config)
        setupSchema(self.db, 0, metakit)
def suite():
    '''Return a TestSuite with the database tests of every usable backend.

    The two anydbm test cases are always included.  The test cases for
    gadfly, sqlite, bsddb, bsddb3 and metakit are added, in that order,
    only when ``roundup.backends`` has an attribute of that name.
    '''
    from roundup import backends

    # loadTestsFromTestCase collects the "test*" methods just like
    # makeSuite(case, 'test') did; makeSuite itself is deprecated and
    # has been removed from unittest in Python 3.13
    load = unittest.defaultTestLoader.loadTestsFromTestCase

    cases = [anydbmDBTestCase, anydbmReadOnlyDBTestCase]
    optional_backends = (
        ('gadfly', gadflyDBTestCase, gadflyReadOnlyDBTestCase),
        ('sqlite', sqliteDBTestCase, sqliteReadOnlyDBTestCase),
        ('bsddb', bsddbDBTestCase, bsddbReadOnlyDBTestCase),
        ('bsddb3', bsddb3DBTestCase, bsddb3ReadOnlyDBTestCase),
        ('metakit', metakitDBTestCase, metakitReadOnlyDBTestCase),
    )
    for backend_name, db_case, readonly_case in optional_backends:
        if hasattr(backends, backend_name):
            cases.append(db_case)
            cases.append(readonly_case)

    return unittest.TestSuite([load(case) for case in cases])
838 # vim: set filetype=python ts=4 sw=4 et si