graph cleaning refactoring #282

Merged
claudio.atzori merged 10 commits from graph_cleaning_refactoring into beta 2023-05-02 10:40:03 +02:00
6 changed files with 448 additions and 38 deletions
Showing only changes of commit 488d9a5eaa - Show all commits

View File

@ -292,7 +292,7 @@ public class CleanGraphSparkJob {
private void updateResult(T res, IdCfHbMapping m) { private void updateResult(T res, IdCfHbMapping m) {
if (Objects.nonNull(m)) { if (Objects.nonNull(m)) {
res.getCollectedfrom().forEach(kv -> updateKeyValue(kv, m)); filter(res.getCollectedfrom()).forEach(kv -> updateKeyValue(kv, m));
((Result) res).getInstance().forEach(i -> { ((Result) res).getInstance().forEach(i -> {
updateKeyValue(i.getHostedby(), m); updateKeyValue(i.getHostedby(), m);
updateKeyValue(i.getCollectedfrom(), m); updateKeyValue(i.getCollectedfrom(), m);
@ -300,8 +300,14 @@ public class CleanGraphSparkJob {
} }
} }
private Stream<KeyValue> filter(List<KeyValue> kvs) {
return kvs
.stream()
.filter(kv -> StringUtils.isNotBlank(kv.getKey()) && StringUtils.isNotBlank(kv.getValue()));
}
private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) { private void updateKeyValue(final KeyValue kv, final IdCfHbMapping a) {
if (kv.getKey().equals(a.getCfhb())) { if (Objects.nonNull(kv) && Objects.nonNull(kv.getKey()) && kv.getKey().equals(a.getCfhb())) {
kv.setKey(a.getMasterId()); kv.setKey(a.getMasterId());
kv.setValue(a.getMasterName()); kv.setValue(a.getMasterName());
} }

View File

@ -4,6 +4,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob;
import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
@ -15,15 +17,15 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.*; import org.apache.commons.io.filefilter.*;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.*;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
@ -233,15 +235,15 @@ public class CleanGraphSparkJobTest {
@Test @Test
void testCleaning_publication() throws Exception { void testCleaning_publication() throws Exception {
spark.read() final String id = "50|CSC_________::2250a70c903c6ac6e4c01438259e9375";
.textFile(graphInputPath.toString() + "/publication")
.map(as(Publication.class), Encoders.bean(Publication.class)) Publication p_in = read(spark, graphOutputPath.toString() + "/publication", Publication.class)
.collectAsList() .filter(String.format("id = '%s'", id))
.forEach(p -> { .first();
assertNull(p.getBestaccessright());
assertTrue(p instanceof Result); assertNull(p_in.getBestaccessright());
assertTrue(p instanceof Publication); assertTrue(p_in instanceof Result);
}); assertTrue(p_in instanceof Publication);
new CleanGraphSparkJob( new CleanGraphSparkJob(
args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json", args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json",
@ -253,9 +255,8 @@ public class CleanGraphSparkJobTest {
"--deepClean", "false" "--deepClean", "false"
})).run(false, isLookUpService); })).run(false, isLookUpService);
Publication p = spark.read() Publication p = read(spark, graphOutputPath.toString() + "/publication", Publication.class)
.textFile(graphOutputPath.toString() + "/publication") .filter(String.format("id = '%s'", id))
.map(as(Publication.class), Encoders.bean(Publication.class))
.first(); .first();
assertNull(p.getPublisher()); assertNull(p.getPublisher());
@ -383,6 +384,393 @@ public class CleanGraphSparkJobTest {
verify_keyword(p, "Avicennia"); verify_keyword(p, "Avicennia");
} }
@Test
public void testCleanDoiBoost() throws IOException, ParseException, ISLookUpException, ClassNotFoundException {
verifyFiltering(1, "50|doi_________::b0baa0eb88a5788f0b8815560d2a32f2");
}
@Test
public void testCleanDoiBoost2() throws IOException, ParseException, ISLookUpException, ClassNotFoundException {
verifyFiltering(1, "50|doi_________::4972b0ca81b96b225aed8038bb965656");
}
private void verifyFiltering(int expectedCount, String id) throws ISLookUpException, ClassNotFoundException, IOException, ParseException {
new CleanGraphSparkJob(
args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json",
new String[] {
"--inputPath", graphInputPath.toString() + "/publication",
"--outputPath", graphOutputPath.toString() + "/publication",
"--isLookupUrl", "lookupurl",
"--graphTableClassName", Publication.class.getCanonicalName(),
"--deepClean", "false"
})).run(false, isLookUpService);
Dataset<Publication> p = read(spark, graphOutputPath.toString() + "/publication", Publication.class)
.filter(String.format("id = '%s'", id));
assertEquals(expectedCount, p.count());
}
@Test
public void testCleanContext() throws Exception {
final String prefix = "gcube ";
new CleanGraphSparkJob(
args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json",
new String[] {
"--inputPath", graphInputPath.toString() + "/publication",
"--outputPath", graphOutputPath.toString() + "/publication",
"--isLookupUrl", "lookupurl",
"--graphTableClassName", Publication.class.getCanonicalName(),
"--deepClean", "true",
"--contextId", "sobigdata",
"--verifyParam", "gCube ",
"--masterDuplicatePath", dsMasterDuplicatePath,
"--country", "NL",
"--verifyCountryParam", "10.17632",
"--collectedfrom", "NARCIS",
"--hostedBy", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
.getPath()
})).run(false, isLookUpService);
Dataset<Publication> pubs = read(spark, graphOutputPath.toString() + "/publication", Publication.class)
.filter((FilterFunction<Publication>) p1 -> StringUtils.endsWith(p1.getId(), "_ctx"));
Assertions.assertEquals(7, pubs.count());
// original result with sobigdata context and gcube as starting string in the main title for the publication
assertEquals(
0,
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439a_ctx"))
.first()
.getContext()
.size());
// original result with sobigdata context without gcube as starting string in the main title for the publication
assertEquals(
1,
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67d_ctx"))
.first()
.getContext()
.size());
assertEquals(
"sobigdata::projects::2",
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67d_ctx"))
.first()
.getContext()
.get(0)
.getId());
// original result with sobigdata context with gcube as starting string in the subtitle
assertEquals(
1,
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6f_ctx"))
.first()
.getContext()
.size());
assertEquals(
"sobigdata::projects::2",
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6f_ctx"))
.first()
.getContext()
.get(0)
.getId());
List<StructuredProperty> titles = pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6f_ctx"))
.first()
.getTitle();
assertEquals(1, titles.size());
assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
assertEquals("subtitle", titles.get(0).getQualifier().getClassid());
// original result with sobigdata context with gcube not as starting string in the main title
assertEquals(
1,
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9_ctx"))
.first()
.getContext()
.size());
assertEquals(
"sobigdata::projects::1",
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9_ctx"))
.first()
.getContext()
.get(0)
.getId());
titles = pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9_ctx"))
.first()
.getTitle();
assertEquals(1, titles.size());
assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix));
assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim()));
assertEquals("main title", titles.get(0).getQualifier().getClassid());
// original result with sobigdata in context and also other contexts with gcube as starting string for the main
// title
assertEquals(
1,
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fd_ctx"))
.first()
.getContext()
.size());
assertEquals(
"dh-ch",
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fd_ctx"))
.first()
.getContext()
.get(0)
.getId());
titles = pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fd_ctx"))
.first()
.getTitle();
assertEquals(1, titles.size());
assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix));
assertEquals("main title", titles.get(0).getQualifier().getClassid());
// original result with multiple main title one of which whith gcube as starting string and with 2 contextes
assertEquals(
1,
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74_ctx"))
.first()
.getContext()
.size());
assertEquals(
"dh-ch",
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74_ctx"))
.first()
.getContext()
.get(0)
.getId());
titles = pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74_ctx"))
.first()
.getTitle();
assertEquals(2, titles.size());
assertTrue(
titles
.stream()
.anyMatch(
t -> t.getQualifier().getClassid().equals("main title")
&& t.getValue().toLowerCase().startsWith(prefix)));
// original result without sobigdata in context with gcube as starting string for the main title
assertEquals(
1,
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6_ctx"))
.first()
.getContext()
.size());
assertEquals(
"dh-ch",
pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6_ctx"))
.first()
.getContext()
.get(0)
.getId());
titles = pubs
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6_ctx"))
.first()
.getTitle();
assertEquals(2, titles.size());
assertTrue(
titles
.stream()
.anyMatch(
t -> t.getQualifier().getClassid().equals("main title")
&& t.getValue().toLowerCase().startsWith(prefix)));
}
@Test
void testCleanCfHbSparkJob() throws Exception {
final Dataset<Publication> pubs_in = read(spark, graphInputPath.toString() + "/publication", Publication.class);
final Publication p1_in = pubs_in
.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13b_cfhb'")
.first();
assertEquals("10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", p1_in.getCollectedfrom().get(0).getKey());
assertEquals("Bacterial Protein Interaction Database - DUP", p1_in.getCollectedfrom().get(0).getValue());
assertEquals(
"10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", p1_in.getInstance().get(0).getCollectedfrom().getKey());
assertEquals(
"Bacterial Protein Interaction Database - DUP", p1_in.getInstance().get(0).getCollectedfrom().getValue());
final Publication p2_in = pubs_in
.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3a_cfhb'")
.first();
assertEquals("10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", p2_in.getCollectedfrom().get(0).getKey());
assertEquals("FILUR DATA - DUP", p2_in.getCollectedfrom().get(0).getValue());
assertEquals(
"10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", p2_in.getInstance().get(0).getCollectedfrom().getKey());
assertEquals("FILUR DATA - DUP", p2_in.getInstance().get(0).getCollectedfrom().getValue());
assertEquals(
"10|re3data_____::6ffd7bc058f762912dc494cd9c175341", p2_in.getInstance().get(0).getHostedby().getKey());
assertEquals("depositar - DUP", p2_in.getInstance().get(0).getHostedby().getValue());
final Publication p3_in = pubs_in
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7d_cfhb'")
.first();
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_in.getCollectedfrom().get(0).getKey());
assertEquals("DANS (Data Archiving and Networked Services)", p3_in.getCollectedfrom().get(0).getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_in.getInstance().get(0).getCollectedfrom().getKey());
assertEquals(
"DANS (Data Archiving and Networked Services)", p3_in.getInstance().get(0).getCollectedfrom().getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_in.getInstance().get(0).getHostedby().getKey());
assertEquals("DANS (Data Archiving and Networked Services)", p3_in.getInstance().get(0).getHostedby().getValue());
new CleanGraphSparkJob(
args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json",
new String[] {
"--inputPath", graphInputPath.toString() + "/publication",
"--outputPath", graphOutputPath.toString() + "/publication",
"--isLookupUrl", "lookupurl",
"--graphTableClassName", Publication.class.getCanonicalName(),
"--deepClean", "true",
"--contextId", "sobigdata",
"--verifyParam", "gCube ",
"--masterDuplicatePath", dsMasterDuplicatePath,
"--country", "NL",
"--verifyCountryParam", "10.17632",
"--collectedfrom", "NARCIS",
"--hostedBy", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
.getPath()
})).run(false, isLookUpService);
assertTrue(Files.exists(Paths.get(graphOutputPath, "publication")));
final Dataset<Publication> pubs_out = read(spark, graphOutputPath.toString() + "/publication", Publication.class)
.filter((FilterFunction<Publication>) p -> StringUtils.endsWith(p.getId(), "_cfhb"));
assertEquals(3, pubs_out.count());
final Publication p1_out = pubs_out
.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13b_cfhb'")
.first();
assertEquals("10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", p1_out.getCollectedfrom().get(0).getKey());
assertEquals("Bacterial Protein Interaction Database", p1_out.getCollectedfrom().get(0).getValue());
assertEquals(
"10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", p1_out.getInstance().get(0).getCollectedfrom().getKey());
assertEquals("Bacterial Protein Interaction Database", p1_out.getInstance().get(0).getCollectedfrom().getValue());
final Publication p2_out = pubs_out
.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3a_cfhb'")
.first();
assertEquals("10|re3data_____::fc1db64b3964826913b1e9eafe830490", p2_out.getCollectedfrom().get(0).getKey());
assertEquals("FULIR Data", p2_out.getCollectedfrom().get(0).getValue());
assertEquals(
"10|re3data_____::fc1db64b3964826913b1e9eafe830490", p2_out.getInstance().get(0).getCollectedfrom().getKey());
assertEquals("FULIR Data", p2_out.getInstance().get(0).getCollectedfrom().getValue());
assertEquals(
"10|fairsharing_::3f647cadf56541fb9513cb63ec370187", p2_out.getInstance().get(0).getHostedby().getKey());
assertEquals("depositar", p2_out.getInstance().get(0).getHostedby().getValue());
final Publication p3_out = pubs_out
.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7d_cfhb'")
.first();
assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_out.getCollectedfrom().get(0).getKey());
assertEquals("DANS (Data Archiving and Networked Services)", p3_out.getCollectedfrom().get(0).getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_out.getInstance().get(0).getCollectedfrom().getKey());
assertEquals(
"DANS (Data Archiving and Networked Services)", p3_out.getInstance().get(0).getCollectedfrom().getValue());
assertEquals(
"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", p3_out.getInstance().get(0).getHostedby().getKey());
assertEquals("DANS (Data Archiving and Networked Services)", p3_out.getInstance().get(0).getHostedby().getValue());
}
@Test
public void testCleanCountry() throws Exception {
new CleanGraphSparkJob(
args("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json",
new String[] {
"--inputPath", graphInputPath.toString() + "/publication",
"--outputPath", graphOutputPath.toString() + "/publication",
"--isLookupUrl", "lookupurl",
"--graphTableClassName", Publication.class.getCanonicalName(),
"--deepClean", "true",
"--contextId", "sobigdata",
"--verifyParam", "gCube ",
"--masterDuplicatePath", dsMasterDuplicatePath,
"--country", "NL",
"--verifyCountryParam", "10.17632",
"--collectedfrom", "NARCIS",
"--hostedBy", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
.getPath()
})).run(false, isLookUpService);
final Dataset<Publication> pubs_out = read(spark, graphOutputPath.toString() + "/publication", Publication.class)
.filter((FilterFunction<Publication>) p -> StringUtils.endsWith(p.getId(), "_country"));
Assertions.assertEquals(7, pubs_out.count());
// original result with NL country and doi starting with Mendely prefix, but not collectedfrom NARCIS
assertEquals(
1,
pubs_out
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6_country"))
.first()
.getCountry()
.size());
// original result with NL country and pid not starting with Mendely prefix
assertEquals(
1,
pubs_out
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1_country"))
.first()
.getCountry()
.size());
// original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS but not
// inserted with propagation
assertEquals(
1,
pubs_out
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817e_country"))
.first()
.getCountry()
.size());
// original result with NL country and doi starting with Mendely prefix and collectedfrom NARCIS inserted with
// propagation
assertEquals(
0,
pubs_out
.filter((FilterFunction<Publication>) p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817e_country"))
.first()
.getCountry()
.size());
}
private List<String> vocs() throws IOException { private List<String> vocs() throws IOException {
return IOUtils return IOUtils
.readLines( .readLines(
@ -395,6 +783,13 @@ public class CleanGraphSparkJobTest {
GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
} }
private <R> org.apache.spark.sql.Dataset<R> read(SparkSession spark, String path, Class<R> clazz) {
return spark
.read()
.textFile(path)
.map(as(clazz), Encoders.bean(clazz));
}
private static <R> MapFunction<String, R> as(Class<R> clazz) { private static <R> MapFunction<String, R> as(Class<R> clazz) {
return s -> MAPPER.readValue(s, clazz); return s -> MAPPER.readValue(s, clazz);
} }