Copied from SVN

This commit is contained in:
Francesco Mangiacrapa 2022-12-16 15:01:46 +01:00
commit 4915e82d7b
6 changed files with 1182 additions and 0 deletions

.project Normal file (17 lines)

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>FPD-harvester</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.python.pydev.pythonNature</nature>
</natures>
</projectDescription>

CHANGELOG.md Normal file (7 lines)

@@ -0,0 +1,7 @@
# Changelog
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
- First release

LICENSE.md Normal file (311 lines)

@@ -0,0 +1,311 @@
# European Union Public Licence V.1.1
## *EUPL © the European Community 2007*
This **European Union Public Licence** (the **“EUPL”**) applies to the Work or Software
(as defined below) which is provided under the terms of this Licence. Any use of
the Work, other than as authorised under this Licence is prohibited (to the
extent such use is covered by a right of the copyright holder of the Work).
The Original Work is provided under the terms of this Licence when the Licensor
(as defined below) has placed the following notice immediately following the
copyright notice for the Original Work:
**Licensed under the EUPL V.1.1**
or has expressed by any other mean his willingness to license under the EUPL.
## 1. Definitions
In this Licence, the following terms have the following meaning:
- The Licence: this Licence.
- The Original Work or the Software: the software distributed and/or
communicated by the Licensor under this Licence, available as Source Code and
also as Executable Code as the case may be.
- Derivative Works: the works or software that could be created by the Licensee,
based upon the Original Work or modifications thereof. This Licence does not
define the extent of modification or dependence on the Original Work required
in order to classify a work as a Derivative Work; this extent is determined by
copyright law applicable in the country mentioned in Article 15.
- The Work: the Original Work and/or its Derivative Works.
- The Source Code: the human-readable form of the Work which is the most
convenient for people to study and modify.
- The Executable Code: any code which has generally been compiled and which is
meant to be interpreted by a computer as a program.
- The Licensor: the natural or legal person that distributes and/or communicates
the Work under the Licence.
- Contributor(s): any natural or legal person who modifies the Work under the
Licence, or otherwise contributes to the creation of a Derivative Work.
- The Licensee or “You”: any natural or legal person who makes any usage of the
Software under the terms of the Licence.
- Distribution and/or Communication: any act of selling, giving, lending,
renting, distributing, communicating, transmitting, or otherwise making
available, on-line or off-line, copies of the Work or providing access to its
essential functionalities at the disposal of any other natural or legal
person.
## 2. Scope of the rights granted by the Licence
The Licensor hereby grants You a world-wide, royalty-free, non-exclusive,
sub-licensable licence to do the following, for the duration of copyright vested
in the Original Work:
- use the Work in any circumstance and for all usage,
- reproduce the Work,
- modify the Original Work, and make Derivative Works based upon the Work,
- communicate to the public, including the right to make available or display
  the Work or copies thereof to the public and perform publicly, as the case
  may be, the Work,
- distribute the Work or copies thereof,
- lend and rent the Work or copies thereof,
- sub-license rights in the Work or copies thereof.
Those rights can be exercised on any media, supports and formats, whether now
known or later invented, as far as the applicable law permits so.
In the countries where moral rights apply, the Licensor waives his right to
exercise his moral right to the extent allowed by law in order to make effective
the licence of the economic rights here above listed.
The Licensor grants to the Licensee royalty-free, non exclusive usage rights to
any patents held by the Licensor, to the extent necessary to make use of the
rights granted on the Work under this Licence.
## 3. Communication of the Source Code
The Licensor may provide the Work either in its Source Code form, or as
Executable Code. If the Work is provided as Executable Code, the Licensor
provides in addition a machine-readable copy of the Source Code of the Work
along with each copy of the Work that the Licensor distributes or indicates, in
a notice following the copyright notice attached to the Work, a repository where
the Source Code is easily and freely accessible for as long as the Licensor
continues to distribute and/or communicate the Work.
## 4. Limitations on copyright
Nothing in this Licence is intended to deprive the Licensee of the benefits from
any exception or limitation to the exclusive rights of the rights owners in the
Original Work or Software, of the exhaustion of those rights or of other
applicable limitations thereto.
## 5. Obligations of the Licensee
The grant of the rights mentioned above is subject to some restrictions and
obligations imposed on the Licensee. Those obligations are the following:
Attribution right: the Licensee shall keep intact all copyright, patent or
trademarks notices and all notices that refer to the Licence and to the
disclaimer of warranties. The Licensee must include a copy of such notices and a
copy of the Licence with every copy of the Work he/she distributes and/or
communicates. The Licensee must cause any Derivative Work to carry prominent
notices stating that the Work has been modified and the date of modification.
Copyleft clause: If the Licensee distributes and/or communicates copies of the
Original Works or Derivative Works based upon the Original Work, this
Distribution and/or Communication will be done under the terms of this Licence
or of a later version of this Licence unless the Original Work is expressly
distributed only under this version of the Licence. The Licensee (becoming
Licensor) cannot offer or impose any additional terms or conditions on the Work
or Derivative Work that alter or restrict the terms of the Licence.
Compatibility clause: If the Licensee Distributes and/or Communicates Derivative
Works or copies thereof based upon both the Original Work and another work
licensed under a Compatible Licence, this Distribution and/or Communication can
be done under the terms of this Compatible Licence. For the sake of this clause,
“Compatible Licence” refers to the licences listed in the appendix attached to
this Licence. Should the Licensee's obligations under the Compatible Licence
conflict with his/her obligations under this Licence, the obligations of the
Compatible Licence shall prevail.
Provision of Source Code: When distributing and/or communicating copies of the
Work, the Licensee will provide a machine-readable copy of the Source Code or
indicate a repository where this Source will be easily and freely available for
as long as the Licensee continues to distribute and/or communicate the Work.
Legal Protection: This Licence does not grant permission to use the trade names,
trademarks, service marks, or names of the Licensor, except as required for
reasonable and customary use in describing the origin of the Work and
reproducing the content of the copyright notice.
## 6. Chain of Authorship
The original Licensor warrants that the copyright in the Original Work granted
hereunder is owned by him/her or licensed to him/her and that he/she has the
power and authority to grant the Licence.
Each Contributor warrants that the copyright in the modifications he/she brings
to the Work are owned by him/her or licensed to him/her and that he/she has the
power and authority to grant the Licence.
Each time You accept the Licence, the original Licensor and subsequent
Contributors grant You a licence to their contributions to the Work, under the
terms of this Licence.
## 7. Disclaimer of Warranty
The Work is a work in progress, which is continuously improved by numerous
contributors. It is not a finished work and may therefore contain defects or
“bugs” inherent to this type of software development.
For the above reason, the Work is provided under the Licence on an “as is” basis
and without warranties of any kind concerning the Work, including without
limitation merchantability, fitness for a particular purpose, absence of defects
or errors, accuracy, non-infringement of intellectual property rights other than
copyright as stated in Article 6 of this Licence.
This disclaimer of warranty is an essential part of the Licence and a condition
for the grant of any rights to the Work.
## 8. Disclaimer of Liability
Except in the cases of wilful misconduct or damages directly caused to natural
persons, the Licensor will in no event be liable for any direct or indirect,
material or moral, damages of any kind, arising out of the Licence or of the use
of the Work, including without limitation, damages for loss of goodwill, work
stoppage, computer failure or malfunction, loss of data or any commercial
damage, even if the Licensor has been advised of the possibility of such
damage. However, the Licensor will be liable under statutory product liability
laws as far such laws apply to the Work.
## 9. Additional agreements
While distributing the Original Work or Derivative Works, You may choose to
conclude an additional agreement to offer, and charge a fee for, acceptance of
support, warranty, indemnity, or other liability obligations and/or services
consistent with this Licence. However, in accepting such obligations, You may
act only on your own behalf and on your sole responsibility, not on behalf of
the original Licensor or any other Contributor, and only if You agree to
indemnify, defend, and hold each Contributor harmless for any liability incurred
by, or claims asserted against such Contributor by the fact You have accepted
any such warranty or additional liability.
## 10. Acceptance of the Licence
The provisions of this Licence can be accepted by clicking on an icon “I agree”
placed under the bottom of a window displaying the text of this Licence or by
affirming consent in any other similar way, in accordance with the rules of
applicable law. Clicking on that icon indicates your clear and irrevocable
acceptance of this Licence and all of its terms and conditions.
Similarly, you irrevocably accept this Licence and all of its terms and
conditions by exercising any rights granted to You by Article 2 of this Licence,
such as the use of the Work, the creation by You of a Derivative Work or the
Distribution and/or Communication by You of the Work or copies thereof.
## 11. Information to the public
In case of any Distribution and/or Communication of the Work by means of
electronic communication by You (for example, by offering to download the Work
from a remote location) the distribution channel or media (for example, a
website) must at least provide to the public the information requested by the
applicable law regarding the Licensor, the Licence and the way it may be
accessible, concluded, stored and reproduced by the Licensee.
## 12. Termination of the Licence
The Licence and the rights granted hereunder will terminate automatically upon
any breach by the Licensee of the terms of the Licence.
Such a termination will not terminate the licences of any person who has
received the Work from the Licensee under the Licence, provided such persons
remain in full compliance with the Licence.
## 13. Miscellaneous
Without prejudice of Article 9 above, the Licence represents the complete
agreement between the Parties as to the Work licensed hereunder.
If any provision of the Licence is invalid or unenforceable under applicable
law, this will not affect the validity or enforceability of the Licence as a
whole. Such provision will be construed and/or reformed so as necessary to make
it valid and enforceable.
The European Commission may publish other linguistic versions and/or new
versions of this Licence, so far this is required and reasonable, without
reducing the scope of the rights granted by the Licence. New versions of the
Licence will be published with a unique version number.
All linguistic versions of this Licence, approved by the European Commission,
have identical value. Parties can take advantage of the linguistic version of
their choice.
## 14. Jurisdiction
Any litigation resulting from the interpretation of this License, arising
between the European Commission, as a Licensor, and any Licensee, will be
subject to the jurisdiction of the Court of Justice of the European Communities,
as laid down in article 238 of the Treaty establishing the European Community.
Any litigation arising between Parties, other than the European Commission, and
resulting from the interpretation of this License, will be subject to the
exclusive jurisdiction of the competent court where the Licensor resides or
conducts its primary business.
## 15. Applicable Law
This Licence shall be governed by the law of the European Union country where
the Licensor resides or has his registered office.
This licence shall be governed by the Belgian law if:
- a litigation arises between the European Commission, as a Licensor, and any
  Licensee;
- the Licensor, other than the European Commission, has no residence or
  registered office inside a European Union country.
---
## Appendix
**“Compatible Licences”** according to article 5 EUPL are:
- GNU General Public License (GNU GPL) v. 2
- Open Software License (OSL) v. 2.1, v. 3.0
- Common Public License v. 1.0
- Eclipse Public License v. 1.0
- Cecill v. 2.0

README.md Normal file (58 lines)

@@ -0,0 +1,58 @@
# FPD Harvester
The FPD Harvester is a Python application that uses the ckanext-dcat plugin to collect datasets from an F2DS catalogue (i.e. https://f2ds.eosc-pillar.eu/expanded?format=rdf)
and publish them into a D4Science Catalogue instance powered by CKAN.
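The harvester reads its runtime configuration from `/etc/ckan/default/f2ds_harvester.ini` (section `DEFAULT_CONFIGS`), as defined in `f2ds_harvester.py`. A minimal sketch of that file, with placeholder values for everything deployment-specific, could look like:

```ini
[DEFAULT_CONFIGS]
; FAIR Data Point endpoint listing the catalogues to harvest
fdp_catalogues_endpoint = https://f2ds.eosc-pillar.eu/expanded?format=rdf
; RDF serialization passed to rdflib when parsing (placeholder value)
format_file = xml
; license assigned to datasets that declare none
default_license_id = CC-BY-SA-4.0
; gCat REST endpoint of the target D4Science catalogue (placeholder URL)
gcat_endpoint = https://gcat.example.org/gcat
; value written into the system:type extra of each published item
gcube_metadata_profile_system_type = Dataset
; gcube-token used to authenticate against gCat (placeholder)
jarvis_token = <your-gcube-token>
```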
## Built With
* [Python 2.6](https://www.python.org/download/releases/2.6/) - The Python runtime used
**Requires**
* [ckanext-dcat](https://github.com/ckan/ckanext-dcat)
## Documentation
N/A
## Change log
See the [Releases](https://code-repo.d4science.org/CKAN-Extensions/FPD-harvester/releases)
## Authors
* **Francesco Mangiacrapa** ([ORCID](https://orcid.org/0000-0002-6528-664X)) Computer Scientist at [ISTI-CNR Infrascience Group](http://nemis.isti.cnr.it/groups/infrascience)
## License
This project is licensed under the EUPL V.1.1 License - see the [LICENSE.md](LICENSE.md) file for details.
## About the gCube Framework
This software is part of the [gCubeFramework](https://www.gcube-system.org/ "gCubeFramework"): an
open-source software toolkit used for building and operating Hybrid Data
Infrastructures enabling the dynamic deployment of Virtual Research Environments
by favouring the realisation of reuse oriented policies.
The projects leading to this software have received funding from a series of European Union programmes including:
- the Sixth Framework Programme for Research and Technological Development
- DILIGENT (grant no. 004260).
- the Seventh Framework Programme for research, technological development and demonstration
- D4Science (grant no. 212488);
- D4Science-II (grant no. 239019);
- ENVRI (grant no. 283465);
- EUBrazilOpenBio (grant no. 288754);
- iMarine (grant no. 283644).
- the H2020 research and innovation programme
- BlueBRIDGE (grant no. 675680);
- EGIEngage (grant no. 654142);
- ENVRIplus (grant no. 654182);
- PARTHENOS (grant no. 654119);
- SoBigData (grant no. 654024);
- DESIRA (grant no. 818194);
- ARIADNEplus (grant no. 823914);
- RISIS2 (grant no. 824091);
- PerformFish (grant no. 727610);
- AGINFRAplus (grant no. 731001).

f2ds_harvester.py Normal file (783 lines)

@@ -0,0 +1,783 @@
import json
import os
import urllib2
import urllib
import sys
import re
import time
import rdflib
import rdflib.parser
from rdflib import URIRef, BNode, Literal
from rdflib.namespace import Namespace, RDF
from ckanext.dcat.processors import RDFParser
from ckanext.dcat.profiles import EuropeanDCATAPProfile
from ckanext.dcat.profiles import RDFProfile
# Needed as a workaround until the core translation function defaults to the Flask one
from paste.registry import Registry
from ckan.lib.cli import MockTranslator
from ckantoolkit import config
from pylons import translator
import copy
from ckan.plugins import toolkit
import logging
from logging.handlers import RotatingFileHandler
from logging import handlers
from datetime import datetime, timedelta
import ConfigParser
# Log stdout Handler
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
# Log FILE handler
# LOGFILE = "/var/log/ckan/f2ds_harvester.log"
LOG_PATH = "/var/log/ckan/f2ds-log"
LOGFILE = LOG_PATH + "/f2ds_harvester.log"
# Create the log PATH folder if it does not exist
if not os.path.exists(LOG_PATH):
os.makedirs(LOG_PATH)
fh = RotatingFileHandler(LOGFILE, maxBytes=(1048576 * 10), backupCount=7)
fh_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
fh.setLevel(logging.INFO)
fh.setFormatter(fh_formatter)
log.addHandler(fh)
# Create the report_logger. It is filled with the latest harvesting results
report_logger = logging.getLogger("f2ds.summary_latest_harvesting")
report_logger.setLevel(logging.INFO)
REPORT_HARVESTING_FILE = LOG_PATH + "/f2ds_summary_latest_harvesting.log"
# Create a file handler for the harvesting summary report (INFO level)
fh_report = logging.FileHandler(REPORT_HARVESTING_FILE, mode='w')
fh_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh_report.setLevel(logging.INFO)
fh_report.setFormatter(fh_formatter)
report_logger.addHandler(fh_report)
# Created by Francesco Mangiacrapa
# francesco.mangiacrapa@isti.cnr.it
# ISTI-CNR Pisa (ITALY)
DCT = Namespace("http://purl.org/dc/terms/")
DCAT = Namespace("http://www.w3.org/ns/dcat#")
ADMS = Namespace("http://www.w3.org/ns/adms#")
VCARD = Namespace("http://www.w3.org/2006/vcard/ns#")
FOAF = Namespace("http://xmlns.com/foaf/0.1/")
SCHEMA = Namespace('http://schema.org/')
TIME = Namespace('http://www.w3.org/2006/time')
LOCN = Namespace('http://www.w3.org/ns/locn#')
GSP = Namespace('http://www.opengis.net/ont/geosparql#')
OWL = Namespace('http://www.w3.org/2002/07/owl#')
SPDX = Namespace('http://spdx.org/rdf/terms#')
RE3DATA = Namespace('http://www.re3data.org/schema/3-0#')
namespaces = {
'dct': DCT,
'dcat': DCAT,
'adms': ADMS,
'vcard': VCARD,
'foaf': FOAF,
'schema': SCHEMA,
'time': TIME,
'locn': LOCN,
'gsp': GSP,
'owl': OWL,
'spdx': SPDX,
'r3d': RE3DATA,
}
DATASET_FIELD_NAME = 'name'
DATASET_NAME_MAX_SIZE = 100
F2DS_HARVESTER_INI_PATH = '/etc/ckan/default/f2ds_harvester.ini'
F2DS_HARVESTER_INI_DEFAULT_SECTION = 'DEFAULT_CONFIGS'
def http_get_response_body(uri, dict_query_args=None, gcube_token=None):
if dict_query_args:
uri += "?{}".format(dict_query_args)
headers = {}
if gcube_token:
headers["gcube-token"] = gcube_token
log.info("Sending GET request to URL: %s" % uri)
req = urllib2.Request(uri, None, headers)
try:
resp = urllib2.urlopen(req, timeout=60)
except urllib2.HTTPError as e:
log.error("The request sent to the URL {} failed".format(uri))
log.error("HTTPError: %d" % e.code)
return None
except urllib2.URLError as e:
# Not an HTTP-specific error (e.g. connection refused)
log.error("URLError - the URI %s is not reachable: %s" % (uri, e.reason))
return None
else:
# 200
body = resp.read()
return body
def copy_list_of_dict(list_source):
cpy_list = []
for li in list_source:
d2 = copy.deepcopy(li)
cpy_list.append(d2)
return cpy_list
def http_json_response_body(uri, data=None, gcube_token=None, method='POST'):
'''
:param uri: the uri
:param data: the json data to send
:param gcube_token: the gcube-token that must be used for contacting the gCat service
:param method: the http method to perform the request, default is 'POST'
:return: the HTTP response returned as a dictionary of type dict(body='', status='', error='')
'''
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
}
if gcube_token:
headers["gcube-token"] = gcube_token
# For a PUT request, append the dataset name to the URI as /[dataset-name]
if method == 'PUT':
data_json_dict = json.loads(data)
uri += '/' + data_json_dict['name']
# data_json_dict['name'] = data_json_dict['name']+'new'
# data = json.dumps(data_json_dict).encode("utf-8")
req = urllib2.Request(uri, data, headers)
if method:
req.get_method = lambda: method
log.info("Sending {} request to URL: {}".format(method, uri))
log.debug("Headers: %s" % headers)
log.debug("data: %s" % data)
try:
resp = urllib2.urlopen(req, timeout=300)
response_status_code = resp.getcode()
except urllib2.HTTPError as e:
response_status_code = e.code
error_body = e.read()
log.error("The request sent to the URL {} failed".format(uri))
log.error("HTTPError: {}. Error message: {}".format(e.code, error_body))
return dict(body=None, status=response_status_code, error=error_body)
except urllib2.URLError as e:
# Not an HTTP-specific error (e.g. connection refused): URLError carries no
# HTTP status code or response body, only a reason
log.error("URLError - the URI %s is not reachable: %s" % (uri, e.reason))
return dict(body=None, status=None, error=str(e.reason))
else:
# 200
return dict(body=resp.read(), status=response_status_code, error=None)
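# Usage sketch (hypothetical values): the main loop below first attempts an
# update and falls back to a create when the item does not exist yet, e.g.:
#   response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='PUT')
#   if response['status'] == 404:
#       response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='POST')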
def fill_in_to_gcat_dataset(rdf_catalogue, json_data, license_id='CC-BY-SA-4.0', system_type='Dataset'):
# data_json_array = json.dumps(json_data)
data_json_dict = json.loads(json_data)
log.debug("read json data {}".format(json_data))
dataset_name = None
try:
dataset_name = data_json_dict[DATASET_FIELD_NAME]
except KeyError as e:
log.info("The field " + DATASET_FIELD_NAME + " not found from source. KeyError {}".format(e))
if not dataset_name:
log.debug("Trying to generate the field 'name' from 'title'")
dataset_title = data_json_dict['title']
if not dataset_title:
raise NameError('name and title fields are not defined')
# The field 'name' must be between 2 and 100 characters long
# and contain only lowercase alphanumeric characters, '-' and '_'.
rescue_dataset_title = normalize_to_alphanumeric_string(dataset_title, '-')
rescue_dataset_title = rescue_dataset_title.lower()
dataset_name = (rescue_dataset_title[:DATASET_NAME_MAX_SIZE]) if len(
rescue_dataset_title) > DATASET_NAME_MAX_SIZE else rescue_dataset_title
log.info("dataset cleaned name is: {} ".format(dataset_name))
if not dataset_name:
raise NameError("the 'name' field could not be found or built")
data_json_dict[DATASET_FIELD_NAME] = dataset_name
# Managing tags
try:
dict_normalized_ckan_tags = dict_of_normalized_ckan_tags(data_json_dict['tags'])
# there are normalized tags
if len(dict_normalized_ckan_tags) > 0:
data_json_dict['tags'] = []
for tag in dict_normalized_ckan_tags:
# adding to tags
norm_tag = dict_normalized_ckan_tags[tag]
data_json_dict['tags'].append({'name': norm_tag})
# if tag is not equal to normalized tag value
if tag != norm_tag:
# adding to extras as keyword
data_json_dict['extras'].append({'key': 'keyword', 'value': tag})
else:
# at least one tag is required for item creation on CKAN. Adding "No Tag"
data_json_dict['tags'] = []
data_json_dict['tags'].append({'name': 'No Tag'})
log.info("no tag found, adding 'No Tag' as tag because at least one tag is required")
except Exception as e:
log.error("Error occurred during managing tags . Exception {}".format(e))
if not license_id:
license_id = 'CC-BY-SA-4.0'
if not system_type:
system_type = 'Dataset'
# adding system:type
data_json_dict['extras'].append({'key': 'system:type', 'value': system_type})
rescue_catalogue_title = rdf_catalogue.get_name()
if rescue_catalogue_title:
pattern = r'[^a-zA-Z0-9_\-]+'
remove_white_spaces = True
rescue_catalogue_title = normalize_to_alphanumeric_string(rescue_catalogue_title, '-', pattern, remove_white_spaces)
else:
rescue_catalogue_title = "No Catalogue"
log.info("rescue_catalogue_title is {} ".format(rescue_catalogue_title))
# adding catalogue_title
data_json_dict['extras'].append({'key': 'catalogue_title', 'value': rescue_catalogue_title})
# adding license
data_json_dict['license_id'] = license_id
return data_json_dict
# Returns a string containing only alphanumeric characters, '-' and '_'.
def normalize_to_alphanumeric_string(string_value, replace_with='_', pattern=r'[^a-zA-Z0-9_\-]+', remove_white_spaces=False):
sanitize1 = re.sub(pattern, replace_with, string_value)
if remove_white_spaces:
return re.sub(' ', replace_with, sanitize1)
return sanitize1
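# Worked example (hypothetical input): with the default pattern, every run of
# characters other than letters, digits, '_' and '-' is collapsed into the
# replacement string, so
# normalize_to_alphanumeric_string('My Dataset (2022)', '-') returns 'My-Dataset-2022-'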
# For any ckan tag found normalizes its value (as alphanumeric characters with '-' and '_')
# returns the dict {'tag': 'normalized-tag'}
def dict_of_normalized_ckan_tags(tag_array):
dict_normalized_ckan_tags = {}
# alphanumeric string with '-' and '_' and ' '
pattern = r'[^a-zA-Z0-9_\- ]+'
if tag_array:
for tag in tag_array:
tag_value = tag.get('name')
log.info("tag value is: {}".format(tag_value))
n_tag_value = normalize_to_alphanumeric_string(tag_value, '_', pattern)
dict_normalized_ckan_tags[tag_value] = n_tag_value
log.info("returning the dict of normalized tag: {}".format(dict_normalized_ckan_tags))
return dict_normalized_ckan_tags
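# Example with a hypothetical tag: the space-preserving pattern above keeps
# blanks and replaces only the forbidden characters, so
# dict_of_normalized_ckan_tags([{'name': 'Fish & Chips'}]) returns
# {'Fish & Chips': 'Fish _ Chips'}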
class RDFCatalogue(object):
def __init__(self, catalogue_url, catalogue_name, catalogue_title, catalogue_description, list_dataset_urls):
self._url = catalogue_url
self._name = catalogue_name
self._title = catalogue_title
self._description = catalogue_description
self._list_dataset_urls = list_dataset_urls
def get_url(self):
return self._url
def get_name(self):
return self._name
def get_list_dataset_urls(self):
return self._list_dataset_urls
def get_title(self):
return self._title
def get_description(self):
return self._description
def __repr__(self):
return "RDFCatalogue(url: {0}, name: {1}, title: {2}, description: {3}, datasets: {4})".format(self._url,
self._name,
self._title,
self._description,
self._list_dataset_urls)
class DiscoverRDFCatalogue(object):
def __init__(self, fdp_catalogues_endpoint, format_file):
self._fdp_catalogues_endpoint = fdp_catalogues_endpoint
self._format_file = format_file
def discover_catalogues(self):
g = rdflib.ConjunctiveGraph()
for prefix, namespace in namespaces.iteritems():
g.bind(prefix, namespace)
list_catalogue_urls = []
try:
catalogues_ref = URIRef(self._fdp_catalogues_endpoint)
parser = g.parse(source=self._fdp_catalogues_endpoint, format=self._format_file)
# print(parser.all_nodes())
# loop through each triple in the graph (subj, pred, obj)
for subj, pred, obj in g:
# sanity check: every triple yielded by the iterator must be present in the graph
log.debug("subj has {} pred has {} obj has {}".format(subj, pred, obj))
if (subj, pred, obj) not in g:
raise Exception("Triple unexpectedly missing from the graph")
log.debug("graph has {} statements.".format(len(g)))
for catalog in g.subject_objects(RE3DATA.dataCatalog):
log.debug("catalogue url found {}".format(catalog[1]))
list_catalogue_urls.append(str(catalog[1]))
log.info("returning list of catalogue URLs {}".format(list_catalogue_urls))
return list_catalogue_urls
except Exception as inst:
log.error("Error on reading list of catalogues: " + str(inst))
log.info("Returning empty list of catalogues")
return list_catalogue_urls
def read_catalogue(self, catalogue_url):
g = rdflib.ConjunctiveGraph()
for prefix, namespace in namespaces.iteritems():
g.bind(prefix, namespace)
list_dataset_urls = []
try:
catalog_ref = URIRef(catalogue_url)
parser = g.parse(source=catalogue_url, format=self._format_file)
# print(parser.all_nodes())
# loop through each triple in the graph (subj, pred, obj)
for subj, pred, obj in g:
# sanity check: every triple yielded by the iterator must be present in the graph
log.debug("subj has {} pred has {} obj has {}".format(subj, pred, obj))
if (subj, pred, obj) not in g:
raise Exception("Triple unexpectedly missing from the graph")
log.debug("graph has {} statements.".format(len(g)))
catalogue_name = None
for name in g.subject_objects(FOAF.name):
catalogue_name = str(name[1])
log.info("catalogue name found: {}".format(catalogue_name))
break
catalogue_title = None
for title in g.subject_objects(DCT.title):
catalogue_title = str(title[1])
log.info("catalogue title found: {}".format(catalogue_title))
break
catalogue_description = None
for description in g.subject_objects(DCT.description):
catalogue_description = str(description[1])
log.info("catalogue description found: {}".format(catalogue_description))
break
for dataset in g.subject_objects(DCAT.dataset):
log.info("dataset url found: {}".format(dataset[1]))
list_dataset_urls.append(str(dataset[1]))
log.info("returning list of dataset URLs {}".format(list_dataset_urls))
return RDFCatalogue(catalogue_url, catalogue_name, catalogue_title, catalogue_description,
list_dataset_urls)
except Exception as inst:
log.error("Error on reading catalogue: " + str(inst))
log.info("Returning empty list of datasets")
return RDFCatalogue(catalogue_url, "", [])
def to_ckan_mapping(self, dataset_body, format):
parser = RDFParser()
parser._profiles = [EuropeanDCATAPProfile, F2DSDCATAPProfile]
parser.parse(dataset_body, _format=format)
indent = 4
ckan_datasets = [d for d in parser.datasets()]
# only one exists
ckan_dataset = ckan_datasets[0]
log.debug("ckan_dataset is {}".format(ckan_dataset))
# ckan_dataset = self._ckan_dataset_extras_strip(ckan_dataset)
# print(json.dumps(ckan_dataset, indent=indent))
# print(json.dumps(ckan_datasets, indent=indent))
return ckan_dataset
def _ckan_dataset_extras_strip(self, _dict):
'''
Returns dict of extras stripped
'''
try:
new_dict_extras = []
for extra in _dict.get('extras', []):
k, v = extra['key'], extra['value']
if isinstance(v, basestring):
v = v.strip()
new_dict_extras.append({'key': k, 'value': v})
_dict['extras'] = new_dict_extras
return _dict
except Exception as inst:
log.error("Error _ckan_dataset_extras_strip: " + str(inst))
class F2DSDCATAPProfile(RDFProfile):
'''
An RDF profile for the F2DS. Implemented by Francesco Mangiacrapa at ISTI-CNR.
Applied after the European DCAT-AP profile (`euro_dcat_ap`), so it can
override the resources and spatial values that profile produced.
'''
def _set_extras_dict_value(self, _dict, key, value, default=None):
'''
Set the value for the given key (or its 'dcat_'-prefixed variant) in the
`extras` list of a CKAN dict.
Returns the updated dict.
'''
for extra in _dict.get('extras', []):
if extra['key'] == key or extra['key'] == 'dcat_' + key:
extra['value'] = value
return _dict
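# Example (hypothetical extras): given dataset_dict['extras'] containing
# {'key': 'spatial', 'value': 'old'}, calling
# self._set_extras_dict_value(dataset_dict, 'spatial', 'new') overwrites that
# entry (a 'dcat_spatial' key would match too) and returns the updated dict.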
def parse_dataset(self, dataset_dict, dataset_ref):
log.info("parse_dataset into F2DSDCATAPProfile")
document_format = "application/rdf+xml"
# Resetting list of resources
dataset_dict['resources'] = []
# Distributions
distribution_index = 0
# external distribution referred in RDF dataset
for distribution_url in self.g.objects(dataset_ref, DCAT.distribution):
try:
distribution_index += 1
log.info("\t## {} ## Read distribution url {} for dataset url {}".format(distribution_index, distribution_url, dataset_ref))
graph = rdflib.ConjunctiveGraph()
for prefix, namespace in namespaces.iteritems():
graph.bind(prefix, namespace)
graph.parse(URIRef(distribution_url), format=document_format)
self.g.add((URIRef(distribution_url), RDF.type, DCAT.Distribution))
self.g += graph
log.info("\t\t## Added distribution url {} to graph".format(distribution_url))
except Exception as inst:
error = str(inst)
log.error("Error on resolving distribution_url {}. Error: {}".format(distribution_url, error))
distributions = []
# inline distribution referred in RDF dataset
for distribution in self.g.subjects(RDF.type, DCAT.Distribution):
distributions.append(distribution)
distribution_index = 0
for distribution in distributions:
distribution_index += 1
log.info("\t## {} ## mapping distribution {} to CKAN resource".format(distribution_index, str(distribution)))
resource_dict = {}
# Simple values
for key, predicate in (
('name', DCT.title),
('description', DCT.description),
('access_url', DCAT.accessURL),
('download_url', DCAT.downloadURL),
('issued', DCT.issued),
('modified', DCT.modified),
('status', ADMS.status),
('rights', DCT.rights),
('license', DCT.license),
):
value = self._object_value(distribution, predicate)
if value:
resource_dict[key] = value
resource_dict['url'] = (self._object_value(distribution,
DCAT.downloadURL) or
self._object_value(distribution,
DCAT.accessURL))
# Lists
for key, predicate in (
('language', DCT.language),
('documentation', FOAF.page),
('conforms_to', DCT.conformsTo),
):
values = self._object_value_list(distribution, predicate)
if values:
resource_dict[key] = json.dumps(values)
# Format and media type
normalize_ckan_format = config.get(
'ckanext.dcat.normalize_ckan_format', True)
imt, label = self._distribution_format(distribution,
normalize_ckan_format)
if imt:
resource_dict['mimetype'] = imt
if label:
resource_dict['format'] = label
elif imt:
resource_dict['format'] = imt
# Size
size = self._object_value_int(distribution, DCAT.byteSize)
if size is not None:
resource_dict['size'] = size
# Checksum
for checksum in self.g.objects(distribution, SPDX.checksum):
algorithm = self._object_value(checksum, SPDX.algorithm)
checksum_value = self._object_value(checksum, SPDX.checksumValue)
if algorithm:
resource_dict['hash_algorithm'] = algorithm
if checksum_value:
resource_dict['hash'] = checksum_value
# Distribution URI (explicitly show the missing ones)
resource_dict['uri'] = (unicode(distribution)
if isinstance(distribution,
rdflib.term.URIRef)
else '')
dataset_dict['resources'].append(resource_dict)
# Spatial
spatial = self._spatial(dataset_ref, DCT.spatial)
log.debug("spatial {}".format(str(spatial)))
for key in ('uri', 'text', 'geom'):
if spatial.get(key):
the_key = 'spatial_{0}'.format(key) if key != 'geom' else 'spatial'
the_value = spatial.get(key).strip()
log.info("overriding spatial value to: {}".format(str(the_value)))
dataset_dict = self._set_extras_dict_value(dataset_dict, the_key, the_value)
# dataset_dict['extras'].append(
# {'key': 'spatial_{0}'.format(key) if key != 'geom' else 'spatial',
# 'value': spatial.get(key).strip()})
return dataset_dict
log.info("F2DS harvester script starts...")
# Reading Configurations
if not os.path.exists(F2DS_HARVESTER_INI_PATH):
log.info("File '{}' not found, F2DS harvester exit".format(F2DS_HARVESTER_INI_PATH))
sys.exit('No file with f2ds configurations found')
f2ds_config = ConfigParser.ConfigParser()
# parse f2ds_harvester.ini file
f2ds_config.read(F2DS_HARVESTER_INI_PATH)
config_dict = dict(f2ds_config.items(F2DS_HARVESTER_INI_DEFAULT_SECTION))
fdp_catalogues_endpoint = None
format_file = None
license_id = None
gcat_endpoint = None
gcube_meta_system_type = None
gcube_token = None
try:
log.info("From {} read configs {}".format(F2DS_HARVESTER_INI_DEFAULT_SECTION, config_dict))
format_file = config_dict['format_file']
license_id = config_dict['default_license_id']
gcat_endpoint = config_dict['gcat_endpoint']
gcube_meta_system_type = config_dict['gcube_metadata_profile_system_type']
gcube_token = config_dict['jarvis_token']
fdp_catalogues_endpoint = config_dict['fdp_catalogues_endpoint']
log.info("F2DS harvester is running with configurations:")
log.info("fdp_catalogues_endpoint: {}".format(fdp_catalogues_endpoint))
log.info("gcat: {}".format(gcat_endpoint))
masked_token = gcube_token[0:10]+"_MASKED_TOKEN"
log.info("gcube_token: {}".format(masked_token))
log.info("gcube_meta_system_type: {}".format(gcube_meta_system_type))
except Exception as inst:
log.error("Error on reading configurations: " + str(inst))
# Running
if not (fdp_catalogues_endpoint):
fdp_catalogues_endpoint = "https://f2ds.eosc-pillar.eu/expanded?format=rdf"
log.warn("Using hard coded endpoint {}".format(fdp_catalogues_endpoint))
discover_RDFC = DiscoverRDFCatalogue(fdp_catalogues_endpoint, format_file)
list_catalogues_urls = discover_RDFC.discover_catalogues()
countOperations = 0
countCatalogues = 0
countCreate = 0
countUpdate = 0
countDatasetError = 0
countCatalogueError = 0
for catalogue_url in list_catalogues_urls:
try:
log.debug("rdf catalogue URL {}".format(catalogue_url))
countCatalogues += 1
report_logger.info("CTG.{} Harvesting Catalogue: {}".format(countCatalogues, catalogue_url))
rdf_catalogue = discover_RDFC.read_catalogue(catalogue_url)
log.info("from {} got {}".format(catalogue_url, rdf_catalogue))
# Workaround until the core translation function defaults to the Flask one
registry = Registry()
registry.prepare()
registry.register(translator, MockTranslator())
dataset_body_to_ckan_list = []
for dataset_url in rdf_catalogue.get_list_dataset_urls():
dataset_body = http_get_response_body(dataset_url, 'format=rdf')
if dataset_body is None:
countDatasetError += 1
report_logger.info("Error, no body for dataset: {}".format(dataset_url))
continue
log.debug("from {} read dataset_body {}".format(dataset_url, dataset_body))
ckan_dataset = discover_RDFC.to_ckan_mapping(dataset_body, "application/rdf+xml")
dataset_body_to_ckan_list.append(ckan_dataset)
loop_index = 0
loop_size = len(dataset_body_to_ckan_list)
for ckan_dataset_body in dataset_body_to_ckan_list:
try:
loop_index += 1
log.info('\n\nManaging publishing {} of {}'.format(loop_index, loop_size))
log.debug('ckan_dataset_body {}'.format(ckan_dataset_body))
# data = str(ckan_dataset_body).encode('utf-8').strip()
# json.dumps takes in a json object and returns a string.
json_data = json.dumps(ckan_dataset_body)
json_data_dict = fill_in_to_gcat_dataset(rdf_catalogue, json_data, license_id, gcube_meta_system_type)
# just swapping the dataset_name for logging into report
dataset_name = json_data_dict[DATASET_FIELD_NAME]
log.debug("Dataset name is {}".format(dataset_name))
# encoding in utf-8
json_data = json.dumps(json_data_dict).encode("utf-8")
log.debug('json_data {}'.format(json_data))
response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='PUT')
code_status = response["status"]
if 200 <= code_status <= 208:
log.info("Update OK. Response returned status {}".format(code_status))
report_logger.info("DTS.{} Updated dataset: {}".format(loop_index, dataset_name))
countUpdate += 1
else:
log.info("Update request failed (with PUT request). Response returned status {}".format(
code_status))
if code_status == 404:
log.info("Update failed with code {}. Submitting the create with POST request".format(code_status))
response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='POST')
code_status = response["status"]
log.info("POST response returned with status {}".format(code_status))
if 200 <= code_status <= 208:
report_logger.info("DTS.{} Created dataset: {}".format(loop_index, dataset_name))
countCreate += 1
else:
countDatasetError += 1
report_logger.warn("Not created the dataset: {}".format(dataset_name))
else:
countDatasetError += 1
report_logger.warn("Not created/updated the dataset: {}".format(dataset_name))
except Exception as inst:
log.error("Error on creating dataset: {}. Error: ".format(dataset_name) + str(inst))
report_logger.info("Dataset error: {}".format(dataset_name) + str(inst))
countDatasetError += 1
countOperations += 1
if loop_index < loop_size:
log.info("sleeping 2 seconds...")
time.sleep(2)
log.info("running")
except Exception as inst:
log.error("Error on harvesting catalogue: {}. Error: ".format(catalogue_url) + str(inst))
report_logger.info("Catalogue error: {}".format(catalogue_url) + str(inst))
countCatalogueError += 1
log.info("F2DS harvesting stage is finished. Operations (create/update) done are: " + str(countOperations))
report_logger.info("Catalogue/s found: {}".format(countCatalogues))
report_logger.info("Total # operations: {}".format(countOperations))
report_logger.info("Datasets created: {}".format(countCreate))
report_logger.info("Datasets updated: {}".format(countUpdate))
report_logger.info("Datasets errors: {}".format(countDatasetError))
report_logger.info("Catalogues errors: {}".format(countCatalogueError))
log.info("Going to resync DB -> Solr, with search-index rebuild -r")
os.system(
'/usr/lib/ckan/default/bin/paster --plugin=ckan search-index rebuild -r --config=/etc/ckan/default/production.ini')
log.info("Resync DB -> Solr, done.")
# Running
if countDatasetError == 0:
log.info("No error detected going to call bulk deletion...")
report_logger.info("Bulk deletion stage starts...")
# Build yesterday's UTC timestamp as YYYY-MM-DDT23:59:59Z
today = datetime.utcnow().date()
yesterday = today - timedelta(days=1)
utc_time_suffix = 'T23:59:59Z'
yesterday_time = str(yesterday) + utc_time_suffix
# Build the query string for the bulk purge invocation on gCat
query_string = {"q": "extras_systemtype:{}".format(gcube_meta_system_type),
"fq": "metadata_modified:[* TO {}]".format(yesterday_time),
"own_only": True}
safe_query_string = urllib.urlencode(query_string)
uri = gcat_endpoint + '/items?{}'.format(safe_query_string)
report_logger.info("Calling {}".format(uri))
response = http_json_response_body(uri, None, gcube_token, method='PURGE')
report_logger.info("\nBulk deletion called for {}".format(uri))
log.info("\nBulk deletion called correctly with parameters {}".format(query_string))
else:
report_logger.info("\nDatasets error/s found, so skipping bulk deletion stage")
log.info("\nDatasets error/s found, so skipping bulk deletion stage")
log.info("\n\nF2DS harvester terminated!")
sys.exit(0)
# log.info("F2DS harvester testing dataset url converting...")
# # Workaround until the core translation function defaults to the Flask one
# registry = Registry()
# registry.prepare()
# registry.register(translator, MockTranslator())
# dataset_url = "https://f2ds.eosc-pillar.eu/dataset/80b25b6f-2bc5-4c15-955e-13b99d226333"
# #dataset_url = "https://f2ds.eosc-pillar.eu/dataset/6220d4be-add2-4541-95d5-c32f23aeb6ab"
# #dataset_url = "https://f2ds.eosc-pillar.eu/dataset/2bcaa8f6-b0d9-4fac-93e1-174dd33441d9"
# #dataset_url = "https://f2ds.eosc-pillar.eu/dataset/3cde15cf-2a7e-4fd8-b78d-e290b54511c2"
# log.debug("Testing dataset_url {}".format(dataset_url))
# dataset_body = http_get_response_body(dataset_url, 'format=rdf')
# log.debug("from {} read dataset_body {}".format(dataset_url, dataset_body))
# ckan_dataset = discover_RDFC.to_ckan_mapping(dataset_body, "application/rdf+xml")
# log.debug("converted dataset is {}".format(ckan_dataset))
#
# json_data = json.dumps(ckan_dataset)
# log.info("JSON data is {}".format(json_data))
# json_data_dict = fill_in_to_gcat_dataset("Test catalogue", json_data, license_id, gcube_meta_system_type)
# # encoding in utf-8
# json_data = json.dumps(json_data_dict).encode("utf-8")
# log.debug("Dataset JSON is {}".format(json_data))
# log.info("\n\nF2DS harvester testing dataset url, terminated!")
# sys.exit(0)

(launcher shell script) Normal file (6 lines)

@@ -0,0 +1,6 @@
#!/bin/bash
# Run the harvester with the virtualenv's Python interpreter
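# A typical deployment (an assumption, not part of this commit) would run this
# wrapper periodically from cron, e.g.:
#   0 2 * * * /bin/bash /path/to/this_wrapper.sh >> /var/log/ckan/f2ds-log/cron.log 2>&1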
echo "Running /usr/lib/ckan/default/src/ckanext-dcat/ckanext/dcat/f2ds_harvester.py (with virtualenv enabled, using python from /usr/lib/ckan/default/bin/python)"
/usr/lib/ckan/default/bin/python /usr/lib/ckan/default/src/ckanext-dcat/ckanext/dcat/f2ds_harvester.py &
exit 0