[graph cleaning] testing the collectedfron and hostedby patch procedure

This commit is contained in:
Claudio Atzori 2022-11-29 16:07:09 +01:00
parent 58c05731f9
commit 8e3edba318
2 changed files with 208 additions and 108 deletions

View File

@ -25,11 +25,13 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.action.model.MasterDuplicate; import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2; import scala.Tuple2;
public class CleanCfHbSparkJob { public class CleanCfHbSparkJob {
@ -76,6 +78,8 @@ public class CleanCfHbSparkJob {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
HdfsSupport.remove(resolvedPath, spark.sparkContext().hadoopConfiguration());
cleanCfHb( cleanCfHb(
spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath); spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath);
}); });
@ -92,33 +96,15 @@ public class CleanCfHbSparkJob {
// prepare the resolved CF|HB references with the corresponding EMPTY master ID // prepare the resolved CF|HB references with the corresponding EMPTY master ID
Dataset<IdCfHbMapping> resolved = spark Dataset<IdCfHbMapping> resolved = spark
.read() .read()
.textFile(inputPath) .textFile(inputPath)
.map(as(entityClazz), Encoders.bean(entityClazz)) .map(as(entityClazz), Encoders.bean(entityClazz))
.flatMap( .flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
(FlatMapFunction<T, IdCfHbMapping>) r -> {
final List<IdCfHbMapping> list = Stream
.concat(
r.getCollectedfrom().stream().map(KeyValue::getKey),
Stream
.concat(
r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
.distinct()
.map(s -> asIdCfHbMapping(r.getId(), s))
.collect(Collectors.toList());
return list.iterator();
},
Encoders.bean(IdCfHbMapping.class));
// set the EMPTY master ID/NAME and save it // set the EMPTY master ID/NAME and save it
resolved resolved
.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId"))) .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
.map((MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping>) t -> { .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
t._1().setMasterId(t._2().getMasterId());
t._1().setMasterName(t._2().getMasterName());
return t._1();
}, Encoders.bean(IdCfHbMapping.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(resolvedPath); .json(resolvedPath);
@ -131,27 +117,46 @@ public class CleanCfHbSparkJob {
// read the result table // read the result table
Dataset<T> res = spark Dataset<T> res = spark
.read() .read()
.textFile(inputPath) .textFile(inputPath)
.map(as(entityClazz), Encoders.bean(entityClazz)); .map(as(entityClazz), Encoders.bean(entityClazz));
// Join the results with the resolved CF|HB mapping, apply the mapping and save it // Join the results with the resolved CF|HB mapping, apply the mapping and save it
res res
.joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left") .joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left")
.groupByKey((MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> t._1().getId(), Encoders.STRING()) .groupByKey((MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> t._1().getId(), Encoders.STRING())
.mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz)) .mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz))
//.agg(new IdCfHbMappingAggregator(entityClazz).toColumn())
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath); .json(outputPath);
} }
@NotNull private static MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping> asIdCfHbMapping() {
return t -> {
t._1().setMasterId(t._2().getMasterId());
t._1().setMasterName(t._2().getMasterName());
return t._1();
};
}
private static <T extends Result> FlatMapFunction<T, IdCfHbMapping> flattenCfHbFn() {
return r -> Stream
.concat(
r.getCollectedfrom().stream().map(KeyValue::getKey),
Stream
.concat(
r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
.distinct()
.map(s -> asIdCfHbMapping(r.getId(), s))
.iterator();
}
private static <T extends Result> MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T> getMapGroupsFunction() { private static <T extends Result> MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T> getMapGroupsFunction() {
return new MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T>() { return new MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T>() {
@Override @Override
public T call(String key, Iterator<Tuple2<T, IdCfHbMapping>> values) throws Exception { public T call(String key, Iterator<Tuple2<T, IdCfHbMapping>> values) {
final Tuple2<T, IdCfHbMapping> first = values.next(); final Tuple2<T, IdCfHbMapping> first = values.next();
final T res = first._1(); final T res = first._1();

View File

@ -1,8 +1,16 @@
package eu.dnetlib.dhp.oa.graph.clean.cfhb; package eu.dnetlib.dhp.oa.graph.clean.cfhb;
import com.fasterxml.jackson.databind.ObjectMapper; import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.schema.oaf.Dataset; import static org.junit.jupiter.api.Assertions.assertTrue;
import eu.dnetlib.dhp.schema.oaf.Publication;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
@ -13,106 +21,193 @@ import org.junit.jupiter.api.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URISyntaxException; import eu.dnetlib.dhp.schema.oaf.Dataset;
import java.nio.file.Files; import eu.dnetlib.dhp.schema.oaf.Publication;
import java.nio.file.Path;
import java.nio.file.Paths;
public class CleanCfHbSparkJobTest { public class CleanCfHbSparkJobTest {
private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class); private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark; private static SparkSession spark;
private static Path testBaseTmpPath; private static Path testBaseTmpPath;
private static String resolvedPath; private static String resolvedPath;
private static String graphInputPath; private static String graphInputPath;
private static String graphOutputPath; private static String graphOutputPath;
private static String dsMasterDuplicatePath; private static String dsMasterDuplicatePath;
@BeforeAll @BeforeAll
public static void beforeAll() throws IOException, URISyntaxException { public static void beforeAll() throws IOException, URISyntaxException {
testBaseTmpPath = Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName()); testBaseTmpPath = Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName());
log.info("using test base path {}", testBaseTmpPath); log.info("using test base path {}", testBaseTmpPath);
final File entitiesSources = Paths final File entitiesSources = Paths
.get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI()) .get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI())
.toFile(); .toFile();
FileUtils FileUtils
.copyDirectory( .copyDirectory(
entitiesSources, entitiesSources,
testBaseTmpPath.resolve("input").resolve("entities").toFile()); testBaseTmpPath.resolve("input").resolve("entities").toFile());
FileUtils FileUtils
.copyFileToDirectory( .copyFileToDirectory(
Paths Paths
.get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json").toURI()) .get(
.toFile(), CleanCfHbSparkJobTest.class
testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile()); .getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json")
.toURI())
.toFile(),
testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile());
graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString();
resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString();
graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString();
dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString();
graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString(); SparkConf conf = new SparkConf();
resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString(); conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName());
graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString();
dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString();
SparkConf conf = new SparkConf(); conf.setMaster("local[*]");
conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName()); conf.set("spark.driver.host", "localhost");
conf.set("spark.ui.enabled", "false");
conf.setMaster("local[*]"); spark = SparkSession
conf.set("spark.driver.host", "localhost"); .builder()
conf.set("spark.ui.enabled", "false"); .appName(CleanCfHbSparkJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
spark = SparkSession @AfterAll
.builder() public static void afterAll() throws IOException {
.appName(CleanCfHbSparkJobTest.class.getSimpleName()) FileUtils.deleteDirectory(testBaseTmpPath.toFile());
.config(conf) spark.stop();
.getOrCreate(); }
}
@AfterAll @Test
public static void afterAll() throws IOException { void testCleanCfHbSparkJob() throws Exception {
FileUtils.deleteDirectory(testBaseTmpPath.toFile()); final String outputPath = graphOutputPath + "/dataset";
spark.stop(); final String inputPath = graphInputPath + "/dataset";
}
@Test org.apache.spark.sql.Dataset<Dataset> records = read(spark, inputPath, Dataset.class);
void testCleanCfHbSparkJob() throws Exception { Dataset d = records
final String outputPath = graphOutputPath + "/dataset"; .filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'")
CleanCfHbSparkJob .first();
.main( assertEquals("10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getCollectedfrom().get(0).getKey());
new String[] { assertEquals("Bacterial Protein Interaction Database - DUP", d.getCollectedfrom().get(0).getValue());
"--isSparkSessionManaged", Boolean.FALSE.toString(), assertEquals(
"--inputPath", graphInputPath + "/dataset", "10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getInstance().get(0).getCollectedfrom().getKey());
"--outputPath", outputPath, assertEquals(
"--resolvedPath", resolvedPath + "/dataset", "Bacterial Protein Interaction Database - DUP", d.getInstance().get(0).getCollectedfrom().getValue());
"--graphTableClassName", Dataset.class.getCanonicalName(),
"--datasourceMasterDuplicate", dsMasterDuplicatePath
});
//final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); d = records
.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'")
.first();
assertEquals("10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getCollectedfrom().get(0).getKey());
assertEquals("FILUR DATA - DUP", d.getCollectedfrom().get(0).getValue());
assertEquals(
"10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getInstance().get(0).getCollectedfrom().getKey());
assertEquals("FILUR DATA - DUP", d.getInstance().get(0).getCollectedfrom().getValue());
assertEquals(
"10|re3data_____::6ffd7bc058f762912dc494cd9c175341", d.getInstance().get(0).getHostedby().getKey());
assertEquals("depositar - DUP", d.getInstance().get(0).getHostedby().getValue());
Assertions.assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset"))); d = records
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
.first();
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
assertEquals(
"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
final org.apache.spark.sql.Dataset<Dataset> d = spark CleanCfHbSparkJob
.read() .main(
.textFile(outputPath) new String[] {
.map(as(Dataset.class), Encoders.bean(Dataset.class)); "--isSparkSessionManaged", Boolean.FALSE.toString(),
Assertions.assertEquals(3, d.count()); "--inputPath", inputPath,
"--outputPath", outputPath,
"--resolvedPath", resolvedPath + "/dataset",
"--graphTableClassName", Dataset.class.getCanonicalName(),
"--datasourceMasterDuplicate", dsMasterDuplicatePath
});
} assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset")));
private static <R> MapFunction<String, R> as(Class<R> clazz) { records = read(spark, outputPath, Dataset.class);
return s -> OBJECT_MAPPER.readValue(s, clazz);
} assertEquals(3, records.count());
d = records
.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'")
.first();
assertEquals("10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getCollectedfrom().get(0).getKey());
assertEquals("Bacterial Protein Interaction Database", d.getCollectedfrom().get(0).getValue());
assertEquals(
"10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getInstance().get(0).getCollectedfrom().getKey());
assertEquals("Bacterial Protein Interaction Database", d.getInstance().get(0).getCollectedfrom().getValue());
d = records
.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'")
.first();
assertEquals("10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getCollectedfrom().get(0).getKey());
assertEquals("FULIR Data", d.getCollectedfrom().get(0).getValue());
assertEquals(
"10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getInstance().get(0).getCollectedfrom().getKey());
assertEquals("FULIR Data", d.getInstance().get(0).getCollectedfrom().getValue());
assertEquals(
"10|fairsharing_::3f647cadf56541fb9513cb63ec370187", d.getInstance().get(0).getHostedby().getKey());
assertEquals("depositar", d.getInstance().get(0).getHostedby().getValue());
d = records
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
.first();
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
assertEquals(
"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
d = records
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
.first();
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
assertEquals(
"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
}
private <R> org.apache.spark.sql.Dataset<R> read(SparkSession spark, String path, Class<R> clazz) {
return spark
.read()
.textFile(path)
.map(as(clazz), Encoders.bean(clazz));
}
private static <R> MapFunction<String, R> as(Class<R> clazz) {
return s -> OBJECT_MAPPER.readValue(s, clazz);
}
} }