Code

Configuration issue: On some postgresql 8.4 installations (notably on
[roundup.git] / roundup / backends / back_postgresql.py
index 4882492c4393813cb3f6c5eab64420af69c63787..098126281449106c4afaa97885a4e2fa9aa031a8 100644 (file)
@@ -1,3 +1,4 @@
+#$Id: back_postgresql.py,v 1.44 2008-08-07 05:50:03 richard Exp $
 #
 # Copyright (c) 2003 Martynas Sklyzmantas, Andrey Lebedev <andrey@micro.lt>
 #
 '''Postgresql backend via psycopg for Roundup.'''
 __docformat__ = 'restructuredtext'
 
-
-import os, shutil, popen2, time
-import psycopg
+import os, shutil, time
+try:
+    import psycopg
+    from psycopg import QuotedString
+    from psycopg import ProgrammingError
+except:
+    from psycopg2 import psycopg1 as psycopg
+    from psycopg2.extensions import QuotedString
+    from psycopg2.psycopg1 import ProgrammingError
+import logging
 
 from roundup import hyperdb, date
 from roundup.backends import rdbms_common
+from roundup.backends import sessions_rdbms
+
+def connection_dict(config, dbnamestr=None):
+    ''' read_default_group is MySQL-specific, ignore it '''
+    d = rdbms_common.connection_dict(config, dbnamestr)
+    if 'read_default_group' in d:
+        del d['read_default_group']
+    if 'read_default_file' in d:
+        del d['read_default_file']
+    return d
 
 def db_create(config):
     """Clear all database contents and drop database itself"""
-    if __debug__:
-        print >> hyperdb.DEBUG, '+++ create database +++'
-    name = config.POSTGRESQL_DATABASE['database']
-    n = 0
-    while n < 10:
-        cout,cin = popen2.popen4('createdb %s'%name)
-        cin.close()
-        response = cout.read().split('\n')[0]
-        if response.find('FATAL') != -1:
-            raise RuntimeError, response
-        elif response.find('ERROR') != -1:
-            if not response.find('is being accessed by other users') != -1:
-                raise RuntimeError, response
-            if __debug__:
-                print >> hyperdb.DEBUG, '+++ SLEEPING +++'
-            time.sleep(1)
-            n += 1
-            continue
-        return
-    raise RuntimeError, '10 attempts to create database failed'
+    command = "CREATE DATABASE %s WITH ENCODING='UNICODE'"%config.RDBMS_NAME
+    if config.RDBMS_TEMPLATE :
+        command = command + " TEMPLATE=%s" % config.RDBMS_TEMPLATE
+    logging.getLogger('roundup.hyperdb').info(command)
+    db_command(config, command)
 
 def db_nuke(config, fail_ok=0):
     """Clear all database contents and drop database itself"""
-    if __debug__:
-        print >> hyperdb.DEBUG, '+++ nuke database +++'
-    name = config.POSTGRESQL_DATABASE['database']
-    n = 0
+    command = 'DROP DATABASE %s'% config.RDBMS_NAME
+    logging.getLogger('roundup.hyperdb').info(command)
+    db_command(config, command)
+
     if os.path.exists(config.DATABASE):
         shutil.rmtree(config.DATABASE)
-    while n < 10:
-        cout,cin = popen2.popen4('dropdb %s'%name)
-        cin.close()
-        response = cout.read().split('\n')[0]
-        if response.endswith('does not exist') and fail_ok:
-            return
-        elif response.find('FATAL') != -1:
-            raise RuntimeError, response
-        elif response.find('ERROR') != -1:
-            if not response.find('is being accessed by other users') != -1:
-                raise RuntimeError, response
-            if __debug__:
-                print >> hyperdb.DEBUG, '+++ SLEEPING +++'
-            time.sleep(1)
-            n += 1
-            continue
-        return
-    raise RuntimeError, '10 attempts to nuke database failed'
+
+def db_command(config, command):
+    '''Perform some sort of database-level command. Retry 10 times if we
+    fail by conflicting with another user.
+    '''
+    template1 = connection_dict(config)
+    template1['database'] = 'template1'
+
+    try:
+        conn = psycopg.connect(**template1)
+    except psycopg.OperationalError, message:
+        raise hyperdb.DatabaseError(message)
+
+    conn.set_isolation_level(0)
+    cursor = conn.cursor()
+    try:
+        for n in range(10):
+            if pg_command(cursor, command):
+                return
+    finally:
+        conn.close()
+    raise RuntimeError('10 attempts to create database failed')
+
+def pg_command(cursor, command):
+    '''Execute the postgresql command, which may be blocked by some other
+    user connecting to the database, and return a true value if it succeeds.
+
+    If there is a concurrent update, retry the command.
+    '''
+    try:
+        cursor.execute(command)
+    except psycopg.ProgrammingError, err:
+        response = str(err).split('\n')[0]
+        if response.find('FATAL') != -1:
+            raise RuntimeError(response)
+        else:
+            msgs = [
+                'is being accessed by other users',
+                'could not serialize access due to concurrent update',
+            ]
+            can_retry = 0
+            for msg in msgs:
+                if response.find(msg) == -1:
+                    can_retry = 1
+            if can_retry:
+                time.sleep(1)
+                return 0
+            raise RuntimeError(response)
+    return 1
 
 def db_exists(config):
     """Check if database already exists"""
-    db = getattr(config, 'POSTGRESQL_DATABASE')
+    db = connection_dict(config, 'database')
     try:
         conn = psycopg.connect(**db)
         conn.close()
-        if __debug__:
-            print >> hyperdb.DEBUG, '+++ database exists +++'
         return 1
     except:
-        if __debug__:
-            print >> hyperdb.DEBUG, '+++ no database +++'
         return 0
 
+class Sessions(sessions_rdbms.Sessions):
+    def set(self, *args, **kwargs):
+        try:
+            sessions_rdbms.Sessions.set(self, *args, **kwargs)
+        except ProgrammingError, err:
+            response = str(err).split('\n')[0]
+            if -1 != response.find('ERROR') and \
+               -1 != response.find('could not serialize access due to concurrent update'):
+                # another client just updated, and we're running on
+                # serializable isolation.
+                # see http://www.postgresql.org/docs/7.4/interactive/transaction-iso.html
+                self.db.rollback()
+
 class Database(rdbms_common.Database):
     arg = '%s'
 
+    # used by some code to switch styles of query
+    implements_intersect = 1
+
+    def getSessionManager(self):
+        return Sessions(self)
+
     def sql_open_connection(self):
-        db = getattr(self.config, 'POSTGRESQL_DATABASE')
+        db = connection_dict(self.config, 'database')
+        logging.getLogger('roundup.hyperdb').info(
+            'open database %r'%db['database'])
         try:
             conn = psycopg.connect(**db)
         except psycopg.OperationalError, message:
-            raise hyperdb.DatabaseError, message
+            raise hyperdb.DatabaseError(message)
 
         cursor = conn.cursor()
 
@@ -97,14 +148,13 @@ class Database(rdbms_common.Database):
         if not db_exists(self.config):
             db_create(self.config)
 
-        if __debug__:
-            print >>hyperdb.DEBUG, '+++ open database connection +++'
-
         self.conn, self.cursor = self.sql_open_connection()
 
         try:
             self.load_dbschema()
-        except:
+        except psycopg.ProgrammingError, message:
+            if str(message).find('schema') == -1:
+                raise
             self.rollback()
             self.init_dbschema()
             self.sql("CREATE TABLE schema (schema TEXT)")
@@ -114,103 +164,129 @@ class Database(rdbms_common.Database):
 
     def create_version_2_tables(self):
         # OTK store
-        self.cursor.execute('''CREATE TABLE otks (otk_key VARCHAR(255),
-            otk_value VARCHAR(255), otk_time FLOAT(20))''')
-        self.cursor.execute('CREATE INDEX otks_key_idx ON otks(otk_key)')
+        self.sql('''CREATE TABLE otks (otk_key VARCHAR(255),
+            otk_value TEXT, otk_time REAL)''')
+        self.sql('CREATE INDEX otks_key_idx ON otks(otk_key)')
 
         # Sessions store
-        self.cursor.execute('''CREATE TABLE sessions (
-            session_key VARCHAR(255), session_time FLOAT(20),
-            session_value VARCHAR(255))''')
-        self.cursor.execute('''CREATE INDEX sessions_key_idx ON
+        self.sql('''CREATE TABLE sessions (
+            session_key VARCHAR(255), session_time REAL,
+            session_value TEXT)''')
+        self.sql('''CREATE INDEX sessions_key_idx ON
             sessions(session_key)''')
 
         # full-text indexing store
-        self.cursor.execute('CREATE SEQUENCE ___textids_ids')
-        self.cursor.execute('''CREATE TABLE __textids (
+        self.sql('CREATE SEQUENCE ___textids_ids')
+        self.sql('''CREATE TABLE __textids (
             _textid integer primary key, _class VARCHAR(255),
             _itemid VARCHAR(255), _prop VARCHAR(255))''')
-        self.cursor.execute('''CREATE TABLE __words (_word VARCHAR(30), 
+        self.sql('''CREATE TABLE __words (_word VARCHAR(30),
             _textid integer)''')
-        self.cursor.execute('CREATE INDEX words_word_idx ON __words(_word)')
+        self.sql('CREATE INDEX words_word_idx ON __words(_word)')
+        self.sql('CREATE INDEX words_by_id ON __words (_textid)')
+        self.sql('CREATE UNIQUE INDEX __textids_by_props ON '
+                 '__textids (_class, _itemid, _prop)')
+
+    def fix_version_2_tables(self):
+        # Convert journal date column to TIMESTAMP, params column to TEXT
+        self._convert_journal_tables()
+
+        # Convert all String properties to TEXT
+        self._convert_string_properties()
+
+        # convert session / OTK *_time columns to REAL
+        for name in ('otk', 'session'):
+            self.sql('drop index %ss_key_idx'%name)
+            self.sql('drop table %ss'%name)
+            self.sql('''CREATE TABLE %ss (%s_key VARCHAR(255),
+                %s_value VARCHAR(255), %s_time REAL)'''%(name, name, name,
+                name))
+            self.sql('CREATE INDEX %ss_key_idx ON %ss(%s_key)'%(name, name,
+                name))
+
+    def fix_version_3_tables(self):
+        rdbms_common.Database.fix_version_3_tables(self)
+        self.sql('''CREATE INDEX words_both_idx ON public.__words
+            USING btree (_word, _textid)''')
 
     def add_actor_column(self):
         # update existing tables to have the new actor column
         tables = self.database_schema['tables']
-        for name in tables.keys():
-            self.cursor.execute('ALTER TABLE _%s add __actor '
-                'VARCHAR(255)'%name)
+        for name in tables:
+            self.sql('ALTER TABLE _%s add __actor VARCHAR(255)'%name)
 
     def __repr__(self):
         return '<roundpsycopgsql 0x%x>' % id(self)
 
+    def sql_commit(self, fail_ok=False):
+        ''' Actually commit to the database.
+        '''
+        logging.getLogger('roundup.hyperdb').info('commit')
+
+        try:
+            self.conn.commit()
+        except psycopg.ProgrammingError, message:
+            # we've been instructed that this commit is allowed to fail
+            if fail_ok and str(message).endswith('could not serialize '
+                    'access due to concurrent update'):
+                logging.getLogger('roundup.hyperdb').info(
+                    'commit FAILED, but fail_ok')
+            else:
+                raise
+
+        # open a new cursor for subsequent work
+        self.cursor = self.conn.cursor()
+
     def sql_stringquote(self, value):
         ''' psycopg.QuotedString returns a "buffer" object with the
             single-quotes around it... '''
-        return str(psycopg.QuotedString(str(value)))[1:-1]
+        return str(QuotedString(str(value)))[1:-1]
 
     def sql_index_exists(self, table_name, index_name):
         sql = 'select count(*) from pg_indexes where ' \
             'tablename=%s and indexname=%s'%(self.arg, self.arg)
-        self.cursor.execute(sql, (table_name, index_name))
+        self.sql(sql, (table_name, index_name))
         return self.cursor.fetchone()[0]
 
-    def create_class_table(self, spec):
-        sql = 'CREATE SEQUENCE _%s_ids'%spec.classname
-        if __debug__:
-            print >>hyperdb.DEBUG, 'create_class_table', (self, sql)
-        self.cursor.execute(sql)
+    def create_class_table(self, spec, create_sequence=1):
+        if create_sequence:
+            sql = 'CREATE SEQUENCE _%s_ids'%spec.classname
+            self.sql(sql)
 
         return rdbms_common.Database.create_class_table(self, spec)
 
     def drop_class_table(self, cn):
         sql = 'drop table _%s'%cn
-        if __debug__:
-            print >>hyperdb.DEBUG, 'drop_class', (self, sql)
-        self.cursor.execute(sql)
+        self.sql(sql)
 
         sql = 'drop sequence _%s_ids'%cn
-        if __debug__:
-            print >>hyperdb.DEBUG, 'drop_class', (self, sql)
-        self.cursor.execute(sql)
-
-    def create_journal_table(self, spec):
-        cols = ',' . join(['"%s" VARCHAR(255)'%x
-          for x in 'nodeid date tag action params' . split()])
-        sql  = 'CREATE TABLE "%s__journal" (%s)'%(spec.classname, cols)
-        if __debug__:
-            print >>hyperdb.DEBUG, 'create_journal_table', (self, sql)
-        self.cursor.execute(sql)
-        self.create_journal_table_indexes(spec)
-
-    def create_multilink_table(self, spec, ml):
-        sql = '''CREATE TABLE "%s_%s" (linkid VARCHAR(255),
-            nodeid VARCHAR(255))'''%(spec.classname, ml)
-
-        if __debug__:
-            print >>hyperdb.DEBUG, 'create_class', (self, sql)
-
-        self.cursor.execute(sql)
-        self.create_multilink_table_indexes(spec, ml)
+        self.sql(sql)
 
     def newid(self, classname):
         sql = "select nextval('_%s_ids') from dual"%classname
-        if __debug__:
-            print >>hyperdb.DEBUG, 'setid', (self, sql)
-        self.cursor.execute(sql)
-        return self.cursor.fetchone()[0]
+        self.sql(sql)
+        return str(self.cursor.fetchone()[0])
 
     def setid(self, classname, setid):
         sql = "select setval('_%s_ids', %s) from dual"%(classname, int(setid))
-        if __debug__:
-            print >>hyperdb.DEBUG, 'setid', (self, sql)
-        self.cursor.execute(sql)
+        self.sql(sql)
+
+    def clear(self):
+        rdbms_common.Database.clear(self)
+
+        # reset the sequences
+        for cn in self.classes:
+            self.cursor.execute('DROP SEQUENCE _%s_ids'%cn)
+            self.cursor.execute('CREATE SEQUENCE _%s_ids'%cn)
 
+class PostgresqlClass:
+    order_by_null_values = '(%s is not NULL)'
 
-class Class(rdbms_common.Class):
+class Class(PostgresqlClass, rdbms_common.Class):
     pass
-class IssueClass(rdbms_common.IssueClass):
+class IssueClass(PostgresqlClass, rdbms_common.IssueClass):
     pass
-class FileClass(rdbms_common.FileClass):
+class FileClass(PostgresqlClass, rdbms_common.FileClass):
     pass
 
+# vim: set et sts=4 sw=4 :