This commit is contained in:
Miriam Baglioni 2020-02-19 18:03:13 +01:00
parent d0279af630
commit a153a07997
1 changed files with 62 additions and 65 deletions

View File

@ -51,83 +51,80 @@ public class SparkResultToCommunityThroughSemRelJob {
JavaPairRDD<String, TypedRow> rel_datasource_organization = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) JavaRDD<Relation> relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)) .map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)).cache();
JavaPairRDD<String, TypedRow> result_result = relations
.filter(r -> !r.getDataInfo().getDeletedbyinference()) .filter(r -> !r.getDataInfo().getDeletedbyinference())
.filter(r -> RELATION_DATASOURCEORGANIZATION_REL_TYPE.equals(r.getRelClass()) && RELATION_DATASOURCE_ORGANIZATION_REL_CLASS.equals(r.getRelType())) .filter(r -> allowedsemrel.contains(r.getRelClass()) && RELATION_RESULTRESULT_REL_TYPE.equals(r.getRelType()))
.map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget())) .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
.mapToPair(toPair()); .mapToPair(toPair());
JavaPairRDD<String, TypedRow> instdatasource_organization = datasources.join(rel_datasource_organization) JavaPairRDD<String, TypedRow> result_project = relations
.map(x -> x._2()._2()) .filter(r -> !r.getDataInfo().getDeletedbyinference())
.filter(r -> RELATION_RESULT_PROJECT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType()))
.map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
.mapToPair(toPair()); .mapToPair(toPair());
JavaRDD<Relation> relations = sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) //relationships from project to result. One pair for each relationship for results having allowed semantics relation with another result
.map(item -> new ObjectMapper().readValue(item._2().toString(), Relation.class)); JavaPairRDD<String, TypedRow> project_result = result_project.join(result_result)
JavaRDD<Publication> publications = sc.sequenceFile(inputPath + "/publication", Text.class, Text.class) .map(c -> {
.map(item -> new ObjectMapper().readValue(item._2().toString(), Publication.class)); String projectId = c._2()._1().getTargetId();
JavaRDD<Dataset> datasets = sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class) String resultId = c._2()._2().getTargetId();
.map(item -> new ObjectMapper().readValue(item._2().toString(), Dataset.class)); return new TypedRow().setSourceId(projectId).setTargetId(resultId);
JavaRDD<Software> software = sc.sequenceFile(inputPath + "/software", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), Software.class));
JavaRDD<OtherResearchProduct> other = sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)
.map(item -> new ObjectMapper().readValue(item._2().toString(), OtherResearchProduct.class));
JavaPairRDD<String, TypedRow> datasource_results = publications
.map(oaf -> getTypedRowsDatasourceResult(oaf))
.flatMapToPair(f -> {
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
for (TypedRow t : f) {
ret.add(new Tuple2<>(t.getSourceId(), t));
}
return ret.iterator();
}) })
.union(datasets .mapToPair(toPair());
.map(oaf -> getTypedRowsDatasourceResult(oaf))
.flatMapToPair(f -> {
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
for (TypedRow t : f) {
ret.add(new Tuple2<>(t.getSourceId(), t));
}
return ret.iterator();
}))
.union(software
.map(oaf -> getTypedRowsDatasourceResult(oaf))
.flatMapToPair(f -> {
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
for (TypedRow t : f) {
ret.add(new Tuple2<>(t.getSourceId(), t));
}
return ret.iterator();
}))
.union(other
.map(oaf -> getTypedRowsDatasourceResult(oaf))
.flatMapToPair(f -> {
ArrayList<Tuple2<String, TypedRow>> ret = new ArrayList<>();
for (TypedRow t : f) {
ret.add(new Tuple2<>(t.getSourceId(), t));
}
return ret.iterator();
}));
JavaRDD<Relation> newRels = instdatasource_organization.join(datasource_results) //relationships from project to result. One Pair for each project => project id list of results related to the project
.flatMap(c -> { JavaPairRDD<String, TypedRow> project_results = relations
List<Relation> rels = new ArrayList(); .filter(r -> !r.getDataInfo().getDeletedbyinference())
String orgId = c._2()._1().getTargetId(); .filter(r -> RELATION_PROJECT_RESULT_REL_CLASS.equals(r.getRelClass()) && RELATION_RESULTPROJECT_REL_TYPE.equals(r.getRelType()))
String resId = c._2()._2().getTargetId(); .map(r -> new TypedRow().setSourceId(r.getSource()).setTargetId(r.getTarget()))
rels.add(getRelation(orgId, resId, RELATION_ORGANIZATION_RESULT_REL_CLASS, .mapToPair(toPair())
RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, .reduceByKey((a, b) -> {
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); if (a == null) {
rels.add(getRelation(resId, orgId, RELATION_RESULT_ORGANIZATION_REL_CLASS, return b;
RELATION_RESULTORGANIZATION_REL_TYPE, RELATION_RESULTORGANIZATION_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE, }
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID, PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME)); if (b == null) {
return rels.iterator(); return a;
}
a.addAll(b.getAccumulator());
return a;
}); });
JavaRDD<Relation> newRels = project_result.join(project_results)
.flatMap(c -> {
String resId = c._2()._1().getTargetId();
if (c._2()._2().getAccumulator().contains(resId)) {
return null;
}
String progId = c._2()._1().getSourceId();
List<Relation> rels = new ArrayList();
rels.add(getRelation(progId, resId, RELATION_PROJECT_RESULT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
rels.add(getRelation(resId, progId, RELATION_RESULT_PROJECT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE, RELATION_RESULTPROJECT_SUBREL_TYPE, PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID, PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
return rels.iterator();
})
.cache();
newRels.map(p -> new ObjectMapper().writeValueAsString(p)) newRels.map(p -> new ObjectMapper().writeValueAsString(p))
.saveAsTextFile(outputPath + "/relation_new"); .saveAsTextFile(outputPath + "/relation_new");
newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p)) newRels.union(relations).map(p -> new ObjectMapper().writeValueAsString(p))
.saveAsTextFile(outputPath + "/relation");*/ .saveAsTextFile(outputPath + "/relation");
}
}
*/
} }
} }
/* /*