diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 8d4d880b3..3e7b1a375 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -99,23 +99,11 @@
jaxen
jaxen
-
-
- org.mongodb
- mongo-java-driver
-
-
org.apache.hadoop
hadoop-distcp
-
-
- org.postgresql
- postgresql
- 42.2.10
-
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java
deleted file mode 100644
index 4ee24cba0..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package eu.dnetlib.dhp.migration.step3;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SparkSession;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Datasource;
-import eu.dnetlib.dhp.schema.oaf.Organization;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
-import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Software;
-
-public class DispatchEntitiesApplication {
-
- private static final Log log = LogFactory.getLog(DispatchEntitiesApplication.class);
-
- public static void main(final String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils.toString(MigrateMongoMdstoresApplication.class
- .getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json")));
- parser.parseArgument(args);
-
- try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
-
- final String sourcePath = parser.get("sourcePath");
- final String targetPath = parser.get("graphRawPath");
-
- processEntity(sc, Publication.class, sourcePath, targetPath);
- processEntity(sc, Dataset.class, sourcePath, targetPath);
- processEntity(sc, Software.class, sourcePath, targetPath);
- processEntity(sc, OtherResearchProduct.class, sourcePath, targetPath);
- processEntity(sc, Datasource.class, sourcePath, targetPath);
- processEntity(sc, Organization.class, sourcePath, targetPath);
- processEntity(sc, Project.class, sourcePath, targetPath);
- processEntity(sc, Relation.class, sourcePath, targetPath);
- }
- }
-
- private static SparkSession newSparkSession(final ArgumentApplicationParser parser) {
- return SparkSession
- .builder()
- .appName(DispatchEntitiesApplication.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
- }
-
- private static void processEntity(final JavaSparkContext sc, final Class> clazz, final String sourcePath, final String targetPath) {
- final String type = clazz.getSimpleName().toLowerCase();
-
- log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath));
-
- sc.textFile(sourcePath)
- .filter(l -> isEntityType(l, type))
- .map(l -> StringUtils.substringAfter(l, "|"))
- .saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
- }
-
- private static boolean isEntityType(final String line, final String type) {
- return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type);
- }
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml
deleted file mode 100644
index 42ab59822..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml
+++ /dev/null
@@ -1,197 +0,0 @@
-
-
-
-
- workingPath
- /tmp/dhp_migration
- the base path to store temporary intermediate data
-
-
- graphBasePath
- the target path to store raw graph
-
-
- reuseContent
- false
- should import content from the aggregator or reuse a previous version
-
-
- postgresURL
- the postgres URL to access to the database
-
-
- postgresUser
- the user postgres
-
-
- postgresPassword
- the password postgres
-
-
- mongoURL
- mongoDB url, example: mongodb://[username:password@]host[:port]
-
-
- mongoDb
- mongo database
-
-
- sparkDriverMemory
- memory for driver process
-
-
- sparkExecutorMemory
- memory for individual executor
-
-
- sparkExecutorCores
- number of cores used by single executor
-
-
-
-
- ${jobTracker}
- ${nameNode}
-
-
- mapreduce.job.queuename
- ${queueName}
-
-
- oozie.launcher.mapred.job.queue.name
- ${oozieLauncherQueueName}
-
-
-
-
-
-
-
- Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
- ${wf:conf('reuseContent') eq false}
- ${wf:conf('reuseContent') eq true}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication
- -p${workingPath}/db_records
- -pgurl${postgresURL}
- -pguser${postgresUser}
- -pgpasswd${postgresPassword}
-
-
-
-
-
-
-
- eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication
- -p${workingPath}/odf_records
- -mongourl${mongoURL}
- -mongodb${mongoDb}
- -fODF
- -lstore
- -icleaned
-
-
-
-
-
-
-
- eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication
- -p${workingPath}/oaf_records
- -mongourl${mongoURL}
- -mongodb${mongoDb}
- -fOAF
- -lstore
- -icleaned
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- yarn
- cluster
- GenerateEntities
- eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication
- dhp-aggregation-${projectVersion}.jar
-
- --executor-memory ${sparkExecutorMemory}
- --executor-cores ${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
- --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
- --conf spark.sql.warehouse.dir="/user/hive/warehouse"
-
- -mt yarn-cluster
- -s${workingPath}/db_records,${workingPath}/oaf_records,${workingPath}/odf_records
- -t${workingPath}/all_entities
- -pgurl${postgresURL}
- -pguser${postgresUser}
- -pgpasswd${postgresPassword}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- yarn
- cluster
- GenerateGraph
- eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication
- dhp-aggregation-${projectVersion}.jar
-
- --executor-memory ${sparkExecutorMemory}
- --executor-cores ${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
- --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
- --conf spark.sql.warehouse.dir="/user/hive/warehouse"
-
- -mt yarn-cluster
- -s${workingPath}/all_entities
- -g${graphBasePath}/graph_raw
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml
index 9876edc16..d25446bbc 100644
--- a/dhp-workflows/dhp-graph-mapper/pom.xml
+++ b/dhp-workflows/dhp-graph-mapper/pom.xml
@@ -40,6 +40,7 @@
dhp-schemas
${project.version}
+
com.jayway.jsonpath
json-path
@@ -48,6 +49,18 @@
org.mongodb
mongo-java-driver
+
+ dom4j
+ dom4j
+
+
+ jaxen
+ jaxen
+
+
+ org.postgresql
+ postgresql
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
similarity index 90%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
index 18a62124b..7d99a4774 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
@@ -1,45 +1,15 @@
-package eu.dnetlib.dhp.migration.step2;
-
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.keyValue;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.oaiIProvenance;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+package eu.dnetlib.dhp.oa.graph.raw;
+import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentFactory;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.GeoLocation;
-import eu.dnetlib.dhp.schema.oaf.Instance;
-import eu.dnetlib.dhp.schema.oaf.Journal;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Software;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import java.util.*;
+
+import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
public abstract class AbstractMdRecordToOafMapper {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java
new file mode 100644
index 000000000..0b47db588
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java
@@ -0,0 +1,94 @@
+package eu.dnetlib.dhp.oa.graph.raw;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class DispatchEntitiesApplication {
+
+ private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesApplication.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils.toString(MigrateMongoMdstoresApplication.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dispatch_entities_parameters.json")));
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String sourcePath = parser.get("sourcePath");
+ final String targetPath = parser.get("graphRawPath");
+
+ SparkConf conf = new SparkConf();
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+ removeOutputDir(spark, targetPath);
+
+ processEntity(spark, Publication.class, sourcePath, targetPath);
+ processEntity(spark, Dataset.class, sourcePath, targetPath);
+ processEntity(spark, Software.class, sourcePath, targetPath);
+ processEntity(spark, OtherResearchProduct.class, sourcePath, targetPath);
+ processEntity(spark, Datasource.class, sourcePath, targetPath);
+ processEntity(spark, Organization.class, sourcePath, targetPath);
+ processEntity(spark, Project.class, sourcePath, targetPath);
+ processEntity(spark, Relation.class, sourcePath, targetPath);
+ });
+ }
+
+ private static void processEntity(final SparkSession spark, final Class clazz, final String sourcePath, final String targetPath) {
+ final String type = clazz.getSimpleName().toLowerCase();
+
+ log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath));
+
+ spark.read()
+ .textFile(sourcePath)
+ .filter((FilterFunction) value -> isEntityType(value, type))
+ .map((MapFunction) value -> StringUtils.substringAfter(value, "|"), Encoders.STRING())
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(targetPath + "/" + type);
+
+ /*
+ JavaSparkContext.fromSparkContext(spark.sparkContext())
+ .textFile(sourcePath)
+ .filter(l -> isEntityType(l, type))
+ .map(l -> StringUtils.substringAfter(l, "|"))
+ .saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
+
+ */
+ }
+
+ private static boolean isEntityType(final String line, final String type) {
+ return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type);
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
similarity index 70%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
index 7f907b0c8..a9f331f53 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
@@ -1,13 +1,10 @@
-package eu.dnetlib.dhp.migration.step2;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
+package eu.dnetlib.dhp.oa.graph.raw;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
+import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@@ -16,37 +13,38 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication;
-import eu.dnetlib.dhp.migration.utils.DbClient;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Datasource;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Organization;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
-import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Software;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
public class GenerateEntitiesApplication {
- private static final Log log = LogFactory.getLog(GenerateEntitiesApplication.class);
+ private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(MigrateMongoMdstoresApplication.class
- .getResourceAsStream("/eu/dnetlib/dhp/migration/generate_entities_parameters.json")));
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json")));
parser.parseArgument(args);
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
final String sourcePaths = parser.get("sourcePaths");
final String targetPath = parser.get("targetPath");
@@ -56,31 +54,27 @@ public class GenerateEntitiesApplication {
final Map code2name = loadClassNames(dbUrl, dbUser, dbPassword);
- try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
- final List existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
- generateEntities(sc, code2name, existingSourcePaths, targetPath);
- }
+ SparkConf conf = new SparkConf();
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+ removeOutputDir(spark, targetPath);
+ generateEntities(spark, code2name, sourcePaths, targetPath);
+ });
}
- private static SparkSession newSparkSession(final ArgumentApplicationParser parser) {
- return SparkSession
- .builder()
- .appName(GenerateEntitiesApplication.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
- }
-
- private static void generateEntities(final JavaSparkContext sc,
+ private static void generateEntities(final SparkSession spark,
final Map code2name,
- final List sourcePaths,
+ final String sourcePaths,
final String targetPath) {
+ JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ final List existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
+
log.info("Generate entities from files:");
- sourcePaths.forEach(log::info);
+ existingSourcePaths.forEach(log::info);
JavaRDD inputRdd = sc.emptyRDD();
- for (final String sp : sourcePaths) {
+ for (final String sp : existingSourcePaths) {
inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.map(k -> convertToListOaf(k._1(), k._2(), code2name))
@@ -88,7 +82,8 @@ public class GenerateEntitiesApplication {
.map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf)));
}
- inputRdd.saveAsTextFile(targetPath, GzipCodec.class);
+ inputRdd
+ .saveAsTextFile(targetPath, GzipCodec.class);
}
@@ -163,11 +158,15 @@ public class GenerateEntitiesApplication {
private static boolean exists(final JavaSparkContext context, final String pathToFile) {
try {
- final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
+ final FileSystem hdfs = FileSystem.get(context.hadoopConfiguration());
final Path path = new Path(pathToFile);
return hdfs.exists(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java
new file mode 100644
index 000000000..85e4f3663
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java
@@ -0,0 +1,151 @@
+package eu.dnetlib.dhp.oa.graph.raw;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
+
+public class MergeClaimsApplication {
+
+ private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils.toString(MigrateMongoMdstoresApplication.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String rawGraphPath = parser.get("rawGraphPath");
+ log.info("rawGraphPath: {}", rawGraphPath);
+
+ final String claimsGraphPath = parser.get("claimsGraphPath");
+ log.info("claimsGraphPath: {}", claimsGraphPath);
+
+ final String outputRawGaphPath = parser.get("outputRawGaphPath");
+ log.info("outputRawGaphPath: {}", outputRawGaphPath);
+
+ String graphTableClassName = parser.get("graphTableClassName");
+ log.info("graphTableClassName: {}", graphTableClassName);
+
+ Class extends Oaf> clazz = (Class extends Oaf>) Class.forName(graphTableClassName);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+ String type = clazz.getSimpleName().toLowerCase();
+
+ String rawPath = rawGraphPath + "/" + type;
+ String claimPath = claimsGraphPath + "/" + type;
+ String outPath = outputRawGaphPath + "/" + type;
+
+ removeOutputDir(spark, outPath);
+ mergeByType(spark, rawPath, claimPath, outPath, clazz);
+ });
+ }
+
+ private static void mergeByType(SparkSession spark, String rawPath, String claimPath, String outPath, Class clazz) {
+ Dataset> raw = readFromPath(spark, rawPath, clazz)
+ .map((MapFunction>) value -> new Tuple2<>(idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+
+ final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ Dataset> claim = jsc.broadcast(readFromPath(spark, claimPath, clazz))
+ .getValue()
+ .map((MapFunction>) value -> new Tuple2<>(idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+
+ /*
+ Dataset> claim = readFromPath(spark, claimPath, clazz)
+ .map((MapFunction>) value -> new Tuple2<>(idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+ */
+
+ raw.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
+ .map((MapFunction, Tuple2>, T>) value -> {
+
+ Optional> opRaw = Optional.ofNullable(value._1());
+ Optional> opClaim = Optional.ofNullable(value._2());
+
+ return opRaw.isPresent() ? opRaw.get()._2() : opClaim.isPresent() ? opClaim.get()._2() : null;
+ }, Encoders.bean(clazz))
+ .filter(Objects::nonNull)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(outPath);
+ }
+
+ private static Dataset readFromPath(SparkSession spark, String path, Class clazz) {
+ return spark.read()
+ .load(path)
+ .as(Encoders.bean(clazz))
+ .filter((FilterFunction) value -> Objects.nonNull(idFn().apply(value)));
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ private static Function idFn() {
+ return x -> {
+ if (isSubClass(x, Relation.class)) {
+ return idFnForRelation(x);
+ }
+ return idFnForOafEntity(x);
+ };
+ }
+
+ private static String idFnForRelation(T t) {
+ Relation r = (Relation) t;
+ return Optional.ofNullable(r.getSource())
+ .map(source -> Optional.ofNullable(r.getTarget())
+ .map(target -> Optional.ofNullable(r.getRelType())
+ .map(relType -> Optional.ofNullable(r.getSubRelType())
+ .map(subRelType -> Optional.ofNullable(r.getRelClass())
+ .map(relClass -> String.join(source, target, relType, subRelType, relClass))
+ .orElse(String.join(source, target, relType, subRelType))
+ )
+ .orElse(String.join(source, target, relType))
+ )
+ .orElse(String.join(source, target))
+ )
+ .orElse(source)
+ )
+ .orElse(null);
+ }
+
+ private static String idFnForOafEntity(T t) {
+ return ((OafEntity) t).getId();
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
similarity index 91%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
index 7db2b1772..f8f6b58cc 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
@@ -1,14 +1,13 @@
-package eu.dnetlib.dhp.migration.step1;
+package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.asString;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listKeyValues;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
+import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.Closeable;
import java.io.IOException;
@@ -22,31 +21,7 @@ import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication;
-import eu.dnetlib.dhp.migration.utils.DbClient;
-import eu.dnetlib.dhp.schema.oaf.Context;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Datasource;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.Journal;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Organization;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
-import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Software;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
public class MigrateDbEntitiesApplication extends AbstractMigrationApplication implements Closeable {
@@ -61,7 +36,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils.toString(MigrateDbEntitiesApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json")));
+ IOUtils.toString(MigrateDbEntitiesApplication.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json")));
parser.parseArgument(args);
@@ -111,7 +86,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
}
public void execute(final String sqlFile, final Function> producer) throws Exception {
- final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/migration/sql/" + sqlFile));
+ final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
final Consumer consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf));
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
similarity index 89%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
index b1de31326..585209ac9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java
@@ -1,18 +1,17 @@
-package eu.dnetlib.dhp.migration.step1;
+package eu.dnetlib.dhp.oa.graph.raw;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
+import eu.dnetlib.dhp.oa.graph.raw.common.MdstoreClient;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication;
-import eu.dnetlib.dhp.migration.utils.MdstoreClient;
-
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable {
private static final Log log = LogFactory.getLog(MigrateMongoMdstoresApplication.class);
@@ -21,7 +20,7 @@ public class MigrateMongoMdstoresApplication extends AbstractMigrationApplicatio
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json")));
+ IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json")));
parser.parseArgument(args);
final String mongoBaseUrl = parser.get("mongoBaseUrl");
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
similarity index 91%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
index 110abc486..aed582d8f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java
@@ -1,27 +1,17 @@
-package eu.dnetlib.dhp.migration.step2;
+package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
+import eu.dnetlib.dhp.oa.graph.raw.common.PacePerson;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.dom4j.Document;
+import org.dom4j.Node;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.dom4j.Document;
-import org.dom4j.Node;
-
-import eu.dnetlib.dhp.migration.utils.PacePerson;
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.GeoLocation;
-import eu.dnetlib.dhp.schema.oaf.Instance;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
+import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
public class OafToOafMapper extends AbstractMdRecordToOafMapper {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
similarity index 93%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
index b4868b8f9..6a6def977 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java
@@ -1,28 +1,16 @@
-package eu.dnetlib.dhp.migration.step2;
+package eu.dnetlib.dhp.oa.graph.raw;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field;
-import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.lang3.StringUtils;
+import org.dom4j.Document;
+import org.dom4j.Node;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.dom4j.Document;
-import org.dom4j.Node;
-
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.GeoLocation;
-import eu.dnetlib.dhp.schema.oaf.Instance;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
similarity index 98%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
index e1a5e5fa7..aec1ea50d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
@@ -1,9 +1,6 @@
-package eu.dnetlib.dhp.migration.utils;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
+package eu.dnetlib.dhp.oa.graph.raw.common;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -12,7 +9,9 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.codehaus.jackson.map.ObjectMapper;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
public class AbstractMigrationApplication implements Closeable {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/DbClient.java
similarity index 90%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/DbClient.java
index 8e9784346..9c0562946 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/DbClient.java
@@ -1,18 +1,14 @@
-package eu.dnetlib.dhp.migration.utils;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.function.Consumer;
+package eu.dnetlib.dhp.oa.graph.raw.common;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.*;
+import java.util.function.Consumer;
+
public class DbClient implements Closeable {
private static final Log log = LogFactory.getLog(DbClient.class);
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MdstoreClient.java
similarity index 98%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MdstoreClient.java
index 612503da7..ac700ef63 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/MdstoreClient.java
@@ -1,4 +1,14 @@
-package eu.dnetlib.dhp.migration.utils;
+package eu.dnetlib.dhp.oa.graph.raw.common;
+
+import com.google.common.collect.Iterables;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.bson.Document;
import java.io.Closeable;
import java.io.IOException;
@@ -7,17 +17,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.stream.StreamSupport;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.bson.Document;
-
-import com.google.common.collect.Iterables;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-
public class MdstoreClient implements Closeable {
private final MongoClient client;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java
similarity index 92%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java
index 8e51c1858..d02070a8b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/OafMapperUtils.java
@@ -1,4 +1,8 @@
-package eu.dnetlib.dhp.migration.utils;
+package eu.dnetlib.dhp.oa.graph.raw.common;
+
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -6,19 +10,6 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
-
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.Journal;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
-import eu.dnetlib.dhp.schema.oaf.OriginDescription;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
-import eu.dnetlib.dhp.utils.DHPUtils;
-
public class OafMapperUtils {
public static KeyValue keyValue(final String k, final String v) {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/PacePerson.java
similarity index 97%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/PacePerson.java
index 69e128e63..a72788728 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/PacePerson.java
@@ -1,19 +1,18 @@
-package eu.dnetlib.dhp.migration.utils;
-
-import java.nio.charset.Charset;
-import java.text.Normalizer;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.text.WordUtils;
+package eu.dnetlib.dhp.oa.graph.raw.common;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.text.WordUtils;
+
+import java.nio.charset.Charset;
+import java.text.Normalizer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
public class PacePerson {
@@ -105,7 +104,7 @@ public class PacePerson {
private List splitTerms(final String s) {
if (particles == null) {
- particles = loadFromClasspath("/eu/dnetlib/dhp/migration/pace/name_particles.txt");
+ particles = loadFromClasspath("/eu/dnetlib/dhp/oa/graph/pace/name_particles.txt");
}
final List list = Lists.newArrayList();
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_parameters.json
similarity index 61%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_parameters.json
index 8c81290ca..7d995f39a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dispatch_entities_parameters.json
@@ -1,16 +1,16 @@
[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the source path",
"paramRequired": true
},
- {
- "paramName": "mt",
- "paramLongName": "master",
- "paramDescription": "should be local or yarn",
- "paramRequired": true
- },
{
"paramName": "g",
"paramLongName": "graphRawPath",
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json
similarity index 82%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json
index 53ee010c4..293bf041b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/generate_entities_parameters.json
@@ -1,16 +1,16 @@
[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
{
"paramName": "s",
"paramLongName": "sourcePaths",
"paramDescription": "the HDFS source paths which contains the sequential file (comma separated)",
"paramRequired": true
},
- {
- "paramName": "mt",
- "paramLongName": "master",
- "paramDescription": "should be local or yarn",
- "paramRequired": true
- },
{
"paramName": "t",
"paramLongName": "targetPath",
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json
new file mode 100644
index 000000000..686fea643
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "rgp",
+ "paramLongName": "rawGraphPath",
+ "paramDescription": "the raw graph path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "cgp",
+ "paramLongName": "claimsGraphPath",
+ "paramDescription": "the path of the claims graph",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ogp",
+ "paramLongName": "outputRawGaphPath",
+ "paramDescription": "the path of output graph, combining raw and claims",
+ "paramRequired": true
+ },
+ {
+ "paramName": "clazz",
+ "paramLongName": "graphTableClassName",
+ "paramDescription": "class name associated to the input entity path",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_actionsets_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_actionsets_parameters.json
new file mode 100644
index 000000000..c4910ec61
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_actionsets_parameters.json
@@ -0,0 +1,10 @@
+[
+ {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
+ {"paramName":"sn", "paramLongName":"sourceNameNode", "paramDescription": "nameNode of the source cluster", "paramRequired": true},
+ {"paramName":"tn", "paramLongName":"targetNameNode", "paramDescription": "namoNode of the target cluster", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workingDirectory", "paramDescription": "working directory", "paramRequired": true},
+ {"paramName":"nm", "paramLongName":"distcp_num_maps", "paramDescription": "maximum number of map tasks used in the distcp process", "paramRequired": true},
+ {"paramName":"mm", "paramLongName":"distcp_memory_mb", "paramDescription": "memory for distcp action copying actionsets from remote cluster", "paramRequired": true},
+ {"paramName":"tt", "paramLongName":"distcp_task_timeout", "paramDescription": "timeout for distcp copying actions from remote cluster", "paramRequired": true},
+ {"paramName":"tr", "paramLongName":"transform_only", "paramDescription": "activate tranform-only mode. Only apply transformation step", "paramRequired": true}
+]
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/pace/name_particles.txt b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/pace/name_particles.txt
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/pace/name_particles.txt
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/pace/name_particles.txt
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
new file mode 100644
index 000000000..11c8f71a1
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
@@ -0,0 +1,538 @@
+
+
+
+
+ graphBasePath
+ the target path to store raw graph
+
+
+ reuseContent
+ false
+ should import content from the aggregator or reuse a previous version
+
+
+ postgresURL
+ the postgres URL to access to the database
+
+
+ postgresUser
+ the user postgres
+
+
+ postgresPassword
+ the password postgres
+
+
+ mongoURL
+ mongoDB url, example: mongodb://[username:password@]host[:port]
+
+
+ mongoDb
+ mongo database
+
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ mapreduce.job.queuename
+ ${queueName}
+
+
+ oozie.launcher.mapred.job.queue.name
+ ${oozieLauncherQueueName}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ ${wf:conf('reuseContent') eq false}
+ ${wf:conf('reuseContent') eq true}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication
+ -p${workingDir}/db_claims
+ -pgurl${postgresURL}
+ -pguser${postgresUser}
+ -pgpasswd${postgresPassword}
+ -aclaims
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication
+ -p${workingDir}/odf_claims
+ -mongourl${mongoURL}
+ -mongodb${mongoDb}
+ -fODF
+ -lstore
+ -iclaim
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication
+ -p${workingDir}/oaf_claims
+ -mongourl${mongoURL}
+ -mongodb${mongoDb}
+ -fOAF
+ -lstore
+ -iclaim
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication
+ -p${workingDir}/db_records
+ -pgurl${postgresURL}
+ -pguser${postgresUser}
+ -pgpasswd${postgresPassword}
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication
+ -p${workingDir}/odf_records
+ -mongourl${mongoURL}
+ -mongodb${mongoDb}
+ -fODF
+ -lstore
+ -icleaned
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication
+ -p${workingDir}/oaf_records
+ -mongourl${mongoURL}
+ -mongodb${mongoDb}
+ -fOAF
+ -lstore
+ -icleaned
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ GenerateEntities_claim
+ eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ -s${workingDir}/db_claims,${workingDir}/oaf_claims,${workingDir}/odf_claims
+ -t${workingDir}/entities_claim
+ -pgurl${postgresURL}
+ -pguser${postgresUser}
+ -pgpasswd${postgresPassword}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ GenerateGraph_claims
+ eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ -s${workingDir}/entities_claim
+ -g${workingDir}/graph_claims
+
+
+
+
+
+
+
+ yarn
+ cluster
+ GenerateEntities
+ eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ -s${workingDir}/db_records,${workingDir}/oaf_records,${workingDir}/odf_records
+ -t${workingDir}/entities
+ -pgurl${postgresURL}
+ -pguser${postgresUser}
+ -pgpasswd${postgresPassword}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ GenerateGraph
+ eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=7680
+
+ -s${workingDir}/entities
+ -g${workingDir}/graph_raw
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ MergeClaims_publication
+ eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=7680
+
+ --rawGraphPath${workingDir}/graph_raw
+ --claimsGraphPath${workingDir}/graph_claims
+ --outputRawGaphPath${graphBasePath}/graph_raw
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
+
+
+
+
+
+
+
+ yarn
+ cluster
+ MergeClaims_dataset
+ eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=7680
+
+ --rawGraphPath${workingDir}/graph_raw
+ --claimsGraphPath${workingDir}/graph_claims
+ --outputRawGaphPath${graphBasePath}/graph_raw
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
+
+
+
+
+
+
+
+ yarn
+ cluster
+ MergeClaims_relation
+ eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --rawGraphPath${workingDir}/graph_raw
+ --claimsGraphPath${workingDir}/graph_claims
+ --outputRawGaphPath${graphBasePath}/graph_raw
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation
+
+
+
+
+
+
+
+ yarn
+ cluster
+ MergeClaims_software
+ eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=1920
+
+ --rawGraphPath${workingDir}/graph_raw
+ --claimsGraphPath${workingDir}/graph_claims
+ --outputRawGaphPath${graphBasePath}/graph_raw
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software
+
+
+
+
+
+
+
+ yarn
+ cluster
+ MergeClaims_otherresearchproduct
+ eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=1920
+
+ --rawGraphPath${workingDir}/graph_raw
+ --claimsGraphPath${workingDir}/graph_claims
+ --outputRawGaphPath${graphBasePath}/graph_raw
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
+
+
+
+
+
+
+
+ yarn
+ cluster
+ MergeClaims_datasource
+ eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=200
+
+ --rawGraphPath${workingDir}/graph_raw
+ --claimsGraphPath${workingDir}/graph_claims
+ --outputRawGaphPath${graphBasePath}/graph_raw
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource
+
+
+
+
+
+
+
+ yarn
+ cluster
+ MergeClaims_organization
+ eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=200
+
+ --rawGraphPath${workingDir}/graph_raw
+ --claimsGraphPath${workingDir}/graph_claims
+ --outputRawGaphPath${graphBasePath}/graph_raw
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization
+
+
+
+
+
+
+
+ yarn
+ cluster
+ MergeClaims_project
+ eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory ${sparkExecutorMemory}
+ --executor-cores ${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=200
+
+ --rawGraphPath${workingDir}/graph_raw
+ --claimsGraphPath${workingDir}/graph_claims
+ --outputRawGaphPath${graphBasePath}/graph_raw
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/config-default.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1_onlydb/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1_onlydb/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1_onlydb/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1_onlydb/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step1/oozie_app/workflow.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step2/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step2/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step2/oozie_app/workflow.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step2/oozie_app/workflow.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step3/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step3/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step3/oozie_app/workflow.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_step3/oozie_app/workflow.xml
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryClaims.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryClaims.sql
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryClaims.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryClaims.sql
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasourceOrganization.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasourceOrganization.sql
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasourceOrganization.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasourceOrganization.sql
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasources.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasources.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizations.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizations.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizations.sql
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizationsFromOpenOrgsDB.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizationsFromOpenOrgsDB.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryOrganizationsFromOpenOrgsDB.sql
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjectOrganization.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryProjectOrganization.sql
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjectOrganization.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryProjectOrganization.sql
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryProjects.sql
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryProjects.sql
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects_production.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryProjects_production.sql
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects_production.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryProjects_production.sql
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/querySimilarityFromOpenOrgsDB.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/querySimilarityFromOpenOrgsDB.sql
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/querySimilarityFromOpenOrgsDB.sql
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/querySimilarityFromOpenOrgsDB.sql
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/transform_actionsets_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/transform_actionsets_parameters.json
new file mode 100644
index 000000000..6fa10f739
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/transform_actionsets_parameters.json
@@ -0,0 +1,20 @@
+[
+ {
+ "paramName": "mt",
+ "paramLongName": "master",
+ "paramDescription": "should be local or yarn",
+ "paramRequired": true
+ },
+ {
+ "paramName": "is",
+ "paramLongName": "isLookupUrl",
+ "paramDescription": "URL of the isLookUp Service",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "inputPaths",
+ "paramDescription": "URL of the isLookUp Service",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java
index 29ca46d1d..ecadbe981 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/GraphHiveImporterJobTest.java
@@ -1,34 +1,25 @@
package eu.dnetlib.dhp.oa.graph;
-import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.*;
-import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Objects;
public class GraphHiveImporterJobTest {
private static final Logger log = LoggerFactory.getLogger(GraphHiveImporterJobTest.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- private static final ClassLoader cl = GraphHiveImporterJobTest.class.getClassLoader();
-
public static final String JDBC_DERBY_TEMPLATE = "jdbc:derby:;databaseName=%s/junit_metastore_db;create=true";
private static SparkSession spark;
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/migration/step2/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
similarity index 91%
rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/migration/step2/MappersTest.java
rename to dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
index 894355bcb..63d7d50db 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/migration/step2/MappersTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
@@ -1,27 +1,24 @@
-package eu.dnetlib.dhp.migration.step2;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
+package eu.dnetlib.dhp.oa.graph.raw;
+import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Software;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class MappersTest {
@@ -30,7 +27,7 @@ public class MappersTest {
private Map code2name;
@BeforeEach
- void setUp() throws Exception {
+ public void setUp() throws Exception {
when(code2name.get(anyString())).thenAnswer(invocation -> invocation.getArgument(0));
}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
similarity index 99%
rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplicationTest.java
rename to dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
index d63bb3ee3..b1fc9131f 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplicationTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.migration.step1;
+package eu.dnetlib.dhp.oa.graph.raw;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/claimscontext_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/claimscontext_resultset_entry.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/claimscontext_resultset_entry.json
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/claimscontext_resultset_entry.json
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/claimsrel_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/claimsrel_resultset_entry.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/claimsrel_resultset_entry.json
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/claimsrel_resultset_entry.json
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/datasourceorganization_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/datasourceorganization_resultset_entry.json
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/datasources_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/datasources_resultset_entry.json
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step2/oaf_record.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_record.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step2/oaf_record.xml
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_record.xml
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step2/odf_dataset.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_dataset.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step2/odf_dataset.xml
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_dataset.xml
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step2/odf_software.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_software.xml
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step2/odf_software.xml
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_software.xml
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/organizations_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/organizations_resultset_entry.json
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/projectorganization_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/projectorganization_resultset_entry.json
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/projects_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/migration/step1/projects_resultset_entry.json
rename to dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json
diff --git a/pom.xml b/pom.xml
index ae19ddbe5..52c601517 100644
--- a/pom.xml
+++ b/pom.xml
@@ -346,6 +346,12 @@
mongo-java-driver
${mongodb.driver.version}
+
+ org.postgresql
+ postgresql
+ 42.2.10
+
+
org.antlr
stringtemplate