import json
import os
import urllib2
import urllib
import sys
import re
import time
from warnings import catch_warnings

import rdflib
import rdflib.parser

from rdflib import URIRef, BNode, Literal
from rdflib.namespace import Namespace, RDF

from ckanext.dcat.processors import RDFParser
from ckanext.dcat.profiles import EuropeanDCATAPProfile
from ckanext.dcat.profiles import RDFProfile

# Needed for a workaround until the core translation function defaults to the Flask one
from paste.registry import Registry
from ckan.lib.cli import MockTranslator
from ckantoolkit import config
from pylons import translator

import copy
from ckan.plugins import toolkit

import logging
from logging.handlers import RotatingFileHandler
from logging import handlers
from datetime import datetime, timedelta

import ConfigParser

# Log stdout handler
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)

# Log file handler
# LOGFILE = "/var/log/ckan/f2ds_harvester.log"
LOG_PATH = "/var/log/ckan/f2ds-log"
LOGFILE = LOG_PATH + "/f2ds_harvester.log"
# Create the log path folder if it does not exist
if not os.path.exists(LOG_PATH):
    os.makedirs(LOG_PATH)
fh = RotatingFileHandler(LOGFILE, maxBytes=(1048576 * 10), backupCount=7)
fh_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
fh.setLevel(logging.INFO)
fh.setFormatter(fh_formatter)
log.addHandler(fh)

# Create report_logger. It is filled with the latest harvesting results
report_logger = logging.getLogger("f2ds.summary_latest_harvesting")
report_logger.setLevel(logging.INFO)
REPORT_HARVESTING_FILE = LOG_PATH + "/f2ds_summary_latest_harvesting.log"
# Create a file handler which logs even debug messages
fh_report = logging.FileHandler(REPORT_HARVESTING_FILE, mode='w')
fh_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh_report.setLevel(logging.INFO)
fh_report.setFormatter(fh_formatter)
report_logger.addHandler(fh_report)

# Created by Francesco Mangiacrapa
# francesco.mangiacrapa@isti.cnr.it
# ISTI-CNR Pisa (ITALY)

DCT = Namespace("http://purl.org/dc/terms/")
DCAT = Namespace("http://www.w3.org/ns/dcat#")
ADMS = Namespace("http://www.w3.org/ns/adms#")
VCARD = Namespace("http://www.w3.org/2006/vcard/ns#")
FOAF = Namespace("http://xmlns.com/foaf/0.1/")
SCHEMA = Namespace('http://schema.org/')
TIME = Namespace('http://www.w3.org/2006/time')
LOCN = Namespace('http://www.w3.org/ns/locn#')
GSP = Namespace('http://www.opengis.net/ont/geosparql#')
OWL = Namespace('http://www.w3.org/2002/07/owl#')
SPDX = Namespace('http://spdx.org/rdf/terms#')
RE3DATA = Namespace('http://www.re3data.org/schema/3-0#')

namespaces = {
    'dct': DCT,
    'dcat': DCAT,
    'adms': ADMS,
    'vcard': VCARD,
    'foaf': FOAF,
    'schema': SCHEMA,
    'time': TIME,
    'locn': LOCN,
    'gsp': GSP,
    'owl': OWL,
    'spdx': SPDX,
    'r3d': RE3DATA,
}

DATASET_FIELD_NAME = 'name'
DATASET_NAME_MAX_SIZE = 100

F2DS_HARVESTER_INI_PATH = '/etc/ckan/default/f2ds_harvester.ini'
F2DS_HARVESTER_INI_DEFAULT_SECTION = 'DEFAULT_CONFIGS'


def http_get_response_body(uri, dict_query_args=None, gcube_token=None):
    # dict_query_args is expected to be an already URL-encoded query string
    if dict_query_args:
        uri += "?{}".format(dict_query_args)

    headers = {}
    if gcube_token:
        headers["gcube-token"] = gcube_token

    log.info("Sending GET request to URL: %s" % uri)
    req = urllib2.Request(uri, None, headers)
    try:
        resp = urllib2.urlopen(req, timeout=60)
    except urllib2.HTTPError as e:
        log.error("The request sent to the URL {} failed".format(uri))
        log.error("HTTPError: %d" % e.code)
        return None
    except urllib2.URLError as e:
        # Not an HTTP-specific error (e.g. connection refused)
        log.error("URLError - the input URI %s is not valid or not reachable" % uri)
        return None
    else:
        # 200
        body = resp.read()
        return body

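# Illustrative usage (a sketch, not executed here): this is how the harvesting loop below
# fetches the RDF serialization of a single dataset, passing the already encoded query
# string 'format=rdf'; the URL shown is hypothetical:
#
#     dataset_body = http_get_response_body("https://fdp.example.org/dataset/123", 'format=rdf')
#     # -> performs GET https://fdp.example.org/dataset/123?format=rdf and returns the
#     #    response body as a string, or None on HTTPError/URLError
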
def copy_list_of_dict(list_source):
    cpy_list = []
    for li in list_source:
        d2 = copy.deepcopy(li)
        cpy_list.append(d2)

    return cpy_list


def http_json_response_body(uri, data=None, gcube_token=None, method='POST'):
    '''
    :param uri: the uri
    :param data: the json data to send
    :param gcube_token: the gcube-token that must be used for contacting the gCat service
    :param method: the http method to perform the request, default is 'POST'
    :return: the HTTP response returned as a dictionary of type dict(body='', status='', error='')
    '''
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    if gcube_token:
        headers["gcube-token"] = gcube_token

    # On PUT, append the dataset name to the URI as /[dataset-name]
    if method == 'PUT':
        data_json_dict = json.loads(data)
        uri += '/' + data_json_dict['name']
        # data_json_dict['name'] = data_json_dict['name']+'new'
        # data = json.dumps(data_json_dict).encode("utf-8")

    req = urllib2.Request(uri, data, headers)

    if method:
        req.get_method = lambda: method

    log.info("Sending {} request to URL: {}".format(method, uri))
    log.debug("Headers: %s" % headers)
    log.debug("data: %s" % data)
    try:
        resp = urllib2.urlopen(req, timeout=300)
        response_status_code = resp.getcode()
    except urllib2.HTTPError as e:
        response_status_code = e.code
        # e.read() can be consumed only once, so read it into a variable
        error_body = e.read()
        log.error("The request sent to the URL {} failed".format(uri))
        log.error("HTTPError: {}. Error message: {}".format(e.code, error_body))
        return dict(body=None, status=response_status_code, error=error_body)
    except urllib2.URLError as e:
        # Not an HTTP-specific error (e.g. connection refused): no HTTP status code is available
        log.error("URLError - the input URI %s is not valid or not reachable" % uri)
        return dict(body=None, status=None, error=str(e.reason))
    else:
        # 200
        return dict(body=resp.read(), status=response_status_code, error=None)

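# Illustrative usage (a sketch, not executed here): the harvesting loop below publishes each
# mapped dataset to gCat with a PUT (update) and falls back to a POST (create) when the PUT
# returns 404:
#
#     response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='PUT')
#     if response["status"] == 404:
#         response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='POST')
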
def fill_in_to_gcat_dataset(rdf_catalogue, json_data, license_id='CC-BY-SA-4.0', system_type='Dataset'):
    # data_json_array = json.dumps(json_data)
    data_json_dict = json.loads(json_data)
    log.debug("read json data {}".format(json_data))

    dataset_name = None
    try:
        dataset_name = data_json_dict[DATASET_FIELD_NAME]
    except KeyError as e:
        log.info("The field '" + DATASET_FIELD_NAME + "' was not found in the source. KeyError {}".format(e))

    if not dataset_name:
        log.debug("Trying to generate the field 'name' from 'title'")
        dataset_title = data_json_dict['title']
        if not dataset_title:
            raise NameError('name and title fields are not defined')

        # The field 'name' must be between 2 and 100 characters long
        # and contain only lowercase alphanumeric characters, '-' and '_'.
        rescue_dataset_title = normalize_to_alphanumeric_string(dataset_title, '-')
        rescue_dataset_title = rescue_dataset_title.lower()
        dataset_name = (rescue_dataset_title[:DATASET_NAME_MAX_SIZE]) if len(
            rescue_dataset_title) > DATASET_NAME_MAX_SIZE else rescue_dataset_title
        log.info("dataset cleaned name is: {} ".format(dataset_name))

    if not dataset_name:
        raise NameError('field name not found and could not be built')

    data_json_dict[DATASET_FIELD_NAME] = dataset_name

    # Managing tags
    try:
        dict_normalized_ckan_tags = dict_of_normalized_ckan_tags(data_json_dict['tags'])
        # there are normalized tags
        if len(dict_normalized_ckan_tags) > 0:
            data_json_dict['tags'] = []
            for tag in dict_normalized_ckan_tags:
                # adding to tags
                norm_tag = dict_normalized_ckan_tags[tag]
                data_json_dict['tags'].append({'name': norm_tag})
                # if the tag is not equal to its normalized value
                if tag != norm_tag:
                    # adding the original value to extras as keyword
                    data_json_dict['extras'].append({'key': 'keyword', 'value': tag})
        else:
            # at least one tag is required for item creation on CKAN. Adding "No Tag"
            data_json_dict['tags'] = []
            data_json_dict['tags'].append({'name': 'No Tag'})
            log.info("no tag found, adding 'No Tag' as tag because at least one tag is required")

    except Exception as e:
        log.error("Error occurred while managing tags. Exception {}".format(e))

    if not license_id:
        license_id = 'CC-BY-SA-4.0'

    if not system_type:
        system_type = 'Dataset'

    # adding system:type
    data_json_dict['extras'].append({'key': 'system:type', 'value': system_type})

    rescue_catalogue_title = rdf_catalogue.get_name()
    if rescue_catalogue_title:
        pattern = r'[^a-zA-Z0-9_\-]+'
        remove_white_spaces = True
        rescue_catalogue_title = normalize_to_alphanumeric_string(rescue_catalogue_title, '-', pattern, remove_white_spaces)
    else:
        rescue_catalogue_title = "No Catalogue"

    log.info("rescue_catalogue_title is {} ".format(rescue_catalogue_title))

    # adding catalogue_title
    data_json_dict['extras'].append({'key': 'catalogue_title', 'value': rescue_catalogue_title})

    # adding license
    data_json_dict['license_id'] = license_id
    return data_json_dict

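# Illustrative sketch (not executed here) of what fill_in_to_gcat_dataset adds to a mapped
# dataset before it is sent to gCat; the input values are hypothetical:
#
#     json_data = json.dumps({'title': 'My Dataset 2020', 'tags': [], 'extras': []})
#     fill_in_to_gcat_dataset(rdf_catalogue, json_data)
#     # -> {'title': 'My Dataset 2020',
#     #     'name': 'my-dataset-2020',              # derived from 'title' when 'name' is missing
#     #     'tags': [{'name': 'No Tag'}],            # at least one tag is required by CKAN
#     #     'extras': [{'key': 'system:type', 'value': 'Dataset'},
#     #                {'key': 'catalogue_title', 'value': <normalized catalogue name>}],
#     #     'license_id': 'CC-BY-SA-4.0'}
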
# Returns a string containing only alphanumeric characters, '-' and '_'.
def normalize_to_alphanumeric_string(string_value, replace_with='_', pattern=r'[^a-zA-Z0-9_\-]+', remove_white_spaces=False):
    sanitize1 = re.sub(pattern, replace_with, string_value)
    if remove_white_spaces:
        return re.sub(' ', replace_with, sanitize1)

    return sanitize1

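# Illustrative examples (a sketch, not executed here) of the normalization behaviour:
#
#     normalize_to_alphanumeric_string('My Dataset: 2020!', '-')
#     # -> 'My-Dataset-2020-'   (runs of disallowed characters collapse into one replacement)
#     normalize_to_alphanumeric_string('soil data', '_', r'[^a-zA-Z0-9_\- ]+', remove_white_spaces=True)
#     # -> 'soil_data'          (spaces survive this pattern but are then replaced explicitly)
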
# For any ckan tag found, normalizes its value (to alphanumeric characters plus '-' and '_')
# and returns the dict {'tag': 'normalized-tag'}
def dict_of_normalized_ckan_tags(tag_array):
    dict_normalized_ckan_tags = {}
    # alphanumeric string with '-', '_' and ' '
    pattern = r'[^a-zA-Z0-9_\- ]+'
    if tag_array:
        for tag in tag_array:
            tag_value = tag.get('name')
            log.info("tag value is: {}".format(tag_value))
            n_tag_value = normalize_to_alphanumeric_string(tag_value, '_', pattern)
            dict_normalized_ckan_tags[tag_value] = n_tag_value

    log.info("returning the dict of normalized tags: {}".format(dict_normalized_ckan_tags))
    return dict_normalized_ckan_tags

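# Illustrative example (a sketch, not executed here); the tag values are hypothetical:
#
#     dict_of_normalized_ckan_tags([{'name': 'soil quality'}, {'name': 'pH>7'}])
#     # -> {'soil quality': 'soil quality', 'pH>7': 'pH_7'}
#     # Spaces are kept by this pattern; other disallowed characters become '_'.
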
class RDFCatalogue(object):

    def __init__(self, catalogue_url, catalogue_name, catalogue_title, catalogue_description, list_dataset_urls):
        self._url = catalogue_url
        self._name = catalogue_name
        self._title = catalogue_title
        self._description = catalogue_description
        self._list_dataset_urls = list_dataset_urls

    def get_url(self):
        return self._url

    def get_name(self):
        return self._name

    def get_list_dataset_urls(self):
        return self._list_dataset_urls

    def get_title(self):
        return self._title

    def get_description(self):
        return self._description

    def __repr__(self):
        return "RDFCatalogue(url: {0}, name: {1}, title: {2}, description: {3}, datasets: {4})".format(self._url,
                                                                                                        self._name,
                                                                                                        self._title,
                                                                                                        self._description,
                                                                                                        self._list_dataset_urls)


class DiscoverRDFCatalogue(object):

    def __init__(self, fdp_catalogues_endpoint, format_file):
        self._fdp_catalogues_endpoint = fdp_catalogues_endpoint
        self._format_file = format_file

    def discover_catalogues(self):

        g = rdflib.ConjunctiveGraph()

        for prefix, namespace in namespaces.iteritems():
            g.bind(prefix, namespace)

        list_catalogue_urls = []
        try:
            catalogues_ref = URIRef(self._fdp_catalogues_endpoint)
            parser = g.parse(source=self._fdp_catalogues_endpoint, format=self._format_file)
            # print(parser.all_nodes())
            # loop through each triple in the graph (subj, pred, obj)
            for subj, pred, obj in g:
                # sanity check: every triple yielded by the iteration must be in the graph
                log.debug("subj has {} pred has {} obj has {}".format(subj, pred, obj))
                if (subj, pred, obj) not in g:
                    raise Exception("It better be!")

            log.debug("graph has {} statements.".format(len(g)))

            for catalog in g.subject_objects(RE3DATA.dataCatalog):
                log.debug("catalogue url found {}".format(catalog[1]))
                list_catalogue_urls.append(str(catalog[1]))

            log.info("returning list of catalogue URLs {}".format(list_catalogue_urls))
            return list_catalogue_urls

        except Exception as inst:
            log.error("Error on reading list of catalogues: " + str(inst))
            log.info("Returning empty list of catalogues")
            return list_catalogue_urls

    def read_catalogue(self, catalogue_url):

        g = rdflib.ConjunctiveGraph()

        for prefix, namespace in namespaces.iteritems():
            g.bind(prefix, namespace)

        list_dataset_urls = []
        try:
            catalog_ref = URIRef(catalogue_url)
            parser = g.parse(source=catalogue_url, format=self._format_file)
            # print(parser.all_nodes())
            # loop through each triple in the graph (subj, pred, obj)
            for subj, pred, obj in g:
                # sanity check: every triple yielded by the iteration must be in the graph
                log.debug("subj has {} pred has {} obj has {}".format(subj, pred, obj))
                if (subj, pred, obj) not in g:
                    raise Exception("It better be!")

            log.debug("graph has {} statements.".format(len(g)))

            catalogue_name = None
            for name in g.subject_objects(FOAF.name):
                catalogue_name = str(name[1])
                log.info("catalogue name found: {}".format(catalogue_name))
                break

            catalogue_title = None
            for title in g.subject_objects(DCT.title):
                catalogue_title = str(title[1])
                log.info("catalogue title found: {}".format(catalogue_title))
                break

            catalogue_description = None
            for description in g.subject_objects(DCT.description):
                catalogue_description = str(description[1])
                log.info("catalogue description found: {}".format(catalogue_description))
                break

            for dataset in g.subject_objects(DCAT.dataset):
                log.info("dataset url found: {}".format(dataset[1]))
                list_dataset_urls.append(str(dataset[1]))

            log.info("returning list of dataset URLs {}".format(list_dataset_urls))
            return RDFCatalogue(catalogue_url, catalogue_name, catalogue_title, catalogue_description,
                                list_dataset_urls)

        except Exception as inst:
            log.error("Error on reading catalogue: " + str(inst))
            log.info("Returning empty list of datasets")
            # return an RDFCatalogue with no metadata and no datasets
            return RDFCatalogue(catalogue_url, "", "", "", [])

    def to_ckan_mapping(self, dataset_body, format):
        parser = RDFParser()
        parser._profiles = [EuropeanDCATAPProfile, F2DSDCATAPProfile]
        parser.parse(dataset_body, _format=format)
        indent = 4
        ckan_datasets = [d for d in parser.datasets()]
        # only one exists
        ckan_dataset = ckan_datasets[0]
        log.debug("ckan_dataset is {}".format(ckan_dataset))
        # ckan_dataset = self._ckan_dataset_extras_strip(ckan_dataset)
        # print(json.dumps(ckan_dataset, indent=indent))
        # print(json.dumps(ckan_datasets, indent=indent))
        return ckan_dataset

    def _ckan_dataset_extras_strip(self, _dict):
        '''
        Returns the dict with its extras values stripped of surrounding whitespace
        '''
        try:
            new_dict_extras = []
            for extra in _dict.get('extras', []):
                k, v = extra['key'], extra['value']
                if isinstance(v, str):
                    v = str(v).strip()
                # keep the CKAN extras shape: a list of {'key': ..., 'value': ...} dicts
                new_dict_extras.append({'key': k, 'value': v})

            _dict['extras'] = new_dict_extras
            return _dict
        except Exception as inst:
            log.error("Error _ckan_dataset_extras_strip: " + str(inst))
            return _dict


class F2DSDCATAPProfile(RDFProfile):
    '''
    An RDF profile for the F2DS. Implemented by Francesco Mangiacrapa at ISTI-CNR

    Applied after the European DCAT-AP profile (`euro_dcat_ap`), whose parsed dataset it refines
    '''

    def _set_extras_dict_value(self, _dict, key, value, default=None):
        '''
        Set the value for the given key on a CKAN dict `extras`
        Return the new dict
        '''

        new_dict_extras = []
        for extra in _dict.get('extras', []):
            # override the value when the key (or its 'dcat_' prefixed variant) matches
            if extra['key'] == key or extra['key'] == 'dcat_' + key:
                extra['value'] = value
            new_dict_extras.append(extra)

        _dict['extras'] = new_dict_extras
        return _dict

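    # Illustrative example (a sketch, not executed here) of how _set_extras_dict_value
    # overrides an existing extras entry; `profile` and the values shown are hypothetical:
    #
    #     d = {'extras': [{'key': 'spatial', 'value': 'OLD'}, {'key': 'issued', 'value': '2020'}]}
    #     profile._set_extras_dict_value(d, 'spatial', 'NEW')
    #     # -> {'extras': [{'key': 'spatial', 'value': 'NEW'}, {'key': 'issued', 'value': '2020'}]}
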
    def parse_dataset(self, dataset_dict, dataset_ref):
        log.info("parse_dataset into F2DSDCATAPProfile")

        document_format = "application/rdf+xml"

        # Resetting list of resources
        dataset_dict['resources'] = []
        # Distributions
        distribution_index = 0
        # external distributions referred to in the RDF dataset
        for distribution_url in self.g.objects(dataset_ref, DCAT.distribution):
            try:
                distribution_index += 1
                log.info("\t## {} ## Read distribution url {} for dataset url {}".format(distribution_index, distribution_url, dataset_ref))

                graph = rdflib.ConjunctiveGraph()

                for prefix, namespace in namespaces.iteritems():
                    graph.bind(prefix, namespace)

                graph.parse(source=URIRef(distribution_url), format=document_format)
                self.g.add((URIRef(distribution_url), RDF.type, DCAT.Distribution))
                self.g += graph
                log.info("\t\t## Added distribution url {} to graph".format(distribution_url))
            except Exception as inst:
                error = str(inst)
                log.error("Error on resolving distribution_url {}. Error: {}".format(distribution_url, error))

        distributions = []

        # inline distributions referred to in the RDF dataset
        for distribution in self.g.subjects(RDF.type, DCAT.Distribution):
            distributions.append(distribution)

        distribution_index = 0
        for distribution in distributions:
            distribution_index += 1
            log.info("\t## {} ## mapping distribution {} to CKAN resource".format(distribution_index, str(distribution)))

            resource_dict = {}

            # Simple values
            for key, predicate in (
                    ('name', DCT.title),
                    ('description', DCT.description),
                    ('access_url', DCAT.accessURL),
                    ('download_url', DCAT.downloadURL),
                    ('issued', DCT.issued),
                    ('modified', DCT.modified),
                    ('status', ADMS.status),
                    ('rights', DCT.rights),
                    ('license', DCT.license),
            ):
                value = self._object_value(distribution, predicate)
                if value:
                    resource_dict[key] = value

            resource_dict['url'] = (self._object_value(distribution,
                                                       DCAT.downloadURL) or
                                    self._object_value(distribution,
                                                       DCAT.accessURL))
            # Lists
            for key, predicate in (
                    ('language', DCT.language),
                    ('documentation', FOAF.page),
                    ('conforms_to', DCT.conformsTo),
            ):
                values = self._object_value_list(distribution, predicate)
                if values:
                    resource_dict[key] = json.dumps(values)

            # Format and media type
            normalize_ckan_format = config.get(
                'ckanext.dcat.normalize_ckan_format', True)
            imt, label = self._distribution_format(distribution,
                                                   normalize_ckan_format)

            if imt:
                resource_dict['mimetype'] = imt

            if label:
                resource_dict['format'] = label
            elif imt:
                resource_dict['format'] = imt

            # Size
            size = self._object_value_int(distribution, DCAT.byteSize)
            if size is not None:
                resource_dict['size'] = size

            # Checksum
            for checksum in self.g.objects(distribution, SPDX.checksum):
                algorithm = self._object_value(checksum, SPDX.algorithm)
                checksum_value = self._object_value(checksum, SPDX.checksumValue)
                if algorithm:
                    resource_dict['hash_algorithm'] = algorithm
                if checksum_value:
                    resource_dict['hash'] = checksum_value

            # Distribution URI (explicitly show the missing ones)
            resource_dict['uri'] = (unicode(distribution)
                                    if isinstance(distribution,
                                                  rdflib.term.URIRef)
                                    else '')

            dataset_dict['resources'].append(resource_dict)

        # Spatial
        spatial = self._spatial(dataset_ref, DCT.spatial)
        log.debug("spatial {}".format(str(spatial)))
        for key in ('uri', 'text', 'geom'):
            if spatial.get(key):
                the_key = 'spatial_{0}'.format(key) if key != 'geom' else 'spatial'
                the_value = spatial.get(key).strip()
                log.info("overriding spatial value to: {}".format(str(the_value)))
                dataset_dict = self._set_extras_dict_value(dataset_dict, the_key, the_value)
                # dataset_dict['extras'].append(
                #     {'key': 'spatial_{0}'.format(key) if key != 'geom' else 'spatial',
                #      'value': spatial.get(key).strip()})

        return dataset_dict


log.info("F2DS harvester script starts...")
|
|
# Reading Configurations
|
|
if not os.path.exists(F2DS_HARVESTER_INI_PATH):
|
|
log.info("File '{}' not found, F2DS harvester exit".format(F2DS_HARVESTER_INI_PATH))
|
|
sys.exit('No file with f2ds configurations found')
|
|
|
|
f2ds_config = ConfigParser.ConfigParser()
|
|
# parse f2ds_harvester.ini file
|
|
f2ds_config.read(F2DS_HARVESTER_INI_PATH)
|
|
config_dict = dict(f2ds_config.items(F2DS_HARVESTER_INI_DEFAULT_SECTION))
|
|
|
|
fdp_catalogues_endpoint = None
|
|
format_file = None
|
|
license_id = None
|
|
gcat_endpoint = None
|
|
gcube_meta_system_type = None
|
|
gcube_token = None
|
|
try:
|
|
log.info("From {} read configs {}".format(F2DS_HARVESTER_INI_DEFAULT_SECTION, config_dict))
|
|
format_file = config_dict['format_file']
|
|
license_id = config_dict['default_license_id']
|
|
gcat_endpoint = config_dict['gcat_endpoint']
|
|
gcube_meta_system_type = config_dict['gcube_metadata_profile_system_type']
|
|
gcube_token = config_dict['jarvis_token']
|
|
fdp_catalogues_endpoint = config_dict['fdp_catalogues_endpoint']
|
|
|
|
log.info("F2DS harvester is running with configurations:")
|
|
log.info("fdp_catalogues_endpoint: {}".format(fdp_catalogues_endpoint))
|
|
log.info("gcat: {}".format(gcat_endpoint))
|
|
masked_token = gcube_token[0:10]+"_MASKED_TOKEN"
|
|
log.info("gcube_token: {}".format(masked_token))
|
|
log.info("gcube_meta_system_type: {}".format(gcube_meta_system_type))
|
|
except Exception as inst:
|
|
log.error("Error on reading configurations: " + str(inst))
|
|
|
|
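# Illustrative sketch of the expected /etc/ckan/default/f2ds_harvester.ini layout, inferred
# from the keys read above; all values are examples, not the real deployment settings:
#
#     [DEFAULT_CONFIGS]
#     fdp_catalogues_endpoint = https://f2ds.eosc-pillar.eu/expanded?format=rdf
#     format_file = xml
#     default_license_id = CC-BY-SA-4.0
#     gcat_endpoint = https://gcat.example.org/gcat
#     gcube_metadata_profile_system_type = Dataset
#     jarvis_token = <gcube-token>
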
# Running
if not fdp_catalogues_endpoint:
    fdp_catalogues_endpoint = "https://f2ds.eosc-pillar.eu/expanded?format=rdf"
    log.warn("Using hard coded endpoint {}".format(fdp_catalogues_endpoint))

discover_RDFC = DiscoverRDFCatalogue(fdp_catalogues_endpoint, format_file)
list_catalogues_urls = discover_RDFC.discover_catalogues()

countOperations = 0
countCatalogues = 0
countCreate = 0
countUpdate = 0
countDatasetError = 0
countCatalogueError = 0
for catalogue_url in list_catalogues_urls:
    try:
        log.debug("rdf catalogue URL {}".format(catalogue_url))
        countCatalogues += 1
        report_logger.info("CTG.{} Harvesting Catalogue: {}".format(countCatalogues, catalogue_url))
        rdf_catalogue = discover_RDFC.read_catalogue(catalogue_url)
        log.info("from {} got {}".format(catalogue_url, rdf_catalogue))

        # Workaround until the core translation function defaults to the Flask one
        registry = Registry()
        registry.prepare()
        registry.register(translator, MockTranslator())

        dataset_body_to_ckan_list = []
        for dataset_url in rdf_catalogue.get_list_dataset_urls():
            dataset_body = http_get_response_body(dataset_url, 'format=rdf')
            if dataset_body is None:
                countDatasetError += 1
                report_logger.info("Error, no body for dataset: {}".format(dataset_url))
                continue

            log.debug("from {} read dataset_body {}".format(dataset_url, dataset_body))
            ckan_dataset = discover_RDFC.to_ckan_mapping(dataset_body, "application/rdf+xml")
            dataset_body_to_ckan_list.append(ckan_dataset)

        loop_index = 0
        loop_size = len(dataset_body_to_ckan_list)
        for ckan_dataset_body in dataset_body_to_ckan_list:
            dataset_name = None  # make sure it is defined for the error handlers below
            try:
                loop_index += 1
                log.info('\n\nManaging publishing {} of {}'.format(loop_index, loop_size))
                log.debug('ckan_dataset_body {}'.format(ckan_dataset_body))
                # data = str(ckan_dataset_body).encode('utf-8').strip()
                # json.dumps serializes the dataset dict into a JSON string
                json_data = json.dumps(ckan_dataset_body)
                json_data_dict = fill_in_to_gcat_dataset(rdf_catalogue, json_data, license_id, gcube_meta_system_type)
                # keep the dataset_name for logging into the report
                dataset_name = json_data_dict[DATASET_FIELD_NAME]
                log.debug("Dataset name is {}".format(dataset_name))
                # encoding in utf-8
                json_data = json.dumps(json_data_dict).encode("utf-8")
                log.debug('json_data {}'.format(json_data))
                response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='PUT')
                code_status = response["status"]
                if 200 <= code_status <= 208:
                    log.info("Update OK. Response returned status {}".format(code_status))
                    report_logger.info("DTS.{} Updated dataset: {}".format(loop_index, dataset_name))
                    countUpdate += 1
                else:
                    log.info("Update request failed (with PUT request). Response returned status {}".format(
                        code_status))
                    if code_status == 404:
                        log.info("Update failed with code {}. Submitting the create with POST request".format(code_status))
                        response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='POST')
                        code_status = response["status"]
                        log.info("POST response returned with status {}".format(code_status))
                        report_logger.info("DTS.{} Created dataset: {}".format(loop_index, dataset_name))
                        countCreate += 1
                    else:
                        countDatasetError += 1
                        report_logger.warn("Not created/updated the dataset: {}".format(dataset_name))

            except Exception as inst:
                log.error("Error on creating dataset: {}. Error: ".format(dataset_name) + str(inst))
                report_logger.info("Dataset error: {}".format(dataset_name) + str(inst))
                countDatasetError += 1

            countOperations += 1

            if loop_index < loop_size:
                log.info("sleeping 2 seconds...")
                time.sleep(2)
                log.info("running")
    except Exception as inst:
        log.error("Error on harvesting catalogue: {}. Error: ".format(catalogue_url) + str(inst))
        report_logger.info("Catalogue error: {}".format(catalogue_url) + str(inst))
        countCatalogueError += 1

log.info("F2DS harvesting stage is finished. Operations (create/update) done are: " + str(countOperations))
|
|
report_logger.info("Catalogue/s found: {}".format(countCatalogues))
|
|
report_logger.info("Total # operations: {}".format(countOperations))
|
|
report_logger.info("Datasets created: {}".format(countCreate))
|
|
report_logger.info("Datasets updated: {}".format(countUpdate))
|
|
report_logger.info("Datasets errors: {}".format(countDatasetError))
|
|
report_logger.info("Catalogues errors: {}".format(countCatalogueError))
|
|
|
|
log.info("Going to resync DB -> Solr, with search-index rebuild -r")
|
|
os.system(
|
|
'/usr/lib/ckan/default/bin/paster --plugin=ckan search-index rebuild -r --config=/etc/ckan/default/production.ini')
|
|
log.info("Resync DB -> Solr, done.")
|
|
|
|
# Running the bulk deletion stage
if countDatasetError == 0:
    log.info("No error detected, going to call the bulk deletion...")
    report_logger.info("Bulk deletion stage starts...")

    # Building yesterday's UTC time as YYYY-MM-DDT23:59:59Z
    today = datetime.utcnow().date()
    yesterday = today - timedelta(days=1)
    utc_time_suffix = 'T23:59:59Z'
    yesterday_time = str(yesterday) + utc_time_suffix

    # Building the query string for the bulk purge invocation on gCat
    query_string = {"q": "extras_systemtype:{}".format(gcube_meta_system_type),
                    "fq": "metadata_modified:[* TO {}]".format(yesterday_time),
                    "own_only": True}
    safe_query_string = urllib.urlencode(query_string)
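    # Illustrative sketch (not executed here): with gcube_meta_system_type = 'Dataset' and a
    # hypothetical yesterday of 2021-06-14, the purge targets every item of that system:type
    # not modified since yesterday, i.e. roughly:
    #
    #     PURGE <gcat_endpoint>/items?q=extras_systemtype:Dataset
    #           &fq=metadata_modified:[* TO 2021-06-14T23:59:59Z]&own_only=True
    #
    # (the actual request URL is the urlencoded form of these parameters)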
    uri = gcat_endpoint + '/items?{}'.format(safe_query_string)
    report_logger.info("Calling {}".format(uri))
    response = http_json_response_body(uri, None, gcube_token, method='PURGE')
    report_logger.info("\nBulk deletion called for {}".format(uri))
    log.info("\nBulk deletion called correctly with parameters {}".format(query_string))
else:
    report_logger.info("\nDataset error/s found, so skipping the bulk deletion stage")
    log.info("\nDataset error/s found, so skipping the bulk deletion stage")

log.info("\n\nF2DS harvester terminated!")
sys.exit(0)