# coding=utf-8
"""Export datasource duplicates to a semicolon-separated CSV file.

The script:

1. reads datasource records (one JSON document per line) and prints how
   many of them mention each known source in their ``collectedfrom`` field;
2. loads the ``mergerels`` relations and joins each relation ``target``
   with the datasource having the same ``id``;
3. writes one CSV row per joined pair, sorted by ``dedup_id``, with the
   columns ``dedup_id;duplicate_id;original_id;name;collectedfrom``.
"""
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
import json
from datetime import datetime


def isRe3data(x):
    """Return True if any identifier in *x* contains the text 're3data'."""
    return any('re3data' in identifier for identifier in x)


def relToRe3data(x):
    """Pair every identifier in *x* with the group's re3data identifier.

    The re3data identifier is the last element of *x* containing the text
    're3data' (an empty string when there is none).  *x* is iterated twice,
    so it must be a re-iterable collection rather than a one-shot iterator.

    Returns a list of ``(re3data_id, identifier)`` tuples, one per element
    of *x* (including the re3data identifier itself).
    """
    re3data = ''
    for identifier in x:
        if 're3data' in identifier:
            re3data = identifier
    return [(re3data, identifier) for identifier in x]


conf = SparkConf().setAppName('Create Dedup CSV').setMaster('local[*]')
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

input_path = "../../data/interim/datasources"

# Each input line is parsed as a standalone JSON document.
datasources = sc.textFile(input_path).map(json.loads)

print("TOTAL DS", datasources.count())
# Per-source statistics: (printed label, text searched in 'collectedfrom').
for label, needle in (
    ("ROAR", "roar"),
    ("FAIRsharing", "FAIRsharing"),
    ("OpenDOAR", "OpenDOAR"),
    ("re3data", "re3data"),
):
    # `needle` is bound as a default argument so the lambda does not depend
    # on the loop variable's later values.
    print(label, datasources.filter(lambda x, needle=needle: needle in x['collectedfrom']).count())

mergerels = spark.read.load("mergerels").rdd

print("Number of mergerels", mergerels.count())

# TO RE3DATA
# Optional step, currently disabled: keep only the groups that contain a
# re3data identifier and re-point every relation at that identifier.
# mergerels = mergerels.map(lambda x: (x['source'], x['target'])).groupByKey().filter(lambda x: isRe3data(x[1])).flatMap(lambda x: relToRe3data(x[1])).map(lambda x: dict(source=x[0], target=x[1]))

# Key the relations by target and the datasources by id, so that every joined
# element has the shape (target_id, (source_id, datasource_record)).
joinRes = mergerels.map(lambda x: (x['target'], x['source'])).join(datasources.map(lambda x: (x['id'], x)))
joinRes = joinRes.map(
    lambda x: dict(
        dedup_id=x[1][0],
        duplicate_id=x[0],
        original_id=x[1][1]['originalId'],
        # Double quotes are stripped because the name is written between
        # double quotes in the CSV.
        name=x[1][1]['name'].replace("\"", ""),
        collectedfrom=x[1][1]['collectedfrom'],
    )
).sortBy(lambda x: x['dedup_id'])

# CREATE CSV FILE (file structure: dedup_id; duplicate_id; original_id; name; collectedfrom)
# The file name carries a timestamp such as ds_dedup_2020-01-31_12.30.45.csv.
csv_name = "ds_dedup_" + datetime.now().strftime("%Y-%m-%d_%H.%M.%S") + ".csv"

# The file is opened in text mode with an explicit UTF-8 encoding and rows are
# written as str: writing encoded bytes to a text-mode file raises TypeError
# on Python 3.  The context manager closes the file even if a write fails.
with open("../../data/processed/" + csv_name, "w", encoding="utf-8") as f:
    f.write("dedup_id;duplicate_id;original_id;name;collectedfrom\n")
    for row in joinRes.collect():
        fields = [
            row['dedup_id'],
            row['duplicate_id'],
            row['original_id'],
            "\"" + row['name'] + "\"",
            row['collectedfrom'],
        ]
        f.write(";".join(fields) + "\n")