import json
import sys
import re
import copy

from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

# Meant to run in a pyspark session where `sc` (SparkContext) and `spark`
# (SparkSession) are already defined.
sqlContext = SQLContext(sc)

# Propagation weights for publication -> dataset relations. Each entry also
# carries the set of dataset ids already traversed ("path"), used to avoid
# cycles during multi-hop propagation.
paper_dataset_propagation = {
    "documents":        {"prob": 1.0, "path": set()},
    "isderivedfrom":    {"prob": 0.9, "path": set()},
    "issourceof":       {"prob": 0.9, "path": set()},
    "reviews":          {"prob": 0.8, "path": set()},
    "references":       {"prob": 0.7, "path": set()},
    "issupplementedby": {"prob": 1.0, "path": set()},
    "cites":            {"prob": 0.7, "path": set()},
}

# Propagation weights for dataset -> dataset relations.
dataset_dataset_propagation = {
    "issupplementedby": {"prob": 1.0},
    "documents":        {"prob": 0.7},
    "iscitedby":        {"prob": 0.7},
    "haspart":          {"prob": 1.0},
    "isdocumentedby":   {"prob": 0.7},
    "continues":        {"prob": 1.0},
    "cites":            {"prob": 0.7},
    "issupplementto":   {"prob": 1.0},
    "isnewversionof":   {"prob": 1.0},
    "ispartof":         {"prob": 1.0},
    "references":       {"prob": 0.7},
    "isreferencedby":   {"prob": 0.7},
    "iscontinuedby":    {"prob": 1.0},
    "isvariantformof":  {"prob": 0.9},
}


def propagateDataset(x):
    """Propagate publication associations one hop along a dataset -> dataset relation.

    x is (datasetId, (propagation, dsprob)) where
      propagation: {publicationId: {"prob": ..., "path": set of dataset ids}}
      dsprob:      {targetDatasetId: {"prob": ...}} (single-entry dict)
    Returns (targetDatasetId, updated propagation).
    """
    propagation = copy.deepcopy(x[1][0])
    dsprob = x[1][1]
    source = next(iter(dsprob))
    todelpid = set()
    for pid in propagation:
        entry = propagation[pid]
        # Skip publications whose path already went through the target dataset (cycle).
        if source in entry['path']:
            todelpid.add(pid)
            continue
        for use in entry:
            if use == 'path':
                continue
            new_p = entry[use] * dsprob[source]["prob"]
            if new_p > 0.3:
                entry[use] = new_p
                entry['path'].add(x[0])
            else:
                # Probability dropped below the threshold: stop propagating this publication.
                todelpid.add(pid)
    for pid in todelpid:
        del propagation[pid]
    return (source, propagation)


def reduceRelation(a, b):
    """Merge two propagation dictionaries, keeping the highest probability per
    publication and the union of the traversed paths."""
    if a is None:
        return b
    if b is None:
        return a
    for pid in b:
        if pid not in a:
            a[pid] = copy.deepcopy(b[pid])
        else:
            probabilities = b[pid]
            for prob in probabilities:
                if prob == 'path':
                    for e in probabilities['path']:
                        a[pid]['path'].add(e)
                    continue
                if prob in a[pid]:
                    if a[pid][prob] < probabilities[prob]:
                        a[pid][prob] = probabilities[prob]
                else:
                    a[pid][prob] = probabilities[prob]
    return a


def hasDescription(x):
    """True if the record has at least one non-empty description (abstract)."""
    if 'description' in x and x['description'] is not None:
        for dic in x['description']:
            if dic['value'] is not None and dic['value'].strip() != "":
                return True
    return False


# NOTE: the input locations are left unspecified in the original; the paths to
# the dataset dump, the publication dump and the relation parquet must be
# supplied to textFile()/parquet() before running.
load_datasets = sc.textFile().map(json.loads) \
    .filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
load_publications = sc.textFile().map(json.loads) \
    .filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])
relations_rdd = spark.read.parquet().rdd \
    .filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])

# relations from publication to dataset in the graph subset
pubs_relation = relations_rdd.filter(
    lambda x: x['source'][:2] == '50' and x['target'][:2] == '60'
    and x['relType'].lower() in paper_dataset_propagation)

# relations from dataset to dataset (no self loops) in the graph subset
dats_relation = relations_rdd.filter(
    lambda x: x['source'][:2] == '60' and x['target'][:2] == '60'
    and x['source'] != x['target']
    and x['relType'].lower() in dataset_dataset_propagation)

# distinct publications of the subset appearing in a relation to at least one dataset
pubs_subgraph = pubs_relation.map(lambda x: (x['source'], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .join(load_publications.map(lambda x: (x['id'], x))) \
    .map(lambda x: x[1][1])

# publications with abstract
pubs_with_abst = pubs_subgraph.filter(hasDescription).map(lambda x: (x['id'], x))

# relations from publications with abstract to datasets
rel_pubs_dats_abst = pubs_relation.map(lambda x: (x['source'], x)) \
    .join(pubs_with_abst).map(lambda x: x[1][0]) \
    .map(lambda x: (x['target'], x)) \
    .join(load_datasets.map(lambda x: (x['id'], 1))) \
    .map(lambda x: x[1][0])

# (datasetId, {publicationId: propagation entry}) for the first hop
publication_dataset = rel_pubs_dats_abst.map(
    lambda x: (x['target'], {x['source']: copy.deepcopy(paper_dataset_propagation[x['relType'].lower()])}))

# (sourceDatasetId, {targetDatasetId: propagation weight}) for dataset-to-dataset hops
dataset_dataset = dats_relation.map(
    lambda x: (x['source'], {x['target']: copy.deepcopy(dataset_dataset_propagation[x['relType'].lower()])}))

# one-hop associations: each dataset with the publications directly related to it
propagation = publication_dataset.reduceByKey(reduceRelation)

# propagate along dataset-to-dataset relations for up to `hops` hops
count = 2
hops = 3
while count <= hops:
    pl_new = propagation.join(dataset_dataset).map(propagateDataset).filter(lambda x: len(x[1]) > 0)
    if pl_new.count() == 0:
        break
    propagation = pl_new.union(propagation).reduceByKey(reduceRelation)
    count += 1
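
# --- Illustrative local check (not part of the original pipeline) ------------
# A minimal sketch of the dictionary shapes the propagation works on; the ids
# ("50|pub1", "60|dsA", "60|dsB", "60|dsC") and the toy_* variables are made up
# for illustration. It exercises propagateDataset and reduceRelation without
# Spark and can be run in any Python session where the two functions above are
# defined.
toy_propagation = {"50|pub1": {"prob": 1.0, "path": set()}}  # publication attached to dataset 60|dsA
toy_dsprob = {"60|dsB": {"prob": 0.7}}                       # relation 60|dsA -> 60|dsB with weight 0.7

# One hop from 60|dsA to 60|dsB: 1.0 * 0.7 = 0.7 > 0.3, so the association
# survives and 60|dsA is recorded in the path.
hop = propagateDataset(("60|dsA", (toy_propagation, toy_dsprob)))
print(hop)  # ('60|dsB', {'50|pub1': {'prob': 0.7, 'path': {'60|dsA'}}})

# Merging two candidate associations for the same publication keeps the highest
# probability and the union of the traversed paths.
merged = reduceRelation(
    {"50|pub1": {"prob": 0.7, "path": {"60|dsA"}}},
    {"50|pub1": {"prob": 0.9, "path": {"60|dsC"}}})
print(merged)  # prob becomes 0.9, path contains both 60|dsA and 60|dsC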