From 3525a8f504f92fc9776ddcbe1c3cc57cbe5a1e2f Mon Sep 17 00:00:00 2001
From: miconis
Date: Wed, 14 Apr 2021 18:06:07 +0200
Subject: [PATCH 1/5] id generation of representative record moved to the SparkCreateMergeRel job

---
 .../dhp/oa/dedup/DedupRecordFactory.java      |  7 +-
 .../dhp/oa/dedup/SparkCreateMergeRels.java    | 32 +++++---
 .../oa/dedup/graph/ConnectedComponent.java    | 77 +++++++++++++------
 .../dhp/oa/dedup/graph/GraphProcessor.scala   |  8 +-
 .../dhp/oa/dedup/model/Identifier.java        | 15 ++--
 .../dnetlib/dhp/oa/dedup/SparkDedupTest.java  |  3 +-
 6 files changed, 91 insertions(+), 51 deletions(-)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index fe9bd74ce..d7ec49ccd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -82,7 +82,7 @@ public class DedupRecordFactory {
 
 		final Collection<String> dates = Lists.newArrayList();
 		final List<List<Author>> authors = Lists.newArrayList();
-		final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list
+//		final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list
 
 		entities
 			.forEachRemaining(
@@ -90,7 +90,7 @@ public class DedupRecordFactory {
 					T duplicate = t._2();
 
 					// prepare the list of pids to be used for the id generation
-					bestPids.add(Identifier.newInstance(duplicate));
+//					bestPids.add(Identifier.newInstance(duplicate));
 
 					entity.mergeFrom(duplicate);
 					if (ModelSupport.isSubClass(duplicate, Result.class)) {
@@ -109,7 +109,8 @@ public class DedupRecordFactory {
 			((Result) entity).setAuthor(AuthorMerger.merge(authors));
 		}
 
-		entity.setId(IdGenerator.generate(bestPids, id));
+//		entity.setId(IdGenerator.generate(bestPids, id));
+		entity.setId(id);
 
 		entity.setLastupdatetimestamp(ts);
 		entity.setDataInfo(dataInfo);

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index c27464f90..a58ce0231 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -7,6 +7,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -100,10 +102,8 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 
 			final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
 
-			final JavaPairRDD<Object, String> vertexes = sc
-				.textFile(graphBasePath + "/" + subEntity)
-				.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
-				.mapToPair((PairFunction<String, Object, String>) s -> new Tuple2<>(hash(s), s));
+			// <hash(id), json_entity>
+			JavaPairRDD<Object, String> vertexes = createVertexes(sc, graphBasePath, subEntity, dedupConf);
 
 			final RDD<Edge<String>> edgeRdd = spark
 				.read()
@@ -116,9 +116,9 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 			final Dataset<Relation> mergeRels = spark
 				.createDataset(
 					GraphProcessor
-						.findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
+						.findCCs(vertexes.rdd(), edgeRdd, subEntity, maxIterations, cut)
 						.toJavaRDD()
-						.filter(k -> k.getDocIds().size() > 1)
+						.filter(k -> k.getEntities().size() > 1)
 						.flatMap(cc -> ccToMergeRel(cc, dedupConf))
 						.rdd(),
 					Encoders.bean(Relation.class));
@@ -128,16 +128,26 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 		}
 	}
 
-	public Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
+	private JavaPairRDD<Object, String> createVertexes(JavaSparkContext sc, String graphBasePath, String subEntity, DedupConfig dedupConf) throws IOException {
+
+		return sc
+			.textFile(graphBasePath + "/" + subEntity)
+			.mapToPair(json -> {
+				String id = MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), json);
+				return new Tuple2<>(hash(id), json);
+			});
+	}
+
+	private Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
 		return cc
-			.getDocIds()
+			.getEntities()
 			.stream()
 			.flatMap(
-				id -> {
+				e -> {
 					List<Relation> tmp = new ArrayList<>();
-					tmp.add(rel(cc.getCcId(), id, ModelConstants.MERGES, dedupConf));
-					tmp.add(rel(id, cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf));
+					tmp.add(rel(cc.getCcId(), MapDocumentUtil.getJPathString("$.id", e), ModelConstants.MERGES, dedupConf));
+					tmp.add(rel(MapDocumentUtil.getJPathString("$.id", e), cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf));
 					return tmp.stream();
 				})

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
index 3f24adb93..e1b12412a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
@@ -3,10 +3,23 @@ package eu.dnetlib.dhp.oa.dedup.graph;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.google.common.collect.Lists;
+import eu.dnetlib.dhp.oa.dedup.IdGenerator;
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.lang.StringUtils;
+import org.apache.spark.api.java.function.MapFunction;
 import org.codehaus.jackson.annotate.JsonIgnore;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -17,30 +30,48 @@ import eu.dnetlib.pace.util.PaceException;
 
 public class ConnectedComponent implements Serializable {
 
-	private Set<String> docIds;
 	private String ccId;
+	private Set<String> entities;
 
-	public ConnectedComponent(Set<String> docIds, final int cut) {
-		this.docIds = docIds;
-		createID();
-		if (cut > 0 && docIds.size() > cut) {
-			this.docIds = docIds
-				.stream()
-				.filter(s -> !ccId.equalsIgnoreCase(s))
-				.limit(cut - 1)
-				.collect(Collectors.toSet());
-			this.docIds.add(ccId);
+	protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+	public <T extends OafEntity> ConnectedComponent(Set<String> entities, String subEntity, final int cut) {
+		this.entities = entities;
+		final Class<T> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
+
+		List<Identifier<T>> identifiers = Lists.newArrayList();
+
+		entities.forEach(e -> {
+			try {
+				T entity = OBJECT_MAPPER.readValue(e, clazz);
+				identifiers.add(Identifier.newInstance(entity));
+			} catch (IOException e1) {
+			}
+		});
+
+		this.ccId = IdGenerator.generate(
+			identifiers,
+			createDefaultID()
+		);
+
+		if (cut > 0 && entities.size() > cut) {
+			this.entities = entities
+				.stream()
+				.filter(e -> !ccId.equalsIgnoreCase(MapDocumentUtil.getJPathString("$.id", e)))
+				.limit(cut - 1)
+				.collect(Collectors.toSet());
 		}
 	}
 
-	public String createID() {
-		if (docIds.size() > 1) {
+	public String createDefaultID() {
+		if (entities.size() > 1) {
 			final String s = getMin();
 			String prefix = s.split("\\|")[0];
 			ccId = prefix + "|dedup_wf_001::" + DHPUtils.md5(s);
 			return ccId;
 		} else {
-			return docIds.iterator().next();
+			return MapDocumentUtil.getJPathString("$.id", entities.iterator().next());
 		}
 	}
@@ -49,15 +80,15 @@ public class ConnectedComponent implements Serializable {
 
 		final StringBuilder min = new StringBuilder();
 
-		docIds
+		entities
 			.forEach(
-				i -> {
+				e -> {
 					if (StringUtils.isBlank(min.toString())) {
-						min.append(i);
+						min.append(MapDocumentUtil.getJPathString("$.id", e));
 					} else {
-						if (min.toString().compareTo(i) > 0) {
+						if (min.toString().compareTo(MapDocumentUtil.getJPathString("$.id", e)) > 0) {
 							min.setLength(0);
-							min.append(i);
+							min.append(MapDocumentUtil.getJPathString("$.id", e));
 						}
 					}
 				});
@@ -74,12 +105,12 @@ public class ConnectedComponent implements Serializable {
 		}
 	}
 
-	public Set<String> getDocIds() {
-		return docIds;
+	public Set<String> getEntities() {
+		return entities;
 	}
 
-	public void setDocIds(Set<String> docIds) {
-		this.docIds = docIds;
+	public void setEntities(Set<String> entities) {
+		this.entities = entities;
 	}
 
 	public String getCcId() {

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
index f4dd85d75..d8237410e 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
@@ -7,7 +7,7 @@ import scala.collection.JavaConversions;
 
 object GraphProcessor {
 
-  def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut: Int): RDD[ConnectedComponent] = {
+  def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], subEntity: String, maxIterations: Int, cut: Int): RDD[ConnectedComponent] = {
     val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
     val cc = graph.connectedComponents(maxIterations).vertices
@@ -22,15 +22,15 @@ object GraphProcessor {
       }
     }
     val connectedComponents = joinResult.groupByKey()
-      .map[ConnectedComponent](cc => asConnectedComponent(cc, cut))
+      .map[ConnectedComponent](cc => asConnectedComponent(cc, subEntity, cut))
     connectedComponents
   }
 
-  def asConnectedComponent(group: (VertexId, Iterable[String]), cut: Int): ConnectedComponent = {
+  def asConnectedComponent(group: (VertexId, Iterable[String]), subEntity: String, cut: Int): ConnectedComponent = {
     val docs = group._2.toSet[String]
-    val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut);
+    val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), subEntity, cut);
     connectedComponent
   }

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
index 94a3bf613..dded5a6b0 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
@@ -1,16 +1,7 @@
 
 package eu.dnetlib.dhp.oa.dedup.model;
 
-import java.io.Serializable;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-
 import com.google.common.collect.Sets;
-
 import eu.dnetlib.dhp.oa.dedup.DatePicker;
 import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
@@ -18,6 +9,12 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
 import eu.dnetlib.dhp.schema.oaf.utils.PidType;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.stream.Collectors;
 
 public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier<T>> {

diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 851e72dee..010c569f2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -353,6 +353,7 @@ public class SparkDedupTest implements Serializable {
 		assertEquals(288, sw_mergerel);
 		assertEquals(472, ds_mergerel);
 		assertEquals(718, orp_mergerel);
+
 	}
 
 	@Test
@@ -538,7 +539,7 @@ public class SparkDedupTest implements Serializable {
 
 		long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
-		assertEquals(4858, relations);
+		assertEquals(4862, relations);
 
 		// check deletedbyinference
 		final Dataset<Relation> mergeRels = spark
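
A note on the id scheme in patch 1: IdGenerator.generate(identifiers, createDefaultID()) prefers a pid-based root id and only falls back to the default when none of the member records yields one. The fallback keeps the pipe-separated prefix of the lexicographically smallest member id and appends an md5 of that id. The following self-contained sketch reproduces just that fallback; the class name and sample ids are illustrative, and a plain MessageDigest stands in for DHPUtils.md5, which is assumed here to produce a lowercase hex MD5.

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Set;

public class DefaultCcIdSketch {

    // stand-in for DHPUtils.md5: hex-encoded MD5 of the input string
    static String md5(String s) throws Exception {
        MessageDigest md = MessageDigest.getInstance("MD5");
        md.update(s.getBytes(StandardCharsets.UTF_8));
        return String.format("%032x", new BigInteger(1, md.digest()));
    }

    // default component id: keep the prefix of the minimum member id and
    // append an md5 of the whole minimum id; singletons keep their own id
    static String defaultCcId(Set<String> docIds) throws Exception {
        if (docIds.size() == 1)
            return docIds.iterator().next();
        String min = docIds.stream().min(String::compareTo).get();
        String prefix = min.split("\\|")[0]; // e.g. "50" for results
        return prefix + "|dedup_wf_001::" + md5(min);
    }

    public static void main(String[] args) throws Exception {
        Set<String> component = Set.of("50|doajarticles::aaaa", "50|od______2852::bbbb");
        System.out.println(defaultCcId(component));
    }
}

Patch 2 below keeps this scheme but swaps the dedup_wf_001 infix for connect_comp and moves the pid-based choice out of the ConnectedComponent constructor into SparkCreateMergeRels.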
From f64e57c11291f9d3d9dcae9c4cd0aa5ea112244e Mon Sep 17 00:00:00 2001
From: miconis
Date: Thu, 15 Apr 2021 10:59:24 +0200
Subject: [PATCH 2/5] refactoring of the id generation, sparkcreatemergerels collects entities to create root id after a join

---
 .../dhp/schema/common/ModelConstants.java     |   3 +-
 .../dhp/oa/dedup/DedupRecordFactory.java      |   5 -
 .../dhp/oa/dedup/SparkCreateDedupRecord.java  |  13 +-
 .../dhp/oa/dedup/SparkCreateMergeRels.java    | 111 +++++++++++++-----
 .../oa/dedup/graph/ConnectedComponent.java    |  83 ++++++-------
 .../dhp/oa/dedup/graph/GraphProcessor.scala   |   8 +-
 .../dhp/oa/dedup/model/Identifier.java        |  14 ++-
 .../dnetlib/dhp/oa/dedup/SparkDedupTest.java  |   5 -
 8 files changed, 141 insertions(+), 101 deletions(-)

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index 1f9a09c3a..9b59c90db 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -36,7 +36,6 @@ public class ModelConstants {
 	public static final String DNET_COUNTRY_TYPE = "dnet:countries";
 	public static final String DNET_REVIEW_LEVELS = "dnet:review_levels";
 	public static final String DNET_PROGRAMMING_LANGUAGES = "dnet:programming_languages";
-	public static final String DNET_PROVENANCEACTIONS = "dnet:provenanceActions";
 	public static final String DNET_EXTERNAL_REF_TYPES = "dnet:externalReference_typologies";
 
 	public static final String SYSIMPORT_CROSSWALK_REPOSITORY = "sysimport:crosswalk:repository";
@@ -50,7 +49,7 @@ public class ModelConstants {
 	public static final String PROVENANCE_DEDUP = "sysimport:dedup";
 
 	public static final Qualifier PROVENANCE_ACTION_SET_QUALIFIER = qualifier(
-		SYSIMPORT_ACTIONSET, SYSIMPORT_ACTIONSET, DNET_PROVENANCEACTIONS, DNET_PROVENANCEACTIONS);
+		SYSIMPORT_ACTIONSET, SYSIMPORT_ACTIONSET, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS);
 
 	public static final String DATASET_RESULTTYPE_CLASSID = "dataset";
 	public static final String PUBLICATION_RESULTTYPE_CLASSID = "publication";

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index d7ec49ccd..5ba6f6e6d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -82,16 +82,12 @@ public class DedupRecordFactory {
 
 		final Collection<String> dates = Lists.newArrayList();
 		final List<List<Author>> authors = Lists.newArrayList();
-//		final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list
 
 		entities
 			.forEachRemaining(
 				t -> {
 					T duplicate = t._2();
 
-					// prepare the list of pids to be used for the id generation
-//					bestPids.add(Identifier.newInstance(duplicate));
-
 					entity.mergeFrom(duplicate);
 					if (ModelSupport.isSubClass(duplicate, Result.class)) {
 						Result r1 = (Result) duplicate;
@@ -109,7 +105,6 @@ public class DedupRecordFactory {
 			((Result) entity).setAuthor(AuthorMerger.merge(authors));
 		}
 
-//		entity.setId(IdGenerator.generate(bestPids, id));
 		entity.setId(id);
 
 		entity.setLastupdatetimestamp(ts);

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
index d870f6256..b41507e95 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
@@ -1,6 +1,9 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
+import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
+
 import java.io.IOException;
 
 import org.apache.commons.io.IOUtils;
@@ -27,8 +30,6 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
 	private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
 
 	public static final String ROOT_TRUST = "0.8";
-	public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
-	public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
 
 	public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
@@ -94,10 +95,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
 		info.setTrust(ROOT_TRUST);
 		info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
 		Qualifier provenance = new Qualifier();
-		provenance.setClassid(PROVENANCE_ACTION_CLASS);
-		provenance.setClassname(PROVENANCE_ACTION_CLASS);
-		provenance.setSchemeid(PROVENANCE_ACTIONS);
-		provenance.setSchemename(PROVENANCE_ACTIONS);
+		provenance.setClassid(PROVENANCE_DEDUP);
+		provenance.setClassname(PROVENANCE_DEDUP);
+		provenance.setSchemeid(DNET_PROVENANCE_ACTIONS);
+		provenance.setSchemename(DNET_PROVENANCE_ACTIONS);
 		info.setProvenanceaction(provenance);
 		return info;
 	}

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index a58ce0231..229214229 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -1,20 +1,20 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
 
-import eu.dnetlib.dhp.schema.common.EntityType;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.graphx.Edge;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
@@ -25,14 +25,18 @@ import org.dom4j.DocumentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
 import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
+import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -44,9 +48,7 @@ import scala.Tuple2;
 
 public class SparkCreateMergeRels extends AbstractSparkAction {
 
-	public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
 	private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
-	public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
 
 	public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
@@ -94,6 +96,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 
 		for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
 			final String subEntity = dedupConf.getWf().getSubEntityValue();
+			final Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
 
 			log.info("Creating mergerels for: '{}'", subEntity);
 
@@ -102,7 +105,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 
 			final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
 
-			// <hash(id), json_entity>
+			// <hash(id), id>
 			JavaPairRDD<Object, String> vertexes = createVertexes(sc, graphBasePath, subEntity, dedupConf);
 
 			final RDD<Edge<String>> edgeRdd = spark
@@ -113,14 +116,42 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 				.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
 				.rdd();
 
-			final Dataset<Relation> mergeRels = spark
+			Dataset<Tuple2<String, String>> rawMergeRels = spark
 				.createDataset(
 					GraphProcessor
-						.findCCs(vertexes.rdd(), edgeRdd, subEntity, maxIterations, cut)
+						.findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
 						.toJavaRDD()
-						.filter(k -> k.getEntities().size() > 1)
-						.flatMap(cc -> ccToMergeRel(cc, dedupConf))
+						.filter(k -> k.getIds().size() > 1)
+						.flatMap(this::ccToRels)
 						.rdd(),
+					Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+
+			Dataset<Tuple2<String, OafEntity>> entities = spark
+				.read()
+				.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+				.map(
+					(MapFunction<String, Tuple2<String, OafEntity>>) it -> {
+						OafEntity entity = OBJECT_MAPPER.readValue(it, clazz);
+						return new Tuple2<>(entity.getId(), entity);
+					},
+					Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+
+			Dataset<Relation> mergeRels = rawMergeRels
+				.joinWith(entities, rawMergeRels.col("_2").equalTo(entities.col("_1")), "inner")
+				// <ccid, id>, <id, entity>
+				.map(
+					(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, OafEntity>>, Tuple2<String, OafEntity>>) value -> new Tuple2<>(
+						value._1()._1(), value._2()._2()),
+					Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
+				// <ccid, entity>
+				.groupByKey(
+					(MapFunction<Tuple2<String, OafEntity>, String>) Tuple2::_1, Encoders.STRING())
+				.mapGroups(
+					(MapGroupsFunction<String, Tuple2<String, OafEntity>, ConnectedComponent>) this::generateID,
+					Encoders.bean(ConnectedComponent.class))
+				// <rootid, list(id)>
+				.flatMap(
+					(FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf),
 					Encoders.bean(Relation.class));
 
 			mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
@@ -128,16 +159,52 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 		}
 	}
 
-	private JavaPairRDD<Object, String> createVertexes(JavaSparkContext sc, String graphBasePath, String subEntity, DedupConfig dedupConf) throws IOException {
+	private <T extends OafEntity> ConnectedComponent generateID(String key, Iterator<Tuple2<String, T>> values) {
+
+		List<Identifier<T>> identifiers = Lists.newArrayList(values).stream().map(v -> {
+			T entity = v._2();
+			Identifier<T> identifier = Identifier.newInstance(entity);
+			return identifier;
+		}).collect(Collectors.toList());
+
+		String rootID = IdGenerator.generate(identifiers, key);
+
+		if (Objects.equals(rootID, key))
+			throw new IllegalStateException("generated default ID: " + rootID);
+
+		return new ConnectedComponent(rootID,
+			identifiers.stream().map(i -> i.getEntity().getId()).collect(Collectors.toSet()));
+	}
+
+	private JavaPairRDD<Object, String> createVertexes(JavaSparkContext sc, String graphBasePath, String subEntity,
+		DedupConfig dedupConf) {
 
 		return sc
-			.textFile(graphBasePath + "/" + subEntity)
-			.mapToPair(json -> {
-				String id = MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), json);
-				return new Tuple2<>(hash(id), json);
-			});
+			.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+			.mapToPair(json -> {
+				String id = MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), json);
+				return new Tuple2<>(hash(id), id);
+			});
+	}
+
+	private Iterator<Tuple2<String, String>> ccToRels(ConnectedComponent cc) {
+		return cc
+			.getIds()
+			.stream()
+			.map(id -> new Tuple2<>(cc.getCcId(), id))
+			.iterator();
 	}
 
 	private Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
 		return cc
-			.getEntities()
+			.getIds()
 			.stream()
 			.flatMap(
-				e -> {
+				id -> {
 					List<Relation> tmp = new ArrayList<>();
-					tmp.add(rel(cc.getCcId(), MapDocumentUtil.getJPathString("$.id", e), ModelConstants.MERGES, dedupConf));
-					tmp.add(rel(MapDocumentUtil.getJPathString("$.id", e), cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf));
+					tmp.add(rel(cc.getCcId(), id, ModelConstants.MERGES, dedupConf));
+					tmp.add(rel(id, cc.getCcId(), ModelConstants.IS_MERGED_IN, dedupConf));
 					return tmp.stream();
 				})
@@ -171,8 +228,8 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 		info.setInvisible(false);
 		info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
 		Qualifier provenanceAction = new Qualifier();
-		provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
-		provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
+		provenanceAction.setClassid(PROVENANCE_DEDUP);
+		provenanceAction.setClassname(PROVENANCE_DEDUP);
 		provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
 		provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
 		info.setProvenanceaction(provenanceAction);

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
index e1b12412a..3a986a9dd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
@@ -9,69 +9,60 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.api.java.function.MapFunction;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
 import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+
+import eu.dnetlib.dhp.oa.dedup.DedupUtility;
 import eu.dnetlib.dhp.oa.dedup.IdGenerator;
 import eu.dnetlib.dhp.oa.dedup.model.Identifier;
 import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.utils.DHPUtils;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.util.MapDocumentUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.api.java.function.MapFunction;
-import org.codehaus.jackson.annotate.JsonIgnore;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.dhp.oa.dedup.DedupUtility;
-import eu.dnetlib.dhp.utils.DHPUtils;
 import eu.dnetlib.pace.util.PaceException;
 
 public class ConnectedComponent implements Serializable {
 
 	private String ccId;
-	private Set<String> entities;
+	private Set<String> ids;
 
-	protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+	private static final String CONNECTED_COMPONENT_ID_PREFIX = "connect_comp";
 
-	public <T extends OafEntity> ConnectedComponent(Set<String> entities, String subEntity, final int cut) {
-		this.entities = entities;
-		final Class<T> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
+	public ConnectedComponent(Set<String> ids, final int cut) {
+		this.ids = ids;
 
-		List<Identifier<T>> identifiers = Lists.newArrayList();
+		this.ccId = createDefaultID();
 
-		entities.forEach(e -> {
-			try {
-				T entity = OBJECT_MAPPER.readValue(e, clazz);
-				identifiers.add(Identifier.newInstance(entity));
-			} catch (IOException e1) {
-			}
-		});
-
-		this.ccId = IdGenerator.generate(
-			identifiers,
-			createDefaultID()
-		);
-
-		if (cut > 0 && entities.size() > cut) {
-			this.entities = entities
-				.stream()
-				.filter(e -> !ccId.equalsIgnoreCase(MapDocumentUtil.getJPathString("$.id", e)))
-				.limit(cut - 1)
-				.collect(Collectors.toSet());
+		if (cut > 0 && ids.size() > cut) {
+			this.ids = ids
+				.stream()
+				.filter(id -> !ccId.equalsIgnoreCase(id))
+				.limit(cut - 1)
+				.collect(Collectors.toSet());
+//			this.ids.add(ccId); ??
 		}
 	}
 
+	public ConnectedComponent(String ccId, Set<String> ids) {
+		this.ccId = ccId;
+		this.ids = ids;
+	}
+
 	public String createDefaultID() {
-		if (entities.size() > 1) {
+		if (ids.size() > 1) {
 			final String s = getMin();
 			String prefix = s.split("\\|")[0];
-			ccId = prefix + "|dedup_wf_001::" + DHPUtils.md5(s);
+			ccId = prefix + "|" + CONNECTED_COMPONENT_ID_PREFIX + "::" + DHPUtils.md5(s);
 			return ccId;
 		} else {
-			return MapDocumentUtil.getJPathString("$.id", entities.iterator().next());
+			return ids.iterator().next();
 		}
 	}
@@ -80,15 +71,15 @@ public class ConnectedComponent implements Serializable {
 
 		final StringBuilder min = new StringBuilder();
 
-		entities
+		ids
 			.forEach(
-				e -> {
+				id -> {
 					if (StringUtils.isBlank(min.toString())) {
-						min.append(MapDocumentUtil.getJPathString("$.id", e));
+						min.append(id);
 					} else {
-						if (min.toString().compareTo(MapDocumentUtil.getJPathString("$.id", e)) > 0) {
+						if (min.toString().compareTo(id) > 0) {
 							min.setLength(0);
-							min.append(MapDocumentUtil.getJPathString("$.id", e));
+							min.append(id);
 						}
 					}
 				});
@@ -105,12 +96,12 @@ public class ConnectedComponent implements Serializable {
 		}
 	}
 
-	public Set<String> getEntities() {
-		return entities;
+	public Set<String> getIds() {
+		return ids;
 	}
 
-	public void setEntities(Set<String> entities) {
-		this.entities = entities;
+	public void setIds(Set<String> ids) {
+		this.ids = ids;
 	}
 
 	public String getCcId() {

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
index d8237410e..f4dd85d75 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
@@ -7,7 +7,7 @@ import scala.collection.JavaConversions;
 
 object GraphProcessor {
 
-  def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], subEntity: String, maxIterations: Int, cut: Int): RDD[ConnectedComponent] = {
+  def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut: Int): RDD[ConnectedComponent] = {
     val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
     val cc = graph.connectedComponents(maxIterations).vertices
@@ -22,15 +22,15 @@ object GraphProcessor {
       }
     }
     val connectedComponents = joinResult.groupByKey()
-      .map[ConnectedComponent](cc => asConnectedComponent(cc, subEntity, cut))
+      .map[ConnectedComponent](cc => asConnectedComponent(cc, cut))
     connectedComponents
   }
 
-  def asConnectedComponent(group: (VertexId, Iterable[String]), subEntity: String, cut: Int): ConnectedComponent = {
+  def asConnectedComponent(group: (VertexId, Iterable[String]), cut: Int): ConnectedComponent = {
     val docs = group._2.toSet[String]
-    val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), subEntity, cut);
+    val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut);
     connectedComponent
   }

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
index dded5a6b0..d7f0644fd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
@@ -1,7 +1,15 @@
 
 package eu.dnetlib.dhp.oa.dedup.model;
 
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
 import com.google.common.collect.Sets;
+
 import eu.dnetlib.dhp.oa.dedup.DatePicker;
 import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
@@ -9,12 +17,6 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.schema.oaf.utils.PidComparator;
 import eu.dnetlib.dhp.schema.oaf.utils.PidType;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.stream.Collectors;
 
 public class Identifier<T extends OafEntity> implements Serializable, Comparable<Identifier<T>> {

diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 010c569f2..93caadf93 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -55,7 +55,6 @@ public class SparkDedupTest implements Serializable {
 	private static String testOutputBasePath;
 	private static String testDedupGraphBasePath;
 	private static final String testActionSetId = "test-orchestrator";
-	private static String testDedupAssertionsBasePath;
 
 	@BeforeAll
 	public static void cleanUp() throws IOException, URISyntaxException {
@@ -70,10 +69,6 @@ public class SparkDedupTest implements Serializable {
 		testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
 			.toAbsolutePath()
 			.toString();
-		testDedupAssertionsBasePath = Paths
-			.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI())
-			.toFile()
-			.getAbsolutePath();
 
 		FileUtils.deleteDirectory(new File(testOutputBasePath));
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
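
The heart of patch 2 is the reshaped pipeline in SparkCreateMergeRels: connected components now emit plain (ccId, id) tuples, which are joined back to the parsed entities and grouped per component, so that generateID can pick exactly one root id per group. The sketch below reproduces that joinWith / groupByKey / mapGroups shape with plain String payloads in place of OafEntity and a trivial smallest-payload rule in place of IdGenerator.generate; all class and variable names are invented for the example, only the Spark API usage mirrors the patch. It runs locally with spark-sql on the classpath.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class MergeRelJoinSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

        // (ccId, memberId) pairs, as produced by ccToRels
        Dataset<Tuple2<String, String>> rawMergeRels = spark.createDataset(
            Arrays.asList(new Tuple2<>("cc_1", "A"), new Tuple2<>("cc_1", "B")),
            Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // (entityId, payload) pairs, standing in for the parsed OafEntity records
        Dataset<Tuple2<String, String>> entities = spark.createDataset(
            Arrays.asList(new Tuple2<>("A", "pid:doi/10.1/x"), new Tuple2<>("B", "pid:none")),
            Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        List<String> roots = rawMergeRels
            // join member ids with their payloads: (ccId, id) with (id, payload)
            .joinWith(entities, rawMergeRels.col("_2").equalTo(entities.col("_1")), "inner")
            .map(
                (MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, Tuple2<String, String>>) v -> new Tuple2<>(
                    v._1()._1(), v._2()._2()),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
            // one group per connected component ...
            .groupByKey((MapFunction<Tuple2<String, String>, String>) Tuple2::_1, Encoders.STRING())
            // ... and one root id per group (IdGenerator.generate in the real job)
            .mapGroups((MapGroupsFunction<String, Tuple2<String, String>, String>) (key, values) -> {
                String best = null;
                while (values.hasNext()) {
                    String payload = values.next()._2();
                    if (best == null || payload.compareTo(best) < 0)
                        best = payload;
                }
                return key + " -> " + best;
            }, Encoders.STRING())
            .collectAsList();

        roots.forEach(System.out::println);
        spark.stop();
    }
}

Grouping typed (ccId, entity) tuples instead of serialized JSON is also what lets generateID fail fast, via the IllegalStateException above, whenever no pid-based root id can be derived for a component.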
From 3d58f95522bce69bd339da8d1cdf5e357bfa2878 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 15 Apr 2021 15:03:05 +0200
Subject: [PATCH 3/5] [stats update] properly invalidating impala metadata

---
 .../eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh
index 57acb2ee7..d04c5ccfd 100644
--- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh
+++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh
@@ -10,6 +10,7 @@ export SOURCE=$1
 export SHADOW=$2
 
 echo "Updating shadow database"
+impala-shell -q "invalidate metadata"
 impala-shell -d ${SOURCE} -q "invalidate metadata"
 impala-shell -d ${SOURCE} -q "show tables" --delimited | sed "s/^\(.*\)/compute stats ${SOURCE}.\1;/" | impala-shell -c -f -
 impala-shell -q "create database if not exists ${SHADOW}"
From ba4b4c74d86850db063d0b02948220747f1e20d9 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 15 Apr 2021 16:48:26 +0200
Subject: [PATCH 4/5] do not make the identifier prefix depend on the Handle

---
 .../schema/oaf/utils/IdentifierFactory.java   | 22 +++++------
 .../oaf/utils/IdentifierFactoryTest.java      |  3 ++
 .../schema/oaf/utils/publication_doi4.json    | 37 +++++++++++++++++++
 3 files changed, 51 insertions(+), 11 deletions(-)
 create mode 100644 dhp-common/src/test/resources/eu/dnetlib/dhp/schema/oaf/utils/publication_doi4.json

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
index fe4642ee9..a0532da26 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
@@ -57,7 +57,7 @@ public class IdentifierFactory implements Serializable {
 	}
 
 	public static List<StructuredProperty> getPids(List<StructuredProperty> pid, KeyValue collectedFrom) {
-		return pidFromInstance(pid, collectedFrom).distinct().collect(Collectors.toList());
+		return pidFromInstance(pid, collectedFrom, true).distinct().collect(Collectors.toList());
 	}
 
 	public static <T extends OafEntity> String createDOIBoostIdentifier(T entity) {
@@ -104,7 +104,7 @@ public class IdentifierFactory implements Serializable {
 
 		checkArgument(StringUtils.isNoneBlank(entity.getId()), "missing entity identifier");
 
-		final Map<String, List<StructuredProperty>> pids = extractPids(entity);
+		final Map<String, Set<StructuredProperty>> pids = extractPids(entity);
 
 		return pids
 			.values()
@@ -125,7 +125,7 @@ public class IdentifierFactory implements Serializable {
 			.orElseGet(entity::getId);
 	}
 
-	private static <T extends OafEntity> Map<String, List<StructuredProperty>> extractPids(T entity) {
+	private static <T extends OafEntity> Map<String, Set<StructuredProperty>> extractPids(T entity) {
 		if (entity instanceof Result) {
 			return Optional
 				.ofNullable(((Result) entity).getInstance())
@@ -142,23 +142,23 @@ public class IdentifierFactory implements Serializable {
 					Collectors
 						.groupingBy(
 							p -> p.getQualifier().getClassid(),
-							Collectors.mapping(p -> p, Collectors.toList())));
+							Collectors.mapping(p -> p, Collectors.toCollection(HashSet::new))));
 		}
 	}
 
-	private static Map<String, List<StructuredProperty>> mapPids(List<Instance> instance) {
+	private static Map<String, Set<StructuredProperty>> mapPids(List<Instance> instance) {
 		return instance
 			.stream()
-			.map(i -> pidFromInstance(i.getPid(), i.getCollectedfrom()))
+			.map(i -> pidFromInstance(i.getPid(), i.getCollectedfrom(), false))
 			.flatMap(Function.identity())
 			.collect(
 				Collectors
 					.groupingBy(
 						p -> p.getQualifier().getClassid(),
-						Collectors.mapping(p -> p, Collectors.toList())));
+						Collectors.mapping(p -> p, Collectors.toCollection(HashSet::new))));
 	}
 
-	private static Stream<StructuredProperty> pidFromInstance(List<StructuredProperty> pid, KeyValue collectedFrom) {
+	private static Stream<StructuredProperty> pidFromInstance(List<StructuredProperty> pid, KeyValue collectedFrom, boolean mapHandles) {
 		return Optional
 			.ofNullable(pid)
 			.map(
@@ -167,16 +167,16 @@ public class IdentifierFactory implements Serializable {
 					// filter away PIDs provided by a DS that is not considered an authority for the
 					// given PID Type
 					.filter(p -> {
-						return shouldFilterPid(collectedFrom, p);
+						return shouldFilterPid(collectedFrom, p, mapHandles);
 					})
 					.map(CleaningFunctions::normalizePidValue)
 					.filter(CleaningFunctions::pidFilter))
 			.orElse(Stream.empty());
 	}
 
-	private static boolean shouldFilterPid(KeyValue collectedFrom, StructuredProperty p) {
+	private static boolean shouldFilterPid(KeyValue collectedFrom, StructuredProperty p, boolean mapHandles) {
 		final PidType pType = PidType.tryValueOf(p.getQualifier().getClassid());
-		return pType.equals(PidType.handle) || Optional.ofNullable(collectedFrom).isPresent() &&
+		return (mapHandles && pType.equals(PidType.handle)) || Optional.ofNullable(collectedFrom).isPresent() &&
 			Optional
 				.ofNullable(PID_AUTHORITY.get(pType))
 				.map(authorities -> {

diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactoryTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactoryTest.java
index 31ef91a7a..935b74b08 100644
--- a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactoryTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactoryTest.java
@@ -31,6 +31,9 @@ public class IdentifierFactoryTest {
 		verifyIdentifier(
 			"publication_doi3.json", "50|pmc_________::94e4cb08c93f8733b48e2445d04002ac", true);
 
+		verifyIdentifier(
+			"publication_doi4.json", "50|od______2852::38861c44e6052a8d49f59a4c39ba5e66", true);
+
 		verifyIdentifier(
 			"publication_pmc1.json", "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f", true);
 

diff --git a/dhp-common/src/test/resources/eu/dnetlib/dhp/schema/oaf/utils/publication_doi4.json b/dhp-common/src/test/resources/eu/dnetlib/dhp/schema/oaf/utils/publication_doi4.json
new file mode 100644
index 000000000..ac99ca93a
--- /dev/null
+++ b/dhp-common/src/test/resources/eu/dnetlib/dhp/schema/oaf/utils/publication_doi4.json
@@ -0,0 +1,37 @@
+{
+  "id": "50|od______2852::38861c44e6052a8d49f59a4c39ba5e66",
+  "instance": [
+    {
+      "collectedfrom": {
+        "key": "10|openaire____::1234",
+        "value": "Zenodo"
+      },
+      "pid": [
+        {
+          "qualifier": {"classid": "doi"},
+          "value": "10.1016/j.cmet.2010.03.013"
+        },
+        {
+          "qualifier": {"classid": "handle"},
+          "value": "11012/83840"
+        }
+      ]
+    },
+    {
+      "collectedfrom": {
+        "key": "10|opendoar____::2852",
+        "value": "Digital library of Brno University of Technology"
+      },
+      "pid": [
+        {
+          "qualifier": {"classid": "pmc"},
+          "value": "21459329"
+        },
+        {
+          "qualifier": {"classid": "handle"},
+          "value": "11012/83840"
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file
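
Patch 4 narrows when a handle can drive the record identifier: entity-level pids (the getPids path, where mapHandles is true) may still map handles, while instance-level pids must come from a datasource that is authoritative for their type. Below is a paraphrased, self-contained sketch of that predicate; the PID_AUTHORITY content and all names are illustrative, and note that the real method, despite being named shouldFilterPid, returns true for pids that should be kept.

import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class PidFilterSketch {

    // pid type -> datasource keys authoritative for it (illustrative content;
    // the real PID_AUTHORITY table lives in IdentifierFactory)
    private static final Map<String, Set<String>> PID_AUTHORITY = Map
        .of("doi", Set.of("10|openaire____::crossref"));

    // paraphrase of shouldFilterPid: true means "keep this pid for id generation"
    static boolean keepPid(String collectedFromKey, String pidType, boolean mapHandles) {
        if (mapHandles && "handle".equals(pidType))
            return true; // handles only count when mapping entity-level pids
        return Optional
            .ofNullable(PID_AUTHORITY.get(pidType))
            .map(authorities -> collectedFromKey != null && authorities.contains(collectedFromKey))
            .orElse(false);
    }

    public static void main(String[] args) {
        System.out.println(keepPid("10|opendoar____::2852", "handle", false));  // false: instance-level handle
        System.out.println(keepPid(null, "handle", true));                      // true: entity-level handle
        System.out.println(keepPid("10|openaire____::crossref", "doi", false)); // true: authoritative source
    }
}

The publication_doi4.json fixture above exercises exactly this case: both instances carry the same handle, but the record identifier is now derived from the od______2852 prefix rather than from the handle.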
From 8704d3278082f051f42148a1cd1a78b4c7683071 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 15 Apr 2021 16:52:58 +0200
Subject: [PATCH 5/5] code formatting

---
 .../eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
index a0532da26..c6c5cdcea 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
@@ -158,7 +158,8 @@ public class IdentifierFactory implements Serializable {
 						Collectors.mapping(p -> p, Collectors.toCollection(HashSet::new))));
 	}
 
-	private static Stream<StructuredProperty> pidFromInstance(List<StructuredProperty> pid, KeyValue collectedFrom, boolean mapHandles) {
+	private static Stream<StructuredProperty> pidFromInstance(List<StructuredProperty> pid, KeyValue collectedFrom,
+		boolean mapHandles) {
 		return Optional
 			.ofNullable(pid)
 			.map(