From 9ddafd46cafddc172f73d7cda623fe3f954befc3 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 23 Apr 2020 07:50:18 +0200 Subject: [PATCH] fixed dedup record id prefix, set the correct dataInfo in the DedupRecordFactory --- .../dhp/oa/dedup/DedupRecordFactory.java | 53 +++++++++---------- .../dhp/oa/dedup/SparkCreateDedupRecord.java | 29 ++++++++-- .../oa/dedup/graph/ConnectedComponent.java | 2 +- .../dedup/graph/ConnectedComponent.java | 2 +- 4 files changed, 54 insertions(+), 32 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index e5c8a4606..47aab1d20 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -25,6 +25,7 @@ public class DedupRecordFactory { public static Dataset createDedupRecord( final SparkSession spark, + final DataInfo dataInfo, final String mergeRelsInputPath, final String entitiesInputPath, final Class clazz) { @@ -67,41 +68,39 @@ public class DedupRecordFactory { Encoders.STRING()) .mapGroups( (MapGroupsFunction, T>) - (key, values) -> entityMerger(key, values, ts, clazz), + (key, values) -> entityMerger(key, values, ts, dataInfo), Encoders.bean(clazz)); } private static T entityMerger( - String id, Iterator> entities, long ts, Class clazz) { - try { - T entity = clazz.newInstance(); - entity.setId(id); - entity.setDataInfo(new DataInfo()); - entity.getDataInfo().setTrust("0.9"); - entity.setLastupdatetimestamp(ts); + String id, Iterator> entities, long ts, DataInfo dataInfo) { - final Collection dates = Lists.newArrayList(); - entities.forEachRemaining( - t -> { - T duplicate = t._2(); - entity.mergeFrom(duplicate); - if (ModelSupport.isSubClass(duplicate, Result.class)) { - Result r1 = (Result) duplicate; - Result er = (Result) entity; - er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor())); + T entity = entities.next()._2(); - if (er.getDateofacceptance() != null) { - dates.add(r1.getDateofacceptance().getValue()); - } + final Collection dates = Lists.newArrayList(); + entities.forEachRemaining( + t -> { + T duplicate = t._2(); + entity.mergeFrom(duplicate); + if (ModelSupport.isSubClass(duplicate, Result.class)) { + Result r1 = (Result) duplicate; + Result er = (Result) entity; + er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor())); + + if (r1.getDateofacceptance() != null) { + dates.add(r1.getDateofacceptance().getValue()); } - }); + } + }); - if (ModelSupport.isSubClass(entity, Result.class)) { - ((Result) entity).setDateofacceptance(DatePicker.pick(dates)); - } - return entity; - } catch (IllegalAccessException | InstantiationException e) { - throw new RuntimeException(e); + if (ModelSupport.isSubClass(entity, Result.class)) { + ((Result) entity).setDateofacceptance(DatePicker.pick(dates)); } + + entity.setId(id); + entity.setLastupdatetimestamp(ts); + entity.setDataInfo(dataInfo); + + return entity; } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java index c46464ffd..42a0cff8a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java @@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.dedup; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -21,6 +23,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction { private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class); + public static final String ROOT_TRUST = "0.8"; + public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup"; + public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions"; + public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) { super(parser, spark); } @@ -67,13 +73,30 @@ public class SparkCreateDedupRecord extends AbstractSparkAction { DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity); - Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); - - DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz) + final Class clazz = + ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); + final DataInfo dataInfo = getDataInfo(dedupConf); + DedupRecordFactory.createDedupRecord(spark, dataInfo, mergeRelPath, entityPath, clazz) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } } + + private static DataInfo getDataInfo(DedupConfig dedupConf) { + DataInfo info = new DataInfo(); + info.setDeletedbyinference(false); + info.setInferred(true); + info.setInvisible(false); + info.setTrust(ROOT_TRUST); + info.setInferenceprovenance(dedupConf.getWf().getConfigurationId()); + Qualifier provenance = new Qualifier(); + provenance.setClassid(PROVENANCE_ACTION_CLASS); + provenance.setClassname(PROVENANCE_ACTION_CLASS); + provenance.setSchemeid(PROVENANCE_ACTIONS); + provenance.setSchemename(PROVENANCE_ACTIONS); + info.setProvenanceaction(provenance); + return info; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java index 4baac0229..b89a0e7e2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java @@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable { if (docIds.size() > 1) { final String s = getMin(); String prefix = s.split("\\|")[0]; - ccId = prefix + "|dedup_______::" + DedupUtility.md5(s); + ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s); return ccId; } else { return docIds.iterator().next(); diff --git a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java index 41d53944f..a5aa94e09 100644 --- a/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java +++ b/dhp-workflows/dhp-dedup-scholexplorer/src/main/java/eu/dnetlib/dedup/graph/ConnectedComponent.java @@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable { if (docIds.size() > 1) { final String s = getMin(); String prefix = s.split("\\|")[0]; - ccId = prefix + "|dedup_______::" + DedupUtility.md5(s); + ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s); return ccId; } else { return docIds.iterator().next();