From a7866a26f76b0f2ae0275fa89af7866aa0168b0a Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 21 Sep 2023 17:44:10 +0200
Subject: [PATCH] first step to add eosc datasource id to instance

---
 .../oa/graph/dump/eosc/EoscDatasourceId.java  | 92 +++++++++++++++++++
 .../oa/graph/dump/eosc/oozie_app/workflow.xml | 10 +++
 2 files changed, 102 insertions(+)
 create mode 100644 dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/EoscDatasourceId.java

diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/EoscDatasourceId.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/EoscDatasourceId.java
new file mode 100644
index 0000000..dcf8364
--- /dev/null
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/EoscDatasourceId.java
@@ -0,0 +1,92 @@
+package eu.dnetlib.dhp.oa.graph.dump.eosc;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+/**
+ * Extracts, for every datasource collected from the EOSC infrastructure, the mapping
+ * between its OpenAIRE graph identifier and its EOSC identifier (originalId starting
+ * with "eosc"), and persists it as JSON under outputPath.
+ *
+ * @author miriam.baglioni
+ * @Date 20/09/23
+ */
+public class EoscDatasourceId implements Serializable {
+	private static final Logger log = LoggerFactory.getLogger(EoscDatasourceId.class);
+	// OpenAIRE graph id of the EOSC collectedfrom entry used to select EOSC datasources
+	private static final String EOSC = "10|openaire____::2e06c1122c7df43765fdcf91080824fa";
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				EoscDatasourceId.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/oa/graph/dump/eosc_identifiers_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String nameNode = parser.get("nameNode");
+		log.info("nameNode: {}", nameNode);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		final String inputPath = parser.get("inputPath");
+		log.info("inputPath: {}", inputPath);
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				Utils.removeOutputDir(spark, outputPath);
+				mapEoscIdentifier(spark, inputPath, outputPath);
+			});
+	}
+
+	private static void mapEoscIdentifier(SparkSession spark, String inputPath, String outputPath) {
+		spark
+			.read()
+			.schema(Encoders.bean(Datasource.class).schema())
+			.json(inputPath + "/datasource")
+			.withColumn("collfrom", functions.explode(new Column("collectedfrom.key")))
+			// EOSC is a value, not a column: it must be quoted as a SQL string literal
+			.filter("collfrom == '" + EOSC + "'")
+			.withColumn("orId", functions.explode(new Column("originalId")))
+			.filter(new Column("orId").startsWith("eosc"))
+			.select(
+				new Column("id").as("graphId"),
+				new Column("orId").as("eoscId"))
+			// persist the mapping; without this the projection was computed and discarded
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
+	}
+}
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml
index 1c83b28..51c14c5 100644
--- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml
@@ -97,6 +97,16 @@
             <arg>--nameNode</arg><arg>${nameNode}</arg>
             <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
+
+    <action name="eosc_datasource_id">
+        <java>
+            <main-class>eu.dnetlib.dhp.oa.graph.dump.eosc.EoscDatasourceId</main-class>
+            <arg>--outputPath</arg><arg>${workingDir}/eoscDatasourceIds</arg>
+            <arg>--inputPath</arg><arg>${sourcePath}</arg>
+            <arg>--nameNode</arg><arg>${nameNode}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>