2012-08-02 19:41:59 +02:00
import hashlib
2014-06-27 17:54:39 +02:00
import json
2012-08-02 19:41:59 +02:00
2012-02-29 16:20:35 +01:00
import logging
2012-10-29 18:15:02 +01:00
import datetime
2012-02-29 16:20:35 +01:00
2013-03-28 16:36:44 +01:00
from pylons import config
2014-05-15 17:30:30 +02:00
from sqlalchemy import and_ , or_
2012-12-13 17:33:44 +01:00
2013-01-22 17:43:25 +01:00
from ckan . lib . search . index import PackageSearchIndex
2012-02-29 16:20:35 +01:00
from ckan . plugins import PluginImplementations
2012-10-29 18:15:02 +01:00
from ckan . logic import get_action
2012-02-29 16:20:35 +01:00
from ckanext . harvest . interfaces import IHarvester
2013-03-28 16:36:44 +01:00
from ckan . lib . search . common import SearchIndexError , make_connection
2012-02-29 16:20:35 +01:00
from ckan . model import Package
2012-11-30 15:03:04 +01:00
from ckan import logic
2015-03-19 13:48:46 +01:00
from ckan . plugins import toolkit
2012-02-29 16:20:35 +01:00
2012-11-30 15:03:04 +01:00
from ckan . logic import NotFound , check_access
2012-02-29 16:20:35 +01:00
2012-11-30 15:03:04 +01:00
from ckanext . harvest . plugin import DATASET_TYPE_NAME
2013-04-21 18:04:57 +02:00
from ckanext . harvest . queue import get_gather_publisher , resubmit_jobs
2012-02-29 16:20:35 +01:00
2018-06-11 13:38:28 +02:00
from ckanext . harvest . model import HarvestSource , HarvestJob , HarvestObject
2015-10-28 22:58:36 +01:00
from ckanext . harvest . logic import HarvestJobExists
2015-10-28 18:51:58 +01:00
from ckanext . harvest . logic . dictization import harvest_job_dictize
2012-02-29 16:20:35 +01:00
2015-11-03 21:30:11 +01:00
from ckanext . harvest . logic . action . get import (
harvest_source_show , harvest_job_list , _get_sources_for_user )
2012-02-29 16:20:35 +01:00
2016-06-02 17:32:08 +02:00
import ckan . lib . mailer as mailer
2018-06-11 13:38:28 +02:00
from itertools import islice
2016-06-02 17:32:08 +02:00
2012-02-29 16:20:35 +01:00
log = logging . getLogger ( __name__ )
2015-11-03 21:30:11 +01:00
def harvest_source_update(context, data_dict):
    '''
    Updates an existing harvest source

    The work is delegated to ``package_update``: this function only forces
    the dataset type to the harvest source type and asks for extras to be
    handled as strings before forwarding the call. Auth checks and
    validation are expected to happen in ``package_update``.

    Note that the harvest source type (ckan, waf, csw, etc) is set via the
    ``source_type`` field.

    :param id: the name or id of the harvest source to update
    :type id: string
    :param url: the URL for the harvest source
    :type url: string
    :param name: the name of the harvest source, must be between 2 and 100
        characters long and contain only lowercase alphanumeric characters
    :type name: string
    :param title: the title of the dataset (optional, default: same as
        ``name``)
    :type title: string
    :param notes: a description of the harvest source (optional)
    :type notes: string
    :param source_type: the harvester type for this source. This must be one
        of the registered harvesters, eg 'ckan', 'csw', etc.
    :type source_type: string
    :param frequency: the frequency in which this harvester should run. See
        ``ckanext.harvest.model`` source for possible values. Default is
        'MANUAL'
    :type frequency: string
    :param config: extra configuration options for the particular harvester
        type. Should be serialized as JSON. (optional)
    :type config: string

    :returns: whatever ``package_update`` returns for the updated source
    :rtype: dictionary
    '''
    log.info('Updating harvest source: %r', data_dict)

    # Both dicts are modified in place before being handed over
    context['extras_as_string'] = True
    data_dict['type'] = DATASET_TYPE_NAME

    package_update = logic.get_action('package_update')
    return package_update(context, data_dict)
