adding relations to the dump

This commit is contained in:
Miriam Baglioni 2023-01-23 16:08:50 +02:00
parent d5420960d1
commit 64511dad2c
10 changed files with 315 additions and 92 deletions

View File

@@ -0,0 +1,78 @@
package eu.dnetlib.dhp.eosc.model;

import java.io.Serializable;
import java.util.Objects;

import com.github.imifou.jsonschema.module.addon.annotation.JsonSchema;

import eu.dnetlib.dhp.oa.model.Provenance;
import eu.dnetlib.dhp.oa.model.graph.RelType;

/**
 * Represents a generic relation between two entities. It has the following parameters: - private String source to
 * represent the identifier of the source entity of the relation - private String target to represent the identifier
 * of the target entity of the relation - private RelType reltype to represent the semantics of the relation -
 * private Provenance provenance to represent the provenance of the relation
 */
public class Relation implements Serializable {

	@JsonSchema(description = "The node source in the relation")
	private String source;

	@JsonSchema(description = "The node target in the relation")
	private String target;

	@JsonSchema(description = "To represent the semantics of a relation between two entities")
	private RelType reltype;

	@JsonSchema(description = "The reason why OpenAIRE holds the relation")
	private Provenance provenance;

	public String getSource() {
		return source;
	}

	public void setSource(String source) {
		this.source = source;
	}

	public String getTarget() {
		return target;
	}

	public void setTarget(String target) {
		this.target = target;
	}

	public RelType getReltype() {
		return reltype;
	}

	public void setReltype(RelType reltype) {
		this.reltype = reltype;
	}

	public Provenance getProvenance() {
		return provenance;
	}

	public void setProvenance(Provenance provenance) {
		this.provenance = provenance;
	}

	@Override
	public int hashCode() {
		// The hash covers the two endpoints and the reltype (as "type:name");
		// provenance deliberately does not contribute to it.
		return Objects.hash(source, target, reltype.getType() + ":" + reltype.getName());
	}

	public static Relation newInstance(String source, String target, RelType reltype, Provenance provenance) {
		Relation relation = new Relation();
		relation.source = source;
		relation.target = target;
		relation.reltype = reltype;
		relation.provenance = provenance;
		return relation;
	}
}
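
A minimal usage sketch of the factory method above, placed alongside the model class (identifiers and vocabulary values are made up; RelType.newInstance and Provenance.newInstance are assumed from their use elsewhere in this commit):

import eu.dnetlib.dhp.oa.model.Provenance;
import eu.dnetlib.dhp.oa.model.graph.RelType;

class RelationUsageExample {
	public static void main(String[] args) {
		// Made-up identifiers and terms, for illustration only.
		Relation rel = Relation
			.newInstance(
				"50|fake_source_id",
				"50|fake_target_id",
				RelType.newInstance("IsSupplementTo", "supplement"),
				Provenance.newInstance("Harvested", "0.9"));
		// Two relations differing only in provenance hash identically,
		// since hashCode() covers source, target and reltype alone.
		System.out.println(rel.hashCode());
	}
}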

View File

@@ -93,7 +93,11 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable {
 					rOrg.setResultId(t2._1().getTarget());
 					eu.dnetlib.dhp.eosc.model.Organization org = new eu.dnetlib.dhp.eosc.model.Organization();
 					org.setId(t2._2().getId());
-					org.setName(t2._2().getLegalname().getValue());
+					if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) {
+						org.setName(t2._2().getLegalname().getValue());
+					} else {
+						org.setName("");
+					}
 					HashMap<String, Set<String>> organizationPids = new HashMap<>();
 					if (Optional.ofNullable(t2._2().getPid()).isPresent())
 						t2._2().getPid().forEach(p -> {
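
The guard added above avoids a NullPointerException for organizations with no legal name. An equivalent, more compact form using Optional.map, as a sketch only (Field here is a hypothetical stand-in for the schema's value wrapper exposing getValue()):

import java.util.Optional;

// Hypothetical stand-in for the schema's Field<String> wrapper, for illustration only.
class Field<T> {
	private T value;

	T getValue() {
		return value;
	}
}

class LegalnameExample {
	// Same effect as the if/else above: default to "" when legalname is null.
	static String safeLegalname(Field<String> legalname) {
		return Optional
			.ofNullable(legalname)
			.map(Field::getValue)
			.orElse("");
	}
}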

View File

@@ -22,7 +22,6 @@ import eu.dnetlib.dhp.oa.graph.dump.Constants;
 import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
-import eu.dnetlib.dhp.oa.model.graph.GraphResult;
 import eu.dnetlib.dhp.schema.oaf.Result;

 /**

View File

@@ -0,0 +1,122 @@
package eu.dnetlib.dhp.oa.graph.dump.eosc;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.model.Provenance;
import eu.dnetlib.dhp.oa.model.graph.RelType;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;

/**
 * @author miriam.baglioni
 * @Date 12/01/23
 */
public class SparkDumpRelation implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(SparkDumpRelation.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkDumpRelation.class
					.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				dumpRelation(spark, inputPath, outputPath);
			});
	}

	private static void dumpRelation(SparkSession spark, String inputPath, String outputPath) {
		Dataset<Relation> relations = Utils.readPath(spark, inputPath, Relation.class);
		relations
			.map((MapFunction<Relation, eu.dnetlib.dhp.eosc.model.Relation>) relation -> {
				eu.dnetlib.dhp.eosc.model.Relation relNew = new eu.dnetlib.dhp.eosc.model.Relation();
				relNew.setSource(relation.getSource());
				relNew.setTarget(relation.getTarget());
				relNew.setReltype(RelType.newInstance(relation.getRelClass(), relation.getSubRelType()));
				// Provenance is set only when both the provenance action and its classname are available
				Optional<DataInfo> odInfo = Optional.ofNullable(relation.getDataInfo());
				if (odInfo.isPresent()) {
					DataInfo dInfo = odInfo.get();
					if (Optional.ofNullable(dInfo.getProvenanceaction()).isPresent() &&
						Optional.ofNullable(dInfo.getProvenanceaction().getClassname()).isPresent()) {
						relNew
							.setProvenance(
								Provenance.newInstance(dInfo.getProvenanceaction().getClassname(), dInfo.getTrust()));
					}
				}
				return relNew;
			}, Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Append)
			.json(outputPath);
	}
}
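
Each line in the gzipped output is the JSON serialization of the four bean properties set above. A sketch of one such line, assuming RelType exposes name/type and Provenance exposes provenance/trust as bean properties (all values are made up):

class DumpedRelationShape {
	// Illustrative only: identifiers and vocabulary terms are invented, and
	// the nested property names are assumed from the accessors used above.
	static final String EXAMPLE_LINE = "{"
		+ "\"source\":\"50|fake_source_id\","
		+ "\"target\":\"50|fake_target_id\","
		+ "\"reltype\":{\"name\":\"IsSupplementTo\",\"type\":\"supplement\"},"
		+ "\"provenance\":{\"provenance\":\"Harvested\",\"trust\":\"0.9\"}"
		+ "}";
}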

View File

@@ -1,10 +1,11 @@
 package eu.dnetlib.dhp.oa.graph.dump.eosc;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.eosc.model.EoscResult;
-import eu.dnetlib.dhp.oa.graph.dump.Utils;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.*;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
@@ -17,103 +18,101 @@ import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.eosc.model.EoscResult;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
 import scala.Tuple2;
-import java.io.Serializable;
-import java.util.*;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 /**
  * @author miriam.baglioni
  * @Date 12/01/23
  */
-public class SparkSelectRelation Serializable {
+public class SparkSelectRelation implements Serializable {
 	private static final Logger log = LoggerFactory.getLogger(SparkSelectRelation.class);
 	public static void main(String[] args) throws Exception {
 		String jsonConfiguration = IOUtils
 			.toString(
 				SparkSelectRelation.class
 					.getResourceAsStream(
 						"/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
 		Boolean isSparkSessionManaged = Optional
 			.ofNullable(parser.get("isSparkSessionManaged"))
 			.map(Boolean::valueOf)
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 		final String inputPath = parser.get("sourcePath");
 		log.info("inputPath: {}", inputPath);
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
 		Optional<String> rs = Optional.ofNullable(parser.get("removeSet"));
 		final Set<String> removeSet = new HashSet<>();
 		if (rs.isPresent()) {
 			Collections.addAll(removeSet, rs.get().split(";"));
 		}
 		SparkConf conf = new SparkConf();
 		runWithSparkSession(
 			conf,
 			isSparkSessionManaged,
 			spark -> {
 				selectSubset(spark, inputPath, outputPath, removeSet);
 			});
 	}
 	private static void selectSubset(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet) {
 		Dataset<Relation> relation = Utils
 			.readPath(spark, inputPath + "/relation", Relation.class)
 			.filter(
 				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
 					&& !removeSet.contains(r.getRelClass()));
 		Dataset<String> resultIds = Utils
 			.readPath(spark, outputPath + "/publication", EoscResult.class)
 			.map((MapFunction<EoscResult, String>) p -> p.getId(), Encoders.STRING())
 			.union(
 				Utils
 					.readPath(spark, outputPath + "/dataset", EoscResult.class)
 					.map((MapFunction<EoscResult, String>) d -> d.getId(), Encoders.STRING()))
 			.union(
 				Utils
 					.readPath(spark, outputPath + "/software", EoscResult.class)
 					.map((MapFunction<EoscResult, String>) s -> s.getId(), Encoders.STRING()))
 			.union(
 				Utils
 					.readPath(spark, outputPath + "/otherresearchproduct", EoscResult.class)
 					.map((MapFunction<EoscResult, String>) o -> o.getId(), Encoders.STRING()));
 		// select result -> result relations
 		Dataset<Relation> relResultResult = relation
 			.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("value")))
 			.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
 		relResultResult
 			.joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("value")))
 			.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
 			.write()
 			.option("compression", "gzip")
 			.mode(SaveMode.Overwrite)
 			.json(outputPath + "/relation");
 	}
 }
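
The two chained joins above keep a relation only when both its source and its target appear among the dumped result identifiers. The same selection logic, sketched with plain Java collections rather than Spark datasets (all names here are hypothetical):

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical minimal relation model, for illustration only.
record Rel(String source, String target, String relClass) {}

class SelectSubsetSketch {
	// Mirrors selectSubset: drop unwanted relation classes, then keep only
	// relations whose source AND target are both dumped result identifiers.
	// (The deletedbyinference filter of the Spark version is omitted here.)
	static List<Rel> select(List<Rel> relations, Set<String> resultIds, Set<String> removeSet) {
		return relations
			.stream()
			.filter(r -> !removeSet.contains(r.relClass()))
			.filter(r -> resultIds.contains(r.source()) && resultIds.contains(r.target()))
			.collect(Collectors.toList());
	}
}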

View File

@@ -568,13 +568,34 @@
 					</spark-opts>
 					<arg>--sourcePath</arg><arg>${sourcePath}</arg>
 					<arg>--outputPath</arg><arg>${workingDir}/dump</arg>
-					<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
-					<arg>--dumpType</arg><arg>eosc</arg>
+					<arg>--removeSet</arg><arg>${removeSet}</arg>
 				</spark>
 				<ok to="dump_relation"/>
 				<error to="Kill"/>
 			</action>
+			<action name="dump_relation">
+				<spark xmlns="uri:oozie:spark-action:0.2">
+					<master>yarn</master>
+					<mode>cluster</mode>
+					<name>Dump the relations between the results in the selected set</name>
+					<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SparkDumpRelation</class>
+					<jar>dump-${projectVersion}.jar</jar>
+					<spark-opts>
+						--executor-memory=${sparkExecutorMemory}
+						--executor-cores=${sparkExecutorCores}
+						--driver-memory=${sparkDriverMemory}
+						--conf spark.extraListeners=${spark2ExtraListeners}
+						--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+						--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+						--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+						--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+					</spark-opts>
+					<arg>--sourcePath</arg><arg>${workingDir}/dump/relation</arg>
+					<arg>--outputPath</arg><arg>${workingDir}/tar/relation</arg>
+				</spark>
+				<ok to="make_archive"/>
+				<error to="Kill"/>
+			</action>
 			<action name="make_archive">
 				<java>
 					<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>

View File

@@ -144,7 +144,7 @@
 			</action>
 			<action name="make_archive">
 				<java>
-					<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
+					<main-class>eu.dnetlib.dhp.oa.graph.dump.eosc.MakeTar</main-class>
 					<arg>--hdfsPath</arg><arg>${outputPath}</arg>
 					<arg>--nameNode</arg><arg>${nameNode}</arg>
 					<arg>--sourcePath</arg><arg>${workingDir}/tar</arg>

View File

@@ -158,7 +158,7 @@
 			<action name="save_community_map">
 				<java>
-					<main-class>eu.dnetlib.dhp.oa.graph.dump.SaveCommunityMap</main-class>
+					<main-class>eu.dnetlib.dhp.oa.graph.dump.eosc.SaveCommunityMap</main-class>
 					<arg>--outputPath</arg><arg>${workingDir}/communityMap</arg>
 					<arg>--nameNode</arg><arg>${nameNode}</arg>
 					<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
@@ -269,7 +269,7 @@
 			<action name="make_archive">
 				<java>
-					<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
+					<main-class>eu.dnetlib.dhp.oa.graph.dump.eosc.MakeTar</main-class>
 					<arg>--hdfsPath</arg><arg>${outputPath}</arg>
 					<arg>--nameNode</arg><arg>${nameNode}</arg>
 					<arg>--sourcePath</arg><arg>${workingDir}/tar</arg>

View File

@@ -202,7 +202,7 @@
 					<master>yarn</master>
 					<mode>cluster</mode>
 					<name>Prepare association result subset of project info</name>
-					<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject</class>
+					<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SparkPrepareResultProject</class>
 					<jar>dump-${projectVersion}.jar</jar>
 					<spark-opts>
 						--executor-memory=${sparkExecutorMemory}
@@ -233,7 +233,7 @@
 					<master>yarn</master>
 					<mode>cluster</mode>
 					<name>Extend dumped publications with information about project</name>
-					<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
+					<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo</class>
 					<jar>dump-${projectVersion}.jar</jar>
 					<spark-opts>
 						--executor-memory=${sparkExecutorMemory}
@@ -258,7 +258,7 @@
 					<master>yarn</master>
 					<mode>cluster</mode>
 					<name>Extend dumped dataset with information about project</name>
-					<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
+					<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo</class>
 					<jar>dump-${projectVersion}.jar</jar>
 					<spark-opts>
 						--executor-memory=${sparkExecutorMemory}
@@ -283,7 +283,7 @@
 					<master>yarn</master>
 					<mode>cluster</mode>
 					<name>Extend dumped ORP with information about project</name>
-					<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
+					<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo</class>
 					<jar>dump-${projectVersion}.jar</jar>
 					<spark-opts>
 						--executor-memory=${sparkExecutorMemory}
@@ -308,7 +308,7 @@
 					<master>yarn</master>
 					<mode>cluster</mode>
 					<name>Extend dumped software with information about project</name>
-					<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
+					<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SparkUpdateProjectInfo</class>
 					<jar>dump-${projectVersion}.jar</jar>
 					<spark-opts>
 						--executor-memory=${sparkExecutorMemory}

View File

@@ -88,7 +88,7 @@
 					<master>yarn</master>
 					<mode>cluster</mode>
 					<name>Prepare association result subset of project info</name>
-					<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject</class>
+					<class>eu.dnetlib.dhp.oa.graph.dump.eosc.SparkPrepareResultProject</class>
 					<jar>dump-${projectVersion}.jar</jar>
 					<spark-opts>
 						--executor-memory=${sparkExecutorMemory}