Merge branch 'enh-1726-harvesting-model-update'

This commit is contained in:
amercader 2012-02-15 12:08:19 +00:00
commit 218651af0b
12 changed files with 233 additions and 114 deletions

View File

@ -278,7 +278,8 @@ following methods::
- performing any necessary action with the fetched object (e.g.
create a CKAN package).
Note: if this stage creates or updates a package, a reference
to the package should be added to the HarvestObject.
to the package must be added to the HarvestObject.
Additionally, the HarvestObject must be flagged as current.
- creating the HarvestObject - Package relation (if necessary)
- creating and storing any suitable HarvestObjectErrors that may
occur.

View File

@ -216,7 +216,8 @@ class Harvester(CkanCommand):
source_id = unicode(self.args[1])
else:
source_id = None
import_last_objects(source_id)
objs = import_last_objects(source_id)
print '%s objects reimported' % len(objs)
def create_harvest_job_all(self):
jobs = create_harvest_job_all()

View File

@ -80,7 +80,7 @@ class ViewController(BaseController):
data = data or old_data
errors = errors or {}
error_summary = error_summary or {}
#TODO: Use new description interface to build the types select and descriptions
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': get_registered_harvesters_info()}
c.form = render('source/new_source_form.html', extra_vars=vars)

View File

@ -1,6 +1,8 @@
import logging
import re
from sqlalchemy.sql import update,and_, bindparam
from ckan import model
from ckan.model import Session, Package
from ckan.logic import ValidationError, NotFound, get_action
@ -145,10 +147,8 @@ class HarvesterBase(SingletonPlugin):
log.info('Package with GUID %s exists and needs to be updated' % harvest_object.guid)
# Update package
context.update({'id':package_dict['id']})
updated_package = get_action('package_update_rest')(context, package_dict)
new_package = get_action('package_update_rest')(context, package_dict)
harvest_object.package_id = updated_package['id']
harvest_object.save()
else:
log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid)
@ -161,6 +161,20 @@ class HarvesterBase(SingletonPlugin):
log.info('Package with GUID %s does not exist, let\'s create it' % harvest_object.guid)
new_package = get_action('package_create_rest')(context, package_dict)
harvest_object.package_id = new_package['id']
# Flag the other objects linking to this package as not current anymore
from ckanext.harvest.model import harvest_object_table
conn = Session.connection()
u = update(harvest_object_table) \
.where(harvest_object_table.c.package_id==bindparam('b_package_id')) \
.values(current=False)
conn.execute(u, b_package_id=new_package['id'])
Session.commit()
# Flag this as the current harvest object
harvest_object.package_id = new_package['id']
harvest_object.current = True
harvest_object.save()
return True

View File

@ -87,8 +87,9 @@ def _get_source_status(source, detailed=True):
# Overall statistics
packages = Session.query(distinct(HarvestObject.package_id),Package.name) \
.join(Package).join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source==source) \
.join(Package).join(HarvestSource) \
.filter(HarvestObject.source==source) \
.filter(HarvestObject.current==True) \
.filter(Package.state==u'active')
out['overall_statistics']['added'] = packages.count()
@ -110,8 +111,6 @@ def _get_source_status(source, detailed=True):
return out
def _source_as_dict(source, detailed=True):
out = source.as_dict()
out['jobs'] = []
@ -153,42 +152,6 @@ def _object_as_dict(obj):
return out
def _url_exists(url):
    '''
    Look for an active harvest source whose normalized URL equals the
    normalized form of ``url``.

    Returns the first matching source dict from ``get_harvest_sources()``,
    or ``False`` when there is none.
    '''
    wanted = _normalize_url(url)
    # Lazy scan: stops at the first hit, exactly like an explicit loop
    # with an early return would.
    matches = (
        candidate for candidate in get_harvest_sources()
        if _normalize_url(candidate['url']) == wanted
        and candidate['active'] == True
    )
    return next(matches, False)