2012-02-29 16:20:35 +01:00
2015-10-28 18:51:58 +01:00
def harvest_source_clear(context, data_dict):
    '''
    Clears all datasets, jobs and objects related to a harvest source, but
    keeps the source itself.  This is useful to clean history of long running
    harvest sources to start again fresh.

    The datasets are first removed from the search index, then the related
    rows are deleted with one multi-statement SQL script whose content
    depends on the running CKAN version, and finally the source is
    reindexed.

    :param id: the id of the harvest source to clear
    :type id: string

    :returns: a dict with the ``id`` of the cleared source
    :raises NotFound: if the harvest source does not exist
    '''
    check_access('harvest_source_clear', context, data_dict)

    requested_id = data_dict.get('id')
    source = HarvestSource.get(requested_id)
    if not source:
        log.error('Harvest source %s does not exist', requested_id)
        raise NotFound('Harvest source %s does not exist' % requested_id)

    # From here on always use the id stored on the source object
    harvest_source_id = source.id

    # Clear all datasets from this source from the index
    harvest_source_index_clear(context, data_dict)

    model = context['model']

    # CKAN-2.6 or above: related don't exist any more
    has_related = toolkit.check_ckan_version(max_version='2.5.99')
    if has_related:
        related_query = '''select id from related where id in (
                  select related_id from related_dataset where dataset_id in (
                      select package_id from harvest_object
                      where harvest_source_id = '{harvest_source_id}'));'''.format(
            harvest_source_id=harvest_source_id)
        found_ids = [row[0] for row in model.Session.execute(related_query)]
        related_ids = "('" + "','".join(found_ids) + "')"

    # Packages are flagged first so every later statement can select them
    # by state instead of repeating the harvest_object sub-select
    sql = '''begin;
        update package set state = 'to_delete' where id in (
            select package_id from harvest_object
            where harvest_source_id = '{harvest_source_id}');'''.format(
        harvest_source_id=harvest_source_id)

    if toolkit.check_ckan_version(min_version='2.3'):
        # CKAN-2.3 or above: delete resource views, resource revisions &
        # resources
        sql += '''
        delete from resource_view where resource_id in (
            select id from resource where package_id in (
                select id from package where state = 'to_delete'));
        delete from resource_revision where package_id in (
            select id from package where state = 'to_delete');
        delete from resource where package_id in (
            select id from package where state = 'to_delete');
        '''
    else:
        # Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3)
        sql += '''
        delete from resource_revision where resource_group_id in (
            select id from resource_group where package_id in (
                select id from package where state = 'to_delete'));
        delete from resource where resource_group_id in (
            select id from resource_group where package_id in (
                select id from package where state = 'to_delete'));
        delete from resource_group_revision where package_id in (
            select id from package where state = 'to_delete');
        delete from resource_group where package_id in (
            select id from package where state = 'to_delete');
        '''

    # CKAN pre-2.5: authz models were removed in migration 078
    if toolkit.check_ckan_version(max_version='2.4.99'):
        sql += '''
        delete from package_role where package_id in (
            select id from package where state = 'to_delete');
        delete from user_object_role where id not in (
            select user_object_role_id from package_role)
            and context = 'Package';
        '''

    sql += '''
    delete from harvest_object_error where harvest_object_id in (
        select id from harvest_object
        where harvest_source_id = '{harvest_source_id}');
    delete from harvest_object_extra where harvest_object_id in (
        select id from harvest_object
        where harvest_source_id = '{harvest_source_id}');
    delete from harvest_object where harvest_source_id = '{harvest_source_id}';
    delete from harvest_gather_error where harvest_job_id in (
        select id from harvest_job where source_id = '{harvest_source_id}');
    delete from harvest_job where source_id = '{harvest_source_id}';
    delete from package_tag_revision where package_id in (
        select id from package where state = 'to_delete');
    delete from member_revision where table_id in (
        select id from package where state = 'to_delete');
    delete from package_extra_revision where package_id in (
        select id from package where state = 'to_delete');
    delete from package_revision where id in (
        select id from package where state = 'to_delete');
    delete from package_tag where package_id in (
        select id from package where state = 'to_delete');
    delete from package_extra where package_id in (
        select id from package where state = 'to_delete');
    delete from package_relationship_revision where subject_package_id in (
        select id from package where state = 'to_delete');
    delete from package_relationship_revision where object_package_id in (
        select id from package where state = 'to_delete');
    delete from package_relationship where subject_package_id in (
        select id from package where state = 'to_delete');
    delete from package_relationship where object_package_id in (
        select id from package where state = 'to_delete');
    delete from member where table_id in (
        select id from package where state = 'to_delete');
    '''.format(
        harvest_source_id=harvest_source_id)

    if has_related:
        sql += '''
        delete from related_dataset where dataset_id in (
            select id from package where state = 'to_delete');
        delete from related where id in {related_ids};
        delete from package where id in (
            select id from package where state = 'to_delete');
        '''.format(related_ids=related_ids)
    else:
        # CKAN-2.6 or above: related don't exist any more
        sql += '''
        delete from package where id in (
            select id from package where state = 'to_delete');
        '''

    sql += '''
    commit;
    '''
    model.Session.execute(sql)

    # Refresh the index for this source to update the status object
    get_action('harvest_source_reindex')(context, {'id': harvest_source_id})

    return {'id': harvest_source_id}
