# FDP-harvester/ckanext/dcat/f2ds_harvester.py

import json
import os
import urllib2
import urllib
import sys
import re
import time
from warnings import catch_warnings
import rdflib
import rdflib.parser
from rdflib import URIRef, BNode, Literal
from rdflib.namespace import Namespace, RDF
from ckanext.dcat.processors import RDFParser
from ckanext.dcat.profiles import EuropeanDCATAPProfile
from ckanext.dcat.profiles import RDFProfile
# Needed for a workaround until the core translation function defaults to the Flask one
from paste.registry import Registry
from ckan.lib.cli import MockTranslator
from ckantoolkit import config
from pylons import translator
import copy
from ckan.plugins import toolkit
import logging
from logging.handlers import RotatingFileHandler
from logging import handlers
from datetime import datetime, timedelta
import ConfigParser
# Log stdout Handler
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
# Log FILE handler
# LOGFILE = "/var/log/ckan/f2ds_harvester.log"
LOG_PATH = "/var/log/ckan/f2ds-log"
LOGFILE = LOG_PATH + "/f2ds_harvester.log"
# Create the log folder if it does not exist
if not os.path.exists(LOG_PATH):
os.makedirs(LOG_PATH)
fh = RotatingFileHandler(LOGFILE, maxBytes=(1048576 * 10), backupCount=7)
fh_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
fh.setLevel(logging.INFO)
fh.setFormatter(fh_formatter)
log.addHandler(fh)
# create report_logger. It is filled with latest harvesting results
report_logger = logging.getLogger("f2ds.summary_latest_harvesting")
report_logger.setLevel(logging.INFO)
REPORT_HARVESTING_FILE = LOG_PATH + "/f2ds_summary_latest_harvesting.log"
# Create a file handler for the summary report; the file is overwritten at each run (mode='w')
fh_report = logging.FileHandler(REPORT_HARVESTING_FILE, mode='w')
fh_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh_report.setLevel(logging.INFO)
fh_report.setFormatter(fh_formatter)
report_logger.addHandler(fh_report)
# Created by Francesco Mangiacrapa
# francesco.mangiacrapa@isti.cnr.it
# ISTI-CNR Pisa (ITALY)
DCT = Namespace("http://purl.org/dc/terms/")
DCAT = Namespace("http://www.w3.org/ns/dcat#")
ADMS = Namespace("http://www.w3.org/ns/adms#")
VCARD = Namespace("http://www.w3.org/2006/vcard/ns#")
FOAF = Namespace("http://xmlns.com/foaf/0.1/")
SCHEMA = Namespace('http://schema.org/')
TIME = Namespace('http://www.w3.org/2006/time')
LOCN = Namespace('http://www.w3.org/ns/locn#')
GSP = Namespace('http://www.opengis.net/ont/geosparql#')
OWL = Namespace('http://www.w3.org/2002/07/owl#')
SPDX = Namespace('http://spdx.org/rdf/terms#')
RE3DATA = Namespace('http://www.re3data.org/schema/3-0#')
namespaces = {
'dct': DCT,
'dcat': DCAT,
'adms': ADMS,
'vcard': VCARD,
'foaf': FOAF,
'schema': SCHEMA,
'time': TIME,
'locn': LOCN,
'gsp': GSP,
'owl': OWL,
'spdx': SPDX,
'r3d': RE3DATA,
}
DATASET_FIELD_NAME = 'name'
DATASET_NAME_MAX_SIZE = 100
F2DS_HARVESTER_INI_PATH = '/etc/ckan/default/f2ds_harvester.ini'
F2DS_HARVESTER_INI_DEFAULT_SECTION = 'DEFAULT_CONFIGS'
def http_get_response_body(uri, dict_query_args=None, gcube_token=None):
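'''
Perform an HTTP GET request on the given URI.
:param uri: the URI to contact
:param dict_query_args: an already URL-encoded query string (e.g. 'format=rdf') appended to the URI after '?'
:param gcube_token: optional gcube-token sent as a request header
:return: the response body, or None on error
'''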
if dict_query_args:
uri += "?{}".format(dict_query_args)
headers = {}
if gcube_token:
headers["gcube-token"] = gcube_token
log.info("Sending GET request to URL: %s" % uri)
req = urllib2.Request(uri, None, headers)
try:
resp = urllib2.urlopen(req, timeout=60)
except urllib2.HTTPError as e:
log.error("The request sent to the URL {} is failed".format(uri))
log.error("HTTPError: %d" % e.code)
return None
except urllib2.URLError as e:
# Not an HTTP-specific error (e.g. connection refused)
log.error("URLError - Input URI: %s " % uri + " is not valid!!")
return None
else:
# 200
body = resp.read()
return body
def copy_list_of_dict(list_source):
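'''
Return a new list containing a deep copy of every dict in list_source
'''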
cpy_list = []
for li in list_source:
d2 = copy.deepcopy(li)
cpy_list.append(d2)
return cpy_list
def http_json_response_body(uri, data=None, gcube_token=None, method='POST'):
'''
:param uri: the uri
:param data: the json data to send
:param gcube_token: the gcube-token that must be used for contacting the gCat service
:param method: the http method to perform the request, default is 'POST'
:return: the HTTP response returned as a dictionary of type dict(body='', status='', error='')
'''
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
}
if gcube_token:
headers["gcube-token"] = gcube_token
# For PUT requests, append the dataset name to the URI as /[dataset-name]
if method == 'PUT':
data_json_dict = json.loads(data)
uri += '/' + data_json_dict['name']
# data_json_dict['name'] = data_json_dict['name']+'new'
# data = json.dumps(data_json_dict).encode("utf-8")
req = urllib2.Request(uri, data, headers)
if method:
req.get_method = lambda: method
log.info("Sending {} request to URL: {}".format(method, uri))
log.debug("Headers: %s" % headers)
log.debug("data: %s" % data)
try:
resp = urllib2.urlopen(req, timeout=300)
response_status_code = resp.getcode()
except urllib2.HTTPError as e:
response_status_code = e.code
# Read the error body only once: the underlying stream cannot be read twice
error_body = e.read()
log.error("The request sent to the URL {} has failed".format(uri))
log.error("HTTPError: {}. Error message: {}".format(e.code, error_body))
return dict(body=None, status=response_status_code, error=error_body)
except urllib2.URLError as e:
# Not an HTTP-specific error (e.g. connection refused): URLError carries no status code or body
log.error("URLError - the input URI %s is not reachable or not valid: %s" % (uri, e.reason))
return dict(body=None, status=None, error=str(e.reason))
else:
# 200
return dict(body=resp.read(), status=response_status_code, error=None)
def fill_in_to_gcat_dataset(rdf_catalogue, json_data, license_id='CC-BY-SA-4.0', system_type='Dataset'):
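'''
Complete a CKAN dataset dict (received as a JSON string) so that it can be published through gCat:
- ensure the 'name' field exists, deriving it from 'title' when missing;
- normalize the tags, preserving non-conforming originals in extras as 'keyword';
- add the 'system:type' and 'catalogue_title' extras and the license_id.
:return: the completed dict
'''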
# data_json_array = json.dumps(json_data)
data_json_dict = json.loads(json_data)
log.debug("read json data {}".format(json_data))
dataset_name = None
try:
dataset_name = data_json_dict[DATASET_FIELD_NAME]
except KeyError as e:
log.info("The field " + DATASET_FIELD_NAME + " not found from source. KeyError {}".format(e))
if not dataset_name:
log.debug("Trying to generate the field 'name' from 'title'")
dataset_title = data_json_dict['title']
if not dataset_title:
raise NameError('name and title fields are not defined')
# The field 'name' must be between 2 and 100 characters long
# and contain only lowercase alphanumeric characters, '-' and '_'.
rescue_dataset_title = normalize_to_alphanumeric_string(dataset_title, '-')
rescue_dataset_title = rescue_dataset_title.lower()
dataset_name = (rescue_dataset_title[:DATASET_NAME_MAX_SIZE]) if len(
rescue_dataset_title) > DATASET_NAME_MAX_SIZE else rescue_dataset_title
log.info("dataset cleaned name is: {} ".format(dataset_name))
if not dataset_name:
raise NameError('field name not found and built')
data_json_dict[DATASET_FIELD_NAME] = dataset_name
# Managing tags
try:
dict_normalized_ckan_tags = dict_of_normalized_ckan_tags(data_json_dict['tags'])
# there is at least one normalized tag
if len(dict_normalized_ckan_tags) > 0:
data_json_dict['tags'] = []
for tag in dict_normalized_ckan_tags:
# adding to tags
norm_tag = dict_normalized_ckan_tags[tag]
data_json_dict['tags'].append({'name': norm_tag})
# if tag is not equal to normalized tag value
if tag != norm_tag:
# adding to extras as keyword
data_json_dict['extras'].append({'key': 'keyword', 'value': tag})
else:
# at least one tag is required for item creation on CKAN. Adding "No Tag"
data_json_dict['tags'] = []
data_json_dict['tags'].append({'name': 'No Tag'})
log.info("no tag found, adding 'No Tag' as tag because at least one tag is required")
except Exception as e:
log.error("Error occurred during managing tags . Exception {}".format(e))
if not license_id:
license_id = 'CC-BY-SA-4.0'
if not system_type:
system_type = 'Dataset'
# adding system:type
data_json_dict['extras'].append({'key': 'system:type', 'value': system_type})
rescue_catalogue_title = rdf_catalogue.get_name()
if rescue_catalogue_title:
pattern = '[^a-zA-Z0-9_\-]+'
remove_white_spaces = True
rescue_catalogue_title = normalize_to_alphanumeric_string(rescue_catalogue_title, '-', pattern, remove_white_spaces)
else:
rescue_catalogue_title = "No Catalogue"
log.info("rescue_catalogue_title is {} ".format(rescue_catalogue_title))
# adding catalogue_title
data_json_dict['extras'].append({'key': 'catalogue_title', 'value': rescue_catalogue_title})
# adding license
data_json_dict['license_id'] = license_id
return data_json_dict
# Returns a string containing only alphanumeric characters, '-' and '_'.
def normalize_to_alphanumeric_string(string_value, replace_with='_', pattern='[^a-zA-Z0-9_\-]+', remove_white_spaces=False):
sanitize1 = re.sub(pattern, replace_with, string_value)
if remove_white_spaces:
return re.sub(' ', replace_with, sanitize1)
return sanitize1
# For every CKAN tag found, normalizes its value (to alphanumeric characters plus '-' and '_')
# and returns the dict {'tag': 'normalized-tag'}
def dict_of_normalized_ckan_tags(tag_array):
dict_normalized_ckan_tags = {}
# alphanumeric string with '-' and '_' and ' '
pattern = '[^a-zA-Z0-9_\- ]+'
if tag_array:
for tag in tag_array:
tag_value = tag.get('name')
log.info("tag value is: {}".format(tag_value))
n_tag_value = normalize_to_alphanumeric_string(tag_value, '_', pattern)
dict_normalized_ckan_tags[tag_value] = n_tag_value
log.info("returning the dict of normalized tag: {}".format(dict_normalized_ckan_tags))
return dict_normalized_ckan_tags
class RDFCatalogue(object):
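'''
Value object describing an FDP catalogue: its URL, name, title, description
and the list of dataset URLs it exposes
'''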
def __init__(self, catalogue_url, catalogue_name, catalogue_title, catalogue_description, list_dataset_urls):
self._url = catalogue_url
self._name = catalogue_name
self._title = catalogue_title
self._description = catalogue_description
self._list_dataset_urls = list_dataset_urls
def get_url(self):
return self._url
def get_name(self):
return self._name
def get_list_dataset_urls(self):
return self._list_dataset_urls
def get_title(self):
return self._title
def get_description(self):
return self._description
def __repr__(self):
return "RDFCatalogue(url: {0}, name: {1}, title: {2}, description: {3}, datasets: {4})".format(self._url,
self._name,
self._title,
self._description,
self._list_dataset_urls)
class DiscoverRDFCatalogue(object):
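'''
Discovers the catalogues exposed by an FDP endpoint and reads their content as RDF
'''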
def __init__(self, fdp_catalogues_endpoint, format_file):
self._fdp_catalogues_endpoint = fdp_catalogues_endpoint
self._format_file = format_file
def discover_catalogues(self):
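'''
Parse the FDP catalogues endpoint and return the list of catalogue URLs
found as objects of r3d:dataCatalog. Returns an empty list on error.
'''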
g = rdflib.ConjunctiveGraph()
for prefix, namespace in namespaces.iteritems():
g.bind(prefix, namespace)
list_catalogue_urls = []
try:
catalogues_ref = URIRef(self._fdp_catalogues_endpoint)
parser = g.parse(source=self._fdp_catalogues_endpoint, format=self._format_file)
# print(parser.all_nodes())
# loop through each triple in the graph (subj, pred, obj)
for subj, pred, obj in g:
# sanity check: every iterated triple must also be contained in the graph
log.debug("subj has {} pred has {} obj has {}".format(subj, pred, obj))
if (subj, pred, obj) not in g:
raise Exception("It better be!")
log.debug("graph has {} statements.".format(len(g)))
for catalog in g.subject_objects(RE3DATA.dataCatalog):
log.debug("catalogue url found {}".format(catalog[1]))
list_catalogue_urls.append(str(catalog[1]))
log.info("returning list of catalogue URLs {}".format(list_catalogue_urls))
return list_catalogue_urls
except Exception as inst:
log.error("Error on reading list of catalogues: " + str(inst))
log.info("Returning empty list of catalogues")
return list_catalogue_urls
def read_catalogue(self, catalogue_url):
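'''
Parse the given catalogue URL and return an RDFCatalogue with its
foaf:name, dct:title, dct:description and the list of dcat:dataset URLs
'''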
g = rdflib.ConjunctiveGraph()
for prefix, namespace in namespaces.iteritems():
g.bind(prefix, namespace)
list_dataset_urls = []
try:
catalog_ref = URIRef(catalogue_url)
parser = g.parse(source=catalogue_url, format=self._format_file)
# print(parser.all_nodes())
# loop through each triple in the graph (subj, pred, obj)
for subj, pred, obj in g:
# sanity check: every iterated triple must also be contained in the graph
log.debug("subj has {} pred has {} obj has {}".format(subj, pred, obj))
if (subj, pred, obj) not in g:
raise Exception("It better be!")
log.debug("graph has {} statements.".format(len(g)))
catalogue_name = None
for name in g.subject_objects(FOAF.name):
catalogue_name = str(name[1])
log.info("catalogue name found: {}".format(catalogue_name))
break
catalogue_title = None
for title in g.subject_objects(DCT.title):
catalogue_title = str(title[1])
log.info("catalogue title found: {}".format(catalogue_title))
break
catalogue_description = None
for description in g.subject_objects(DCT.description):
catalogue_description = str(description[1])
log.info("catalogue description found: {}".format(catalogue_description))
break
for dataset in g.subject_objects(DCAT.dataset):
log.info("dataset url found: {}".format(dataset[1]))
list_dataset_urls.append(str(dataset[1]))
log.info("returning list of dataset URLs {}".format(list_dataset_urls))
return RDFCatalogue(catalogue_url, catalogue_name, catalogue_title, catalogue_description,
list_dataset_urls)
except Exception as inst:
log.error("Error on reading catalogue: " + str(inst))
log.info("Returning empty list of datasets")
return RDFCatalogue(catalogue_url, "", [])
def to_ckan_mapping(self, dataset_body, format):
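'''
Parse the RDF body of a dataset with the EuropeanDCATAPProfile and the
F2DSDCATAPProfile and return the resulting CKAN dataset dict
(the body is expected to describe a single dataset)
'''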
parser = RDFParser()
parser._profiles = [EuropeanDCATAPProfile, F2DSDCATAPProfile]
parser.parse(dataset_body, _format=format)
indent = 4
ckan_datasets = [d for d in parser.datasets()]
# only one exists
ckan_dataset = ckan_datasets[0]
log.debug("ckan_dataset is {}".format(ckan_dataset))
# ckan_dataset = self._ckan_dataset_extras_strip(ckan_dataset)
# print(json.dumps(ckan_dataset, indent=indent))
# print(json.dumps(ckan_datasets, indent=indent))
return ckan_dataset
def _ckan_dataset_extras_strip(self, _dict):
'''
Returns dict of extras stripped
'''
try:
new_dict_extras = []
for extra in _dict.get('extras', []):
k, v = extra['key'], extra['value']
if isinstance(v, str):
v = str(v).strip()
new_dict_extras.append({'key': k, 'value': v})
_dict['extras'] = new_dict_extras
return _dict
except Exception as inst:
log.error("Error _ckan_dataset_extras_strip: " + str(inst))
return _dict
class F2DSDCATAPProfile(RDFProfile):
'''
An RDF profile for the F2DS. Implemented by Francesco Mangiacrapa at ISTI-CNR
Applied after the European DCAT-AP profile (`euro_dcat_ap`), whose output it refines
'''
def _set_extras_dict_value(self, _dict, key, value, default=None):
'''
Set the value for the given key (or 'dcat_' + key) in the `extras` list of a CKAN dict
Return the updated dict
'''
new_dict_extras = []
for extra in _dict.get('extras', []):
if extra['key'] == key or extra['key'] == 'dcat_' + key:
extra['value'] = value
new_dict_extras.append(extra)
_dict['extras'] = new_dict_extras
return _dict
def parse_dataset(self, dataset_dict, dataset_ref):
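'''
Refine the dataset dict produced by the base profile: resolve the dcat:distribution
URLs into the graph, rebuild the CKAN resources from all dcat:Distribution nodes
and override the spatial extras
'''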
log.info("parse_dataset into F2DSDCATAPProfile")
document_format = "application/rdf+xml"
# Resetting list of resources
dataset_dict['resources'] = []
# Distributions
distribution_index = 0
# external distribution referred in RDF dataset
for distribution_url in self.g.objects(dataset_ref, DCAT.distribution):
try:
distribution_index += 1
log.info("\t## {} ## Read distribution url {} for dataset url {}".format(distribution_index, distribution_url, dataset_ref))
graph = rdflib.ConjunctiveGraph()
for prefix, namespace in namespaces.iteritems():
graph.bind(prefix, namespace)
graph.parse(source=URIRef(distribution_url), format=document_format)
self.g.add((URIRef(distribution_url), RDF.type, DCAT.Distribution))
self.g += graph
log.info("\t\t## Added distribution url {} to graph".format(distribution_url))
except Exception as inst:
error = str(inst)
log.error("Error on resolving distribution_url {}. Error: {}".format(distribution_url, error))
distributions = []
# inline distribution referred in RDF dataset
for distribution in self.g.subjects(RDF.type, DCAT.Distribution):
distributions.append(distribution)
distribution_index = 0
for distribution in distributions:
distribution_index += 1
log.info("\t## {} ## mapping distribution {} to CKAN resource".format(distribution_index, str(distribution)))
resource_dict = {}
# Simple values
for key, predicate in (
('name', DCT.title),
('description', DCT.description),
('access_url', DCAT.accessURL),
('download_url', DCAT.downloadURL),
('issued', DCT.issued),
('modified', DCT.modified),
('status', ADMS.status),
('rights', DCT.rights),
('license', DCT.license),
):
value = self._object_value(distribution, predicate)
if value:
resource_dict[key] = value
resource_dict['url'] = (self._object_value(distribution,
DCAT.downloadURL) or
self._object_value(distribution,
DCAT.accessURL))
# Lists
for key, predicate in (
('language', DCT.language),
('documentation', FOAF.page),
('conforms_to', DCT.conformsTo),
):
values = self._object_value_list(distribution, predicate)
if values:
resource_dict[key] = json.dumps(values)
# Format and media type
normalize_ckan_format = config.get(
'ckanext.dcat.normalize_ckan_format', True)
imt, label = self._distribution_format(distribution,
normalize_ckan_format)
if imt:
resource_dict['mimetype'] = imt
if label:
resource_dict['format'] = label
elif imt:
resource_dict['format'] = imt
# Size
size = self._object_value_int(distribution, DCAT.byteSize)
if size is not None:
resource_dict['size'] = size
# Checksum
for checksum in self.g.objects(distribution, SPDX.checksum):
algorithm = self._object_value(checksum, SPDX.algorithm)
checksum_value = self._object_value(checksum, SPDX.checksumValue)
if algorithm:
resource_dict['hash_algorithm'] = algorithm
if checksum_value:
resource_dict['hash'] = checksum_value
# Distribution URI (explicitly show the missing ones)
resource_dict['uri'] = (unicode(distribution)
if isinstance(distribution,
rdflib.term.URIRef)
else '')
dataset_dict['resources'].append(resource_dict)
# Spatial
spatial = self._spatial(dataset_ref, DCT.spatial)
log.debug("spatial {}".format(str(spatial)))
for key in ('uri', 'text', 'geom'):
if spatial.get(key):
the_key = 'spatial_{0}'.format(key) if key != 'geom' else 'spatial'
the_value = spatial.get(key).strip()
log.info("overriding spatial value to: {}".format(str(the_value)))
dataset_dict = self._set_extras_dict_value(dataset_dict, the_key, the_value)
# dataset_dict['extras'].append(
# {'key': 'spatial_{0}'.format(key) if key != 'geom' else 'spatial',
# 'value': spatial.get(key).strip()})
return dataset_dict
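# Main script. The harvesting flow is:
# 1. read the configuration from F2DS_HARVESTER_INI_PATH;
# 2. discover the catalogue URLs exposed by the FDP endpoint;
# 3. for each catalogue, read its dataset URLs, fetch each dataset as RDF and
#    map it to a CKAN dataset dict through the DCAT-AP profiles;
# 4. create/update each dataset on gCat (PUT first, POST when the PUT returns 404);
# 5. rebuild the CKAN search index and, when no dataset error occurred, bulk-purge
#    the previously harvested items that were not touched by this run.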
log.info("F2DS harvester script starts...")
# Reading Configurations
if not os.path.exists(F2DS_HARVESTER_INI_PATH):
log.info("File '{}' not found, F2DS harvester exit".format(F2DS_HARVESTER_INI_PATH))
sys.exit('No file with f2ds configurations found')
f2ds_config = ConfigParser.ConfigParser()
# parse f2ds_harvester.ini file
f2ds_config.read(F2DS_HARVESTER_INI_PATH)
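# A minimal example of the expected layout of f2ds_harvester.ini
# (the keys are the ones read below; the values shown here are placeholders):
#
# [DEFAULT_CONFIGS]
# fdp_catalogues_endpoint = https://f2ds.example.org/expanded?format=rdf
# format_file = xml
# default_license_id = CC-BY-SA-4.0
# gcat_endpoint = https://gcat.example.org/gcat
# gcube_metadata_profile_system_type = Dataset
# jarvis_token = <the-gcube-token>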
config_dict = dict(f2ds_config.items(F2DS_HARVESTER_INI_DEFAULT_SECTION))
fdp_catalogues_endpoint = None
format_file = None
license_id = None
gcat_endpoint = None
gcube_meta_system_type = None
gcube_token = None
try:
log.info("From {} read configs {}".format(F2DS_HARVESTER_INI_DEFAULT_SECTION, config_dict))
format_file = config_dict['format_file']
license_id = config_dict['default_license_id']
gcat_endpoint = config_dict['gcat_endpoint']
gcube_meta_system_type = config_dict['gcube_metadata_profile_system_type']
gcube_token = config_dict['jarvis_token']
fdp_catalogues_endpoint = config_dict['fdp_catalogues_endpoint']
log.info("F2DS harvester is running with configurations:")
log.info("fdp_catalogues_endpoint: {}".format(fdp_catalogues_endpoint))
log.info("gcat: {}".format(gcat_endpoint))
masked_token = gcube_token[0:10]+"_MASKED_TOKEN"
log.info("gcube_token: {}".format(masked_token))
log.info("gcube_meta_system_type: {}".format(gcube_meta_system_type))
except Exception as inst:
log.error("Error on reading configurations: " + str(inst))
# Running
if not (fdp_catalogues_endpoint):
fdp_catalogues_endpoint = "https://f2ds.eosc-pillar.eu/expanded?format=rdf"
log.warn("Using hard coded endpoint {}".format(fdp_catalogues_endpoint))
discover_RDFC = DiscoverRDFCatalogue(fdp_catalogues_endpoint, format_file)
list_catalogues_urls = discover_RDFC.discover_catalogues()
countOperations = 0
countCatalogues = 0
countCreate = 0
countUpdate = 0
countDatasetError = 0
countCatalogueError = 0
for catalogue_url in list_catalogues_urls:
try:
log.debug("rdf catalogue URL {}".format(catalogue_url))
countCatalogues += 1
report_logger.info("CTG.{} Harvesting Catalogue: {}".format(countCatalogues, catalogue_url))
rdf_catalogue = discover_RDFC.read_catalogue(catalogue_url)
log.info("from {} got {}".format(catalogue_url, rdf_catalogue))
# Workaround until the core translation function defaults to the Flask one
registry = Registry()
registry.prepare()
registry.register(translator, MockTranslator())
dataset_body_to_ckan_list = []
for dataset_url in rdf_catalogue.get_list_dataset_urls():
dataset_body = http_get_response_body(dataset_url, 'format=rdf')
if dataset_body is None:
countDatasetError += 1
report_logger.info("Error, no body for dataset: {}".format(dataset_url))
continue
log.debug("from {} read dataset_body {}".format(dataset_url, dataset_body))
ckan_dataset = discover_RDFC.to_ckan_mapping(dataset_body, "application/rdf+xml")
dataset_body_to_ckan_list.append(ckan_dataset)
loop_index = 0
loop_size = len(dataset_body_to_ckan_list)
for ckan_dataset_body in dataset_body_to_ckan_list:
try:
loop_index += 1
log.info('\n\nManaging publishing {} of {}'.format(loop_index, loop_size))
log.debug('ckan_dataset_body {}'.format(ckan_dataset_body))
# data = str(ckan_dataset_body).encode('utf-8').strip()
# json.dumps takes in a json object and returns a string.
json_data = json.dumps(ckan_dataset_body)
json_data_dict = fill_in_to_gcat_dataset(rdf_catalogue, json_data, license_id, gcube_meta_system_type)
# just swapping the dataset_name for logging into report
dataset_name = json_data_dict[DATASET_FIELD_NAME]
log.debug("Dataset name is {}".format(dataset_name))
# encoding in utf-8
json_data = json.dumps(json_data_dict).encode("utf-8")
log.debug('json_data {}'.format(json_data))
response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='PUT')
code_status = response["status"]
if 200 <= code_status <= 208:
log.info("Update OK. Response returned status {}".format(code_status))
report_logger.info("DTS.{} Updated dataset: {}".format(loop_index, dataset_name))
countUpdate += 1
else:
log.info("Update request failed (with PUT request). Response returned status {}".format(
code_status))
if code_status == 404:
log.info("Update failed with code {}. Submitting the create with POST request".format(code_status))
response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='POST')
code_status = response["status"]
log.info("POST response returned with status {}".format(code_status))
report_logger.info("DTS.{} Created dataset: {}".format(loop_index, dataset_name))
countCreate += 1
else:
countDatasetError += 1
report_logger.warn("Not created/updated the dataset: {}".format(dataset_name))
except Exception as inst:
log.error("Error on creating dataset: {}. Error: ".format(dataset_name) + str(inst))
report_logger.info("Dataset error: {}".format(dataset_name) + str(inst))
countDatasetError += 1
countOperations += 1
if loop_index < loop_size:
log.info("sleeping 2 seconds...")
time.sleep(2)
log.info("running")
except Exception as inst:
log.error("Error on harvesting catalogue: {}. Error: ".format(catalogue_url) + str(inst))
report_logger.info("Catalogue error: {}".format(catalogue_url) + str(inst))
countCatalogueError += 1
log.info("F2DS harvesting stage is finished. Operations (create/update) done are: " + str(countOperations))
report_logger.info("Catalogue/s found: {}".format(countCatalogues))
report_logger.info("Total # operations: {}".format(countOperations))
report_logger.info("Datasets created: {}".format(countCreate))
report_logger.info("Datasets updated: {}".format(countUpdate))
report_logger.info("Datasets errors: {}".format(countDatasetError))
report_logger.info("Catalogues errors: {}".format(countCatalogueError))
log.info("Going to resync DB -> Solr, with search-index rebuild -r")
os.system(
'/usr/lib/ckan/default/bin/paster --plugin=ckan search-index rebuild -r --config=/etc/ckan/default/production.ini')
log.info("Resync DB -> Solr, done.")
# Bulk deletion stage: runs only when no dataset error occurred
if countDatasetError == 0:
log.info("No error detected going to call bulk deletion...")
report_logger.info("Bulk deletion stage starts...")
# Build yesterday's UTC timestamp as YYYY-MM-DDT23:59:59Z
today = datetime.utcnow().date()
yesterday = today - timedelta(days=1)
utc_time_suffix = 'T23:59:59Z'
yesterday_time = str(yesterday) + utc_time_suffix
# Build the query string for the bulk purge invocation on gCat
query_string = {"q": "extras_systemtype:{}".format(gcube_meta_system_type),
"fq": "metadata_modified:[* TO {}]".format(yesterday_time),
"own_only": True}
safe_query_string = urllib.urlencode(query_string)
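# The resulting query selects the harvested items (by system type) that have not been
# modified since yesterday, e.g. (shown unencoded, parameter order may vary):
# q=extras_systemtype:Dataset&fq=metadata_modified:[* TO 2021-12-31T23:59:59Z]&own_only=True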
uri = gcat_endpoint + '/items?{}'.format(safe_query_string)
report_logger.info("Calling {}".format(uri))
response = http_json_response_body(uri, None, gcube_token, method='PURGE')
report_logger.info("\nBulk deletion called for {}".format(uri))
log.info("\nBulk deletion called correctly with parameters {}".format(query_string))
else:
report_logger.info("\nDatasets error/s found, so skipping bulk deletion stage")
log.info("\nDatasets error/s found, so skipping bulk deletion stage")
log.info("\n\nF2DS harvester terminated!")
sys.exit(0)