diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java
index d8de48946..1f9f84c55 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java
@@ -4,8 +4,6 @@ import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.util.BlockProcessor;
 import eu.dnetlib.pace.util.MapDocumentUtil;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -13,59 +11,32 @@ import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Serializable;
 import scala.Tuple2;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class Deduper implements Serializable {
 
-    private static final Log log = LogFactory.getLog(Deduper.class);
-
-    /**
-     * @return the list of relations generated by the deduplication
-     * @param: the spark context
-     * @param: list of JSON entities to be deduped
-     * @param: the dedup configuration
-     */
-    public static JavaPairRDD<String, String> dedup(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
+    private static final Logger log = LoggerFactory.getLogger(Deduper.class);
 
+    public static JavaPairRDD<String, String> computeRelations(JavaSparkContext context, JavaPairRDD<String, List<MapDocument>> blocks, DedupConfig config) {
         Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
 
-        //create vertexes of the graph: <ID, MapDocument>
-        JavaPairRDD<String, MapDocument> mapDocs = mapToVertexes(context, entities, config);
-
-
-        //create blocks for deduplication
-        JavaPairRDD<String, Iterable<MapDocument>> blocks = createBlocks(context, mapDocs, config);
-
-        //create relations by comparing only elements in the same group
-        return computeRelations(context, blocks, config);
-
-//        final RDD<Edge<String>> edgeRdd = relationRDD.map(it -> new Edge<>(it._1().hashCode(), it._2().hashCode(), "equalTo")).rdd();
-//
-//        RDD<Tuple2<Object, MapDocument>> vertexes = mapDocs.mapToPair((PairFunction<Tuple2<String, MapDocument>, Object, MapDocument>) t -> new Tuple2<Object, MapDocument>((long) t._1().hashCode(), t._2())).rdd();
-//        accumulators.forEach((name, acc) -> log.info(name + " -> " + acc.value()));
-//
-//        return GraphProcessor.findCCs(vertexes, edgeRdd, 20).toJavaRDD();
-    }
-
-    /**
-     * @return the list of relations generated by the deduplication
-     * @param: the spark context
-     * @param: list of blocks
-     * @param: the dedup configuration
-     */
-    public static JavaPairRDD<String, String> computeRelations(JavaSparkContext context, JavaPairRDD<String, Iterable<MapDocument>> blocks, DedupConfig config) {
-
-        Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
-
-        return blocks.flatMapToPair((PairFlatMapFunction<Tuple2<String, Iterable<MapDocument>>, String, String>) it -> {
-            final SparkReporter reporter = new SparkReporter(accumulators);
-            new BlockProcessor(config).process(it._1(), it._2(), reporter);
-            return reporter.getRelations().iterator();
-
+        return blocks.flatMapToPair((PairFlatMapFunction<Tuple2<String, List<MapDocument>>, String, String>) it -> {
+            try {
+                final SparkReporter reporter = new SparkReporter(accumulators);
+                new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter);
+                return reporter.getRelations().iterator();
+            } catch (Exception e) {
+                throw new RuntimeException(it._2().get(0).getIdentifier(), e);
+            }
         }).mapToPair(
                 (PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>) item ->
                         new Tuple2<String, Tuple2<String, String>>(item._1() + item._2(), item))
@@ -73,30 +44,7 @@ public class Deduper implements Serializable {
                 .mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
     }
 
-
-    /**
-     * @return the list of blocks based on clustering of dedup configuration
-     * @param: the spark context
-     * @param: list of entities: <id, json>
-     * @param: the dedup configuration
-     */
-    public static JavaPairRDD<String, Iterable<MapDocument>> createBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
-        return mapDocs
-                //the reduce is just to be sure that we haven't document with same id
-                .reduceByKey((a, b) -> a)
-                .map(Tuple2::_2)
-                //Clustering: from <id, doc> to List<groupkey, doc>
-                .flatMapToPair((PairFlatMapFunction<MapDocument, String, MapDocument>) a ->
-                        DedupUtility.getGroupingKeys(config, a)
-                                .stream()
-                                .map(it -> new Tuple2<>(it, a))
-                                .collect(Collectors.toList())
-                                .iterator())
-                .groupByKey();
-    }
-
-
-    public static JavaPairRDD<String, List<MapDocument>> createsortedBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
+    public static JavaPairRDD<String, List<MapDocument>> createSortedBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
         final String of = config.getWf().getOrderField();
         final int maxQueueSize = config.getWf().getGroupMaxSize();
         return mapDocs
@@ -123,39 +71,4 @@ public class Deduper implements Serializable {
                     return v1;
                 });
     }
-
-    /**
-     * @return the list of vertexes: <id, mapDocument>
-     * @param: the spark context
-     * @param: list of JSON entities
-     * @param: the dedup configuration
-     */
-    public static JavaPairRDD<String, MapDocument> mapToVertexes(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
-
-        return entities.mapToPair((PairFunction<String, String, MapDocument>) s -> {
-
-            MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(config, s);
-            return new Tuple2<String, MapDocument>(mapDocument.getIdentifier(), mapDocument);
-
-
-        });
-    }
-
-    public static JavaPairRDD<String, String> computeRelations2(JavaSparkContext context, JavaPairRDD<String, List<MapDocument>> blocks, DedupConfig config) {
-        Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
-
-        return blocks.flatMapToPair((PairFlatMapFunction<Tuple2<String, List<MapDocument>>, String, String>) it -> {
-            try {
-                final SparkReporter reporter = new SparkReporter(accumulators);
-                new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter);
-                return reporter.getRelations().iterator();
-            } catch (Exception e) {
-                throw new RuntimeException(it._2().get(0).getIdentifier(), e);
-            }
-        }).mapToPair(
-                (PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>) item ->
-                        new Tuple2<String, Tuple2<String, String>>(item._1() + item._2(), item))
-                .reduceByKey((a, b) -> a)
-                .mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
-    }
 }
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
index 2c065253a..22e6f145e 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
@@ -8,21 +8,20 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
 public class SparkCreateDedupRecord extends AbstractSparkAction {
 
-    private static final Log log = LogFactory.getLog(SparkCreateDedupRecord.class);
+    private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
 
-    public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
+    public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index 1425422f8..fb607a87e 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -11,8 +11,6 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -24,6 +22,8 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.DocumentException;
 import scala.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,9 +32,9 @@ import java.util.List;
 
 public class SparkCreateMergeRels extends AbstractSparkAction {
 
-    private static final Log log = LogFactory.getLog(SparkCreateMergeRels.class);
+    private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
 
-    public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
+    public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index 0d07d1fe1..a2b7e7b5d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -12,33 +12,26 @@ import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
-import org.dom4j.Document;
 import org.dom4j.DocumentException;
-import org.dom4j.Element;
-import org.dom4j.io.SAXReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringReader;
-import java.util.ArrayList;
 import java.util.List;
 
 public class SparkCreateSimRels extends AbstractSparkAction {
 
-    private static final Log log = LogFactory.getLog(SparkCreateSimRels.class);
+    private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
 
-    public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
+    public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
 
@@ -81,10 +74,10 @@ public class SparkCreateSimRels extends AbstractSparkAction {
         });
 
         //create blocks for deduplication
-        JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
+        JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createSortedBlocks(sc, mapDocument, dedupConf);
 
         //create relations by comparing only elements in the same group
-        final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
+        final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations(sc, blocks, dedupConf);
 
         JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
index f92fcf3ce..c2e1df78c 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
@@ -22,6 +22,8 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.IOException;
@@ -29,11 +31,11 @@ import java.io.Serializable;
 
 public class SparkUpdateEntity extends AbstractSparkAction {
 
+    private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class);
+
     final String IDJSONPATH = "$.id";
 
-    private static final Log log = LogFactory.getLog(SparkUpdateEntity.class);
-
-    public SparkUpdateEntity(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
+    public SparkUpdateEntity(ArgumentApplicationParser parser, SparkSession spark) {
         super(parser, spark);
     }
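
For context, here is a minimal driver sketch of the pipeline this patch leaves in place (JSON entities -> <id, MapDocument> pairs -> sorted blocks -> similarity relations), mirroring what `SparkCreateSimRels` does after the rename of `createsortedBlocks` to `createSortedBlocks` and the merge of `computeRelations2` into `computeRelations`. The class name `DeduperSketch`, the `local[*]` master, and the `entitiesPath`/`dedupConfJson` inputs are hypothetical placeholders, not part of the patch:

```java
package eu.dnetlib.dhp.oa.dedup.examples; // hypothetical location

import eu.dnetlib.dhp.oa.dedup.Deduper;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.util.List;

public class DeduperSketch {

    private static final Logger log = LoggerFactory.getLogger(DeduperSketch.class);

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName(DeduperSketch.class.getSimpleName())
                .master("local[*]") // local master for illustration only
                .getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Placeholder inputs: a real job gets these from the workflow parameters.
        final String entitiesPath = "/tmp/entities";  // one JSON entity per line
        final String dedupConfJson = args[0];         // a full dedup configuration, as JSON

        DedupConfig dedupConf = DedupConfig.load(dedupConfJson);

        // 1) wrap each JSON entity into a MapDocument keyed by its identifier
        JavaPairRDD<String, MapDocument> mapDocs = sc
                .textFile(entitiesPath)
                .mapToPair((PairFunction<String, String, MapDocument>) s -> {
                    MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
                    return new Tuple2<>(d.getIdentifier(), d);
                });

        // 2) cluster the documents into sorted blocks, capped at groupMaxSize per key
        JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createSortedBlocks(sc, mapDocs, dedupConf);

        // 3) compare documents only within their block; each result is a <source, target> pair
        JavaPairRDD<String, String> simRels = Deduper.computeRelations(sc, blocks, dedupConf);

        log.info("similarity relations found: {}", simRels.count());
        spark.stop();
    }
}
```

Note that the surviving `computeRelations` keeps the try/catch from the old `computeRelations2`, so a failing comparison is rethrown with the first identifier of the offending block, which makes broken blocks easy to locate in the Spark logs.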