2012-02-29 16:20:35 +01:00
2015-11-03 21:30:11 +01:00
2016-10-28 11:29:27 +02:00
def harvest_sources_job_history_clear(context, data_dict):
    '''
    Clears the job history of every harvest source returned by a
    ``package_search`` for harvest datasets. All jobs and objects related to
    each source are cleared, but the sources themselves are kept.
    This is useful to clean history of long running harvest sources to start
    again fresh.
    The datasets imported from the harvest sources will NOT be deleted!!!

    :returns: the results of the individual
        ``harvest_source_job_history_clear`` calls
    :rtype: list
    '''
    check_access('harvest_sources_clear', context, data_dict)

    # We assume that the maximum of 1000 (hard limit) rows should be enough
    search_params = {'fq': '+dataset_type:harvest', 'rows': 1000}
    found = logic.get_action('package_search')(context, search_params)

    cleared = []
    for source_pkg in found['results'] or []:
        try:
            outcome = get_action('harvest_source_job_history_clear')(
                context, {'id': source_pkg['id']})
        except NotFound:
            # Ignoring not existent harvest sources because of a possibly
            # corrupt search index
            # Logging was already done in called function
            continue
        cleared.append(outcome)

    return cleared
def harvest_source_job_history_clear(context, data_dict):
    '''
    Clears all jobs and objects related to a harvest source, but keeps the
    source itself.
    This is useful to clean history of long running harvest sources to start
    again fresh.
    The datasets imported from the harvest source will NOT be deleted!!!

    :param id: the id of the harvest source to clear
    :type id: string

    :returns: a dict with the ``id`` of the cleared source
    :raises NotFound: if the harvest source does not exist
    '''
    check_access('harvest_source_clear', context, data_dict)

    requested_id = data_dict.get('id', None)
    source = HarvestSource.get(requested_id)
    if not source:
        log.error('Harvest source %s does not exist', requested_id)
        raise NotFound('Harvest source %s does not exist' % requested_id)

    # Use the id stored on the source object for everything that follows
    harvest_source_id = source.id

    session = context['model'].Session

    # Child rows (errors, extras) go before the objects and jobs they
    # reference
    cleanup_script = '''begin;
    delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
    delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
    delete from harvest_object where harvest_source_id = '{harvest_source_id}';
    delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}');
    delete from harvest_job where source_id = '{harvest_source_id}';
    commit;
    '''.format(harvest_source_id=harvest_source_id)
    session.execute(cleanup_script)

    # Refresh the index for this source to update the status object
    get_action('harvest_source_reindex')(context, {'id': harvest_source_id})

    return {'id': harvest_source_id}
