bug fix in the id generator and implementation of jobs for organization dedup

This commit is contained in:
miconis 2020-10-20 12:19:46 +02:00
parent 6f8720982c
commit 0e54803177
12 changed files with 353 additions and 189 deletions

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.dedup;
import java.util.*;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
@ -15,6 +15,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;

View File

@ -6,25 +6,25 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.dedup.model.PidType;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.lang.StringUtils;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.dedup.model.PidType;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
public class IdGenerator implements Serializable {
public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254";
public static String BASE_DATE = "2000-01-01";
public static String BASE_DATE = "2000-01-01";
// pick the best pid from the list (consider date and pidtype)
public static String generate(List<Identifier> pids, String defaultID) {
@ -55,9 +55,9 @@ public class IdGenerator implements Serializable {
date = new Date();
}
return Lists
.newArrayList(
new Identifier(new StructuredProperty(), date, PidType.original, entity.getCollectedfrom(),
EntityType.fromClass(entity.getClass()), entity.getId()));
.newArrayList(
new Identifier(new StructuredProperty(), date, PidType.original, entity.getCollectedfrom(),
EntityType.fromClass(entity.getClass()), entity.getId()));
}
// pick the best pid from the entity. Returns a list (length 1) to save time in the call

View File

@ -5,8 +5,11 @@ import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
@ -18,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
@ -62,6 +66,10 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
.map(Integer::valueOf)
.orElse(NUM_CONNECTIONS);
final String apiUrl = Optional
.ofNullable(parser.get("apiUrl"))
.orElse("");
final String dbUrl = parser.get("dbUrl");
final String dbTable = parser.get("dbTable");
final String dbUser = parser.get("dbUser");
@ -72,6 +80,7 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("numPartitions: '{}'", numConnections);
log.info("apiUrl: '{}'", apiUrl);
log.info("dbUrl: '{}'", dbUrl);
log.info("dbUser: '{}'", dbUser);
log.info("table: '{}'", dbTable);
@ -89,9 +98,12 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
newOrgs
.repartition(numConnections)
.write()
.mode(SaveMode.Overwrite)
.mode(SaveMode.Append)
.jdbc(dbUrl, dbTable, connectionProperties);
if (!apiUrl.isEmpty())
updateSimRels(apiUrl);
}
public static Dataset<OrgSimRel> createNewOrgs(
@ -138,9 +150,21 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "",
r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "",
r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "",
r._1()._2().getCollectedfrom().get(0).getValue()),
r._1()._2().getCollectedfrom().get(0).getValue(), ""),
Encoders.bean(OrgSimRel.class));
}
private static String updateSimRels(final String apiUrl) throws IOException {
log.info("Updating simrels on the portal");
final HttpGet req = new HttpGet(apiUrl);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
return IOUtils.toString(response.getEntity().getContent());
}
}
}
}

View File

