context-propagation/ScholexplorerPropagation.py

185 lines
5.3 KiB
Python

import json
import sys
import re
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
import copy
sqlContext = SQLContext(sc)
paper_dataset_propagation = {
"documents": {
"prob": 1.0,
"path":set()
},
"isderivedfrom": {
"prob": 0.9,
"path":set()
},
"issourceof": {
"prob": 0.9,
"path":set()
},
"reviews": {
"prob": 0.8,
"path":set()
},
"references": {
"prob": 0.7,
"path":set()
},
"issupplementedby": {
"prob": 1.0,
"path":set()
},
"cites": {
"prob": 0.7,
"path":set()
}
}
dataset_dataset_propagation= {
"issupplementedby": {
"prob": 1.0
},
"documents": {
"prob": 0.7
},
"iscitedby": {
"prob": 0.7
},
"haspart": {
"prob": 1.0
},
"isdocumentedby": {
"prob": 0.7
},
"continues": {
"prob": 1.0
},
"cites": {
"prob": 0.7
},
"issupplementto": {
"prob": 1.0
},
"isnewversionof": {
"prob": 1.0
},
"ispartof": {
"prob": 1.0
},
"references": {
"prob": 0.7
},
"isreferencedby": {
"prob": 0.7
},
"iscontinuedby": {
"prob": 1.0
},
"isvariantformof": {
"prob": 0.9
}
}
def propagateDataset(x):
propagation = copy.deepcopy(x[1][0]) #dictionary {"publicationId":{propagation_probabilities and path}}
dsprob = x[1][1] #dictionary {"datasetId":{dataset_probabilities}}
source = dsprob.keys().pop()
todelpid = set()
for pid in propagation:
entry = propagation[pid]
if source in propagation[pid]['path']:
todelpid.add(pid)
continue
for use in entry:
if use == 'path':
continue
new_p = entry[use] * dsprob[source]["prob"]
if new_p > 0.3:
entry[use] = new_p
propagation[pid]['path'].add(x[0])
else:
todelpid.add(pid)
for pid in todelpid:
del propagation[pid]
return (source, propagation)
def reduceRelation(a, b):
if a is None:
return b
if b is None:
return a
for pid in b:
if not pid in a:
a[pid] = copy.deepcopy(b[pid])
else:
probabilities = b[pid]
for prob in probabilities:
if prob =='path':
for e in probabilities['path']:
a[pid]['path'].add(e)
continue
if prob in a[pid]:
if a[pid][prob] < probabilities[prob]:
a[pid][prob] = probabilities[prob]
else:
a[pid][prob] = probabilities[prob]
return a
def hasDescription(x):
if 'description' in x and not x['description'] is None:
for dic in x['description']:
if dic['value'] is not None and dic['value'].strip() != "":
return True
return False
load_datasets = sc.textFile(<INPUT_DATASET_PATH>).map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
load_publications = sc.textFile(<INPUT_PUBLICATION_PATH>).map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
relations_rdd = spark.read.parquet(<INPUT_RELATION_PATH>).rdd.filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
#relations from publication to dataset in the graph subset
pubs_relation = relations_rdd.filter(lambda x: x['source'][:2] == '50' and x['target'][:2] == '60' and x['relType'].lower() in paper_dataset_propagation)
#relation from dataset to dataset (no self loops) in the graph subset
dats_relation = relations_rdd.filter(lambda x: x['source'][:2] == '60' and x['target'][:2] == '60' and x['source'] != x['target'] and x['relType'].lower() in dataset_dataset_propagation)
#distinct publication subset appearing in a relation to at least one dataset
pubs_subgraph = pubs_relation.map(lambda x: (x['source'],1)).reduceByKey(lambda a,b : a+b).join(load_publications.map(lambda x:(x['id'],x))).map(lambda x: x[1][1])
#publications with abstract
pubs_with_abst = pubs_subgraph.filter(hasDescription).map(lambda x:(x['id'],x))
#relations from publication with abstract to dataset
rel_pubs_dats_abst = pubs_relation.map(lambda x: (x['source'],x)).join(pubs_with_abst).map(lambda x: x[1][0]).map(lambda x: (x['target'], x)).join(load_datasets.map(lambda x: (x['id'], 1))).map(lambda x: x[1][0])
publication_dataset = rel_pubs_dats_abst.map(lambda x: (x['target'], {x['source']:copy.deepcopy(paper_dataset_propagation[x['relType'].lower()])}))
dataset_dataset = dats_relation.map(lambda x: (x['source'], {x['target']:copy.deepcopy(dataset_dataset_propagation[x['relType'].lower()])}))
propagation = publication_dataset.reduceByKey(reduceRelation)
count = 2
hops = 3
while (count <= hops):
pl_new = propagation.join(dataset_dataset).map(propagateDataset).filter(lambda x: len(x[1]) > 0)
if pl_new.count() == 0:
break
propagation = pl_new.union(propagation).reduceByKey(reduceRelation)
count += 1