diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml
index 03ddbcf4c..04e158542 100644
--- a/dhp-workflows/dhp-dedup-openaire/pom.xml
+++ b/dhp-workflows/dhp-dedup-openaire/pom.xml
@@ -90,7 +90,10 @@
com.fasterxml.jackson.core
jackson-core
-
+
+ org.apache.httpcomponents
+ httpclient
+
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
index 74cecb7b6..9a1127764 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
@@ -29,6 +29,7 @@ import eu.dnetlib.pace.config.DedupConfig;
abstract class AbstractSparkAction implements Serializable {
protected static final int NUM_PARTITIONS = 1000;
+ protected static final int NUM_CONNECTIONS = 20;
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
index 01065510a..88873086d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
@@ -95,6 +95,11 @@ public class DedupUtility {
return String.format("%s/%s/%s_simrel", basePath, actionSetId, entityType);
}
+ public static String createOpenorgsMergeRelsPath(
+ final String basePath, final String actionSetId, final String entityType) {
+ return String.format("%s/%s/%s_openorgs_mergerels", basePath, actionSetId, entityType);
+ }
+
public static String createMergeRelPath(
final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
index aa7a131e7..ff7aca627 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
@@ -6,6 +6,7 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -19,6 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -34,7 +36,7 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
- SparkCreateSimRels.class
+ SparkCopyOpenorgs.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgs_parameters.json")));
parser.parseArgument(args);
@@ -72,7 +74,7 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
final Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
- filterEntities(spark, entityPath, clazz)
+ filterOpenorgs(spark, entityPath)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
@@ -80,21 +82,20 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
}
- public static Dataset filterEntities(
+ public static Dataset filterOpenorgs(
final SparkSession spark,
- final String entitiesInputPath,
- final Class clazz) {
+ final String entitiesInputPath) {
- //
- Dataset entities = spark
- .read()
- .textFile(entitiesInputPath)
- .map(
- (MapFunction) it -> {
- T entity = OBJECT_MAPPER.readValue(it, clazz);
- return entity;
- },
- Encoders.kryo(clazz));
+ JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ Dataset entities = spark
+ .createDataset(
+ sc
+ .textFile(entitiesInputPath)
+ .map(it -> OBJECT_MAPPER.readValue(it, Organization.class))
+ .rdd(),
+ Encoders.bean(Organization.class));
+
+ entities.show();
return entities.filter(entities.col("id").contains("openorgs____"));
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
index d705fca6b..4bb46222e 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
@@ -6,14 +6,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
@@ -21,10 +20,13 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
//copy simrels (verified) from relation to the workdir in order to make them available for the deduplication
public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
@@ -83,17 +85,17 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
- .filter(this::isOpenorgs) //takes only relations coming from openorgs
- .filter(this::filterOpenorgsRels) //takes only isSimilarTo relations between organizations from openorgs
- .filter(this::excludeOpenorgsMesh) //excludes relations between an organization and an openorgsmesh
- .filter(this::excludeNonOpenorgs); //excludes relations with no openorgs id involved
+ .filter(this::isOpenorgs) // takes only relations coming from openorgs
+ .filter(this::filterOpenorgsRels) // takes only isSimilarTo relations between organizations from openorgs
+ .filter(this::excludeOpenorgsMesh) // excludes relations between an organization and an openorgsmesh
+ .filter(this::excludeNonOpenorgs); // excludes relations with no openorgs id involved
- //turn openorgs isSimilarTo relations into mergerels
- JavaRDD mergeRels = rawRels.flatMap(rel -> {
+ // turn openorgs isSimilarTo relations into mergerels
+ JavaRDD mergeRelsRDD = rawRels.flatMap(rel -> {
List mergerels = new ArrayList<>();
- String openorgsId = rel.getSource().contains("openorgs____")? rel.getSource() : rel.getTarget();
- String mergedId = rel.getSource().contains("openorgs____")? rel.getTarget() : rel.getSource();
+ String openorgsId = rel.getSource().contains("openorgs____") ? rel.getSource() : rel.getTarget();
+ String mergedId = rel.getSource().contains("openorgs____") ? rel.getTarget() : rel.getSource();
mergerels.add(rel(openorgsId, mergedId, "merges", dedupConf));
mergerels.add(rel(mergedId, openorgsId, "isMergedIn", dedupConf));
@@ -101,7 +103,13 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
return mergerels.iterator();
});
- mergeRels.saveAsTextFile(outputPath);
+ spark
+ .createDataset(
+ mergeRelsRDD.rdd(),
+ Encoders.bean(Relation.class))
+ .write()
+ .mode(SaveMode.Append)
+ .parquet(outputPath);
}
private static MapFunction patchRelFn() {
@@ -116,7 +124,8 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
private boolean filterOpenorgsRels(Relation rel) {
- if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup"))
+ if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization")
+ && rel.getSubRelType().equals("dedup"))
return true;
return false;
}
@@ -124,7 +133,7 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
private boolean isOpenorgs(Relation rel) {
if (rel.getCollectedfrom() != null) {
- for (KeyValue k: rel.getCollectedfrom()) {
+ for (KeyValue k : rel.getCollectedfrom()) {
if (k.getValue().equals("OpenOrgs Database")) {
return true;
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
index 3ce676f84..b7f88a5f6 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
@@ -6,13 +6,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@@ -22,97 +22,98 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
//copy simrels (verified) from relation to the workdir in order to make them available for the deduplication
public class SparkCopyOpenorgsSimRels extends AbstractSparkAction {
- private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsMergeRels.class);
+ private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsSimRels.class);
- public SparkCopyOpenorgsSimRels(ArgumentApplicationParser parser, SparkSession spark) {
- super(parser, spark);
- }
+ public SparkCopyOpenorgsSimRels(ArgumentApplicationParser parser, SparkSession spark) {
+ super(parser, spark);
+ }
- public static void main(String[] args) throws Exception {
- ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkCopyOpenorgsSimRels.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
- parser.parseArgument(args);
+ public static void main(String[] args) throws Exception {
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkCopyOpenorgsSimRels.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
+ parser.parseArgument(args);
- SparkConf conf = new SparkConf();
- new SparkCopyOpenorgsSimRels(parser, getSparkSession(conf))
- .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
- }
+ SparkConf conf = new SparkConf();
+ new SparkCopyOpenorgsSimRels(parser, getSparkSession(conf))
+ .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+ }
- @Override
- public void run(ISLookUpService isLookUpService)
- throws DocumentException, IOException, ISLookUpException {
+ @Override
+ public void run(ISLookUpService isLookUpService)
+ throws DocumentException, IOException, ISLookUpException {
- // read oozie parameters
- final String graphBasePath = parser.get("graphBasePath");
- final String actionSetId = parser.get("actionSetId");
- final String workingPath = parser.get("workingPath");
- final int numPartitions = Optional
- .ofNullable(parser.get("numPartitions"))
- .map(Integer::valueOf)
- .orElse(NUM_PARTITIONS);
+ // read oozie parameters
+ final String graphBasePath = parser.get("graphBasePath");
+ final String actionSetId = parser.get("actionSetId");
+ final String workingPath = parser.get("workingPath");
+ final int numPartitions = Optional
+ .ofNullable(parser.get("numPartitions"))
+ .map(Integer::valueOf)
+ .orElse(NUM_PARTITIONS);
- log.info("numPartitions: '{}'", numPartitions);
- log.info("graphBasePath: '{}'", graphBasePath);
- log.info("actionSetId: '{}'", actionSetId);
- log.info("workingPath: '{}'", workingPath);
+ log.info("numPartitions: '{}'", numPartitions);
+ log.info("graphBasePath: '{}'", graphBasePath);
+ log.info("actionSetId: '{}'", actionSetId);
+ log.info("workingPath: '{}'", workingPath);
- log.info("Copying OpenOrgs SimRels");
+ log.info("Copying OpenOrgs SimRels");
- final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, "organization");
+ final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, "organization");
- removeOutputDir(spark, outputPath);
+ final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
- final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
+ Dataset rawRels = spark
+ .read()
+ .textFile(relationPath)
+ .map(patchRelFn(), Encoders.bean(Relation.class))
+ .filter(this::filterOpenorgsRels);
- JavaRDD rawRels = spark
- .read()
- .textFile(relationPath)
- .map(patchRelFn(), Encoders.bean(Relation.class))
- .toJavaRDD()
- .filter(this::isOpenorgs)
- .filter(this::filterOpenorgsRels);
+ save(rawRels, outputPath, SaveMode.Append);
- save(spark.createDataset(rawRels.rdd(),Encoders.bean(Relation.class)), outputPath, SaveMode.Append);
- }
+ log.info("Copied " + rawRels.count() + " Similarity Relations");
+ }
- private static MapFunction patchRelFn() {
- return value -> {
- final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
- if (rel.getDataInfo() == null) {
- rel.setDataInfo(new DataInfo());
- }
- return rel;
- };
- }
+ private static MapFunction patchRelFn() {
+ return value -> {
+ final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+ if (rel.getDataInfo() == null) {
+ rel.setDataInfo(new DataInfo());
+ }
+ return rel;
+ };
+ }
- private boolean filterOpenorgsRels(Relation rel) {
+ private boolean filterOpenorgsRels(Relation rel) {
- if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization") && rel.getSubRelType().equals("dedup"))
- return true;
- return false;
- }
+ if (rel.getRelClass().equals("isSimilarTo") && rel.getRelType().equals("organizationOrganization")
+ && rel.getSubRelType().equals("dedup") && isOpenorgs(rel))
+ return true;
+ return false;
+ }
- private boolean isOpenorgs(Relation rel) {
+ private boolean isOpenorgs(Relation rel) {
- if (rel.getCollectedfrom() != null) {
- for (KeyValue k: rel.getCollectedfrom()) {
- if (k.getValue().equals("OpenOrgs Database")) {
- return true;
- }
- }
- }
- return false;
- }
+ if (rel.getCollectedfrom() != null) {
+ for (KeyValue k : rel.getCollectedfrom()) {
+ if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
}
-
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
index 319c40d8d..64a110892 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
@@ -1,12 +1,9 @@
+
package eu.dnetlib.dhp.oa.dedup;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.util.MapDocumentUtil;
+import java.io.IOException;
+import java.util.Optional;
+
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -19,92 +16,95 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-import java.io.IOException;
-import java.util.Optional;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+import scala.Tuple2;
public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
- private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class);
+ private static final Logger log = LoggerFactory.getLogger(SparkCopyRelationsNoOpenorgs.class);
- private static final String IDJSONPATH = "$.id";
+ public SparkCopyRelationsNoOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
+ super(parser, spark);
+ }
- public SparkCopyRelationsNoOpenorgs(ArgumentApplicationParser parser, SparkSession spark) {
- super(parser, spark);
- }
+ public static void main(String[] args) throws Exception {
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkCopyRelationsNoOpenorgs.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
+ parser.parseArgument(args);
- public static void main(String[] args) throws Exception {
- ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkCopyRelationsNoOpenorgs.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
- parser.parseArgument(args);
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
- SparkConf conf = new SparkConf();
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+ new SparkCopyRelationsNoOpenorgs(parser, getSparkSession(conf))
+ .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+ }
- new SparkUpdateEntity(parser, getSparkSession(conf))
- .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
- }
+ public void run(ISLookUpService isLookUpService) throws IOException {
- public void run(ISLookUpService isLookUpService) throws IOException {
+ final String graphBasePath = parser.get("graphBasePath");
+ final String workingPath = parser.get("workingPath");
+ final String dedupGraphPath = parser.get("dedupGraphPath");
- final String graphBasePath = parser.get("graphBasePath");
- final String workingPath = parser.get("workingPath");
- final String dedupGraphPath = parser.get("dedupGraphPath");
+ log.info("graphBasePath: '{}'", graphBasePath);
+ log.info("workingPath: '{}'", workingPath);
+ log.info("dedupGraphPath: '{}'", dedupGraphPath);
- log.info("graphBasePath: '{}'", graphBasePath);
- log.info("workingPath: '{}'", workingPath);
- log.info("dedupGraphPath: '{}'", dedupGraphPath);
+ final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
+ final String outputPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
- final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ removeOutputDir(spark, outputPath);
- final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
- final String outputPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
+ JavaRDD simRels = spark
+ .read()
+ .textFile(relationPath)
+ .map(patchRelFn(), Encoders.bean(Relation.class))
+ .toJavaRDD()
+ .filter(this::excludeOpenorgsRels);
- removeOutputDir(spark, outputPath);
+ spark
+ .createDataset(simRels.rdd(), Encoders.bean(Relation.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .json(outputPath);
- JavaRDD simRels = spark
- .read()
- .textFile(relationPath)
- .map(patchRelFn(), Encoders.bean(Relation.class))
- .toJavaRDD()
- .filter(this::excludeOpenorgsRels);
+ }
- simRels.saveAsTextFile(outputPath);
+ private static MapFunction patchRelFn() {
+ return value -> {
+ final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+ if (rel.getDataInfo() == null) {
+ rel.setDataInfo(new DataInfo());
+ }
+ return rel;
+ };
+ }
- }
+ private boolean excludeOpenorgsRels(Relation rel) {
- private static MapFunction patchRelFn() {
- return value -> {
- final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
- if (rel.getDataInfo() == null) {
- rel.setDataInfo(new DataInfo());
- }
- return rel;
- };
- }
-
- private boolean excludeOpenorgsRels(Relation rel) {
-
- if (rel.getCollectedfrom() != null) {
- for (KeyValue k: rel.getCollectedfrom()) {
- if (k.getValue().equals("OpenOrgs Database")) {
- return false;
- }
- }
- }
- return true;
- }
+ if (rel.getCollectedfrom() != null) {
+ for (KeyValue k : rel.getCollectedfrom()) {
+ if (k.getValue().equals("OpenOrgs Database")) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index b3ee47bfc..a7566f2e2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -10,6 +10,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@@ -81,7 +82,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
log.info("Creating simrels for: '{}'", subEntity);
final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
- removeOutputDir(spark, outputPath);
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -99,13 +99,19 @@ public class SparkCreateSimRels extends AbstractSparkAction {
.createSortedBlocks(mapDocuments, dedupConf)
.repartition(numPartitions);
- // create relations by comparing only elements in the same group
- Deduper
- .computeRelations(sc, blocks, dedupConf)
- .map(t -> createSimRel(t._1(), t._2(), entity))
- .repartition(numPartitions)
- .map(r -> OBJECT_MAPPER.writeValueAsString(r))
- .saveAsTextFile(outputPath);
+ Dataset simRels = spark
+ .createDataset(
+ Deduper
+ .computeRelations(sc, blocks, dedupConf)
+ .map(t -> createSimRel(t._1(), t._2(), entity))
+ .repartition(numPartitions)
+ .rdd(),
+ Encoders.bean(Relation.class));
+
+ save(simRels, outputPath, SaveMode.Append);
+
+ log.info("Generated " + simRels.count() + " Similarity Relations");
+
}
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
new file mode 100644
index 000000000..3b29e1e17
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
@@ -0,0 +1,249 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import scala.Tuple2;
+
+public class SparkPrepareNewOrgs extends AbstractSparkAction {
+
+ private static final Logger log = LoggerFactory.getLogger(SparkPrepareNewOrgs.class);
+
+ public SparkPrepareNewOrgs(ArgumentApplicationParser parser, SparkSession spark) {
+ super(parser, spark);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkPrepareNewOrgs.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json")));
+ parser.parseArgument(args);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+ new SparkPrepareNewOrgs(parser, getSparkSession(conf))
+ .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+ }
+
+ @Override
+ public void run(ISLookUpService isLookUpService) throws IOException {
+
+ final String graphBasePath = parser.get("graphBasePath");
+ final String isLookUpUrl = parser.get("isLookUpUrl");
+ final String actionSetId = parser.get("actionSetId");
+ final String workingPath = parser.get("workingPath");
+ final int numConnections = Optional
+ .ofNullable(parser.get("numConnections"))
+ .map(Integer::valueOf)
+ .orElse(NUM_CONNECTIONS);
+
+ final String apiUrl = Optional
+ .ofNullable(parser.get("apiUrl"))
+ .orElse("");
+
+ final String dbUrl = parser.get("dbUrl");
+ final String dbTable = parser.get("dbTable");
+ final String dbUser = parser.get("dbUser");
+ final String dbPwd = parser.get("dbPwd");
+
+ log.info("graphBasePath: '{}'", graphBasePath);
+ log.info("isLookUpUrl: '{}'", isLookUpUrl);
+ log.info("actionSetId: '{}'", actionSetId);
+ log.info("workingPath: '{}'", workingPath);
+ log.info("numPartitions: '{}'", numConnections);
+ log.info("apiUrl: '{}'", apiUrl);
+ log.info("dbUrl: '{}'", dbUrl);
+ log.info("dbUser: '{}'", dbUser);
+ log.info("table: '{}'", dbTable);
+ log.info("dbPwd: '{}'", "xxx");
+
+ final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
+ final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
+ final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
+
+ Dataset newOrgs = createNewOrgs(spark, mergeRelPath, relationPath, entityPath);
+
+ final Properties connectionProperties = new Properties();
+ connectionProperties.put("user", dbUser);
+ connectionProperties.put("password", dbPwd);
+
+ log.info("Number of New Organization created: '{}'", newOrgs.count());
+
+ newOrgs
+ .repartition(numConnections)
+ .write()
+ .mode(SaveMode.Append)
+ .jdbc(dbUrl, dbTable, connectionProperties);
+
+ if (!apiUrl.isEmpty())
+ updateSimRels(apiUrl);
+
+ }
+
+ public static Dataset createNewOrgs(
+ final SparkSession spark,
+ final String mergeRelsPath,
+ final String relationPath,
+ final String entitiesPath) {
+
+ // collect diffrels from the raw graph relations:
+ JavaPairRDD diffRels = spark
+ .read()
+ .textFile(relationPath)
+ .map(patchRelFn(), Encoders.bean(Relation.class))
+ .toJavaRDD()
+ .filter(r -> filterRels(r, "organization"))
+ // take the worst id of the diffrel:
+ .mapToPair(rel -> {
+ if (compareIds(rel.getSource(), rel.getTarget()) > 0)
+ return new Tuple2<>(rel.getSource(), "diffRel");
+ else
+ return new Tuple2<>(rel.getTarget(), "diffRel");
+ })
+ .distinct();
+ log.info("Number of DiffRels collected: '{}'", diffRels.count());
+
+ // collect entities:
+ Dataset> entities = spark
+ .read()
+ .textFile(entitiesPath)
+ .map(
+ (MapFunction>) it -> {
+ Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
+ return new Tuple2<>(entity.getId(), entity);
+ },
+ Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
+
+ // collect mergerels and remove ids in the diffrels
+ Dataset> openorgsRels = spark
+ .createDataset(
+ spark
+ .read()
+ .load(mergeRelsPath)
+ .as(Encoders.bean(Relation.class))
+ .where("relClass == 'isMergedIn'")
+ .toJavaRDD()
+ .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) //
+ .leftOuterJoin(diffRels) //
+ .filter(rel -> !rel._2()._2().isPresent())
+ .mapToPair(rel -> new Tuple2<>(rel._1(), rel._2()._1()))
+ .rdd(),
+ Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+ log.info("Number of Openorgs Relations loaded: '{}'", openorgsRels.count());
+
+ return entities
+ .joinWith(openorgsRels, entities.col("_1").equalTo(openorgsRels.col("_1")), "left")
+ .filter((FilterFunction, Tuple2>>) t -> t._2() == null)
+ // take entities not in mergerels (they are single entities, therefore are new orgs)
+ .filter(
+ (FilterFunction, Tuple2>>) t -> !t
+ ._1()
+ ._1()
+ .contains("openorgs"))
+ // exclude openorgs, don't need to propose them as new orgs
+ .map(
+ (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel(
+ "",
+ r._1()._2().getOriginalId().get(0),
+ r._1()._2().getLegalname() != null ? r._1()._2().getLegalname().getValue() : "",
+ r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "",
+ r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "",
+ r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "",
+ r._1()._2().getCollectedfrom().get(0).getValue(), ""),
+ Encoders.bean(OrgSimRel.class));
+
+ }
+
+ private static String updateSimRels(final String apiUrl) throws IOException {
+
+ log.info("Updating simrels on the portal");
+
+ final HttpGet req = new HttpGet(apiUrl);
+ try (final CloseableHttpClient client = HttpClients.createDefault()) {
+ try (final CloseableHttpResponse response = client.execute(req)) {
+ return IOUtils.toString(response.getEntity().getContent());
+ }
+ }
+ }
+
+ private static MapFunction patchRelFn() {
+ return value -> {
+ final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+ if (rel.getDataInfo() == null) {
+ rel.setDataInfo(new DataInfo());
+ }
+ return rel;
+ };
+ }
+
+ public static int compareIds(String o1, String o2) {
+ if (o1.contains("openorgs____") && o2.contains("openorgs____"))
+ return o1.compareTo(o2);
+ if (o1.contains("corda") && o2.contains("corda"))
+ return o1.compareTo(o2);
+
+ if (o1.contains("openorgs____"))
+ return -1;
+ if (o2.contains("openorgs____"))
+ return 1;
+
+ if (o1.contains("corda"))
+ return -1;
+ if (o2.contains("corda"))
+ return 1;
+
+ return o1.compareTo(o2);
+ }
+
+ private static boolean filterRels(Relation rel, String entityType) {
+
+ switch (entityType) {
+ case "result":
+ if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult")
+ && rel.getSubRelType().equals("dedup"))
+ return true;
+ break;
+ case "organization":
+ if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization")
+ && rel.getSubRelType().equals("dedup"))
+ return true;
+ break;
+ default:
+ return false;
+ }
+ return false;
+ }
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
new file mode 100644
index 000000000..cbca0b326
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
@@ -0,0 +1,341 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import scala.Tuple2;
+import scala.Tuple3;
+
+public class SparkPrepareOrgRels extends AbstractSparkAction {
+
+ private static final Logger log = LoggerFactory.getLogger(SparkPrepareOrgRels.class);
+
+	/**
+	 * Creates the action, binding the parsed CLI arguments to the shared Spark session.
+	 *
+	 * @param parser parsed command line arguments
+	 * @param spark  the active Spark session
+	 */
+	public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
+		super(parser, spark);
+	}
+
+	/**
+	 * Entry point: parses the CLI arguments, configures a Kryo-enabled Spark session and
+	 * runs the action against the ISLookUp service.
+	 *
+	 * @param args command line arguments, described by prepareOrgRels_parameters.json
+	 * @throws Exception if argument parsing, the ISLookUp call or the Spark job fails
+	 */
+	public static void main(String[] args) throws Exception {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					// was SparkCreateSimRels.class (copy-paste): load the resource from this class
+					SparkPrepareOrgRels.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json"),
+					java.nio.charset.StandardCharsets.UTF_8));
+		parser.parseArgument(args);
+
+		SparkConf conf = new SparkConf();
+		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+		new SparkPrepareOrgRels(parser, getSparkSession(conf))
+			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+	}
+
+ @Override
+ public void run(ISLookUpService isLookUpService) throws IOException {
+
+ final String graphBasePath = parser.get("graphBasePath");
+ final String isLookUpUrl = parser.get("isLookUpUrl");
+ final String actionSetId = parser.get("actionSetId");
+ final String workingPath = parser.get("workingPath");
+ final int numConnections = Optional
+ .ofNullable(parser.get("numConnections"))
+ .map(Integer::valueOf)
+ .orElse(NUM_CONNECTIONS);
+
+ final String dbUrl = parser.get("dbUrl");
+ final String dbTable = parser.get("dbTable");
+ final String dbUser = parser.get("dbUser");
+ final String dbPwd = parser.get("dbPwd");
+
+ log.info("graphBasePath: '{}'", graphBasePath);
+ log.info("isLookUpUrl: '{}'", isLookUpUrl);
+ log.info("actionSetId: '{}'", actionSetId);
+ log.info("workingPath: '{}'", workingPath);
+ log.info("numPartitions: '{}'", numConnections);
+ log.info("dbUrl: '{}'", dbUrl);
+ log.info("dbUser: '{}'", dbUser);
+ log.info("table: '{}'", dbTable);
+ log.info("dbPwd: '{}'", "xxx");
+
+ final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
+ final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
+ final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
+
+ Dataset relations = createRelations(spark, mergeRelPath, relationPath, entityPath);
+
+ final Properties connectionProperties = new Properties();
+ connectionProperties.put("user", dbUser);
+ connectionProperties.put("password", dbPwd);
+
+ relations
+ .repartition(numConnections)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .jdbc(dbUrl, dbTable, connectionProperties);
+
+ }
+
+	/**
+	 * Keeps only the "isDifferentFrom" dedup relations for the given entity type.
+	 * NOTE(review): this helper is duplicated in another dedup action class — consider
+	 * moving it to a shared utility.
+	 *
+	 * @param rel        relation read from the raw graph
+	 * @param entityType "result" or "organization"; any other value never matches
+	 * @return true when the relation is a dedup isDifferentFrom rel of the requested type
+	 */
+	private static boolean filterRels(Relation rel, String entityType) {
+
+		switch (entityType) {
+			case "result":
+				// dedup "is different from" relation between two results
+				if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("resultResult")
+					&& rel.getSubRelType().equals("dedup"))
+					return true;
+				break;
+			case "organization":
+				// dedup "is different from" relation between two organizations
+				if (rel.getRelClass().equals("isDifferentFrom") && rel.getRelType().equals("organizationOrganization")
+					&& rel.getSubRelType().equals("dedup"))
+					return true;
+				break;
+			default:
+				return false;
+		}
+		return false;
+	}
+
+ // create openorgs simrels starting from mergerels, remove the diffrels
+ public static Dataset createRelations(
+ final SparkSession spark,
+ final String mergeRelsPath,
+ final String relationPath,
+ final String entitiesPath) {
+
+ // collect diffrels from the raw graph relations: <, "diffRel">
+ JavaRDD, String>> diffRels = spark
+ .read()
+ .textFile(relationPath)
+ .map(patchRelFn(), Encoders.bean(Relation.class))
+ .toJavaRDD()
+ .filter(r -> filterRels(r, "organization"))
+ // put the best id as source of the diffrel:
+ .map(rel -> {
+ if (compareIds(rel.getSource(), rel.getTarget()) < 0)
+ return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
+ else
+ return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
+ })
+ .distinct();
+ log.info("Number of DiffRels collected: {}", diffRels.count());
+
+ // collect all the organizations
+ Dataset> entities = spark
+ .read()
+ .textFile(entitiesPath)
+ .map(
+ (MapFunction>) it -> {
+ Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
+ return new Tuple2<>(entity.getId(), entity);
+ },
+ Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
+
+ // relations with their group (connected component id)
+ JavaRDD, String>> rawOpenorgsRels = spark
+ .read()
+ .load(mergeRelsPath)
+ .as(Encoders.bean(Relation.class))
+ .where("relClass == 'merges'")
+ .toJavaRDD()
+ .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
+ .filter(t -> !t._2().contains("openorgsmesh")) // remove openorgsmesh: they are only for dedup
+ .groupByKey()
+ .map(g -> Lists.newArrayList(g._2()))
+ .filter(l -> l.size() > 1)
+ .flatMap(l -> {
+ String groupId = "group::" + UUID.randomUUID();
+ List ids = sortIds(l); // sort IDs by type
+ List, String>> rels = new ArrayList<>();
+ String source = ids.get(0);
+ for (String target : ids) {
+ rels.add(new Tuple2<>(new Tuple2<>(source, target), groupId));
+ }
+
+ return rels.iterator();
+ });
+ log.info("Number of Raw Openorgs Relations created: {}", rawOpenorgsRels.count());
+
+ // filter out diffRels
+ JavaRDD> openorgsRels = rawOpenorgsRels
+ .union(diffRels)
+ // concatenation of source and target: