diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java index a88607986..012178c1e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java @@ -6,26 +6,23 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.Subject; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.utils.DHPUtils; public class PrepareSDGSparkJob implements Serializable { @@ -52,42 +49,83 @@ public class PrepareSDGSparkJob implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + final Boolean distributeDOI = Optional + .ofNullable(parser.get("distributeDoi")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("distribute doi {}", distributeDOI); + SparkConf conf = new SparkConf(); runWithSparkSession( conf, isSparkSessionManaged, spark -> { - doPrepare( - spark, - sourcePath, + if (distributeDOI) + doPrepare( + spark, + sourcePath, + + outputPath); + else + doPrepareoaid(spark, sourcePath, outputPath); - outputPath); }); } - private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) { - Dataset sdgDataset = readPath(spark, sourcePath, SDGDataModel.class); + private static void doPrepareoaid(SparkSession spark, String sourcePath, String outputPath) { + Dataset sdgDataset = spark + .read() + .format("csv") + .option("sep", DEFAULT_DELIMITER) + .option("inferSchema", "true") + .option("header", "true") + .option("quotes", "\"") + .load(sourcePath); sdgDataset - .groupByKey((MapFunction) r -> r.getDoi().toLowerCase(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> { - Result r = new Result(); - r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI)); - SDGDataModel first = it.next(); - List sbjs = new ArrayList<>(); - sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)); - it - .forEachRemaining( - s -> sbjs - .add(getSubject(s.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID))); - r.setSubject(sbjs); - - return r; - }, Encoders.bean(Result.class)) + .groupByKey((MapFunction) v -> ((String) v.getAs("oaid")).toLowerCase(), Encoders.STRING()) + .mapGroups( + (MapGroupsFunction) (k, + it) -> getResult( + DHPUtils + .generateUnresolvedIdentifier( + ModelSupport.entityIdPrefix.get(Result.class.getSimpleName().toLowerCase()) + "|" + k, + DOI), + it), + Encoders.bean(Result.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath + "/sdg"); } + private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) { + Dataset sdgDataset = spark.read().csv(sourcePath); + + sdgDataset + .groupByKey((MapFunction) r -> ((String) r.getAs("doi")).toLowerCase(), Encoders.STRING()) + .mapGroups( + (MapGroupsFunction) PrepareSDGSparkJob::getResult, Encoders.bean(Result.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/sdg"); + } + + private static @NotNull Result getResult(String id, Iterator it) { + Result r = new Result(); + r.setId(id); + Row first = it.next(); + List sbjs = new ArrayList<>(); + sbjs.add(getSubject(first.getAs("sdg"), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)); + it + .forEachRemaining( + s -> sbjs + .add(getSubject(s.getAs("sdg"), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID))); + r.setSubject(sbjs); + + return r; + } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/fosnodoi/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/fosnodoi/CreateActionSetSparkJob.java index e86fccb84..fceed2008 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/fosnodoi/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/fosnodoi/CreateActionSetSparkJob.java @@ -13,9 +13,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; @@ -24,13 +21,9 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.utils.*; import scala.Tuple2; public class CreateActionSetSparkJob implements Serializable { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/sdgnodoi/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/sdgnodoi/CreateActionSetSparkJob.java new file mode 100644 index 000000000..a2f8cfa39 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/sdgnodoi/CreateActionSetSparkJob.java @@ -0,0 +1,86 @@ + +package eu.dnetlib.dhp.actionmanager.sdgnodoi; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; +import java.util.Optional; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Result; +import scala.Tuple2; + +public class CreateActionSetSparkJob implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(final String[] args) throws IOException, ParseException { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Objects + .requireNonNull( + CreateActionSetSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/fosnodoi/as_parameters.json")))); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> createActionSet(spark, inputPath, outputPath)); + + } + + private static void createActionSet(SparkSession spark, String inputPath, String outputPath) { + spark + .read() + .textFile(inputPath) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, Result.class), + Encoders.bean(Result.class)) + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/sdgnodoi/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/sdgnodoi/as_parameters.json new file mode 100644 index 000000000..3f056edf7 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/sdgnodoi/as_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "sp", + "paramLongName": "sourcePath", + "paramDescription": "the zipped opencitations file", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "the hdfs name node", + "paramRequired": false + } +] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/sdgnodoi/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/sdgnodoi/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/sdgnodoi/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/sdgnodoi/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/sdgnodoi/oozie_app/workflow.xml new file mode 100644 index 000000000..82144d0d6 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/sdgnodoi/oozie_app/workflow.xml @@ -0,0 +1,125 @@ + + + + + sdgPath + the input path of the resources to be extended + + + outputPath + the path where to store the actionset + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Produces the results from FOS + eu.dnetlib.dhp.actionmanager.createunresolvedentities.PrepareSDGSparkJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${sdgPath} + --outputPath${workingDir}/prepared + --distributeDoifalse + + + + + + + + + + yarn + cluster + Save the action set grouping results with the same id + eu.dnetlib.dhp.actionmanager.sdgnodoi.CreateActionSetSparkJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${workingDir}/prepared/sdg + --outputPath${outputPath} + + + + + + + \ No newline at end of file