diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java
index 57285fb82..3a6df2924 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java
@@ -13,6 +13,11 @@ public class OrganizationPidComparator implements Comparator<StructuredProperty>
 		PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid());
 		PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid());
 
+		if (lClass.equals(PidType.openorgs))
+			return -1;
+		if (rClass.equals(PidType.openorgs))
+			return 1;
+
 		if (lClass.equals(PidType.GRID))
 			return -1;
 		if (rClass.equals(PidType.GRID))
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PidType.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PidType.java
index 62f682026..5a297be5e 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PidType.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PidType.java
@@ -9,7 +9,7 @@ public enum PidType {
 	doi, pmid, pmc, handle, arXiv, nct, pdb,
 
 	// Organization
-	GRID, mag_id, urn,
+	openorgs, corda, corda_h2020, GRID, mag_id, urn,
 
 	// Used by dedup
 	undefined, original;
diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml
index ff11c66e0..52cc149a9 100644
--- a/dhp-workflows/dhp-dedup-openaire/pom.xml
+++ b/dhp-workflows/dhp-dedup-openaire/pom.xml
@@ -94,7 +94,12 @@
 			<groupId>org.apache.httpcomponents</groupId>
 			<artifactId>httpclient</artifactId>
 		</dependency>
-
+		<dependency>
+			<groupId>com.h2database</groupId>
+			<artifactId>h2</artifactId>
+			<version>1.4.200</version>
+			<scope>test</scope>
+		</dependency>
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
index 9a1127764..708d67f6e 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
@@ -6,7 +6,9 @@ import java.io.Serializable;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SaveMode;
@@ -31,6 +33,9 @@ abstract class AbstractSparkAction implements Serializable {
 	protected static final int NUM_PARTITIONS = 1000;
 	protected static final int NUM_CONNECTIONS = 20;
 
+	protected static final String TYPE_VALUE_SEPARATOR = "###";
+	protected static final String SP_SEPARATOR = "@@@";
+
 	protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
 		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@@ -94,7 +99,22 @@ abstract class AbstractSparkAction implements Serializable {
 		dataset.write().option("compression", "gzip").mode(mode).json(outPath);
 	}
 
+	protected static void saveParquet(Dataset<Relation> dataset, String outPath, SaveMode mode) {
+		dataset.write().option("compression", "gzip").mode(mode).parquet(outPath);
+	}
+
 	protected static void removeOutputDir(SparkSession spark, String path) {
 		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
 	}
+
+	protected static String structuredPropertyListToString(List<StructuredProperty> list) {
+
+		return list
+			.stream()
+			.filter(p -> p.getQualifier() != null)
+			.filter(p -> StringUtils.isNotBlank(p.getQualifier().getClassid()))
+			.filter(p -> StringUtils.isNotBlank(p.getValue()))
+			.map(p -> p.getValue() + TYPE_VALUE_SEPARATOR + p.getQualifier().getClassid())
+			.collect(Collectors.joining(SP_SEPARATOR));
+	}
 }
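For orientation, the helper added above serializes each pid as value###classid and joins the pids with @@@ (the TYPE_VALUE_SEPARATOR / SP_SEPARATOR constants); the resulting string later surfaces as OrgSimRel.pid_list. A minimal sketch with hypothetical values, not part of the patch, assuming the usual dhp StructuredProperty/Qualifier beans:

    Qualifier q = new Qualifier();
    q.setClassid("GRID");                               // hypothetical pid type
    StructuredProperty sp = new StructuredProperty();
    sp.setQualifier(q);
    sp.setValue("grid.1234.5");                         // hypothetical pid value
    structuredPropertyListToString(Arrays.asList(sp));  // -> "grid.1234.5###GRID"
    // two pids would yield e.g. "grid.1234.5###GRID@@@12345###mag_id"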
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index 99cd7c31f..fe9bd74ce 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -89,7 +89,7 @@ public class DedupRecordFactory {
 				t -> {
 					T duplicate = t._2();
 
-					// prepare the list of pids to use for the id generation
+					// prepare the list of pids to be used for the id generation
 					bestPids.add(Identifier.newInstance(duplicate));
 					entity.mergeFrom(duplicate);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
index a44d51af3..4ee8a08da 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
@@ -84,6 +84,11 @@ public class DedupUtility {
 		return String.format("%s/%s/%s_simrel", basePath, actionSetId, entityType);
 	}
 
+	public static String createOpenorgsMergeRelsPath(
+		final String basePath, final String actionSetId, final String entityType) {
+		return String.format("%s/%s/%s_openorgs_mergerels", basePath, actionSetId, entityType);
+	}
+
 	public static String createMergeRelPath(
 		final String basePath, final String actionSetId, final String entityType) {
 		return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
index 51e54ee4f..dd9b16790 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
@@ -36,7 +36,14 @@ public class IdGenerator implements Serializable {
 	}
 
 	private static String dedupify(String ns) {
-		StringBuilder prefix = new StringBuilder(substringBefore(ns, "_")).append("_dedup");
+
+		StringBuilder prefix;
+		if (PidType.valueOf(substringBefore(ns, "_")) == PidType.openorgs) {
+			prefix = new StringBuilder(substringBefore(ns, "_"));
+		} else {
+			prefix = new StringBuilder(substringBefore(ns, "_")).append("_dedup");
+		}
+
 		while (prefix.length() < 12) {
 			prefix.append("_");
 		}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java
deleted file mode 100644
index f9e6448b0..000000000
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java
+++ /dev/null
@@ -1,184 +0,0 @@
-
-package eu.dnetlib.dhp.oa.dedup;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.*;
-import org.dom4j.DocumentException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.config.DedupConfig;
-import scala.Tuple2;
-
-public class SparkCollectSimRels extends AbstractSparkAction {
-
-	private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class);
-
-	Dataset<Row> simGroupsDS;
-	Dataset<Row> groupsDS;
-
-	public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset<Row> simGroupsDS,
-		Dataset<Row> groupsDS) {
-		super(parser, spark);
-		this.simGroupsDS = simGroupsDS;
-		this.groupsDS = groupsDS;
-	}
-
-	public static void main(String[] args) throws Exception {
-		ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					SparkBlockStats.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
-		parser.parseArgument(args);
-
-		SparkConf conf = new SparkConf();
-
-		final String dbUrl = parser.get("postgresUrl");
-		final String dbUser = parser.get("postgresUser");
-		final String dbPassword = parser.get("postgresPassword");
-
-		SparkSession spark = getSparkSession(conf);
-
-		DataFrameReader readOptions = spark
-			.read()
-			.format("jdbc")
-			.option("url", dbUrl)
-			.option("user", dbUser)
-			.option("password", dbPassword);
-
-		new SparkCollectSimRels(
-			parser,
-			spark,
-			readOptions.option("dbtable", "similarity_groups").load(),
-			readOptions.option("dbtable", "groups").load())
-				.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
-	}
-
-	@Override
-	void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException {
-
-		// read oozie parameters
-		final String isLookUpUrl = parser.get("isLookUpUrl");
-		final String actionSetId = parser.get("actionSetId");
-		final String workingPath = parser.get("workingPath");
-		final int numPartitions = Optional
-			.ofNullable(parser.get("numPartitions"))
-			.map(Integer::valueOf)
-			.orElse(NUM_PARTITIONS);
-		final String dbUrl = parser.get("postgresUrl");
-		final String dbUser = parser.get("postgresUser");
-
-		log.info("numPartitions: '{}'", numPartitions);
-		log.info("isLookUpUrl: '{}'", isLookUpUrl);
-		log.info("actionSetId: '{}'", actionSetId);
-		log.info("workingPath: '{}'", workingPath);
-		log.info("postgresUser: {}", dbUser);
-		log.info("postgresUrl: {}", dbUrl);
-		log.info("postgresPassword: xxx");
-
-		JavaPairRDD<String, List<String>> similarityGroup = simGroupsDS
-			.toJavaRDD()
-			.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)))
-			.groupByKey()
-			.mapToPair(
-				i -> new Tuple2<>(i._1(), StreamSupport
-					.stream(i._2().spliterator(), false)
-					.collect(Collectors.toList())));
-
-		JavaPairRDD<String, String> groupIds = groupsDS
-			.toJavaRDD()
-			.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)));
-
-		JavaRDD<Tuple2<Tuple2<String, String>, List<String>>> groups = similarityGroup
-			.leftOuterJoin(groupIds)
-			.filter(g -> g._2()._2().isPresent())
-			.map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1()));
-
-		JavaRDD<Relation> relations = groups.flatMap(g -> {
-			String firstId = g._2().get(0);
-			List<Relation> rels = new ArrayList<>();
-
-			for (String id : g._2()) {
-				if (!firstId.equals(id))
-					rels.add(createSimRel(firstId, id, g._1()._2()));
-			}
-
-			return rels.iterator();
-		});
-
-		Dataset<Relation> resultRelations = spark
-			.createDataset(
-				relations.filter(r -> r.getRelType().equals("resultResult")).rdd(),
-				Encoders.bean(Relation.class))
-			.repartition(numPartitions);
-
-		Dataset<Relation> organizationRelations = spark
-			.createDataset(
-				relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(),
-				Encoders.bean(Relation.class))
-			.repartition(numPartitions);
-
-		for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
-			switch (dedupConf.getWf().getSubEntityValue()) {
-				case "organization":
-					savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization");
-					break;
-				default:
-					savePostgresRelation(
-						resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue());
-					break;
-			}
-		}
-
-	}
-
-	private Relation createSimRel(String source, String target, String entity) {
-		final Relation r = new Relation();
-		r.setSubRelType("dedupSimilarity");
-		r.setRelClass("isSimilarTo");
-		r.setDataInfo(new DataInfo());
-
-		switch (entity) {
-			case "result":
-				r.setSource("50|" + source);
-				r.setTarget("50|" + target);
-				r.setRelType("resultResult");
-				break;
-			case "organization":
-				r.setSource("20|" + source);
-				r.setTarget("20|" + target);
-				r.setRelType("organizationOrganization");
-				break;
-			default:
-				throw new IllegalArgumentException("unmanaged entity type: " + entity);
-		}
-		return r;
-	}
-
-	private void savePostgresRelation(Dataset<Relation> newRelations, String workingPath, String actionSetId,
-		String entityType) {
-		newRelations
-			.write()
-			.mode(SaveMode.Append)
-			.parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType));
-	}
-
-}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
new file mode 100644
index 000000000..7984f0104
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
@@ -0,0 +1,104 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+public class SparkCopyOpenorgs extends AbstractSparkAction {
+	private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgs.class);
+
+	public SparkCopyOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
+		super(parser, spark);
+	}
+
+	public static void main(String[] args) throws Exception {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkCopyOpenorgs.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json")));
+		parser.parseArgument(args);
+
+		SparkConf conf = new SparkConf();
+		new SparkCopyOpenorgs(parser, getSparkSession(conf))
+			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+	}
+
+	@Override
+	public void run(ISLookUpService isLookUpService)
+		throws DocumentException, IOException, ISLookUpException {
+
+		// read oozie parameters
+		final String graphBasePath = parser.get("graphBasePath");
+		final String actionSetId = parser.get("actionSetId");
+		final String workingPath = parser.get("workingPath");
+		final int numPartitions = Optional
+			.ofNullable(parser.get("numPartitions"))
+			.map(Integer::valueOf)
+			.orElse(NUM_PARTITIONS);
+
+		log.info("numPartitions: '{}'", numPartitions);
+		log.info("graphBasePath: '{}'", graphBasePath);
+		log.info("actionSetId: '{}'", actionSetId);
+		log.info("workingPath: '{}'", workingPath);
+
+		String subEntity = "organization";
+		log.info("Copying openorgs to the working dir");
+
+		final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity);
+
+		final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
+
+		filterOpenorgs(spark, entityPath)
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
+
+	}
+
+	public static Dataset<Organization> filterOpenorgs(
+		final SparkSession spark,
+		final String entitiesInputPath) {
+
+		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+		Dataset<Organization> entities = spark
+			.createDataset(
+				sc
+					.textFile(entitiesInputPath)
+					.map(it -> OBJECT_MAPPER.readValue(it, Organization.class))
+					.rdd(),
+				Encoders.bean(Organization.class));
+
+		log.info("Number of organization entities processed: {}", entities.count());
+
+		entities = entities.filter(entities.col("id").contains("openorgs____"));
+
+		log.info("Number of Openorgs organization entities: {}", entities.count());
+
+		return entities;
+	}
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
new file mode 100644
index 000000000..6bd1a00b9
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
@@ -0,0 +1,191 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
+import net.sf.saxon.ma.trie.Tuple2;
+
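+/**
+ * Copies the verified OpenOrgs isSimilarTo relations from the raw graph into the working
+ * directory as merge relations: every organization involved (plus a self relation) is
+ * re-keyed on a dedup record id derived from its openorgs id, and each pair is expanded
+ * into a merges / isMergedIn couple, matching the OpenAIRE dedup mergerel format.
+ */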
+public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
+	private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsMergeRels.class);
+	public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
+	public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
+
+	public SparkCopyOpenorgsMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
+		super(parser, spark);
+	}
+
+	public static void main(String[] args) throws Exception {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkCopyOpenorgsMergeRels.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
+		parser.parseArgument(args);
+
+		SparkConf conf = new SparkConf();
+		new SparkCopyOpenorgsMergeRels(parser, getSparkSession(conf))
+			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+	}
+
+	@Override
+	public void run(ISLookUpService isLookUpService)
+		throws DocumentException, IOException, ISLookUpException {
+
+		// read oozie parameters
+		final String graphBasePath = parser.get("graphBasePath");
+		final String actionSetId = parser.get("actionSetId");
+		final String workingPath = parser.get("workingPath");
+		final int numPartitions = Optional
+			.ofNullable(parser.get("numPartitions"))
+			.map(Integer::valueOf)
+			.orElse(NUM_PARTITIONS);
+
+		log.info("numPartitions: '{}'", numPartitions);
+		log.info("graphBasePath: '{}'", graphBasePath);
+		log.info("actionSetId: '{}'", actionSetId);
+		log.info("workingPath: '{}'", workingPath);
+
+		log.info("Copying OpenOrgs Merge Rels");
+
+		final String outputPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
+
+		final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
+
+		DedupConfig dedupConf = getConfigurations(isLookUpService, actionSetId).get(0);
+
+		JavaRDD<Relation> rawRels = spark
+			.read()
+			.textFile(relationPath)
+			.map(patchRelFn(), Encoders.bean(Relation.class))
+			.toJavaRDD()
+			.filter(this::isOpenorgs)
+			.filter(this::filterOpenorgsRels);
+
+		JavaRDD<Relation> selfRawRels = rawRels
+			.map(r -> r.getSource())
+			.distinct()
+			.map(s -> rel(s, s, "isSimilarTo", dedupConf));
+
+		log.info("Number of raw Openorgs Relations collected: {}", rawRels.count());
+
+		// turn openorgs isSimilarTo relations into mergerels
+		JavaRDD<Relation> mergeRelsRDD = rawRels
+			.union(selfRawRels)
+			.map(r -> {
+				r.setSource(createDedupID(r.getSource())); // create the dedup_id to align it to the openaire dedup format
+				return r;
+			})
+			.flatMap(rel -> {
+
+				List<Relation> mergerels = new ArrayList<>();
+
+				mergerels.add(rel(rel.getSource(), rel.getTarget(), "merges", dedupConf));
+				mergerels.add(rel(rel.getTarget(), rel.getSource(), "isMergedIn", dedupConf));
+
+				return mergerels.iterator();
+			});
+
+		log.info("Number of Openorgs Merge Relations created: {}", mergeRelsRDD.count());
+
+		spark
+			.createDataset(
+				mergeRelsRDD.rdd(),
+				Encoders.bean(Relation.class))
+			.write()
+			.mode(SaveMode.Append)
+			.parquet(outputPath);
+	}
+
+	private static MapFunction<String, Relation> patchRelFn() {
+		return value -> {
+			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+			if (rel.getDataInfo() == null) {
+				rel.setDataInfo(new DataInfo());
+			}
+			return rel;
+		};
+	}
+
+	private boolean filterOpenorgsRels(Relation rel) {
+
+		if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization")
+			&& rel.getSubRelType().equals("dedup"))
+			return true;
+		return false;
+	}
+
+	private boolean isOpenorgs(Relation rel) {
+
+		if (rel.getCollectedfrom() != null) {
+			for (KeyValue k : rel.getCollectedfrom()) {
+				if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+
+	private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
+
+		String entityType = dedupConf.getWf().getEntityType();
+
+		Relation r = new Relation();
+		r.setSource(source);
+		r.setTarget(target);
+		r.setRelClass(relClass);
+		r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
+		r.setSubRelType("dedup");
+
+		DataInfo info = new DataInfo();
+		info.setDeletedbyinference(false);
+		info.setInferred(true);
+		info.setInvisible(false);
+		info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
+		Qualifier provenanceAction = new Qualifier();
+		provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
+		provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
+		provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
+		provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
+		info.setProvenanceaction(provenanceAction);
+
+		// TODO calculate the trust value based on the similarity score of the elements in the CC
+		// info.setTrust();
+
+		r.setDataInfo(info);
+		return r;
+	}
+
+	public String createDedupID(String id) {
+
+		String prefix = id.split("\\|")[0];
+		return prefix + "|dedup_wf_001::" + DHPUtils.md5(id);
+	}
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
new file mode 100644
index 000000000..dbcd40289
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
@@ -0,0 +1,120 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
+
+//copy simrels (verified) from relation to the workdir in order to make them available for the deduplication
+public class SparkCopyOpenorgsSimRels extends AbstractSparkAction {
+	private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsSimRels.class);
+
+	public SparkCopyOpenorgsSimRels(ArgumentApplicationParser parser, SparkSession spark) {
+		super(parser, spark);
+	}
+
+	public static void main(String[] args) throws Exception {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkCopyOpenorgsSimRels.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
+		parser.parseArgument(args);
+
+		SparkConf conf = new SparkConf();
+		new SparkCopyOpenorgsSimRels(parser, getSparkSession(conf))
+			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+	}
+
+	@Override
+	public void run(ISLookUpService isLookUpService)
+		throws DocumentException, IOException, ISLookUpException {
+
+		// read oozie parameters
+		final String graphBasePath = parser.get("graphBasePath");
+		final String actionSetId = parser.get("actionSetId");
+		final String workingPath = parser.get("workingPath");
+		final int numPartitions = Optional
+			.ofNullable(parser.get("numPartitions"))
+			.map(Integer::valueOf)
+			.orElse(NUM_PARTITIONS);
+
+		log.info("numPartitions: '{}'", numPartitions);
+		log.info("graphBasePath: '{}'", graphBasePath);
+		log.info("actionSetId: '{}'", actionSetId);
+		log.info("workingPath: '{}'", workingPath);
+
+		log.info("Copying OpenOrgs SimRels");
+
+		final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, "organization");
+
+		final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
+
+		Dataset<Relation> rawRels = spark
+			.read()
+			.textFile(relationPath)
+			.map(patchRelFn(), Encoders.bean(Relation.class))
+			.filter(this::filterOpenorgsRels);
+
+		saveParquet(rawRels, outputPath, SaveMode.Append);
+
+		log.info("Copied " + rawRels.count() + " Similarity Relations");
+	}
+
+	private static MapFunction<String, Relation> patchRelFn() {
+		return value -> {
+			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+			if (rel.getDataInfo() == null) {
+				rel.setDataInfo(new DataInfo());
+			}
+			return rel;
+		};
+	}
+
+	private boolean filterOpenorgsRels(Relation rel) {
+
+		if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization")
+			&& rel.getSubRelType().equals("dedup") && isOpenorgs(rel))
+			return true;
+		return false;
+	}
+
+	private boolean isOpenorgs(Relation rel) {
+
+		if (rel.getCollectedfrom() != null) {
+			for (KeyValue k : rel.getCollectedfrom()) {
+				if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+}
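To make the filter above concrete: a relation is copied only when it is an organization isSimilarTo simrel with subRelType dedup, collected from the OpenOrgs database. A minimal sketch of a record that passes, with illustrative values and the standard dhp beans (not part of the patch):

    Relation r = new Relation();
    r.setRelClass("isSimilarTo");
    r.setRelType("organizationOrganization");
    r.setSubRelType("dedup");
    KeyValue provenance = new KeyValue();
    provenance.setValue("OpenOrgs Database");  // the marker isOpenorgs() checks
    r.setCollectedfrom(Collections.singletonList(provenance));
    // filterOpenorgsRels(r) -> true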
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
new file mode 100644
index 000000000..71bab79d0
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
@@ -0,0 +1,110 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+import scala.Tuple2;
+
+public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
+
+	private static final Logger log = LoggerFactory.getLogger(SparkCopyRelationsNoOpenorgs.class);
+
+	public SparkCopyRelationsNoOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
+		super(parser, spark);
+	}
+
+	public static void main(String[] args) throws Exception {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkCopyRelationsNoOpenorgs.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
+		parser.parseArgument(args);
+
+		SparkConf conf = new SparkConf();
+		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+		new SparkCopyRelationsNoOpenorgs(parser, getSparkSession(conf))
+			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+	}
+
+	public void run(ISLookUpService isLookUpService) throws IOException {
+
+		final String graphBasePath = parser.get("graphBasePath");
+		final String workingPath = parser.get("workingPath");
+		final String dedupGraphPath = parser.get("dedupGraphPath");
+
+		log.info("graphBasePath: '{}'", graphBasePath);
+		log.info("workingPath: '{}'", workingPath);
+		log.info("dedupGraphPath: '{}'", dedupGraphPath);
+
+		final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
+		final String outputPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
+
+		JavaRDD<Relation> simRels = spark
+			.read()
+			.textFile(relationPath)
+			.map(patchRelFn(), Encoders.bean(Relation.class))
+			.toJavaRDD()
+			.filter(x -> !isOpenorgs(x));
+
+		log.info("Number of non-Openorgs relations collected: {}", simRels.count());
+
+		spark
+			.createDataset(simRels.rdd(), Encoders.bean(Relation.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.json(outputPath);
+
+	}
+
+	private static MapFunction<String, Relation> patchRelFn() {
+		return value -> {
+			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+			if (rel.getDataInfo() == null) {
+				rel.setDataInfo(new DataInfo());
+			}
+			return rel;
+		};
+	}
+
+	private boolean isOpenorgs(Relation rel) {
+
+		if (rel.getCollectedfrom() != null) {
+			for (KeyValue k : rel.getCollectedfrom()) {
+				if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index d5033d425..96693ebf0 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -10,6 +10,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
@@ -99,18 +100,19 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 				.createSortedBlocks(mapDocuments, dedupConf)
 				.repartition(numPartitions);
 
-			// create relations by comparing only elements in the same group
-			spark
+			Dataset<Relation> simRels = spark
 				.createDataset(
 					Deduper
 						.computeRelations(sc, blocks, dedupConf)
 						.map(t -> createSimRel(t._1(), t._2(), entity))
 						.repartition(numPartitions)
 						.rdd(),
-					Encoders.bean(Relation.class))
-				.write()
-				.mode(SaveMode.Append)
-				.parquet(outputPath);
+					Encoders.bean(Relation.class));
+
+			saveParquet(simRels, outputPath, SaveMode.Append);
+
+			log.info("Generated " + simRels.count() + " Similarity Relations");
 		}
 	}
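One side effect of the count-and-log lines added by this patch (here and in SparkCopyOpenorgsSimRels): count() re-evaluates the Dataset after it has already been written, since nothing is cached. If that ever matters for job times, a persist() before saving would avoid the double computation — a minimal sketch, assuming Spark's standard StorageLevel API, not part of the patch:

    // requires: import org.apache.spark.storage.StorageLevel;
    simRels = simRels.persist(StorageLevel.DISK_ONLY());  // hypothetical tuning choice
    saveParquet(simRels, outputPath, SaveMode.Append);
    log.info("Generated {} Similarity Relations", simRels.count());
    simRels.unpersist();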
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
index 9b91a545e..950676677 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
@@ -11,6 +11,7 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
@@ -23,6 +24,7 @@ import org.slf4j.LoggerFactory;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Organization;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -31,7 +33,7 @@ import scala.Tuple2;
 
 public class SparkPrepareNewOrgs extends AbstractSparkAction {
 
-	private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
+	private static final Logger log = LoggerFactory.getLogger(SparkPrepareNewOrgs.class);
 
 	public SparkPrepareNewOrgs(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
@@ -86,15 +88,18 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
 		log.info("table: '{}'", dbTable);
 		log.info("dbPwd: '{}'", "xxx");
 
-		final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
 		final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
+		final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
+		final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
 
-		Dataset<OrgSimRel> newOrgs = createNewOrgs(spark, mergeRelPath, entityPath);
+		Dataset<OrgSimRel> newOrgs = createNewOrgs(spark, mergeRelPath, relationPath, entityPath);
 
 		final Properties connectionProperties = new Properties();
 		connectionProperties.put("user", dbUser);
 		connectionProperties.put("password", dbPwd);
 
+		log.info("Number of new organizations created: '{}'", newOrgs.count());
+
 		newOrgs
 			.repartition(numConnections)
 			.write()
@@ -109,9 +114,27 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
 	public static Dataset<OrgSimRel> createNewOrgs(
 		final SparkSession spark,
 		final String mergeRelsPath,
+		final String relationPath,
 		final String entitiesPath) {
 
-		//
+		// collect diffrels from the raw graph relations: <id, "diffRel">
+		JavaPairRDD<String, String> diffRels = spark
+			.read()
+			.textFile(relationPath)
+			.map(patchRelFn(), Encoders.bean(Relation.class))
+			.toJavaRDD()
+			.filter(r -> filterRels(r, "organization"))
+			// take the worst id of the diffrel:
+			.mapToPair(rel -> {
+				if (compareIds(rel.getSource(), rel.getTarget()) > 0)
+					return new Tuple2<>(rel.getSource(), "diffRel");
+				else
+					return new Tuple2<>(rel.getTarget(), "diffRel");
+			})
+			.distinct();
+		log.info("Number of DiffRels collected: '{}'", diffRels.count());
+
+		// collect entities: <id, Organization>
 		Dataset<Tuple2<String, Organization>> entities = spark
 			.read()
 			.textFile(entitiesPath)
 			.map(
 				(MapFunction<String, Tuple2<String, Organization>>) it -> {
 					Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
 					return new Tuple2<>(entity.getId(), entity);
 				},
 				Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
@@ -122,7 +145,8 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
 
-		Dataset<Tuple2<String, String>> mergerels = spark
+		// collect mergerels and remove ids in the diffrels
+		Dataset<Tuple2<String, String>> openorgsRels = spark
 			.createDataset(
 				spark
 					.read()
@@ -130,18 +154,24 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
 					.load(mergeRelsPath)
 					.as(Encoders.bean(Relation.class))
 					.where("relClass == 'isMergedIn'")
 					.toJavaRDD()
-					.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
+					.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) // <id, dedup_id>
+					.leftOuterJoin(diffRels) // <id, <dedup_id, "diffRel">>
+					.filter(rel -> !rel._2()._2().isPresent())
+					.mapToPair(rel -> new Tuple2<>(rel._1(), rel._2()._1()))
 					.rdd(),
 				Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+		log.info("Number of Openorgs Relations loaded: '{}'", openorgsRels.count());
 
 		return entities
-			.joinWith(mergerels, entities.col("_1").equalTo(mergerels.col("_1")), "left")
+			.joinWith(openorgsRels, entities.col("_1").equalTo(openorgsRels.col("_1")), "left")
 			.filter((FilterFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>>) t -> t._2() == null)
+			// take entities not in mergerels (they are single entities, therefore are new orgs)
 			.filter(
 				(FilterFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>>) t -> !t
 					._1()
 					._1()
 					.contains("openorgs"))
+			// exclude openorgs, don't need to propose them as new orgs
 			.map(
 				(MapFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>, OrgSimRel>) r -> new OrgSimRel(
 					"",
 					r._1()._2().getOriginalId().get(0),
 					r._1()._2().getLegalname() != null ? r._1()._2().getLegalname().getValue() : "",
@@ -150,7 +180,9 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
 					r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "",
 					r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "",
 					r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "",
-					r._1()._2().getCollectedfrom().get(0).getValue(), ""),
+					r._1()._2().getCollectedfrom().get(0).getValue(),
+					"",
+					structuredPropertyListToString(r._1()._2().getPid())),
 				Encoders.bean(OrgSimRel.class));
 	}
@@ -167,4 +199,52 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
 	}
 
+	private static MapFunction<String, Relation> patchRelFn() {
+		return value -> {
+			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+			if (rel.getDataInfo() == null) {
+				rel.setDataInfo(new DataInfo());
+			}
+			return rel;
+		};
+	}
+
+	public static int compareIds(String o1, String o2) {
+		if (o1.contains("openorgs____") && o2.contains("openorgs____"))
+			return o1.compareTo(o2);
+		if (o1.contains("corda") && o2.contains("corda"))
+			return o1.compareTo(o2);
+
+		if (o1.contains("openorgs____"))
+			return -1;
+		if (o2.contains("openorgs____"))
+			return 1;
+
+		if (o1.contains("corda"))
+			return -1;
+		if (o2.contains("corda"))
+			return 1;
+
+		return o1.compareTo(o2);
+	}
+
+	private static boolean filterRels(Relation rel, String entityType) {
+
+		switch (entityType) {
+			case "result":
+				if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult")
+					&& rel.getSubRelType().equals("dedup"))
+					return true;
+				break;
+			case "organization":
+				if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization")
+					&& rel.getSubRelType().equals("dedup"))
+					return true;
+				break;
+			default:
+				return false;
+		}
+		return false;
+	}
+
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
index 0510b1a90..e2d9ae9c6 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
@@ -3,14 +3,18 @@ package eu.dnetlib.dhp.oa.dedup;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -19,6 +23,7 @@ import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Organization;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -28,7 +33,7 @@ import scala.Tuple3;
 
 public class SparkPrepareOrgRels extends AbstractSparkAction {
 
-	private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
+	private static final Logger log = LoggerFactory.getLogger(SparkPrepareOrgRels.class);
 
 	public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
@@ -80,8 +85,9 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 
 		final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
 		final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
+		final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
 
-		Dataset<OrgSimRel> relations = createRelations(spark, mergeRelPath, entityPath);
+		Dataset<OrgSimRel> relations = createRelations(spark, mergeRelPath, relationPath, entityPath);
 
 		final Properties connectionProperties = new Properties();
 		connectionProperties.put("user", dbUser);
@@ -95,11 +101,50 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 
 	}
 
+	private static boolean filterRels(Relation rel, String entityType) {
+
+		switch (entityType) {
+			case "result":
+				if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult")
+					&& rel.getSubRelType().equals("dedup"))
+					return true;
+				break;
+			case "organization":
+				if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization")
+					&& rel.getSubRelType().equals("dedup"))
+					return true;
+				break;
+			default:
+				return false;
+		}
+		return false;
+	}
+
+	// create openorgs simrels starting from mergerels, remove the diffrels
 	public static Dataset<OrgSimRel> createRelations(
 		final SparkSession spark,
 		final String mergeRelsPath,
+		final String relationPath,
 		final String entitiesPath) {
 
+		// collect diffrels from the raw graph relations: <<best_id, other_id>, "diffRel">
+		JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRels = spark
+			.read()
+			.textFile(relationPath)
+			.map(patchRelFn(), Encoders.bean(Relation.class))
+			.toJavaRDD()
+			.filter(r -> filterRels(r, "organization"))
+			// put the best id as source of the diffrel:
+			.map(rel -> {
+				if (compareIds(rel.getSource(), rel.getTarget()) < 0)
+					return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
+				else
+					return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
+			})
+			.distinct();
+		log.info("Number of DiffRels collected: {}", diffRels.count());
+
+		// collect all the organizations
 		Dataset<Tuple2<String, Organization>> entities = spark
 			.read()
 			.textFile(entitiesPath)
 			.map(
 				(MapFunction<String, Tuple2<String, Organization>>) it -> {
 					Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
 					return new Tuple2<>(entity.getId(), entity);
 				},
 				Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
@@ -110,47 +155,74 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 
+		// relations with their group (connected component id)
+		JavaRDD<Tuple2<Tuple2<String, String>, String>> rawOpenorgsRels = spark
+			.read()
+			.load(mergeRelsPath)
+			.as(Encoders.bean(Relation.class))
+			.where("relClass == 'merges'")
+			.toJavaRDD()
+			.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
+			.filter(t -> !t._2().contains("openorgsmesh")) // remove openorgsmesh: they are only for dedup
+			.groupByKey()
+			.map(g -> Lists.newArrayList(g._2()))
+			.filter(l -> l.size() > 1)
+			.flatMap(l -> {
+				String groupId = "group::" + UUID.randomUUID();
+				List<String> ids = sortIds(l); // sort IDs by type
+				List<Tuple2<Tuple2<String, String>, String>> rels = new ArrayList<>();
+				String source = ids.get(0);
+				for (String target : ids) {
+					rels.add(new Tuple2<>(new Tuple2<>(source, target), groupId));
+				}
+
+				return rels.iterator();
+			});
+		log.info("Number of Raw Openorgs Relations created: {}", rawOpenorgsRels.count());
+
+		// filter out diffRels
+		JavaRDD<Tuple3<String, String, String>> openorgsRels = rawOpenorgsRels
+			.union(diffRels)
+			// concatenation of source and target: <source@@@target, group_id> or <source@@@target, "diffRel">
+			.mapToPair(t -> new Tuple2<>(t._1()._1() + "@@@" + t._1()._2(), t._2()))
+			.groupByKey()
+			.map(
+				g -> new Tuple2<>(g._1(), StreamSupport
+					.stream(g._2().spliterator(), false)
+					.collect(Collectors.toList())))
+			// <source@@@target, list(group_id, "diffRel")>: take only relations with only the group_id, it
+			// means they are correct. If the diffRel is present the relation has to be removed
+			.filter(g -> g._2().size() == 1 && g._2().get(0).contains("group::"))
+			.map(
+				t -> new Tuple3<>(
+					t._1().split("@@@")[0],
+					t._1().split("@@@")[1],
+					t._2().get(0)));
+		log.info("Number of Openorgs Relations created: '{}'", openorgsRels.count());
+
+		// <source_id, target_id, group_id>
 		Dataset<Tuple3<String, String, String>> relations = spark
 			.createDataset(
-				spark
-					.read()
-					.load(mergeRelsPath)
-					.as(Encoders.bean(Relation.class))
-					.where("relClass == 'merges'")
-					.toJavaRDD()
-					.mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
-					.filter(t -> !t._2().contains("openorgsmesh"))
-					.groupByKey()
-					.map(g -> Lists.newArrayList(g._2()))
-					.filter(l -> l.size() > 1)
-					.flatMap(l -> {
-						String groupId = "group::" + UUID.randomUUID();
-						List<String> ids = sortIds(l);
-						List<Tuple3<String, String, String>> rels = new ArrayList<>();
-
-						for (String source : ids) {
-							if (source.contains("openorgs____") || ids.indexOf(source) == 0)
-								for (String target : ids) {
-									rels.add(new Tuple3<>(source, target, groupId));
-								}
-						}
-						return rels.iterator();
-					})
-					.rdd(),
+				openorgsRels.rdd(),
 				Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));
 
-		Dataset<Tuple2<String, OrgSimRel>> relations2 = relations //
+		// create orgsimrels
+		Dataset<Tuple2<String, OrgSimRel>> relations2 = relations
 			.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
 			.map(
-				(MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> new OrgSimRel(
-					r._1()._1(),
-					r._2()._2().getOriginalId().get(0),
-					r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
-					r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
-					r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
-					r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
-					r._2()._2().getCollectedfrom().get(0).getValue(),
-					r._1()._3()),
+				(MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
+
+					return new OrgSimRel(
+						r._1()._1(),
+						r._2()._2().getOriginalId().get(0),
+						r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
+						r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
+						r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
+						r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
+						r._2()._2().getCollectedfrom().get(0).getValue(),
+						r._1()._3(),
+						structuredPropertyListToString(r._2()._2().getPid()));
+				},
 				Encoders.bean(OrgSimRel.class))
 			.map(
 				(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
@@ -168,29 +240,28 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 
 	}
 
-	// select best ids from the list. Priority: 1) openorgs, 2)corda, 3)alphabetic
-	public static List<String> sortIds(List<String> ids) {
-
-		ids.sort((o1, o2) -> {
-
-			if (o1.contains("openorgs____") && o2.contains("openorgs____"))
-				return o1.compareTo(o2);
-			if (o1.contains("corda") && o2.contains("corda"))
-				return o1.compareTo(o2);
-
-			if (o1.contains("openorgs____"))
-				return -1;
-			if (o2.contains("openorgs____"))
-				return 1;
-
-			if (o1.contains("corda"))
-				return -1;
-			if (o2.contains("corda"))
-				return 1;
-
+	public static int compareIds(String o1, String o2) {
+		if (o1.contains("openorgs____") && o2.contains("openorgs____"))
+			return o1.compareTo(o2);
+		if (o1.contains("corda") && o2.contains("corda"))
 			return o1.compareTo(o2);
-		});
 
+		if (o1.contains("openorgs____"))
+			return -1;
+		if (o2.contains("openorgs____"))
+			return 1;
+
+		if (o1.contains("corda"))
+			return -1;
+		if (o2.contains("corda"))
+			return 1;
+
+		return o1.compareTo(o2);
+	}
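+	// e.g. with hypothetical ids, compareIds ranks
+	//   "20|openorgs____::aaa" < "20|corda__h2020::bbb" < "20|grid________::ccc"
+	// (openorgs first, then corda*, then everything else alphabetically), so sortIds
+	// below always moves an openorgs id, when present, to the head of the group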
+
+	// Sort IDs based on their type. Priority: 1) openorgs, 2) corda, 3) alphabetic
+	public static List<String> sortIds(List<String> ids) {
+		ids.sort((o1, o2) -> compareIds(o1, o2));
 		return ids;
 	}
@@ -245,7 +316,8 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 					r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
 					r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
 					r._2()._2().getCollectedfrom().get(0).getValue(),
-					"group::" + r._1()._1()),
+					"group::" + r._1()._1(),
+					structuredPropertyListToString(r._2()._2().getPid())),
 				Encoders.bean(OrgSimRel.class))
 			.map(
 				(MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
@@ -263,4 +335,13 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 
 	}
 
+	private static MapFunction<String, Relation> patchRelFn() {
+		return value -> {
+			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+			if (rel.getDataInfo() == null) {
+				rel.setDataInfo(new DataInfo());
+			}
+			return rel;
+		};
+	}
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
index 779fb91d6..5ebc00d5a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
@@ -13,6 +13,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -22,11 +23,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.PidType;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.util.MapDocumentUtil;
@@ -103,12 +103,22 @@ public class SparkUpdateEntity extends AbstractSparkAction {
 						MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
 				JavaRDD<String> map = entitiesWithId
 					.leftOuterJoin(mergedIds)
-					.map(
-						k -> k._2()._2().isPresent()
-							? updateDeletedByInference(k._2()._1(), clazz)
-							: k._2()._1());
+					.map(k -> {
+						if (k._2()._2().isPresent()) {
+							return updateDeletedByInference(k._2()._1(), clazz);
+						}
+						return k._2()._1();
+					});
+
+				if (type == EntityType.organization) // exclude openorgs with deletedbyinference=true
+					map = map.filter(it -> {
+						Organization org = OBJECT_MAPPER.readValue(it, Organization.class);
+						return !org.getId().contains("openorgs____") || (org.getId().contains("openorgs____")
+							&& !org.getDataInfo().getDeletedbyinference());
+					});
 
 				sourceEntity = map.union(sc.textFile(dedupRecordPath));
+
 			}
 
 			sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java
index 65f383500..adff1ab8a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java
@@ -13,12 +13,13 @@ public class OrgSimRel implements Serializable {
 	String oa_url;
 	String oa_collectedfrom;
 	String group_id;
+	String pid_list; // separator for type-pid: "###"; separator for pids: "@@@"
 
 	public OrgSimRel() {
 	}
 
 	public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country,
-		String oa_url, String oa_collectedfrom, String group_id) {
+		String oa_url, String oa_collectedfrom, String group_id, String pid_list) {
 		this.local_id = local_id;
 		this.oa_original_id = oa_original_id;
 		this.oa_name = oa_name;
@@ -27,6 +28,7 @@ public class OrgSimRel implements Serializable {
 		this.oa_url = oa_url;
 		this.oa_collectedfrom = oa_collectedfrom;
 		this.group_id = group_id;
+		this.pid_list = pid_list;
 	}
 
 	public String getLocal_id() {
@@ -93,6 +95,14 @@ public class OrgSimRel implements Serializable {
 		this.group_id = group_id;
 	}
 
+	public String getPid_list() {
+		return pid_list;
+	}
+
+	public void setPid_list(String pid_list) {
+		this.pid_list = pid_list;
+	}
+
 	@Override
 	public String toString() {
 		return "OrgSimRel{" +
@@ -103,6 +113,8 @@ public class OrgSimRel implements Serializable {
 			", oa_country='" + oa_country + '\'' +
 			", oa_url='" + oa_url + '\'' +
 			", oa_collectedfrom='" + oa_collectedfrom + '\'' +
+			", group_id='" + group_id + '\'' +
+			", pid_list='" + pid_list + '\'' +
 			'}';
 	}
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json
similarity index 56%
rename from dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json
rename to dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json
index da1011371..75054637f 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json
@@ -1,44 +1,32 @@
 [
-  {
-    "paramName": "la",
-    "paramLongName": "isLookUpUrl",
-    "paramDescription": "address for the LookUp",
-    "paramRequired": true
-  },
   {
     "paramName": "asi",
     "paramLongName": "actionSetId",
     "paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true }, + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of the raw graph", + "paramRequired": true + }, { "paramName": "w", "paramLongName": "workingPath", "paramDescription": "path of the working directory", "paramRequired": true }, + { + "paramName": "la", + "paramLongName": "isLookUpUrl", + "paramDescription": "the url for the lookup service", + "paramRequired": true + }, { "paramName": "np", "paramLongName": "numPartitions", "paramDescription": "number of partitions for the similarity relations intermediate phases", "paramRequired": false - }, - { - "paramName": "purl", - "paramLongName": "postgresUrl", - "paramDescription": "the url of the postgres server", - "paramRequired": true - }, - { - "paramName": "pusr", - "paramLongName": "postgresUser", - "paramDescription": "the owner of the postgres database", - "paramRequired": true - }, - { - "paramName": "ppwd", - "paramLongName": "postgresPassword", - "paramDescription": "the password for the postgres user", - "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json new file mode 100644 index 000000000..e45efca01 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "asi", + "paramLongName": "actionSetId", + "paramDescription": "action set identifier (name of the orchestrator)", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of the raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "path of the working directory", + "paramRequired": true + }, + { + "paramName": "np", + "paramLongName": "numPartitions", + "paramDescription": "number of partitions for the similarity relations intermediate phases", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/workflow.xml deleted file mode 100644 index 9bfdaaebd..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/workflow.xml +++ /dev/null @@ -1,208 +0,0 @@ - - - - graphBasePath - the raw graph base path - - - isLookUpUrl - the address of the lookUp service - - - actionSetId - id of the actionSet - - - workingPath - path for the working directory - - - dedupGraphPath - path for the output graph - - - cutConnectedComponent - max number of elements in a connected component - - - dbUrl - the url of the database - - - dbUser - the user of the database - - - dbTable - the name of the table in the database - - - dbPwd - the passowrd of the user of the database - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - oozieActionShareLibForSpark2 - oozie action sharelib for spark 2.* - - - spark2ExtraListeners - com.cloudera.spark.lineage.NavigatorAppListener - spark 2.* extra listeners classname - - - spark2SqlQueryExecutionListeners - 
com.cloudera.spark.lineage.NavigatorQueryListener - spark 2.* sql query execution listeners classname - - - spark2YarnHistoryServerAddress - spark 2.* yarn history server address - - - spark2EventLogDir - spark 2.* event log dir location - - - - - ${jobTracker} - ${nameNode} - - - mapreduce.job.queuename - ${queueName} - - - oozie.launcher.mapred.job.queue.name - ${oozieLauncherQueueName} - - - oozie.action.sharelib.for.spark - ${oozieActionShareLibForSpark2} - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - -pb - ${graphBasePath}/relation - ${workingPath}/${actionSetId}/organization_simrel - - - - - - - - yarn - cluster - Create Similarity Relations - eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --workingPath${workingPath} - --numPartitions8000 - - - - - - - - yarn - cluster - Create Merge Relations - eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --cutConnectedComponent${cutConnectedComponent} - - - - - - - - yarn - cluster - Prepare New Organizations - eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --dbUrl${dbUrl} - --dbTable${dbTable} - --dbUser${dbUser} - --dbPwd${dbPwd} - --numConnections20 - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/config-default.xml rename to dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/config-default.xml diff --git 
a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml similarity index 80% rename from dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml rename to dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml index e7c95ee8d..dc63d0a79 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + graphBasePath @@ -24,22 +24,6 @@ cutConnectedComponent max number of elements in a connected component - - dbUrl - the url of the database - - - dbUser - the user of the database - - - dbTable - the name of the table in the database - - - dbPwd - the passowrd of the user of the database - sparkDriverMemory memory for driver process @@ -95,31 +79,22 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + - + + - + - - - -pb - /tmp/graph_openorgs_and_corda/relation - ${workingPath}/${actionSetId}/organization_simrel - - - - - - + yarn cluster @@ -138,15 +113,43 @@ --graphBasePath${graphBasePath} --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} + --actionSetId${actionSetIdOpenorgs} --workingPath${workingPath} --numPartitions8000 - + - + + + + yarn + cluster + Copy OpenOrgs Sim Rels + eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsSimRels + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --isLookUpUrl${isLookUpUrl} + --workingPath${workingPath} + --actionSetId${actionSetIdOpenorgs} + --numPartitions8000 + + + + + + yarn cluster @@ -166,7 +169,7 @@ --graphBasePath${graphBasePath} --workingPath${workingPath} --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} + --actionSetId${actionSetIdOpenorgs} --cutConnectedComponent${cutConnectedComponent} @@ -193,7 +196,7 @@ --graphBasePath${graphBasePath} --workingPath${workingPath} --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} + --actionSetId${actionSetIdOpenorgs} --dbUrl${dbUrl} --dbTable${dbTable} --dbUser${dbUser} @@ -224,7 +227,7 @@ --graphBasePath${graphBasePath} --workingPath${workingPath} --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} + --actionSetId${actionSetIdOpenorgs} --apiUrl${apiUrl} --dbUrl${dbUrl} --dbTable${dbTable} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index c42ce1263..4b39cb56a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -12,6 +12,10 @@ actionSetId id of the actionSet + + actionSetIdOpenorgs + id of the 
actionSet for OpenOrgs dedup + workingPath path for the working directory @@ -79,7 +83,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -88,11 +92,21 @@ + + + + + + + + + + yarn @@ -169,10 +183,91 @@ --isLookUpUrl${isLookUpUrl} --actionSetId${actionSetId} + + + + + + + + yarn + cluster + Copy Openorgs Merge Relations + eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetIdOpenorgs} + --numPartitions8000 + + + + + + + + yarn + cluster + Create Organizations Dedup Records + eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetIdOpenorgs} + + + + + + + + + + + + + + + + + + + + + + + + + + + + yarn @@ -198,15 +293,28 @@ + - - - - - -pb - ${graphBasePath}/relation - ${dedupGraphPath}/relation - + + yarn + cluster + Copy Non-Openorgs Relations + eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --dedupGraphPath${dedupGraphPath} + diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java index 3f10af5b8..787295c41 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java @@ -112,7 +112,7 @@ public class EntityMergerTest implements Serializable { assertEquals("2018-09-30", pub_merged.getDateofacceptance().getValue()); // verify authors - assertEquals(9, pub_merged.getAuthor().size()); + assertEquals(13, pub_merged.getAuthor().size()); assertEquals(4, AuthorMerger.countAuthorsPids(pub_merged.getAuthor())); // verify title diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/IdGeneratorTest.java 
b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/IdGeneratorTest.java index a6604dd30..6b0b8dfa2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/IdGeneratorTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/IdGeneratorTest.java @@ -36,6 +36,8 @@ public class IdGeneratorTest { private static List> bestIds2; private static List> bestIds3; + private static List> bestIdsOrg; + private static String testEntityBasePath; @BeforeAll @@ -48,6 +50,8 @@ public class IdGeneratorTest { bestIds = createBestIds(testEntityBasePath + "/publication_idgeneration.json", Publication.class); bestIds2 = createBestIds(testEntityBasePath + "/publication_idgeneration2.json", Publication.class); bestIds3 = createBestIds(testEntityBasePath + "/publication_idgeneration3.json", Publication.class); + + bestIdsOrg = createBestIds(testEntityBasePath + "/organization_idgeneration.json", Organization.class); } @Test @@ -76,6 +80,13 @@ public class IdGeneratorTest { assertEquals("50|dedup_wf_001::0829b5191605bdbea36d6502b8c1ce1g", id2); } + @Test + public void generateIdOrganizationTest() { + String id1 = IdGenerator.generate(bestIdsOrg, "20|defaultID"); + + assertEquals("20|openorgs____::599c15a70fcb03be6ba08f75f14d6076", id1); + } + protected static List> createBestIds(String path, Class clazz) { final Stream> ids = readSample(path, clazz) .stream() diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index c706061a0..851e72dee 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -174,27 +174,27 @@ public class SparkDedupTest implements Serializable { long orgs_simrel = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) .count(); long pubs_simrel = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")) .count(); long sw_simrel = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel") + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software")) .count(); long ds_simrel = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel") + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")) .count(); long orp_simrel = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")) .count(); assertEquals(3082, orgs_simrel); @@ -206,67 +206,6 @@ public class SparkDedupTest implements Serializable { @Test @Order(2) - public void collectSimRelsTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCollectSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json"))); - parser - .parseArgument( - new String[] { - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath, - "-np", "50", - "-purl", 
"jdbc:postgresql://localhost:5432/dnet_dedup", - "-pusr", "postgres_user", - "-ppwd", "" - }); - - new SparkCollectSimRels( - parser, - spark, - spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"), - spark.read().load(testDedupAssertionsBasePath + "/groups")) - .run(isLookUpService); - - long orgs_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") - .count(); - - long pubs_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") - .count(); - - long sw_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel") - .count(); - - long ds_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel") - .count(); - - long orp_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") - .count(); - - assertEquals(3672, orgs_simrel); - assertEquals(10459, pubs_simrel); - assertEquals(3767, sw_simrel); - assertEquals(3865, ds_simrel); - assertEquals(10173, orp_simrel); - - } - - @Test - @Order(3) public void cutMergeRelsTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -362,7 +301,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(4) + @Order(3) public void createMergeRelsTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -417,7 +356,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(5) + @Order(4) public void createDedupRecordTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -456,7 +395,7 @@ public class SparkDedupTest implements Serializable { testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord") .count(); - assertEquals(84, orgs_deduprecord); + assertEquals(85, orgs_deduprecord); assertEquals(65, pubs_deduprecord); assertEquals(51, sw_deduprecord); assertEquals(97, ds_deduprecord); @@ -464,7 +403,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(6) + @Order(5) public void updateEntityTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -540,7 +479,7 @@ public class SparkDedupTest implements Serializable { .count(); assertEquals(896, publications); - assertEquals(837, organizations); + assertEquals(838, organizations); assertEquals(100, projects); assertEquals(100, datasource); assertEquals(200, softwares); @@ -580,7 +519,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(7) + @Order(6) public void propagateRelationTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -630,7 +569,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(8) + @Order(7) public void testRelations() throws Exception { testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java new file mode 100644 index 000000000..f33eca57f --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java @@ -0,0 +1,408 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import static 
java.nio.file.Files.createTempDirectory; + +import static org.apache.spark.sql.functions.count; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.lenient; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.ForeachFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.CollectionsUtils; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.platform.commons.util.StringUtils; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.OafMapperUtils; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.DHPUtils; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.util.MapDocumentUtil; +import scala.Tuple2; + +@ExtendWith(MockitoExtension.class) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class SparkOpenorgsDedupTest implements Serializable { + + private static String dbUrl = "jdbc:h2:mem:openorgs_test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false"; + private static String dbUser = "sa"; + private static String dbTable = "tmp_dedup_events"; + private static String dbPwd = ""; + + @Mock(serializable = true) + ISLookUpService isLookUpService; + + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + private static SparkSession spark; + private static JavaSparkContext jsc; + + private static String testGraphBasePath; + private static String testOutputBasePath; + private static String testDedupGraphBasePath; + private static final String testActionSetId = "test-orchestrator-openorgs"; + + @BeforeAll + public static void cleanUp() throws IOException, URISyntaxException { + + testGraphBasePath = Paths + .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/openorgs_dedup").toURI()) + .toFile() + .getAbsolutePath(); + testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + + final 
SparkConf conf = new SparkConf(); + conf.set("spark.sql.shuffle.partitions", "200"); + spark = SparkSession + .builder() + .appName(SparkDedupTest.class.getSimpleName()) + .master("local[*]") + .config(conf) + .getOrCreate(); + + jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + } + + @BeforeEach + public void setUp() throws IOException, ISLookUpException { + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); + } + + @Test + @Order(1) + public void createSimRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath, + "-np", "50" + }); + + new SparkCreateSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) + .count(); + + assertEquals(288, orgs_simrel); + } + + @Test + @Order(2) + public void copyOpenorgsSimRels() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgsSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-w", testOutputBasePath, + "-la", "lookupurl", + "-np", "50" + }); + + new SparkCopyOpenorgsSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) + .count(); + + assertEquals(324, orgs_simrel); + } + + @Test + @Order(3) + public void createMergeRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath + }); + + new SparkCreateMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .count(); + assertEquals(132, orgs_mergerel); + + // verify that a DiffRel is in the mergerels (to be sure that the job supposed to remove them has something to + // do) + List diffRels = jsc + .textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation")) + .map(s -> OBJECT_MAPPER.readValue(s, Relation.class)) + .filter(r -> r.getRelClass().equals("isDifferentFrom")) + .map(r -> r.getTarget()) + .collect(); + assertEquals(18, diffRels.size()); + + List mergeRels = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .as(Encoders.bean(Relation.class)) + .toJavaRDD() + 
.map(r -> r.getTarget()) + .collect(); + assertFalse(Collections.disjoint(mergeRels, diffRels)); + + } + + @Test + @Order(4) + public void prepareOrgRelsTest() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-du", + dbUrl, + "-dusr", + dbUser, + "-t", + dbTable, + "-dpwd", + dbPwd + }); + + new SparkPrepareOrgRels(parser, spark).run(isLookUpService); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + Connection connection = DriverManager.getConnection(dbUrl, connectionProperties); + + ResultSet resultSet = connection + .prepareStatement("SELECT COUNT(*) as total_rels FROM " + dbTable) + .executeQuery(); + if (resultSet.next()) { + int total_rels = resultSet.getInt("total_rels"); + assertEquals(32, total_rels); + } else + fail("No result in the sql DB"); + resultSet.close(); + + // verify the number of organizations with duplicates + ResultSet resultSet2 = connection + .prepareStatement("SELECT COUNT(DISTINCT(local_id)) as total_orgs FROM " + dbTable) + .executeQuery(); + if (resultSet2.next()) { + int total_orgs = resultSet2.getInt("total_orgs"); + assertEquals(6, total_orgs); + } else + fail("No result in the sql DB"); + resultSet2.close(); + + // verify that no DiffRel is in the DB + List diffRels = jsc + .textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation")) + .map(s -> OBJECT_MAPPER.readValue(s, Relation.class)) + .filter(r -> r.getRelClass().equals("isDifferentFrom")) + .map(r -> r.getSource() + "@@@" + r.getTarget()) + .collect(); + + List dbRels = new ArrayList<>(); + ResultSet resultSet3 = connection + .prepareStatement("SELECT local_id, oa_original_id FROM " + dbTable) + .executeQuery(); + while (resultSet3.next()) { + String source = OafMapperUtils.createOpenaireId("organization", resultSet3.getString("local_id"), true); + String target = OafMapperUtils + .createOpenaireId("organization", resultSet3.getString("oa_original_id"), true); + dbRels.add(source + "@@@" + target); + } + resultSet3.close(); + assertTrue(Collections.disjoint(dbRels, diffRels)); + + connection.close(); + } + + @Test + @Order(5) + public void prepareNewOrgsTest() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-du", + dbUrl, + "-dusr", + dbUser, + "-t", + dbTable, + "-dpwd", + dbPwd + }); + + new SparkPrepareNewOrgs(parser, spark).run(isLookUpService); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + long orgs_in_diffrel = jsc + .textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation")) + .map(s -> OBJECT_MAPPER.readValue(s, Relation.class)) + .filter(r -> r.getRelClass().equals("isDifferentFrom")) + .map(r -> r.getTarget()) + .distinct() + .count(); + + Connection connection = 
DriverManager.getConnection(dbUrl, connectionProperties); + + jsc + .textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation")) + .map(s -> OBJECT_MAPPER.readValue(s, Relation.class)) + .filter(r -> r.getRelClass().equals("isDifferentFrom")) + .map(r -> r.getTarget()) + .distinct() + .foreach(s -> System.out.println("difforgs = " + s)); + ResultSet resultSet0 = connection + .prepareStatement("SELECT oa_original_id FROM " + dbTable + " WHERE local_id = ''") + .executeQuery(); + while (resultSet0.next()) + System.out + .println( + "dborgs = " + OafMapperUtils.createOpenaireId(20, resultSet0.getString("oa_original_id"), true)); + resultSet0.close(); + + ResultSet resultSet = connection + .prepareStatement("SELECT COUNT(*) as total_new_orgs FROM " + dbTable + " WHERE local_id = ''") + .executeQuery(); + if (resultSet.next()) { + int total_new_orgs = resultSet.getInt("total_new_orgs"); + assertEquals(orgs_in_diffrel + 1, total_new_orgs); + } else + fail("No result in the sql DB"); + resultSet.close(); + } + + @AfterAll + public static void finalCleanUp() throws IOException { + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java new file mode 100644 index 000000000..7aaed3de7 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java @@ -0,0 +1,236 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import static java.nio.file.Files.createTempDirectory; + +import static org.apache.spark.sql.functions.count; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.lenient; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.nio.file.Paths; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.ForeachFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import jdk.nashorn.internal.ir.annotations.Ignore; + +@ExtendWith(MockitoExtension.class) +public class SparkOpenorgsTest implements Serializable { + + @Mock(serializable = true) + ISLookUpService isLookUpService; + + private static SparkSession spark; + private static JavaSparkContext jsc; + + private static String testGraphBasePath; + private static String 
testOutputBasePath; + private static String testDedupGraphBasePath; + private static final String testActionSetId = "test-orchestrator"; + + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @BeforeAll + public static void cleanUp() throws IOException, URISyntaxException { + + testGraphBasePath = Paths + .get(SparkOpenorgsTest.class.getResource("/eu/dnetlib/dhp/dedup/openorgs").toURI()) + .toFile() + .getAbsolutePath(); + testOutputBasePath = createTempDirectory(SparkOpenorgsTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + testDedupGraphBasePath = createTempDirectory(SparkOpenorgsTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + + final SparkConf conf = new SparkConf(); + conf.set("spark.sql.shuffle.partitions", "200"); + spark = SparkSession + .builder() + .appName(SparkDedupTest.class.getSimpleName()) + .master("local[*]") + .config(conf) + .getOrCreate(); + + jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + } + + @BeforeEach + public void setUp() throws IOException, ISLookUpException { + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); + } + + @Disabled + @Test + public void copyOpenorgsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-w", testOutputBasePath, + "-np", "50" + }); + + new SparkCopyOpenorgs(parser, spark).run(isLookUpService); + + long orgs_deduprecord = jsc + .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_deduprecord") + .count(); + + assertEquals(100, orgs_deduprecord); + } + + @Test + public void copyOpenorgsMergeRels() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgsMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-w", testOutputBasePath, + "-la", "lookupurl", + "-np", "50" + }); + + new SparkCopyOpenorgsMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .count(); + + assertEquals(384, orgs_mergerel); + + } + + @Test + public void copyOpenorgsSimRels() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgsSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-w", 
testOutputBasePath, + "-la", "lookupurl", + "-np", "50" + }); + + new SparkCopyOpenorgsSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") + .count(); + + assertEquals(73, orgs_simrel); + } + + @Test + public void copyRelationsNoOpenorgsTest() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyRelationsNoOpenorgs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-w", testOutputBasePath, + "-o", testDedupGraphBasePath + }); + + new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService); + + long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); + + assertEquals(400, relations); + } + + @AfterAll + public static void finalCleanUp() throws IOException { + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + } + + private static MapFunction patchRelFn() { + return value -> { + final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } + return rel; + }; + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/json/organization_idgeneration.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/json/organization_idgeneration.json new file mode 100644 index 000000000..7e8ec63c7 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/json/organization_idgeneration.json @@ -0,0 +1,3 @@ +{"eclegalbody": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "ecresearchorganization": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "legalname": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "Universitas Dr Soetomo"}, "pid": [], "websiteurl": null, "oaiprovenance": null, "logourl": null, "collectedfrom": [{"dataInfo": null, "value": "DOAJ-Articles", "key": "10|driver______::bee53aa31dc2cbb538c10c2b65fa5824"}], "dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "alternativeNames": [], "echighereducation": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": 
"sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "id": "20|doajarticles::0af3389716873a78a03f2316de09845b", "eclegalperson": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "lastupdatetimestamp": 1616749318035, "ecinternationalorganizationeurinterests": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "dateofcollection": "2020-05-25", "dateoftransformation": "2020-05-25", "ecnonprofit": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "ecenterprise": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "ecinternationalorganization": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "ecnutscode": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "legalshortname": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "Universitas Dr Soetomo"}, "country": {"classid": "ID", "classname": "Indonesia", "schemename": "dnet:countries", "schemeid": "dnet:countries"}, "extraInfo": [], "originalId": ["doajarticles::Universitas_Dr_Soetomo"], "ecsmevalidated": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": 
false, "trust": "0.900"}, "value": "false"}} +{"eclegalbody": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "ecresearchorganization": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "legalname": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "University of DR Soetomo"}, "pid": [], "websiteurl": null, "oaiprovenance": null, "logourl": null, "collectedfrom": [{"dataInfo": null, "value": "DOAJ-Articles", "key": "10|driver______::bee53aa31dc2cbb538c10c2b65fa5824"}], "dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "alternativeNames": [], "echighereducation": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "id": "20|doajarticles::4a639ae8f8668ea44699e98ee5a8f1b9", "eclegalperson": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "lastupdatetimestamp": 1616749318035, "ecinternationalorganizationeurinterests": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "dateofcollection": "2018-09-18", "dateoftransformation": "2018-09-18", "ecnonprofit": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "ecenterprise": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": 
"dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "ecinternationalorganization": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "ecnutscode": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}, "legalshortname": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "University of DR Soetomo"}, "country": {"classid": "ID", "classname": "Indonesia", "schemename": "dnet:countries", "schemeid": "dnet:countries"}, "extraInfo": [], "originalId": ["doajarticles::University_of_DR_Soetomo"], "ecsmevalidated": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.900"}, "value": "false"}} +{"eclegalbody": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}, "ecresearchorganization": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}, "legalname": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "Universitas Dr. 
Soetomo"}, "pid": [{"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "qualifier": {"classid": "ISNI", "classname": "International Standard Name Identifier", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "0000 0004 1758 8103"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "qualifier": {"classid": "GRID", "classname": "GRID", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "grid.444390.e"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "qualifier": {"classid": "ROR", "classname": "ROR", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "https://ror.org/04s03g948"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "qualifier": {"classid": "Wikidata", "classname": "Wikidata", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "Q12523318"}], "websiteurl": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "https://unitomo.ac.id/"}, "oaiprovenance": null, "logourl": null, "collectedfrom": [{"dataInfo": null, "value": "OpenOrgs Database", "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8"}], "dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "alternativeNames": [], "echighereducation": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}, "id": "20|openorgs____::599c15a70fcb03be6ba08f75f14d6076", "eclegalperson": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, 
"inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}, "lastupdatetimestamp": 1616749318824, "ecinternationalorganizationeurinterests": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}, "dateofcollection": "2020-07-16", "dateoftransformation": "2020-07-16", "ecnonprofit": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}, "ecenterprise": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}, "ecinternationalorganization": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}, "ecnutscode": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}, "legalshortname": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "UNITOMO"}, "country": {"classid": "ID", "classname": "Indonesia", "schemename": "dnet:countries", "schemeid": "dnet:countries"}, "extraInfo": [], "originalId": ["openorgs____::0000034824"], "ecsmevalidated": {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.950"}, "value": "false"}} \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/organization/organization.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/organization/organization.gz new file mode 100644 index 000000000..45b0edeb2 Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/organization/organization.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/relation/relation.gz 
b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/relation/relation.gz new file mode 100644 index 000000000..f0c7f4211 Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/relation/relation.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00000-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00000-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz new file mode 100644 index 000000000..ba58d823c Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00000-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00001-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00001-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz new file mode 100644 index 000000000..137790bde Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00001-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00002-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00002-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz new file mode 100644 index 000000000..6b090b9f5 Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00002-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00000-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00000-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz new file mode 100644 index 000000000..080665d22 Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00000-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00003-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00003-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz new file mode 100644 index 000000000..71fd6b35a Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00003-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml new file mode 100644 index 000000000..59b6179ed --- /dev/null +++ 
b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml @@ -0,0 +1,24 @@
+ <!-- mock orchestrator profile for the openorgs dedup tests: the 24-line XML body is not recoverable (markup stripped); of its element content only SECURITY_PARAMETERS survives -->
+ SECURITY_PARAMETERS
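A note for reviewers on the test fixtures above: the organization record and the mock orchestrator profile drive the openorgs dedup test setup. The sketch below shows one way the JSON fixture could be read back into the OAF model for inspection; it is illustrative only — the resource path and the accessors on Organization are assumptions inferred from the record shown earlier, not code from this patch.

```java
// Minimal sketch, not part of the patch: load the organization fixture shown
// above into the OAF model. The file path is illustrative.
import java.nio.file.Files;
import java.nio.file.Paths;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.oaf.Organization;

public class LoadOrganizationFixture {

	public static void main(String[] args) throws Exception {
		// tolerate fixture fields that have no counterpart in the model
		final ObjectMapper mapper = new ObjectMapper()
			.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

		// illustrative path: the fixture is the single-line JSON record above
		final String json = new String(Files.readAllBytes(Paths.get("organization.json")));

		final Organization org = mapper.readValue(json, Organization.class);
		System.out.println(org.getLegalshortname().getValue()); // "UNITOMO" in the fixture
		System.out.println(org.getCountry().getClassname());    // "Indonesia"
	}
}
```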
\ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 7ff06e428..299ff7d78 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -57,6 +57,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.DbClient; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; +import eu.dnetlib.dhp.oa.graph.raw.common.MigrateAction; import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate; import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.DataInfo; @@ -84,6 +85,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i public static final String SOURCE_TYPE = "source_type"; public static final String TARGET_TYPE = "target_type"; + private static final String ORG_ORG_RELTYPE = "organizationOrganization"; + private static final String ORG_ORG_SUBRELTYPE = "dedup"; + private final DbClient dbClient; private final long lastUpdateTimestamp; @@ -122,35 +126,71 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final Predicate verifyNamespacePrefix = new VerifyNsPrefixPredicate(nsPrefixBlacklist); - final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims"); - log.info("processClaims: {}", processClaims); + final MigrateAction process = parser.get("action") != null ? 
MigrateAction.valueOf(parser.get("action")) + : MigrateAction.openaire; + log.info("migrateAction: {}", process); try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl)) { - if (processClaims) { - log.info("Processing claims..."); - smdbe.execute("queryClaims.sql", smdbe::processClaims); - } else { - log.info("Processing datasources..."); - smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix); - log.info("Processing projects..."); - if (dbSchema.equalsIgnoreCase("beta")) { - smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix); - } else { - smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix); - } + switch (process) { + case claims: + log.info("Processing claims..."); + smdbe.execute("queryClaims.sql", smdbe::processClaims); + break; + case openaire: + log.info("Processing datasources..."); + smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix); - log.info("Processing orgs..."); - smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix); + log.info("Processing projects..."); + if (dbSchema.equalsIgnoreCase("beta")) { + smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix); + } else { + smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix); + } - log.info("Processing relationsNoRemoval ds <-> orgs ..."); - smdbe - .execute( - "queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, verifyNamespacePrefix); + log.info("Processing Organizations..."); + smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix); - log.info("Processing projects <-> orgs ..."); - smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix); + log.info("Processing relationsNoRemoval ds <-> orgs ..."); + smdbe + .execute( + "queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, + verifyNamespacePrefix); + + log.info("Processing projects <-> orgs ..."); + smdbe + .execute( + "queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix); + break; + case openorgs_dedup: + log.info("Processing Openorgs..."); + smdbe + .execute( + "queryOpenOrgsForOrgsDedup.sql", smdbe::processOrganization, verifyNamespacePrefix); + + log.info("Processing Openorgs Merge Rels..."); + smdbe.execute("queryOpenOrgsSimilarityForOrgsDedup.sql", smdbe::processOrgOrgSimRels); + + break; + + case openorgs: + log.info("Processing Openorgs For Provision..."); + smdbe + .execute( + "queryOpenOrgsForProvision.sql", smdbe::processOrganization, verifyNamespacePrefix); + + log.info("Processing Openorgs Merge Rels..."); + smdbe.execute("queryOpenOrgsSimilarityForProvision.sql", smdbe::processOrgOrgSimRels); + + break; + + case openaire_organizations: + + log.info("Processing Organizations..."); + smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix); + + break; } log.info("All done."); } @@ -597,6 +637,45 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i } } + public List<Oaf> processOrgOrgSimRels(final ResultSet rs) { + try { + final DataInfo info = prepareDataInfo(rs); // TODO + + final String orgId1 = createOpenaireId(20, rs.getString("id1"), true); + final String orgId2 = createOpenaireId(20, rs.getString("id2"), true); + final String relClass = rs.getString("relclass");
+ + final List<KeyValue> collectedFrom = listKeyValues( + createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname")); + + final Relation r1 = new Relation(); + r1.setRelType(ORG_ORG_RELTYPE); + r1.setSubRelType(ORG_ORG_SUBRELTYPE); + r1.setRelClass(relClass); + r1.setSource(orgId1); + r1.setTarget(orgId2); + r1.setCollectedfrom(collectedFrom); + r1.setDataInfo(info); + r1.setLastupdatetimestamp(lastUpdateTimestamp); + + // removed because there's no difference between the two sides // TODO +// final Relation r2 = new Relation(); +// r2.setRelType(ORG_ORG_RELTYPE); +// r2.setSubRelType(ORG_ORG_SUBRELTYPE); +// r2.setRelClass(relClass); +// r2.setSource(orgId2); +// r2.setTarget(orgId1); +// r2.setCollectedfrom(collectedFrom); +// r2.setDataInfo(info); +// r2.setLastupdatetimestamp(lastUpdateTimestamp); +// return Arrays.asList(r1, r2); + + return Arrays.asList(r1); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + @Override public void close() throws IOException { super.close(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java new file mode 100644 index 000000000..517cc8d62 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java @@ -0,0 +1,11 @@ + +package eu.dnetlib.dhp.oa.graph.raw.common; + +// enum to specify the different actions available for the MigrateDbEntitiesApplication job +public enum MigrateAction { + claims, // migrate claims to the raw graph + openorgs_dedup, // migrate organizations from openorgs to the raw graph + openorgs, // migrate organizations from openorgs to the raw graph for provision + openaire, // migrate openaire entities to the raw graph + openaire_organizations // migrate openaire organizations entities to the raw graph } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 823b185f8..389a889cb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -25,6 +25,18 @@ postgresPassword the password postgres + + + postgresOpenOrgsURL + the postgres URL to access to the OpenOrgs database + + + postgresOpenOrgsUser + the user of OpenOrgs database + + + postgresOpenOrgsPassword + the password of OpenOrgs database dbSchema @@ -178,14 +190,34 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication - --hdfsPath${contentPath}/db_records + --hdfsPath${contentPath}/db_openaire --postgresUrl${postgresURL} --postgresUser${postgresUser} --postgresPassword${postgresPassword} --isLookupUrl${isLookupUrl} + --actionopenaire + --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication + --hdfsPath${contentPath}/db_openorgs + --postgresUrl${postgresOpenOrgsURL} + --postgresUser${postgresOpenOrgsUser} + --postgresPassword${postgresOpenOrgsPassword} + --isLookupUrl${isLookupUrl} + --actionopenorgs --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} @@ -315,7 +347,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records + --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records --targetPath${workingDir}/entities --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/workflow.xml new file mode 100644 index 000000000..714d69697 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/workflow.xml @@ -0,0 +1,270 @@ + + + + + graphOutputPath + the target path to store raw graph + + + reuseContent + false + should import content from the aggregator or reuse a previous version + + + contentPath + path location to store (or reuse) content from the aggregator + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + postgresOpenOrgsURL + the postgres URL to access to the OpenOrgs database + + + postgresOpenOrgsUser + the user of OpenOrgs database + + + postgresOpenOrgsPassword + the password of OpenOrgs database + + + dbSchema + beta + the database schema according to the D-Net infrastructure (beta or production) + + + isLookupUrl + the address of the lookUp service + + + nsPrefixBlacklist + + a blacklist of nsprefixes (comma separeted) + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${wf:conf('reuseContent') eq false} + ${wf:conf('reuseContent') eq true} + + + + + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication + --hdfsPath${contentPath}/db_openaire_organizations + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + --isLookupUrl${isLookupUrl} + --actionopenaire_organizations + --dbschema${dbSchema} + 
--nsPrefixBlacklist${nsPrefixBlacklist} + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication + --hdfsPath${contentPath}/db_openorgs + --postgresUrl${postgresOpenOrgsURL} + --postgresUser${postgresOpenOrgsUser} + --postgresPassword${postgresOpenOrgsPassword} + --isLookupUrl${isLookupUrl} + --actionopenorgs_dedup + --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} + + + + + + + + + + yarn + cluster + GenerateEntities + eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePaths${contentPath}/db_openaire_organizations,${contentPath}/db_openorgs + --targetPath${workingDir}/entities + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + GenerateGraph + eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --sourcePath${workingDir}/entities + --graphRawPath${graphOutputPath} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForOrgsDedup.sql similarity index 68% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForOrgsDedup.sql index 3396f365c..aa694c7df 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForOrgsDedup.sql @@ -4,6 +4,8 @@ SELECT o.name AS legalname, array_agg(DISTINCT n.name) AS "alternativeNames", (array_agg(u.url))[1] AS websiteurl, + '' AS logourl, + o.creation_date AS dateofcollection, o.modification_date AS dateoftransformation, false AS inferred, false AS deletedbyinference, @@ -13,26 +15,41 @@ SELECT 'OpenOrgs Database' AS collectedfromname, o.country || '@@@dnet:countries' AS country, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid + array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid, + null AS eclegalbody, + null AS eclegalperson, + null AS ecnonprofit, + null AS ecresearchorganization, + null AS echighereducation, + null AS ecinternationalorganizationeurinterests, + null AS ecinternationalorganization, + null AS 
ecenterprise, + null AS ecsmevalidated, + null AS ecnutscode FROM organizations o LEFT OUTER JOIN acronyms a ON (a.id = o.id) LEFT OUTER JOIN urls u ON (u.id = o.id) LEFT OUTER JOIN other_ids i ON (i.id = o.id) LEFT OUTER JOIN other_names n ON (n.id = o.id) +WHERE + o.status = 'approved' GROUP BY o.id, o.name, + o.creation_date, o.modification_date, o.country - + UNION ALL - + SELECT 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid, n.name AS legalshortname, n.name AS legalname, ARRAY[]::text[] AS "alternativeNames", (array_agg(u.url))[1] AS websiteurl, + '' AS logourl, + o.creation_date AS dateofcollection, o.modification_date AS dateoftransformation, false AS inferred, false AS deletedbyinference, @@ -42,12 +59,26 @@ SELECT 'OpenOrgs Database' AS collectedfromname, o.country || '@@@dnet:countries' AS country, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid + array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid, + null AS eclegalbody, + null AS eclegalperson, + null AS ecnonprofit, + null AS ecresearchorganization, + null AS echighereducation, + null AS ecinternationalorganizationeurinterests, + null AS ecinternationalorganization, + null AS ecenterprise, + null AS ecsmevalidated, + null AS ecnutscode FROM other_names n LEFT OUTER JOIN organizations o ON (n.id = o.id) LEFT OUTER JOIN urls u ON (u.id = o.id) LEFT OUTER JOIN other_ids i ON (i.id = o.id) +WHERE + o.status = 'approved' GROUP BY - o.id, o.modification_date, o.country, n.name - - + o.id, + o.creation_date, + o.modification_date, + o.country, + n.name; \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForProvision.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForProvision.sql new file mode 100644 index 000000000..6f5f93789 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForProvision.sql @@ -0,0 +1,41 @@ +SELECT + o.id AS organizationid, + coalesce((array_agg(a.acronym))[1], o.name) AS legalshortname, + o.name AS legalname, + array_agg(DISTINCT n.name) AS "alternativeNames", + (array_agg(u.url))[1] AS websiteurl, + '' AS logourl, + o.creation_date AS dateofcollection, + o.modification_date AS dateoftransformation, + false AS inferred, + false AS deletedbyinference, + 0.95 AS trust, + '' AS inferenceprovenance, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + o.country || '@@@dnet:countries' AS country, + 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, + array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid, + null AS eclegalbody, + null AS eclegalperson, + null AS ecnonprofit, + null AS ecresearchorganization, + null AS echighereducation, + null AS ecinternationalorganizationeurinterests, + null AS ecinternationalorganization, + null AS ecenterprise, + null AS ecsmevalidated, + null AS ecnutscode +FROM organizations o + LEFT OUTER JOIN acronyms a ON (a.id = o.id) + LEFT OUTER JOIN urls u ON (u.id = o.id) + LEFT OUTER JOIN other_ids i ON (i.id = o.id) + LEFT OUTER JOIN other_names n ON (n.id = o.id) +WHERE + o.status = 'approved' +GROUP BY + o.id, + o.name, + o.creation_date, + o.modification_date, + o.country; \ No newline at end of file diff --git 
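Reviewer note on the pid column built by the queries above: each array element is packed as value###classid@@@scheme (that is what `i.otherid || '###' || i.type || '@@@dnet:pid_types'` produces). A minimal sketch of how one such element decomposes; the GRID value is hypothetical and the splitting helper is illustrative, not the mapper's actual parsing code.

```java
// Illustrative only: unpack one pid element as produced by the SQL above,
// i.e. "<value>###<classid>@@@<scheme>".
public class PidUnpackExample {

	public static void main(String[] args) {
		final String packed = "grid.9132.9###grid@@@dnet:pid_types"; // hypothetical element

		final String[] valueAndType = packed.split("###");
		final String value = valueAndType[0];                // "grid.9132.9"

		final String[] typeAndScheme = valueAndType[1].split("@@@");
		final String classid = typeAndScheme[0];             // "grid"
		final String scheme = typeAndScheme[1];              // "dnet:pid_types"

		System.out.printf("value=%s classid=%s scheme=%s%n", value, classid, scheme);
	}
}
```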
a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForOrgsDedup.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForOrgsDedup.sql new file mode 100644 index 000000000..7ae5eb43f --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForOrgsDedup.sql @@ -0,0 +1,43 @@ +-- relations approved by the user +SELECT + local_id AS id1, + oa_original_id AS id2, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + false AS inferred, + false AS deletedbyinference, + 0.99 AS trust, + '' AS inferenceprovenance, + 'isSimilarTo' AS relclass +FROM oa_duplicates WHERE reltype = 'is_similar' + +UNION ALL + +-- relations between openorgs and mesh (alternative names) +SELECT + o.id AS id1, + 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + false AS inferred, + false AS deletedbyinference, + 0.99 AS trust, + '' AS inferenceprovenance, + 'isSimilarTo' AS relclass +FROM other_names n + LEFT OUTER JOIN organizations o ON (n.id = o.id) + +UNION ALL + +-- diff relations approved by the user +SELECT + local_id AS id1, + oa_original_id AS id2, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + false AS inferred, + false AS deletedbyinference, + 0.99 AS trust, + '' AS inferenceprovenance, + 'isDifferentFrom' AS relclass +FROM oa_duplicates WHERE reltype = 'is_different'; \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForProvision.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForProvision.sql new file mode 100644 index 000000000..db95cfe0b --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForProvision.sql @@ -0,0 +1,12 @@ +-- relations approved by the user and suggested by the dedup +SELECT + local_id AS id1, + oa_original_id AS id2, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + false AS inferred, + false AS deletedbyinference, + 0.99 AS trust, + '' AS inferenceprovenance, + 'isSimilarTo' AS relclass +FROM oa_duplicates WHERE reltype = 'is_similar' OR reltype = 'suggested'; \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql index 938744b11..42ba0cf91 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql @@ -24,7 +24,7 @@ SELECT d.officialname AS collectedfromname, o.country || '@@@dnet:countries' AS country, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - array_remove(array_agg(DISTINCT i.pid || '###' || i.issuertype), NULL) AS pid + array_agg(DISTINCT i.pid || '###' || i.issuertype || '@@@dnet:pid_types') AS pid FROM dsm_organizations o LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom) LEFT OUTER JOIN dsm_organizationpids p ON (p.organization = o.id) @@ -50,4 +50,4 @@ GROUP BY o.trust, 
d.id, d.officialname, - o.country + o.country; \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/querySimilarityFromOpenOrgsDB.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/querySimilarityFromOpenOrgsDB.sql deleted file mode 100644 index 4407559c6..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/querySimilarityFromOpenOrgsDB.sql +++ /dev/null @@ -1,17 +0,0 @@ -SELECT local_id AS id1, oa_original_id AS id2 FROM openaire_simrels WHERE reltype = 'is_similar' - -UNION ALL - -SELECT - o.id AS id1, - 'openorgsmesh'||substring(o.id, 13)||'-'||md5(a.acronym) AS id2 -FROM acronyms a - LEFT OUTER JOIN organizations o ON (a.id = o.id) - -UNION ALL - -SELECT - o.id AS id1, - 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2 -FROM other_names n - LEFT OUTER JOIN organizations o ON (n.id = o.id) diff --git a/pom.xml b/pom.xml index 8ffc1aab6..fdd07c97a 100644 --- a/pom.xml +++ b/pom.xml @@ -114,9 +114,6 @@ test - - -