From 4cf79f32eb2b0de4afd14ade83b43abf0d5fdef0 Mon Sep 17 00:00:00 2001 From: miconis Date: Fri, 25 Sep 2020 11:29:51 +0200 Subject: [PATCH] implementation of the oozie wf to prepare the openorgs input: relations between organizations --- .../eu/dnetlib/dhp/oa/dedup/OrgSimRel.java | 83 +++++++ .../dhp/oa/dedup/SparkCollectSimRels.java | 21 +- .../dhp/oa/dedup/SparkPrepareOrgRels.java | 168 ++++++++++++++ .../orgsdedup/oozie_app/config-default.xml | 18 ++ .../oa/dedup/orgsdedup/oozie_app/workflow.xml | 212 ++++++++++++++++++ .../oa/dedup/prepareOrgRels_parameters.json | 56 +++++ 6 files changed, 552 insertions(+), 6 deletions(-) create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java new file mode 100644 index 000000000..a7d8ead0b --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java @@ -0,0 +1,83 @@ +package eu.dnetlib.dhp.oa.dedup; + +import java.io.Serializable; + +public class OrgSimRel implements Serializable { + + String local_id; + String oa_original_id; + String oa_name; + String oa_acronym; + String oa_country; + String oa_url; + String oa_collectedfrom; + + public OrgSimRel() { + } + + public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country, String oa_url, String oa_collectedfrom) { + this.local_id = local_id; + this.oa_original_id = oa_original_id; + this.oa_name = oa_name; + this.oa_acronym = oa_acronym; + this.oa_country = oa_country; + this.oa_url = oa_url; + this.oa_collectedfrom = oa_collectedfrom; + } + + public String getLocal_id() { + return local_id; + } + + public void setLocal_id(String local_id) { + this.local_id = local_id; + } + + public String getOa_original_id() { + return oa_original_id; + } + + public void setOa_original_id(String oa_original_id) { + this.oa_original_id = oa_original_id; + } + + public String getOa_name() { + return oa_name; + } + + public void setOa_name(String oa_name) { + this.oa_name = oa_name; + } + + public String getOa_acronym() { + return oa_acronym; + } + + public void setOa_acronym(String oa_acronym) { + this.oa_acronym = oa_acronym; + } + + public String getOa_country() { + return oa_country; + } + + public void setOa_country(String oa_country) { + this.oa_country = oa_country; + } + + public String getOa_url() { + return oa_url; + } + + public void setOa_url(String oa_url) { + this.oa_url = oa_url; + } + + public String getOa_collectedfrom() { + return oa_collectedfrom; + } + + public void setOa_collectedfrom(String oa_collectedfrom) { + this.oa_collectedfrom = oa_collectedfrom; + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java index 7c1e6550e..b35f9c54a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java @@ -4,16 +4,20 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.*; +import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -65,7 +69,7 @@ public class SparkCollectSimRels extends AbstractSparkAction { } @Override - void run(ISLookUpService isLookUpService) { + void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException { // read oozie parameters final String isLookUpUrl = parser.get("isLookUpUrl"); @@ -126,11 +130,16 @@ public class SparkCollectSimRels extends AbstractSparkAction { Encoders.bean(Relation.class) ).repartition(numPartitions); - savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization"); - savePostgresRelation(resultRelations, workingPath, actionSetId, "publication"); - savePostgresRelation(resultRelations, workingPath, actionSetId, "software"); - savePostgresRelation(resultRelations, workingPath, actionSetId, "otherresearchproduct"); - savePostgresRelation(resultRelations, workingPath, actionSetId, "dataset"); + for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { + switch(dedupConf.getWf().getSubEntityValue()){ + case "organization": + savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization"); + break; + default: + savePostgresRelation(resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue()); + break; + } + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java new file mode 100644 index 000000000..dff1bbb0d --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java @@ -0,0 +1,168 @@ +package eu.dnetlib.dhp.oa.dedup; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +public class SparkPrepareOrgRels extends AbstractSparkAction { + + private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class); + + public static final String ROOT_TRUST = "0.8"; + public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup"; + public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions"; + + public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } + + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json"))); + parser.parseArgument(args); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + new SparkCreateDedupRecord(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } + + @Override + public void run(ISLookUpService isLookUpService) throws IOException { + + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final String apiUrl = parser.get("apiUrl"); + final String dbUrl = parser.get("dbUrl"); + final String dbTable = parser.get("dbTable"); + final String dbUser = parser.get("dbUser"); + final String dbPwd = parser.get("dbPwd"); + + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + log.info("apiUrl: '{}'", apiUrl); + log.info("dbUrl: '{}'", dbUrl); + log.info("dbUser: '{}'", dbUser); + log.info("table: '{}'", dbTable); + log.info("dbPwd: '{}'", "xxx"); + + final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization"); + final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization"); + + Dataset relations = createRelations(spark, mergeRelPath, entityPath); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + relations.write().mode(SaveMode.Overwrite).jdbc(dbUrl, dbTable, connectionProperties); + + if (!apiUrl.isEmpty()) + updateSimRels(apiUrl); + + } + + public static Dataset createRelations( + final SparkSession spark, + final String mergeRelsPath, + final String entitiesPath) { + + // + Dataset> entities = spark + .read() + .textFile(entitiesPath) + .map( + (MapFunction>) it -> { + Organization entity = OBJECT_MAPPER.readValue(it, Organization.class); + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class))); + + Dataset> relations = spark.createDataset( + spark + .read() + .load(mergeRelsPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 'merges'") + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) + .groupByKey() + .flatMap(g -> { + List> rels = new ArrayList<>(); + for (String id1 : g._2()) { + for (String id2 : g._2()) { + if (!id1.equals(id2)) + if (id1.contains("openorgs")) + rels.add(new Tuple2<>(id1, id2)); + } + } + return rels.iterator(); + }).rdd(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + + return relations + .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, OrgSimRel>)r -> + new OrgSimRel( + r._1()._2(), + r._2()._2().getOriginalId().get(0), + r._2()._2().getLegalname().getValue(), + r._2()._2().getLegalshortname().getValue(), + r._2()._2().getCountry().getClassid(), + r._2()._2().getWebsiteurl().getValue(), + r._2()._2().getCollectedfrom().get(0).getValue() + ), + Encoders.bean(OrgSimRel.class) + ); + + } + + private static String updateSimRels(final String apiUrl) throws IOException { + final HttpGet req = new HttpGet(apiUrl); + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + } + +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml new file mode 100644 index 000000000..82ecbc46b --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml @@ -0,0 +1,212 @@ + + + + graphBasePath + the raw graph base path + + + isLookUpUrl + the address of the lookUp service + + + actionSetId + id of the actionSet + + + workingPath + path for the working directory + + + dedupGraphPath + path for the output graph + + + cutConnectedComponent + max number of elements in a connected component + + + apiUrl + the url for the APIs of the openorgs service + + + dbUrl + the url of the database + + + dbUser + the user of the database + + + dbTable + the name of the table in the database + + + dbPwd + the passowrd of the user of the database + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + yarn + cluster + Create Similarity Relations + eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --workingPath${workingPath} + --numPartitions8000 + + + + + + + + -pb + ${graphBasePath}/relation + ${workingPath}/organization_simrel + + + + + + + + yarn + cluster + Create Merge Relations + eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --cutConnectedComponent${cutConnectedComponent} + + + + + + + + yarn + cluster + Prepare Organization Relations + eu.dnetlib.dhp.oa.dedup.SparkPrepareOrgRels + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --apiUrl${apiUrl} + --dbUrl${dbUrl} + --dbTable${dbTable} + --dbUser${dbUser} + --dbPwd${dbPwd} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json new file mode 100644 index 000000000..bcca48a15 --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json @@ -0,0 +1,56 @@ +[ + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "the working directory path", + "paramRequired": true + }, + { + "paramName": "la", + "paramLongName": "isLookUpUrl", + "paramDescription": "the url of the lookup service", + "paramRequired": true + }, + { + "paramName": "asi", + "paramLongName": "actionSetId", + "paramDescription": "the id of the actionset (orchestrator)", + "paramRequired": true + }, + { + "paramName": "au", + "paramLongName": "apiUrl", + "paramDescription": "the url for the APIs of the openorgs service", + "paramRequired": true + }, + { + "paramName": "du", + "paramLongName": "dbUrl", + "paramDescription": "the url of the database", + "paramRequired": true + }, + { + "paramName": "dusr", + "paramLongName": "dbUser", + "paramDescription": "the user of the database", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "dbTable", + "paramDescription": "the name of the table in the database", + "paramRequired": true + }, + { + "paramName": "dpwd", + "paramLongName": "dbPwd", + "paramDescription": "the password for the user of the database", + "paramRequired": true + } +] \ No newline at end of file