code refactoring (useless classes and wf removed), implementation of the test for the openorgs dedup

This commit is contained in:
miconis 2021-03-29 16:10:46 +02:00
parent 2355cc4e9b
commit f446580e9f
18 changed files with 424 additions and 789 deletions

View File

@ -94,6 +94,12 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -1,184 +0,0 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import scala.Tuple2;
public class SparkCollectSimRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class);
Dataset<Row> simGroupsDS;
Dataset<Row> groupsDS;
public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset<Row> simGroupsDS,
Dataset<Row> groupsDS) {
super(parser, spark);
this.simGroupsDS = simGroupsDS;
this.groupsDS = groupsDS;
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkBlockStats.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
parser.parseArgument(args);
SparkConf conf = new SparkConf();
final String dbUrl = parser.get("postgresUrl");
final String dbUser = parser.get("postgresUser");
final String dbPassword = parser.get("postgresPassword");
SparkSession spark = getSparkSession(conf);
DataFrameReader readOptions = spark
.read()
.format("jdbc")
.option("url", dbUrl)
.option("user", dbUser)
.option("password", dbPassword);
new SparkCollectSimRels(
parser,
spark,
readOptions.option("dbtable", "similarity_groups").load(),
readOptions.option("dbtable", "groups").load())
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException {
// read oozie parameters
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
final int numPartitions = Optional
.ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf)
.orElse(NUM_PARTITIONS);
final String dbUrl = parser.get("postgresUrl");
final String dbUser = parser.get("postgresUser");
log.info("numPartitions: '{}'", numPartitions);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
log.info("workingPath: '{}'", workingPath);
log.info("postgresUser: {}", dbUser);
log.info("postgresUrl: {}", dbUrl);
log.info("postgresPassword: xxx");
JavaPairRDD<String, List<String>> similarityGroup = simGroupsDS
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)))
.groupByKey()
.mapToPair(
i -> new Tuple2<>(i._1(), StreamSupport
.stream(i._2().spliterator(), false)
.collect(Collectors.toList())));
JavaPairRDD<String, String> groupIds = groupsDS
.toJavaRDD()
.mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)));
JavaRDD<Tuple2<Tuple2<String, String>, List<String>>> groups = similarityGroup
.leftOuterJoin(groupIds)
.filter(g -> g._2()._2().isPresent())
.map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1()));
JavaRDD<Relation> relations = groups.flatMap(g -> {
String firstId = g._2().get(0);
List<Relation> rels = new ArrayList<>();
for (String id : g._2()) {
if (!firstId.equals(id))
rels.add(createSimRel(firstId, id, g._1()._2()));
}
return rels.iterator();
});
Dataset<Relation> resultRelations = spark
.createDataset(
relations.filter(r -> r.getRelType().equals("resultResult")).rdd(),
Encoders.bean(Relation.class))
.repartition(numPartitions);
Dataset<Relation> organizationRelations = spark
.createDataset(
relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(),
Encoders.bean(Relation.class))
.repartition(numPartitions);
for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
switch (dedupConf.getWf().getSubEntityValue()) {
case "organization":
savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization");
break;
default:
savePostgresRelation(
resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue());
break;
}
}
}
private Relation createSimRel(String source, String target, String entity) {
final Relation r = new Relation();
r.setSubRelType("dedupSimilarity");
r.setRelClass("isSimilarTo");
r.setDataInfo(new DataInfo());
switch (entity) {
case "result":
r.setSource("50|" + source);
r.setTarget("50|" + target);
r.setRelType("resultResult");
break;
case "organization":
r.setSource("20|" + source);
r.setTarget("20|" + target);
r.setRelType("organizationOrganization");
break;
default:
throw new IllegalArgumentException("unmanaged entity type: " + entity);
}
return r;
}
private void savePostgresRelation(Dataset<Relation> newRelations, String workingPath, String actionSetId,
String entityType) {
newRelations
.write()
.mode(SaveMode.Append)
.parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType));
}
}

View File

@ -9,6 +9,7 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;

View File

