Minor changes to fix relative paths.
This commit is contained in:
parent
3353c08405
commit
efdbc76181
|
@ -27,24 +27,19 @@ def testValidJSON(x):
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
conf = SparkConf().setAppName('fairshare').setMaster('local[*]')
|
conf = SparkConf().setAppName('Create JSON Dump').setMaster('local[*]')
|
||||||
sc = SparkContext(conf=conf)
|
sc = SparkContext(conf=conf)
|
||||||
spark = SparkSession.builder.config(conf=conf).getOrCreate()
|
spark = SparkSession.builder.config(conf=conf).getOrCreate()
|
||||||
|
|
||||||
deleteDir("datasources")
|
# RAW DUMPS PATH
|
||||||
|
fairsharing_path = "../../data/raw/fairsharing_dump_api_02_2022.json"
|
||||||
|
opendoar_path = "../../data/raw/openDoar.tsv"
|
||||||
|
re3data_path = "../../data/raw/re3data.tsv"
|
||||||
|
roar_path = "../../data/raw/export_roar_CSV.csv"
|
||||||
|
|
||||||
fairsharing_path = "../data/raw/fairsharing_dump_api_02_2022.json"
|
output_path = "../../data/interim/datasources"
|
||||||
opendoar_path = "../data/raw/openDoar.tsv"
|
|
||||||
re3data_path = "../data/raw/re3data.tsv"
|
|
||||||
roar_path = "../data/raw/export_roar_CSV.csv"
|
|
||||||
|
|
||||||
# #FAIRSHARING V1.0
|
deleteDir(output_path)
|
||||||
# #FILE STRUCTURE: Full name of record | Short name | FAIRsharing URL | Homepage URL of the resource | Country Field | Subjects
|
|
||||||
# fairsharing = sc.textFile("/Users/miconis/Desktop/Fairsharing dedup/registries_analysis/data/raw/FAIRsharingDBrec_summary20210304.csv")
|
|
||||||
# fairsharing = fairsharing.filter(lambda x: " FAIRsharing URL " not in x.split("|")).map(lambda x: dict(fullname=x.split("|")[0], shortname=x.split("|")[1], fairshare_url=x.split("|")[2], homepage_url=x.split("|")[3], country=x.split("|")[4], subjects=x.split("|")[5]))
|
|
||||||
# fairsharing = fairsharing.map(lambda x: dict(id=x['fairshare_url'], originalId=x['fairshare_url'], websiteurl=x['homepage_url'], name=x['fullname'], alternativeNames=[x['shortname']], collectedfrom="FAIRsharing"))
|
|
||||||
#
|
|
||||||
# print "FAIRSHARING:" + str(fairsharing.count())
|
|
||||||
|
|
||||||
#FAIRSHARING (file structure: )
|
#FAIRSHARING (file structure: )
|
||||||
fairsharing = sc.textFile(fairsharing_path).filter(testValidJSON).map(json.loads)
|
fairsharing = sc.textFile(fairsharing_path).filter(testValidJSON).map(json.loads)
|
||||||
|
@ -73,4 +68,4 @@ print "ROAR:" + str(roar.count())
|
||||||
|
|
||||||
all_ds = fairsharing.union(opendoar).union(re3data).union(roar)
|
all_ds = fairsharing.union(opendoar).union(re3data).union(roar)
|
||||||
|
|
||||||
all_ds.map(json.dumps).saveAsTextFile("datasources")
|
all_ds.map(json.dumps).saveAsTextFile(output_path)
|
||||||
|
|
|
@ -21,11 +21,13 @@ def relToRe3data(x):
|
||||||
rel.append((re3data, id))
|
rel.append((re3data, id))
|
||||||
return rel
|
return rel
|
||||||
|
|
||||||
conf = SparkConf().setAppName('FAIRsharing').setMaster('local[*]')
|
conf = SparkConf().setAppName('Create Dedup CSV').setMaster('local[*]')
|
||||||
sc = SparkContext(conf=conf)
|
sc = SparkContext(conf=conf)
|
||||||
spark = SparkSession.builder.config(conf=conf).getOrCreate()
|
spark = SparkSession.builder.config(conf=conf).getOrCreate()
|
||||||
|
|
||||||
datasources = sc.textFile("datasources").map(json.loads)
|
input_path = "../../data/interim/datasources"
|
||||||
|
|
||||||
|
datasources = sc.textFile(input_path).map(json.loads)
|
||||||
|
|
||||||
print("TOTAL DS", datasources.count())
|
print("TOTAL DS", datasources.count())
|
||||||
print("ROAR", datasources.filter(lambda x: "roar" in x['collectedfrom']).count())
|
print("ROAR", datasources.filter(lambda x: "roar" in x['collectedfrom']).count())
|
||||||
|
@ -44,7 +46,7 @@ joinRes = joinRes.map(lambda x: dict(dedup_id=x[1][0], duplicate_id=x[0], origin
|
||||||
|
|
||||||
#CREATE CSV FILE (file structure: dedup_id; duplicate_id; original_id; name; collectedfrom)
|
#CREATE CSV FILE (file structure: dedup_id; duplicate_id; original_id; name; collectedfrom)
|
||||||
csv_name = "ds_dedup_" + str(datetime.now()).split(".")[0].replace(" ", "_").replace(":", ".") + ".csv"
|
csv_name = "ds_dedup_" + str(datetime.now()).split(".")[0].replace(" ", "_").replace(":", ".") + ".csv"
|
||||||
f = open(csv_name, "w")
|
f = open("../../data/processed/" + csv_name, "w")
|
||||||
f.write("dedup_id;duplicate_id;original_id;name;collectedfrom\n")
|
f.write("dedup_id;duplicate_id;original_id;name;collectedfrom\n")
|
||||||
for row in joinRes.collect():
|
for row in joinRes.collect():
|
||||||
line = row['dedup_id'] + ";" + row['duplicate_id'] + ";" + row['original_id'] + ";\"" + row['name'] + "\";" + row['collectedfrom'] + "\n"
|
line = row['dedup_id'] + ";" + row['duplicate_id'] + ";" + row['original_id'] + ";\"" + row['name'] + "\";" + row['collectedfrom'] + "\n"
|
||||||
|
|
Loading…
Reference in New Issue