Merge branch 'master' into stable_ids

This commit is contained in:
Claudio Atzori 2020-10-16 12:06:23 +02:00
commit c188868450
24 changed files with 364 additions and 56 deletions

View File

@ -98,6 +98,17 @@
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-pace-core</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.dedup;
package eu.dnetlib.dhp.oa.merge;
import java.text.Normalizer;
import java.util.*;

View File

@ -1,7 +1,6 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@ -18,6 +17,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;

View File

@ -14,6 +14,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;

View File

@ -1,17 +1,13 @@
package eu.dnetlib.dhp.doiboost
import eu.dnetlib.dhp.schema.oaf.Project
import eu.dnetlib.dhp.schema.oaf.Publication
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.{col, sum}
import org.apache.hadoop.io.Text
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.ObjectMapper
import org.json4s.DefaultFormats
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import scala.::
import scala.collection.JavaConverters._
class QueryTest {
@ -27,19 +23,32 @@ class QueryTest {
}
def hasInstanceWithUrl(p:Publication):Boolean = {
val c = p.getInstance.asScala.map(i => i.getUrl!= null && !i.getUrl.isEmpty).size
!(!p.getInstance.isEmpty && c == p.getInstance().size)
}
def hasNullAccessRights(p:Publication):Boolean = {
val c = p.getInstance.asScala.map(i => i.getAccessright!= null && i.getAccessright.getClassname.nonEmpty).size
!p.getInstance.isEmpty && c == p.getInstance().size()
}
def myQuery(spark:SparkSession, sc:SparkContext): Unit = {
implicit val mapEncoderPub: Encoder[Project] = Encoders.kryo[Project]
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
val mapper = new ObjectMapper()
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
// val ds:Dataset[Project] = spark.createDataset(sc.sequenceFile("", classOf[Text], classOf[Text])
// .map(_._2.toString)
// .map(s => new ObjectMapper().readValue(s, classOf[Project])))
//
// ds.write.saveAsTable()
val ds:Dataset[Publication] = spark.read.load("/tmp/p").as[Publication]
ds.filter(p =>p.getBestaccessright!= null && p.getBestaccessright.getClassname.nonEmpty).count()
}
}

View File

@ -83,13 +83,6 @@
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-dedup-openaire</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>

View File

@ -0,0 +1,97 @@
package eu.dnetlib.dhp.oa.graph.merge;
import java.util.Comparator;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class DatasourceCompatibilityComparator implements Comparator<Qualifier> {
@Override
public int compare(Qualifier left, Qualifier right) {
String lClass = left.getClassid();
String rClass = right.getClassid();
if (lClass.equals(rClass))
return 0;
if (lClass.equals("openaire-cris_1.1"))
return -1;
if (rClass.equals("openaire-cris_1.1"))
return 1;
if (lClass.equals("openaire4.0"))
return -1;
if (rClass.equals("openaire4.0"))
return 1;
if (lClass.equals("driver-openaire2.0"))
return -1;
if (rClass.equals("driver-openaire2.0"))
return 1;
if (lClass.equals("driver"))
return -1;
if (rClass.equals("driver"))
return 1;
if (lClass.equals("openaire2.0"))
return -1;
if (rClass.equals("openaire2.0"))
return 1;
if (lClass.equals("openaire3.0"))
return -1;
if (rClass.equals("openaire3.0"))
return 1;
if (lClass.equals("openaire2.0_data"))
return -1;
if (rClass.equals("openaire2.0_data"))
return 1;
if (lClass.equals("native"))
return -1;
if (rClass.equals("native"))
return 1;
if (lClass.equals("hostedBy"))
return -1;
if (rClass.equals("hostedBy"))
return 1;
if (lClass.equals("notCompatible"))
return -1;
if (rClass.equals("notCompatible"))
return 1;
if (lClass.equals("UNKNOWN"))
return -1;
if (rClass.equals("UNKNOWN"))
return 1;
// Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass);
}
/*
* CASE WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY
* ['openaire-cris_1.1']) THEN 'openaire-cris_1.1@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT
* COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire4.0']) THEN
* 'openaire4.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
* a.compatibility):: TEXT) @> ARRAY ['driver', 'openaire2.0']) THEN
* 'driver-openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE
* (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['driver']) THEN
* 'driver@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
* a.compatibility) :: TEXT) @> ARRAY ['openaire2.0']) THEN 'openaire2.0@@@dnet:datasourceCompatibilityLevel' WHEN
* (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire3.0']) THEN
* 'openaire3.0@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
* a.compatibility) :: TEXT) @> ARRAY ['openaire2.0_data']) THEN
* 'openaire2.0_data@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE
* (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['native']) THEN
* 'native@@@dnet:datasourceCompatibilityLevel' WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override,
* a.compatibility) :: TEXT) @> ARRAY ['hostedBy']) THEN 'hostedBy@@@dnet:datasourceCompatibilityLevel' WHEN
* (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['notCompatible'])
* THEN 'notCompatible@@@dnet:datasourceCompatibilityLevel' ELSE 'UNKNOWN@@@dnet:datasourceCompatibilityLevel' END
*/
}

