diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java index 0814c68..52584f9 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java @@ -155,7 +155,7 @@ public class ResultMapper implements Serializable { input .getCollectedfrom() .stream() - .map(cf -> CfHbKeyValue.newInstance(cf.getKey(), cf.getValue())) + .map(cf -> CfHbKeyValue.newInstance(cf.getKey().substring(3), cf.getValue())) .collect(Collectors.toList())); } @@ -518,11 +518,11 @@ public class ResultMapper implements Serializable { instance .setCollectedfrom( CfHbKeyValue - .newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue())); + .newInstance(i.getCollectedfrom().getKey().substring(3), i.getCollectedfrom().getValue())); instance .setHostedby( - CfHbKeyValue.newInstance(i.getHostedby().getKey(), i.getHostedby().getValue())); + CfHbKeyValue.newInstance(i.getHostedby().getKey().substring(3), i.getHostedby().getValue())); return instance; diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java index dca42f4..b3f380e 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java @@ -63,7 +63,7 @@ public class Utils { return String .format( - "%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX, + "%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX, DHPUtils.md5(id)); } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java index 079e708..8c4faba 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java +++ 
b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SparkPrepareResultProject.java @@ -61,6 +61,13 @@ public class SparkPrepareResultProject implements Serializable { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean substring = Optional + .ofNullable(parser.get("substring")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("substring: {}", substring); + final String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); @@ -74,11 +81,12 @@ public class SparkPrepareResultProject implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath); - prepareResultProjectList(spark, inputPath, outputPath); + prepareResultProjectList(spark, inputPath, outputPath, substring); }); } - private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) { + private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath, + Boolean substring) { Dataset relation = Utils .readPath(spark, inputPath + "/relation", Relation.class) .filter( @@ -101,7 +109,10 @@ public class SparkPrepareResultProject implements Serializable { Set projectSet = new HashSet<>(); Tuple2 first = it.next(); ResultProject rp = new ResultProject(); - rp.setResultId(s); + if (substring) + rp.setResultId(s.substring(3)); + else + rp.setResultId(s); eu.dnetlib.dhp.schema.oaf.Project p = first._1(); projectSet.add(p.getId()); Project ps = getProject(p, first._2); @@ -131,7 +142,7 @@ public class SparkPrepareResultProject implements Serializable { private static Project getProject(eu.dnetlib.dhp.schema.oaf.Project op, Relation relation) { Project p = Project .newInstance( - op.getId(), + op.getId().substring(3), op.getCode().getValue(), Optional .ofNullable(op.getAcronym()) diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Extractor.java 
b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Extractor.java index fd2712f..8315808 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Extractor.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Extractor.java @@ -84,7 +84,7 @@ public class Extractor implements Serializable { .orElse(null)) .orElse(null); Relation r = getRelation( - value.getId().substring(3), contextId.substring(3), + value.getId().substring(3), contextId, Constants.RESULT_ENTITY, Constants.CONTEXT_ENTITY, ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP, provenance); @@ -94,7 +94,7 @@ public class Extractor implements Serializable { hashCodes.add(r.hashCode()); } r = getRelation( - contextId.substring(3), value.getId().substring(3), + contextId, value.getId().substring(3), Constants.CONTEXT_ENTITY, Constants.RESULT_ENTITY, ModelConstants.IS_RELATED_TO, diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Process.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Process.java index 55390c6..30fdb96 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Process.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/Process.java @@ -55,8 +55,8 @@ public class Process implements Serializable { ci .getDatasourceList() .forEach(ds -> { - String nodeType = ModelSupport.idPrefixEntity.get(ds.substring(0, 2)); + String datasourceId = ds.startsWith("10|") || ds.startsWith("40|") ? 
ds.substring(3) : ds; String contextId = Utils.getContextId(ci.getId()); relationList @@ -65,7 +65,7 @@ public class Process implements Serializable { .newInstance( contextId, eu.dnetlib.dhp.oa.model.graph.Constants.CONTEXT_ENTITY, - ds, nodeType, + datasourceId, nodeType, RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP), Provenance .newInstance( @@ -76,7 +76,7 @@ public class Process implements Serializable { .add( Relation .newInstance( - ds, nodeType, + datasourceId, nodeType, contextId, eu.dnetlib.dhp.oa.model.graph.Constants.CONTEXT_ENTITY, RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP), Provenance diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkCollectAndSave.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkCollectAndSave.java index f6f7c9f..1335c68 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkCollectAndSave.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkCollectAndSave.java @@ -105,7 +105,7 @@ public class SparkCollectAndSave implements Serializable { } -// Dataset dumpedIds = Utils.getEntitiesId(spark, outputPath); + Dataset dumpedIds = Utils.getEntitiesId(spark, outputPath); Dataset relations = Utils .readPath(spark, inputPath + "/relation/publication", Relation.class) @@ -116,16 +116,18 @@ public class SparkCollectAndSave implements Serializable { .union(Utils.readPath(spark, inputPath + "/relation/context", Relation.class)) .union(Utils.readPath(spark, inputPath + "/relation/relation", Relation.class)); - Utils.getValidRelations(relations, Utils.getEntitiesId(spark, outputPath)) -// Dataset relJoinSource = relations -// .joinWith(dumpedIds, relations.col("source.id").equalTo(dumpedIds.col("value"))) -// .map((MapFunction, Relation>) t2 -> t2._1(), -// Encoders.bean(Relation.class)); -// -// relJoinSource -// .joinWith(dumpedIds, relJoinSource.col("target.id").equalTo(dumpedIds.col("value"))) 
-// .map((MapFunction, Relation>) t2 -> t2._1(), -// Encoders.bean(Relation.class)) +// Utils.getValidRelations(relations, Utils.getEntitiesId(spark, outputPath)) + Dataset relJoinSource = relations + .joinWith(dumpedIds, relations.col("source").equalTo(dumpedIds.col("value"))) + .map( + (MapFunction, Relation>) t2 -> t2._1(), + Encoders.bean(Relation.class)); + + relJoinSource + .joinWith(dumpedIds, relJoinSource.col("target").equalTo(dumpedIds.col("value"))) + .map( + (MapFunction, Relation>) t2 -> t2._1(), + Encoders.bean(Relation.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java index 8aeb72d..5c84c55 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java @@ -79,7 +79,9 @@ public class SparkDumpRelationJob implements Serializable { private static void dumpRelation(SparkSession spark, String inputPath, String outputPath, Set removeSet) { Dataset relations = Utils.readPath(spark, inputPath, Relation.class); relations - .filter((FilterFunction) r -> !removeSet.contains(r.getRelClass())) + .filter( + (FilterFunction) r -> !removeSet.contains(r.getRelClass()) + && !r.getSubRelType().equalsIgnoreCase("resultService")) .map((MapFunction) relation -> { eu.dnetlib.dhp.oa.model.graph.Relation relNew = new eu.dnetlib.dhp.oa.model.graph.Relation(); relNew diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkOrganizationRelation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkOrganizationRelation.java index 473633b..527e324 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkOrganizationRelation.java +++ 
b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkOrganizationRelation.java @@ -155,7 +155,7 @@ public class SparkOrganizationRelation implements Serializable { eu.dnetlib.dhp.oa.model.graph.Relation .newInstance( id, Constants.CONTEXT_ENTITY, - organization, + organization.substring(3), ModelSupport.idPrefixEntity.get(organization.substring(0, 2)), RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP), Provenance @@ -167,7 +167,7 @@ .add( eu.dnetlib.dhp.oa.model.graph.Relation .newInstance( - organization, ModelSupport.idPrefixEntity.get(organization.substring(0, 2)), + organization.substring(3), ModelSupport.idPrefixEntity.get(organization.substring(0, 2)), id, Constants.CONTEXT_ENTITY, RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP), Provenance diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_prep_parameters.json index 82714d9..a4ebd34 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_prep_parameters.json +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/project_prep_parameters.json @@ -16,5 +16,11 @@ "paramLongName": "isSparkSessionManaged", "paramDescription": "true if the spark session is managed, false otherwise", "paramRequired": false + }, + { + "paramName": "sb", + "paramLongName": "substring", + "paramDescription": "true if the result and project identifiers must be stripped of their 3-character prefix, false otherwise", + "paramRequired": false } ] \ No newline at end of file diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml index 75124cf..d755629 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml +++ 
b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app/workflow.xml @@ -102,6 +102,7 @@ --sourcePath${sourcePath} --outputPath${workingDir}/preparedInfo + --substringfalse