1 # Copyright (c) 2002 ekit.com Inc (http://www.ekit-inc.com/)
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a copy
4 # of this software and associated documentation files (the "Software"), to deal
5 # in the Software without restriction, including without limitation the rights
6 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7 # copies of the Software, and to permit persons to whom the Software is
8 # furnished to do so, subject to the following conditions:
9 #
10 # The above copyright notice and this permission notice shall be included in
11 # all copies or substantial portions of the Software.
12 #
13 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19 # SOFTWARE.
21 # $Id: test_indexer.py,v 1.13 2008-09-11 19:10:30 schlatterbeck Exp $
23 import os, unittest, shutil
25 from roundup.backends import get_backend, have_backend
26 from roundup.backends.indexer_rdbms import Indexer
28 # borrow from other tests
29 from db_test_base import setupSchema, config
30 from test_postgresql import postgresqlOpener
31 from test_mysql import mysqlOpener
32 from test_sqlite import sqliteOpener
34 class db:
35 class config(dict):
36 DATABASE = 'test-index'
37 config = config()
38 config[('main', 'indexer_stopwords')] = []
40 class IndexerTest(unittest.TestCase):
41 def setUp(self):
42 if os.path.exists('test-index'):
43 shutil.rmtree('test-index')
44 os.mkdir('test-index')
45 os.mkdir('test-index/files')
46 from roundup.backends.indexer_dbm import Indexer
47 self.dex = Indexer(db)
48 self.dex.load_index()
50 def assertSeqEqual(self, s1, s2):
51 # First argument is the db result we're testing, second is the
52 # desired result. Some db results don't have iterable rows, so we
53 # have to work around that.
54 # Also work around some dbs not returning items in the expected
55 # order.
56 s1 = list([tuple([r[n] for n in range(len(r))]) for r in s1])
57 s1.sort()
58 if s1 != s2:
59 self.fail('contents of %r != %r'%(s1, s2))
61 def test_basics(self):
62 self.dex.add_text(('test', '1', 'foo'), 'a the hello world')
63 self.dex.add_text(('test', '2', 'foo'), 'blah blah the world')
64 self.assertSeqEqual(self.dex.find(['world']), [('test', '1', 'foo'),
65 ('test', '2', 'foo')])
66 self.assertSeqEqual(self.dex.find(['blah']), [('test', '2', 'foo')])
67 self.assertSeqEqual(self.dex.find(['blah', 'hello']), [])
69 def test_change(self):
70 self.dex.add_text(('test', '1', 'foo'), 'a the hello world')
71 self.dex.add_text(('test', '2', 'foo'), 'blah blah the world')
72 self.assertSeqEqual(self.dex.find(['world']), [('test', '1', 'foo'),
73 ('test', '2', 'foo')])
74 self.dex.add_text(('test', '1', 'foo'), 'a the hello')
75 self.assertSeqEqual(self.dex.find(['world']), [('test', '2', 'foo')])
77 def test_clear(self):
78 self.dex.add_text(('test', '1', 'foo'), 'a the hello world')
79 self.dex.add_text(('test', '2', 'foo'), 'blah blah the world')
80 self.assertSeqEqual(self.dex.find(['world']), [('test', '1', 'foo'),
81 ('test', '2', 'foo')])
82 self.dex.add_text(('test', '1', 'foo'), '')
83 self.assertSeqEqual(self.dex.find(['world']), [('test', '2', 'foo')])
85 def test_stopwords(self):
86 """Test that we can find a text with a stopword in it."""
87 stopword = "with"
88 self.assert_(self.dex.is_stopword(stopword.upper()))
89 self.dex.add_text(('test', '1', 'bar'), '%s hello world' % stopword)
90 self.dex.add_text(('test', '2', 'bar'), 'blah a %s world' % stopword)
91 self.dex.add_text(('test', '3', 'bar'), 'blah Blub river')
92 self.dex.add_text(('test', '4', 'bar'), 'blah river %s' % stopword)
93 self.assertSeqEqual(self.dex.find(['with','world']),
94 [('test', '1', 'bar'),
95 ('test', '2', 'bar')])
96 def test_extremewords(self):
97 """Testing too short or too long words."""
98 short = "b"
99 long = "abcdefghijklmnopqrstuvwxyz"
100 self.dex.add_text(('test', '1', 'a'), '%s hello world' % short)
101 self.dex.add_text(('test', '2', 'a'), 'blah a %s world' % short)
102 self.dex.add_text(('test', '3', 'a'), 'blah Blub river')
103 self.dex.add_text(('test', '4', 'a'), 'blah river %s %s'
104 % (short, long))
105 self.assertSeqEqual(self.dex.find([short,'world', long, short]),
106 [('test', '1', 'a'),
107 ('test', '2', 'a')])
108 self.assertSeqEqual(self.dex.find([long]),[])
110 # special test because some faulty code indexed length(word)>=2
111 # but only considered length(word)>=3 to be significant
112 self.dex.add_text(('test', '5', 'a'), 'blah py %s %s'
113 % (short, long))
114 self.assertSeqEqual(self.dex.find(["py"]), [('test', '5', 'a')])
116 def test_casesensitity(self):
117 """Test if searches are case-in-sensitive."""
118 self.dex.add_text(('test', '1', 'a'), 'aaaa bbbb')
119 self.dex.add_text(('test', '2', 'a'), 'aAaa BBBB')
120 self.assertSeqEqual(self.dex.find(['aaaa']),
121 [('test', '1', 'a'),
122 ('test', '2', 'a')])
123 self.assertSeqEqual(self.dex.find(['BBBB']),
124 [('test', '1', 'a'),
125 ('test', '2', 'a')])
127 def test_wordsplitting(self):
128 """Test if word splitting works."""
129 self.dex.add_text(('test', '1', 'a'), 'aaaa-aaa bbbb*bbb')
130 self.dex.add_text(('test', '2', 'a'), 'aaaA-aaa BBBB*BBB')
131 for k in 'aaaa', 'aaa', 'bbbb', 'bbb':
132 self.assertSeqEqual(self.dex.find([k]),
133 [('test', '1', 'a'), ('test', '2', 'a')])
135 def tearDown(self):
136 shutil.rmtree('test-index')
138 class XapianIndexerTest(IndexerTest):
139 def setUp(self):
140 if os.path.exists('test-index'):
141 shutil.rmtree('test-index')
142 os.mkdir('test-index')
143 from roundup.backends.indexer_xapian import Indexer
144 self.dex = Indexer(db)
145 def tearDown(self):
146 shutil.rmtree('test-index')
148 class RDBMSIndexerTest(IndexerTest):
149 def setUp(self):
150 # remove previous test, ignore errors
151 if os.path.exists(config.DATABASE):
152 shutil.rmtree(config.DATABASE)
153 self.db = self.module.Database(config, 'admin')
154 self.dex = Indexer(self.db)
155 def tearDown(self):
156 if hasattr(self, 'db'):
157 self.db.close()
158 if os.path.exists(config.DATABASE):
159 shutil.rmtree(config.DATABASE)
161 class postgresqlIndexerTest(postgresqlOpener, RDBMSIndexerTest):
162 def setUp(self):
163 postgresqlOpener.setUp(self)
164 RDBMSIndexerTest.setUp(self)
165 def tearDown(self):
166 RDBMSIndexerTest.tearDown(self)
167 postgresqlOpener.tearDown(self)
169 class mysqlIndexerTest(mysqlOpener, RDBMSIndexerTest):
170 def setUp(self):
171 mysqlOpener.setUp(self)
172 RDBMSIndexerTest.setUp(self)
173 def tearDown(self):
174 RDBMSIndexerTest.tearDown(self)
175 mysqlOpener.tearDown(self)
177 class sqliteIndexerTest(sqliteOpener, RDBMSIndexerTest):
178 pass
180 def test_suite():
181 suite = unittest.TestSuite()
183 suite.addTest(unittest.makeSuite(IndexerTest))
185 try:
186 import xapian
187 suite.addTest(unittest.makeSuite(XapianIndexerTest))
188 except ImportError:
189 print "Skipping Xapian indexer tests"
190 pass
192 if have_backend('postgresql'):
193 # make sure we start with a clean slate
194 if postgresqlOpener.module.db_exists(config):
195 postgresqlOpener.module.db_nuke(config, 1)
196 suite.addTest(unittest.makeSuite(postgresqlIndexerTest))
197 else:
198 print "Skipping postgresql indexer tests"
200 if have_backend('mysql'):
201 # make sure we start with a clean slate
202 if mysqlOpener.module.db_exists(config):
203 mysqlOpener.module.db_nuke(config)
204 suite.addTest(unittest.makeSuite(mysqlIndexerTest))
205 else:
206 print "Skipping mysql indexer tests"
208 if have_backend('sqlite'):
209 # make sure we start with a clean slate
210 if sqliteOpener.module.db_exists(config):
211 sqliteOpener.module.db_nuke(config)
212 suite.addTest(unittest.makeSuite(sqliteIndexerTest))
213 else:
214 print "Skipping sqlite indexer tests"
216 return suite
218 if __name__ == '__main__':
219 runner = unittest.TextTestRunner()
220 unittest.main(testRunner=runner)
222 # vim: set filetype=python ts=4 sw=4 et si