diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java similarity index 58% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/Constants.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java index 131d5fa07..7a3814b71 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/Constants.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -1,8 +1,11 @@ -package eu.dnetlib.dhp.actionmanager.bipmodel; +package eu.dnetlib.dhp.actionmanager; import java.util.Optional; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -46,4 +49,36 @@ public class Constants { .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } + public static StructuredProperty getSubject(String sbj, String classid, String classname) { + if (sbj.equals(NULL)) + return null; + StructuredProperty sp = new StructuredProperty(); + sp.setValue(sbj); + sp + .setQualifier( + OafMapperUtils + .qualifier(classid + , + classname, + ModelConstants.DNET_SUBJECT_TYPOLOGIES, + ModelConstants.DNET_SUBJECT_TYPOLOGIES)); + sp + .setDataInfo( + OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_SUBJECT_FOS_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "")); + + return sp; + + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index fafa209ea..ddf5f4adf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.actionmanager.bipfinder; -import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.*; +import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java index 8d89d958d..e9c9f0350 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java @@ -1,8 +1,8 @@ package eu.dnetlib.dhp.actionmanager.createunresolvedentities; -import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.*; -import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.UPDATE_CLASS_NAME; +import static eu.dnetlib.dhp.actionmanager.Constants.*; +import static eu.dnetlib.dhp.actionmanager.Constants.UPDATE_CLASS_NAME; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java index 558f51f3f..1a6e9ddfc 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.actionmanager.createunresolvedentities; -import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.*; +import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; @@ -10,7 +10,6 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -21,10 +20,8 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.utils.DHPUtils; public class PrepareFOSSparkJob implements Serializable { @@ -67,17 +64,7 @@ public class PrepareFOSSparkJob implements Serializable { private static void distributeFOSdois(SparkSession spark, String sourcePath, String outputPath) { Dataset fosDataset = readPath(spark, sourcePath, FOSDataModel.class); - fosDataset.flatMap((FlatMapFunction) v -> { - List fosList = new ArrayList<>(); - final String level1 = v.getLevel1(); - final String level2 = v.getLevel2(); - final String level3 = v.getLevel3(); - Arrays - .stream(v.getDoi().split("\u0002")) - .forEach(d -> fosList.add(FOSDataModel.newInstance(d, level1, level2, level3))); - return fosList.iterator(); - }, Encoders.bean(FOSDataModel.class)) - .map((MapFunction) value -> { + fosDataset.map((MapFunction) value -> { Result r = new Result(); r.setId(DHPUtils.generateUnresolvedIdentifier(value.getDoi(), DOI)); r.setSubject(getSubjects(value)); @@ -91,43 +78,14 @@ public class PrepareFOSSparkJob implements Serializable { private static List getSubjects(FOSDataModel fos) { return Arrays - .asList(getSubject(fos.getLevel1()), getSubject(fos.getLevel2()), getSubject(fos.getLevel3())) + .asList(getSubject(fos.getLevel1(), FOS_CLASS_ID, FOS_CLASS_NAME), + getSubject(fos.getLevel2(), FOS_CLASS_ID, FOS_CLASS_NAME), + getSubject(fos.getLevel3(), FOS_CLASS_ID, FOS_CLASS_NAME)) .stream() .filter(Objects::nonNull) .collect(Collectors.toList()); } - private static StructuredProperty getSubject(String sbj) { - if (sbj.equals(NULL)) - return null; - StructuredProperty sp = new StructuredProperty(); - sp.setValue(sbj); - sp - .setQualifier( - OafMapperUtils - .qualifier( - FOS_CLASS_ID, - FOS_CLASS_NAME, - ModelConstants.DNET_SUBJECT_TYPOLOGIES, - ModelConstants.DNET_SUBJECT_TYPOLOGIES)); - sp - .setDataInfo( - OafMapperUtils - .dataInfo( - false, - UPDATE_DATA_INFO_TYPE, - true, - false, - OafMapperUtils - .qualifier( - UPDATE_SUBJECT_FOS_CLASS_ID, - UPDATE_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - "")); - return sp; - - } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java index 1a115e18f..ab8356836 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/SparkSaveUnresolved.java @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.actionmanager.createunresolvedentities; -import static eu.dnetlib.dhp.actionmanager.bipmodel.Constants.*; +import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable;