diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index b24daaa5d..fc515b5b1 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -13,6 +13,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; import com.github.sisyphsu.dateparser.DateParserUtils; import com.google.common.collect.Lists; @@ -23,8 +25,6 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import me.xuender.unidecode.Unidecode; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; public class GraphCleaningFunctions extends CleaningFunctions { diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala index 87116f00a..8ac8b00bf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala @@ -27,7 +27,8 @@ object SparkCreateBaselineDataFrame { def requestBaseLineUpdatePage(maxFile: String): List[(String, String)] = { val data = requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/") - val result = data.linesWithSeparators.map(l =>l.stripLineEnd) + val result = data.linesWithSeparators + .map(l => l.stripLineEnd) .filter(l => l.startsWith("") diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala index 24caaa553..d1611300d 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala @@ -63,7 +63,9 @@ class BioScholixTest extends AbstractVocabularyTest { val records: String = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pubmed_dump")) .mkString - val r: List[Oaf] = records.linesWithSeparators.map(l =>l.stripLineEnd).toList + val r: List[Oaf] = records.linesWithSeparators + .map(l => l.stripLineEnd) + .toList .map(s => mapper.readValue(s, classOf[PMArticle])) .map(a => PubMedToOaf.convert(a, vocabularies)) assertEquals(10, r.size) @@ -173,9 +175,10 @@ class BioScholixTest extends AbstractVocabularyTest { val records: String = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/pdb_dump")) .mkString - records.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) + records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) - val result: List[Oaf] = records.linesWithSeparators.map(l =>l.stripLineEnd).toList.flatMap(o => BioDBToOAF.pdbTOOaf(o)) + val result: List[Oaf] = + records.linesWithSeparators.map(l => l.stripLineEnd).toList.flatMap(o => BioDBToOAF.pdbTOOaf(o)) assertTrue(result.nonEmpty) result.foreach(r => assertNotNull(r)) @@ -194,9 +197,10 @@ class BioScholixTest extends AbstractVocabularyTest { val records: String = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/uniprot_dump")) .mkString - records.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) + records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) - val result: List[Oaf] = records.linesWithSeparators.map(l =>l.stripLineEnd).toList.flatMap(o => BioDBToOAF.uniprotToOAF(o)) + val result: List[Oaf] = + records.linesWithSeparators.map(l => l.stripLineEnd).toList.flatMap(o => BioDBToOAF.uniprotToOAF(o)) assertTrue(result.nonEmpty) result.foreach(r => assertNotNull(r)) @@ -239,9 +243,10 @@ class BioScholixTest extends AbstractVocabularyTest { val records: String = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/crossref_links")) .mkString - records.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) + records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) - val result: List[Oaf] = records.linesWithSeparators.map(l =>l.stripLineEnd).map(s => BioDBToOAF.crossrefLinksToOaf(s)).toList + val result: List[Oaf] = + records.linesWithSeparators.map(l => l.stripLineEnd).map(s => BioDBToOAF.crossrefLinksToOaf(s)).toList assertNotNull(result) assertTrue(result.nonEmpty) @@ -276,14 +281,17 @@ class BioScholixTest extends AbstractVocabularyTest { getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/bio/scholix_resolved") ) .mkString - records.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) + records.linesWithSeparators.map(l => l.stripLineEnd).foreach(s => assertTrue(s.nonEmpty)) implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - val l: List[ScholixResolved] = records.linesWithSeparators.map(l =>l.stripLineEnd).map { input => - lazy val json = parse(input) - json.extract[ScholixResolved] - }.toList + val l: List[ScholixResolved] = records.linesWithSeparators + .map(l => l.stripLineEnd) + .map { input => + lazy val json = parse(input) + json.extract[ScholixResolved] + } + .toList val result: List[Oaf] = l.map(s => BioDBToOAF.scholixResolvedToOAF(s)) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java index cf3562193..4792a7719 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java @@ -37,12 +37,24 @@ public class SubscriptionUtils { } public static boolean verifyDateRange(final long date, final String min, final String max) { + + long from = 0; + long to = Long.MAX_VALUE; + try { - return date >= DateUtils.parseDate(min, "yyyy-MM-dd").getTime() - && date < DateUtils.parseDate(max, "yyyy-MM-dd").getTime() + ONE_DAY; + from = min != null ? DateUtils.parseDate(min, "yyyy-MM-dd").getTime() : 0; } catch (final ParseException e) { - return false; + from = 0; } + + try { + to = max != null ? DateUtils.parseDate(max, "yyyy-MM-dd").getTime() + ONE_DAY : Long.MAX_VALUE; + } catch (final ParseException e) { + to = Long.MAX_VALUE; + } + + return date >= from && date < to; + } public static boolean verifyExact(final String s1, final String s2) { diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java index d93390e4a..63b49d362 100644 --- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java @@ -41,6 +41,18 @@ public class SubscriptionUtilsTest { assertTrue(SubscriptionUtils.verifyDateRange(date, "2010-01-01", "2011-01-01")); assertFalse(SubscriptionUtils.verifyDateRange(date, "2020-01-01", "2021-01-01")); + + assertTrue(SubscriptionUtils.verifyDateRange(date, "2010-01-01", "NULL")); + assertTrue(SubscriptionUtils.verifyDateRange(date, "2010-01-01", null)); + assertTrue(SubscriptionUtils.verifyDateRange(date, "NULL", "2011-01-01")); + assertTrue(SubscriptionUtils.verifyDateRange(date, null, "2011-01-01")); + assertTrue(SubscriptionUtils.verifyDateRange(date, "NULL", "NULL")); + assertTrue(SubscriptionUtils.verifyDateRange(date, null, null)); + + assertFalse(SubscriptionUtils.verifyDateRange(date, "2020-01-01", null)); + assertFalse(SubscriptionUtils.verifyDateRange(date, "2020-01-01", "NULL")); + assertFalse(SubscriptionUtils.verifyDateRange(date, null, "2005-01-01")); + assertFalse(SubscriptionUtils.verifyDateRange(date, "NULL", "2005-01-01")); } @Test diff --git a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala index bd6dd7cec..572a48372 100644 --- a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala @@ -36,13 +36,13 @@ class CrossrefMappingTest { .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/funder_doi")) .mkString - for (line <- funder_doi.linesWithSeparators.map(l =>l.stripLineEnd)) { + for (line <- funder_doi.linesWithSeparators.map(l => l.stripLineEnd)) { val json = template.replace("%s", line) val resultList: List[Oaf] = Crossref2Oaf.convert(json) assertTrue(resultList.nonEmpty) checkRelation(resultList) } - for (line <- funder_name.linesWithSeparators.map(l =>l.stripLineEnd)) { + for (line <- funder_name.linesWithSeparators.map(l => l.stripLineEnd)) { val json = template.replace("%s", line) val resultList: List[Oaf] = Crossref2Oaf.convert(json) assertTrue(resultList.nonEmpty) diff --git a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala index d7a6a94a5..8033f02fb 100644 --- a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala @@ -25,9 +25,11 @@ class MappingORCIDToOAFTest { .mkString assertNotNull(json) assertFalse(json.isEmpty) - json.linesWithSeparators.map(l =>l.stripLineEnd).foreach(s => { - assertNotNull(ORCIDToOAF.extractValueFromInputString(s)) - }) + json.linesWithSeparators + .map(l => l.stripLineEnd) + .foreach(s => { + assertNotNull(ORCIDToOAF.extractValueFromInputString(s)) + }) } @Test diff --git a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala index 7fe0e9935..30001acb5 100644 --- a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala @@ -22,7 +22,7 @@ class UnpayWallMappingTest { .mkString var i: Int = 0 - for (line <- Ilist.linesWithSeparators.map(l =>l.stripLineEnd)) { + for (line <- Ilist.linesWithSeparators.map(l => l.stripLineEnd)) { val p = UnpayWallToOAF.convertToOAF(line) if (p != null) { @@ -43,7 +43,7 @@ class UnpayWallMappingTest { i = i + 1 } - val l = Ilist.linesWithSeparators.map(l =>l.stripLineEnd).next() + val l = Ilist.linesWithSeparators.map(l => l.stripLineEnd).next() val item = UnpayWallToOAF.convertToOAF(l) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java index de9e4fc90..3bc69cfd1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java @@ -5,7 +5,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import eu.dnetlib.dhp.schema.oaf.Dataset; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -27,6 +26,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob; +import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Publication; public class CleanCountryTest { @@ -151,41 +151,40 @@ public class CleanCountryTest { @Test public void testDatasetClean() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/oa/graph/clean/dataset_clean_country.json") + .getPath(); spark - .read() - .textFile(sourcePath) - .map( - (MapFunction) r -> OBJECT_MAPPER.readValue(r, Dataset.class), - Encoders.bean(Dataset.class)) - .write() - .json(workingDir.toString() + "/dataset"); + .read() + .textFile(sourcePath) + .map( + (MapFunction) r -> OBJECT_MAPPER.readValue(r, Dataset.class), + Encoders.bean(Dataset.class)) + .write() + .json(workingDir.toString() + "/dataset"); CleanCountrySparkJob.main(new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--inputPath", workingDir.toString() + "/dataset", - "-graphTableClassName", Dataset.class.getCanonicalName(), - "-workingDir", workingDir.toString() + "/working", - "-country", "NL", - "-verifyParam", "10.17632", - "-collectedfrom", "NARCIS", - "-hostedBy", getClass() + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--inputPath", workingDir.toString() + "/dataset", + "-graphTableClassName", Dataset.class.getCanonicalName(), + "-workingDir", workingDir.toString() + "/working", + "-country", "NL", + "-verifyParam", "10.17632", + "-collectedfrom", "NARCIS", + "-hostedBy", getClass() .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy") .getPath() }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/dataset") - .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); Assertions.assertEquals(1, tmp.count()); Assertions.assertEquals(0, tmp.first().getCountry().size()); - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala index d415b7fc9..022168de5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/oa/graph/resolution/ResolveEntitiesTest.scala @@ -53,7 +53,8 @@ class ResolveEntitiesTest extends Serializable { def generateUpdates(spark: SparkSession): Unit = { val template = Source.fromInputStream(this.getClass.getResourceAsStream("updates")).mkString - val pids: List[String] = template.linesWithSeparators.map(l =>l.stripLineEnd) + val pids: List[String] = template.linesWithSeparators + .map(l => l.stripLineEnd) .map { id => val r = new Result r.setId(id.toLowerCase.trim) @@ -127,7 +128,7 @@ class ResolveEntitiesTest extends Serializable { entities.foreach { e => val template = Source.fromInputStream(this.getClass.getResourceAsStream(s"$e")).mkString spark - .createDataset(spark.sparkContext.parallelize(template.linesWithSeparators.map(l =>l.stripLineEnd).toList)) + .createDataset(spark.sparkContext.parallelize(template.linesWithSeparators.map(l => l.stripLineEnd).toList)) .as[String] .write .option("compression", "gzip") @@ -264,7 +265,8 @@ class ResolveEntitiesTest extends Serializable { Source .fromInputStream(this.getClass.getResourceAsStream(s"publication")) .mkString - .linesWithSeparators.map(l =>l.stripLineEnd) + .linesWithSeparators + .map(l => l.stripLineEnd) .next(), classOf[Publication] ) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala index 0ea908290..b838ae065 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/sx/graph/scholix/ScholixGraphTest.scala @@ -47,7 +47,7 @@ class ScholixGraphTest extends AbstractVocabularyTest { val inputRelations = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/oaf_to_summary")) .mkString - val items = inputRelations.linesWithSeparators.map(l =>l.stripLineEnd).toList + val items = inputRelations.linesWithSeparators.map(l => l.stripLineEnd).toList assertNotNull(items) items.foreach(i => assertTrue(i.nonEmpty)) val result = @@ -69,7 +69,8 @@ class ScholixGraphTest extends AbstractVocabularyTest { getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/merge_result_scholix") ) .mkString - val result: List[(Relation, ScholixSummary)] = inputRelations.linesWithSeparators.map(l =>l.stripLineEnd) + val result: List[(Relation, ScholixSummary)] = inputRelations.linesWithSeparators + .map(l => l.stripLineEnd) .sliding(2) .map(s => (s.head, s(1))) .map(p => (mapper.readValue(p._1, classOf[Relation]), mapper.readValue(p._2, classOf[ScholixSummary])))