From 0e54803177cdaeba475f54e28145205a52f11525 Mon Sep 17 00:00:00 2001 From: miconis Date: Tue, 20 Oct 2020 12:19:46 +0200 Subject: [PATCH] bug fix in the id generator and implementation of jobs for organization dedup --- .../dhp/oa/dedup/DedupRecordFactory.java | 3 +- .../eu/dnetlib/dhp/oa/dedup/IdGenerator.java | 14 +- .../dhp/oa/dedup/SparkPrepareNewOrgs.java | 30 ++- .../dhp/oa/dedup/SparkPrepareOrgRels.java | 163 +++++++++---- .../oa/dedup/graph/ConnectedComponent.java | 2 +- .../dhp/oa/dedup/model/Identifier.java | 10 +- .../dnetlib/dhp/oa/dedup/model/OrgSimRel.java | 12 +- .../oa/dedup/orgsdedup/oozie_app/workflow.xml | 33 ++- .../oa/dedup/prepareNewOrgs_parameters.json | 6 + .../oa/dedup/prepareOrgRels_parameters.json | 6 - .../dnetlib/dhp/oa/dedup/IdGeneratorTest.java | 221 ++++++++++-------- .../dnetlib/dhp/oa/dedup/SparkDedupTest.java | 42 ++-- 12 files changed, 353 insertions(+), 189 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 2e4b219b5..eec36bc0e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,8 +1,8 @@ + package eu.dnetlib.dhp.oa.dedup; import java.util.*; -import eu.dnetlib.dhp.oa.dedup.model.Identifier; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; @@ -15,6 +15,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java index 7980d8e69..405a9abf1 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java @@ -6,25 +6,25 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; -import eu.dnetlib.dhp.oa.dedup.model.Identifier; -import eu.dnetlib.dhp.oa.dedup.model.PidType; -import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.lang.StringUtils; import com.google.common.collect.Lists; +import eu.dnetlib.dhp.oa.dedup.model.Identifier; +import eu.dnetlib.dhp.oa.dedup.model.PidType; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.utils.DHPUtils; public class IdGenerator implements Serializable { public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2"; public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254"; - public static String BASE_DATE = "2000-01-01"; + public static String BASE_DATE = "2000-01-01"; // pick the best pid from the list (consider date and pidtype) public static String generate(List pids, String defaultID) { @@ -55,9 +55,9 @@ public class IdGenerator implements Serializable { date = new Date(); } return Lists - .newArrayList( - new Identifier(new StructuredProperty(), date, PidType.original, entity.getCollectedfrom(), - EntityType.fromClass(entity.getClass()), entity.getId())); + .newArrayList( + new Identifier(new StructuredProperty(), date, PidType.original, entity.getCollectedfrom(), + EntityType.fromClass(entity.getClass()), entity.getId())); } // pick the best pid from the entity. Returns a list (length 1) to save time in the call diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java index ea6e6db82..9b91a545e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java @@ -5,8 +5,11 @@ import java.io.IOException; import java.util.Optional; import java.util.Properties; -import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel; import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; @@ -18,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -62,6 +66,10 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { .map(Integer::valueOf) .orElse(NUM_CONNECTIONS); + final String apiUrl = Optional + .ofNullable(parser.get("apiUrl")) + .orElse(""); + final String dbUrl = parser.get("dbUrl"); final String dbTable = parser.get("dbTable"); final String dbUser = parser.get("dbUser"); @@ -72,6 +80,7 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { log.info("actionSetId: '{}'", actionSetId); log.info("workingPath: '{}'", workingPath); log.info("numPartitions: '{}'", numConnections); + log.info("apiUrl: '{}'", apiUrl); log.info("dbUrl: '{}'", dbUrl); log.info("dbUser: '{}'", dbUser); log.info("table: '{}'", dbTable); @@ -89,9 +98,12 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { newOrgs .repartition(numConnections) .write() - .mode(SaveMode.Overwrite) + .mode(SaveMode.Append) .jdbc(dbUrl, dbTable, connectionProperties); + if (!apiUrl.isEmpty()) + updateSimRels(apiUrl); + } public static Dataset createNewOrgs( @@ -138,9 +150,21 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "", r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "", r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "", - r._1()._2().getCollectedfrom().get(0).getValue()), + r._1()._2().getCollectedfrom().get(0).getValue(), ""), Encoders.bean(OrgSimRel.class)); } + private static String updateSimRels(final String apiUrl) throws IOException { + + log.info("Updating simrels on the portal"); + + final HttpGet req = new HttpGet(apiUrl); + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + } + } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java index 4a3ce80df..e9933c4e5 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java @@ -1,18 +1,14 @@ - package eu.dnetlib.dhp.oa.dedup; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.Properties; - +import com.google.common.collect.Lists; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import org.apache.commons.io.IOUtils; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -21,14 +17,11 @@ import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import scala.Tuple2; +import scala.Tuple3; + +import java.io.IOException; +import java.util.*; public class SparkPrepareOrgRels extends AbstractSparkAction { @@ -67,10 +60,6 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { .map(Integer::valueOf) .orElse(NUM_CONNECTIONS); - final String apiUrl = Optional - .ofNullable(parser.get("apiUrl")) - .orElse(""); - final String dbUrl = parser.get("dbUrl"); final String dbTable = parser.get("dbTable"); final String dbUser = parser.get("dbUser"); @@ -81,7 +70,6 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { log.info("actionSetId: '{}'", actionSetId); log.info("workingPath: '{}'", workingPath); log.info("numPartitions: '{}'", numConnections); - log.info("apiUrl: '{}'", apiUrl); log.info("dbUrl: '{}'", dbUrl); log.info("dbUser: '{}'", dbUser); log.info("table: '{}'", dbTable); @@ -102,9 +90,6 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { .mode(SaveMode.Overwrite) .jdbc(dbUrl, dbTable, connectionProperties); - if (!apiUrl.isEmpty()) - updateSimRels(apiUrl); - } public static Dataset createRelations( @@ -112,6 +97,105 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { final String mergeRelsPath, final String entitiesPath) { + Dataset> entities = spark + .read() + .textFile(entitiesPath) + .map( + (MapFunction>) it -> { + Organization entity = OBJECT_MAPPER.readValue(it, Organization.class); + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class))); + + Dataset> relations = spark + .createDataset( + spark + .read() + .load(mergeRelsPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 'merges'") + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) + .filter(t -> !t._2().contains("openorgsmesh")) + .groupByKey() + .map(g -> Lists.newArrayList(g._2())) + .filter(l -> l.size() > 1) + .flatMap(l -> { + String groupId = "group::" + UUID.randomUUID(); + List ids = sortIds(l); + List> rels = new ArrayList<>(); + + for (String source : ids) { + if (source.contains("openorgs____") || ids.indexOf(source) == 0) + for (String target : ids) { + rels.add(new Tuple3<>(source, target, groupId)); + } + } + return rels.iterator(); + }) + .rdd(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING())); + + Dataset> relations2 = relations // + .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel( + r._1()._1(), + r._2()._2().getOriginalId().get(0), + r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", + r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", + r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", + r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", + r._2()._2().getCollectedfrom().get(0).getValue(), + r._1()._3()), + Encoders.bean(OrgSimRel.class)) + .map( + (MapFunction>) o -> new Tuple2<>(o.getLocal_id(), o), + Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class))); + + return relations2 + .joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> { + OrgSimRel orgSimRel = r._1()._2(); + orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0)); + return orgSimRel; + }, + Encoders.bean(OrgSimRel.class)); + + } + + // select best ids from the list. Priority: 1) openorgs, 2)corda, 3)alphabetic + public static List sortIds(List ids) { + + ids.sort((o1, o2) -> { + + if (o1.contains("openorgs____") && o2.contains("openorgs____")) + return o1.compareTo(o2); + if (o1.contains("corda") && o2.contains("corda")) + return o1.compareTo(o2); + + if (o1.contains("openorgs____")) + return -1; + if (o2.contains("openorgs____")) + return 1; + + if (o1.contains("corda")) + return -1; + if (o2.contains("corda")) + return 1; + + return o1.compareTo(o2); + }); + + return ids; + } + + public static Dataset createRelationsFromScratch( + final SparkSession spark, + final String mergeRelsPath, + final String entitiesPath) { + // Dataset> entities = spark .read() @@ -151,13 +235,14 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner") .map( (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel( - r._1()._1(), - r._2()._2().getOriginalId().get(0), - r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", - r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", - r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", - r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", - r._2()._2().getCollectedfrom().get(0).getValue()), + r._1()._1(), + r._2()._2().getOriginalId().get(0), + r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", + r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", + r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", + r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", + r._2()._2().getCollectedfrom().get(0).getValue(), + "group::" + r._1()._1()), Encoders.bean(OrgSimRel.class)) .map( (MapFunction>) o -> new Tuple2<>(o.getLocal_id(), o), @@ -175,16 +260,4 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { } - private static String updateSimRels(final String apiUrl) throws IOException { - - log.info("Updating simrels on the portal"); - - final HttpGet req = new HttpGet(apiUrl); - try (final CloseableHttpClient client = HttpClients.createDefault()) { - try (final CloseableHttpResponse response = client.execute(req)) { - return IOUtils.toString(response.getEntity().getContent()); - } - } - } - } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java index 3d0d24d23..3f24adb93 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java @@ -6,13 +6,13 @@ import java.io.Serializable; import java.util.Set; import java.util.stream.Collectors; -import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.lang.StringUtils; import org.codehaus.jackson.annotate.JsonIgnore; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.dedup.DedupUtility; +import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.pace.util.PaceException; public class ConnectedComponent implements Serializable { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java index 95fd21c64..f5b2f48c5 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import com.google.common.collect.Sets; import eu.dnetlib.dhp.oa.dedup.IdGenerator; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.oaf.KeyValue; @@ -95,8 +96,13 @@ public class Identifier implements Serializable, Comparable { // priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4) // alphabetical order of the originalID - Set lKeys = this.collectedFrom.stream().map(KeyValue::getKey).collect(Collectors.toSet()); - Set rKeys = i.getCollectedFrom().stream().map(KeyValue::getKey).collect(Collectors.toSet()); + Set lKeys = Sets.newHashSet(); + if (this.collectedFrom != null) + lKeys = this.collectedFrom.stream().map(KeyValue::getKey).collect(Collectors.toSet()); + + Set rKeys = Sets.newHashSet(); + if (i.getCollectedFrom() != null) + rKeys = i.getCollectedFrom().stream().map(KeyValue::getKey).collect(Collectors.toSet()); if (this.getType().compareTo(i.getType()) == 0) { // same type if (entityType == EntityType.publication) { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java index 50164ce4c..65f383500 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java @@ -12,12 +12,13 @@ public class OrgSimRel implements Serializable { String oa_country; String oa_url; String oa_collectedfrom; + String group_id; public OrgSimRel() { } public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country, - String oa_url, String oa_collectedfrom) { + String oa_url, String oa_collectedfrom, String group_id) { this.local_id = local_id; this.oa_original_id = oa_original_id; this.oa_name = oa_name; @@ -25,6 +26,7 @@ public class OrgSimRel implements Serializable { this.oa_country = oa_country; this.oa_url = oa_url; this.oa_collectedfrom = oa_collectedfrom; + this.group_id = group_id; } public String getLocal_id() { @@ -83,6 +85,14 @@ public class OrgSimRel implements Serializable { this.oa_collectedfrom = oa_collectedfrom; } + public String getGroup_id() { + return group_id; + } + + public void setGroup_id(String group_id) { + this.group_id = group_id; + } + @Override public String toString() { return "OrgSimRel{" + diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml index ec9967d6a..e7c95ee8d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml @@ -112,7 +112,7 @@ -pb - ${graphBasePath}/relation + /tmp/graph_openorgs_and_corda/relation ${workingPath}/${actionSetId}/organization_simrel @@ -194,6 +194,37 @@ --workingPath${workingPath} --isLookUpUrl${isLookUpUrl} --actionSetId${actionSetId} + --dbUrl${dbUrl} + --dbTable${dbTable} + --dbUser${dbUser} + --dbPwd${dbPwd} + --numConnections20 + + + + + + + + yarn + cluster + Prepare New Organizations + eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} --apiUrl${apiUrl} --dbUrl${dbUrl} --dbTable${dbTable} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json index 2119cbc3a..b70d1af28 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json @@ -29,6 +29,12 @@ "paramDescription": "number of connections to the postgres db (for the write operation)", "paramRequired": false }, + { + "paramName": "au", + "paramLongName": "apiUrl", + "paramDescription": "the url for the APIs of the openorgs service", + "paramRequired": false + }, { "paramName": "du", "paramLongName": "dbUrl", diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json index b70d1af28..2119cbc3a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json @@ -29,12 +29,6 @@ "paramDescription": "number of connections to the postgres db (for the write operation)", "paramRequired": false }, - { - "paramName": "au", - "paramLongName": "apiUrl", - "paramDescription": "the url for the APIs of the openorgs service", - "paramRequired": false - }, { "paramName": "du", "paramLongName": "dbUrl", diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/IdGeneratorTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/IdGeneratorTest.java index 1bf851527..403498aeb 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/IdGeneratorTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/IdGeneratorTest.java @@ -1,17 +1,6 @@ + package eu.dnetlib.dhp.oa.dedup; -import com.google.common.collect.Lists; -import eu.dnetlib.dhp.oa.dedup.model.Identifier; -import eu.dnetlib.dhp.oa.dedup.model.PidType; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import eu.dnetlib.pace.util.MapDocumentUtil; -import org.codehaus.jackson.map.ObjectMapper; -import org.junit.jupiter.api.*; -import scala.Tuple2; import static org.junit.jupiter.api.Assertions.assertEquals; import java.io.BufferedReader; @@ -22,119 +11,149 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.stream.Collectors; + +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.jupiter.api.*; + +import com.google.common.collect.Lists; + +import eu.dnetlib.dhp.oa.dedup.model.Identifier; +import eu.dnetlib.dhp.oa.dedup.model.PidType; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.pace.util.MapDocumentUtil; +import scala.Tuple2; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class IdGeneratorTest { - private static List bestIds; - private static List> pubs; + private static List bestIds; + private static List> pubs; - private static List bestIds2; - private static List bestIds3; + private static List bestIds2; + private static List bestIds3; - private static String testEntityBasePath; + private static String testEntityBasePath; - private static SimpleDateFormat sdf; - private static Date baseDate; + private static SimpleDateFormat sdf; + private static Date baseDate; - @BeforeAll - public static void setUp() throws Exception { + @BeforeAll + public static void setUp() throws Exception { - sdf = new SimpleDateFormat("yyyy-MM-dd"); - baseDate = sdf.parse("2000-01-01"); + sdf = new SimpleDateFormat("yyyy-MM-dd"); + baseDate = sdf.parse("2000-01-01"); - bestIds = new ArrayList<>(); - bestIds2 = Lists.newArrayList( - new Identifier(pid("pid1", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID1"), - new Identifier(pid("pid2", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID2"), - new Identifier(pid("pid3", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID3") - ); - bestIds3 = Lists.newArrayList( - new Identifier(pid("pid1", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID1"), - new Identifier(pid("pid2", "doi", "doi"), baseDate, PidType.doi, keyValue("key", "value"), EntityType.publication, "50|originalID2"), - new Identifier(pid("pid3", "original", "original"), baseDate, PidType.original, keyValue("key", "value"), EntityType.publication, "50|originalID3") - ); + bestIds = new ArrayList<>(); + bestIds2 = Lists + .newArrayList( + new Identifier(pid("pid1", "original", "original"), baseDate, PidType.original, + keyValue("key", "value"), EntityType.publication, "50|originalID1"), + new Identifier(pid("pid2", "original", "original"), baseDate, PidType.original, + keyValue("key", "value"), EntityType.publication, "50|originalID2"), + new Identifier(pid("pid3", "original", "original"), baseDate, PidType.original, + keyValue("key", "value"), EntityType.publication, "50|originalID3")); + bestIds3 = Lists + .newArrayList( + new Identifier(pid("pid1", "original", "original"), baseDate, PidType.original, + keyValue("key", "value"), EntityType.publication, "50|originalID1"), + new Identifier(pid("pid2", "doi", "doi"), baseDate, PidType.doi, keyValue("key", "value"), + EntityType.publication, "50|originalID2"), + new Identifier(pid("pid3", "original", "original"), baseDate, PidType.original, + keyValue("key", "value"), EntityType.publication, "50|originalID3")); - testEntityBasePath = Paths - .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI()) - .toFile() - .getAbsolutePath(); + testEntityBasePath = Paths + .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI()) + .toFile() + .getAbsolutePath(); - pubs = readSample(testEntityBasePath + "/publication_idgeneration.json", Publication.class); + pubs = readSample(testEntityBasePath + "/publication_idgeneration.json", Publication.class); - } + } - @Test - @Order(1) - public void bestPidToIdentifierTest(){ + @Test + @Order(1) + public void bestPidToIdentifierTest() { - List typesForAssertions = Lists.newArrayList(PidType.pmc.toString(), PidType.doi.toString(), PidType.doi.toString()); + List typesForAssertions = Lists + .newArrayList(PidType.pmc.toString(), PidType.doi.toString(), PidType.doi.toString()); - for (Tuple2 pub : pubs) { - List ids = IdGenerator.bestPidToIdentifier(pub._2()); - assertEquals(typesForAssertions.get(pubs.indexOf(pub)), ids.get(0).getPid().getQualifier().getClassid()); - bestIds.addAll(ids); - } - } + for (Tuple2 pub : pubs) { + List ids = IdGenerator.bestPidToIdentifier(pub._2()); + assertEquals(typesForAssertions.get(pubs.indexOf(pub)), ids.get(0).getPid().getQualifier().getClassid()); + bestIds.addAll(ids); + } + } - @Test - @Order(2) - public void generateIdTest1(){ - String id1 = IdGenerator.generate(bestIds, "50|defaultID"); + @Test + @Order(2) + public void generateIdTest1() { + String id1 = IdGenerator.generate(bestIds, "50|defaultID"); - assertEquals("50|dedup_doi___::84f2cc49e3af11f20952eae15cdae066", id1); - } + System.out.println("id list 1 = " + bestIds.stream().map(i -> i.getPid().getValue()).collect(Collectors.toList())); - @Test - public void generateIdTest2(){ - String id1 = IdGenerator.generate(bestIds2, "50|defaultID"); - String id2 = IdGenerator.generate(bestIds3, "50|defaultID"); + assertEquals("50|dedup_wf_001::9c5cfbf993d38476e0f959a301239719", id1); + } - assertEquals("50|dedup_wf_001::2c56cc1914bffdb30fdff354e0099612", id1); - assertEquals("50|dedup_doi___::128ead3ed8d9ecf262704b6fcf592b8d", id2); - } + @Test + public void generateIdTest2() { + String id1 = IdGenerator.generate(bestIds2, "50|defaultID"); + String id2 = IdGenerator.generate(bestIds3, "50|defaultID"); - public static List> readSample(String path, Class clazz) { - List> res = new ArrayList<>(); - BufferedReader reader; - try { - reader = new BufferedReader(new FileReader(path)); - String line = reader.readLine(); - while (line != null) { - res - .add( - new Tuple2<>( - MapDocumentUtil.getJPathString("$.id", line), - new ObjectMapper().readValue(line, clazz))); - // read next line - line = reader.readLine(); - } - reader.close(); - } catch (IOException e) { - e.printStackTrace(); - } + System.out.println("id list 2 = " + bestIds2.stream().map(i -> i.getPid().getValue()).collect(Collectors.toList())); + System.out.println("winner 2 = " + id1); + System.out.println("id list 3 = " + bestIds3.stream().map(i -> i.getPid().getValue()).collect(Collectors.toList())); + System.out.println("winner 3 = " + id2); - return res; - } + assertEquals("50|dedup_wf_001::2c56cc1914bffdb30fdff354e0099612", id1); + assertEquals("50|dedup_doi___::128ead3ed8d9ecf262704b6fcf592b8d", id2); + } - public static StructuredProperty pid(String pid, String classid, String classname){ + public static List> readSample(String path, Class clazz) { + List> res = new ArrayList<>(); + BufferedReader reader; + try { + reader = new BufferedReader(new FileReader(path)); + String line = reader.readLine(); + while (line != null) { + res + .add( + new Tuple2<>( + MapDocumentUtil.getJPathString("$.id", line), + new ObjectMapper().readValue(line, clazz))); + // read next line + line = reader.readLine(); + } + reader.close(); + } catch (IOException e) { + e.printStackTrace(); + } - StructuredProperty sp = new StructuredProperty(); - sp.setValue(pid); - Qualifier q = new Qualifier(); - q.setSchemeid(classid); - q.setSchemename(classname); - q.setClassname(classname); - q.setClassid(classid); - sp.setQualifier(q); - return sp; - } + return res; + } - public static List keyValue(String key, String value){ + public static StructuredProperty pid(String pid, String classid, String classname) { - KeyValue kv = new KeyValue(); - kv.setKey(key); - kv.setValue(value); - return Lists.newArrayList(kv); - } + StructuredProperty sp = new StructuredProperty(); + sp.setValue(pid); + Qualifier q = new Qualifier(); + q.setSchemeid(classid); + q.setSchemename(classname); + q.setClassname(classname); + q.setClassid(classid); + sp.setQualifier(q); + return sp; + } + + public static List keyValue(String key, String value) { + + KeyValue kv = new KeyValue(); + kv.setKey(key); + kv.setValue(value); + return Lists.newArrayList(kv); + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 6516eda52..b849160ff 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -1,18 +1,12 @@ package eu.dnetlib.dhp.oa.dedup; -import static java.nio.file.Files.createTempDirectory; - -import static org.apache.spark.sql.functions.count; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.lenient; - -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.nio.file.Paths; - +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -22,22 +16,28 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.util.MapDocumentUtil; import scala.Tuple2; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.nio.file.Paths; + +import static java.nio.file.Files.createTempDirectory; +import static org.apache.spark.sql.functions.count; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.lenient; + @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SparkDedupTest implements Serializable {