diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
index e793e3f295..c6c9d80447 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
@@ -2,6 +2,7 @@
 package eu.dnetlib.dhp.common;
 
 import java.io.Serializable;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 /** Provides serializable and throwing extensions to standard functional interfaces. */
@@ -10,6 +11,16 @@ public class FunctionalInterfaceSupport {
     private FunctionalInterfaceSupport() {
     }
 
+    /**
+     * Serializable consumer of any kind of objects. To be used within spark processing pipelines when supplying
+     * functions externally.
+     *
+     * @param <T>
+     */
+    @FunctionalInterface
+    public interface SerializableConsumer<T> extends Consumer<T>, Serializable {
+    }
+
     /**
      * Serializable supplier of any kind of objects. To be used within spark processing pipelines when supplying
      * functions externally.
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphProperties.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
similarity index 84%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphProperties.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
index 86dca6d21e..b2c7152d5f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphProperties.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
@@ -3,19 +3,9 @@ package eu.dnetlib.dhp.oa.graph.clean;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-import java.beans.BeanInfo;
-import java.beans.IntrospectionException;
-import java.beans.Introspector;
-import java.beans.PropertyDescriptor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
@@ -33,11 +23,10 @@ import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import scala.Predef;
 
-public class CleanGraphProperties {
+public class CleanGraphSparkJob {
 
-    private static final Logger log = LoggerFactory.getLogger(CleanGraphProperties.class);
+    private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -45,7 +34,7 @@ public class CleanGraphProperties {
 
         String jsonConfiguration = IOUtils
             .toString(
-                CleanGraphProperties.class
+                CleanGraphSparkJob.class
                     .getResourceAsStream(
                         "/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json"));
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
@@ -91,13 +80,14 @@ public class CleanGraphProperties {
         Class<T> clazz,
         String outputPath) {
 
-        CleaningRule<T> rule = new CleaningRule<>(vocs);
+        final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
 
         readTableFromPath(spark, inputPath, clazz)
-            .map(rule, Encoders.bean(clazz))
+            .map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
             .write()
             .mode(SaveMode.Overwrite)
-            .parquet(outputPath);
+            .option("compression", "gzip")
+            .json(outputPath);
     }
 
     private static <T extends Oaf> Dataset<T> readTableFromPath(
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRule.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java
similarity index 50%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRule.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java
index 6d7a262beb..8006f73009 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRule.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java
@@ -1,43 +1,23 @@
 
 package eu.dnetlib.dhp.oa.graph.clean;
 
-import java.util.Map;
-import java.util.function.Consumer;
-
-import org.apache.spark.api.java.function.MapFunction;
-
-import com.google.common.collect.Maps;
+import java.io.Serializable;
+import java.util.HashMap;
 
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
 import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 
-public class CleaningRule<T extends Oaf> implements MapFunction<T, T> {
-
-    private VocabularyGroup vocabularies;
-
-    private Map<Class, Consumer<Object>> mapping = Maps.newHashMap();
-
-    public CleaningRule(VocabularyGroup vocabularies) {
-        this.vocabularies = vocabularies;
-        setMappings(vocabularies);
-    }
-
-    @Override
-    public T call(T value) throws Exception {
-
-        OafNavigator.apply(value, mapping);
-
-        return value;
-    }
+public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>> implements Serializable {
 
     /**
-     * Populates the mapping for the Oaf types subject to cleaning
-     *
+     * Creates the mapping for the Oaf types subject to cleaning
+     *
     * @param vocabularies
     */
-    private void setMappings(VocabularyGroup vocabularies) {
+    public static CleaningRuleMap create(VocabularyGroup vocabularies) {
+        CleaningRuleMap mapping = new CleaningRuleMap();
         mapping.put(Qualifier.class, o -> {
             Qualifier q = (Qualifier) o;
             if (vocabularies.vocabularyExists(q.getSchemeid())) {
@@ -54,10 +34,7 @@ public class CleaningRule<T extends Oaf> implements MapFunction<T, T> {
          * }
          */
         });
-    }
-
-    public VocabularyGroup getVocabularies() {
-        return vocabularies;
+        return mapping;
     }
 
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/OafNavigator.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/OafCleaner.java
similarity index 80%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/OafNavigator.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/OafCleaner.java
index 2cc499577d..9ba153ba5e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/OafNavigator.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/OafCleaner.java
@@ -1,15 +1,18 @@
 
 package eu.dnetlib.dhp.oa.graph.clean;
 
+import java.io.Serializable;
 import java.lang.reflect.Field;
-import java.util.*;
-import java.util.function.Consumer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
 
 import eu.dnetlib.dhp.schema.oaf.Oaf;
 
-public class OafNavigator {
+public class OafCleaner implements Serializable {
 
-    public static <E extends Oaf> E apply(E oaf, Map<Class, Consumer<Object>> mapping) {
+    public static <E extends Oaf> E apply(E oaf, CleaningRuleMap mapping) {
         try {
             navigate(oaf, mapping);
         } catch (IllegalAccessException e) {
@@ -18,7 +21,7 @@ public class OafNavigator {
         return oaf;
     }
 
-    private static void navigate(Object o, Map<Class, Consumer<Object>> mapping) throws IllegalAccessException {
+    private static void navigate(Object o, CleaningRuleMap mapping) throws IllegalAccessException {
         if (isPrimitive(o)) {
             return;
         } else if (isIterable(o.getClass())) {
@@ -40,7 +43,7 @@
         }
     }
 
-    private static boolean hasMapping(Object o, Map<Class, Consumer<Object>> mapping) {
+    private static boolean hasMapping(Object o, CleaningRuleMap mapping) {
         return mapping.containsKey(o.getClass());
     }
 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
similarity index 80%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
index d152448af3..7329df29a4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
@@ -72,18 +72,17 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Clean publications</name>
-            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
+            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCoresForJoining}
-                --executor-memory=${sparkExecutorMemoryForJoining}
-                --driver-memory=${sparkDriverMemoryForJoining}
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
-                --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
             <arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
@@ -99,18 +98,17 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Clean datasets</name>
-            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
+            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCoresForJoining}
-                --executor-memory=${sparkExecutorMemoryForJoining}
-                --driver-memory=${sparkDriverMemoryForJoining}
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
-                --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
             <arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
@@ -126,18 +124,17 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Clean otherresearchproducts</name>
-            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
+            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCoresForJoining}
-                --executor-memory=${sparkExecutorMemoryForJoining}
-                --driver-memory=${sparkDriverMemoryForJoining}
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
-                --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
             <arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
@@ -153,18 +150,17 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Clean softwares</name>
-            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
+            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCoresForJoining}
-                --executor-memory=${sparkExecutorMemoryForJoining}
-                --driver-memory=${sparkDriverMemoryForJoining}
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
-                --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
             <arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
@@ -180,18 +176,17 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Clean datasources</name>
-            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
+            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCoresForJoining}
-                --executor-memory=${sparkExecutorMemoryForJoining}
-                --driver-memory=${sparkDriverMemoryForJoining}
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
-                --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
             <arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
@@ -207,18 +202,17 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Clean organizations</name>
-            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
+            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCoresForJoining}
-                --executor-memory=${sparkExecutorMemoryForJoining}
-                --driver-memory=${sparkDriverMemoryForJoining}
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
-                --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
             <arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
@@ -234,18 +228,17 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Clean projects</name>
-            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
+            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCoresForJoining}
-                --executor-memory=${sparkExecutorMemoryForJoining}
-                --driver-memory=${sparkDriverMemoryForJoining}
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
-                --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
             <arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
@@ -261,18 +254,17 @@
             <master>yarn</master>
             <mode>cluster</mode>
             <name>Clean relations</name>
-            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
+            <class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
             <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCoresForJoining}
-                --executor-memory=${sparkExecutorMemoryForJoining}
-                --driver-memory=${sparkDriverMemoryForJoining}
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.shuffle.partitions=7680
-                --conf spark.network.timeout=${sparkNetworkTimeout}
             </spark-opts>
             <arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>
             <arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java
similarity index 82%
rename from dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleTest.java
rename to dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java
index 7792f64c65..d1f152342f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java
@@ -5,32 +5,27 @@
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.lenient;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.platform.commons.util.StringUtils;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyTerm;
 import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
 @ExtendWith(MockitoExtension.class)
-public class CleaningRuleTest {
+public class CleaningFunctionTest {
 
     public static final ObjectMapper MAPPER = new ObjectMapper();
@@ -39,7 +34,7 @@ public class CleaningRuleTest {
 
     private VocabularyGroup vocabularies;
 
-    private CleaningRule<Publication> cleaningRule;
+    private CleaningRuleMap mapping;
 
     @BeforeEach
     public void setUp() throws ISLookUpException, IOException {
@@ -49,18 +44,19 @@
             .thenReturn(synonyms());
 
         vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService);
-        cleaningRule = new CleaningRule(vocabularies);
+        mapping = CleaningRuleMap.create(vocabularies);
     }
 
     @Test
     public void testCleaning() throws Exception {
 
-        assertNotNull(cleaningRule.getVocabularies());
+        assertNotNull(vocabularies);
+        assertNotNull(mapping);
 
         String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json"));
         Publication p_in = MAPPER.readValue(json, Publication.class);
 
-        Publication p_out = cleaningRule.call(p_in);
+        Publication p_out = OafCleaner.apply(p_in, mapping);
 
         assertNotNull(p_out);
 
@@ -100,11 +96,11 @@
 
     private List<String> vocs() throws IOException {
         return IOUtils
-            .readLines(CleaningRuleTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
+            .readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
     }
 
     private List<String> synonyms() throws IOException {
         return IOUtils
-            .readLines(CleaningRuleTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
+            .readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
    }
}
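
Note on the refactoring pattern: the stateful `CleaningRule` MapFunction is replaced by a serializable map of per-type consumers (`CleaningRuleMap`) that a stateless reflection walker (`OafCleaner`) applies while visiting the object graph. The sketch below illustrates that mechanism in isolation; it is a minimal approximation with invented stand-in types (`RuleMap`, `Term`, `Record`), not the project's actual classes, and it skips the iterable handling and primitive checks the real `OafCleaner` performs.

```java
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.function.Consumer;

public class CleaningSketch {

	// Role of FunctionalInterfaceSupport.SerializableConsumer<T>: a Consumer
	// that Spark can ship to executors because it is also Serializable.
	interface SerializableConsumer<T> extends Consumer<T>, Serializable {
	}

	// Role of CleaningRuleMap: runtime type -> cleaning action, serializable as a whole.
	static class RuleMap extends HashMap<Class<?>, SerializableConsumer<Object>> {
	}

	// Invented beans standing in for Qualifier and an Oaf entity.
	static class Term {
		private String classid = "  OPEN ";

		public String getClassid() {
			return classid;
		}

		public void setClassid(String classid) {
			this.classid = classid;
		}
	}

	static class Record {
		private Term accessRight = new Term();
	}

	// Role of OafCleaner.navigate(..): walk fields reflectively, applying the
	// consumer registered for each visited value's runtime type.
	static void navigate(Object o, RuleMap rules) throws IllegalAccessException {
		SerializableConsumer<Object> rule = rules.get(o.getClass());
		if (rule != null) {
			rule.accept(o);
		}
		for (Field f : o.getClass().getDeclaredFields()) {
			f.setAccessible(true);
			Object value = f.get(o);
			if (value != null && !f.getType().isPrimitive() && f.getType() != String.class) {
				navigate(value, rules);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		RuleMap rules = new RuleMap();
		rules.put(Term.class, o -> {
			Term t = (Term) o;
			// Stands in for the vocabulary lookup performed on Qualifier instances.
			t.setClassid(t.getClassid().trim().toLowerCase());
		});

		Record r = new Record();
		navigate(r, rules);
		System.out.println(r.accessRight.getClassid()); // prints "open"
	}
}
```

Keeping the rules in a map keyed by runtime type is what lets a single walker clean every entity type, instead of maintaining one `MapFunction` implementation per table.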
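On the output format change: the cleaned tables are now written as gzip-compressed JSON text rather than parquet. A minimal runnable sketch of that write path, assuming a local Spark session and an invented `Rec` bean plus output path; only the `.option("compression", "gzip").json(..)` chain reflects the diff, the rest is scaffolding.

```java
import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class GzipJsonWriteSketch {

	// Invented bean; stands in for an Oaf entity read by readTableFromPath.
	public static class Rec implements Serializable {
		private String id;

		public Rec() {
		}

		public Rec(String id) {
			this.id = id;
		}

		public String getId() {
			return id;
		}

		public void setId(String id) {
			this.id = id;
		}
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

		Dataset<Rec> ds = spark
			.createDataset(Arrays.asList(new Rec("a"), new Rec("b")), Encoders.bean(Rec.class));

		ds
			// the cast selects Dataset.map(MapFunction, Encoder), as in the diff
			.map((MapFunction<Rec, Rec>) r -> {
				r.setId(r.getId().toUpperCase()); // stand-in for OafCleaner.apply(..)
				return r;
			}, Encoders.bean(Rec.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip") // codec option of the json data source
			.json("/tmp/cleaned_recs"); // hypothetical output path

		spark.stop();
	}
}
```

Gzipped line-delimited JSON trades some storage and scan efficiency versus parquet for an output that is easy to inspect and to consume downstream; the lambda cast to `MapFunction` also removes the need for the cleaning function to be a named, stateful class.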