Merge pull request 'mergeResultsOfDifferentTypes only when checkDelegatedAuthority is true' (#478) from merge_by_id_fix into main
Reviewed-on: #478
This commit is contained in:
commit
24b5dc97c6
|
@ -30,6 +30,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
|
||||||
public class MergeUtils {
|
public class MergeUtils {
|
||||||
|
|
||||||
public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) {
|
public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) {
|
||||||
return mergeGroup(s, oafEntityIterator, true);
|
return mergeGroup(s, oafEntityIterator, true);
|
||||||
}
|
}
|
||||||
|
@ -88,7 +89,7 @@ public class MergeUtils {
|
||||||
private static Oaf mergeEntities(Oaf left, Oaf right, boolean checkDelegatedAuthority) {
|
private static Oaf mergeEntities(Oaf left, Oaf right, boolean checkDelegatedAuthority) {
|
||||||
|
|
||||||
if (sameClass(left, right, Result.class)) {
|
if (sameClass(left, right, Result.class)) {
|
||||||
if (!left.getClass().equals(right.getClass()) || checkDelegatedAuthority) {
|
if (checkDelegatedAuthority) {
|
||||||
return mergeResultsOfDifferentTypes((Result) left, (Result) right);
|
return mergeResultsOfDifferentTypes((Result) left, (Result) right);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,210 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2024.
|
||||||
|
* SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
|
*/
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.promote;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.FunctionalInterfaceSupport.*;
|
||||||
|
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
|
||||||
|
import static org.apache.spark.sql.functions.*;
|
||||||
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.DirectoryStream;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
|
||||||
|
public class PromoteResultWithMeasuresTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(PromoteResultWithMeasuresTest.class);
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path tempDir;
|
||||||
|
|
||||||
|
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
tempDir = Files.createTempDirectory(PromoteResultWithMeasuresTest.class.getSimpleName());
|
||||||
|
log.info("using work dir {}", tempDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.setAppName(PromoteResultWithMeasuresTest.class.getSimpleName());
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
|
||||||
|
conf.set("hive.metastore.local", "true");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
|
||||||
|
conf.set("spark.sql.warehouse.dir", tempDir.toString());
|
||||||
|
conf.set("hive.metastore.warehouse.dir", tempDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
|
spark = SparkSession.builder().config(conf).getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
spark.stop();
|
||||||
|
FileUtils.deleteDirectory(tempDir.toFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testPromoteResultWithMeasures_job() throws Exception {
|
||||||
|
|
||||||
|
final String inputGraphTablePath = getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/promote/measures/graph")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
final String inputActionPayloadPath = getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/promote/measures/actionPayloads")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
final String actionPayloadsPath = tempDir.resolve("actionPayloads").toString();
|
||||||
|
|
||||||
|
spark
|
||||||
|
.read()
|
||||||
|
.text(inputActionPayloadPath)
|
||||||
|
.withColumn("payload", col("value"))
|
||||||
|
.select("payload")
|
||||||
|
.write()
|
||||||
|
.parquet(actionPayloadsPath);
|
||||||
|
|
||||||
|
final Path outputGraphTablePath = tempDir.resolve("outputGraphTablePath");
|
||||||
|
|
||||||
|
PromoteActionPayloadForGraphTableJob
|
||||||
|
.main(new String[] {
|
||||||
|
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||||
|
"--graphTableClassName", Publication.class.getCanonicalName(),
|
||||||
|
"--inputGraphTablePath", inputGraphTablePath,
|
||||||
|
"--inputActionPayloadPath", actionPayloadsPath,
|
||||||
|
"--actionPayloadClassName", Result.class.getCanonicalName(),
|
||||||
|
"--outputGraphTablePath", outputGraphTablePath.toString(),
|
||||||
|
"--mergeAndGetStrategy", MergeAndGet.Strategy.MERGE_FROM_AND_GET.toString(),
|
||||||
|
"--promoteActionStrategy", PromoteAction.Strategy.ENRICH.toString(),
|
||||||
|
"--shouldGroupById", "true"
|
||||||
|
});
|
||||||
|
|
||||||
|
assertFalse(isDirEmpty(outputGraphTablePath));
|
||||||
|
|
||||||
|
final Encoder<Publication> pubEncoder = Encoders.bean(Publication.class);
|
||||||
|
List<Publication> results = spark
|
||||||
|
.read()
|
||||||
|
.schema(pubEncoder.schema())
|
||||||
|
.json(outputGraphTablePath.toString())
|
||||||
|
.as(pubEncoder)
|
||||||
|
.collectAsList();
|
||||||
|
|
||||||
|
verify(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testPromoteResultWithMeasures_internal() throws JsonProcessingException {
|
||||||
|
|
||||||
|
Dataset<Publication> rowDS = spark
|
||||||
|
.read()
|
||||||
|
.schema(Encoders.bean(Publication.class).schema())
|
||||||
|
.json("src/test/resources/eu/dnetlib/dhp/actionmanager/promote/measures/graph")
|
||||||
|
.as(Encoders.bean(Publication.class));
|
||||||
|
|
||||||
|
Dataset<Result> actionPayloadDS = spark
|
||||||
|
.read()
|
||||||
|
.schema(Encoders.bean(Result.class).schema())
|
||||||
|
.json("src/test/resources/eu/dnetlib/dhp/actionmanager/promote/measures/actionPayloads")
|
||||||
|
.as(Encoders.bean(Result.class));
|
||||||
|
|
||||||
|
final MergeAndGet.Strategy mergeFromAndGet = MergeAndGet.Strategy.MERGE_FROM_AND_GET;
|
||||||
|
|
||||||
|
final SerializableSupplier<Function<Publication, String>> rowIdFn = ModelSupport::idFn;
|
||||||
|
final SerializableSupplier<BiFunction<Publication, Result, Publication>> mergeAndGetFn = MergeAndGet
|
||||||
|
.functionFor(mergeFromAndGet);
|
||||||
|
final SerializableSupplier<Publication> zeroFn = () -> Publication.class
|
||||||
|
.cast(new eu.dnetlib.dhp.schema.oaf.Publication());
|
||||||
|
final SerializableSupplier<Function<Publication, Boolean>> isNotZeroFn = PromoteResultWithMeasuresTest::isNotZeroFnUsingIdOrSourceAndTarget;
|
||||||
|
|
||||||
|
Dataset<Publication> joinedResults = PromoteActionPayloadFunctions
|
||||||
|
.joinGraphTableWithActionPayloadAndMerge(
|
||||||
|
rowDS,
|
||||||
|
actionPayloadDS,
|
||||||
|
rowIdFn,
|
||||||
|
ModelSupport::idFn,
|
||||||
|
mergeAndGetFn,
|
||||||
|
PromoteAction.Strategy.ENRICH,
|
||||||
|
Publication.class,
|
||||||
|
Result.class);
|
||||||
|
|
||||||
|
SerializableSupplier<BiFunction<Publication, Publication, Publication>> mergeRowsAndGetFn = MergeAndGet
|
||||||
|
.functionFor(mergeFromAndGet);
|
||||||
|
|
||||||
|
Dataset<Publication> mergedResults = PromoteActionPayloadFunctions
|
||||||
|
.groupGraphTableByIdAndMerge(
|
||||||
|
joinedResults, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, Publication.class);
|
||||||
|
|
||||||
|
verify(mergedResults.collectAsList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void verify(List<Publication> results) throws JsonProcessingException {
|
||||||
|
assertNotNull(results);
|
||||||
|
assertEquals(1, results.size());
|
||||||
|
|
||||||
|
Result r = results.get(0);
|
||||||
|
|
||||||
|
log.info(OBJECT_MAPPER.writeValueAsString(r));
|
||||||
|
|
||||||
|
assertNotNull(r.getMeasures());
|
||||||
|
assertFalse(r.getMeasures().isEmpty());
|
||||||
|
assertTrue(
|
||||||
|
r
|
||||||
|
.getMeasures()
|
||||||
|
.stream()
|
||||||
|
.map(Measure::getId)
|
||||||
|
.collect(Collectors.toCollection(HashSet::new))
|
||||||
|
.containsAll(
|
||||||
|
Lists
|
||||||
|
.newArrayList(
|
||||||
|
"downloads", "views", "influence", "popularity", "influence_alt", "popularity_alt",
|
||||||
|
"impulse")));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T extends Oaf> Function<T, Boolean> isNotZeroFnUsingIdOrSourceAndTarget() {
|
||||||
|
return t -> {
|
||||||
|
if (isSubClass(t, Relation.class)) {
|
||||||
|
final Relation rel = (Relation) t;
|
||||||
|
return StringUtils.isNotBlank(rel.getSource()) && StringUtils.isNotBlank(rel.getTarget());
|
||||||
|
}
|
||||||
|
return StringUtils.isNotBlank(((OafEntity) t).getId());
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isDirEmpty(final Path directory) throws IOException {
|
||||||
|
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory)) {
|
||||||
|
return !dirStream.iterator().hasNext();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,3 @@
|
||||||
|
{"collectedfrom":null,"dataInfo":null,"lastupdatetimestamp":null,"id":"50|doi_dedup___::02317b7093277ec8aa0311d5c6a25b9b","originalId":null,"pid":null,"dateofcollection":null,"dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":[{"id":"downloads","unit":[{"key":"opendoar____::358aee4cc897452c00244351e4d91f69||ZENODO","value":"125","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:usage_counts","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"views","unit":[{"key":"opendoar____::358aee4cc897452c00244351e4d91f69||ZENODO","value":"35","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:usage_counts","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]}],"context":null,"processingchargeamount":null,"processingchargecurrency":null,"author":null,"resulttype":null,"metaResourceType":null,"language":null,"country":null,"subject":null,"title":null,"relevantdate":null,"description":null,"dateofacceptance":null,"publisher":null,"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"externalReference":null,"instance":null,"eoscifguidelines":null,"openAccessColor":null,"publiclyFunded":null,"transformativeAgreement":null,"isGreen":null,"isInDiamondJournal":null}
|
||||||
|
{"collectedfrom":null,"dataInfo":null,"lastupdatetimestamp":null,"id":"50|doi_dedup___::02317b7093277ec8aa0311d5c6a25b9b","originalId":null,"pid":null,"dateofcollection":null,"dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":[{"id":"influence","unit":[{"key":"score","value":"3.1167566E-9","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C5","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"popularity","unit":[{"key":"score","value":"7.335433E-9","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"influence_alt","unit":[{"key":"score","value":"4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C5","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"popularity_alt","unit":[{"key":"score","value":"2.96","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"impulse","unit":[{"key":"score","value":"4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C5","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]}],"context":null,"processingchargeamount":null,"processingchargecurrency":null,"author":null,"resulttype":null,"metaResourceType":null,"language":null,"country":null,"subject":null,"title":null,"relevantdate":null,"description":null,"dateofacceptance":null,"publisher":null,"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"externalReference":null,"instance":null,"eoscifguidelines":null,"openAccessColor":null,"publiclyFunded":null,"transformativeAgreement":null,"isGreen":null,"isInDiamondJournal":null}
|
||||||
|
{"collectedfrom":null,"dataInfo":null,"lastupdatetimestamp":null,"id":"50|doi_dedup___::02317b7093277ec8aa0311d5c6a25b9b","originalId":null,"pid":null,"dateofcollection":null,"dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"context":null,"processingchargeamount":null,"processingchargecurrency":null,"author":null,"resulttype":null,"metaResourceType":null,"language":null,"country":null,"subject":null,"title":null,"relevantdate":null,"description":null,"dateofacceptance":null,"publisher":null,"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"externalReference":null,"instance":null,"eoscifguidelines":null,"openAccessColor":"hybrid","publiclyFunded":false,"transformativeAgreement":null,"isGreen":true,"isInDiamondJournal":false}
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue