diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java index b99d9098f..b2178f1fb 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/communitytoresultthroughsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -48,86 +48,83 @@ public class SparkResultToCommunityThroughSemRelJob { directory.mkdirs(); } /* - - JavaPairRDD rel_datasource_organization = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) + + JavaRDD relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) + .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)).cache(); + + JavaPairRDD result_result = relations .filter(r -> !r.getDataInfo().getDeletedbyinference()) - .filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelClass()) && RELATION_DATASOURCE_ORGANIZATION_REL_CLASS.equals(r.getRelType())) + .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType())) .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) .mapToPair(toPair()); - JavaPairRDD instdatasource_organization = datasources.join(rel_datasource_organization) - .map(x -> x._2()._2()) + JavaPairRDD result_project = relations + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) .mapToPair(toPair()); - JavaRDD relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)); - JavaRDD publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)); - JavaRDD datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)); - JavaRDD software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class)); - JavaRDD other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class) - .map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class)); - - JavaPairRDD datasource_results = publications - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); + //relationships from project to result. One pair for each relationship for results having allowed semantics relation with another result + JavaPairRDD project_result = result_project.join(result_result) + .map(c -> { + String projectId = c._2()._1().getTargetId(); + String resultId = c._2()._2().getTargetId(); + return new TypedRow().setSourceId(projectId).setTargetId(resultId); }) - .union(datasets - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })) - .union(software - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })) - .union(other - .map(oaf -> getTypedRowsDatasourceResult(oaf)) - .flatMapToPair(f -> { - ArrayList> ret = new ArrayList<>(); - for (TypedRow t : f) { - ret.add(new Tuple2<>(t.getSourceId(), t)); - } - return ret.iterator(); - })); + .mapToPair(toPair()); - JavaRDD newRels = instdatasource_organization.join(datasource_results) - .flatMap(c -> { - List rels = new ArrayList(); - String orgId = c._2()._1().getTargetId(); - String resId = c._2()._2().getTargetId(); - rels.add(getRelation(orgId, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - rels.add(getRelation(resId, orgId, RELATION_RESULT_ORGANIZATION_REL_CLASS, - RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); - return rels.iterator(); + //relationships from project to result. One Pair for each project => project id list of results related to the project + JavaPairRDD project_results = relations + .filter(r -> !r.getDataInfo().getDeletedbyinference()) + .filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType())) + .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) + .mapToPair(toPair()) + .reduceByKey((a, b) -> { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + a.addAll(b.getAccumulator()); + return a; }); + + + + JavaRDD newRels = project_result.join(project_results) + .flatMap(c -> { + String resId = c._2()._1().getTargetId(); + + if (c._2()._2().getAccumulator().contains(resId)) { + return null; + } + String progId = c._2()._1().getSourceId(); + List rels = new ArrayList(); + + rels.add(getRelation(progId, resId, RELATION_PROJECT_RESULT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + rels.add(getRelation(resId, progId, RELATION_RESULT_PROJECT_REL_CLASS, + RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME)); + return rels.iterator(); + }) + .cache(); + newRels.map(p -> new ObjectMapper().writeValueAsString(p)) .saveAsTextFile(outputPath + "/relation_new"); newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p)) - .saveAsTextFile(outputPath + "/relation");*/ + .saveAsTextFile(outputPath + "/relation"); + + } + + +} +*/ } } /*