forked from D-Net/dnet-hadoop
Fix: ensure all relations are written out, not only those managed by dedup
This commit is contained in:
parent 0d7b2bf83d
commit a860e19423
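
What changed: the previous flow kept only relations having at least one endpoint managed by dedup (the `.filter("_1._2 IS NOT NULL OR _2 IS NOT NULL")` step) and saved those together with the merge relations, so relations untouched by dedup were silently dropped from the output graph. The new flow removes that filter and funnels every relation through `addInferredRelations`: a relation whose endpoints were never merged is passed through unchanged, while a relation touching a merged entity is kept (flagged with `deletedbyinference`) alongside a clone rewired to the dedup IDs. Below is a minimal, self-contained sketch of that rule; `Rel` and `PropagationSketch` are illustrative stand-ins, not the real `Relation` bean or the Spark job itself.

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

// Minimal stand-in for the Relation bean; illustration only.
class Rel {
    String source;
    String target;
    boolean deletedByInference;

    Rel(String source, String target) {
        this.source = source;
        this.target = target;
    }

    Rel copy() {
        return new Rel(source, target);
    }

    @Override
    public String toString() {
        return source + " -> " + target + (deletedByInference ? " [deletedbyinference]" : "");
    }
}

public class PropagationSketch {

    // The propagation rule: given a relation and the (possibly null) dedup IDs its
    // source/target were merged into, emit either the untouched relation or the
    // original flagged as superseded plus a copy rewired onto the dedup records.
    static Iterator<Rel> addInferredRelations(Rel existing, String newSource, String newTarget) {
        if (newSource == null && newTarget == null) {
            // neither endpoint is managed by dedup: keep the relation as-is (the fix)
            return Collections.singleton(existing).iterator();
        }

        existing.deletedByInference = true; // original survives, flagged as superseded

        Rel inferred = existing.copy();     // relation rewired onto the dedup records
        inferred.deletedByInference = false;
        if (newSource != null) inferred.source = newSource;
        if (newTarget != null) inferred.target = newTarget;

        return Arrays.asList(existing, inferred).iterator();
    }

    public static void main(String[] args) {
        // relation untouched by dedup: emitted unchanged
        addInferredRelations(new Rel("A", "B"), null, null).forEachRemaining(System.out::println);

        // relation whose source was merged into dedup record D1: two rows emitted
        addInferredRelations(new Rel("A", "B"), "D1", null).forEachRemaining(System.out::println);
    }
}

Running the sketch prints one row for the untouched relation and two rows (the flagged original plus the rewired copy) for the relation whose source was merged, which is the behaviour the diff below introduces.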
@@ -8,6 +8,7 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
@@ -20,6 +21,9 @@ import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 import scala.Tuple3;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.Objects;
 
 import static org.apache.spark.sql.functions.col;
@@ -80,68 +84,51 @@ public class SparkPropagateRelation extends AbstractSparkAction {
             .distinct()
             .cache();
 
-        final String inputRelationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
-
-        Dataset<Relation> rels = spark.read().schema(REL_BEAN_ENC.schema()).json(inputRelationPath)
-            .as(REL_BEAN_ENC)
-//            .map((MapFunction<Relation, Relation>) rel -> {
-//                if (rel.getDataInfo() == null) {
-//                    rel.setDataInfo(new DataInfo());
-//                }
-//                return rel;
-//            }, REL_BEAN_ENC)
-        ;
-
-        Dataset<Tuple3<Relation, String, String>> dedupedRels = rels
-            .joinWith(mergedIds, rels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
+        Dataset<Row> allRels = spark.read()
+            .schema(REL_BEAN_ENC.schema())
+            .json(DedupUtility.createEntityPath(graphBasePath, "relation"));
+
+        Dataset<Relation> dedupedRels = allRels
+            .joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
             .joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
-            .filter("_1._2 IS NOT NULL OR _2 IS NOT NULL")
             .select("_1._1", "_1._2.dedupID", "_2.dedupID")
             .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
-            .cache();
+            .flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC);
 
         mergedIds.unpersist();
 
-        Dataset<Relation> newRels = dedupedRels
-            .map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
-                Relation r = t._1();
-                String newSource = t._2();
-                String newTarget = t._3();
-
-                if (r.getDataInfo() == null) {
-                    r.setDataInfo(new DataInfo());
-                }
-                r.getDataInfo().setDeletedbyinference(false);
-
-                if (newSource != null)
-                    r.setSource(newSource);
-
-                if (newTarget != null)
-                    r.setTarget(newTarget);
-
-                return r;
-            }, REL_BEAN_ENC)
-            .distinct();
-
-        Dataset<Relation> updated = dedupedRels
-            .map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
-                Relation r = t._1();
-                if (r.getDataInfo() == null) {
-                    r.setDataInfo(new DataInfo());
-                }
-                r.getDataInfo().setDeletedbyinference(true);
-                return r;
-            }, REL_BEAN_ENC);
-
-        save(
-            distinctRelations(
-                newRels
-                    .union(updated)
-                    .union(mergeRels)
-                    .map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)
-            )
-            .filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget())),
-            outputRelationPath, SaveMode.Overwrite);
+        Dataset<Relation> processedRelations = distinctRelations(dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)))
+            .filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));
+
+        save(processedRelations, outputRelationPath, SaveMode.Overwrite);
     }
 
+    private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception {
+        Relation existingRel = t._1();
+        String newSource = t._2();
+        String newTarget = t._3();
+
+        if (newSource == null && newTarget == null) {
+            return Collections.singleton(t._1()).iterator();
+        }
+
+        // update existing relation
+        if (existingRel.getDataInfo() == null) {
+            existingRel.setDataInfo(new DataInfo());
+        }
+        existingRel.getDataInfo().setDeletedbyinference(true);
+
+        // Create new relation inferred by dedupIDs
+        Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel);
+
+        inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo()));
+        inferredRel.getDataInfo().setDeletedbyinference(false);
+
+        if (newSource != null)
+            inferredRel.setSource(newSource);
+
+        if (newTarget != null)
+            inferredRel.setTarget(newTarget);
+
+        return Arrays.asList(existingRel, inferredRel).iterator();
+    }
+
     private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
@@ -156,8 +143,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
                     return b;
                 }
             )
-
-            .map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
+            .map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
     }
 
     private FilterFunction<Relation> getRelationFilterFunction() {