View File

@ -3,8 +3,9 @@ package eu.dnetlib.dhp.oa.graph.merge;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import javax.xml.crypto.Data;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@ -14,6 +15,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,6 +41,14 @@ public class MergeGraphSparkJob {
private static final String PRIORITY_DEFAULT = "BETA"; // BETA | PROD
private static final Datasource DATASOURCE = new Datasource();
static {
Qualifier compatibility = new Qualifier();
compatibility.setClassid("UNKNOWN");
DATASOURCE.setOpenairecompatibility(compatibility);
}
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@ -104,6 +114,10 @@ public class MergeGraphSparkJob {
.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
return mergeDatasource(p, b);
}
switch (priority) {
default:
case "BETA":
@ -119,6 +133,36 @@ public class MergeGraphSparkJob {
.json(outputPath);
}
/**
* Datasources involved in the merge operation doesn't obey to the infra precedence policy, but relies on a custom
* behaviour that, given two datasources from beta and prod returns the one from prod with the highest
* compatibility among the two.
*
* @param p datasource from PROD
* @param b datasource from BETA
* @param <P> Datasource class type from PROD
* @param <B> Datasource class type from BETA
* @return the datasource from PROD with the highest compatibility level.
*/
protected static <P extends Oaf, B extends Oaf> P mergeDatasource(Optional<P> p, Optional<B> b) {
if (p.isPresent() & !b.isPresent()) {
return p.get();
}
if (b.isPresent() & !p.isPresent()) {
return (P) b.get();
}
if (!b.isPresent() & !p.isPresent()) {
return null; // unlikely, at least one should be produced by the join operation
}
Datasource dp = (Datasource) p.get();
Datasource db = (Datasource) b.get();
List<Qualifier> list = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility());
dp.setOpenairecompatibility(Collections.min(list, new DatasourceCompatibilityComparator()));
return (P) dp;
}
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
if (b.isPresent() & !p.isPresent()) {
return (P) b.get();

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.oa.dedup.AuthorMerger
import eu.dnetlib.dhp.oa.merge.AuthorMerger
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
import org.apache.spark.sql.{Encoder, Encoders}

View File

@ -0,0 +1,84 @@
package eu.dnetlib.dhp.oa.graph.merge;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Datasource;
public class MergeGraphSparkJobTest {
private ObjectMapper mapper;
@BeforeEach
public void setUp() {
mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@Test
public void testMergeDatasources() throws IOException {
assertEquals(
"openaire-cris_1.1",
MergeGraphSparkJob
.mergeDatasource(
d("datasource_cris.json"),
d("datasource_UNKNOWN.json"))
.getOpenairecompatibility()
.getClassid());
assertEquals(
"openaire-cris_1.1",
MergeGraphSparkJob
.mergeDatasource(
d("datasource_UNKNOWN.json"),
d("datasource_cris.json"))
.getOpenairecompatibility()
.getClassid());
assertEquals(
"driver-openaire2.0",
MergeGraphSparkJob
.mergeDatasource(
d("datasource_native.json"),
d("datasource_driver-openaire2.0.json"))
.getOpenairecompatibility()
.getClassid());
assertEquals(
"driver-openaire2.0",
MergeGraphSparkJob
.mergeDatasource(
d("datasource_driver-openaire2.0.json"),
d("datasource_native.json"))
.getOpenairecompatibility()
.getClassid());
assertEquals(
"openaire4.0",
MergeGraphSparkJob
.mergeDatasource(
d("datasource_notCompatible.json"),
d("datasource_openaire4.0.json"))
.getOpenairecompatibility()
.getClassid());
assertEquals(
"notCompatible",
MergeGraphSparkJob
.mergeDatasource(
d("datasource_notCompatible.json"),
d("datasource_UNKNOWN.json"))
.getOpenairecompatibility()
.getClassid());
}
private Optional<Datasource> d(String file) throws IOException {
String json = IOUtils.toString(getClass().getResourceAsStream(file));
return Optional.of(mapper.readValue(json, Datasource.class));
}
}

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.sx.graph
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import scala.io.Source
class SparkScholexplorerAggregationTest {
@Test
def testFunderRelationshipsMapping(): Unit = {
val publications = Source.fromInputStream(getClass.getResourceAsStream("publication.json")).mkString
var s: List[DLIPublication] = List[DLIPublication]()
val m: ObjectMapper = new ObjectMapper()
m.enable(SerializationFeature.INDENT_OUTPUT)
for (line <- publications.lines) {
s = m.readValue(line, classOf[DLIPublication]) :: s
}
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").getOrCreate()
val ds: Dataset[DLIPublication] = spark.createDataset(spark.sparkContext.parallelize(s)).as[DLIPublication]
val unique = ds.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
.map(p => p._2)
val uniquePubs: DLIPublication = unique.first()
s.foreach(pp => assertFalse(pp.getAuthor.isEmpty))
assertNotNull(uniquePubs.getAuthor)
assertFalse(uniquePubs.getAuthor.isEmpty)
}
}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "UNKNOWN" }}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "driver-openaire2.0" }}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "hostedBy" }}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "native" }}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "notCompatible" }}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0_data" }}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire3.0" }}

