[orcidPropagation] Test verifying the merge of multiple enrichments coming from multiple sources; it also checks that other identifier types persist after the merge.
This commit is contained in:
parent 29611b2091
commit 071cc95afb
@@ -9,11 +9,7 @@ import org.slf4j.{Logger, LoggerFactory}
 import eu.dnetlib.dhp.common.enrichment.Constants.PROPAGATION_DATA_INFO_TYPE
 import eu.dnetlib.dhp.schema.oaf.{OafEntity, Result}
 import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils
-import org.apache.spark.api.java.function.{MapFunction, MapGroupsFunction}
-import org.apache.spark.sql.expressions.Aggregator

-import java.util
-import java.util.Iterator
 import scala.collection.JavaConverters._

 abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
@@ -83,7 +79,8 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str
       .mapGroups((k, it) => {
         val p: Result = it.next

-        it.foldLeft(p.getAuthor)((x,r) => MergeUtils.mergeAuthors(x, r.getAuthor,0))
+        p.setAuthor(it.foldLeft(p.getAuthor)((x,r) => MergeUtils.mergeAuthors(x, r.getAuthor,0)))

         p
       })(Encoders.bean(resultClazz))
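Note on the hunk above: Scala's foldLeft returns the accumulated value rather than mutating the record, so the old line computed the merged author list and then discarded it. A minimal sketch of the corrected grouping step, assuming MergeUtils.mergeAuthors(left, right, threshold) returns the merged author list, as its usage in the diff suggests:

import eu.dnetlib.dhp.schema.oaf.Result
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils

// Hypothetical helper mirroring the mapGroups body above: foldLeft only returns the
// merged author list, so the result must be written back with setAuthor, otherwise
// the enrichment contributed by the other records in the group is silently dropped.
def mergeGroup(it: Iterator[Result]): Result = {
  val p = it.next()
  val mergedAuthors = it.foldLeft(p.getAuthor)((acc, r) => MergeUtils.mergeAuthors(acc, r.getAuthor, 0))
  p.setAuthor(mergedAuthors) // the actual fix in this hunk
  p
}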
@@ -94,7 +91,6 @@ abstract class SparkEnrichWithOrcidAuthors(propertyPath: String, args: Array[Str

       })
   }
-// def generateGraph(spark: SparkSession, graphPath: String, workingDir: String, targetPath: String): Unit

   def createTemporaryData(spark: SparkSession, graphPath: String, orcidPath: String, targetPath: String): Unit

@@ -6,14 +6,9 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;

-import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
-import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult;
 import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.ForeachFunction;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.Dataset;
 import org.slf4j.Logger;
@@ -24,7 +19,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.utils.OrcidAuthor;
 import scala.Tuple2;
-import static org.apache.spark.sql.functions.*;

 public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
 	private static final Logger log = LoggerFactory.getLogger(SparkPropagateOrcidAuthor.class);
@@ -89,69 +84,6 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {

 	}

-// private <T extends Result> void processAndMerge(
-// SparkSession spark,
-// String inputPath,
-// String outputPath,
-// Class<T> clazz,
-// Encoder<T> encoder) {
-//
-// spark.read()
-// .schema(Encoders.bean(clazz).schema())
-// .json(inputPath)
-// .as(encoder)
-// .groupByKey((MapFunction<T, String>) OafEntity::getId, Encoders.STRING())
-// .mapGroups((MapGroupsFunction<String, T, T>) (k, it) -> {
-// T p = it.next();
-// it.forEachRemaining(r -> p.setAuthor(MergeUtils.mergeAuthors(p.getAuthor(),r.getAuthor(),0)));
-// return p;
-// }, encoder)
-// .write()
-// .mode(SaveMode.Overwrite)
-// .option("compression", "gzip")
-// .json(outputPath);
-// }
-// @Override
-//public void generateGraph(SparkSession spark, String graphPath, String workingDir, String targetPath){
-//
-// ModelSupport.entityTypes.keySet().stream().filter(ModelSupport::isResult)
-// .forEach(e -> {
-// Class resultClazz = ModelSupport.entityTypes.get(e);
-// Dataset<Row> matched = spark
-// .read()
-// .schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
-// .parquet(workingDir + "/" + e.name() + "_matched")
-// .selectExpr("id","enriched_author");
-// Dataset<Row> result = spark.read().schema(Encoders.bean(resultClazz).schema())
-// .json(graphPath + "/" + e.name());
-//
-//
-//
-// result.join(matched, result.col("id").equalTo(matched.col("id")), "left")
-// .withColumn(
-// "author",
-// when(size(col("enriched_author")).gt(0), col("enriched_author"))
-// .otherwise(col("author"))
-// )
-// .drop(matched.col("id"))
-// .drop("enriched_author")
-// .write()
-// .mode(SaveMode.Overwrite)
-// .option("compression", "gzip")
-// .json(workingDir + "/" + e.name() + "/_tobemerged")
-// ;
-// processAndMerge(
-// spark,
-// workingDir + "/" + e.name() + "/_tobemerged",
-// targetPath + "/" + e.name(),
-// resultClazz,
-// Encoders.bean(resultClazz)
-// );
-// });
-//
-//
-// }

 	@Override
 	public void createTemporaryData(SparkSession spark, String graphPath, String orcidPath, String targetPath) {
 		Dataset<Row> supplements = spark
@@ -163,7 +95,6 @@ public class SparkPropagateOrcidAuthor extends SparkEnrichWithOrcidAuthors {
 					ModelConstants.IS_SUPPLEMENTED_BY + "')")
 			.selectExpr("source as id", "target");

-
 		Dataset<Row> result = spark
 			.read()
 			.schema(Encoders.bean(Result.class).schema())
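For context, createTemporaryData reduces the supplement relations to (id, target) pairs before joining them with the result records, so that ORCID-bearing authors can be carried across supplementing/supplemented pairs. A rough sketch of that selection, with the relation path and filter expression simplified (assumptions, not the exact job code):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative only: read the graph relations, keep the supplement edges and project
// them to (id, target), matching the selectExpr shown in the hunk above.
def readSupplementPairs(spark: SparkSession, graphPath: String): DataFrame =
  spark.read
    .json(s"$graphPath/relation")
    .where("relClass in ('IsSupplementTo', 'IsSupplementedBy')")
    .selectExpr("source as id", "target")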
@@ -184,15 +184,16 @@ public class OrcidPropagationJobTest {
 					"/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/twoupdates")
 					.getPath(),
 				"-targetPath",
-				workingDir.toString() + "/dataset",
+				workingDir.toString() + "/graph",
 				"-orcidPath", "",
-				"-workingDir", workingDir.toString()
+				"-workingDir", workingDir.toString(),
+				"-matchingSource", "propagation"
 			});

 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

 		JavaRDD<Dataset> tmp = sc
-			.textFile(workingDir.toString() + "/dataset")
+			.textFile(workingDir.toString() + "/graph/dataset")
 			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

 		Assertions.assertEquals(10, tmp.count());
@@ -214,10 +215,10 @@ public class OrcidPropagationJobTest {

 		Assertions
 			.assertEquals(
-				1, propagatedAuthors.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
+				1, propagatedAuthors.filter("name = 'Nicole' and surname = 'Jung'").count());
 		Assertions
 			.assertEquals(
-				1, propagatedAuthors.filter("name = 'Ruediger' and surname = 'Beckhaus'").count());
+				1, propagatedAuthors.filter("name = 'Camilo José' and surname = 'Cela'").count());

 		query = "select id, MyT.name name, MyT.surname surname, MyP.value pid ,MyP.qualifier.classid pidType "
 			+ "from dataset "
@@ -228,13 +229,27 @@ public class OrcidPropagationJobTest {

 		Assertions
 			.assertEquals(
-				2, authorsExplodedPids.filter("name = 'Marc' and surname = 'Schmidtmann'").count());
+				3, authorsExplodedPids.filter("name = 'Camilo José' and surname = 'Cela'").count());
 		Assertions
 			.assertEquals(
 				1,
 				authorsExplodedPids
 					.filter(
-						"name = 'Marc' and surname = 'Schmidtmann' and pidType = 'MAG Identifier'")
+						"name = 'Camilo José' and surname = 'Cela' and pidType = 'MAG Identifier'")
+					.count());
+		Assertions
+			.assertEquals(
+				1,
+				authorsExplodedPids
+					.filter(
+						"name = 'Camilo José' and surname = 'Cela' and pidType = 'orcid'")
+					.count());
+		Assertions
+			.assertEquals(
+				1,
+				authorsExplodedPids
+					.filter(
+						"name = 'Camilo José' and surname = 'Cela' and pidType = 'orcid_pending'")
 					.count());
 	}
 }
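The identifier-persistence assertions above work on a view where each author and each author pid is exploded into its own row, so pid types can be counted per author; only the prefix of the query is visible in the hunk. A sketch of an equivalent query, where the lateral-view tail is an assumption based on the MyT/MyP aliases:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Explode authors and their pids so that a filter such as
// "name = 'Camilo José' and surname = 'Cela' and pidType = 'orcid_pending'" can be applied.
def explodeAuthorPids(spark: SparkSession, resultPath: String): DataFrame = {
  spark.read.json(resultPath).createOrReplaceTempView("dataset")
  spark.sql(
    """select id, MyT.name as name, MyT.surname as surname,
      |       MyP.value as pid, MyP.qualifier.classid as pidType
      |from dataset
      |lateral view explode(author) a as MyT
      |lateral view explode(MyT.pid) p as MyP""".stripMargin)
}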
Binary file not shown.
File diff suppressed because one or more lines are too long
Binary file not shown.
@@ -0,0 +1,9 @@
+{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
+{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::36bcfaa1494c849547a346da688ade24", "collectedfrom": [], "validated": false, "properties": []}
+{"subRelType": "supplement", "relClass": "IsSupplementedBy", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "target": "50|od______3989::0f89464c4ac4c398fe0c71433b175a62", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "source": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "collectedfrom": [], "validated": false, "properties": []}
+{"subRelType": "supplement", "relClass": "IsSupplementTo", "dataInfo": {"provenanceaction": {"classid": "iis", "classname": "Inferred by OpenAIRE", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": true, "inferenceprovenance": "iis::document_affiliations", "invisible": false, "trust": "0.7731"}, "source": "50|od______3989::0f89464c4ac4c398fe0c71433b175a62", "lastupdatetimestamp": 1694431186898, "relType": "resultOrganization", "target": "50|dedup_wf_001::95b033c0c3961f6a1cdcd41a99a9632e", "collectedfrom": [], "validated": false, "properties": []}
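The visible lines of the new test resource pair each IsSupplementedBy edge with its inverse IsSupplementTo edge between the sample results, which is what lets the propagation job enrich either side of a supplement pair. A quick way to inspect the resource (the path argument is hypothetical):

import org.apache.spark.sql.SparkSession

// Load the relation sample and list the relClass values and (source, target) pairs it contains.
def inspectRelations(spark: SparkSession, relationPath: String): Unit = {
  val rels = spark.read.json(relationPath)
  rels.groupBy("relClass").count().show()
  rels.selectExpr("relClass", "source", "target").show(false)
}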