# Spark job: merge datasource registries (FAIRsharing, OpenDOAR, re3data, ROAR)
# coding=utf-8
#!/usr/bin/python
import ast
import json
import os
import shutil
import sys

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
def deleteDir(x):
    """Best-effort recursive removal of directory *x*.

    Failures (missing path, permissions, ...) are reported on stdout
    rather than raised, so the caller never has to guard this call.
    """
    try:
        shutil.rmtree(x)
    except OSError as err:
        message = "Error: %s - %s." % (err.filename, err.strerror)
        print(message)
def stringToArray(x):
    """Parse a Python-literal string (e.g. ``"['a', 'b']"``) into a value.

    Used on registry TSV columns that store lists as their repr.
    Returns [] when *x* is not a valid Python literal.

    Security fix: uses ast.literal_eval instead of eval so untrusted
    registry dumps cannot execute arbitrary code; the exception clause is
    narrowed from a bare ``except:`` to the errors literal_eval raises.
    """
    try:
        return ast.literal_eval(x)
    except (ValueError, SyntaxError, TypeError):
        return []
def testValidJSON(x):
|
|
try:
|
|
json.loads(x)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
# --- Spark setup: local mode, all available cores -------------------------
conf = SparkConf().setAppName('fairshare').setMaster('local[*]')
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Remove any previous run's output; saveAsTextFile fails if the dir exists.
deleteDir("datasources")

# Input dumps of the four datasource registries.
fairsharing_path = "../data/raw/fairsharing_dump_api_02_2022.json"
opendoar_path = "../data/raw/openDoar.tsv"
re3data_path = "../data/raw/re3data.tsv"
roar_path = "../data/raw/export_roar_CSV.csv"

# Each registry is normalized to a common record shape:
#   id, originalId, websiteurl, name, alternativeNames, collectedfrom

# FAIRSHARING: one JSON object per line; drop lines that are not valid JSON.
fairsharing = sc.textFile(fairsharing_path).filter(testValidJSON).map(json.loads)
fairsharing = fairsharing.map(lambda x: dict(id=x['attributes']['url'], originalId=x['id'], websiteurl=x['attributes']['metadata']['homepage'], name=x['attributes']['metadata']['name'], alternativeNames=[x['attributes']['abbreviation']], collectedfrom="FAIRsharing"))
print("FAIRSHARING DS", fairsharing.count())

# OPENDOAR TSV (file structure: OpenAIREID orgIdentifier repositoryName alternativeNames repositoryURL description type updateDate startDate subject contentType institution metadataPolicy dataPolicy submissionPolicy contentPolicy software api)
# The "system_metadata.id" check skips the header row.
opendoar = sc.textFile(opendoar_path)
opendoar = opendoar.filter(lambda x: "system_metadata.id" not in x.split("\t")).map(lambda x: dict(fullname=x.split("\t")[1], shortname=x.split("\t")[1], alternativeNames=x.split("\t")[2], opendoar_id=x.split("\t")[0], homepage_url=x.split("\t")[3], subjects=x.split("\t")[9], openaire_id="opendoar::"+x.split("\t")[0]))
opendoar = opendoar.map(lambda x: dict(id=x['openaire_id'], originalId=x['opendoar_id'], websiteurl=x['homepage_url'], name=json.loads(x['fullname'])['name'], alternativeNames=x['alternativeNames'], collectedfrom="OpenDOAR"))
print("OPENDOAR DS", opendoar.count())

# RE3DATA TSV (file structure: openaire_id re3data_id repository_name additional_name repository_url repository_id description type size update_date start_date end_date subject mission_statement content_type provider_type keyword institution policy database_access database_license data_access data_license data_upload data_upload_license software versioning api pid_system citation_guideline_url aid_system enhanced_publication quality_management certificate metadata_standard syndication remarks entry_date last_update)
# The "repositoryName.language" check skips the header row; additional_name
# is a stringified list, parsed via stringToArray.
re3data = sc.textFile(re3data_path)
re3data = re3data.filter(lambda x: "repositoryName.language" not in x.split("\t")).map(lambda x: dict(fullname=x.split("\t")[1], shortname=x.split("\t")[3], re3data_id=x.split("\t")[0], homepage_url=x.split("\t")[4], subjects=x.split("\t")[14], openaire_id="re3data::"+x.split("\t")[0]))
re3data = re3data.map(lambda x: dict(id=x['openaire_id'], originalId=x['re3data_id'], websiteurl=x['homepage_url'], name=x['fullname'], alternativeNames=stringToArray(x['shortname']), collectedfrom="re3data"))
print("RE3DATA", re3data.count())

# ROAR CSV: columns 0/30/31 are eprintid / home_page / title. Rows that are
# too short (naive comma split on quoted CSV), headers, or missing a title
# or homepage are dropped; records are de-duplicated on id via reduceByKey.
# NOTE(review): a naive split(",") breaks on quoted fields containing
# commas — consider the csv module if field shifts are observed.
roar = sc.textFile(roar_path)
roar = roar.filter(lambda x: "\"eprintid\"" not in x.split(",")).filter(lambda x: len(x.split(","))>31).map(lambda x: dict(id=x.split(",")[0], homepage_url=x.split(",")[30], title=x.split(",")[31])).filter(lambda x: x['title'] != "" and x['homepage_url'] != "")
roar = roar.map(lambda x: dict(id="roar::"+x['id'].replace("\"",""), originalId=x['id'].replace("\"",""), websiteurl=x['homepage_url'].replace("\"",""), name=x['title'].replace("\"", ""), alternativeNames=[], collectedfrom="roar"))
roar = roar.filter(lambda x: x['id'] != "" and x['name'] != "")
roar = roar.map(lambda x: (x['id'], x)).reduceByKey(lambda x, y: x).map(lambda x: x[1])
# Fix: was a Python 2 print statement (SyntaxError under Python 3).
print("ROAR:" + str(roar.count()))

# Merge all registries and persist one JSON record per line.
all_ds = fairsharing.union(opendoar).union(re3data).union(roar)
all_ds.map(json.dumps).saveAsTextFile("datasources")