diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java new file mode 100644 index 000000000..beb4dbad7 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java @@ -0,0 +1,145 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import java.io.IOException; +import java.util.Optional; +import java.util.Properties; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import scala.Tuple2; + +public class SparkPrepareNewOrgs extends AbstractSparkAction { + + private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class); + + public SparkPrepareNewOrgs(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } + + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkPrepareNewOrgs.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + new SparkPrepareNewOrgs(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } + + @Override + public void run(ISLookUpService isLookUpService) throws IOException { + + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numConnections = Optional + .ofNullable(parser.get("numConnections")) + .map(Integer::valueOf) + .orElse(NUM_CONNECTIONS); + + final String dbUrl = parser.get("dbUrl"); + final String dbTable = parser.get("dbTable"); + final String dbUser = parser.get("dbUser"); + final String dbPwd = parser.get("dbPwd"); + + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + log.info("numPartitions: '{}'", numConnections); + log.info("dbUrl: '{}'", dbUrl); + log.info("dbUser: '{}'", dbUser); + log.info("table: '{}'", dbTable); + log.info("dbPwd: '{}'", "xxx"); + + final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization"); + final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization"); + + Dataset newOrgs = createNewOrgs(spark, mergeRelPath, entityPath); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + newOrgs + .repartition(numConnections) + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, dbTable, connectionProperties); + + } + + public static Dataset createNewOrgs( + final SparkSession spark, + final String mergeRelsPath, + final String entitiesPath) { + + // + Dataset> entities = spark + .read() + .textFile(entitiesPath) + .map( + (MapFunction>) it -> { + Organization entity = OBJECT_MAPPER.readValue(it, Organization.class); + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class))); + + Dataset> mergerels = spark + .createDataset( + spark + .read() + .load(mergeRelsPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 'isMergedIn'") + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) + .rdd(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + + return entities + .joinWith(mergerels, entities.col("_1").equalTo(mergerels.col("_1")), "left") + .filter((FilterFunction, Tuple2>>) t -> t._2() == null) + .filter( + (FilterFunction, Tuple2>>) t -> !t + ._1() + ._1() + .contains("openorgs")) + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel( + "", + r._1()._2().getOriginalId().get(0), + r._1()._2().getLegalname() != null ? r._1()._2().getLegalname().getValue() : "", + r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "", + r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "", + r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "", + r._1()._2().getCollectedfrom().get(0).getValue()), + Encoders.bean(OrgSimRel.class)); + + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java index d6c548de3..5e0081d72 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java @@ -1,10 +1,11 @@ package eu.dnetlib.dhp.oa.dedup; -import static jdk.nashorn.internal.objects.NativeDebug.map; - import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; import org.apache.commons.io.IOUtils; import org.apache.http.client.methods.CloseableHttpResponse; @@ -12,26 +13,20 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; import scala.Tuple2; public class SparkPrepareOrgRels extends AbstractSparkAction { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index ae5bf9252..699039c99 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -28,8 +28,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { SOURCE, TARGET } - public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) - throws Exception { + public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) throws Exception { super(parser, spark); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/workflow.xml new file mode 100644 index 000000000..9bfdaaebd --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/neworgs/oozie_app/workflow.xml @@ -0,0 +1,208 @@ + + + + graphBasePath + the raw graph base path + + + isLookUpUrl + the address of the lookUp service + + + actionSetId + id of the actionSet + + + workingPath + path for the working directory + + + dedupGraphPath + path for the output graph + + + cutConnectedComponent + max number of elements in a connected component + + + dbUrl + the url of the database + + + dbUser + the user of the database + + + dbTable + the name of the table in the database + + + dbPwd + the passowrd of the user of the database + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + -pb + ${graphBasePath}/relation + ${workingPath}/${actionSetId}/organization_simrel + + + + + + + + yarn + cluster + Create Similarity Relations + eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --workingPath${workingPath} + --numPartitions8000 + + + + + + + + yarn + cluster + Create Merge Relations + eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --cutConnectedComponent${cutConnectedComponent} + + + + + + + + yarn + cluster + Prepare New Organizations + eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --dbUrl${dbUrl} + --dbTable${dbTable} + --dbUser${dbUser} + --dbPwd${dbPwd} + --numConnections20 + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json new file mode 100644 index 000000000..2119cbc3a --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json @@ -0,0 +1,56 @@ +[ + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "the working directory path", + "paramRequired": true + }, + { + "paramName": "la", + "paramLongName": "isLookUpUrl", + "paramDescription": "the url of the lookup service", + "paramRequired": true + }, + { + "paramName": "asi", + "paramLongName": "actionSetId", + "paramDescription": "the id of the actionset (orchestrator)", + "paramRequired": true + }, + { + "paramName": "nc", + "paramLongName": "numConnections", + "paramDescription": "number of connections to the postgres db (for the write operation)", + "paramRequired": false + }, + { + "paramName": "du", + "paramLongName": "dbUrl", + "paramDescription": "the url of the database", + "paramRequired": true + }, + { + "paramName": "dusr", + "paramLongName": "dbUser", + "paramDescription": "the user of the database", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "dbTable", + "paramDescription": "the name of the table in the database", + "paramRequired": true + }, + { + "paramName": "dpwd", + "paramLongName": "dbPwd", + "paramDescription": "the password for the user of the database", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index d9b5e30eb..6516eda52 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -223,11 +223,11 @@ public class SparkDedupTest implements Serializable { }); new SparkCollectSimRels( - parser, - spark, - spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"), - spark.read().load(testDedupAssertionsBasePath + "/groups")) - .run(isLookUpService); + parser, + spark, + spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"), + spark.read().load(testDedupAssertionsBasePath + "/groups")) + .run(isLookUpService); long orgs_simrel = spark .read()