1 #$Id: back_postgresql.py,v 1.44 2008-08-07 05:50:03 richard Exp $
2 #
3 # Copyright (c) 2003 Martynas Sklyzmantas, Andrey Lebedev <andrey@micro.lt>
4 #
5 # This module is free software, and you may redistribute it and/or modify
6 # under the same terms as Python, so long as this copyright message and
7 # disclaimer are retained in their original form.
8 #
9 '''Postgresql backend via psycopg for Roundup.'''
10 __docformat__ = 'restructuredtext'
12 import os, shutil, time
13 try:
14 import psycopg
15 from psycopg import QuotedString
16 from psycopg import ProgrammingError
17 except:
18 from psycopg2 import psycopg1 as psycopg
19 from psycopg2.extensions import QuotedString
20 from psycopg2.psycopg1 import ProgrammingError
21 import logging
23 from roundup import hyperdb, date
24 from roundup.backends import rdbms_common
25 from roundup.backends import sessions_rdbms
27 def connection_dict(config, dbnamestr=None):
28 ''' read_default_group is MySQL-specific, ignore it '''
29 d = rdbms_common.connection_dict(config, dbnamestr)
30 if 'read_default_group' in d:
31 del d['read_default_group']
32 if 'read_default_file' in d:
33 del d['read_default_file']
34 return d
36 def db_create(config):
37 """Clear all database contents and drop database itself"""
38 command = "CREATE DATABASE %s WITH ENCODING='UNICODE'"%config.RDBMS_NAME
39 if config.RDBMS_TEMPLATE :
40 command = command + " TEMPLATE=%s" % config.RDBMS_TEMPLATE
41 logging.getLogger('roundup.hyperdb').info(command)
42 db_command(config, command)
44 def db_nuke(config, fail_ok=0):
45 """Clear all database contents and drop database itself"""
46 command = 'DROP DATABASE %s'% config.RDBMS_NAME
47 logging.getLogger('roundup.hyperdb').info(command)
48 db_command(config, command)
50 if os.path.exists(config.DATABASE):
51 shutil.rmtree(config.DATABASE)
53 def db_command(config, command):
54 '''Perform some sort of database-level command. Retry 10 times if we
55 fail by conflicting with another user.
56 '''
57 template1 = connection_dict(config)
58 template1['database'] = 'template1'
60 try:
61 conn = psycopg.connect(**template1)
62 except psycopg.OperationalError, message:
63 raise hyperdb.DatabaseError(message)
65 conn.set_isolation_level(0)
66 cursor = conn.cursor()
67 try:
68 for n in range(10):
69 if pg_command(cursor, command):
70 return
71 finally:
72 conn.close()
73 raise RuntimeError('10 attempts to create database failed')
75 def pg_command(cursor, command):
76 '''Execute the postgresql command, which may be blocked by some other
77 user connecting to the database, and return a true value if it succeeds.
79 If there is a concurrent update, retry the command.
80 '''
81 try:
82 cursor.execute(command)
83 except psycopg.ProgrammingError, err:
84 response = str(err).split('\n')[0]
85 if response.find('FATAL') != -1:
86 raise RuntimeError(response)
87 else:
88 msgs = [
89 'is being accessed by other users',
90 'could not serialize access due to concurrent update',
91 ]
92 can_retry = 0
93 for msg in msgs:
94 if response.find(msg) == -1:
95 can_retry = 1
96 if can_retry:
97 time.sleep(1)
98 return 0
99 raise RuntimeError(response)
100 return 1
102 def db_exists(config):
103 """Check if database already exists"""
104 db = connection_dict(config, 'database')
105 try:
106 conn = psycopg.connect(**db)
107 conn.close()
108 return 1
109 except:
110 return 0
112 class Sessions(sessions_rdbms.Sessions):
113 def set(self, *args, **kwargs):
114 try:
115 sessions_rdbms.Sessions.set(self, *args, **kwargs)
116 except ProgrammingError, err:
117 response = str(err).split('\n')[0]
118 if -1 != response.find('ERROR') and \
119 -1 != response.find('could not serialize access due to concurrent update'):
120 # another client just updated, and we're running on
121 # serializable isolation.
122 # see http://www.postgresql.org/docs/7.4/interactive/transaction-iso.html
123 self.db.rollback()
125 class Database(rdbms_common.Database):
126 arg = '%s'
128 # used by some code to switch styles of query
129 implements_intersect = 1
131 def getSessionManager(self):
132 return Sessions(self)
134 def sql_open_connection(self):
135 db = connection_dict(self.config, 'database')
136 logging.getLogger('roundup.hyperdb').info(
137 'open database %r'%db['database'])
138 try:
139 conn = psycopg.connect(**db)
140 except psycopg.OperationalError, message:
141 raise hyperdb.DatabaseError(message)
143 cursor = conn.cursor()
145 return (conn, cursor)
147 def open_connection(self):
148 if not db_exists(self.config):
149 db_create(self.config)
151 self.conn, self.cursor = self.sql_open_connection()
153 try:
154 self.load_dbschema()
155 except psycopg.ProgrammingError, message:
156 if str(message).find('schema') == -1:
157 raise
158 self.rollback()
159 self.init_dbschema()
160 self.sql("CREATE TABLE schema (schema TEXT)")
161 self.sql("CREATE TABLE dual (dummy integer)")
162 self.sql("insert into dual values (1)")
163 self.create_version_2_tables()
165 def create_version_2_tables(self):
166 # OTK store
167 self.sql('''CREATE TABLE otks (otk_key VARCHAR(255),
168 otk_value TEXT, otk_time REAL)''')
169 self.sql('CREATE INDEX otks_key_idx ON otks(otk_key)')
171 # Sessions store
172 self.sql('''CREATE TABLE sessions (
173 session_key VARCHAR(255), session_time REAL,
174 session_value TEXT)''')
175 self.sql('''CREATE INDEX sessions_key_idx ON
176 sessions(session_key)''')
178 # full-text indexing store
179 self.sql('CREATE SEQUENCE ___textids_ids')
180 self.sql('''CREATE TABLE __textids (
181 _textid integer primary key, _class VARCHAR(255),
182 _itemid VARCHAR(255), _prop VARCHAR(255))''')
183 self.sql('''CREATE TABLE __words (_word VARCHAR(30),
184 _textid integer)''')
185 self.sql('CREATE INDEX words_word_idx ON __words(_word)')
186 self.sql('CREATE INDEX words_by_id ON __words (_textid)')
187 self.sql('CREATE UNIQUE INDEX __textids_by_props ON '
188 '__textids (_class, _itemid, _prop)')
190 def fix_version_2_tables(self):
191 # Convert journal date column to TIMESTAMP, params column to TEXT
192 self._convert_journal_tables()
194 # Convert all String properties to TEXT
195 self._convert_string_properties()
197 # convert session / OTK *_time columns to REAL
198 for name in ('otk', 'session'):
199 self.sql('drop index %ss_key_idx'%name)
200 self.sql('drop table %ss'%name)
201 self.sql('''CREATE TABLE %ss (%s_key VARCHAR(255),
202 %s_value VARCHAR(255), %s_time REAL)'''%(name, name, name,
203 name))
204 self.sql('CREATE INDEX %ss_key_idx ON %ss(%s_key)'%(name, name,
205 name))
207 def fix_version_3_tables(self):
208 rdbms_common.Database.fix_version_3_tables(self)
209 self.sql('''CREATE INDEX words_both_idx ON public.__words
210 USING btree (_word, _textid)''')
212 def add_actor_column(self):
213 # update existing tables to have the new actor column
214 tables = self.database_schema['tables']
215 for name in tables:
216 self.sql('ALTER TABLE _%s add __actor VARCHAR(255)'%name)
218 def __repr__(self):
219 return '<roundpsycopgsql 0x%x>' % id(self)
221 def sql_commit(self, fail_ok=False):
222 ''' Actually commit to the database.
223 '''
224 logging.getLogger('roundup.hyperdb').info('commit')
226 try:
227 self.conn.commit()
228 except psycopg.ProgrammingError, message:
229 # we've been instructed that this commit is allowed to fail
230 if fail_ok and str(message).endswith('could not serialize '
231 'access due to concurrent update'):
232 logging.getLogger('roundup.hyperdb').info(
233 'commit FAILED, but fail_ok')
234 else:
235 raise
237 # open a new cursor for subsequent work
238 self.cursor = self.conn.cursor()
240 def sql_stringquote(self, value):
241 ''' psycopg.QuotedString returns a "buffer" object with the
242 single-quotes around it... '''
243 return str(QuotedString(str(value)))[1:-1]
245 def sql_index_exists(self, table_name, index_name):
246 sql = 'select count(*) from pg_indexes where ' \
247 'tablename=%s and indexname=%s'%(self.arg, self.arg)
248 self.sql(sql, (table_name, index_name))
249 return self.cursor.fetchone()[0]
251 def create_class_table(self, spec, create_sequence=1):
252 if create_sequence:
253 sql = 'CREATE SEQUENCE _%s_ids'%spec.classname
254 self.sql(sql)
256 return rdbms_common.Database.create_class_table(self, spec)
258 def drop_class_table(self, cn):
259 sql = 'drop table _%s'%cn
260 self.sql(sql)
262 sql = 'drop sequence _%s_ids'%cn
263 self.sql(sql)
265 def newid(self, classname):
266 sql = "select nextval('_%s_ids') from dual"%classname
267 self.sql(sql)
268 return str(self.cursor.fetchone()[0])
270 def setid(self, classname, setid):
271 sql = "select setval('_%s_ids', %s) from dual"%(classname, int(setid))
272 self.sql(sql)
274 def clear(self):
275 rdbms_common.Database.clear(self)
277 # reset the sequences
278 for cn in self.classes:
279 self.cursor.execute('DROP SEQUENCE _%s_ids'%cn)
280 self.cursor.execute('CREATE SEQUENCE _%s_ids'%cn)
282 class PostgresqlClass:
283 order_by_null_values = '(%s is not NULL)'
285 class Class(PostgresqlClass, rdbms_common.Class):
286 pass
287 class IssueClass(PostgresqlClass, rdbms_common.IssueClass):
288 pass
289 class FileClass(PostgresqlClass, rdbms_common.FileClass):
290 pass
292 # vim: set et sts=4 sw=4 :