def _normalize_url(url):
o = urlparse.urlparse(url)
# Normalize port
if ':' in o.netloc:
parts = o.netloc.split(':')
if (o.scheme == 'http' and parts[1] == '80') or \
(o.scheme == 'https' and parts[1] == '443'):
netloc = parts[0]
else:
netloc = ':'.join(parts)
else:
netloc = o.netloc
# Remove trailing slash
path = o.path.rstrip('/')
check_url = urlparse.urlunparse((
o.scheme,
netloc,
path,
None,None,None))
return check_url
def _prettify(field_name):
field_name = re.sub('(?<!\w)[Uu]rl(?!\w)', 'URL', field_name.replace('_', ' ').capitalize())
return field_name.replace('_', ' ')
@ -226,11 +189,14 @@ def create_harvest_source(data_dict):
source.url = data['url']
source.type = data['type']
opt = ['active','description','user_id','publisher_id','config']
opt = ['active','title','description','user_id','publisher_id','config']
for o in opt:
if o in data and data[o] is not None:
source.__setattr__(o,data[o])
if 'active' in data_dict:
source.active = data['active']
source.save()
return _source_as_dict(source)
@ -250,14 +216,25 @@ def edit_harvest_source(source_id,data_dict):
Session.rollback()
raise ValidationError(errors,_error_summary(errors))
fields = ['url','type','active','description','user_id','publisher_id']
fields = ['url','title','type','description','user_id','publisher_id']
for f in fields:
if f in data_dict and data_dict[f] is not None and data_dict[f] != '':
source.__setattr__(f,data_dict[f])
if f in data and data[f] is not None:
source.__setattr__(f,data[f])
source.config = data_dict['config']
if 'active' in data_dict:
source.active = data['active']
if 'config' in data_dict:
source.config = data['config']
source.save()
# Abort any pending jobs
if not source.active:
jobs = HarvestJob.filter(source=source,status=u'New')
if jobs:
for job in jobs:
job.status = u'Aborted'
job.save()
return _source_as_dict(source)
@ -353,40 +330,29 @@ def import_last_objects(source_id=None):
raise Exception('This harvest source is not active')
last_objects_ids = Session.query(HarvestObject.id) \
.join(HarvestJob).join(Package) \
.filter(HarvestJob.source==source) \
.filter(HarvestObject.package!=None) \
.join(HarvestSource).join(Package) \
.filter(HarvestObject.source==source) \
.filter(HarvestObject.current==True) \
.filter(Package.state==u'active') \
.order_by(HarvestObject.guid) \
.order_by(HarvestObject.metadata_modified_date.desc()) \
.order_by(HarvestObject.gathered.desc()) \
.all()
else:
last_objects_ids = Session.query(HarvestObject.id) \
.join(Package) \
.filter(HarvestObject.package!=None) \
.filter(HarvestObject.current==True) \
.filter(Package.state==u'active') \
.order_by(HarvestObject.guid) \
.order_by(HarvestObject.metadata_modified_date.desc()) \
.order_by(HarvestObject.gathered.desc()) \
.all()
last_obj_guid = ''
imported_objects = []
last_objects = []
for obj_id in last_objects_ids:
obj = Session.query(HarvestObject).get(obj_id)
if obj.guid != last_obj_guid:
imported_objects.append(obj)
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == obj.job.source.type:
if harvester.info()['name'] == obj.source.type:
if hasattr(harvester,'force_import'):
harvester.force_import = True
harvester.import_stage(obj)
break
last_obj_guid = obj.guid
return imported_objects
last_objects.append(obj)
return last_objects
def create_harvest_job_all():

View File

@ -5,10 +5,11 @@ from ckan.lib.navl.validators import (ignore_missing,
not_missing
)
from ckanext.harvest.logic.validators import harvest_source_id_exists, \
harvest_source_url_validator, \
harvest_source_type_exists, \
harvest_source_config_validator
from ckanext.harvest.logic.validators import (harvest_source_id_exists,
harvest_source_url_validator,
harvest_source_type_exists,
harvest_source_config_validator,
harvest_source_active_validator,)
def default_harvest_source_schema():
@ -16,8 +17,9 @@ def default_harvest_source_schema():
'id': [ignore_missing, unicode, harvest_source_id_exists],
'url': [not_empty, unicode, harvest_source_url_validator],
'type': [not_empty, unicode, harvest_source_type_exists],
'description': [ignore_missing],
'active': [ignore_missing],
'title': [ignore_missing,unicode],
'description': [ignore_missing,unicode],
'active': [ignore_missing,harvest_source_active_validator],
'user_id': [ignore_missing],
'publisher_id': [ignore_missing],
'config': [ignore_missing,harvest_source_config_validator]

