[graph merge beta] merge datasource originalid, collectedfrom, and pid lists

This commit is contained in:
Claudio Atzori 2022-05-11 14:13:06 +02:00
parent 77bc9863e9
commit 5d3b4a9c25
4 changed files with 57 additions and 4 deletions

View File

@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -136,7 +137,7 @@ public class MergeGraphTableSparkJob {
/** /**
* Datasources involved in the merge operation doesn't obey to the infra precedence policy, but relies on a custom * Datasources involved in the merge operation doesn't obey to the infra precedence policy, but relies on a custom
* behaviour that, given two datasources from beta and prod returns the one from prod with the highest * behaviour that, given two datasources from beta and prod returns the one from prod with the highest
* compatibility among the two. * compatibility among the two. Furthermore, the procedure merges the collectedfrom, originalId, and pid lists.
* *
* @param p datasource from PROD * @param p datasource from PROD
* @param b datasource from BETA * @param b datasource from BETA
@ -160,9 +161,37 @@ public class MergeGraphTableSparkJob {
List<Qualifier> list = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility()); List<Qualifier> list = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility());
dp.setOpenairecompatibility(Collections.min(list, new DatasourceCompatibilityComparator())); dp.setOpenairecompatibility(Collections.min(list, new DatasourceCompatibilityComparator()));
dp
.setCollectedfrom(
Stream
.concat(
Optional
.ofNullable(dp.getCollectedfrom())
.map(Collection::stream)
.orElse(Stream.empty()),
Optional
.ofNullable(db.getCollectedfrom())
.map(Collection::stream)
.orElse(Stream.empty()))
.distinct() // relies on KeyValue.equals
.collect(Collectors.toList()));
dp.setOriginalId(mergeLists(dp.getOriginalId(), db.getOriginalId()));
dp.setPid(mergeLists(dp.getPid(), db.getPid()));
return (P) dp; return (P) dp;
} }
private static final <T> List<T> mergeLists(final List<T>... lists) {
return Arrays
.stream(lists)
.filter(Objects::nonNull)
.flatMap(List::stream)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
}
private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) { private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
if (b.isPresent() & !p.isPresent()) { if (b.isPresent() & !p.isPresent()) {
return (P) b.get(); return (P) b.get();

View File

@ -2,6 +2,7 @@
package eu.dnetlib.dhp.oa.graph.merge; package eu.dnetlib.dhp.oa.graph.merge;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
@ -25,7 +26,23 @@ class MergeGraphTableSparkJobTest {
} }
@Test @Test
void testMergeDatasources() throws IOException { void testMerge() throws IOException {
Datasource d = MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_cris.json"),
d("datasource_openaire2.0.json"));
assertEquals("10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", d.getId());
assertNotNull(d.getOriginalId());
assertEquals(2, d.getOriginalId().size());
assertNotNull(d.getCollectedfrom());
assertEquals(2, d.getCollectedfrom().size());
assertNotNull(d.getPid());
assertEquals(1, d.getPid().size());
}
@Test
void testMergeCompatibility() throws IOException {
assertEquals( assertEquals(
"openaire-cris_1.1", "openaire-cris_1.1",
MergeGraphTableSparkJob MergeGraphTableSparkJob

View File

@ -1 +1,5 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" }} { "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire-cris_1.1" },
"originalId": ["eurocrisdris::1234"],
"collectedfrom": [{"key": "eurocrisdris::2b29d08e383ff4cd8a2b6b226ce37e38", "value": "Directory of Research Information System (DRIS)"}],
"pid": [{"value": "10.1010.xyx", "qualifier": {"classid": "doi"}}]
}

View File

@ -1 +1,4 @@
{ "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" }} { "id": "10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7", "openairecompatibility": { "classid": "openaire2.0" },
"originalId": ["opendoar____::1234"],
"collectedfrom": [{"key": "openaire____::47ce9e9f4fad46e732cff06419ecaabb", "value": "OpenDOAR"}]
}