Remove old database migrations that targeted ancient versions
For this version it is safe to assume that is these migrations are no longer needed. They could introduce hard to track errors as part of the startup work they did.
This commit is contained in:
parent
819706ae5d
commit
7ac70cd58e
|
@ -16,6 +16,7 @@ Changed
|
|||
- Use ckantoolkit to clean up imports #358
|
||||
- Add hook to extend the package dict in CKAN harvester
|
||||
- Use CKAN core ckan.redis.url setting if present
|
||||
- Remove database migration code targeting ancient versions
|
||||
|
||||
Fixed
|
||||
-----
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
import logging
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from sqlalchemy import event
|
||||
from sqlalchemy import distinct
|
||||
from sqlalchemy import Table
|
||||
from sqlalchemy import Column
|
||||
from sqlalchemy import ForeignKey
|
||||
|
@ -14,12 +12,11 @@ from sqlalchemy.orm import backref, relation
|
|||
from sqlalchemy.exc import InvalidRequestError
|
||||
|
||||
from ckan import model
|
||||
from ckan import logic
|
||||
from ckan.model.meta import metadata, mapper, Session
|
||||
from ckan.model.meta import metadata, mapper, Session
|
||||
from ckan.model.types import make_uuid
|
||||
from ckan.model.domain_object import DomainObject
|
||||
from ckan.model.package import Package
|
||||
from ckan.lib.munge import munge_title_to_name
|
||||
|
||||
|
||||
UPDATE_FREQUENCIES = ['MANUAL', 'MONTHLY', 'WEEKLY', 'BIWEEKLY', 'DAILY', 'ALWAYS']
|
||||
|
||||
|
@ -72,23 +69,6 @@ def setup():
|
|||
log.debug('Harvest tables already exist')
|
||||
# Check if existing tables need to be updated
|
||||
inspector = Inspector.from_engine(engine)
|
||||
columns = inspector.get_columns('harvest_source')
|
||||
column_names = [column['name'] for column in columns]
|
||||
if 'title' not in column_names:
|
||||
log.debug('Harvest tables need to be updated')
|
||||
migrate_v2()
|
||||
if 'frequency' not in column_names:
|
||||
log.debug('Harvest tables need to be updated')
|
||||
migrate_v3()
|
||||
|
||||
# Check if this instance has harvest source datasets
|
||||
source_ids = Session.query(HarvestSource.id).filter_by(active=True).all()
|
||||
source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all()
|
||||
sources_to_migrate = set(source_ids) - set(source_package_ids)
|
||||
if sources_to_migrate:
|
||||
log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate))
|
||||
sources_to_migrate = [s[0] for s in sources_to_migrate]
|
||||
migrate_v3_create_datasets(sources_to_migrate)
|
||||
|
||||
# Check if harvest_log table exist - needed for existing users
|
||||
if 'harvest_log' not in inspector.get_table_names():
|
||||
|
@ -432,100 +412,6 @@ def define_harvester_tables():
|
|||
event.listen(HarvestObject, 'before_insert', harvest_object_before_insert_listener)
|
||||
|
||||
|
||||
def migrate_v2():
|
||||
log.debug('Migrating harvest tables to v2. This may take a while...')
|
||||
conn = Session.connection()
|
||||
|
||||
statements = '''
|
||||
ALTER TABLE harvest_source ADD COLUMN title text;
|
||||
|
||||
ALTER TABLE harvest_object ADD COLUMN current boolean;
|
||||
ALTER TABLE harvest_object ADD COLUMN harvest_source_id text;
|
||||
ALTER TABLE harvest_object ADD CONSTRAINT harvest_object_harvest_source_id_fkey FOREIGN KEY (harvest_source_id)
|
||||
REFERENCES harvest_source(id);
|
||||
|
||||
UPDATE harvest_object o SET harvest_source_id = j.source_id FROM harvest_job j WHERE o.harvest_job_id = j.id;
|
||||
'''
|
||||
conn.execute(statements)
|
||||
|
||||
# Flag current harvest_objects
|
||||
guids = Session.query(distinct(HarvestObject.guid)) \
|
||||
.join(Package) \
|
||||
.filter(
|
||||
HarvestObject.package != None # noqa: E711
|
||||
).filter(Package.state == u'active')
|
||||
|
||||
update_statement = '''
|
||||
UPDATE harvest_object
|
||||
SET current = TRUE
|
||||
WHERE id = (
|
||||
SELECT o.id
|
||||
FROM harvest_object o JOIN package p ON p.id = o.package_id
|
||||
WHERE o.package_id IS NOT null AND p.state = 'active'
|
||||
AND o.guid = '%s'
|
||||
ORDER BY metadata_modified_date DESC, fetch_finished DESC, gathered DESC
|
||||
LIMIT 1)
|
||||
'''
|
||||
|
||||
for guid in guids:
|
||||
conn.execute(update_statement % guid)
|
||||
|
||||
conn.execute('UPDATE harvest_object SET current = FALSE WHERE current IS NOT TRUE')
|
||||
|
||||
Session.commit()
|
||||
log.info('Harvest tables migrated to v2')
|
||||
|
||||
|
||||
def migrate_v3():
|
||||
log.debug('Migrating harvest tables to v3. This may take a while...')
|
||||
conn = Session.connection()
|
||||
|
||||
statement = """CREATE TABLE harvest_object_extra (
|
||||
id text NOT NULL,
|
||||
harvest_object_id text,
|
||||
"key" text,
|
||||
"value" text
|
||||
);
|
||||
|
||||
ALTER TABLE harvest_object
|
||||
ADD COLUMN import_started timestamp without time zone,
|
||||
ADD COLUMN import_finished timestamp without time zone,
|
||||
ADD COLUMN "state" text,
|
||||
ADD COLUMN "report_status" text;
|
||||
|
||||
ALTER TABLE harvest_source
|
||||
ADD COLUMN frequency text,
|
||||
ADD COLUMN next_run timestamp without time zone;
|
||||
|
||||
ALTER TABLE harvest_job
|
||||
ADD COLUMN finished timestamp without time zone;
|
||||
|
||||
ALTER TABLE harvest_object_extra
|
||||
ADD CONSTRAINT harvest_object_extra_pkey PRIMARY KEY (id);
|
||||
|
||||
ALTER TABLE harvest_object_extra
|
||||
ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id);
|
||||
|
||||
UPDATE harvest_object set state = 'COMPLETE' where package_id is not null;
|
||||
UPDATE harvest_object set state = 'ERROR' where package_id is null;
|
||||
UPDATE harvest_object set retry_times = 0;
|
||||
UPDATE harvest_object set report_status = 'updated' where package_id is not null;
|
||||
UPDATE harvest_object set report_status = 'errored' where package_id is null;
|
||||
UPDATE harvest_source set frequency = 'MANUAL';
|
||||
|
||||
ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey;
|
||||
ALTER TABLE harvest_object
|
||||
ADD CONSTRAINT harvest_object_package_id_fkey FOREIGN KEY (package_id) REFERENCES package(id) DEFERRABLE;
|
||||
|
||||
ALTER TABLE harvest_object_error
|
||||
ADD COLUMN line integer;
|
||||
|
||||
"""
|
||||
conn.execute(statement)
|
||||
Session.commit()
|
||||
log.info('Harvest tables migrated to v3')
|
||||
|
||||
|
||||
class PackageIdHarvestSourceIdMismatch(Exception):
|
||||
"""
|
||||
The package created for the harvest source must match the id of the
|
||||
|
@ -534,89 +420,9 @@ class PackageIdHarvestSourceIdMismatch(Exception):
|
|||
pass
|
||||
|
||||
|
||||
def migrate_v3_create_datasets(source_ids=None):
|
||||
import pylons
|
||||
from paste.registry import Registry
|
||||
|
||||
from ckan.lib.cli import MockTranslator
|
||||
registry = Registry()
|
||||
registry.prepare()
|
||||
registry.register(pylons.translator, MockTranslator())
|
||||
|
||||
sources = []
|
||||
if not source_ids:
|
||||
sources = model.Session.query(HarvestSource).all()
|
||||
|
||||
else:
|
||||
sources = model.Session.query(HarvestSource) \
|
||||
.filter(HarvestSource.id.in_(source_ids)) \
|
||||
.all()
|
||||
|
||||
if not sources:
|
||||
log.debug('No harvest sources to migrate')
|
||||
return
|
||||
|
||||
site_user_name = logic.get_action('get_site_user')({'model': model, 'ignore_auth': True}, {})['name']
|
||||
|
||||
context = {'model': model,
|
||||
'session': model.Session,
|
||||
'user': site_user_name, # TODO: auth of existing sources?
|
||||
'return_id_only': True,
|
||||
'extras_as_string': True,
|
||||
}
|
||||
|
||||
def gen_new_name(title):
|
||||
name = munge_title_to_name(title).replace('_', '-')
|
||||
while '--' in name:
|
||||
name = name.replace('--', '-')
|
||||
pkg_obj = Session.query(Package).filter(Package.name == name).first()
|
||||
if pkg_obj:
|
||||
return name + str(uuid.uuid4())[:5]
|
||||
else:
|
||||
return name
|
||||
|
||||
for source in sources:
|
||||
if 'id' in context:
|
||||
del context['id']
|
||||
if 'package' in context:
|
||||
del context['package']
|
||||
|
||||
# Check if package already exists
|
||||
|
||||
try:
|
||||
logic.get_action('package_show')(context, {'id': source.id})
|
||||
continue
|
||||
except logic.NotFound:
|
||||
pass
|
||||
|
||||
package_dict = {
|
||||
'id': source.id,
|
||||
'name': gen_new_name(source.title) if source.title else source.id,
|
||||
'title': source.title if source.title else source.url,
|
||||
'notes': source.description,
|
||||
'url': source.url,
|
||||
'type': 'harvest',
|
||||
'source_type': source.type,
|
||||
'config': source.config,
|
||||
'frequency': source.frequency,
|
||||
'owner_org': source.publisher_id,
|
||||
}
|
||||
context['message'] = 'Created package for harvest source {0}'.format(source.id)
|
||||
try:
|
||||
new_package_id = logic.get_action('package_create')(context, package_dict)
|
||||
if new_package_id != source.id or not context['return_id_only']:
|
||||
# this check only makes sense if we are sure we are returning
|
||||
# the package id not the package object
|
||||
raise PackageIdHarvestSourceIdMismatch
|
||||
|
||||
log.info('Created new package for source {0} ({1})'.format(source.id, source.url))
|
||||
except logic.ValidationError, e:
|
||||
log.error('Validation Error: %s' % str(e.error_summary))
|
||||
|
||||
|
||||
def clean_harvest_log(condition):
|
||||
Session.query(HarvestLog).filter(HarvestLog.created <= condition)\
|
||||
.delete(synchronize_session=False)
|
||||
Session.query(HarvestLog).filter(HarvestLog.created <= condition) \
|
||||
.delete(synchronize_session=False)
|
||||
try:
|
||||
Session.commit()
|
||||
except InvalidRequestError:
|
||||
|
|
Loading…
Reference in New Issue