diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkEoscTag.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkEoscTag.java index 7dc3b5878b..ea7e77e398 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkEoscTag.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkEoscTag.java @@ -1,22 +1,13 @@ package eu.dnetlib.dhp.bulktag; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.Gson; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration; -import eu.dnetlib.dhp.bulktag.community.CommunityConfigurationFactory; -import eu.dnetlib.dhp.bulktag.community.ProtoMap; -import eu.dnetlib.dhp.bulktag.community.QueryInformationSystem; import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.Software; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -26,25 +17,30 @@ import org.slf4j.LoggerFactory; import java.util.Optional; import static eu.dnetlib.dhp.PropagationConstant.readPath; -import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; public class SparkEoscTag { private static final Logger log = LoggerFactory.getLogger(SparkEoscTag.class); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final Qualifier EOSC_QUALIFIER = OafMapperUtils.qualifier("eosc", + "European Open Science Cloud", + ModelConstants.DNET_SUBJECT_TYPOLOGIES,ModelConstants.DNET_SUBJECT_TYPOLOGIES); + public static final DataInfo EOSC_DATAINFO = OafMapperUtils.dataInfo(false, "propagation", true, false, + OafMapperUtils.qualifier("propagation:subject","Inferred by OpenAIRE", + ModelConstants.DNET_PROVENANCE_ACTIONS,ModelConstants.DNET_PROVENANCE_ACTIONS), "0.9"); public final static StructuredProperty EOSC_NOTEBOOK = OafMapperUtils.structuredProperty( - "EOSC::Jupyter Notebook", OafMapperUtils.qualifier("eosc","European Open Science Cloud", - ModelConstants.DNET_SUBJECT_TYPOLOGIES,ModelConstants.DNET_SUBJECT_TYPOLOGIES) - ,OafMapperUtils.dataInfo(false, "propagation", true, false, - OafMapperUtils.qualifier("propagation:subject","Inferred by OpenAIRE", - ModelConstants.DNET_PROVENANCE_ACTIONS,ModelConstants.DNET_PROVENANCE_ACTIONS), "0.9")); + "EOSC::Jupyter Notebook", EOSC_QUALIFIER,EOSC_DATAINFO); + public final static StructuredProperty EOSC_GALAXY = OafMapperUtils.structuredProperty( + "EOSC::Galaxy Workflow", EOSC_QUALIFIER, EOSC_DATAINFO); + public final static StructuredProperty EOSC_TWITTER = OafMapperUtils.structuredProperty( + "EOSC::Twitter Data", EOSC_QUALIFIER,EOSC_DATAINFO); public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( SparkEoscTag.class .getResourceAsStream( - "/eu/dnetlib/dhp/bulktag/input_eosctag_parameters.json")); + "/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -73,17 +69,15 @@ public class SparkEoscTag { } private static void execEoscTag(SparkSession spark, String inputPath, String workingPath) { - //search for notebook - //subject contiene jupyter. - //esistono python e notebook nei subject non necessariamente nello stesso - //si cerca fra i prodotti di tipo software - readPath(spark, inputPath + "/software", Software.class) .map((MapFunction) s -> { - if(containsSubjectNotebook(s)){ + if(containsCriteriaNotebook(s)){ s.getSubject().add(EOSC_NOTEBOOK); } + if(containsCriteriaGalaxy(s)){ + s.getSubject().add(EOSC_GALAXY); + } return s; }, Encoders.bean(Software.class) ) .write() @@ -97,9 +91,75 @@ public class SparkEoscTag { .option("compression","gzip") .json(inputPath + "/software"); + readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class) + .map((MapFunction) orp -> + { + if(containsCriteriaGalaxy(orp)){ + orp.getSubject().add(EOSC_GALAXY); + } + if(containscriteriaTwitter(orp)){ + orp.getSubject().add(EOSC_TWITTER); + } + return orp; + }, Encoders.bean(OtherResearchProduct.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(workingPath + "/otherresearchproduct"); + + readPath(spark, workingPath + "/otherresearchproduct", OtherResearchProduct.class) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(inputPath + "/otherresearchproduct"); + + readPath(spark, inputPath + "/dataset", Dataset.class) + .map((MapFunction) d -> { + if(containscriteriaTwitter(d)){ + d.getSubject().add(EOSC_TWITTER); + } + return d; + } , Encoders.bean(Dataset.class) ) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(workingPath + "/dataset"); + + readPath(spark, workingPath + "/dataset" , Dataset.class) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(inputPath + "/dataset"); } - private static boolean containsSubjectNotebook(Software s) { + private static boolean containscriteriaTwitter(Result r) { + if (r.getTitle().stream().anyMatch(t -> t.getValue().toLowerCase().contains("twitter") && + t.getValue().toLowerCase().contains("data"))) + return true; + if(r.getDescription().stream().anyMatch(d -> d.getValue().toLowerCase().contains("twitter") && + d.getValue().toLowerCase().contains("data") )) + return true; + if(r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("twitter")) && + r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("data"))) + return true; + return false; + } + + private static boolean containsCriteriaGalaxy(Result r) { + if (r.getTitle().stream().anyMatch(t -> t.getValue().toLowerCase().contains("galaxy") && + (t.getValue().toLowerCase().contains("workflow") || t.getValue().toLowerCase().contains("software")))) + return true; + if(r.getDescription().stream().anyMatch(d -> d.getValue().toLowerCase().contains("galaxy") && + (d.getValue().toLowerCase().contains("workflow") || d.getValue().toLowerCase().contains("software")))) + return true; + if(r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("galaxy")) && + (r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("workflow"))) || + r.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("software"))) + return true; + return false; + } + + private static boolean containsCriteriaNotebook(Software s) { if(s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("jupyter"))) return true; if(s.getSubject().stream().anyMatch(sbj -> sbj.getValue().toLowerCase().contains("python") && @@ -117,4 +177,5 @@ public class SparkEoscTag { return true; return false; } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json new file mode 100644 index 0000000000..4c25fea019 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/input_eoscTag_parameters.json @@ -0,0 +1,21 @@ +[ + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml index f019f8413d..91fca7d61d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml @@ -204,7 +204,31 @@ - + + + + + yarn-cluster + cluster + EOSC_tagging + eu.dnetlib.dhp.bulktag.SparkEoscTag + dhp-enrichment-${projectVersion}.jar + + --num-executors=${sparkExecutorNumber} + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${outputPath} + --workingPath${workingDir}/eoscTag + + + +