diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 47a2acf37..02db60f44 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -12,6 +12,9 @@ import eu.dnetlib.pace.util.MapDocumentUtil;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -30,6 +33,11 @@ import java.io.IOException;
 
 public class SparkPropagateRelation extends AbstractSparkAction {
 
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    public static final int NUM_PARTITIONS = 3000;
+
     enum FieldType {
         SOURCE,
         TARGET
@@ -64,8 +72,18 @@ public class SparkPropagateRelation extends AbstractSparkAction {
         System.out.println(String.format("workingPath: '%s'", workingPath));
         System.out.println(String.format("dedupGraphPath:'%s'", dedupGraphPath));
 
+        final String relationsPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
+        final String newRelsPath = DedupUtility.createEntityPath(workingPath, "newRels");
+        final String fixedSourceId = DedupUtility.createEntityPath(workingPath, "fixedSourceId");
+        final String deletedSourceId = DedupUtility.createEntityPath(workingPath, "deletedSourceId");
+
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
+        deletePath(relationsPath);
+        deletePath(newRelsPath);
+        deletePath(fixedSourceId);
+        deletePath(deletedSourceId);
+
         final Dataset<Relation> mergeRels = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", "*")).as(Encoders.bean(Relation.class));
 
         final JavaPairRDD<String, String> mergedIds = mergeRels
@@ -75,9 +93,9 @@
                 .toJavaRDD()
                 .mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(1), r.getString(0)));
 
-        JavaRDD<String> relations = sc.textFile(DedupUtility.createEntityPath(graphBasePath, "relation"));
-
-        relations.mapToPair(
+        sc.textFile(DedupUtility.createEntityPath(graphBasePath, "relation"))
+                .repartition(NUM_PARTITIONS)
+                .mapToPair(
                 (PairFunction<String, String, String>) s ->
                         new Tuple2<>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s))
                 .leftOuterJoin(mergedIds)
@@ -87,6 +105,9 @@
                     }
                     return v1._2()._1();
                 })
+                .saveAsTextFile(fixedSourceId, GzipCodec.class);
+
+        sc.textFile(fixedSourceId)
                 .mapToPair(
                         (PairFunction<String, String, String>) s ->
                                 new Tuple2<>(MapDocumentUtil.getJPathString(TARGETJSONPATH, s), s))
@@ -97,12 +118,13 @@
                     }
                     return v1._2()._1();
                 }).filter(SparkPropagateRelation::containsDedup)
-                .repartition(500)
-                .saveAsTextFile(DedupUtility.createEntityPath(workingPath, "newRels"), GzipCodec.class);
+                .repartition(NUM_PARTITIONS)
+                .saveAsTextFile(newRelsPath, GzipCodec.class);
 
         //update deleted by inference
-        relations.mapToPair(
-                (PairFunction<String, String, String>) s ->
+        sc.textFile(DedupUtility.createEntityPath(graphBasePath, "relation"))
+                .repartition(NUM_PARTITIONS)
+                .mapToPair((PairFunction<String, String, String>) s ->
                         new Tuple2<>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s))
                 .leftOuterJoin(mergedIds)
                 .map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
@@ -111,6 +133,10 @@
                     }
                     return v1._2()._1();
                 })
+                .saveAsTextFile(deletedSourceId, GzipCodec.class);
+
+        sc.textFile(deletedSourceId)
+                .repartition(NUM_PARTITIONS)
                 .mapToPair(
                         (PairFunction<String, String, String>) s ->
                                 new Tuple2<>(MapDocumentUtil.getJPathString(TARGETJSONPATH, s), s))
@@ -121,17 +147,30 @@
                     }
                     return v1._2()._1();
                 })
-                .repartition(500)
+                .repartition(NUM_PARTITIONS)
                 .saveAsTextFile(DedupUtility.createEntityPath(workingPath, "updated"), GzipCodec.class);
 
         JavaRDD<String> newRels = sc
