diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index 23e97a97a..a6562789d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -25,6 +25,19 @@ public class PropagationConstant { private PropagationConstant() { } + + + public static final String UPDATE_DATA_INFO_TYPE = "update"; + public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos"; + public static final String UPDATE_SUBJECT_FOS_CLASS_NAME = "Update of results with FOS subjects"; + public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip"; + public static final String UPDATE_MEASURE_BIP_CLASS_NAME = "Update of results with BipFinder! measures"; + public static final String FOS_CLASS_ID = "fos"; + public static final String FOS_CLASS_NAME = "Subject from fos classification"; + + + + public static final String INSTITUTIONAL_REPO_TYPE = "pubsrepository::institutional"; public static final String PROPAGATION_DATA_INFO_TYPE = "propagation"; @@ -75,10 +88,17 @@ public class PropagationConstant { public static DataInfo getDataInfo( String inference_provenance, String inference_class_id, String inference_class_name, String qualifierSchema) { + + return getDataInfo(inference_provenance, inference_class_id, inference_class_name, qualifierSchema, "0.85"); + } + + public static DataInfo getDataInfo( + String inference_provenance, String inference_class_id, String inference_class_name, String qualifierSchema, + String trust) { DataInfo di = new DataInfo(); di.setInferred(true); di.setDeletedbyinference(false); - di.setTrust("0.85"); + di.setTrust(trust); di.setInferenceprovenance(inference_provenance); di.setProvenanceaction(getQualifier(inference_class_id, inference_class_name, qualifierSchema)); return di; diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateBip.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateBip.java new file mode 100644 index 000000000..737aac0bc --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateBip.java @@ -0,0 +1,135 @@ + +package eu.dnetlib.dhp.bypassactionset; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.bypassactionset.model.BipScore; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +/** + * created the Atomic Action for each tipe of results + */ +public class SparkUpdateBip implements Serializable { + + + private static final Logger log = LoggerFactory.getLogger(SparkUpdateBip.class); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkUpdateBip.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}: ", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + final String bipScorePath = parser.get("bipScorePath"); + log.info("bipScorePath: {}", bipScorePath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + Class inputClazz = (Class) Class.forName(resultClassName); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> + updateBipFinder(spark, inputPath, outputPath, bipScorePath, inputClazz) + + ); + } + + private static void updateBipFinder(SparkSession spark, String inputPath, String outputPath, + String bipScorePath, Class inputClazz) { + + Dataset results = readPath(spark, inputPath, inputClazz); + Dataset bipScores = readPath(spark, bipScorePath, BipScore.class); + + results.joinWith(bipScores, results.col("id").equalTo(bipScores.col("id")), "left") + .map((MapFunction, I>) value -> { + if (!Optional.ofNullable(value._2()).isPresent()){ + return value._1(); + } + value._1().setMeasures(getMeasure(value._2())); + return value._1(); + }, Encoders.bean(inputClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(outputPath + "/bip"); + + } + + private static List getMeasure(BipScore value) { + return value + .getScoreList() + .stream() + .map(score -> { + Measure m = new Measure(); + m.setId(score.getId()); + m + .setUnit( + score + .getUnit() + .stream() + .map(unit -> { + KeyValue kv = new KeyValue(); + kv.setValue(unit.getValue()); + kv.setKey(unit.getKey()); + kv.setDataInfo(getDataInfo(UPDATE_DATA_INFO_TYPE, + UPDATE_MEASURE_BIP_CLASS_ID, + UPDATE_MEASURE_BIP_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, "")); + return kv; + }) + .collect(Collectors.toList())); + return m; + }) + .collect(Collectors.toList()); + } + + + + + + + +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateFOS.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateFOS.java new file mode 100644 index 000000000..79d3d6d14 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/SparkUpdateFOS.java @@ -0,0 +1,121 @@ +package eu.dnetlib.dhp.bypassactionset; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +import eu.dnetlib.dhp.bypassactionset.model.FOSDataModel; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Measure; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class SparkUpdateFOS implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkUpdateFOS.class); + private final static String NULL = "NULL"; + private final static String DNET_RESULT_SUBJECT = "dnet:result_subject"; + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkUpdateFOS.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}: ", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + final String fosPath = parser.get("fosPath"); + log.info("fosPath: {}", fosPath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + Class inputClazz = (Class) Class.forName(resultClassName); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> + updateFos(spark, inputPath, outputPath, fosPath, inputClazz) + + ); + } + + private static void updateFos(SparkSession spark, String inputPath, String outputPath, + String bipScorePath, Class inputClazz) { + + Dataset results = readPath(spark, inputPath, inputClazz); + Dataset bipScores = readPath(spark, bipScorePath, FOSDataModel.class); + + results.joinWith(bipScores, results.col("id").equalTo(bipScores.col("id")), "left") + .map((MapFunction, I>) value -> { + if (!Optional.ofNullable(value._2()).isPresent()){ + return value._1(); + } + value._1().getSubject().addAll(getSubjects(value._2())); + return value._1(); + }, Encoders.bean(inputClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(outputPath); + + } + + private static List getSubjects(FOSDataModel fos) { + return Arrays.asList(getSubject(fos.getLevel1()), getSubject(fos.getLevel2()), getSubject(fos.getLevel3())) + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + private static StructuredProperty getSubject(String sbj) { + if (sbj.equals(NULL)) + return null; + StructuredProperty sp = new StructuredProperty(); + sp.setValue(sbj); + sp.setQualifier(getQualifier(FOS_CLASS_ID, FOS_CLASS_NAME, DNET_RESULT_SUBJECT)); + sp.setDataInfo(getDataInfo(UPDATE_DATA_INFO_TYPE, + UPDATE_SUBJECT_FOS_CLASS_ID, + UPDATE_SUBJECT_FOS_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, "")); + return sp; + + } + + +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/Utils.java new file mode 100644 index 000000000..79ae871f3 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/Utils.java @@ -0,0 +1,15 @@ +package eu.dnetlib.dhp.bypassactionset; + +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import org.jetbrains.annotations.NotNull; + +public class Utils { + private static final String ID_PREFIX = "50|doi_________"; + + @NotNull + public static String getIdentifier(String d) { + return ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", d)); + } +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/PrepareBipFinder.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/PrepareBipFinder.java new file mode 100644 index 000000000..7c4f5b859 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/bipfinder/PrepareBipFinder.java @@ -0,0 +1,89 @@ +package eu.dnetlib.dhp.bypassactionset.bipfinder; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.bypassactionset.SparkUpdateBip; +import eu.dnetlib.dhp.bypassactionset.model.BipDeserialize; +import eu.dnetlib.dhp.bypassactionset.model.BipScore; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Optional; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.bypassactionset.Utils.getIdentifier; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class PrepareBipFinder implements Serializable { + private static final String DOI = "doi"; + private static final Logger log = LoggerFactory.getLogger(SparkUpdateBip.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkUpdateBip.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipfinder/input_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}: ", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + + prepareResults(spark, inputPath, outputPath); + }); + } + + private static void prepareResults(SparkSession spark, String inputPath, String outputPath) { + + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD bipDeserializeJavaRDD = sc + .textFile(inputPath) + .map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class)); + + spark + .createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> { + BipScore bs = new BipScore(); + bs.setId(getIdentifier(key)); + bs.setScoreList(entry.get(key)); + return bs; + }).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(outputPath); + } +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/GetFOSData.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/fos/GetFOSData.java similarity index 98% rename from dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/GetFOSData.java rename to dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/fos/GetFOSData.java index 849bf1c6a..dfab8f409 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/GetFOSData.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/fos/GetFOSData.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.bypassactionset; +package eu.dnetlib.dhp.bypassactionset.fos; import eu.dnetlib.dhp.application.ArgumentApplicationParser; diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/DistributeFOSSparkJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/fos/PrepareFOSSparkJob.java similarity index 82% rename from dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/DistributeFOSSparkJob.java rename to dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/fos/PrepareFOSSparkJob.java index 8c2c56be9..eb10ebd13 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/DistributeFOSSparkJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/fos/PrepareFOSSparkJob.java @@ -1,7 +1,9 @@ -package eu.dnetlib.dhp.bypassactionset; +package eu.dnetlib.dhp.bypassactionset.fos; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.bypassactionset.model.FOSDataModel; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; @@ -9,6 +11,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,16 +21,18 @@ import java.util.Arrays; import java.util.List; import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.bypassactionset.Utils.getIdentifier; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -public class DistributeFOSSparkJob implements Serializable { - private static final Logger log = LoggerFactory.getLogger(DistributeFOSSparkJob.class); +public class PrepareFOSSparkJob implements Serializable { + private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class); + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - DistributeFOSSparkJob.class + PrepareFOSSparkJob.class .getResourceAsStream( "/eu/dnetlib/dhp/bypassactionset/distribute_fos_parameters.json")); @@ -71,7 +76,8 @@ public class DistributeFOSSparkJob implements Serializable { final String level1 = v.getLevel1(); final String level2 = v.getLevel2(); final String level3 = v.getLevel3(); - Arrays.stream(v.getDoi().split("\u0002")).forEach(d -> fosList.add(FOSDataModel.newInstance(d, level1, level2, level3))); + Arrays.stream(v.getDoi().split("\u0002")).forEach(d -> + fosList.add(FOSDataModel.newInstance(getIdentifier(d), level1, level2, level3))); return fosList.iterator(); }, Encoders.bean(FOSDataModel.class)) .write() @@ -81,4 +87,6 @@ public class DistributeFOSSparkJob implements Serializable { } + + } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/model/BipScore.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/model/BipScore.java new file mode 100644 index 000000000..fb2687704 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bypassactionset/model/BipScore.java @@ -0,0 +1,30 @@ + +package eu.dnetlib.dhp.bypassactionset.model; + +import java.io.Serializable; +import java.util.List; + +/** + * Rewriting of the bipFinder input data by extracting the identifier of the result (doi) + */ + +public class BipScore implements Serializable { + private String id; // doi + private List scoreList; // unit as given in the inputfile + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public List getScoreList() { + return scoreList; + } + + public void setScoreList(List scoreList) { + this.scoreList = scoreList; + } +} diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bypassactionset/GetFOSTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bypassactionset/GetFOSTest.java index 6e05ced64..520289748 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bypassactionset/GetFOSTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bypassactionset/GetFOSTest.java @@ -1,9 +1,13 @@ package eu.dnetlib.dhp.bypassactionset; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.bypassactionset.fos.PrepareFOSSparkJob; +import eu.dnetlib.dhp.bypassactionset.fos.GetFOSData; import eu.dnetlib.dhp.bypassactionset.model.FOSDataModel; import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.countrypropagation.CountryPropagationJobTest; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -35,6 +39,7 @@ public class GetFOSTest { private static SparkSession spark; private static LocalFileSystem fs; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String ID_PREFIX = "50|doi_________"; @BeforeAll public static void beforeAll() throws IOException { @@ -103,7 +108,7 @@ public class GetFOSTest { .getResource("/eu/dnetlib/dhp/bypassactionset/fos/fos.json") .getPath(); - DistributeFOSSparkJob + PrepareFOSSparkJob .main( new String[] { "--isSparkSessionManaged", Boolean.FALSE.toString(), @@ -119,17 +124,21 @@ public class GetFOSTest { .textFile(workingDir.toString() + "/distribute") .map(item -> OBJECT_MAPPER.readValue(item, FOSDataModel.class)); + String doi1 = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.3390/s18072310")); assertEquals(50, tmp.count()); - assertEquals(1, tmp.filter(row -> row.getDoi().equals("10.3390/s18072310")).count()); - assertEquals("engineering and technology", tmp.filter(r -> r.getDoi().equals("10.3390/s18072310")).collect().get(0).getLevel1()); - assertEquals("nano-technology", tmp.filter(r -> r.getDoi().equals("10.3390/s18072310")).collect().get(0).getLevel2()); - assertEquals("nanoscience & nanotechnology", tmp.filter(r -> r.getDoi().equals("10.3390/s18072310")).collect().get(0).getLevel3()); + assertEquals(1, tmp.filter(row -> row.getDoi().equals(doi1)).count()); + assertEquals("engineering and technology", tmp.filter(r -> r.getDoi().equals(doi1)).collect().get(0).getLevel1()); + assertEquals("nano-technology", tmp.filter(r -> r.getDoi().equals(doi1)).collect().get(0).getLevel2()); + assertEquals("nanoscience & nanotechnology", tmp.filter(r -> r.getDoi().equals(doi1)).collect().get(0).getLevel3()); - assertEquals(1, tmp.filter(row -> row.getDoi().equals("10.1111/1365-2656.12831")).count()); - assertEquals("social sciences", tmp.filter(r -> r.getDoi().equals("10.1111/1365-2656.12831")).collect().get(0).getLevel1()); - assertEquals("psychology and cognitive sciences", tmp.filter(r -> r.getDoi().equals("10.1111/1365-2656.12831")).collect().get(0).getLevel2()); - assertEquals("NULL", tmp.filter(r -> r.getDoi().equals("10.1111/1365-2656.12831")).collect().get(0).getLevel3()); + String doi = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1111/1365-2656.12831")); + assertEquals(1, tmp.filter(row -> row.getDoi().equals(doi)).count()); + assertEquals("social sciences", tmp.filter(r -> r.getDoi().equals(doi)).collect().get(0).getLevel1()); + assertEquals("psychology and cognitive sciences", tmp.filter(r -> r.getDoi().equals(doi)).collect().get(0).getLevel2()); + assertEquals("NULL", tmp.filter(r -> r.getDoi().equals(doi)).collect().get(0).getLevel3()); // {"doi":"10.1111/1365-2656.12831\u000210.17863/cam.24369","level1":"social sciences","level2":"psychology and cognitive sciences","level3":"NULL"}