Merge pull request #376 from ckan/remove-old-migrations
Remove old database migrations that targeted ancient versions
commit 5c9c093803
@@ -16,6 +16,10 @@ Changed
- Use ckantoolkit to clean up imports #358
- Add hook to extend the package dict in CKAN harvester
- Use CKAN core ckan.redis.url setting if present
- Remove database migration code targeting ancient versions #376
  (In the unlikely event that you need to upgrade from one
  of the previous DB versions, just apply the changes removed
  in the linked PR manually)

Fixed
-----
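In the unlikely event that you are upgrading from one of these ancient schemas, a quick way to tell which removed migration still applies is to inspect the harvest_source columns, mirroring the checks that the removed setup() code performed. A minimal sketch with SQLAlchemy; it is not shipped with the extension and the connection URL is a placeholder you would need to adjust::

    from sqlalchemy import create_engine, inspect

    engine = create_engine('postgresql://ckan:pass@localhost/ckan_default')  # placeholder URL
    inspector = inspect(engine)

    columns = {c['name'] for c in inspector.get_columns('harvest_source')}
    if 'title' not in columns:
        print('schema predates v2: apply the removed migrate_v2() statements manually')
    if 'frequency' not in columns:
        print('schema predates v3: apply the removed migrate_v3() statements manually')
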
@@ -1,9 +1,7 @@
import logging
import datetime
import uuid

from sqlalchemy import event
from sqlalchemy import distinct
from sqlalchemy import Table
from sqlalchemy import Column
from sqlalchemy import ForeignKey

@@ -14,12 +12,11 @@ from sqlalchemy.orm import backref, relation
from sqlalchemy.exc import InvalidRequestError

from ckan import model
from ckan import logic
from ckan.model.meta import metadata, mapper, Session
from ckan.model.types import make_uuid
from ckan.model.domain_object import DomainObject
from ckan.model.package import Package
from ckan.lib.munge import munge_title_to_name


UPDATE_FREQUENCIES = ['MANUAL', 'MONTHLY', 'WEEKLY', 'BIWEEKLY', 'DAILY', 'ALWAYS']

@@ -72,23 +69,6 @@ def setup():
        log.debug('Harvest tables already exist')
        # Check if existing tables need to be updated
        inspector = Inspector.from_engine(engine)
        columns = inspector.get_columns('harvest_source')
        column_names = [column['name'] for column in columns]
        if 'title' not in column_names:
            log.debug('Harvest tables need to be updated')
            migrate_v2()
        if 'frequency' not in column_names:
            log.debug('Harvest tables need to be updated')
            migrate_v3()

        # Check if this instance has harvest source datasets
        source_ids = Session.query(HarvestSource.id).filter_by(active=True).all()
        source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all()
        sources_to_migrate = set(source_ids) - set(source_package_ids)
        if sources_to_migrate:
            log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate))
            sources_to_migrate = [s[0] for s in sources_to_migrate]
            migrate_v3_create_datasets(sources_to_migrate)

        # Check if harvest_log table exist - needed for existing users
        if 'harvest_log' not in inspector.get_table_names():

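The block removed above also triggered the creation of 'harvest' type datasets for sources that did not have one yet. If you want to verify by hand that no active source is missing its dataset, the same set difference can be reproduced from a configured CKAN environment (a sketch, not shipped with the extension; it assumes the CKAN config is already loaded, for example from an interactive CKAN shell)::

    from ckan import model
    from ckanext.harvest.model import HarvestSource

    source_ids = set(model.Session.query(HarvestSource.id).filter_by(active=True).all())
    dataset_ids = set(
        model.Session.query(model.Package.id)
        .filter_by(type=u'harvest', state='active')
        .all())
    missing = source_ids - dataset_ids
    print('%i active harvest sources have no harvest dataset' % len(missing))
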
@@ -432,100 +412,6 @@ def define_harvester_tables():
event.listen(HarvestObject, 'before_insert', harvest_object_before_insert_listener)


def migrate_v2():
    log.debug('Migrating harvest tables to v2. This may take a while...')
    conn = Session.connection()

    statements = '''
    ALTER TABLE harvest_source ADD COLUMN title text;

    ALTER TABLE harvest_object ADD COLUMN current boolean;
    ALTER TABLE harvest_object ADD COLUMN harvest_source_id text;
    ALTER TABLE harvest_object ADD CONSTRAINT harvest_object_harvest_source_id_fkey FOREIGN KEY (harvest_source_id)
    REFERENCES harvest_source(id);

    UPDATE harvest_object o SET harvest_source_id = j.source_id FROM harvest_job j WHERE o.harvest_job_id = j.id;
    '''
    conn.execute(statements)

    # Flag current harvest_objects
    guids = Session.query(distinct(HarvestObject.guid)) \
        .join(Package) \
        .filter(
            HarvestObject.package != None  # noqa: E711
        ).filter(Package.state == u'active')

    update_statement = '''
    UPDATE harvest_object
    SET current = TRUE
    WHERE id = (
        SELECT o.id
        FROM harvest_object o JOIN package p ON p.id = o.package_id
        WHERE o.package_id IS NOT null AND p.state = 'active'
        AND o.guid = '%s'
        ORDER BY metadata_modified_date DESC, fetch_finished DESC, gathered DESC
        LIMIT 1)
    '''

    for guid in guids:
        conn.execute(update_statement % guid)

    conn.execute('UPDATE harvest_object SET current = FALSE WHERE current IS NOT TRUE')

    Session.commit()
    log.info('Harvest tables migrated to v2')


def migrate_v3():
    log.debug('Migrating harvest tables to v3. This may take a while...')
    conn = Session.connection()

    statement = """CREATE TABLE harvest_object_extra (
        id text NOT NULL,
        harvest_object_id text,
        "key" text,
        "value" text
    );

    ALTER TABLE harvest_object
        ADD COLUMN import_started timestamp without time zone,
        ADD COLUMN import_finished timestamp without time zone,
        ADD COLUMN "state" text,
        ADD COLUMN "report_status" text;

    ALTER TABLE harvest_source
        ADD COLUMN frequency text,
        ADD COLUMN next_run timestamp without time zone;

    ALTER TABLE harvest_job
        ADD COLUMN finished timestamp without time zone;

    ALTER TABLE harvest_object_extra
        ADD CONSTRAINT harvest_object_extra_pkey PRIMARY KEY (id);

    ALTER TABLE harvest_object_extra
        ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id);

    UPDATE harvest_object set state = 'COMPLETE' where package_id is not null;
    UPDATE harvest_object set state = 'ERROR' where package_id is null;
    UPDATE harvest_object set retry_times = 0;
    UPDATE harvest_object set report_status = 'updated' where package_id is not null;
    UPDATE harvest_object set report_status = 'errored' where package_id is null;
    UPDATE harvest_source set frequency = 'MANUAL';

    ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey;
    ALTER TABLE harvest_object
        ADD CONSTRAINT harvest_object_package_id_fkey FOREIGN KEY (package_id) REFERENCES package(id) DEFERRABLE;

    ALTER TABLE harvest_object_error
        ADD COLUMN line integer;

    """
    conn.execute(statement)
    Session.commit()
    log.info('Harvest tables migrated to v3')


class PackageIdHarvestSourceIdMismatch(Exception):
    """
    The package created for the harvest source must match the id of the

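Since the extension no longer runs migrate_v2()/migrate_v3() for you, the changelog suggests applying the statements above by hand. One possible way to do that with plain SQLAlchemy, assuming you have copied the relevant SQL from this diff into a local file; the file name and database URL are placeholders, and exec_driver_sql needs SQLAlchemy 1.4 or newer::

    from sqlalchemy import create_engine

    engine = create_engine('postgresql://ckan:pass@localhost/ckan_default')  # placeholder URL

    # engine.begin() commits on success and rolls back on error;
    # psycopg2 accepts several semicolon-separated statements in one call.
    with engine.begin() as conn:
        with open('migrate_v3.sql') as f:  # placeholder file holding the copied statements
            conn.exec_driver_sql(f.read())
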
@@ -534,89 +420,9 @@ class PackageIdHarvestSourceIdMismatch(Exception):
    pass


def migrate_v3_create_datasets(source_ids=None):
    import pylons
    from paste.registry import Registry

    from ckan.lib.cli import MockTranslator
    registry = Registry()
    registry.prepare()
    registry.register(pylons.translator, MockTranslator())

    sources = []
    if not source_ids:
        sources = model.Session.query(HarvestSource).all()

    else:
        sources = model.Session.query(HarvestSource) \
            .filter(HarvestSource.id.in_(source_ids)) \
            .all()

    if not sources:
        log.debug('No harvest sources to migrate')
        return

    site_user_name = logic.get_action('get_site_user')({'model': model, 'ignore_auth': True}, {})['name']

    context = {'model': model,
               'session': model.Session,
               'user': site_user_name,  # TODO: auth of existing sources?
               'return_id_only': True,
               'extras_as_string': True,
               }

    def gen_new_name(title):
        name = munge_title_to_name(title).replace('_', '-')
        while '--' in name:
            name = name.replace('--', '-')
        pkg_obj = Session.query(Package).filter(Package.name == name).first()
        if pkg_obj:
            return name + str(uuid.uuid4())[:5]
        else:
            return name

    for source in sources:
        if 'id' in context:
            del context['id']
        if 'package' in context:
            del context['package']

        # Check if package already exists

        try:
            logic.get_action('package_show')(context, {'id': source.id})
            continue
        except logic.NotFound:
            pass

        package_dict = {
            'id': source.id,
            'name': gen_new_name(source.title) if source.title else source.id,
            'title': source.title if source.title else source.url,
            'notes': source.description,
            'url': source.url,
            'type': 'harvest',
            'source_type': source.type,
            'config': source.config,
            'frequency': source.frequency,
            'owner_org': source.publisher_id,
        }
        context['message'] = 'Created package for harvest source {0}'.format(source.id)
        try:
            new_package_id = logic.get_action('package_create')(context, package_dict)
            if new_package_id != source.id or not context['return_id_only']:
                # this check only makes sense if we are sure we are returning
                # the package id not the package object
                raise PackageIdHarvestSourceIdMismatch

            log.info('Created new package for source {0} ({1})'.format(source.id, source.url))
        except logic.ValidationError, e:
            log.error('Validation Error: %s' % str(e.error_summary))


def clean_harvest_log(condition):
    Session.query(HarvestLog).filter(HarvestLog.created <= condition) \
        .delete(synchronize_session=False)
    try:
        Session.commit()
    except InvalidRequestError:
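For reference, the clean_harvest_log() helper shown at the end of the diff deletes harvest log rows created on or before the given datetime. A minimal usage sketch (the 30-day cutoff is only an example)::

    import datetime

    from ckanext.harvest.model import clean_harvest_log

    # remove harvest log entries older than 30 days
    cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=30)
    clean_harvest_log(condition=cutoff)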