package eu.dnetlib.dhp.oa.dedup;

import static org.apache.spark.sql.functions.col;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;

public class SparkPropagateRelation extends AbstractSparkAction {

    private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);

    enum FieldType {
        SOURCE, TARGET
    }

    public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
        super(parser, spark);
    }

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
                .toString(
                    SparkCreateSimRels.class
                        .getResourceAsStream(
                            "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));

        parser.parseArgument(args);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        new SparkPropagateRelation(parser, getSparkSession(conf))
            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
    }

    @Override
    public void run(ISLookUpService isLookUpService) {

        final String graphBasePath = parser.get("graphBasePath");
        final String workingPath = parser.get("workingPath");
        final String dedupGraphPath = parser.get("dedupGraphPath");

        log.info("graphBasePath: '{}'", graphBasePath);
        log.info("workingPath: '{}'", workingPath);
        log.info("dedupGraphPath: '{}'", dedupGraphPath);

        final String outputRelationPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
        removeOutputDir(spark, outputRelationPath);

        // <dedup root id> -- merges --> <merged entity id> relations produced by the dedup phase
        Dataset<Relation> mergeRels = spark
            .read()
            .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
            .as(Encoders.bean(Relation.class));

        // (merged entity id, dedup root id) pairs, used to rewrite relation endpoints
        Dataset<Tuple2<String, String>> mergedIds = mergeRels
            .where(col("relClass").equalTo("merges"))
            .select(col("source"), col("target"))
            .distinct()
            .map(
                (MapFunction<Row, Tuple2<String, String>>) r -> new Tuple2<>(r.getString(1), r.getString(0)),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
            .cache();

        final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");

        Dataset<Relation> rels = spark.read().textFile(relationPath).map(patchRelFn(), Encoders.bean(Relation.class));

        // relations whose source and/or target endpoint is replaced with the corresponding dedup root id
        Dataset<Relation> newRels = processDataset(
            processDataset(rels, mergedIds, FieldType.SOURCE, getFixRelFn(FieldType.SOURCE)),
            mergedIds, FieldType.TARGET, getFixRelFn(FieldType.TARGET))
                .filter(SparkPropagateRelation::containsDedup)
                .distinct();

        // original relations touching a merged entity, marked as deleted by inference
        Dataset<Relation> updated = processDataset(
            processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn()),
            mergedIds, FieldType.TARGET, getDeletedFn());

        save(
            distinctRelations(
                newRels
                    .union(updated)
                    .union(mergeRels)
                    .map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class))),
            outputRelationPath, SaveMode.Overwrite);
    }
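
    /*
     * Illustrative walk-through (hypothetical identifiers, not taken from real data), showing how the
     * propagation in run() rewrites relations once an entity has been merged into a dedup root:
     *
     *   merge relation (from mergeRels):   dedup_root_X -- merges --> pub_A
     *   original relation (from rels):     pub_A -- cites --> pub_B
     *
     *   newRels  : dedup_root_X -- cites --> pub_B   (endpoint replaced, deletedbyinference = false)
     *   updated  : pub_A -- cites --> pub_B          (kept as is, but deletedbyinference = true)
     *
     * Both sets, together with the merge relations themselves, are reduced to one relation per
     * (source, target, relType, subRelType, relClass) key by distinctRelations() before being written out.
     */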

    private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
        return rels
            .filter(getRelationFilterFunction())
            // group by the relation identity: source, target, relType, subRelType, relClass
            .groupByKey(
                (MapFunction<Relation, String>) r -> String
                    .join("_", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
                Encoders.STRING())
            .agg(new RelationAggregator().toColumn())
            .map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
    }

    private static Dataset<Relation> processDataset(
        Dataset<Relation> rels,
        Dataset<Tuple2<String, String>> mergedIds,
        FieldType type,
        MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, String>>, Relation> mapFn) {
        final Dataset<Tuple2<String, Relation>> mapped = rels
            .map(
                (MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(getId(r, type), r),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)));
        return mapped
            .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer")
            .map(mapFn, Encoders.bean(Relation.class));
    }

    private FilterFunction<Relation> getRelationFilterFunction() {
        return (FilterFunction<Relation>) r -> StringUtils.isNotBlank(r.getSource()) ||
            StringUtils.isNotBlank(r.getTarget()) ||
            StringUtils.isNotBlank(r.getRelType()) ||
            StringUtils.isNotBlank(r.getSubRelType()) ||
            StringUtils.isNotBlank(r.getRelClass());
    }

    private static MapFunction<String, Relation> patchRelFn() {
        return value -> {
            final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
            if (rel.getDataInfo() == null) {
                rel.setDataInfo(new DataInfo());
            }
            return rel;
        };
    }

    private static String getId(Relation r, FieldType type) {
        switch (type) {
            case SOURCE:
                return r.getSource();
            case TARGET:
                return r.getTarget();
            default:
                throw new IllegalArgumentException("unsupported field type: " + type);
        }
    }

    private static MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, String>>, Relation> getFixRelFn(
        FieldType type) {
        return value -> {
            if (value._2() != null) {
                Relation r = value._1()._2();
                String id = value._2()._2();
                if (r.getDataInfo() == null) {
                    r.setDataInfo(new DataInfo());
                }
                r.getDataInfo().setDeletedbyinference(false);
                switch (type) {
                    case SOURCE:
                        r.setSource(id);
                        return r;
                    case TARGET:
                        r.setTarget(id);
                        return r;
                    default:
                        throw new IllegalArgumentException("unsupported field type: " + type);
                }
            }
            return value._1()._2();
        };
    }

    private static MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, String>>, Relation> getDeletedFn() {
        return value -> {
            if (value._2() != null) {
                Relation r = value._1()._2();
                if (r.getDataInfo() == null) {
                    r.setDataInfo(new DataInfo());
                }
                r.getDataInfo().setDeletedbyinference(true);
                return r;
            }
            return value._1()._2();
        };
    }

    private static boolean containsDedup(final Relation r) {
        return r.getSource().toLowerCase().contains("dedup")
            || r.getTarget().toLowerCase().contains("dedup");
    }
}
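
// A minimal sketch of what the RelationAggregator used by distinctRelations() could look like.
// The actual implementation lives elsewhere in this package and may differ; the body below is an
// assumption shown only for illustration (an Aggregator keeping a single Relation per grouping key):
//
//   public class RelationAggregator extends Aggregator<Relation, Relation, Relation> {
//       @Override public Relation zero()                          { return new Relation(); }
//       @Override public Relation reduce(Relation b, Relation r)  { return r != null ? r : b; }
//       @Override public Relation merge(Relation b1, Relation b2) { return b1.getSource() != null ? b1 : b2; }
//       @Override public Relation finish(Relation r)              { return r; }
//       @Override public Encoder<Relation> bufferEncoder()        { return Encoders.kryo(Relation.class); }
//       @Override public Encoder<Relation> outputEncoder()        { return Encoders.bean(Relation.class); }
//   }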