@ -1,18 +1,14 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@ -21,14 +17,11 @@ import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
import scala.Tuple3;
import java.io.IOException;
import java.util.*;
public class SparkPrepareOrgRels extends AbstractSparkAction {
@ -67,10 +60,6 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
.map(Integer::valueOf)
.orElse(NUM_CONNECTIONS);
final String apiUrl = Optional
.ofNullable(parser.get("apiUrl"))
.orElse("");
final String dbUrl = parser.get("dbUrl");
final String dbTable = parser.get("dbTable");
final String dbUser = parser.get("dbUser");
@ -81,7 +70,6 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("numPartitions: '{}'", numConnections);
log.info("apiUrl: '{}'", apiUrl);
log.info("dbUrl: '{}'", dbUrl);
log.info("dbUser: '{}'", dbUser);
log.info("table: '{}'", dbTable);
@ -102,9 +90,6 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
.mode(SaveMode.Overwrite)
.jdbc(dbUrl, dbTable, connectionProperties);
if (!apiUrl.isEmpty())
updateSimRels(apiUrl);
}
public static Dataset<OrgSimRel> createRelations(
@ -112,6 +97,105 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
final String mergeRelsPath,
final String entitiesPath) {
Dataset<Tuple2<String, Organization>> entities = spark
.read()
.textFile(entitiesPath)
.map(
(MapFunction<String, Tuple2<String, Organization>>) it -> {
Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
Dataset<Tuple3<String, String, String>> relations = spark
.createDataset(
spark
.read()
.load(mergeRelsPath)
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'")
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
.filter(t -> !t._2().contains("openorgsmesh"))
.groupByKey()
.map(g -> Lists.newArrayList(g._2()))
.filter(l -> l.size() > 1)
.flatMap(l -> {
String groupId = "group::" + UUID.randomUUID();
List<String> ids = sortIds(l);
List<Tuple3<String, String, String>> rels = new ArrayList<>();
for (String source : ids) {
if (source.contains("openorgs____") || ids.indexOf(source) == 0)
for (String target : ids) {
rels.add(new Tuple3<>(source, target, groupId));
}
}
return rels.iterator();
})
.rdd(),
Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));
Dataset<Tuple2<String, OrgSimRel>> relations2 = relations // <openorgs, corda>
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
r._1()._1(),
r._2()._2().getOriginalId().get(0),
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getCollectedfrom().get(0).getValue(),
r._1()._3()),
Encoders.bean(OrgSimRel.class))
.map(
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class)));
return relations2
.joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple2<String, OrgSimRel>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
OrgSimRel orgSimRel = r._1()._2();
orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0));
return orgSimRel;
},
Encoders.bean(OrgSimRel.class));
}
// select best ids from the list. Priority: 1) openorgs, 2)corda, 3)alphabetic
public static List<String> sortIds(List<String> ids) {
ids.sort((o1, o2) -> {
if (o1.contains("openorgs____") && o2.contains("openorgs____"))
return o1.compareTo(o2);
if (o1.contains("corda") && o2.contains("corda"))
return o1.compareTo(o2);
if (o1.contains("openorgs____"))
return -1;
if (o2.contains("openorgs____"))
return 1;
if (o1.contains("corda"))
return -1;
if (o2.contains("corda"))
return 1;
return o1.compareTo(o2);
});
return ids;
}
public static Dataset<OrgSimRel> createRelationsFromScratch(
final SparkSession spark,
final String mergeRelsPath,
final String entitiesPath) {
// <id, json_entity>
Dataset<Tuple2<String, Organization>> entities = spark
.read()
@ -151,13 +235,14 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
r._1()._1(),
r._2()._2().getOriginalId().get(0),
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getCollectedfrom().get(0).getValue()),
r._1()._1(),
r._2()._2().getOriginalId().get(0),
r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
r._2()._2().getCollectedfrom().get(0).getValue(),
"group::" + r._1()._1()),
Encoders.bean(OrgSimRel.class))
.map(
(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
@ -175,16 +260,4 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
}
private static String updateSimRels(final String apiUrl) throws IOException {
log.info("Updating simrels on the portal");
final HttpGet req = new HttpGet(apiUrl);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
return IOUtils.toString(response.getEntity().getContent());
}
}
}
}

View File

