From ccba1a3db1f5dfd99743e4aee957a2e5553f4a83 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 21 Apr 2022 13:00:06 +0200 Subject: [PATCH 1/5] [Clean Context] added logic to cleaning workflow to accomodate also context cleaning --- .../dhp/oa/graph/clean/oozie_app/workflow.xml | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index dc0529012..58ad6f1f5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -13,6 +13,10 @@ isLookupUrl the address of the lookUp service + + shouldCleanContext + false + sparkDriverMemory @@ -275,7 +279,23 @@ - + + + + + ${wf:conf('shouldCleanContext') eq true} + + + + + + + + + + + + \ No newline at end of file From 5b7d9e741c2a77d408cc0db3ef9d357c4c27fe96 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 21 Apr 2022 13:02:14 +0200 Subject: [PATCH 2/5] [Clean Context] added logic to cleaning workflow to accomodate also context cleaning --- .../dhp/oa/graph/clean/oozie_app/workflow.xml | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 58ad6f1f5..12ba3e5a7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -295,7 +295,115 @@ + + + yarn + cluster + Clean publications context + eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${inputPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + --workingPath${workingDir}/working + --contextId${contextId} + --verifyParam${verifyParam} + + + + + + + yarn + cluster + Clean datasets Context + eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphInputPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --workingPath${workingDir}/working + --contextId${contextId} + --verifyParam${verifyParam} + + + + + + + yarn + cluster + Clean otherresearchproducts context + eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphInputPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --workingPath${workingDir}/working + --contextId${contextId} + --verifyParam${verifyParam} + + + + + + + + yarn + cluster + Clean softwares context + eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --inputPath${graphInputPath}/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --workingPath${workingDir}/working + --contextId${contextId} + --verifyParam${verifyParam} + + + + + + + \ No newline at end of file From 9a961a0092d2de554c197934cb317beb5bb0335f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 21 Apr 2022 15:12:24 +0200 Subject: [PATCH 3/5] [Clean Context] fixed issue in param name --- .../oa/graph/clean/CleanContextSparkJob.java | 181 +++++---- .../dhp/oa/graph/clean/oozie_app/workflow.xml | 20 +- .../dhp/oa/graph/clean/CleanContextTest.java | 375 ++++++++++++------ 3 files changed, 367 insertions(+), 209 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java index b20dcb67b..3e9b17f3f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java @@ -1,16 +1,13 @@ + package eu.dnetlib.dhp.oa.graph.clean; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Context; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -21,93 +18,113 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; +import com.fasterxml.jackson.databind.ObjectMapper; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Context; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; public class CleanContextSparkJob implements Serializable { - private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - CleanContextSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + String jsonConfiguration = IOUtils + .toString( + CleanContextSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); - String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); - String contextId = parser.get("contextId"); - log.info("contextId: {}", contextId); + String contextId = parser.get("contextId"); + log.info("contextId: {}", contextId); - String verifyParam = parser.get("verifyParam"); - log.info("verifyParam: {}", verifyParam); + String verifyParam = parser.get("verifyParam"); + log.info("verifyParam: {}", verifyParam); + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); - String graphTableClassName = parser.get("graphTableClassName"); - log.info("graphTableClassName: {}", graphTableClassName); + Class entityClazz = (Class) Class.forName(graphTableClassName); - Class entityClazz = (Class) Class.forName(graphTableClassName); + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath); + }); + } - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { + private static void cleanContext(SparkSession spark, String contextId, String verifyParam, + String inputPath, Class entityClazz, String workingPath) { + Dataset res = spark + .read() + .textFile(inputPath) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), + Encoders.bean(entityClazz)); - cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath); - }); - } + res.map((MapFunction) r -> { + if (!r + .getTitle() + .stream() + .filter( + t -> t + .getQualifier() + .getClassid() + .equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) + .anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) { + return r; + } + r + .setContext( + r + .getContext() + .stream() + .filter( + c -> !c.getId().split("::")[0] + .equalsIgnoreCase(contextId)) + .collect(Collectors.toList())); + return r; + }, Encoders.bean(entityClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); - private static void cleanContext(SparkSession spark, String contextId, String verifyParam, String inputPath, Class entityClazz, String workingPath) { - Dataset res = spark - .read() - .textFile(inputPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), - Encoders.bean(entityClazz)); - - res.map((MapFunction) r -> { - if(!r.getTitle() - .stream() - .filter(t -> t.getQualifier().getClassid() - .equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) - .anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))){ - return r; - } - r.setContext(r.getContext().stream().filter(c -> !c.getId().split("::")[0] - .equalsIgnoreCase(contextId)).collect(Collectors.toList())); - return r; - } ,Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(workingPath); - - spark.read().textFile(workingPath).map((MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), - Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath); - } + spark + .read() + .textFile(workingPath) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), + Encoders.bean(entityClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath); + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 12ba3e5a7..3d4d60d73 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -15,7 +15,7 @@ shouldCleanContext - false + true if the context have to be cleaned @@ -312,9 +312,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${inputPath}/publication + --inputPath${graphOutputPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --workingPath${workingDir}/working + --workingPath${workingDir}/working/publication --contextId${contextId} --verifyParam${verifyParam} @@ -339,9 +339,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/dataset + --inputPath${graphOutputPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --workingPath${workingDir}/working + --workingPath${workingDir}/working/dataset --contextId${contextId} --verifyParam${verifyParam} @@ -366,9 +366,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/otherresearchproduct + --inputPath${graphOutputPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --workingPath${workingDir}/working + --workingPath${workingDir}/working/otherresearchproduct --contextId${contextId} --verifyParam${verifyParam} @@ -393,9 +393,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/software + --inputPath${graphOutputPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --workingPath${workingDir}/working + --workingPath${workingDir}/working/software --contextId${contextId} --verifyParam${verifyParam} @@ -404,6 +404,6 @@ - + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java index bba814346..472d3781d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java @@ -1,18 +1,12 @@ + package eu.dnetlib.dhp.oa.graph.clean; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.graph.dump.Constants; -import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest; -import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; -import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.dump.oaf.Instance; -import eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute; -import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Software; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Locale; + import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -27,133 +21,280 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; -import java.util.Locale; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.oa.graph.dump.Constants; +import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest; +import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; +import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.dump.oaf.Instance; +import eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute; +import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class CleanContextTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static SparkSession spark; + private static SparkSession spark; - private static Path workingDir; + private static Path workingDir; - private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class); + private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class); - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName()); - log.info("using work dir {}", workingDir); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); - SparkConf conf = new SparkConf(); - conf.setAppName(DumpJobTest.class.getSimpleName()); + SparkConf conf = new SparkConf(); + conf.setAppName(DumpJobTest.class.getSimpleName()); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - spark = SparkSession - .builder() - .appName(DumpJobTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } + spark = SparkSession + .builder() + .appName(DumpJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } - @Test - public void testResultClean() throws Exception { - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json") - .getPath(); - final String prefix = "gcube "; + @Test + public void testResultClean() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json") + .getPath(); + final String prefix = "gcube "; + spark + .read() + .textFile(sourcePath) + .map( + (MapFunction) r -> OBJECT_MAPPER.readValue(r, Publication.class), + Encoders.bean(Publication.class)) + .write() + .json(workingDir.toString() + "/publication"); - spark.read().textFile(sourcePath).map((MapFunction) r -> OBJECT_MAPPER.readValue(r, Publication.class), Encoders.bean(Publication.class)) - .write().json(workingDir.toString() + "/publication"); + CleanContextSparkJob.main(new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--inputPath", workingDir.toString() + "/publication", + "-graphTableClassName", Publication.class.getCanonicalName(), + "-workingPath", workingDir.toString() + "/working", + "-contextId", "sobigdata", + "-verifyParam", "gCube " + }); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/publication") + .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); - CleanContextSparkJob.main(new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--inputPath", workingDir.toString() + "/publication", - "-graphTableClassName", Publication.class.getCanonicalName(), - "-workingPath", workingDir.toString() + "/working", - "-contextId","sobigdata", - "-verifyParam","gCube " - }); + Assertions.assertEquals(7, tmp.count()); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/publication") - .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); + // original result with sobigdata context and gcube as starting string in the main title for the publication + Assertions + .assertEquals( + 0, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")) + .collect() + .get(0) + .getContext() + .size()); - Assertions.assertEquals(7, tmp.count()); + // original result with sobigdata context without gcube as starting string in the main title for the publication + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "sobigdata::projects::2", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); - //original result with sobigdata context and gcube as starting string in the main title for the publication - Assertions.assertEquals(0, - tmp.filter(p->p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")).collect().get(0).getContext().size()); + // original result with sobigdata context with gcube as starting string in the subtitle + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "sobigdata::projects::2", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + List titles = tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(1, titles.size()); + Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); + Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid()); - //original result with sobigdata context without gcube as starting string in the main title for the publication - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).collect().get(0).getContext().size()); - Assertions.assertEquals("sobigdata::projects::2",tmp.filter(p->p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).collect().get(0).getContext().get(0).getId() ); + // original result with sobigdata context with gcube not as starting string in the main title + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "sobigdata::projects::1", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + titles = tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(1, titles.size()); + Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix)); + Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim())); + Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); - //original result with sobigdata context with gcube as starting string in the subtitle - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getContext().size()); - Assertions.assertEquals("sobigdata::projects::2",tmp.filter(p->p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getContext().get(0).getId() ); - List titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getTitle(); - Assertions.assertEquals(1, titles.size()); - Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix) ); - Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid()); + // original result with sobigdata in context and also other contexts with gcube as starting string for the main + // title + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "dh-ch", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + titles = tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(1, titles.size()); + Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); + Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); - //original result with sobigdata context with gcube not as starting string in the main title - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getContext().size()); - Assertions.assertEquals("sobigdata::projects::1",tmp.filter(p->p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getContext().get(0).getId() ); - titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getTitle(); - Assertions.assertEquals(1, titles.size()); - Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix) ); - Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim()) ); - Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); + // original result with multiple main title one of which whith gcube as starting string and with 2 contextes + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "dh-ch", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + titles = tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(2, titles.size()); + Assertions + .assertTrue( + titles + .stream() + .anyMatch( + t -> t.getQualifier().getClassid().equals("main title") + && t.getValue().toLowerCase().startsWith(prefix))); - //original result with sobigdata in context and also other contexts with gcube as starting string for the main title - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getContext().size()); - Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getContext().get(0).getId() ); - titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getTitle(); - Assertions.assertEquals(1, titles.size()); - Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix) ); - Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); + // original result without sobigdata in context with gcube as starting string for the main title + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "dh-ch", + tmp + .filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + titles = tmp + .filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(2, titles.size()); - //original result with multiple main title one of which whith gcube as starting string and with 2 contextes - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getContext().size()); - Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getContext().get(0).getId() ); - titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getTitle(); - Assertions.assertEquals(2, titles.size()); - Assertions.assertTrue(titles.stream().anyMatch(t -> t.getQualifier().getClassid().equals("main title") && t.getValue().toLowerCase().startsWith(prefix)) ); + Assertions + .assertTrue( + titles + .stream() + .anyMatch( + t -> t.getQualifier().getClassid().equals("main title") + && t.getValue().toLowerCase().startsWith(prefix))); - - //original result without sobigdata in context with gcube as starting string for the main title - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getContext().size()); - Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getContext().get(0).getId() ); - titles = tmp.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getTitle(); - Assertions.assertEquals(2, titles.size()); - - Assertions.assertTrue(titles.stream().anyMatch(t -> t.getQualifier().getClassid().equals("main title") && t.getValue().toLowerCase().startsWith(prefix))); - - - } + } } From e0915061c234447edbeb898a4cb726f62b89eb29 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 21 Apr 2022 16:32:40 +0200 Subject: [PATCH 4/5] [Clean Context] fixed issue in param name --- .../eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 3d4d60d73..45e492387 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -17,6 +17,14 @@ shouldCleanContext true if the context have to be cleaned + + contextId + sobigdata + + + verifyParam + gcube + sparkDriverMemory From 19d90658fc74006408d76269dc332d7205aaeedd Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 22 Apr 2022 15:41:23 +0200 Subject: [PATCH 5/5] [Clean Context] added description to parameters --- .../eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 45e492387..0cf6cdd05 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -20,10 +20,15 @@ contextId sobigdata + It is the context id that should be removed from the result if the condition is matched. + Now it is just sobigdata. In a futere implementation I plan to have the contextId as value in a json + where to specify also the constraints that should be verified to remove the context from the result verifyParam gcube + It is the constrint to be verified. This time is hardcoded as gcube and it is searched for in + the title. If title starts with gcube than the context sobigdata will be removed by the result if present