From 0b8b6ae9a9611e3bbb944ef858ac241f5943d7e2 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 29 Jun 2020 17:12:45 +0200 Subject: [PATCH] - --- ScholexplorerPropagation.json | 114 +++++++++++++++++++++++++++++++++- 1 file changed, 113 insertions(+), 1 deletion(-) diff --git a/ScholexplorerPropagation.json b/ScholexplorerPropagation.json index a10b438..c2677d6 100644 --- a/ScholexplorerPropagation.json +++ b/ScholexplorerPropagation.json @@ -1 +1,113 @@ -{"paragraphs":[{"text":"%pyspark\nimport json\nimport sys\nimport re\nfrom pyspark.sql.types import *\nfrom pyspark.sql import SQLContext\nfrom pyspark.sql.functions import *\nimport copy\n\nsqlContext = SQLContext(sc)\n\npaper_dataset_propagation = {\n \"documents\": {\n \"prob\": 1.0,\n \"path\":set()\n },\n \"isderivedfrom\": {\n \"prob\": 0.9,\n \"path\":set()\n },\n \"issourceof\": {\n \"prob\": 0.7,\n \"path\":set()\n },\n \"reviews\": {\n \"prob\": 0.8,\n \"path\":set()\n },\n \"references\": {\n \"prob\": 1.0,\n \"path\":set()\n },\n \"issupplementedby\": {\n \"prob\": 0.8,\n \"path\":set()\n },\n \"cites\": {\n \"prob\": 0.8,\n \"path\":set()\n }\n}\n\ndataset_dataset_propagation= {\n \"issupplementedby\": {\n \"prob\": 1.0\n },\n \"documents\": {\n \"prob\": 0.9\n },\n \"iscitedby\": {\n \"prob\": 0.9\n },\n \"haspart\": {\n \"prob\": 0.7 },\n \"isdocumentedby\": {\n \"prob\": 0.7 },\n \"continues\": {\n \"prob\": 0.8 },\n \"cites\": {\n \"prob\": 1.0 },\n \"issupplementto\": {\n \"prob\": 0.8 },\n \"isnewversionof\": {\n \"prob\": 0.9 },\n \"ispartof\": {\n \"prob\": 0.8 },\n \"references\": {\n \"prob\": 1.0 },\n \"isreferencedby\": {\n \"prob\": 0.9 },\n \"iscontinuedby\": {\n \"prob\": 0.7 },\n \"isvariantformof\": {\n \"prob\": 0.9 }\n }\n\n\n \ndef propagateDataset(x):\n propagation = copy.deepcopy(x[1][0]) #dictionary {\"publicationId\":{propagation_probabilities and path}}\n dsprob = x[1][1] #dictionary {\"datasetId\":{dataset_probabilities}}\n source = dsprob.keys().pop()\n todelpid = set()\n for pid in propagation:\n entry = propagation[pid]\n if source in propagation[pid]['path']:\n todelpid.add(pid)\n continue\n for use in entry:\n if use == 'path':\n continue\n new_p = entry[use] * dsprob[source][\"prob\"]\n if new_p > 0.3:\n entry[use] = new_p\n propagation[pid]['path'].add(x[0])\n else:\n todelpid.add(pid)\n for pid in todelpid:\n del propagation[pid]\n return (source, propagation)\n\ndef reduceRelation(a, b):\n if a is None:\n return b\n if b is None:\n return a \n for pid in b:\n if not pid in a:\n a[pid] = copy.deepcopy(b[pid])\n else:\n probabilities = b[pid]\n for prob in probabilities:\n if prob =='path':\n for e in probabilities['path']:\n a[pid]['path'].add(e)\n continue\n if prob in a[pid]:\n if a[pid][prob] < probabilities[prob]:\n a[pid][prob] = probabilities[prob]\n else:\n a[pid][prob] = probabilities[prob]\n return a \n \ndef hasDescription(x):\n if 'description' in x and not x['description'] is None:\n for dic in x['description']:\n if dic['value'] is not None and dic['value'].strip() != \"\":\n return True\n return False\n 
","user":"miriam.baglioni","dateUpdated":"2020-06-29T14:55:43+0000","config":{"editorSetting":{"language":"python","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"colWidth":12,"editorMode":"ace/mode/python","fontSize":9,"results":{},"enabled":true},"settings":{"params":{},"forms":{}},"results":{"code":"SUCCESS","msg":[]},"apps":[],"jobName":"paragraph_1593089330199_-1573015420","id":"20200521-082800_526102814","dateCreated":"2020-06-25T12:48:50+0000","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"focus":true,"$$hashKey":"object:1124"},{"text":"%pyspark\n\nload_datasets = sc.textFile('/user/sandro.labruzzo/scholix/graph/dataset').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nload_publications = sc.textFile('/user/sandro.labruzzo/scholix/graph/publication').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nrelations_rdd = spark.read.parquet('/user/sandro.labruzzo/scholix/graph/relation').rdd.filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\n\n#relations from publication to dataset in the graph subset \npubs_relation = relations_rdd.filter(lambda x: x['source'][:2] == '50' and x['target'][:2] == '60' and x['relType'].lower() in paper_dataset_propagation)\n\n#relation from dataset to dataset (no self loops) in the graph subset\ndats_relation = relations_rdd.filter(lambda x: x['source'][:2] == '60' and x['target'][:2] == '60' and x['source'] != x['target'] and x['relType'].lower() in dataset_dataset_propagation)\n\n#distinct publication subset appearing in a relation to at least one dataset\npubs_subgraph = pubs_relation.map(lambda x: (x['source'],1)).reduceByKey(lambda a,b : a+b).join(load_publications.map(lambda x:(x['id'],x))).map(lambda x: x[1][1])\n\n#publications with abstract\npubs_with_abst = pubs_subgraph.filter(hasDescription).map(lambda x:(x['id'],x))\n\n#relations from publication with abstract to dataset\nrel_pubs_dats_abst = pubs_relation.map(lambda x: (x['source'],x)).join(pubs_with_abst).map(lambda x: x[1][0]).map(lambda x: (x['target'], x)).join(load_datasets.map(lambda x: (x['id'], 1))).map(lambda x: x[1][0])\n\n\npublication_dataset = rel_pubs_dats_abst.map(lambda x: (x['target'], {x['source']:copy.deepcopy(paper_dataset_propagation[x['relType'].lower()])}))\ndataset_dataset = dats_relation.map(lambda x: (x['source'], {x['target']:copy.deepcopy(dataset_dataset_propagation[x['relType'].lower()])}))\n\n\npl1 = publication_dataset.reduceByKey(reduceRelation)\n\npreviuos_propagation = pl1\npl1.count()\ncount = 2\nhops = 3\nwhile (True):\n if count > hops:\n break\n pl_step1 = previuos_propagation.join(dataset_dataset)\n pl_step2 = pl_step1.map(propagateDataset).filter(lambda x: len(x[1]) > 0)\n if pl_step2.count() == 0:\n break\n pl_step3 = pl_step2.reduceByKey(reduceRelation)\n current_propagation = pl_step3.union(previuos_propagation).reduceByKey(reduceRelation)\n current_propagation.count()\n count += 1\n previuos_propagation = 
current_propagation\n\n","user":"miriam.baglioni","dateUpdated":"2020-06-29T14:52:36+0000","config":{"editorSetting":{"language":"python","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"colWidth":12,"editorMode":"ace/mode/python","fontSize":9,"results":{},"enabled":true},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1593089330225_1214619039","id":"20200521-084556_457403103","dateCreated":"2020-06-25T12:48:50+0000","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:1125"},{"text":"%pyspark\n","user":"miriam.baglioni","dateUpdated":"2020-06-29T14:53:46+0000","config":{"colWidth":12,"fontSize":9,"enabled":true,"results":{},"editorSetting":{"language":"scala","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"editorMode":"ace/mode/scala"},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1593442426323_1460687479","id":"20200629-145346_169818547","dateCreated":"2020-06-29T14:53:46+0000","status":"READY","progressUpdateIntervalMs":500,"focus":true,"$$hashKey":"object:1868"},{"text":"%pyspark\nmap_publications = pubs_with_abst.map(lambda x: x[1]).map(getDescriptionTitleAndSubject).filter(lambda x: not x['description'] == [])\n\npub_id_maps_dex = map_publications.map(lambda x: (x['id'],x['description']))\n\nexpand_propagation = current_propagation.map(lambda x: {'id':x[0],\"propInfo\":x[1]}).map(removePathFromPropagation).flatMap(expandPropagation)\n\npropagated_abst = pub_id_maps_dex.join(expand_propagation).map(putDexInLevel).reduceByKey(reduceAbstracts)\n\nmap_datasets = load_datasets.map(getDescriptionTitleAndSubject)\n\nds_id_map_dex = map_datasets.map(lambda x: (x['id'],{\"dex\":x['description'], \"title\":x['title'], \"pid\":x['pid']}))\n\n#todo index datasets without propagation\ncompleteInfo = ds_id_map_dex.leftOuterJoin(propagated_abst).map(mergeInfoDsOptionalNoUsage)\n\ncompleteInfo.map(json.dumps).saveAsTextFile(path='/tmp/propagationSE_FS_3hops/toIndexAllDS', compressionCodecClass=\"org.apache.hadoop.io.compress.GzipCodec\")\nindex_publication = load_publications.map(getDescriptionTitleAndSubject)\nindex_publication.map(json.dumps).saveAsTextFile(path = '/tmp/propagationSE_FS_3hops/toIndexAllPB', compressionCodecClass=\"org.apache.hadoop.io.compress.GzipCodec\")","user":"miriam.baglioni","dateUpdated":"2020-06-29T13:49:22+0000","config":{"colWidth":12,"fontSize":9,"enabled":true,"results":{},"editorSetting":{"language":"python","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"editorMode":"ace/mode/python"},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1593438558940_25945220","id":"20200629-134918_320619032","dateCreated":"2020-06-29T13:49:18+0000","status":"READY","progressUpdateIntervalMs":500,"$$hashKey":"object:1126"},{"text":"%pyspark\n\ndef expandCollectedFrom(x):\n cf = x['collectedfrom']\n ret = []\n for i in cf:\n ret.append((i['value'],x))\n return ret\n\ndef hasDescription(x):\n if 'description' in x and not x['description'] is None:\n for dic in x['description']:\n if dic['value'] is not None and dic['value'].strip() != \"\":\n return True\n return False\n\ndef reduceSource(a,b):\n if a is None:\n return b\n if b is None:\n return a\n return a.union(b)\n\ndef relPerDs(ds, rels ):\n return ds.map(lambda x: (x['id'], x)).join(rels)\n\ndef addDsDis(rdd, ds, dic):\n print(ds + '\\n')\n ds_dic = {}\n for i in rdd.map(lambda x:(x[0],x[1][1])).collect():\n if not i[1] in ds_dic:\n ds_dic[i[1]] = 0\n ds_dic[i[1]] += 1\n 
dic[ds]=ds_dic\n\n","user":"miriam.baglioni","dateUpdated":"2020-06-25T12:48:50+0000","config":{"editorSetting":{"language":"python","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"colWidth":12,"editorMode":"ace/mode/python","fontSize":9,"results":{},"enabled":true},"settings":{"params":{},"forms":{}},"results":{"code":"SUCCESS","msg":[]},"apps":[],"jobName":"paragraph_1593089330226_1614165818","id":"20200525-155720_256707982","dateCreated":"2020-06-25T12:48:50+0000","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:1127"},{"text":"%pyspark\n#numbers for paper in the graph\n\nload_datasets = sc.textFile('/user/sandro.labruzzo/scholix/graph/dataset').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nload_publications = sc.textFile('/user/sandro.labruzzo/scholix/graph/publication').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nrelations_rdd = spark.read.parquet('/user/sandro.labruzzo/scholix/graph/relation').rdd.filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\n\n#relations from publication to dataset in the graph\npubs_relation = relations_rdd.filter(lambda x: x['source'][:2] == '50' and x['target'][:2] == '60')\n\n#relation from dataset to dataset (no self loops) in the graph\ndats_relation = relations_rdd.filter(lambda x: x['source'][:2] == '60' and x['target'][:2] == '60' and x['source'] != x['target'])\n\n#distinct publication subset\npubs_subgraph = pubs_relation.map(lambda x: (x['source'],1)).reduceByKey(lambda a,b : a+b).join(load_publications.map(lambda x:(x['id'],x))).map(lambda x: x[1][1])\n#number_of_publications = pubs_subgraph.count()\n\n#distinct dataset_subset\ndats_subgraph = pubs_relation.map(lambda x:(x['target'], 1)).union(dats_relation.map(lambda x: (x['target'],1))).union(dats_relation.map(lambda x: (x['source'],1))).reduceByKey(lambda a,b : a+b).join(load_datasets.map(lambda x: (x['id'],x))).map(lambda x: x[1][1])\n#number_of_dataset = dats_subgraph.count()\n\n#publication with abstract as couple(id, p)\npubs_with_abst = pubs_subgraph.filter(hasDescription).map(lambda x:(x['id'],x))\n#number_of_publication_with_abstract = pubs_with_abst.count()\n\n#dataset with abstract as couple (id, d)\ndats_with_abst = dats_subgraph.filter(hasDescription).map(lambda x:(x['id'],x))\n#number_of_dataset_with_abstract = dats_with_abst.count()\n\n#relations from publication with abstract to dataset\nrel_pubs_dats_abst = pubs_relation.map(lambda x: (x['source'],x)).join(pubs_with_abst).map(lambda x: x[1][0]).map(lambda x: (x['target'], x)).join(load_datasets.map(lambda x: (x['id'], 1))).map(lambda x: x[1][0])\n#number_of_relation_from_pub_with_abst_to_dataset = rel_pubs_dats_abst.count()\n\n#relation from publication with abstract to dataset with abstract\nrels_pubs_dats_both_abst = rel_pubs_dats_abst.map(lambda x: (x['target'], 1)).join(dats_subgraph.filter(hasDescription).map(lambda x: (x['id'],x))).map(lambda x: x[1][0])\n\n#relation from publication with abstract to dataset without abstract\nrels_pubs_dats_no_abst = rel_pubs_dats_abst.map(lambda x: (x['target'], 1)).join(dats_subgraph.filter(lambda x: not hasDescription(x)).map(lambda x: (x['id'],x))).map(lambda x: 
x[1][0])\n\n","user":"miriam.baglioni","dateUpdated":"2020-06-25T12:48:50+0000","config":{"editorSetting":{"language":"python","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"colWidth":12,"editorMode":"ace/mode/python","fontSize":9,"results":{},"enabled":true},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1593089330227_989655784","id":"20200603-104852_1876883026","dateCreated":"2020-06-25T12:48:50+0000","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:1128"},{"text":"%pyspark\n\n#numbers for paper in the subgraph\n\nload_datasets = sc.textFile('/user/sandro.labruzzo/scholix/graph/dataset').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nload_publications = sc.textFile('/user/sandro.labruzzo/scholix/graph/publication').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nrelations_rdd = spark.read.parquet('/user/sandro.labruzzo/scholix/graph/relation').rdd.filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\n\n#relations from publication to dataset in the graph subset \npubs_relation = relations_rdd.filter(lambda x: x['source'][:2] == '50' and x['target'][:2] == '60' and x['relType'].lower() in paper_dataset_propagation)\n#relation from publication breack down for the semantics\npubs_relation.map(lambda x:(x['relType'].lower(),1)).reduceByKey(lambda a,b: a+b).collect()\n\n#relation from dataset to dataset (no self loops) in the graph subset\ndats_relation = relations_rdd.filter(lambda x: x['source'][:2] == '60' and x['target'][:2] == '60' and x['source'] != x['target'] and x['relType'].lower() in dataset_dataset_propagation)\n#relation from dataset breakdown for tje semantics\ndats_relation.map(lambda x:(x['relType'].lower(),1)).reduceByKey(lambda a,b: a+b).collect()\n\ntotal_rels = pubs_relation.union(dats_relation)\n\n#distinct publication subset\npubs_subgraph = pubs_relation.map(lambda x: (x['source'],1)).reduceByKey(lambda a,b : a+b).join(load_publications.map(lambda x:(x['id'],x))).map(lambda x: x[1][1])\n#number_of_publications = pubs_subgraph.count()\n\n#distinct dataset_subset\ndats_subgraph = pubs_relation.map(lambda x:(x['target'], 1)).union(dats_relation.map(lambda x: (x['target'],1))).union(dats_relation.map(lambda x: (x['source'],1))).reduceByKey(lambda a,b : a+b).join(load_datasets.map(lambda x: (x['id'],x))).map(lambda x: x[1][1])\n#number_of_dataset = dats_subgraph.count()\n\n#publication with abstract as couple(id, p)\npubs_with_abst = pubs_subgraph.filter(hasDescription).map(lambda x:(x['id'],x))\n#number_of_publication_with_abstract = pubs_with_abst.count()\n\n#dataset with abstract as couple (id, d)\ndats_with_abst = dats_subgraph.filter(hasDescription).map(lambda x:(x['id'],x))\n#number_of_dataset_with_abstract = dats_with_abst.count()\n\n#relations from publication with abstract to dataset\nrel_pubs_dats_abst = pubs_relation.map(lambda x: (x['source'],x)).join(pubs_with_abst).map(lambda x: x[1][0]).map(lambda x: (x['target'], x)).join(load_datasets.map(lambda x: (x['id'], 1))).map(lambda x: x[1][0])\n#number_of_relation_from_pub_with_abst_to_dataset = rel_pubs_dats_abst.count()\n\n#relation from publication with abstract to dataset with abstract\nrels_pubs_dats_both_abst = rel_pubs_dats_abst.map(lambda x: (x['target'], 1)).join(dats_subgraph.filter(hasDescription).map(lambda x: (x['id'],x))).map(lambda x: x[1][0])\n\n#relation from publication with abstract to 
dataset without abstract\nrels_pubs_dats_no_abst = rel_pubs_dats_abst.map(lambda x: (x['target'], 1)).join(dats_subgraph.filter(lambda x: not hasDescription(x)).map(lambda x: (x['id'],x))).map(lambda x: x[1][0])\n\n\nprint(\"Numbers for the whole graph: \\n number of dataset = %s, number of publications = %s, number of relations = %s\"%(str(load_datasets.count()), str(load_publications.count()), str(relations_rdd.count())))\nprint(\"relations by semantics \")\nprint(relations_rdd.map(lambda x: (x['relType'].lower(),1)).reduceByKey(lambda a,b: a+b).collect())\n\nprint(\"Numbers in the subgraph identified by the semantic relations: \\n\")\nprint(\"number of publications = %s\"%str(pubs_subgraph.count()))\nprint(\"number of datasets = %s\"%str(dats_subgraph.count()))\nprint(\"number of relations = %s \"%total_rels.count())\nprint(\"relations by semantics: \")\nprint(total_rels.map(lambda x: (x['relType'].lower(),1)).reduceByKey(lambda a,b: a+b).collect())\nprint(\"number of relations between publications and datasets = %s\"%str(pubs_relation.count()))\nprint(\"number of relations between datasets and datasets (no loops) = %s\"%str(dats_relation.count()))\nprint(\"number of publications with abstract = %s\"%str(pubs_with_abst.count()))\nprint(\"number of datasets with abstract = %s\"%str(dats_with_abst.count()))\nprint(\"number of relations from publication with abstract to dataset = %s\"%str(rel_pubs_dats_abst.count()))\nprint(\"number of relations between publication with abstract and dataset with abstract = %s\"%str(rels_pubs_dats_both_abst.count()))\nprint(\"number of relations between publication with abstract and dataset without abstract = %s\"%str(rels_pubs_dats_no_abst.count()))\n\n#Relations per provider (general in the subgraph)\ntmp1 = dats_subgraph.flatMap(expandCollectedFrom)\n\ntmp1.map(lambda x: (x[0],1)).reduceByKey(lambda a,b : a+b).collect()\n\n#dataset in the subgraph per provider\nENA_DS = tmp1.filter(lambda x: x[0] == 'European Nucleotide Archive').map(lambda x: x[1])\nPANGAEA_DS = tmp1.filter(lambda x: x[0] == 'Pangaea').map(lambda x: x[1])\nRCSB_DS = tmp1.filter(lambda x: x[0] == 'RCSB').map(lambda x: x[1])\nOpenAIRE_DS = tmp1.filter(lambda x: x[0] == 'OpenAIRE').map(lambda x: x[1])\nSN_DS = tmp1.filter(lambda x: x[0] == 'Springer Nature').map(lambda x: x[1])\nNCBI_DS = tmp1.filter(lambda x: x[0] == 'NCBI Nucleotide').map(lambda x: x[1])\nICPSR_DS = tmp1.filter(lambda x: x[0] == 'ICPSR').map(lambda x: x[1])\nCCDC_DS = tmp1.filter(lambda x: x[0] == 'Cambridge Crystallographic Data Centre').map(lambda x: x[1])\nEVENTDATA = tmp1.filter(lambda x: x[0] == 'Crossref').map(lambda x: x[1])\nTUD_DS = tmp1.filter(lambda x: x[0] == '3TU.Datacentrum').map(lambda x: x[1])\nIEDA_DS = tmp1.filter(lambda x: x[0] == 'IEDA').map(lambda x: x[1])\nEBI_DS = tmp1.filter(lambda x: x[0] == 'Europe PMC').map(lambda x: x[1])\nTR_DS = tmp1.filter(lambda x: x[0] == 'Thomson Reuters').map(lambda x: x[1])\nANDS_DS = tmp1.filter(lambda x: x[0] == 'Australian National Data Service').map(lambda x: x[1])\nDATACITE_DS = tmp1.filter(lambda x: x[0] == 'Datasets in Datacite').map(lambda x: x[1])\n\n#number of datasets with description per provider\nabst_dic = {}\n\nabst_dic['ENA_DS'] = ENA_DS.filter(hasDescription).count() \nabst_dic['PANGAEA_DS_ABS'] = PANGAEA_DS.filter(hasDescription).count()\nabst_dic['RCSB_DS_ABS'] = RCSB_DS.filter(hasDescription).count()\n#abst_dic['OpenAIRE_DS_ABS'] = OpenAIRE_DS.filter(hasDescription).count()\n#abst_dic['SN_DS_ABS'] = 
SN_DS.filter(hasDescription).count()\n#abst_dic['NCBI_DS_ABS'] = NCBI_DS.filter(hasDescription).count()\nabst_dic['ICPSR_DS_ABS'] = ICPSR_DS.filter(hasDescription).count()\nabst_dic['CCDC_DS_ABS'] = CCDC_DS.filter(hasDescription).count()\n#abst_dic['EVENTDATA_ABS'] = EVENTDATA.filter(hasDescription).count()\nabst_dic['TUDCDS_ABS'] = TUD_DS.filter(hasDescription).count()\nabst_dic['IEDA_DS_ABS'] = IEDA_DS.filter(hasDescription).count()\n#abst_dic['EBI_DS_ABS'] = EBI_DS.filter(hasDescription).count()\n#abst_dic['TR_DS_ABS'] = TR_DS.filter(hasDescription).count()\nabst_dic['ANDS_DS_ABS'] = ANDS_DS.filter(hasDescription).count()\nabst_dic['DATACITE_DS_ABS'] = DATACITE_DS.filter(hasDescription).count()\n\n\n#relation publication with abstract to dataset per provider (w/wo abstract)\nnumber_of_rels_per_ds = rel_pubs_dats_abst.map(lambda x:(x['target'],1)).reduceByKey(lambda a,b : a+b)\n\nprint(\"ENA_DS_numb = %s\"%str(relPerDs(ENA_DS, number_of_rels_per_ds).count()))\nprint(\"PANGAEA_DS_numb = %s\"%str(relPerDs(PANGAEA_DS, number_of_rels_per_ds).count()))\nprint(\"RCSB_DS_numb = %s\"%str(relPerDs(RCSB_DS, number_of_rels_per_ds).count()))\nprint(\"OpenAIRE_DS_numb = %s\"%str(relPerDs(OpenAIRE_DS, number_of_rels_per_ds).count()))\nprint(\"SN_DS_numb = %s\"%str(relPerDs(SN_DS, number_of_rels_per_ds).count()))\nprint(\"NCBI_DS_numb = %s\"%str(relPerDs(NCBI_DS, number_of_rels_per_ds).count()))\nprint(\"ICPSR_DS_numb = %s\"%str(relPerDs(ICPSR_DS, number_of_rels_per_ds).count()))\nprint(\"CCDC_DS_numb = %s\"%str(relPerDs(CCDC_DS, number_of_rels_per_ds).count()))\nprint(\"EVENTDATA_numb = %s\"%str(relPerDs(EVENTDATA, number_of_rels_per_ds).count()))\nprint(\"TUD_DS_numb = %s\"%str(relPerDs(TUD_DS, number_of_rels_per_ds).count()))\nprint(\"IEDA_DS_numb = %s\"%str(relPerDs(IEDA_DS, number_of_rels_per_ds).count()))\nprint(\"EBI_DS_numb = %s\"%str(relPerDs(EBI_DS, number_of_rels_per_ds).count()))\nprint(\"TR_DS_numb = %s\"%str( relPerDs(TR_DS, number_of_rels_per_ds).count()))\nprint(\"ANDS_DS_numb =%s\"%str(relPerDs(ANDS_DS, number_of_rels_per_ds).count()))\nprint(\"DATACITE_DS_numb =%s\"%str( relPerDs(DATACITE_DS, number_of_rels_per_ds).count()))\n\n\nENA_DS_ABST_rel_per_ds = relPerDs(ENA_DS.filter(hasDescription), number_of_rels_per_ds)\nPANGAEA_DS_ABST_rel_per_ds = relPerDs(PANGAEA_DS.filter(hasDescription), number_of_rels_per_ds)\nRCSB_DS_ABST_rel_per_ds = relPerDs(RCSB_DS.filter(hasDescription), number_of_rels_per_ds)\nOpenAIRE_DS_ABST_rel_per_ds = relPerDs(OpenAIRE_DS.filter(hasDescription), number_of_rels_per_ds)\nSN_DS_ABST_rel_per_ds = relPerDs(SN_DS.filter(hasDescription), number_of_rels_per_ds)\nNCBI_DS_ABST_rel_per_ds = relPerDs(NCBI_DS.filter(hasDescription), number_of_rels_per_ds)\nICPSR_DS_ABST_rel_per_ds = relPerDs(ICPSR_DS.filter(hasDescription), number_of_rels_per_ds)\nCCDC_DS_ABST_rel_per_ds = relPerDs(CCDC_DS.filter(hasDescription), number_of_rels_per_ds)\nEVENTDATA_ABST_rel_per_ds = relPerDs(EVENTDATA.filter(hasDescription), number_of_rels_per_ds)\nTUD_DS_ABST_rel_per_ds = relPerDs(TUD_DS.filter(hasDescription), number_of_rels_per_ds)\nIEDA_DS_ABST_rel_per_ds = relPerDs(IEDA_DS.filter(hasDescription), number_of_rels_per_ds)\nEBI_DS_ABST_rel_per_ds = relPerDs(EBI_DS.filter(hasDescription), number_of_rels_per_ds)\nTR_DS_ABST_rel_per_ds = relPerDs(TR_DS.filter(hasDescription), number_of_rels_per_ds)\nANDS_DS_ABST_rel_per_ds = relPerDs(ANDS_DS.filter(hasDescription), number_of_rels_per_ds)\nDATACITE_DS_ABST_rel_per_ds = relPerDs(DATACITE_DS.filter(hasDescription), 
number_of_rels_per_ds)\n\ndegree_dic_ds_with_abst = {}\n\naddDsDis(ENA_DS_ABST_rel_per_ds, 'ENA_DS', degree_dic_ds_with_abst )\naddDsDis(PANGAEA_DS_ABST_rel_per_ds, 'PANGAEA_DS', degree_dic_ds_with_abst )\naddDsDis(RCSB_DS_ABST_rel_per_ds, 'RCSB_DS', degree_dic_ds_with_abst )\naddDsDis(OpenAIRE_DS_ABST_rel_per_ds, 'OpenAIRE_DS', degree_dic_ds_with_abst )\naddDsDis(SN_DS_ABST_rel_per_ds, 'SN_DS', degree_dic_ds_with_abst )\naddDsDis(NCBI_DS_ABST_rel_per_ds, 'NCBI_DS', degree_dic_ds_with_abst )\naddDsDis(ICPSR_DS_ABST_rel_per_ds, 'ICPSR_DS', degree_dic_ds_with_abst )\naddDsDis(CCDC_DS_ABST_rel_per_ds, 'CCDC_DS', degree_dic_ds_with_abst )\naddDsDis(EVENTDATA_ABST_rel_per_ds, 'EVENTDATA', degree_dic_ds_with_abst )\naddDsDis(TUD_DS_ABST_rel_per_ds, 'TUD_DS', degree_dic_ds_with_abst )\naddDsDis(IEDA_DS_ABST_rel_per_ds, 'IEDA_DS', degree_dic_ds_with_abst )\naddDsDis(EBI_DS_ABST_rel_per_ds, 'EBI_DS', degree_dic_ds_with_abst )\naddDsDis(TR_DS_ABST_rel_per_ds, 'TR_DS', degree_dic_ds_with_abst )\naddDsDis(ANDS_DS_ABST_rel_per_ds, 'ANDS_DS', degree_dic_ds_with_abst )\naddDsDis(DATACITE_DS_ABST_rel_per_ds , 'DATACITE_DS', degree_dic_ds_with_abst )\n\nENA_DS_NOABST_rel_per_ds = relPerDs(ENA_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nPANGAEA_DS_NOABST_rel_per_ds = relPerDs(PANGAEA_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nRCSB_DS_NOABST_rel_per_ds = relPerDs(RCSB_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nOpenAIRE_DS_NOABST_rel_per_ds = relPerDs(OpenAIRE_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nSN_DS_NOABST_rel_per_ds = relPerDs(SN_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nNCBI_DS_NOABST_rel_per_ds = relPerDs(NCBI_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nICPSR_DS_NOABST_rel_per_ds = relPerDs(ICPSR_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nCCDC_DS_NOABST_rel_per_ds = relPerDs(CCDC_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nEVENTDATA_NOABST_rel_per_ds = relPerDs(EVENTDATA.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nTUD_DS_NOABST_rel_per_ds = relPerDs(TUD_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nIEDA_DS_NOABST_rel_per_ds = relPerDs(IEDA_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nEBI_DS_NOABST_rel_per_ds = relPerDs(EBI_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nTR_DS_NOABST_rel_per_ds = relPerDs(TR_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nANDS_DS_NOABST_rel_per_ds = relPerDs(ANDS_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\nDATACITE_DS_NOABST_rel_per_ds = relPerDs(DATACITE_DS.filter(lambda x: not hasDescription(x)), number_of_rels_per_ds)\n\ndegree_dic_ds_no_abst = {}\n\naddDsDis(ENA_DS_NOABST_rel_per_ds, 'ENA_DS', degree_dic_ds_no_abst )\naddDsDis(PANGAEA_DS_NOABST_rel_per_ds, 'PANGAEA_DS', degree_dic_ds_no_abst )\naddDsDis(RCSB_DS_NOABST_rel_per_ds, 'RCSB_DS', degree_dic_ds_no_abst )\naddDsDis(OpenAIRE_DS_NOABST_rel_per_ds, 'OpenAIRE_DS', degree_dic_ds_no_abst )\naddDsDis(SN_DS_NOABST_rel_per_ds, 'SN_DS', degree_dic_ds_no_abst )\naddDsDis(NCBI_DS_NOABST_rel_per_ds, 'NCBI_DS', degree_dic_ds_no_abst )\naddDsDis(ICPSR_DS_NOABST_rel_per_ds, 'ICPSR_DS', degree_dic_ds_no_abst )\naddDsDis(CCDC_DS_NOABST_rel_per_ds, 'CCDC_DS', degree_dic_ds_no_abst )\naddDsDis(EVENTDATA_NOABST_rel_per_ds, 'EVENTDATA', degree_dic_ds_no_abst )\naddDsDis(TUD_DS_NOABST_rel_per_ds, 
'TUD_DS', degree_dic_ds_no_abst )\naddDsDis(IEDA_DS_NOABST_rel_per_ds, 'IEDA_DS', degree_dic_ds_no_abst )\naddDsDis(EBI_DS_NOABST_rel_per_ds, 'EBI_DS', degree_dic_ds_no_abst )\naddDsDis(TR_DS_NOABST_rel_per_ds, 'TR_DS', degree_dic_ds_no_abst )\naddDsDis(ANDS_DS_NOABST_rel_per_ds, 'ANDS_DS', degree_dic_ds_no_abst )\naddDsDis(DATACITE_DS_NOABST_rel_per_ds , 'DATACITE_DS', degree_dic_ds_no_abst )\n\n# relation_ds_ds = rel_pubs_dats_abst.map(lambda x:(x['target'],x)).join(dats_relation.map(lambda x:(x['source'],x)))\n\n# number_of_relations_ds_ds_perds = relation_ds_ds.map(lambda x: (x[1][1]['target'],1)).reduceByKey(lambda a,b: a+b)\n\n# P -> D (target) <--> D(source) -> D1(target) \n\nrelation_ds_ds_1 = rel_pubs_dats_abst.map(lambda x:(x['target'],x)).join(dats_relation.map(lambda x:(x['source'],x)))\n\nnumber_of_relations_ds_ds_perds_hop2 = relation_ds_ds_1.map(lambda x: (x[1][1]['target'],1)).reduceByKey(lambda a,b: a+b)\nrelation_ds_ds_2 = number_of_relations_ds_ds_perds_hop2.join(dats_relation.map(lambda x: (x['source'],x)))\nnumber_of_relation_ds_ds_per_ds_hop3 = relation_ds_ds_2.map(lambda x: (x[1][1]['target'],1)).reduceByKey(lambda a,b: a+b)\n\nnumber_of_relations_ds_ds_perds = number_of_relations_ds_ds_perds_hop2.union(number_of_relation_ds_ds_per_ds_hop3).reduceByKey(lambda a,b:a+b)\n\nENA_DS_rels_per_ds = relPerDs(ENA_DS, number_of_relations_ds_ds_perds)\nPANGAEA_DS_rels_per_ds = relPerDs(PANGAEA_DS, number_of_relations_ds_ds_perds)\nRCSB_DS_rels_per_ds = relPerDs(RCSB_DS, number_of_relations_ds_ds_perds) \n#OpenAIRE_DS_rels_per_ds = relPerDs(OpenAIRE_DS, number_of_relations_ds_ds_perds) \n#SN_DS_rels_per_ds = relPerDs(SN_DS, number_of_relations_ds_ds_perds) \n#NCBI_DS_rels_per_ds = relPerDs(NCBI_DS, number_of_relations_ds_ds_perds)\nICPSR_DS_rels_per_ds = relPerDs(ICPSR_DS, number_of_relations_ds_ds_perds)\nCCDC_DS_rels_per_ds = relPerDs(CCDC_DS, number_of_relations_ds_ds_perds) \n#EVENTDATA_rels_per_ds = relPerDs(EVENTDATA, number_of_relations_ds_ds_perds) \nTUD_DS_rels_per_ds = relPerDs(TUD_DS, number_of_relations_ds_ds_perds) \nIEDA_DS_rels_per_ds = relPerDs(IEDA_DS, number_of_relations_ds_ds_perds) \n#EBI_DS_rels_per_ds = relPerDs(EBI_DS, number_of_relations_ds_ds_perds) \n#TR_DS_rels_per_ds = relPerDs(TR_DS, number_of_relations_ds_ds_perds) \nANDS_DS_rels_per_ds = relPerDs(ANDS_DS, number_of_relations_ds_ds_perds) \nDATACITE_DS_rels_per_ds = relPerDs(DATACITE_DS, number_of_relations_ds_ds_perds) \n\nENA_DS_ABST_rel_per_ds = relPerDs(ENA_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\nPANGAEA_DS_ABST_rel_per_ds = relPerDs(PANGAEA_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\nRCSB_DS_ABST_rel_per_ds = relPerDs(RCSB_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\n#OpenAIRE_DS_ABST_rel_per_ds = relPerDs(OpenAIRE_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\n#SN_DS_ABST_rel_per_ds = relPerDs(SN_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\n#NCBI_DS_ABST_rel_per_ds = relPerDs(NCBI_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\nICPSR_DS_ABST_rel_per_ds = relPerDs(ICPSR_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\nCCDC_DS_ABST_rel_per_ds = relPerDs(CCDC_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\n#EVENTDATA_ABST_rel_per_ds = relPerDs(EVENTDATA.filter(hasDescription), number_of_relations_ds_ds_perds)\nTUD_DS_ABST_rel_per_ds = relPerDs(TUD_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\nIEDA_DS_ABST_rel_per_ds = relPerDs(IEDA_DS.filter(hasDescription), 
number_of_relations_ds_ds_perds)\n#EBI_DS_ABST_rel_per_ds = relPerDs(EBI_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\n#TR_DS_ABST_rel_per_ds = relPerDs(TR_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\nANDS_DS_ABST_rel_per_ds = relPerDs(ANDS_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\nDATACITE_DS_ABST_rel_per_ds = relPerDs(DATACITE_DS.filter(hasDescription), number_of_relations_ds_ds_perds)\n\ndegree_dic_dsds_with_abst = {}\n\naddDsDis(ENA_DS_ABST_rel_per_ds, 'ENA_DS', degree_dic_dsds_with_abst )\naddDsDis(PANGAEA_DS_ABST_rel_per_ds, 'PANGAEA_DS', degree_dic_dsds_with_abst )\naddDsDis(RCSB_DS_ABST_rel_per_ds, 'RCSB_DS', degree_dic_dsds_with_abst )\n#addDsDis(OpenAIRE_DS_ABST_rel_per_ds, 'OpenAIRE_DS', degree_dic_dsds_with_abst )\n#addDsDis(SN_DS_ABST_rel_per_ds, 'SN_DS', degree_dic_dsds_with_abst )\n#addDsDis(NCBI_DS_ABST_rel_per_ds, 'NCBI_DS', degree_dic_dsds_with_abst )\naddDsDis(ICPSR_DS_ABST_rel_per_ds, 'ICPSR_DS', degree_dic_dsds_with_abst )\naddDsDis(CCDC_DS_ABST_rel_per_ds, 'CCDC_DS', degree_dic_dsds_with_abst )\n#addDsDis(EVENTDATA_ABST_rel_per_ds, 'EVENTDATA', degree_dic_dsds_with_abst )\naddDsDis(TUD_DS_ABST_rel_per_ds, 'TUD_DS', degree_dic_dsds_with_abst )\naddDsDis(IEDA_DS_ABST_rel_per_ds, 'IEDA_DS', degree_dic_dsds_with_abst )\n#addDsDis(EBI_DS_ABST_rel_per_ds, 'EBI_DS', degree_dic_dsds_with_abst )\n#ddDsDis(TR_DS_ABST_rel_per_ds, 'TR_DS', degree_dic_dsds_with_abst )\naddDsDis(ANDS_DS_ABST_rel_per_ds, 'ANDS_DS', degree_dic_dsds_with_abst )\naddDsDis(DATACITE_DS_ABST_rel_per_ds , 'DATACITE_DS', degree_dic_dsds_with_abst )\n\nENA_DS_NOABST_rel_per_ds = relPerDs(ENA_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\nPANGAEA_DS_NOABST_rel_per_ds = relPerDs(PANGAEA_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\nRCSB_DS_NOABST_rel_per_ds = relPerDs(RCSB_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\n#OpenAIRE_DS_NOABST_rel_per_ds = relPerDs(OpenAIRE_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\n#SN_DS_NOABST_rel_per_ds = relPerDs(SN_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\n#NCBI_DS_NOABST_rel_per_ds = relPerDs(NCBI_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\nICPSR_DS_NOABST_rel_per_ds = relPerDs(ICPSR_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\nCCDC_DS_NOABST_rel_per_ds = relPerDs(CCDC_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\n#EVENTDATA_NOABST_rel_per_ds = relPerDs(EVENTDATA.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\nTUD_DS_NOABST_rel_per_ds = relPerDs(TUD_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\nIEDA_DS_NOABST_rel_per_ds = relPerDs(IEDA_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\n#EBI_DS_NOABST_rel_per_ds = relPerDs(EBI_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\n#TR_DS_NOABST_rel_per_ds = relPerDs(TR_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\nANDS_DS_NOABST_rel_per_ds = relPerDs(ANDS_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\nDATACITE_DS_NOABST_rel_per_ds = relPerDs(DATACITE_DS.filter(lambda x: not hasDescription(x)), number_of_relations_ds_ds_perds)\n\ndegree_dic_dsds_no_abst = {}\n\naddDsDis(ENA_DS_NOABST_rel_per_ds, 'ENA_DS', degree_dic_dsds_no_abst 
)\naddDsDis(PANGAEA_DS_NOABST_rel_per_ds, 'PANGAEA_DS', degree_dic_dsds_no_abst )\naddDsDis(RCSB_DS_NOABST_rel_per_ds, 'RCSB_DS', degree_dic_dsds_no_abst )\n#addDsDis(OpenAIRE_DS_NOABST_rel_per_ds, 'OpenAIRE_DS', degree_dic_dsds_no_abst )\n#addDsDis(SN_DS_NOABST_rel_per_ds, 'SN_DS', degree_dic_dsds_no_abst )\n#addDsDis(NCBI_DS_NOABST_rel_per_ds, 'NCBI_DS', degree_dic_dsds_no_abst )\naddDsDis(ICPSR_DS_NOABST_rel_per_ds, 'ICPSR_DS', degree_dic_dsds_no_abst )\naddDsDis(CCDC_DS_NOABST_rel_per_ds, 'CCDC_DS', degree_dic_dsds_no_abst )\n#addDsDis(EVENTDATA_NOABST_rel_per_ds, 'EVENTDATA', degree_dic_dsds_no_abst )\naddDsDis(TUD_DS_NOABST_rel_per_ds, 'TUD_DS', degree_dic_dsds_no_abst )\naddDsDis(IEDA_DS_NOABST_rel_per_ds, 'IEDA_DS', degree_dic_dsds_no_abst )\n#addDsDis(EBI_DS_NOABST_rel_per_ds, 'EBI_DS', degree_dic_dsds_no_abst )\n#addDsDis(TR_DS_NOABST_rel_per_ds, 'TR_DS', degree_dic_dsds_no_abst )\naddDsDis(ANDS_DS_NOABST_rel_per_ds, 'ANDS_DS', degree_dic_dsds_no_abst )\naddDsDis(DATACITE_DS_NOABST_rel_per_ds , 'DATACITE_DS', degree_dic_dsds_no_abst )\n\n\n\n\n","user":"miriam.baglioni","dateUpdated":"2020-06-25T12:48:50+0000","config":{"editorSetting":{"language":"python","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"colWidth":12,"editorMode":"ace/mode/python","fontSize":9,"results":{},"enabled":true},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1593089330228_-1076252430","id":"20200601-131822_1709231619","dateCreated":"2020-06-25T12:48:50+0000","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:1129"},{"text":"%pyspark\n\n\nload_datasets = sc.textFile('/user/sandro.labruzzo/scholix/graph/dataset').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nload_publications = sc.textFile('/user/sandro.labruzzo/scholix/graph/publication').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nrelations_rdd = spark.read.parquet('/user/sandro.labruzzo/scholix/graph/relation').rdd.filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\n","user":"miriam.baglioni","dateUpdated":"2020-06-25T12:48:50+0000","config":{"editorSetting":{"language":"python","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"colWidth":12,"editorMode":"ace/mode/python","fontSize":9,"results":{},"enabled":true},"settings":{"params":{},"forms":{}},"results":{"code":"SUCCESS","msg":[]},"apps":[],"jobName":"paragraph_1593089330230_542410461","id":"20200604-134951_851458341","dateCreated":"2020-06-25T12:48:50+0000","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:1130"},{"text":"%pyspark\n\n#relations from publication to dataset in the graph subset \npubs_relation = relations_rdd.filter(lambda x: x['source'][:2] == '50' and x['target'][:2] == '60' and x['relType'].lower() in paper_dataset_propagation)\n#relations from publication broken down by semantics\n\n#relation from dataset to dataset (no self loops) in the graph subset\ndats_relation = relations_rdd.filter(lambda x: x['source'][:2] == '60' and x['target'][:2] == '60' and x['source'] != x['target'] and x['relType'].lower() in dataset_dataset_propagation)\n\n#distinct publication subset\npubs_subgraph = pubs_relation.map(lambda x: (x['source'],1)).reduceByKey(lambda a,b : a+b).join(load_publications.map(lambda x:(x['id'],x))).map(lambda x: x[1][1])\n\n#distinct dataset_subset\ndats_subgraph = pubs_relation.map(lambda 
x:(x['target'], 1)).union(dats_relation.map(lambda x: (x['target'],1))).union(dats_relation.map(lambda x: (x['source'],1))).reduceByKey(lambda a,b : a+b).join(load_datasets.map(lambda x: (x['id'],x))).map(lambda x: x[1][1])\n\n#Relations per provider (general in the subgraph)\ntmp1 = dats_subgraph.flatMap(expandCollectedFrom)\n\n\n#dataset in the subgraph per provider\nENA_DS = tmp1.filter(lambda x: x[0] == 'European Nucleotide Archive').map(lambda x: x[1])\nPANGAEA_DS = tmp1.filter(lambda x: x[0] == 'Pangaea').map(lambda x: x[1])\nRCSB_DS = tmp1.filter(lambda x: x[0] == 'RCSB').map(lambda x: x[1])\n#OpenAIRE_DS = tmp1.filter(lambda x: x[0] == 'OpenAIRE').map(lambda x: x[1])\n#SN_DS = tmp1.filter(lambda x: x[0] == 'Springer Nature').map(lambda x: x[1])\n#NCBI_DS = tmp1.filter(lambda x: x[0] == 'NCBI Nucleotide').map(lambda x: x[1])\nICPSR_DS = tmp1.filter(lambda x: x[0] == 'ICPSR').map(lambda x: x[1])\nCCDC_DS = tmp1.filter(lambda x: x[0] == 'Cambridge Crystallographic Data Centre').map(lambda x: x[1])\n#EVENTDATA = tmp1.filter(lambda x: x[0] == 'Crossref').map(lambda x: x[1])\nTUD_DS = tmp1.filter(lambda x: x[0] == '3TU.Datacentrum').map(lambda x: x[1])\nIEDA_DS = tmp1.filter(lambda x: x[0] == 'IEDA').map(lambda x: x[1])\n#EBI_DS = tmp1.filter(lambda x: x[0] == 'Europe PMC').map(lambda x: x[1])\n#TR_DS = tmp1.filter(lambda x: x[0] == 'Thomson Reuters').map(lambda x: x[1])\nANDS_DS = tmp1.filter(lambda x: x[0] == 'Australian National Data Service').map(lambda x: x[1])\nDATACITE_DS = tmp1.filter(lambda x: x[0] == 'Datasets in Datacite').map(lambda x: x[1])\n","user":"miriam.baglioni","dateUpdated":"2020-06-25T12:48:50+0000","config":{"editorSetting":{"language":"python","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"colWidth":12,"editorMode":"ace/mode/python","fontSize":9,"results":{},"enabled":true},"settings":{"params":{},"forms":{}},"results":{"code":"SUCCESS","msg":[]},"apps":[],"jobName":"paragraph_1593089330231_-2027454836","id":"20200604-135024_1087660000","dateCreated":"2020-06-25T12:48:50+0000","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:1131"},{"text":"%pyspark\n\ndats_relation.map(lambda x: (x['target'],1)).join(ENA_DS.map(lambda x: (x['id'],1))).count()","user":"miriam.baglioni","dateUpdated":"2020-06-25T12:48:50+0000","config":{"editorSetting":{"language":"python","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"colWidth":12,"editorMode":"ace/mode/python","fontSize":9,"results":{},"enabled":true},"settings":{"params":{},"forms":{}},"results":{"code":"ERROR","msg":[{"type":"TEXT","data":"Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 661 in stage 41.0 failed 4 times, most recent failure: Lost task 661.3 in stage 41.0 (TID 5870, eos-m2-sn01.ocean.icm.edu.pl, executor 388): ExecutorLostFailure (executor 388 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 3.8 GB of 3 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:944)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:745)\n\n(, Py4JJavaError(u'An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\\n', JavaObject id=o303), 
)"}]},"apps":[],"jobName":"paragraph_1593089330232_-74965536","id":"20200604-135318_52826837","dateCreated":"2020-06-25T12:48:50+0000","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:1132"},{"text":"%pyspark\n","user":"miriam.baglioni","dateUpdated":"2020-06-25T12:48:50+0000","config":{"colWidth":12,"fontSize":9,"enabled":true,"results":{},"editorSetting":{"language":"scala","editOnDblClick":false,"completionKey":"TAB","completionSupport":true},"editorMode":"ace/mode/scala"},"settings":{"params":{},"forms":{}},"apps":[],"jobName":"paragraph_1593089330233_-644104972","id":"20200604-135527_1516509564","dateCreated":"2020-06-25T12:48:50+0000","status":"READY","errorMessage":"","progressUpdateIntervalMs":500,"$$hashKey":"object:1133"}],"name":"ScholexplorerPropagation","id":"2FB9ZGBK4","noteParams":{},"noteForms":{},"angularObjects":{"md:shared_process":[],"spark:miriam.baglioni:":[]},"config":{"isZeppelinNotebookCronEnable":false,"looknfeel":"default","personalizedMode":"false"},"info":{}} \ No newline at end of file +{ + "paragraphs": [ + { + "text": "%pyspark\nimport json\nimport sys\nimport re\nfrom pyspark.sql.types import *\nfrom pyspark.sql import SQLContext\nfrom pyspark.sql.functions import *\nimport copy\n\nsqlContext = SQLContext(sc)\n\npaper_dataset_propagation = {\n \"documents\": {\n \"prob\": 1.0,\n \"path\":set()\n },\n \"isderivedfrom\": {\n \"prob\": 0.9,\n \"path\":set()\n },\n \"issourceof\": {\n \"prob\": 0.7,\n \"path\":set()\n },\n \"reviews\": {\n \"prob\": 0.8,\n \"path\":set()\n },\n \"references\": {\n \"prob\": 1.0,\n \"path\":set()\n },\n \"issupplementedby\": {\n \"prob\": 0.8,\n \"path\":set()\n },\n \"cites\": {\n \"prob\": 0.8,\n \"path\":set()\n }\n}\n\ndataset_dataset_propagation= {\n \"issupplementedby\": {\n \"prob\": 1.0\n },\n \"documents\": {\n \"prob\": 0.9\n },\n \"iscitedby\": {\n \"prob\": 0.9\n },\n \"haspart\": {\n \"prob\": 0.7 },\n \"isdocumentedby\": {\n \"prob\": 0.7 },\n \"continues\": {\n \"prob\": 0.8 },\n \"cites\": {\n \"prob\": 1.0 },\n \"issupplementto\": {\n \"prob\": 0.8 },\n \"isnewversionof\": {\n \"prob\": 0.9 },\n \"ispartof\": {\n \"prob\": 0.8 },\n \"references\": {\n \"prob\": 1.0 },\n \"isreferencedby\": {\n \"prob\": 0.9 },\n \"iscontinuedby\": {\n \"prob\": 0.7 },\n \"isvariantformof\": {\n \"prob\": 0.9 }\n }\n\n\n \ndef propagateDataset(x):\n propagation = copy.deepcopy(x[1][0]) #dictionary {\"publicationId\":{propagation_probabilities and path}}\n dsprob = x[1][1] #dictionary {\"datasetId\":{dataset_probabilities}}\n source = dsprob.keys().pop()\n todelpid = set()\n for pid in propagation:\n entry = propagation[pid]\n if source in propagation[pid]['path']:\n todelpid.add(pid)\n continue\n for use in entry:\n if use == 'path':\n continue\n new_p = entry[use] * dsprob[source][\"prob\"]\n if new_p > 0.3:\n entry[use] = new_p\n propagation[pid]['path'].add(x[0])\n else:\n todelpid.add(pid)\n for pid in todelpid:\n del propagation[pid]\n return (source, propagation)\n\ndef reduceRelation(a, b):\n if a is None:\n return b\n if b is None:\n return a \n for pid in b:\n if not pid in a:\n a[pid] = copy.deepcopy(b[pid])\n else:\n probabilities = b[pid]\n for prob in probabilities:\n if prob =='path':\n for e in probabilities['path']:\n a[pid]['path'].add(e)\n continue\n if prob in a[pid]:\n if a[pid][prob] < probabilities[prob]:\n a[pid][prob] = probabilities[prob]\n else:\n a[pid][prob] = probabilities[prob]\n return a \n \ndef hasDescription(x):\n if 'description' in x and not x['description'] 
is None:\n for dic in x['description']:\n if dic['value'] is not None and dic['value'].strip() != \"\":\n return True\n return False\n ", + "user": "miriam.baglioni", + "dateUpdated": "2020-06-29T14:55:43+0000", + "config": { + "editorSetting": { + "language": "python", + "editOnDblClick": false, + "completionKey": "TAB", + "completionSupport": true + }, + "colWidth": 12, + "editorMode": "ace/mode/python", + "fontSize": 9, + "results": {}, + "enabled": true + }, + "settings": { + "params": {}, + "forms": {} + }, + "results": { + "code": "SUCCESS", + "msg": [] + }, + "apps": [], + "jobName": "paragraph_1593089330199_-1573015420", + "id": "20200521-082800_526102814", + "dateCreated": "2020-06-25T12:48:50+0000", + "status": "READY", + "errorMessage": "", + "progressUpdateIntervalMs": 500, + "focus": true, + "$$hashKey": "object:1124" + }, + { + "text": "%pyspark\n\nload_datasets = sc.textFile('/user/sandro.labruzzo/scholix/graph/dataset').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nload_publications = sc.textFile('/user/sandro.labruzzo/scholix/graph/publication').map(json.loads).filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\nrelations_rdd = spark.read.parquet('/user/sandro.labruzzo/scholix/graph/relation').rdd.filter(lambda x: x['dataInfo'] is None or not x['dataInfo']['deletedbyinference'])\n\n#relations from publication to dataset in the graph subset \npubs_relation = relations_rdd.filter(lambda x: x['source'][:2] == '50' and x['target'][:2] == '60' and x['relType'].lower() in paper_dataset_propagation)\n\n#relation from dataset to dataset (no self loops) in the graph subset\ndats_relation = relations_rdd.filter(lambda x: x['source'][:2] == '60' and x['target'][:2] == '60' and x['source'] != x['target'] and x['relType'].lower() in dataset_dataset_propagation)\n\n#distinct publication subset appearing in a relation to at least one dataset\npubs_subgraph = pubs_relation.map(lambda x: (x['source'],1)).reduceByKey(lambda a,b : a+b).join(load_publications.map(lambda x:(x['id'],x))).map(lambda x: x[1][1])\n\n#publications with abstract\npubs_with_abst = pubs_subgraph.filter(hasDescription).map(lambda x:(x['id'],x))\n\n#relations from publication with abstract to dataset\nrel_pubs_dats_abst = pubs_relation.map(lambda x: (x['source'],x)).join(pubs_with_abst).map(lambda x: x[1][0]).map(lambda x: (x['target'], x)).join(load_datasets.map(lambda x: (x['id'], 1))).map(lambda x: x[1][0])\n\n\npublication_dataset = rel_pubs_dats_abst.map(lambda x: (x['target'], {x['source']:copy.deepcopy(paper_dataset_propagation[x['relType'].lower()])}))\ndataset_dataset = dats_relation.map(lambda x: (x['source'], {x['target']:copy.deepcopy(dataset_dataset_propagation[x['relType'].lower()])}))\n\n\npl1 = publication_dataset.reduceByKey(reduceRelation)\n\npreviuos_propagation = pl1\npl1.count()\ncount = 2\nhops = 3\nwhile (True):\n if count > hops:\n break\n pl_step1 = previuos_propagation.join(dataset_dataset)\n pl_step2 = pl_step1.map(propagateDataset).filter(lambda x: len(x[1]) > 0)\n if pl_step2.count() == 0:\n break\n pl_step3 = pl_step2.reduceByKey(reduceRelation)\n current_propagation = pl_step3.union(previuos_propagation).reduceByKey(reduceRelation)\n current_propagation.count()\n count += 1\n previuos_propagation = current_propagation\n\n", + "user": "miriam.baglioni", + "dateUpdated": "2020-06-29T14:52:36+0000", + "config": { + "editorSetting": { + "language": "python", + "editOnDblClick": false, + "completionKey": 
"TAB", + "completionSupport": true + }, + "colWidth": 12, + "editorMode": "ace/mode/python", + "fontSize": 9, + "results": {}, + "enabled": true + }, + "settings": { + "params": {}, + "forms": {} + }, + "apps": [], + "jobName": "paragraph_1593089330225_1214619039", + "id": "20200521-084556_457403103", + "dateCreated": "2020-06-25T12:48:50+0000", + "status": "READY", + "errorMessage": "", + "progressUpdateIntervalMs": 500, + "$$hashKey": "object:1125" + }, + { + "text": "%pyspark\n", + "user": "miriam.baglioni", + "dateUpdated": "2020-06-29T14:53:46+0000", + "config": { + "colWidth": 12, + "fontSize": 9, + "enabled": true, + "results": {}, + "editorSetting": { + "language": "scala", + "editOnDblClick": false, + "completionKey": "TAB", + "completionSupport": true + }, + "editorMode": "ace/mode/scala" + }, + "settings": { + "params": {}, + "forms": {} + }, + "apps": [], + "jobName": "paragraph_1593442426323_1460687479", + "id": "20200629-145346_169818547", + "dateCreated": "2020-06-29T14:53:46+0000", + "status": "READY", + "progressUpdateIntervalMs": 500, + "focus": true, + "$$hashKey": "object:1868" + } + ], + "name": "ScholexplorerPropagation", + "id": "2FB9ZGBK4", + "noteParams": {}, + "noteForms": {}, + "angularObjects": { + "md:shared_process": [], + "spark:miriam.baglioni:": [] + }, + "config": { + "isZeppelinNotebookCronEnable": false, + "looknfeel": "default", + "personalizedMode": "false" + }, + "info": {} +} \ No newline at end of file