From 1a85020572db5f9e8084a4ff84b11df59643ed5d Mon Sep 17 00:00:00 2001 From: miconis Date: Fri, 26 Feb 2021 10:19:28 +0100 Subject: [PATCH] bug fix in graph-mapper, changes in the implementation of the openorgs wf to create relations and populate openorgs db --- dhp-workflows/dhp-dedup-openaire/pom.xml | 5 +- .../dhp/oa/dedup/AbstractSparkAction.java | 1 + .../eu/dnetlib/dhp/oa/dedup/DedupUtility.java | 5 + .../dhp/oa/dedup/SparkCopyOpenorgs.java | 31 +- .../oa/dedup/SparkCopyOpenorgsMergeRels.java | 37 +- .../oa/dedup/SparkCopyOpenorgsSimRels.java | 145 ++++---- .../dedup/SparkCopyRelationsNoOpenorgs.java | 144 ++++---- .../dhp/oa/dedup/SparkCreateSimRels.java | 22 +- .../dhp/oa/dedup/SparkPrepareNewOrgs.java | 249 +++++++++++++ .../dhp/oa/dedup/SparkPrepareOrgRels.java | 341 ++++++++++++++++++ .../dhp/oa/dedup/SparkRemoveDiffRels.java | 131 ++++--- .../dnetlib/dhp/oa/dedup/model/OrgSimRel.java | 108 ++++++ .../oa/dedup/openorgs/oozie_app/workflow.xml | 35 +- .../oa/dedup/prepareNewOrgs_parameters.json | 62 ++++ .../oa/dedup/prepareOrgRels_parameters.json | 56 +++ .../dhp/oa/dedup/SparkOpenorgsTest.java | 296 +++++++++++++++ .../profiles/mock_orchestrator_openorgs.xml | 24 ++ .../raw/MigrateDbEntitiesApplication.java | 2 +- pom.xml | 3 - 19 files changed, 1436 insertions(+), 261 deletions(-) create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json create mode 100644 dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java create mode 100644 dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml index 03ddbcf4c..04e158542 100644 --- a/dhp-workflows/dhp-dedup-openaire/pom.xml +++ b/dhp-workflows/dhp-dedup-openaire/pom.xml @@ -90,7 +90,10 @@ com.fasterxml.jackson.core jackson-core - + + org.apache.httpcomponents + httpclient + diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 74cecb7b6..9a1127764 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -29,6 +29,7 @@ import eu.dnetlib.pace.config.DedupConfig; abstract class AbstractSparkAction implements Serializable { protected static final int NUM_PARTITIONS = 1000; + protected static final int NUM_CONNECTIONS = 20; protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java index 01065510a..88873086d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java @@ -95,6 +95,11 @@ public class DedupUtility { return String.format("%s/%s/%s_simrel", basePath, actionSetId, entityType); } + public static String createOpenorgsMergeRelsPath( + final String basePath, final String actionSetId, final String entityType) { + return String.format("%s/%s/%s_openorgs_mergerels", basePath, actionSetId, entityType); + } + public static String createMergeRelPath( final String basePath, final String actionSetId, final String entityType) { return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java index aa7a131e7..ff7aca627 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java @@ -6,6 +6,7 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -19,6 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -34,7 +36,7 @@ public class SparkCopyOpenorgs extends AbstractSparkAction { ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - SparkCreateSimRels.class + SparkCopyOpenorgs.class .getResourceAsStream( "/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json"))); parser.parseArgument(args); @@ -72,7 +74,7 @@ public class SparkCopyOpenorgs extends AbstractSparkAction { final Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); - filterEntities(spark, entityPath, clazz) + filterOpenorgs(spark, entityPath) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") @@ -80,21 +82,20 @@ public class SparkCopyOpenorgs extends AbstractSparkAction { } - public static Dataset filterEntities( + public static Dataset filterOpenorgs( final SparkSession spark, - final String entitiesInputPath, - final Class clazz) { + final String entitiesInputPath) { - // - Dataset entities = spark - .read() - .textFile(entitiesInputPath) - .map( - (MapFunction) it -> { - T entity = OBJECT_MAPPER.readValue(it, clazz); - return entity; - }, - Encoders.kryo(clazz)); + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + Dataset entities = spark + .createDataset( + sc + .textFile(entitiesInputPath) + .map(it -> OBJECT_MAPPER.readValue(it, Organization.class)) + .rdd(), + Encoders.bean(Organization.class)); + + entities.show(); return entities.filter(entities.col("id").contains("openorgs____")); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java index d705fca6b..4bb46222e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java @@ -6,14 +6,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.pace.config.DedupConfig; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; import org.slf4j.Logger; @@ -21,10 +20,13 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; //copy simrels (verified) from relation to the workdir in order to make them available for the deduplication public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction { @@ -83,17 +85,17 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction { .textFile(relationPath) .map(patchRelFn(), Encoders.bean(Relation.class)) .toJavaRDD() - .filter(this::isOpenorgs) //takes only relations coming from openorgs - .filter(this::filterOpenorgsRels) //takes only isSimilarTo relations between organizations from openorgs - .filter(this::excludeOpenorgsMesh) //excludes relations between an organization and an openorgsmesh - .filter(this::excludeNonOpenorgs); //excludes relations with no openorgs id involved + .filter(this::isOpenorgs) // takes only relations coming from openorgs + .filter(this::filterOpenorgsRels) // takes only isSimilarTo relations between organizations from openorgs + .filter(this::excludeOpenorgsMesh) // excludes relations between an organization and an openorgsmesh + .filter(this::excludeNonOpenorgs); // excludes relations with no openorgs id involved - //turn openorgs isSimilarTo relations into mergerels - JavaRDD mergeRels = rawRels.flatMap(rel -> { + // turn openorgs isSimilarTo relations into mergerels + JavaRDD mergeRelsRDD = rawRels.flatMap(rel -> { List mergerels = new ArrayList<>(); - String openorgsId = rel.getSource().contains("openorgs____")? rel.getSource() : rel.getTarget(); - String mergedId = rel.getSource().contains("openorgs____")? rel.getTarget() : rel.getSource(); + String openorgsId = rel.getSource().contains("openorgs____") ? rel.getSource() : rel.getTarget(); + String mergedId = rel.getSource().contains("openorgs____") ? rel.getTarget() : rel.getSource(); mergerels.add(rel(openorgsId, mergedId, "merges", dedupConf)); mergerels.add(rel(mergedId, openorgsId, "isMergedIn", dedupConf)); @@ -101,7 +103,13 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction { return mergerels.iterator(); }); - mergeRels.saveAsTextFile(outputPath); + spark + .createDataset( + mergeRelsRDD.rdd(), + Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Append) + .parquet(outputPath); } private static MapFunction patchRelFn() { @@ -116,7 +124,8 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction { private boolean filterOpenorgsRels(Relation rel) { - if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup")) + if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") + && rel.getSubRelType().equals("dedup")) return true; return false; } @@ -124,7 +133,7 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction { private boolean isOpenorgs(Relation rel) { if (rel.getCollectedfrom() != null) { - for (KeyValue k: rel.getCollectedfrom()) { + for (KeyValue k : rel.getCollectedfrom()) { if (k.getValue().equals("OpenOrgs Database")) { return true; } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java index 3ce676f84..b7f88a5f6 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java @@ -6,13 +6,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.pace.config.DedupConfig; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.graphx.Edge; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -22,97 +22,98 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; //copy simrels (verified) from relation to the workdir in order to make them available for the deduplication public class SparkCopyOpenorgsSimRels extends AbstractSparkAction { - private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsMergeRels.class); + private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsSimRels.class); - public SparkCopyOpenorgsSimRels(ArgumentApplicationParser parser, SparkSession spark) { - super(parser, spark); - } + public SparkCopyOpenorgsSimRels(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } - public static void main(String[] args) throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCopyOpenorgsSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); - parser.parseArgument(args); + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgsSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); + parser.parseArgument(args); - SparkConf conf = new SparkConf(); - new SparkCopyOpenorgsSimRels(parser, getSparkSession(conf)) - .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); - } + SparkConf conf = new SparkConf(); + new SparkCopyOpenorgsSimRels(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } - @Override - public void run(ISLookUpService isLookUpService) - throws DocumentException, IOException, ISLookUpException { + @Override + public void run(ISLookUpService isLookUpService) + throws DocumentException, IOException, ISLookUpException { - // read oozie parameters - final String graphBasePath = parser.get("graphBasePath"); - final String actionSetId = parser.get("actionSetId"); - final String workingPath = parser.get("workingPath"); - final int numPartitions = Optional - .ofNullable(parser.get("numPartitions")) - .map(Integer::valueOf) - .orElse(NUM_PARTITIONS); + // read oozie parameters + final String graphBasePath = parser.get("graphBasePath"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); - log.info("numPartitions: '{}'", numPartitions); - log.info("graphBasePath: '{}'", graphBasePath); - log.info("actionSetId: '{}'", actionSetId); - log.info("workingPath: '{}'", workingPath); + log.info("numPartitions: '{}'", numPartitions); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); - log.info("Copying OpenOrgs SimRels"); + log.info("Copying OpenOrgs SimRels"); - final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, "organization"); + final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, "organization"); - removeOutputDir(spark, outputPath); + final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); - final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); + Dataset rawRels = spark + .read() + .textFile(relationPath) + .map(patchRelFn(), Encoders.bean(Relation.class)) + .filter(this::filterOpenorgsRels); - JavaRDD rawRels = spark - .read() - .textFile(relationPath) - .map(patchRelFn(), Encoders.bean(Relation.class)) - .toJavaRDD() - .filter(this::isOpenorgs) - .filter(this::filterOpenorgsRels); + save(rawRels, outputPath, SaveMode.Append); - save(spark.createDataset(rawRels.rdd(),Encoders.bean(Relation.class)), outputPath, SaveMode.Append); - } + log.info("Copied " + rawRels.count() + " Similarity Relations"); + } - private static MapFunction patchRelFn() { - return value -> { - final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); - if (rel.getDataInfo() == null) { - rel.setDataInfo(new DataInfo()); - } - return rel; - }; - } + private static MapFunction patchRelFn() { + return value -> { + final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } + return rel; + }; + } - private boolean filterOpenorgsRels(Relation rel) { + private boolean filterOpenorgsRels(Relation rel) { - if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup")) - return true; - return false; - } + if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") + && rel.getSubRelType().equals("dedup") && isOpenorgs(rel)) + return true; + return false; + } - private boolean isOpenorgs(Relation rel) { + private boolean isOpenorgs(Relation rel) { - if (rel.getCollectedfrom() != null) { - for (KeyValue k: rel.getCollectedfrom()) { - if (k.getValue().equals("OpenOrgs Database")) { - return true; - } - } - } - return false; - } + if (rel.getCollectedfrom() != null) { + for (KeyValue k : rel.getCollectedfrom()) { + if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) { + return true; + } + } + } + return false; + } } - diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java index 319c40d8d..64a110892 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java @@ -1,12 +1,9 @@ + package eu.dnetlib.dhp.oa.dedup; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.util.MapDocumentUtil; +import java.io.IOException; +import java.util.Optional; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -19,92 +16,95 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; -import java.io.IOException; -import java.util.Optional; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.util.MapDocumentUtil; +import scala.Tuple2; public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction { - private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class); + private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class); - private static final String IDJSONPATH = "$.id"; + public SparkCopyRelationsNoOpenorgs(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } - public SparkCopyRelationsNoOpenorgs(ArgumentApplicationParser parser, SparkSession spark) { - super(parser, spark); - } + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyRelationsNoOpenorgs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + parser.parseArgument(args); - public static void main(String[] args) throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCopyRelationsNoOpenorgs.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); - parser.parseArgument(args); + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + new SparkUpdateEntity(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } - new SparkUpdateEntity(parser, getSparkSession(conf)) - .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); - } + public void run(ISLookUpService isLookUpService) throws IOException { - public void run(ISLookUpService isLookUpService) throws IOException { + final String graphBasePath = parser.get("graphBasePath"); + final String workingPath = parser.get("workingPath"); + final String dedupGraphPath = parser.get("dedupGraphPath"); - final String graphBasePath = parser.get("graphBasePath"); - final String workingPath = parser.get("workingPath"); - final String dedupGraphPath = parser.get("dedupGraphPath"); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("workingPath: '{}'", workingPath); + log.info("dedupGraphPath: '{}'", dedupGraphPath); - log.info("graphBasePath: '{}'", graphBasePath); - log.info("workingPath: '{}'", workingPath); - log.info("dedupGraphPath: '{}'", dedupGraphPath); + final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); + final String outputPath = DedupUtility.createEntityPath(dedupGraphPath, "relation"); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + removeOutputDir(spark, outputPath); - final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); - final String outputPath = DedupUtility.createEntityPath(dedupGraphPath, "relation"); + JavaRDD simRels = spark + .read() + .textFile(relationPath) + .map(patchRelFn(), Encoders.bean(Relation.class)) + .toJavaRDD() + .filter(this::excludeOpenorgsRels); - removeOutputDir(spark, outputPath); + spark + .createDataset(simRels.rdd(), Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .json(outputPath); - JavaRDD simRels = spark - .read() - .textFile(relationPath) - .map(patchRelFn(), Encoders.bean(Relation.class)) - .toJavaRDD() - .filter(this::excludeOpenorgsRels); + } - simRels.saveAsTextFile(outputPath); + private static MapFunction patchRelFn() { + return value -> { + final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } + return rel; + }; + } - } + private boolean excludeOpenorgsRels(Relation rel) { - private static MapFunction patchRelFn() { - return value -> { - final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); - if (rel.getDataInfo() == null) { - rel.setDataInfo(new DataInfo()); - } - return rel; - }; - } - - private boolean excludeOpenorgsRels(Relation rel) { - - if (rel.getCollectedfrom() != null) { - for (KeyValue k: rel.getCollectedfrom()) { - if (k.getValue().equals("OpenOrgs Database")) { - return false; - } - } - } - return true; - } + if (rel.getCollectedfrom() != null) { + for (KeyValue k : rel.getCollectedfrom()) { + if (k.getValue().equals("OpenOrgs Database")) { + return false; + } + } + } + return true; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index b3ee47bfc..a7566f2e2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -10,6 +10,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -81,7 +82,6 @@ public class SparkCreateSimRels extends AbstractSparkAction { log.info("Creating simrels for: '{}'", subEntity); final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); - removeOutputDir(spark, outputPath); JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -99,13 +99,19 @@ public class SparkCreateSimRels extends AbstractSparkAction { .createSortedBlocks(mapDocuments, dedupConf) .repartition(numPartitions); - // create relations by comparing only elements in the same group - Deduper - .computeRelations(sc, blocks, dedupConf) - .map(t -> createSimRel(t._1(), t._2(), entity)) - .repartition(numPartitions) - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(outputPath); + Dataset simRels = spark + .createDataset( + Deduper + .computeRelations(sc, blocks, dedupConf) + .map(t -> createSimRel(t._1(), t._2(), entity)) + .repartition(numPartitions) + .rdd(), + Encoders.bean(Relation.class)); + + save(simRels, outputPath, SaveMode.Append); + + log.info("Generated " + simRels.count() + " Similarity Relations"); + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java new file mode 100644 index 000000000..3b29e1e17 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java @@ -0,0 +1,249 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import java.io.IOException; +import java.util.Optional; +import java.util.Properties; + +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import scala.Tuple2; + +public class SparkPrepareNewOrgs extends AbstractSparkAction { + + private static final Logger log = LoggerFactory.getLogger(SparkPrepareNewOrgs.class); + + public SparkPrepareNewOrgs(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } + + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkPrepareNewOrgs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + new SparkPrepareNewOrgs(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } + + @Override + public void run(ISLookUpService isLookUpService) throws IOException { + + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numConnections = Optional + .ofNullable(parser.get("numConnections")) + .map(Integer::valueOf) + .orElse(NUM_CONNECTIONS); + + final String apiUrl = Optional + .ofNullable(parser.get("apiUrl")) + .orElse(""); + + final String dbUrl = parser.get("dbUrl"); + final String dbTable = parser.get("dbTable"); + final String dbUser = parser.get("dbUser"); + final String dbPwd = parser.get("dbPwd"); + + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + log.info("numPartitions: '{}'", numConnections); + log.info("apiUrl: '{}'", apiUrl); + log.info("dbUrl: '{}'", dbUrl); + log.info("dbUser: '{}'", dbUser); + log.info("table: '{}'", dbTable); + log.info("dbPwd: '{}'", "xxx"); + + final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization"); + final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization"); + final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); + + Dataset newOrgs = createNewOrgs(spark, mergeRelPath, relationPath, entityPath); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + log.info("Number of New Organization created: '{}'", newOrgs.count()); + + newOrgs + .repartition(numConnections) + .write() + .mode(SaveMode.Append) + .jdbc(dbUrl, dbTable, connectionProperties); + + if (!apiUrl.isEmpty()) + updateSimRels(apiUrl); + + } + + public static Dataset createNewOrgs( + final SparkSession spark, + final String mergeRelsPath, + final String relationPath, + final String entitiesPath) { + + // collect diffrels from the raw graph relations: + JavaPairRDD diffRels = spark + .read() + .textFile(relationPath) + .map(patchRelFn(), Encoders.bean(Relation.class)) + .toJavaRDD() + .filter(r -> filterRels(r, "organization")) + // take the worst id of the diffrel: + .mapToPair(rel -> { + if (compareIds(rel.getSource(), rel.getTarget()) > 0) + return new Tuple2<>(rel.getSource(), "diffRel"); + else + return new Tuple2<>(rel.getTarget(), "diffRel"); + }) + .distinct(); + log.info("Number of DiffRels collected: '{}'", diffRels.count()); + + // collect entities: + Dataset> entities = spark + .read() + .textFile(entitiesPath) + .map( + (MapFunction>) it -> { + Organization entity = OBJECT_MAPPER.readValue(it, Organization.class); + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class))); + + // collect mergerels and remove ids in the diffrels + Dataset> openorgsRels = spark + .createDataset( + spark + .read() + .load(mergeRelsPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 'isMergedIn'") + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) // + .leftOuterJoin(diffRels) // + .filter(rel -> !rel._2()._2().isPresent()) + .mapToPair(rel -> new Tuple2<>(rel._1(), rel._2()._1())) + .rdd(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + log.info("Number of Openorgs Relations loaded: '{}'", openorgsRels.count()); + + return entities + .joinWith(openorgsRels, entities.col("_1").equalTo(openorgsRels.col("_1")), "left") + .filter((FilterFunction, Tuple2>>) t -> t._2() == null) + // take entities not in mergerels (they are single entities, therefore are new orgs) + .filter( + (FilterFunction, Tuple2>>) t -> !t + ._1() + ._1() + .contains("openorgs")) + // exclude openorgs, don't need to propose them as new orgs + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel( + "", + r._1()._2().getOriginalId().get(0), + r._1()._2().getLegalname() != null ? r._1()._2().getLegalname().getValue() : "", + r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "", + r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "", + r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "", + r._1()._2().getCollectedfrom().get(0).getValue(), ""), + Encoders.bean(OrgSimRel.class)); + + } + + private static String updateSimRels(final String apiUrl) throws IOException { + + log.info("Updating simrels on the portal"); + + final HttpGet req = new HttpGet(apiUrl); + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + } + + private static MapFunction patchRelFn() { + return value -> { + final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } + return rel; + }; + } + + public static int compareIds(String o1, String o2) { + if (o1.contains("openorgs____") && o2.contains("openorgs____")) + return o1.compareTo(o2); + if (o1.contains("corda") && o2.contains("corda")) + return o1.compareTo(o2); + + if (o1.contains("openorgs____")) + return -1; + if (o2.contains("openorgs____")) + return 1; + + if (o1.contains("corda")) + return -1; + if (o2.contains("corda")) + return 1; + + return o1.compareTo(o2); + } + + private static boolean filterRels(Relation rel, String entityType) { + + switch (entityType) { + case "result": + if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult") + && rel.getSubRelType().equals("dedup")) + return true; + break; + case "organization": + if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization") + && rel.getSubRelType().equals("dedup")) + return true; + break; + default: + return false; + } + return false; + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java new file mode 100644 index 000000000..cbca0b326 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java @@ -0,0 +1,341 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import scala.Tuple2; +import scala.Tuple3; + +public class SparkPrepareOrgRels extends AbstractSparkAction { + + private static final Logger log = LoggerFactory.getLogger(SparkPrepareOrgRels.class); + + public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } + + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + new SparkPrepareOrgRels(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } + + @Override + public void run(ISLookUpService isLookUpService) throws IOException { + + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numConnections = Optional + .ofNullable(parser.get("numConnections")) + .map(Integer::valueOf) + .orElse(NUM_CONNECTIONS); + + final String dbUrl = parser.get("dbUrl"); + final String dbTable = parser.get("dbTable"); + final String dbUser = parser.get("dbUser"); + final String dbPwd = parser.get("dbPwd"); + + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + log.info("numPartitions: '{}'", numConnections); + log.info("dbUrl: '{}'", dbUrl); + log.info("dbUser: '{}'", dbUser); + log.info("table: '{}'", dbTable); + log.info("dbPwd: '{}'", "xxx"); + + final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization"); + final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization"); + final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); + + Dataset relations = createRelations(spark, mergeRelPath, relationPath, entityPath); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + relations + .repartition(numConnections) + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, dbTable, connectionProperties); + + } + + private static boolean filterRels(Relation rel, String entityType) { + + switch (entityType) { + case "result": + if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult") + && rel.getSubRelType().equals("dedup")) + return true; + break; + case "organization": + if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization") + && rel.getSubRelType().equals("dedup")) + return true; + break; + default: + return false; + } + return false; + } + + // create openorgs simrels starting from mergerels, remove the diffrels + public static Dataset createRelations( + final SparkSession spark, + final String mergeRelsPath, + final String relationPath, + final String entitiesPath) { + + // collect diffrels from the raw graph relations: <, "diffRel"> + JavaRDD, String>> diffRels = spark + .read() + .textFile(relationPath) + .map(patchRelFn(), Encoders.bean(Relation.class)) + .toJavaRDD() + .filter(r -> filterRels(r, "organization")) + // put the best id as source of the diffrel: + .map(rel -> { + if (compareIds(rel.getSource(), rel.getTarget()) < 0) + return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel"); + else + return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel"); + }) + .distinct(); + log.info("Number of DiffRels collected: {}", diffRels.count()); + + // collect all the organizations + Dataset> entities = spark + .read() + .textFile(entitiesPath) + .map( + (MapFunction>) it -> { + Organization entity = OBJECT_MAPPER.readValue(it, Organization.class); + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class))); + + // relations with their group (connected component id) + JavaRDD, String>> rawOpenorgsRels = spark + .read() + .load(mergeRelsPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 'merges'") + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) + .filter(t -> !t._2().contains("openorgsmesh")) // remove openorgsmesh: they are only for dedup + .groupByKey() + .map(g -> Lists.newArrayList(g._2())) + .filter(l -> l.size() > 1) + .flatMap(l -> { + String groupId = "group::" + UUID.randomUUID(); + List ids = sortIds(l); // sort IDs by type + List, String>> rels = new ArrayList<>(); + String source = ids.get(0); + for (String target : ids) { + rels.add(new Tuple2<>(new Tuple2<>(source, target), groupId)); + } + + return rels.iterator(); + }); + log.info("Number of Raw Openorgs Relations created: {}", rawOpenorgsRels.count()); + + // filter out diffRels + JavaRDD> openorgsRels = rawOpenorgsRels + .union(diffRels) + // concatenation of source and target: or + .mapToPair(t -> new Tuple2<>(t._1()._1() + "@@@" + t._1()._2(), t._2())) + .groupByKey() + .map( + g -> new Tuple2<>(g._1(), StreamSupport + .stream(g._2().spliterator(), false) + .collect(Collectors.toList()))) + // : take only relations with only the group_id, it + // means they are correct. If the diffRel is present the relation has to be removed + .filter(g -> g._2().size() == 1 && g._2().get(0).contains("group::")) + .map( + t -> new Tuple3<>( + t._1().split("@@@")[0], + t._1().split("@@@")[1], + t._2().get(0))); + log.info("Number of Openorgs Relations created: '{}'", openorgsRels.count()); + + // + Dataset> relations = spark + .createDataset( + openorgsRels.rdd(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING())); + + // create orgsimrels + Dataset> relations2 = relations + .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel( + r._1()._1(), + r._2()._2().getOriginalId().get(0), + r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", + r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", + r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", + r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", + r._2()._2().getCollectedfrom().get(0).getValue(), + r._1()._3()), + Encoders.bean(OrgSimRel.class)) + .map( + (MapFunction>) o -> new Tuple2<>(o.getLocal_id(), o), + Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class))); + + return relations2 + .joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> { + OrgSimRel orgSimRel = r._1()._2(); + orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0)); + return orgSimRel; + }, + Encoders.bean(OrgSimRel.class)); + + } + + public static int compareIds(String o1, String o2) { + if (o1.contains("openorgs____") && o2.contains("openorgs____")) + return o1.compareTo(o2); + if (o1.contains("corda") && o2.contains("corda")) + return o1.compareTo(o2); + + if (o1.contains("openorgs____")) + return -1; + if (o2.contains("openorgs____")) + return 1; + + if (o1.contains("corda")) + return -1; + if (o2.contains("corda")) + return 1; + + return o1.compareTo(o2); + } + + // Sort IDs basing on the type. Priority: 1) openorgs, 2)corda, 3)alphabetic + public static List sortIds(List ids) { + ids.sort((o1, o2) -> compareIds(o1, o2)); + return ids; + } + + public static Dataset createRelationsFromScratch( + final SparkSession spark, + final String mergeRelsPath, + final String entitiesPath) { + + // + Dataset> entities = spark + .read() + .textFile(entitiesPath) + .map( + (MapFunction>) it -> { + Organization entity = OBJECT_MAPPER.readValue(it, Organization.class); + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class))); + + Dataset> relations = spark + .createDataset( + spark + .read() + .load(mergeRelsPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 'merges'") + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) + .groupByKey() + .flatMap(g -> { + List> rels = new ArrayList<>(); + for (String id1 : g._2()) { + for (String id2 : g._2()) { + if (!id1.equals(id2)) + if (id1.contains("openorgs____") && !id2.contains("openorgsmesh")) + rels.add(new Tuple2<>(id1, id2)); + } + } + return rels.iterator(); + }) + .rdd(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + + Dataset> relations2 = relations // + .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel( + r._1()._1(), + r._2()._2().getOriginalId().get(0), + r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", + r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", + r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", + r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", + r._2()._2().getCollectedfrom().get(0).getValue(), + "group::" + r._1()._1()), + Encoders.bean(OrgSimRel.class)) + .map( + (MapFunction>) o -> new Tuple2<>(o.getLocal_id(), o), + Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class))); + + return relations2 + .joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> { + OrgSimRel orgSimRel = r._1()._2(); + orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0)); + return orgSimRel; + }, + Encoders.bean(OrgSimRel.class)); + + } + + private static MapFunction patchRelFn() { + return value -> { + final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } + return rel; + }; + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java index 6f012e00a..4c0bfadf0 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java @@ -27,6 +27,8 @@ import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Iterables; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent; import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor; @@ -95,6 +97,9 @@ public class SparkRemoveDiffRels extends AbstractSparkAction { final String relationPath = DedupUtility.createEntityPath(graphBasePath, subEntity); + final String openorgsMergeRelsPath = DedupUtility + .createOpenorgsMergeRelsPath(workingPath, actionSetId, subEntity); + final int maxIterations = dedupConf.getWf().getMaxIterations(); log.info("Max iterations {}", maxIterations); @@ -105,67 +110,103 @@ public class SparkRemoveDiffRels extends AbstractSparkAction { .where("relClass == 'merges'") .toJavaRDD(); + System.out.println("mergeRelsRDD = " + mergeRelsRDD.count()); + +// JavaRDD, String>> diffRelsRDD = spark +// .read() +// .textFile(relationPath) +// .map(patchRelFn(), Encoders.bean(Relation.class)) +// .toJavaRDD() +// .filter(r -> filterRels(r, entity)) +// .map(rel -> { +// if (rel.getSource().compareTo(rel.getTarget()) < 0) +// return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel"); +// else +// return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel"); +// }); + // THIS IS FOR TESTING PURPOSE JavaRDD, String>> diffRelsRDD = spark .read() - .textFile(relationPath) - .map(patchRelFn(), Encoders.bean(Relation.class)) + .load(mergeRelsPath) + .as(Encoders.bean(Relation.class)) .toJavaRDD() - .filter(r -> filterRels(r, entity)) .map(rel -> { if (rel.getSource().compareTo(rel.getTarget()) < 0) return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel"); else return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel"); + }) + .distinct(); + + System.out.println("diffRelsRDD = " + diffRelsRDD.count()); + +// JavaRDD, String>> flatMergeRels = mergeRelsRDD +// .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel.getTarget())) +// .groupByKey() +// .flatMap(g -> { +// List, String>> rels = new ArrayList<>(); +// +// List ids = StreamSupport +// .stream(g._2().spliterator(), false) +// .collect(Collectors.toList()); +// +// for (int i = 0; i < ids.size(); i++) { +// for (int j = i + 1; j < ids.size(); j++) { +// if (ids.get(i).compareTo(ids.get(j)) < 0) +// rels.add(new Tuple2<>(new Tuple2<>(ids.get(i), ids.get(j)), g._1())); +// else +// rels.add(new Tuple2<>(new Tuple2<>(ids.get(j), ids.get(i)), g._1())); +// } +// } +// return rels.iterator(); +// +// }); + JavaRDD, String>> mergeRels = mergeRelsRDD + .map(rel -> { + if (rel.getSource().compareTo(rel.getTarget()) < 0) + return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "mergeRel"); + else + return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "mergeRel"); }); + System.out.println("mergeRelsProcessed = " + mergeRels.count()); - JavaRDD, String>> flatMergeRels = mergeRelsRDD - .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel.getTarget())) - .groupByKey() - .flatMap(g -> { - List, String>> rels = new ArrayList<>(); - - List ids = StreamSupport - .stream(g._2().spliterator(), false) - .collect(Collectors.toList()); - - for (int i = 0; i < ids.size(); i++) { - for (int j = i + 1; j < ids.size(); j++) { - if (ids.get(i).compareTo(ids.get(j)) < 0) - rels.add(new Tuple2<>(new Tuple2<>(ids.get(i), ids.get(j)), g._1())); - else - rels.add(new Tuple2<>(new Tuple2<>(ids.get(j), ids.get(i)), g._1())); - } - } - return rels.iterator(); - - }); - - JavaRDD purgedMergeRels = flatMergeRels +// JavaRDD purgedMergeRels = flatMergeRels +// .union(diffRelsRDD) +// .mapToPair(rel -> new Tuple2<>(rel._1(), Arrays.asList(rel._2()))) +// .reduceByKey((a, b) -> { +// List list = new ArrayList(); +// list.addAll(a); +// list.addAll(b); +// return list; +// }) +// .filter(rel -> rel._2().size() == 1) +// .mapToPair(rel -> new Tuple2<>(rel._2().get(0), rel._1())) +// .flatMap(rel -> { +// List> rels = new ArrayList<>(); +// String source = rel._1(); +// rels.add(new Tuple2<>(source, rel._2()._1())); +// rels.add(new Tuple2<>(source, rel._2()._2())); +// return rels.iterator(); +// }) +// .distinct() +// .flatMap(rel -> tupleToMergeRel(rel, dedupConf)); + JavaRDD purgedMergeRels = mergeRels .union(diffRelsRDD) - .mapToPair(rel -> new Tuple2<>(rel._1(), Arrays.asList(rel._2()))) - .reduceByKey((a, b) -> { - List list = new ArrayList(); - list.addAll(a); - list.addAll(b); - return list; - }) - .filter(rel -> rel._2().size() == 1) - .mapToPair(rel -> new Tuple2<>(rel._2().get(0), rel._1())) - .flatMap(rel -> { - List> rels = new ArrayList<>(); - String source = rel._1(); - rels.add(new Tuple2<>(source, rel._2()._1())); - rels.add(new Tuple2<>(source, rel._2()._2())); - return rels.iterator(); - }) - .distinct() - .flatMap(rel -> tupleToMergeRel(rel, dedupConf)); + .mapToPair(t -> new Tuple2<>(t._1()._1() + "|||" + t._1()._2(), t._2())) + .groupByKey() + .filter(g -> Iterables.size(g._2()) == 1) + .flatMap( + t -> tupleToMergeRel( + new Tuple2<>(t._1().split("\\|\\|\\|")[0], t._1().split("\\|\\|\\|")[1]), + dedupConf)); + + System.out.println("purgedMergeRels = " + purgedMergeRels.count()); spark .createDataset(purgedMergeRels.rdd(), Encoders.bean(Relation.class)) .write() .mode(SaveMode.Overwrite) - .json(mergeRelsPath); + .json(openorgsMergeRelsPath); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java new file mode 100644 index 000000000..65f383500 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java @@ -0,0 +1,108 @@ + +package eu.dnetlib.dhp.oa.dedup.model; + +import java.io.Serializable; + +public class OrgSimRel implements Serializable { + + String local_id; + String oa_original_id; + String oa_name; + String oa_acronym; + String oa_country; + String oa_url; + String oa_collectedfrom; + String group_id; + + public OrgSimRel() { + } + + public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country, + String oa_url, String oa_collectedfrom, String group_id) { + this.local_id = local_id; + this.oa_original_id = oa_original_id; + this.oa_name = oa_name; + this.oa_acronym = oa_acronym; + this.oa_country = oa_country; + this.oa_url = oa_url; + this.oa_collectedfrom = oa_collectedfrom; + this.group_id = group_id; + } + + public String getLocal_id() { + return local_id; + } + + public void setLocal_id(String local_id) { + this.local_id = local_id; + } + + public String getOa_original_id() { + return oa_original_id; + } + + public void setOa_original_id(String oa_original_id) { + this.oa_original_id = oa_original_id; + } + + public String getOa_name() { + return oa_name; + } + + public void setOa_name(String oa_name) { + this.oa_name = oa_name; + } + + public String getOa_acronym() { + return oa_acronym; + } + + public void setOa_acronym(String oa_acronym) { + this.oa_acronym = oa_acronym; + } + + public String getOa_country() { + return oa_country; + } + + public void setOa_country(String oa_country) { + this.oa_country = oa_country; + } + + public String getOa_url() { + return oa_url; + } + + public void setOa_url(String oa_url) { + this.oa_url = oa_url; + } + + public String getOa_collectedfrom() { + return oa_collectedfrom; + } + + public void setOa_collectedfrom(String oa_collectedfrom) { + this.oa_collectedfrom = oa_collectedfrom; + } + + public String getGroup_id() { + return group_id; + } + + public void setGroup_id(String group_id) { + this.group_id = group_id; + } + + @Override + public String toString() { + return "OrgSimRel{" + + "local_id='" + local_id + '\'' + + ", oa_original_id='" + oa_original_id + '\'' + + ", oa_name='" + oa_name + '\'' + + ", oa_acronym='" + oa_acronym + '\'' + + ", oa_country='" + oa_country + '\'' + + ", oa_url='" + oa_url + '\'' + + ", oa_collectedfrom='" + oa_collectedfrom + '\'' + + '}'; + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml index 4c5505eb5..339e99084 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml @@ -79,7 +79,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -88,8 +88,9 @@ + - + @@ -120,7 +121,7 @@ - + yarn @@ -139,6 +140,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphBasePath${graphBasePath} + --isLookUpUrl${isLookUpUrl} --workingPath${workingPath} --actionSetId${actionSetId} --numPartitions8000 @@ -170,33 +172,6 @@ --actionSetId${actionSetId} --cutConnectedComponent${cutConnectedComponent} - - - - - - - yarn - cluster - Create Merge Relations - eu.dnetlib.dhp.oa.dedup.SparkRemoveDiffRels - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --workingPath${workingPath} - --numPartitions8000 - diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json new file mode 100644 index 000000000..b70d1af28 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json @@ -0,0 +1,62 @@ +[ + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "the working directory path", + "paramRequired": true + }, + { + "paramName": "la", + "paramLongName": "isLookUpUrl", + "paramDescription": "the url of the lookup service", + "paramRequired": true + }, + { + "paramName": "asi", + "paramLongName": "actionSetId", + "paramDescription": "the id of the actionset (orchestrator)", + "paramRequired": true + }, + { + "paramName": "nc", + "paramLongName": "numConnections", + "paramDescription": "number of connections to the postgres db (for the write operation)", + "paramRequired": false + }, + { + "paramName": "au", + "paramLongName": "apiUrl", + "paramDescription": "the url for the APIs of the openorgs service", + "paramRequired": false + }, + { + "paramName": "du", + "paramLongName": "dbUrl", + "paramDescription": "the url of the database", + "paramRequired": true + }, + { + "paramName": "dusr", + "paramLongName": "dbUser", + "paramDescription": "the user of the database", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "dbTable", + "paramDescription": "the name of the table in the database", + "paramRequired": true + }, + { + "paramName": "dpwd", + "paramLongName": "dbPwd", + "paramDescription": "the password for the user of the database", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json new file mode 100644 index 000000000..2119cbc3a --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json @@ -0,0 +1,56 @@ +[ + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "the working directory path", + "paramRequired": true + }, + { + "paramName": "la", + "paramLongName": "isLookUpUrl", + "paramDescription": "the url of the lookup service", + "paramRequired": true + }, + { + "paramName": "asi", + "paramLongName": "actionSetId", + "paramDescription": "the id of the actionset (orchestrator)", + "paramRequired": true + }, + { + "paramName": "nc", + "paramLongName": "numConnections", + "paramDescription": "number of connections to the postgres db (for the write operation)", + "paramRequired": false + }, + { + "paramName": "du", + "paramLongName": "dbUrl", + "paramDescription": "the url of the database", + "paramRequired": true + }, + { + "paramName": "dusr", + "paramLongName": "dbUser", + "paramDescription": "the user of the database", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "dbTable", + "paramDescription": "the name of the table in the database", + "paramRequired": true + }, + { + "paramName": "dpwd", + "paramLongName": "dbPwd", + "paramDescription": "the password for the user of the database", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java new file mode 100644 index 000000000..f8627d023 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java @@ -0,0 +1,296 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import static java.nio.file.Files.createTempDirectory; + +import static org.apache.spark.sql.functions.count; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.lenient; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.nio.file.Paths; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import jdk.nashorn.internal.ir.annotations.Ignore; + +@ExtendWith(MockitoExtension.class) +public class SparkOpenorgsTest implements Serializable { + + @Mock(serializable = true) + ISLookUpService isLookUpService; + + private static SparkSession spark; + private static JavaSparkContext jsc; + + private static String testGraphBasePath; + private static String testOutputBasePath; + private static String testDedupGraphBasePath; + private static final String testActionSetId = "test-orchestrator"; + + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @BeforeAll + public static void cleanUp() throws IOException, URISyntaxException { + + testGraphBasePath = Paths + .get(SparkOpenorgsTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()) + .toFile() + .getAbsolutePath(); + testOutputBasePath = createTempDirectory(SparkOpenorgsTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + testDedupGraphBasePath = createTempDirectory(SparkOpenorgsTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + +// FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + FileUtils.deleteDirectory(new File("/tmp/test-orchestrator/organization_openorgs_mergerels")); + + final SparkConf conf = new SparkConf(); + conf.set("spark.sql.shuffle.partitions", "200"); + spark = SparkSession + .builder() + .appName(SparkDedupTest.class.getSimpleName()) + .master("local[*]") + .config(conf) + .getOrCreate(); + + jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + } + + @BeforeEach + public void setUp() throws IOException, ISLookUpException { + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); + } + + @Test + public void copyOpenorgsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-w", testOutputBasePath, + "-np", "50" + }); + + new SparkCopyOpenorgs(parser, spark).run(isLookUpService); + + long orgs_deduprecord = jsc + .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord") + .count(); + + assertEquals(0, orgs_deduprecord); + } + + @Test + public void copyOpenorgsMergeRels() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgsMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-w", testOutputBasePath, + "-la", "lookupurl", + "-np", "50" + }); + + new SparkCopyOpenorgsMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .count(); + + assertEquals(0, orgs_mergerel); + + } + + @Test + public void copyOpenorgsSimRels() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgsSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-w", testOutputBasePath, + "-la", "lookupurl", + "-np", "50" + }); + + new SparkCopyOpenorgsSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") + .count(); + + System.out.println("orgs_simrel = " + orgs_simrel); + } + + @Test + public void createSimRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", "/tmp", + "-np", "50" + }); + + new SparkCreateSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .textFile("/tmp/" + testActionSetId + "/organization_simrel") + .count(); + + assertEquals(3082, orgs_simrel); + } + + @Test + public void createMergeRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + "/tmp" + }); + + new SparkCreateMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load("/tmp/" + testActionSetId + "/organization_mergerel") + .count(); + assertEquals(1272, orgs_mergerel); + } + + @Test + public void copyRelationsNoOpenorgsTest() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyRelationsNoOpenorgs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-w", testOutputBasePath, + "-o", testDedupGraphBasePath + }); + + new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService); + + long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); + +// Dataset relsRDD = spark.read().textFile(testDedupGraphBasePath + "/relation").map(patchRelFn(), Encoders.bean(Relation.class)); + + assertEquals(500, relations); + } + + @AfterAll + public static void finalCleanUp() throws IOException { + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + } + + private static MapFunction patchRelFn() { + return value -> { + final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } + return rel; + }; + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml new file mode 100644 index 000000000..59b6179ed --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml @@ -0,0 +1,24 @@ + +
+ + + + + +
+ + + + + + + + + + + + + + SECURITY_PARAMETERS + +
\ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 532bb43b2..3e5030eaa 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -612,7 +612,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final DataInfo info = prepareDataInfo(rs); // TODO final String orgId1 = createOpenaireId(20, rs.getString("id1"), true); - final String orgId2 = createOpenaireId(40, rs.getString("id2"), true); + final String orgId2 = createOpenaireId(20, rs.getString("id2"), true); final String relClass = rs.getString("relclass"); final List collectedFrom = listKeyValues( diff --git a/pom.xml b/pom.xml index a2e2587b3..25a52064a 100644 --- a/pom.xml +++ b/pom.xml @@ -114,9 +114,6 @@ test - - -