From 932d07d2dd501b08d2926a2d5953998fdc100c42 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 6 Apr 2023 15:08:27 +0200 Subject: [PATCH] [bulkTag] added filtering for datasources in eosctag --- .../dhp/bulktag/eosc/SparkEoscBulkTag.java | 58 ++++++++++++++++--- .../input_eosc_bulkTag_parameters.json | 7 +++ .../dhp/bulktag/oozie_app/workflow.xml | 20 ++++--- 3 files changed, 68 insertions(+), 17 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java index c4b2122b4..010ad5a87 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java @@ -16,6 +16,7 @@ import javax.print.attribute.DocAttributeSet; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -34,6 +35,7 @@ import eu.dnetlib.dhp.bulktag.community.*; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import scala.Tuple2; /** * @author miriam.baglioni @@ -44,6 +46,11 @@ public class SparkEoscBulkTag implements Serializable { private static final Logger log = LoggerFactory.getLogger(SparkEoscBulkTag.class); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static String OPENAIRE_3 = "openaire3.0"; + private static String OPENAIRE_4 = "openaire-pub_4.0"; + private static String OPENAIRE_CRIS = "openaire-cris_1.1"; + private static String OPENAIRE_DATA = "openaire2.0_data"; + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( @@ -72,6 +79,9 @@ public class SparkEoscBulkTag implements Serializable { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); + final String resultType = parser.get("resultType"); + log.info("resultType: {}", resultType); + Class resultClazz = (Class) Class.forName(resultClassName); SparkConf conf = new SparkConf(); @@ -82,41 +92,71 @@ public class SparkEoscBulkTag implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, workingPath); - execBulkTag(spark, inputPath, workingPath, datasourceMapPath, resultClazz); + selectCompliantDatasources(spark, inputPath, workingPath, datasourceMapPath); + execBulkTag(spark, inputPath, workingPath, resultType, resultClazz); }); } + private static void selectCompliantDatasources(SparkSession spark, String inputPath, String workingPath, + String datasourceMapPath) { + Dataset datasources = readPath(spark, inputPath + "datasource", Datasource.class) + .filter((FilterFunction) ds -> { + final String compatibility = ds.getOpenairecompatibility().getClassid(); + return compatibility.equalsIgnoreCase(OPENAIRE_3) || + compatibility.equalsIgnoreCase(OPENAIRE_4) || + compatibility.equalsIgnoreCase(OPENAIRE_CRIS) || + compatibility.equalsIgnoreCase(OPENAIRE_DATA); + }); + + Dataset datasourceMaster = readPath(spark, datasourceMapPath, DatasourceMaster.class); + + datasources + .joinWith(datasourceMaster, datasources.col("id").equalTo(datasourceMaster.col("master")), "left") + .map( + (MapFunction, DatasourceMaster>) t2 -> t2._2(), + Encoders.bean(DatasourceMaster.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath + "datasource"); + } + private static void execBulkTag( SparkSession spark, String inputPath, String workingPath, - String datasourceMapPath, + String resultType, Class resultClazz) { - List hostedByList = readPath(spark, datasourceMapPath, DatasourceMaster.class) + List hostedByList = readPath(spark, workingPath + "datasource", DatasourceMaster.class) .map((MapFunction) dm -> dm.getMaster(), Encoders.STRING()) .collectAsList(); - readPath(spark, inputPath, resultClazz) - .map(patchResult(), Encoders.bean(resultClazz)) - .filter(Objects::nonNull) + readPath(spark, inputPath + resultType, resultClazz) .map( (MapFunction) value -> enrich(value, hostedByList), Encoders.bean(resultClazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingPath); + .json(workingPath + resultType); - readPath(spark, workingPath, resultClazz) + readPath(spark, workingPath + resultType, resultClazz) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(inputPath); + .json(inputPath + resultType); } private static R enrich(R value, List hostedByList) { + if (value.getDataInfo().getDeletedbyinference() == null) { + value.getDataInfo().setDeletedbyinference(false); + } + if (value.getContext() == null) { + value.setContext(new ArrayList<>()); + } if (value .getInstance() .stream() diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/input_eosc_bulkTag_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/input_eosc_bulkTag_parameters.json index ebbbd408b..5aace346d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/input_eosc_bulkTag_parameters.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/input_eosc_bulkTag_parameters.json @@ -29,6 +29,13 @@ "paramLongName": "isSparkSessionManaged", "paramDescription": "true if the spark session is managed, false otherwise", "paramRequired": false + }, + { + + "paramName": "rt", + "paramLongName": "resultType", + "paramDescription": "the result type", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml index 08a507c89..8ac8097e8 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml @@ -282,8 +282,9 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${outputPath}/publication - --workingPath${workingDir}/eoscContextTag/publication + --sourcePath${outputPath}/ + --resultTypepublication + --workingPath${workingDir}/eoscContextTag/ --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication --datasourceMapPath${workingDir}/datasourcemaster @@ -308,8 +309,9 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${outputPath}/dataset - --workingPath${workingDir}/eoscContextTag/dataset + --sourcePath${outputPath}/ + --resultTypedataset + --workingPath${workingDir}/eoscContextTag/ --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset --datasourceMapPath${workingDir}/datasourcemaster @@ -333,8 +335,9 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${outputPath}/software - --workingPath${workingDir}/eoscContextTag/software + --sourcePath${outputPath}/ + --resultTypesoftware + --workingPath${workingDir}/eoscContextTag/ --resultTableNameeu.dnetlib.dhp.schema.oaf.Software --datasourceMapPath${workingDir}/datasourcemaster @@ -358,8 +361,9 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${outputPath}/otherresearchproduct - --workingPath${workingDir}/eoscContextTag/otherresearchproduct + --sourcePath${outputPath}/ + --resultTypeotherresearchproduct + --workingPath${workingDir}/eoscContextTag/ --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --datasourceMapPath${workingDir}/datasourcemaster