WIP: graph property fixing implementation

Claudio Atzori 2020-06-05 18:37:38 +02:00
parent 7e82996e7c
commit b2349659cf
4 changed files with 549 additions and 0 deletions


@@ -0,0 +1,210 @@
package eu.dnetlib.dhp.oa.graph.fix;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
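/**
* FixGraphProperties: Spark job that reads a single graph table, applies a set of vocabulary-based
* fixes to its properties (languages, countries, subjects, access rights, instance types, ...) and
* writes the fixed table to the output path.
*/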
public class FixGraphProperties {
private static final Logger log = LoggerFactory.getLogger(FixGraphProperties.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
FixGraphProperties.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/input_fix_graph_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends Oaf> entityClazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);
final VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookupUrl);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
});
}
private static <T extends Oaf> void fixGraphTable(
SparkSession spark,
VocabularyGroup vocs,
String inputPath,
Class<T> clazz,
String outputPath) {
MapFunction<T, T> fixFn = getFixingFunction(vocs, clazz);
readTableFromPath(spark, inputPath, clazz)
.map(fixFn, Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
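/**
* Builds the fixing function applied to each record, dispatching on the concrete graph table class:
* result subtypes (Publication, Dataset, OtherResearchProduct, Software) get the full set of fixes,
* Organization gets its country normalised, the remaining types are currently passed through.
*/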
private static <T extends Oaf> MapFunction<T, T> getFixingFunction(VocabularyGroup vocs, Class<T> clazz) {
switch (clazz.getCanonicalName()) {
case "eu.dnetlib.dhp.schema.oaf.Publication":
case "eu.dnetlib.dhp.schema.oaf.Dataset":
case "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct":
case "eu.dnetlib.dhp.schema.oaf.Software":
return (MapFunction<T, T>) value -> {
Result r = (Result) value;
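// normalise the language against the dnet:languages vocabulary, defaulting to "und" (undetermined) when missing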
if (r.getLanguage() == null) {
r.setLanguage(vocs.getTermAsQualifier("dnet:languages", "und"));
} else {
r.setLanguage(vocs.getTermAsQualifier("dnet:languages", r.getLanguage().getClassid()));
}
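// re-resolve each country code against the dnet:countries vocabulary, preserving the original dataInfo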
if (r.getCountry() != null) {
r.setCountry(
r.getCountry()
.stream()
.filter(Objects::nonNull)
.map(c -> {
Qualifier q = vocs.getTermAsQualifier("dnet:countries", c.getClassid());
Country cn = new Country();
cn.setDataInfo(c.getDataInfo());
cn.setClassid(q.getClassid());
cn.setClassname(q.getClassname());
cn.setSchemeid("dnet:countries");
cn.setSchemename("dnet:countries");
return cn;
})
.collect(Collectors.toList()));
}
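// make sure every subject carries a qualifier, falling back to UNKNOWN in dnet:subject_classification_typologies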
if (r.getSubject() != null) {
r.setSubject(
r.getSubject()
.stream()
.filter(Objects::nonNull)
.map(s -> {
if (s.getQualifier() == null || StringUtils.isBlank(s.getQualifier().getClassid())) {
s.setQualifier(vocs.getTermAsQualifier("dnet:subject_classification_typologies", "UNKNOWN"));
}
return s;
})
.collect(Collectors.toList())
);
}
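// drop publishers with a blank value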
if (r.getPublisher() != null && StringUtils.isBlank(r.getPublisher().getValue())) {
r.setPublisher(null);
}
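// default the best access right to UNKNOWN (dnet:access_modes) when missing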
if (r.getBestaccessright() == null) {
r.setBestaccessright(vocs.getTermAsQualifier("dnet:access_modes", "UNKNOWN"));
}
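// normalise access right and instance type on each instance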
if (r.getInstance() != null) {
for(Instance i : r.getInstance()) {
if (i.getAccessright() == null) {
i.setAccessright(vocs.getTermAsQualifier("dnet:access_modes", "UNKNOWN"));
}
if (i.getInstancetype() != null) {
i.setInstancetype(vocs.getTermAsQualifier("dnet:publication_resource", i.getInstancetype().getClassid()));
} else {
i.setInstancetype(vocs.getTermAsQualifier("dnet:publication_resource", "0000"));
}
}
}
return clazz.cast(r);
};
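// no property fixes implemented yet for the remaining entity types (WIP): identity mappings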
case "eu.dnetlib.dhp.schema.oaf.Datasource":
return (MapFunction<T, T>) value -> {
return value;
};
case "eu.dnetlib.dhp.schema.oaf.Organization":
return (MapFunction<T, T>) value -> {
Organization o = (Organization) value;
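// normalise the organization country against dnet:countries, defaulting to UNKNOWN when missing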
if (o.getCountry() == null) {
o.setCountry(vocs.getTermAsQualifier("dnet:countries", "UNKNOWN"));
} else {
o.setCountry(vocs.getTermAsQualifier("dnet:countries", o.getCountry().getClassid()));
}
return clazz.cast(o);
};
case "eu.dnetlib.dhp.schema.oaf.Project":
return (MapFunction<T, T>) value -> {
return value;
};
case "eu.dnetlib.dhp.schema.oaf.Relation":
return (MapFunction<T, T>) value -> {
return value;
};
default:
throw new RuntimeException("unknown class: " + clazz.getCanonicalName());
}
}
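// reads a graph table stored as newline-delimited JSON and maps each line to the given bean class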
private static <T extends Oaf> Dataset<T> readTableFromPath(
SparkSession spark, String inputEntityPath, Class<T> clazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
return spark
.read()
.textFile(inputEntityPath)
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}


@@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>


@@ -0,0 +1,289 @@
<workflow-app name="fix Graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphInputPath</name>
<description>the input path to read graph content</description>
</property>
<property>
<name>graphOutputPath</name>
<description>the target path to store fixed graph</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
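<property>
<name>sparkNetworkTimeout</name>
<description>configures spark.network.timeout</description>
</property>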
</parameters>
<start to="fork_fix_graph"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
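<!-- fix each graph table in parallel -->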
<fork name="fork_fix_graph">
<path start="fix_publication"/>
<path start="fix_dataset"/>
<path start="fix_otherresearchproduct"/>
<path start="fix_software"/>
<path start="fix_datasource"/>
<path start="fix_organization"/>
<path start="fix_project"/>
<path start="fix_relation"/>
</fork>
<action name="fix_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Fix publications</name>
<class>eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait_fix"/>
<error to="Kill"/>
</action>
<action name="fix_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Fix datasets</name>
<class>eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait_fix"/>
<error to="Kill"/>
</action>
<action name="fix_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Fix otherresearchproducts</name>
<class>eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait_fix"/>
<error to="Kill"/>
</action>
<action name="fix_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Fix software</name>
<class>eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait_fix"/>
<error to="Kill"/>
</action>
<action name="fix_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Fix datasources</name>
<class>eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait_fix"/>
<error to="Kill"/>
</action>
<action name="fix_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Fix organizations</name>
<class>eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait_fix"/>
<error to="Kill"/>
</action>
<action name="fix_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Fix projects</name>
<class>eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait_fix"/>
<error to="Kill"/>
</action>
<action name="fix_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Fix relations</name>
<class>eu.dnetlib.dhp.oa.graph.fix.FixGraphProperties</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait_fix"/>
<error to="Kill"/>
</action>
<join name="wait_fix" to="End"/>
<end name="End"/>
</workflow-app>


@@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "in",
"paramLongName": "inputPath",
"paramDescription": "the path to the graph data dump to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path to store the output graph",
"paramRequired": true
},
{
"paramName": "isu",
"paramLongName": "isLookupUrl",
"paramDescription": "url to the ISLookup Service",
"paramRequired": true
},
{
"paramName": "class",
"paramLongName": "graphTableClassName",
"paramDescription": "class name moelling the graph table",
"paramRequired": true
}
]