From 3d1b637cab7628051d15fb60855cd7edcfc3aed8 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Sat, 4 Apr 2020 14:03:43 +0200
Subject: [PATCH] dataset based provision WIP

---
 dhp-common/pom.xml                            |  20 +
 .../common/FunctionalInterfaceSupport.java    |  56 +++
 .../eu/dnetlib/dhp/common/HdfsSupport.java    |  57 +++
 .../dhp/common/SparkSessionSupport.java       |  57 +++
 .../dnetlib/dhp/common/ThrowingSupport.java   |  76 ++++
 .../dnetlib/dhp/common/HdfsSupportTest.java   |  78 ++++
 .../dnetlib/dhp/common/ModelSupportTest.java  |  36 ++
 .../dhp/common/SparkSessionSupportTest.java   |  54 +++
 .../dhp/schema/common/ModelSupport.java       |  51 +++
 .../eu/dnetlib/dhp/schema/oaf/Relation.java   |   3 +-
 .../oa/provision/AdjacencyListBuilderJob.java | 167 +++++++++
 .../CreateRelatedEntitiesJob_phase1.java      | 157 ++++++++
 .../CreateRelatedEntitiesJob_phase2.java      | 168 +++++++++
 .../dhp/oa/provision/GraphJoiner_v2.java      | 346 ------------------
 .../dhp/oa/provision/PrepareRelationsJob.java | 132 +++++++
 .../dhp/oa/provision/SparkXmlIndexingJob.java |  73 ++--
 .../SparkXmlRecordBuilderJob_v2.java          |  81 ----
 .../dhp/oa/provision/XmlConverterJob.java     | 149 ++++++++
 .../oa/provision/model/EntityRelEntity.java   |  21 +-
 .../dhp/oa/provision/model/JoinedEntity.java  |   7 +-
 .../dnetlib/dhp/oa/provision/model/Links.java |   4 +-
 .../oa/provision/model/SortableRelation.java  |  34 ++
 .../dhp/oa/provision/model/Tuple2.java        |  19 +-
 .../oa/provision/utils/GraphMappingUtils.java | 232 ++++++------
 .../input_params_build_adjacency_lists.json   |  18 +-
 .../input_params_prepare_relations.json       |  20 +
 ...input_params_related_entities_pahase1.json |  32 ++
 ...input_params_related_entities_pahase2.json |  26 ++
 .../provision/input_params_update_index.json  |   2 +-
 .../provision/input_params_xml_converter.json |  26 ++
 .../dhp/oa/provision/oozie_app/workflow.xml   | 157 +++++++-
 31 files changed, 1739 insertions(+), 620 deletions(-)
 create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
 create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
 create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java
 create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java
 create mode 100644 dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java
 create mode 100644 dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java
 create mode 100644 dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java
 create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
 delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
 delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json
 create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json

diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 1268afa3a..d224ebc9f 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -13,6 +13,26 @@
     <packaging>jar</packaging>
 
     <dependencies>
+
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-schemas</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
new file mode 100644
index 000000000..d78520f55
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
@@ -0,0 +1,56 @@
+package eu.dnetlib.dhp.common;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * Provides serializable and throwing extensions to standard functional interfaces.
+ */
+public class FunctionalInterfaceSupport {
+
+    private FunctionalInterfaceSupport() {
+    }
+
+    /**
+     * Serializable supplier of any kind of objects. To be used within spark processing pipelines when supplying
+     * functions externally.
+     *
+     * @param <T>
+     */
+    @FunctionalInterface
+    public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+    }
+
+    /**
+     * Extension of consumer accepting functions throwing an exception.
+     *
+     * @param <T>
+     * @param <E>
+     */
+    @FunctionalInterface
+    public interface ThrowingConsumer<T, E extends Exception> {
+        void accept(T t) throws E;
+    }
+
+    /**
+     * Extension of supplier accepting functions throwing an exception.
+     *
+     * @param <T>
+     * @param <E>
+     */
+    @FunctionalInterface
+    public interface ThrowingSupplier<T, E extends Exception> {
+        T get() throws E;
+    }
+
+    /**
+     * Extension of runnable accepting functions throwing an exception.
+     *
+     * @param <E>
+     */
+    @FunctionalInterface
+    public interface ThrowingRunnable<E extends Exception> {
+        void run() throws E;
+    }
+
+}
\ No newline at end of file
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
new file mode 100644
index 000000000..05beaa51e
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
@@ -0,0 +1,57 @@
+package eu.dnetlib.dhp.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static eu.dnetlib.dhp.common.ThrowingSupport.rethrowAsRuntimeException;
+
+/**
+ * HDFS utility methods.
+ */
+public class HdfsSupport {
+    private static final Logger logger = LoggerFactory.getLogger(HdfsSupport.class);
+
+    private HdfsSupport() {
+    }
+
+    /**
+     * Removes a path (file or dir) from HDFS.
+ * + * @param path Path to be removed + * @param configuration Configuration of hadoop env + */ + public static void remove(String path, Configuration configuration) { + logger.info("Removing path: {}", path); + rethrowAsRuntimeException(() -> { + Path f = new Path(path); + FileSystem fileSystem = FileSystem.get(configuration); + if (fileSystem.exists(f)) { + fileSystem.delete(f, true); + } + }); + } + + /** + * Lists hadoop files located below path or alternatively lists subdirs under path. + * + * @param path Path to be listed for hadoop files + * @param configuration Configuration of hadoop env + * @return List with string locations of hadoop files + */ + public static List listFiles(String path, Configuration configuration) { + logger.info("Listing files in path: {}", path); + return rethrowAsRuntimeException(() -> Arrays + .stream(FileSystem.get(configuration).listStatus(new Path(path))) + .filter(FileStatus::isDirectory) + .map(x -> x.getPath().toString()) + .collect(Collectors.toList())); + } +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java new file mode 100644 index 000000000..f42ee1c58 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java @@ -0,0 +1,57 @@ +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingConsumer; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; + +import java.util.Objects; +import java.util.function.Function; + +/** + * SparkSession utility methods. + */ +public class SparkSessionSupport { + + private SparkSessionSupport() { + } + + /** + * Runs a given function using SparkSession created using default builder and supplied SparkConf. Stops SparkSession + * when SparkSession is managed. Allows to reuse SparkSession created externally. + * + * @param conf SparkConf instance + * @param isSparkSessionManaged When true will stop SparkSession + * @param fn Consumer to be applied to constructed SparkSession + */ + public static void runWithSparkSession(SparkConf conf, + Boolean isSparkSessionManaged, + ThrowingConsumer fn) { + runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn); + } + + /** + * Runs a given function using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession + * when SparkSession is managed. Allows to reuse SparkSession created externally. 
+ * + * @param sparkSessionBuilder Builder of SparkSession + * @param conf SparkConf instance + * @param isSparkSessionManaged When true will stop SparkSession + * @param fn Consumer to be applied to constructed SparkSession + */ + public static void runWithSparkSession(Function sparkSessionBuilder, + SparkConf conf, + Boolean isSparkSessionManaged, + ThrowingConsumer fn) { + SparkSession spark = null; + try { + spark = sparkSessionBuilder.apply(conf); + fn.accept(spark); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (Objects.nonNull(spark) && isSparkSessionManaged) { + spark.stop(); + } + } + } +} \ No newline at end of file diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java new file mode 100644 index 000000000..b32803c37 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java @@ -0,0 +1,76 @@ +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingRunnable; +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingSupplier; + +/** + * Exception handling utility methods. + */ +public class ThrowingSupport { + + private ThrowingSupport() { + } + + /** + * Executes given runnable and rethrows any exceptions as RuntimeException. + * + * @param fn Runnable to be executed + * @param Type of exception thrown + */ + public static void rethrowAsRuntimeException(ThrowingRunnable fn) { + try { + fn.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Executes given runnable and rethrows any exceptions as RuntimeException with custom message. + * + * @param fn Runnable to be executed + * @param msg Message to be set for rethrown exception + * @param Type of exception thrown + */ + public static void rethrowAsRuntimeException(ThrowingRunnable fn, String msg) { + try { + fn.run(); + } catch (Exception e) { + throw new RuntimeException(msg, e); + } + } + + /** + * Executes given supplier and rethrows any exceptions as RuntimeException. + * + * @param fn Supplier to be executed + * @param Type of returned value + * @param Type of exception thrown + * @return Result of supplier execution + */ + public static T rethrowAsRuntimeException(ThrowingSupplier fn) { + try { + return fn.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Executes given supplier and rethrows any exceptions as RuntimeException with custom message. 
+ * + * @param fn Supplier to be executed + * @param msg Message to be set for rethrown exception + * @param Type of returned value + * @param Type of exception thrown + * @return Result of supplier execution + */ + public static T rethrowAsRuntimeException(ThrowingSupplier fn, String msg) { + try { + return fn.get(); + } catch (Exception e) { + throw new RuntimeException(msg, e); + } + } + +} \ No newline at end of file diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java new file mode 100644 index 000000000..f1e790ee7 --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java @@ -0,0 +1,78 @@ +package eu.dnetlib.dhp.common; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.*; + +public class HdfsSupportTest { + + @Nested + class Remove { + + @Test + public void shouldThrowARuntimeExceptionOnError() { + // when + assertThrows(RuntimeException.class, () -> + HdfsSupport.remove(null, new Configuration())); + } + + @Test + public void shouldRemoveADirFromHDFS(@TempDir Path tempDir) { + // when + HdfsSupport.remove(tempDir.toString(), new Configuration()); + + // then + assertFalse(Files.exists(tempDir)); + } + + @Test + public void shouldRemoveAFileFromHDFS(@TempDir Path tempDir) throws IOException { + // given + Path file = Files.createTempFile(tempDir, "p", "s"); + + // when + HdfsSupport.remove(file.toString(), new Configuration()); + + // then + assertFalse(Files.exists(file)); + } + } + + @Nested + class ListFiles { + + @Test + public void shouldThrowARuntimeExceptionOnError() { + // when + assertThrows(RuntimeException.class, () -> + HdfsSupport.listFiles(null, new Configuration())); + } + + @Test + public void shouldListFilesLocatedInPath(@TempDir Path tempDir) throws IOException { + Path subDir1 = Files.createTempDirectory(tempDir, "list_me"); + Path subDir2 = Files.createTempDirectory(tempDir, "list_me"); + + // when + List paths = HdfsSupport.listFiles(tempDir.toString(), new Configuration()); + + // then + assertEquals(2, paths.size()); + List expecteds = Arrays.stream(new String[]{subDir1.toString(), subDir2.toString()}) + .sorted().collect(Collectors.toList()); + List actuals = paths.stream().sorted().collect(Collectors.toList()); + assertTrue(actuals.get(0).contains(expecteds.get(0))); + assertTrue(actuals.get(1).contains(expecteds.get(1))); + } + } +} \ No newline at end of file diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java new file mode 100644 index 000000000..bfed019e9 --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java @@ -0,0 +1,36 @@ +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ModelSupportTest { 
+ + @Nested + class IsSubClass { + + @Test + public void shouldReturnFalseWhenSubClassDoesNotExtendSuperClass() { + // when + Boolean result = ModelSupport.isSubClass(Relation.class, OafEntity.class); + + // then + assertFalse(result); + } + + @Test + public void shouldReturnTrueWhenSubClassExtendsSuperClass() { + // when + Boolean result = ModelSupport.isSubClass(Result.class, OafEntity.class); + + // then + assertTrue(result); + } + } +} \ No newline at end of file diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java new file mode 100644 index 000000000..bc2dce3cf --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java @@ -0,0 +1,54 @@ +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingConsumer; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.function.Function; + +import static org.mockito.Mockito.*; + +public class SparkSessionSupportTest { + + @Nested + class RunWithSparkSession { + + @Test + public void shouldExecuteFunctionAndNotStopSparkSessionWhenSparkSessionIsNotManaged() throws Exception { + // given + SparkSession spark = mock(SparkSession.class); + SparkConf conf = mock(SparkConf.class); + Function sparkSessionBuilder = mock(Function.class); + when(sparkSessionBuilder.apply(conf)).thenReturn(spark); + ThrowingConsumer fn = mock(ThrowingConsumer.class); + + // when + SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, false, fn); + + // then + verify(sparkSessionBuilder).apply(conf); + verify(fn).accept(spark); + verify(spark, never()).stop(); + } + + @Test + public void shouldExecuteFunctionAndStopSparkSessionWhenSparkSessionIsManaged() throws Exception { + // given + SparkSession spark = mock(SparkSession.class); + SparkConf conf = mock(SparkConf.class); + Function sparkSessionBuilder = mock(Function.class); + when(sparkSessionBuilder.apply(conf)).thenReturn(spark); + ThrowingConsumer fn = mock(ThrowingConsumer.class); + + // when + SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, true, fn); + + // then + verify(sparkSessionBuilder).apply(conf); + verify(fn).accept(spark); + verify(spark, times(1)).stop(); + } + } +} \ No newline at end of file diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java new file mode 100644 index 000000000..3c774aa38 --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java @@ -0,0 +1,51 @@ +package eu.dnetlib.dhp.schema.common; + +import eu.dnetlib.dhp.schema.oaf.Oaf; + +/** + * Inheritance utility methods. + */ +public class ModelSupport { + + private ModelSupport() { + } + + /** + * Checks subclass-superclass relationship. + * + * @param subClazzObject Subclass object instance + * @param superClazzObject Superclass object instance + * @param Subclass type + * @param Superclass type + * @return True if X is a subclass of Y + */ + public static Boolean isSubClass(X subClazzObject, Y superClazzObject) { + return isSubClass(subClazzObject.getClass(), superClazzObject.getClass()); + } + + /** + * Checks subclass-superclass relationship. 
+ * + * @param subClazzObject Subclass object instance + * @param superClazz Superclass class + * @param Subclass type + * @param Superclass type + * @return True if X is a subclass of Y + */ + public static Boolean isSubClass(X subClazzObject, Class superClazz) { + return isSubClass(subClazzObject.getClass(), superClazz); + } + + /** + * Checks subclass-superclass relationship. + * + * @param subClazz Subclass class + * @param superClazz Superclass class + * @param Subclass type + * @param Superclass type + * @return True if X is a subclass of Y + */ + public static Boolean isSubClass(Class subClazz, Class superClazz) { + return superClazz.isAssignableFrom(subClazz); + } +} \ No newline at end of file diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java index 6738b8693..e2471cd89 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java @@ -92,8 +92,7 @@ public class Relation extends Oaf { subRelType.equals(relation.subRelType) && relClass.equals(relation.relClass) && source.equals(relation.source) && - target.equals(relation.target) && - Objects.equals(collectedFrom, relation.collectedFrom); + target.equals(relation.target); } @Override diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java new file mode 100644 index 000000000..dcb3ac171 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -0,0 +1,167 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.*; +import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; +import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). 
+ * + * The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again + * by E, finally grouped by E.id; + * + * The workflow is organized in different parts aimed to to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) JoinRelationEntityByTargetJob: + * prepare tuples [source entity - relation - target entity] (S - R - T): + * for each entity type E_i + * join (R.target = E_i.id), + * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] + * join (E_i.id = [R - T_i].source), where E_i becomes the source entity S + * + * 3) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity + * + * 4) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class AdjacencyListBuilderJob { + + private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString( + AdjacencyListBuilderJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json"))); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(getKryoClasses()); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + createAdjacencyLists(spark, inputPath, outputPath); + }); + + } + + private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) { + + RDD joined = spark.read() + .load(inputPath) + .as(Encoders.kryo(EntityRelEntity.class)) + .javaRDD() + .map(e -> getJoinedEntity(e)) + .mapToPair(e -> new Tuple2<>(e.getEntity().getId(), e)) + .reduceByKey((j1, j2) -> getJoinedEntity(j1, j2)) + .map(Tuple2::_2) + .rdd(); + + spark.createDataset(joined, Encoders.bean(JoinedEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + + } + + private static JoinedEntity getJoinedEntity(JoinedEntity j1, JoinedEntity j2) { + JoinedEntity je = new JoinedEntity(); + je.setEntity(je.getEntity()); + je.setType(j1.getType()); + + Links links = new Links(); + links.addAll(j1.getLinks()); + links.addAll(j2.getLinks()); + + return je; + } + + private static JoinedEntity getJoinedEntity(EntityRelEntity e) { + JoinedEntity j = new JoinedEntity(); + j.setEntity(toOafEntity(e.getEntity())); + j.setType(EntityType.valueOf(e.getEntity().getType())); + Links links = new Links(); + links.add(new eu.dnetlib.dhp.oa.provision.model.Tuple2(e.getRelation(), e.getTarget())); + j.setLinks(links); + return j; + } + + private static OafEntity toOafEntity(TypedRow typedRow) { + return 
parseOaf(typedRow.getOaf(), typedRow.getType()); + } + + private static OafEntity parseOaf(final String json, final String type) { + try { + switch (GraphMappingUtils.EntityType.valueOf(type)) { + case publication: + return OBJECT_MAPPER.readValue(json, Publication.class); + case dataset: + return OBJECT_MAPPER.readValue(json, Dataset.class); + case otherresearchproduct: + return OBJECT_MAPPER.readValue(json, OtherResearchProduct.class); + case software: + return OBJECT_MAPPER.readValue(json, Software.class); + case datasource: + return OBJECT_MAPPER.readValue(json, Datasource.class); + case organization: + return OBJECT_MAPPER.readValue(json, Organization.class); + case project: + return OBJECT_MAPPER.readValue(json, Project.class); + default: + throw new IllegalArgumentException("invalid type: " + type); + } + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java new file mode 100644 index 000000000..0b153f826 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -0,0 +1,157 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.SortableRelation; +import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). 
+ * + * The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again + * by E, finally grouped by E.id; + * + * The workflow is organized in different parts aimed to to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) CreateRelatedEntitiesJob_phase1: + * prepare tuples [relation - target entity] (R - T): + * for each entity type E_i + * join (R.target = E_i.id), + * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] + * save the tuples [R - T_i] in append mode + * + * 3) CreateRelatedEntitiesJob_phase2: + * prepare tuples [source entity - relation - target entity] (S - R - T): + * create the union of the each entity type, hash by id (S) + * for each [R - T_i] produced in phase1 + * join S.id = [R - T_i].source to produce (S_i - R - T_i) + * save in append mode + * + * 4) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity + * + * 5) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class CreateRelatedEntitiesJob_phase1 { + + private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase1.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils.toString( + PrepareRelationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputRelationsPath = parser.get("inputRelationsPath"); + log.info("inputRelationsPath: {}", inputRelationsPath); + + String inputEntityPath = parser.get("inputEntityPath"); + log.info("inputEntityPath: {}", inputEntityPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(getKryoClasses()); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + joinRelationEntity(spark, inputRelationsPath, inputEntityPath, entityClazz, outputPath); + }); + } + + private static void joinRelationEntity(SparkSession spark, String inputRelationsPath, String inputEntityPath, Class entityClazz, String outputPath) { + + Dataset> relsByTarget = readPathRelation(spark, inputRelationsPath) + .map((MapFunction>) r -> new Tuple2<>(r.getTarget(), r), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class))); + + Dataset> entities = readPathEntity(spark, inputEntityPath, entityClazz) + .map((MapFunction>) e -> new Tuple2<>(e.getId(), e), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz))) + 
.cache(); + + relsByTarget + .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") + .filter((FilterFunction, Tuple2>>) + value -> value._2()._2().getDataInfo().getDeletedbyinference() == false) + .map((MapFunction, Tuple2>, EntityRelEntity>) + t -> new EntityRelEntity(t._1()._2(), GraphMappingUtils.asRelatedEntity(t._2()._2(), entityClazz)), + Encoders.bean(EntityRelEntity.class)) + .write() + .mode(SaveMode.Append) + .parquet(outputPath); + } + + private static Dataset readPathEntity(SparkSession spark, String inputEntityPath, Class entityClazz) { + + log.info("Reading Graph table from: {}", inputEntityPath); + return spark + .read() + .textFile(inputEntityPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)); + } + + /** + * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text file, + * + * @param spark + * @param relationPath + * @return the Dataset containing all the relationships + */ + private static Dataset readPathRelation(SparkSession spark, final String relationPath) { + + log.info("Reading relations from: {}", relationPath); + return spark.read() + .load(relationPath) + .as(Encoders.bean(SortableRelation.class)); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java new file mode 100644 index 000000000..6c7f1efd7 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -0,0 +1,168 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.TypedRow; +import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). 
+ * + * The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again + * by E, finally grouped by E.id; + * + * The workflow is organized in different parts aimed to to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) CreateRelatedEntitiesJob_phase1: + * prepare tuples [relation - target entity] (R - T): + * for each entity type E_i + * join (R.target = E_i.id), + * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] + * save the tuples [R - T_i] in append mode + * + * 3) CreateRelatedEntitiesJob_phase2: + * prepare tuples [source entity - relation - target entity] (S - R - T): + * create the union of the each entity type, hash by id (S) + * for each [R - T_i] produced in phase1 + * join S.id = [R - T_i].source to produce (S_i - R - T_i) + * save in append mode + * + * 4) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity + * + * 5) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class CreateRelatedEntitiesJob_phase2 { + + private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase2.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils.toString( + PrepareRelationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath"); + log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath); + + String inputGraphPath = parser.get("inputGraphPath"); + log.info("inputGraphPath: {}", inputGraphPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(getKryoClasses()); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + joinAllEntities(spark, inputRelatedEntitiesPath, inputGraphPath, outputPath); + }); + } + + private static void joinAllEntities(SparkSession spark, String inputRelatedEntitiesPath, String inputGraphPath, String outputPath) { + + Dataset> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath); + Dataset> entities = readAllEntities(spark, inputGraphPath); + + entities + .joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer") + .map((MapFunction, Tuple2>, EntityRelEntity>) value -> { + EntityRelEntity re = new EntityRelEntity(); + re.setEntity(value._1()._2()); + Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); + if (related.isPresent()) { + 
re.setRelation(related.get().getRelation()); + re.setTarget(related.get().getTarget()); + } + return re; + }, Encoders.bean(EntityRelEntity.class)) + .write() + .mode(SaveMode.Append) + .parquet(outputPath); + } + + private static Dataset> readAllEntities(SparkSession spark, String inputGraphPath) { + return GraphMappingUtils.entityTypes.entrySet() + .stream() + .map((Function, Dataset>) + e -> readPathEntity(spark, inputGraphPath + "/" + e.getKey().name(), e.getValue()) + .map((MapFunction) entity -> { + TypedRow t = new TypedRow(); + t.setType(e.getKey().name()); + t.setDeleted(entity.getDataInfo().getDeletedbyinference()); + t.setId(entity.getId()); + t.setOaf(OBJECT_MAPPER.writeValueAsString(entity)); + return t; + }, Encoders.bean(TypedRow.class))) + .reduce(spark.emptyDataset(Encoders.bean(TypedRow.class)), Dataset::union) + .map((MapFunction>) + value -> new Tuple2<>(value.getId(), value), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))); + } + + private static Dataset> readRelatedEntities(SparkSession spark, String inputRelatedEntitiesPath) { + return spark.read() + .load(inputRelatedEntitiesPath) + .as(Encoders.kryo(EntityRelEntity.class)) + .map((MapFunction>) + value -> new Tuple2<>(value.getRelation().getSource(), value), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); + } + + + private static Dataset readPathEntity(SparkSession spark, String inputEntityPath, Class entityClazz) { + + log.info("Reading Graph table from: {}", inputEntityPath); + return spark + .read() + .textFile(inputEntityPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java deleted file mode 100644 index 3ee72c318..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java +++ /dev/null @@ -1,346 +0,0 @@ -package eu.dnetlib.dhp.oa.provision; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Iterators; -import com.google.common.collect.Maps; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import eu.dnetlib.dhp.oa.provision.model.*; -import eu.dnetlib.dhp.oa.provision.utils.*; -import eu.dnetlib.dhp.schema.oaf.*; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.*; -import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.*; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.types.*; -import org.apache.spark.util.LongAccumulator; -import scala.Tuple2; - -import java.io.IOException; -import java.io.Serializable; -import java.util.*; - -import static org.apache.spark.sql.functions.*; - -import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntity; - -/** - * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. 
- * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, - * and all the possible relationships (similarity links produced by the Dedup process are excluded). - * - * The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again - * by E, finally grouped by E.id; - * - * Different manipulations of the E and R sets are introduced to reduce the complexity of the operation - * 1) treat the object payload as string, extracting only the necessary information beforehand using json path, - * it seems that deserializing it with jackson's object mapper has higher memory footprint. - * - * 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false) - * 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S - * and E_target = T. Objects in T are heavily pruned by all the unnecessary information - * - * 4) perform the join as (((T.id join R.target) union S) groupby S.id) yield S -> [ ] - */ -public class GraphJoiner_v2 implements Serializable { - - private Map accumulators = Maps.newHashMap(); - - public static final int MAX_RELS = 100; - - public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; - - private SparkSession spark; - - private ContextMapper contextMapper; - - private String inputPath; - - private String outPath; - - private String otherDsTypeId; - - public GraphJoiner_v2(SparkSession spark, ContextMapper contextMapper, String otherDsTypeId, String inputPath, String outPath) { - this.spark = spark; - this.contextMapper = contextMapper; - this.otherDsTypeId = otherDsTypeId; - this.inputPath = inputPath; - this.outPath = outPath; - - final SparkContext sc = spark.sparkContext(); - prepareAccumulators(sc); - } - - public GraphJoiner_v2 adjacencyLists() throws IOException { - - final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(getSpark().sparkContext()); - - // read each entity - Dataset datasource = readPathEntity(jsc, getInputPath(), "datasource"); - Dataset organization = readPathEntity(jsc, getInputPath(), "organization"); - Dataset project = readPathEntity(jsc, getInputPath(), "project"); - Dataset dataset = readPathEntity(jsc, getInputPath(), "dataset"); - Dataset otherresearchproduct = readPathEntity(jsc, getInputPath(), "otherresearchproduct"); - Dataset software = readPathEntity(jsc, getInputPath(), "software"); - Dataset publication = readPathEntity(jsc, getInputPath(), "publication"); - - // create the union between all the entities - datasource - .union(organization) - .union(project) - .union(dataset) - .union(otherresearchproduct) - .union(software) - .union(publication) - .repartition(7000) - .write() - .partitionBy("id") - .parquet(getOutPath() + "/entities"); - - Dataset> entities = getSpark() - .read() - .load(getOutPath() + "/entities") - .map((MapFunction>) r -> { - TypedRow t = new TypedRow(); - t.setId(r.getAs("id")); - t.setDeleted(r.getAs("deleted")); - t.setType(r.getAs("type")); - t.setOaf(r.getAs("oaf")); - - return new Tuple2<>(t.getId(), t); - }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) - .cache(); - - System.out.println("Entities, number of partitions: " + entities.rdd().getNumPartitions()); - System.out.println("Entities schema:"); - entities.printSchema(); - System.out.println("Entities count:" + entities.count()); - - // reads the relationships - readPathRelation(jsc, 
getInputPath()) - .groupByKey((MapFunction) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class)) - .flatMapGroups((FlatMapGroupsFunction) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.kryo(Relation.class)) - .repartition(3000) - .write() - .partitionBy("source", "target") - .parquet(getOutPath() + "/relations"); - - Dataset rels = getSpark() - .read() - .load(getOutPath() + "/relations") - .map((MapFunction) r -> { - Relation rel = new Relation(); - rel.setSource(r.getAs("source")); - rel.setTarget(r.getAs("target")); - rel.setRelType(r.getAs("relType")); - rel.setSubRelType(r.getAs("subRelType")); - rel.setRelClass(r.getAs("relClass")); - rel.setDataInfo(r.getAs("dataInfo")); - rel.setCollectedFrom(r.getList(r.fieldIndex("collectedFrom"))); - return rel; - }, Encoders.kryo(Relation.class)) - .cache(); - - System.out.println("Relation schema:"); - System.out.println("Relation, number of partitions: " + rels.rdd().getNumPartitions()); - System.out.println("Relation schema:"); - entities.printSchema(); - System.out.println("Relation count:" + rels.count()); - - /* - Dataset> relsByTarget = rels - .map((MapFunction>) r -> new Tuple2<>(r.getTarget(), r), Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))); - - - relsByTarget - .joinWith(entities, relsByTarget.col("_1").equalTo(entities.col("_1")), "inner") - .filter((FilterFunction, Tuple2>>) value -> value._2()._2().getDeleted() == false) - .map((MapFunction, Tuple2>, EntityRelEntity>) t -> { - EntityRelEntity e = new EntityRelEntity(); - e.setRelation(t._1()._2()); - e.setTarget(asRelatedEntity(t._2()._2())); - return e; - }, Encoders.kryo(EntityRelEntity.class)) - .repartition(20000) - .write() - .parquet(getOutPath() + "/bySource"); - - Dataset> bySource = getSpark() - .read() - .load(getOutPath() + "/bySource") - .map(new MapFunction() { - @Override - public EntityRelEntity call(Row value) throws Exception { - return null; - } - }, Encoders.kryo(EntityRelEntity.class)) - .map((MapFunction>) e -> new Tuple2<>(e.getRelation().getSource(), e), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))) - - System.out.println("bySource schema"); - bySource.printSchema(); - - - - - Dataset joined = entities - .joinWith(bySource, entities.col("_1").equalTo(bySource.col("_1")), "left") - .map((MapFunction, Tuple2>, EntityRelEntity>) value -> { - EntityRelEntity re = new EntityRelEntity(); - re.setEntity(value._1()._2()); - Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); - if (related.isPresent()) { - re.setRelation(related.get().getRelation()); - re.setTarget(related.get().getTarget()); - } - return re; - }, Encoders.kryo(EntityRelEntity.class)); - - System.out.println("joined schema"); - joined.printSchema(); - //joined.write().json(getOutPath() + "/joined"); - - final Dataset grouped = joined - .groupByKey((MapFunction) e -> e.getEntity(), Encoders.kryo(TypedRow.class)) - .mapGroups((MapGroupsFunction) (key, values) -> toJoinedEntity(key, values), Encoders.kryo(JoinedEntity.class)); - - System.out.println("grouped schema"); - grouped.printSchema(); - - final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId); - grouped - .map((MapFunction) value -> recordFactory.build(value), Encoders.STRING()) - .javaRDD() - .mapToPair((PairFunction, String, String>) t -> new Tuple2<>(t._1(), t._2())) - .saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, 
GzipCodec.class); - - -*/ - - return this; - } - - public SparkSession getSpark() { - return spark; - } - - public String getInputPath() { - return inputPath; - } - - public String getOutPath() { - return outPath; - } - - // HELPERS - - private JoinedEntity toJoinedEntity(TypedRow key, Iterator values) { - final ObjectMapper mapper = getObjectMapper(); - final JoinedEntity j = new JoinedEntity(); - j.setType(key.getType()); - j.setEntity(parseOaf(key.getOaf(), key.getType(), mapper)); - final Links links = new Links(); - values.forEachRemaining(rel -> links.add( - new eu.dnetlib.dhp.oa.provision.model.Tuple2( - rel.getRelation(), - rel.getTarget() - ))); - j.setLinks(links); - return j; - } - - private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) { - try { - switch (GraphMappingUtils.EntityType.valueOf(type)) { - case publication: - return mapper.readValue(json, Publication.class); - case dataset: - return mapper.readValue(json, eu.dnetlib.dhp.schema.oaf.Dataset.class); - case otherresearchproduct: - return mapper.readValue(json, OtherResearchProduct.class); - case software: - return mapper.readValue(json, Software.class); - case datasource: - return mapper.readValue(json, Datasource.class); - case organization: - return mapper.readValue(json, Organization.class); - case project: - return mapper.readValue(json, Project.class); - default: - throw new IllegalArgumentException("invalid type: " + type); - } - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - /** - * Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a new line delimited json file, - * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow - * @param sc - * @param inputPath - * @param type - * @return the JavaPairRDD indexed by entity identifier - */ - private Dataset readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) { - RDD rdd = sc.textFile(inputPath + "/" + type) - .rdd(); - - return getSpark().createDataset(rdd, Encoders.STRING()) - .map((MapFunction) s -> { - final DocumentContext json = JsonPath.parse(s); - final TypedRow t = new TypedRow(); - t.setId(json.read("$.id")); - t.setDeleted(json.read("$.dataInfo.deletedbyinference")); - t.setType(type); - t.setOaf(s); - return t; - }, Encoders.bean(TypedRow.class)); - } - - /** - * Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file , - * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow - * @param sc - * @param inputPath - * @return the JavaRDD containing all the relationships - */ - private Dataset readPathRelation(final JavaSparkContext sc, final String inputPath) { - final RDD rdd = sc.textFile(inputPath + "/relation") - .rdd(); - - return getSpark().createDataset(rdd, Encoders.STRING()) - .map((MapFunction) s -> new ObjectMapper().readValue(s, Relation.class), Encoders.bean(Relation.class)); - } - - private ObjectMapper getObjectMapper() { - return new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private void prepareAccumulators(SparkContext sc) { - accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments")); - accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments")); - 
accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo")); - accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy")); - accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn")); - accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges")); - - accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo")); - accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo")); - accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy")); - accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces")); - accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf")); - - accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution")); - accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant")); - accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant")); - accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn")); - accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces")); - accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy")); - accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides")); - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java new file mode 100644 index 000000000..19599b52c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -0,0 +1,132 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.SortableRelation; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. 
+ * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). + * + * The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again + * by E, finally grouped by E.id; + * + * The workflow is organized in different parts aimed to to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) JoinRelationEntityByTargetJob: + * prepare tuples [source entity - relation - target entity] (S - R - T): + * for each entity type E_i + * join (R.target = E_i.id), + * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] + * join (E_i.id = [R - T_i].source), where E_i becomes the source entity S + * + * 3) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mappnig the result as JoinedEntity + * + * 4) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class PrepareRelationsJob { + + private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static final int MAX_RELS = 100; + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils.toString( + PrepareRelationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputRelationsPath = parser.get("inputRelationsPath"); + log.info("inputRelationsPath: {}", inputRelationsPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareRelationsFromPaths(spark, inputRelationsPath, outputPath); + }); + } + + private static void prepareRelationsFromPaths(SparkSession spark, String inputRelationsPath, String outputPath) { + RDD rels = readPathRelation(spark, inputRelationsPath) + .filter((FilterFunction) r -> r.getDataInfo().getDeletedbyinference() == false) + .javaRDD() + .mapToPair((PairFunction>) rel -> new Tuple2<>( + rel.getSource(), + Lists.newArrayList(rel))) + .reduceByKey((v1, v2) -> { + v1.addAll(v2); + v1.sort(SortableRelation::compareTo); + if (v1.size() > MAX_RELS) { + return v1.subList(0, MAX_RELS); + } + return new ArrayList<>(v1.subList(0, MAX_RELS)); + }) + .flatMap(r -> r._2().iterator()) + .rdd(); + + spark.createDataset(rels, Encoders.bean(SortableRelation.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + /** + * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text file, + * + * @param spark + * @param inputPath + * @return the Dataset containing all the relationships + */ + private static Dataset 
readPathRelation(SparkSession spark, final String inputPath) { + return spark.read() + .textFile(inputPath) + .map((MapFunction) s -> OBJECT_MAPPER.readValue(s, SortableRelation.class), + Encoders.bean(SortableRelation.class)); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java index 975ac7548..eae8cf1a1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.provision; import com.lucidworks.spark.util.SolrSupport; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory; @@ -18,6 +19,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.xml.transform.Transformer; import javax.xml.transform.TransformerException; @@ -28,14 +31,20 @@ import java.io.StringReader; import java.io.StringWriter; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; public class SparkXmlIndexingJob { - private static final Log log = LogFactory.getLog(SparkXmlIndexingJob.class); + private static final Logger log = LoggerFactory.getLogger(SparkXmlIndexingJob.class); private static final Integer DEFAULT_BATCH_SIZE = 1000; private static final String LAYOUT = "index"; + private static final String INTERPRETATION = "openaire"; + private static final String SEPARATOR = "-"; + public static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'"; public static void main(String[] args) throws Exception { @@ -45,48 +54,56 @@ public class SparkXmlIndexingJob { "/eu/dnetlib/dhp/oa/provision/input_params_update_index.json"))); parser.parseArgument(args); - final String inputPath = parser.get("sourcePath"); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + final String format = parser.get("format"); + log.info("format: {}", format); + final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? 
Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE; + log.info("batchSize: {}", batchSize); final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); final String fields = getLayoutSource(isLookup, format); + log.info("fields: {}", fields); + final String xslt = getLayoutTransformer(isLookup); final String dsId = getDsId(format, isLookup); + log.info("dsId: {}", dsId); + final String zkHost = getZkHost(isLookup); + log.info("zkHost: {}", zkHost); + final String version = getRecordDatestamp(); final String indexRecordXslt = getLayoutTransformer(format, fields, xslt); + log.info("indexRecordTransformer {}", indexRecordXslt); - log.info("indexRecordTransformer: " + indexRecordXslt); + final SparkConf conf = new SparkConf(); - final String master = parser.get("master"); - final SparkConf conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - try(SparkSession spark = getSession(conf, master)) { + RDD docs = sc.sequenceFile(inputPath, Text.class, Text.class) + .map(t -> t._2().toString()) + .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s)) + .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s)) + .rdd(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - RDD docs = sc.sequenceFile(inputPath, Text.class, Text.class) - .map(t -> t._2().toString()) - .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s)) - .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s)) - .rdd(); - - SolrSupport.indexDocs(zkHost, format + "-" + LAYOUT + "-openaire", batchSize, docs); - } - } - - private static SparkSession getSession(SparkConf conf, String master) { - return SparkSession - .builder() - .config(conf) - .appName(SparkXmlIndexingJob.class.getSimpleName()) - .master(master) - .getOrCreate(); + final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION; + SolrSupport.indexDocs(zkHost, collection, batchSize, docs); + }); } private static String toIndexRecord(Transformer tr, final String record) { @@ -95,7 +112,7 @@ public class SparkXmlIndexingJob { tr.transform(new StreamSource(new StringReader(record)), res); return res.getWriter().toString(); } catch (Throwable e) { - System.out.println("XPathException on record:\n" + record); + log.error("XPathException on record: \n {}", record, e); throw new IllegalArgumentException(e); } } @@ -127,7 +144,7 @@ public class SparkXmlIndexingJob { * @return the parsed date */ public static String getRecordDatestamp() { - return new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'").format(new Date()); + return new SimpleDateFormat(DATE_FORMAT).format(new Date()); } /** diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java deleted file mode 100644 index e4124e52f..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java +++ /dev/null @@ -1,81 +0,0 @@ -package eu.dnetlib.dhp.oa.provision; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.provision.model.*; -import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; -import 
eu.dnetlib.dhp.schema.oaf.*; -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.SparkSession; - -public class SparkXmlRecordBuilderJob_v2 { - - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString( - SparkXmlRecordBuilderJob_v2.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json"))); - parser.parseArgument(args); - - try(SparkSession spark = getSession(parser)) { - - final String inputPath = parser.get("sourcePath"); - final String outputPath = parser.get("outputPath"); - final String isLookupUrl = parser.get("isLookupUrl"); - final String otherDsTypeId = parser.get("otherDsTypeId"); - - - new GraphJoiner_v2(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath) - .adjacencyLists(); - } - } - - private static SparkSession getSession(ArgumentApplicationParser parser) { - final SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.set("spark.sql.shuffle.partitions", parser.get("sparkSqlShufflePartitions")); - conf.registerKryoClasses(new Class[]{ - Author.class, - Context.class, - Country.class, - DataInfo.class, - eu.dnetlib.dhp.schema.oaf.Dataset.class, - Datasource.class, - ExternalReference.class, - ExtraInfo.class, - Field.class, - GeoLocation.class, - Instance.class, - Journal.class, - KeyValue.class, - Oaf.class, - OafEntity.class, - OAIProvenance.class, - Organization.class, - OriginDescription.class, - OtherResearchProduct.class, - Project.class, - Publication.class, - Qualifier.class, - Relation.class, - Result.class, - Software.class, - StructuredProperty.class, - - TypedRow.class, - EntityRelEntity.class, - JoinedEntity.class, - SortableRelationKey.class, - Tuple2.class, - Links.class, - RelatedEntity.class - }); - return SparkSession - .builder() - .config(conf) - .appName(SparkXmlRecordBuilderJob_v2.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java new file mode 100644 index 000000000..74a36c580 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -0,0 +1,149 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.google.common.collect.Maps; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.*; +import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; +import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; +import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import 
org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.Map; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization), + * and all the possible relationships (similarity links produced by the Dedup process are excluded). + * + * The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again + * by E, finally grouped by E.id; + * + * The workflow is organized in different parts aimed to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) JoinRelationEntityByTargetJob: + * prepare tuples [source entity - relation - target entity] (S - R - T): + * for each entity type E_i + * join (R.target = E_i.id), + * map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i] + * join (E_i.id = [R - T_i].source), where E_i becomes the source entity S + * + * 3) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity + * + * 4) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class XmlConverterJob { + + private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class); + + public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString( + XmlConverterJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json"))); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + + String otherDsTypeId = parser.get("otherDsTypeId"); + log.info("otherDsTypeId: {}", otherDsTypeId); + + SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl), otherDsTypeId); + }); + + }
+ + private static void convertToXml(SparkSession spark, String inputPath, String outputPath, ContextMapper contextMapper, String otherDsTypeId) { + + final XmlRecordFactory recordFactory = new XmlRecordFactory(prepareAccumulators(spark.sparkContext()), contextMapper, false, schemaLocation, otherDsTypeId); + + spark.read() + .load(inputPath) + .as(Encoders.bean(JoinedEntity.class)) + .map((MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>( + je.getEntity().getId(), + recordFactory.build(je) + ), Encoders.tuple(Encoders.STRING(), Encoders.STRING())) + .javaRDD() + .mapToPair((PairFunction<Tuple2<String, String>, String, String>) t -> t) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) { + Map<String, LongAccumulator> accumulators = Maps.newHashMap(); + accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments")); + accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments")); + accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo")); + accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy")); + accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn")); + accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges")); + + accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo")); + accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo")); + accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy")); + accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces")); + accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf")); + + accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution")); + accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant")); + accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant")); + accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn")); + accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("organizationOrganization_dedup_merges")); + accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy")); + accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides")); + + return accumulators; + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java index ddeec140b..35dfa41d3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java @@ -1,15 +1,26 @@ package eu.dnetlib.dhp.oa.provision.model; -import eu.dnetlib.dhp.schema.oaf.Relation; - import java.io.Serializable; public class EntityRelEntity implements Serializable { private TypedRow entity; -
private Relation relation; + private SortableRelation relation; private RelatedEntity target; + public EntityRelEntity() { + } + + public EntityRelEntity(SortableRelation relation, RelatedEntity target) { + this(null, relation, target); + } + + public EntityRelEntity(TypedRow entity, SortableRelation relation, RelatedEntity target) { + this.entity = entity; + this.relation = relation; + this.target = target; + } + public TypedRow getEntity() { return entity; } @@ -18,11 +29,11 @@ public class EntityRelEntity implements Serializable { this.entity = entity; } - public Relation getRelation() { + public SortableRelation getRelation() { return relation; } - public void setRelation(Relation relation) { + public void setRelation(SortableRelation relation) { this.relation = relation; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java index 815863c67..4dd434804 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java @@ -1,22 +1,23 @@ package eu.dnetlib.dhp.oa.provision.model; +import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; import eu.dnetlib.dhp.schema.oaf.OafEntity; import java.io.Serializable; public class JoinedEntity implements Serializable { - private String type; + private GraphMappingUtils.EntityType type; private OafEntity entity; private Links links; - public String getType() { + public GraphMappingUtils.EntityType getType() { return type; } - public void setType(String type) { + public void setType(GraphMappingUtils.EntityType type) { this.type = type; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java index 0cb4617ec..4ea194876 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java @@ -1,6 +1,6 @@ package eu.dnetlib.dhp.oa.provision.model; -import java.util.ArrayList; +import java.util.HashSet; -public class Links extends ArrayList { +public class Links extends HashSet { } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java new file mode 100644 index 000000000..430779c72 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java @@ -0,0 +1,34 @@ +package eu.dnetlib.dhp.oa.provision.model; + +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Maps; +import eu.dnetlib.dhp.schema.oaf.Relation; + +import java.util.Map; + +public class SortableRelation extends Relation implements Comparable { + + private final static Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("publicationDataset", 2); + weights.put("relationship", 3); + weights.put("similarity", 4); + weights.put("affiliation", 5); + + weights.put("provision", 6); + weights.put("participation", 7); + weights.put("dedup", 8); + } + + @Override + public int 
compareTo(Relation o) { + return ComparisonChain.start() + .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType())) + .compare(getSource(), o.getSource()) + .compare(getTarget(), o.getTarget()) + .result(); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java index db639f113..f1e2c652c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java @@ -2,7 +2,10 @@ package eu.dnetlib.dhp.oa.provision.model; import eu.dnetlib.dhp.schema.oaf.Relation; -public class Tuple2 { +import java.io.Serializable; +import java.util.Objects; + +public class Tuple2 implements Serializable { private Relation relation; @@ -28,4 +31,18 @@ public class Tuple2 { public void setRelatedEntity(RelatedEntity relatedEntity) { this.relatedEntity = relatedEntity; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Tuple2 t2 = (Tuple2) o; + return getRelation().equals(t2.getRelation()); + } + + @Override + public int hashCode() { + return Objects.hash(getRelation().hashCode()); + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java index 27b42e69d..8418db8e6 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java @@ -1,30 +1,47 @@ package eu.dnetlib.dhp.oa.provision.utils; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import eu.dnetlib.dhp.oa.provision.model.*; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; +import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.oaf.*; -import net.minidev.json.JSONArray; -import org.apache.commons.lang3.StringUtils; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; -import static org.apache.commons.lang3.StringUtils.*; +import static org.apache.commons.lang3.StringUtils.substringAfter; public class GraphMappingUtils { public static final String SEPARATOR = "_"; + public final static Map entityTypes = Maps.newHashMap(); + + static { + entityTypes.put(EntityType.datasource, Datasource.class); + entityTypes.put(EntityType.organization, Organization.class); + entityTypes.put(EntityType.project, Project.class); + entityTypes.put(EntityType.dataset, Dataset.class); + entityTypes.put(EntityType.otherresearchproduct, OtherResearchProduct.class); + entityTypes.put(EntityType.software, Software.class); + entityTypes.put(EntityType.publication, Publication.class); + } + public enum EntityType { - publication, dataset, otherresearchproduct, software, datasource, organization, project + publication, dataset, otherresearchproduct, software, 
datasource, organization, project; + + public static EntityType fromClass(Class clazz) { + switch (clazz.getName()) { + case "eu.dnetlib.dhp.schema.oaf.Publication" : return publication; + case "eu.dnetlib.dhp.schema.oaf.Dataset" : return dataset; + case "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct" : return otherresearchproduct; + case "eu.dnetlib.dhp.schema.oaf.Software" : return software; + case "eu.dnetlib.dhp.schema.oaf.Datasource" : return datasource; + case "eu.dnetlib.dhp.schema.oaf.Organization" : return organization; + case "eu.dnetlib.dhp.schema.oaf.Project" : return project; + default: throw new IllegalArgumentException("Unknown OafEntity class: " + clazz.getName()); + } + } } public enum MainEntityType { @@ -33,8 +50,6 @@ public class GraphMappingUtils { public static Set authorPidTypes = Sets.newHashSet("orcid", "magidentifier"); - public static Set instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance", "distributionlocation"); - private static final String schemeTemplate = "dnet:%s_%s_relations"; private static Map entityMapping = Maps.newHashMap(); @@ -49,6 +64,38 @@ public class GraphMappingUtils { entityMapping.put(EntityType.project, MainEntityType.project); } + public static Class[] getKryoClasses() { + return new Class[]{ + Author.class, + Context.class, + Country.class, + DataInfo.class, + eu.dnetlib.dhp.schema.oaf.Dataset.class, + Datasource.class, + ExternalReference.class, + ExtraInfo.class, + Field.class, + GeoLocation.class, + Instance.class, + Journal.class, + KeyValue.class, + Oaf.class, + OafEntity.class, + OAIProvenance.class, + Organization.class, + OriginDescription.class, + OtherResearchProduct.class, + Project.class, + Publication.class, + Qualifier.class, + Relation.class, + SortableRelation.class, //SUPPORT + Result.class, + Software.class, + StructuredProperty.class + }; + } + public static String getScheme(final String sourceType, final String targetType) { return String.format(schemeTemplate, entityMapping.get(EntityType.valueOf(sourceType)).name(), @@ -63,152 +110,81 @@ public class GraphMappingUtils { return MainEntityType.result.name().equals(getMainType(type)); } - public static RelatedEntity asRelatedEntity(TypedRow e) { + public static RelatedEntity asRelatedEntity(E entity, Class clazz) { - final DocumentContext j = JsonPath.parse(e.getOaf()); final RelatedEntity re = new RelatedEntity(); - re.setId(j.read("$.id")); - re.setType(e.getType()); + re.setId(entity.getId()); + re.setType(clazz.getName()); - switch (EntityType.valueOf(e.getType())) { + re.setPid(entity.getPid()); + re.setCollectedfrom(entity.getCollectedfrom()); + + switch (GraphMappingUtils.EntityType.fromClass(clazz)) { case publication: case dataset: case otherresearchproduct: case software: - mapTitle(j, re); - re.setDateofacceptance(j.read("$.dateofacceptance.value")); - re.setPublisher(j.read("$.publisher.value")); - JSONArray pids = j.read("$.pid"); - re.setPid(pids.stream() - .map(p -> asStructuredProperty((LinkedHashMap) p)) - .collect(Collectors.toList())); + Result r = (Result) entity; - re.setResulttype(asQualifier(j.read("$.resulttype"))); + if (r.getTitle() == null && !r.getTitle().isEmpty()) { + re.setTitle(r.getTitle().stream().findFirst().get()); + } - JSONArray collfrom = j.read("$.collectedfrom"); - re.setCollectedfrom(collfrom.stream() - .map(c -> asKV((LinkedHashMap) c)) - .collect(Collectors.toList())); - - // will throw exception when the instance is not found - JSONArray instances = 
j.read("$.instance"); - re.setInstances(instances.stream() - .map(i -> { - final LinkedHashMap p = (LinkedHashMap) i; - final Field license = new Field(); - license.setValue((String) ((LinkedHashMap) p.get("license")).get("value")); - final Instance instance = new Instance(); - instance.setLicense(license); - instance.setAccessright(asQualifier((LinkedHashMap) p.get("accessright"))); - instance.setInstancetype(asQualifier((LinkedHashMap) p.get("instancetype"))); - instance.setHostedby(asKV((LinkedHashMap) p.get("hostedby"))); - //TODO mapping of distributionlocation - instance.setCollectedfrom(asKV((LinkedHashMap) p.get("collectedfrom"))); - - Field dateofacceptance = new Field(); - dateofacceptance.setValue((String) ((LinkedHashMap) p.get("dateofacceptance")).get("value")); - instance.setDateofacceptance(dateofacceptance); - return instance; - }).collect(Collectors.toList())); + re.setDateofacceptance(getValue(r.getDateofacceptance())); + re.setPublisher(getValue(r.getPublisher())); + re.setResulttype(re.getResulttype()); + re.setInstances(re.getInstances()); //TODO still to be mapped //re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); break; case datasource: - re.setOfficialname(j.read("$.officialname.value")); - re.setWebsiteurl(j.read("$.websiteurl.value")); - re.setDatasourcetype(asQualifier(j.read("$.datasourcetype"))); - re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility"))); + Datasource d = (Datasource) entity; + + re.setOfficialname(getValue(d.getOfficialname())); + re.setWebsiteurl(getValue(d.getWebsiteurl())); + re.setDatasourcetype(d.getDatasourcetype()); + re.setOpenairecompatibility(d.getOpenairecompatibility()); break; case organization: - re.setLegalname(j.read("$.legalname.value")); - re.setLegalshortname(j.read("$.legalshortname.value")); - re.setCountry(asQualifier(j.read("$.country"))); - re.setWebsiteurl(j.read("$.websiteurl.value")); + Organization o = (Organization) entity; + + re.setLegalname(getValue(o.getLegalname())); + re.setLegalshortname(getValue(o.getLegalshortname())); + re.setCountry(o.getCountry()); + re.setWebsiteurl(getValue(o.getWebsiteurl())); break; case project: - re.setProjectTitle(j.read("$.title.value")); - re.setCode(j.read("$.code.value")); - re.setAcronym(j.read("$.acronym.value")); - re.setContracttype(asQualifier(j.read("$.contracttype"))); + Project p = (Project) entity; - JSONArray f = j.read("$.fundingtree"); + re.setProjectTitle(getValue(p.getTitle())); + re.setCode(getValue(p.getCode())); + re.setAcronym(getValue(p.getAcronym())); + re.setContracttype(p.getContracttype()); + + List> f = p.getFundingtree(); if (!f.isEmpty()) { re.setFundingtree(f.stream() - .map(s -> ((LinkedHashMap) s).get("value")) + .map(s -> s.getValue()) .collect(Collectors.toList())); } - break; } - return re; } - - private static KeyValue asKV(LinkedHashMap j) { - final KeyValue kv = new KeyValue(); - kv.setKey((String) j.get("key")); - kv.setValue((String) j.get("value")); - return kv; + private static String getValue(Field field) { + return getFieldValueWithDefault(field, ""); } - private static void mapTitle(DocumentContext j, RelatedEntity re) { - final JSONArray a = j.read("$.title"); - if (!a.isEmpty()) { - final StructuredProperty sp = asStructuredProperty((LinkedHashMap) a.get(0)); - if (StringUtils.isNotBlank(sp.getValue())) { - re.setTitle(sp); - } - } - } - - private static StructuredProperty asStructuredProperty(LinkedHashMap j) { - final StructuredProperty sp = new StructuredProperty(); - final String value = (String) 
j.get("value"); - if (StringUtils.isNotBlank(value)) { - sp.setValue((String) j.get("value")); - sp.setQualifier(asQualifier((LinkedHashMap) j.get("qualifier"))); - } - return sp; - } - - public static Qualifier asQualifier(LinkedHashMap j) { - final Qualifier q = new Qualifier(); - - final String classid = j.get("classid"); - if (StringUtils.isNotBlank(classid)) { - q.setClassid(classid); - } - - final String classname = j.get("classname"); - if (StringUtils.isNotBlank(classname)) { - q.setClassname(classname); - } - - final String schemeid = j.get("schemeid"); - if (StringUtils.isNotBlank(schemeid)) { - q.setSchemeid(schemeid); - } - - final String schemename = j.get("schemename"); - if (StringUtils.isNotBlank(schemename)) { - q.setSchemename(schemename); - } - return q; - } - - public static String serialize(final Object o) { - try { - return new ObjectMapper() - .setSerializationInclusion(JsonInclude.Include.NON_NULL) - .writeValueAsString(o); - } catch (JsonProcessingException e) { - throw new IllegalArgumentException("unable to serialize: " + o.toString(), e); - } + private static T getFieldValueWithDefault(Field f, T defaultValue) { + return Optional.ofNullable(f) + .filter(Objects::nonNull) + .map(x -> x.getValue()) + .orElse(defaultValue); } public static String removePrefix(final String s) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json index bbac579fe..e57df9b09 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json @@ -1,8 +1,14 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, - {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true}, - {"paramName":"t", "paramLongName":"otherDsTypeId", "paramDescription": "list of datasource types to populate field datasourcetypeui", "paramRequired": true}, - {"paramName":"sp", "paramLongName":"sparkSqlShufflePartitions", "paramDescription": "Configures the number of partitions to use when shuffling data for joins or aggregations", "paramRequired": true} + { + "paramName": "in", + "paramLongName": "inputPath", + "paramDescription": "the path of the sequence file to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json new file mode 100644 index 000000000..043129c9f --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json @@ -0,0 
+1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "irp", + "paramLongName": "inputRelationsPath", + "paramDescription": "path to input relations prepare", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "root output location for prepared relations", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json new file mode 100644 index 000000000..0090716d6 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "irp", + "paramLongName": "inputRelationsPath", + "paramDescription": "path to input relations from the graph", + "paramRequired": true + }, + { + "paramName": "iep", + "paramLongName": "inputEntityPath", + "paramDescription": "path to input entity from the graph", + "paramRequired": true + }, + { + "paramName": "clazz", + "paramLongName": "graphTableClassName", + "paramDescription": "class name associated to the input entity path", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "root output location for prepared relations", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json new file mode 100644 index 000000000..cb7949d49 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "irp", + "paramLongName": "inputRelatedEntitiesPath", + "paramDescription": "path to input relations from the graph", + "paramRequired": true + }, + { + "paramName": "iep", + "paramLongName": "inputGraphPath", + "paramDescription": "root graph path", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "root output location for prepared relations", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json index 0d45e9e29..146cc9943 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json @@ -1,7 +1,7 @@ [ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, 
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true}, + {"paramName":"i", "paramLongName":"inputPath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true}, {"paramName":"f", "paramLongName":"format", "paramDescription": "MDFormat name found in the IS profile", "paramRequired": true}, {"paramName":"b", "paramLongName":"batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false} ] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json new file mode 100644 index 000000000..32720514e --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "in", + "paramLongName": "inputPath", + "paramDescription": "the path of the sequence file to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ilu", + "paramLongName": "isLookupUrl", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true + }, + { + "paramName": "odt", + "paramLongName": "otherDsTypeId", + "paramDescription": "list of datasource types to populate field datasourcetypeui", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 194cd43c8..516821509 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -1,6 +1,11 @@ + + inputGraphRootPath + root location of input materialized graph + + sparkDriverMemoryForJoining memory for driver process @@ -64,7 +69,7 @@ - ${wf:conf('reuseRecords') eq false} + ${wf:conf('reuseRecords') eq false} ${wf:conf('reuseRecords') eq true} @@ -74,16 +79,12 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + - - - - yarn cluster - build_adjacency_lists - eu.dnetlib.dhp.oa.provision.SparkXmlRecordBuilderJob_v2 + PrepareRelations + eu.dnetlib.dhp.oa.provision.PrepareRelationsJob dhp-graph-provision-${projectVersion}.jar --executor-cores=${sparkExecutorCoresForJoining} @@ -94,12 +95,135 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -mt yarn - -is ${isLookupUrl} - -t ${otherDsTypeId} - -s${sourcePath} - -o${outputPath} - -sp${sparkSqlShufflePartitions} + --inputRelationsPath${inputGraphRootPath}/relation + --outputPath${workingDir}/relation + + + + + + + + yarn + cluster + Join[relation.target = publication.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + 
--driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + --outputPath${workingDir}/join_partial + + + + + + + + yarn + cluster + Join[relation.target = dataset.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --outputPath${workingDir}/join_partial + + + + + + + + yarn + cluster + Join[entities.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --inputRelatedEntitiesPath${workingDir}/join_partial + --inputEntityPath${inputGraphRootPath} + --outputPath${workingDir}/join_entities + + + + + + + + yarn + cluster + build_adjacency_lists + eu.dnetlib.dhp.oa.provision.AdjacencyListBuilderJob + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --inputPath${workingDir}/join_entities + --outputPath${workingDir}/joined + + + + + + + + yarn + cluster + convert_to_xml + eu.dnetlib.dhp.oa.provision.XmlConverterJob + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --inputPath${workingDir}/joined +
--outputPath${workingDir}/xml + --isLookupUrl${isLookupUrl} + --otherDsTypeId${otherDsTypeId} @@ -122,9 +246,8 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -mt yarn - -is ${isLookupUrl} - --sourcePath${outputPath}/xml + --isLookupUrl ${isLookupUrl} + --inputPath${workingDir}/xml --format${format} --batchSize${batchSize}
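
Note on the relation pruning policy introduced by this patch: PrepareRelationsJob groups relations by source id, sorts them with SortableRelation (lower subRelType weight first) and keeps at most MAX_RELS per source. The sketch below reproduces that policy in plain Java, outside Spark, so the behaviour can be exercised in isolation; it is a minimal illustration, not the job itself, and the Rel record, the WEIGHTS map and the prune() helper are names introduced here only for the example.

    import java.util.*;
    import java.util.stream.Collectors;

    public class RelationPruningSketch {

        static final int MAX_RELS = 100;

        // same weight table as SortableRelation: lower value = kept first
        static final Map<String, Integer> WEIGHTS = new HashMap<>();
        static {
            WEIGHTS.put("outcome", 0);
            WEIGHTS.put("supplement", 1);
            WEIGHTS.put("publicationDataset", 2);
            WEIGHTS.put("relationship", 3);
            WEIGHTS.put("similarity", 4);
            WEIGHTS.put("affiliation", 5);
            WEIGHTS.put("provision", 6);
            WEIGHTS.put("participation", 7);
            WEIGHTS.put("dedup", 8);
        }

        // minimal stand-in for eu.dnetlib.dhp.schema.oaf.Relation
        record Rel(String source, String subRelType, String target) {}

        // keep at most MAX_RELS relations of a single source, best-ranked first
        static List<Rel> prune(List<Rel> relsOfOneSource) {
            return relsOfOneSource.stream()
                    .sorted(Comparator
                            .comparing((Rel r) -> WEIGHTS.get(r.subRelType()))
                            .thenComparing(Rel::source)
                            .thenComparing(Rel::target))
                    .limit(MAX_RELS)
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<Rel> rels = Arrays.asList(
                    new Rel("pub_1", "dedup", "pub_9"),
                    new Rel("pub_1", "outcome", "proj_1"),
                    new Rel("pub_1", "affiliation", "org_1"));
            prune(rels).forEach(System.out::println); // outcome link first, dedup link last
        }
    }

The same ordering is what the reduceByKey merge function in PrepareRelationsJob relies on: the accumulated list is re-sorted before truncation, so the cap of 100 relations per source always discards the lowest-priority relation types (dedup, participation, provision) first.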