View File

@ -0,0 +1 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire4.0" }}

View File

@ -47,6 +47,7 @@ object DLIToOAF {
"References" -> ("isRelatedTo", "relationship"),
"IsRelatedTo" -> ("isRelatedTo", "relationship"),
"IsSupplementedBy" -> ("isSupplementedBy", "supplement"),
"Documents"-> ("isRelatedTo", "relationship"),
"Cites" -> ("cites", "citation"),
"Unknown" -> ("isRelatedTo", "relationship"),
"IsSourceOf" -> ("isRelatedTo", "relationship"),
@ -83,7 +84,7 @@ object DLIToOAF {
val rel_inverse: Map[String, String] = Map(
"isRelatedTo" -> "isRelatedTo",
"IsSupplementedBy" -> "isSupplementTo",
"isSupplementedBy" -> "isSupplementTo",
"cites" -> "IsCitedBy",
"IsCitedBy" -> "cites",
"reviews" -> "IsReviewedBy"
@ -273,29 +274,18 @@ object DLIToOAF {
}
// def convertDLIRelation(r: DLIRelation): Relation = {
//
// val result = new Relation
// if (!relationTypeMapping.contains(r.getRelType))
// return null
//
// if (r.getProperties == null || r.getProperties.size() == 0 || (r.getProperties.size() == 1 && r.getProperties.get(0) == null))
// return null
// val t = relationTypeMapping.get(r.getRelType)
//
// result.setRelType("resultResult")
// result.setRelClass(t.get._1)
// result.setSubRelType(t.get._2)
// result.setCollectedfrom(r.getProperties.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
// result.setSource(generateId(r.getSource))
// result.setTarget(generateId(r.getTarget))
//
// if (result.getSource.equals(result.getTarget))
// return null
// result.setDataInfo(generateDataInfo())
//
// result
// }
def convertDLIRelation(r: Relation): Relation = {
val rt = r.getRelType
if (!relationTypeMapping.contains(rt))
return null
r.setRelType("resultResult")
r.setRelClass(relationTypeMapping(rt)._1)
r.setSubRelType(relationTypeMapping(rt)._2)
r.setSource(generateId(r.getSource))
r.setTarget(generateId(r.getTarget))
r
}
def convertDLIDatasetTOOAF(d: DLIDataset): Dataset = {

View File

@ -15,11 +15,13 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.codehaus.jackson.map.ObjectMapper
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
object SparkExportContentForOpenAire {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkExportContentForOpenAire.getClass.getResourceAsStream("input_export_content_parameters.json")))
@ -42,9 +44,11 @@ object SparkExportContentForOpenAire {
import spark.implicits._
val dsRel = spark.read.load(s"$workingPath/relation_b").as[Relation]
dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")
dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false)
.map(DLIToOAF.convertDLIRelation)
.filter(r => r!= null)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")
val dsPubs = spark.read.load(s"$workingPath/publication").as[DLIPublication]

View File

@ -5,9 +5,7 @@ import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Test
@ -23,6 +21,19 @@ class ExportDLITOOAFTest {
}
@Test
def testMappingRele():Unit = {
val r:Relation = new Relation
r.setSource("60|fbff1d424e045eecf24151a5fe3aa738")
r.setTarget("50|dedup_wf_001::ec409f09e63347d4e834087fe1483877")
val r1 =DLIToOAF.convertDLIRelation(r)
println(r1.getSource, r1.getTarget)
}
@Test
def testPublicationMapping():Unit = {