Master branch updates from beta September 2023 #337

Manually merged
claudio.atzori merged 1271 commits from beta into master 2023-09-06 11:31:09 +02:00
7 changed files with 235 additions and 215 deletions
Showing only changes of commit be320ba3c1 - Show all commits

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.actionmanager;
import java.util.Optional;
import eu.dnetlib.dhp.common.HdfsSupport;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -12,6 +11,7 @@ import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.Subject;
@ -94,6 +94,7 @@ public class Constants {
return s;
}
public static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}

View File

@ -8,12 +8,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*;
import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@ -22,17 +16,23 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.sql.Dataset;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*;
import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
@ -41,125 +41,139 @@ import scala.Tuple2;
*/
public class PrepareAffiliationRelations implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String ID_PREFIX = "50|doi_________::";
public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference";
public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!";
public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref";
private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String ID_PREFIX = "50|doi_________::";
public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference";
public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!";
public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref";
public static <I extends Result> void main(String[] args) throws Exception {
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareAffiliationRelations.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json"));
String jsonConfiguration = IOUtils
.toString(
PrepareAffiliationRelations.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}: ", inputPath);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}: ", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
SparkConf conf = new SparkConf();
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Constants.removeOutputDir(spark, outputPath);
prepareAffiliationRelations(spark, inputPath, outputPath);
});
}
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Constants.removeOutputDir(spark, outputPath);
prepareAffiliationRelations(spark, inputPath, outputPath);
});
}
private static <I extends Result> void prepareAffiliationRelations(SparkSession spark, String inputPath, String outputPath) {
private static <I extends Result> void prepareAffiliationRelations(SparkSession spark, String inputPath,
String outputPath) {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// load and parse affiliation relations from HDFS
JavaRDD<AffiliationRelationDeserializer> affiliationRelationsDeserializeRDD = sc
.textFile(inputPath)
.map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class));
// load and parse affiliation relations from HDFS
JavaRDD<AffiliationRelationDeserializer> affiliationRelationsDeserializeRDD = sc
.textFile(inputPath)
.map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class));
// convert affiliation to an internal representation
Dataset<AffiliationRelationModel> affiliationRelations =
spark.createDataset(
affiliationRelationsDeserializeRDD.flatMap(entry ->
entry.getMatchings().stream().flatMap(matching ->
matching.getRorId().stream().map( rorId -> new AffiliationRelationModel(
entry.getDoi(),
rorId,
matching.getConfidence()
))).collect(Collectors.toList())
.iterator())
.rdd(),
Encoders.bean(AffiliationRelationModel.class));
// convert affiliation to an internal representation
Dataset<AffiliationRelationModel> affiliationRelations = spark
.createDataset(
affiliationRelationsDeserializeRDD
.flatMap(
entry -> entry
.getMatchings()
.stream()
.flatMap(
matching -> matching
.getRorId()
.stream()
.map(
rorId -> new AffiliationRelationModel(
entry.getDoi(),
rorId,
matching.getConfidence())))
.collect(Collectors.toList())
.iterator())
.rdd(),
Encoders.bean(AffiliationRelationModel.class));
// prepare action sets for affiliation relations
affiliationRelations
.flatMap((FlatMapFunction<AffiliationRelationModel, Relation>) affRel -> {
// prepare action sets for affiliation relations
affiliationRelations
.flatMap((FlatMapFunction<AffiliationRelationModel, Relation>) affRel -> {
// DOI to OpenAIRE id
final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi()));
// DOI to OpenAIRE id
final String paperId = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi()));
// ROR id to OpenAIRE id
final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId());
// ROR id to OpenAIRE id
final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId());
Qualifier qualifier = OafMapperUtils.qualifier(
BIP_AFFILIATIONS_CLASSID,
BIP_AFFILIATIONS_CLASSNAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS);
Qualifier qualifier = OafMapperUtils
.qualifier(
BIP_AFFILIATIONS_CLASSID,
BIP_AFFILIATIONS_CLASSNAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS);
// format data info; setting `confidence` into relation's `trust`
DataInfo dataInfo = OafMapperUtils.dataInfo(
false,
BIP_INFERENCE_PROVENANCE,
true,
false,
qualifier,
Double.toString(affRel.getConfidence()));
// format data info; setting `confidence` into relation's `trust`
DataInfo dataInfo = OafMapperUtils
.dataInfo(
false,
BIP_INFERENCE_PROVENANCE,
true,
false,
qualifier,
Double.toString(affRel.getConfidence()));
// return bi-directional relations
return getAffiliationRelationPair(paperId, affId, dataInfo).iterator();
// return bi-directional relations
return getAffiliationRelationPair(paperId, affId, dataInfo).iterator();
}, Encoders.bean(Relation.class))
.toJavaRDD()
.map(p -> new AtomicAction(Relation.class, p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
}, Encoders.bean(Relation.class))
.toJavaRDD()
.map(p -> new AtomicAction(Relation.class, p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
}
}
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) {
return Arrays.asList(
OafMapperUtils.getRelation(
paperId,
affId,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
ModelConstants.HAS_AUTHOR_INSTITUTION,
null,
dataInfo,
null),
OafMapperUtils.getRelation(
affId,
paperId,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
null,
dataInfo,
null)
);
}
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) {
return Arrays
.asList(
OafMapperUtils
.getRelation(
paperId,
affId,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
ModelConstants.HAS_AUTHOR_INSTITUTION,
null,
dataInfo,
null),
OafMapperUtils
.getRelation(
affId,
paperId,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
null,
dataInfo,
null));
}
}

