forked from antonis.lempesis/dnet-hadoop
fixed dedup record id prefix, set the correct dataInfo in the DedupRecordFactory
This commit is contained in:
parent
ade4cb97af
commit
9ddafd46ca
|
@ -25,6 +25,7 @@ public class DedupRecordFactory {
|
||||||
|
|
||||||
public static <T extends OafEntity> Dataset<T> createDedupRecord(
|
public static <T extends OafEntity> Dataset<T> createDedupRecord(
|
||||||
final SparkSession spark,
|
final SparkSession spark,
|
||||||
|
final DataInfo dataInfo,
|
||||||
final String mergeRelsInputPath,
|
final String mergeRelsInputPath,
|
||||||
final String entitiesInputPath,
|
final String entitiesInputPath,
|
||||||
final Class<T> clazz) {
|
final Class<T> clazz) {
|
||||||
|
@ -67,41 +68,39 @@ public class DedupRecordFactory {
|
||||||
Encoders.STRING())
|
Encoders.STRING())
|
||||||
.mapGroups(
|
.mapGroups(
|
||||||
(MapGroupsFunction<String, Tuple2<String, T>, T>)
|
(MapGroupsFunction<String, Tuple2<String, T>, T>)
|
||||||
(key, values) -> entityMerger(key, values, ts, clazz),
|
(key, values) -> entityMerger(key, values, ts, dataInfo),
|
||||||
Encoders.bean(clazz));
|
Encoders.bean(clazz));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends OafEntity> T entityMerger(
|
private static <T extends OafEntity> T entityMerger(
|
||||||
String id, Iterator<Tuple2<String, T>> entities, long ts, Class<T> clazz) {
|
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo) {
|
||||||
try {
|
|
||||||
T entity = clazz.newInstance();
|
|
||||||
entity.setId(id);
|
|
||||||
entity.setDataInfo(new DataInfo());
|
|
||||||
entity.getDataInfo().setTrust("0.9");
|
|
||||||
entity.setLastupdatetimestamp(ts);
|
|
||||||
|
|
||||||
final Collection<String> dates = Lists.newArrayList();
|
T entity = entities.next()._2();
|
||||||
entities.forEachRemaining(
|
|
||||||
t -> {
|
|
||||||
T duplicate = t._2();
|
|
||||||
entity.mergeFrom(duplicate);
|
|
||||||
if (ModelSupport.isSubClass(duplicate, Result.class)) {
|
|
||||||
Result r1 = (Result) duplicate;
|
|
||||||
Result er = (Result) entity;
|
|
||||||
er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
|
|
||||||
|
|
||||||
if (er.getDateofacceptance() != null) {
|
final Collection<String> dates = Lists.newArrayList();
|
||||||
dates.add(r1.getDateofacceptance().getValue());
|
entities.forEachRemaining(
|
||||||
}
|
t -> {
|
||||||
|
T duplicate = t._2();
|
||||||
|
entity.mergeFrom(duplicate);
|
||||||
|
if (ModelSupport.isSubClass(duplicate, Result.class)) {
|
||||||
|
Result r1 = (Result) duplicate;
|
||||||
|
Result er = (Result) entity;
|
||||||
|
er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
|
||||||
|
|
||||||
|
if (r1.getDateofacceptance() != null) {
|
||||||
|
dates.add(r1.getDateofacceptance().getValue());
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
});
|
||||||
|
|
||||||
if (ModelSupport.isSubClass(entity, Result.class)) {
|
if (ModelSupport.isSubClass(entity, Result.class)) {
|
||||||
((Result) entity).setDateofacceptance(DatePicker.pick(dates));
|
((Result) entity).setDateofacceptance(DatePicker.pick(dates));
|
||||||
}
|
|
||||||
return entity;
|
|
||||||
} catch (IllegalAccessException | InstantiationException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
entity.setId(id);
|
||||||
|
entity.setLastupdatetimestamp(ts);
|
||||||
|
entity.setDataInfo(dataInfo);
|
||||||
|
|
||||||
|
return entity;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.dedup;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
@ -21,6 +23,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
|
private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
|
||||||
|
|
||||||
|
public static final String ROOT_TRUST = "0.8";
|
||||||
|
public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
|
||||||
|
public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
|
||||||
|
|
||||||
public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
|
public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
|
||||||
super(parser, spark);
|
super(parser, spark);
|
||||||
}
|
}
|
||||||
|
@ -67,13 +73,30 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
|
||||||
DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
|
DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
|
||||||
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
|
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
|
||||||
|
|
||||||
Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
|
final Class<OafEntity> clazz =
|
||||||
|
ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
|
||||||
DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz)
|
final DataInfo dataInfo = getDataInfo(dedupConf);
|
||||||
|
DedupRecordFactory.createDedupRecord(spark, dataInfo, mergeRelPath, entityPath, clazz)
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
.json(outputPath);
|
.json(outputPath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static DataInfo getDataInfo(DedupConfig dedupConf) {
|
||||||
|
DataInfo info = new DataInfo();
|
||||||
|
info.setDeletedbyinference(false);
|
||||||
|
info.setInferred(true);
|
||||||
|
info.setInvisible(false);
|
||||||
|
info.setTrust(ROOT_TRUST);
|
||||||
|
info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
|
||||||
|
Qualifier provenance = new Qualifier();
|
||||||
|
provenance.setClassid(PROVENANCE_ACTION_CLASS);
|
||||||
|
provenance.setClassname(PROVENANCE_ACTION_CLASS);
|
||||||
|
provenance.setSchemeid(PROVENANCE_ACTIONS);
|
||||||
|
provenance.setSchemename(PROVENANCE_ACTIONS);
|
||||||
|
info.setProvenanceaction(provenance);
|
||||||
|
return info;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
|
||||||
if (docIds.size() > 1) {
|
if (docIds.size() > 1) {
|
||||||
final String s = getMin();
|
final String s = getMin();
|
||||||
String prefix = s.split("\\|")[0];
|
String prefix = s.split("\\|")[0];
|
||||||
ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
|
ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
|
||||||
return ccId;
|
return ccId;
|
||||||
} else {
|
} else {
|
||||||
return docIds.iterator().next();
|
return docIds.iterator().next();
|
||||||
|
|
|
@ -25,7 +25,7 @@ public class ConnectedComponent implements Serializable {
|
||||||
if (docIds.size() > 1) {
|
if (docIds.size() > 1) {
|
||||||
final String s = getMin();
|
final String s = getMin();
|
||||||
String prefix = s.split("\\|")[0];
|
String prefix = s.split("\\|")[0];
|
||||||
ccId = prefix + "|dedup_______::" + DedupUtility.md5(s);
|
ccId = prefix + "|dedup_wf_001::" + DedupUtility.md5(s);
|
||||||
return ccId;
|
return ccId;
|
||||||
} else {
|
} else {
|
||||||
return docIds.iterator().next();
|
return docIds.iterator().next();
|
||||||
|
|
Loading…
Reference in New Issue