@ -1,44 +0,0 @@
[
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescription": "address for the LookUp",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescription": "action set identifier (name of the orchestrator)",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
},
{
"paramName": "np",
"paramLongName": "numPartitions",
"paramDescription": "number of partitions for the similarity relations intermediate phases",
"paramRequired": false
},
{
"paramName": "purl",
"paramLongName": "postgresUrl",
"paramDescription": "the url of the postgres server",
"paramRequired": true
},
{
"paramName": "pusr",
"paramLongName": "postgresUser",
"paramDescription": "the owner of the postgres database",
"paramRequired": true
},
{
"paramName": "ppwd",
"paramLongName": "postgresPassword",
"paramDescription": "the password for the postgres user",
"paramRequired": true
}
]

View File

@ -1,18 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,208 +0,0 @@
<workflow-app name="Organization Dedup" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path for the output graph</description>
</property>
<property>
<name>cutConnectedComponent</name>
<description>max number of elements in a connected component</description>
</property>
<property>
<name>dbUrl</name>
<description>the url of the database</description>
</property>
<property>
<name>dbUser</name>
<description>the user of the database</description>
</property>
<property>
<name>dbTable</name>
<description>the name of the table in the database</description>
</property>
<property>
<name>dbPwd</name>
<description>the passowrd of the user of the database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="resetWorkingPath">
<fs>
<delete path="${workingPath}"/>
</fs>
<ok to="copyRelations"/>
<error to="Kill"/>
</action>
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>-pb</arg>
<arg>${graphBasePath}/relation</arg>
<arg>${workingPath}/${actionSetId}/organization_simrel</arg>
</distcp>
<ok to="CreateSimRel"/>
<error to="Kill"/>
</action>
<action name="CreateSimRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CreateMergeRel"/>
<error to="Kill"/>
</action>
<action name="CreateMergeRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Merge Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
</spark>
<ok to="PrepareNewOrgs"/>
<error to="Kill"/>
</action>
<action name="PrepareNewOrgs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare New Organizations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
<arg>--numConnections</arg><arg>20</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,18 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,240 +0,0 @@
<workflow-app name="Organization Dedup" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path for the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path for the output graph</description>
</property>
<property>
<name>cutConnectedComponent</name>
<description>max number of elements in a connected component</description>
</property>
<property>
<name>dbUrl</name>
<description>the url of the database</description>
</property>
<property>
<name>dbUser</name>
<description>the user of the database</description>
</property>
<property>
<name>dbTable</name>
<description>the name of the table in the database</description>
</property>
<property>
<name>dbPwd</name>
<description>the passowrd of the user of the database</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="resetWorkingPath">
<fs>
<delete path="${workingPath}"/>
</fs>
<ok to="copyRelations"/>
<error to="Kill"/>
</action>
<action name="copyRelations">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>-pb</arg>
<arg>/tmp/graph_openorgs_and_corda/relation</arg>
<arg>${workingPath}/${actionSetId}/organization_simrel</arg>
</distcp>
<ok to="CreateSimRel"/>
<error to="Kill"/>
</action>
<action name="CreateSimRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--numPartitions</arg><arg>8000</arg>
</spark>
<ok to="CreateMergeRel"/>
<error to="Kill"/>
</action>
<action name="CreateMergeRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create Merge Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--cutConnectedComponent</arg><arg>${cutConnectedComponent}</arg>
</spark>
<ok to="PrepareOrgRels"/>
<error to="Kill"/>
</action>
<action name="PrepareOrgRels">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare Organization Relations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareOrgRels</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
<arg>--numConnections</arg><arg>20</arg>
</spark>
<ok to="PrepareNewOrgs"/>
<error to="Kill"/>
</action>
<action name="PrepareNewOrgs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare New Organizations</name>
<class>eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--actionSetId</arg><arg>${actionSetId}</arg>
<arg>--apiUrl</arg><arg>${apiUrl}</arg>
<arg>--dbUrl</arg><arg>${dbUrl}</arg>
<arg>--dbTable</arg><arg>${dbTable}</arg>
<arg>--dbUser</arg><arg>${dbUser}</arg>
<arg>--dbPwd</arg><arg>${dbPwd}</arg>
<arg>--numConnections</arg><arg>20</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -204,76 +204,8 @@ public class SparkDedupTest implements Serializable {
assertEquals(6750, orp_simrel);
}
@Disabled
@Test
@Order(2)
public void collectSimRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCollectSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-asi", testActionSetId,
"-la", "lookupurl",
"-w", testOutputBasePath,
"-np", "50",
"-purl", "jdbc:postgresql://localhost:5432/dnet_dedup",
"-pusr", "postgres_user",
"-ppwd", ""
});
new SparkCollectSimRels(
parser,
spark,
spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"),
spark.read().load(testDedupAssertionsBasePath + "/groups"))
.run(isLookUpService);
long orgs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
long pubs_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count();
long sw_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
.count();
long ds_simrel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
.count();
long orp_simrel = spark
.read()
.json(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count();
// System.out.println("orgs_simrel = " + orgs_simrel);
// System.out.println("pubs_simrel = " + pubs_simrel);
// System.out.println("sw_simrel = " + sw_simrel);
// System.out.println("ds_simrel = " + ds_simrel);
// System.out.println("orp_simrel = " + orp_simrel);
assertEquals(3672, orgs_simrel);
assertEquals(10459, pubs_simrel);
assertEquals(3767, sw_simrel);
assertEquals(3865, ds_simrel);
assertEquals(10173, orp_simrel);
}
@Test
@Order(3)
public void cutMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -369,7 +301,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(4)
@Order(3)
public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -424,7 +356,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(5)
@Order(4)
public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -471,7 +403,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(6)
@Order(5)
public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -587,7 +519,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(7)
@Order(6)
public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -637,7 +569,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
@Order(8)
@Order(7)
public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);

