Code for the context propagation
This commit is contained in:
parent
0b8b6ae9a9
commit
afec5e4cf9
|
@ -0,0 +1,184 @@
|
|||
import json
|
||||
import sys
|
||||
import re
|
||||
from pyspark.sql.types import *
|
||||
from pyspark.sql import SQLContext
|
||||
from pyspark.sql.functions import *
|
||||
import copy
|
||||
|
||||
sqlContext = SQLContext(sc)
|
||||
|
||||
paper_dataset_propagation = {
|
||||
"documents": {
|
||||
"prob": 1.0,
|
||||
"path":set()
|
||||
},
|
||||
"isderivedfrom": {
|
||||
"prob": 0.9,
|
||||
"path":set()
|
||||
},
|
||||
"issourceof": {
|
||||
"prob": 0.9,
|
||||
"path":set()
|
||||
},
|
||||
"reviews": {
|
||||
"prob": 0.8,
|
||||
"path":set()
|
||||
},
|
||||
"references": {
|
||||
"prob": 0.7,
|
||||
"path":set()
|
||||
},
|
||||
"issupplementedby": {
|
||||
"prob": 1.0,
|
||||
"path":set()
|
||||
},
|
||||
"cites": {
|
||||
"prob": 0.7,
|
||||
"path":set()
|
||||
}
|
||||
}
|
||||
|
||||
dataset_dataset_propagation= {
|
||||
"issupplementedby": {
|
||||
"prob": 1.0
|
||||
},
|
||||
"documents": {
|
||||
"prob": 0.7
|
||||
},
|
||||
"iscitedby": {
|
||||
"prob": 0.7
|
||||
},
|
||||
"haspart": {
|
||||
"prob": 1.0
|
||||
},
|
||||
"isdocumentedby": {
|
||||
"prob": 0.7
|
||||
},
|
||||
"continues": {
|
||||
"prob": 1.0
|
||||
},
|
||||
"cites": {
|
||||
"prob": 0.7
|
||||
|
||||
},
|
||||
"issupplementto": {
|
||||
"prob": 1.0
|
||||
|
||||
},
|
||||
"isnewversionof": {
|
||||
"prob": 1.0
|
||||
},
|
||||
"ispartof": {
|
||||
"prob": 1.0
|
||||
|
||||
},
|
||||
"references": {
|
||||
"prob": 0.7
|
||||
|
||||
},
|
||||
"isreferencedby": {
|
||||
"prob": 0.7
|
||||
|
||||
},
|
||||
"iscontinuedby": {
|
||||
"prob": 1.0
|
||||
|
||||
},
|
||||
"isvariantformof": {
|
||||
"prob": 0.9
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
def propagateDataset(x):
|
||||
propagation = copy.deepcopy(x[1][0]) #dictionary {"publicationId":{propagation_probabilities and path}}
|
||||
dsprob = x[1][1] #dictionary {"datasetId":{dataset_probabilities}}
|
||||
source = dsprob.keys().pop()
|
||||
todelpid = set()
|
||||
for pid in propagation:
|
||||
entry = propagation[pid]
|
||||
if source in propagation[pid]['path']:
|
||||
todelpid.add(pid)
|
||||
continue
|
||||
for use in entry:
|
||||
if use == 'path':
|
||||
continue
|
||||
new_p = entry[use] * dsprob[source]["prob"]
|
||||
if new_p > 0.3:
|
||||
entry[use] = new_p
|
||||
propagation[pid]['path'].add(x[0])
|
||||
else:
|
||||
todelpid.add(pid)
|
||||
for pid in todelpid:
|
||||
del propagation[pid]
|
||||
return (source, propagation)
|
||||
|
||||
|
||||
def reduceRelation(a, b):
|
||||
if a is None:
|
||||
return b
|
||||
if b is None:
|
||||
return a
|
||||
for pid in b:
|
||||
if not pid in a:
|
||||
a[pid] = copy.deepcopy(b[pid])
|
||||
else:
|
||||
probabilities = b[pid]
|
||||
for prob in probabilities:
|
||||
if prob =='path':
|
||||
for e in probabilities['path']:
|
||||
a[pid]['path'].add(e)
|
||||
continue
|
||||
if prob in a[pid]:
|
||||
if a[pid][prob] < probabilities[prob]:
|
||||
a[pid][prob] = probabilities[prob]
|
||||
else:
|
||||
a[pid][prob] = probabilities[prob]
|
||||
return a
|
||||
|
||||
|
||||
def hasDescription(x):
|
||||
if 'description' in x and not x['description'] is None:
|
||||
for dic in x['description']:
|
||||
if dic['value'] is not None and dic['value'].strip() != "":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
load_datasets = sc.textFile('/user/sandro.labruzzo/scholix/graph/dataset').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
|
||||
load_publications = sc.textFile('/user/sandro.labruzzo/scholix/graph/publication').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
|
||||
relations_rdd = spark.read.parquet('/user/sandro.labruzzo/scholix/graph/relation').rdd.filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
|
||||
|
||||
#relations from publication to dataset in the graph subset
|
||||
pubs_relation = relations_rdd.filter(lambda x: x['source'][:2] == '50' and x['target'][:2] == '60' and x['relType'].lower() in paper_dataset_propagation)
|
||||
|
||||
#relation from dataset to dataset (no self loops) in the graph subset
|
||||
dats_relation = relations_rdd.filter(lambda x: x['source'][:2] == '60' and x['target'][:2] == '60' and x['source'] != x['target'] and x['relType'].lower() in dataset_dataset_propagation)
|
||||
|
||||
#distinct publication subset appearing in a relation to at least one dataset
|
||||
pubs_subgraph = pubs_relation.map(lambda x: (x['source'],1)).reduceByKey(lambda a,b : a+b).join(load_publications.map(lambda x:(x['id'],x))).map(lambda x: x[1][1])
|
||||
|
||||
#publications with abstract
|
||||
pubs_with_abst = pubs_subgraph.filter(hasDescription).map(lambda x:(x['id'],x))
|
||||
|
||||
#relations from publication with abstract to dataset
|
||||
rel_pubs_dats_abst = pubs_relation.map(lambda x: (x['source'],x)).join(pubs_with_abst).map(lambda x: x[1][0]).map(lambda x: (x['target'], x)).join(load_datasets.map(lambda x: (x['id'], 1))).map(lambda x: x[1][0])
|
||||
|
||||
|
||||
publication_dataset = rel_pubs_dats_abst.map(lambda x: (x['target'], {x['source']:copy.deepcopy(paper_dataset_propagation[x['relType'].lower()])}))
|
||||
dataset_dataset = dats_relation.map(lambda x: (x['source'], {x['target']:copy.deepcopy(dataset_dataset_propagation[x['relType'].lower()])}))
|
||||
|
||||
|
||||
propagation = publication_dataset.reduceByKey(reduceRelation)
|
||||
|
||||
count = 2
|
||||
hops = 3
|
||||
while (count <= hops):
|
||||
pl_new = propagation.join(dataset_dataset).map(propagateDataset).filter(lambda x: len(x[1]) > 0)
|
||||
if pl_new.count() == 0:
|
||||
break
|
||||
propagation = pl_new.union(propagation).reduceByKey(reduceRelation)
|
||||
count += 1
|
Loading…
Reference in New Issue