diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml index 04e158542..52cc149a9 100644 --- a/dhp-workflows/dhp-dedup-openaire/pom.xml +++ b/dhp-workflows/dhp-dedup-openaire/pom.xml @@ -94,6 +94,12 @@ org.apache.httpcomponents httpclient + + com.h2database + h2 + 1.4.200 + test + diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java deleted file mode 100644 index f9e6448b0..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java +++ /dev/null @@ -1,184 +0,0 @@ - -package eu.dnetlib.dhp.oa.dedup; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.sql.*; -import org.dom4j.DocumentException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; -import scala.Tuple2; - -public class SparkCollectSimRels extends AbstractSparkAction { - - private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class); - - Dataset simGroupsDS; - Dataset groupsDS; - - public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset simGroupsDS, - Dataset groupsDS) { - super(parser, spark); - this.simGroupsDS = simGroupsDS; - this.groupsDS = groupsDS; - } - - public static void main(String[] args) throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkBlockStats.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json"))); - parser.parseArgument(args); - - SparkConf conf = new SparkConf(); - - final String dbUrl = parser.get("postgresUrl"); - final String dbUser = parser.get("postgresUser"); - final String dbPassword = parser.get("postgresPassword"); - - SparkSession spark = getSparkSession(conf); - - DataFrameReader readOptions = spark - .read() - .format("jdbc") - .option("url", dbUrl) - .option("user", dbUser) - .option("password", dbPassword); - - new SparkCollectSimRels( - parser, - spark, - readOptions.option("dbtable", "similarity_groups").load(), - readOptions.option("dbtable", "groups").load()) - .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); - } - - @Override - void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException { - - // read oozie parameters - final String isLookUpUrl = parser.get("isLookUpUrl"); - final String actionSetId = parser.get("actionSetId"); - final String workingPath = parser.get("workingPath"); - final int numPartitions = Optional - .ofNullable(parser.get("numPartitions")) - .map(Integer::valueOf) - .orElse(NUM_PARTITIONS); - final String dbUrl = parser.get("postgresUrl"); - final String dbUser = parser.get("postgresUser"); - - log.info("numPartitions: '{}'", numPartitions); - log.info("isLookUpUrl: '{}'", isLookUpUrl); - log.info("actionSetId: '{}'", actionSetId); - log.info("workingPath: '{}'", workingPath); - log.info("postgresUser: {}", dbUser); - log.info("postgresUrl: {}", dbUrl); - log.info("postgresPassword: xxx"); - - JavaPairRDD> similarityGroup = simGroupsDS - .toJavaRDD() - .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1))) - .groupByKey() - .mapToPair( - i -> new Tuple2<>(i._1(), StreamSupport - .stream(i._2().spliterator(), false) - .collect(Collectors.toList()))); - - JavaPairRDD groupIds = groupsDS - .toJavaRDD() - .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1))); - - JavaRDD, List>> groups = similarityGroup - .leftOuterJoin(groupIds) - .filter(g -> g._2()._2().isPresent()) - .map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1())); - - JavaRDD relations = groups.flatMap(g -> { - String firstId = g._2().get(0); - List rels = new ArrayList<>(); - - for (String id : g._2()) { - if (!firstId.equals(id)) - rels.add(createSimRel(firstId, id, g._1()._2())); - } - - return rels.iterator(); - }); - - Dataset resultRelations = spark - .createDataset( - relations.filter(r -> r.getRelType().equals("resultResult")).rdd(), - Encoders.bean(Relation.class)) - .repartition(numPartitions); - - Dataset organizationRelations = spark - .createDataset( - relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(), - Encoders.bean(Relation.class)) - .repartition(numPartitions); - - for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { - switch (dedupConf.getWf().getSubEntityValue()) { - case "organization": - savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization"); - break; - default: - savePostgresRelation( - resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue()); - break; - } - } - - } - - private Relation createSimRel(String source, String target, String entity) { - final Relation r = new Relation(); - r.setSubRelType("dedupSimilarity"); - r.setRelClass("isSimilarTo"); - r.setDataInfo(new DataInfo()); - - switch (entity) { - case "result": - r.setSource("50|" + source); - r.setTarget("50|" + target); - r.setRelType("resultResult"); - break; - case "organization": - r.setSource("20|" + source); - r.setTarget("20|" + target); - r.setRelType("organizationOrganization"); - break; - default: - throw new IllegalArgumentException("unmanaged entity type: " + entity); - } - return r; - } - - private void savePostgresRelation(Dataset newRelations, String workingPath, String actionSetId, - String entityType) { - newRelations - .write() - .mode(SaveMode.Append) - .parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType)); - } - -} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java index 8cffacd7e..dbcd40289 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java @@ -9,6 +9,7 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.graphx.Edge; import org.apache.spark.rdd.RDD; diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json deleted file mode 100644 index da1011371..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json +++ /dev/null @@ -1,44 +0,0 @@ -[ - { - "paramName": "la", - "paramLongName": "isLookUpUrl", - "paramDescription": "address for the LookUp", - "paramRequired": true - }, - { - "paramName": "asi", - "paramLongName": "actionSetId", - "paramDescription": "action set identifier (name of the orchestrator)", - "paramRequired": true - }, - { - "paramName": "w", - "paramLongName": "workingPath", - "paramDescription": "path of the working directory", - "paramRequired": true - }, - { - "paramName": "np", - "paramLongName": "numPartitions", - "paramDescription": "number of partitions for the similarity relations intermediate phases", - "paramRequired": false - }, - { - "paramName": "purl", - "paramLongName": "postgresUrl", - "paramDescription": "the url of the postgres server", - "paramRequired": true - }, - { - "paramName": "pusr", - "paramLongName": "postgresUser", - "paramDescription": "the owner of the postgres database", - "paramRequired": true - }, - { - "paramName": "ppwd", - "paramLongName": "postgresPassword", - "paramDescription": "the password for the postgres user", - "paramRequired": true - } -] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/config-default.xml deleted file mode 100644 index 2e0ed9aee..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/config-default.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/workflow.xml deleted file mode 100644 index 9bfdaaebd..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/workflow.xml +++ /dev/null @@ -1,208 +0,0 @@ - - - - graphBasePath - the raw graph base path - - - isLookUpUrl - the address of the lookUp service - - - actionSetId - id of the actionSet - - - workingPath - path for the working directory - - - dedupGraphPath - path for the output graph - - - cutConnectedComponent - max number of elements in a connected component - - - dbUrl - the url of the database - - - dbUser - the user of the database - - - dbTable - the name of the table in the database - - - dbPwd - the passowrd of the user of the database - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - oozieActionShareLibForSpark2 - oozie action sharelib for spark 2.* - - - spark2ExtraListeners - com.cloudera.spark.lineage.NavigatorAppListener - spark 2.* extra listeners classname - - - spark2SqlQueryExecutionListeners - com.cloudera.spark.lineage.NavigatorQueryListener - spark 2.* sql query execution listeners classname - - - spark2YarnHistoryServerAddress - spark 2.* yarn history server address - - - spark2EventLogDir - spark 2.* event log dir location - - - - - ${jobTracker} - ${nameNode} - - - mapreduce.job.queuename - ${queueName} - - - oozie.launcher.mapred.job.queue.name - ${oozieLauncherQueueName} - - - oozie.action.sharelib.for.spark - ${oozieActionShareLibForSpark2} - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - -pb - ${graphBasePath}/relation - ${workingPath}/${actionSetId}/organization_simrel - - - - - - - - yarn - cluster - Create Similarity Relations - eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --workingPath${workingPath} - --numPartitions8000 - - - - - - - - yarn - cluster - Create Merge Relations - eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --cutConnectedComponent${cutConnectedComponent} - - - - - - - - yarn - cluster - Prepare New Organizations - eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --dbUrl${dbUrl} - --dbTable${dbTable} - --dbUser${dbUser} - --dbPwd${dbPwd} - --numConnections20 - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/config-default.xml deleted file mode 100644 index 2e0ed9aee..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/config-default.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml deleted file mode 100644 index e7c95ee8d..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml +++ /dev/null @@ -1,240 +0,0 @@ - - - - graphBasePath - the raw graph base path - - - isLookUpUrl - the address of the lookUp service - - - actionSetId - id of the actionSet - - - workingPath - path for the working directory - - - dedupGraphPath - path for the output graph - - - cutConnectedComponent - max number of elements in a connected component - - - dbUrl - the url of the database - - - dbUser - the user of the database - - - dbTable - the name of the table in the database - - - dbPwd - the passowrd of the user of the database - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - oozieActionShareLibForSpark2 - oozie action sharelib for spark 2.* - - - spark2ExtraListeners - com.cloudera.spark.lineage.NavigatorAppListener - spark 2.* extra listeners classname - - - spark2SqlQueryExecutionListeners - com.cloudera.spark.lineage.NavigatorQueryListener - spark 2.* sql query execution listeners classname - - - spark2YarnHistoryServerAddress - spark 2.* yarn history server address - - - spark2EventLogDir - spark 2.* event log dir location - - - - - ${jobTracker} - ${nameNode} - - - mapreduce.job.queuename - ${queueName} - - - oozie.launcher.mapred.job.queue.name - ${oozieLauncherQueueName} - - - oozie.action.sharelib.for.spark - ${oozieActionShareLibForSpark2} - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - -pb - /tmp/graph_openorgs_and_corda/relation - ${workingPath}/${actionSetId}/organization_simrel - - - - - - - - yarn - cluster - Create Similarity Relations - eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --workingPath${workingPath} - --numPartitions8000 - - - - - - - - yarn - cluster - Create Merge Relations - eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --cutConnectedComponent${cutConnectedComponent} - - - - - - - - yarn - cluster - Prepare Organization Relations - eu.dnetlib.dhp.oa.dedup.SparkPrepareOrgRels - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --dbUrl${dbUrl} - --dbTable${dbTable} - --dbUser${dbUser} - --dbPwd${dbPwd} - --numConnections20 - - - - - - - - yarn - cluster - Prepare New Organizations - eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --graphBasePath${graphBasePath} - --workingPath${workingPath} - --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetId} - --apiUrl${apiUrl} - --dbUrl${dbUrl} - --dbTable${dbTable} - --dbUser${dbUser} - --dbPwd${dbPwd} - --numConnections20 - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 33da45feb..851e72dee 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -204,76 +204,8 @@ public class SparkDedupTest implements Serializable { assertEquals(6750, orp_simrel); } - @Disabled @Test @Order(2) - public void collectSimRelsTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCollectSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json"))); - parser - .parseArgument( - new String[] { - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath, - "-np", "50", - "-purl", "jdbc:postgresql://localhost:5432/dnet_dedup", - "-pusr", "postgres_user", - "-ppwd", "" - }); - - new SparkCollectSimRels( - parser, - spark, - spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"), - spark.read().load(testDedupAssertionsBasePath + "/groups")) - .run(isLookUpService); - - long orgs_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") - .count(); - - long pubs_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") - .count(); - - long sw_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel") - .count(); - - long ds_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel") - .count(); - - long orp_simrel = spark - .read() - .json(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") - .count(); - -// System.out.println("orgs_simrel = " + orgs_simrel); -// System.out.println("pubs_simrel = " + pubs_simrel); -// System.out.println("sw_simrel = " + sw_simrel); -// System.out.println("ds_simrel = " + ds_simrel); -// System.out.println("orp_simrel = " + orp_simrel); - - assertEquals(3672, orgs_simrel); - assertEquals(10459, pubs_simrel); - assertEquals(3767, sw_simrel); - assertEquals(3865, ds_simrel); - assertEquals(10173, orp_simrel); - - } - - @Test - @Order(3) public void cutMergeRelsTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -369,7 +301,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(4) + @Order(3) public void createMergeRelsTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -424,7 +356,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(5) + @Order(4) public void createDedupRecordTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -471,7 +403,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(6) + @Order(5) public void updateEntityTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -587,7 +519,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(7) + @Order(6) public void propagateRelationTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -637,7 +569,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(8) + @Order(7) public void testRelations() throws Exception { testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java new file mode 100644 index 000000000..f33eca57f --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsDedupTest.java @@ -0,0 +1,408 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import static java.nio.file.Files.createTempDirectory; + +import static org.apache.spark.sql.functions.count; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.lenient; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.ForeachFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.CollectionsUtils; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.platform.commons.util.StringUtils; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.OafMapperUtils; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.DHPUtils; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.util.MapDocumentUtil; +import scala.Tuple2; + +@ExtendWith(MockitoExtension.class) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class SparkOpenorgsDedupTest implements Serializable { + + private static String dbUrl = "jdbc:h2:mem:openorgs_test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false"; + private static String dbUser = "sa"; + private static String dbTable = "tmp_dedup_events"; + private static String dbPwd = ""; + + @Mock(serializable = true) + ISLookUpService isLookUpService; + + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + private static SparkSession spark; + private static JavaSparkContext jsc; + + private static String testGraphBasePath; + private static String testOutputBasePath; + private static String testDedupGraphBasePath; + private static final String testActionSetId = "test-orchestrator-openorgs"; + + @BeforeAll + public static void cleanUp() throws IOException, URISyntaxException { + + testGraphBasePath = Paths + .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/openorgs_dedup").toURI()) + .toFile() + .getAbsolutePath(); + testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-") + .toAbsolutePath() + .toString(); + + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + + final SparkConf conf = new SparkConf(); + conf.set("spark.sql.shuffle.partitions", "200"); + spark = SparkSession + .builder() + .appName(SparkDedupTest.class.getSimpleName()) + .master("local[*]") + .config(conf) + .getOrCreate(); + + jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + } + + @BeforeEach + public void setUp() throws IOException, ISLookUpException { + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml"))); + + lenient() + .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) + .thenReturn( + IOUtils + .toString( + SparkDedupTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); + } + + @Test + @Order(1) + public void createSimRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath, + "-np", "50" + }); + + new SparkCreateSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) + .count(); + + assertEquals(288, orgs_simrel); + } + + @Test + @Order(2) + public void copyOpenorgsSimRels() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCopyOpenorgsSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-w", testOutputBasePath, + "-la", "lookupurl", + "-np", "50" + }); + + new SparkCopyOpenorgsSimRels(parser, spark).run(isLookUpService); + + long orgs_simrel = spark + .read() + .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")) + .count(); + + assertEquals(324, orgs_simrel); + } + + @Test + @Order(3) + public void createMergeRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath + }); + + new SparkCreateMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .count(); + assertEquals(132, orgs_mergerel); + + // verify that a DiffRel is in the mergerels (to be sure that the job supposed to remove them has something to + // do) + List diffRels = jsc + .textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation")) + .map(s -> OBJECT_MAPPER.readValue(s, Relation.class)) + .filter(r -> r.getRelClass().equals("isDifferentFrom")) + .map(r -> r.getTarget()) + .collect(); + assertEquals(18, diffRels.size()); + + List mergeRels = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .as(Encoders.bean(Relation.class)) + .toJavaRDD() + .map(r -> r.getTarget()) + .collect(); + assertFalse(Collections.disjoint(mergeRels, diffRels)); + + } + + @Test + @Order(4) + public void prepareOrgRelsTest() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-du", + dbUrl, + "-dusr", + dbUser, + "-t", + dbTable, + "-dpwd", + dbPwd + }); + + new SparkPrepareOrgRels(parser, spark).run(isLookUpService); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + Connection connection = DriverManager.getConnection(dbUrl, connectionProperties); + + ResultSet resultSet = connection + .prepareStatement("SELECT COUNT(*) as total_rels FROM " + dbTable) + .executeQuery(); + if (resultSet.next()) { + int total_rels = resultSet.getInt("total_rels"); + assertEquals(32, total_rels); + } else + fail("No result in the sql DB"); + resultSet.close(); + + // verify the number of organizations with duplicates + ResultSet resultSet2 = connection + .prepareStatement("SELECT COUNT(DISTINCT(local_id)) as total_orgs FROM " + dbTable) + .executeQuery(); + if (resultSet2.next()) { + int total_orgs = resultSet2.getInt("total_orgs"); + assertEquals(6, total_orgs); + } else + fail("No result in the sql DB"); + resultSet2.close(); + + // verify that no DiffRel is in the DB + List diffRels = jsc + .textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation")) + .map(s -> OBJECT_MAPPER.readValue(s, Relation.class)) + .filter(r -> r.getRelClass().equals("isDifferentFrom")) + .map(r -> r.getSource() + "@@@" + r.getTarget()) + .collect(); + + List dbRels = new ArrayList<>(); + ResultSet resultSet3 = connection + .prepareStatement("SELECT local_id, oa_original_id FROM " + dbTable) + .executeQuery(); + while (resultSet3.next()) { + String source = OafMapperUtils.createOpenaireId("organization", resultSet3.getString("local_id"), true); + String target = OafMapperUtils + .createOpenaireId("organization", resultSet3.getString("oa_original_id"), true); + dbRels.add(source + "@@@" + target); + } + resultSet3.close(); + assertTrue(Collections.disjoint(dbRels, diffRels)); + + connection.close(); + } + + @Test + @Order(5) + public void prepareNewOrgsTest() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-du", + dbUrl, + "-dusr", + dbUser, + "-t", + dbTable, + "-dpwd", + dbPwd + }); + + new SparkPrepareNewOrgs(parser, spark).run(isLookUpService); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + long orgs_in_diffrel = jsc + .textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation")) + .map(s -> OBJECT_MAPPER.readValue(s, Relation.class)) + .filter(r -> r.getRelClass().equals("isDifferentFrom")) + .map(r -> r.getTarget()) + .distinct() + .count(); + + Connection connection = DriverManager.getConnection(dbUrl, connectionProperties); + + jsc + .textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation")) + .map(s -> OBJECT_MAPPER.readValue(s, Relation.class)) + .filter(r -> r.getRelClass().equals("isDifferentFrom")) + .map(r -> r.getTarget()) + .distinct() + .foreach(s -> System.out.println("difforgs = " + s)); + ResultSet resultSet0 = connection + .prepareStatement("SELECT oa_original_id FROM " + dbTable + " WHERE local_id = ''") + .executeQuery(); + while (resultSet0.next()) + System.out + .println( + "dborgs = " + OafMapperUtils.createOpenaireId(20, resultSet0.getString("oa_original_id"), true)); + resultSet0.close(); + + ResultSet resultSet = connection + .prepareStatement("SELECT COUNT(*) as total_new_orgs FROM " + dbTable + " WHERE local_id = ''") + .executeQuery(); + if (resultSet.next()) { + int total_new_orgs = resultSet.getInt("total_new_orgs"); + assertEquals(orgs_in_diffrel + 1, total_new_orgs); + } else + fail("No result in the sql DB"); + resultSet.close(); + } + + @AfterAll + public static void finalCleanUp() throws IOException { + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00000-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00000-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz new file mode 100644 index 000000000..ba58d823c Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00000-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00001-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00001-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz new file mode 100644 index 000000000..137790bde Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00001-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00002-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00002-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz new file mode 100644 index 000000000..6b090b9f5 Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/organization/part-00002-5248a339-09c4-4aa5-83fe-4cc5405607ad-c000.txt.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00000-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00000-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz new file mode 100644 index 000000000..080665d22 Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00000-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz differ diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00003-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00003-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz new file mode 100644 index 000000000..71fd6b35a Binary files /dev/null and b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs_dedup/relation/part-00003-94553c9f-4ae6-4db9-919d-85ddc0a60f92-c000.txt.gz differ diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForOrgsDedup.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForOrgsDedup.sql index dbe0c136b..aa694c7df 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForOrgsDedup.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsForOrgsDedup.sql @@ -39,9 +39,9 @@ GROUP BY o.creation_date, o.modification_date, o.country - + UNION ALL - + SELECT 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid, n.name AS legalshortname, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForOrgsDedup.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForOrgsDedup.sql index e509127df..7ae5eb43f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForOrgsDedup.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOpenOrgsSimilarityForOrgsDedup.sql @@ -40,4 +40,4 @@ SELECT 0.99 AS trust, '' AS inferenceprovenance, 'isDifferentFrom' AS relclass -FROM oa_duplicates WHERE reltype = 'is_different' \ No newline at end of file +FROM oa_duplicates WHERE reltype = 'is_different'; \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql index 9a8f98931..42ba0cf91 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql @@ -50,4 +50,4 @@ GROUP BY o.trust, d.id, d.officialname, - o.country; + o.country; \ No newline at end of file