2015-11-03 21:30:11 +01:00
def harvest_source_index_clear(context, data_dict):
    '''
    Removes from the search index every document that belongs to a harvest
    source, i.e. those matching the source's ``harvest_source_id`` and this
    site's ``ckan.site_id``. Nothing is deleted from the database.

    :param id: the id of the harvest source whose documents are removed
    :type id: string

    :returns: a dict with the ``id`` of the harvest source
    :raises NotFound: if the harvest source does not exist
    :raises SearchIndexError: if the delete request to the index fails
    '''

    check_access('harvest_source_clear', context, data_dict)
    harvest_source_id = data_dict.get('id')

    source = HarvestSource.get(harvest_source_id)
    if not source:
        log.error('Harvest source %s does not exist', harvest_source_id)
        raise NotFound('Harvest source %s does not exist' % harvest_source_id)

    harvest_source_id = source.id

    conn = make_connection()
    query = '''+%s:"%s" +site_id:"%s"''' % (
        'harvest_source_id', harvest_source_id, config.get('ckan.site_id'))

    solr_commit = toolkit.asbool(config.get('ckan.search.solr_commit', 'true'))
    if toolkit.check_ckan_version(max_version='2.5.99'):
        # conn is solrpy
        try:
            conn.delete_query(query)
            if solr_commit:
                conn.commit()
        except Exception as e:
            log.exception(e)
            raise SearchIndexError(e)
        finally:
            # Only this client branch closes the connection explicitly
            conn.close()
    else:
        # conn is pysolr
        try:
            conn.delete(q=query, commit=solr_commit)
        except Exception as e:
            log.exception(e)
            raise SearchIndexError(e)

    return {'id': harvest_source_id}
2012-02-29 16:20:35 +01:00
2015-10-28 18:51:58 +01:00
def harvest_objects_import(context, data_dict):
    '''
    Reimports the existing harvest objects, specified by either guid,
    source_id, harvest_object_id or package_id (checked in that order). If
    none is given, all current harvest objects are selected.

    It performs the import stage with the last fetched objects, optionally
    belonging to a certain source.

    Please note that no objects will be fetched from the remote server.

    It will only affect the last fetched objects already present in the
    database.

    Two optional keys are read from ``context``: ``segments`` (only objects
    whose id has an md5 hex digest starting with one of these characters are
    processed) and ``join_datasets`` (default True; restricts the selection
    to objects joined to an active package).

    :param source_id: the id of the harvest source to import
    :type source_id: string
    :param guid: the guid of the harvest object to import
    :type guid: string
    :param harvest_object_id: the id of the harvest object to import
    :type harvest_object_id: string
    :param package_id: the id or name of the package to import
    :type package_id: string

    :returns: the number of harvest objects processed
    :rtype: int
    :raises NotFound: if ``source_id`` does not match a harvest source
    :raises Exception: if the harvest source is not active
    '''
    log.info('Harvest objects import: %r', data_dict)
    check_access('harvest_objects_import', context, data_dict)

    model = context['model']
    session = context['session']
    source_id = data_dict.get('source_id')
    guid = data_dict.get('guid')
    harvest_object_id = data_dict.get('harvest_object_id')
    package_id_or_name = data_dict.get('package_id')

    segments = context.get('segments')

    join_datasets = context.get('join_datasets', True)

    if guid:
        last_objects_ids = \
            session.query(HarvestObject.id) \
                   .filter(HarvestObject.guid == guid) \
                   .filter(HarvestObject.current == True)

    elif source_id:
        source = HarvestSource.get(source_id)
        if not source:
            log.error('Harvest source %s does not exist', source_id)
            raise NotFound('Harvest source %s does not exist' % source_id)

        if not source.active:
            log.warning('Harvest source %s is not active.', source_id)
            raise Exception('This harvest source is not active')

        last_objects_ids = \
            session.query(HarvestObject.id) \
                   .join(HarvestSource) \
                   .filter(HarvestObject.source == source) \
                   .filter(HarvestObject.current == True)

    elif harvest_object_id:
        last_objects_ids = \
            session.query(HarvestObject.id) \
                   .filter(HarvestObject.id == harvest_object_id)

    elif package_id_or_name:
        last_objects_ids = \
            session.query(HarvestObject.id) \
                   .join(Package) \
                   .filter(HarvestObject.current == True) \
                   .filter(Package.state == u'active') \
                   .filter(or_(Package.id == package_id_or_name,
                               Package.name == package_id_or_name))
        # This query already joins Package, so skip the generic join below
        join_datasets = False
    else:
        last_objects_ids = \
            session.query(HarvestObject.id) \
                   .filter(HarvestObject.current == True)

    if join_datasets:
        last_objects_ids = last_objects_ids.join(Package) \
            .filter(Package.state == u'active')

    last_objects_ids = last_objects_ids.all()

    last_objects_count = 0

    for obj_id in last_objects_ids:
        # obj_id is a one-column result row; obj_id[0] is the id itself
        if segments and \
                str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
            continue

        obj = session.query(HarvestObject).get(obj_id)

        # Only the first harvester whose name matches the source type runs
        for harvester in PluginImplementations(IHarvester):
            if harvester.info()['name'] == obj.source.type:
                if hasattr(harvester, 'force_import'):
                    harvester.force_import = True
                harvester.import_stage(obj)
                break
        # Counted even when no harvester matched the source type
        last_objects_count += 1
    log.info('Harvest objects imported: %s', last_objects_count)
    return last_objects_count
