WIP: graph cleaner implementation

master
Claudio Atzori 4 years ago
parent bed65a1be6
commit 0d52816244

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.common;
import java.io.Serializable;
import java.util.function.Consumer;
import java.util.function.Supplier;
/** Provides serializable and throwing extensions to standard functional interfaces. */
@ -10,6 +11,16 @@ public class FunctionalInterfaceSupport {
private FunctionalInterfaceSupport() {
}
/**
* Serializable consumer of any kind of objects. To be used withing spark processing pipelines when supplying
* functions externally.
*
* @param <T>
*/
@FunctionalInterface
public interface SerializableConsumer<T> extends Consumer<T>, Serializable {
}
/**
* Serializable supplier of any kind of objects. To be used withing spark processing pipelines when supplying
* functions externally.

@ -3,19 +3,9 @@ package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@ -33,11 +23,10 @@ import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Predef;
public class CleanGraphProperties {
public class CleanGraphSparkJob {
private static final Logger log = LoggerFactory.getLogger(CleanGraphProperties.class);
private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -45,7 +34,7 @@ public class CleanGraphProperties {
String jsonConfiguration = IOUtils
.toString(
CleanGraphProperties.class
CleanGraphSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
@ -91,13 +80,14 @@ public class CleanGraphProperties {
Class<T> clazz,
String outputPath) {
CleaningRule<T> rule = new CleaningRule<>(vocs);
final CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
readTableFromPath(spark, inputPath, clazz)
.map(rule, Encoders.bean(clazz))
.map((MapFunction<T, T>) value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
.option("compression", "gzip")
.json(outputPath);
}
private static <T extends Oaf> Dataset<T> readTableFromPath(

@ -1,43 +1,23 @@
package eu.dnetlib.dhp.oa.graph.clean;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.spark.api.java.function.MapFunction;
import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.HashMap;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class CleaningRule<T extends Oaf> implements MapFunction<T, T> {
private VocabularyGroup vocabularies;
private Map<Class, Consumer<Object>> mapping = Maps.newHashMap();
public CleaningRule(VocabularyGroup vocabularies) {
this.vocabularies = vocabularies;
setMappings(vocabularies);
}
@Override
public T call(T value) throws Exception {
OafNavigator.apply(value, mapping);
return value;
}
public class CleaningRuleMap extends HashMap<Class, SerializableConsumer<Object>> implements Serializable {
/**
* Populates the mapping for the Oaf types subject to cleaning
*
* Creates the mapping for the Oaf types subject to cleaning
*
* @param vocabularies
*/
private void setMappings(VocabularyGroup vocabularies) {
public static CleaningRuleMap create(VocabularyGroup vocabularies) {
CleaningRuleMap mapping = new CleaningRuleMap();
mapping.put(Qualifier.class, o -> {
Qualifier q = (Qualifier) o;
if (vocabularies.vocabularyExists(q.getSchemeid())) {
@ -54,10 +34,7 @@ public class CleaningRule<T extends Oaf> implements MapFunction<T, T> {
* }
*/
});
}
public VocabularyGroup getVocabularies() {
return vocabularies;
return mapping;
}
}

@ -1,15 +1,18 @@
package eu.dnetlib.dhp.oa.graph.clean;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.*;
import java.util.function.Consumer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import eu.dnetlib.dhp.schema.oaf.Oaf;
public class OafNavigator {
public class OafCleaner implements Serializable {
public static <E extends Oaf> E apply(E oaf, Map<Class, Consumer<Object>> mapping) {
public static <E extends Oaf> E apply(E oaf, CleaningRuleMap mapping) {
try {
navigate(oaf, mapping);
} catch (IllegalAccessException e) {
@ -18,7 +21,7 @@ public class OafNavigator {
return oaf;
}
private static void navigate(Object o, Map<Class, Consumer<Object>> mapping) throws IllegalAccessException {
private static void navigate(Object o, CleaningRuleMap mapping) throws IllegalAccessException {
if (isPrimitive(o)) {
return;
} else if (isIterable(o.getClass())) {
@ -40,7 +43,7 @@ public class OafNavigator {
}
}
private static boolean hasMapping(Object o, Map<Class, Consumer<Object>> mapping) {
private static boolean hasMapping(Object o, CleaningRuleMap mapping) {
return mapping.containsKey(o.getClass());
}

@ -72,18 +72,17 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Clean publications</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
@ -99,18 +98,17 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Clean datasets</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
@ -126,18 +124,17 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Clean otherresearchproducts</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
@ -153,18 +150,17 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Clean softwares</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
@ -180,18 +176,17 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Clean datasources</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
@ -207,18 +202,17 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Clean organizations</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
@ -234,18 +228,17 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Clean projects</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
@ -261,18 +254,17 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Clean relations</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphProperties</class>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>

@ -5,32 +5,27 @@ import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.platform.commons.util.StringUtils;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyTerm;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class)
public class CleaningRuleTest {
public class CleaningFunctionTest {
public static final ObjectMapper MAPPER = new ObjectMapper();
@ -39,7 +34,7 @@ public class CleaningRuleTest {
private VocabularyGroup vocabularies;
private CleaningRule<Publication> cleaningRule;
private CleaningRuleMap mapping;
@BeforeEach
public void setUp() throws ISLookUpException, IOException {
@ -49,18 +44,19 @@ public class CleaningRuleTest {
.thenReturn(synonyms());
vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService);
cleaningRule = new CleaningRule(vocabularies);
mapping = CleaningRuleMap.create(vocabularies);
}
@Test
public void testCleaning() throws Exception {
assertNotNull(cleaningRule.getVocabularies());
assertNotNull(vocabularies);
assertNotNull(mapping);
String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json"));
Publication p_in = MAPPER.readValue(json, Publication.class);
Publication p_out = cleaningRule.call(p_in);
Publication p_out = OafCleaner.apply(p_in, mapping);
assertNotNull(p_out);
@ -100,11 +96,11 @@ public class CleaningRuleTest {
private List<String> vocs() throws IOException {
return IOUtils
.readLines(CleaningRuleTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
}
private List<String> synonyms() throws IOException {
return IOUtils
.readLines(CleaningRuleTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
}
}
Loading…
Cancel
Save