Code

Multilinks can be filtered by combining elements with AND, OR and NOT
[roundup.git] / roundup / backends / back_mysql.py
1 #
2 # Copyright (c) 2003 Martynas Sklyzmantas, Andrey Lebedev <andrey@micro.lt>
3 #
4 # This module is free software, and you may redistribute it and/or modify
5 # under the same terms as Python, so long as this copyright message and
6 # disclaimer are retained in their original form.
7 #
9 '''This module defines a backend implementation for MySQL.
12 How to implement AUTO_INCREMENT:
14 mysql> create table foo (num integer auto_increment primary key, name
15 varchar(255)) AUTO_INCREMENT=1 ENGINE=InnoDB;
17 ql> insert into foo (name) values ('foo5');
18 Query OK, 1 row affected (0.00 sec)
20 mysql> SELECT num FROM foo WHERE num IS NULL;
21 +-----+
22 | num |
23 +-----+
24 |   4 |
25 +-----+
26 1 row in set (0.00 sec)
28 mysql> SELECT num FROM foo WHERE num IS NULL;
29 Empty set (0.00 sec)
31 NOTE: we don't need an index on the id column if it's PRIMARY KEY
33 '''
34 __docformat__ = 'restructuredtext'
36 from roundup.backends.rdbms_common import *
37 from roundup.backends import rdbms_common
38 import MySQLdb
39 import os, shutil
40 from MySQLdb.constants import ER
41 import logging
43 def connection_dict(config, dbnamestr=None):
44     d = rdbms_common.connection_dict(config, dbnamestr)
45     if d.has_key('password'):
46         d['passwd'] = d['password']
47         del d['password']
48     if d.has_key('port'):
49         d['port'] = int(d['port'])
50     return d
52 def db_nuke(config):
53     """Clear all database contents and drop database itself"""
54     if db_exists(config):
55         kwargs = connection_dict(config)
56         conn = MySQLdb.connect(**kwargs)
57         try:
58             conn.select_db(config.RDBMS_NAME)
59         except:
60             # no, it doesn't exist
61             pass
62         else:
63             cursor = conn.cursor()
64             cursor.execute("SHOW TABLES")
65             tables = cursor.fetchall()
66             # stupid MySQL bug requires us to drop all the tables first
67             for table in tables:
68                 command = 'DROP TABLE `%s`'%table[0]
69                 logging.debug(command)
70                 cursor.execute(command)
71             command = "DROP DATABASE %s"%config.RDBMS_NAME
72             logging.info(command)
73             cursor.execute(command)
74             conn.commit()
75         conn.close()
77     if os.path.exists(config.DATABASE):
78         shutil.rmtree(config.DATABASE)
80 def db_create(config):
81     """Create the database."""
82     kwargs = connection_dict(config)
83     conn = MySQLdb.connect(**kwargs)
84     cursor = conn.cursor()
85     command = "CREATE DATABASE %s"%config.RDBMS_NAME
86     logging.info(command)
87     cursor.execute(command)
88     conn.commit()
89     conn.close()
91 def db_exists(config):
92     """Check if database already exists."""
93     kwargs = connection_dict(config)
94     conn = MySQLdb.connect(**kwargs)
95     try:
96         try:
97             conn.select_db(config.RDBMS_NAME)
98         except MySQLdb.OperationalError:
99             return 0
100     finally:
101         conn.close()
102     return 1
105 class Database(Database):
106     arg = '%s'
108     # used by some code to switch styles of query
109     implements_intersect = 0
111     # Backend for MySQL to use.
112     # InnoDB is faster, but if you're running <4.0.16 then you'll need to
113     # use BDB to pass all unit tests.
114     mysql_backend = 'InnoDB'
115     #mysql_backend = 'BDB'
117     hyperdb_to_sql_datatypes = {
118         hyperdb.String : 'TEXT',
119         hyperdb.Date   : 'DATETIME',
120         hyperdb.Link   : 'INTEGER',
121         hyperdb.Interval  : 'VARCHAR(255)',
122         hyperdb.Password  : 'VARCHAR(255)',
123         hyperdb.Boolean   : 'BOOL',
124         hyperdb.Number    : 'REAL',
125     }
127     hyperdb_to_sql_value = {
128         hyperdb.String : str,
129         # no fractional seconds for MySQL
130         hyperdb.Date   : lambda x: x.formal(sep=' '),
131         hyperdb.Link   : int,
132         hyperdb.Interval  : str,
133         hyperdb.Password  : str,
134         hyperdb.Boolean   : int,
135         hyperdb.Number    : lambda x: x,
136         hyperdb.Multilink : lambda x: x,    # used in journal marshalling
137     }
139     def sql_open_connection(self):
140         kwargs = connection_dict(self.config, 'db')
141         self.log_info('open database %r'%(kwargs['db'],))
142         try:
143             conn = MySQLdb.connect(**kwargs)
144         except MySQLdb.OperationalError, message:
145             raise DatabaseError, message
146         cursor = conn.cursor()
147         cursor.execute("SET AUTOCOMMIT=0")
148         cursor.execute("START TRANSACTION")
149         return (conn, cursor)
151     def open_connection(self):
152         # make sure the database actually exists
153         if not db_exists(self.config):
154             db_create(self.config)
156         self.conn, self.cursor = self.sql_open_connection()
158         try:
159             self.load_dbschema()
160         except MySQLdb.OperationalError, message:
161             if message[0] != ER.NO_DB_ERROR:
162                 raise
163         except MySQLdb.ProgrammingError, message:
164             if message[0] != ER.NO_SUCH_TABLE:
165                 raise DatabaseError, message
166             self.init_dbschema()
167             self.sql("CREATE TABLE `schema` (`schema` TEXT) ENGINE=%s"%
168                 self.mysql_backend)
169             self.sql('''CREATE TABLE ids (name VARCHAR(255),
170                 num INTEGER) ENGINE=%s'''%self.mysql_backend)
171             self.sql('create index ids_name_idx on ids(name)')
172             self.create_version_2_tables()
174     def load_dbschema(self):
175         ''' Load the schema definition that the database currently implements
176         '''
177         self.cursor.execute('select `schema` from `schema`')
178         schema = self.cursor.fetchone()
179         if schema:
180             self.database_schema = eval(schema[0])
181         else:
182             self.database_schema = {}
184     def save_dbschema(self):
185         ''' Save the schema definition that the database currently implements
186         '''
187         s = repr(self.database_schema)
188         self.sql('delete from `schema`')
189         self.sql('insert into `schema` values (%s)', (s,))
191     def create_version_2_tables(self):
192         # OTK store
193         self.sql('''CREATE TABLE otks (otk_key VARCHAR(255),
194             otk_value TEXT, otk_time FLOAT(20))
195             ENGINE=%s'''%self.mysql_backend)
196         self.sql('CREATE INDEX otks_key_idx ON otks(otk_key)')
198         # Sessions store
199         self.sql('''CREATE TABLE sessions (session_key VARCHAR(255),
200             session_time FLOAT(20), session_value TEXT)
201             ENGINE=%s'''%self.mysql_backend)
202         self.sql('''CREATE INDEX sessions_key_idx ON
203             sessions(session_key)''')
205         # full-text indexing store
206         self.sql('''CREATE TABLE __textids (_class VARCHAR(255),
207             _itemid VARCHAR(255), _prop VARCHAR(255), _textid INT)
208             ENGINE=%s'''%self.mysql_backend)
209         self.sql('''CREATE TABLE __words (_word VARCHAR(30),
210             _textid INT) ENGINE=%s'''%self.mysql_backend)
211         self.sql('CREATE INDEX words_word_ids ON __words(_word)')
212         self.sql('CREATE INDEX words_by_id ON __words (_textid)')
213         self.sql('CREATE UNIQUE INDEX __textids_by_props ON '
214                  '__textids (_class, _itemid, _prop)')
215         sql = 'insert into ids (name, num) values (%s,%s)'%(self.arg, self.arg)
216         self.sql(sql, ('__textids', 1))
218     def add_new_columns_v2(self):
219         '''While we're adding the actor column, we need to update the
220         tables to have the correct datatypes.'''
221         for klass in self.classes.values():
222             cn = klass.classname
223             properties = klass.getprops()
224             old_spec = self.database_schema['tables'][cn]
226             # figure the non-Multilink properties to copy over
227             propnames = ['activity', 'creation', 'creator']
229             # figure actions based on data type
230             for name, s_prop in old_spec[1]:
231                 # s_prop is a repr() string of a hyperdb type object
232                 if s_prop.find('Multilink') == -1:
233                     if properties.has_key(name):
234                         propnames.append(name)
235                     continue
236                 tn = '%s_%s'%(cn, name)
238                 if properties.has_key(name):
239                     # grabe the current values
240                     sql = 'select linkid, nodeid from %s'%tn
241                     self.sql(sql)
242                     rows = self.cursor.fetchall()
244                 # drop the old table
245                 self.drop_multilink_table_indexes(cn, name)
246                 sql = 'drop table %s'%tn
247                 self.sql(sql)
249                 if properties.has_key(name):
250                     # re-create and populate the new table
251                     self.create_multilink_table(klass, name)
252                     sql = '''insert into %s (linkid, nodeid) values
253                         (%s, %s)'''%(tn, self.arg, self.arg)
254                     for linkid, nodeid in rows:
255                         self.sql(sql, (int(linkid), int(nodeid)))
257             # figure the column names to fetch
258             fetch = ['_%s'%name for name in propnames]
260             # select the data out of the old table
261             fetch.append('id')
262             fetch.append('__retired__')
263             fetchcols = ','.join(fetch)
264             sql = 'select %s from _%s'%(fetchcols, cn)
265             self.sql(sql)
267             # unserialise the old data
268             olddata = []
269             propnames = propnames + ['id', '__retired__']
270             cols = []
271             first = 1
272             for entry in self.cursor.fetchall():
273                 l = []
274                 olddata.append(l)
275                 for i in range(len(propnames)):
276                     name = propnames[i]
277                     v = entry[i]
279                     if name in ('id', '__retired__'):
280                         if first:
281                             cols.append(name)
282                         l.append(int(v))
283                         continue
284                     if first:
285                         cols.append('_' + name)
286                     prop = properties[name]
287                     if isinstance(prop, Date) and v is not None:
288                         v = date.Date(v)
289                     elif isinstance(prop, Interval) and v is not None:
290                         v = date.Interval(v)
291                     elif isinstance(prop, Password) and v is not None:
292                         v = password.Password(encrypted=v)
293                     elif (isinstance(prop, Boolean) or
294                             isinstance(prop, Number)) and v is not None:
295                         v = float(v)
297                     # convert to new MySQL data type
298                     prop = properties[name]
299                     if v is not None:
300                         e = self.to_sql_value(prop.__class__)(v)
301                     else:
302                         e = None
303                     l.append(e)
305                     # Intervals store the seconds value too
306                     if isinstance(prop, Interval):
307                         if first:
308                             cols.append('__' + name + '_int__')
309                         if v is not None:
310                             l.append(v.as_seconds())
311                         else:
312                             l.append(e)
313                 first = 0
315             self.drop_class_table_indexes(cn, old_spec[0])
317             # drop the old table
318             self.sql('drop table _%s'%cn)
320             # create the new table
321             self.create_class_table(klass)
323             # do the insert of the old data
324             args = ','.join([self.arg for x in cols])
325             cols = ','.join(cols)
326             sql = 'insert into _%s (%s) values (%s)'%(cn, cols, args)
327             for entry in olddata:
328                 self.sql(sql, tuple(entry))
330             # now load up the old journal data to migrate it
331             cols = ','.join('nodeid date tag action params'.split())
332             sql = 'select %s from %s__journal'%(cols, cn)
333             self.sql(sql)
335             # data conversions
336             olddata = []
337             for nodeid, journaldate, journaltag, action, params in \
338                     self.cursor.fetchall():
339                 #nodeid = int(nodeid)
340                 journaldate = date.Date(journaldate)
341                 #params = eval(params)
342                 olddata.append((nodeid, journaldate, journaltag, action,
343                     params))
345             # drop journal table and indexes
346             self.drop_journal_table_indexes(cn)
347             sql = 'drop table %s__journal'%cn
348             self.sql(sql)
350             # re-create journal table
351             self.create_journal_table(klass)
352             dc = self.to_sql_value(hyperdb.Date)
353             for nodeid, journaldate, journaltag, action, params in olddata:
354                 self.save_journal(cn, cols, nodeid, dc(journaldate),
355                     journaltag, action, params)
357             # make sure the normal schema update code doesn't try to
358             # change things
359             self.database_schema['tables'][cn] = klass.schema()
361     def fix_version_2_tables(self):
362         # Convert journal date column to TIMESTAMP, params column to TEXT
363         self._convert_journal_tables()
365         # Convert all String properties to TEXT
366         self._convert_string_properties()
368     def __repr__(self):
369         return '<myroundsql 0x%x>'%id(self)
371     def sql_fetchone(self):
372         return self.cursor.fetchone()
374     def sql_fetchall(self):
375         return self.cursor.fetchall()
377     def sql_index_exists(self, table_name, index_name):
378         self.sql('show index from %s'%table_name)
379         for index in self.cursor.fetchall():
380             if index[2] == index_name:
381                 return 1
382         return 0
384     def create_class_table(self, spec, create_sequence=1):
385         cols, mls = self.determine_columns(spec.properties.items())
387         # add on our special columns
388         cols.append(('id', 'INTEGER PRIMARY KEY'))
389         cols.append(('__retired__', 'INTEGER DEFAULT 0'))
391         # create the base table
392         scols = ','.join(['%s %s'%x for x in cols])
393         sql = 'create table _%s (%s) ENGINE=%s'%(spec.classname, scols,
394             self.mysql_backend)
395         self.sql(sql)
397         self.create_class_table_indexes(spec)
398         return cols, mls
400     def create_class_table_indexes(self, spec):
401         ''' create the class table for the given spec
402         '''
403         # create __retired__ index
404         index_sql2 = 'create index _%s_retired_idx on _%s(__retired__)'%(
405                         spec.classname, spec.classname)
406         self.sql(index_sql2)
408         # create index for key property
409         if spec.key:
410             if isinstance(spec.properties[spec.key], String):
411                 idx = spec.key + '(255)'
412             else:
413                 idx = spec.key
414             index_sql3 = 'create index _%s_%s_idx on _%s(_%s)'%(
415                         spec.classname, spec.key,
416                         spec.classname, idx)
417             self.sql(index_sql3)
419         # TODO: create indexes on (selected?) Link property columns, as
420         # they're more likely to be used for lookup
422     def add_class_key_required_unique_constraint(self, cn, key):
423         # mysql requires sizes on TEXT indexes
424         prop = self.classes[cn].getprops()[key]
425         if isinstance(prop, String):
426             sql = '''create unique index _%s_key_retired_idx
427                 on _%s(__retired__, _%s(255))'''%(cn, cn, key)
428         else:
429             sql = '''create unique index _%s_key_retired_idx
430                 on _%s(__retired__, _%s)'''%(cn, cn, key)
431         self.sql(sql)
433     def create_class_table_key_index(self, cn, key):
434         # mysql requires sizes on TEXT indexes
435         prop = self.classes[cn].getprops()[key]
436         if isinstance(prop, String):
437             sql = 'create index _%s_%s_idx on _%s(_%s(255))'%(cn, key, cn, key)
438         else:
439             sql = 'create index _%s_%s_idx on _%s(_%s)'%(cn, key, cn, key)
440         self.sql(sql)
442     def drop_class_table_indexes(self, cn, key):
443         # drop the old table indexes first
444         l = ['_%s_id_idx'%cn, '_%s_retired_idx'%cn]
445         if key:
446             l.append('_%s_%s_idx'%(cn, key))
448         table_name = '_%s'%cn
449         for index_name in l:
450             if not self.sql_index_exists(table_name, index_name):
451                 continue
452             index_sql = 'drop index %s on %s'%(index_name, table_name)
453             self.sql(index_sql)
455     def create_journal_table(self, spec):
456         ''' create the journal table for a class given the spec and
457             already-determined cols
458         '''
459         # journal table
460         cols = ','.join(['%s varchar'%x
461             for x in 'nodeid date tag action params'.split()])
462         sql = '''create table %s__journal (
463             nodeid integer, date datetime, tag varchar(255),
464             action varchar(255), params text) ENGINE=%s'''%(
465             spec.classname, self.mysql_backend)
466         self.sql(sql)
467         self.create_journal_table_indexes(spec)
469     def drop_journal_table_indexes(self, classname):
470         index_name = '%s_journ_idx'%classname
471         if not self.sql_index_exists('%s__journal'%classname, index_name):
472             return
473         index_sql = 'drop index %s on %s__journal'%(index_name, classname)
474         self.sql(index_sql)
476     def create_multilink_table(self, spec, ml):
477         sql = '''CREATE TABLE `%s_%s` (linkid VARCHAR(255),
478             nodeid VARCHAR(255)) ENGINE=%s'''%(spec.classname, ml,
479                 self.mysql_backend)
480         self.sql(sql)
481         self.create_multilink_table_indexes(spec, ml)
483     def drop_multilink_table_indexes(self, classname, ml):
484         l = [
485             '%s_%s_l_idx'%(classname, ml),
486             '%s_%s_n_idx'%(classname, ml)
487         ]
488         table_name = '%s_%s'%(classname, ml)
489         for index_name in l:
490             if not self.sql_index_exists(table_name, index_name):
491                 continue
492             sql = 'drop index %s on %s'%(index_name, table_name)
493             self.sql(sql)
495     def drop_class_table_key_index(self, cn, key):
496         table_name = '_%s'%cn
497         index_name = '_%s_%s_idx'%(cn, key)
498         if not self.sql_index_exists(table_name, index_name):
499             return
500         sql = 'drop index %s on %s'%(index_name, table_name)
501         self.sql(sql)
503     # old-skool id generation
504     def newid(self, classname):
505         ''' Generate a new id for the given class
506         '''
507         # get the next ID - "FOR UPDATE" will lock the row for us
508         sql = 'select num from ids where name=%s FOR UPDATE'%self.arg
509         self.sql(sql, (classname, ))
510         newid = int(self.cursor.fetchone()[0])
512         # update the counter
513         sql = 'update ids set num=%s where name=%s'%(self.arg, self.arg)
514         vals = (int(newid)+1, classname)
515         self.sql(sql, vals)
517         # return as string
518         return str(newid)
520     def setid(self, classname, setid):
521         ''' Set the id counter: used during import of database
523         We add one to make it behave like the seqeunces in postgres.
524         '''
525         sql = 'update ids set num=%s where name=%s'%(self.arg, self.arg)
526         vals = (int(setid)+1, classname)
527         self.sql(sql, vals)
529     def clear(self):
530         rdbms_common.Database.clear(self)
532         # set the id counters to 0 (setid adds one) so we start at 1
533         for cn in self.classes.keys():
534             self.setid(cn, 0)
536     def create_class(self, spec):
537         rdbms_common.Database.create_class(self, spec)
538         sql = 'insert into ids (name, num) values (%s, %s)'
539         vals = (spec.classname, 1)
540         self.sql(sql, vals)
542     def sql_commit(self, fail_ok=False):
543         ''' Actually commit to the database.
544         '''
545         self.log_info('commit')
547         # MySQL commits don't seem to ever fail, the latest update winning.
548         # makes you wonder why they have transactions...
549         self.conn.commit()
551         # open a new cursor for subsequent work
552         self.cursor = self.conn.cursor()
554         # make sure we're in a new transaction and not autocommitting
555         self.sql("SET AUTOCOMMIT=0")
556         self.sql("START TRANSACTION")
558     def sql_close(self):
559         self.log_info('close')
560         try:
561             self.conn.close()
562         except MySQLdb.ProgrammingError, message:
563             if str(message) != 'closing a closed connection':
564                 raise
566 class MysqlClass:
568     def supports_subselects(self):
569         # TODO: AFAIK its version dependent for MySQL
570         return False
572     def _subselect(self, classname, multilink_table):
573         ''' "I can't believe it's not a toy RDBMS"
574            see, even toy RDBMSes like gadfly and sqlite can do sub-selects...
575         '''
576         self.db.sql('select nodeid from %s'%multilink_table)
577         s = ','.join([x[0] for x in self.db.sql_fetchall()])
578         return '_%s.id not in (%s)'%(classname, s)
580     def create_inner(self, **propvalues):
581         try:
582             return rdbms_common.Class.create_inner(self, **propvalues)
583         except MySQLdb.IntegrityError, e:
584             self._handle_integrity_error(e, propvalues)
586     def set_inner(self, nodeid, **propvalues):
587         try:
588             return rdbms_common.Class.set_inner(self, nodeid,
589                                                 **propvalues)
590         except MySQLdb.IntegrityError, e:
591             self._handle_integrity_error(e, propvalues)
593     def _handle_integrity_error(self, e, propvalues):
594         ''' Handle a MySQL IntegrityError.
596         If the error is recognized, then it may be converted into an
597         alternative exception.  Otherwise, it is raised unchanged from
598         this function.'''
600         # There are checks in create_inner/set_inner to see if a node
601         # is being created with the same key as an existing node.
602         # But, there is a race condition -- we may pass those checks,
603         # only to find out that a parallel session has created the
604         # node by by the time we actually issue the SQL command to
605         # create the node.  Fortunately, MySQL gives us a unique error
606         # code for this situation, so we can detect it here and handle
607         # it appropriately.
608         # 
609         # The details of the race condition are as follows, where
610         # "X" is a classname, and the term "thread" is meant to
611         # refer generically to both threads and processes:
612         #
613         # Thread A                    Thread B
614         # --------                    --------
615         #                             read table for X
616         # create new X object
617         # commit
618         #                             create new X object
619         #
620         # In Thread B, the check in create_inner does not notice that
621         # the new X object is a duplicate of that committed in Thread
622         # A because MySQL's default "consistent nonlocking read"
623         # behavior means that Thread B sees a snapshot of the database
624         # at the point at which its transaction began -- which was
625         # before Thread A created the object.  However, the attempt
626         # to *write* to the table for X, creating a duplicate entry,
627         # triggers an error at the point of the write.
628         #
629         # If both A and B's transaction begins with creating a new X
630         # object, then this bug cannot occur because creating the
631         # object requires getting a new ID, and newid() locks the id
632         # table until the transaction is committed or rolledback.  So,
633         # B will block until A's commit is complete, and will not
634         # actually get its snapshot until A's transaction completes.
635         # But, if the transaction has begun prior to calling newid,
636         # then the snapshot has already been established.
637         if e[0] == ER.DUP_ENTRY:
638             key = propvalues[self.key]
639             raise ValueError, 'node with key "%s" exists' % key
640         # We don't know what this exception is; reraise it.
641         raise
642         
644 class Class(MysqlClass, rdbms_common.Class):
645     pass
646 class IssueClass(MysqlClass, rdbms_common.IssueClass):
647     pass
648 class FileClass(MysqlClass, rdbms_common.FileClass):
649     pass
651 # vim: set et sts=4 sw=4 :