View File

@ -0,0 +1,408 @@
package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.lenient;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.CollectionsUtils;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.platform.commons.util.StringUtils;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SparkOpenorgsDedupTest implements Serializable {
private static String dbUrl = "jdbc:h2:mem:openorgs_test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false";
private static String dbUser = "sa";
private static String dbTable = "tmp_dedup_events";
private static String dbPwd = "";
@Mock(serializable = true)
ISLookUpService isLookUpService;
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static SparkSession spark;
private static JavaSparkContext jsc;
private static String testGraphBasePath;
private static String testOutputBasePath;
private static String testDedupGraphBasePath;
private static final String testActionSetId = "test-orchestrator-openorgs";
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
testGraphBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/openorgs_dedup").toURI())
.toFile()
.getAbsolutePath();
testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
.toAbsolutePath()
.toString();
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
final SparkConf conf = new SparkConf();
conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession
.builder()
.appName(SparkDedupTest.class.getSimpleName())
.master("local[*]")
.config(conf)
.getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
}
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml")));
lenient()
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
.thenReturn(
IOUtils
.toString(
SparkDedupTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
}
@Test
@Order(1)
public void createSimRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-la", "lookupurl",
"-w", testOutputBasePath,
"-np", "50"
});
new SparkCreateSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark
.read()
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count();
assertEquals(288, orgs_simrel);
}
@Test
@Order(2)
public void copyOpenorgsSimRels() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCopyOpenorgsSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-i", testGraphBasePath,
"-asi", testActionSetId,
"-w", testOutputBasePath,
"-la", "lookupurl",
"-np", "50"
});
new SparkCopyOpenorgsSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark
.read()
.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
.count();
assertEquals(324, orgs_simrel);
}
@Test
@Order(3)
public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateMergeRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath
});
new SparkCreateMergeRels(parser, spark).run(isLookUpService);
long orgs_mergerel = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.count();
assertEquals(132, orgs_mergerel);
// verify that a DiffRel is in the mergerels (to be sure that the job supposed to remove them has something to
// do)
List<String> diffRels = jsc
.textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation"))
.map(s -> OBJECT_MAPPER.readValue(s, Relation.class))
.filter(r -> r.getRelClass().equals("isDifferentFrom"))
.map(r -> r.getTarget())
.collect();
assertEquals(18, diffRels.size());
List<String> mergeRels = spark
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
.as(Encoders.bean(Relation.class))
.toJavaRDD()
.map(r -> r.getTarget())
.collect();
assertFalse(Collections.disjoint(mergeRels, diffRels));
}
@Test
@Order(4)
public void prepareOrgRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath,
"-du",
dbUrl,
"-dusr",
dbUser,
"-t",
dbTable,
"-dpwd",
dbPwd
});
new SparkPrepareOrgRels(parser, spark).run(isLookUpService);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPwd);
Connection connection = DriverManager.getConnection(dbUrl, connectionProperties);
ResultSet resultSet = connection
.prepareStatement("SELECT COUNT(*) as total_rels FROM " + dbTable)
.executeQuery();
if (resultSet.next()) {
int total_rels = resultSet.getInt("total_rels");
assertEquals(32, total_rels);
} else
fail("No result in the sql DB");
resultSet.close();
// verify the number of organizations with duplicates
ResultSet resultSet2 = connection
.prepareStatement("SELECT COUNT(DISTINCT(local_id)) as total_orgs FROM " + dbTable)
.executeQuery();
if (resultSet2.next()) {
int total_orgs = resultSet2.getInt("total_orgs");
assertEquals(6, total_orgs);
} else
fail("No result in the sql DB");
resultSet2.close();
// verify that no DiffRel is in the DB
List<String> diffRels = jsc
.textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation"))
.map(s -> OBJECT_MAPPER.readValue(s, Relation.class))
.filter(r -> r.getRelClass().equals("isDifferentFrom"))
.map(r -> r.getSource() + "@@@" + r.getTarget())
.collect();
List<String> dbRels = new ArrayList<>();
ResultSet resultSet3 = connection
.prepareStatement("SELECT local_id, oa_original_id FROM " + dbTable)
.executeQuery();
while (resultSet3.next()) {
String source = OafMapperUtils.createOpenaireId("organization", resultSet3.getString("local_id"), true);
String target = OafMapperUtils
.createOpenaireId("organization", resultSet3.getString("oa_original_id"), true);
dbRels.add(source + "@@@" + target);
}
resultSet3.close();
assertTrue(Collections.disjoint(dbRels, diffRels));
connection.close();
}
@Test
@Order(5)
public void prepareNewOrgsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkCreateSimRels.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json")));
parser
.parseArgument(
new String[] {
"-i",
testGraphBasePath,
"-asi",
testActionSetId,
"-la",
"lookupurl",
"-w",
testOutputBasePath,
"-du",
dbUrl,
"-dusr",
dbUser,
"-t",
dbTable,
"-dpwd",
dbPwd
});
new SparkPrepareNewOrgs(parser, spark).run(isLookUpService);
final Properties connectionProperties = new Properties();
connectionProperties.put("user", dbUser);
connectionProperties.put("password", dbPwd);
long orgs_in_diffrel = jsc
.textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation"))
.map(s -> OBJECT_MAPPER.readValue(s, Relation.class))
.filter(r -> r.getRelClass().equals("isDifferentFrom"))
.map(r -> r.getTarget())
.distinct()
.count();
Connection connection = DriverManager.getConnection(dbUrl, connectionProperties);
jsc
.textFile(DedupUtility.createEntityPath(testGraphBasePath, "relation"))
.map(s -> OBJECT_MAPPER.readValue(s, Relation.class))
.filter(r -> r.getRelClass().equals("isDifferentFrom"))
.map(r -> r.getTarget())
.distinct()
.foreach(s -> System.out.println("difforgs = " + s));
ResultSet resultSet0 = connection
.prepareStatement("SELECT oa_original_id FROM " + dbTable + " WHERE local_id = ''")
.executeQuery();
while (resultSet0.next())
System.out
.println(
"dborgs = " + OafMapperUtils.createOpenaireId(20, resultSet0.getString("oa_original_id"), true));
resultSet0.close();
ResultSet resultSet = connection
.prepareStatement("SELECT COUNT(*) as total_new_orgs FROM " + dbTable + " WHERE local_id = ''")
.executeQuery();
if (resultSet.next()) {
int total_new_orgs = resultSet.getInt("total_new_orgs");
assertEquals(orgs_in_diffrel + 1, total_new_orgs);
} else
fail("No result in the sql DB");
resultSet.close();
}
@AfterAll
public static void finalCleanUp() throws IOException {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
}
}

View File

@ -39,9 +39,9 @@ GROUP BY
o.creation_date,
o.modification_date,
o.country
UNION ALL
SELECT
'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid,
n.name AS legalshortname,

View File

@ -40,4 +40,4 @@ SELECT
0.99 AS trust,
'' AS inferenceprovenance,
'isDifferentFrom' AS relclass
FROM oa_duplicates WHERE reltype = 'is_different'
FROM oa_duplicates WHERE reltype = 'is_different';

View File

@ -50,4 +50,4 @@ GROUP BY
o.trust,
d.id,
d.officialname,
o.country;
o.country;