From b2349659cfceca0f749a75a388a012a3939ae1a0 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 5 Jun 2020 18:37:38 +0200 Subject: [PATCH] WIP: graph property fixing implementation --- .../dhp/oa/graph/fix/FixGraphProperties.java | 210 +++++++++++++ .../dhp/oa/graph/fix/config-default.xml | 18 ++ .../eu/dnetlib/dhp/oa/graph/fix/workflow.xml | 289 ++++++++++++++++++ .../oa/graph/input_fix_graph_parameters.json | 32 ++ 4 files changed, 549 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/fix/FixGraphProperties.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/fix/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/fix/workflow.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_fix_graph_parameters.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/fix/FixGraphProperties.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/fix/FixGraphProperties.java new file mode 100644 index 000000000..bbcadde59 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/fix/FixGraphProperties.java @@ -0,0 +1,210 @@ +package eu.dnetlib.dhp.oa.graph.fix; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; +import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class FixGraphProperties { + + private static final Logger log = LoggerFactory.getLogger(FixGraphProperties.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + FixGraphProperties.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_fix_graph_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); + + final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupUrl); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath); + }); + } + + private static void fixGraphTable( + SparkSession spark, + VocabularyGroup vocs, + String inputPath, + Class clazz, + String outputPath) { + + MapFunction fixFn = getFixingFunction(vocs, clazz); + + readTableFromPath(spark, inputPath, clazz) + .map(fixFn, Encoders.bean(clazz)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + private static MapFunction getFixingFunction(VocabularyGroup vocs, Class clazz) { + + switch (clazz.getCanonicalName()) { + case "eu.dnetlib.dhp.schema.oaf.Publication": + case "eu.dnetlib.dhp.schema.oaf.Dataset": + case "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct": + case "eu.dnetlib.dhp.schema.oaf.Software": + return (MapFunction) value -> { + Result r = (Result) value; + + if (r.getLanguage() != null) { + r.setLanguage(vocs.getTermAsQualifier("dnet:languages", "und")); + } else { + r.setLanguage(vocs.getTermAsQualifier("dnet:languages", r.getLanguage().getClassid())); + } + if (r.getCountry() != null) { + r.setCountry( + r.getCountry() + .stream() + .filter(Objects::nonNull) + .map(c -> { + Qualifier q = vocs.getTermAsQualifier("dnet:countries", c.getClassid()); + Country cn = new Country(); + cn.setDataInfo(c.getDataInfo()); + cn.setClassid(q.getClassid()); + cn.setClassname(cn.getClassname()); + cn.setSchemeid("dnet:countries"); + cn.setSchemename("dnet:countries"); + return cn; + }) + .collect(Collectors.toList())); + } + + if (r.getSubject() != null) { + r.setSubject( + r.getSubject() + .stream() + .filter(Objects::nonNull) + .map(s -> { + if (s.getQualifier() == null || StringUtils.isBlank(s.getQualifier().getClassid())) { + s.setQualifier(vocs.getTermAsQualifier("dnet:subject_classification_typologies", "UNKNOWN")); + } + }) + .collect(Collectors.toList()) + ); + } + + if (r.getPublisher() != null && StringUtils.isBlank(r.getPublisher().getValue())) { + r.setPublisher(null); + } + if (r.getBestaccessright() == null) { + r.setBestaccessright(vocs.getTermAsQualifier("dnet:access_modes", "UNKNOWN")); + } + if (r.getInstance() != null) { + for(Instance i : r.getInstance()) { + if (i.getAccessright() == null) { + i.setAccessright(vocs.getTermAsQualifier("dnet:access_modes", "UNKNOWN")); + } + if (i.getInstancetype() != null) { + i.setInstancetype(vocs.getTermAsQualifier("dnet:publication_resource", i.getInstancetype().getClassid())); + } else { + i.setInstancetype(vocs.getTermAsQualifier("dnet:publication_resource", "0000")); + } + + + } + } + + return clazz.cast(r); + }; + case "eu.dnetlib.dhp.schema.oaf.Datasource": + return (MapFunction) value -> { + return value; + }; + case "eu.dnetlib.dhp.schema.oaf.Organization": + return (MapFunction) value -> { + Organization o = (Organization) value; + + if (o.getCountry() == null) { + o.setCountry(vocs.getTermAsQualifier("dnet:countries", "UNKNOWN")); + } else { + o.setCountry(vocs.getTermAsQualifier("dnet:countries", o.getCountry().getClassid())); + } + + return clazz.cast(o); + }; + case "eu.dnetlib.dhp.schema.oaf.Project": + return (MapFunction) value -> { + return value; + }; + case "eu.dnetlib.dhp.schema.oaf.Relation": + return (MapFunction) value -> { + return value; + }; + default: + throw new RuntimeException("unknown class: " + clazz.getCanonicalName()); + } + + } + + private static Dataset readTableFromPath( + SparkSession spark, String inputEntityPath, Class clazz) { + + log.info("Reading Graph table from: {}", inputEntityPath); + return spark + .read() + .textFile(inputEntityPath) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), + Encoders.bean(clazz)); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/fix/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/fix/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/fix/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/fix/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/fix/workflow.xml new file mode 100644 index 000000000..e93f58279 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/fix/workflow.xml @@ -0,0 +1,289 @@ + + + + + graphInputPath + the input path to read graph content + + + graphOutputPath + the target path to store fixed graph + + + isLookupUrl + the address of the lookUp service + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + + yarn + cluster + Fix publications + eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath${graphInputPath}/publication + --outputPath${graphOutputPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + Fix datasets + eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath${graphInputPath}/dataset + --outputPath${graphOutputPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + Fix otherresearchproducts + eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath${graphInputPath}/otherresearchproduct + --outputPath${graphOutputPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + Fix softwares + eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath${graphInputPath}/software + --outputPath${graphOutputPath}/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + Fix datasources + eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath${graphInputPath}/datasource + --outputPath${graphOutputPath}/datasource + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + Fix organizations + eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath${graphInputPath}/organization + --outputPath${graphOutputPath}/organization + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + Fix projects + eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath${graphInputPath}/project + --outputPath${graphOutputPath}/project + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + Fix relations + eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath${graphInputPath}/relation + --outputPath${graphOutputPath}/relation + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation + --isLookupUrl${isLookupUrl} + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_fix_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_fix_graph_parameters.json new file mode 100644 index 000000000..9cfed1e91 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/input_fix_graph_parameters.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "in", + "paramLongName": "inputPath", + "paramDescription": "the path to the graph data dump to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path to store the output graph", + "paramRequired": true + }, + { + "paramName": "isu", + "paramLongName": "isLookupUrl", + "paramDescription": "url to the ISLookup Service", + "paramRequired": true + }, + { + "paramName": "class", + "paramLongName": "graphTableClassName", + "paramDescription": "class name moelling the graph table", + "paramRequired": true + } +]