From 4915e82d7b37c90eff6e715a663264e4d3a3ac69 Mon Sep 17 00:00:00 2001
From: "francesco.mangiacrapa"
Date: Fri, 16 Dec 2022 15:01:46 +0100
Subject: [PATCH] Copied from SVN

---
 .project                           |  17 +
 CHANGELOG.md                       |   7 +
 LICENSE.md                         | 311 ++++++++++++
 README.md                          |  58 +++
 ckanext/dcat/f2ds_harvester.py     | 783 +++++++++++++++++++++++++++++
 ckanext/dcat/f2ds_harvester_run.sh |   6 +
 6 files changed, 1182 insertions(+)
 create mode 100644 .project
 create mode 100644 CHANGELOG.md
 create mode 100644 LICENSE.md
 create mode 100644 README.md
 create mode 100644 ckanext/dcat/f2ds_harvester.py
 create mode 100644 ckanext/dcat/f2ds_harvester_run.sh

diff --git a/.project b/.project
new file mode 100644
index 0000000..d084290
--- /dev/null
+++ b/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+    <name>FPD-harvester</name>
+    <comment></comment>
+    <projects>
+    </projects>
+    <buildSpec>
+        <buildCommand>
+            <name>org.python.pydev.PyDevBuilder</name>
+            <arguments>
+            </arguments>
+        </buildCommand>
+    </buildSpec>
+    <natures>
+        <nature>org.python.pydev.pythonNature</nature>
+    </natures>
+</projectDescription>
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..fe5eebe
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,7 @@
+
+# Changelog
+
+All notable changes to this project will be documented in this file.
+This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+- First release
diff --git a/LICENSE.md b/LICENSE.md
new file mode 100644
index 0000000..1932b4c
--- /dev/null
+++ b/LICENSE.md
@@ -0,0 +1,311 @@
+#European Union Public Licence V.1.1
+
+##*EUPL © the European Community 2007*
+
+
+This **European Union Public Licence** (the **“EUPL”**) applies to the Work or Software
+(as defined below) which is provided under the terms of this Licence. Any use of
+the Work, other than as authorised under this Licence is prohibited (to the
+extent such use is covered by a right of the copyright holder of the Work).
+
+The Original Work is provided under the terms of this Licence when the Licensor
+(as defined below) has placed the following notice immediately following the
+copyright notice for the Original Work:
+
+**Licensed under the EUPL V.1.1**
+
+or has expressed by any other mean his willingness to license under the EUPL.
+
+
+
+##1. Definitions
+
+In this Licence, the following terms have the following meaning:
+
+- The Licence: this Licence.
+
+- The Original Work or the Software: the software distributed and/or
+  communicated by the Licensor under this Licence, available as Source Code and
+  also as Executable Code as the case may be.
+
+- Derivative Works: the works or software that could be created by the Licensee,
+  based upon the Original Work or modifications thereof. This Licence does not
+  define the extent of modification or dependence on the Original Work required
+  in order to classify a work as a Derivative Work; this extent is determined by
+  copyright law applicable in the country mentioned in Article 15.
+
+- The Work: the Original Work and/or its Derivative Works.
+
+- The Source Code: the human-readable form of the Work which is the most
+  convenient for people to study and modify.
+
+- The Executable Code: any code which has generally been compiled and which is
+  meant to be interpreted by a computer as a program.
+
+- The Licensor: the natural or legal person that distributes and/or communicates
+  the Work under the Licence.
+
+- Contributor(s): any natural or legal person who modifies the Work under the
+  Licence, or otherwise contributes to the creation of a Derivative Work.
+
+- The Licensee or “You”: any natural or legal person who makes any usage of the
+  Software under the terms of the Licence.
+
+- Distribution and/or Communication: any act of selling, giving, lending,
+  renting, distributing, communicating, transmitting, or otherwise making
+  available, on-line or off-line, copies of the Work or providing access to its
+  essential functionalities at the disposal of any other natural or legal
+  person.
+
+
+
+##2. Scope of the rights granted by the Licence
+
+The Licensor hereby grants You a world-wide, royalty-free, non-exclusive,
+sub-licensable licence to do the following, for the duration of copyright vested
+in the Original Work:
+
+- use the Work in any circumstance and for all usage,
+- reproduce the Work,
+- modify the Original Work, and make Derivative Works based upon the Work,
+- communicate to the public, including the right to make available or display
+  the Work or copies thereof to the public and perform publicly, as the case
+  may be, the Work,
+- distribute the Work or copies thereof,
+- lend and rent the Work or copies thereof,
+- sub-license rights in the Work or copies thereof.
+
+Those rights can be exercised on any media, supports and formats, whether now
+known or later invented, as far as the applicable law permits so.
+
+In the countries where moral rights apply, the Licensor waives his right to
+exercise his moral right to the extent allowed by law in order to make effective
+the licence of the economic rights here above listed.
+
+The Licensor grants to the Licensee royalty-free, non exclusive usage rights to
+any patents held by the Licensor, to the extent necessary to make use of the
+rights granted on the Work under this Licence.
+
+
+
+##3. Communication of the Source Code
+
+The Licensor may provide the Work either in its Source Code form, or as
+Executable Code. If the Work is provided as Executable Code, the Licensor
+provides in addition a machine-readable copy of the Source Code of the Work
+along with each copy of the Work that the Licensor distributes or indicates, in
+a notice following the copyright notice attached to the Work, a repository where
+the Source Code is easily and freely accessible for as long as the Licensor
+continues to distribute and/or communicate the Work.
+
+
+
+##4. Limitations on copyright
+
+Nothing in this Licence is intended to deprive the Licensee of the benefits from
+any exception or limitation to the exclusive rights of the rights owners in the
+Original Work or Software, of the exhaustion of those rights or of other
+applicable limitations thereto.
+
+
+
+##5. Obligations of the Licensee
+
+The grant of the rights mentioned above is subject to some restrictions and
+obligations imposed on the Licensee. Those obligations are the following:
+
+Attribution right: the Licensee shall keep intact all copyright, patent or
+trademarks notices and all notices that refer to the Licence and to the
+disclaimer of warranties. The Licensee must include a copy of such notices and a
+copy of the Licence with every copy of the Work he/she distributes and/or
+communicates. The Licensee must cause any Derivative Work to carry prominent
+notices stating that the Work has been modified and the date of modification.
+
+Copyleft clause: If the Licensee distributes and/or communicates copies of the
+Original Works or Derivative Works based upon the Original Work, this
+Distribution and/or Communication will be done under the terms of this Licence
+or of a later version of this Licence unless the Original Work is expressly
+distributed only under this version of the Licence. The Licensee (becoming
+Licensor) cannot offer or impose any additional terms or conditions on the Work
+or Derivative Work that alter or restrict the terms of the Licence.
+
+Compatibility clause: If the Licensee Distributes and/or Communicates Derivative
+Works or copies thereof based upon both the Original Work and another work
+licensed under a Compatible Licence, this Distribution and/or Communication can
+be done under the terms of this Compatible Licence. For the sake of this clause,
+“Compatible Licence” refers to the licences listed in the appendix attached to
+this Licence. Should the Licensee’s obligations under the Compatible Licence
+conflict with his/her obligations under this Licence, the obligations of the
+Compatible Licence shall prevail.
+
+Provision of Source Code: When distributing and/or communicating copies of the
+Work, the Licensee will provide a machine-readable copy of the Source Code or
+indicate a repository where this Source will be easily and freely available for
+as long as the Licensee continues to distribute and/or communicate the Work.
+
+Legal Protection: This Licence does not grant permission to use the trade names,
+trademarks, service marks, or names of the Licensor, except as required for
+reasonable and customary use in describing the origin of the Work and
+reproducing the content of the copyright notice.
+
+
+
+##6. Chain of Authorship
+
+The original Licensor warrants that the copyright in the Original Work granted
+hereunder is owned by him/her or licensed to him/her and that he/she has the
+power and authority to grant the Licence.
+
+Each Contributor warrants that the copyright in the modifications he/she brings
+to the Work are owned by him/her or licensed to him/her and that he/she has the
+power and authority to grant the Licence.
+
+Each time You accept the Licence, the original Licensor and subsequent
+Contributors grant You a licence to their contributions to the Work, under the
+terms of this Licence.
+
+
+
+##7. Disclaimer of Warranty
+
+The Work is a work in progress, which is continuously improved by numerous
+contributors. It is not a finished work and may therefore contain defects or
+“bugs” inherent to this type of software development.
+
+For the above reason, the Work is provided under the Licence on an “as is” basis
+and without warranties of any kind concerning the Work, including without
+limitation merchantability, fitness for a particular purpose, absence of defects
+or errors, accuracy, non-infringement of intellectual property rights other than
+copyright as stated in Article 6 of this Licence.
+
+This disclaimer of warranty is an essential part of the Licence and a condition
+for the grant of any rights to the Work.
+
+
+
+##8. Disclaimer of Liability
+
+Except in the cases of wilful misconduct or damages directly caused to natural
+persons, the Licensor will in no event be liable for any direct or indirect,
+material or moral, damages of any kind, arising out of the Licence or of the use
+of the Work, including without limitation, damages for loss of goodwill, work
+stoppage, computer failure or malfunction, loss of data or any commercial
+damage, even if the Licensor has been advised of the possibility of such
+damage. However, the Licensor will be liable under statutory product liability
+laws as far such laws apply to the Work.
+
+
+
+##9. Additional agreements
+
+While distributing the Original Work or Derivative Works, You may choose to
+conclude an additional agreement to offer, and charge a fee for, acceptance of
+support, warranty, indemnity, or other liability obligations and/or services
+consistent with this Licence. However, in accepting such obligations, You may
+act only on your own behalf and on your sole responsibility, not on behalf of
+the original Licensor or any other Contributor, and only if You agree to
+indemnify, defend, and hold each Contributor harmless for any liability incurred
+by, or claims asserted against such Contributor by the fact You have accepted
+any such warranty or additional liability.
+
+
+
+##10. Acceptance of the Licence
+
+The provisions of this Licence can be accepted by clicking on an icon “I agree”
+placed under the bottom of a window displaying the text of this Licence or by
+affirming consent in any other similar way, in accordance with the rules of
+applicable law. Clicking on that icon indicates your clear and irrevocable
+acceptance of this Licence and all of its terms and conditions.
+
+Similarly, you irrevocably accept this Licence and all of its terms and
+conditions by exercising any rights granted to You by Article 2 of this Licence,
+such as the use of the Work, the creation by You of a Derivative Work or the
+Distribution and/or Communication by You of the Work or copies thereof.
+
+
+
+##11. Information to the public
+
+In case of any Distribution and/or Communication of the Work by means of
+electronic communication by You (for example, by offering to download the Work
+from a remote location) the distribution channel or media (for example, a
+website) must at least provide to the public the information requested by the
+applicable law regarding the Licensor, the Licence and the way it may be
+accessible, concluded, stored and reproduced by the Licensee.
+
+
+
+##12. Termination of the Licence
+
+The Licence and the rights granted hereunder will terminate automatically upon
+any breach by the Licensee of the terms of the Licence.
+
+Such a termination will not terminate the licences of any person who has
+received the Work from the Licensee under the Licence, provided such persons
+remain in full compliance with the Licence.
+
+
+
+##13. Miscellaneous
+
+Without prejudice of Article 9 above, the Licence represents the complete
+agreement between the Parties as to the Work licensed hereunder.
+
+If any provision of the Licence is invalid or unenforceable under applicable
+law, this will not affect the validity or enforceability of the Licence as a
+whole. Such provision will be construed and/or reformed so as necessary to make
+it valid and enforceable.
+
+The European Commission may publish other linguistic versions and/or new
+versions of this Licence, so far this is required and reasonable, without
+reducing the scope of the rights granted by the Licence. New versions of the
+Licence will be published with a unique version number.
+
+All linguistic versions of this Licence, approved by the European Commission,
+have identical value. Parties can take advantage of the linguistic version of
+their choice.
+
+
+
+##14. Jurisdiction
+
+Any litigation resulting from the interpretation of this License, arising
+between the European Commission, as a Licensor, and any Licensee, will be
+subject to the jurisdiction of the Court of Justice of the European Communities,
+as laid down in article 238 of the Treaty establishing the European Community.
+
+Any litigation arising between Parties, other than the European Commission, and
+resulting from the interpretation of this License, will be subject to the
+exclusive jurisdiction of the competent court where the Licensor resides or
+conducts its primary business.
+
+
+
+##15. Applicable Law
+
+This Licence shall be governed by the law of the European Union country where
+the Licensor resides or has his registered office.
+
+This licence shall be governed by the Belgian law if:
+
+- a litigation arises between the European Commission, as a Licensor, and any
+  Licensee;
+- the Licensor, other than the European Commission, has no residence or
+  registered office inside a European Union country.
+
+
+---
+
+
+##Appendix
+
+
+**“Compatible Licences”** according to article 5 EUPL are:
+
+
+- GNU General Public License (GNU GPL) v. 2
+
+- Open Software License (OSL) v. 2.1, v. 3.0
+
+- Common Public License v. 1.0
+
+- Eclipse Public License v. 1.0
+
+- Cecill v. 2.0
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b55fa25
--- /dev/null
+++ b/README.md
@@ -0,0 +1,58 @@
+# FPD Harvester
+
+The FPD Harvester is a Python application that uses the ckanext-dcat plugin to collect the datasets harvested from an F2DS catalogue (e.g. https://f2ds.eosc-pillar.eu/expanded?format=rdf)
+and publishes them to a D4Science Catalogue instance powered by CKAN.
+
+## Built With
+
+* [Python 2.6](https://www.python.org/download/releases/2.6/) - The Python runtime used
+
+**Requires**
+* [ckanext-dcat](https://github.com/ckan/ckanext-dcat)
+
+## Documentation
+
+N/A (a configuration sketch is provided at the end of this file)
+
+## Change log
+
+See the [Releases](https://code-repo.d4science.org/CKAN-Extensions/FPD-harvester/releases)
+
+## Authors
+
+* **Francesco Mangiacrapa** ([ORCID](https://orcid.org/0000-0002-6528-664X)) Computer Scientist at [ISTI-CNR Infrascience Group](http://nemis.isti.cnr.it/groups/infrascience)
+
+## License
+
+This project is licensed under the EUPL V.1.1 License - see the [LICENSE.md](LICENSE.md) file for details.
+
+
+## About the gCube Framework
+This software is part of the [gCubeFramework](https://www.gcube-system.org/ "gCubeFramework"): an
+open-source software toolkit used for building and operating Hybrid Data
+Infrastructures enabling the dynamic deployment of Virtual Research Environments
+by favouring the realisation of reuse-oriented policies.
+
+The projects leading to this software have received funding from a series of European Union programmes, including:
+
+- the Sixth Framework Programme for Research and Technological Development
+  - DILIGENT (grant no. 004260).
+- the Seventh Framework Programme for research, technological development and demonstration
+  - D4Science (grant no. 212488);
+  - D4Science-II (grant no. 239019);
+  - ENVRI (grant no. 283465);
+  - EUBrazilOpenBio (grant no. 288754);
+  - iMarine (grant no. 283644).
+- the H2020 research and innovation programme
+  - BlueBRIDGE (grant no. 675680);
+  - EGIEngage (grant no. 654142);
+  - ENVRIplus (grant no. 654182);
+  - PARTHENOS (grant no. 654119);
+  - SoBigData (grant no. 654024);
+  - DESIRA (grant no. 818194);
+  - ARIADNEplus (grant no. 823914);
+  - RISIS2 (grant no. 824091);
+  - PerformFish (grant no. 727610);
+  - AGINFRAplus (grant no. 731001).
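+
+## Configuration
+
+The harvester reads its settings from `/etc/ckan/default/f2ds_harvester.ini` (section `DEFAULT_CONFIGS`), as implemented in `ckanext/dcat/f2ds_harvester.py`. A minimal sketch of that file is shown below; only the keys are taken from the script, the values are illustrative placeholders.
+
+```ini
+[DEFAULT_CONFIGS]
+# RDF serialization passed to rdflib when parsing FDP documents (illustrative)
+format_file = xml
+# License assigned to items that declare none (the script defaults to CC-BY-SA-4.0)
+default_license_id = CC-BY-SA-4.0
+# gCat REST endpoint used to create/update catalogue items (placeholder URL)
+gcat_endpoint = https://gcat.example.org/gcat
+# Value written into the system:type extra of every published item
+gcube_metadata_profile_system_type = Dataset
+# gcube-token used to authorize requests against gCat (placeholder)
+jarvis_token = 0000000000-REPLACE-ME
+# Entry point listing the FDP catalogues to harvest
+fdp_catalogues_endpoint = https://f2ds.eosc-pillar.eu/expanded?format=rdf
+```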
+
+
diff --git a/ckanext/dcat/f2ds_harvester.py b/ckanext/dcat/f2ds_harvester.py
new file mode 100644
index 0000000..8cb7dac
--- /dev/null
+++ b/ckanext/dcat/f2ds_harvester.py
@@ -0,0 +1,783 @@
+import json
+import os
+import urllib2
+import urllib
+import sys
+import re
+import time
+from warnings import catch_warnings
+
+import rdflib
+import rdflib.parser
+
+from rdflib import URIRef, BNode, Literal
+from rdflib.namespace import Namespace, RDF
+from ckanext.dcat.processors import RDFParser
+from ckanext.dcat.profiles import EuropeanDCATAPProfile
+from ckanext.dcat.profiles import RDFProfile
+# needed as a workaround until the core translation function defaults to the Flask one
+from paste.registry import Registry
+from ckan.lib.cli import MockTranslator
+from ckantoolkit import config
+from pylons import translator
+import copy
+from ckan.plugins import toolkit
+
+import logging
+from logging.handlers import RotatingFileHandler
+from logging import handlers
+from datetime import datetime, timedelta
+
+import ConfigParser
+
+# Log stdout handler
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+log = logging.getLogger(__name__)
+
+# Log FILE handler
+# LOGFILE = "/var/log/ckan/f2ds_harvester.log"
+LOG_PATH = "/var/log/ckan/f2ds-log"
+LOGFILE = LOG_PATH + "/f2ds_harvester.log"
+# Create the log PATH folder if it does not exist
+if not os.path.exists(LOG_PATH):
+    os.makedirs(LOG_PATH)
+fh = RotatingFileHandler(LOGFILE, maxBytes=(1048576 * 10), backupCount=7)
+fh_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
+fh.setLevel(logging.INFO)
+fh.setFormatter(fh_formatter)
+log.addHandler(fh)
+
+# create report_logger. It is filled with the latest harvesting results
+report_logger = logging.getLogger("f2ds.summary_latest_harvesting")
+report_logger.setLevel(logging.INFO)
+REPORT_HARVESTING_FILE = LOG_PATH + "/f2ds_summary_latest_harvesting.log"
+# create a file handler which logs even debug messages
+fh_report = logging.FileHandler(REPORT_HARVESTING_FILE, mode='w')
+fh_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+fh_report.setLevel(logging.INFO)
+fh_report.setFormatter(fh_formatter)
+report_logger.addHandler(fh_report)
+
+# Created by Francesco Mangiacrapa
+# francesco.mangiacrapa@isti.cnr.it
+# ISTI-CNR Pisa (ITALY)
+
+DCT = Namespace("http://purl.org/dc/terms/")
+DCAT = Namespace("http://www.w3.org/ns/dcat#")
+ADMS = Namespace("http://www.w3.org/ns/adms#")
+VCARD = Namespace("http://www.w3.org/2006/vcard/ns#")
+FOAF = Namespace("http://xmlns.com/foaf/0.1/")
+SCHEMA = Namespace('http://schema.org/')
+TIME = Namespace('http://www.w3.org/2006/time')
+LOCN = Namespace('http://www.w3.org/ns/locn#')
+GSP = Namespace('http://www.opengis.net/ont/geosparql#')
+OWL = Namespace('http://www.w3.org/2002/07/owl#')
+SPDX = Namespace('http://spdx.org/rdf/terms#')
+RE3DATA = Namespace('http://www.re3data.org/schema/3-0#')
+
+namespaces = {
+    'dct': DCT,
+    'dcat': DCAT,
+    'adms': ADMS,
+    'vcard': VCARD,
+    'foaf': FOAF,
+    'schema': SCHEMA,
+    'time': TIME,
+    'locn': LOCN,
+    'gsp': GSP,
+    'owl': OWL,
+    'spdx': SPDX,
+    'r3d': RE3DATA,
+}
+
+DATASET_FIELD_NAME = 'name'
+DATASET_NAME_MAX_SIZE = 100
+
+F2DS_HARVESTER_INI_PATH = '/etc/ckan/default/f2ds_harvester.ini'
+F2DS_HARVESTER_INI_DEFAULT_SECTION = 'DEFAULT_CONFIGS'
+
+
+def http_get_response_body(uri, dict_query_args=None, gcube_token=None):
+    # dict_query_args is expected to be an already URL-encoded query string (e.g. 'format=rdf')
+    if dict_query_args:
+        uri += "?{}".format(dict_query_args)
+
+    headers = {}
+    if gcube_token:
+        headers["gcube-token"] = gcube_token
+
+    log.info("Sending GET request to URL: %s" % uri)
+    req = urllib2.Request(uri, None, headers)
+    try:
+        resp = urllib2.urlopen(req, timeout=60)
+    except urllib2.HTTPError as e:
+        log.error("The request sent to the URL {} failed".format(uri))
+        log.error("HTTPError: %d" % e.code)
+        return None
+    except urllib2.URLError as e:
+        # Not an HTTP-specific error (e.g. connection refused)
+        log.error("URLError - the input URI %s is not reachable or not valid: %s" % (uri, e.reason))
+        return None
+    else:
+        # 200
+        body = resp.read()
+        return body
+
+
+def copy_list_of_dict(list_source):
+    cpy_list = []
+    for li in list_source:
+        d2 = copy.deepcopy(li)
+        cpy_list.append(d2)
+
+    return cpy_list
+
+
+def http_json_response_body(uri, data=None, gcube_token=None, method='POST'):
+    '''
+    :param uri: the uri
+    :param data: the json data to send
+    :param gcube_token: the gcube-token that must be used for contacting the gCat service
+    :param method: the http method to perform the request, default is 'POST'
+    :return: the HTTP response returned as a dictionary of type dict(body='', status='', error='')
+    '''
+    headers = {
+        "Content-Type": "application/json",
+        "Accept": "application/json",
+    }
+
+    if gcube_token:
+        headers["gcube-token"] = gcube_token
+
+    '''Appending the dataset name to the URI as /[dataset-name]'''
+    if method == 'PUT':
+        data_json_dict = json.loads(data)
+        uri += '/' + data_json_dict['name']
+        # data_json_dict['name'] = data_json_dict['name']+'new'
+        # data = json.dumps(data_json_dict).encode("utf-8")
+
+    req = urllib2.Request(uri, data, headers)
+
+    if method:
+        req.get_method = lambda: method
+
+    log.info("Sending {} request to URL: {}".format(method, uri))
+    log.debug("Headers: %s" % headers)
+    log.debug("data: %s" % data)
+    try:
+        resp = urllib2.urlopen(req, timeout=300)
+        response_status_code = resp.getcode()
+    except urllib2.HTTPError as e:
+        response_status_code = e.code
+        # read() consumes the stream, so read the error body only once
+        error_body = e.read()
+        log.error("The request sent to the URL {} failed".format(uri))
+        log.error("HTTPError: {}. Error message: {}".format(e.code, error_body))
+        return dict(body=None, status=response_status_code, error=error_body)
+    except urllib2.URLError as e:
+        # Not an HTTP-specific error (e.g. connection refused): URLError carries
+        # no status code or response body, only a reason
+        log.error("URLError - the input URI %s is not reachable or not valid: %s" % (uri, e.reason))
+        return dict(body=None, status=None, error=str(e.reason))
+    else:
+        # 200
+        return dict(body=resp.read(), status=response_status_code, error=None)
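+
+
+# Illustrative usage sketch (not executed): gCat exposes items under
+# <gcat_endpoint>/items; an update is attempted with PUT (the function itself
+# appends /<dataset-name> to the URI) and, on a 404, the caller falls back to
+# a POST to create the item, as done in the main loop below. The endpoint and
+# token values here are placeholders.
+#
+# payload = json.dumps({'name': 'my-dataset', 'title': 'My dataset'})
+# response = http_json_response_body('https://gcat.example.org/gcat/items',
+#                                    payload, gcube_token='...', method='PUT')
+# if response['status'] == 404:
+#     response = http_json_response_body('https://gcat.example.org/gcat/items',
+#                                        payload, gcube_token='...', method='POST')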
+
+
+def fill_in_to_gcat_dataset(rdf_catalogue, json_data, license_id='CC-BY-SA-4.0', system_type='Dataset'):
+    # data_json_array = json.dumps(json_data)
+    data_json_dict = json.loads(json_data)
+    log.debug("read json data {}".format(json_data))
+
+    dataset_name = None
+    try:
+        dataset_name = data_json_dict[DATASET_FIELD_NAME]
+    except KeyError as e:
+        log.info("The field " + DATASET_FIELD_NAME + " was not found in the source. KeyError {}".format(e))
+
+    if not dataset_name:
+        log.debug("Trying to generate the field 'name' from 'title'")
+        dataset_title = data_json_dict['title']
+        if not dataset_title:
+            raise NameError('name and title fields are not defined')
+
+        # The field 'name' must be between 2 and 100 characters long
+        # and contain only lowercase alphanumeric characters, '-' and '_'.
+        rescue_dataset_title = normalize_to_alphanumeric_string(dataset_title, '-')
+        rescue_dataset_title = rescue_dataset_title.lower()
+        dataset_name = (rescue_dataset_title[:DATASET_NAME_MAX_SIZE]) if len(
+            rescue_dataset_title) > DATASET_NAME_MAX_SIZE else rescue_dataset_title
+        log.info("dataset cleaned name is: {}".format(dataset_name))
+
+    if not dataset_name:
+        raise NameError('field name not found and could not be built')
+
+    data_json_dict[DATASET_FIELD_NAME] = dataset_name
+
+    # Managing tags
+    try:
+        dict_normalized_ckan_tags = dict_of_normalized_ckan_tags(data_json_dict['tags'])
+        # there is at least one normalized tag
+        if len(dict_normalized_ckan_tags) > 0:
+            data_json_dict['tags'] = []
+            for tag in dict_normalized_ckan_tags:
+                # adding to tags
+                norm_tag = dict_normalized_ckan_tags[tag]
+                data_json_dict['tags'].append({'name': norm_tag})
+                # if the tag is not equal to its normalized value
+                if tag != norm_tag:
+                    # adding the original value to extras as a keyword
+                    data_json_dict['extras'].append({'key': 'keyword', 'value': tag})
+        else:
+            # at least one tag is required for item creation on CKAN, so add "No Tag"
+            data_json_dict['tags'] = []
+            data_json_dict['tags'].append({'name': 'No Tag'})
+            log.info("no tag found, adding 'No Tag' because at least one tag is required")
+
+    except Exception as e:
+        log.error("Error occurred while managing tags. Exception {}".format(e))
+
+    if not license_id:
+        license_id = 'CC-BY-SA-4.0'
+
+    if not system_type:
+        system_type = 'Dataset'
+
+    # adding system:type
+    data_json_dict['extras'].append({'key': 'system:type', 'value': system_type})
+
+    rescue_catalogue_title = rdf_catalogue.get_name()
+    if rescue_catalogue_title:
+        pattern = '[^a-zA-Z0-9_\-]+'
+        remove_white_spaces = True
+        rescue_catalogue_title = normalize_to_alphanumeric_string(rescue_catalogue_title, '-', pattern, remove_white_spaces)
+    else:
+        rescue_catalogue_title = "No Catalogue"
+
+    log.info("rescue_catalogue_title is {}".format(rescue_catalogue_title))
+
+    # adding catalogue_title
+    data_json_dict['extras'].append({'key': 'catalogue_title', 'value': rescue_catalogue_title})
+
+    # adding license
+    data_json_dict['license_id'] = license_id
+    return data_json_dict
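+
+
+# Illustrative example (values are made up): given a harvested dataset whose
+# title is "Soil Data 2020" and which carries no 'name' and no tags,
+# fill_in_to_gcat_dataset() produces a dict along the lines of:
+#   {'name': 'soil-data-2020', 'title': 'Soil Data 2020',
+#    'license_id': 'CC-BY-SA-4.0', 'tags': [{'name': 'No Tag'}],
+#    'extras': [..., {'key': 'system:type', 'value': 'Dataset'},
+#               {'key': 'catalogue_title', 'value': 'My-Catalogue'}]}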
+
+
+# Returns a string containing only alphanumeric characters, '-' and '_'.
+def normalize_to_alphanumeric_string(string_value, replace_with='_', pattern='[^a-zA-Z0-9_\-]+', remove_white_spaces=False):
+    sanitize1 = re.sub(pattern, replace_with, string_value)
+    if remove_white_spaces:
+        return re.sub(' ', replace_with, sanitize1)
+
+    return sanitize1
+
+
+# For any CKAN tag found, normalizes its value (to alphanumeric characters plus '-' and '_')
+# and returns the dict {'tag': 'normalized-tag'}
+def dict_of_normalized_ckan_tags(tag_array):
+    dict_normalized_ckan_tags = {}
+    # alphanumeric string with '-', '_' and ' '
+    pattern = '[^a-zA-Z0-9_\- ]+'
+    if tag_array:
+        for tag in tag_array:
+            tag_value = tag.get('name')
+            log.info("tag value is: {}".format(tag_value))
+            n_tag_value = normalize_to_alphanumeric_string(tag_value, '_', pattern)
+            dict_normalized_ckan_tags[tag_value] = n_tag_value
+
+    log.info("returning the dict of normalized tags: {}".format(dict_normalized_ckan_tags))
+    return dict_normalized_ckan_tags
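+
+
+# Illustrative examples (not executed):
+#   normalize_to_alphanumeric_string("a/b c")                -> 'a_b_c'
+#   normalize_to_alphanumeric_string("a/b c", '-')           -> 'a-b-c'
+#   dict_of_normalized_ckan_tags([{'name': 'sea & land'}])   -> {'sea & land': 'sea _ land'}
+# (the tag pattern keeps spaces, so only the '&' is replaced in the last case)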
log.debug("subj has {} pred has {} obj has {}".format(subj, pred, obj)) + if (subj, pred, obj) not in g: + raise Exception("It better be!") + + log.debug("graph has {} statements.".format(len(g))) + + catalogue_name = None + for name in g.subject_objects(FOAF.name): + catalogue_name = str(name[1]) + log.info("catalogue name found: {}".format(catalogue_name)) + break + + catalogue_title = None + for title in g.subject_objects(DCT.title): + catalogue_title = str(title[1]) + log.info("catalogue title found: {}".format(catalogue_title)) + break + + catalogue_description = None + for description in g.subject_objects(DCT.description): + catalogue_description = str(description[1]) + log.info("catalogue description found: {}".format(catalogue_description)) + break + + for dataset in g.subject_objects(DCAT.dataset): + log.info("dataset url found: {}".format(dataset[1])) + list_dataset_urls.append(str(dataset[1])) + + log.info("returning list of dataset URLs {}".format(list_dataset_urls)) + return RDFCatalogue(catalogue_url, catalogue_name, catalogue_title, catalogue_description, + list_dataset_urls) + + except Exception as inst: + log.error("Error on reading catalogue: " + str(inst)) + log.info("Returning empty list of datasets") + return RDFCatalogue(catalogue_url, "", []) + + def to_ckan_mapping(self, dataset_body, format): + parser = RDFParser() + parser._profiles = [EuropeanDCATAPProfile, F2DSDCATAPProfile] + parser.parse(dataset_body, _format=format) + indent = 4 + ckan_datasets = [d for d in parser.datasets()] + # only one exists + ckan_dataset = ckan_datasets[0] + log.debug("ckan_dataset is {}".format(ckan_dataset)) + # ckan_dataset = self._ckan_dataset_extras_strip(ckan_dataset) + # print(json.dumps(ckan_dataset, indent=indent)) + # print(json.dumps(ckan_datasets, indent=indent)) + return ckan_dataset + + def _ckan_dataset_extras_strip(self, _dict): + ''' + Returns dict of extras stripped + ''' + try: + new_dict_extras = [] + for extra in _dict.get('extras', []): + k, v = extra['key'], extra['value'] + if isinstance(v, str): + v = str(v).strip() + new_dict_extras.append({k, v}) + + _dict['extras'] = new_dict_extras + return _dict + except Exception as inst: + log.error("Error _ckan_dataset_extras_strip: " + str(inst)) + + +class F2DSDCATAPProfile(RDFProfile): + ''' + An RDF profile for the F2DS. 
+
+
+class F2DSDCATAPProfile(RDFProfile):
+    '''
+    An RDF profile for the F2DS. Implemented by Francesco Mangiacrapa at ISTI-CNR.
+
+    Applied after the European DCAT-AP profile (`euro_dcat_ap`), whose output it refines.
+    '''
+
+    def _set_extras_dict_value(self, _dict, key, value, default=None):
+        '''
+        Set the value for the given key (or 'dcat_' + key) in the `extras` list
+        of a CKAN dict and return the updated dict
+        '''
+
+        # update matching entries in place; the dict itself is returned so
+        # callers can reassign it
+        for extra in _dict.get('extras', []):
+            if extra['key'] == key or extra['key'] == 'dcat_' + key:
+                extra['value'] = value
+
+        return _dict
+
+    def parse_dataset(self, dataset_dict, dataset_ref):
+        log.info("parse_dataset into F2DSDCATAPProfile")
+
+        document_format = "application/rdf+xml"
+
+        # Resetting the list of resources
+        dataset_dict['resources'] = []
+        # Distributions
+        distribution_index = 0
+        # external distributions referred to in the RDF dataset
+        for distribution_url in self.g.objects(dataset_ref, DCAT.distribution):
+            try:
+                distribution_index += 1
+                log.info("\t## {} ## Read distribution url {} for dataset url {}".format(distribution_index, distribution_url, dataset_ref))
+
+                graph = rdflib.ConjunctiveGraph()
+
+                for prefix, namespace in namespaces.iteritems():
+                    graph.bind(prefix, namespace)
+
+                # 'format' must be passed as a keyword argument: the second
+                # positional parameter of Graph.parse is publicID, not format
+                graph.parse(URIRef(distribution_url), format=document_format)
+                self.g.add((URIRef(distribution_url), RDF.type, DCAT.Distribution))
+                self.g += graph
+                log.info("\t\t## Added distribution url {} to graph".format(distribution_url))
+            except Exception as inst:
+                error = str(inst)
+                log.error("Error on resolving distribution_url {}. Error: {}".format(distribution_url, error))
+
+        distributions = []
+
+        # inline distributions referred to in the RDF dataset
+        for distribution in self.g.subjects(RDF.type, DCAT.Distribution):
+            distributions.append(distribution)
+
+        distribution_index = 0
+        for distribution in distributions:
+            distribution_index += 1
+            log.info("\t## {} ## mapping distribution {} to CKAN resource".format(distribution_index, str(distribution)))
+
+            resource_dict = {}
+
+            # Simple values
+            for key, predicate in (
+                    ('name', DCT.title),
+                    ('description', DCT.description),
+                    ('access_url', DCAT.accessURL),
+                    ('download_url', DCAT.downloadURL),
+                    ('issued', DCT.issued),
+                    ('modified', DCT.modified),
+                    ('status', ADMS.status),
+                    ('rights', DCT.rights),
+                    ('license', DCT.license),
+            ):
+                value = self._object_value(distribution, predicate)
+                if value:
+                    resource_dict[key] = value
+
+            resource_dict['url'] = (self._object_value(distribution,
+                                                       DCAT.downloadURL) or
+                                    self._object_value(distribution,
+                                                       DCAT.accessURL))
+            # Lists
+            for key, predicate in (
+                    ('language', DCT.language),
+                    ('documentation', FOAF.page),
+                    ('conforms_to', DCT.conformsTo),
+            ):
+                values = self._object_value_list(distribution, predicate)
+                if values:
+                    resource_dict[key] = json.dumps(values)
+
+            # Format and media type
+            normalize_ckan_format = config.get(
+                'ckanext.dcat.normalize_ckan_format', True)
+            imt, label = self._distribution_format(distribution,
+                                                   normalize_ckan_format)
+
+            if imt:
+                resource_dict['mimetype'] = imt
+
+            if label:
+                resource_dict['format'] = label
+            elif imt:
+                resource_dict['format'] = imt
+
+            # Size
+            size = self._object_value_int(distribution, DCAT.byteSize)
+            if size is not None:
+                resource_dict['size'] = size
+
+            # Checksum
+            for checksum in self.g.objects(distribution, SPDX.checksum):
+                algorithm = self._object_value(checksum, SPDX.algorithm)
+                checksum_value = self._object_value(checksum, SPDX.checksumValue)
+                if algorithm:
+                    resource_dict['hash_algorithm'] = algorithm
+                if checksum_value:
+                    resource_dict['hash'] = checksum_value
+
+            # Distribution URI (explicitly show the missing ones)
+            resource_dict['uri'] = (unicode(distribution)
+                                    if isinstance(distribution,
+                                                  rdflib.term.URIRef)
+                                    else '')
+
+            dataset_dict['resources'].append(resource_dict)
+
+        # Spatial
+        spatial = self._spatial(dataset_ref, DCT.spatial)
+        log.debug("spatial {}".format(str(spatial)))
+        for key in ('uri', 'text', 'geom'):
+            if spatial.get(key):
+                the_key = 'spatial_{0}'.format(key) if key != 'geom' else 'spatial'
+                the_value = spatial.get(key).strip()
+                log.info("overriding spatial value to: {}".format(str(the_value)))
+                dataset_dict = self._set_extras_dict_value(dataset_dict, the_key, the_value)
+                # dataset_dict['extras'].append(
+                #     {'key': 'spatial_{0}'.format(key) if key != 'geom' else 'spatial',
+                #      'value': spatial.get(key).strip()})
+
+        return dataset_dict
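+
+
+# Illustrative shape of a CKAN resource produced by the mapping above
+# (values are made up):
+#   {'name': 'Distribution title', 'url': 'http://example.org/download',
+#    'access_url': 'http://example.org/access', 'format': 'CSV',
+#    'mimetype': 'text/csv', 'size': 1024,
+#    'uri': 'http://example.org/distribution/1'}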
+
+
+log.info("F2DS harvester script starts...")
+# Reading configurations
+if not os.path.exists(F2DS_HARVESTER_INI_PATH):
+    log.info("File '{}' not found, F2DS harvester exits".format(F2DS_HARVESTER_INI_PATH))
+    sys.exit('No file with f2ds configurations found')
+
+f2ds_config = ConfigParser.ConfigParser()
+# parse the f2ds_harvester.ini file
+f2ds_config.read(F2DS_HARVESTER_INI_PATH)
+config_dict = dict(f2ds_config.items(F2DS_HARVESTER_INI_DEFAULT_SECTION))
+
+fdp_catalogues_endpoint = None
+format_file = None
+license_id = None
+gcat_endpoint = None
+gcube_meta_system_type = None
+gcube_token = None
+try:
+    log.info("From {} read configs {}".format(F2DS_HARVESTER_INI_DEFAULT_SECTION, config_dict))
+    format_file = config_dict['format_file']
+    license_id = config_dict['default_license_id']
+    gcat_endpoint = config_dict['gcat_endpoint']
+    gcube_meta_system_type = config_dict['gcube_metadata_profile_system_type']
+    gcube_token = config_dict['jarvis_token']
+    fdp_catalogues_endpoint = config_dict['fdp_catalogues_endpoint']
+
+    log.info("F2DS harvester is running with configurations:")
+    log.info("fdp_catalogues_endpoint: {}".format(fdp_catalogues_endpoint))
+    log.info("gcat: {}".format(gcat_endpoint))
+    masked_token = gcube_token[0:10] + "_MASKED_TOKEN"
+    log.info("gcube_token: {}".format(masked_token))
+    log.info("gcube_meta_system_type: {}".format(gcube_meta_system_type))
+except Exception as inst:
+    log.error("Error on reading configurations: " + str(inst))
+
+# Running
+if not fdp_catalogues_endpoint:
+    fdp_catalogues_endpoint = "https://f2ds.eosc-pillar.eu/expanded?format=rdf"
+    log.warn("Using hard-coded endpoint {}".format(fdp_catalogues_endpoint))
+
+discover_RDFC = DiscoverRDFCatalogue(fdp_catalogues_endpoint, format_file)
+list_catalogues_urls = discover_RDFC.discover_catalogues()
+
+countOperations = 0
+countCatalogues = 0
+countCreate = 0
+countUpdate = 0
+countDatasetError = 0
+countCatalogueError = 0
+for catalogue_url in list_catalogues_urls:
+    try:
+        log.debug("rdf catalogue URL {}".format(catalogue_url))
+        countCatalogues += 1
+        report_logger.info("CTG.{} Harvesting Catalogue: {}".format(countCatalogues, catalogue_url))
+        rdf_catalogue = discover_RDFC.read_catalogue(catalogue_url)
+        log.info("from {} got {}".format(catalogue_url, rdf_catalogue))
+
+        # Workaround until the core translation function defaults to the Flask one
+        registry = Registry()
+        registry.prepare()
+        registry.register(translator, MockTranslator())
+
+        dataset_body_to_ckan_list = []
+        for dataset_url in rdf_catalogue.get_list_dataset_urls():
+            dataset_body = http_get_response_body(dataset_url, 'format=rdf')
+            if dataset_body is None:
+                countDatasetError += 1
+                report_logger.info("Error, no body for dataset: {}".format(dataset_url))
+                continue
+
+            log.debug("from {} read dataset_body {}".format(dataset_url, dataset_body))
+            ckan_dataset = discover_RDFC.to_ckan_mapping(dataset_body, "application/rdf+xml")
+            dataset_body_to_ckan_list.append(ckan_dataset)
+
+        loop_index = 0
+        loop_size = len(dataset_body_to_ckan_list)
+        for ckan_dataset_body in dataset_body_to_ckan_list:
+            # initialized up front so the except block below can always log it
+            dataset_name = None
+            try:
+                loop_index += 1
+                log.info('\n\nManaging publishing {} of {}'.format(loop_index, loop_size))
+                log.debug('ckan_dataset_body {}'.format(ckan_dataset_body))
+                # data = str(ckan_dataset_body).encode('utf-8').strip()
+                # json.dumps takes a json object and returns a string.
+                json_data = json.dumps(ckan_dataset_body)
+                json_data_dict = fill_in_to_gcat_dataset(rdf_catalogue, json_data, license_id, gcube_meta_system_type)
+                # keep the dataset name at hand for logging into the report
+                dataset_name = json_data_dict[DATASET_FIELD_NAME]
+                log.debug("Dataset name is {}".format(dataset_name))
+                # encoding in utf-8
+                json_data = json.dumps(json_data_dict).encode("utf-8")
+                log.debug('json_data {}'.format(json_data))
+                response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='PUT')
+                code_status = response["status"]
+                if 200 <= code_status <= 208:
+                    log.info("Update OK. Response returned status {}".format(code_status))
+                    report_logger.info("DTS.{} Updated dataset: {}".format(loop_index, dataset_name))
+                    countUpdate += 1
+                else:
+                    log.info("Update request failed (with PUT request). Response returned status {}".format(
+                        code_status))
+                    if code_status == 404:
+                        log.info("Update failed with code {}. Submitting the create with a POST request".format(code_status))
+                        response = http_json_response_body(gcat_endpoint + '/items', json_data, gcube_token, method='POST')
+                        code_status = response["status"]
+                        log.info("POST response returned with status {}".format(code_status))
+                        report_logger.info("DTS.{} Created dataset: {}".format(loop_index, dataset_name))
+                        countCreate += 1
+                    else:
+                        countDatasetError += 1
+                        report_logger.warn("Not created/updated the dataset: {}".format(dataset_name))
+
+            except Exception as inst:
+                log.error("Error on creating dataset: {}. Error: ".format(dataset_name) + str(inst))
+                report_logger.info("Dataset error: {}".format(dataset_name) + str(inst))
+                countDatasetError += 1
+
+            countOperations += 1
+
+            if loop_index < loop_size:
+                log.info("sleeping 2 seconds...")
+                time.sleep(2)
+                log.info("running")
+    except Exception as inst:
+        log.error("Error on harvesting catalogue: {}. Error: ".format(catalogue_url) + str(inst))
+        report_logger.info("Catalogue error: {}".format(catalogue_url) + str(inst))
+        countCatalogueError += 1
+
+log.info("F2DS harvesting stage is finished. Operations (create/update) done are: " + str(countOperations))
+report_logger.info("Catalogue/s found: {}".format(countCatalogues))
+report_logger.info("Total # operations: {}".format(countOperations))
+report_logger.info("Datasets created: {}".format(countCreate))
+report_logger.info("Datasets updated: {}".format(countUpdate))
+report_logger.info("Dataset errors: {}".format(countDatasetError))
+report_logger.info("Catalogue errors: {}".format(countCatalogueError))
+
+log.info("Going to resync DB -> Solr, with search-index rebuild -r")
+os.system(
+    '/usr/lib/ckan/default/bin/paster --plugin=ckan search-index rebuild -r --config=/etc/ckan/default/production.ini')
+log.info("Resync DB -> Solr, done.")
+
+# Bulk deletion stage
+if countDatasetError == 0:
+    log.info("No error detected, going to call bulk deletion...")
+    report_logger.info("Bulk deletion stage starts...")
+
+    '''Building yesterday's UTC time as YYYY-MM-DDT23:59:59Z'''
+    today = datetime.utcnow().date()
+    yesterday = today - timedelta(days=1)
+    utc_time_suffix = 'T23:59:59Z'
+    yesterday_time = yesterday.__str__() + utc_time_suffix
+
+    '''Building the query string for the bulk purge invocation on gCat'''
+    query_string = {"q": "extras_systemtype:{}".format(gcube_meta_system_type),
+                    "fq": "metadata_modified:[* TO {}]".format(yesterday_time),
+                    "own_only": True}
+    safe_query_string = urllib.urlencode(query_string)
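+
+    # Illustrative result (not executed): with gcube_meta_system_type = 'Dataset'
+    # and yesterday = 2022-12-15, urlencode() yields something like
+    #   q=extras_systemtype%3ADataset&fq=metadata_modified%3A%5B%2A+TO+2022-12-15T23%3A59%3A59Z%5D&own_only=True
+    # i.e. a query selecting this harvester's items not touched during the
+    # current run, which are then purged via the PURGE request below.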
+
+    uri = gcat_endpoint + '/items?{}'.format(safe_query_string)
+    report_logger.info("Calling {}".format(uri))
+    response = http_json_response_body(uri, None, gcube_token, method='PURGE')
+    report_logger.info("\nBulk deletion called for {}".format(uri))
+    log.info("\nBulk deletion called correctly with parameters {}".format(query_string))
+else:
+    report_logger.info("\nDataset error/s found, so skipping the bulk deletion stage")
+    log.info("\nDataset error/s found, so skipping the bulk deletion stage")
+
+log.info("\n\nF2DS harvester terminated!")
+sys.exit(0)
+
+
+# log.info("F2DS harvester testing dataset url converting...")
+# # Workaround until the core translation function defaults to the Flask one
+# registry = Registry()
+# registry.prepare()
+# registry.register(translator, MockTranslator())
+# dataset_url = "https://f2ds.eosc-pillar.eu/dataset/80b25b6f-2bc5-4c15-955e-13b99d226333"
+# #dataset_url = "https://f2ds.eosc-pillar.eu/dataset/6220d4be-add2-4541-95d5-c32f23aeb6ab"
+# #dataset_url = "https://f2ds.eosc-pillar.eu/dataset/2bcaa8f6-b0d9-4fac-93e1-174dd33441d9"
+# #dataset_url = "https://f2ds.eosc-pillar.eu/dataset/3cde15cf-2a7e-4fd8-b78d-e290b54511c2"
+# log.debug("Testing dataset_url {}".format(dataset_url))
+# dataset_body = http_get_response_body(dataset_url, 'format=rdf')
+# log.debug("from {} read dataset_body {}".format(dataset_url, dataset_body))
+# ckan_dataset = discover_RDFC.to_ckan_mapping(dataset_body, "application/rdf+xml")
+# log.debug("converted dataset is {}".format(ckan_dataset))
+#
+# json_data = json.dumps(ckan_dataset)
+# log.info("JSON data is {}".format(json_data))
+# json_data_dict = fill_in_to_gcat_dataset("Test catalogue", json_data, license_id, gcube_meta_system_type)
+# # encoding in utf-8
+# json_data = json.dumps(json_data_dict).encode("utf-8")
+# log.debug("Dataset JSON is {}".format(json_data))
+# log.info("\n\nF2DS harvester testing dataset url, terminated!")
+# sys.exit(0)
diff --git a/ckanext/dcat/f2ds_harvester_run.sh b/ckanext/dcat/f2ds_harvester_run.sh
new file mode 100644
index 0000000..4d3abee
--- /dev/null
+++ b/ckanext/dcat/f2ds_harvester_run.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+# Run the harvester with the virtualenv's Python interpreter
+echo "Running /usr/lib/ckan/default/src/ckanext-dcat/ckanext/dcat/f2ds_harvester.py (with virtualenv enabled, using python from /usr/lib/ckan/default/bin/python)"
+/usr/lib/ckan/default/bin/python /usr/lib/ckan/default/src/ckanext-dcat/ckanext/dcat/f2ds_harvester.py &
+
+exit 0