diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 59b7d35d2..345a5475f 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -46,6 +46,10 @@
 			<groupId>com.ximpleware</groupId>
 			<artifactId>vtd-xml</artifactId>
+		<dependency>
+			<groupId>com.jayway.jsonpath</groupId>
+			<artifactId>json-path</artifactId>
+		</dependency>
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
index 846ece5ed..5de2b70ff 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
@@ -1,5 +1,7 @@
package eu.dnetlib.dhp.utils;
+import com.jayway.jsonpath.JsonPath;
+import net.minidev.json.JSONArray;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.commons.codec.binary.Hex;
@@ -56,4 +58,17 @@ public class DHPUtils {
}
+ public static String getJPathString(final String jsonPath, final String json) {
+ try {
+ Object o = JsonPath.read(json, jsonPath);
+ if (o instanceof String)
+ return (String) o;
+ if (o instanceof JSONArray && ((JSONArray) o).size() > 0)
+ return (String) ((JSONArray) o).get(0);
+ return "";
+ } catch (Exception e) {
+ return "";
+ }
+ }
+
}
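Reviewer note (not part of the patch): a minimal usage sketch of the new DHPUtils.getJPathString helper, assuming the class above is on the classpath. It illustrates the three behaviours encoded in the method: a scalar match returns the string, an array match returns its first element, and a missing path or malformed JSON falls back to an empty string instead of throwing. Class name and sample JSON are hypothetical.

// Illustrative sketch only.
import eu.dnetlib.dhp.utils.DHPUtils;

public class JPathStringExample {
    public static void main(String[] args) {
        final String json = "{\"id\":\"50|abc\",\"source\":[\"50|xyz\",\"50|klm\"]}";
        System.out.println(DHPUtils.getJPathString("$.id", json));      // prints: 50|abc
        System.out.println(DHPUtils.getJPathString("$.source", json));  // prints: 50|xyz (first element of the array)
        System.out.println(DHPUtils.getJPathString("$.missing", json)); // prints an empty line (no match)
    }
}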
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 328e783c4..c6bb99fc3 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -45,6 +45,7 @@
jaxen
+
org.mockito
mockito-core
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
index 5f81669e9..ebb504078 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
@@ -1,24 +1,20 @@
package eu.dnetlib.dedup;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
-import org.codehaus.jackson.map.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
import scala.Tuple2;
import java.util.Collection;
-import java.util.Random;
-
-import static java.util.stream.Collectors.toMap;
public class DedupRecordFactory {
@@ -73,6 +69,8 @@ public class DedupRecordFactory {
p.setId(e._1());
final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
             final Collection<String> dateofacceptance = Lists.newArrayList();
@@ -105,6 +103,7 @@ public class DedupRecordFactory {
d.setId(e._1());
final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
             final Collection<String> dateofacceptance = Lists.newArrayList();
@@ -137,6 +136,7 @@ public class DedupRecordFactory {
p.setId(e._1());
final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
if (e._2() != null)
e._2().forEach(proj -> {
try {
@@ -160,6 +160,7 @@ public class DedupRecordFactory {
s.setId(e._1());
final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
             final Collection<String> dateofacceptance = Lists.newArrayList();
if (e._2() != null)
e._2().forEach(soft -> {
@@ -187,6 +188,7 @@ public class DedupRecordFactory {
Datasource d = new Datasource(); //the result of the merge, to be returned at the end
d.setId(e._1());
final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
if (e._2() != null)
e._2().forEach(dat -> {
try {
@@ -211,6 +213,7 @@ public class DedupRecordFactory {
o.setId(e._1());
final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
StringBuilder trust = new StringBuilder("0.0");
@@ -251,6 +254,7 @@ public class DedupRecordFactory {
o.setId(e._1());
final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
             final Collection<String> dateofacceptance = Lists.newArrayList();
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
index 3bed74f86..196a8c140 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
@@ -151,11 +151,11 @@ public class DedupUtility {
}
public static String createSimRelPath(final String basePath, final String entityType) {
- return String.format("%s/%s_simRel", basePath, entityType);
+ return String.format("%s/%s/simRel", basePath, entityType);
}
public static String createMergeRelPath(final String basePath, final String entityType) {
- return String.format("%s/%s_mergeRel", basePath, entityType);
+ return String.format("%s/%s/mergeRel", basePath, entityType);
}
private static Double sim(Author a, Author b) {
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java
index db2306526..8e60df945 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateDedupRecord.java
@@ -10,7 +10,6 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
public class SparkCreateDedupRecord {
-
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedupRecord_parameters.json")));
parser.parseArgument(args);
@@ -24,16 +23,12 @@ public class SparkCreateDedupRecord {
final String sourcePath = parser.get("sourcePath");
final String entity = parser.get("entity");
final String dedupPath = parser.get("dedupPath");
-// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateDedupRecord.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
final JavaRDD dedupRecord = DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(dedupPath,entity), DedupUtility.createEntityPath(sourcePath,entity), OafEntityType.valueOf(entity), dedupConf);
dedupRecord.map(r-> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(r);
- }).saveAsTextFile(dedupPath+"/"+entity+"_dedup_record_json");
-
-
+ }).saveAsTextFile(dedupPath+"/"+entity+"/dedup_records");
}
-
}
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
index 831e45daf..2bdfa8759 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
@@ -44,7 +44,7 @@ public class SparkCreateSimRels {
// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
- final long total = sc.textFile(inputPath + "/" + entity).count();
+
JavaPairRDD mapDocument = sc.textFile(inputPath + "/" + entity)
.mapToPair(s->{
@@ -70,4 +70,4 @@ public class SparkCreateSimRels {
spark.createDataset(isSimilarToRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save( DedupUtility.createSimRelPath(targetPath,entity));
}
-}
\ No newline at end of file
+}
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java
new file mode 100644
index 000000000..9a9abebe6
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkPropagateRelationsJob.java
@@ -0,0 +1,117 @@
+package eu.dnetlib.dedup;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.io.IOException;
+
+public class SparkPropagateRelationsJob {
+ enum FieldType {
+ SOURCE,
+ TARGET
+ }
+ final static String IDJSONPATH = "$.id";
+ final static String SOURCEJSONPATH = "$.source";
+ final static String TARGETJSONPATH = "$.target";
+
+ public static void main(String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkPropagateRelationsJob.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json")));
+ parser.parseArgument(args);
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName(SparkUpdateEntityJob.class.getSimpleName())
+ .master(parser.get("master"))
+ .getOrCreate();
+
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ final String relationPath = parser.get("relationPath");
+ final String mergeRelPath = parser.get("mergeRelPath");
+ final String targetRelPath = parser.get("targetRelPath");
+
+
+        final Dataset<Relation> df = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));
+
+
+
+        final JavaPairRDD<String, String> mergedIds = df
+                .where("relClass == 'merges'")
+                .select(df.col("source"), df.col("target"))
+                .distinct()
+                .toJavaRDD()
+                .mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(1), r.getString(0)));
+
+
+        final JavaRDD<String> sourceEntity = sc.textFile(relationPath);
+        JavaRDD<String> newRels = sourceEntity.mapToPair(
+                (PairFunction<String, String, String>) s ->
+                        new Tuple2<>(DHPUtils.getJPathString(SOURCEJSONPATH, s), s))
+                .leftOuterJoin(mergedIds)
+                .map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
+ if (v1._2()._2().isPresent()) {
+ return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.SOURCE);
+ }
+ return v1._2()._1();
+ })
+ .mapToPair(
+                        (PairFunction<String, String, String>) s ->
+ new Tuple2<>(DHPUtils.getJPathString(TARGETJSONPATH, s), s))
+ .leftOuterJoin(mergedIds)
+                .map((Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>) v1 -> {
+ if (v1._2()._2().isPresent()) {
+ return replaceField(v1._2()._1(), v1._2()._2().get(), FieldType.TARGET);
+ }
+ return v1._2()._1();
+ }).filter(SparkPropagateRelationsJob::containsDedup)
+ .repartition(500);
+
+ newRels.union(sourceEntity).repartition(1000).saveAsTextFile(targetRelPath, GzipCodec.class);
+ }
+
+ private static boolean containsDedup(final String json) {
+ final String source = DHPUtils.getJPathString(SOURCEJSONPATH, json);
+ final String target = DHPUtils.getJPathString(TARGETJSONPATH, json);
+
+ return source.toLowerCase().contains("dedup") || target.toLowerCase().contains("dedup");
+ }
+
+
+ private static String replaceField(final String json, final String id, final FieldType type) {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ try {
+ Relation relation = mapper.readValue(json, Relation.class);
+ if (relation.getDataInfo() == null)
+ relation.setDataInfo(new DataInfo());
+ relation.getDataInfo().setDeletedbyinference(false);
+ switch (type) {
+ case SOURCE:
+ relation.setSource(id);
+ return mapper.writeValueAsString(relation);
+ case TARGET:
+ relation.setTarget(id);
+ return mapper.writeValueAsString(relation);
+ default:
+ throw new IllegalArgumentException("");
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("unable to deserialize json relation: " + json, e);
+ }
+ }
+}
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java
new file mode 100644
index 000000000..e7bb4f9c2
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkUpdateEntityJob.java
@@ -0,0 +1,114 @@
+package eu.dnetlib.dedup;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
+import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
+import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.io.IOException;
+
+public class SparkUpdateEntityJob {
+
+ final static String IDJSONPATH = "$.id";
+ final static String SOURCEJSONPATH = "$.source";
+ final static String TARGETJSONPATH = "$.target";
+
+ public static void main(String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityJob.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json")));
+ parser.parseArgument(args);
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName(SparkUpdateEntityJob.class.getSimpleName())
+ .master(parser.get("master"))
+ .getOrCreate();
+
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ final String entityPath = parser.get("entityPath");
+ final String mergeRelPath = parser.get("mergeRelPath");
+ final String dedupRecordPath = parser.get("dedupRecordPath");
+ final String entity = parser.get("entity");
+
+        final Dataset<Relation> df = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));
+        final JavaPairRDD<String, String> mergedIds = df
+                .where("relClass == 'merges'")
+                .select(df.col("target"))
+                .distinct()
+                .toJavaRDD()
+                .mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
+        final JavaRDD<String> sourceEntity = sc.textFile(entityPath);
+
+ if ("relation".equalsIgnoreCase(entity)) {
+ sourceEntity.mapToPair(
+                    (PairFunction<String, String, String>) s ->
+ new Tuple2<>(DHPUtils.getJPathString(SOURCEJSONPATH, s), s))
+ .leftOuterJoin(mergedIds)
+ .map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), Relation.class) : k._2()._1())
+                    .mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(DHPUtils.getJPathString(TARGETJSONPATH, s), s))
+ .leftOuterJoin(mergedIds)
+ .map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), Relation.class) : k._2()._1())
+ .saveAsTextFile(entityPath + "_new", GzipCodec.class);
+ } else {
+            final JavaRDD<String> dedupEntity = sc.textFile(dedupRecordPath);
+            JavaPairRDD<String, String> entitiesWithId = sourceEntity.mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(DHPUtils.getJPathString(IDJSONPATH, s), s));
+            Class<? extends Oaf> mainClass;
+ switch (entity) {
+ case "publication":
+ mainClass = DLIPublication.class;
+ break;
+ case "dataset":
+ mainClass = DLIDataset.class;
+ break;
+ case "unknown":
+ mainClass = DLIUnknown.class;
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal type " + entity);
+
+ }
+
+            JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), mainClass) : k._2()._1());
+
+
+ map.union(dedupEntity).saveAsTextFile(entityPath + "_new", GzipCodec.class);
+ }
+
+
+ }
+
+
+    private static String updateDeletedByInference(final String json, final Class<? extends Oaf> clazz) {
+
+ final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ try {
+ Oaf entity = mapper.readValue(json, clazz);
+ if (entity.getDataInfo()== null)
+ entity.setDataInfo(new DataInfo());
+ entity.getDataInfo().setDeletedbyinference(true);
+ return mapper.writeValueAsString(entity);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to convert json", e);
+ }
+
+
+ }
+
+
+}
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json
new file mode 100644
index 000000000..fecc666c4
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_delete_by_inference_parameters.json
@@ -0,0 +1,31 @@
+[
+ {
+ "paramName": "mt",
+ "paramLongName": "master",
+ "paramDescription": "should be local or yarn",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ep",
+ "paramLongName": "entityPath",
+ "paramDescription": "the input entity path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "mr",
+ "paramLongName": "mergeRelPath",
+ "paramDescription": "the input path of merge Rel",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dr",
+ "paramLongName": "dedupRecordPath",
+ "paramDescription": "the inputPath of dedup record",
+ "paramRequired": true
+ }, {
+ "paramName": "e",
+ "paramLongName": "entity",
+ "paramDescription": "the type of entity",
+ "paramRequired": true
+}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json
new file mode 100644
index 000000000..2ce78440f
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_propagate_relation_parameters.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "mt",
+ "paramLongName": "master",
+ "paramDescription": "should be local or yarn",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ep",
+ "paramLongName": "relationPath",
+ "paramDescription": "the input relation path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "mr",
+ "paramLongName": "mergeRelPath",
+ "paramDescription": "the input path of merge Rel",
+ "paramRequired": true
+ },
+ {
+ "paramName": "t",
+ "paramLongName": "targetRelPath",
+ "paramDescription": "the output Rel Path",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml
index 5a00a5967..89ebb17ff 100644
--- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml
@@ -24,27 +24,24 @@
sparkExecutorMemory
memory for individual executor
-
- sparkExecutorCores
- number of cores used by single executor
-
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
@@ -55,11 +52,11 @@
Create Similarity Relations
eu.dnetlib.dedup.SparkCreateSimRels
dhp-dedup-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory} --conf
- spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
- spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
- spark.sql.warehouse.dir="/user/hive/warehouse"
+
+ --executor-memory ${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --num-executors 100
+ --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
-mtyarn-cluster
--sourcePath${sourcePath}
@@ -71,7 +68,6 @@
-
${jobTracker}
@@ -81,11 +77,11 @@
Create Connected Components
eu.dnetlib.dedup.SparkCreateConnectedComponent
dhp-dedup-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory} --conf
- spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
- spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
- spark.sql.warehouse.dir="/user/hive/warehouse"
+
+ --executor-memory ${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --num-executors 100
+ --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
-mtyarn-cluster
--sourcePath${sourcePath}
@@ -106,21 +102,46 @@
Create Dedup Record
eu.dnetlib.dedup.SparkCreateDedupRecord
dhp-dedup-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory} --conf
- spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
- spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
- spark.sql.warehouse.dir="/user/hive/warehouse"
+
+ --executor-memory ${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --num-executors 100
+ --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
-mtyarn-cluster
--sourcePath${sourcePath}
- --dedupPath${dedupPath}
+ --dedupPath${targetPath}
--entity${entity}
--dedupConf${dedupConf}
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ yarn-cluster
+ cluster
+ Propagate Dedup Relations
+ eu.dnetlib.dedup.SparkPropagateRelationsJob
+ dhp-dedup-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --num-executors 100
+ --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
+
+ -mtyarn-cluster
+ --mergeRelPath${targetPath}/${entity}/mergeRel
+ --relationPath${sourcePath}/relation
+ --targetRelPath${targetPath}/${entity}/relation_updated
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml
rename to dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/workflow.xml
new file mode 100644
index 000000000..fd5cd6d7f
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/propagaterels/oozie_app/workflow.xml
@@ -0,0 +1,52 @@
+
+
+
+ relationPath
+ the source path
+
+
+ mergeRelPath
+            the merge relations path
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ yarn-cluster
+ cluster
+ Propagate Dedup Relations
+ eu.dnetlib.dedup.SparkPropagateRelationsJob
+ dhp-dedup-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --num-executors 100
+ --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
+
+ -mtyarn-cluster
+ --mergeRelPath${mergeRelPath}
+ --relationPath${relationPath}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/config-default.xml
new file mode 100644
index 000000000..ba2df7773
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/config-default.xml
@@ -0,0 +1,30 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>hive_metastore_uris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>hive_db_name</name>
+        <value>openaire</value>
+    </property>
+    <property>
+        <name>master</name>
+        <value>yarn</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/workflow.xml
new file mode 100644
index 000000000..d98344736
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/update/entity/oozie_app/workflow.xml
@@ -0,0 +1,65 @@
+
+
+
+ entity
+ the entity that should be processed
+
+
+ entityPath
+ the source path
+
+
+ mergeRelPath
+            the merge relations path
+
+
+ dedupRecordPath
+            the dedup record path
+
+
+ master
+            should be local or yarn
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ ${master}
+ cluster
+ Update ${entity} and add DedupRecord
+ eu.dnetlib.dedup.SparkUpdateEntityJob
+ dhp-dedup-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --num-executors 100
+ --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
+
+ -mt${master}
+ --entityPath${entityPath}
+ --mergeRelPath${mergeRelPath}
+ --entity${entity}
+ --dedupRecordPath${dedupRecordPath}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java
index f93703e37..fb1be554b 100644
--- a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java
+++ b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java
@@ -1,19 +1,14 @@
package eu.dnetlib.dedup;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-import java.io.File;
import java.io.IOException;
-import java.util.List;
public class SparkCreateDedupTest {
@@ -22,7 +17,7 @@ public class SparkCreateDedupTest {
@Before
public void setUp() throws IOException {
- configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
+ configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/pub_scholix.conf.json"));
}
@@ -38,6 +33,14 @@ public class SparkCreateDedupTest {
});
}
+
+ @Test
+ public void createDeletedByInference() throws Exception {
+ SparkUpdateEntityJob.main(new String[] {
+ "-mt", "local[*]"
+ });
+ }
+
@Test
@Ignore
public void createCCTest() throws Exception {
diff --git a/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_scholix.conf.json b/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_scholix.conf.json
new file mode 100644
index 000000000..d91419853
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/pub_scholix.conf.json
@@ -0,0 +1,378 @@
+{
+ "wf": {
+ "threshold": "0.99",
+ "dedupRun": "001",
+ "entityType": "result",
+ "subEntityType": "resulttype",
+ "subEntityValue": "publication",
+ "orderField": "title",
+ "queueMaxSize": "2000",
+ "groupMaxSize": "100",
+ "maxChildren": "100",
+ "slidingWindowSize": "200",
+ "rootBuilder": [
+ ],
+ "includeChildren": "true",
+ "maxIterations": 20,
+ "idPath": "$.id"
+ },
+ "pace": {
+ "clustering": [
+ {
+ "name": "ngrampairs",
+ "fields": [
+ "title"
+ ],
+ "params": {
+ "max": "1",
+ "ngramLen": "3"
+ }
+ },
+ {
+ "name": "suffixprefix",
+ "fields": [
+ "title"
+ ],
+ "params": {
+ "max": "1",
+ "len": "3"
+ }
+ }
+ ],
+ "decisionTree": {
+ "start": {
+ "fields": [
+ {
+ "field": "pid",
+ "comparator": "jsonListMatch",
+ "weight": 1.0,
+ "countIfUndefined": "false",
+ "params": {
+ "jpath_value": "$.value",
+ "jpath_classid": "$.qualifier.classid"
+ }
+ }
+ ],
+ "threshold": 0.5,
+ "aggregation": "AVG",
+ "positive": "MATCH",
+ "negative": "layer2",
+ "undefined": "layer2",
+ "ignoreUndefined": "true"
+ },
+ "layer2": {
+ "fields": [
+ {
+ "field": "title",
+ "comparator": "titleVersionMatch",
+ "weight": 1.0,
+ "countIfUndefined": "false",
+ "params": {}
+ },
+ {
+ "field": "authors",
+ "comparator": "sizeMatch",
+ "weight": 1.0,
+ "countIfUndefined": "false",
+ "params": {}
+ }
+ ],
+ "threshold": 1.0,
+ "aggregation": "AND",
+ "positive": "layer3",
+ "negative": "NO_MATCH",
+ "undefined": "layer3",
+ "ignoreUndefined": "false"
+ },
+ "layer3": {
+ "fields": [
+ {
+ "field": "title",
+ "comparator": "levensteinTitle",
+ "weight": 1.0,
+ "countIfUndefined": "true",
+ "params": {}
+ }
+ ],
+ "threshold": 0.99,
+ "aggregation": "AVG",
+ "positive": "MATCH",
+ "negative": "NO_MATCH",
+ "undefined": "NO_MATCH",
+ "ignoreUndefined": "true"
+ }
+ },
+ "model": [
+ {
+ "name": "pid",
+ "type": "JSON",
+ "path": "$.pid",
+ "overrideMatch": "true"
+ },
+ {
+ "name": "title",
+ "type": "String",
+ "path": "$.title[*].value",
+ "length": 250,
+ "size": 5
+ },
+ {
+ "name": "authors",
+ "type": "List",
+ "path": "$.author[*].fullname",
+ "size": 200
+ },
+ {
+ "name": "resulttype",
+ "type": "String",
+ "path": "$.resulttype.classid"
+ }
+ ],
+ "blacklists": {
+ "title": [
+ "^Inside Front Cover$",
+ "^CORR Insights$",
+ "^Index des notions$",
+ "^Department of Error.$",
+ "^Untitled Item$",
+ "^Department of Error$",
+ "^Tome II : 1598 à 1605$",
+ "^(à l’exception de roi, prince, royauté, pouvoir, image… qui sont omniprésents)$",
+ "^Museen und Ausstellungsinstitute in Nürnberg$",
+ "^Text/Conference Paper$",
+ "^Table des illustrations$",
+ "^An Intimate Insight on Psychopathy and a Novel Hermeneutic Psychological Science$",
+ "^Index des noms$",
+ "^Reply by Authors.$",
+ "^Titelblatt - Inhalt$",
+ "^Index des œuvres,$",
+ "(?i)^Poster presentations$",
+ "^THE ASSOCIATION AND THE GENERAL MEDICAL COUNCIL$",
+ "^Problems with perinatal pathology\\.?$",
+ "(?i)^Cases? of Puerperal Convulsions$",
+ "(?i)^Operative Gyna?ecology$",
+ "(?i)^Mind the gap\\!?\\:?$",
+ "^Chronic fatigue syndrome\\.?$",
+ "^Cartas? ao editor Letters? to the Editor$",
+ "^Note from the Editor$",
+ "^Anesthesia Abstract$",
+ "^Annual report$",
+ "(?i)^“?THE RADICAL PREVENTION OF VENEREAL DISEASE\\.?”?$",
+ "(?i)^Graph and Table of Infectious Diseases?$",
+ "^Presentation$",
+ "(?i)^Reviews and Information on Publications$",
+ "(?i)^PUBLIC HEALTH SERVICES?$",
+ "(?i)^COMBINED TEXT-?BOOK OF OBSTETRICS AND GYN(Æ|ae)COLOGY$",
+ "(?i)^Adrese autora$",
+ "(?i)^Systematic Part .*\\. Catalogus Fossilium Austriae, Band 2: Echinoidea neogenica$",
+ "(?i)^Acknowledgement to Referees$",
+ "(?i)^Behçet's disease\\.?$",
+ "(?i)^Isolation and identification of restriction endonuclease.*$",
+ "(?i)^CEREBROVASCULAR DISEASES?.?$",
+ "(?i)^Screening for abdominal aortic aneurysms?\\.?$",
+ "^Event management$",
+ "(?i)^Breakfast and Crohn's disease.*\\.?$",
+ "^Cálculo de concentraciones en disoluciones acuosas. Ejercicio interactivo\\..*\\.$",
+ "(?i)^Genetic and functional analyses of SHANK2 mutations suggest a multiple hit model of Autism spectrum disorders?\\.?$",
+ "^Gushi hakubutsugaku$",
+ "^Starobosanski nadpisi u Bosni i Hercegovini \\(.*\\)$",
+ "^Intestinal spirocha?etosis$",
+ "^Treatment of Rodent Ulcer$",
+ "(?i)^\\W*Cloud Computing\\W*$",
+ "^Compendio mathematico : en que se contienen todas las materias mas principales de las Ciencias que tratan de la cantidad$",
+ "^Free Communications, Poster Presentations: Session [A-F]$",
+ "^“The Historical Aspects? of Quackery\\.?”$",
+ "^A designated centre for people with disabilities operated by St John of God Community Services (Limited|Ltd), Louth$",
+ "^P(er|re)-Mile Premiums for Auto Insurance\\.?$",
+ "(?i)^Case Report$",
+ "^Boletín Informativo$",
+ "(?i)^Glioblastoma Multiforme$",
+ "(?i)^Nuevos táxones animales descritos en la península Ibérica y Macaronesia desde 1994 \\(.*\\)$",
+ "^Zaměstnanecké výhody$",
+ "(?i)^The Economics of Terrorism and Counter-Terrorism: A Survey \\(Part .*\\)$",
+ "(?i)^Carotid body tumours?\\.?$",
+ "(?i)^\\[Españoles en Francia : La condición Emigrante.*\\]$",
+ "^Avant-propos$",
+ "(?i)^St\\. Patrick's Cathedral, Dublin, County Dublin - Head(s)? and Capital(s)?$",
+ "(?i)^St\\. Patrick's Cathedral, Dublin, County Dublin - Bases?$",
+ "(?i)^PUBLIC HEALTH VERSUS THE STATE$",
+ "^Viñetas de Cortázar$",
+ "(?i)^Search for heavy neutrinos and W(\\[|_|\\(|_\\{|-)?R(\\]|\\)|\\})? bosons with right-handed couplings in a left-right symmetric model in pp collisions at.*TeV(\\.)?$",
+ "(?i)^Measurement of the pseudorapidity and centrality dependence of the transverse energy density in Pb(-?)Pb collisions at.*tev(\\.?)$",
+ "(?i)^Search for resonances decaying into top-quark pairs using fully hadronic decays in pp collisions with ATLAS at.*TeV$",
+ "(?i)^Search for neutral minimal supersymmetric standard model Higgs bosons decaying to tau pairs in pp collisions at.*tev$",
+ "(?i)^Relatório de Estágio (de|em) Angiologia e Cirurgia Vascular$",
+ "^Aus der AGMB$",
+ "^Znanstveno-stručni prilozi$",
+ "(?i)^Zhodnocení finanční situace podniku a návrhy na zlepšení$",
+ "(?i)^Evaluation of the Financial Situation in the Firm and Proposals to its Improvement$",
+ "(?i)^Hodnocení finanční situace podniku a návrhy na její zlepšení$",
+ "^Finanční analýza podniku$",
+ "^Financial analysis( of business)?$",
+ "(?i)^Textbook of Gyn(a)?(Æ)?(e)?cology$",
+ "^Jikken nihon shūshinsho$",
+ "(?i)^CORONER('|s)(s|') INQUESTS$",
+ "(?i)^(Μελέτη παραγόντων )?risk management( για ανάπτυξη και εφαρμογή ενός πληροφοριακού συστήματος| και ανάπτυξη συστήματος)?$",
+ "(?i)^Consultants' contract(s)?$",
+ "(?i)^Upute autorima$",
+ "(?i)^Bijdrage tot de Kennis van den Godsdienst der Dajaks van Lan(d|f)ak en Tajan$",
+ "^Joshi shin kokubun$",
+ "^Kōtō shōgaku dokuhon nōson'yō$",
+ "^Jinjō shōgaku shōka$",
+ "^Shōgaku shūjichō$",
+ "^Nihon joshi dokuhon$",
+ "^Joshi shin dokuhon$",
+ "^Chūtō kanbun dokuhon$",
+ "^Wabun dokuhon$",
+ "(?i)^(Analysis of economy selected village or town|Rozbor hospodaření vybrané obce či města)$",
+ "(?i)^cardiac rehabilitation$",
+ "(?i)^Analytical summary$",
+ "^Thesaurus resolutionum Sacrae Congregationis Concilii$",
+ "(?i)^Sumario analítico(\\s{1})?(Analitic summary)?$",
+ "^Prikazi i osvrti$",
+ "^Rodinný dům s provozovnou$",
+ "^Family house with an establishment$",
+ "^Shinsei chūtō shin kokugun$",
+ "^Pulmonary alveolar proteinosis(\\.?)$",
+ "^Shinshū kanbun$",
+ "^Viñeta(s?) de Rodríguez$",
+ "(?i)^RUBRIKA UREDNIKA$",
+ "^A Matching Model of the Academic Publication Market$",
+ "^Yōgaku kōyō$",
+ "^Internetový marketing$",
+ "^Internet marketing$",
+ "^Chūtō kokugo dokuhon$",
+ "^Kokugo dokuhon$",
+ "^Antibiotic Cover for Dental Extraction(s?)$",
+ "^Strategie podniku$",
+ "^Strategy of an Enterprise$",
+ "(?i)^respiratory disease(s?)(\\.?)$",
+ "^Award(s?) for Gallantry in Civil Defence$",
+ "^Podniková kultura$",
+ "^Corporate Culture$",
+ "^Severe hyponatraemia in hospital inpatient(s?)(\\.?)$",
+ "^Pracovní motivace$",
+ "^Work Motivation$",
+ "^Kaitei kōtō jogaku dokuhon$",
+ "^Konsolidovaná účetní závěrka$",
+ "^Consolidated Financial Statements$",
+ "(?i)^intracranial tumour(s?)$",
+ "^Climate Change Mitigation Options and Directed Technical Change: A Decentralized Equilibrium Analysis$",
+ "^\\[CERVECERIAS MAHOU(\\.|\\:) INTERIOR\\] \\[Material gráfico\\]$",
+ "^Housing Market Dynamics(\\:|\\.) On the Contribution of Income Shocks and Credit Constraint(s?)$",
+ "^\\[Funciones auxiliares de la música en Radio París,.*\\]$",
+ "^Úroveň motivačního procesu jako způsobu vedení lidí$",
+ "^The level of motivation process as a leadership$",
+ "^Pay-beds in N(\\.?)H(\\.?)S(\\.?) Hospitals$",
+ "(?i)^news and events$",
+ "(?i)^NOVOSTI I DOGAĐAJI$",
+ "^Sansū no gakushū$",
+ "^Posouzení informačního systému firmy a návrh změn$",
+ "^Information System Assessment and Proposal for ICT Modification$",
+ "^Stresové zatížení pracovníků ve vybrané profesi$",
+ "^Stress load in a specific job$",
+ "^Sunday: Poster Sessions, Pt.*$",
+ "^Monday: Poster Sessions, Pt.*$",
+ "^Wednesday: Poster Sessions, Pt.*",
+ "^Tuesday: Poster Sessions, Pt.*$",
+ "^Analýza reklamy$",
+ "^Analysis of advertising$",
+ "^Shōgaku shūshinsho$",
+ "^Shōgaku sansū$",
+ "^Shintei joshi kokubun$",
+ "^Taishō joshi kokubun dokuhon$",
+ "^Joshi kokubun$",
+ "^Účetní uzávěrka a účetní závěrka v ČR$",
+ "(?i)^The \"?Causes\"? of Cancer$",
+ "^Normas para la publicación de artículos$",
+ "^Editor('|s)(s|') [Rr]eply$",
+ "^Editor(’|s)(s|’) letter$",
+ "^Redaktoriaus žodis$",
+ "^DISCUSSION ON THE PRECEDING PAPER$",
+ "^Kōtō shōgaku shūshinsho jidōyō$",
+ "^Shōgaku nihon rekishi$",
+ "^(Theory of the flow of action currents in isolated myelinated nerve fibers).*$",
+ "^Préface$",
+ "^Occupational [Hh]ealth [Ss]ervices.$",
+ "^In Memoriam Professor Toshiyuki TAKESHIMA$",
+ "^Účetní závěrka ve vybraném podniku.*$",
+ "^Financial statements in selected company$",
+ "^Abdominal [Aa]ortic [Aa]neurysms.*$",
+ "^Pseudomyxoma peritonei$",
+ "^Kazalo autora$",
+ "(?i)^uvodna riječ$",
+ "^Motivace jako způsob vedení lidí$",
+ "^Motivation as a leadership$",
+ "^Polyfunkční dům$",
+ "^Multi\\-funkcional building$",
+ "^Podnikatelský plán$",
+ "(?i)^Podnikatelský záměr$",
+ "(?i)^Business Plan$",
+ "^Oceňování nemovitostí$",
+ "^Marketingová komunikace$",
+ "^Marketing communication$",
+ "^Sumario Analítico$",
+ "^Riječ uredništva$",
+ "^Savjetovanja i priredbe$",
+ "^Índice$",
+ "^(Starobosanski nadpisi).*$",
+ "^Vzdělávání pracovníků v organizaci$",
+ "^Staff training in organization$",
+ "^(Life Histories of North American Geometridae).*$",
+ "^Strategická analýza podniku$",
+ "^Strategic Analysis of an Enterprise$",
+ "^Sadržaj$",
+ "^Upute suradnicima$",
+ "^Rodinný dům$",
+ "(?i)^Fami(l)?ly house$",
+ "^Upute autorima$",
+ "^Strategic Analysis$",
+ "^Finanční analýza vybraného podniku$",
+ "^Finanční analýza$",
+ "^Riječ urednika$",
+ "(?i)^Content(s?)$",
+ "(?i)^Inhalt$",
+ "^Jinjō shōgaku shūshinsho jidōyō$",
+ "(?i)^Index$",
+ "^Chūgaku kokubun kyōkasho$",
+ "^Retrato de una mujer$",
+ "^Retrato de un hombre$",
+ "^Kōtō shōgaku dokuhon$",
+ "^Shotōka kokugo$",
+ "^Shōgaku dokuhon$",
+ "^Jinjō shōgaku kokugo dokuhon$",
+ "^Shinsei kokugo dokuhon$",
+ "^Teikoku dokuhon$",
+ "^Instructions to Authors$",
+ "^KİTAP TAHLİLİ$",
+ "^PRZEGLĄD PIŚMIENNICTWA$",
+ "(?i)^Presentación$",
+ "^İçindekiler$",
+ "(?i)^Tabl?e of contents$",
+ "^(CODICE DEL BEATO DE LOS REYES FERNANDO I Y SANCHA).*$",
+ "^(\\[MADRID\\. BIBL\\. NAC\\. N.*KING FERDINAND I.*FROM SAN ISIDORO DE LEON\\. FACUNDUS SCRIPSIT DATED.*\\]).*",
+ "^Editorial( Board)?$",
+ "(?i)^Editorial \\(English\\)$",
+ "^Editörden$",
+ "^(Corpus Oral Dialectal \\(COD\\)\\.).*$",
+ "^(Kiri Karl Morgensternile).*$",
+ "^(\\[Eksliibris Aleksandr).*\\]$",
+ "^(\\[Eksliibris Aleksandr).*$",
+ "^(Eksliibris Aleksandr).*$",
+ "^(Kiri A\\. de Vignolles).*$",
+ "^(2 kirja Karl Morgensternile).*$",
+ "^(Pirita kloostri idaosa arheoloogilised).*$",
+ "^(Kiri tundmatule).*$",
+ "^(Kiri Jenaer Allgemeine Literaturzeitung toimetusele).*$",
+ "^(Eksliibris Nikolai Birukovile).*$",
+ "^(Eksliibris Nikolai Issakovile).*$",
+ "^(WHP Cruise Summary Information of section).*$",
+ "^(Measurement of the top quark\\-pair production cross section with ATLAS in pp collisions at).*$",
+ "^(Measurement of the spin\\-dependent structure function).*",
+ "(?i)^.*authors['’′]? reply\\.?$",
+ "(?i)^.*authors['’′]? response\\.?$"
+ ]
+ },
+ "synonyms": {}
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml
index ff7450663..641fbd933 100644
--- a/dhp-workflows/dhp-graph-mapper/pom.xml
+++ b/dhp-workflows/dhp-graph-mapper/pom.xml
@@ -1,5 +1,6 @@
-
+
dhp-workflows
eu.dnetlib.dhp
@@ -11,6 +12,11 @@
+		<dependency>
+			<groupId>commons-io</groupId>
+			<artifactId>commons-io</artifactId>
+		</dependency>
+
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-core_2.11</artifactId>
@@ -34,6 +40,10 @@
 			<groupId>com.jayway.jsonpath</groupId>
 			<artifactId>json-path</artifactId>
+		<dependency>
+			<groupId>org.mongodb</groupId>
+			<artifactId>mongo-java-driver</artifactId>
+		</dependency>
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ImportDataFromMongo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ImportDataFromMongo.java
new file mode 100644
index 000000000..8872cf696
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/ImportDataFromMongo.java
@@ -0,0 +1,103 @@
+package eu.dnetlib.dhp.graph;
+
+import com.mongodb.*;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.message.Message;
+import eu.dnetlib.message.MessageType;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class ImportDataFromMongo {
+
+
+ public static void main(String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json")));
+ parser.parseArgument(args);
+ final int port = Integer.parseInt(parser.get("dbport"));
+ final String host = parser.get("dbhost");
+
+ final String format = parser.get("format");
+ final String layout = parser.get("layout");
+ final String interpretation = parser.get("interpretation");
+
+ final String dbName = parser.get("dbName");
+
+
+ final MongoClient client = new MongoClient(host, port);
+
+ MongoDatabase database = client.getDatabase(dbName);
+
+        MongoCollection<Document> metadata = database.getCollection("metadata");
+        MongoCollection<Document> metadataManager = database.getCollection("metadataManager");
+        final DBObject query = QueryBuilder.start("format").is(format).and("layout").is(layout).and("interpretation").is(interpretation).get();
+        final List<String> ids = new ArrayList<>();
+        metadata.find((Bson) query).forEach((Consumer<Document>) document -> ids.add(document.getString("mdId")));
+        List<String> databaseId = ids.stream().map(it -> getCurrentId(it, metadataManager)).filter(Objects::nonNull).collect(Collectors.toList());
+ final String hdfsuri = parser.get("namenode");
+ // ====== Init HDFS File System Object
+ Configuration conf = new Configuration();
+ // Set FileSystem URI
+ conf.set("fs.defaultFS", hdfsuri);
+ // Because of Maven
+ conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+ conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+
+ System.setProperty("HADOOP_USER_NAME", parser.get("user"));
+ System.setProperty("hadoop.home.dir", "/");
+ FileSystem.get(URI.create(hdfsuri), conf);
+ Path hdfswritepath = new Path(parser.get("targetPath"));
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class),
+ SequenceFile.Writer.valueClass(Text.class))) {
+ final IntWritable key = new IntWritable(counter.get());
+ final Text value = new Text();
+ databaseId.forEach(id -> {
+ System.out.println("Reading :"+id);
+            MongoCollection<Document> collection = database.getCollection(id);
+            collection.find().forEach((Consumer<Document>) document ->
+ {
+ key.set(counter.getAndIncrement());
+ value.set(document.getString("body"));
+
+ if (counter.get() % 10000 == 0) {
+ System.out.println("Added "+counter.get());
+ }
+ try {
+ writer.append(key, value);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ );
+ });
+ }
+ }
+
+
+    private static String getCurrentId(final String mdId, final MongoCollection<Document> metadataManager) {
+        FindIterable<Document> result = metadataManager.find((Bson) QueryBuilder.start("mdId").is(mdId).get());
+ final Document item = result.first();
+ return item == null ? null : item.getString("currentId");
+ }
+
+}
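Reviewer note (not part of the patch): a minimal sketch, under the assumption that the importer above keeps the IntWritable/Text key-value layout, showing how the SequenceFile it writes could be read back from HDFS for spot-checking. The class name and the command-line argument are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class InspectMongoDump {
    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();      // fs.defaultFS taken from the local Hadoop configuration
        final Path path = new Path(args[0]);                  // the targetPath written by ImportDataFromMongo
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            final IntWritable key = new IntWritable();
            final Text value = new Text();
            while (reader.next(key, value)) {
                final String body = value.toString();
                // print the record counter and a short prefix of the stored record body
                System.out.println(key.get() + " -> " + body.substring(0, Math.min(80, body.length())));
            }
        }
    }
}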
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java
index b320fd51c..54496671f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/SparkScholexplorerMergeEntitiesJob.java
@@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.graph.SparkGraphImporterJob;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
@@ -17,10 +16,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java
index 0ba7b25ee..5277f794b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/AbstractScholexplorerParser.java
@@ -82,7 +82,7 @@ public abstract class AbstractScholexplorerParser {
}
protected String generateId(final String pid, final String pidType, final String entityType) {
- String type = "50|";
+ String type;
switch (entityType){
case "publication":
type = "50|";
@@ -100,7 +100,7 @@ public abstract class AbstractScholexplorerParser {
if ("dnet".equalsIgnoreCase(pidType))
return type+StringUtils.substringAfter(pid, "::");
- return type+ DHPUtils.md5(String.format("%s::%s", pid, pidType));
+ return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java
index 578b18085..3a671e6a1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/DatasetScholexplorerParser.java
@@ -11,6 +11,7 @@ import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo;
import eu.dnetlib.dhp.parser.utility.VtdUtilityParser.Node;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,10 +38,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
di.setInvisible(false);
parsedObject.setDataInfo(di);
-
- final String objIdentifier = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']");
- parsedObject.setId("60|" + StringUtils.substringAfter(objIdentifier, "::"));
-
parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']")));
@@ -112,12 +109,16 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
         final List<Node> identifierType =
VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']/*[local-name()='identifier']", Collections.singletonList("identifierType"));
- StructuredProperty currentPid = extractIdentifier(identifierType, "type");
+ StructuredProperty currentPid = extractIdentifier(identifierType, "identifierType");
if (currentPid == null) return null;
inferPid(currentPid);
parsedObject.setPid(Collections.singletonList(currentPid));
+ final String sourceId = generateId(currentPid.getValue(), currentPid.getQualifier().getClassid(), "dataset");
+ parsedObject.setId(sourceId);
+
+
         List<String> descs = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='description']");
if (descs != null && descs.size() > 0)
parsedObject.setDescription(descs.stream()
@@ -149,15 +150,20 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
r.setTarget(targetId);
r.setRelType(relationSemantic);
+ r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
+ r.setDataInfo(di);
rels.add(r);
r = new Relation();
+ r.setDataInfo(di);
r.setSource(targetId);
r.setTarget(parsedObject.getId());
r.setRelType(inverseRelation);
+ r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
rels.add(r);
- result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di));
+ if("unknown".equalsIgnoreCase(relatedType))
+ result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di));
return rels.stream();
}).collect(Collectors.toList()));
}
@@ -185,6 +191,13 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
parsedObject.setSubject(subjects);
+ Qualifier q = new Qualifier();
+ q.setClassname("dataset");
+ q.setClassid("dataset");
+ q.setSchemename("dataset");
+ q.setSchemeid("dataset");
+ parsedObject.setResulttype(q);
+
parsedObject.setCompletionStatus(completionStatus);
         final List<String> creators = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='resource']//*[local-name()='creator']/*[local-name()='creatorName']");
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java
index 6e3221da5..45ef2066b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/scholexplorer/parser/PublicationScholexplorerParser.java
@@ -36,9 +36,6 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
di.setDeletedbyinference(false);
di.setInvisible(false);
- final String objIdentifier = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']");
- parsedObject.setId("50|" + StringUtils.substringAfter(objIdentifier, "::"));
-
parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']"));
final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']");
@@ -63,6 +60,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
if (currentPid == null) return null;
inferPid(currentPid);
parsedObject.setPid(Collections.singletonList(currentPid));
+ final String sourceId = generateId(currentPid.getValue(), currentPid.getQualifier().getClassid(), "publication");
+ parsedObject.setId(sourceId);
String provisionMode = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='provisionMode']");
@@ -136,12 +135,12 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
r.setDataInfo(di);
rels.add(r);
r = new Relation();
+ r.setDataInfo(di);
r.setSource(targetId);
r.setTarget(parsedObject.getId());
r.setRelType(inverseRelation);
- r.setCollectedFrom(parsedObject.getCollectedfrom());
- r.setDataInfo(di);
r.setRelClass("datacite");
+ r.setCollectedFrom(parsedObject.getCollectedfrom());
rels.add(r);
return rels.stream();
@@ -217,7 +216,13 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
parsedObject.setDataInfo(di);
-
+ parsedObject.setSubject(subjects);
+ Qualifier q = new Qualifier();
+ q.setClassname("publication");
+ q.setClassid("publication");
+ q.setSchemename("publication");
+ q.setSchemeid("publication");
+ parsedObject.setResulttype(q);
result.add(parsedObject);
return result;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/workflow.xml
similarity index 72%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/workflow.xml
index 102587ab0..a1faaa0f5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/mergeentities/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ConvertXMLToEntities/oozie_app/workflow.xml
@@ -8,10 +8,6 @@
targetPath
the source path
-
- targetDir
- the name of the path
-
sparkDriverMemory
memory for driver process
@@ -26,15 +22,22 @@
entity
- the entity to be merged
+ the entity type
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
@@ -42,15 +45,10 @@
${nameNode}
yarn-cluster
cluster
- Merge ${entity}
- eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerMergeEntitiesJob
+ Import ${entity} and related entities
+ eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerGraphImporter
dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory ${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --num-executors 100
- --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
-
+ --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}
-mt yarn-cluster
--sourcePath${sourcePath}
--targetPath${targetPath}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/workflow.xml
similarity index 70%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/workflow.xml
index ef968b0cd..6caa8b1c3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/extractentities/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/Extractentities/oozie_app/workflow.xml
@@ -20,23 +20,34 @@
sparkExecutorMemory
memory for individual executor
-
- sparkExecutorCores
- number of cores used by single executor
-
entities
the entities to be extracted
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
${jobTracker}
${nameNode}
@@ -47,12 +58,8 @@
dhp-graph-mapper-${projectVersion}.jar
--executor-memory ${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --num-executors 100
-
-
-
- --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
+ --driver-memory=${sparkDriverMemory}
+ ${sparkExtraOPT}
-mt yarn-cluster
--sourcePath${sourcePath}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/workflow.xml
new file mode 100644
index 000000000..f3c9a4ecb
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/ImportMongoToHDFS/oozie_app/workflow.xml
@@ -0,0 +1,73 @@
+
+
+
+ workingPath
+ the working dir base path
+
+
+ targetPath
+ the graph Raw base path
+
+
+ format
+ the metadata format to import
+
+
+ layout
+ the metadata layout to import
+
+
+ interpretation
+ the metadata interpretation to import
+
+
+ dbhost
+ the MongoDB URL, e.g. mongodb://[username:password@]host[:port]
+
+
+ dbName
+ the MongoDB database name
+
+
+ user
+ HDFS user
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+ eu.dnetlib.dhp.graph.ImportDataFromMongo
+ -t${targetPath}
+ -n${nameNode}
+ -u${user}
+ -h${dbhost}
+ -p27017
+ -dn${dbName}
+ -f${format}
+ -l${layout}
+ -i${interpretation}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/config-default.xml
new file mode 100644
index 000000000..6fb2a1253
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/config-default.xml
@@ -0,0 +1,10 @@
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml
similarity index 53%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml
index 3efb90ae4..d04e76b2a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/scholexplorer/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/Application/MergeEntities/oozie_app/workflow.xml
@@ -16,43 +16,41 @@
sparkExecutorMemory
memory for individual executor
-
- sparkExecutorCores
- number of cores used by single executor
-
entity
- the entity type
+ the entity to be merged
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
+
+
+
+
+
+
+
+
+
+
+
${jobTracker}
${nameNode}
yarn-cluster
cluster
- Import ${entity} and related entities
- eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerGraphImporter
+ Merge ${entity}
+ eu.dnetlib.dhp.graph.scholexplorer.SparkScholexplorerMergeEntitiesJob
dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory ${sparkExecutorMemory}
- --driver-memory=${sparkDriverMemory}
- --num-executors 100
-
-
-
- --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2"
-
+ --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}
-mt yarn-cluster
- --sourcePath${sourcePath}
- --targetPath${targetPath}
+ --sourcePath${sourcePath}/${entity}
+ --targetPath${targetPath}/${entity}
--entity${entity}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json
new file mode 100644
index 000000000..9032be287
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json
@@ -0,0 +1,12 @@
+[
+ {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the name node", "paramRequired": true},
+ {"paramName":"u", "paramLongName":"user", "paramDescription": "the name node", "paramRequired": true},
+ {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the name node", "paramRequired": true},
+ {"paramName":"h", "paramLongName":"dbhost", "paramDescription": "the mongo host", "paramRequired": true},
+ {"paramName":"p", "paramLongName":"dbport", "paramDescription": "the mongo port", "paramRequired": true},
+ {"paramName":"f", "paramLongName":"format", "paramDescription": "the metadata format to import", "paramRequired": true},
+ {"paramName":"l", "paramLongName":"layout", "paramDescription": "the metadata layout to import", "paramRequired": true},
+ {"paramName":"i", "paramLongName":"interpretation", "paramDescription": "the metadata interpretation to import", "paramRequired": true},
+ {"paramName":"dn", "paramLongName":"dbName", "paramDescription": "the database Name", "paramRequired": true}
+
+]
\ No newline at end of file
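
For context, a minimal sketch of how this parameter specification might be consumed at runtime. It assumes dhp-common's ArgumentApplicationParser (constructor taking the JSON spec, plus parseArgument(String[]) and get(String)); the sketch is illustrative and not part of the patch.

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;

public class ImportArgsSketch {
    public static void main(String[] args) throws Exception {
        // load the specification introduced above from the classpath
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
                IOUtils.toString(ImportArgsSketch.class.getResourceAsStream(
                        "/eu/dnetlib/dhp/graph/import_from_mongo_parameters.json")));
        parser.parseArgument(args);

        final String dbHost = parser.get("dbhost");                  // -h
        final int dbPort = Integer.parseInt(parser.get("dbport"));   // -p
        final String dbName = parser.get("dbName");                  // -dn
        final String targetPath = parser.get("targetPath");          // -t
        // format (-f), layout (-l) and interpretation (-i) identify the mdstore to read
        System.out.println("importing " + dbName + " from " + dbHost + ":" + dbPort
                + " into " + targetPath);
    }
}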
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
deleted file mode 100644
index 24090a245..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-
-
-
- sourcePath
- the source path
-
-
- hive_db_name
- the target hive database name
-
-
- sparkDriverMemory
- memory for driver process
-
-
- sparkExecutorMemory
- memory for individual executor
-
-
- sparkExecutorCores
- number of cores used by single executor
-
-
-
-
-
-
- Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- MapGraphIntoDataFrame
- eu.dnetlib.dhp.graph.SparkGraphImporterJob
- dhp-graph-mapper-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"
- -mt yarn-cluster
- --sourcePath${sourcePath}
- --hive_db_name${hive_db_name}
- --hive_metastore_uris${hive_metastore_uris}
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ImportDataFromMongoTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ImportDataFromMongoTest.java
new file mode 100644
index 000000000..50248c83d
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/ImportDataFromMongoTest.java
@@ -0,0 +1,22 @@
+package eu.dnetlib.dhp.graph;
+
+import org.junit.Test;
+
+public class ImportDataFromMongoTest {
+
+ @Test
+ public void doTest() throws Exception {
+ // local smoke test only: requires a MongoDB instance reachable on localhost:2800 and writes to a local file path
+ ImportDataFromMongo.main(new String[] {
+ "-h", "localhost",
+ "-p", "2800",
+ "-f", "PMF",
+ "-l", "store",
+ "-i", "cleaned",
+ "-dn", "mdstore_dli",
+ "-n", "file:///home/sandro/test.seq",
+ "-u", "sandro",
+ "-t", "file:///home/sandro/test.seq"
+ });
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java
new file mode 100644
index 000000000..e87bc8913
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/graph/scholexplorer/ScholexplorerParserTest.java
@@ -0,0 +1,38 @@
+package eu.dnetlib.dhp.graph.scholexplorer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import eu.dnetlib.dhp.graph.scholexplorer.parser.DatasetScholexplorerParser;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import org.apache.commons.io.IOUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ScholexplorerParserTest {
+
+
+ @Test
+ public void testDataciteParser() throws IOException {
+ String xml = IOUtils.toString(this.getClass().getResourceAsStream("dmf.xml"));
+
+ DatasetScholexplorerParser p = new DatasetScholexplorerParser();
+ List<Oaf> oaves = p.parseObject(xml);
+
+ ObjectMapper m = new ObjectMapper();
+ m.enable(SerializationFeature.INDENT_OUTPUT);
+
+
+ oaves.forEach(oaf -> {
+ try {
+ System.out.println(m.writeValueAsString(oaf));
+ System.out.println("----------------------------");
+ } catch (JsonProcessingException e) {
+ // log and continue: this smoke test only prints the serialized output for manual inspection
+ e.printStackTrace();
+ }
+ });
+
+ }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/dmf.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/dmf.xml
new file mode 100644
index 000000000..58defb67b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/graph/scholexplorer/dmf.xml
@@ -0,0 +1,66 @@
+
+
+
+ aaadf8b3-01a8-4cc2-9964-63cfb19df3b4_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=
+ oai:pangaea.de:doi:10.1594/PANGAEA.821876
+ r3d100010134
+ r3d100010134::000083be706192d2d839915694ecfd47
+2020-01-08T04:12:12.287
+ 2020-01-08T03:24:10.865Z
+
+ oai:pangaea.de:doi:10.1594/PANGAEA.821876
+ citable
+
+
+
+ 10.1594/pangaea.821876
+ Macke, AndreasKalisch, John
+ Total Sky Imager observations during POLARSTERN cruise ANT-XXVI/4 on 2010-05-14 with links to images
+
+PANGAEA - Data Publisher for Earth & Environmental Science
+
+ 2010-05-14T00:13:47/2010-05-14T23:55:47
+
+
+
+ DATE/TIME
+
+ LATITUDE
+
+ LONGITUDE
+
+ Uniform resource locator/link to image
+
+ Total Sky Imager
+
+ ANT-XXVI/4
+
+ Polarstern
+
+
+ dataset
+
+
+ dli_resolver::cf447a378b0b6603593f8b0e57242695
+
+ http://hs.pangaea.de/images/airphoto/ps/ps75/2010-05-14/ant-xxvi_4_2010-05-14_tsi-images-links.zip
+
+ dli_resolver::f0f5975d20991cffd222c6002ddd5821
+
+
+
+
+
+
+ complete
+
+
+
+
+
+
+
+
diff --git a/pom.xml b/pom.xml
index 5323276aa..ada3a33a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,6 +138,12 @@
commons-io
2.4
+
+ org.mongodb
+ mongo-java-driver
+ 3.4.2
+
+
commons-cli
@@ -200,7 +206,7 @@
eu.dnetlib
dnet-pace-core
- 4.0.0-SNAPSHOT
+ 4.0.0
@@ -418,7 +424,7 @@
UTF-8
UTF-8
3.6.0
- 2.22.2
+ 2.22.2
cdh5.9.2
2.6.0-${dhp.cdh.version}
4.1.0-${dhp.cdh.version}
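
As a quick sanity check of the newly managed org.mongodb:mongo-java-driver 3.4.2, a minimal sketch that reads a few documents from a MongoDB collection. The collection name ("metadata") and the "body" field are assumptions made for illustration; they are not taken from this patch.

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class MongoReadSketch {
    public static void main(String[] args) {
        final MongoClient client = new MongoClient("localhost", 27017);
        try {
            final MongoDatabase db = client.getDatabase("mdstore_dli");
            // "metadata" and "body" are assumed names, used only to illustrate the driver API
            final MongoCollection<Document> records = db.getCollection("metadata");
            for (Document d : records.find().limit(5)) {
                System.out.println(d.getString("body"));
            }
        } finally {
            client.close();
        }
    }
}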