script to create the csv file basing on the mergerels, generation of the mergerels and the deduplication csv
This commit is contained in:
parent
b31e97f71e
commit
594ba0e1c7
File diff suppressed because it is too large
Load Diff
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,52 @@
|
||||||
|
# coding=utf-8
|
||||||
|
from pyspark import SparkContext
|
||||||
|
from pyspark import SparkConf
|
||||||
|
from pyspark.sql import SparkSession
|
||||||
|
import json
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
def isRe3data(x):
|
||||||
|
for id in x:
|
||||||
|
if 're3data' in id:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def relToRe3data(x):
|
||||||
|
rel = []
|
||||||
|
re3data = ''
|
||||||
|
for id in x:
|
||||||
|
if 're3data' in id:
|
||||||
|
re3data = id
|
||||||
|
for id in x:
|
||||||
|
rel.append((re3data, id))
|
||||||
|
return rel
|
||||||
|
|
||||||
|
conf = SparkConf().setAppName('FAIRsharing').setMaster('local[*]')
|
||||||
|
sc = SparkContext(conf=conf)
|
||||||
|
spark = SparkSession.builder.config(conf=conf).getOrCreate()
|
||||||
|
|
||||||
|
datasources = sc.textFile("datasources").map(json.loads)
|
||||||
|
|
||||||
|
print("TOTAL DS", datasources.count())
|
||||||
|
print("ROAR", datasources.filter(lambda x: "roar" in x['collectedfrom']).count())
|
||||||
|
print("FAIRsharing", datasources.filter(lambda x: "FAIRsharing" in x['collectedfrom']).count())
|
||||||
|
print("OpenDOAR", datasources.filter(lambda x: "OpenDOAR" in x['collectedfrom']).count())
|
||||||
|
print("re3data", datasources.filter(lambda x: "re3data" in x['collectedfrom']).count())
|
||||||
|
mergerels = spark.read.load("mergerels").rdd
|
||||||
|
print("Number of mergerels", mergerels.count())
|
||||||
|
|
||||||
|
#TO RE3DATA
|
||||||
|
# mergerels = mergerels.map(lambda x: (x['source'], x['target'])).groupByKey().filter(lambda x: isRe3data(x[1])).flatMap(lambda x: relToRe3data(x[1])).map(lambda x: dict(source=x[0], target=x[1]))
|
||||||
|
|
||||||
|
joinRes = mergerels.map(lambda x: (x['target'], x['source'])).join(datasources.map(lambda x: (x['id'], x)))
|
||||||
|
|
||||||
|
joinRes = joinRes.map(lambda x: dict(dedup_id=x[1][0], duplicate_id=x[0], original_id=x[1][1]['originalId'], name=x[1][1]['name'].replace("\"",""), collectedfrom=x[1][1]['collectedfrom'])).sortBy(lambda x: x['dedup_id'])
|
||||||
|
|
||||||
|
#CREATE CSV FILE (file structure: dedup_id; duplicate_id; original_id; name; collectedfrom
|
||||||
|
csv_name = "ds_dedup_" + str(datetime.now()).split(".")[0].replace(" ", "_").replace(":", ".") + ".csv"
|
||||||
|
f = open(csv_name, "w")
|
||||||
|
f.write("dedup_id;duplicate_id;original_id;name;collectedfrom\n")
|
||||||
|
for row in joinRes.collect():
|
||||||
|
line = row['dedup_id'] + ";" + row['duplicate_id'] + ";" + row['original_id'] + ";\"" + row['name'] + "\";" + row['collectedfrom'] + "\n"
|
||||||
|
f.write(line.encode("utf-8"))
|
||||||
|
f.close()
|
Loading…
Reference in New Issue