View File

@ -1,26 +1,27 @@
package eu.dnetlib.dhp.actionmanager.bipaffiliations.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
package eu.dnetlib.dhp.actionmanager.bipaffiliations.model;
import java.io.Serializable;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class AffiliationRelationDeserializer implements Serializable {
@JsonProperty("DOI")
private String doi;
@JsonProperty("Matchings")
private List<Matching> matchings;
@JsonProperty("DOI")
private String doi;
@JsonProperty("Matchings")
private List<Matching> matchings;
@Data
public static class Matching implements Serializable {
@JsonProperty("RORid")
private List<String> rorId;
@JsonProperty("Confidence")
private double confidence;
@Data
public static class Matching implements Serializable {
@JsonProperty("RORid")
private List<String> rorId;
@JsonProperty("Confidence")
private double confidence;
}
}
}

View File

@ -1,15 +1,16 @@
package eu.dnetlib.dhp.actionmanager.bipaffiliations.model;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class AffiliationRelationModel implements Serializable {
private String doi;
private String rorId;
private double confidence;
private String doi;
private String rorId;
private double confidence;
}

View File

@ -28,15 +28,8 @@ oozie.use.system.libpath=true
spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
# I think this should be the oozie workflow directory
# oozieWorkflowPath=/user/ilias.kanellos/workflow_example/
# The workflow application path
wfAppPath=${oozieTopWfApplicationPath}
# The following is needed as a property of a workflow
oozie.wf.application.path=${oozieTopWfApplicationPath}
inputPath=/user/schatz/affiliations/data-v3.json
outputPath=/tmp/crossref-affiliations-output-v3
inputPath=/user/schatz/affiliations/data-v3.1.json
outputPath=/tmp/crossref-affiliations-output-v3.1

View File

@ -1,4 +1,4 @@
<workflow-app name="BipFinderScore" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="BipAffiliations" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
@ -84,7 +84,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the inferred by BIP! affiliation relations from Crossref</name>
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations/class>
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.actionmanager.bipaffiliations;
import static org.junit.jupiter.api.Assertions.*;
@ -6,10 +7,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
@ -29,107 +26,120 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
public class PrepareAffiliationRelationsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static SparkSession spark;
private static Path workingDir;
private static final String ID_PREFIX = "50|doi_________::";
private static final Logger log = LoggerFactory
.getLogger(PrepareAffiliationRelationsTest.class);
private static Path workingDir;
private static final String ID_PREFIX = "50|doi_________::";
private static final Logger log = LoggerFactory
.getLogger(PrepareAffiliationRelationsTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName());
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName());
log.info("Using work dir {}", workingDir);
log.info("Using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName());
SparkConf conf = new SparkConf();
conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(PrepareAffiliationRelationsTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
spark = SparkSession
.builder()
.appName(PrepareAffiliationRelationsTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testMatch() throws Exception {
@Test
void testMatch() throws Exception {
String affiliationRelationsPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
.getPath();
String affiliationRelationsPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
.getPath();
String outputPath = workingDir.toString() + "/actionSet";
String outputPath = workingDir.toString() + "/actionSet";
PrepareAffiliationRelations
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputPath", affiliationRelationsPath,
"-outputPath", outputPath
});
PrepareAffiliationRelations
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputPath", affiliationRelationsPath,
"-outputPath", outputPath
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc
.sequenceFile(outputPath, Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
JavaRDD<Relation> tmp = sc
.sequenceFile(outputPath, Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
// for (Relation r : tmp.collect()) {
// System.out.println(
// r.getSource() + "\t" + r.getTarget() + "\t" + r.getRelType() + "\t" + r.getRelClass() + "\t" + r.getSubRelType() + "\t" + r.getValidationDate() + "\t" + r.getDataInfo().getTrust() + "\t" + r.getDataInfo().getInferred()
// );
// }
// count the number of relations
assertEquals(16, tmp.count());
// count the number of relations
assertEquals(16, tmp.count());
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
dataset.createOrReplaceTempView("result");
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
dataset.createOrReplaceTempView("result");
Dataset<Row> execVerification = spark.sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r");
Dataset<Row> execVerification = spark
.sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r");
// verify that we have equal number of bi-directional relations
Assertions.assertEquals(8, execVerification
.filter(
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION +"'")
.collectAsList()
.size());
// verify that we have equal number of bi-directional relations
Assertions
.assertEquals(
8, execVerification
.filter(
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
.collectAsList()
.size());
Assertions.assertEquals(8, execVerification
.filter(
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF +"'")
.collectAsList()
.size());
Assertions
.assertEquals(
8, execVerification
.filter(
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
.collectAsList()
.size());
// check confidence value of a specific relation
String sourceDOI = "10.1105/tpc.8.3.343";
// check confidence value of a specific relation
String sourceDOI = "10.1105/tpc.8.3.343";
final String sourceOpenaireId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI));
final String sourceOpenaireId = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI));
Assertions.assertEquals("0.7071067812", execVerification
.filter(
"source='" + sourceOpenaireId +"'")
.collectAsList().get(0).getString(4));
Assertions
.assertEquals(
"0.7071067812", execVerification
.filter(
"source='" + sourceOpenaireId + "'")
.collectAsList()
.get(0)
.getString(4));
}
}
}