adopted dedup to the new schema

This commit is contained in:
Sandro La Bruzzo 2020-07-31 09:06:57 +02:00
parent c97c8f0c44
commit 168bfb496a
8 changed files with 150 additions and 69 deletions

View File

@ -289,4 +289,12 @@ public class JsonPathTest {
System.out.println("d = " + d);
}
@Test
public void testNull() throws Exception {
final Object p = null;
System.out.println((String) p);
}
}

View File

@ -6,6 +6,7 @@ import java.util.Collection;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
@ -15,6 +16,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@ -30,10 +33,16 @@ public class DedupRecordFactory {
final DedupConfig dedupConf) {
long ts = System.currentTimeMillis();
// <id, json_entity>
final JavaPairRDD<String, String> inputJsonEntities = sc
.textFile(entitiesInputPath)
final JavaPairRDD<String, String> inputJsonEntities = spark
.read()
.load(entitiesInputPath)
.as(Encoders.kryo(Oaf.class))
.map(
(MapFunction<Oaf, String>) p -> new org.codehaus.jackson.map.ObjectMapper().writeValueAsString(p),
Encoders.STRING())
.javaRDD()
.mapToPair(
(PairFunction<String, String, String>) it -> new Tuple2<String, String>(
(PairFunction<String, String, String>) it -> new Tuple2<>(
MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it));
// <source, target>: source is the dedup_id, target is the id of the mergedIn
@ -74,9 +83,9 @@ public class DedupRecordFactory {
}
}
private static Publication publicationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
private static DLIPublication publicationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
Publication p = new Publication(); // the result of the merge, to be returned at the end
DLIPublication p = new DLIPublication(); // the result of the merge, to be returned at the end
p.setId(e._1());
@ -110,9 +119,9 @@ public class DedupRecordFactory {
return p;
}
private static Dataset datasetMerger(Tuple2<String, Iterable<String>> e, final long ts) {
private static DLIDataset datasetMerger(Tuple2<String, Iterable<String>> e, final long ts) {
Dataset d = new Dataset(); // the result of the merge, to be returned at the end
DLIDataset d = new DLIDataset(); // the result of the merge, to be returned at the end
d.setId(e._1());

View File

@ -9,18 +9,21 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.hash.Hashing;
import eu.dnetlib.dedup.graph.ConnectedComponent;
import eu.dnetlib.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
@ -42,7 +45,6 @@ public class SparkCreateConnectedComponent {
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String inputPath = parser.get("sourcePath");
final String entity = parser.get("entity");
final String targetPath = parser.get("targetPath");
@ -50,8 +52,12 @@ public class SparkCreateConnectedComponent {
// DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
final JavaPairRDD<Object, String> vertexes = sc
.textFile(inputPath + "/" + entity)
final JavaPairRDD<Object, String> vertexes = spark
.read()
.load(inputPath + "/" + entity)
.as(Encoders.kryo(Oaf.class))
.map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
.javaRDD()
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair(
(PairFunction<String, Object, String>) s -> new Tuple2<Object, String>(getHashcode(s), s));

View File

@ -4,10 +4,10 @@ package eu.dnetlib.dedup;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.pace.config.DedupConfig;
@ -41,12 +41,19 @@ public class SparkCreateDedupRecord {
DedupUtility.createEntityPath(sourcePath, entity),
OafEntityType.valueOf(entity),
dedupConf);
dedupRecord
.map(
r -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(r);
})
.saveAsTextFile(dedupPath + "/" + entity + "/dedup_records");
spark
.createDataset(dedupRecord.rdd(), Encoders.kryo(OafEntity.class))
.write()
.mode(SaveMode.Overwrite)
.save(dedupPath + "/" + entity + "/dedup_records");
//
//
// dedupRecord
// .map(
// r -> {
// ObjectMapper mapper = new ObjectMapper();
// return mapper.writeValueAsString(r);
// })
// .saveAsTextFile(dedupPath + "/" + entity + "/dedup_records");
}
}

View File

@ -7,10 +7,13 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.codehaus.jackson.map.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
@ -46,8 +49,12 @@ public class SparkCreateSimRels {
// DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
JavaPairRDD<String, MapDocument> mapDocument = sc
.textFile(inputPath + "/" + entity)
JavaPairRDD<String, MapDocument> mapDocument = spark
.read()
.load(inputPath + "/" + entity)
.as(Encoders.kryo(Oaf.class))
.map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
.javaRDD()
.mapToPair(
s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);

View File

@ -14,16 +14,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
public class SparkPropagateRelationsJob {
enum FieldType {
SOURCE, TARGET
}
static final String SOURCEJSONPATH = "$.source";
static final String TARGETJSONPATH = "$.target";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -39,7 +34,6 @@ public class SparkPropagateRelationsJob {
.master(parser.get("master"))
.getOrCreate();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final String relationPath = parser.get("relationPath");
final String mergeRelPath = parser.get("mergeRelPath");
final String targetRelPath = parser.get("targetRelPath");
@ -50,63 +44,38 @@ public class SparkPropagateRelationsJob {
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'");
final Dataset<Relation> rels = spark.read().load(relationPath).as(Encoders.bean(Relation.class));
final Dataset<DLIRelation> rels = spark
.read()
.load(relationPath)
.as(Encoders.kryo(DLIRelation.class))
.map(
(MapFunction<DLIRelation, DLIRelation>) r -> r,
Encoders.bean(DLIRelation.class));
final Dataset<Relation> firstJoin = rels
final Dataset<DLIRelation> firstJoin = rels
.joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer")
.map(
(MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
(MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
final Relation mergeRelation = r._2();
final Relation relation = r._1();
final DLIRelation relation = r._1();
if (mergeRelation != null)
relation.setSource(mergeRelation.getSource());
return relation;
},
Encoders.bean(Relation.class));
Encoders.bean(DLIRelation.class));
final Dataset<Relation> secondJoin = firstJoin
final Dataset<DLIRelation> secondJoin = firstJoin
.joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer")
.map(
(MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
(MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
final Relation mergeRelation = r._2();
final Relation relation = r._1();
final DLIRelation relation = r._1();
if (mergeRelation != null)
relation.setTarget(mergeRelation.getSource());
return relation;
},
Encoders.bean(Relation.class));
Encoders.kryo(DLIRelation.class));
secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath);
}
private static boolean containsDedup(final String json) {
final String source = DHPUtils.getJPathString(SOURCEJSONPATH, json);
final String target = DHPUtils.getJPathString(TARGETJSONPATH, json);
return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup");
}
private static String replaceField(final String json, final String id, final FieldType type) {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try {
Relation relation = mapper.readValue(json, Relation.class);
if (relation.getDataInfo() == null)
relation.setDataInfo(new DataInfo());
relation.getDataInfo().setDeletedbyinference(false);
switch (type) {
case SOURCE:
relation.setSource(id);
return mapper.writeValueAsString(relation);
case TARGET:
relation.setTarget(id);
return mapper.writeValueAsString(relation);
default:
throw new IllegalArgumentException("");
}
} catch (IOException e) {
throw new RuntimeException("unable to deserialize json relation: " + json, e);
}
}
}

View File

@ -0,0 +1,75 @@
package eu.dnetlib.dedup.sx
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown, OafUtils}
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.functions.col
object SparkUpdateEntityWithDedupInfo {
def main(args: Array[String]): Unit = {
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
val logger = LoggerFactory.getLogger(SparkUpdateEntityWithDedupInfo.getClass)
parser.parseArgument(args)
val workingPath: String = parser.get("workingPath")
logger.info(s"Working dir path = $workingPath")
implicit val oafEncoder: Encoder[OafEntity] = Encoders.kryo[OafEntity]
implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
implicit val dlirelEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
val spark: SparkSession = SparkSession
.builder()
.appName(SparkUpdateEntityWithDedupInfo.getClass.getSimpleName)
.master(parser.get("master"))
.getOrCreate()
val entityPath = parser.get("entityPath")
val mergeRelPath = parser.get("mergeRelPath")
val dedupRecordPath = parser.get("dedupRecordPath")
val entity = parser.get("entity")
val destination = parser.get("targetPath")
val mergedIds = spark.read.load(mergeRelPath).as[Relation]
.where("relClass == 'merges'")
.select(col("target"))
val entities: Dataset[(String, OafEntity)] = spark
.read
.load(entityPath).as[OafEntity]
.map(o => (o.getId, o))(Encoders.tuple(Encoders.STRING, oafEncoder))
val finalDataset:Dataset[OafEntity] = entities.joinWith(mergedIds, entities("_1").equalTo(mergedIds("target")), "left")
.map(k => {
val e: OafEntity = k._1._2
val t = k._2
if (t != null && t.getString(0).nonEmpty) {
if (e.getDataInfo == null) {
e.setDataInfo(OafUtils.generateDataInfo())
}
e.getDataInfo.setDeletedbyinference(true)
}
e
})
val dedupRecords :Dataset[OafEntity] = spark.read.load(dedupRecordPath).as[OafEntity]
finalDataset.union(dedupRecords)
.repartition(1200).write
.mode(SaveMode.Overwrite).save(destination)
}
}

View File

@ -144,7 +144,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Update ${entity} and add DedupRecord</name>
<class>eu.dnetlib.dedup.sx.SparkUpdateEntityJob</class>
<class>eu.dnetlib.dedup.sx.SparkUpdateEntityWithDedupInfo</class>
<jar>dhp-dedup-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}