-                .textFile(DedupUtility.createEntityPath(workingPath, "newRels"));
+                .textFile(newRelsPath);
 
         sc
                 .textFile(DedupUtility.createEntityPath(workingPath, "updated"))
                 .union(newRels)
-                .repartition(1000)
-                .saveAsTextFile(DedupUtility.createEntityPath(dedupGraphPath, "relation"), GzipCodec.class);
+                .repartition(NUM_PARTITIONS)
+                .saveAsTextFile(relationsPath, GzipCodec.class);
+    }
+
+    private void deletePath(String path) {
+        try {
+            Path p = new Path(path);
+            FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
+
+            if (fs.exists(p)) {
+                fs.delete(p, true);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     private static boolean containsDedup(final String json) {
@@ -142,20 +181,19 @@
     }
 
     private static String replaceField(final String json, final String id, final FieldType type) {
-        ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         try {
-            Relation relation = mapper.readValue(json, Relation.class);
-            if (relation.getDataInfo() == null)
+            Relation relation = OBJECT_MAPPER.readValue(json, Relation.class);
+            if (relation.getDataInfo() == null) {
                 relation.setDataInfo(new DataInfo());
+            }
             relation.getDataInfo().setDeletedbyinference(false);
             switch (type) {
                 case SOURCE:
                     relation.setSource(id);
-                    return mapper.writeValueAsString(relation);
+                    return OBJECT_MAPPER.writeValueAsString(relation);
                 case TARGET:
                     relation.setTarget(id);
-                    return mapper.writeValueAsString(relation);
+                    return OBJECT_MAPPER.writeValueAsString(relation);
                 default:
                     throw new IllegalArgumentException("");
             }
@@ -165,14 +203,13 @@
     }
 
     private static <T extends Oaf> String updateDeletedByInference(final String json, final Class<T> clazz) {
-        final ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         try {
-            Oaf entity = mapper.readValue(json, clazz);
-            if (entity.getDataInfo()== null)
+            Oaf entity = OBJECT_MAPPER.readValue(json, clazz);
+            if (entity.getDataInfo() == null) {
                 entity.setDataInfo(new DataInfo());
+            }
             entity.getDataInfo().setDeletedbyinference(true);
-            return mapper.writeValueAsString(entity);
+            return OBJECT_MAPPER.writeValueAsString(entity);
         } catch (IOException e) {
             throw new RuntimeException("Unable to convert json", e);
         }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
index 32f4e7db0..66dbdea73 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
@@ -24,6 +24,29 @@
             <name>sparkExecutorCores</name>
             <description>number of cores used by single executor</description>
         </property>
+
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
     </parameters>
@@ -38,6 +61,10 @@
                 <name>oozie.launcher.mapred.job.queue.name</name>
                 <value>${oozieLauncherQueueName}</value>
             </property>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
         </configuration>
     </global>
@@ -50,7 +77,7 @@
-
+
             <master>yarn</master>
             <mode>cluster</mode>
@@ -58,12 +85,14 @@
             <class>eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity</class>
             <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-memory ${sparkExecutorMemory}
-                --executor-cores ${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
-                --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
-                --conf spark.sql.warehouse.dir="/user/hive/warehouse"
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=3840
             </spark-opts>
             <arg>-mt</arg><arg>yarn</arg>
             <arg>--i</arg><arg>${graphBasePath}</arg>
@@ -77,7 +106,9 @@
-
+
+
+
             <master>yarn</master>
             <mode>cluster</mode>
@@ -85,12 +116,14 @@
             <class>eu.dnetlib.dhp.oa.dedup.SparkPropagateRelation</class>
             <jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-memory ${sparkExecutorMemory}
-                --executor-cores ${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
-                --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
-                --conf spark.sql.warehouse.dir="/user/hive/warehouse"
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=7680
             </spark-opts>
             <arg>-mt</arg><arg>yarn</arg>
             <arg>--i</arg><arg>${graphBasePath}</arg>