From 5e2fa996aa76b13b08d08127a4abdb25edb1e25a Mon Sep 17 00:00:00 2001 From: sandro Date: Fri, 17 Apr 2020 12:11:51 +0200 Subject: [PATCH 1/3] fixed problem with conversion of long into string --- .../dhp/schema/oaf/StructuredProperty.java | 2 +- .../provision/update/Datacite2Scholix.java | 10 ++++++++-- .../dhp/sx/synch/oozie_app/config-default.xml | 4 ++++ .../dhp/provision/DataciteClientTest.java | 19 ++++++++++++++----- 4 files changed, 27 insertions(+), 8 deletions(-) diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/StructuredProperty.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/StructuredProperty.java index f6c6b7335..5df6b80f3 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/StructuredProperty.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/StructuredProperty.java @@ -2,7 +2,7 @@ package eu.dnetlib.dhp.schema.oaf; import java.io.Serializable; -public class StructuredProperty implements Serializable { +public class StructuredProperty implements Serializable { private String value; diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java index ac05a8350..fd2e37837 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/update/Datacite2Scholix.java @@ -34,8 +34,13 @@ public class Datacite2Scholix { ScholixResource resource = generateDataciteScholixResource(dJson); return relIds.stream().flatMap(s-> { - final List result = generateScholix(resource, s.get("relatedIdentifier"), s.get("relatedIdentifierType"), s.get("relationType"), updated); - return result.stream(); + try { + final List result = generateScholix(resource, ""+s.get("relatedIdentifier"), s.get("relatedIdentifierType"), s.get("relationType"), updated); + return result.stream(); + } catch (Throwable e) + { + return new ArrayList().stream(); + } }).collect(Collectors.toList()); } @@ -48,6 +53,7 @@ public class Datacite2Scholix { } private List generateScholix(ScholixResource source, final String pid, final String pidtype, final String relType, final String updated) { + if ("doi".equalsIgnoreCase(pidtype)) { ScholixResource target = new ScholixResource(); target.setIdentifier(Collections.singletonList(new ScholixIdentifier(pid, pidtype))); diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/config-default.xml index 6fb2a1253..7c1a43e51 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/synch/oozie_app/config-default.xml @@ -7,4 +7,8 @@ oozie.action.sharelib.for.spark spark2 + + oozie.launcher.mapreduce.user.classpath.first + true + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java index f4a480163..cc4b0047a 100644 --- 
a/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/test/java/eu/dnetlib/dhp/provision/DataciteClientTest.java @@ -3,16 +3,12 @@ package eu.dnetlib.dhp.provision; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.provision.scholix.Scholix; import eu.dnetlib.dhp.provision.scholix.ScholixResource; -import eu.dnetlib.dhp.provision.update.CrossrefClient; -import eu.dnetlib.dhp.provision.update.Datacite2Scholix; -import eu.dnetlib.dhp.provision.update.DataciteClient; -import eu.dnetlib.dhp.provision.update.DataciteClientIterator; +import eu.dnetlib.dhp.provision.update.*; import eu.dnetlib.scholexplorer.relation.RelationMapper; import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.List; @@ -27,6 +23,19 @@ public class DataciteClientTest { System.out.println(new ObjectMapper().writeValueAsString(s)); } + +// public void testS() throws Exception { +// RetrieveUpdateFromDatacite.main(new String[]{ +// "-n", "file:///data/new_s2.txt", +// "-t", "/data/new_s2.txt", +// "-ts", "1586974078", +// "-ih", "ip-90-147-167-25.ct1.garrservices.it", +// "-in", "datacite", +// }); +// +// } + + public void testResolveDataset() throws Exception { DataciteClient dc = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it"); ScholixResource datasetByDOI = dc.getDatasetByDOI("10.17182/hepdata.15392.v1/t5"); From 038ac7afd71a01efd02dca666c27dca8044d09e0 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 17 Apr 2020 13:12:44 +0200 Subject: [PATCH 2/3] relation consistency workflow separated from dedup scan and creation of CCs --- .../eu/dnetlib/dhp/common/HdfsSupport.java | 15 ++ .../PartitionActionSetsByPayloadTypeJob.java | 2 + .../dhp/oa/dedup/AbstractSparkAction.java | 9 ++ .../dhp/oa/dedup/SparkCreateDedupRecord.java | 20 +-- .../dhp/oa/dedup/SparkCreateMergeRels.java | 100 +++++++++---- .../dhp/oa/dedup/SparkCreateSimRels.java | 55 +++----- .../dhp/oa/dedup/SparkPropagateRelation.java | 5 +- .../dhp/oa/dedup/SparkUpdateEntity.java | 110 +++++++-------- .../dedup/consistency/oozie_app/workflow.xml | 133 ++++++++++++++---- .../dhp/oa/dedup/scan/oozie_app/workflow.xml | 44 ++++++ 10 files changed, 330 insertions(+), 163 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java index 05beaa51e..f6b94c921 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java @@ -22,6 +22,21 @@ public class HdfsSupport { private HdfsSupport() { } + /** + * Checks a path (file or dir) exists on HDFS. + * + * @param path Path to be checked + * @param configuration Configuration of hadoop env + */ + public static boolean exists(String path, Configuration configuration) { + logger.info("Removing path: {}", path); + return rethrowAsRuntimeException(() -> { + Path f = new Path(path); + FileSystem fileSystem = FileSystem.get(configuration); + return fileSystem.exists(f); + }); + } + /** * Removes a path (file or dir) from HDFS. 
* diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java index 31a4da190..7c4d0616c 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java @@ -99,6 +99,8 @@ public class PartitionActionSetsByPayloadTypeJob { List inputActionSetPaths, String outputPath) { inputActionSetPaths + .stream() + .filter(path -> HdfsSupport.exists(path, spark.sparkContext().hadoopConfiguration())) .forEach(inputActionSetPath -> { Dataset actionDS = readActionSetFromPath(spark, inputActionSetPath); saveActions(actionDS, outputPath); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 8549839fc..6213f342a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -1,7 +1,9 @@ package eu.dnetlib.dhp.oa.dedup; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -21,6 +23,9 @@ import java.util.List; abstract class AbstractSparkAction implements Serializable { + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + public ArgumentApplicationParser parser; //parameters for the spark action public SparkSession spark; //the spark session @@ -108,4 +113,8 @@ abstract class AbstractSparkAction implements Serializable { .config(conf) .getOrCreate(); } + + protected static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java index 22e6f145e..02cf6587a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java @@ -1,6 +1,5 @@ package eu.dnetlib.dhp.oa.dedup; -import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.utils.ISLookupClientFactory; @@ -42,26 +41,27 @@ public class SparkCreateDedupRecord extends AbstractSparkAction { final String actionSetId = parser.get("actionSetId"); final String workingPath = parser.get("workingPath"); - System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); - System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); - System.out.println(String.format("actionSetId: 
'%s'", actionSetId)); - System.out.println(String.format("workingPath: '%s'", workingPath)); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) { String subEntity = dedupConf.getWf().getSubEntityValue(); - System.out.println(String.format("Creating deduprecords for: '%s'", subEntity)); + log.info("Creating deduprecords for: '{}'", subEntity); + + final String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity); + removeOutputDir(spark, outputPath); final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity); final OafEntityType entityType = OafEntityType.valueOf(subEntity); final JavaRDD dedupRecord = DedupRecordFactory.createDedupRecord(sc, spark, mergeRelPath, entityPath, entityType, dedupConf); - dedupRecord.map(r -> { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(r); - }).saveAsTextFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity)); + + dedupRecord.map(r -> OBJECT_MAPPER.writeValueAsString(r)).saveAsTextFile(outputPath); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index fb607a87e..ef82b4eaf 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -4,6 +4,8 @@ import com.google.common.hash.Hashing; import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; @@ -19,6 +21,7 @@ import org.apache.spark.graphx.Edge; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; import scala.Tuple2; @@ -32,7 +35,9 @@ import java.util.List; public class SparkCreateMergeRels extends AbstractSparkAction { + public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup"; private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class); + public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions"; public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) { super(parser, spark); @@ -44,7 +49,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction { SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); parser.parseArgument(args); - new SparkCreateMergeRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + final String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookupUrl {}", 
isLookUpUrl); + + new SparkCreateMergeRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(isLookUpUrl)); } @Override @@ -55,55 +63,91 @@ public class SparkCreateMergeRels extends AbstractSparkAction { final String isLookUpUrl = parser.get("isLookUpUrl"); final String actionSetId = parser.get("actionSetId"); - System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); - System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); - System.out.println(String.format("actionSetId: '%s'", actionSetId)); - System.out.println(String.format("workingPath: '%s'", workingPath)); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) { final String subEntity = dedupConf.getWf().getSubEntityValue(); - System.out.println(String.format("Creating mergerels for: '%s'", subEntity)); + log.info("Creating mergerels for: '{}'", subEntity); + + final int maxIterations = dedupConf.getWf().getMaxIterations(); + log.info("Max iterations {}", maxIterations); + + final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); final JavaPairRDD vertexes = sc.textFile(graphBasePath + "/" + subEntity) .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) .mapToPair((PairFunction) - s -> new Tuple2(getHashcode(s), s) - ); + s -> new Tuple2<>(getHashcode(s), s)); - final Dataset similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class)); - final RDD> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd(); - final JavaRDD cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD(); - final Dataset mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1) - .flatMap(this::ccToMergeRel).rdd(), Encoders.bean(Relation.class)); + final Dataset similarityRelations = spark + .read() + .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) + .as(Encoders.bean(Relation.class)); - mergeRelation - .write().mode("overwrite") - .save(DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity)); + final RDD> edgeRdd = similarityRelations + .javaRDD() + .map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())) + .rdd(); + + final RDD connectedComponents = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, maxIterations) + .toJavaRDD() + .filter(k -> k.getDocIds().size() > 1) + .flatMap(cc -> ccToMergeRel(cc, dedupConf)) + .rdd(); + + spark + .createDataset(connectedComponents, Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Append) + .save(mergeRelPath); } } - public Iterator ccToMergeRel(ConnectedComponent cc){ + public Iterator ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf){ return cc.getDocIds() .stream() .flatMap(id -> { List tmp = new ArrayList<>(); - Relation r = new Relation(); - r.setSource(cc.getCcId()); - r.setTarget(id); - r.setRelClass("merges"); - tmp.add(r); - r = new Relation(); - r.setTarget(cc.getCcId()); - r.setSource(id); - 
r.setRelClass("isMergedIn"); - tmp.add(r); + + tmp.add(rel(cc.getCcId(), id, "merges", dedupConf)); + tmp.add(rel(id, cc.getCcId(), "isMergedIn", dedupConf)); + return tmp.stream(); }).iterator(); } + private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) { + Relation r = new Relation(); + r.setSource(source); + r.setTarget(target); + r.setRelClass(relClass); + r.setSubRelType("dedup"); + + DataInfo info = new DataInfo(); + info.setDeletedbyinference(false); + info.setInferred(true); + info.setInvisible(false); + info.setInferenceprovenance(dedupConf.getWf().getConfigurationId()); + Qualifier provenanceAction = new Qualifier(); + provenanceAction.setClassid(PROVENANCE_ACTION_CLASS); + provenanceAction.setClassname(PROVENANCE_ACTION_CLASS); + provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS); + provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS); + info.setProvenanceaction(provenanceAction); + + //TODO calculate the trust value based on the similarity score of the elements in the CC + //info.setTrust(); + + r.setDataInfo(info); + return r; + } + public static long getHashcode(final String id) { return Hashing.murmur3_128().hashString(id).asLong(); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index a2b7e7b5d..de842d822 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; @@ -18,6 +19,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; import org.slf4j.Logger; @@ -53,24 +55,27 @@ public class SparkCreateSimRels extends AbstractSparkAction { final String actionSetId = parser.get("actionSetId"); final String workingPath = parser.get("workingPath"); - System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); - System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); - System.out.println(String.format("actionSetId: '%s'", actionSetId)); - System.out.println(String.format("workingPath: '%s'", workingPath)); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); //for each dedup configuration for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) { final String entity = dedupConf.getWf().getEntityType(); final String subEntity = dedupConf.getWf().getSubEntityValue(); - System.out.println(String.format("Creating simrels for: 
'%s'", subEntity)); + log.info("Creating simrels for: '{}'", subEntity); + + final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); + removeOutputDir(spark, outputPath); JavaPairRDD mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) .mapToPair((PairFunction) s -> { MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); - return new Tuple2(d.getIdentifier(), d); + return new Tuple2<>(d.getIdentifier(), d); }); //create blocks for deduplication @@ -84,46 +89,30 @@ public class SparkCreateSimRels extends AbstractSparkAction { //save the simrel in the workingdir spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)) .write() - .mode("overwrite") - .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); + .mode(SaveMode.Append) + .save(outputPath); } } - /** - * Utility method used to create an atomic action from a Relation object - * @param relation input relation - * @return A tuple2 with [id, json serialization of the atomic action] - * @throws JsonProcessingException - */ - public Tuple2 createSequenceFileRow(Relation relation) throws JsonProcessingException { - - ObjectMapper mapper = new ObjectMapper(); - - String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget(); - AtomicAction aa = new AtomicAction<>(Relation.class, relation); - - return new Tuple2<>( - new Text(id), - new Text(mapper.writeValueAsString(aa)) - ); - } - - public Relation createSimRel(String source, String target, String entity){ + public Relation createSimRel(String source, String target, String entity) { final Relation r = new Relation(); r.setSource(source); r.setTarget(target); + r.setSubRelType("dedupSimilarity"); + r.setRelClass("isSimilarTo"); + r.setDataInfo(new DataInfo()); switch(entity){ case "result": - r.setRelClass("resultResult_dedupSimilarity_isSimilarTo"); + r.setRelType("resultResult"); break; case "organization": - r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo"); + r.setRelType("organizationOrganization"); break; default: - r.setRelClass("isSimilarTo"); - break; + throw new IllegalArgumentException("unmanaged entity type: " + entity); } return r; } + } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 4e012293b..371d70ba2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -27,9 +27,6 @@ public class SparkPropagateRelation extends AbstractSparkAction { private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - enum FieldType { SOURCE, TARGET @@ -62,7 +59,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { log.info("dedupGraphPath: '{}'", dedupGraphPath); final String outputRelationPath = DedupUtility.createEntityPath(dedupGraphPath, "relation"); - deletePath(outputRelationPath); + removeOutputDir(spark, outputRelationPath); Dataset mergeRels = spark.read() .load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) diff --git 
a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java index c2e1df78c..5347489e7 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java @@ -3,17 +3,18 @@ package eu.dnetlib.dhp.oa.dedup; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -27,7 +28,7 @@ import org.slf4j.LoggerFactory; import scala.Tuple2; import java.io.IOException; -import java.io.Serializable; +import java.util.Map; public class SparkUpdateEntity extends AbstractSparkAction { @@ -48,98 +49,87 @@ public class SparkUpdateEntity extends AbstractSparkAction { new SparkUpdateEntity(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); } - public boolean mergeRelExists(String basePath, String entity) throws IOException { - - boolean result = false; - - FileSystem fileSystem = FileSystem.get(new Configuration()); - - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(basePath)); - - for (FileStatus fs : fileStatuses) { - if (fs.isDirectory()) - if (fileSystem.exists(new Path(DedupUtility.createMergeRelPath(basePath, fs.getPath().getName(), entity)))) - result = true; - } - - return result; - } - public void run(ISLookUpService isLookUpService) throws IOException { final String graphBasePath = parser.get("graphBasePath"); final String workingPath = parser.get("workingPath"); final String dedupGraphPath = parser.get("dedupGraphPath"); - System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); - System.out.println(String.format("workingPath: '%s'", workingPath)); - System.out.println(String.format("dedupGraphPath: '%s'", dedupGraphPath)); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("workingPath: '{}'", workingPath); + log.info("dedupGraphPath: '{}'", dedupGraphPath); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); //for each entity - for (OafEntityType entity: OafEntityType.values()) { + ModelSupport.entityTypes.forEach((entity, clazz) -> { + final String outputPath = dedupGraphPath + "/" + entity; + removeOutputDir(spark, outputPath); JavaRDD sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, entity.toString())); if (mergeRelExists(workingPath, entity.toString())) { - final Dataset rel = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", 
entity.toString())).as(Encoders.bean(Relation.class)); + final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, "*", entity.toString()); + final String dedupRecordPath = DedupUtility.createDedupRecordPath(workingPath, "*", entity.toString()); + + final Dataset rel = spark.read() + .load(mergeRelPath) + .as(Encoders.bean(Relation.class)); final JavaPairRDD mergedIds = rel .where("relClass == 'merges'") .select(rel.col("target")) .distinct() .toJavaRDD() - .mapToPair((PairFunction) r -> new Tuple2(r.getString(0), "d")); + .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(0), "d")); - final JavaRDD dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, "*", entity.toString())); + JavaPairRDD entitiesWithId = sourceEntity + .mapToPair((PairFunction) s -> new Tuple2<>(MapDocumentUtil.getJPathString(IDJSONPATH, s), s)); - JavaPairRDD entitiesWithId = sourceEntity.mapToPair((PairFunction) s -> new Tuple2(MapDocumentUtil.getJPathString(IDJSONPATH, s), s)); - - JavaRDD map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), getOafClass(entity)) : k._2()._1()); - sourceEntity = map.union(dedupEntity); + JavaRDD map = entitiesWithId + .leftOuterJoin(mergedIds) + .map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), clazz) : k._2()._1()); + sourceEntity = map.union(sc.textFile(dedupRecordPath)); } - sourceEntity.saveAsTextFile(dedupGraphPath + "/" + entity, GzipCodec.class); + sourceEntity.saveAsTextFile(outputPath, GzipCodec.class); - } + }); } - public Class getOafClass(OafEntityType className) { - switch (className.toString()) { - case "publication": - return Publication.class; - case "dataset": - return eu.dnetlib.dhp.schema.oaf.Dataset.class; - case "datasource": - return Datasource.class; - case "software": - return Software.class; - case "organization": - return Organization.class; - case "otherresearchproduct": - return OtherResearchProduct.class; - case "project": - return Project.class; - default: - throw new IllegalArgumentException("Illegal type " + className); - } - } + public boolean mergeRelExists(String basePath, String entity) { - private static String updateDeletedByInference(final String json, final Class clazz) { - final ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + boolean result = false; try { - Oaf entity = mapper.readValue(json, clazz); + FileSystem fileSystem = FileSystem.get(new Configuration()); + + FileStatus[] fileStatuses = fileSystem.listStatus(new Path(basePath)); + + for (FileStatus fs : fileStatuses) { + if (fs.isDirectory()) + if (fileSystem.exists(new Path(DedupUtility.createMergeRelPath(basePath, fs.getPath().getName(), entity)))) + result = true; + } + + return result; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static String updateDeletedByInference(final String json, final Class clazz) { + try { + Oaf entity = OBJECT_MAPPER.readValue(json, clazz); if (entity.getDataInfo()== null) entity.setDataInfo(new DataInfo()); entity.getDataInfo().setDeletedbyinference(true); - return mapper.writeValueAsString(entity); + return OBJECT_MAPPER.writeValueAsString(entity); } catch (IOException e) { throw new RuntimeException("Unable to convert json", e); } } + } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml 
b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml index 69411701b..df6877dfa 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml @@ -68,38 +68,12 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - yarn - cluster - Update Entity - eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity - dhp-dedup-openaire-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - -mtyarn - --i${graphBasePath} - --w${workingPath} - --o${dedupGraphPath} - - - - - yarn @@ -122,9 +96,112 @@ --o${dedupGraphPath} --w${workingPath} - + + + + + + + + + + + + + + + + + -pb + ${graphBasePath}/datasource + ${dedupGraphPath}/datasource + + + + + + + + + + + -pb + ${graphBasePath}/project + ${dedupGraphPath}/project + + + + + + + + + + + -pb + ${graphBasePath}/organization + ${dedupGraphPath}/organization + + + + + + + + + + + -pb + ${graphBasePath}/publication + ${dedupGraphPath}/publication + + + + + + + + + + + -pb + ${graphBasePath}/dataset + ${dedupGraphPath}/dataset + + + + + + + + + + + -pb + ${graphBasePath}/software + ${dedupGraphPath}/software + + + + + + + + + + + -pb + ${graphBasePath}/otherresearchproduct + ${dedupGraphPath}/otherresearchproduct + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index 6c74a204d..6790dde32 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -16,6 +16,10 @@ workingPath path for the working directory + + dedupGraphPath + path for the output graph + sparkDriverMemory memory for driver process @@ -146,6 +150,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 -mtyarn --i${graphBasePath} @@ -153,6 +158,45 @@ --la${isLookUpUrl} --asi${actionSetId} + + + + + + + yarn + cluster + Update Entity + eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity + dhp-dedup-openaire-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + -mtyarn + --i${graphBasePath} + --w${workingPath} + --o${dedupGraphPath} + + + + + + + + + + + -pb + ${graphBasePath}/relation 
+ ${dedupGraphPath}/relation + From 71813795f61a1ce23944d107145c4a994a7f9425 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Sat, 18 Apr 2020 12:06:23 +0200 Subject: [PATCH 3/3] various refactorings on the dnet-dedup-openaire workflow --- .../dhp/oa/dedup/AbstractSparkAction.java | 47 +-- .../dhp/oa/dedup/DedupRecordFactory.java | 303 ++++-------------- .../java/eu/dnetlib/dhp/oa/dedup/Deduper.java | 58 +--- .../dnetlib/dhp/oa/dedup/OafEntityType.java | 15 - .../dhp/oa/dedup/SparkCreateDedupRecord.java | 27 +- .../dhp/oa/dedup/SparkCreateMergeRels.java | 33 +- .../dhp/oa/dedup/SparkCreateSimRels.java | 45 ++- .../dhp/oa/dedup/SparkPropagateRelation.java | 35 +- .../dnetlib/dhp/oa/dedup/SparkReporter.java | 6 +- .../dhp/oa/dedup/SparkUpdateEntity.java | 24 +- .../eu/dnetlib/dhp/oa/dedup/model/Block.java | 76 +++++ .../dedup/consistency/oozie_app/workflow.xml | 1 - .../dhp/oa/dedup/createCC_parameters.json | 6 - .../dedup/createDedupRecord_parameters.json | 6 - .../oa/dedup/createSimRels_parameters.json | 6 - .../dedup/propagateRelation_parameters.json | 6 - .../dhp/oa/dedup/scan/oozie_app/workflow.xml | 4 - .../dhp/oa/dedup/updateEntity_parameters.json | 6 - 18 files changed, 262 insertions(+), 442 deletions(-) delete mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OafEntityType.java create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Block.java diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 6213f342a..f52e4bb39 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -4,11 +4,14 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.dom4j.Document; import org.dom4j.DocumentException; @@ -73,47 +76,21 @@ abstract class AbstractSparkAction implements Serializable { abstract void run(ISLookUpService isLookUpService) throws DocumentException, IOException, ISLookUpException; - protected static SparkSession getSparkSession(ArgumentApplicationParser parser) { - SparkConf conf = new SparkConf(); - - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(new Class[] { - Author.class, - Context.class, - Country.class, - DataInfo.class, - Dataset.class, - Datasource.class, - ExternalReference.class, - ExtraInfo.class, - Field.class, - GeoLocation.class, - Instance.class, - Journal.class, - KeyValue.class, - Oaf.class, - OafEntity.class, - OAIProvenance.class, - Organization.class, - OriginDescription.class, - OtherResearchProduct.class, - Project.class, - Publication.class, - Qualifier.class, - Relation.class, - Result.class, - Software.class, - StructuredProperty.class - }); - + 
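    // note on the method below: the session factory now receives a ready-made SparkConf;
    // each concrete action builds its own conf (serializer + Kryo registration of the model
    // classes) before calling this, instead of the base class registering every Oaf class
    // and reading appName/master from the argument parser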
protected static SparkSession getSparkSession(SparkConf conf) { return SparkSession .builder() - .appName(SparkCreateSimRels.class.getSimpleName()) - .master(parser.get("master")) .config(conf) .getOrCreate(); } + protected static void save(Dataset dataset, String outPath, SaveMode mode) { + dataset + .write() + .option("compression", "gzip") + .mode(mode) + .json(outPath); + } + protected static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index df64d1011..da16e6dff 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,274 +1,95 @@ package eu.dnetlib.dhp.oa.dedup; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.pace.config.DedupConfig; -import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; -import org.codehaus.jackson.map.ObjectMapper; + import scala.Tuple2; import java.util.Collection; +import java.util.Iterator; public class DedupRecordFactory { - public static JavaRDD createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) { + protected static final ObjectMapper OBJECT_MAPPER = new com.fasterxml.jackson.databind.ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + public static Dataset createDedupRecord( + final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final Class clazz, final DedupConfig dedupConf) { + long ts = System.currentTimeMillis(); + // - final JavaPairRDD inputJsonEntities = sc.textFile(entitiesInputPath) - .mapToPair((PairFunction) it -> - new Tuple2(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it) - ); + Dataset> entities = spark.read() + .textFile(entitiesInputPath) + .map((MapFunction>) it -> { + T entity = OBJECT_MAPPER.readValue(it, clazz); + return new Tuple2<>(entity.getId(), entity); + }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); + //: source is the dedup_id, target is the id of the mergedIn - JavaPairRDD mergeRels = spark - .read().load(mergeRelsInputPath).as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .mapToPair( - (PairFunction) r -> - new Tuple2(r.getTarget(), r.getSource()) - ); + Dataset> mergeRels = spark + .read() + .load(mergeRelsInputPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 
'merges'") + .map((MapFunction>) + r -> new Tuple2<>(r.getSource(), r.getTarget()), Encoders.tuple(Encoders.STRING(), Encoders.STRING())); // - final JavaPairRDD joinResult = mergeRels.join(inputJsonEntities).mapToPair((PairFunction>, String, String>) Tuple2::_2); - - JavaPairRDD> sortedJoinResult = joinResult.groupByKey(); - - switch (entityType) { - case publication: - return sortedJoinResult.map(p -> DedupRecordFactory.publicationMerger(p, ts)); - case dataset: - return sortedJoinResult.map(d -> DedupRecordFactory.datasetMerger(d, ts)); - case project: - return sortedJoinResult.map(p -> DedupRecordFactory.projectMerger(p, ts)); - case software: - return sortedJoinResult.map(s -> DedupRecordFactory.softwareMerger(s, ts)); - case datasource: - return sortedJoinResult.map(d -> DedupRecordFactory.datasourceMerger(d, ts)); - case organization: - return sortedJoinResult.map(o -> DedupRecordFactory.organizationMerger(o, ts)); - case otherresearchproduct: - return sortedJoinResult.map(o -> DedupRecordFactory.otherresearchproductMerger(o, ts)); - default: - return null; - } - + return mergeRels.joinWith(entities, mergeRels.col("_1").equalTo(entities.col("_1")), "left_outer") + .filter((FilterFunction, Tuple2>>) value -> value._2() != null) + .map((MapFunction, Tuple2>, T>) + value -> value._2()._2(), Encoders.kryo(clazz)) + .groupByKey((MapFunction) value -> value.getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) + (key, values) -> entityMerger(key, values, ts, clazz), Encoders.bean(clazz)); } - private static Publication publicationMerger(Tuple2> e, final long ts) { + private static T entityMerger(String id, Iterator entities, final long ts, Class clazz) { + try { + T entity = clazz.newInstance(); + entity.setId(id); + if (entity.getDataInfo() == null) { + entity.setDataInfo(new DataInfo()); + } + entity.getDataInfo().setTrust("0.9"); + entity.setLastupdatetimestamp(ts); - Publication p = new Publication(); //the result of the merge, to be returned at the end + final Collection dates = Lists.newArrayList(); + entities.forEachRemaining(e -> { + entity.mergeFrom(e); + if (ModelSupport.isSubClass(e, Result.class)) { + Result r1 = (Result) e; + Result er = (Result) entity; + er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor())); - p.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - - final Collection dateofacceptance = Lists.newArrayList(); - - if (e._2() != null) - e._2().forEach(pub -> { - try { - Publication publication = mapper.readValue(pub, Publication.class); - - p.mergeFrom(publication); - p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor())); - //add to the list if they are not null - if (publication.getDateofacceptance() != null) - dateofacceptance.add(publication.getDateofacceptance().getValue()); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - p.setDateofacceptance(DatePicker.pick(dateofacceptance)); - if (p.getDataInfo() == null) - p.setDataInfo(new DataInfo()); - p.getDataInfo().setTrust("0.9"); - p.setLastupdatetimestamp(ts); - return p; - } - - private static Dataset datasetMerger(Tuple2> e, final long ts) { - - Dataset d = new Dataset(); //the result of the merge, to be returned at the end - - d.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - - final Collection dateofacceptance = Lists.newArrayList(); - - if (e._2() != null) - e._2().forEach(dat -> { - try { - Dataset dataset = mapper.readValue(dat, Dataset.class); - - d.mergeFrom(dataset); - 
d.setAuthor(DedupUtility.mergeAuthor(d.getAuthor(), dataset.getAuthor())); - //add to the list if they are not null - if (dataset.getDateofacceptance() != null) - dateofacceptance.add(dataset.getDateofacceptance().getValue()); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - d.setDateofacceptance(DatePicker.pick(dateofacceptance)); - if (d.getDataInfo() == null) - d.setDataInfo(new DataInfo()); - d.getDataInfo().setTrust("0.9"); - d.setLastupdatetimestamp(ts); - return d; - } - - private static Project projectMerger(Tuple2> e, final long ts) { - - Project p = new Project(); //the result of the merge, to be returned at the end - - p.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - if (e._2() != null) - e._2().forEach(proj -> { - try { - Project project = mapper.readValue(proj, Project.class); - - p.mergeFrom(project); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - if (p.getDataInfo() == null) - p.setDataInfo(new DataInfo()); - p.getDataInfo().setTrust("0.9"); - p.setLastupdatetimestamp(ts); - return p; - } - - private static Software softwareMerger(Tuple2> e, final long ts) { - - Software s = new Software(); //the result of the merge, to be returned at the end - - s.setId(e._1()); - final ObjectMapper mapper = new ObjectMapper(); - final Collection dateofacceptance = Lists.newArrayList(); - if (e._2() != null) - e._2().forEach(soft -> { - try { - Software software = mapper.readValue(soft, Software.class); - - s.mergeFrom(software); - s.setAuthor(DedupUtility.mergeAuthor(s.getAuthor(), software.getAuthor())); - //add to the list if they are not null - if (software.getDateofacceptance() != null) - dateofacceptance.add(software.getDateofacceptance().getValue()); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - s.setDateofacceptance(DatePicker.pick(dateofacceptance)); - if (s.getDataInfo() == null) - s.setDataInfo(new DataInfo()); - s.getDataInfo().setTrust("0.9"); - s.setLastupdatetimestamp(ts); - return s; - } - - private static Datasource datasourceMerger(Tuple2> e, final long ts) { - Datasource d = new Datasource(); //the result of the merge, to be returned at the end - d.setId(e._1()); - final ObjectMapper mapper = new ObjectMapper(); - if (e._2() != null) - e._2().forEach(dat -> { - try { - Datasource datasource = mapper.readValue(dat, Datasource.class); - - d.mergeFrom(datasource); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - if (d.getDataInfo() == null) - d.setDataInfo(new DataInfo()); - d.getDataInfo().setTrust("0.9"); - d.setLastupdatetimestamp(ts); - return d; - } - - private static Organization organizationMerger(Tuple2> e, final long ts) { - - Organization o = new Organization(); //the result of the merge, to be returned at the end - - o.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - - - StringBuilder trust = new StringBuilder("0.0"); - - if (e._2() != null) - e._2().forEach(pub -> { - try { - Organization organization = mapper.readValue(pub, Organization.class); - - final String currentTrust = organization.getDataInfo().getTrust(); - if (!"1.0".equals(currentTrust)) { - trust.setLength(0); - trust.append(currentTrust); + if (er.getDateofacceptance() != null) { + dates.add(r1.getDateofacceptance().getValue()); } - o.mergeFrom(organization); - - } catch (Exception exc) { - throw new RuntimeException(exc); } }); - if (o.getDataInfo() == null) - { - o.setDataInfo(new DataInfo()); + if (ModelSupport.isSubClass(entity, Result.class)) { + 
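                // for Result subtypes, a single dateofacceptance is picked from the dates
                // collected while merging the grouped entities (authors were merged in the loop above)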
((Result) entity).setDateofacceptance(DatePicker.pick(dates)); + } + return entity; + } catch (IllegalAccessException | InstantiationException e) { + throw new RuntimeException(e); } - if (o.getDataInfo() == null) - o.setDataInfo(new DataInfo()); - o.getDataInfo().setTrust("0.9"); - o.setLastupdatetimestamp(ts); - - return o; - } - - private static OtherResearchProduct otherresearchproductMerger(Tuple2> e, final long ts) { - - OtherResearchProduct o = new OtherResearchProduct(); //the result of the merge, to be returned at the end - - o.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - - final Collection dateofacceptance = Lists.newArrayList(); - - if (e._2() != null) - e._2().forEach(orp -> { - try { - OtherResearchProduct otherResearchProduct = mapper.readValue(orp, OtherResearchProduct.class); - - o.mergeFrom(otherResearchProduct); - o.setAuthor(DedupUtility.mergeAuthor(o.getAuthor(), otherResearchProduct.getAuthor())); - //add to the list if they are not null - if (otherResearchProduct.getDateofacceptance() != null) - dateofacceptance.add(otherResearchProduct.getDateofacceptance().getValue()); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - if (o.getDataInfo() == null) - o.setDataInfo(new DataInfo()); - o.setDateofacceptance(DatePicker.pick(dateofacceptance)); - o.getDataInfo().setTrust("0.9"); - o.setLastupdatetimestamp(ts); - return o; } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java index 1f9f84c55..28b85853f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java @@ -1,74 +1,50 @@ package eu.dnetlib.dhp.oa.dedup; +import eu.dnetlib.dhp.oa.dedup.model.Block; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.MapDocument; import eu.dnetlib.pace.util.BlockProcessor; -import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.util.LongAccumulator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Serializable; import scala.Tuple2; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class Deduper implements Serializable { - private static final Logger log = LoggerFactory.getLogger(Deduper.class); - - public static JavaPairRDD computeRelations(JavaSparkContext context, JavaPairRDD> blocks, DedupConfig config) { + public static JavaPairRDD computeRelations(JavaSparkContext context, JavaPairRDD blocks, DedupConfig config) { Map accumulators = DedupUtility.constructAccumulator(config, context.sc()); - return blocks.flatMapToPair((PairFlatMapFunction>, String, String>) it -> { - try { - final SparkReporter reporter = new SparkReporter(accumulators); - new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter); - return reporter.getRelations().iterator(); - } catch (Exception e) { - throw new RuntimeException(it._2().get(0).getIdentifier(), e); - } - }).mapToPair( - (PairFunction, String, Tuple2>) item -> - new 
Tuple2>(item._1() + item._2(), item))
+        return blocks
+            .flatMapToPair(it -> {
+                final SparkReporter reporter = new SparkReporter(accumulators);
+                new BlockProcessor(config).processSortedBlock(it._1(), it._2().getDocuments(), reporter);
+                return reporter.getRelations().iterator();
+            })
+            .mapToPair(it -> new Tuple2<>(it._1() + it._2(), it))
             .reduceByKey((a, b) -> a)
-            .mapToPair((PairFunction>, String, String>) Tuple2::_2);
+            .mapToPair(Tuple2::_2);
     }
 
-    public static JavaPairRDD> createSortedBlocks(JavaSparkContext context, JavaPairRDD mapDocs, DedupConfig config) {
+    public static JavaPairRDD createSortedBlocks(JavaPairRDD mapDocs, DedupConfig config) {
         final String of = config.getWf().getOrderField();
         final int maxQueueSize = config.getWf().getGroupMaxSize();
+
         return mapDocs
             //the reduce is just to be sure that we haven't document with same id
             .reduceByKey((a, b) -> a)
             .map(Tuple2::_2)
             //Clustering: from to List
-            .flatMapToPair((PairFlatMapFunction>) a ->
-                DedupUtility.getGroupingKeys(config, a)
+            .flatMap(a -> DedupUtility.getGroupingKeys(config, a)
                 .stream()
-                .map(it -> {
-                    List tmp = new ArrayList<>();
-                    tmp.add(a);
-                    return new Tuple2<>(it, tmp);
-                }
-                )
+                .map(it -> Block.from(it, a))
                 .collect(Collectors.toList())
                 .iterator())
-            .reduceByKey((Function2, List, List>) (v1, v2) -> {
-                v1.addAll(v2);
-                v1.sort(Comparator.comparing(a -> a.getFieldMap().get(of).stringValue()));
-                if (v1.size() > maxQueueSize)
-                    return new ArrayList<>(v1.subList(0, maxQueueSize));
-                return v1;
-            });
+            .mapToPair(block -> new Tuple2<>(block.getKey(), block))
+            .reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize));
     }
+
 }
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OafEntityType.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OafEntityType.java
deleted file mode 100644
index da2bc3a37..000000000
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OafEntityType.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package eu.dnetlib.dhp.oa.dedup;
-
-public enum OafEntityType {
-
-    datasource,
-    organization,
-    project,
-    dataset,
-    otherresearchproduct,
-    software,
-    publication
-
-
-
-}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
index 02cf6587a..c2b1fc9c2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
@@ -1,14 +1,21 @@
 package eu.dnetlib.dhp.oa.dedup;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
 import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.DocumentException;
 import org.slf4j.Logger;
@@ -30,7 +37,12 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
                 SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
         parser.parseArgument(args);
 
-        new SparkCreateDedupRecord(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+        SparkConf conf = new SparkConf();
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+        new SparkCreateDedupRecord(parser, getSparkSession(conf))
+            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
     }
 
     @Override
@@ -46,8 +58,6 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
         log.info("actionSetId: '{}'", actionSetId);
         log.info("workingPath: '{}'", workingPath);
 
-        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-
         for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) {
             String subEntity = dedupConf.getWf().getSubEntityValue();
             log.info("Creating deduprecords for: '{}'", subEntity);
@@ -57,11 +67,14 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
             final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
             final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
-            final OafEntityType entityType = OafEntityType.valueOf(subEntity);
-            final JavaRDD dedupRecord =
-                DedupRecordFactory.createDedupRecord(sc, spark, mergeRelPath, entityPath, entityType, dedupConf);
-            dedupRecord.map(r -> OBJECT_MAPPER.writeValueAsString(r)).saveAsTextFile(outputPath);
+            Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
+
+            DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz, dedupConf)
+                .map((MapFunction) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING())
+                .write()
+                .mode(SaveMode.Overwrite)
+                .json(outputPath);
         }
     }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index ef82b4eaf..9c46404b7 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -4,6 +4,7 @@ import com.google.common.hash.Hashing;
 import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -13,8 +14,8 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.graphx.Edge;
@@ -52,7 +53,11 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
         final String isLookUpUrl = parser.get("isLookUpUrl");
         log.info("isLookupUrl {}", isLookUpUrl);
 
-        new SparkCreateMergeRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(isLookUpUrl));
+        SparkConf conf = new SparkConf();
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+        new SparkCreateMergeRels(parser, getSparkSession(conf)).run(ISLookupClientFactory.getLookUpService(isLookUpUrl));
     }
 
     @Override
@@ -79,32 +84,30 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
             log.info("Max iterations {}", maxIterations);
 
             final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
+
             final JavaPairRDD vertexes = sc.textFile(graphBasePath + "/" + subEntity)
                 .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
-                .mapToPair((PairFunction)
-                    s -> new Tuple2<>(getHashcode(s), s));
+                .mapToPair((PairFunction) s -> new Tuple2<>(hash(s), s));
 
-            final Dataset similarityRelations = spark
+            final RDD> edgeRdd = spark
                 .read()
                 .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
-                .as(Encoders.bean(Relation.class));
-
-            final RDD> edgeRdd = similarityRelations
+                .as(Encoders.bean(Relation.class))
                 .javaRDD()
-                .map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass()))
+                .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
                 .rdd();
 
-            final RDD connectedComponents = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, maxIterations)
+            final Dataset mergeRels = spark
+                .createDataset(GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, maxIterations)
                 .toJavaRDD()
                 .filter(k -> k.getDocIds().size() > 1)
                 .flatMap(cc -> ccToMergeRel(cc, dedupConf))
-                .rdd();
+                .rdd(), Encoders.bean(Relation.class));
 
-            spark
-                .createDataset(connectedComponents, Encoders.bean(Relation.class))
+            mergeRels
                 .write()
                 .mode(SaveMode.Append)
-                .save(mergeRelPath);
+                .parquet(mergeRelPath);
         }
     }
@@ -148,7 +151,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
         return r;
     }
 
-    public static long getHashcode(final String id) {
+    public static long hash(final String id) {
         return Hashing.murmur3_128().hashString(id).asLong();
     }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index de842d822..d02aef64c 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -1,23 +1,26 @@
 package eu.dnetlib.dhp.oa.dedup;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.oa.dedup.model.Block;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.FieldListImpl;
+import eu.dnetlib.pace.model.FieldValueImpl;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
@@ -27,7 +30,6 @@ import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.IOException;
-import java.util.List;
 
 public class SparkCreateSimRels extends AbstractSparkAction {
 
@@ -43,7 +45,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
                 SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
         parser.parseArgument(args);
 
-        new SparkCreateSimRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+        SparkConf conf = new SparkConf();
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.registerKryoClasses(new Class[] {
+            MapDocument.class,
+            FieldListImpl.class,
+            FieldValueImpl.class,
+            Block.class
+        });
+
+        new SparkCreateSimRels(parser, getSparkSession(conf))
+            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
     }
 
     @Override
@@ -60,8 +72,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
         log.info("actionSetId: '{}'", actionSetId);
         log.info("workingPath: '{}'", workingPath);
 
-        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-
         //for each dedup configuration
         for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) {
@@ -72,29 +82,30 @@
             final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
             removeOutputDir(spark, outputPath);
 
-            JavaPairRDD mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
-                .mapToPair((PairFunction) s -> {
+            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+            JavaPairRDD mapDocuments = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+                .mapToPair((PairFunction) s -> {
                     MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
                     return new Tuple2<>(d.getIdentifier(), d);
                 });
 
             //create blocks for deduplication
-            JavaPairRDD> blocks = Deduper.createSortedBlocks(sc, mapDocument, dedupConf);
+            JavaPairRDD blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
 
             //create relations by comparing only elements in the same group
-            final JavaPairRDD dedupRels = Deduper.computeRelations(sc, blocks, dedupConf);
-
-            JavaRDD relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
+            JavaRDD relations = Deduper.computeRelations(sc, blocks, dedupConf)
+                .map(t -> createSimRel(t._1(), t._2(), entity));
 
             //save the simrel in the workingdir
-            spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class))
+            spark.createDataset(relations.rdd(), Encoders.bean(Relation.class))
                 .write()
                 .mode(SaveMode.Append)
                 .save(outputPath);
         }
     }
 
-    public Relation createSimRel(String source, String target, String entity) {
+    private Relation createSimRel(String source, String target, String entity) {
         final Relation r = new Relation();
         r.setSource(source);
         r.setTarget(target);
@@ -102,7 +113,7 @@
         r.setRelClass("isSimilarTo");
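The SparkConf built in the main methods above (Kryo serializer plus explicit class registration) is the setup shared by the dedup actions touched in this patch; a minimal, self-contained sketch of that configuration, with an illustrative class name and app name that are not part of the patch, could look like this:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.schema.common.ModelSupport;

public class KryoSessionSketch {

    public static void main(String[] args) {
        // switch serialization to Kryo and register the OAF model classes,
        // mirroring the SparkConf created in SparkCreateMergeRels.main above
        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        SparkSession spark = SparkSession
                .builder()
                .appName("dedup-kryo-session-sketch")
                .config(conf)
                .getOrCreate();

        // ... run the dedup steps against this session ...

        spark.stop();
    }
}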
         r.setDataInfo(new DataInfo());
 
-        switch(entity){
+        switch(entity) {
             case "result":
                 r.setRelType("resultResult");
                 break;
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 371d70ba2..86d19d96d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -10,11 +11,9 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
@@ -43,7 +42,11 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 
         parser.parseArgument(args);
 
-        new SparkPropagateRelation(parser, getSparkSession(parser))
+        SparkConf conf = new SparkConf();
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+        new SparkPropagateRelation(parser, getSparkSession(conf))
             .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
     }
 
@@ -90,7 +93,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
                 processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn()),
                 mergedIds, FieldType.TARGET, getDeletedFn());
 
-        save(newRels.union(updated), outputRelationPath);
+        save(newRels.union(updated), outputRelationPath, SaveMode.Overwrite);
     }
 
@@ -164,26 +167,6 @@ public class SparkPropagateRelation extends AbstractSparkAction {
         };
     }
 
-    private void deletePath(String path) {
-        try {
-            Path p = new Path(path);
-            FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
-
-            if (fs.exists(p)) {
-                fs.delete(p, true);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static void save(Dataset dataset, String outPath) {
-        dataset
-            .write()
-            .option("compression", "gzip")
-            .json(outPath);
-    }
-
     private static boolean containsDedup(final Relation r) {
         return r.getSource().toLowerCase().contains("dedup") || r.getTarget().toLowerCase().contains("dedup");
     }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkReporter.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkReporter.java
index cc03db385..98ee37e14 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkReporter.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkReporter.java
@@ -13,9 +13,9 @@ import java.util.Map;
 
 public class SparkReporter implements Serializable, Reporter {
 
-    final List> relations = new ArrayList<>();
-    private static final Log log = LogFactory.getLog(SparkReporter.class);
-    Map accumulators;
+    private final List> relations = new ArrayList<>();
+
+    private Map accumulators;
 
     public SparkReporter(Map accumulators){
         this.accumulators = accumulators;
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
index 5347489e7..ea0a06bbe 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
@@ -1,13 +1,17 @@
 package eu.dnetlib.dhp.oa.dedup;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.oa.dedup.model.Block;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.model.FieldListImpl;
+import eu.dnetlib.pace.model.FieldValueImpl;
+import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -15,6 +19,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -28,7 +33,6 @@ import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.IOException;
-import java.util.Map;
 
 public class SparkUpdateEntity extends AbstractSparkAction {
 
@@ -43,10 +47,16 @@ public class SparkUpdateEntity extends AbstractSparkAction {
     public static void main(String[] args) throws Exception {
         ArgumentApplicationParser parser = new ArgumentApplicationParser(
                 IOUtils.toString(
-                        SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
+                        SparkUpdateEntity.class.getResourceAsStream(
+                                "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
         parser.parseArgument(args);
 
-        new SparkUpdateEntity(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+        SparkConf conf = new SparkConf();
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+        new SparkUpdateEntity(parser, getSparkSession(conf))
+            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
     }
 
     public void run(ISLookUpService isLookUpService) throws IOException {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Block.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Block.java
new file mode 100644
index 000000000..e1ccf143c
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Block.java
@@ -0,0 +1,76 @@
+package eu.dnetlib.dhp.oa.dedup.model;
+
+import com.google.common.collect.Lists;
+import eu.dnetlib.pace.model.MapDocument;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class Block implements Serializable {
+
+    private String key;
+
+    private List documents;
+
+    public Block() {
+        super();
+    }
+
+    public static Block from(String key, MapDocument doc) {
+        Block block = new Block();
+        block.setKey(key);
+        block.setDocuments(Lists.newArrayList(doc));
+        return block;
+    }
+
+    public static Block from(String key, Iterator blocks, String orderField, int maxSize) {
+        Block block = new Block();
+        block.setKey(key);
+
+        Iterable it = () -> blocks;
+
+        block.setDocuments(
+                StreamSupport.stream(it.spliterator(), false)
+                        .flatMap(b -> b.getDocuments().stream())
+                        .sorted(Comparator.comparing(a -> a.getFieldMap().get(orderField).stringValue()))
+                        .limit(maxSize)
+                        .collect(Collectors.toCollection(ArrayList::new)));
+        return block;
+    }
+
+    public static Block from(Block b1, Block b2, String orderField, int maxSize) {
+        Block block = new Block();
+        block.setKey(b1.getKey());
+        block.setDocuments(
+                Stream.concat(
+                        b1.getDocuments().stream(),
+                        b2.getDocuments().stream())
+                        .sorted(Comparator.comparing(a -> a.getFieldMap().get(orderField).stringValue()))
+                        .limit(maxSize)
+                        .collect(Collectors.toCollection(ArrayList::new)));
+
+        return block;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public List getDocuments() {
+        return documents;
+    }
+
+    public void setDocuments(List documents) {
+        this.documents = documents;
+    }
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
index df6877dfa..926287032 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
@@ -91,7 +91,6 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --conf spark.sql.shuffle.partitions=7680
-            -mtyarn
             --i${graphBasePath}
             --o${dedupGraphPath}
             --w${workingPath}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json
index 42ef2b78e..6eedd5432 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json
@@ -1,10 +1,4 @@
 [
-  {
-    "paramName": "mt",
-    "paramLongName": "master",
-    "paramDescription": "should be local or yarn",
-    "paramRequired": true
-  },
   {
     "paramName": "asi",
     "paramLongName": "actionSetId",
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json
index f7bf5e518..3d48cb2d8 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json
@@ -1,10 +1,4 @@
 [
-  {
-    "paramName": "mt",
-    "paramLongName": "master",
-    "paramDescription": "should be local or yarn",
-    "paramRequired": true
-  },
   {
     "paramName": "i",
     "paramLongName": "graphBasePath",
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json
index 8cffa86dc..ce38dc6f0 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json
@@ -1,10 +1,4 @@
 [
-  {
-    "paramName": "mt",
-    "paramLongName": "master",
-    "paramDescription": "should be local or yarn",
-    "paramRequired": true
-  },
   {
     "paramName": "la",
     "paramLongName": "isLookUpUrl",
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json
index 721a783e1..6a2a48746 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json
@@ -1,10 +1,4 @@
 [
-  {
-    "paramName": "mt",
-    "paramLongName": "master",
-    "paramDescription": "should be local or yarn",
-    "paramRequired": true
-  },
   {
     "paramName": "i",
     "paramLongName": "graphBasePath",
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
index 6790dde32..2451947a1 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
@@ -98,7 +98,6 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --conf spark.sql.shuffle.partitions=3840
-            -mtyarn
             --i${graphBasePath}
             --la${isLookUpUrl}
             --asi${actionSetId}
@@ -125,7 +124,6 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --conf spark.sql.shuffle.partitions=3840
-            -mtyarn
             --i${graphBasePath}
             --w${workingPath}
             --la${isLookUpUrl}
@@ -152,7 +150,6 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --conf spark.sql.shuffle.partitions=3840
-            -mtyarn
             --i${graphBasePath}
             --w${workingPath}
             --la${isLookUpUrl}
@@ -179,7 +176,6 @@
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             --conf spark.sql.shuffle.partitions=3840
-            -mtyarn
             --i${graphBasePath}
             --w${workingPath}
             --o${dedupGraphPath}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json
index 06b67f732..c91f3c04b 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json
@@ -1,10 +1,4 @@
 [
-{
- "paramName": "mt", - "paramLongName": "master", - "paramDescription": "should be local or yarn", - "paramRequired": true -}, { "paramName": "i", "paramLongName": "graphBasePath",