2012-02-29 16:20:35 +01:00
2015-11-03 21:30:11 +01:00
def _calculate_next_run ( frequency ) :
2012-10-29 18:15:02 +01:00
now = datetime . datetime . utcnow ( )
if frequency == ' ALWAYS ' :
return now
if frequency == ' WEEKLY ' :
return now + datetime . timedelta ( weeks = 1 )
if frequency == ' BIWEEKLY ' :
return now + datetime . timedelta ( weeks = 2 )
if frequency == ' DAILY ' :
return now + datetime . timedelta ( days = 1 )
if frequency == ' MONTHLY ' :
2015-11-03 21:30:11 +01:00
if now . month in ( 4 , 6 , 9 , 11 ) :
2012-10-29 18:15:02 +01:00
days = 30
elif now . month == 2 :
if now . year % 4 == 0 :
days = 29
else :
days = 28
else :
days = 31
return now + datetime . timedelta ( days = days )
raise Exception ( ' Frequency {freq} not recognised ' . format ( freq = frequency ) )
def _make_scheduled_jobs(context, data_dict):
    '''
    Creates (and runs) a harvest job for every active source that is due,
    then schedules each source's next run.

    The incoming ``data_dict`` is not used; the sources are always looked up
    with the ``only_to_run`` and ``only_active`` filters.
    '''
    due_filter = {'only_to_run': True,
                  'only_active': True}

    for source in _get_sources_for_user(context, due_filter):
        job_request = {'source_id': source.id, 'run': True}
        try:
            get_action('harvest_job_create')(context, job_request)
        except HarvestJobExists:
            log.info('Trying to rerun job for %s skipping', source.id)

        # The next run is rescheduled even when the job already existed
        source.next_run = _calculate_next_run(source.frequency)
        source.save()