@ -6,13 +6,13 @@ import java.io.Serializable;
import java.util.Set;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.annotate.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.pace.util.PaceException;
public class ConnectedComponent implements Serializable {

View File

@ -7,6 +7,7 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.dedup.IdGenerator;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
@ -95,8 +96,13 @@ public class Identifier implements Serializable, Comparable<Identifier> {
// priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4)
// alphabetical order of the originalID
Set<String> lKeys = this.collectedFrom.stream().map(KeyValue::getKey).collect(Collectors.toSet());
Set<String> rKeys = i.getCollectedFrom().stream().map(KeyValue::getKey).collect(Collectors.toSet());
Set<String> lKeys = Sets.newHashSet();
if (this.collectedFrom != null)
lKeys = this.collectedFrom.stream().map(KeyValue::getKey).collect(Collectors.toSet());
Set<String> rKeys = Sets.newHashSet();
if (i.getCollectedFrom() != null)
rKeys = i.getCollectedFrom().stream().map(KeyValue::getKey).collect(Collectors.toSet());
if (this.getType().compareTo(i.getType()) == 0) { // same type
if (entityType == EntityType.publication) {

View File

@ -12,12 +12,13 @@ public class OrgSimRel implements Serializable {
String oa_country;
String oa_url;
String oa_collectedfrom;
String group_id;
public OrgSimRel() {
}
public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country,
String oa_url, String oa_collectedfrom) {
String oa_url, String oa_collectedfrom, String group_id) {
this.local_id = local_id;
this.oa_original_id = oa_original_id;
this.oa_name = oa_name;
@ -25,6 +26,7 @@ public class OrgSimRel implements Serializable {
this.oa_country = oa_country;
this.oa_url = oa_url;
this.oa_collectedfrom = oa_collectedfrom;
this.group_id = group_id;
}
public String getLocal_id() {
@ -83,6 +85,14 @@ public class OrgSimRel implements Serializable {
this.oa_collectedfrom = oa_collectedfrom;
}
public String getGroup_id() {
return group_id;
}
public void setGroup_id(String group_id) {
this.group_id = group_id;
}
@Override
public String toString() {
return "OrgSimRel{" +

View File

@ -112,7 +112,7 @@
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>-pb</arg>
<arg>${graphBasePath}/relation</arg>
<arg>/tmp/graph_openorgs_and_corda/relation</arg>
<arg>${workingPath}/${actionSetId}/organization_simrel</arg>
</distcp>
<ok to="CreateSimRel"/>
@ -194,6 +194,37 @@
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
<arg>--numConnections</arg><arg>20</arg>
</spark>
<ok to="PrepareNewOrgs"/>
<error to="Kill"/>
</action>
<action name="PrepareNewOrgs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare New Organizations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--apiUrl</arg><arg>${apiUrl}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>

View File

@ -29,6 +29,12 @@
"paramDescription": "number of connections to the postgres db (for the write operation)",
"paramRequired": false
},
{
"paramName": "au",
"paramLongName": "apiUrl",
"paramDescription": "the url for the APIs of the openorgs service",
"paramRequired": false
},
{
"paramName": "du",
"paramLongName": "dbUrl",

View File

@ -29,12 +29,6 @@
"paramDescription": "number of connections to the postgres db (for the write operation)",
"paramRequired": false
},
{
"paramName": "au",
"paramLongName": "apiUrl",
"paramDescription": "the url for the APIs of the openorgs service",
"paramRequired": false
},
{
"paramName": "du",
"paramLongName": "dbUrl",

View File

@ -1,17 +1,6 @@
package eu.dnetlib.dhp.oa.dedup;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.dedup.model.PidType;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.*;
import scala.Tuple2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.BufferedReader;
@ -22,119 +11,149 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.*;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.dedup.model.PidType;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class IdGeneratorTest {
private static List<Identifier> bestIds;
private static List<Tuple2<String, Publication>> pubs;
private static List<Identifier> bestIds;
private static List<Tuple2<String, Publication>> pubs;
private static List<Identifier> bestIds2;
private static List<Identifier> bestIds3;
private static List<Identifier> bestIds2;
private static List<Identifier> bestIds3;
private static String testEntityBasePath;
private static String testEntityBasePath;
private static SimpleDateFormat sdf;
private static Date baseDate;
private static SimpleDateFormat sdf;
private static Date baseDate;
@BeforeAll
public static void setUp() throws Exception {
@BeforeAll
public static void setUp() throws Exception {
sdf = new SimpleDateFormat("yyyy-MM-dd");
baseDate = sdf.parse("2000-01-01");
sdf = new SimpleDateFormat("yyyy-MM-dd");
baseDate = sdf.parse("2000-01-01");
bestIds = new ArrayList<>();
bestIds2 = Lists.newArrayList(
new Identifier(pid("pid1", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID1"),
new Identifier(pid("pid2", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID2"),
new Identifier(pid("pid3", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID3")
);
bestIds3 = Lists.newArrayList(
new Identifier(pid("pid1", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID1"),
new Identifier(pid("pid2", "doi", "doi"), baseDate, PidType.doi, keyValue("key", "value"), EntityType.publication, "50|originalID2"),
new Identifier(pid("pid3", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID3")
);
bestIds = new ArrayList<>();
bestIds2 = Lists
.newArrayList(
new Identifier(pid("pid1", "original", "original"), baseDate, PidType.original,
keyValue("key", "value"), EntityType.publication, "50|originalID1"),
new Identifier(pid("pid2", "original", "original"), baseDate, PidType.original,
keyValue("key", "value"), EntityType.publication, "50|originalID2"),
new Identifier(pid("pid3", "original", "original"), baseDate, PidType.original,
keyValue("key", "value"), EntityType.publication, "50|originalID3"));
bestIds3 = Lists
.newArrayList(
new Identifier(pid("pid1", "original", "original"), baseDate, PidType.original,
keyValue("key", "value"), EntityType.publication, "50|originalID1"),
new Identifier(pid("pid2", "doi", "doi"), baseDate, PidType.doi, keyValue("key", "value"),
EntityType.publication, "50|originalID2"),
new Identifier(pid("pid3", "original", "original"), baseDate, PidType.original,
keyValue("key", "value"), EntityType.publication, "50|originalID3"));
testEntityBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI())
.toFile()
.getAbsolutePath();
testEntityBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI())
.toFile()
.getAbsolutePath();
pubs = readSample(testEntityBasePath + "/publication_idgeneration.json", Publication.class);
pubs = readSample(testEntityBasePath + "/publication_idgeneration.json", Publication.class);
}
}
@Test
@Order(1)
public void bestPidToIdentifierTest(){
@Test
@Order(1)
public void bestPidToIdentifierTest() {
List<String> typesForAssertions = Lists.newArrayList(PidType.pmc.toString(), PidType.doi.toString(), PidType.doi.toString());
List<String> typesForAssertions = Lists
.newArrayList(PidType.pmc.toString(), PidType.doi.toString(), PidType.doi.toString());
for (Tuple2<String, Publication> pub : pubs) {
List<Identifier> ids = IdGenerator.bestPidToIdentifier(pub._2());
assertEquals(typesForAssertions.get(pubs.indexOf(pub)), ids.get(0).getPid().getQualifier().getClassid());
bestIds.addAll(ids);
}
}
for (Tuple2<String, Publication> pub : pubs) {
List<Identifier> ids = IdGenerator.bestPidToIdentifier(pub._2());
assertEquals(typesForAssertions.get(pubs.indexOf(pub)), ids.get(0).getPid().getQualifier().getClassid());
bestIds.addAll(ids);
}
}
@Test
@Order(2)
public void generateIdTest1(){
String id1 = IdGenerator.generate(bestIds, "50|defaultID");
@Test
@Order(2)
public void generateIdTest1() {
String id1 = IdGenerator.generate(bestIds, "50|defaultID");
assertEquals("50|dedup_doi___::84f2cc49e3af11f20952eae15cdae066", id1);
}
System.out.println("id list 1 = " + bestIds.stream().map(i -> i.getPid().getValue()).collect(Collectors.toList()));
@Test
public void generateIdTest2(){
String id1 = IdGenerator.generate(bestIds2, "50|defaultID");
String id2 = IdGenerator.generate(bestIds3, "50|defaultID");
assertEquals("50|dedup_wf_001::9c5cfbf993d38476e0f959a301239719", id1);
}
assertEquals("50|dedup_wf_001::2c56cc1914bffdb30fdff354e0099612", id1);
assertEquals("50|dedup_doi___::128ead3ed8d9ecf262704b6fcf592b8d", id2);
}
@Test
public void generateIdTest2() {
String id1 = IdGenerator.generate(bestIds2, "50|defaultID");
String id2 = IdGenerator.generate(bestIds3, "50|defaultID");
public static <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
List<Tuple2<String, T>> res = new ArrayList<>();
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader(path));
String line = reader.readLine();
while (line != null) {
res
.add(
new Tuple2<>(
MapDocumentUtil.getJPathString("$.id", line),
new ObjectMapper().readValue(line, clazz)));
// read next line
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("id list 2 = " + bestIds2.stream().map(i -> i.getPid().getValue()).collect(Collectors.toList()));
System.out.println("winner 2 = " + id1);
System.out.println("id list 3 = " + bestIds3.stream().map(i -> i.getPid().getValue()).collect(Collectors.toList()));
System.out.println("winner 3 = " + id2);
return res;
}
assertEquals("50|dedup_wf_001::2c56cc1914bffdb30fdff354e0099612", id1);
assertEquals("50|dedup_doi___::128ead3ed8d9ecf262704b6fcf592b8d", id2);
}
public static StructuredProperty pid(String pid, String classid, String classname){
public static <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
List<Tuple2<String, T>> res = new ArrayList<>();
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader(path));
String line = reader.readLine();
while (line != null) {
res
.add(
new Tuple2<>(
MapDocumentUtil.getJPathString("$.id", line),
new ObjectMapper().readValue(line, clazz)));
// read next line
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
StructuredProperty sp = new StructuredProperty();
sp.setValue(pid);
Qualifier q = new Qualifier();
q.setSchemeid(classid);
q.setSchemename(classname);
q.setClassname(classname);
q.setClassid(classid);
sp.setQualifier(q);
return sp;
}
return res;
}
public static List<KeyValue> keyValue(String key, String value){
public static StructuredProperty pid(String pid, String classid, String classname) {
KeyValue kv = new KeyValue();
kv.setKey(key);
kv.setValue(value);
return Lists.newArrayList(kv);
}
StructuredProperty sp = new StructuredProperty();
sp.setValue(pid);
Qualifier q = new Qualifier();
q.setSchemeid(classid);
q.setSchemename(classname);
q.setClassname(classname);
q.setClassid(classid);
sp.setQualifier(q);
return sp;
}
public static List<KeyValue> keyValue(String key, String value) {
KeyValue kv = new KeyValue();
kv.setKey(key);
kv.setValue(value);
return Lists.newArrayList(kv);
}
}

View File

@ -1,18 +1,12 @@
package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@ -22,22 +16,28 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SparkDedupTest implements Serializable {