parse input file

This commit is contained in:
Michele Artini 2021-04-29 11:34:47 +02:00
parent f77ba34126
commit a278d67175
4 changed files with 177 additions and 159 deletions

View File

@ -9,6 +9,7 @@ import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty; import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -22,10 +23,12 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -57,16 +60,19 @@ public class GenerateRorActionSetJob {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final List<KeyValue> ROR_COLLECTED_FROM = listKeyValues("10|openaire____::993a7ae7a863813cf95028b50708e222", "ROR"); private static final List<KeyValue> ROR_COLLECTED_FROM = listKeyValues(
"10|openaire____::993a7ae7a863813cf95028b50708e222", "ROR");
private static final DataInfo ROR_DATA_INFO = dataInfo(false, "", false, false, ENTITYREGISTRY_PROVENANCE_ACTION, "0.92"); private static final DataInfo ROR_DATA_INFO = dataInfo(
false, "", false, false, ENTITYREGISTRY_PROVENANCE_ACTION, "0.92");
private static final Qualifier ROR_PID_TYPE = qualifier("ROR", "ROR", "dnet:pid_types", "dnet:pid_types"); private static final Qualifier ROR_PID_TYPE = qualifier("ROR", "ROR", "dnet:pid_types", "dnet:pid_types");
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final String jsonConfiguration = IOUtils final String jsonConfiguration = IOUtils
.toString(SparkAtomicActionJob.class .toString(
SparkAtomicActionJob.class
.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/ror/action_set_parameters.json")); .getResourceAsStream("/eu/dnetlib/dhp/actionmanager/ror/action_set_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
@ -100,16 +106,16 @@ public class GenerateRorActionSetJob {
private static void processRorOrganizations(final SparkSession spark, private static void processRorOrganizations(final SparkSession spark,
final String inputPath, final String inputPath,
final String outputPath) { final String outputPath) throws Exception {
readInputPath(spark, inputPath) readInputPath(spark, inputPath)
.map(GenerateRorActionSetJob::convertRorOrg, Encoders.bean(Organization.class)) .map(GenerateRorActionSetJob::convertRorOrg, Encoders.bean(Organization.class))
.toJavaRDD() .toJavaRDD()
.map(o -> new AtomicAction<>(Organization.class, o)) .map(o -> new AtomicAction<>(Organization.class, o))
.mapToPair(aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa)))) new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
} }
protected static Organization convertRorOrg(final RorOrganization r) { protected static Organization convertRorOrg(final RorOrganization r) {
@ -142,7 +148,11 @@ public class GenerateRorActionSetJob {
o.setEcsmevalidated(null); o.setEcsmevalidated(null);
o.setEcnutscode(null); o.setEcnutscode(null);
if (r.getCountry() != null) { if (r.getCountry() != null) {
o.setCountry(qualifier(r.getCountry().getCountryCode(), r.getCountry().getCountryName(), COUNTRIES_VOC, COUNTRIES_VOC)); o
.setCountry(
qualifier(
r.getCountry().getCountryCode(), r.getCountry().getCountryName(), COUNTRIES_VOC,
COUNTRIES_VOC));
} else { } else {
o.setCountry(null); o.setCountry(null);
} }
@ -162,14 +172,24 @@ public class GenerateRorActionSetJob {
if (all == null) { if (all == null) {
// skip // skip
} else if (all instanceof String) { } else if (all instanceof String) {
pids.add(structuredProperty(all.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO)); pids
.add(
structuredProperty(
all.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO));
} else if (all instanceof Collection) { } else if (all instanceof Collection) {
for (final Object pid : (Collection<?>) all) { for (final Object pid : (Collection<?>) all) {
pids.add(structuredProperty(pid.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO)); pids
.add(
structuredProperty(
pid.toString(), qualifier(type, type, "dnet:pid_types", "dnet:pid_types"),
ROR_DATA_INFO));
} }
} else if (all instanceof String[]) { } else if (all instanceof String[]) {
for (final String pid : (String[]) all) { for (final String pid : (String[]) all) {
pids.add(structuredProperty(pid, qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO)); pids
.add(
structuredProperty(
pid, qualifier(type, type, "dnet:pid_types", "dnet:pid_types"), ROR_DATA_INFO));
} }
} else { } else {
log.warn("Invalid type for pid list: " + all.getClass()); log.warn("Invalid type for pid list: " + all.getClass());
@ -185,16 +205,22 @@ public class GenerateRorActionSetJob {
names.addAll(r.getAcronyms()); names.addAll(r.getAcronyms());
r.getLabels().forEach(l -> names.add(l.getLabel())); r.getLabels().forEach(l -> names.add(l.getLabel()));
return names.stream().filter(StringUtils::isNotBlank).map(s -> field(s, ROR_DATA_INFO)).collect(Collectors.toList()); return names
.stream()
.filter(StringUtils::isNotBlank)
.map(s -> field(s, ROR_DATA_INFO))
.collect(Collectors.toList());
} }
private static Dataset<RorOrganization> readInputPath( private static Dataset<RorOrganization> readInputPath(
final SparkSession spark, final SparkSession spark,
final String inputPath) { final String path) throws Exception {
return spark
.read() try (final FileSystem fileSystem = FileSystem.get(new Configuration());
.textFile(inputPath) final InputStream is = fileSystem.open(new Path(path))) {
.map((MapFunction<String, RorOrganization>) value -> OBJECT_MAPPER.readValue(value, RorOrganization.class), Encoders.bean(RorOrganization.class)); final RorOrganization[] arr = OBJECT_MAPPER.readValue(is, RorOrganization[].class);
return spark.createDataset(Arrays.asList(arr), Encoders.bean(RorOrganization.class));
}
} }
} }

View File

@ -1,11 +1,11 @@
<workflow-app name="Update_ROR_action_set" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="Update_ROR_action_set" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>inputPath</name> <name>rorJsonInputPath</name>
<description>the path of the json</description> <description>the path of the json</description>
</property> </property>
<property> <property>
<name>outputPath</name> <name>rorActionSetPath</name>
<description>path where to store the action set</description> <description>path where to store the action set</description>
</property> </property>
</parameters> </parameters>
@ -18,8 +18,8 @@
<action name="deleteoutputpath"> <action name="deleteoutputpath">
<fs> <fs>
<delete path='${outputPath}'/> <delete path='${rorActionSetPath}'/>
<mkdir path='${outputPath}'/> <mkdir path='${rorActionSetPath}'/>
<delete path='${workingDir}'/> <delete path='${workingDir}'/>
<mkdir path='${workingDir}'/> <mkdir path='${workingDir}'/>
</fs> </fs>
@ -44,8 +44,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}</arg> <arg>--inputPath</arg><arg>${rorJsonInputPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>--outputPath</arg><arg>${rorActionSetPath}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.actionmanager.ror; package eu.dnetlib.dhp.actionmanager.ror;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -20,11 +21,13 @@ class GenerateRorActionSetJobTest {
private static final String local_file_path = "/Users/michele/Downloads/ror-data-2021-04-06.json"; private static final String local_file_path = "/Users/michele/Downloads/ror-data-2021-04-06.json";
@BeforeEach @BeforeEach
void setUp() throws Exception {} void setUp() throws Exception {
}
@Test @Test
void testConvertRorOrg() throws Exception { void testConvertRorOrg() throws Exception {
final RorOrganization r = mapper.readValue(IOUtils.toString(getClass().getResourceAsStream("ror_org.json")), RorOrganization.class); final RorOrganization r = mapper
.readValue(IOUtils.toString(getClass().getResourceAsStream("ror_org.json")), RorOrganization.class);
final Organization org = GenerateRorActionSetJob.convertRorOrg(r); final Organization org = GenerateRorActionSetJob.convertRorOrg(r);
System.out.println(mapper.writeValueAsString(org)); System.out.println(mapper.writeValueAsString(org));
@ -32,7 +35,8 @@ class GenerateRorActionSetJobTest {
@Test @Test
void testConvertAllRorOrg() throws Exception { void testConvertAllRorOrg() throws Exception {
final RorOrganization[] arr = mapper.readValue(IOUtils.toString(new FileInputStream(local_file_path)), RorOrganization[].class); final RorOrganization[] arr = mapper
.readValue(IOUtils.toString(new FileInputStream(local_file_path)), RorOrganization[].class);
for (final RorOrganization r : arr) { for (final RorOrganization r : arr) {
GenerateRorActionSetJob.convertRorOrg(r); GenerateRorActionSetJob.convertRorOrg(r);

View File

@ -12,7 +12,6 @@ import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.List;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -27,7 +26,13 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -97,8 +102,7 @@ public class SparkDedupTest implements Serializable {
IOUtils IOUtils
.toString( .toString(
SparkDedupTest.class SparkDedupTest.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
lenient() lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
@ -106,8 +110,7 @@ public class SparkDedupTest implements Serializable {
IOUtils IOUtils
.toString( .toString(
SparkDedupTest.class SparkDedupTest.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
lenient() lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
@ -115,8 +118,7 @@ public class SparkDedupTest implements Serializable {
IOUtils IOUtils
.toString( .toString(
SparkDedupTest.class SparkDedupTest.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
lenient() lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software"))) .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
@ -124,8 +126,7 @@ public class SparkDedupTest implements Serializable {
IOUtils IOUtils
.toString( .toString(
SparkDedupTest.class SparkDedupTest.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
lenient() lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset"))) .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
@ -133,8 +134,7 @@ public class SparkDedupTest implements Serializable {
IOUtils IOUtils
.toString( .toString(
SparkDedupTest.class SparkDedupTest.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
"/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
lenient() lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct"))) .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
@ -142,24 +142,21 @@ public class SparkDedupTest implements Serializable {
IOUtils IOUtils
.toString( .toString(
SparkDedupTest.class SparkDedupTest.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
"/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
} }
@Test @Test
@Order(1) @Order(1)
public void createSimRelsTest() throws Exception { public void createSimRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
SparkCreateSimRels.class SparkCreateSimRels.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
parser parser
.parseArgument( .parseArgument(new String[] {
new String[] {
"-i", testGraphBasePath, "-i", testGraphBasePath,
"-asi", testActionSetId, "-asi", testActionSetId,
"-la", "lookupurl", "-la", "lookupurl",
@ -169,27 +166,27 @@ public class SparkDedupTest implements Serializable {
new SparkCreateSimRels(parser, spark).run(isLookUpService); new SparkCreateSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark final long orgs_simrel = spark
.read() .read()
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count(); .count();
long pubs_simrel = spark final long pubs_simrel = spark
.read() .read()
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication"))
.count(); .count();
long sw_simrel = spark final long sw_simrel = spark
.read() .read()
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")) .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software"))
.count(); .count();
long ds_simrel = spark final long ds_simrel = spark
.read() .read()
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset"))
.count(); .count();
long orp_simrel = spark final long orp_simrel = spark
.read() .read()
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct"))
.count(); .count();
@ -205,16 +202,14 @@ public class SparkDedupTest implements Serializable {
@Order(2) @Order(2)
public void cutMergeRelsTest() throws Exception { public void cutMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
SparkCreateMergeRels.class SparkCreateMergeRels.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser parser
.parseArgument( .parseArgument(new String[] {
new String[] {
"-i", "-i",
testGraphBasePath, testGraphBasePath,
"-asi", "-asi",
@ -229,7 +224,7 @@ public class SparkDedupTest implements Serializable {
new SparkCreateMergeRels(parser, spark).run(isLookUpService); new SparkCreateMergeRels(parser, spark).run(isLookUpService);
long orgs_mergerel = spark final long orgs_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -240,7 +235,7 @@ public class SparkDedupTest implements Serializable {
.where("cnt > 3") .where("cnt > 3")
.count(); .count();
long pubs_mergerel = spark final long pubs_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -250,7 +245,7 @@ public class SparkDedupTest implements Serializable {
.select("source", "cnt") .select("source", "cnt")
.where("cnt > 3") .where("cnt > 3")
.count(); .count();
long sw_mergerel = spark final long sw_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -261,7 +256,7 @@ public class SparkDedupTest implements Serializable {
.where("cnt > 3") .where("cnt > 3")
.count(); .count();
long ds_mergerel = spark final long ds_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -272,7 +267,7 @@ public class SparkDedupTest implements Serializable {
.where("cnt > 3") .where("cnt > 3")
.count(); .count();
long orp_mergerel = spark final long orp_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -301,16 +296,14 @@ public class SparkDedupTest implements Serializable {
@Order(3) @Order(3)
public void createMergeRelsTest() throws Exception { public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
SparkCreateMergeRels.class SparkCreateMergeRels.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser parser
.parseArgument( .parseArgument(new String[] {
new String[] {
"-i", "-i",
testGraphBasePath, testGraphBasePath,
"-asi", "-asi",
@ -323,24 +316,24 @@ public class SparkDedupTest implements Serializable {
new SparkCreateMergeRels(parser, spark).run(isLookUpService); new SparkCreateMergeRels(parser, spark).run(isLookUpService);
long orgs_mergerel = spark final long orgs_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.count(); .count();
long pubs_mergerel = spark final long pubs_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
.count(); .count();
long sw_mergerel = spark final long sw_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.count(); .count();
long ds_mergerel = spark final long ds_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.count(); .count();
long orp_mergerel = spark final long orp_mergerel = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
.count(); .count();
@ -357,15 +350,13 @@ public class SparkDedupTest implements Serializable {
@Order(4) @Order(4)
public void createDedupRecordTest() throws Exception { public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
SparkCreateDedupRecord.class SparkCreateDedupRecord.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
"/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
parser parser
.parseArgument( .parseArgument(new String[] {
new String[] {
"-i", "-i",
testGraphBasePath, testGraphBasePath,
"-asi", "-asi",
@ -378,19 +369,20 @@ public class SparkDedupTest implements Serializable {
new SparkCreateDedupRecord(parser, spark).run(isLookUpService); new SparkCreateDedupRecord(parser, spark).run(isLookUpService);
long orgs_deduprecord = jsc final long orgs_deduprecord = jsc
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord") .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord")
.count(); .count();
long pubs_deduprecord = jsc final long pubs_deduprecord = jsc
.textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord") .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_deduprecord")
.count(); .count();
long sw_deduprecord = jsc final long sw_deduprecord = jsc
.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord") .textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord")
.count(); .count();
long ds_deduprecord = jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord").count(); final long ds_deduprecord = jsc
long orp_deduprecord = jsc .textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_deduprecord")
.textFile( .count();
testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord") final long orp_deduprecord = jsc
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
.count(); .count();
assertEquals(85, orgs_deduprecord); assertEquals(85, orgs_deduprecord);
@ -404,29 +396,27 @@ public class SparkDedupTest implements Serializable {
@Order(5) @Order(5)
public void updateEntityTest() throws Exception { public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
SparkUpdateEntity.class SparkUpdateEntity.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
parser parser
.parseArgument( .parseArgument(new String[] {
new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
}); });
new SparkUpdateEntity(parser, spark).run(isLookUpService); new SparkUpdateEntity(parser, spark).run(isLookUpService);
long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count(); final long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count(); final long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count();
long projects = jsc.textFile(testDedupGraphBasePath + "/project").count(); final long projects = jsc.textFile(testDedupGraphBasePath + "/project").count();
long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count(); final long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count();
long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count(); final long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count();
long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count(); final long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count();
long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count(); final long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count();
long mergedOrgs = spark final long mergedOrgs = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -436,7 +426,7 @@ public class SparkDedupTest implements Serializable {
.distinct() .distinct()
.count(); .count();
long mergedPubs = spark final long mergedPubs = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -446,7 +436,7 @@ public class SparkDedupTest implements Serializable {
.distinct() .distinct()
.count(); .count();
long mergedSw = spark final long mergedSw = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -456,7 +446,7 @@ public class SparkDedupTest implements Serializable {
.distinct() .distinct()
.count(); .count();
long mergedDs = spark final long mergedDs = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -466,7 +456,7 @@ public class SparkDedupTest implements Serializable {
.distinct() .distinct()
.count(); .count();
long mergedOrp = spark final long mergedOrp = spark
.read() .read()
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
.as(Encoders.bean(Relation.class)) .as(Encoders.bean(Relation.class))
@ -484,27 +474,27 @@ public class SparkDedupTest implements Serializable {
assertEquals(389, dataset); assertEquals(389, dataset);
assertEquals(517, otherresearchproduct); assertEquals(517, otherresearchproduct);
long deletedOrgs = jsc final long deletedOrgs = jsc
.textFile(testDedupGraphBasePath + "/organization") .textFile(testDedupGraphBasePath + "/organization")
.filter(this::isDeletedByInference) .filter(this::isDeletedByInference)
.count(); .count();
long deletedPubs = jsc final long deletedPubs = jsc
.textFile(testDedupGraphBasePath + "/publication") .textFile(testDedupGraphBasePath + "/publication")
.filter(this::isDeletedByInference) .filter(this::isDeletedByInference)
.count(); .count();
long deletedSw = jsc final long deletedSw = jsc
.textFile(testDedupGraphBasePath + "/software") .textFile(testDedupGraphBasePath + "/software")
.filter(this::isDeletedByInference) .filter(this::isDeletedByInference)
.count(); .count();
long deletedDs = jsc final long deletedDs = jsc
.textFile(testDedupGraphBasePath + "/dataset") .textFile(testDedupGraphBasePath + "/dataset")
.filter(this::isDeletedByInference) .filter(this::isDeletedByInference)
.count(); .count();
long deletedOrp = jsc final long deletedOrp = jsc
.textFile(testDedupGraphBasePath + "/otherresearchproduct") .textFile(testDedupGraphBasePath + "/otherresearchproduct")
.filter(this::isDeletedByInference) .filter(this::isDeletedByInference)
.count(); .count();
@ -520,21 +510,19 @@ public class SparkDedupTest implements Serializable {
@Order(6) @Order(6)
public void propagateRelationTest() throws Exception { public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
SparkPropagateRelation.class SparkPropagateRelation.class
.getResourceAsStream( .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
"/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
parser parser
.parseArgument( .parseArgument(new String[] {
new String[] {
"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath "-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath
}); });
new SparkPropagateRelation(parser, spark).run(isLookUpService); new SparkPropagateRelation(parser, spark).run(isLookUpService);
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); final long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4862, relations); assertEquals(4862, relations);
@ -548,10 +536,9 @@ public class SparkDedupTest implements Serializable {
.select(mergeRels.col("target")) .select(mergeRels.col("target"))
.distinct() .distinct()
.toJavaRDD() .toJavaRDD()
.mapToPair( .mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
(PairFunction<Row, String, String>) r -> new Tuple2<String, String>(r.getString(0), "d"));
JavaRDD<String> toCheck = jsc final JavaRDD<String> toCheck = jsc
.textFile(testDedupGraphBasePath + "/relation") .textFile(testDedupGraphBasePath + "/relation")
.mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json)) .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
.join(mergedIds) .join(mergedIds)
@ -560,8 +547,8 @@ public class SparkDedupTest implements Serializable {
.join(mergedIds) .join(mergedIds)
.map(t -> t._2()._1()); .map(t -> t._2()._1());
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count(); final long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
long updated = toCheck.count(); final long updated = toCheck.count();
assertEquals(updated, deletedbyinference); assertEquals(updated, deletedbyinference);
} }
@ -573,8 +560,8 @@ public class SparkDedupTest implements Serializable {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
} }
private void testUniqueness(String path, int expected_total, int expected_unique) { private void testUniqueness(final String path, final int expected_total, final int expected_unique) {
Dataset<Relation> rel = spark final Dataset<Relation> rel = spark
.read() .read()
.textFile(getClass().getResource(path).getPath()) .textFile(getClass().getResource(path).getPath())
.map( .map(
@ -591,7 +578,8 @@ public class SparkDedupTest implements Serializable {
FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
} }
public boolean isDeletedByInference(String s) { public boolean isDeletedByInference(final String s) {
return s.contains("\"deletedbyinference\":true"); return s.contains("\"deletedbyinference\":true");
} }
} }