From 348b0ef9217a8264f325841ee0aaf28176590abb Mon Sep 17 00:00:00 2001 From: miconis Date: Wed, 24 Mar 2021 15:51:27 +0100 Subject: [PATCH] bug fix, implementation of the workflow for the creation of raw_organizations (openorgs dedup), addition of the pid lists to the openorgs postgres db --- .../dhp/oa/dedup/AbstractSparkAction.java | 16 ++ .../dhp/oa/dedup/SparkPrepareNewOrgs.java | 10 +- .../dhp/oa/dedup/SparkPrepareOrgRels.java | 26 +- .../dnetlib/dhp/oa/dedup/model/OrgSimRel.java | 14 +- .../raw/MigrateDbEntitiesApplication.java | 7 + .../oa/graph/raw/common/MigrateAction.java | 3 +- .../oozie_app/config-default.xml | 18 ++ .../raw_organizations/oozie_app/workflow.xml | 270 ++++++++++++++++++ .../dhp/oa/graph/sql/queryOrganizations.sql | 4 +- .../sql/queryOrganizationsFromOpenOrgsDB.sql | 4 + 10 files changed, 354 insertions(+), 18 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 9a1127764..28f6e3107 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -6,7 +6,9 @@ import java.io.Serializable; import java.io.StringReader; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SaveMode; @@ -31,6 +33,9 @@ abstract class AbstractSparkAction implements Serializable { protected static final int NUM_PARTITIONS = 1000; protected static final int NUM_CONNECTIONS = 20; + protected static final String TYPE_VALUE_SEPARATOR = "###"; + protected static final String SP_SEPARATOR = "@@@"; + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -97,4 +102,15 @@ abstract class AbstractSparkAction implements Serializable { protected static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } + + protected static String structuredPropertyListToString(List list) { + + return list + .stream() + .filter(p -> p.getQualifier() != null) + .filter(p -> StringUtils.isNotBlank(p.getQualifier().getClassid())) + .filter(p -> StringUtils.isNotBlank(p.getValue())) + .map(p -> p.getValue() + TYPE_VALUE_SEPARATOR + p.getQualifier().getClassid()) + .collect(Collectors.joining(SP_SEPARATOR)); + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java index 3b29e1e17..465f56c83 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java @@ -12,7 +12,6 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -107,8 +106,9 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { .mode(SaveMode.Append) .jdbc(dbUrl, dbTable, connectionProperties); - if (!apiUrl.isEmpty()) - updateSimRels(apiUrl); + // TODO de-comment once finished +// if (!apiUrl.isEmpty()) +// updateSimRels(apiUrl); } @@ -181,7 +181,9 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { r._1()._2().getLegalshortname() != null ? r._1()._2().getLegalshortname().getValue() : "", r._1()._2().getCountry() != null ? r._1()._2().getCountry().getClassid() : "", r._1()._2().getWebsiteurl() != null ? r._1()._2().getWebsiteurl().getValue() : "", - r._1()._2().getCollectedfrom().get(0).getValue(), ""), + r._1()._2().getCollectedfrom().get(0).getValue(), + "", + structuredPropertyListToString(r._1()._2().getPid())), Encoders.bean(OrgSimRel.class)); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java index cbca0b326..e2d9ae9c6 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java @@ -14,6 +14,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,15 +210,19 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { Dataset> relations2 = relations .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner") .map( - (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel( - r._1()._1(), - r._2()._2().getOriginalId().get(0), - r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", - r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", - r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", - r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", - r._2()._2().getCollectedfrom().get(0).getValue(), - r._1()._3()), + (MapFunction, Tuple2>, OrgSimRel>) r -> { + + return new OrgSimRel( + r._1()._1(), + r._2()._2().getOriginalId().get(0), + r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", + r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", + r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", + r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", + r._2()._2().getCollectedfrom().get(0).getValue(), + r._1()._3(), + structuredPropertyListToString(r._2()._2().getPid())); + }, Encoders.bean(OrgSimRel.class)) .map( (MapFunction>) o -> new Tuple2<>(o.getLocal_id(), o), @@ -311,7 +316,8 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", r._2()._2().getCollectedfrom().get(0).getValue(), - "group::" + r._1()._1()), + "group::" + r._1()._1(), + structuredPropertyListToString(r._2()._2().getPid())), Encoders.bean(OrgSimRel.class)) .map( (MapFunction>) o -> new Tuple2<>(o.getLocal_id(), o), diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java index 65f383500..adff1ab8a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/OrgSimRel.java @@ -13,12 +13,13 @@ public class OrgSimRel implements Serializable { String oa_url; String oa_collectedfrom; String group_id; + String pid_list; // separator for type-pid: "###"; separator for pids: "@@@" public OrgSimRel() { } public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country, - String oa_url, String oa_collectedfrom, String group_id) { + String oa_url, String oa_collectedfrom, String group_id, String pid_list) { this.local_id = local_id; this.oa_original_id = oa_original_id; this.oa_name = oa_name; @@ -27,6 +28,7 @@ public class OrgSimRel implements Serializable { this.oa_url = oa_url; this.oa_collectedfrom = oa_collectedfrom; this.group_id = group_id; + this.pid_list = pid_list; } public String getLocal_id() { @@ -93,6 +95,14 @@ public class OrgSimRel implements Serializable { this.group_id = group_id; } + public String getPid_list() { + return pid_list; + } + + public void setPid_list(String pid_list) { + this.pid_list = pid_list; + } + @Override public String toString() { return "OrgSimRel{" + @@ -103,6 +113,8 @@ public class OrgSimRel implements Serializable { ", oa_country='" + oa_country + '\'' + ", oa_url='" + oa_url + '\'' + ", oa_collectedfrom='" + oa_collectedfrom + '\'' + + ", group_id='" + group_id + '\'' + + ", pid_list='" + pid_list + '\'' + '}'; } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 3e5030eaa..4d7de6f7f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -164,6 +164,13 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i log.info("Processing Openorgs Merge Rels..."); smdbe.execute("querySimilarityFromOpenOrgsDB.sql", smdbe::processOrgOrgSimRels); + break; + + case openaire_organizations: + + log.info("Processing Organizations..."); + smdbe.execute("queryOrganizations.sql", smdbe::processOrganization, verifyNamespacePrefix); + break; } log.info("All done."); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java index d9ee9bb6a..06ebeb994 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MigrateAction.java @@ -5,5 +5,6 @@ package eu.dnetlib.dhp.oa.graph.raw.common; public enum MigrateAction { claims, // migrate claims to the raw graph openorgs, // migrate organizations from openorgs to the raw graph - openaire // migrate openaire entities to the raw graph + openaire, // migrate openaire entities to the raw graph + openaire_organizations // migrate openaire organizations entities to the raw graph } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/workflow.xml new file mode 100644 index 000000000..95b66dc34 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_organizations/oozie_app/workflow.xml @@ -0,0 +1,270 @@ + + + + + graphOutputPath + the target path to store raw graph + + + reuseContent + false + should import content from the aggregator or reuse a previous version + + + contentPath + path location to store (or reuse) content from the aggregator + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + postgresOpenOrgsURL + the postgres URL to access to the OpenOrgs database + + + postgresOpenOrgsUser + the user of OpenOrgs database + + + postgresOpenOrgsPassword + the password of OpenOrgs database + + + dbSchema + beta + the database schema according to the D-Net infrastructure (beta or production) + + + isLookupUrl + the address of the lookUp service + + + nsPrefixBlacklist + + a blacklist of nsprefixes (comma separeted) + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${wf:conf('reuseContent') eq false} + ${wf:conf('reuseContent') eq true} + + + + + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication + --hdfsPath${contentPath}/db_openaire_organizations + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + --isLookupUrl${isLookupUrl} + --actionopenaire_organizations + --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication + --hdfsPath${contentPath}/db_openorgs + --postgresUrl${postgresOpenOrgsURL} + --postgresUser${postgresOpenOrgsUser} + --postgresPassword${postgresOpenOrgsPassword} + --isLookupUrl${isLookupUrl} + --actionopenorgs + --dbschema${dbSchema} + --nsPrefixBlacklist${nsPrefixBlacklist} + + + + + + + + + + yarn + cluster + GenerateEntities + eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePaths${contentPath}/db_openaire_organizations,${contentPath}/db_openorgs + --targetPath${workingDir}/entities + --isLookupUrl${isLookupUrl} + + + + + + + + yarn + cluster + GenerateGraph + eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --sourcePath${workingDir}/entities + --graphRawPath${graphOutputPath} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql index 938744b11..9a8f98931 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql @@ -24,7 +24,7 @@ SELECT d.officialname AS collectedfromname, o.country || '@@@dnet:countries' AS country, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - array_remove(array_agg(DISTINCT i.pid || '###' || i.issuertype), NULL) AS pid + array_agg(DISTINCT i.pid || '###' || i.issuertype || '@@@dnet:pid_types') AS pid FROM dsm_organizations o LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom) LEFT OUTER JOIN dsm_organizationpids p ON (p.organization = o.id) @@ -50,4 +50,4 @@ GROUP BY o.trust, d.id, d.officialname, - o.country + o.country; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql index 82ece5a1c..dbe0c136b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql @@ -31,6 +31,8 @@ FROM organizations o LEFT OUTER JOIN urls u ON (u.id = o.id) LEFT OUTER JOIN other_ids i ON (i.id = o.id) LEFT OUTER JOIN other_names n ON (n.id = o.id) +WHERE + o.status = 'approved' GROUP BY o.id, o.name, @@ -72,6 +74,8 @@ FROM other_names n LEFT OUTER JOIN organizations o ON (n.id = o.id) LEFT OUTER JOIN urls u ON (u.id = o.id) LEFT OUTER JOIN other_ids i ON (i.id = o.id) +WHERE + o.status = 'approved' GROUP BY o.id, o.creation_date,