diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java index 12ae4e73a..aa7a131e7 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java @@ -1,12 +1,9 @@ + package eu.dnetlib.dhp.oa.dedup; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import java.io.IOException; +import java.util.Optional; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -18,83 +15,88 @@ import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Optional; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -public class SparkCopyOpenorgs extends AbstractSparkAction{ - private static final Logger log = LoggerFactory.getLogger(SparkCopyRels.class); +public class SparkCopyOpenorgs extends AbstractSparkAction { + private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgs.class); - public SparkCopyOpenorgs(ArgumentApplicationParser parser, SparkSession spark) { - super(parser, spark); - } + public SparkCopyOpenorgs(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } - public static void main(String[] args) throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json"))); - parser.parseArgument(args); + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json"))); + parser.parseArgument(args); - SparkConf conf = new SparkConf(); - new SparkCopyOpenorgs(parser, getSparkSession(conf)) - .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); - } + SparkConf conf = new SparkConf(); + new SparkCopyOpenorgs(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } - @Override - public void run(ISLookUpService isLookUpService) - throws DocumentException, IOException, ISLookUpException { + @Override + public void run(ISLookUpService isLookUpService) + throws DocumentException, IOException, ISLookUpException { - // read oozie parameters - final String graphBasePath = parser.get("graphBasePath"); - final String actionSetId = parser.get("actionSetId"); - final String workingPath = parser.get("workingPath"); - final int numPartitions = Optional - .ofNullable(parser.get("numPartitions")) - .map(Integer::valueOf) - .orElse(NUM_PARTITIONS); + // read oozie parameters + final String graphBasePath = parser.get("graphBasePath"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); - log.info("numPartitions: '{}'", numPartitions); - log.info("graphBasePath: '{}'", graphBasePath); - log.info("actionSetId: '{}'", actionSetId); - log.info("workingPath: '{}'", workingPath); + log.info("numPartitions: '{}'", numPartitions); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); - String subEntity = "organization"; - log.info("Copying openorgs to the working dir"); + String subEntity = "organization"; + log.info("Copying openorgs to the working dir"); - final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity); - removeOutputDir(spark, outputPath); + final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity); + removeOutputDir(spark, outputPath); - final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity); + final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity); - final Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); + final Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); - filterEntities(spark, entityPath, clazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); + filterEntities(spark, entityPath, clazz) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); - } + } - public static Dataset filterEntities( - final SparkSession spark, - final String entitiesInputPath, - final Class clazz) { + public static Dataset filterEntities( + final SparkSession spark, + final String entitiesInputPath, + final Class clazz) { - // - Dataset entities = spark - .read() - .textFile(entitiesInputPath) - .map( - (MapFunction) it -> { - T entity = OBJECT_MAPPER.readValue(it, clazz); - return entity; - }, - Encoders.kryo(clazz)); + // + Dataset entities = spark + .read() + .textFile(entitiesInputPath) + .map( + (MapFunction) it -> { + T entity = OBJECT_MAPPER.readValue(it, clazz); + return entity; + }, + Encoders.kryo(clazz)); - return entities.filter(entities.col("id").contains("openorgs____")); - } + return entities.filter(entities.col("id").contains("openorgs____")); + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java new file mode 100644 index 000000000..d705fca6b --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java @@ -0,0 +1,181 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.pace.config.DedupConfig; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; + +//copy simrels (verified) from relation to the workdir in order to make them available for the deduplication +public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction { + private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsMergeRels.class); + public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup"; + public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions"; + + public SparkCopyOpenorgsMergeRels(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } + + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgsMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + new SparkCopyOpenorgsMergeRels(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } + + @Override + public void run(ISLookUpService isLookUpService) + throws DocumentException, IOException, ISLookUpException { + + // read oozie parameters + final String graphBasePath = parser.get("graphBasePath"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); + + log.info("numPartitions: '{}'", numPartitions); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + + log.info("Copying OpenOrgs Merge Rels"); + + final String outputPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization"); + + removeOutputDir(spark, outputPath); + + final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); + + DedupConfig dedupConf = getConfigurations(isLookUpService, actionSetId).get(0); + + JavaRDD rawRels = spark + .read() + .textFile(relationPath) + .map(patchRelFn(), Encoders.bean(Relation.class)) + .toJavaRDD() + .filter(this::isOpenorgs) //takes only relations coming from openorgs + .filter(this::filterOpenorgsRels) //takes only isSimilarTo relations between organizations from openorgs + .filter(this::excludeOpenorgsMesh) //excludes relations between an organization and an openorgsmesh + .filter(this::excludeNonOpenorgs); //excludes relations with no openorgs id involved + + //turn openorgs isSimilarTo relations into mergerels + JavaRDD mergeRels = rawRels.flatMap(rel -> { + List mergerels = new ArrayList<>(); + + String openorgsId = rel.getSource().contains("openorgs____")? rel.getSource() : rel.getTarget(); + String mergedId = rel.getSource().contains("openorgs____")? rel.getTarget() : rel.getSource(); + + mergerels.add(rel(openorgsId, mergedId, "merges", dedupConf)); + mergerels.add(rel(mergedId, openorgsId, "isMergedIn", dedupConf)); + + return mergerels.iterator(); + }); + + mergeRels.saveAsTextFile(outputPath); + } + + private static MapFunction patchRelFn() { + return value -> { + final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } + return rel; + }; + } + + private boolean filterOpenorgsRels(Relation rel) { + + if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup")) + return true; + return false; + } + + private boolean isOpenorgs(Relation rel) { + + if (rel.getCollectedfrom() != null) { + for (KeyValue k: rel.getCollectedfrom()) { + if (k.getValue().equals("OpenOrgs Database")) { + return true; + } + } + } + return false; + } + + private boolean excludeOpenorgsMesh(Relation rel) { + + if (rel.getSource().equals("openorgsmesh") || rel.getTarget().equals("openorgsmesh")) { + return false; + } + return true; + } + + private boolean excludeNonOpenorgs(Relation rel) { + + if (rel.getSource().equals("openorgs____") || rel.getTarget().equals("openorgs____")) { + return true; + } + return false; + } + + private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) { + + String entityType = dedupConf.getWf().getEntityType(); + + Relation r = new Relation(); + r.setSource(source); + r.setTarget(target); + r.setRelClass(relClass); + r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1)); + r.setSubRelType("dedup"); + + DataInfo info = new DataInfo(); + info.setDeletedbyinference(false); + info.setInferred(true); + info.setInvisible(false); + info.setInferenceprovenance(dedupConf.getWf().getConfigurationId()); + Qualifier provenanceAction = new Qualifier(); + provenanceAction.setClassid(PROVENANCE_ACTION_CLASS); + provenanceAction.setClassname(PROVENANCE_ACTION_CLASS); + provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS); + provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS); + info.setProvenanceaction(provenanceAction); + + // TODO calculate the trust value based on the similarity score of the elements in the CC + // info.setTrust(); + + r.setDataInfo(info); + return r; + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java similarity index 61% rename from dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRels.java rename to dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java index 802085ab9..3ce676f84 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java @@ -1,29 +1,37 @@ + package eu.dnetlib.dhp.oa.dedup; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.pace.config.DedupConfig; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; -import org.dom4j.DocumentException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Optional; //copy simrels (verified) from relation to the workdir in order to make them available for the deduplication -public class SparkCopyRels extends AbstractSparkAction{ - private static final Logger log = LoggerFactory.getLogger(SparkCopyRels.class); +public class SparkCopyOpenorgsSimRels extends AbstractSparkAction { + private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsMergeRels.class); - public SparkCopyRels(ArgumentApplicationParser parser, SparkSession spark) { + public SparkCopyOpenorgsSimRels(ArgumentApplicationParser parser, SparkSession spark) { super(parser, spark); } @@ -31,13 +39,13 @@ public class SparkCopyRels extends AbstractSparkAction{ ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - SparkCopyRels.class + SparkCopyOpenorgsSimRels.class .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json"))); + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); parser.parseArgument(args); SparkConf conf = new SparkConf(); - new SparkCopyRels(parser, getSparkSession(conf)) + new SparkCopyOpenorgsSimRels(parser, getSparkSession(conf)) .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); } @@ -49,8 +57,6 @@ public class SparkCopyRels extends AbstractSparkAction{ final String graphBasePath = parser.get("graphBasePath"); final String actionSetId = parser.get("actionSetId"); final String workingPath = parser.get("workingPath"); - final String destination = parser.get("destination"); - final String entity = parser.get("entityType"); final int numPartitions = Optional .ofNullable(parser.get("numPartitions")) .map(Integer::valueOf) @@ -60,30 +66,24 @@ public class SparkCopyRels extends AbstractSparkAction{ log.info("graphBasePath: '{}'", graphBasePath); log.info("actionSetId: '{}'", actionSetId); log.info("workingPath: '{}'", workingPath); - log.info("entity: '{}'", entity); - log.info("Copying " + destination + " for: '{}'", entity); + log.info("Copying OpenOrgs SimRels"); - final String outputPath; - if (destination.contains("mergerel")) { - outputPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, entity); - } - else { - outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, entity); - } + final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, "organization"); removeOutputDir(spark, outputPath); final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); - JavaRDD simRels = - spark.read() - .textFile(relationPath) - .map(patchRelFn(), Encoders.bean(Relation.class)) - .toJavaRDD() - .filter(r -> filterRels(r, entity)); + JavaRDD rawRels = spark + .read() + .textFile(relationPath) + .map(patchRelFn(), Encoders.bean(Relation.class)) + .toJavaRDD() + .filter(this::isOpenorgs) + .filter(this::filterOpenorgsRels); - simRels.saveAsTextFile(outputPath); + save(spark.createDataset(rawRels.rdd(),Encoders.bean(Relation.class)), outputPath, SaveMode.Append); } private static MapFunction patchRelFn() { @@ -96,20 +96,23 @@ public class SparkCopyRels extends AbstractSparkAction{ }; } - private boolean filterRels(Relation rel, String entityType) { + private boolean filterOpenorgsRels(Relation rel) { - switch(entityType) { - case "result": - if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("resultResult") && rel.getSubRelType().equals("dedup")) + if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup")) + return true; + return false; + } + + private boolean isOpenorgs(Relation rel) { + + if (rel.getCollectedfrom() != null) { + for (KeyValue k: rel.getCollectedfrom()) { + if (k.getValue().equals("OpenOrgs Database")) { return true; - break; - case "organization": - if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup")) - return true; - break; - default: - return false; + } + } } return false; } } + diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java new file mode 100644 index 000000000..319c40d8d --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java @@ -0,0 +1,110 @@ +package eu.dnetlib.dhp.oa.dedup; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.util.MapDocumentUtil; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.util.Optional; + +public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction { + + private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class); + + private static final String IDJSONPATH = "$.id"; + + public SparkCopyRelationsNoOpenorgs(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } + + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyRelationsNoOpenorgs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + new SparkUpdateEntity(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } + + public void run(ISLookUpService isLookUpService) throws IOException { + + final String graphBasePath = parser.get("graphBasePath"); + final String workingPath = parser.get("workingPath"); + final String dedupGraphPath = parser.get("dedupGraphPath"); + + log.info("graphBasePath: '{}'", graphBasePath); + log.info("workingPath: '{}'", workingPath); + log.info("dedupGraphPath: '{}'", dedupGraphPath); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); + final String outputPath = DedupUtility.createEntityPath(dedupGraphPath, "relation"); + + removeOutputDir(spark, outputPath); + + JavaRDD simRels = spark + .read() + .textFile(relationPath) + .map(patchRelFn(), Encoders.bean(Relation.class)) + .toJavaRDD() + .filter(this::excludeOpenorgsRels); + + simRels.saveAsTextFile(outputPath); + + } + + private static MapFunction patchRelFn() { + return value -> { + final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } + return rel; + }; + } + + private boolean excludeOpenorgsRels(Relation rel) { + + if (rel.getCollectedfrom() != null) { + for (KeyValue k: rel.getCollectedfrom()) { + if (k.getValue().equals("OpenOrgs Database")) { + return false; + } + } + } + return true; + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java index 030f3b783..6f012e00a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkRemoveDiffRels.java @@ -1,18 +1,15 @@ + package eu.dnetlib.dhp.oa.dedup; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent; -import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor; -import eu.dnetlib.dhp.oa.dedup.model.Block; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; -import eu.dnetlib.pace.model.MapDocument; -import eu.dnetlib.pace.util.MapDocumentUtil; +import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.DNET_PROVENANCE_ACTIONS; +import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.PROVENANCE_ACTION_CLASS; +import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.hash; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; @@ -29,205 +26,215 @@ import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent; +import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor; +import eu.dnetlib.dhp.oa.dedup.model.Block; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.util.MapDocumentUtil; import scala.Tuple2; -import java.io.IOException; -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.DNET_PROVENANCE_ACTIONS; -import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.PROVENANCE_ACTION_CLASS; -import static eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels.hash; - public class SparkRemoveDiffRels extends AbstractSparkAction { - private static final Logger log = LoggerFactory.getLogger(SparkRemoveDiffRels.class); + private static final Logger log = LoggerFactory.getLogger(SparkRemoveDiffRels.class); - public SparkRemoveDiffRels(ArgumentApplicationParser parser, SparkSession spark) { - super(parser, spark); - } + public SparkRemoveDiffRels(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } - public static void main(String[] args) throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); - parser.parseArgument(args); + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + parser.parseArgument(args); - SparkConf conf = new SparkConf(); - new SparkCreateSimRels(parser, getSparkSession(conf)) - .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); - } + SparkConf conf = new SparkConf(); + new SparkCreateSimRels(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } - @Override - public void run(ISLookUpService isLookUpService) - throws DocumentException, IOException, ISLookUpException { + @Override + public void run(ISLookUpService isLookUpService) + throws DocumentException, IOException, ISLookUpException { - // read oozie parameters - final String graphBasePath = parser.get("graphBasePath"); - final String isLookUpUrl = parser.get("isLookUpUrl"); - final String actionSetId = parser.get("actionSetId"); - final String workingPath = parser.get("workingPath"); - final int numPartitions = Optional - .ofNullable(parser.get("numPartitions")) - .map(Integer::valueOf) - .orElse(NUM_PARTITIONS); + // read oozie parameters + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); - log.info("numPartitions: '{}'", numPartitions); - log.info("graphBasePath: '{}'", graphBasePath); - log.info("isLookUpUrl: '{}'", isLookUpUrl); - log.info("actionSetId: '{}'", actionSetId); - log.info("workingPath: '{}'", workingPath); + log.info("numPartitions: '{}'", numPartitions); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); - // for each dedup configuration - for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { + // for each dedup configuration + for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { - final String entity = dedupConf.getWf().getEntityType(); - final String subEntity = dedupConf.getWf().getSubEntityValue(); - log.info("Removing diffrels for: '{}'", subEntity); + final String entity = dedupConf.getWf().getEntityType(); + final String subEntity = dedupConf.getWf().getSubEntityValue(); + log.info("Removing diffrels for: '{}'", subEntity); - final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); + final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); - final String relationPath = DedupUtility.createEntityPath(graphBasePath, subEntity); + final String relationPath = DedupUtility.createEntityPath(graphBasePath, subEntity); - final int maxIterations = dedupConf.getWf().getMaxIterations(); - log.info("Max iterations {}", maxIterations); + final int maxIterations = dedupConf.getWf().getMaxIterations(); + log.info("Max iterations {}", maxIterations); - JavaRDD mergeRelsRDD = spark - .read() - .load(mergeRelsPath) - .as(Encoders.bean(Relation.class)) - .where("relClass == 'merges'") - .toJavaRDD(); + JavaRDD mergeRelsRDD = spark + .read() + .load(mergeRelsPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 'merges'") + .toJavaRDD(); - JavaRDD, String>> diffRelsRDD = spark - .read() - .textFile(relationPath) - .map(patchRelFn(), Encoders.bean(Relation.class)) - .toJavaRDD().filter(r -> filterRels(r, entity)) - .map(rel -> { - if (rel.getSource().compareTo(rel.getTarget()) < 0) - return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel"); - else - return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel"); - }); + JavaRDD, String>> diffRelsRDD = spark + .read() + .textFile(relationPath) + .map(patchRelFn(), Encoders.bean(Relation.class)) + .toJavaRDD() + .filter(r -> filterRels(r, entity)) + .map(rel -> { + if (rel.getSource().compareTo(rel.getTarget()) < 0) + return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel"); + else + return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel"); + }); - JavaRDD, String>> flatMergeRels = mergeRelsRDD - .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel.getTarget())) - .groupByKey() - .flatMap(g -> { - List, String>> rels = new ArrayList<>(); + JavaRDD, String>> flatMergeRels = mergeRelsRDD + .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel.getTarget())) + .groupByKey() + .flatMap(g -> { + List, String>> rels = new ArrayList<>(); - List ids = StreamSupport - .stream(g._2().spliterator(), false) - .collect(Collectors.toList()); + List ids = StreamSupport + .stream(g._2().spliterator(), false) + .collect(Collectors.toList()); - for (int i = 0; i < ids.size(); i++){ - for (int j = i+1; j < ids.size(); j++){ - if (ids.get(i).compareTo(ids.get(j)) < 0) - rels.add(new Tuple2<>(new Tuple2<>(ids.get(i), ids.get(j)), g._1())); - else - rels.add(new Tuple2<>(new Tuple2<>(ids.get(j), ids.get(i)), g._1())); - } - } - return rels.iterator(); + for (int i = 0; i < ids.size(); i++) { + for (int j = i + 1; j < ids.size(); j++) { + if (ids.get(i).compareTo(ids.get(j)) < 0) + rels.add(new Tuple2<>(new Tuple2<>(ids.get(i), ids.get(j)), g._1())); + else + rels.add(new Tuple2<>(new Tuple2<>(ids.get(j), ids.get(i)), g._1())); + } + } + return rels.iterator(); - }); + }); - JavaRDD purgedMergeRels = flatMergeRels.union(diffRelsRDD) - .mapToPair(rel -> new Tuple2<>(rel._1(), Arrays.asList(rel._2()))) - .reduceByKey((a, b) -> { - List list = new ArrayList(); - list.addAll(a); - list.addAll(b); - return list; - }) - .filter(rel -> rel._2().size() == 1) - .mapToPair(rel -> new Tuple2<>(rel._2().get(0), rel._1())) - .flatMap(rel -> { - List> rels = new ArrayList<>(); - String source = rel._1(); - rels.add(new Tuple2<>(source, rel._2()._1())); - rels.add(new Tuple2<>(source, rel._2()._2())); - return rels.iterator(); - }) - .distinct() - .flatMap(rel -> tupleToMergeRel(rel, dedupConf)); + JavaRDD purgedMergeRels = flatMergeRels + .union(diffRelsRDD) + .mapToPair(rel -> new Tuple2<>(rel._1(), Arrays.asList(rel._2()))) + .reduceByKey((a, b) -> { + List list = new ArrayList(); + list.addAll(a); + list.addAll(b); + return list; + }) + .filter(rel -> rel._2().size() == 1) + .mapToPair(rel -> new Tuple2<>(rel._2().get(0), rel._1())) + .flatMap(rel -> { + List> rels = new ArrayList<>(); + String source = rel._1(); + rels.add(new Tuple2<>(source, rel._2()._1())); + rels.add(new Tuple2<>(source, rel._2()._2())); + return rels.iterator(); + }) + .distinct() + .flatMap(rel -> tupleToMergeRel(rel, dedupConf)); - spark - .createDataset(purgedMergeRels.rdd(), Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Overwrite).parquet(mergeRelsPath); - } - } + spark + .createDataset(purgedMergeRels.rdd(), Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .json(mergeRelsPath); + } + } - private static MapFunction patchRelFn() { - return value -> { - final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); - if (rel.getDataInfo() == null) { - rel.setDataInfo(new DataInfo()); - } - return rel; - }; - } + private static MapFunction patchRelFn() { + return value -> { + final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } + return rel; + }; + } - private boolean filterRels(Relation rel, String entityType) { + private boolean filterRels(Relation rel, String entityType) { - switch(entityType) { - case "result": - if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult") && rel.getSubRelType().equals("dedup")) - return true; - break; - case "organization": - if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup")) - return true; - break; - default: - return false; - } - return false; - } + switch (entityType) { + case "result": + if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult") + && rel.getSubRelType().equals("dedup")) + return true; + break; + case "organization": + if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization") + && rel.getSubRelType().equals("dedup")) + return true; + break; + default: + return false; + } + return false; + } - public Iterator tupleToMergeRel(Tuple2 rel, DedupConfig dedupConf) { + public Iterator tupleToMergeRel(Tuple2 rel, DedupConfig dedupConf) { - List rels = new ArrayList<>(); + List rels = new ArrayList<>(); - rels.add(rel(rel._1(), rel._2(), "merges", dedupConf)); - rels.add(rel(rel._2(), rel._1(), "isMergedIn", dedupConf)); + rels.add(rel(rel._1(), rel._2(), "merges", dedupConf)); + rels.add(rel(rel._2(), rel._1(), "isMergedIn", dedupConf)); - return rels.iterator(); - } + return rels.iterator(); + } - private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) { + private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) { - String entityType = dedupConf.getWf().getEntityType(); + String entityType = dedupConf.getWf().getEntityType(); - Relation r = new Relation(); - r.setSource(source); - r.setTarget(target); - r.setRelClass(relClass); - r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1)); - r.setSubRelType("dedup"); + Relation r = new Relation(); + r.setSource(source); + r.setTarget(target); + r.setRelClass(relClass); + r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1)); + r.setSubRelType("dedup"); - DataInfo info = new DataInfo(); - info.setDeletedbyinference(false); - info.setInferred(true); - info.setInvisible(false); - info.setInferenceprovenance(dedupConf.getWf().getConfigurationId()); - Qualifier provenanceAction = new Qualifier(); - provenanceAction.setClassid(PROVENANCE_ACTION_CLASS); - provenanceAction.setClassname(PROVENANCE_ACTION_CLASS); - provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS); - provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS); - info.setProvenanceaction(provenanceAction); + DataInfo info = new DataInfo(); + info.setDeletedbyinference(false); + info.setInferred(true); + info.setInvisible(false); + info.setInferenceprovenance(dedupConf.getWf().getConfigurationId()); + Qualifier provenanceAction = new Qualifier(); + provenanceAction.setClassid(PROVENANCE_ACTION_CLASS); + provenanceAction.setClassname(PROVENANCE_ACTION_CLASS); + provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS); + provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS); + info.setProvenanceaction(provenanceAction); - // TODO calculate the trust value based on the similarity score of the elements in the CC - // info.setTrust(); + // TODO calculate the trust value based on the similarity score of the elements in the CC + // info.setTrust(); - r.setDataInfo(info); - return r; - } + r.setDataInfo(info); + return r; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json similarity index 84% rename from dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json rename to dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json index 715b0e74e..75054637f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyRels_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json @@ -18,9 +18,9 @@ "paramRequired": true }, { - "paramName": "e", - "paramLongName": "entityType", - "paramDescription": "type of the entity for the merge relations", + "paramName": "la", + "paramLongName": "isLookUpUrl", + "paramDescription": "the url for the lookup service", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml index a6b313cad..4c5505eb5 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml @@ -85,9 +85,6 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - @@ -96,34 +93,6 @@ - - - yarn - cluster - Copy Merge Relations - eu.dnetlib.dhp.oa.dedup.SparkCopyRels - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --actionSetId${actionSetId} - --entityTypeorganization - --destinationsimrel - --numPartitions8000 - - - - - yarn @@ -147,6 +116,33 @@ --workingPath${workingPath} --numPartitions8000 + + + + + + + + yarn + cluster + Copy OpenOrgs Sim Rels + eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsSimRels + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --actionSetId${actionSetId} + --numPartitions8000 + diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index d22f05ca8..998f3ac21 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -12,6 +12,10 @@ actionSetId id of the actionSet + + actionSetIdOpenorgs + id of the actionSet for OpenOrgs dedup + workingPath path for the working directory @@ -169,17 +173,17 @@ --isLookUpUrl${isLookUpUrl} --actionSetId${actionSetId} - + - - + + yarn cluster Copy Merge Relations - eu.dnetlib.dhp.oa.dedup.SparkCopyRels + eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsMergeRels dhp-dedup-openaire-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -193,15 +197,15 @@ --graphBasePath${graphBasePath} --workingPath${workingPath} - --actionSetId${actionSetId} - --entityTypeorganization - --destinationmergerel + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetIdOpenorgs} --numPartitions8000 + yarn @@ -222,7 +226,7 @@ --graphBasePath${graphBasePath} --workingPath${workingPath} --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} + --actionSetId${actionSetIdOpenorgs} @@ -253,15 +257,28 @@ + - - - - - -pb - ${graphBasePath}/relation - ${dedupGraphPath}/relation - + + yarn + cluster + Update Entity + eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --dedupGraphPath${dedupGraphPath} + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index b6210013c..532bb43b2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.DbClient; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; +import eu.dnetlib.dhp.oa.graph.raw.common.MigrateAction; import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.Context; @@ -76,6 +77,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i public static final String SOURCE_TYPE = "source_type"; public static final String TARGET_TYPE = "target_type"; + private static final String ORG_ORG_RELTYPE = "organizationOrganization"; + private static final String ORG_ORG_SUBRELTYPE = "dedup"; + private final DbClient dbClient; private final long lastUpdateTimestamp; @@ -114,35 +118,53 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final Predicate verifyNamespacePrefix = new VerifyNsPrefixPredicate(nsPrefixBlacklist); - final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims"); - log.info("processClaims: {}", processClaims); + final MigrateAction process = parser.get("action") != null ? MigrateAction.valueOf(parser.get("action")) + : MigrateAction.openaire; + log.info("migrateAction: {}", process); try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser, dbPassword, isLookupUrl)) { - if (processClaims) { - log.info("Processing claims..."); - smdbe.execute("queryClaims.sql", smdbe::processClaims); - } else { - log.info("Processing datasources..."); - smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix); - log.info("Processing projects..."); - if (dbSchema.equalsIgnoreCase("beta")) { - smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix); - } else { - smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix); - } + switch (process) { + case claims: + log.info("Processing claims..."); + smdbe.execute("queryClaims.sql", smdbe::processClaims); + break; + case openaire: + log.info("Processing datasources..."); + smdbe.execute("queryDatasources.sql", smdbe::processDatasource, verifyNamespacePrefix); - log.info("Processing orgs..."); - smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix); + log.info("Processing projects..."); + if (dbSchema.equalsIgnoreCase("beta")) { + smdbe.execute("queryProjects.sql", smdbe::processProject, verifyNamespacePrefix); + } else { + smdbe.execute("queryProjects_production.sql", smdbe::processProject, verifyNamespacePrefix); + } - log.info("Processing relationsNoRemoval ds <-> orgs ..."); - smdbe - .execute( - "queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, verifyNamespacePrefix); + log.info("Processing Organizations..."); + smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix); - log.info("Processing projects <-> orgs ..."); - smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix); + log.info("Processing relationsNoRemoval ds <-> orgs ..."); + smdbe + .execute( + "queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization, + verifyNamespacePrefix); + + log.info("Processing projects <-> orgs ..."); + smdbe + .execute( + "queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix); + break; + case openorgs: + log.info("Processing Openorgs..."); + smdbe + .execute( + "queryOrganizationsFromOpenOrgsDB.sql", smdbe::processOrganization, verifyNamespacePrefix); + + log.info("Processing Openorgs Merge Rels..."); + smdbe.execute("querySimilarityFromOpenOrgsDB.sql", smdbe::processOrgOrgSimRels); + + break; } log.info("All done."); } @@ -585,6 +607,43 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i } } + public List processOrgOrgSimRels(final ResultSet rs) { + try { + final DataInfo info = prepareDataInfo(rs); // TODO + + final String orgId1 = createOpenaireId(20, rs.getString("id1"), true); + final String orgId2 = createOpenaireId(40, rs.getString("id2"), true); + final String relClass = rs.getString("relclass"); + + final List collectedFrom = listKeyValues( + createOpenaireId(10, rs.getString("collectedfromid"), true), rs.getString("collectedfromname")); + + final Relation r1 = new Relation(); + r1.setRelType(ORG_ORG_RELTYPE); + r1.setSubRelType(ORG_ORG_SUBRELTYPE); + r1.setRelClass(relClass); + r1.setSource(orgId1); + r1.setTarget(orgId2); + r1.setCollectedfrom(collectedFrom); + r1.setDataInfo(info); + r1.setLastupdatetimestamp(lastUpdateTimestamp); + + final Relation r2 = new Relation(); + r2.setRelType(ORG_ORG_RELTYPE); + r2.setSubRelType(ORG_ORG_SUBRELTYPE); + r2.setRelClass(relClass); + r2.setSource(orgId2); + r2.setTarget(orgId1); + r2.setCollectedfrom(collectedFrom); + r2.setDataInfo(info); + r2.setLastupdatetimestamp(lastUpdateTimestamp); + + return Arrays.asList(r1, r2); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + @Override public void close() throws IOException { super.close(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java new file mode 100644 index 000000000..d9ee9bb6a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java @@ -0,0 +1,9 @@ + +package eu.dnetlib.dhp.oa.graph.raw.common; + +//enum to specify the different actions available for the MigrateDbEntitiesApplication job +public enum MigrateAction { + claims, // migrate claims to the raw graph + openorgs, // migrate organizations from openorgs to the raw graph + openaire // migrate openaire entities to the raw graph +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index d8146d9a2..adaee65d3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -25,6 +25,18 @@ postgresPassword the password postgres + + + postgresOpenOrgsURL + the postgres URL to access to the OpenOrgs database + + + postgresOpenOrgsUser + the user of OpenOrgs database + + + postgresOpenOrgsPassword + the password of OpenOrgs database dbSchema @@ -178,14 +190,34 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication - --hdfsPath${contentPath}/db_records + --hdfsPath${contentPath}/db_openaire --postgresUrl${postgresURL} --postgresUser${postgresUser} --postgresPassword${postgresPassword} --isLookupUrl${isLookupUrl} + --actionopenaire + --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication + --hdfsPath${contentPath}/db_openorgs + --postgresUrl${postgresOpenOrgsURL} + --postgresUser${postgresOpenOrgsUser} + --postgresPassword${postgresOpenOrgsPassword} + --isLookupUrl${isLookupUrl} + --actionopenorgs --dbschema${dbSchema} --nsPrefixBlacklist${nsPrefixBlacklist} @@ -314,7 +346,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records + --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records --targetPath${workingDir}/entities --isLookupUrl${isLookupUrl} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql index 3396f365c..82ece5a1c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql @@ -4,6 +4,8 @@ SELECT o.name AS legalname, array_agg(DISTINCT n.name) AS "alternativeNames", (array_agg(u.url))[1] AS websiteurl, + '' AS logourl, + o.creation_date AS dateofcollection, o.modification_date AS dateoftransformation, false AS inferred, false AS deletedbyinference, @@ -13,7 +15,17 @@ SELECT 'OpenOrgs Database' AS collectedfromname, o.country || '@@@dnet:countries' AS country, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid + array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid, + null AS eclegalbody, + null AS eclegalperson, + null AS ecnonprofit, + null AS ecresearchorganization, + null AS echighereducation, + null AS ecinternationalorganizationeurinterests, + null AS ecinternationalorganization, + null AS ecenterprise, + null AS ecsmevalidated, + null AS ecnutscode FROM organizations o LEFT OUTER JOIN acronyms a ON (a.id = o.id) LEFT OUTER JOIN urls u ON (u.id = o.id) @@ -22,6 +34,7 @@ FROM organizations o GROUP BY o.id, o.name, + o.creation_date, o.modification_date, o.country @@ -33,6 +46,8 @@ SELECT n.name AS legalname, ARRAY[]::text[] AS "alternativeNames", (array_agg(u.url))[1] AS websiteurl, + '' AS logourl, + o.creation_date AS dateofcollection, o.modification_date AS dateoftransformation, false AS inferred, false AS deletedbyinference, @@ -42,12 +57,24 @@ SELECT 'OpenOrgs Database' AS collectedfromname, o.country || '@@@dnet:countries' AS country, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid + array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid, + null AS eclegalbody, + null AS eclegalperson, + null AS ecnonprofit, + null AS ecresearchorganization, + null AS echighereducation, + null AS ecinternationalorganizationeurinterests, + null AS ecinternationalorganization, + null AS ecenterprise, + null AS ecsmevalidated, + null AS ecnutscode FROM other_names n LEFT OUTER JOIN organizations o ON (n.id = o.id) LEFT OUTER JOIN urls u ON (u.id = o.id) LEFT OUTER JOIN other_ids i ON (i.id = o.id) GROUP BY - o.id, o.modification_date, o.country, n.name - - + o.id, + o.creation_date, + o.modification_date, + o.country, + n.name; \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/querySimilarityFromOpenOrgsDB.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/querySimilarityFromOpenOrgsDB.sql index 4407559c6..138bf6a96 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/querySimilarityFromOpenOrgsDB.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/querySimilarityFromOpenOrgsDB.sql @@ -1,17 +1,47 @@ -SELECT local_id AS id1, oa_original_id AS id2 FROM openaire_simrels WHERE reltype = 'is_similar' +-- relations approved by the user +SELECT + local_id AS id1, + oa_original_id AS id2, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + false AS inferred, + false AS deletedbyinference, + 0.99 AS trust, + '' AS inferenceprovenance, + 'isSimilarTo' AS relclass +FROM oa_duplicates WHERE reltype = 'is_similar' UNION ALL +-- relations between openorgs and mesh (alternative names) SELECT - o.id AS id1, - 'openorgsmesh'||substring(o.id, 13)||'-'||md5(a.acronym) AS id2 -FROM acronyms a - LEFT OUTER JOIN organizations o ON (a.id = o.id) - -UNION ALL - -SELECT - o.id AS id1, - 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2 + o.id AS id1, + 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + false AS inferred, + false AS deletedbyinference, + 0.99 AS trust, + '' AS inferenceprovenance, + 'isSimilarTo' AS relclass FROM other_names n LEFT OUTER JOIN organizations o ON (n.id = o.id) + +UNION ALL + +-- diff relations approved by the user +SELECT + local_id AS id1, + oa_original_id AS id2, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + false AS inferred, + false AS deletedbyinference, + 0.99 AS trust, + '' AS inferenceprovenance, + 'isDifferentFrom' AS relclass +FROM oa_duplicates WHERE reltype = 'is_different' + + +--TODO ??? +--Creare relazioni isDifferentFrom anche tra i suggerimenti: (A is_similar B) and (A is_different C) => (B is_different C) \ No newline at end of file