2015-11-03 21:30:11 +01:00
def harvest_jobs_run(context, data_dict):
    '''
    Runs scheduled jobs, checks if any jobs need marking as finished, and
    resubmits queue items if needed.

    This should be called every few minutes (e.g. by a cron), or else jobs
    will never show as finished.

    This used to also 'run' new jobs created by the web UI, putting them onto
    the gather queue, but now this is done by default when you create a job. If
    you need to send do this explicitly, then use
    ``harvest_send_job_to_gather_queue``.

    :param source_id: the id of the harvest source, if you just want to check
        for its finished jobs (optional)
    :type source_id: string

    :returns: always an empty list
    '''
    log.info('Harvest job run: %r', data_dict)
    check_access('harvest_jobs_run', context, data_dict)

    session = context['session']
    source_id = data_dict.get('source_id')

    # Scheduled jobs are only created when no specific source was requested
    if not source_id:
        _make_scheduled_jobs(context, data_dict)

    context['return_objects'] = False

    # Flag finished jobs as such
    running_jobs = harvest_job_list(
        context, {'source_id': source_id, 'status': u'Running'})

    for job in running_jobs:
        if not job['gather_finished']:
            continue

        # Objects that are neither COMPLETE nor ERROR are still in progress
        pending_count = session.query(HarvestObject.id) \
            .filter(HarvestObject.harvest_job_id == job['id']) \
            .filter(and_((HarvestObject.state != u'COMPLETE'),
                         (HarvestObject.state != u'ERROR'))) \
            .count()

        if pending_count != 0:
            log.debug('Ongoing job:%s source:%s',
                      job['id'], job['source_id'])
            continue

        job_obj = HarvestJob.get(job['id'])
        job_obj.status = u'Finished'
        log.info('Marking job as finished %s %s',
                 job_obj.source.url, job_obj.id)

        # save the time of finish, according to the last running object;
        # fall back to the gather finish time if no object was imported
        last_object = session.query(HarvestObject) \
            .filter(HarvestObject.harvest_job_id == job['id']) \
            .filter(HarvestObject.import_finished != None) \
            .order_by(HarvestObject.import_finished.desc()) \
            .first()
        if last_object:
            job_obj.finished = last_object.import_finished
        else:
            job_obj.finished = job['gather_finished']
        job_obj.save()

        # Reindex the harvest source dataset so it has the latest status
        get_action('harvest_source_reindex')(
            context, {'id': job_obj.source.id})

        status = get_action('harvest_source_show_status')(
            context, {'id': job_obj.source.id})

        # Notification mails are opt-in and only sent for errored jobs
        mail_enabled = toolkit.asbool(
            config.get('ckan.harvest.status_mail.errored'))
        if mail_enabled and (status['last_job']['stats']['errored']):
            send_error_mail(context, job_obj.source.id, status)

    # resubmit old redis tasks
    resubmit_jobs()

    return []  # merely for backwards compatibility
2018-06-11 13:38:28 +02:00
def send_error_mail(context, source_id, status):
    '''
    Emails a failure notification about the latest job of a harvest source
    to every sysadmin user that has an email address.

    The message contains the source and job details plus up to 20 object
    error messages and up to 20 gather error messages taken from the job
    report.

    :param context: the action context; ``context['model']`` is used to
        look up the sysadmin users
    :type context: dictionary
    :param source_id: the id of the harvest source
    :type source_id: string
    :param status: the source status; ``status['last_job']`` must provide
        ``id``, ``created``, ``finished`` and ``stats``
    :type status: dictionary

    :raises Exception: any error other than ``mailer.MailerException``
        raised while sending is logged and re-raised
    '''
    last_job = status['last_job']
    source = get_action('harvest_source_show')(context, {'id': source_id})

    ckan_site_url = config.get('ckan.site_url')
    job_url = toolkit.url_for(
        'harvest_job_show', source=source['id'], id=last_job['id'])

    msg = toolkit._('This is a failure-notification of the latest harvest job ({0}) set-up in {1}.')\
        .format(job_url, ckan_site_url)
    msg += '\n\n'

    msg += toolkit._('Harvest Source: {0}').format(source['title']) + '\n'
    if source.get('config'):
        msg += toolkit._('Harvester-Configuration: {0}').format(source['config']) + '\n'
    msg += '\n\n'

    # The organization key may be absent or empty
    organization = source.get('organization')
    if organization:
        msg += toolkit._('Organization: {0}').format(organization['name'])
        msg += '\n\n'

    msg += toolkit._('Harvest Job Id: {0}').format(last_job['id']) + '\n'
    msg += toolkit._('Created: {0}').format(last_job['created']) + '\n'
    msg += toolkit._('Finished: {0}').format(last_job['finished']) + '\n\n'

    report = get_action('harvest_job_report')(context, {'id': last_job['id']})

    msg += toolkit._('Records in Error: {0}').format(
        str(last_job['stats'].get('errored', 0)))
    msg += '\n'

    # Only the first 20 errors of each kind are included; a missing list is
    # treated as empty.
    # NOTE(review): assumes each entry is a dict with a 'message' key -
    # confirm against the return shape of harvest_job_report
    obj_error = ''.join(
        error['message'] + '\n'
        for error in islice(report.get('object_errors') or [], 0, 20))
    job_error = ''.join(
        error['message'] + '\n'
        for error in islice(report.get('gather_errors') or [], 0, 20))

    if obj_error or job_error:
        msg += toolkit._('Error Summary')
        msg += '\n'

    if obj_error:
        msg += toolkit._('Document Error')
        msg += '\n' + obj_error + '\n\n'

    if job_error:
        msg += toolkit._('Job Errors')
        msg += '\n' + job_error + '\n\n'

    if obj_error or job_error:
        msg += '\n--\n'

    msg += toolkit._('You are receiving this email because you are currently set-up as Administrator for {0}.'
                     ' Please do not reply to this email as it was sent from a non-monitored address.')\
        .format(config.get('ckan.site_title'))

    model = context['model']
    subject = config.get('ckan.site_title') + ' - Harvesting Job - Error Notification'

    sysadmins = model.Session.query(model.User) \
        .filter(model.User.sysadmin == True).all()

    for recipient in sysadmins:
        if not recipient.email:
            # Nothing to send to
            continue

        # Pass the user's name and address, not the User object itself
        email = {'recipient_name': recipient.name,
                 'recipient_email': recipient.email,
                 'subject': subject,
                 'body': msg}

        try:
            mailer.mail_recipient(**email)
        except mailer.MailerException:
            # Best effort: a failed delivery must not stop the others
            log.error('Sending Harvest-Notification-Mail failed. Message: ' + msg)
        except Exception as e:
            log.error(e)
            raise
