From e4b56a4f88c4bf8ef80b47d82ec7978b524bcf2b Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 6 Feb 2024 14:13:12 +0200
Subject: [PATCH] [Organization] dump for organizations only

---
 .../src/test/java/GenerateJsonSchema.java          |   2 +-
 .../funderresults/SparkDumpFunderResults.java      |   4 +
 .../organizationonly/SparkDumpOrganizationJob.java | 270 ++++++++++++++++++
 .../organizationonly/oozie_app/config-default.xml  |  30 ++
 .../organizationonly/oozie_app/workflow.xml        |  88 ++++++
 5 files changed, 393 insertions(+), 1 deletion(-)
 create mode 100644 dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/organizationonly/SparkDumpOrganizationJob.java
 create mode 100644 dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/organizationonly/oozie_app/config-default.xml
 create mode 100644 dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/organizationonly/oozie_app/workflow.xml

diff --git a/dump-schema/src/test/java/GenerateJsonSchema.java b/dump-schema/src/test/java/GenerateJsonSchema.java
index 63f8121..c7916e2 100644
--- a/dump-schema/src/test/java/GenerateJsonSchema.java
+++ b/dump-schema/src/test/java/GenerateJsonSchema.java
@@ -1,6 +1,5 @@
 import java.io.IOException;
 
-import eu.dnetlib.dhp.oa.model.Result;
 import org.junit.jupiter.api.Test;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -10,6 +9,7 @@ import com.github.imifou.jsonschema.module.addon.AddonModule;
 import com.github.victools.jsonschema.generator.*;
 
 import eu.dnetlib.dhp.ExecCreateSchemas;
+import eu.dnetlib.dhp.oa.model.Result;
 import eu.dnetlib.dhp.oa.model.community.CommunityResult;
 import eu.dnetlib.dhp.oa.model.graph.*;
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
index 668ae21..af957dc 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
@@ -9,6 +9,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
@@ -95,6 +96,10 @@ public class SparkDumpFunderResults implements Serializable {
 		Optional<Funder> ofunder = Optional.ofNullable(p.getFunder());
 		if (ofunder.isPresent()) {
 			String fName = ofunder.get().getShortName();
+			// fall back to the full funder name when no short name is available
+			if (StringUtils.isBlank(fName)) {
+				return ofunder.get().getName();
+			}
 			if (fName.equalsIgnoreCase("ec")) {
 				fName += "_" + ofunder.get().getFundingStream();
 			}
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/organizationonly/SparkDumpOrganizationJob.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/organizationonly/SparkDumpOrganizationJob.java
new file mode 100644
index 0000000..c218600
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/organizationonly/SparkDumpOrganizationJob.java
@@ -0,0 +1,270 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.organizationonly;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.oa.graph.dump.Utils.ENTITY_ID_SEPARATOR;
+import static eu.dnetlib.dhp.oa.graph.dump.Utils.getEntityId;
+
+import java.io.Serializable;
+import java.io.StringReader;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.Node;
+import org.dom4j.io.SAXReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.Constants;
+import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
+import eu.dnetlib.dhp.oa.graph.dump.exceptions.CardinalityTooHighException;
+import eu.dnetlib.dhp.oa.graph.dump.exceptions.NoAvailableEntityTypeException;
+import eu.dnetlib.dhp.oa.model.Container;
+import eu.dnetlib.dhp.oa.model.Provenance;
+import eu.dnetlib.dhp.oa.model.Result;
+import eu.dnetlib.dhp.oa.model.graph.*;
+import eu.dnetlib.dhp.oa.model.graph.Datasource;
+import eu.dnetlib.dhp.oa.model.graph.Organization;
+import eu.dnetlib.dhp.oa.model.graph.Project;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
+
+/**
+ * Spark Job that fires the dump for the organization entities only, together with
+ * the organization-organization relations.
+ */
+public class SparkDumpOrganizationJob implements Serializable {
+    private static final Logger log = LoggerFactory
+        .getLogger(eu.dnetlib.dhp.oa.graph.dump.organizationonly.SparkDumpOrganizationJob.class);
+    public static final String COMPRESSION = "compression";
+    public static final String GZIP = "gzip";
+
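+    /**
+     * NOTE: the input and output paths are hardcoded for a one-off run and the
+     * command-line arguments are currently not parsed.
+     */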
+    public static void main(String[] args) throws Exception {
+
+        Boolean isSparkSessionManaged = Boolean.TRUE;
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = "/tmp/prod_provision/graph/20_graph_blacklisted/";
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = "/tmp/miriam/organizationsOnly/";
+        log.info("outputPath: {}", outputPath);
+
+        SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                // Utils.removeOutputDir(spark, outputPath);
+                organizationMap(spark, inputPath, outputPath);
+                // relationMap2(spark, inputPath, outputPath);
+            });
+
+    }
+
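+    /**
+     * Maps the relations of relType "organizationOrganization" from the internal
+     * graph model to the dump model, keeping provenance and validation information.
+     * Currently disabled in main().
+     */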
+    private static void relationMap2(SparkSession spark, String inputPath, String outputPath) {
+        Utils
+            .readPath(spark, inputPath + "relation", Relation.class)
+            .filter((FilterFunction<Relation>) r -> r.getRelType().equalsIgnoreCase("organizationOrganization"))
+            .map((MapFunction<Relation, eu.dnetlib.dhp.oa.model.graph.Relation>) relation -> {
+                eu.dnetlib.dhp.oa.model.graph.Relation relNew = new eu.dnetlib.dhp.oa.model.graph.Relation();
+                relNew
+                    .setSource(getEntityId(relation.getSource(), ENTITY_ID_SEPARATOR));
+                relNew.setSourceType(ModelSupport.idPrefixEntity.get(relation.getSource().substring(0, 2)));
+
+                relNew
+                    .setTarget(getEntityId(relation.getTarget(), ENTITY_ID_SEPARATOR));
+                relNew.setTargetType(ModelSupport.idPrefixEntity.get(relation.getTarget().substring(0, 2)));
+
+                relNew
+                    .setReltype(
+                        RelType
+                            .newInstance(
+                                relation.getRelClass(),
+                                relation.getSubRelType()));
+
+                Optional<DataInfo> odInfo = Optional.ofNullable(relation.getDataInfo());
+                if (odInfo.isPresent()) {
+                    DataInfo dInfo = odInfo.get();
+                    if (Optional.ofNullable(dInfo.getProvenanceaction()).isPresent() &&
+                        Optional.ofNullable(dInfo.getProvenanceaction().getClassname()).isPresent()) {
+                        relNew
+                            .setProvenance(
+                                Provenance
+                                    .newInstance(
+                                        dInfo.getProvenanceaction().getClassname(),
+                                        dInfo.getTrust()));
+                    }
+                }
+                if (Boolean.TRUE.equals(relation.getValidated())) {
+                    relNew.setValidated(relation.getValidated());
+                    relNew.setValidationDate(relation.getValidationDate());
+                }
+
+                return relNew;
+            }, Encoders.bean(eu.dnetlib.dhp.oa.model.graph.Relation.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath + "relation");
+    }
+
+    private static void relationMap(SparkSession spark, String inputPath, String outputPath) {
+        Dataset<eu.dnetlib.dhp.schema.oaf.Organization> organization = Utils
+            .readPath(spark, inputPath + "organization", eu.dnetlib.dhp.schema.oaf.Organization.class);
+        Dataset<Relation> rels = Utils.readPath(spark, inputPath + "relation", Relation.class);
+        organization
+            .joinWith(rels, organization.col("id").equalTo(rels.col("source")), "left")
+            .map(
+                (MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Organization, Relation>, Relation>) t2 -> t2._2(),
+                Encoders.bean(Relation.class))
+            .filter(Objects::nonNull)
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json("/tmp/orgSource");
+
+        rels = Utils.readPath(spark, "/tmp/orgSource", Relation.class);
+
+        organization
+            .joinWith(rels, organization.col("id").equalTo(rels.col("target")), "left")
+            .map(
+                (MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Organization, Relation>, Relation>) t2 -> t2._2(),
+                Encoders.bean(Relation.class))
+            .filter(Objects::nonNull)
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json("/tmp/orgSourceTarget");
+
+        Utils
+            .readPath(spark, "/tmp/orgSourceTarget", Relation.class)
+            .map((MapFunction<Relation, eu.dnetlib.dhp.oa.model.graph.Relation>) relation -> {
+                eu.dnetlib.dhp.oa.model.graph.Relation relNew = new eu.dnetlib.dhp.oa.model.graph.Relation();
+                relNew
+                    .setSource(getEntityId(relation.getSource(), ENTITY_ID_SEPARATOR));
+                relNew.setSourceType(ModelSupport.idPrefixEntity.get(relation.getSource().substring(0, 2)));
+
+                relNew
+                    .setTarget(getEntityId(relation.getTarget(), ENTITY_ID_SEPARATOR));
+                relNew.setTargetType(ModelSupport.idPrefixEntity.get(relation.getTarget().substring(0, 2)));
+
+                relNew
+                    .setReltype(
+                        RelType
+                            .newInstance(
+                                relation.getRelClass(),
+                                relation.getSubRelType()));
+
+                Optional<DataInfo> odInfo = Optional.ofNullable(relation.getDataInfo());
+                if (odInfo.isPresent()) {
+                    DataInfo dInfo = odInfo.get();
+                    if (Optional.ofNullable(dInfo.getProvenanceaction()).isPresent() &&
+                        Optional.ofNullable(dInfo.getProvenanceaction().getClassname()).isPresent()) {
+                        relNew
+                            .setProvenance(
+                                Provenance
+                                    .newInstance(
+                                        dInfo.getProvenanceaction().getClassname(),
+                                        dInfo.getTrust()));
+                    }
+                }
+                if (Boolean.TRUE.equals(relation.getValidated())) {
+                    relNew.setValidated(relation.getValidated());
+                    relNew.setValidationDate(relation.getValidationDate());
+                }
+
+                return relNew;
+            }, Encoders.bean(eu.dnetlib.dhp.oa.model.graph.Relation.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath + "relation");
+    }
+
+    private static void organizationMap(SparkSession spark, String inputPath, String outputPath) {
+        Utils
+            .readPath(spark, inputPath + "organization", eu.dnetlib.dhp.schema.oaf.Organization.class)
+            .map(
+                (MapFunction<eu.dnetlib.dhp.schema.oaf.Organization, Organization>) o -> mapOrganization(o),
+                Encoders.bean(Organization.class))
+            .filter((FilterFunction<Organization>) o -> o != null)
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option(COMPRESSION, GZIP)
+            .json(outputPath + "/organization");
+    }
+
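+    /**
+     * Maps a single internal Organization to the dump model: legal (short) name,
+     * website url, alternative names, country (when not UNKNOWN), identifier and pids.
+     */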
+    private static eu.dnetlib.dhp.oa.model.graph.Organization mapOrganization(
+        eu.dnetlib.dhp.schema.oaf.Organization org) {
+
+        Organization organization = new Organization();
+
+        Optional
+            .ofNullable(org.getLegalshortname())
+            .ifPresent(value -> organization.setLegalshortname(value.getValue()));
+
+        Optional
+            .ofNullable(org.getLegalname())
+            .ifPresent(value -> organization.setLegalname(value.getValue()));
+
+        Optional
+            .ofNullable(org.getWebsiteurl())
+            .ifPresent(value -> organization.setWebsiteurl(value.getValue()));
+
+        Optional
+            .ofNullable(org.getAlternativeNames())
+            .ifPresent(
+                value -> organization
+                    .setAlternativenames(
+                        value
+                            .stream()
+                            .map(v -> v.getValue())
+                            .collect(Collectors.toList())));
+
+        Optional
+            .ofNullable(org.getCountry())
+            .ifPresent(
+                value -> {
+                    if (!value.getClassid().equals(eu.dnetlib.dhp.oa.graph.dump.complete.Constants.UNKNOWN)) {
+                        organization
+                            .setCountry(
+                                eu.dnetlib.dhp.oa.model.Country.newInstance(value.getClassid(), value.getClassname()));
+                    }
+
+                });
+
+        Optional
+            .ofNullable(org.getId())
+            .ifPresent(value -> organization.setId(getEntityId(value, ENTITY_ID_SEPARATOR)));
+
+        Optional
+            .ofNullable(org.getPid())
+            .ifPresent(
+                value -> organization
+                    .setPid(
+                        value
+                            .stream()
+                            .map(p -> OrganizationPid.newInstance(p.getQualifier().getClassid(), p.getValue()))
+                            .collect(Collectors.toList())));
+
+        return organization;
+    }
+
+}
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/organizationonly/oozie_app/config-default.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/organizationonly/oozie_app/config-default.xml
new file mode 100644
index 0000000..d262cb6
--- /dev/null
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/organizationonly/oozie_app/config-default.xml
@@ -0,0 +1,30 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hiveMetastoreUris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>hiveJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
+    </property>
+    <property>
+        <name>hiveDbName</name>
+        <value>openaire</value>
+    </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/organizationonly/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/organizationonly/oozie_app/workflow.xml
new file mode 100644
index 0000000..41b0ebe
--- /dev/null
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/organizationonly/oozie_app/workflow.xml
@@ -0,0 +1,88 @@
+<workflow-app name="dump_organization_only" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="dump_organization"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="dump_organization">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Dump table organization and related relations</name>
+            <class>eu.dnetlib.dhp.oa.graph.dump.organizationonly.SparkDumpOrganizationJob</class>
+            <jar>dump-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+            </spark-opts>
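+            <!-- NOTE: the arguments below are ignored by SparkDumpOrganizationJob,
+                 which currently hardcodes its input and output paths. -->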
+            <arg>--sourcePath</arg><arg>${sourcePath}/project</arg>
+            <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/project</arg>
+            <arg>--communityMapPath</arg><arg>noneed</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file