forked from D-Net/dnet-hadoop
[raw_all] added extra workflow step for patching the identifiers in the relations, given an id mapping dataset
This commit is contained in:
parent
998b66855a
commit
d267dce520
|
@ -0,0 +1,117 @@
|
|||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.dump.Utils;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.RelationIdMapping;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
public class PatchRelationsApplication {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PatchRelationsApplication.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
Optional.ofNullable(
|
||||
PatchRelationsApplication.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/patch_relations_parameters.json"))
|
||||
.orElseThrow(FileNotFoundException::new)
|
||||
));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphBasePath = parser.get("graphBasePath");
|
||||
log.info("graphBasePath: {}", graphBasePath);
|
||||
|
||||
final String workingDir = parser.get("workingDir");
|
||||
log.info("workingDir: {}", workingDir);
|
||||
|
||||
final String idMappingPath = parser.get("idMappingPath");
|
||||
log.info("idMappingPath: {}", idMappingPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> patchRelations(spark, graphBasePath, workingDir, idMappingPath));
|
||||
}
|
||||
|
||||
/**
|
||||
* Substitutes the identifiers (source/target) from the set of relations part of the graphBasePath included in the
|
||||
* mapping provided by the dataset stored on idMappingPath, using workingDir as intermediate storage location.
|
||||
*
|
||||
* @param spark the SparkSession
|
||||
* @param graphBasePath base graph path providing the set of relations to patch
|
||||
* @param workingDir intermediate storage location
|
||||
* @param idMappingPath dataset providing the old -> new identifier mapping
|
||||
*/
|
||||
private static void patchRelations(final SparkSession spark, final String graphBasePath, final String workingDir, final String idMappingPath) {
|
||||
|
||||
final String relationPath = graphBasePath + "/relation";
|
||||
|
||||
final Dataset<Relation> rels = Utils.readPath(spark, relationPath, Relation.class);
|
||||
final Dataset<RelationIdMapping> idMapping = Utils.readPath(spark, idMappingPath, RelationIdMapping.class);
|
||||
|
||||
rels
|
||||
.joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "full")
|
||||
.filter((FilterFunction<Tuple2<Relation, RelationIdMapping>>) t -> Objects.nonNull(t._1()))
|
||||
.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
|
||||
final Relation r = t._1();
|
||||
Optional.ofNullable(t._2())
|
||||
.map(RelationIdMapping::getNewId)
|
||||
.ifPresent(r::setSource);
|
||||
return r;
|
||||
}, Encoders.bean(Relation.class))
|
||||
.joinWith(idMapping, rels.col("target").equalTo(idMapping.col("oldId")), "full")
|
||||
.filter((FilterFunction<Tuple2<Relation, RelationIdMapping>>) t -> Objects.nonNull(t._1()))
|
||||
.map((MapFunction<Tuple2<Relation, RelationIdMapping>, Relation>) t -> {
|
||||
final Relation r = t._1();
|
||||
Optional.ofNullable(t._2())
|
||||
.map(RelationIdMapping::getNewId)
|
||||
.ifPresent(r::setTarget);
|
||||
return r;
|
||||
}, Encoders.bean(Relation.class))
|
||||
.map(
|
||||
(MapFunction<Relation, String>) OBJECT_MAPPER::writeValueAsString,
|
||||
Encoders.STRING())
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.text(workingDir);
|
||||
|
||||
spark.read().textFile(workingDir)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.text(relationPath);
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package eu.dnetlib.dhp.oa.graph.raw.common;
|
||||
|
||||
public class RelationIdMapping {
|
||||
|
||||
private String oldId;
|
||||
|
||||
private String newId;
|
||||
|
||||
public String getOldId() {
|
||||
return oldId;
|
||||
}
|
||||
|
||||
public void setOldId(final String oldId) {
|
||||
this.oldId = oldId;
|
||||
}
|
||||
|
||||
public String getNewId() {
|
||||
return newId;
|
||||
}
|
||||
|
||||
public void setNewId(final String newId) {
|
||||
this.newId = newId;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "g",
|
||||
"paramLongName": "graphBasePath",
|
||||
"paramDescription": "base graph path providing the set of relations to patch",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingDir",
|
||||
"paramDescription": "intermediate storage location",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "idMappingPath",
|
||||
"paramDescription": "dataset providing the old -> new identifier mapping",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -100,6 +100,16 @@
|
|||
<value></value>
|
||||
<description>a blacklist of nsprefixes (comma separeted)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>shouldPatchRelations</name>
|
||||
<value>false</value>
|
||||
<description>activates the relation patching phase, driven by the content in ${idMappingPath}</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>idMappingPath</name>
|
||||
<value></value>
|
||||
<description>path pointing to the relations identifiers mapping dataset</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
|
@ -538,7 +548,42 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_graphs" to="fork_merge_claims"/>
|
||||
<join name="wait_graphs" to="patchRelations"/>
|
||||
|
||||
<decision name="decisionPatchRelations">
|
||||
<switch>
|
||||
<case to="patchRelations">
|
||||
${(shouldPatchRelations eq "true") and
|
||||
(fs:exists(concat(concat(wf:conf('nameNode'),'/'),wf:conf('idMappingPath'))) eq "true")}
|
||||
</case>
|
||||
<default to="fork_merge_claims"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
||||
<action name="patchRelations">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PatchRelations</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.raw.PatchRelationsApplication</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--executor-cores ${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--graphBasePath</arg><arg>${workingDir}/graph_raw</arg>
|
||||
<arg>--workingDir</arg><arg>${workingDir}/patch_relations</arg>
|
||||
<arg>--idMappingPath</arg><arg>${idMappingPath}</arg>
|
||||
</spark>
|
||||
<ok to="fork_merge_claims"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_merge_claims">
|
||||
<path start="merge_claims_publication"/>
|
||||
|
|
Loading…
Reference in New Issue