2018-06-11 13:38:28 +02:00
2016-06-02 17:32:08 +02:00
2015-10-28 22:58:36 +01:00
def harvest_send_job_to_gather_queue(context, data_dict):
    '''
    Sends a harvest job to the gather queue.

    The job is marked as ``Running`` and saved before its id is published on
    the queue.

    :param id: the id of the harvest job
    :type id: string

    :returns: the dictized harvest job
    :raises ValidationError: if the job's harvest source is not active
    '''
    log.info('Send job to gather queue: %r', data_dict)

    requested_job_id = logic.get_or_bust(data_dict, 'id')
    job = toolkit.get_action('harvest_job_show')(
        context, {'id': requested_job_id})

    # Authorization is checked against the job dict, not the raw data_dict
    check_access('harvest_send_job_to_gather_queue', context, job)

    # gather queue
    gather_publisher = get_gather_publisher()

    # Check the source is active
    job_source = harvest_source_show(context, {'id': job['source_id']})
    if not job_source['active']:
        raise toolkit.ValidationError('Source is not active')

    job_obj = HarvestJob.get(job['id'])
    job['status'] = u'Running'
    job_obj.status = job['status']
    job_obj.save()

    gather_publisher.send({'harvest_job_id': job['id']})
    log.info('Sent job %s to the gather queue', job['id'])

    return harvest_job_dictize(job_obj, context)
