From 8947a60c803355242735b53da8d6d951889b922b Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 7 Nov 2022 17:56:57 +0100 Subject: [PATCH] [EOSC DUMP] added the class to extend the result with the usage count values. This is temporary until the usage stats are imported in the graph - promotion of the action set in the wrong place --- .../dump/eosc/ExtendWithUsageCounts.java | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendWithUsageCounts.java diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendWithUsageCounts.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendWithUsageCounts.java new file mode 100644 index 0000000..5d0cd72 --- /dev/null +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendWithUsageCounts.java @@ -0,0 +1,115 @@ + +package eu.dnetlib.dhp.oa.graph.dump.eosc; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.eosc.model.EoscResult; +import eu.dnetlib.dhp.eosc.model.Indicator; +import eu.dnetlib.dhp.eosc.model.UsageCounts; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Result; +import scala.Tuple2; + +/** + * @author miriam.baglioni + * @Date 07/11/22 + */ +public class ExtendWithUsageCounts implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(ExtendEoscResultWithOrganizationStep2.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + ExtendEoscResultWithOrganizationStep2.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_indicators_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String actionSetPath = parser.get("actionSetPath"); + log.info("actionSetPath: {}", actionSetPath); + + final String resultPath = parser.get("resultPath"); + log.info("resultPath: {}", resultPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + addIndicators(spark, actionSetPath, outputPath, resultPath); + }); + } + + private static void addIndicators(SparkSession spark, String actionSetPath, String outputPath, String resultPath) { + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + JavaRDD resultsWithIndicatorsRdd = sc + .sequenceFile(actionSetPath, Text.class, Text.class) + .map(value -> new ObjectMapper().readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Result) aa.getPayload())); + + Dataset resultWithIndicators = spark + .createDataset(resultsWithIndicatorsRdd.rdd(), Encoders.bean(Result.class)); + + Dataset result = Utils + .readPath(spark, resultPath, EoscResult.class); + + result + .joinWith(resultWithIndicators, result.col("id").equalTo(resultWithIndicators.col("id")), "left") + .map((MapFunction, EoscResult>) t2 -> { + if (Optional.ofNullable(t2._2()).isPresent()) { + Indicator indicator = new Indicator(); + UsageCounts uc = new UsageCounts(); + t2._2().getMeasures().stream().forEach(m -> { + if (m.getId().equals("downloads")) { + uc.setDownloads(m.getUnit().get(0).getValue()); + } else { + uc.setViews(m.getUnit().get(0).getValue()); + } + indicator.setUsageCounts(uc); + t2._1().setIndicator(indicator); + }); + + } + return t2._1(); + }, Encoders.bean(EoscResult.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + + } +}