From afec5e4cf9da8280cae40f7dffac94bf4311b5d5 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 29 Jun 2020 18:07:56 +0200
Subject: [PATCH] Code for the context propagation

---
 ScholexplorerPropagation.py | 184 ++++++++++++++++++++++++++++++++++++
 1 file changed, 184 insertions(+)
 create mode 100644 ScholexplorerPropagation.py

diff --git a/ScholexplorerPropagation.py b/ScholexplorerPropagation.py
new file mode 100644
index 0000000..9f82e8c
--- /dev/null
+++ b/ScholexplorerPropagation.py
@@ -0,0 +1,184 @@
+# Context propagation from publications (with an abstract) to the datasets they
+# are related to, following dataset-to-dataset relations for up to three hops.
+# Meant to be run in a pyspark shell, where `sc` and `spark` are already defined.
+import json
+import copy
+
+# propagation probabilities for publication -> dataset relations;
+# "path" keeps the dataset ids already traversed, to avoid cycles
+paper_dataset_propagation = {
+    "documents": {"prob": 1.0, "path": set()},
+    "isderivedfrom": {"prob": 0.9, "path": set()},
+    "issourceof": {"prob": 0.9, "path": set()},
+    "reviews": {"prob": 0.8, "path": set()},
+    "references": {"prob": 0.7, "path": set()},
+    "issupplementedby": {"prob": 1.0, "path": set()},
+    "cites": {"prob": 0.7, "path": set()}
+}
+
+# propagation probabilities for dataset -> dataset relations
+dataset_dataset_propagation = {
+    "issupplementedby": {"prob": 1.0},
+    "documents": {"prob": 0.7},
+    "iscitedby": {"prob": 0.7},
+    "haspart": {"prob": 1.0},
+    "isdocumentedby": {"prob": 0.7},
+    "continues": {"prob": 1.0},
+    "cites": {"prob": 0.7},
+    "issupplementto": {"prob": 1.0},
+    "isnewversionof": {"prob": 1.0},
+    "ispartof": {"prob": 1.0},
+    "references": {"prob": 0.7},
+    "isreferencedby": {"prob": 0.7},
+    "iscontinuedby": {"prob": 1.0},
+    "isvariantformof": {"prob": 0.9}
+}
+
+
+def propagateDataset(x):
+    # x = (datasetId, ({publicationId: {"prob": ..., "path": set()}},
+    #                  {nextDatasetId: {"prob": ...}}))
+    propagation = copy.deepcopy(x[1][0])  # publications reaching the current dataset
+    dsprob = x[1][1]                      # single dataset -> dataset relation
+    source = next(iter(dsprob))           # dataset reached by this hop (source of the next one)
+    todelpid = set()
+    for pid in propagation:
+        entry = propagation[pid]
+        if source in entry['path']:
+            # this dataset was already traversed for the publication: cycle
+            todelpid.add(pid)
+            continue
+        for use in entry:
+            if use == 'path':
+                continue
+            new_p = entry[use] * dsprob[source]["prob"]
+            if new_p > 0.3:
+                entry[use] = new_p
+                entry['path'].add(x[0])
+            else:
+                todelpid.add(pid)
+    for pid in todelpid:
+        del propagation[pid]
+    return (source, propagation)
+
+
+def reduceRelation(a, b):
+    # merge two propagation dictionaries: keep the highest probability per
+    # publication and the union of the traversed paths
+    if a is None:
+        return b
+    if b is None:
+        return a
+    for pid in b:
+        if pid not in a:
+            a[pid] = copy.deepcopy(b[pid])
+        else:
+            probabilities = b[pid]
+            for prob in probabilities:
+                if prob == 'path':
+                    for e in probabilities['path']:
+                        a[pid]['path'].add(e)
+                    continue
+                if prob in a[pid]:
+                    if a[pid][prob] < probabilities[prob]:
+                        a[pid][prob] = probabilities[prob]
+                else:
+                    a[pid][prob] = probabilities[prob]
+    return a
+
+
+def hasDescription(x):
+    # true when the entity has at least one non-empty description (abstract)
+    if 'description' in x and x['description'] is not None:
+        for dic in x['description']:
+            if dic['value'] is not None and dic['value'].strip() != "":
+                return True
+    return False
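+
+
+# Worked example of the thresholding (numbers taken from the tables above, the
+# relation chain itself is only illustrative): a publication that "documents" a
+# dataset starts at probability 1.0; following a "cites" link to a second
+# dataset gives 1.0 * 0.7 = 0.7, and a further "cites" hop gives 0.7 * 0.7 = 0.49,
+# so the publication context still reaches the third-hop dataset because every
+# value stays above the 0.3 cutoff used in propagateDataset. A "references"
+# start (0.7) followed by two "cites" hops ends at 0.343 and is still kept,
+# while anything falling to 0.3 or below is dropped. When two different paths
+# lead to the same dataset, reduceRelation keeps the highest probability and
+# the union of the traversed paths.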
+
+# load the graph subsets, keeping only entities not deleted by inference
+load_datasets = sc.textFile('/user/sandro.labruzzo/scholix/graph/dataset').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
+load_publications = sc.textFile('/user/sandro.labruzzo/scholix/graph/publication').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
+relations_rdd = spark.read.parquet('/user/sandro.labruzzo/scholix/graph/relation').rdd.filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
+
+# relations from publication to dataset in the graph subset
+pubs_relation = relations_rdd.filter(lambda x: x['source'][:2] == '50' and x['target'][:2] == '60' and x['relType'].lower() in paper_dataset_propagation)
+
+# relations from dataset to dataset (no self loops) in the graph subset
+dats_relation = relations_rdd.filter(lambda x: x['source'][:2] == '60' and x['target'][:2] == '60' and x['source'] != x['target'] and x['relType'].lower() in dataset_dataset_propagation)
+
+# distinct publications appearing in a relation to at least one dataset
+pubs_subgraph = pubs_relation.map(lambda x: (x['source'], 1)).reduceByKey(lambda a, b: a + b).join(load_publications.map(lambda x: (x['id'], x))).map(lambda x: x[1][1])
+
+# publications with an abstract
+pubs_with_abst = pubs_subgraph.filter(hasDescription).map(lambda x: (x['id'], x))
+
+# relations from publications with an abstract to datasets
+rel_pubs_dats_abst = pubs_relation.map(lambda x: (x['source'], x)).join(pubs_with_abst).map(lambda x: x[1][0]).map(lambda x: (x['target'], x)).join(load_datasets.map(lambda x: (x['id'], 1))).map(lambda x: x[1][0])
+
+# (datasetId, {publicationId: propagation entry}) for the first hop
+publication_dataset = rel_pubs_dats_abst.map(lambda x: (x['target'], {x['source']: copy.deepcopy(paper_dataset_propagation[x['relType'].lower()])}))
+# (sourceDatasetId, {targetDatasetId: propagation entry}) for the following hops
+dataset_dataset = dats_relation.map(lambda x: (x['source'], {x['target']: copy.deepcopy(dataset_dataset_propagation[x['relType'].lower()])}))
+
+propagation = publication_dataset.reduceByKey(reduceRelation)
+
+# propagate the publication context up to `hops` dataset hops, stopping early
+# when no dataset can be reached with probability above the threshold
+count = 2
+hops = 3
+while count <= hops:
+    pl_new = propagation.join(dataset_dataset).map(propagateDataset).filter(lambda x: len(x[1]) > 0)
+    if pl_new.count() == 0:
+        break
+    propagation = pl_new.union(propagation).reduceByKey(reduceRelation)
+    count += 1
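
A quick local sanity check of the two functions in the patch, as a minimal sketch: the identifiers "50|pub_1", "60|ds_1" and "60|ds_2" are toy values (not real graph ids), no Spark is needed, and it only assumes propagateDataset and reduceRelation have been pasted into a Python session.

import copy

# one record in the shape produced by propagation.join(dataset_dataset):
# (datasetId, ({publicationId: {"prob", "path"}}, {nextDatasetId: {"prob"}}))
record = ("60|ds_1",
          ({"50|pub_1": {"prob": 1.0, "path": set()}},  # pub_1 documents ds_1
           {"60|ds_2": {"prob": 0.7}}))                 # ds_1 cites ds_2

next_ds, propagated = propagateDataset(record)
print(next_ds, propagated)
# 60|ds_2 {'50|pub_1': {'prob': 0.7, 'path': {'60|ds_1'}}}

# pub_1 also reaches ds_2 directly with a higher probability: the merge keeps
# the maximum probability and the union of the traversed paths
direct = {"50|pub_1": {"prob": 0.9, "path": set()}}
print(reduceRelation(copy.deepcopy(direct), propagated))
# {'50|pub_1': {'prob': 0.9, 'path': {'60|ds_1'}}}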