diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 1268afa3a1..d224ebc9f8 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -13,6 +13,26 @@
 	<packaging>jar</packaging>
 
 	<dependencies>
+
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-schemas</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-core_2.11</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-sql_2.11</artifactId>
+		</dependency>
+
 		<dependency>
 			<groupId>commons-cli</groupId>
 			<artifactId>commons-cli</artifactId>
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
new file mode 100644
index 0000000000..d78520f55b
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
@@ -0,0 +1,56 @@
+package eu.dnetlib.dhp.common;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * Provides serializable and throwing extensions to standard functional interfaces.
+ */
+public class FunctionalInterfaceSupport {
+
+ private FunctionalInterfaceSupport() {
+ }
+
+ /**
+ * Serializable supplier of any kind of objects. To be used within spark processing pipelines when supplying
+ * functions externally.
+ *
+ * @param <T>
+ */
+ @FunctionalInterface
+ public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+ }
+
+ /**
+ * Extension of consumer accepting functions throwing an exception.
+ *
+ * @param <T>
+ * @param <E>
+ */
+ @FunctionalInterface
+ public interface ThrowingConsumer<T, E extends Exception> {
+ void accept(T t) throws E;
+ }
+
+ /**
+ * Extension of supplier accepting functions throwing an exception.
+ *
+ * @param <T>
+ * @param <E>
+ */
+ @FunctionalInterface
+ public interface ThrowingSupplier<T, E extends Exception> {
+ T get() throws E;
+ }
+
+ /**
+ * Extension of runnable accepting functions throwing an exception.
+ *
+ * @param <E>
+ */
+ @FunctionalInterface
+ public interface ThrowingRunnable<E extends Exception> {
+ void run() throws E;
+ }
+
+}
\ No newline at end of file
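
For orientation (not part of the patch): a minimal sketch of how these interfaces are meant to be consumed. The class name, file path and values below are illustrative only.

```java
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingSupplier;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class FunctionalInterfaceSupportExample {

    public static void main(String[] args) throws IOException {
        // A supplier that can be shipped to Spark executors because it is also Serializable.
        SerializableSupplier<String> greeting = () -> "hello";
        System.out.println(greeting.get());

        // A supplier that is allowed to declare a checked exception; the caller decides how to
        // handle it, e.g. by wrapping the call with ThrowingSupport.rethrowAsRuntimeException.
        ThrowingSupplier<byte[], IOException> fileReader =
                () -> Files.readAllBytes(Paths.get("/tmp/example.txt")); // illustrative path
        System.out.println(fileReader.get().length);
    }
}
```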
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
new file mode 100644
index 0000000000..05beaa51e1
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
@@ -0,0 +1,57 @@
+package eu.dnetlib.dhp.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.common.ThrowingSupport.rethrowAsRuntimeException;
+
+/**
+ * HDFS utility methods.
+ */
+public class HdfsSupport {
+ private static final Logger logger = LoggerFactory.getLogger(HdfsSupport.class);
+
+ private HdfsSupport() {
+ }
+
+ /**
+ * Removes a path (file or dir) from HDFS.
+ *
+ * @param path Path to be removed
+ * @param configuration Configuration of hadoop env
+ */
+ public static void remove(String path, Configuration configuration) {
+ logger.info("Removing path: {}", path);
+ rethrowAsRuntimeException(() -> {
+ Path f = new Path(path);
+ FileSystem fileSystem = FileSystem.get(configuration);
+ if (fileSystem.exists(f)) {
+ fileSystem.delete(f, true);
+ }
+ });
+ }
+
+ /**
+ * Lists the directories located directly below a given HDFS path.
+ *
+ * @param path Path whose subdirectories are listed
+ * @param configuration Configuration of hadoop env
+ * @return List with the string locations of the subdirectories
+ */
+ public static List<String> listFiles(String path, Configuration configuration) {
+ logger.info("Listing files in path: {}", path);
+ return rethrowAsRuntimeException(() -> Arrays
+ .stream(FileSystem.get(configuration).listStatus(new Path(path)))
+ .filter(FileStatus::isDirectory)
+ .map(x -> x.getPath().toString())
+ .collect(Collectors.toList()));
+ }
+}
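
For orientation (not part of the patch): a minimal usage sketch of the two helpers above. The paths are illustrative only.

```java
import eu.dnetlib.dhp.common.HdfsSupport;
import org.apache.hadoop.conf.Configuration;

import java.util.List;

public class HdfsSupportExample {

    public static void main(String[] args) {
        // In a Spark job this is typically spark.sparkContext().hadoopConfiguration().
        Configuration conf = new Configuration();

        // Delete a previous output directory; it is a no-op when the path does not exist.
        HdfsSupport.remove("/tmp/output", conf); // illustrative path

        // List the subdirectories found under a given path.
        List<String> dirs = HdfsSupport.listFiles("/tmp", conf); // illustrative path
        dirs.forEach(System.out::println);
    }
}
```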
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java
new file mode 100644
index 0000000000..f42ee1c581
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java
@@ -0,0 +1,57 @@
+package eu.dnetlib.dhp.common;
+
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingConsumer;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * SparkSession utility methods.
+ */
+public class SparkSessionSupport {
+
+ private SparkSessionSupport() {
+ }
+
+ /**
+ * Runs a given function using a SparkSession created with the default builder and the supplied SparkConf. Stops the
+ * SparkSession when it is managed; passing false allows reusing a SparkSession created externally.
+ *
+ * @param conf SparkConf instance
+ * @param isSparkSessionManaged When true will stop SparkSession
+ * @param fn Consumer to be applied to constructed SparkSession
+ */
+ public static void runWithSparkSession(SparkConf conf,
+ Boolean isSparkSessionManaged,
+ ThrowingConsumer<SparkSession, Exception> fn) {
+ runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn);
+ }
+
+ /**
+ * Runs a given function using a SparkSession created with the supplied builder and SparkConf. Stops the
+ * SparkSession when it is managed; passing false allows reusing a SparkSession created externally.
+ *
+ * @param sparkSessionBuilder Builder of SparkSession
+ * @param conf SparkConf instance
+ * @param isSparkSessionManaged When true will stop SparkSession
+ * @param fn Consumer to be applied to constructed SparkSession
+ */
+ public static void runWithSparkSession(Function<SparkConf, SparkSession> sparkSessionBuilder,
+ SparkConf conf,
+ Boolean isSparkSessionManaged,
+ ThrowingConsumer<SparkSession, Exception> fn) {
+ SparkSession spark = null;
+ try {
+ spark = sparkSessionBuilder.apply(conf);
+ fn.accept(spark);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (Objects.nonNull(spark) && isSparkSessionManaged) {
+ spark.stop();
+ }
+ }
+ }
+}
\ No newline at end of file
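
For orientation (not part of the patch): a minimal sketch of how runWithSparkSession is meant to be called; the app name and local master are illustrative only.

```java
import eu.dnetlib.dhp.common.SparkSessionSupport;
import org.apache.spark.SparkConf;

public class SparkSessionSupportExample {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("example")  // illustrative settings
                .setMaster("local[*]");

        // With isSparkSessionManaged = true the helper stops the session in its finally block;
        // pass false when the session is created and stopped by the caller.
        SparkSessionSupport.runWithSparkSession(conf, true, spark ->
                spark.range(10).show());
    }
}
```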
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java
new file mode 100644
index 0000000000..b32803c378
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java
@@ -0,0 +1,76 @@
+package eu.dnetlib.dhp.common;
+
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingRunnable;
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingSupplier;
+
+/**
+ * Exception handling utility methods.
+ */
+public class ThrowingSupport {
+
+ private ThrowingSupport() {
+ }
+
+ /**
+ * Executes given runnable and rethrows any exceptions as RuntimeException.
+ *
+ * @param fn Runnable to be executed
+ * @param <E> Type of exception thrown
+ */
+ public static <E extends Exception> void rethrowAsRuntimeException(ThrowingRunnable<E> fn) {
+ try {
+ fn.run();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Executes given runnable and rethrows any exceptions as RuntimeException with custom message.
+ *
+ * @param fn Runnable to be executed
+ * @param msg Message to be set for rethrown exception
+ * @param <E> Type of exception thrown
+ */
+ public static <E extends Exception> void rethrowAsRuntimeException(ThrowingRunnable<E> fn, String msg) {
+ try {
+ fn.run();
+ } catch (Exception e) {
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ /**
+ * Executes given supplier and rethrows any exceptions as RuntimeException.
+ *
+ * @param fn Supplier to be executed
+ * @param <T> Type of returned value
+ * @param <E> Type of exception thrown
+ * @return Result of supplier execution
+ */
+ public static <T, E extends Exception> T rethrowAsRuntimeException(ThrowingSupplier<T, E> fn) {
+ try {
+ return fn.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Executes given supplier and rethrows any exceptions as RuntimeException with custom message.
+ *
+ * @param fn Supplier to be executed
+ * @param msg Message to be set for rethrown exception
+ * @param <T> Type of returned value
+ * @param <E> Type of exception thrown
+ * @return Result of supplier execution
+ */
+ public static <T, E extends Exception> T rethrowAsRuntimeException(ThrowingSupplier<T, E> fn, String msg) {
+ try {
+ return fn.get();
+ } catch (Exception e) {
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+}
\ No newline at end of file
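
For orientation (not part of the patch): a minimal sketch of wrapping a checked exception with the helper above; the path and message are illustrative only.

```java
import eu.dnetlib.dhp.common.ThrowingSupport;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class ThrowingSupportExample {

    public static void main(String[] args) {
        // Wraps the checked IOException thrown by Files.readAllLines into a RuntimeException,
        // so the call can be used inside lambdas that must not declare checked exceptions.
        List<String> lines = ThrowingSupport.rethrowAsRuntimeException(
                () -> Files.readAllLines(Paths.get("/tmp/example.txt")), // illustrative path
                "unable to read input file");
        lines.forEach(System.out::println);
    }
}
```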
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java
new file mode 100644
index 0000000000..f1e790ee7c
--- /dev/null
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java
@@ -0,0 +1,78 @@
+package eu.dnetlib.dhp.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class HdfsSupportTest {
+
+ @Nested
+ class Remove {
+
+ @Test
+ public void shouldThrowARuntimeExceptionOnError() {
+ // when
+ assertThrows(RuntimeException.class, () ->
+ HdfsSupport.remove(null, new Configuration()));
+ }
+
+ @Test
+ public void shouldRemoveADirFromHDFS(@TempDir Path tempDir) {
+ // when
+ HdfsSupport.remove(tempDir.toString(), new Configuration());
+
+ // then
+ assertFalse(Files.exists(tempDir));
+ }
+
+ @Test
+ public void shouldRemoveAFileFromHDFS(@TempDir Path tempDir) throws IOException {
+ // given
+ Path file = Files.createTempFile(tempDir, "p", "s");
+
+ // when
+ HdfsSupport.remove(file.toString(), new Configuration());
+
+ // then
+ assertFalse(Files.exists(file));
+ }
+ }
+
+ @Nested
+ class ListFiles {
+
+ @Test
+ public void shouldThrowARuntimeExceptionOnError() {
+ // when
+ assertThrows(RuntimeException.class, () ->
+ HdfsSupport.listFiles(null, new Configuration()));
+ }
+
+ @Test
+ public void shouldListFilesLocatedInPath(@TempDir Path tempDir) throws IOException {
+ Path subDir1 = Files.createTempDirectory(tempDir, "list_me");
+ Path subDir2 = Files.createTempDirectory(tempDir, "list_me");
+
+ // when
+ List<String> paths = HdfsSupport.listFiles(tempDir.toString(), new Configuration());
+
+ // then
+ assertEquals(2, paths.size());
+ List<String> expecteds = Arrays.stream(new String[]{subDir1.toString(), subDir2.toString()})
+ .sorted().collect(Collectors.toList());
+ List<String> actuals = paths.stream().sorted().collect(Collectors.toList());
+ assertTrue(actuals.get(0).contains(expecteds.get(0)));
+ assertTrue(actuals.get(1).contains(expecteds.get(1)));
+ }
+ }
+}
\ No newline at end of file
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java
new file mode 100644
index 0000000000..bfed019e9b
--- /dev/null
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java
@@ -0,0 +1,36 @@
+package eu.dnetlib.dhp.common;
+
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ModelSupportTest {
+
+ @Nested
+ class IsSubClass {
+
+ @Test
+ public void shouldReturnFalseWhenSubClassDoesNotExtendSuperClass() {
+ // when
+ Boolean result = ModelSupport.isSubClass(Relation.class, OafEntity.class);
+
+ // then
+ assertFalse(result);
+ }
+
+ @Test
+ public void shouldReturnTrueWhenSubClassExtendsSuperClass() {
+ // when
+ Boolean result = ModelSupport.isSubClass(Result.class, OafEntity.class);
+
+ // then
+ assertTrue(result);
+ }
+ }
+}
\ No newline at end of file
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java
new file mode 100644
index 0000000000..bc2dce3cff
--- /dev/null
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java
@@ -0,0 +1,54 @@
+package eu.dnetlib.dhp.common;
+
+import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingConsumer;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Function;
+
+import static org.mockito.Mockito.*;
+
+public class SparkSessionSupportTest {
+
+ @Nested
+ class RunWithSparkSession {
+
+ @Test
+ public void shouldExecuteFunctionAndNotStopSparkSessionWhenSparkSessionIsNotManaged() throws Exception {
+ // given
+ SparkSession spark = mock(SparkSession.class);
+ SparkConf conf = mock(SparkConf.class);
+ Function<SparkConf, SparkSession> sparkSessionBuilder = mock(Function.class);
+ when(sparkSessionBuilder.apply(conf)).thenReturn(spark);
+ ThrowingConsumer<SparkSession, Exception> fn = mock(ThrowingConsumer.class);
+
+ // when
+ SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, false, fn);
+
+ // then
+ verify(sparkSessionBuilder).apply(conf);
+ verify(fn).accept(spark);
+ verify(spark, never()).stop();
+ }
+
+ @Test
+ public void shouldExecuteFunctionAndStopSparkSessionWhenSparkSessionIsManaged() throws Exception {
+ // given
+ SparkSession spark = mock(SparkSession.class);
+ SparkConf conf = mock(SparkConf.class);
+ Function<SparkConf, SparkSession> sparkSessionBuilder = mock(Function.class);
+ when(sparkSessionBuilder.apply(conf)).thenReturn(spark);
+ ThrowingConsumer<SparkSession, Exception> fn = mock(ThrowingConsumer.class);
+
+ // when
+ SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, true, fn);
+
+ // then
+ verify(sparkSessionBuilder).apply(conf);
+ verify(fn).accept(spark);
+ verify(spark, times(1)).stop();
+ }
+ }
+}
\ No newline at end of file
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
new file mode 100644
index 0000000000..3c774aa385
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
@@ -0,0 +1,51 @@
+package eu.dnetlib.dhp.schema.common;
+
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+
+/**
+ * Inheritance utility methods.
+ */
+public class ModelSupport {
+
+ private ModelSupport() {
+ }
+
+ /**
+ * Checks subclass-superclass relationship.
+ *
+ * @param subClazzObject Subclass object instance
+ * @param superClazzObject Superclass object instance
+ * @param <X> Subclass type
+ * @param <Y> Superclass type
+ * @return True if X is a subclass of Y
+ */
+ public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(X subClazzObject, Y superClazzObject) {
+ return isSubClass(subClazzObject.getClass(), superClazzObject.getClass());
+ }
+
+ /**
+ * Checks subclass-superclass relationship.
+ *
+ * @param subClazzObject Subclass object instance
+ * @param superClazz Superclass class
+ * @param <X> Subclass type
+ * @param <Y> Superclass type
+ * @return True if X is a subclass of Y
+ */
+ public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(X subClazzObject, Class<Y> superClazz) {
+ return isSubClass(subClazzObject.getClass(), superClazz);
+ }
+
+ /**
+ * Checks subclass-superclass relationship.
+ *
+ * @param subClazz Subclass class
+ * @param superClazz Superclass class
+ * @param <X> Subclass type
+ * @param <Y> Superclass type
+ * @return True if X is a subclass of Y
+ */
+ public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(Class<X> subClazz, Class<Y> superClazz) {
+ return superClazz.isAssignableFrom(subClazz);
+ }
+}
\ No newline at end of file
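
For orientation (not part of the patch): the unit test added above exercises the same checks; this is a quick, hedged sketch of the assignability semantics of isSubClass.

```java
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class ModelSupportExample {

    public static void main(String[] args) {
        // Publication sits below OafEntity in the Oaf hierarchy ...
        System.out.println(ModelSupport.isSubClass(Publication.class, OafEntity.class)); // true
        // ... while Relation belongs to a separate branch of the model.
        System.out.println(ModelSupport.isSubClass(Relation.class, OafEntity.class));    // false
    }
}
```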
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
index 6738b86938..e2471cd898 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
@@ -92,8 +92,7 @@ public class Relation extends Oaf {
subRelType.equals(relation.subRelType) &&
relClass.equals(relation.relClass) &&
source.equals(relation.source) &&
- target.equals(relation.target) &&
- Objects.equals(collectedFrom, relation.collectedFrom);
+ target.equals(relation.target);
}
@Override
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
new file mode 100644
index 0000000000..dcb3ac171d
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
@@ -0,0 +1,167 @@
+package eu.dnetlib.dhp.oa.provision;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.provision.model.*;
+import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
+import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
+
+/**
+ * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
+ * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
+ * and all the possible relationships (similarity links produced by the Dedup process are excluded).
+ *
+ * The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
+ * by E, finally grouped by E.id;
+ *
+ * The workflow is organized in different parts aimed at reducing the complexity of the operation:
+ * 1) PrepareRelationsJob:
+ * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
+ * can be linked at most to 100 other objects
+ *
+ * 2) JoinRelationEntityByTargetJob:
+ * prepare tuples [source entity - relation - target entity] (S - R - T):
+ * for each entity type E_i
+ * join (R.target = E_i.id),
+ * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
+ * join (E_i.id = [R - T_i].source), where E_i becomes the source entity S
+ *
+ * 3) AdjacencyListBuilderJob:
+ * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
+ *
+ * 4) XmlConverterJob:
+ * convert the JoinedEntities as XML records
+ */
+public class AdjacencyListBuilderJob {
+
+ private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils.toString(
+ AdjacencyListBuilderJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String inputPath = parser.get("inputPath");
+ log.info("inputPath: {}", inputPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(getKryoClasses());
+
+ runWithSparkSession(conf, isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ createAdjacencyLists(spark, inputPath, outputPath);
+ });
+
+ }
+
+ private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) {
+
+ RDD<JoinedEntity> joined = spark.read()
+ .load(inputPath)
+ .as(Encoders.kryo(EntityRelEntity.class))
+ .javaRDD()
+ .map(e -> getJoinedEntity(e))
+ .mapToPair(e -> new Tuple2<>(e.getEntity().getId(), e))
+ .reduceByKey((j1, j2) -> getJoinedEntity(j1, j2))
+ .map(Tuple2::_2)
+ .rdd();
+
+ spark.createDataset(joined, Encoders.bean(JoinedEntity.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(outputPath);
+
+ }
+
+ private static JoinedEntity getJoinedEntity(JoinedEntity j1, JoinedEntity j2) {
+ JoinedEntity je = new JoinedEntity();
+ je.setEntity(j1.getEntity());
+ je.setType(j1.getType());
+
+ Links links = new Links();
+ links.addAll(j1.getLinks());
+ links.addAll(j2.getLinks());
+ je.setLinks(links);
+
+ return je;
+ }
+
+ private static JoinedEntity getJoinedEntity(EntityRelEntity e) {
+ JoinedEntity j = new JoinedEntity();
+ j.setEntity(toOafEntity(e.getEntity()));
+ j.setType(EntityType.valueOf(e.getEntity().getType()));
+ Links links = new Links();
+ links.add(new eu.dnetlib.dhp.oa.provision.model.Tuple2(e.getRelation(), e.getTarget()));
+ j.setLinks(links);
+ return j;
+ }
+
+ private static OafEntity toOafEntity(TypedRow typedRow) {
+ return parseOaf(typedRow.getOaf(), typedRow.getType());
+ }
+
+ private static OafEntity parseOaf(final String json, final String type) {
+ try {
+ switch (GraphMappingUtils.EntityType.valueOf(type)) {
+ case publication:
+ return OBJECT_MAPPER.readValue(json, Publication.class);
+ case dataset:
+ return OBJECT_MAPPER.readValue(json, Dataset.class);
+ case otherresearchproduct:
+ return OBJECT_MAPPER.readValue(json, OtherResearchProduct.class);
+ case software:
+ return OBJECT_MAPPER.readValue(json, Software.class);
+ case datasource:
+ return OBJECT_MAPPER.readValue(json, Datasource.class);
+ case organization:
+ return OBJECT_MAPPER.readValue(json, Organization.class);
+ case project:
+ return OBJECT_MAPPER.readValue(json, Project.class);
+ default:
+ throw new IllegalArgumentException("invalid type: " + type);
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+}
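
For orientation (not part of the patch): a toy, Spark-free sketch of the grouping performed by createAdjacencyLists above. The ids are made up; real objects are EntityRelEntity/JoinedEntity instances keyed by the source entity id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AdjacencyListSketch {

    public static void main(String[] args) {
        // (source id, target id) pairs standing in for the (S - R - T) tuples of the join.
        List<String[]> links = Arrays.asList(
                new String[]{"pub1", "ds1"},
                new String[]{"pub1", "proj1"},
                new String[]{"pub2", "ds1"});

        // Group the links by source id, which is what mapToPair/reduceByKey does above
        // for EntityRelEntity/JoinedEntity objects.
        Map<String, List<String>> adjacency = new HashMap<>();
        for (String[] link : links) {
            adjacency.computeIfAbsent(link[0], k -> new ArrayList<>()).add(link[1]);
        }

        // pub1 -> [ds1, proj1], pub2 -> [ds1]
        adjacency.forEach((source, targets) -> System.out.println(source + " -> " + targets));
    }
}
```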
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
new file mode 100644
index 0000000000..0b153f8269
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
@@ -0,0 +1,157 @@
+package eu.dnetlib.dhp.oa.provision;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
+import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
+import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
+
+/**
+ * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
+ * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
+ * and all the possible relationships (similarity links produced by the Dedup process are excluded).
+ *
+ * The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
+ * by E, finally grouped by E.id;
+ *
+ * The workflow is organized in different parts aimed at reducing the complexity of the operation:
+ * 1) PrepareRelationsJob:
+ * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
+ * can be linked at most to 100 other objects
+ *
+ * 2) CreateRelatedEntitiesJob_phase1:
+ * prepare tuples [relation - target entity] (R - T):
+ * for each entity type E_i
+ * join (R.target = E_i.id),
+ * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
+ * save the tuples [R - T_i] in append mode
+ *
+ * 3) CreateRelatedEntitiesJob_phase2:
+ * prepare tuples [source entity - relation - target entity] (S - R - T):
+ * create the union of the each entity type, hash by id (S)
+ * for each [R - T_i] produced in phase1
+ * join S.id = [R - T_i].source to produce (S_i - R - T_i)
+ * save in append mode
+ *
+ * 4) AdjacencyListBuilderJob:
+ * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
+ *
+ * 5) XmlConverterJob:
+ * convert the JoinedEntities as XML records
+ */
+public class CreateRelatedEntitiesJob_phase1 {
+
+ private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase1.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils.toString(
+ PrepareRelationsJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json"));
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String inputRelationsPath = parser.get("inputRelationsPath");
+ log.info("inputRelationsPath: {}", inputRelationsPath);
+
+ String inputEntityPath = parser.get("inputEntityPath");
+ log.info("inputEntityPath: {}", inputEntityPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ String graphTableClassName = parser.get("graphTableClassName");
+ log.info("graphTableClassName: {}", graphTableClassName);
+
+ Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(getKryoClasses());
+
+ runWithSparkSession(conf, isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ joinRelationEntity(spark, inputRelationsPath, inputEntityPath, entityClazz, outputPath);
+ });
+ }
+
+ private static <E extends OafEntity> void joinRelationEntity(SparkSession spark, String inputRelationsPath, String inputEntityPath, Class<E> entityClazz, String outputPath) {
+
+ Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
+ .map((MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r),
+ Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class)));
+
+ Dataset<Tuple2<String, E>> entities = readPathEntity(spark, inputEntityPath, entityClazz)
+ .map((MapFunction<E, Tuple2<String, E>>) e -> new Tuple2<>(e.getId(), e),
+ Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz)))
+ .cache();
+
+ relsByTarget
+ .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
+ .filter((FilterFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, E>>>)
+ value -> value._2()._2().getDataInfo().getDeletedbyinference() == false)
+ .map((MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, E>>, EntityRelEntity>)
+ t -> new EntityRelEntity(t._1()._2(), GraphMappingUtils.asRelatedEntity(t._2()._2(), entityClazz)),
+ Encoders.bean(EntityRelEntity.class))
+ .write()
+ .mode(SaveMode.Append)
+ .parquet(outputPath);
+ }
+
+ private static <E extends OafEntity> Dataset<E> readPathEntity(SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
+
+ log.info("Reading Graph table from: {}", inputEntityPath);
+ return spark
+ .read()
+ .textFile(inputEntityPath)
+ .map((MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz));
+ }
+
+ /**
+ * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from the output produced by PrepareRelationsJob.
+ *
+ * @param spark
+ * @param relationPath
+ * @return the Dataset containing all the relationships
+ */
+ private static Dataset<SortableRelation> readPathRelation(SparkSession spark, final String relationPath) {
+
+ log.info("Reading relations from: {}", relationPath);
+ return spark.read()
+ .load(relationPath)
+ .as(Encoders.bean(SortableRelation.class));
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
new file mode 100644
index 0000000000..6c7f1efd74
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
@@ -0,0 +1,168 @@
+package eu.dnetlib.dhp.oa.provision;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
+import eu.dnetlib.dhp.oa.provision.model.TypedRow;
+import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
+
+/**
+ * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
+ * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
+ * and all the possible relationships (similarity links produced by the Dedup process are excluded).
+ *
+ * The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
+ * by E, finally grouped by E.id;
+ *
+ * The workflow is organized in different parts aimed at reducing the complexity of the operation:
+ * 1) PrepareRelationsJob:
+ * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
+ * can be linked at most to 100 other objects
+ *
+ * 2) CreateRelatedEntitiesJob_phase1:
+ * prepare tuples [relation - target entity] (R - T):
+ * for each entity type E_i
+ * join (R.target = E_i.id),
+ * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
+ * save the tuples [R - T_i] in append mode
+ *
+ * 3) CreateRelatedEntitiesJob_phase2:
+ * prepare tuples [source entity - relation - target entity] (S - R - T):
+ * create the union of the each entity type, hash by id (S)
+ * for each [R - T_i] produced in phase1
+ * join S.id = [R - T_i].source to produce (S_i - R - T_i)
+ * save in append mode
+ *
+ * 4) AdjacencyListBuilderJob:
+ * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
+ *
+ * 5) XmlConverterJob:
+ * convert the JoinedEntities as XML records
+ */
+public class CreateRelatedEntitiesJob_phase2 {
+
+ private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase2.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils.toString(
+ PrepareRelationsJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json"));
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath");
+ log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath);
+
+ String inputGraphPath = parser.get("inputGraphPath");
+ log.info("inputGraphPath: {}", inputGraphPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ String graphTableClassName = parser.get("graphTableClassName");
+ log.info("graphTableClassName: {}", graphTableClassName);
+
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(getKryoClasses());
+
+ runWithSparkSession(conf, isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ joinAllEntities(spark, inputRelatedEntitiesPath, inputGraphPath, outputPath);
+ });
+ }
+
+ private static void joinAllEntities(SparkSession spark, String inputRelatedEntitiesPath, String inputGraphPath, String outputPath) {
+
+ Dataset<Tuple2<String, EntityRelEntity>> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath);
+ Dataset<Tuple2<String, TypedRow>> entities = readAllEntities(spark, inputGraphPath);
+
+ entities
+ .joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer")
+ .map((MapFunction<Tuple2<Tuple2<String, TypedRow>, Tuple2<String, EntityRelEntity>>, EntityRelEntity>) value -> {
+ EntityRelEntity re = new EntityRelEntity();
+ re.setEntity(value._1()._2());
+ Optional<EntityRelEntity> related = Optional.ofNullable(value._2()).map(Tuple2::_2);
+ if (related.isPresent()) {
+ re.setRelation(related.get().getRelation());
+ re.setTarget(related.get().getTarget());
+ }
+ return re;
+ }, Encoders.bean(EntityRelEntity.class))
+ .write()
+ .mode(SaveMode.Append)
+ .parquet(outputPath);
+ }
+
+ private static Dataset<Tuple2<String, TypedRow>> readAllEntities(SparkSession spark, String inputGraphPath) {
+ return GraphMappingUtils.entityTypes.entrySet()
+ .stream()
+ .map((Function<Map.Entry<EntityType, Class>, Dataset<TypedRow>>)
+ e -> readPathEntity(spark, inputGraphPath + "/" + e.getKey().name(), e.getValue())
+ .map((MapFunction<OafEntity, TypedRow>) entity -> {
+ TypedRow t = new TypedRow();
+ t.setType(e.getKey().name());
+ t.setDeleted(entity.getDataInfo().getDeletedbyinference());
+ t.setId(entity.getId());
+ t.setOaf(OBJECT_MAPPER.writeValueAsString(entity));
+ return t;
+ }, Encoders.bean(TypedRow.class)))
+ .reduce(spark.emptyDataset(Encoders.bean(TypedRow.class)), Dataset::union)
+ .map((MapFunction<TypedRow, Tuple2<String, TypedRow>>)
+ value -> new Tuple2<>(value.getId(), value),
+ Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)));
+ }
+
+ private static Dataset<Tuple2<String, EntityRelEntity>> readRelatedEntities(SparkSession spark, String inputRelatedEntitiesPath) {
+ return spark.read()
+ .load(inputRelatedEntitiesPath)
+ .as(Encoders.kryo(EntityRelEntity.class))
+ .map((MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>)
+ value -> new Tuple2<>(value.getRelation().getSource(), value),
+ Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)));
+ }
+
+
+ private static <E extends OafEntity> Dataset<E> readPathEntity(SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
+
+ log.info("Reading Graph table from: {}", inputEntityPath);
+ return spark
+ .read()
+ .textFile(inputEntityPath)
+ .map((MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz));
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java
deleted file mode 100644
index 3ee72c3185..0000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java
+++ /dev/null
@@ -1,346 +0,0 @@
-package eu.dnetlib.dhp.oa.provision;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.JsonPath;
-import eu.dnetlib.dhp.oa.provision.model.*;
-import eu.dnetlib.dhp.oa.provision.utils.*;
-import eu.dnetlib.dhp.schema.oaf.*;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.*;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.types.*;
-import org.apache.spark.util.LongAccumulator;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-
-import static org.apache.spark.sql.functions.*;
-
-import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntity;
-
-/**
- * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
- * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization,
- * and all the possible relationships (similarity links produced by the Dedup process are excluded).
- *
- * The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again
- * by E, finally grouped by E.id;
- *
- * Different manipulations of the E and R sets are introduced to reduce the complexity of the operation
- * 1) treat the object payload as string, extracting only the necessary information beforehand using json path,
- * it seems that deserializing it with jackson's object mapper has higher memory footprint.
- *
- * 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false)
- * 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S
- * and E_target = T. Objects in T are heavily pruned by all the unnecessary information
- *
- * 4) perform the join as (((T.id join R.target) union S) groupby S.id) yield S -> [ ]
- */
-public class GraphJoiner_v2 implements Serializable {
-
- private Map<String, LongAccumulator> accumulators = Maps.newHashMap();
-
- public static final int MAX_RELS = 100;
-
- public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
-
- private SparkSession spark;
-
- private ContextMapper contextMapper;
-
- private String inputPath;
-
- private String outPath;
-
- private String otherDsTypeId;
-
- public GraphJoiner_v2(SparkSession spark, ContextMapper contextMapper, String otherDsTypeId, String inputPath, String outPath) {
- this.spark = spark;
- this.contextMapper = contextMapper;
- this.otherDsTypeId = otherDsTypeId;
- this.inputPath = inputPath;
- this.outPath = outPath;
-
- final SparkContext sc = spark.sparkContext();
- prepareAccumulators(sc);
- }
-
- public GraphJoiner_v2 adjacencyLists() throws IOException {
-
- final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(getSpark().sparkContext());
-
- // read each entity
- Dataset<TypedRow> datasource = readPathEntity(jsc, getInputPath(), "datasource");
- Dataset<TypedRow> organization = readPathEntity(jsc, getInputPath(), "organization");
- Dataset<TypedRow> project = readPathEntity(jsc, getInputPath(), "project");
- Dataset<TypedRow> dataset = readPathEntity(jsc, getInputPath(), "dataset");
- Dataset<TypedRow> otherresearchproduct = readPathEntity(jsc, getInputPath(), "otherresearchproduct");
- Dataset<TypedRow> software = readPathEntity(jsc, getInputPath(), "software");
- Dataset<TypedRow> publication = readPathEntity(jsc, getInputPath(), "publication");
-
- // create the union between all the entities
- datasource
- .union(organization)
- .union(project)
- .union(dataset)
- .union(otherresearchproduct)
- .union(software)
- .union(publication)
- .repartition(7000)
- .write()
- .partitionBy("id")
- .parquet(getOutPath() + "/entities");
-
- Dataset<Tuple2<String, TypedRow>> entities = getSpark()
- .read()
- .load(getOutPath() + "/entities")
- .map((MapFunction<Row, Tuple2<String, TypedRow>>) r -> {
- TypedRow t = new TypedRow();
- t.setId(r.getAs("id"));
- t.setDeleted(r.getAs("deleted"));
- t.setType(r.getAs("type"));
- t.setOaf(r.getAs("oaf"));
-
- return new Tuple2<>(t.getId(), t);
- }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
- .cache();
-
- System.out.println("Entities, number of partitions: " + entities.rdd().getNumPartitions());
- System.out.println("Entities schema:");
- entities.printSchema();
- System.out.println("Entities count:" + entities.count());
-
- // reads the relationships
- readPathRelation(jsc, getInputPath())
- .groupByKey((MapFunction<Relation, SortableRelationKey>) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class))
- .flatMapGroups((FlatMapGroupsFunction<SortableRelationKey, Relation, Relation>) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.kryo(Relation.class))
- .repartition(3000)
- .write()
- .partitionBy("source", "target")
- .parquet(getOutPath() + "/relations");
-
- Dataset<Relation> rels = getSpark()
- .read()
- .load(getOutPath() + "/relations")
- .map((MapFunction<Row, Relation>) r -> {
- Relation rel = new Relation();
- rel.setSource(r.getAs("source"));
- rel.setTarget(r.getAs("target"));
- rel.setRelType(r.getAs("relType"));
- rel.setSubRelType(r.getAs("subRelType"));
- rel.setRelClass(r.getAs("relClass"));
- rel.setDataInfo(r.getAs("dataInfo"));
- rel.setCollectedFrom(r.getList(r.fieldIndex("collectedFrom")));
- return rel;
- }, Encoders.kryo(Relation.class))
- .cache();
-
- System.out.println("Relation schema:");
- System.out.println("Relation, number of partitions: " + rels.rdd().getNumPartitions());
- System.out.println("Relation schema:");
- entities.printSchema();
- System.out.println("Relation count:" + rels.count());
-
- /*
- Dataset> relsByTarget = rels
- .map((MapFunction>) r -> new Tuple2<>(r.getTarget(), r), Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)));
-
-
- relsByTarget
- .joinWith(entities, relsByTarget.col("_1").equalTo(entities.col("_1")), "inner")
- .filter((FilterFunction, Tuple2>>) value -> value._2()._2().getDeleted() == false)
- .map((MapFunction, Tuple2>, EntityRelEntity>) t -> {
- EntityRelEntity e = new EntityRelEntity();
- e.setRelation(t._1()._2());
- e.setTarget(asRelatedEntity(t._2()._2()));
- return e;
- }, Encoders.kryo(EntityRelEntity.class))
- .repartition(20000)
- .write()
- .parquet(getOutPath() + "/bySource");
-
- Dataset> bySource = getSpark()
- .read()
- .load(getOutPath() + "/bySource")
- .map(new MapFunction() {
- @Override
- public EntityRelEntity call(Row value) throws Exception {
- return null;
- }
- }, Encoders.kryo(EntityRelEntity.class))
- .map((MapFunction>) e -> new Tuple2<>(e.getRelation().getSource(), e),
- Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)))
-
- System.out.println("bySource schema");
- bySource.printSchema();
-
-
-
-
- Dataset joined = entities
- .joinWith(bySource, entities.col("_1").equalTo(bySource.col("_1")), "left")
- .map((MapFunction, Tuple2>, EntityRelEntity>) value -> {
- EntityRelEntity re = new EntityRelEntity();
- re.setEntity(value._1()._2());
- Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2);
- if (related.isPresent()) {
- re.setRelation(related.get().getRelation());
- re.setTarget(related.get().getTarget());
- }
- return re;
- }, Encoders.kryo(EntityRelEntity.class));
-
- System.out.println("joined schema");
- joined.printSchema();
- //joined.write().json(getOutPath() + "/joined");
-
- final Dataset grouped = joined
- .groupByKey((MapFunction) e -> e.getEntity(), Encoders.kryo(TypedRow.class))
- .mapGroups((MapGroupsFunction) (key, values) -> toJoinedEntity(key, values), Encoders.kryo(JoinedEntity.class));
-
- System.out.println("grouped schema");
- grouped.printSchema();
-
- final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId);
- grouped
- .map((MapFunction) value -> recordFactory.build(value), Encoders.STRING())
- .javaRDD()
- .mapToPair((PairFunction, String, String>) t -> new Tuple2<>(t._1(), t._2()))
- .saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
-
-
-*/
-
- return this;
- }
-
- public SparkSession getSpark() {
- return spark;
- }
-
- public String getInputPath() {
- return inputPath;
- }
-
- public String getOutPath() {
- return outPath;
- }
-
- // HELPERS
-
- private JoinedEntity toJoinedEntity(TypedRow key, Iterator<EntityRelEntity> values) {
- final ObjectMapper mapper = getObjectMapper();
- final JoinedEntity j = new JoinedEntity();
- j.setType(key.getType());
- j.setEntity(parseOaf(key.getOaf(), key.getType(), mapper));
- final Links links = new Links();
- values.forEachRemaining(rel -> links.add(
- new eu.dnetlib.dhp.oa.provision.model.Tuple2(
- rel.getRelation(),
- rel.getTarget()
- )));
- j.setLinks(links);
- return j;
- }
-
- private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) {
- try {
- switch (GraphMappingUtils.EntityType.valueOf(type)) {
- case publication:
- return mapper.readValue(json, Publication.class);
- case dataset:
- return mapper.readValue(json, eu.dnetlib.dhp.schema.oaf.Dataset.class);
- case otherresearchproduct:
- return mapper.readValue(json, OtherResearchProduct.class);
- case software:
- return mapper.readValue(json, Software.class);
- case datasource:
- return mapper.readValue(json, Datasource.class);
- case organization:
- return mapper.readValue(json, Organization.class);
- case project:
- return mapper.readValue(json, Project.class);
- default:
- throw new IllegalArgumentException("invalid type: " + type);
- }
- } catch (IOException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- /**
- * Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a new line delimited json file,
- * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
- * @param sc
- * @param inputPath
- * @param type
- * @return the JavaPairRDD indexed by entity identifier
- */
- private Dataset<TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
- RDD<String> rdd = sc.textFile(inputPath + "/" + type)
- .rdd();
-
- return getSpark().createDataset(rdd, Encoders.STRING())
- .map((MapFunction<String, TypedRow>) s -> {
- final DocumentContext json = JsonPath.parse(s);
- final TypedRow t = new TypedRow();
- t.setId(json.read("$.id"));
- t.setDeleted(json.read("$.dataInfo.deletedbyinference"));
- t.setType(type);
- t.setOaf(s);
- return t;
- }, Encoders.bean(TypedRow.class));
- }
-
- /**
- * Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file ,
- * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
- * @param sc
- * @param inputPath
- * @return the JavaRDD containing all the relationships
- */
- private Dataset<Relation> readPathRelation(final JavaSparkContext sc, final String inputPath) {
- final RDD<String> rdd = sc.textFile(inputPath + "/relation")
- .rdd();
-
- return getSpark().createDataset(rdd, Encoders.STRING())
- .map((MapFunction<String, Relation>) s -> new ObjectMapper().readValue(s, Relation.class), Encoders.bean(Relation.class));
- }
-
- private ObjectMapper getObjectMapper() {
- return new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
-
- private void prepareAccumulators(SparkContext sc) {
- accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments"));
- accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments"));
- accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo"));
- accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy"));
- accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn"));
- accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges"));
-
- accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo"));
- accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo"));
- accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy"));
- accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces"));
- accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf"));
-
- accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution"));
- accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant"));
- accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant"));
- accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn"));
- accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces"));
- accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy"));
- accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides"));
- }
-
-}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
new file mode 100644
index 0000000000..19599b52c4
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
@@ -0,0 +1,132 @@
+package eu.dnetlib.dhp.oa.provision;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+/**
+ * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
+ * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
+ * and all the possible relationships (similarity links produced by the Dedup process are excluded).
+ *
+ * The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
+ * by E, finally grouped by E.id;
+ *
+ * The workflow is organized in different parts aimed at reducing the complexity of the operation:
+ * 1) PrepareRelationsJob:
+ * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
+ * can be linked at most to 100 other objects
+ *
+ * 2) JoinRelationEntityByTargetJob:
+ * prepare tuples [source entity - relation - target entity] (S - R - T):
+ * for each entity type E_i
+ * join (R.target = E_i.id),
+ * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
+ * join (E_i.id = [R - T_i].source), where E_i becomes the source entity S
+ *
+ * 3) AdjacencyListBuilderJob:
+ * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
+ *
+ * 4) XmlConverterJob:
+ * convert the JoinedEntities as XML records
+ */
+public class PrepareRelationsJob {
+
+ private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static final int MAX_RELS = 100;
+
+ public static void main(String[] args) throws Exception {
+ String jsonConfiguration = IOUtils.toString(
+ PrepareRelationsJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json"));
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String inputRelationsPath = parser.get("inputRelationsPath");
+ log.info("inputRelationsPath: {}", inputRelationsPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ prepareRelationsFromPaths(spark, inputRelationsPath, outputPath);
+ });
+ }
+
+ private static void prepareRelationsFromPaths(SparkSession spark, String inputRelationsPath, String outputPath) {
+ RDD<SortableRelation> rels = readPathRelation(spark, inputRelationsPath)
+ .filter((FilterFunction<SortableRelation>) r -> r.getDataInfo().getDeletedbyinference() == false)
+ .javaRDD()
+ .mapToPair((PairFunction<SortableRelation, String, List<SortableRelation>>) rel -> new Tuple2<>(
+ rel.getSource(),
+ Lists.newArrayList(rel)))
+ .reduceByKey((v1, v2) -> {
+ v1.addAll(v2);
+ v1.sort(SortableRelation::compareTo);
+ if (v1.size() > MAX_RELS) {
+ return new ArrayList<>(v1.subList(0, MAX_RELS));
+ }
+ return v1;
+ })
+ .flatMap(r -> r._2().iterator())
+ .rdd();
+
+ spark.createDataset(rels, Encoders.bean(SortableRelation.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(outputPath);
+ }
+
+ /**
+ * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited JSON text file.
+ *
+ * @param spark
+ * @param inputPath
+ * @return the Dataset containing all the relationships
+ */
+ private static Dataset<SortableRelation> readPathRelation(SparkSession spark, final String inputPath) {
+ return spark.read()
+ .textFile(inputPath)
+ .map((MapFunction<String, SortableRelation>) s -> OBJECT_MAPPER.readValue(s, SortableRelation.class),
+ Encoders.bean(SortableRelation.class));
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+}
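
For orientation (not part of the patch): a toy, Spark-free sketch of the per-source pruning performed by prepareRelationsFromPaths above. The ids are made up, and the real job additionally sorts each group with SortableRelation::compareTo before truncating it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RelationPruningSketch {

    static final int MAX_RELS = 100;

    public static void main(String[] args) {
        // Toy (source, target) pairs standing in for SortableRelation objects.
        List<String[]> rels = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            rels.add(new String[]{"source1", "target" + i});
        }

        // Group by source and keep at most MAX_RELS relations per source,
        // mirroring the reduceByKey logic of the job.
        Map<String, List<String[]>> pruned = rels.stream()
                .collect(Collectors.groupingBy(r -> r[0]))
                .entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> e.getValue().size() > MAX_RELS
                                ? new ArrayList<>(e.getValue().subList(0, MAX_RELS))
                                : e.getValue()));

        System.out.println(pruned.get("source1").size()); // 100
    }
}
```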
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java
index 975ac75485..eae8cf1a14 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java
@@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.provision;
import com.lucidworks.spark.util.SolrSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
@@ -18,6 +19,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
@@ -28,14 +31,20 @@ import java.io.StringReader;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class SparkXmlIndexingJob {
- private static final Log log = LogFactory.getLog(SparkXmlIndexingJob.class);
+ private static final Logger log = LoggerFactory.getLogger(SparkXmlIndexingJob.class);
private static final Integer DEFAULT_BATCH_SIZE = 1000;
private static final String LAYOUT = "index";
+ private static final String INTERPRETATION = "openaire";
+ private static final String SEPARATOR = "-";
+ public static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
public static void main(String[] args) throws Exception {
@@ -45,48 +54,56 @@ public class SparkXmlIndexingJob {
"/eu/dnetlib/dhp/oa/provision/input_params_update_index.json")));
parser.parseArgument(args);
- final String inputPath = parser.get("sourcePath");
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String inputPath = parser.get("inputPath");
+ log.info("inputPath: {}", inputPath);
+
final String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: {}", isLookupUrl);
+
final String format = parser.get("format");
+ log.info("format: {}", format);
+
final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE;
+ log.info("batchSize: {}", batchSize);
final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String fields = getLayoutSource(isLookup, format);
+ log.info("fields: {}", fields);
+
final String xslt = getLayoutTransformer(isLookup);
final String dsId = getDsId(format, isLookup);
+ log.info("dsId: {}", dsId);
+
final String zkHost = getZkHost(isLookup);
+ log.info("zkHost: {}", zkHost);
+
final String version = getRecordDatestamp();
final String indexRecordXslt = getLayoutTransformer(format, fields, xslt);
+ log.info("indexRecordTransformer {}", indexRecordXslt);
- log.info("indexRecordTransformer: " + indexRecordXslt);
+ final SparkConf conf = new SparkConf();
- final String master = parser.get("master");
- final SparkConf conf = new SparkConf()
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ runWithSparkSession(conf, isSparkSessionManaged,
+ spark -> {
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
- try(SparkSession spark = getSession(conf, master)) {
+ RDD<SolrInputDocument> docs = sc.sequenceFile(inputPath, Text.class, Text.class)
+ .map(t -> t._2().toString())
+ .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
+ .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
+ .rdd();
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-
- RDD docs = sc.sequenceFile(inputPath, Text.class, Text.class)
- .map(t -> t._2().toString())
- .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
- .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
- .rdd();
-
- SolrSupport.indexDocs(zkHost, format + "-" + LAYOUT + "-openaire", batchSize, docs);
- }
- }
-
- private static SparkSession getSession(SparkConf conf, String master) {
- return SparkSession
- .builder()
- .config(conf)
- .appName(SparkXmlIndexingJob.class.getSimpleName())
- .master(master)
- .getOrCreate();
+ final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
+ SolrSupport.indexDocs(zkHost, collection, batchSize, docs);
+ });
}
private static String toIndexRecord(Transformer tr, final String record) {
@@ -95,7 +112,7 @@ public class SparkXmlIndexingJob {
tr.transform(new StreamSource(new StringReader(record)), res);
return res.getWriter().toString();
} catch (Throwable e) {
- System.out.println("XPathException on record:\n" + record);
+ log.error("XPathException on record: \n {}", record, e);
throw new IllegalArgumentException(e);
}
}
@@ -127,7 +144,7 @@ public class SparkXmlIndexingJob {
* @return the parsed date
*/
public static String getRecordDatestamp() {
- return new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'").format(new Date());
+ return new SimpleDateFormat(DATE_FORMAT).format(new Date());
}
/**
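
As a quick illustration of the constants extracted above, the Solr collection name is now composed as format-layout-interpretation and the record datestamp uses the DATE_FORMAT pattern. A small sketch; the "DMF" format name is hypothetical.

import java.text.SimpleDateFormat;
import java.util.Date;

public class IndexNamingSketch {
    public static void main(String[] args) {
        String format = "DMF"; // hypothetical MDFormat name read from the IS profile
        String collection = format + "-" + "index" + "-" + "openaire";
        System.out.println(collection); // DMF-index-openaire

        // version datestamp attached to the indexed records
        String version = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'").format(new Date());
        System.out.println(version);
    }
}
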
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java
deleted file mode 100644
index e4124e52fb..0000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package eu.dnetlib.dhp.oa.provision;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.provision.model.*;
-import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
-import eu.dnetlib.dhp.schema.oaf.*;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.SparkSession;
-
-public class SparkXmlRecordBuilderJob_v2 {
-
- public static void main(String[] args) throws Exception {
-
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils.toString(
- SparkXmlRecordBuilderJob_v2.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
- parser.parseArgument(args);
-
- try(SparkSession spark = getSession(parser)) {
-
- final String inputPath = parser.get("sourcePath");
- final String outputPath = parser.get("outputPath");
- final String isLookupUrl = parser.get("isLookupUrl");
- final String otherDsTypeId = parser.get("otherDsTypeId");
-
-
- new GraphJoiner_v2(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath)
- .adjacencyLists();
- }
- }
-
- private static SparkSession getSession(ArgumentApplicationParser parser) {
- final SparkConf conf = new SparkConf();
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf.set("spark.sql.shuffle.partitions", parser.get("sparkSqlShufflePartitions"));
- conf.registerKryoClasses(new Class[]{
- Author.class,
- Context.class,
- Country.class,
- DataInfo.class,
- eu.dnetlib.dhp.schema.oaf.Dataset.class,
- Datasource.class,
- ExternalReference.class,
- ExtraInfo.class,
- Field.class,
- GeoLocation.class,
- Instance.class,
- Journal.class,
- KeyValue.class,
- Oaf.class,
- OafEntity.class,
- OAIProvenance.class,
- Organization.class,
- OriginDescription.class,
- OtherResearchProduct.class,
- Project.class,
- Publication.class,
- Qualifier.class,
- Relation.class,
- Result.class,
- Software.class,
- StructuredProperty.class,
-
- TypedRow.class,
- EntityRelEntity.class,
- JoinedEntity.class,
- SortableRelationKey.class,
- Tuple2.class,
- Links.class,
- RelatedEntity.class
- });
- return SparkSession
- .builder()
- .config(conf)
- .appName(SparkXmlRecordBuilderJob_v2.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
- }
-
-}
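
The Kryo class list deleted with this file now lives in GraphMappingUtils.getKryoClasses() (introduced later in this patch). A sketch of how a caller might configure the serializer with it, assuming nothing beyond what the patch itself adds:

import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import org.apache.spark.SparkConf;

public class KryoConfSketch {
    // Registers the provision model classes through the shared helper instead of an inline list.
    static SparkConf kryoConf() {
        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(GraphMappingUtils.getKryoClasses());
        return conf;
    }
}
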
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java
new file mode 100644
index 0000000000..74a36c580b
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java
@@ -0,0 +1,149 @@
+package eu.dnetlib.dhp.oa.provision;
+
+import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.provision.model.*;
+import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
+import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
+import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
+import eu.dnetlib.dhp.schema.oaf.*;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+/**
+ * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
+ * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
+ * and all the possible relationships (similarity links produced by the Dedup process are excluded).
+ *
+ * The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
+ * by E, finally grouped by E.id;
+ *
+ * The workflow is organized in different parts aimed at reducing the complexity of the operation:
+ * 1) PrepareRelationsJob:
+ * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
+ * can be linked at most to 100 other objects
+ *
+ * 2) JoinRelationEntityByTargetJob:
+ * prepare tuples [source entity - relation - target entity] (S - R - T):
+ * for each entity type E_i
+ * join (R.target = E_i.id),
+ * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
+ * join (E_i.id = [R - T_i].source), where E_i becomes the source entity S
+ *
+ * 3) AdjacencyListBuilderJob:
+ * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
+ *
+ * 4) XmlConverterJob:
+ * convert the JoinedEntities as XML records
+ */
+public class XmlConverterJob {
+
+ private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class);
+
+ public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
+
+ public static void main(String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils.toString(
+ XmlConverterJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json")));
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String inputPath = parser.get("inputPath");
+ log.info("inputPath: {}", inputPath);
+
+ String outputPath = parser.get("outputPath");
+ log.info("outputPath: {}", outputPath);
+
+ String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: {}", isLookupUrl);
+
+ String otherDsTypeId = parser.get("otherDsTypeId");
+ log.info("otherDsTypeId: {}", otherDsTypeId);
+
+ SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl), otherDsTypeId);
+ });
+
+ }
+
+ private static void convertToXml(SparkSession spark, String inputPath, String outputPath, ContextMapper contextMapper, String otherDsTypeId) {
+
+ final XmlRecordFactory recordFactory = new XmlRecordFactory(prepareAccumulators(spark.sparkContext()), contextMapper, false, schemaLocation, otherDsTypeId);
+
+ spark.read()
+ .load(inputPath)
+ .as(Encoders.bean(JoinedEntity.class))
+ .map((MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(
+ je.getEntity().getId(),
+ recordFactory.build(je)
+ ), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+ .javaRDD()
+ .mapToPair((PairFunction<Tuple2<String, String>, String, String>) t -> t)
+ .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) {
+ Map<String, LongAccumulator> accumulators = Maps.newHashMap();
+ accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments"));
+ accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments"));
+ accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo"));
+ accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy"));
+ accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn"));
+ accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges"));
+
+ accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo"));
+ accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo"));
+ accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy"));
+ accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces"));
+ accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf"));
+
+ accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution"));
+ accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant"));
+ accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant"));
+ accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn"));
+ accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces"));
+ accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy"));
+ accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides"));
+
+ return accumulators;
+ }
+
+}
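
The accumulator map above pairs each relation label with an accumulator of the same name; deriving the accumulator from the map key, as in the sketch below (with a shortened name list), avoids the kind of copy-paste mismatch fixed above for organizationOrganization_dedup_merges. This is an alternative formulation, not what the patch does.

import org.apache.spark.SparkContext;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorMapSketch {
    // Builds each accumulator from its map key, so key and accumulator name cannot drift apart.
    static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) {
        Map<String, LongAccumulator> accumulators = new HashMap<>();
        Arrays.asList(
                "resultResult_supplement_isSupplementTo",
                "resultResult_supplement_isSupplementedBy",
                "organizationOrganization_dedup_isMergedIn",
                "organizationOrganization_dedup_merges")
            .forEach(name -> accumulators.put(name, sc.longAccumulator(name)));
        return accumulators;
    }
}
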
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java
index ddeec140b7..35dfa41d38 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java
@@ -1,15 +1,26 @@
package eu.dnetlib.dhp.oa.provision.model;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
import java.io.Serializable;
public class EntityRelEntity implements Serializable {
private TypedRow entity;
- private Relation relation;
+ private SortableRelation relation;
private RelatedEntity target;
+ public EntityRelEntity() {
+ }
+
+ public EntityRelEntity(SortableRelation relation, RelatedEntity target) {
+ this(null, relation, target);
+ }
+
+ public EntityRelEntity(TypedRow entity, SortableRelation relation, RelatedEntity target) {
+ this.entity = entity;
+ this.relation = relation;
+ this.target = target;
+ }
+
public TypedRow getEntity() {
return entity;
}
@@ -18,11 +29,11 @@ public class EntityRelEntity implements Serializable {
this.entity = entity;
}
- public Relation getRelation() {
+ public SortableRelation getRelation() {
return relation;
}
- public void setRelation(Relation relation) {
+ public void setRelation(SortableRelation relation) {
this.relation = relation;
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java
index 815863c678..4dd4348040 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java
@@ -1,22 +1,23 @@
package eu.dnetlib.dhp.oa.provision.model;
+import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import java.io.Serializable;
public class JoinedEntity implements Serializable {
- private String type;
+ private GraphMappingUtils.EntityType type;
private OafEntity entity;
private Links links;
- public String getType() {
+ public GraphMappingUtils.EntityType getType() {
return type;
}
- public void setType(String type) {
+ public void setType(GraphMappingUtils.EntityType type) {
this.type = type;
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java
index 0cb4617ec0..4ea1948766 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java
@@ -1,6 +1,6 @@
package eu.dnetlib.dhp.oa.provision.model;
-import java.util.ArrayList;
+import java.util.HashSet;
-public class Links extends ArrayList<Tuple2> {
+public class Links extends HashSet<Tuple2> {
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java
new file mode 100644
index 0000000000..430779c727
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java
@@ -0,0 +1,34 @@
+package eu.dnetlib.dhp.oa.provision.model;
+
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+import java.util.Map;
+
+public class SortableRelation extends Relation implements Comparable<Relation> {
+
+ private final static Map<String, Integer> weights = Maps.newHashMap();
+
+ static {
+ weights.put("outcome", 0);
+ weights.put("supplement", 1);
+ weights.put("publicationDataset", 2);
+ weights.put("relationship", 3);
+ weights.put("similarity", 4);
+ weights.put("affiliation", 5);
+
+ weights.put("provision", 6);
+ weights.put("participation", 7);
+ weights.put("dedup", 8);
+ }
+
+ @Override
+ public int compareTo(Relation o) {
+ return ComparisonChain.start()
+ .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType()))
+ .compare(getSource(), o.getSource())
+ .compare(getTarget(), o.getTarget())
+ .result();
+ }
+}
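
A small sketch of how the weight table drives the ordering, assuming the usual bean setters inherited from Relation: relations with lower-weighted subRelTypes (e.g. outcome) sort first and therefore survive the MAX_RELS cap in PrepareRelationsJob before, say, dedup links.

import eu.dnetlib.dhp.oa.provision.model.SortableRelation;

import java.util.ArrayList;
import java.util.List;

public class RelationOrderingSketch {
    // Assumed bean setters inherited from eu.dnetlib.dhp.schema.oaf.Relation.
    static SortableRelation rel(String subRelType, String source, String target) {
        SortableRelation r = new SortableRelation();
        r.setSubRelType(subRelType);
        r.setSource(source);
        r.setTarget(target);
        return r;
    }

    public static void main(String[] args) {
        List<SortableRelation> rels = new ArrayList<>();
        rels.add(rel("dedup", "50|a", "50|b"));
        rels.add(rel("outcome", "50|a", "40|p"));
        rels.sort(SortableRelation::compareTo);
        // outcome (weight 0) sorts before dedup (weight 8), so it survives the MAX_RELS cap first
        System.out.println(rels.get(0).getSubRelType()); // outcome
    }
}
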
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java
index db639f1132..f1e2c652c5 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java
@@ -2,7 +2,10 @@ package eu.dnetlib.dhp.oa.provision.model;
import eu.dnetlib.dhp.schema.oaf.Relation;
-public class Tuple2 {
+import java.io.Serializable;
+import java.util.Objects;
+
+public class Tuple2 implements Serializable {
private Relation relation;
@@ -28,4 +31,18 @@ public class Tuple2 {
public void setRelatedEntity(RelatedEntity relatedEntity) {
this.relatedEntity = relatedEntity;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Tuple2 t2 = (Tuple2) o;
+ return getRelation().equals(t2.getRelation());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getRelation().hashCode());
+ }
+
}
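
The equals/hashCode pair added here makes Tuple2 equality depend only on the wrapped relation, which is what lets Links (now a HashSet) collapse duplicate links. A sketch, assuming Tuple2 exposes a no-arg constructor and a setRelation bean setter matching the getter shown above:

import eu.dnetlib.dhp.oa.provision.model.Links;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.Tuple2;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class LinksDedupSketch {
    public static void main(String[] args) {
        Relation rel = new Relation();      // the same relation instance backs both tuples

        Tuple2 t1 = new Tuple2();           // assumed no-arg constructor and bean setters
        t1.setRelation(rel);
        t1.setRelatedEntity(new RelatedEntity());

        Tuple2 t2 = new Tuple2();
        t2.setRelation(rel);
        t2.setRelatedEntity(new RelatedEntity());

        Links links = new Links();          // a HashSet after this change
        links.add(t1);
        links.add(t2);
        System.out.println(links.size());   // 1: equality is delegated to the relation
    }
}
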
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java
index 27b42e69d5..8418db8e69 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java
@@ -1,30 +1,47 @@
package eu.dnetlib.dhp.oa.provision.utils;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.JsonPath;
-import eu.dnetlib.dhp.oa.provision.model.*;
+import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
+import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.schema.oaf.*;
-import net.minidev.json.JSONArray;
-import org.apache.commons.lang3.StringUtils;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
-import static org.apache.commons.lang3.StringUtils.*;
+import static org.apache.commons.lang3.StringUtils.substringAfter;
public class GraphMappingUtils {
public static final String SEPARATOR = "_";
+ public final static Map<EntityType, Class> entityTypes = Maps.newHashMap();
+
+ static {
+ entityTypes.put(EntityType.datasource, Datasource.class);
+ entityTypes.put(EntityType.organization, Organization.class);
+ entityTypes.put(EntityType.project, Project.class);
+ entityTypes.put(EntityType.dataset, Dataset.class);
+ entityTypes.put(EntityType.otherresearchproduct, OtherResearchProduct.class);
+ entityTypes.put(EntityType.software, Software.class);
+ entityTypes.put(EntityType.publication, Publication.class);
+ }
+
public enum EntityType {
- publication, dataset, otherresearchproduct, software, datasource, organization, project
+ publication, dataset, otherresearchproduct, software, datasource, organization, project;
+
+ public static EntityType fromClass(Class clazz) {
+ switch (clazz.getName()) {
+ case "eu.dnetlib.dhp.schema.oaf.Publication" : return publication;
+ case "eu.dnetlib.dhp.schema.oaf.Dataset" : return dataset;
+ case "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct" : return otherresearchproduct;
+ case "eu.dnetlib.dhp.schema.oaf.Software" : return software;
+ case "eu.dnetlib.dhp.schema.oaf.Datasource" : return datasource;
+ case "eu.dnetlib.dhp.schema.oaf.Organization" : return organization;
+ case "eu.dnetlib.dhp.schema.oaf.Project" : return project;
+ default: throw new IllegalArgumentException("Unknown OafEntity class: " + clazz.getName());
+ }
+ }
}
public enum MainEntityType {
@@ -33,8 +50,6 @@ public class GraphMappingUtils {
public static Set authorPidTypes = Sets.newHashSet("orcid", "magidentifier");
- public static Set instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance", "distributionlocation");
-
private static final String schemeTemplate = "dnet:%s_%s_relations";
private static Map entityMapping = Maps.newHashMap();
@@ -49,6 +64,38 @@ public class GraphMappingUtils {
entityMapping.put(EntityType.project, MainEntityType.project);
}
+ public static Class[] getKryoClasses() {
+ return new Class[]{
+ Author.class,
+ Context.class,
+ Country.class,
+ DataInfo.class,
+ eu.dnetlib.dhp.schema.oaf.Dataset.class,
+ Datasource.class,
+ ExternalReference.class,
+ ExtraInfo.class,
+ Field.class,
+ GeoLocation.class,
+ Instance.class,
+ Journal.class,
+ KeyValue.class,
+ Oaf.class,
+ OafEntity.class,
+ OAIProvenance.class,
+ Organization.class,
+ OriginDescription.class,
+ OtherResearchProduct.class,
+ Project.class,
+ Publication.class,
+ Qualifier.class,
+ Relation.class,
+ SortableRelation.class, //SUPPORT
+ Result.class,
+ Software.class,
+ StructuredProperty.class
+ };
+ }
+
public static String getScheme(final String sourceType, final String targetType) {
return String.format(schemeTemplate,
entityMapping.get(EntityType.valueOf(sourceType)).name(),
@@ -63,152 +110,81 @@ public class GraphMappingUtils {
return MainEntityType.result.name().equals(getMainType(type));
}
- public static RelatedEntity asRelatedEntity(TypedRow e) {
+ public static <E extends OafEntity> RelatedEntity asRelatedEntity(E entity, Class<E> clazz) {
- final DocumentContext j = JsonPath.parse(e.getOaf());
final RelatedEntity re = new RelatedEntity();
- re.setId(j.read("$.id"));
- re.setType(e.getType());
+ re.setId(entity.getId());
+ re.setType(clazz.getName());
- switch (EntityType.valueOf(e.getType())) {
+ re.setPid(entity.getPid());
+ re.setCollectedfrom(entity.getCollectedfrom());
+
+ switch (GraphMappingUtils.EntityType.fromClass(clazz)) {
case publication:
case dataset:
case otherresearchproduct:
case software:
- mapTitle(j, re);
- re.setDateofacceptance(j.read("$.dateofacceptance.value"));
- re.setPublisher(j.read("$.publisher.value"));
- JSONArray pids = j.read("$.pid");
- re.setPid(pids.stream()
- .map(p -> asStructuredProperty((LinkedHashMap) p))
- .collect(Collectors.toList()));
+ Result r = (Result) entity;
- re.setResulttype(asQualifier(j.read("$.resulttype")));
+ if (r.getTitle() != null && !r.getTitle().isEmpty()) {
+ re.setTitle(r.getTitle().stream().findFirst().get());
+ }
- JSONArray collfrom = j.read("$.collectedfrom");
- re.setCollectedfrom(collfrom.stream()
- .map(c -> asKV((LinkedHashMap) c))
- .collect(Collectors.toList()));
-
- // will throw exception when the instance is not found
- JSONArray instances = j.read("$.instance");
- re.setInstances(instances.stream()
- .map(i -> {
- final LinkedHashMap p = (LinkedHashMap) i;
- final Field license = new Field();
- license.setValue((String) ((LinkedHashMap) p.get("license")).get("value"));
- final Instance instance = new Instance();
- instance.setLicense(license);
- instance.setAccessright(asQualifier((LinkedHashMap) p.get("accessright")));
- instance.setInstancetype(asQualifier((LinkedHashMap) p.get("instancetype")));
- instance.setHostedby(asKV((LinkedHashMap) p.get("hostedby")));
- //TODO mapping of distributionlocation
- instance.setCollectedfrom(asKV((LinkedHashMap) p.get("collectedfrom")));
-
- Field dateofacceptance = new Field();
- dateofacceptance.setValue((String) ((LinkedHashMap) p.get("dateofacceptance")).get("value"));
- instance.setDateofacceptance(dateofacceptance);
- return instance;
- }).collect(Collectors.toList()));
+ re.setDateofacceptance(getValue(r.getDateofacceptance()));
+ re.setPublisher(getValue(r.getPublisher()));
+ re.setResulttype(r.getResulttype());
+ re.setInstances(r.getInstance());
//TODO still to be mapped
//re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
break;
case datasource:
- re.setOfficialname(j.read("$.officialname.value"));
- re.setWebsiteurl(j.read("$.websiteurl.value"));
- re.setDatasourcetype(asQualifier(j.read("$.datasourcetype")));
- re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility")));
+ Datasource d = (Datasource) entity;
+
+ re.setOfficialname(getValue(d.getOfficialname()));
+ re.setWebsiteurl(getValue(d.getWebsiteurl()));
+ re.setDatasourcetype(d.getDatasourcetype());
+ re.setOpenairecompatibility(d.getOpenairecompatibility());
break;
case organization:
- re.setLegalname(j.read("$.legalname.value"));
- re.setLegalshortname(j.read("$.legalshortname.value"));
- re.setCountry(asQualifier(j.read("$.country")));
- re.setWebsiteurl(j.read("$.websiteurl.value"));
+ Organization o = (Organization) entity;
+
+ re.setLegalname(getValue(o.getLegalname()));
+ re.setLegalshortname(getValue(o.getLegalshortname()));
+ re.setCountry(o.getCountry());
+ re.setWebsiteurl(getValue(o.getWebsiteurl()));
break;
case project:
- re.setProjectTitle(j.read("$.title.value"));
- re.setCode(j.read("$.code.value"));
- re.setAcronym(j.read("$.acronym.value"));
- re.setContracttype(asQualifier(j.read("$.contracttype")));
+ Project p = (Project) entity;
- JSONArray f = j.read("$.fundingtree");
+ re.setProjectTitle(getValue(p.getTitle()));
+ re.setCode(getValue(p.getCode()));
+ re.setAcronym(getValue(p.getAcronym()));
+ re.setContracttype(p.getContracttype());
+
+ List<Field<String>> f = p.getFundingtree();
if (!f.isEmpty()) {
re.setFundingtree(f.stream()
- .map(s -> ((LinkedHashMap) s).get("value"))
+ .map(s -> s.getValue())
.collect(Collectors.toList()));
}
-
break;
}
-
return re;
}
-
- private static KeyValue asKV(LinkedHashMap j) {
- final KeyValue kv = new KeyValue();
- kv.setKey((String) j.get("key"));
- kv.setValue((String) j.get("value"));
- return kv;
+ private static String getValue(Field<String> field) {
+ return getFieldValueWithDefault(field, "");
}
- private static void mapTitle(DocumentContext j, RelatedEntity re) {
- final JSONArray a = j.read("$.title");
- if (!a.isEmpty()) {
- final StructuredProperty sp = asStructuredProperty((LinkedHashMap) a.get(0));
- if (StringUtils.isNotBlank(sp.getValue())) {
- re.setTitle(sp);
- }
- }
- }
-
- private static StructuredProperty asStructuredProperty(LinkedHashMap j) {
- final StructuredProperty sp = new StructuredProperty();
- final String value = (String) j.get("value");
- if (StringUtils.isNotBlank(value)) {
- sp.setValue((String) j.get("value"));
- sp.setQualifier(asQualifier((LinkedHashMap) j.get("qualifier")));
- }
- return sp;
- }
-
- public static Qualifier asQualifier(LinkedHashMap j) {
- final Qualifier q = new Qualifier();
-
- final String classid = j.get("classid");
- if (StringUtils.isNotBlank(classid)) {
- q.setClassid(classid);
- }
-
- final String classname = j.get("classname");
- if (StringUtils.isNotBlank(classname)) {
- q.setClassname(classname);
- }
-
- final String schemeid = j.get("schemeid");
- if (StringUtils.isNotBlank(schemeid)) {
- q.setSchemeid(schemeid);
- }
-
- final String schemename = j.get("schemename");
- if (StringUtils.isNotBlank(schemename)) {
- q.setSchemename(schemename);
- }
- return q;
- }
-
- public static String serialize(final Object o) {
- try {
- return new ObjectMapper()
- .setSerializationInclusion(JsonInclude.Include.NON_NULL)
- .writeValueAsString(o);
- } catch (JsonProcessingException e) {
- throw new IllegalArgumentException("unable to serialize: " + o.toString(), e);
- }
+ private static <T> T getFieldValueWithDefault(Field<T> f, T defaultValue) {
+ return Optional.ofNullable(f)
+ .filter(Objects::nonNull)
+ .map(x -> x.getValue())
+ .orElse(defaultValue);
}
public static String removePrefix(final String s) {
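
The new entityTypes map and EntityType.fromClass introduced above give a symmetric Class <-> EntityType lookup. A minimal usage sketch:

import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.EntityType;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class EntityTypeSketch {
    public static void main(String[] args) {
        // Class -> EntityType and EntityType -> Class are now explicit, symmetric lookups.
        EntityType type = EntityType.fromClass(Publication.class);
        System.out.println(type);                                    // publication
        System.out.println(GraphMappingUtils.entityTypes.get(type)); // class eu.dnetlib.dhp.schema.oaf.Publication
    }
}
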
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json
index bbac579feb..e57df9b09e 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json
@@ -1,8 +1,14 @@
[
- {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
- {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
- {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true},
- {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true},
- {"paramName":"t", "paramLongName":"otherDsTypeId", "paramDescription": "list of datasource types to populate field datasourcetypeui", "paramRequired": true},
- {"paramName":"sp", "paramLongName":"sparkSqlShufflePartitions", "paramDescription": "Configures the number of partitions to use when shuffling data for joins or aggregations", "paramRequired": true}
+ {
+ "paramName": "in",
+ "paramLongName": "inputPath",
+ "paramDescription": "the path of the sequence file to read",
+ "paramRequired": true
+ },
+ {
+ "paramName": "out",
+ "paramLongName": "outputPath",
+ "paramDescription": "the path used to store temporary output files",
+ "paramRequired": true
+ }
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
new file mode 100644
index 0000000000..043129c9f5
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
@@ -0,0 +1,20 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "irp",
+ "paramLongName": "inputRelationsPath",
+ "paramDescription": "path to input relations prepare",
+ "paramRequired": true
+ },
+ {
+ "paramName": "op",
+ "paramLongName": "outputPath",
+ "paramDescription": "root output location for prepared relations",
+ "paramRequired": true
+ }
+]
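
A hypothetical local invocation matching the parameter names declared above; the paths are placeholders, and a Spark master is assumed to be supplied externally (e.g. via spark-submit). In production the job is launched by the Oozie action further down in this patch.

public class PrepareRelationsLocalRun {
    public static void main(String[] args) throws Exception {
        eu.dnetlib.dhp.oa.provision.PrepareRelationsJob.main(new String[] {
                "--isSparkSessionManaged", "true",
                "--inputRelationsPath", "/tmp/graph/relation",   // placeholder input location
                "--outputPath", "/tmp/working_dir/relation"      // placeholder output location
        });
    }
}
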
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json
new file mode 100644
index 0000000000..0090716d69
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "irp",
+ "paramLongName": "inputRelationsPath",
+ "paramDescription": "path to input relations from the graph",
+ "paramRequired": true
+ },
+ {
+ "paramName": "iep",
+ "paramLongName": "inputEntityPath",
+ "paramDescription": "path to input entity from the graph",
+ "paramRequired": true
+ },
+ {
+ "paramName": "clazz",
+ "paramLongName": "graphTableClassName",
+ "paramDescription": "class name associated to the input entity path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "op",
+ "paramLongName": "outputPath",
+ "paramDescription": "root output location for prepared relations",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json
new file mode 100644
index 0000000000..cb7949d492
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "irp",
+ "paramLongName": "inputRelatedEntitiesPath",
+ "paramDescription": "path to input relations from the graph",
+ "paramRequired": true
+ },
+ {
+ "paramName": "iep",
+ "paramLongName": "inputGraphPath",
+ "paramDescription": "root graph path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "op",
+ "paramLongName": "outputPath",
+ "paramDescription": "root output location for prepared relations",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
index 0d45e9e29f..146cc9943b 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
@@ -1,7 +1,7 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
- {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true},
+ {"paramName":"i", "paramLongName":"inputPath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true},
{"paramName":"f", "paramLongName":"format", "paramDescription": "MDFormat name found in the IS profile", "paramRequired": true},
{"paramName":"b", "paramLongName":"batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false}
]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json
new file mode 100644
index 0000000000..32720514e1
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "in",
+ "paramLongName": "inputPath",
+ "paramDescription": "the path of the sequence file to read",
+ "paramRequired": true
+ },
+ {
+ "paramName": "out",
+ "paramLongName": "outputPath",
+ "paramDescription": "the path used to store temporary output files",
+ "paramRequired": true
+ },
+ {
+ "paramName": "ilu",
+ "paramLongName": "isLookupUrl",
+ "paramDescription": "URL of the isLookUp Service",
+ "paramRequired": true
+ },
+ {
+ "paramName": "odt",
+ "paramLongName": "otherDsTypeId",
+ "paramDescription": "list of datasource types to populate field datasourcetypeui",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index 194cd43c81..5168215099 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -1,6 +1,11 @@
+
+ inputGraphRootPath
+ root location of input materialized graph
+
+
sparkDriverMemoryForJoining
memory for driver process
@@ -64,7 +69,7 @@
- ${wf:conf('reuseRecords') eq false}
+ ${wf:conf('reuseRecords') eq false}
${wf:conf('reuseRecords') eq true}
@@ -74,16 +79,12 @@
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
+
-
-
-
-
yarn
cluster
- build_adjacency_lists
- eu.dnetlib.dhp.oa.provision.SparkXmlRecordBuilderJob_v2
+ PrepareRelations
+ eu.dnetlib.dhp.oa.provision.PrepareRelationsJob
dhp-graph-provision-${projectVersion}.jar
--executor-cores=${sparkExecutorCoresForJoining}
@@ -94,12 +95,135 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- -mt yarn
- -is ${isLookupUrl}
- -t ${otherDsTypeId}
- -s${sourcePath}
- -o${outputPath}
- -sp${sparkSqlShufflePartitions}
+ --inputRelationsPath${inputGraphRootPath}/relation
+ --outputPath${workingDir}/relation
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Join[relation.target = publication.id]
+ eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1
+ dhp-graph-provision-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCoresForJoining}
+ --executor-memory=${sparkExecutorMemoryForJoining}
+ --driver-memory=${sparkDriverMemoryForJoining}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --inputRelationsPath${workingDir}/relation
+ --inputEntityPath${inputGraphRootPath}/publication
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
+ --outputPath${workingDir}/join_partial
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Join[relation.target = dataset.id]
+ eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1
+ dhp-graph-provision-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCoresForJoining}
+ --executor-memory=${sparkExecutorMemoryForJoining}
+ --driver-memory=${sparkDriverMemoryForJoining}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --inputRelationsPath${workingDir}/relation
+ --inputEntityPath${inputGraphRootPath}/dataset
+ --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
+ --outputPath${workingDir}/join_partial
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Join[entities.id = relatedEntity.source]
+ eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
+ dhp-graph-provision-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCoresForJoining}
+ --executor-memory=${sparkExecutorMemoryForJoining}
+ --driver-memory=${sparkDriverMemoryForJoining}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --inputRelatedEntitiesPath${workingDir}/join_partial
+ --inputEntityPath${inputGraphRootPath}
+ --outputPath${workingDir}/join_entities
+
+
+
+
+
+
+
+ yarn
+ cluster
+ build_adjacency_lists
+ eu.dnetlib.dhp.oa.provision.AdjacencyListBuilderJob
+ dhp-graph-provision-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCoresForJoining}
+ --executor-memory=${sparkExecutorMemoryForJoining}
+ --driver-memory=${sparkDriverMemoryForJoining}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --inputPath${workingDir}/join_entities
+ --outputPath${workingDir}/joined
+
+
+
+
+
+
+
+ yarn
+ cluster
+ convert_to_xml
+ eu.dnetlib.dhp.oa.provision.XmlConverterJob
+ dhp-graph-provision-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCoresForJoining}
+ --executor-memory=${sparkExecutorMemoryForJoining}
+ --driver-memory=${sparkDriverMemoryForJoining}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --inputPath${workingDir}/joined
+ --outputPath${workingDir}/xml
+ --isLookupUrl${isLookupUrl}
+ --otherDsTypeId${otherDsTypeId}
@@ -122,9 +246,8 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- -mt yarn
- -is ${isLookupUrl}
- --sourcePath${outputPath}/xml
+ --isLookupUrl ${isLookupUrl}
+ --inputPath${workingDir}/xml
--format${format}
--batchSize${batchSize}