2012-02-29 16:20:35 +01:00
2014-06-27 17:39:02 +02:00
2015-10-28 18:51:58 +01:00
def harvest_job_abort(context, data_dict):
    '''
    Aborts a harvest job. Given a harvest source_id, it looks for the latest
    one and (assuming it not already Finished) marks it as Finished. It also
    marks any of that source's harvest objects and (if not complete or error)
    marks them "ERROR", so any left in limbo are cleaned up. Does not actually
    stop running any queued harvest fetchs/objects.

    Specify either id or source_id.

    :param id: the job id to abort, or the id or name of the harvest source
               with a job to abort
    :type id: string
    :param source_id: the name or id of the harvest source with a job to abort
    :type source_id: string

    :returns: the dictized harvest job
    :raises ValidationError: if neither ``id`` nor ``source_id`` is given
    :raises NotFound: if the source has no jobs
    '''

    check_access('harvest_job_abort', context, data_dict)

    model = context['model']

    source_or_job_id = data_dict.get('source_id') or data_dict.get('id')
    if not source_or_job_id:
        # Without this guard ``job`` would never be bound below
        raise toolkit.ValidationError(
            'Either id or source_id must be specified')

    try:
        source = harvest_source_show(context, {'id': source_or_job_id})
    except NotFound:
        # Not a source, so treat the value as a job id
        job = get_action('harvest_job_show')(
            context, {'id': source_or_job_id})
    else:
        # Do not use harvest_job_list since it can use a lot of memory
        # Get the most recent job for the source
        job = model.Session.query(HarvestJob) \
                   .filter_by(source_id=source['id']) \
                   .order_by(HarvestJob.created.desc()).first()
        if not job:
            raise NotFound('Error: source has no jobs')
        job_id = job.id
        job = get_action('harvest_job_show')(
            context, {'id': job_id})

    # An aborted job ends up with the status 'Finished'
    if job['status'] != 'Finished':
        # i.e. New or Running
        job_obj = HarvestJob.get(job['id'])
        job_obj.status = new_status = 'Finished'
        model.repo.commit_and_remove()
        log.info('Harvest job changed status from "%s" to "%s"',
                 job['status'], new_status)
    else:
        log.info('Harvest job unchanged. Source %s status is: "%s"',
                 job['id'], job['status'])

    # HarvestObjects set to ERROR
    # The job is fetched again because commit_and_remove() may have run
    job_obj = HarvestJob.get(job['id'])
    objs = job_obj.objects
    for obj in objs:
        if obj.state not in ('COMPLETE', 'ERROR'):
            old_state = obj.state
            obj.state = 'ERROR'
            log.info('Harvest object changed state from "%s" to "%s": %s',
                     old_state, obj.state, obj.id)
        else:
            log.info('Harvest object not changed from "%s": %s',
                     obj.state, obj.id)
    model.repo.commit_and_remove()

    job_obj = HarvestJob.get(job['id'])
    return harvest_job_dictize(job_obj, context)
2014-06-27 17:39:02 +02:00
@logic.side_effect_free
def harvest_sources_reindex(context, data_dict):
    '''
    Reindexes all active harvest source datasets with the latest status

    Each source is reindexed with the index commit deferred; a single commit
    is issued once all sources have been processed.

    :returns: always True
    '''
    log.info('Reindexing all harvest sources')
    check_access('harvest_sources_reindex', context, data_dict)

    model = context['model']
    package_cls = model.Package

    source_query = model.Session.query(package_cls)
    source_query = source_query.filter(package_cls.type == DATASET_TYPE_NAME)
    source_query = source_query.filter(package_cls.state == u'active')
    source_packages = source_query.all()

    package_index = PackageSearchIndex()

    # One context dict is shared by every per-source call
    reindex_context = {'defer_commit': True}
    for source_package in source_packages:
        get_action('harvest_source_reindex')(
            reindex_context, {'id': source_package.id})

    package_index.commit()

    return True
2015-11-03 21:30:11 +01:00
2014-06-27 17:39:02 +02:00
@logic.side_effect_free
def harvest_source_reindex(context, data_dict):
    '''
    Reindex a single harvest source

    The source is loaded through ``harvest_source_show`` and sent to the
    search index. Top-level keys of the source dict whose name also appears
    as a key of its JSON ``config`` are left out of the indexed document.

    Note that ``context`` is modified in place: ``extras_as_string`` is
    removed and ``ignore_auth`` is set to True.

    :param id: the id of the harvest source to reindex
    :type id: string

    :returns: always True
    '''

    harvest_source_id = logic.get_or_bust(data_dict, 'id')

    # When True the caller is responsible for committing the index
    defer_commit = context.get('defer_commit', False)

    context.pop('extras_as_string', None)
    context.update({'ignore_auth': True})
    package_dict = logic.get_action('harvest_source_show')(
        context, {'id': harvest_source_id})
    log.debug('Updating search index for harvest source: %s',
              package_dict.get('name') or harvest_source_id)

    # Remove configuration values
    # A missing, None, empty or malformed config counts as "no config"
    try:
        source_config = json.loads(package_dict.get('config') or '')
    except (TypeError, ValueError):
        source_config = {}
    # Valid JSON that is not an object has no keys to filter on
    if not isinstance(source_config, dict):
        source_config = {}

    new_dict = dict(
        (key, value) for key, value in package_dict.items()
        if key not in source_config)

    package_index = PackageSearchIndex()
    package_index.index_package(new_dict, defer_commit=defer_commit)

    return True