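# Propagates publication abstracts to related datasets: direct
# publication -> dataset links seed a propagation probability per relation
# type, which then decays across dataset -> dataset links for up to `hops`
# hops. A chain is dropped once its probability falls to 0.3 or below, or
# when it would revisit a dataset already on its path.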
import json
import sys
import re
import copy

from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

# assumes an interactive PySpark session where `sc` (SparkContext) and
# `spark` (SparkSession) are already defined
sqlContext = SQLContext(sc)
# propagation probabilities for each publication -> dataset relation type;
# "path" records the dataset ids already traversed, so cycles are not followed
paper_dataset_propagation = {
    "documents": {"prob": 1.0, "path": set()},
    "isderivedfrom": {"prob": 0.9, "path": set()},
    "issourceof": {"prob": 0.9, "path": set()},
    "reviews": {"prob": 0.8, "path": set()},
    "references": {"prob": 0.7, "path": set()},
    "issupplementedby": {"prob": 1.0, "path": set()},
    "cites": {"prob": 0.7, "path": set()}
}
# decay factor applied each time the propagation crosses a
# dataset -> dataset relation of the given type
dataset_dataset_propagation = {
    "issupplementedby": {"prob": 1.0},
    "documents": {"prob": 0.7},
    "iscitedby": {"prob": 0.7},
    "haspart": {"prob": 1.0},
    "isdocumentedby": {"prob": 0.7},
    "continues": {"prob": 1.0},
    "cites": {"prob": 0.7},
    "issupplementto": {"prob": 1.0},
    "isnewversionof": {"prob": 1.0},
    "ispartof": {"prob": 1.0},
    "references": {"prob": 0.7},
    "isreferencedby": {"prob": 0.7},
    "iscontinuedby": {"prob": 1.0},
    "isvariantformof": {"prob": 0.9}
}
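# Example of the decay across hops: a publication that "references" a dataset
# starts at 0.7; a "cites" hop to a second dataset gives 0.7 * 0.7 = 0.49, and
# one more 0.7-weighted hop gives ~0.343, still above the 0.3 cut-off enforced
# in propagateDataset below.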
def propagateDataset(x):
    # x = (datasetId, (propagation, dsprob)) from the join of `propagation` with `dataset_dataset`
    propagation = copy.deepcopy(x[1][0])  # dictionary {"publicationId": {propagation probabilities and path}}
    dsprob = x[1][1]  # dictionary {"datasetId": {dataset probabilities}}
    source = next(iter(dsprob))  # the single target dataset id (dict.keys() has no pop() in Python 3)
    todelpid = set()
    for pid in propagation:
        entry = propagation[pid]
        if source in entry['path']:
            # this publication's propagation already went through the target dataset: avoid cycles
            todelpid.add(pid)
            continue
        for use in entry:
            if use == 'path':
                continue
            new_p = entry[use] * dsprob[source]["prob"]
            if new_p > 0.3:
                entry[use] = new_p
                entry['path'].add(x[0])
            else:
                # decayed to the 0.3 threshold or below: stop propagating this publication
                todelpid.add(pid)
    for pid in todelpid:
        del propagation[pid]
    return (source, propagation)
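# Minimal sketch of propagateDataset on one joined record (hypothetical ids):
#   x = ('ds1', ({'pub1': {'prob': 0.7, 'path': set()}}, {'ds2': {'prob': 0.7}}))
#   propagateDataset(x) -> ('ds2', {'pub1': {'prob': 0.49, 'path': {'ds1'}}})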
def reduceRelation(a, b):
    # merge two {"publicationId": {probabilities and path}} dictionaries, keeping
    # the highest probability for each use and the union of the traversal paths
    if a is None:
        return b
    if b is None:
        return a
    for pid in b:
        if pid not in a:
            a[pid] = copy.deepcopy(b[pid])
        else:
            probabilities = b[pid]
            for prob in probabilities:
                if prob == 'path':
                    for e in probabilities['path']:
                        a[pid]['path'].add(e)
                    continue
                if prob in a[pid]:
                    if a[pid][prob] < probabilities[prob]:
                        a[pid][prob] = probabilities[prob]
                else:
                    a[pid][prob] = probabilities[prob]
    return a
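# Minimal sketch of the merge (hypothetical ids): the higher probability wins
# and the paths are united:
#   a = {'pub1': {'prob': 0.49, 'path': {'ds1'}}}
#   b = {'pub1': {'prob': 0.70, 'path': {'ds3'}}}
#   reduceRelation(a, b) -> {'pub1': {'prob': 0.70, 'path': {'ds1', 'ds3'}}}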
def hasDescription(x):
    # a record has an abstract if at least one description value is non-empty
    if 'description' in x and x['description'] is not None:
        for dic in x['description']:
            if dic['value'] is not None and dic['value'].strip() != "":
                return True
    return False
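# For instance (hypothetical records):
#   hasDescription({'description': [{'value': 'An abstract'}]})  -> True
#   hasDescription({'description': [{'value': '   '}]})          -> False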
# load the graph subset, dropping records flagged as deleted by inference
load_datasets = sc.textFile(<INPUT_DATASET_PATH>).map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
load_publications = sc.textFile(<INPUT_PUBLICATION_PATH>).map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
relations_rdd = spark.read.parquet(<INPUT_RELATION_PATH>).rdd.filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
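# Note: the <INPUT_*_PATH> markers above are placeholders; substitute quoted
# paths (e.g. HDFS locations) before running. Datasets and publications are
# expected as one JSON record per line, relations as parquet, each record
# carrying the dataInfo.deletedbyinference flag filtered on above.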
# relations from publication (id prefix '50') to dataset (id prefix '60') in the graph subset
pubs_relation = relations_rdd.filter(lambda x: x['source'][:2] == '50' and x['target'][:2] == '60' and x['relType'].lower() in paper_dataset_propagation)
# relations from dataset to dataset (no self loops) in the graph subset
dats_relation = relations_rdd.filter(lambda x: x['source'][:2] == '60' and x['target'][:2] == '60' and x['source'] != x['target'] and x['relType'].lower() in dataset_dataset_propagation)
# distinct publications appearing in a relation to at least one dataset
pubs_subgraph = pubs_relation.map(lambda x: (x['source'], 1)).reduceByKey(lambda a, b: a + b).join(load_publications.map(lambda x: (x['id'], x))).map(lambda x: x[1][1])
# publications with a non-empty abstract, keyed by id
pubs_with_abst = pubs_subgraph.filter(hasDescription).map(lambda x: (x['id'], x))
# relations from publications with an abstract to datasets present in the subset
rel_pubs_dats_abst = pubs_relation.map(lambda x: (x['source'], x)).join(pubs_with_abst).map(lambda x: x[1][0]).map(lambda x: (x['target'], x)).join(load_datasets.map(lambda x: (x['id'], 1))).map(lambda x: x[1][0])
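# The two joins above first keep only relations whose source publication has
# an abstract, then only those whose target dataset exists in the loaded subset.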
# key each relation by dataset id, carrying the propagation entry for its relation type
publication_dataset = rel_pubs_dats_abst.map(lambda x: (x['target'], {x['source']: copy.deepcopy(paper_dataset_propagation[x['relType'].lower()])}))
dataset_dataset = dats_relation.map(lambda x: (x['source'], {x['target']: copy.deepcopy(dataset_dataset_propagation[x['relType'].lower()])}))
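# Element shapes after the two maps (hypothetical ids):
#   publication_dataset: ('ds1', {'pub1': {'prob': 0.7, 'path': set()}})
#   dataset_dataset:     ('ds1', {'ds2': {'prob': 1.0}})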
# merge, per dataset, the contributions of all directly related publications
propagation = publication_dataset.reduceByKey(reduceRelation)
# iterate dataset-to-dataset hops; the direct publication -> dataset link is hop 1
count = 2
hops = 3
while count <= hops:
    pl_new = propagation.join(dataset_dataset).map(propagateDataset).filter(lambda x: len(x[1]) > 0)
    if pl_new.count() == 0:
        break
    propagation = pl_new.union(propagation).reduceByKey(reduceRelation)
    count += 1
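# After the loop, `propagation` maps each dataset id to the publications whose
# abstract can reach it, i.e. elements of the form
#   (datasetId, {publicationId: {'prob': p > 0.3, 'path': {traversed dataset ids}}})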