diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
index 1280d6fde..491e98874 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
@@ -120,7 +120,7 @@ public class GroupEntitiesSparkJob {
 
 	private Entity mergeAndGet(Entity b, Entity a) {
 		if (Objects.nonNull(a) && Objects.nonNull(b)) {
-			return MergeUtils.merge(b, a);
+			return MergeUtils.merge(b, a, true);
 		}
 		return Objects.isNull(a) ? b : a;
 	}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java
index 7f148a4c8..cc6e10d81 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java
@@ -21,8 +21,12 @@ import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
 
 public class MergeUtils {
 	public static <T extends Oaf> T merge(final T left, final T right) {
+		return merge(left, right, false);
+	}
+
+	public static <T extends Oaf> T merge(final T left, final T right, boolean checkDelegatedAuthority) {
 		if (sameClass(left, right, Entity.class)) {
-			return mergeEntities(left, right);
+			return mergeEntities(left, right, checkDelegatedAuthority);
 		} else if (sameClass(left, right, Relation.class)) {
 			return mergeRelation(left, right);
 		} else {
@@ -34,9 +38,9 @@ public class MergeUtils {
 		}
 	}
 
-	private static <T extends Oaf> T mergeEntities(T left, T right) {
+	private static <T extends Oaf> T mergeEntities(T left, T right, boolean checkDelegatedAuthority) {
 		if (sameClass(left, right, Result.class)) {
-			if (!left.getClass().equals(right.getClass())) {
+			if (!left.getClass().equals(right.getClass()) || checkDelegatedAuthority) {
 				return mergeResultsOfDifferentTypes(left, right);
 			}
 			return mergeResult(left, right);
@@ -265,16 +269,16 @@ public class MergeUtils {
 		if (enrich.getOaiprovenance() != null && trustCompareResult < 0)
 			mergedResult.setOaiprovenance(enrich.getOaiprovenance());
 
-		if (isSubClass(mergedResult, Publication.class)) {
+		if (sameClass(mergedResult, enrich, Publication.class)) {
 			return (T) mergePublication(mergedResult, enrich);
 		}
-		if (isSubClass(mergedResult, Dataset.class)) {
+		if (sameClass(mergedResult, enrich, Dataset.class)) {
 			return (T) mergeDataset(mergedResult, enrich);
 		}
-		if (isSubClass(mergedResult, OtherResearchProduct.class)) {
+		if (sameClass(mergedResult, enrich, OtherResearchProduct.class)) {
 			return (T) mergeORP(mergedResult, enrich);
 		}
-		if (isSubClass(mergedResult, Software.class)) {
+		if (sameClass(mergedResult, enrich, Software.class)) {
 			return (T) mergeSoftware(mergedResult, enrich);
 		}
 
@@ -888,11 +892,11 @@ public class MergeUtils {
 			.compare(
 				Optional
 					.ofNullable(a.getDataInfo())
-					.map(DataInfo::getTrust)
+					.map(EntityDataInfo::getTrust)
 					.orElse(0f),
 				Optional
 					.ofNullable(b.getDataInfo())
-					.map(DataInfo::getTrust)
+					.map(EntityDataInfo::getTrust)
 					.orElse(0f));
 	}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java
index f2f09894c..ff16cf4d8 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java
@@ -363,7 +363,7 @@ public class OafMapperUtils {
 		final Entity entity,
 		final String validationDate) {
 
-		final List<Provenance> provenance = getProvenance(entity.getCollectedfrom(), entity.getDataInfo());
+		final List<Provenance> provenance = getProvenance(entity.getCollectedfrom(), fromEntityDataInfo(entity.getDataInfo()));
 		return getRelation(
 			source, target, relType, subRelType, relClass, provenance, validationDate, null);
 	}
@@ -434,4 +434,13 @@ public class OafMapperUtils {
 					.orElse(""))
 			.orElse("");
 	}
+
+	public static DataInfo fromEntityDataInfo(EntityDataInfo entityDataInfo) {
+		DataInfo dataInfo = new DataInfo();
+		dataInfo.setTrust(entityDataInfo.getTrust());
+		dataInfo.setInferenceprovenance(entityDataInfo.getInferenceprovenance());
+		dataInfo.setInferred(entityDataInfo.getInferred());
+		dataInfo.setProvenanceaction(entityDataInfo.getProvenanceaction());
+		return dataInfo;
+	}
 }
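The new three-argument overload is additive: existing callers of MergeUtils.merge(left, right) keep the old behaviour through the false default, while GroupEntitiesSparkJob opts into the delegated-authority check. A minimal usage sketch (hypothetical caller; the two Dataset instances are assumed to be populated elsewhere):

import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;

public class MergeUsageSketch {
	public static void main(String[] args) {
		Dataset d1 = new Dataset(); // assumption: populated elsewhere
		Dataset d2 = new Dataset();

		// legacy behaviour: delegates to merge(d1, d2, false)
		Result plain = MergeUtils.merge(d1, d2);

		// grouping path: checkDelegatedAuthority=true can route two Results of the
		// same concrete type through mergeResultsOfDifferentTypes
		Result checked = MergeUtils.merge(d1, d2, true);
	}
}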
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtilsTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtilsTest.java
index 2b5679770..12edfeac6 100644
--- a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtilsTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtilsTest.java
@@ -80,7 +80,7 @@ public class MergeUtilsTest {
 		assertEquals(1, d2.getCollectedfrom().size());
 		assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID));
 
-		Result res = MergeUtils.merge(d1, d2);
+		Result res = MergeUtils.merge(d1, d2, true);
 
 		assertEquals(d2, res);
 	}
@@ -93,7 +93,7 @@ public class MergeUtilsTest {
 		assertEquals(1, d2.getCollectedfrom().size());
 		assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID));
 
-		Result res = MergeUtils.merge(p1, d2);
+		Result res = MergeUtils.merge(p1, d2, true);
 
 		assertEquals(d2, res);
 	}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
index 0c16eb70d..cdba4ce09 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
@@ -55,11 +55,8 @@ public class Constants {
 				null,
 				ModelConstants.DNET_PROVENANCE_ACTIONS));
 
-
 	public static final DataInfo Bip_DATA_INFO3 = OafMapperUtils
 		.dataInfo(
-			false,
-			false,
 			0.8f,
 			UPDATE_DATA_INFO_TYPE,
 			false,
@@ -68,31 +65,6 @@ public class Constants {
 				UPDATE_MEASURE_BIP_CLASS_ID,
 				UPDATE_CLASS_NAME,
 				ModelConstants.DNET_PROVENANCE_ACTIONS));
-	public static final EntityDataInfo Bip_DATA_INFO2 = OafMapperUtils
-		.dataInfo(
-			false,
-			false,
-			0.8f,
-			UPDATE_DATA_INFO_TYPE,
-			true,
-			OafMapperUtils
-				.qualifier(
-					UPDATE_MEASURE_BIP_CLASS_ID,
-					UPDATE_CLASS_NAME,
-					ModelConstants.DNET_PROVENANCE_ACTIONS));
-
-	public static final EntityDataInfo Bip_DATA_INFO = OafMapperUtils
-		.dataInfo(
-			false,
-			false,
-			0.8f, //TODO check
-			UPDATE_DATA_INFO_TYPE,
-			true,
-			OafMapperUtils
-				.qualifier(
-					ModelConstants.PROVENANCE_ENRICH,
-					null,
-					ModelConstants.DNET_PROVENANCE_ACTIONS));
 
 	private Constants() {
 	}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java
index 531da0376..78d3b671a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java
@@ -129,20 +129,13 @@ public class CreateActionSetSparkJob implements Serializable {
 
 		List<Relation> relationList = new ArrayList<>();
 
-		String citing = ID_PREFIX
-			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCiting()));
-		final String cited = ID_PREFIX
-			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCited()));
-
+		String citing = asOpenAireId(value.getCiting());
+		final String cited = asOpenAireId(value.getCited());
 		if (!citing.equals(cited)) {
 			relationList.add(getRelation(citing, cited));
 
 			if (duplicate && value.getCiting().endsWith(".refs")) {
-				citing = ID_PREFIX + IdentifierFactory
-					.md5(
-						CleaningFunctions
-							.normalizePidValue(
-								"doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs"))));
+				citing = asOpenAireId(value.getCiting());
 				relationList.add(getRelation(citing, cited));
 			}
 		}
@@ -150,6 +143,13 @@ public class CreateActionSetSparkJob implements Serializable {
 		return relationList;
 	}
 
+	private static String asOpenAireId(String value) {
+		return IdentifierFactory.idFromPid(
+			"50", PidType.doi.toString(),
+			CleaningFunctions.normalizePidValue(PidType.doi.toString(), value),
+			true);
+	}
+
 	public static Relation getRelation(
 		String source,
 		String target) {
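For context, a hedged sketch of what asOpenAireId produces: idFromPid is expected to yield the same "50|doi_________::<md5>" identifiers that the OpenCitations tests below assert on, replacing the manual ID_PREFIX + md5 concatenation. The DOI value and import locations are illustrative assumptions:

import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;

public class OpenAireIdSketch {
	public static void main(String[] args) {
		// normalize the PID first, then derive the deterministic OpenAIRE id
		// for entity type "50" (results)
		String id = IdentifierFactory.idFromPid(
			"50", PidType.doi.toString(),
			CleaningFunctions.normalizePidValue(PidType.doi.toString(), "10.1007/s10854-015-3684-x"),
			true);
		System.out.println(id); // expected shape: 50|doi_________::<md5 of the normalized DOI>
	}
}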
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala
index e577d16a0..ccaf81aa9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala
@@ -73,6 +73,7 @@ object DataciteModelConstants {
   val SUBJ_CLASS = "keywords"
   val DATACITE_NAME = "Datacite"
   val dataInfo: EntityDataInfo = dataciteDataInfo(0.9f)
+  val relDataInfo = OafMapperUtils.fromEntityDataInfo(dataInfo);
 
   val DATACITE_COLLECTED_FROM: KeyValue =
     OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, DATACITE_NAME)
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
index c61803f30..38a3350a0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
@@ -279,11 +279,6 @@ object DataciteToOAFTransformation {
 
   }
 
-  def createDNetTargetIdentifier(pid: String, pidType: String, idPrefix: String): String = {
-    val f_part = s"$idPrefix|${pidType.toLowerCase}".padTo(15, '_')
-    s"$f_part::${IdentifierFactory.md5(pid.toLowerCase)}"
-  }
-
   def generateOAFDate(dt: String, q: Qualifier): StructuredProperty = {
     OafMapperUtils.structuredProperty(dt, q)
   }
@@ -313,7 +308,7 @@ object DataciteToOAFTransformation {
       val p = match_pattern.get._2
       val grantId = m.matcher(awardUri).replaceAll("$2")
       val targetId = s"$p${DHPUtils.md5(grantId)}"
-      List(generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo))
+      List(generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, relDataInfo))
     } else
       List()
@@ -357,7 +352,7 @@ object DataciteToOAFTransformation {
     result.setPid(List(pid).asJava)
 
     // This identifiere will be replaced in a second moment using the PID logic generation
-    result.setId(OafMapperUtils.createOpenaireId(50, s"datacite____::$doi", true))
+    result.setId(IdentifierFactory.createOpenaireId(50, s"datacite____::$doi", true))
     result.setOriginalId(List(doi).asJava)
 
     val d = new Date(dateOfCollection * 1000)
@@ -386,7 +381,7 @@ object DataciteToOAFTransformation {
         ) else null
       if (ni.nameIdentifier != null && ni.nameIdentifier.isDefined) {
-        OafMapperUtils.authorPid(ni.nameIdentifier.get, q, dataInfo)
+        OafMapperUtils.authorPid(ni.nameIdentifier.get, q, relDataInfo)
       } else
         null
@@ -501,7 +496,7 @@ object DataciteToOAFTransformation {
             SUBJ_CLASS,
             SUBJ_CLASS,
             ModelConstants.DNET_SUBJECT_TYPOLOGIES,
-            dataInfo
+            relDataInfo
           )
         )
         .asJava
@@ -635,7 +630,7 @@ object DataciteToOAFTransformation {
       .map(r => {
         val rel = new Relation
-        rel.setProvenance(Lists.newArrayList(OafMapperUtils.getProvenance(DATACITE_COLLECTED_FROM, dataInfo)))
+        rel.setProvenance(Lists.newArrayList(OafMapperUtils.getProvenance(DATACITE_COLLECTED_FROM, relDataInfo)))
 
         val subRelType = subRelTypeMapping(r.relationType).relType
         rel.setRelType(REL_TYPE_VALUE)
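The Datacite changes hinge on the EntityDataInfo/DataInfo split: entities keep the richer EntityDataInfo, while relations, subjects and author PIDs now receive the slimmer DataInfo produced by fromEntityDataInfo. A sketch of the conversion, with field names taken from the OafMapperUtils hunk above (which entity-level flags are dropped is an assumption):

import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.EntityDataInfo;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;

public class RelDataInfoSketch {
	static DataInfo toRelationDataInfo(EntityDataInfo entityLevel) {
		// copies trust, inferenceprovenance, inferred and provenanceaction;
		// entity-only attributes (presumably deletedbyinference/invisible)
		// are not carried over to the relation side
		return OafMapperUtils.fromEntityDataInfo(entityLevel);
	}
}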
s"datacite____::$doi", true)) + result.setId(IdentifierFactory.createOpenaireId(50, s"datacite____::$doi", true)) result.setOriginalId(List(doi).asJava) val d = new Date(dateOfCollection * 1000) @@ -386,7 +381,7 @@ object DataciteToOAFTransformation { ) else null if (ni.nameIdentifier != null && ni.nameIdentifier.isDefined) { - OafMapperUtils.authorPid(ni.nameIdentifier.get, q, dataInfo) + OafMapperUtils.authorPid(ni.nameIdentifier.get, q, relDataInfo) } else null @@ -501,7 +496,7 @@ object DataciteToOAFTransformation { SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, - dataInfo + relDataInfo ) ) .asJava @@ -635,7 +630,7 @@ object DataciteToOAFTransformation { .map(r => { val rel = new Relation - rel.setProvenance(Lists.newArrayList(OafMapperUtils.getProvenance(DATACITE_COLLECTED_FROM, dataInfo))) + rel.setProvenance(Lists.newArrayList(OafMapperUtils.getProvenance(DATACITE_COLLECTED_FROM, relDataInfo))) val subRelType = subRelTypeMapping(r.relationType).relType rel.setRelType(REL_TYPE_VALUE) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala index 2f94618df..091d48713 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala @@ -2,7 +2,7 @@ package eu.dnetlib.dhp.sx.bio import com.google.common.collect.Lists import eu.dnetlib.dhp.schema.common.ModelConstants -import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, OafMapperUtils} +import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils} import eu.dnetlib.dhp.schema.oaf._ import org.json4s.DefaultFormats import org.json4s.JsonAST.{JField, JObject, JString} @@ -152,7 +152,7 @@ object BioDBToOAF { d.setDataInfo(DATA_INFO) val nsPrefix = input.pidType.toLowerCase.padTo(12, '_') - d.setId(OafMapperUtils.createOpenaireId(50, s"$nsPrefix::${input.pid.toLowerCase}", true)) + d.setId(IdentifierFactory.createOpenaireId(50, s"$nsPrefix::${input.pid.toLowerCase}", true)) if (input.tilte != null && input.tilte.nonEmpty) d.setTitle( @@ -233,7 +233,7 @@ object BioDBToOAF { ) d.setDataInfo(DATA_INFO) - d.setId(OafMapperUtils.createOpenaireId(50, s"uniprot_____::$pid", true)) + d.setId(IdentifierFactory.createOpenaireId(50, s"uniprot_____::$pid", true)) d.setCollectedfrom(List(collectedFromMap("uniprot")).asJava) val title: String = (json \ "title").extractOrElse[String](null) @@ -424,7 +424,7 @@ object BioDBToOAF { d.setCollectedfrom(List(collectedFromMap("pdb")).asJava) d.setDataInfo(DATA_INFO) - d.setId(OafMapperUtils.createOpenaireId(50, s"pdb_________::$pdb", true)) + d.setId(IdentifierFactory.createOpenaireId(50, s"pdb_________::$pdb", true)) d.setOriginalId(List(pdb).asJava) val title = (json \ "title").extractOrElse[String](null) @@ -532,7 +532,7 @@ object BioDBToOAF { val nsPrefix = input.targetPidType.toLowerCase.padTo(12, '_') - d.setId(OafMapperUtils.createOpenaireId(50, s"$nsPrefix::${input.targetPid.toLowerCase}", true)) + d.setId(IdentifierFactory.createOpenaireId(50, s"$nsPrefix::${input.targetPid.toLowerCase}", true)) d.setOriginalId(List(input.targetPid.toLowerCase).asJava) d.setPid( diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala index 42790349b..9bdee6fb7 100644 --- 
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
index be82b9fc3..b7dd403ca 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
@@ -78,17 +78,12 @@ public class SparkAtomicActionScoreJobTest {
 		SparkAtomicActionScoreJob
 			.main(
 				new String[] {
-					"-isSparkSessionManaged",
-					Boolean.FALSE.toString(),
-					"-inputPath",
-
-					bipScoresPath,
-
-					"-outputPath",
-					workingDir.toString() + "/actionSet"
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-inputPath", bipScoresPath,
+					"-outputPath", workingDir.toString() + "/actionSet"
 				});
 
-		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 		JavaRDD tmp = sc
 			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java
index 074d30a1d..381a463cf 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java
@@ -304,7 +304,6 @@ public class ProduceTest {
 		SparkSaveUnresolved.main(new String[] {
 			"--isSparkSessionManaged", Boolean.FALSE.toString(),
 			"--sourcePath", workingDir.toString() + "/work",
-
 			"-outputPath", workingDir.toString() + "/unresolved"
 		});
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java
index fa39c0742..aa920ff6c 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java
@@ -8,6 +8,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
@@ -100,7 +101,7 @@ public class CreateOpenCitationsASTest {
 			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
 			.map(aa -> ((Relation) aa.getPayload()));
 
-		assertEquals(62, tmp.count());
+		assertEquals(31, tmp.count());
 
 		// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
@@ -132,10 +133,7 @@ public class CreateOpenCitationsASTest {
 			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
 			.map(aa -> ((Relation) aa.getPayload()));
 
-		assertEquals(46, tmp.count());
-
-		// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
-
+		assertEquals(23, tmp.count());
 	}
@@ -200,7 +198,7 @@ public class CreateOpenCitationsASTest {
 		tmp.foreach(r -> {
 			final DataInfo dataInfo = r.getProvenance().get(0).getDataInfo();
 			assertEquals(false, dataInfo.getInferred());
-			assertEquals("0.91", dataInfo.getTrust());
+			assertEquals(0.91f, dataInfo.getTrust());
 			assertEquals(
 				CreateActionSetSparkJob.OPENCITATIONS_CLASSID, dataInfo.getProvenanceaction().getClassid());
 			assertEquals(
@@ -240,9 +238,8 @@ public class CreateOpenCitationsASTest {
 			assertEquals("citation", r.getSubRelType());
 			assertEquals("resultResult", r.getRelType());
 		});
+		assertEquals(23, tmp.count());
 		assertEquals(23, tmp.filter(r -> r.getRelClass().equals("Cites")).count());
-		assertEquals(23, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count());
-
 	}
 
 	@Test
@@ -281,17 +278,17 @@ public class CreateOpenCitationsASTest {
 	void testRelationsSourceTargetCouple() throws Exception {
 		final String doi1 = "50|doi_________::"
-			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-015-3684-x"));
+			+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-015-3684-x"));
 		final String doi2 = "50|doi_________::"
-			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1111/j.1551-2916.2008.02408.x"));
+			+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1111/j.1551-2916.2008.02408.x"));
 		final String doi3 = "50|doi_________::"
-			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-014-2114-9"));
+			+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-014-2114-9"));
 		final String doi4 = "50|doi_________::"
-			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/j.ceramint.2013.09.069"));
+			+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/j.ceramint.2013.09.069"));
 		final String doi5 = "50|doi_________::"
-			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-009-9913-4"));
+			+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-009-9913-4"));
 		final String doi6 = "50|doi_________::"
-			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/0038-1098(72)90370-5"));
+			+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/0038-1098(72)90370-5"));
 
 		String inputPath = getClass()
 			.getResource(
@@ -318,7 +315,7 @@ public class CreateOpenCitationsASTest {
 		JavaRDD<Relation> check = tmp.filter(r -> r.getSource().equals(doi1) || r.getTarget().equals(doi1));
 
-		assertEquals(10, check.count());
+		assertEquals(5, check.count());
 
 		check.foreach(r -> {
 			if (r.getSource().equals(doi2) || r.getSource().equals(doi3) || r.getSource().equals(doi4) ||
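The trust assertion switching from the string "0.91" to 0.91f reflects trust becoming a numeric field, which is also what lets MergeUtils default it with .orElse(0f). A hypothetical null-safe comparator written in that style (type names assumed from the MergeUtils hunk above):

import java.util.Comparator;
import java.util.Optional;

import eu.dnetlib.dhp.schema.oaf.Entity;
import eu.dnetlib.dhp.schema.oaf.EntityDataInfo;

public class TrustComparatorSketch {
	// orders entities by trust, treating a missing DataInfo or trust as 0f,
	// mirroring the compare(...) block in MergeUtils
	static final Comparator<Entity> BY_TRUST = Comparator
		.comparing(e -> Optional
			.ofNullable(e.getDataInfo())
			.map(EntityDataInfo::getTrust)
			.orElse(0f));
}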
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
index a00dbc65b..bb339d385 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
@@ -73,7 +73,7 @@ public class SparkAtomicActionCountJobTest {
 
 		SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
 
-		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 		JavaRDD tmp = sc
 			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
index b8a5fade7..d363cf6bc 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
@@ -18,6 +18,7 @@ import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,7 +136,7 @@ public class GenerateEntitiesApplication {
 				save(
 					inputRdd
 						.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
-						.reduceByKey(MergeUtils::merge)
+						.reduceByKey((Function2<Oaf, Oaf, Oaf>) (v1, v2) -> MergeUtils.merge(v1, v2, true))
 						.map(Tuple2::_2),
 					targetPath);
 				break;
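The GenerateEntitiesApplication change casts the lambda to Function2 so that reduceByKey resolves against the new three-argument merge. A hedged sketch of the grouping step in isolation (reduceByKey requires the function to be associative and commutative, which this merge is assumed to be; imports reflect the modules touched above):

import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;

public class GroupByIdSketch {
	// entities sharing the same OpenAIRE identifier collapse into a single
	// record, merged pairwise with the delegated-authority check enabled
	static JavaRDD<Oaf> groupById(JavaRDD<Oaf> inputRdd) {
		return inputRdd
			.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
			.reduceByKey((v1, v2) -> MergeUtils.merge(v1, v2, true))
			.map(Tuple2::_2);
	}
}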