From 2b8675462f2c2baef1d85a712e9e91b9d028c5eb Mon Sep 17 00:00:00 2001
From: "sandro.labruzzo"
Date: Wed, 19 Feb 2020 10:07:08 +0100
Subject: [PATCH] refactoring code

---
 dhp-common/pom.xml                                 |   4 +
 .../java/eu/dnetlib/dhp/utils/DHPUtils.java        |  15 +
 dhp-workflows/dhp-aggregation/pom.xml              |   1 +
 .../eu/dnetlib/dedup/DedupRecordFactory.java       |  16 +-
 .../java/eu/dnetlib/dedup/DedupUtility.java        |   4 +-
 .../dnetlib/dedup/SparkCreateDedupRecord.java      |   7 +-
 .../eu/dnetlib/dedup/SparkCreateSimRels.java       |   4 +-
 .../dedup/SparkPropagateRelationsJob.java          | 117 ++++++
 .../dnetlib/dedup/SparkUpdateEntityJob.java        | 114 ++++++
 .../dedup_delete_by_inference_parameters.json      |  31 ++
 .../dedup_propagate_relation_parameters.json       |  26 ++
 .../dnetlib/dhp/dedup/oozie_app/workflow.xml       |  81 ++--
 .../oozie_app/config-default.xml                   |   0
 .../propagaterels/oozie_app/workflow.xml           |  52 +++
 .../entity/oozie_app/config-default.xml            |  30 ++
 .../update/entity/oozie_app/workflow.xml           |  65 +++
 .../dnetlib/dedup/SparkCreateDedupTest.java        |  15 +-
 .../dnetlib/dedup/conf/pub_scholix.conf.json       | 378 ++++++++++++++++++
 dhp-workflows/dhp-graph-mapper/pom.xml             |  12 +-
 .../dhp/graph/ImportDataFromMongo.java             | 103 +++++
 .../SparkScholexplorerMergeEntitiesJob.java        |   3 -
 .../parser/AbstractScholexplorerParser.java        |   4 +-
 .../parser/DatasetScholexplorerParser.java         |  25 +-
 .../PublicationScholexplorerParser.java            |  17 +-
 .../oozie_app/config-default.xml                   |   0
 .../oozie_app/workflow.xml                         |  26 +-
 .../oozie_app/config-default.xml                   |   0
 .../Extractentities}/oozie_app/workflow.xml        |  31 +-
 .../oozie_app/config-default.xml                   |   0
 .../ImportMongoToHDFS/oozie_app/workflow.xml       |  73 ++++
 .../oozie_app/config-default.xml                   |  10 +
 .../MergeEntities}/oozie_app/workflow.xml          |  38 +-
 .../graph/import_from_mongo_parameters.json        |  12 +
 .../dnetlib/dhp/graph/oozie_app/workflow.xml       |  51 ---
 .../dhp/graph/ImportDataFromMongoTest.java         |  22 +
 .../ScholexplorerParserTest.java                   |  38 ++
 .../dnetlib/dhp/graph/scholexplorer/dmf.xml        |  66 +++
 pom.xml                                            |  10 +-
 38 files changed, 1332 insertions(+), 169 deletions(-)
 create mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java
 create mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json
 rename dhp-workflows/{dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph => dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels}/oozie_app/config-default.xml (100%)
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/workflow.xml
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/workflow.xml
 create mode 100644 dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_scholix.conf.json
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ImportDataFromMongo.java
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/{mergeentities => Application/ConvertXMLToEntities}/oozie_app/config-default.xml (100%)
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/{mergeentities => Application/ConvertXMLToEntities}/oozie_app/workflow.xml (72%)
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/{scholexplorer/extractentities => Application/Extractentities}/oozie_app/config-default.xml (100%)
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/{scholexplorer/extractentities => Application/Extractentities}/oozie_app/workflow.xml (70%)
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/{scholexplorer => Application/ImportMongoToHDFS}/oozie_app/config-default.xml (100%)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/workflow.xml
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/config-default.xml
 rename dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/{scholexplorer => Application/MergeEntities}/oozie_app/workflow.xml (53%)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ImportDataFromMongoTest.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/dmf.xml

diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 59b7d35d2..345a5475f 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -46,6 +46,10 @@
             com.ximpleware
             vtd-xml
+
+            com.jayway.jsonpath
+            json-path
+
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
index 846ece5ed..5de2b70ff 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
@@ -1,5 +1,7 @@
 package eu.dnetlib.dhp.utils;
+import com.jayway.jsonpath.JsonPath;
+import net.minidev.json.JSONArray;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.binary.Base64OutputStream;
 import org.apache.commons.codec.binary.Hex;
@@ -56,4 +58,17 @@ public class DHPUtils {
     }
+    public static String getJPathString(final String jsonPath, final String json) {
+        try {
+            Object o = JsonPath.read(json, jsonPath);
+            if (o instanceof String)
+                return (String) o;
+            if (o instanceof JSONArray && ((JSONArray) o).size() > 0)
+                return (String) ((JSONArray) o).get(0);
+            return "";
+        } catch (Exception e) {
+            return "";
+        }
+    }
+
 }
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 328e783c4..c6bb99fc3 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -45,6 +45,7 @@
             jaxen
+
             org.mockito
             mockito-core
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
index 5f81669e9..ebb504078 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
@@ -1,24 +1,20 @@
 package eu.dnetlib.dedup;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import
com.google.common.collect.Lists; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.util.MapDocumentUtil; -import org.apache.commons.lang.NotImplementedException; -import org.apache.commons.lang.StringUtils; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; -import org.codehaus.jackson.map.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectMapper; import scala.Tuple2; import java.util.Collection; -import java.util.Random; - -import static java.util.stream.Collectors.toMap; public class DedupRecordFactory { @@ -73,6 +69,8 @@ public class DedupRecordFactory { p.setId(e._1()); final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + final Collection dateofacceptance = Lists.newArrayList(); @@ -105,6 +103,7 @@ public class DedupRecordFactory { d.setId(e._1()); final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); final Collection dateofacceptance = Lists.newArrayList(); @@ -137,6 +136,7 @@ public class DedupRecordFactory { p.setId(e._1()); final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); if (e._2() != null) e._2().forEach(proj -> { try { @@ -160,6 +160,7 @@ public class DedupRecordFactory { s.setId(e._1()); final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); final Collection dateofacceptance = Lists.newArrayList(); if (e._2() != null) e._2().forEach(soft -> { @@ -187,6 +188,7 @@ public class DedupRecordFactory { Datasource d = new Datasource(); //the result of the merge, to be returned at the end d.setId(e._1()); final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); if (e._2() != null) e._2().forEach(dat -> { try { @@ -211,6 +213,7 @@ public class DedupRecordFactory { o.setId(e._1()); final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); StringBuilder trust = new StringBuilder("0.0"); @@ -251,6 +254,7 @@ public class DedupRecordFactory { o.setId(e._1()); final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); final Collection dateofacceptance = Lists.newArrayList(); diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java index 3bed74f86..196a8c140 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java @@ -151,11 +151,11 @@ public class DedupUtility { } public static String createSimRelPath(final String basePath, final String entityType) { - return String.format("%s/%s_simRel", basePath, entityType); + return String.format("%s/%s/simRel", basePath, entityType); } public static String createMergeRelPath(final String basePath, final String entityType) { - return String.format("%s/%s_mergeRel", basePath, entityType); + return String.format("%s/%s/mergeRel", basePath, entityType); } private static Double 
sim(Author a, Author b) { diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java index db2306526..8e60df945 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java @@ -10,7 +10,6 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; public class SparkCreateDedupRecord { - public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedupRecord_parameters.json"))); parser.parseArgument(args); @@ -24,16 +23,12 @@ public class SparkCreateDedupRecord { final String sourcePath = parser.get("sourcePath"); final String entity = parser.get("entity"); final String dedupPath = parser.get("dedupPath"); -// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json"))); final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); final JavaRDD dedupRecord = DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(dedupPath,entity), DedupUtility.createEntityPath(sourcePath,entity), OafEntityType.valueOf(entity), dedupConf); dedupRecord.map(r-> { ObjectMapper mapper = new ObjectMapper(); return mapper.writeValueAsString(r); - }).saveAsTextFile(dedupPath+"/"+entity+"_dedup_record_json"); - - + }).saveAsTextFile(dedupPath+"/"+entity+"/dedup_records"); } - } diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java index 831e45daf..2bdfa8759 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java @@ -44,7 +44,7 @@ public class SparkCreateSimRels { // final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf")); - final long total = sc.textFile(inputPath + "/" + entity).count(); + JavaPairRDD mapDocument = sc.textFile(inputPath + "/" + entity) .mapToPair(s->{ @@ -70,4 +70,4 @@ public class SparkCreateSimRels { spark.createDataset(isSimilarToRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(targetPath,entity)); } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java new file mode 100644 index 000000000..9a9abebe6 --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java @@ -0,0 +1,117 @@ +package eu.dnetlib.dedup; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.commons.io.IOUtils; +import 
org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.io.IOException; + +public class SparkPropagateRelationsJob { + enum FieldType { + SOURCE, + TARGET + } + final static String IDJSONPATH = "$.id"; + final static String SOURCEJSONPATH = "$.source"; + final static String TARGETJSONPATH = "$.target"; + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkPropagateRelationsJob.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkUpdateEntityJob.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String relationPath = parser.get("relationPath"); + final String mergeRelPath = parser.get("mergeRelPath"); + final String targetRelPath = parser.get("targetRelPath"); + + + final Dataset df = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class)); + + + + final JavaPairRDD mergedIds = df + .where("relClass == 'merges'") + .select(df.col("source"),df.col("target")) + .distinct() + .toJavaRDD() + .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(1), r.getString(0))); + + + final JavaRDD sourceEntity = sc.textFile(relationPath); + JavaRDD newRels = sourceEntity.mapToPair( + (PairFunction) s -> + new Tuple2<>(DHPUtils.getJPathString(SOURCEJSONPATH, s), s)) + .leftOuterJoin(mergedIds) + .map((Function>>, String>) v1 -> { + if (v1._2()._2().isPresent()) { + return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.SOURCE); + } + return v1._2()._1(); + }) + .mapToPair( + (PairFunction) s -> + new Tuple2<>(DHPUtils.getJPathString(TARGETJSONPATH, s), s)) + .leftOuterJoin(mergedIds) + .map((Function>>, String>) v1 -> { + if (v1._2()._2().isPresent()) { + return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.TARGET); + } + return v1._2()._1(); + }).filter(SparkPropagateRelationsJob::containsDedup) + .repartition(500); + + newRels.union(sourceEntity).repartition(1000).saveAsTextFile(targetRelPath, GzipCodec.class); + } + + private static boolean containsDedup(final String json) { + final String source = DHPUtils.getJPathString(SOURCEJSONPATH, json); + final String target = DHPUtils.getJPathString(TARGETJSONPATH, json); + + return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup"); + } + + + private static String replaceField(final String json, final String id, final FieldType type) { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try { + Relation relation = mapper.readValue(json, Relation.class); + if (relation.getDataInfo() == null) + relation.setDataInfo(new DataInfo()); + relation.getDataInfo().setDeletedbyinference(false); + switch (type) { + case SOURCE: + relation.setSource(id); + return mapper.writeValueAsString(relation); + case TARGET: + 
relation.setTarget(id); + return mapper.writeValueAsString(relation); + default: + throw new IllegalArgumentException(""); + } + } catch (IOException e) { + throw new RuntimeException("unable to deserialize json relation: " + json, e); + } + } +} diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java new file mode 100644 index 000000000..e7bb4f9c2 --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java @@ -0,0 +1,114 @@ +package eu.dnetlib.dedup; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; +import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; +import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown; +import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; + +import java.io.IOException; + +public class SparkUpdateEntityJob { + + final static String IDJSONPATH = "$.id"; + final static String SOURCEJSONPATH = "$.source"; + final static String TARGETJSONPATH = "$.target"; + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityJob.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkUpdateEntityJob.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final String entityPath = parser.get("entityPath"); + final String mergeRelPath = parser.get("mergeRelPath"); + final String dedupRecordPath = parser.get("dedupRecordPath"); + final String entity = parser.get("entity"); + + final Dataset df = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class)); + final JavaPairRDD mergedIds = df + .where("relClass == 'merges'") + .select(df.col("target")) + .distinct() + .toJavaRDD() + .mapToPair((PairFunction) r -> new Tuple2<>(r.getString(0), "d")); + final JavaRDD sourceEntity = sc.textFile(entityPath); + + if ("relation".equalsIgnoreCase(entity)) { + sourceEntity.mapToPair( + (PairFunction) s -> + new Tuple2<>(DHPUtils.getJPathString(SOURCEJSONPATH, s), s)) + .leftOuterJoin(mergedIds) + .map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), Relation.class) : k._2()._1()) + .mapToPair((PairFunction) s -> new Tuple2<>(DHPUtils.getJPathString(TARGETJSONPATH, s), s)) + .leftOuterJoin(mergedIds) + .map(k -> k._2()._2().isPresent() ? 
updateDeletedByInference(k._2()._1(), Relation.class) : k._2()._1()) + .saveAsTextFile(entityPath + "_new", GzipCodec.class); + } else { + final JavaRDD dedupEntity = sc.textFile(dedupRecordPath); + JavaPairRDD entitiesWithId = sourceEntity.mapToPair((PairFunction) s -> new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, s), s)); + Class mainClass; + switch (entity) { + case "publication": + mainClass = DLIPublication.class; + break; + case "dataset": + mainClass = DLIDataset.class; + break; + case "unknown": + mainClass = DLIUnknown.class; + break; + default: + throw new IllegalArgumentException("Illegal type " + entity); + + } + + JavaRDD map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), mainClass) : k._2()._1()); + + + map.union(dedupEntity).saveAsTextFile(entityPath + "_new", GzipCodec.class); + } + + + } + + + private static String updateDeletedByInference(final String json, final Class clazz) { + + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try { + Oaf entity = mapper.readValue(json, clazz); + if (entity.getDataInfo()== null) + entity.setDataInfo(new DataInfo()); + entity.getDataInfo().setDeletedbyinference(true); + return mapper.writeValueAsString(entity); + } catch (IOException e) { + throw new RuntimeException("Unable to convert json", e); + } + + + } + + +} diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json new file mode 100644 index 000000000..fecc666c4 --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json @@ -0,0 +1,31 @@ +[ + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "ep", + "paramLongName": "entityPath", + "paramDescription": "the input entity path", + "paramRequired": true + }, + { + "paramName": "mr", + "paramLongName": "mergeRelPath", + "paramDescription": "the input path of merge Rel", + "paramRequired": true + }, + { + "paramName": "dr", + "paramLongName": "dedupRecordPath", + "paramDescription": "the inputPath of dedup record", + "paramRequired": true + }, { + "paramName": "e", + "paramLongName": "entity", + "paramDescription": "the type of entity", + "paramRequired": true +} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json new file mode 100644 index 000000000..2ce78440f --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "ep", + "paramLongName": "relationPath", + "paramDescription": "the input relation path", + "paramRequired": true + }, + { + "paramName": "mr", + "paramLongName": "mergeRelPath", + "paramDescription": "the input path of merge Rel", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "targetRelPath", + "paramDescription": "the output Rel Path", + "paramRequired": true + } +] \ No newline at end 
of file diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml index 5a00a5967..89ebb17ff 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml @@ -24,27 +24,24 @@ sparkExecutorMemory memory for individual executor - - sparkExecutorCores - number of cores used by single executor - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - + + + + + + + + + @@ -55,11 +52,11 @@ Create Similarity Relations eu.dnetlib.dedup.SparkCreateSimRels dhp-dedup-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} --conf - spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf - spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf - spark.sql.warehouse.dir="/user/hive/warehouse" + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --num-executors 100 + --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" -mtyarn-cluster --sourcePath${sourcePath} @@ -71,7 +68,6 @@ - ${jobTracker} @@ -81,11 +77,11 @@ Create Connected Components eu.dnetlib.dedup.SparkCreateConnectedComponent dhp-dedup-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} --conf - spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf - spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf - spark.sql.warehouse.dir="/user/hive/warehouse" + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --num-executors 100 + --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" -mtyarn-cluster --sourcePath${sourcePath} @@ -106,21 +102,46 @@ Create Dedup Record eu.dnetlib.dedup.SparkCreateDedupRecord dhp-dedup-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} --conf - spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf - spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf - spark.sql.warehouse.dir="/user/hive/warehouse" + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --num-executors 100 + --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" -mtyarn-cluster --sourcePath${sourcePath} - --dedupPath${dedupPath} + --dedupPath${targetPath} --entity${entity} --dedupConf${dedupConf} + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Propagate Dedup Relations + eu.dnetlib.dedup.SparkPropagateRelationsJob + dhp-dedup-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --num-executors 100 + --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + + -mtyarn-cluster + --mergeRelPath${targetPath}/${entity}/mergeRel + --relationPath${sourcePath}/relation + 
--targetRelPath${targetPath}/${entity}/relation_updated + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml rename to dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/workflow.xml new file mode 100644 index 000000000..fd5cd6d7f --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/workflow.xml @@ -0,0 +1,52 @@ + + + + relationPath + the input relation path + + + mergeRelPath + the input path of merge Rel + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + Propagate Dedup Relations + eu.dnetlib.dedup.SparkPropagateRelationsJob + dhp-dedup-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --num-executors 100 + --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + + -mtyarn-cluster + --mergeRelPath${mergeRelPath} + --relationPath${relationPath} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/config-default.xml new file mode 100644 index 000000000..ba2df7773 --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hive_db_name + openaire + + + master + yarn + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/workflow.xml new file mode 100644 index 000000000..d98344736 --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/workflow.xml @@ -0,0 +1,65 @@ + + + + entity + the entity that should be processed + + + entityPath + the source path + + + mergeRelPath + the input path of merge Rel + + + dedupRecordPath + the input path of the dedup records + + + master + should be local or yarn + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + ${master} + cluster + Update ${entity} and add DedupRecord + eu.dnetlib.dedup.SparkUpdateEntityJob + dhp-dedup-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + 
--num-executors 100 + --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" + + -mt${master} + --entityPath${entityPath} + --mergeRelPath${mergeRelPath} + --entity${entity} + --dedupRecordPath${dedupRecordPath} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java index f93703e37..fb1be554b 100644 --- a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java +++ b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java @@ -1,19 +1,14 @@ package eu.dnetlib.dedup; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Publication; -import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import java.io.File; import java.io.IOException; -import java.util.List; public class SparkCreateDedupTest { @@ -22,7 +17,7 @@ public class SparkCreateDedupTest { @Before public void setUp() throws IOException { - configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json")); + configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/pub_scholix.conf.json")); } @@ -38,6 +33,14 @@ public class SparkCreateDedupTest { }); } + + @Test + public void createDeletedByInference() throws Exception { + SparkUpdateEntityJob.main(new String[] { + "-mt", "local[*]" + }); + } + @Test @Ignore public void createCCTest() throws Exception { diff --git a/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_scholix.conf.json b/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_scholix.conf.json new file mode 100644 index 000000000..d91419853 --- /dev/null +++ b/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_scholix.conf.json @@ -0,0 +1,378 @@ +{ + "wf": { + "threshold": "0.99", + "dedupRun": "001", + "entityType": "result", + "subEntityType": "resulttype", + "subEntityValue": "publication", + "orderField": "title", + "queueMaxSize": "2000", + "groupMaxSize": "100", + "maxChildren": "100", + "slidingWindowSize": "200", + "rootBuilder": [ + ], + "includeChildren": "true", + "maxIterations": 20, + "idPath": "$.id" + }, + "pace": { + "clustering": [ + { + "name": "ngrampairs", + "fields": [ + "title" + ], + "params": { + "max": "1", + "ngramLen": "3" + } + }, + { + "name": "suffixprefix", + "fields": [ + "title" + ], + "params": { + "max": "1", + "len": "3" + } + } + ], + "decisionTree": { + "start": { + "fields": [ + { + "field": "pid", + "comparator": "jsonListMatch", + "weight": 1.0, + "countIfUndefined": "false", + "params": { + "jpath_value": "$.value", + "jpath_classid": "$.qualifier.classid" + } + } + ], + "threshold": 0.5, + "aggregation": "AVG", + "positive": "MATCH", + "negative": "layer2", + "undefined": "layer2", + "ignoreUndefined": "true" + }, + "layer2": { + "fields": [ + { + "field": "title", + "comparator": "titleVersionMatch", + "weight": 1.0, + "countIfUndefined": "false", + "params": {} + }, + { + "field": "authors", + "comparator": "sizeMatch", + "weight": 1.0, + "countIfUndefined": "false", + "params": {} + } + ], + "threshold": 1.0, + 
"aggregation": "AND", + "positive": "layer3", + "negative": "NO_MATCH", + "undefined": "layer3", + "ignoreUndefined": "false" + }, + "layer3": { + "fields": [ + { + "field": "title", + "comparator": "levensteinTitle", + "weight": 1.0, + "countIfUndefined": "true", + "params": {} + } + ], + "threshold": 0.99, + "aggregation": "AVG", + "positive": "MATCH", + "negative": "NO_MATCH", + "undefined": "NO_MATCH", + "ignoreUndefined": "true" + } + }, + "model": [ + { + "name": "pid", + "type": "JSON", + "path": "$.pid", + "overrideMatch": "true" + }, + { + "name": "title", + "type": "String", + "path": "$.title[*].value", + "length": 250, + "size": 5 + }, + { + "name": "authors", + "type": "List", + "path": "$.author[*].fullname", + "size": 200 + }, + { + "name": "resulttype", + "type": "String", + "path": "$.resulttype.classid" + } + ], + "blacklists": { + "title": [ + "^Inside Front Cover$", + "^CORR Insights$", + "^Index des notions$", + "^Department of Error.$", + "^Untitled Item$", + "^Department of Error$", + "^Tome II : 1598 à 1605$", + "^(à l’exception de roi, prince, royauté, pouvoir, image… qui sont omniprésents)$", + "^Museen und Ausstellungsinstitute in Nürnberg$", + "^Text/Conference Paper$", + "^Table des illustrations$", + "^An Intimate Insight on Psychopathy and a Novel Hermeneutic Psychological Science$", + "^Index des noms$", + "^Reply by Authors.$", + "^Titelblatt - Inhalt$", + "^Index des œuvres,$", + "(?i)^Poster presentations$", + "^THE ASSOCIATION AND THE GENERAL MEDICAL COUNCIL$", + "^Problems with perinatal pathology\\.?$", + "(?i)^Cases? of Puerperal Convulsions$", + "(?i)^Operative Gyna?ecology$", + "(?i)^Mind the gap\\!?\\:?$", + "^Chronic fatigue syndrome\\.?$", + "^Cartas? ao editor Letters? to the Editor$", + "^Note from the Editor$", + "^Anesthesia Abstract$", + "^Annual report$", + "(?i)^“?THE RADICAL PREVENTION OF VENEREAL DISEASE\\.?”?$", + "(?i)^Graph and Table of Infectious Diseases?$", + "^Presentation$", + "(?i)^Reviews and Information on Publications$", + "(?i)^PUBLIC HEALTH SERVICES?$", + "(?i)^COMBINED TEXT-?BOOK OF OBSTETRICS AND GYN(Æ|ae)COLOGY$", + "(?i)^Adrese autora$", + "(?i)^Systematic Part .*\\. Catalogus Fossilium Austriae, Band 2: Echinoidea neogenica$", + "(?i)^Acknowledgement to Referees$", + "(?i)^Behçet's disease\\.?$", + "(?i)^Isolation and identification of restriction endonuclease.*$", + "(?i)^CEREBROVASCULAR DISEASES?.?$", + "(?i)^Screening for abdominal aortic aneurysms?\\.?$", + "^Event management$", + "(?i)^Breakfast and Crohn's disease.*\\.?$", + "^Cálculo de concentraciones en disoluciones acuosas. Ejercicio interactivo\\..*\\.$", + "(?i)^Genetic and functional analyses of SHANK2 mutations suggest a multiple hit model of Autism spectrum disorders?\\.?$", + "^Gushi hakubutsugaku$", + "^Starobosanski nadpisi u Bosni i Hercegovini \\(.*\\)$", + "^Intestinal spirocha?etosis$", + "^Treatment of Rodent Ulcer$", + "(?i)^\\W*Cloud Computing\\W*$", + "^Compendio mathematico : en que se contienen todas las materias mas principales de las Ciencias que tratan de la cantidad$", + "^Free Communications, Poster Presentations: Session [A-F]$", + "^“The Historical Aspects? 
of Quackery\\.?”$", + "^A designated centre for people with disabilities operated by St John of God Community Services (Limited|Ltd), Louth$", + "^P(er|re)-Mile Premiums for Auto Insurance\\.?$", + "(?i)^Case Report$", + "^Boletín Informativo$", + "(?i)^Glioblastoma Multiforme$", + "(?i)^Nuevos táxones animales descritos en la península Ibérica y Macaronesia desde 1994 \\(.*\\)$", + "^Zaměstnanecké výhody$", + "(?i)^The Economics of Terrorism and Counter-Terrorism: A Survey \\(Part .*\\)$", + "(?i)^Carotid body tumours?\\.?$", + "(?i)^\\[Españoles en Francia : La condición Emigrante.*\\]$", + "^Avant-propos$", + "(?i)^St\\. Patrick's Cathedral, Dublin, County Dublin - Head(s)? and Capital(s)?$", + "(?i)^St\\. Patrick's Cathedral, Dublin, County Dublin - Bases?$", + "(?i)^PUBLIC HEALTH VERSUS THE STATE$", + "^Viñetas de Cortázar$", + "(?i)^Search for heavy neutrinos and W(\\[|_|\\(|_\\{|-)?R(\\]|\\)|\\})? bosons with right-handed couplings in a left-right symmetric model in pp collisions at.*TeV(\\.)?$", + "(?i)^Measurement of the pseudorapidity and centrality dependence of the transverse energy density in Pb(-?)Pb collisions at.*tev(\\.?)$", + "(?i)^Search for resonances decaying into top-quark pairs using fully hadronic decays in pp collisions with ATLAS at.*TeV$", + "(?i)^Search for neutral minimal supersymmetric standard model Higgs bosons decaying to tau pairs in pp collisions at.*tev$", + "(?i)^Relatório de Estágio (de|em) Angiologia e Cirurgia Vascular$", + "^Aus der AGMB$", + "^Znanstveno-stručni prilozi$", + "(?i)^Zhodnocení finanční situace podniku a návrhy na zlepšení$", + "(?i)^Evaluation of the Financial Situation in the Firm and Proposals to its Improvement$", + "(?i)^Hodnocení finanční situace podniku a návrhy na její zlepšení$", + "^Finanční analýza podniku$", + "^Financial analysis( of business)?$", + "(?i)^Textbook of Gyn(a)?(Æ)?(e)?cology$", + "^Jikken nihon shūshinsho$", + "(?i)^CORONER('|s)(s|') INQUESTS$", + "(?i)^(Μελέτη παραγόντων )?risk management( για ανάπτυξη και εφαρμογή ενός πληροφοριακού συστήματος| και ανάπτυξη συστήματος)?$", + "(?i)^Consultants' contract(s)?$", + "(?i)^Upute autorima$", + "(?i)^Bijdrage tot de Kennis van den Godsdienst der Dajaks van Lan(d|f)ak en Tajan$", + "^Joshi shin kokubun$", + "^Kōtō shōgaku dokuhon nōson'yō$", + "^Jinjō shōgaku shōka$", + "^Shōgaku shūjichō$", + "^Nihon joshi dokuhon$", + "^Joshi shin dokuhon$", + "^Chūtō kanbun dokuhon$", + "^Wabun dokuhon$", + "(?i)^(Analysis of economy selected village or town|Rozbor hospodaření vybrané obce či města)$", + "(?i)^cardiac rehabilitation$", + "(?i)^Analytical summary$", + "^Thesaurus resolutionum Sacrae Congregationis Concilii$", + "(?i)^Sumario analítico(\\s{1})?(Analitic summary)?$", + "^Prikazi i osvrti$", + "^Rodinný dům s provozovnou$", + "^Family house with an establishment$", + "^Shinsei chūtō shin kokugun$", + "^Pulmonary alveolar proteinosis(\\.?)$", + "^Shinshū kanbun$", + "^Viñeta(s?) de Rodríguez$", + "(?i)^RUBRIKA UREDNIKA$", + "^A Matching Model of the Academic Publication Market$", + "^Yōgaku kōyō$", + "^Internetový marketing$", + "^Internet marketing$", + "^Chūtō kokugo dokuhon$", + "^Kokugo dokuhon$", + "^Antibiotic Cover for Dental Extraction(s?)$", + "^Strategie podniku$", + "^Strategy of an Enterprise$", + "(?i)^respiratory disease(s?)(\\.?)$", + "^Award(s?) 
for Gallantry in Civil Defence$", + "^Podniková kultura$", + "^Corporate Culture$", + "^Severe hyponatraemia in hospital inpatient(s?)(\\.?)$", + "^Pracovní motivace$", + "^Work Motivation$", + "^Kaitei kōtō jogaku dokuhon$", + "^Konsolidovaná účetní závěrka$", + "^Consolidated Financial Statements$", + "(?i)^intracranial tumour(s?)$", + "^Climate Change Mitigation Options and Directed Technical Change: A Decentralized Equilibrium Analysis$", + "^\\[CERVECERIAS MAHOU(\\.|\\:) INTERIOR\\] \\[Material gráfico\\]$", + "^Housing Market Dynamics(\\:|\\.) On the Contribution of Income Shocks and Credit Constraint(s?)$", + "^\\[Funciones auxiliares de la música en Radio París,.*\\]$", + "^Úroveň motivačního procesu jako způsobu vedení lidí$", + "^The level of motivation process as a leadership$", + "^Pay-beds in N(\\.?)H(\\.?)S(\\.?) Hospitals$", + "(?i)^news and events$", + "(?i)^NOVOSTI I DOGAĐAJI$", + "^Sansū no gakushū$", + "^Posouzení informačního systému firmy a návrh změn$", + "^Information System Assessment and Proposal for ICT Modification$", + "^Stresové zatížení pracovníků ve vybrané profesi$", + "^Stress load in a specific job$", + "^Sunday: Poster Sessions, Pt.*$", + "^Monday: Poster Sessions, Pt.*$", + "^Wednesday: Poster Sessions, Pt.*", + "^Tuesday: Poster Sessions, Pt.*$", + "^Analýza reklamy$", + "^Analysis of advertising$", + "^Shōgaku shūshinsho$", + "^Shōgaku sansū$", + "^Shintei joshi kokubun$", + "^Taishō joshi kokubun dokuhon$", + "^Joshi kokubun$", + "^Účetní uzávěrka a účetní závěrka v ČR$", + "(?i)^The \"?Causes\"? of Cancer$", + "^Normas para la publicación de artículos$", + "^Editor('|s)(s|') [Rr]eply$", + "^Editor(’|s)(s|’) letter$", + "^Redaktoriaus žodis$", + "^DISCUSSION ON THE PRECEDING PAPER$", + "^Kōtō shōgaku shūshinsho jidōyō$", + "^Shōgaku nihon rekishi$", + "^(Theory of the flow of action currents in isolated myelinated nerve fibers).*$", + "^Préface$", + "^Occupational [Hh]ealth [Ss]ervices.$", + "^In Memoriam Professor Toshiyuki TAKESHIMA$", + "^Účetní závěrka ve vybraném podniku.*$", + "^Financial statements in selected company$", + "^Abdominal [Aa]ortic [Aa]neurysms.*$", + "^Pseudomyxoma peritonei$", + "^Kazalo autora$", + "(?i)^uvodna riječ$", + "^Motivace jako způsob vedení lidí$", + "^Motivation as a leadership$", + "^Polyfunkční dům$", + "^Multi\\-funkcional building$", + "^Podnikatelský plán$", + "(?i)^Podnikatelský záměr$", + "(?i)^Business Plan$", + "^Oceňování nemovitostí$", + "^Marketingová komunikace$", + "^Marketing communication$", + "^Sumario Analítico$", + "^Riječ uredništva$", + "^Savjetovanja i priredbe$", + "^Índice$", + "^(Starobosanski nadpisi).*$", + "^Vzdělávání pracovníků v organizaci$", + "^Staff training in organization$", + "^(Life Histories of North American Geometridae).*$", + "^Strategická analýza podniku$", + "^Strategic Analysis of an Enterprise$", + "^Sadržaj$", + "^Upute suradnicima$", + "^Rodinný dům$", + "(?i)^Fami(l)?ly house$", + "^Upute autorima$", + "^Strategic Analysis$", + "^Finanční analýza vybraného podniku$", + "^Finanční analýza$", + "^Riječ urednika$", + "(?i)^Content(s?)$", + "(?i)^Inhalt$", + "^Jinjō shōgaku shūshinsho jidōyō$", + "(?i)^Index$", + "^Chūgaku kokubun kyōkasho$", + "^Retrato de una mujer$", + "^Retrato de un hombre$", + "^Kōtō shōgaku dokuhon$", + "^Shotōka kokugo$", + "^Shōgaku dokuhon$", + "^Jinjō shōgaku kokugo dokuhon$", + "^Shinsei kokugo dokuhon$", + "^Teikoku dokuhon$", + "^Instructions to Authors$", + "^KİTAP TAHLİLİ$", + "^PRZEGLĄD PIŚMIENNICTWA$", + "(?i)^Presentación$", + 
"^İçindekiler$", + "(?i)^Tabl?e of contents$", + "^(CODICE DEL BEATO DE LOS REYES FERNANDO I Y SANCHA).*$", + "^(\\[MADRID\\. BIBL\\. NAC\\. N.*KING FERDINAND I.*FROM SAN ISIDORO DE LEON\\. FACUNDUS SCRIPSIT DATED.*\\]).*", + "^Editorial( Board)?$", + "(?i)^Editorial \\(English\\)$", + "^Editörden$", + "^(Corpus Oral Dialectal \\(COD\\)\\.).*$", + "^(Kiri Karl Morgensternile).*$", + "^(\\[Eksliibris Aleksandr).*\\]$", + "^(\\[Eksliibris Aleksandr).*$", + "^(Eksliibris Aleksandr).*$", + "^(Kiri A\\. de Vignolles).*$", + "^(2 kirja Karl Morgensternile).*$", + "^(Pirita kloostri idaosa arheoloogilised).*$", + "^(Kiri tundmatule).*$", + "^(Kiri Jenaer Allgemeine Literaturzeitung toimetusele).*$", + "^(Eksliibris Nikolai Birukovile).*$", + "^(Eksliibris Nikolai Issakovile).*$", + "^(WHP Cruise Summary Information of section).*$", + "^(Measurement of the top quark\\-pair production cross section with ATLAS in pp collisions at).*$", + "^(Measurement of the spin\\-dependent structure function).*", + "(?i)^.*authors['’′]? reply\\.?$", + "(?i)^.*authors['’′]? response\\.?$" + ] + }, + "synonyms": {} + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index ff7450663..641fbd933 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -1,5 +1,6 @@ - + dhp-workflows eu.dnetlib.dhp @@ -11,6 +12,11 @@ + + commons-io + commons-io + + org.apache.spark spark-core_2.11 @@ -34,6 +40,10 @@ com.jayway.jsonpath json-path + + org.mongodb + mongo-java-driver + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ImportDataFromMongo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ImportDataFromMongo.java new file mode 100644 index 000000000..8872cf696 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ImportDataFromMongo.java @@ -0,0 +1,103 @@ +package eu.dnetlib.dhp.graph; + +import com.mongodb.*; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.message.Message; +import eu.dnetlib.message.MessageType; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.bson.Document; +import org.bson.conversions.Bson; + +import java.io.IOException; +import java.net.URI; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +public class ImportDataFromMongo { + + + public static void main(String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json"))); + parser.parseArgument(args); + final int port = Integer.parseInt(parser.get("dbport")); + final String host = parser.get("dbhost"); + + final String format = parser.get("format"); + final String layout = parser.get("layout"); + final String interpretation = parser.get("interpretation"); + + final String dbName = parser.get("dbName"); + + + final MongoClient client = new MongoClient(host, port); + + MongoDatabase database = 
client.getDatabase(dbName); + + MongoCollection metadata = database.getCollection("metadata"); + MongoCollection metadataManager = database.getCollection("metadataManager"); + final DBObject query = QueryBuilder.start("format").is(format).and("layout").is(layout).and("interpretation").is(interpretation).get(); + final List ids = new ArrayList<>(); + metadata.find((Bson) query).forEach((Consumer) document -> ids.add(document.getString("mdId"))); + List databaseId = ids.stream().map(it -> getCurrentId(it, metadataManager)).filter(Objects::nonNull).collect(Collectors.toList()); + final String hdfsuri = parser.get("namenode"); + // ====== Init HDFS File System Object + Configuration conf = new Configuration(); + // Set FileSystem URI + conf.set("fs.defaultFS", hdfsuri); + // Because of Maven + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + + System.setProperty("HADOOP_USER_NAME", parser.get("user")); + System.setProperty("hadoop.home.dir", "/"); + FileSystem.get(URI.create(hdfsuri), conf); + Path hdfswritepath = new Path(parser.get("targetPath")); + + final AtomicInteger counter = new AtomicInteger(0); + try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, + SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class), + SequenceFile.Writer.valueClass(Text.class))) { + final IntWritable key = new IntWritable(counter.get()); + final Text value = new Text(); + databaseId.forEach(id -> { + System.out.println("Reading :"+id); + MongoCollection collection = database.getCollection(id); + collection.find().forEach((Consumer) document -> + { + key.set(counter.getAndIncrement()); + value.set(document.getString("body")); + + if (counter.get() % 10000 == 0) { + System.out.println("Added "+counter.get()); + } + try { + writer.append(key, value); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + ); + }); + } + } + + + private static String getCurrentId(final String mdId, final MongoCollection metadataManager) { + FindIterable result = metadataManager.find((Bson) QueryBuilder.start("mdId").is(mdId).get()); + final Document item = result.first(); + return item == null ? 
null : item.getString("currentId"); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java index b320fd51c..54496671f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.jayway.jsonpath.JsonPath; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.graph.SparkGraphImporterJob; -import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset; import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication; @@ -17,10 +16,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.SparkSession; import scala.Tuple2; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java index 0ba7b25ee..5277f794b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java @@ -82,7 +82,7 @@ public abstract class AbstractScholexplorerParser { } protected String generateId(final String pid, final String pidType, final String entityType) { - String type = "50|"; + String type; switch (entityType){ case "publication": type = "50|"; @@ -100,7 +100,7 @@ public abstract class AbstractScholexplorerParser { if ("dnet".equalsIgnoreCase(pidType)) return type+StringUtils.substringAfter(pid, "::"); - return type+ DHPUtils.md5(String.format("%s::%s", pid, pidType)); + return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim())); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java index 578b18085..3a671e6a1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java @@ -11,6 +11,7 @@ import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo; import eu.dnetlib.dhp.parser.utility.VtdUtilityParser.Node; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import java.util.ArrayList; import java.util.Arrays; @@ -37,10 +38,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser 
{ di.setInvisible(false); parsedObject.setDataInfo(di); - - final String objIdentifier = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']"); - parsedObject.setId("60|" + StringUtils.substringAfter(objIdentifier, "::")); - parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']"))); @@ -112,12 +109,16 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { final List identifierType = VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']/*[local-name()='identifier']", Collections.singletonList("identifierType")); - StructuredProperty currentPid = extractIdentifier(identifierType, "type"); + StructuredProperty currentPid = extractIdentifier(identifierType, "identifierType"); if (currentPid == null) return null; inferPid(currentPid); parsedObject.setPid(Collections.singletonList(currentPid)); + final String sourceId = generateId(currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset"); + parsedObject.setId(sourceId); + + List descs = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='description']"); if (descs != null && descs.size() > 0) parsedObject.setDescription(descs.stream() @@ -149,15 +150,20 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { final String targetId = generateId(relatedPid, relatedPidType, relatedType); r.setTarget(targetId); r.setRelType(relationSemantic); + r.setRelClass("datacite"); r.setCollectedFrom(parsedObject.getCollectedfrom()); + r.setDataInfo(di); rels.add(r); r = new Relation(); + r.setDataInfo(di); r.setSource(targetId); r.setTarget(parsedObject.getId()); r.setRelType(inverseRelation); + r.setRelClass("datacite"); r.setCollectedFrom(parsedObject.getCollectedfrom()); rels.add(r); - result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di)); + if("unknown".equalsIgnoreCase(relatedType)) + result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di)); return rels.stream(); }).collect(Collectors.toList())); } @@ -185,6 +191,13 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser { parsedObject.setSubject(subjects); + Qualifier q = new Qualifier(); + q.setClassname("dataset"); + q.setClassid("dataset"); + q.setSchemename("dataset"); + q.setSchemeid("dataset"); + parsedObject.setResulttype(q); + parsedObject.setCompletionStatus(completionStatus); final List creators = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='resource']//*[local-name()='creator']/*[local-name()='creatorName']"); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java index 6e3221da5..45ef2066b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java @@ -36,9 +36,6 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser di.setDeletedbyinference(false); di.setInvisible(false); - final String objIdentifier = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']"); - parsedObject.setId("50|" + 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java
index 6e3221da5..45ef2066b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java
@@ -36,9 +36,6 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
         di.setDeletedbyinference(false);
         di.setInvisible(false);
 
-        final String objIdentifier = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']");
-        parsedObject.setId("50|" + StringUtils.substringAfter(objIdentifier, "::"));
-
         parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']"));
 
         final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']");
@@ -63,6 +60,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
         if (currentPid == null) return null;
         inferPid(currentPid);
         parsedObject.setPid(Collections.singletonList(currentPid));
+        final String sourceId = generateId(currentPid.getValue(), currentPid.getQualifier().getClassid(), "publication");
+        parsedObject.setId(sourceId);
 
         String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']");
@@ -136,12 +135,12 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
             r.setDataInfo(di);
             rels.add(r);
             r = new Relation();
+            r.setDataInfo(di);
             r.setSource(targetId);
             r.setTarget(parsedObject.getId());
             r.setRelType(inverseRelation);
-            r.setCollectedFrom(parsedObject.getCollectedfrom());
-            r.setDataInfo(di);
             r.setRelClass("datacite");
+            r.setCollectedFrom(parsedObject.getCollectedfrom());
             rels.add(r);
 
             return rels.stream();
@@ -217,7 +216,13 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
 
         parsedObject.setDataInfo(di);
 
-
+        parsedObject.setSubject(subjects);
+        Qualifier q = new Qualifier();
+        q.setClassname("publication");
+        q.setClassid("publication");
+        q.setSchemename("publication");
+        q.setSchemeid("publication");
+        parsedObject.setResulttype(q);
         result.add(parsedObject);
 
         return result;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/workflow.xml
similarity index 72%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/workflow.xml
index 102587ab0..a1faaa0f5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/workflow.xml
@@ -8,10 +8,6 @@
         targetPath
         the source path
-
-        targetDir
-        the name of the path
-
         sparkDriverMemory
         memory for driver process
@@ -26,15 +22,22 @@
         entity
-        the entity to be merged
+        the entity type
-
+
 Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
@@ -42,15 +45,10 @@
 ${nameNode}
 yarn-cluster
 cluster
-Merge ${entity}
-eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerMergeEntitiesJob
+Import ${entity} and related entities
+eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerGraphImporter
 dhp-graph-mapper-${projectVersion}.jar
-
---executor-memory ${sparkExecutorMemory}
---driver-memory=${sparkDriverMemory}
---num-executors 100
---conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
-
+--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}
 -mt yarn-cluster
 --sourcePath${sourcePath}
 --targetPath${targetPath}
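After the rename, this workflow drives eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerGraphImporter, whose per-record work boils down to handing the raw XML to the matching parser — the same call exercised by the new ScholexplorerParserTest at the end of this patch. A hedged local sketch (the resource name and the println are illustrative; the Spark job presumably maps this over an RDD of records):

    import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser;
    import eu.dnetlib.dhp.schema.oaf.Oaf;
    import org.apache.commons.io.IOUtils;

    import java.util.List;

    public class ConvertXmlSketch {
        public static void main(String[] args) throws Exception {
            // a DMF record such as the dmf.xml fixture added by this patch
            String xml = IOUtils.toString(ConvertXmlSketch.class.getResourceAsStream("dmf.xml"));
            // one XML record in; the result entity, its relations and any unknown stubs out
            List<Oaf> entities = new DatasetScholexplorerParser().parseObject(xml);
            entities.forEach(oaf -> System.out.println(oaf.getClass().getSimpleName()));
        }
    }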
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/workflow.xml
similarity index 70%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/workflow.xml
index ef968b0cd..6caa8b1c3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/workflow.xml
@@ -20,23 +20,34 @@
         sparkExecutorMemory
         memory for individual executor
-
-        sparkExecutorCores
-        number of cores used by single executor
-
         entities
         the entities to be extracted
-
+
 Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
 ${jobTracker}
 ${nameNode}
 Extract ${entities}
 eu.dnetlib.dhp.graph.SparkExtractEntitiesJob
 dhp-graph-mapper-${projectVersion}.jar
 --executor-memory ${sparkExecutorMemory}
---driver-memory=${sparkDriverMemory}
---num-executors 100
-
-
-
---conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
+--driver-memory=${sparkDriverMemory}
+${sparkExtraOPT}
 -mt yarn-cluster
 --sourcePath${sourcePath}
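The Extractentities workflow above takes a comma-separated entities parameter and writes each type under its own target folder. SparkExtractEntitiesJob itself is not touched by this patch, so the dispatch below is an assumption about its shape rather than a copy of it — in particular the JSON-lines input format and the contains() type test are stand-ins:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;

    public class ExtractEntitiesSketch {

        // assumed layout: raw scholix records as JSON lines, one output folder per entity type
        static void extract(final JavaSparkContext sc, final String sourcePath,
                            final String targetPath, final String entities) {
            final JavaRDD<String> input = sc.textFile(sourcePath);
            Arrays.stream(entities.split(",")).map(String::trim).forEach(entity ->
                    input.filter(json -> json.contains("\"" + entity + "\"")) // crude type test, illustration only
                         .saveAsTextFile(targetPath + "/" + entity));
        }
    }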
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/workflow.xml
new file mode 100644
index 000000000..f3c9a4ecb
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/workflow.xml
@@ -0,0 +1,73 @@
+
+
+        workingPath
+        the working dir base path
+
+
+        targetPath
+        the graph raw base path
+
+
+        format
+        the metadata format to import
+
+
+        layout
+        the metadata layout to import
+
+
+        interpretation
+        the metadata interpretation to import
+
+
+        dbhost
+        mongoDB URL, example: mongodb://[username:password@]host[:port]
+
+
+        dbName
+        the mongo database name
+
+
+        user
+        the HDFS user
+
+
+
+
+
+
+        Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+            ${jobTracker}
+            ${nameNode}
+            eu.dnetlib.dhp.graph.ImportDataFromMongo
+            -t${targetPath}
+            -n${nameNode}
+            -u${user}
+            -h${dbhost}
+            -p27017
+            -dn${dbName}
+            -f${format}
+            -l${layout}
+            -i${interpretation}
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/config-default.xml
new file mode 100644
index 000000000..6fb2a1253
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/config-default.xml
@@ -0,0 +1,10 @@
+
+
+        oozie.use.system.libpath
+        true
+
+
+        oozie.action.sharelib.for.spark
+        spark2
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml
similarity index 53%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml
index 3efb90ae4..d04e76b2a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml
@@ -16,43 +16,41 @@
         sparkExecutorMemory
         memory for individual executor
-
-        sparkExecutorCores
-        number of cores used by single executor
-
         entity
-        the entity type
+        the entity to be merged
-
+
 Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
+
+
+
+
+
+
+
+
+
+
 ${jobTracker}
 ${nameNode}
 yarn-cluster
 cluster
-Import ${entity} and related entities
-eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerGraphImporter
+Merge ${entity}
+eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerMergeEntitiesJob
 dhp-graph-mapper-${projectVersion}.jar
-
---executor-memory ${sparkExecutorMemory}
---driver-memory=${sparkDriverMemory}
---num-executors 100
-
-
-
---conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
-
+--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}
 -mt yarn-cluster
---sourcePath${sourcePath}
---targetPath${targetPath}
+--sourcePath${sourcePath}/${entity}
+--targetPath${targetPath}/${entity}
 --entity${entity}
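The java action in the ImportMongoToHDFS workflow above passes nine short-form options (-t, -n, -u, -h, -p, -dn, -f, -l, -i) that must stay in lockstep with import_from_mongo_parameters.json just below. A sketch of the parsing convention the dnet-hadoop jobs in this patch follow, assuming ArgumentApplicationParser resolves short names to the declared paramLongName values as its usage elsewhere suggests:

    import eu.dnetlib.dhp.application.ArgumentApplicationParser;
    import org.apache.commons.io.IOUtils;

    public class ImportArgsSketch {
        public static void main(String[] args) throws Exception {
            final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                    IOUtils.toString(ImportArgsSketch.class.getResourceAsStream(
                            "/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json")));
            parser.parseArgument(args);
            // every option is declared paramRequired, so parsing fails fast if one is missing
            final String targetPath = parser.get("targetPath");
            final String dbHost = parser.get("dbhost");
            final String format = parser.get("format");
            System.out.println(format + " -> " + targetPath + " via " + dbHost);
        }
    }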
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json
new file mode 100644
index 000000000..9032be287
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json
@@ -0,0 +1,12 @@
+[
+  {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the name node", "paramRequired": true},
+  {"paramName":"u", "paramLongName":"user", "paramDescription": "the HDFS user", "paramRequired": true},
+  {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target path on HDFS", "paramRequired": true},
+  {"paramName":"h", "paramLongName":"dbhost", "paramDescription": "the mongo host", "paramRequired": true},
+  {"paramName":"p", "paramLongName":"dbport", "paramDescription": "the mongo port", "paramRequired": true},
+  {"paramName":"f", "paramLongName":"format", "paramDescription": "the metadata format to import", "paramRequired": true},
+  {"paramName":"l", "paramLongName":"layout", "paramDescription": "the metadata layout to import", "paramRequired": true},
+  {"paramName":"i", "paramLongName":"interpretation", "paramDescription": "the metadata interpretation to import", "paramRequired": true},
+  {"paramName":"dn", "paramLongName":"dbName", "paramDescription": "the database name", "paramRequired": true}
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
deleted file mode 100644
index 24090a245..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-
-
-
-        sourcePath
-        the source path
-
-
-        hive_db_name
-        the target hive database name
-
-
-        sparkDriverMemory
-        memory for driver process
-
-
-        sparkExecutorMemory
-        memory for individual executor
-
-
-        sparkExecutorCores
-        number of cores used by single executor
-
-
-
-
-
-        Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-            ${jobTracker}
-            ${nameNode}
-            yarn-cluster
-            cluster
-            MapGraphIntoDataFrame
-            eu.dnetlib.dhp.graph.SparkGraphImporterJob
-            dhp-graph-mapper-${projectVersion}.jar
-            --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"
-            -mt yarn-cluster
-            --sourcePath${sourcePath}
-            --hive_db_name${hive_db_name}
-            --hive_metastore_uris${hive_metastore_uris}
-
-
-
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ImportDataFromMongoTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ImportDataFromMongoTest.java
new file mode 100644
index 000000000..50248c83d
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ImportDataFromMongoTest.java
@@ -0,0 +1,23 @@
+package eu.dnetlib.dhp.graph;
+
+import org.junit.Test;
+
+public class ImportDataFromMongoTest {
+
+    @Test
+    public void doTest() throws Exception {
+        // NOTE: integration smoke test - expects a MongoDB instance on localhost:2800 and writes to local paths
+        ImportDataFromMongo.main(new String[] {
+                "-h", "localhost",
+                "-p", "2800",
+                "-f", "PMF",
+                "-l", "store",
+                "-i", "cleaned",
+                "-dn", "mdstore_dli",
+                "-n", "file:///home/sandro/test.seq",
+                "-u", "sandro",
+                "-t", "file:///home/sandro/test.seq"
+        });
+    }
+
+}
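ImportDataFromMongo itself is added earlier in the patch (its body is truncated in this excerpt), and the root pom below gains mongo-java-driver 3.4.2 to support it. Its core loop presumably resembles the following hedged sketch — select the records of one mdstore by format/layout/interpretation and spool (id, body) pairs into a Hadoop SequenceFile; the collection and field names here are assumptions for illustration:

    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.bson.Document;

    public class MongoToSequenceFileSketch {
        public static void main(String[] args) throws Exception {
            final MongoClient client = new MongoClient("localhost", 27017);
            try {
                // assumed collection/field names ("metadata", "id", "body")
                final MongoCollection<Document> metadata =
                        client.getDatabase("mdstore_dli").getCollection("metadata");

                final Configuration conf = new Configuration();
                try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                        SequenceFile.Writer.file(new Path("file:///tmp/test.seq")),
                        SequenceFile.Writer.keyClass(Text.class),
                        SequenceFile.Writer.valueClass(Text.class))) {
                    for (Document doc : metadata.find(Filters.and(
                            Filters.eq("format", "PMF"),
                            Filters.eq("layout", "store"),
                            Filters.eq("interpretation", "cleaned")))) {
                        writer.append(new Text(doc.getString("id")), new Text(doc.getString("body")));
                    }
                }
            } finally {
                client.close();
            }
        }
    }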
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java
new file mode 100644
index 000000000..e87bc8913
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java
@@ -0,0 +1,38 @@
+package eu.dnetlib.dhp.graph.scholexplorer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import org.apache.commons.io.IOUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ScholexplorerParserTest {
+
+
+    @Test
+    public void testDataciteParser() throws IOException {
+        String xml = IOUtils.toString(this.getClass().getResourceAsStream("dmf.xml"));
+
+        DatasetScholexplorerParser p = new DatasetScholexplorerParser();
+        List<Oaf> oaves = p.parseObject(xml);
+
+        ObjectMapper m = new ObjectMapper();
+        m.enable(SerializationFeature.INDENT_OUTPUT);
+
+
+        oaves.forEach(oaf -> {
+            try {
+                System.out.println(m.writeValueAsString(oaf));
+                System.out.println("----------------------------");
+            } catch (JsonProcessingException e) {
+                // ignore: this smoke test only prints the records it can serialize
+            }
+        });
+
+    }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/dmf.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/dmf.xml
new file mode 100644
index 000000000..58defb67b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/dmf.xml
@@ -0,0 +1,66 @@
+
+
+
+    aaadf8b3-01a8-4cc2-9964-63cfb19df3b4_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=
+    oai:pangaea.de:doi:10.1594/PANGAEA.821876
+    r3d100010134
+    r3d100010134::000083be706192d2d839915694ecfd47
+2020-01-08T04:12:12.287
+    2020-01-08T03:24:10.865Z
+
+    oai:pangaea.de:doi:10.1594/PANGAEA.821876
+    citable
+
+
+
+        10.1594/pangaea.821876
+        Macke, AndreasKalisch, John
+        Total Sky Imager observations during POLARSTERN cruise ANT-XXVI/4 on 2010-05-14 with links to images
+
+PANGAEA - Data Publisher for Earth & Environmental Science
+
+        2010-05-14T00:13:47/2010-05-14T23:55:47
+
+
+
+        DATE/TIME
+
+        LATITUDE
+
+        LONGITUDE
+
+        Uniform resource locator/link to image
+
+        Total Sky Imager
+
+        ANT-XXVI/4
+
+        Polarstern
+
+
+        dataset
+
+
+        dli_resolver::cf447a378b0b6603593f8b0e57242695
+
+        http://hs.pangaea.de/images/airphoto/ps/ps75/2010-05-14/ant-xxvi_4_2010-05-14_tsi-images-links.zip
+
+        dli_resolver::f0f5975d20991cffd222c6002ddd5821
+
+
+
+
+
+        complete
+
+
+
+
+
+
+
+
diff --git a/pom.xml b/pom.xml
index 5323276aa..ada3a33a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,6 +138,12 @@
                 commons-io
                 2.4
 
+
+                org.mongodb
+                mongo-java-driver
+                3.4.2
+
+
                 commons-cli
@@ -200,7 +206,7 @@
                 eu.dnetlib
                 dnet-pace-core
-                4.0.0-SNAPSHOT
+                4.0.0
 
@@ -418,7 +424,7 @@
         UTF-8
         UTF-8
         3.6.0
-        2.22.2
+        2.22.2
         cdh5.9.2
         2.6.0-${dhp.cdh.version}
         4.1.0-${dhp.cdh.version}