[raw_all] added extra workflow step for patching the identifiers in the relations, given an id mapping dataset

This commit is contained in:
Claudio Atzori 2021-07-29 12:13:06 +02:00
parent dc55ed4acd
commit e87e1805c4
4 changed files with 211 additions and 1 deletions

View File

@ -0,0 +1,115 @@
package eu.dnetlib.dhp.oa.graph.raw;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.raw.common.RelationIdMapping;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.FileNotFoundException;
import java.util.Objects;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class PatchRelationsApplication {
private static final Logger log = LoggerFactory.getLogger(PatchRelationsApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Optional.ofNullable(
PatchRelationsApplication.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/patch_relations_parameters.json"))
.orElseThrow(FileNotFoundException::new)
));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphBasePath = parser.get("graphBasePath");
log.info("graphBasePath: {}", graphBasePath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String idMappingPath = parser.get("idMappingPath");
log.info("idMappingPath: {}", idMappingPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> patchRelations(spark, graphBasePath, workingDir, idMappingPath));
}
/**
* Substitutes the identifiers (source/target) from the set of relations part of the graphBasePath included in the
* mapping provided by the dataset stored on idMappingPath, using workingDir as intermediate storage location.
*
* @param spark the SparkSession
* @param graphBasePath base graph path providing the set of relations to patch
* @param workingDir intermediate storage location
* @param idMappingPath dataset providing the old -> new identifier mapping
*/
private static void patchRelations(final SparkSession spark, final String graphBasePath, final String workingDir, final String idMappingPath) {
final String relationPath = graphBasePath + "/relation";
final Dataset<Relation> rels = Utils.readPath(spark, relationPath, Relation.class);
final Dataset<RelationIdMapping> idMapping = Utils.readPath(spark, idMappingPath, RelationIdMapping.class);
rels
.joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "left")
.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
final Relation r = t._1();
Optional.ofNullable(t._2())
.map(RelationIdMapping::getNewId)
.ifPresent(r::setSource);
return r;
}, Encoders.bean(Relation.class))
.joinWith(idMapping, rels.col("target").equalTo(idMapping.col("oldId")), "left")
.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
final Relation r = t._1();
Optional.ofNullable(t._2())
.map(RelationIdMapping::getNewId)
.ifPresent(r::setTarget);
return r;
}, Encoders.bean(Relation.class))
.map(
(MapFunction<Relation, String>) OBJECT_MAPPER::writeValueAsString,
Encoders.STRING())
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(workingDir);
spark.read().textFile(workingDir)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(relationPath);
}
}

View File

@ -0,0 +1,24 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
public class RelationIdMapping {
private String oldId;
private String newId;
public String getOldId() {
return oldId;
}
public void setOldId(final String oldId) {
this.oldId = oldId;
}
public String getNewId() {
return newId;
}
public void setNewId(final String newId) {
this.newId = newId;
}
}

View File

@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "g",
"paramLongName": "graphBasePath",
"paramDescription": "base graph path providing the set of relations to patch",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingDir",
"paramDescription": "intermediate storage location",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "idMappingPath",
"paramDescription": "dataset providing the old -> new identifier mapping",
"paramRequired": true
}
]

View File

@ -100,6 +100,16 @@
<value></value> <value></value>
<description>a blacklist of nsprefixes (comma separeted)</description> <description>a blacklist of nsprefixes (comma separeted)</description>
</property> </property>
<property>
<name>shouldPatchRelations</name>
<value>false</value>
<description>activates the relation patching phase, driven by the content in ${idMappingPath}</description>
</property>
<property>
<name>idMappingPath</name>
<value></value>
<description>path pointing to the relations identifiers mapping dataset</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -538,7 +548,42 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait_graphs" to="fork_merge_claims"/> <join name="wait_graphs" to="patchRelations"/>
<decision name="decisionPatchRelations">
<switch>
<case to="patchRelations">
${(shouldPatchRelations eq "true") and
(fs:exists(concat(concat(wf:conf('nameNode'),'/'),wf:conf('idMappingPath'))) eq "true")}
</case>
<default to="fork_merge_claims"/>
</switch>
</decision>
<action name="patchRelations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PatchRelations</name>
<class>eu.dnetlib.dhp.oa.graph.raw.PatchRelationsApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--graphBasePath</arg><arg>${workingDir}/graph_raw</arg>
<arg>--workingDir</arg><arg>${workingDir}/patch_relations</arg>
<arg>--idMappingPath</arg><arg>${idMappingPath}</arg>
</spark>
<ok to="fork_merge_claims"/>
<error to="Kill"/>
</action>
<fork name="fork_merge_claims"> <fork name="fork_merge_claims">
<path start="merge_claims_publication"/> <path start="merge_claims_publication"/>