View File

@ -55,8 +55,8 @@ def harvest_source_url_validator(key,data,errors,context):
for url,active in existing_sources:
url = _normalize_url(url)
if url == new_url and active == True:
raise Invalid('There already is an active Harvest Source for this URL: %s' % data[key])
if url == new_url:
raise Invalid('There already is a Harvest Source for this URL: %s' % data[key])
return data[key]
@ -91,3 +91,11 @@ def harvest_source_config_validator(key,data,errors,context):
else:
return data[key]
def harvest_source_active_validator(value,context):
    '''
    Coerce the ``active`` field of a harvest source to a boolean.

    Strings are True only when they read "true" (case-insensitive); any
    other string is False. Non-string values are converted with ``bool()``.
    ``context`` is not used.
    '''
    if not isinstance(value, basestring):
        return bool(value)
    return value.lower() == 'true'

View File

@ -1,14 +1,22 @@
import logging
import datetime
from sqlalchemy import event
from sqlalchemy import distinct
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import backref, relation
from ckan import model
from ckan.model.meta import *
from ckan.model.meta import (metadata, mapper, Session,
Table, Column, ForeignKey, types)
from ckan.model.types import make_uuid
from ckan.model.core import *
from ckan.model.domain_object import DomainObject
from ckan.model.package import Package
from sqlalchemy.orm import backref, relation
log = logging.getLogger(__name__)
__all__ = [
@ -27,12 +35,34 @@ harvest_gather_error_table = None
harvest_object_error_table = None
def setup():
if harvest_source_table is None:
create_harvester_tables()
define_harvester_tables()
log.debug('Harvest tables defined in memory')
if model.repo.are_tables_created():
metadata.create_all()
if not harvest_source_table.exists():
# Create each table individually rather than
# using metadata.create_all()
harvest_source_table.create()
harvest_job_table.create()
harvest_object_table.create()
harvest_gather_error_table.create()
harvest_object_error_table.create()
log.debug('Harvest tables created')
else:
from ckan.model.meta import engine
log.debug('Harvest tables already exist')
# Check if existing tables need to be updated
inspector = Inspector.from_engine(engine)
columns = inspector.get_columns('harvest_source')
if not 'title' in [column['name'] for column in columns]:
log.debug('Harvest tables need to be updated')
migrate_v2()
else:
log.debug('Harvest table creation deferred')
@ -46,20 +76,20 @@ class HarvestDomainObject(DomainObject):
key_attr = 'id'
@classmethod
def get(self, key, default=None, attr=None):
def get(cls, key, default=None, attr=None):
'''Finds a single entity in the register.'''
if attr == None:
attr = self.key_attr
attr = cls.key_attr
kwds = {attr: key}
o = self.filter(**kwds).first()
o = cls.filter(**kwds).first()
if o:
return o
else:
return default
@classmethod
def filter(self, **kwds):
query = Session.query(self).autoflush(False)
def filter(cls, **kwds):
query = Session.query(cls).autoflush(False)
return query.filter_by(**kwds)
@ -91,10 +121,6 @@ class HarvestObject(HarvestDomainObject):
'''
@property
def source(self):
return self.job.source
class HarvestGatherError(HarvestDomainObject):
'''Gather errors are raised during the **gather** stage of a harvesting
job.
@ -107,7 +133,19 @@ class HarvestObjectError(HarvestDomainObject):
'''
pass
def create_harvester_tables():
def harvest_object_before_insert_listener(mapper,connection,target):
    '''
    Compatibility hook for old harvesters: when the object being inserted
    has no source (or no source id) yet, copy both from its harvest job.

    Raises an Exception if the source is missing and the object has no
    job to take it from. ``mapper`` and ``connection`` are not used.
    '''
    if target.harvest_source_id and target.source:
        # Source already fully set, nothing to fill in
        return

    job = target.job
    if not job:
        raise Exception('You must define a Harvest Job for each Harvest Object')

    target.source = job.source
    target.harvest_source_id = job.source.id
def define_harvester_tables():
global harvest_source_table
global harvest_job_table
@ -118,9 +156,10 @@ def create_harvester_tables():
harvest_source_table = Table('harvest_source', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('url', types.UnicodeText, nullable=False),
Column('title', types.UnicodeText, default=u''),
Column('description', types.UnicodeText, default=u''),
Column('config', types.UnicodeText, default=u''),
Column('created', DateTime, default=datetime.datetime.utcnow),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('type',types.UnicodeText,nullable=False),
Column('active',types.Boolean,default=True),
Column('user_id', types.UnicodeText, default=u''),
@ -129,23 +168,25 @@ def create_harvester_tables():
# Was harvesting_job
harvest_job_table = Table('harvest_job', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('created', DateTime, default=datetime.datetime.utcnow),
Column('gather_started', DateTime),
Column('gather_finished', DateTime),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('gather_started', types.DateTime),
Column('gather_finished', types.DateTime),
Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('status', types.UnicodeText, default=u'New', nullable=False),
)
# Was harvested_document
harvest_object_table = Table('harvest_object', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('guid', types.UnicodeText, default=''),
Column('gathered', DateTime, default=datetime.datetime.utcnow),
Column('fetch_started', DateTime),
Column('guid', types.UnicodeText, default=u''),
Column('current',types.Boolean,default=False),
Column('gathered', types.DateTime, default=datetime.datetime.utcnow),
Column('fetch_started', types.DateTime),
Column('content', types.UnicodeText, nullable=True),
Column('fetch_finished', DateTime),
Column('metadata_modified_date', DateTime),
Column('fetch_finished', types.DateTime),
Column('metadata_modified_date', types.DateTime),
Column('retry_times',types.Integer),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('package_id', types.UnicodeText, ForeignKey('package.id'), nullable=True),
)
# New table
@ -153,7 +194,7 @@ def create_harvester_tables():
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('message', types.UnicodeText),
Column('created', DateTime, default=datetime.datetime.utcnow),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
# New table
harvest_object_error_table = Table('harvest_object_error',metadata,
@ -161,7 +202,7 @@ def create_harvester_tables():
Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
Column('message',types.UnicodeText),
Column('stage', types.UnicodeText),
Column('created', DateTime, default=datetime.datetime.utcnow),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
mapper(
@ -196,6 +237,12 @@ def create_harvester_tables():
lazy=True,
backref=u'objects',
),
'source': relation(
HarvestSource,
lazy=True,
backref=u'objects',
),
},
)
@ -220,3 +267,46 @@ def create_harvester_tables():
),
},
)
event.listen(HarvestObject, 'before_insert', harvest_object_before_insert_listener)
def migrate_v2():
    '''
    Upgrade already existing harvest tables to the v2 layout.

    Steps, all run on the connection of the current Session:

    1. Add the ``title`` column to ``harvest_source`` and the ``current``
       and ``harvest_source_id`` columns (plus foreign key) to
       ``harvest_object``, and fill ``harvest_source_id`` from the job
       each object belongs to.
    2. For every distinct GUID that is linked to an active package, flag
       the most recent harvest object (by metadata_modified_date,
       fetch_finished and gathered) as current.
    3. Set ``current`` to FALSE on every other harvest object and commit.
    '''
    # Local import: only needed by this one-off migration
    from sqlalchemy.sql import text

    log.debug('Migrating harvest tables to v2. This may take a while...')
    conn = Session.connection()

    statements = '''
    ALTER TABLE harvest_source ADD COLUMN title text;

    ALTER TABLE harvest_object ADD COLUMN current boolean;
    ALTER TABLE harvest_object ADD COLUMN harvest_source_id text;
    ALTER TABLE harvest_object ADD CONSTRAINT harvest_object_harvest_source_id_fkey FOREIGN KEY (harvest_source_id) REFERENCES harvest_source(id);

    UPDATE harvest_object o SET harvest_source_id = j.source_id FROM harvest_job j WHERE o.harvest_job_id = j.id;
    '''
    conn.execute(statements)

    # Flag current harvest_objects
    guids = Session.query(distinct(HarvestObject.guid)) \
        .join(Package) \
        .filter(HarvestObject.package!=None) \
        .filter(Package.state==u'active')

    # The GUID is passed as a bound parameter instead of being formatted
    # into the SQL text: GUIDs come from harvested documents and may
    # contain quotes or other characters that would break the statement.
    update_statement = text('''
    UPDATE harvest_object
    SET current = TRUE
    WHERE id = (
        SELECT o.id
        FROM harvest_object o JOIN package p ON p.id = o.package_id
        WHERE o.package_id IS NOT null AND p.state = 'active'
            AND o.guid = :guid
        ORDER BY metadata_modified_date DESC, fetch_finished DESC, gathered DESC
        LIMIT 1)
    ''')

    # Each result row is a one-element tuple holding the GUID
    for (guid,) in guids:
        conn.execute(update_statement, {'guid': guid})

    # Rows not flagged above still hold NULL; make them explicitly FALSE
    conn.execute('UPDATE harvest_object SET current = FALSE WHERE current IS NOT TRUE')

    Session.commit()
    log.info('Harvest tables migrated to v2')

View File

@ -58,3 +58,12 @@ body.index.ViewController #content {
vertical-align: middle;
margin: 0 5px;
}
/* State label of a harvest source that is active: bold text. */
.source-state-active{
font-weight:bold;
}
/* State label of a harvest source that is inactive: bold and red. */
.source-state-inactive{
font-weight:bold;
color: red;
}

View File

@ -5,6 +5,8 @@
<py:def function="page_title">Harvesting Sources</py:def>
<py:def function="body_class">harvest</py:def>
<py:def function="optional_head">
<link type="text/css" rel="stylesheet" media="all" href="/ckanext/harvest/style.css" />
</py:def>

View File

@ -35,11 +35,32 @@
</py:for>
</ul>
</dd>
<dt class="harvest-source-title"><label class="field_req" for="title">Title</label></dt>
<dd class="harvest-source-title"><input id="title" name="title" size="80" type="text" value="${data.get('title', '')}" /></dd>
<dd class="harvest-source-title field_error" py:if="errors.get('title', '')">${errors.get('title', '')}</dd>
<dd class="harvest-source-title instructions basic">This will be shown as the datasets source.</dd>
<dt><label class="field_opt" for="description">Description</label></dt>
<dd><textarea id="description" name="description" cols="30" rows="2" style="height:75px">${data.get('description', '')}</textarea></dd>
<dd class="instructions basic">You can add your own notes here about what the URL above represents to remind you later.</dd>
<dt><label class="field_opt" for="config">Configuration</label></dt>
<dd><textarea id="config" name="config" cols="30" rows="2" style="height:75px">${data.get('config', '')}</textarea></dd>
<dt><label class="field_opt" for="active">State</label></dt>
<dd>
<select id="active" name="active">
<option py:attrs="{'selected': 'selected' if data.get('active') or not 'active' in data else None}" value="True">active</option>
<option py:attrs="{'selected': 'selected' if 'active' in data and not data.get('active') else None}" value="False">inactive</option>
</select>
<py:if test="data.get('active') or not 'active' in data">
<div>This harvest source is <span class="source-state-active">Active</span></div>
</py:if>
<py:if test="'active' in data and not data.get('active')">
<div>This harvest source is <span class="source-state-inactive">Inactive</span></div>
</py:if>
</dd>
</dl>
</fieldset>
<input id="save" name="save" value="Save" type="submit" /> or <a href="/harvest">Return to the harvest sources list</a>

View File

@ -33,6 +33,11 @@
<th>Active</th>
<td>${c.source.active}</td>
</tr>
<tr py:if="c.source.title">
<th>Title</th>
<td>${c.source.title}</td>
</tr>
<tr>
<th>Description</th>
<td>${c.source.description}</td>