diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index 1268afa3a..d224ebc9f 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -13,6 +13,26 @@ jar + + + eu.dnetlib.dhp + dhp-schemas + ${project.version} + + + + org.apache.hadoop + hadoop-common + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + spark-sql_2.11 + + commons-cli commons-cli diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java new file mode 100644 index 000000000..d78520f55 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java @@ -0,0 +1,56 @@ +package eu.dnetlib.dhp.common; + +import java.io.Serializable; +import java.util.function.Supplier; + +/** + * Provides serializable and throwing extensions to standard functional interfaces. + */ +public class FunctionalInterfaceSupport { + + private FunctionalInterfaceSupport() { + } + + /** + * Serializable supplier of any kind of objects. To be used withing spark processing pipelines when supplying + * functions externally. + * + * @param + */ + @FunctionalInterface + public interface SerializableSupplier extends Supplier, Serializable { + } + + /** + * Extension of consumer accepting functions throwing an exception. + * + * @param + * @param + */ + @FunctionalInterface + public interface ThrowingConsumer { + void accept(T t) throws E; + } + + /** + * Extension of supplier accepting functions throwing an exception. + * + * @param + * @param + */ + @FunctionalInterface + public interface ThrowingSupplier { + T get() throws E; + } + + /** + * Extension of runnable accepting functions throwing an exception. + * + * @param + */ + @FunctionalInterface + public interface ThrowingRunnable { + void run() throws E; + } + +} \ No newline at end of file diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java new file mode 100644 index 000000000..05beaa51e --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java @@ -0,0 +1,57 @@ +package eu.dnetlib.dhp.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.common.ThrowingSupport.rethrowAsRuntimeException; + +/** + * HDFS utility methods. + */ +public class HdfsSupport { + private static final Logger logger = LoggerFactory.getLogger(HdfsSupport.class); + + private HdfsSupport() { + } + + /** + * Removes a path (file or dir) from HDFS. + * + * @param path Path to be removed + * @param configuration Configuration of hadoop env + */ + public static void remove(String path, Configuration configuration) { + logger.info("Removing path: {}", path); + rethrowAsRuntimeException(() -> { + Path f = new Path(path); + FileSystem fileSystem = FileSystem.get(configuration); + if (fileSystem.exists(f)) { + fileSystem.delete(f, true); + } + }); + } + + /** + * Lists hadoop files located below path or alternatively lists subdirs under path. 
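For reference, a minimal usage sketch of the FunctionalInterfaceSupport helpers introduced above. The generic type parameters appear to have been stripped from the rendered diff; the sketch assumes the conventional `SerializableSupplier<T>` and `ThrowingRunnable<E extends Exception>` signatures, and the class and method names in it are purely illustrative.

```java
// Illustrative only: shows how checked-exception lambdas and serializable suppliers are meant to be used.
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingRunnable;

import java.io.IOException;

public class FunctionalInterfaceSupportExample {

    // a lambda throwing a checked exception can be assigned directly to ThrowingRunnable
    static void runQuietly(ThrowingRunnable<IOException> fn) {
        try {
            fn.run();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // SerializableSupplier can be captured inside Spark closures without serialization errors
        SerializableSupplier<String> greeting = () -> "hello";

        runQuietly(() -> {
            if (greeting.get().isEmpty()) {
                throw new IOException("empty greeting");
            }
        });
    }
}
```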
+ * + * @param path Path to be listed for hadoop files + * @param configuration Configuration of hadoop env + * @return List with string locations of hadoop files + */ + public static List listFiles(String path, Configuration configuration) { + logger.info("Listing files in path: {}", path); + return rethrowAsRuntimeException(() -> Arrays + .stream(FileSystem.get(configuration).listStatus(new Path(path))) + .filter(FileStatus::isDirectory) + .map(x -> x.getPath().toString()) + .collect(Collectors.toList())); + } +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java new file mode 100644 index 000000000..f42ee1c58 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/SparkSessionSupport.java @@ -0,0 +1,57 @@ +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingConsumer; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; + +import java.util.Objects; +import java.util.function.Function; + +/** + * SparkSession utility methods. + */ +public class SparkSessionSupport { + + private SparkSessionSupport() { + } + + /** + * Runs a given function using SparkSession created using default builder and supplied SparkConf. Stops SparkSession + * when SparkSession is managed. Allows to reuse SparkSession created externally. + * + * @param conf SparkConf instance + * @param isSparkSessionManaged When true will stop SparkSession + * @param fn Consumer to be applied to constructed SparkSession + */ + public static void runWithSparkSession(SparkConf conf, + Boolean isSparkSessionManaged, + ThrowingConsumer fn) { + runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn); + } + + /** + * Runs a given function using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession + * when SparkSession is managed. Allows to reuse SparkSession created externally. + * + * @param sparkSessionBuilder Builder of SparkSession + * @param conf SparkConf instance + * @param isSparkSessionManaged When true will stop SparkSession + * @param fn Consumer to be applied to constructed SparkSession + */ + public static void runWithSparkSession(Function sparkSessionBuilder, + SparkConf conf, + Boolean isSparkSessionManaged, + ThrowingConsumer fn) { + SparkSession spark = null; + try { + spark = sparkSessionBuilder.apply(conf); + fn.accept(spark); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (Objects.nonNull(spark) && isSparkSessionManaged) { + spark.stop(); + } + } + } +} \ No newline at end of file diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java new file mode 100644 index 000000000..b32803c37 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/ThrowingSupport.java @@ -0,0 +1,76 @@ +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingRunnable; +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingSupplier; + +/** + * Exception handling utility methods. + */ +public class ThrowingSupport { + + private ThrowingSupport() { + } + + /** + * Executes given runnable and rethrows any exceptions as RuntimeException. 
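For reference, a hedged sketch of how the new jobs combine these helpers: `runWithSparkSession` owns the session lifecycle, while `HdfsSupport.remove` (which relies on `rethrowAsRuntimeException`) clears the output directory before writing. The paths and the local master are illustrative.

```java
// Illustrative combination of the helpers introduced in this PR.
import eu.dnetlib.dhp.common.HdfsSupport;
import org.apache.spark.SparkConf;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

public class SparkHelpersExample {

    public static void main(String[] args) {
        String outputPath = "/tmp/example/output";   // illustrative path
        Boolean isSparkSessionManaged = Boolean.TRUE; // stop the session because we created it here

        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("helpers-example");

        runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            // remove() wraps the checked IOException via ThrowingSupport.rethrowAsRuntimeException
            HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
            spark.range(10).write().parquet(outputPath);
        });
        // when isSparkSessionManaged == true the session is stopped in the finally block of runWithSparkSession
    }
}
```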
+ * + * @param fn Runnable to be executed + * @param Type of exception thrown + */ + public static void rethrowAsRuntimeException(ThrowingRunnable fn) { + try { + fn.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Executes given runnable and rethrows any exceptions as RuntimeException with custom message. + * + * @param fn Runnable to be executed + * @param msg Message to be set for rethrown exception + * @param Type of exception thrown + */ + public static void rethrowAsRuntimeException(ThrowingRunnable fn, String msg) { + try { + fn.run(); + } catch (Exception e) { + throw new RuntimeException(msg, e); + } + } + + /** + * Executes given supplier and rethrows any exceptions as RuntimeException. + * + * @param fn Supplier to be executed + * @param Type of returned value + * @param Type of exception thrown + * @return Result of supplier execution + */ + public static T rethrowAsRuntimeException(ThrowingSupplier fn) { + try { + return fn.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Executes given supplier and rethrows any exceptions as RuntimeException with custom message. + * + * @param fn Supplier to be executed + * @param msg Message to be set for rethrown exception + * @param Type of returned value + * @param Type of exception thrown + * @return Result of supplier execution + */ + public static T rethrowAsRuntimeException(ThrowingSupplier fn, String msg) { + try { + return fn.get(); + } catch (Exception e) { + throw new RuntimeException(msg, e); + } + } + +} \ No newline at end of file diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java new file mode 100644 index 000000000..f1e790ee7 --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/HdfsSupportTest.java @@ -0,0 +1,78 @@ +package eu.dnetlib.dhp.common; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.*; + +public class HdfsSupportTest { + + @Nested + class Remove { + + @Test + public void shouldThrowARuntimeExceptionOnError() { + // when + assertThrows(RuntimeException.class, () -> + HdfsSupport.remove(null, new Configuration())); + } + + @Test + public void shouldRemoveADirFromHDFS(@TempDir Path tempDir) { + // when + HdfsSupport.remove(tempDir.toString(), new Configuration()); + + // then + assertFalse(Files.exists(tempDir)); + } + + @Test + public void shouldRemoveAFileFromHDFS(@TempDir Path tempDir) throws IOException { + // given + Path file = Files.createTempFile(tempDir, "p", "s"); + + // when + HdfsSupport.remove(file.toString(), new Configuration()); + + // then + assertFalse(Files.exists(file)); + } + } + + @Nested + class ListFiles { + + @Test + public void shouldThrowARuntimeExceptionOnError() { + // when + assertThrows(RuntimeException.class, () -> + HdfsSupport.listFiles(null, new Configuration())); + } + + @Test + public void shouldListFilesLocatedInPath(@TempDir Path tempDir) throws IOException { + Path subDir1 = Files.createTempDirectory(tempDir, "list_me"); + Path subDir2 = Files.createTempDirectory(tempDir, "list_me"); + + // when + List paths = HdfsSupport.listFiles(tempDir.toString(), new 
Configuration()); + + // then + assertEquals(2, paths.size()); + List expecteds = Arrays.stream(new String[]{subDir1.toString(), subDir2.toString()}) + .sorted().collect(Collectors.toList()); + List actuals = paths.stream().sorted().collect(Collectors.toList()); + assertTrue(actuals.get(0).contains(expecteds.get(0))); + assertTrue(actuals.get(1).contains(expecteds.get(1))); + } + } +} \ No newline at end of file diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java new file mode 100644 index 000000000..bfed019e9 --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/ModelSupportTest.java @@ -0,0 +1,36 @@ +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ModelSupportTest { + + @Nested + class IsSubClass { + + @Test + public void shouldReturnFalseWhenSubClassDoesNotExtendSuperClass() { + // when + Boolean result = ModelSupport.isSubClass(Relation.class, OafEntity.class); + + // then + assertFalse(result); + } + + @Test + public void shouldReturnTrueWhenSubClassExtendsSuperClass() { + // when + Boolean result = ModelSupport.isSubClass(Result.class, OafEntity.class); + + // then + assertTrue(result); + } + } +} \ No newline at end of file diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java new file mode 100644 index 000000000..bc2dce3cf --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/SparkSessionSupportTest.java @@ -0,0 +1,54 @@ +package eu.dnetlib.dhp.common; + +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingConsumer; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.function.Function; + +import static org.mockito.Mockito.*; + +public class SparkSessionSupportTest { + + @Nested + class RunWithSparkSession { + + @Test + public void shouldExecuteFunctionAndNotStopSparkSessionWhenSparkSessionIsNotManaged() throws Exception { + // given + SparkSession spark = mock(SparkSession.class); + SparkConf conf = mock(SparkConf.class); + Function sparkSessionBuilder = mock(Function.class); + when(sparkSessionBuilder.apply(conf)).thenReturn(spark); + ThrowingConsumer fn = mock(ThrowingConsumer.class); + + // when + SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, false, fn); + + // then + verify(sparkSessionBuilder).apply(conf); + verify(fn).accept(spark); + verify(spark, never()).stop(); + } + + @Test + public void shouldExecuteFunctionAndStopSparkSessionWhenSparkSessionIsManaged() throws Exception { + // given + SparkSession spark = mock(SparkSession.class); + SparkConf conf = mock(SparkConf.class); + Function sparkSessionBuilder = mock(Function.class); + when(sparkSessionBuilder.apply(conf)).thenReturn(spark); + ThrowingConsumer fn = mock(ThrowingConsumer.class); + + // when + SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, true, fn); + + // then + verify(sparkSessionBuilder).apply(conf); + verify(fn).accept(spark); + 
verify(spark, times(1)).stop(); + } + } +} \ No newline at end of file diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java new file mode 100644 index 000000000..3c774aa38 --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java @@ -0,0 +1,51 @@ +package eu.dnetlib.dhp.schema.common; + +import eu.dnetlib.dhp.schema.oaf.Oaf; + +/** + * Inheritance utility methods. + */ +public class ModelSupport { + + private ModelSupport() { + } + + /** + * Checks subclass-superclass relationship. + * + * @param subClazzObject Subclass object instance + * @param superClazzObject Superclass object instance + * @param Subclass type + * @param Superclass type + * @return True if X is a subclass of Y + */ + public static Boolean isSubClass(X subClazzObject, Y superClazzObject) { + return isSubClass(subClazzObject.getClass(), superClazzObject.getClass()); + } + + /** + * Checks subclass-superclass relationship. + * + * @param subClazzObject Subclass object instance + * @param superClazz Superclass class + * @param Subclass type + * @param Superclass type + * @return True if X is a subclass of Y + */ + public static Boolean isSubClass(X subClazzObject, Class superClazz) { + return isSubClass(subClazzObject.getClass(), superClazz); + } + + /** + * Checks subclass-superclass relationship. + * + * @param subClazz Subclass class + * @param superClazz Superclass class + * @param Subclass type + * @param Superclass type + * @return True if X is a subclass of Y + */ + public static Boolean isSubClass(Class subClazz, Class superClazz) { + return superClazz.isAssignableFrom(subClazz); + } +} \ No newline at end of file diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java index 768fea19f..6871c0197 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java @@ -93,8 +93,7 @@ public class Relation extends Oaf { subRelType.equals(relation.subRelType) && relClass.equals(relation.relClass) && source.equals(relation.source) && - target.equals(relation.target) && - Objects.equals(collectedFrom, relation.collectedFrom); + target.equals(relation.target); } @Override diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java new file mode 100644 index 000000000..2cc52fb62 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -0,0 +1,123 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; +import eu.dnetlib.dhp.oa.provision.model.Tuple2; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getKryoClasses; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). + * + * The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again + * by E, finally grouped by E.id; + * + * The workflow is organized in different parts aimed to to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) JoinRelationEntityByTargetJob: + * (phase 1): prepare tuples [relation - target entity] (R - T): + * for each entity type E_i + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) + * + * 3) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity + * + * 4) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class AdjacencyListBuilderJob { + + private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class); + + public static final int MAX_LINKS = 100; + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString( + AdjacencyListBuilderJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json"))); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(getKryoClasses()); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + createAdjacencyLists(spark, inputPath, outputPath); + }); + } + + private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) { + + log.info("Reading joined entities from: {}", inputPath); + spark.read() + .load(inputPath) + .as(Encoders.bean(EntityRelEntity.class)) + .groupByKey((MapFunction) value -> value.getEntity().getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (key, values) -> { + JoinedEntity j = new JoinedEntity(); + List links = new ArrayList<>(); + while (values.hasNext() && links.size() < MAX_LINKS) { + EntityRelEntity curr = values.next(); + if 
(j.getEntity() == null) { + j.setEntity(curr.getEntity()); + } + links.add(new Tuple2(curr.getRelation(), curr.getTarget())); + } + j.setLinks(links); + return j; + }, Encoders.bean(JoinedEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java new file mode 100644 index 000000000..7d3555b6c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -0,0 +1,156 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; +import eu.dnetlib.dhp.oa.provision.model.SortableRelation; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). 
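To make the join sequence described in this javadoc concrete, here is a toy, runnable sketch (not part of the PR) that stands plain string tuples in for the Relation/RelatedEntity beans: phase 1 resolves relation targets, phase 2 attaches the source entity, and the final groupByKey builds the adjacency list. All names and data are illustrative.

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;

public class AdjacencySketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("adjacency-sketch").getOrCreate();

        // entities as (id, payload); relations as (source, target)
        Dataset<Tuple2<String, String>> entities = spark.createDataset(
                Arrays.asList(new Tuple2<>("p1", "publication 1"), new Tuple2<>("d1", "dataset 1")),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
        Dataset<Tuple2<String, String>> relations = spark.createDataset(
                Arrays.asList(new Tuple2<>("p1", "d1"), new Tuple2<>("d1", "p1")),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // phase 1: resolve the relation target (R.target = T.id), keeping (source, targetPayload)
        Dataset<Tuple2<String, String>> relTargetBySource = relations
                .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
                .map((MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, Tuple2<String, String>>)
                                t -> new Tuple2<>(t._1()._1(), t._2()._2()),
                        Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // phase 2 + adjacency list: attach the source entity (E.id = R.source) and group by source id
        entities
                .joinWith(relTargetBySource, entities.col("_1").equalTo(relTargetBySource.col("_1")), "left_outer")
                .map((MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, Tuple2<String, String>>)
                                t -> new Tuple2<>(t._1()._1(), t._2() == null ? "" : t._2()._2()),
                        Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                .groupByKey((MapFunction<Tuple2<String, String>, String>) Tuple2::_1, Encoders.STRING())
                .mapGroups((MapGroupsFunction<String, Tuple2<String, String>, String>) (key, values) -> {
                    StringBuilder sb = new StringBuilder(key).append(" -> [");
                    values.forEachRemaining(v -> sb.append(" ").append(v._2()));
                    return sb.append(" ]").toString();
                }, Encoders.STRING())
                .show(false);

        spark.stop();
    }
}
```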
+ * + * The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again + * by E, finally grouped by E.id; + * + * The workflow is organized in different parts aimed to to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) JoinRelationEntityByTargetJob: + * (phase 1): prepare tuples [relation - target entity] (R - T): + * for each entity type E_i + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) + * + * 3) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity + * + * 4) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class CreateRelatedEntitiesJob_phase1 { + + private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase1.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils.toString( + PrepareRelationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputRelationsPath = parser.get("inputRelationsPath"); + log.info("inputRelationsPath: {}", inputRelationsPath); + + String inputEntityPath = parser.get("inputEntityPath"); + log.info("inputEntityPath: {}", inputEntityPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(getKryoClasses()); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + joinRelationEntity(spark, inputRelationsPath, inputEntityPath, entityClazz, outputPath); + }); + } + + private static void joinRelationEntity(SparkSession spark, String inputRelationsPath, String inputEntityPath, Class entityClazz, String outputPath) { + + Dataset> relsByTarget = readPathRelation(spark, inputRelationsPath) + .filter((FilterFunction) value -> value.getDataInfo().getDeletedbyinference() == false) + .map((MapFunction>) r -> new Tuple2<>(r.getTarget(), r), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class))) + .cache(); + + Dataset> entities = readPathEntity(spark, inputEntityPath, entityClazz) + .map((MapFunction) value -> asRelatedEntity(value, entityClazz), Encoders.bean(RelatedEntity.class)) + .map((MapFunction>) e -> 
new Tuple2<>(e.getId(), e), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) + .cache(); + + relsByTarget + .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") + .map((MapFunction, Tuple2>, EntityRelEntity>) + t -> new EntityRelEntity(t._1()._2(), t._2()._2()), + Encoders.bean(EntityRelEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath + "/" + EntityType.fromClass(entityClazz)); + } + + private static Dataset readPathEntity(SparkSession spark, String inputEntityPath, Class entityClazz) { + + log.info("Reading Graph table from: {}", inputEntityPath); + return spark + .read() + .textFile(inputEntityPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)); + } + + /** + * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text file, + * + * @param spark + * @param relationPath + * @return the Dataset containing all the relationships + */ + private static Dataset readPathRelation(SparkSession spark, final String relationPath) { + + log.info("Reading relations from: {}", relationPath); + return spark.read() + .load(relationPath) + .as(Encoders.bean(SortableRelation.class)); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java new file mode 100644 index 000000000..2b5c627b6 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -0,0 +1,194 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.TypedRow; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.util.List; +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getKryoClasses; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). 
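A note on the `Encoders.tuple(Encoders.STRING(), Encoders.kryo(...))` pattern used throughout phase 1 and phase 2: a Kryo encoder stores the bean as a single opaque binary column, so its fields cannot appear in a join condition; pairing it with a plain string key column ("_1") keeps the id joinable and repartitionable while the payload stays compact. A minimal sketch follows; the `Item` bean and the data are illustrative, not part of the PR.

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;

public class KryoKeyedJoinSketch {

    public static class Item implements Serializable {
        private String id;
        private String payload;
        public Item() { }
        public Item(String id, String payload) { this.id = id; this.payload = payload; }
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getPayload() { return payload; }
        public void setPayload(String payload) { this.payload = payload; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("kryo-keyed-join").getOrCreate();

        Dataset<Item> items = spark.createDataset(
                Arrays.asList(new Item("a", "payload A"), new Item("b", "payload B")),
                Encoders.bean(Item.class));

        // key by id: column "_1" is a regular string column, column "_2" is the Kryo-encoded bean
        Dataset<Tuple2<String, Item>> keyed = items.map(
                (MapFunction<Item, Tuple2<String, Item>>) i -> new Tuple2<>(i.getId(), i),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(Item.class)));

        Dataset<Tuple2<String, String>> labels = spark.createDataset(
                Arrays.asList(new Tuple2<>("a", "label A")),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // the join condition can only reference the string key, never fields inside the Kryo payload
        keyed.joinWith(labels, keyed.col("_1").equalTo(labels.col("_1")), "inner").show(false);

        spark.stop();
    }
}
```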
+ * + * The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again + * by E, finally grouped by E.id; + * + * The workflow is organized in different parts aimed to to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) JoinRelationEntityByTargetJob: + * (phase 1): prepare tuples [relation - target entity] (R - T): + * for each entity type E_i + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) + * + * 3) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity + * + * 4) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class CreateRelatedEntitiesJob_phase2 { + + private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase2.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils.toString( + PrepareRelationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath"); + log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath); + + String inputGraphRootPath = parser.get("inputGraphRootPath"); + log.info("inputGraphRootPath: {}", inputGraphRootPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + int numPartitions = Integer.parseInt(parser.get("numPartitions")); + log.info("numPartitions: {}", numPartitions); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(getKryoClasses()); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + joinAllEntities(spark, inputRelatedEntitiesPath, inputGraphRootPath, outputPath, numPartitions); + }); + } + + private static void joinAllEntities(SparkSession spark, String inputRelatedEntitiesPath, String inputGraphRootPath, String outputPath, int numPartitions) { + + Dataset> entities = readAllEntities(spark, inputGraphRootPath, numPartitions); + Dataset> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath); + + entities + .joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer") + .map((MapFunction, Tuple2>, EntityRelEntity>) value -> { + EntityRelEntity re = new EntityRelEntity(); + re.setEntity(value._1()._2()); + Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); + if (related.isPresent()) { + 
re.setRelation(related.get().getRelation()); + re.setTarget(related.get().getTarget()); + } + return re; + }, Encoders.bean(EntityRelEntity.class)) + .repartition(numPartitions) + .filter((FilterFunction) value -> value.getEntity() != null && StringUtils.isNotBlank(value.getEntity().getId())) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + private static Dataset> readAllEntities(SparkSession spark, String inputGraphPath, int numPartitions) { + Dataset publication = readPathEntity(spark, inputGraphPath + "/publication", Publication.class); + Dataset dataset = readPathEntity(spark, inputGraphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + Dataset other = readPathEntity(spark, inputGraphPath + "/otherresearchproduct", OtherResearchProduct.class); + Dataset software = readPathEntity(spark, inputGraphPath + "/software", Software.class); + Dataset datasource = readPathEntity(spark, inputGraphPath + "/datasource", Datasource.class); + Dataset organization = readPathEntity(spark, inputGraphPath + "/organization", Organization.class); + Dataset project = readPathEntity(spark, inputGraphPath + "/project", Project.class); + + return publication + .union(dataset) + .union(other) + .union(software) + .union(datasource) + .union(organization) + .union(project) + .map((MapFunction>) + value -> new Tuple2<>(value.getId(), value), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) + .repartition(numPartitions); + } + + private static Dataset> readRelatedEntities(SparkSession spark, String inputRelatedEntitiesPath) { + + log.info("Reading related entities from: {}", inputRelatedEntitiesPath); + + final List paths = HdfsSupport.listFiles(inputRelatedEntitiesPath, spark.sparkContext().hadoopConfiguration()); + + log.info("Found paths: {}", String.join(",", paths)); + + return spark.read() + .load(toSeq(paths)) + .as(Encoders.bean(EntityRelEntity.class)) + .map((MapFunction>) + value -> new Tuple2<>(value.getRelation().getSource(), value), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); + } + + private static Dataset readPathEntity(SparkSession spark, String inputEntityPath, Class entityClazz) { + + log.info("Reading Graph table from: {}", inputEntityPath); + return spark + .read() + .textFile(inputEntityPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz)) + .map((MapFunction) value -> getTypedRow(StringUtils.substringAfterLast(inputEntityPath, "/"), value), Encoders.bean(TypedRow.class)); + } + + private static TypedRow getTypedRow(String type, OafEntity entity) throws JsonProcessingException { + TypedRow t = new TypedRow(); + t.setType(type); + t.setDeleted(entity.getDataInfo().getDeletedbyinference()); + t.setId(entity.getId()); + t.setOaf(OBJECT_MAPPER.writeValueAsString(entity)); + return t; + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + private static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner.java deleted file mode 100644 index def757da3..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner.java +++ /dev/null @@ -1,291 +0,0 @@ -package 
eu.dnetlib.dhp.oa.provision; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; -import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; -import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; -import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; -import eu.dnetlib.dhp.oa.provision.model.*; -import eu.dnetlib.dhp.schema.oaf.*; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.util.LongAccumulator; -import scala.Tuple2; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Map; - -import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntity; - -/** - * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. - * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, - * and all the possible relationships (similarity links produced by the Dedup process are excluded). - * - * The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again - * by E, finally grouped by E.id; - * - * Different manipulations of the E and R sets are introduced to reduce the complexity of the operation - * 1) treat the object payload as string, extracting only the necessary information beforehand using json path, - * it seems that deserializing it with jackson's object mapper has higher memory footprint. - * - * 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false) - * 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S - * and E_target = T. 
Objects in T are heavily pruned by all the unnecessary information - * - * 4) perform the join as (((T.id join R.target) union S) groupby S.id) yield S -> [ ] - */ -public class GraphJoiner implements Serializable { - - private Map accumulators = Maps.newHashMap(); - - public static final int MAX_RELS = 100; - - public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; - - private SparkSession spark; - - private ContextMapper contextMapper; - - private String inputPath; - - private String outPath; - - private String otherDsTypeId; - - public GraphJoiner(SparkSession spark, ContextMapper contextMapper, String otherDsTypeId, String inputPath, String outPath) { - this.spark = spark; - this.contextMapper = contextMapper; - this.otherDsTypeId = otherDsTypeId; - this.inputPath = inputPath; - this.outPath = outPath; - - final SparkContext sc = spark.sparkContext(); - prepareAccumulators(sc); - } - - public GraphJoiner adjacencyLists() { - final JavaSparkContext jsc = new JavaSparkContext(getSpark().sparkContext()); - - // read each entity - JavaPairRDD datasource = readPathEntity(jsc, getInputPath(), "datasource"); - JavaPairRDD organization = readPathEntity(jsc, getInputPath(), "organization"); - JavaPairRDD project = readPathEntity(jsc, getInputPath(), "project"); - JavaPairRDD dataset = readPathEntity(jsc, getInputPath(), "dataset"); - JavaPairRDD otherresearchproduct = readPathEntity(jsc, getInputPath(), "otherresearchproduct"); - JavaPairRDD software = readPathEntity(jsc, getInputPath(), "software"); - JavaPairRDD publication = readPathEntity(jsc, getInputPath(), "publication"); - - // create the union between all the entities - final String entitiesPath = getOutPath() + "/entities"; - datasource - .union(organization) - .union(project) - .union(dataset) - .union(otherresearchproduct) - .union(software) - .union(publication) - .map(e -> new EntityRelEntity().setSource(e._2())) - .map(GraphMappingUtils::serialize) - .saveAsTextFile(entitiesPath, GzipCodec.class); - - JavaPairRDD entities = jsc.textFile(entitiesPath) - .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class)) - .mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t)); - - final String relationPath = getOutPath() + "/relation"; - // reads the relationships - final JavaPairRDD rels = readPathRelation(jsc, getInputPath()) - .filter(rel -> !rel.getDeleted()) //only consider those that are not virtually deleted - .map(p -> new EntityRelEntity().setRelation(p)) - .mapToPair(p -> new Tuple2<>(SortableRelationKey.from(p), p)); - rels - .groupByKey(new RelationPartitioner(rels.getNumPartitions())) - .map(p -> Iterables.limit(p._2(), MAX_RELS)) - .flatMap(p -> p.iterator()) - .map(s -> new ObjectMapper().writeValueAsString(s)) - .saveAsTextFile(relationPath, GzipCodec.class); - - final JavaPairRDD relation = jsc.textFile(relationPath) - .map(s -> new ObjectMapper().readValue(s, EntityRelEntity.class)) - .mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p)); - - final String bySourcePath = getOutPath() + "/join_by_source"; - relation - .join(entities - .filter(e -> !e._2().getSource().getDeleted()) - .mapToPair(e -> new Tuple2<>(e._1(), asRelatedEntity(e._2())))) - .map(s -> new EntityRelEntity() - .setRelation(s._2()._1().getRelation()) - .setTarget(s._2()._2().getSource())) - .map(j -> new ObjectMapper().writeValueAsString(j)) - .saveAsTextFile(bySourcePath, GzipCodec.class); - - JavaPairRDD bySource = jsc.textFile(bySourcePath) - .map(e -> getObjectMapper().readValue(e, 
EntityRelEntity.class)) - .mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t)); - - final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId); - entities - .union(bySource) - .groupByKey() // by source id - .map(l -> toJoinedEntity(l)) - .mapToPair(je -> new Tuple2<>( - new Text(je.getEntity().getId()), - new Text(recordFactory.build(je)))) - .saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); - - return this; - } - - public SparkSession getSpark() { - return spark; - } - - public String getInputPath() { - return inputPath; - } - - public String getOutPath() { - return outPath; - } - - // HELPERS - - private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) { - try { - switch (GraphMappingUtils.EntityType.valueOf(type)) { - case publication: - return mapper.readValue(json, Publication.class); - case dataset: - return mapper.readValue(json, Dataset.class); - case otherresearchproduct: - return mapper.readValue(json, OtherResearchProduct.class); - case software: - return mapper.readValue(json, Software.class); - case datasource: - return mapper.readValue(json, Datasource.class); - case organization: - return mapper.readValue(json, Organization.class); - case project: - return mapper.readValue(json, Project.class); - default: - throw new IllegalArgumentException("invalid type: " + type); - } - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - private JoinedEntity toJoinedEntity(Tuple2> p) { - final ObjectMapper mapper = getObjectMapper(); - final JoinedEntity j = new JoinedEntity(); - final Links links = new Links(); - for(EntityRelEntity rel : p._2()) { - if (rel.hasMainEntity() & j.getEntity() == null) { - j.setType(rel.getSource().getType()); - j.setEntity(parseOaf(rel.getSource().getOaf(), rel.getSource().getType(), mapper)); - } - if (rel.hasRelatedEntity()) { - try { - links.add( - new eu.dnetlib.dhp.oa.provision.model.Tuple2() - .setRelation(mapper.readValue(rel.getRelation().getOaf(), Relation.class)) - .setRelatedEntity(mapper.readValue(rel.getTarget().getOaf(), RelatedEntity.class))); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - } - j.setLinks(links); - if (j.getEntity() == null) { - throw new IllegalStateException("missing main entity on '" + p._1() + "'"); - } - return j; - } - - /** - * Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a sequence file , - * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow - * @param sc - * @param inputPath - * @param type - * @return the JavaPairRDD indexed by entity identifier - */ - private JavaPairRDD readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) { - return sc.textFile(inputPath + "/" + type) - .mapToPair((PairFunction) s -> { - final DocumentContext json = JsonPath.parse(s); - final String id = json.read("$.id"); - return new Tuple2<>(id, new TypedRow() - .setSourceId(id) - .setDeleted(json.read("$.dataInfo.deletedbyinference")) - .setType(type) - .setOaf(s)); - }); - } - - /** - * Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file , - * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow - * @param sc - * @param inputPath - * @return the JavaRDD containing all the relationships - */ - private JavaRDD 
readPathRelation(final JavaSparkContext sc, final String inputPath) { - return sc.textFile(inputPath + "/relation") - .map(s -> { - final DocumentContext json = JsonPath.parse(s); - return new TypedRow() - .setSourceId(json.read("$.source")) - .setTargetId(json.read("$.target")) - .setDeleted(json.read("$.dataInfo.deletedbyinference")) - .setType("relation") - .setRelType("$.relType") - .setSubRelType("$.subRelType") - .setRelClass("$.relClass") - .setOaf(s); - }); - } - - private ObjectMapper getObjectMapper() { - return new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private void prepareAccumulators(SparkContext sc) { - accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments")); - accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments")); - accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo")); - accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy")); - accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn")); - accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges")); - - accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo")); - accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo")); - accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy")); - accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces")); - accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf")); - - accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution")); - accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant")); - accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant")); - accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn")); - accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces")); - accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy")); - accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides")); - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java new file mode 100644 index 000000000..caddfaf8d --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -0,0 +1,151 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import 
com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.SortableRelation; +import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.*; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.Optional; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). + * + * The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and again + * by E, finally grouped by E.id; + * + * The workflow is organized in different parts aimed to to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) JoinRelationEntityByTargetJob: + * (phase 1): prepare tuples [relation - target entity] (R - T): + * for each entity type E_i + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) + * + * 3) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity + * + * 4) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class PrepareRelationsJob { + + private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static final int MAX_RELS = 100; + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils.toString( + PrepareRelationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputRelationsPath = parser.get("inputRelationsPath"); + log.info("inputRelationsPath: {}", inputRelationsPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + int numPartitions = 
Integer.parseInt(parser.get("relPartitions")); + log.info("relPartitions: {}", numPartitions); + + SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareRelationsFromPaths(spark, inputRelationsPath, outputPath, numPartitions); + }); + } + + private static void prepareRelationsFromPaths(SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) { + readPathRelation(spark, inputRelationsPath) + .filter((FilterFunction) value -> value.getDataInfo().getDeletedbyinference() == false) + .groupByKey((MapFunction) value -> value.getSource(), Encoders.STRING()) + .flatMapGroups((FlatMapGroupsFunction) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(SortableRelation.class)) + .repartition(numPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + /** + * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text file, + * + * @param spark + * @param inputPath + * @return the Dataset containing all the relationships + */ + private static Dataset readPathRelation(SparkSession spark, final String inputPath) { + return spark.read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), Encoders.bean(SortableRelation.class)); + } + + private static void prepareRelationsRDDFromPaths(SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) { + JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath) + .repartition(numPartitions); + + RDD d = rels + .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) //only consider those that are not virtually deleted + .mapToPair((PairFunction) rel -> new Tuple2<>(rel, rel)) + .groupByKey(new RelationPartitioner(rels.getNumPartitions())) + .map(p -> Iterables.limit(p._2(), MAX_RELS)) + .flatMap(p -> p.iterator()) + .rdd(); + + spark.createDataset(d, Encoders.bean(SortableRelation.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + private static JavaRDD readPathRelationRDD(SparkSession spark, final String inputPath) { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + return sc.textFile(inputPath) + .map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class)); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob.java deleted file mode 100644 index 0a898c0fc..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob.java +++ /dev/null @@ -1,47 +0,0 @@ -package eu.dnetlib.dhp.oa.provision; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.SparkSession; - -public class SparkXmlRecordBuilderJob { - - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString( - 
SparkXmlRecordBuilderJob.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json"))); - parser.parseArgument(args); - - final String master = parser.get("master"); - final SparkConf conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - - try(SparkSession spark = getSession(conf, master)) { - - final String inputPath = parser.get("sourcePath"); - final String outputPath = parser.get("outputPath"); - final String isLookupUrl = parser.get("isLookupUrl"); - final String otherDsTypeId = parser.get("otherDsTypeId"); - - final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); - - new GraphJoiner(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath) - .adjacencyLists(); - } - } - - private static SparkSession getSession(SparkConf conf, String master) { - return SparkSession - .builder() - .config(conf) - .appName(SparkXmlRecordBuilderJob.class.getSimpleName()) - .master(master) - .getOrCreate(); - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java new file mode 100644 index 000000000..059cb31f2 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -0,0 +1,166 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.*; +import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; +import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils; +import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; +import eu.dnetlib.dhp.schema.oaf.*; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). 
+ * + * The workflow is organized in different parts aimed to to reduce the complexity of the operation + * 1) PrepareRelationsJob: + * only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity + * can be linked at most to 100 other objects + * + * 2) JoinRelationEntityByTargetJob: + * (phase 1): prepare tuples [relation - target entity] (R - T): + * for each entity type E_i + * map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information + * join (R.target = T_i.id) + * save the tuples (R_i, T_i) + * (phase 2): + * create the union of all the entity types E, hash by id + * read the tuples (R, T), hash by R.source + * join E.id = (R, T).source, where E becomes the Source Entity S + * save the tuples (S, R, T) + * + * 3) AdjacencyListBuilderJob: + * given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity + * + * 4) XmlConverterJob: + * convert the JoinedEntities as XML records + */ +public class XmlConverterJob { + + private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString( + XmlConverterJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json"))); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + + String otherDsTypeId = parser.get("otherDsTypeId"); + log.info("otherDsTypeId: {}", otherDsTypeId); + + SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl), otherDsTypeId); + }); + + } + + private static void convertToXml(SparkSession spark, String inputPath, String outputPath, ContextMapper contextMapper, String otherDsTypeId) { + + final XmlRecordFactory recordFactory = new XmlRecordFactory(prepareAccumulators(spark.sparkContext()), contextMapper, false, schemaLocation, otherDsTypeId); + + spark.read() + .load(inputPath) + .as(Encoders.bean(JoinedEntity.class)) + .map((MapFunction) j -> { + if (j.getLinks() != null) { + j.setLinks(j.getLinks() + .stream() + .filter(t -> t.getRelation() != null & t.getRelatedEntity() != null) + .collect(Collectors.toCollection(ArrayList::new))); + } + return j; + }, Encoders.bean(JoinedEntity.class)) + .map((MapFunction>) je -> new Tuple2<>( + je.getEntity().getId(), + recordFactory.build(je) + ), Encoders.tuple(Encoders.STRING(), Encoders.STRING())) + .javaRDD() + .mapToPair((PairFunction, Text, Text>) t -> new Tuple2<>(new Text(t._1()), new Text(t._2()))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + } + + private static void removeOutputDir(SparkSession 
spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + private static Map prepareAccumulators(SparkContext sc) { + Map accumulators = Maps.newHashMap(); + accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments")); + accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments")); + accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo")); + accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy")); + accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn")); + accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges")); + + accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo")); + accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo")); + accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy")); + accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces")); + accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf")); + + accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution")); + accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant")); + accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant")); + accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn")); + accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces")); + accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy")); + accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides")); + + return accumulators; + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java similarity index 75% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java index cafbc8653..84538c924 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java @@ -10,14 +10,13 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import 
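A hedged sketch of the output format produced by convertToXml: (record id, serialized XML) pairs written as a gzip-compressed Hadoop SequenceFile of Text key/value pairs, via the same saveAsHadoopFile call used above. The sample ids, XML payloads and output path are made up, and the local master setting is only for trying the snippet outside the cluster.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;

public class SequenceFileWriteExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName(SequenceFileWriteExample.class.getSimpleName())
                .master("local[*]") // local run, for illustration only
                .getOrCreate();

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // (record id, serialized XML payload) pairs, stand-ins for the real records
        sc.parallelize(Arrays.asList(
                new Tuple2<>("50|doi_________::0001", "<record id=\"0001\"/>"),
                new Tuple2<>("50|doi_________::0002", "<record id=\"0002\"/>")))
          .mapToPair(t -> new Tuple2<Text, Text>(new Text(t._1()), new Text(t._2())))
          .saveAsHadoopFile("/tmp/xml_records_example", Text.class, Text.class,
                  SequenceFileOutputFormat.class, GzipCodec.class);

        spark.stop();
    }
}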
org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.solr.common.SolrInputDocument; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.xml.transform.Transformer; import javax.xml.transform.TransformerException; @@ -28,65 +27,79 @@ import java.io.StringReader; import java.io.StringWriter; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Optional; -public class SparkXmlIndexingJob { +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - private static final Log log = LogFactory.getLog(SparkXmlIndexingJob.class); +public class XmlIndexingJob { + + private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class); private static final Integer DEFAULT_BATCH_SIZE = 1000; private static final String LAYOUT = "index"; + private static final String INTERPRETATION = "openaire"; + private static final String SEPARATOR = "-"; + public static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'"; public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils.toString( - SparkXmlIndexingJob.class.getResourceAsStream( + XmlIndexingJob.class.getResourceAsStream( "/eu/dnetlib/dhp/oa/provision/input_params_update_index.json"))); parser.parseArgument(args); - final String inputPath = parser.get("sourcePath"); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + final String format = parser.get("format"); + log.info("format: {}", format); + final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? 
Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE; + log.info("batchSize: {}", batchSize); final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); final String fields = getLayoutSource(isLookup, format); + log.info("fields: {}", fields); + final String xslt = getLayoutTransformer(isLookup); final String dsId = getDsId(format, isLookup); + log.info("dsId: {}", dsId); + final String zkHost = getZkHost(isLookup); + log.info("zkHost: {}", zkHost); + final String version = getRecordDatestamp(); final String indexRecordXslt = getLayoutTransformer(format, fields, xslt); + log.info("indexRecordTransformer {}", indexRecordXslt); - log.info("indexRecordTransformer: " + indexRecordXslt); + final SparkConf conf = new SparkConf(); - final String master = parser.get("master"); - final SparkConf conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + runWithSparkSession(conf, isSparkSessionManaged, + spark -> { + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - try(SparkSession spark = getSession(conf, master)) { + RDD docs = sc.sequenceFile(inputPath, Text.class, Text.class) + .map(t -> t._2().toString()) + .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s)) + .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s)) + .rdd(); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - RDD docs = sc.sequenceFile(inputPath, Text.class, Text.class) - .map(t -> t._2().toString()) - .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s)) - .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s)) - .rdd(); - - SolrSupport.indexDocs(zkHost, format + "-" + LAYOUT + "-openaire", batchSize, docs); - } - } - - private static SparkSession getSession(SparkConf conf, String master) { - return SparkSession - .builder() - .config(conf) - .appName(SparkXmlRecordBuilderJob.class.getSimpleName()) - .master(master) - .getOrCreate(); + final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION; + SolrSupport.indexDocs(zkHost, collection, batchSize, docs); + }); } private static String toIndexRecord(Transformer tr, final String record) { @@ -95,7 +108,7 @@ public class SparkXmlIndexingJob { tr.transform(new StreamSource(new StringReader(record)), res); return res.getWriter().toString(); } catch (Throwable e) { - System.out.println("XPathException on record:\n" + record); + log.error("XPathException on record: \n {}", record, e); throw new IllegalArgumentException(e); } } @@ -127,7 +140,7 @@ public class SparkXmlIndexingJob { * @return the parsed date */ public static String getRecordDatestamp() { - return new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'").format(new Date()); + return new SimpleDateFormat(DATE_FORMAT).format(new Date()); } /** diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java index ba89eaa38..e1ca8e316 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java @@ -1,54 +1,64 @@ package eu.dnetlib.dhp.oa.provision.model; +import com.google.common.base.Objects; + import java.io.Serializable; public class EntityRelEntity 
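A small, self-contained sketch of the transformation step behind toIndexRecord: compile an XSLT once, then apply it to each serialized record with javax.xml.transform. The stylesheet here is a plain identity copy and the record is a made-up placeholder; the real job compiles the layout-specific XSLT through SaxonTransformerFactory rather than the default JAXP factory used below.

import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import java.io.StringReader;
import java.io.StringWriter;

public class XsltTransformExample {

    // trivial stylesheet that copies the input unchanged, a stand-in for the layout-to-record XSLT
    private static final String XSLT =
            "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">" +
            "  <xsl:template match=\"@*|node()\">" +
            "    <xsl:copy><xsl:apply-templates select=\"@*|node()\"/></xsl:copy>" +
            "  </xsl:template>" +
            "</xsl:stylesheet>";

    public static void main(String[] args) throws Exception {
        Transformer tr = TransformerFactory.newInstance()
                .newTransformer(new StreamSource(new StringReader(XSLT)));

        String record = "<record><id>50|doi_________::0001</id></record>";

        StringWriter writer = new StringWriter();
        tr.transform(new StreamSource(new StringReader(record)), new StreamResult(writer));

        System.out.println(writer.toString());
    }
}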
implements Serializable { - private TypedRow source; - private TypedRow relation; - private TypedRow target; + private TypedRow entity; + private SortableRelation relation; + private RelatedEntity target; public EntityRelEntity() { } - public EntityRelEntity(TypedRow source) { - this.source = source; + public EntityRelEntity(SortableRelation relation, RelatedEntity target) { + this(null, relation, target); } - //helpers - public Boolean hasMainEntity() { - return getSource() != null & getRelation() == null & getTarget() == null; + public EntityRelEntity(TypedRow entity, SortableRelation relation, RelatedEntity target) { + this.entity = entity; + this.relation = relation; + this.target = target; } - public Boolean hasRelatedEntity() { - return getSource() == null & getRelation() != null & getTarget() != null; + public TypedRow getEntity() { + return entity; } - - public TypedRow getSource() { - return source; + public void setEntity(TypedRow entity) { + this.entity = entity; } - public EntityRelEntity setSource(TypedRow source) { - this.source = source; - return this; - } - - public TypedRow getRelation() { + public SortableRelation getRelation() { return relation; } - public EntityRelEntity setRelation(TypedRow relation) { + public void setRelation(SortableRelation relation) { this.relation = relation; - return this; } - public TypedRow getTarget() { + public RelatedEntity getTarget() { return target; } - public EntityRelEntity setTarget(TypedRow target) { + public void setTarget(RelatedEntity target) { this.target = target; - return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + EntityRelEntity that = (EntityRelEntity) o; + return Objects.equal(entity, that.entity) && + Objects.equal(relation, that.relation) && + Objects.equal(target, that.target); + } + + @Override + public int hashCode() { + return Objects.hashCode(entity, relation, target); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java index 80b15a4d6..b6e97a503 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/JoinedEntity.java @@ -1,41 +1,30 @@ package eu.dnetlib.dhp.oa.provision.model; -import eu.dnetlib.dhp.schema.oaf.OafEntity; - import java.io.Serializable; +import java.util.List; public class JoinedEntity implements Serializable { - private String type; + private TypedRow entity; - private OafEntity entity; + private List links; - private Links links; - - public String getType() { - return type; + public JoinedEntity() { } - public JoinedEntity setType(String type) { - this.type = type; - return this; - } - - public OafEntity getEntity() { + public TypedRow getEntity() { return entity; } - public JoinedEntity setEntity(OafEntity entity) { + public void setEntity(TypedRow entity) { this.entity = entity; - return this; } - public Links getLinks() { + public List getLinks() { return links; } - public JoinedEntity setLinks(Links links) { + public void setLinks(List links) { this.links = links; - return this; } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java 
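A minimal illustration of why the provision model beans now define value-based equals/hashCode (here with Guava's com.google.common.base.Objects, as in EntityRelEntity above): equal payloads collapse to a single element in hash-based collections and joins. The Link class is a simplified stand-in, not one of the real model classes, and Guava is assumed on the classpath as elsewhere in this module.

import com.google.common.base.Objects;

import java.util.HashSet;
import java.util.Set;

public class ValueEqualityExample {

    // simplified stand-in for the provision model beans
    static class Link {
        private final String relationId;
        private final String targetId;

        Link(String relationId, String targetId) {
            this.relationId = relationId;
            this.targetId = targetId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Link that = (Link) o;
            return Objects.equal(relationId, that.relationId)
                    && Objects.equal(targetId, that.targetId);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(relationId, targetId);
        }
    }

    public static void main(String[] args) {
        Set<Link> links = new HashSet<>();
        links.add(new Link("rel-1", "50|target_a"));
        links.add(new Link("rel-1", "50|target_a")); // duplicate by value, collapses
        links.add(new Link("rel-2", "50|target_b"));

        System.out.println(links.size()); // prints 2
    }
}

The Tuple2 model class later in this changeset takes the same approach but keys equality on the relation alone, so repeated links carrying the same relation are deduplicated regardless of the attached related entity.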
deleted file mode 100644 index 0cb4617ec..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Links.java +++ /dev/null @@ -1,6 +0,0 @@ -package eu.dnetlib.dhp.oa.provision.model; - -import java.util.ArrayList; - -public class Links extends ArrayList { -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java index 75e9045e8..011d9276d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.oa.provision.model; +import com.google.common.base.Objects; import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Qualifier; @@ -49,207 +50,218 @@ public class RelatedEntity implements Serializable { return id; } - public RelatedEntity setId(String id) { + public void setId(String id) { this.id = id; - return this; - } - - public StructuredProperty getTitle() { - return title; - } - - public RelatedEntity setTitle(StructuredProperty title) { - this.title = title; - return this; - } - - public String getDateofacceptance() { - return dateofacceptance; - } - - public RelatedEntity setDateofacceptance(String dateofacceptance) { - this.dateofacceptance = dateofacceptance; - return this; - } - - public String getPublisher() { - return publisher; - } - - public RelatedEntity setPublisher(String publisher) { - this.publisher = publisher; - return this; - } - - public List getPid() { - return pid; - } - - public RelatedEntity setPid(List pid) { - this.pid = pid; - return this; - } - - public String getCodeRepositoryUrl() { - return codeRepositoryUrl; - } - - public RelatedEntity setCodeRepositoryUrl(String codeRepositoryUrl) { - this.codeRepositoryUrl = codeRepositoryUrl; - return this; - } - - public Qualifier getResulttype() { - return resulttype; - } - - public RelatedEntity setResulttype(Qualifier resulttype) { - this.resulttype = resulttype; - return this; - } - - public List getCollectedfrom() { - return collectedfrom; - } - - public RelatedEntity setCollectedfrom(List collectedfrom) { - this.collectedfrom = collectedfrom; - return this; - } - - public List getInstances() { - return instances; - } - - public RelatedEntity setInstances(List instances) { - this.instances = instances; - return this; - } - - public String getOfficialname() { - return officialname; - } - - public RelatedEntity setOfficialname(String officialname) { - this.officialname = officialname; - return this; - } - - public String getWebsiteurl() { - return websiteurl; - } - - public RelatedEntity setWebsiteurl(String websiteurl) { - this.websiteurl = websiteurl; - return this; - } - - public Qualifier getDatasourcetype() { - return datasourcetype; - } - - public RelatedEntity setDatasourcetype(Qualifier datasourcetype) { - this.datasourcetype = datasourcetype; - return this; - } - - public Qualifier getDatasourcetypeui() { - return datasourcetypeui; - } - - public RelatedEntity setDatasourcetypeui(Qualifier datasourcetypeui) { - this.datasourcetypeui = datasourcetypeui; - return this; - } - - public Qualifier getOpenairecompatibility() { - return openairecompatibility; - } - - public RelatedEntity setOpenairecompatibility(Qualifier openairecompatibility) { - 
this.openairecompatibility = openairecompatibility; - return this; - } - - public String getLegalname() { - return legalname; - } - - public RelatedEntity setLegalname(String legalname) { - this.legalname = legalname; - return this; - } - - public String getLegalshortname() { - return legalshortname; - } - - public RelatedEntity setLegalshortname(String legalshortname) { - this.legalshortname = legalshortname; - return this; - } - - public Qualifier getCountry() { - return country; - } - - public RelatedEntity setCountry(Qualifier country) { - this.country = country; - return this; - } - - public String getCode() { - return code; - } - - public RelatedEntity setCode(String code) { - this.code = code; - return this; - } - - public String getAcronym() { - return acronym; - } - - public RelatedEntity setAcronym(String acronym) { - this.acronym = acronym; - return this; - } - - public Qualifier getContracttype() { - return contracttype; - } - - public RelatedEntity setContracttype(Qualifier contracttype) { - this.contracttype = contracttype; - return this; - } - - public List getFundingtree() { - return fundingtree; - } - - public RelatedEntity setFundingtree(List fundingtree) { - this.fundingtree = fundingtree; - return this; - } - - public String getProjectTitle() { - return projectTitle; - } - - public RelatedEntity setProjectTitle(String projectTitle) { - this.projectTitle = projectTitle; - return this; } public String getType() { return type; } - public RelatedEntity setType(String type) { + public void setType(String type) { this.type = type; - return this; } + public StructuredProperty getTitle() { + return title; + } + + public void setTitle(StructuredProperty title) { + this.title = title; + } + + public String getWebsiteurl() { + return websiteurl; + } + + public void setWebsiteurl(String websiteurl) { + this.websiteurl = websiteurl; + } + + public String getDateofacceptance() { + return dateofacceptance; + } + + public void setDateofacceptance(String dateofacceptance) { + this.dateofacceptance = dateofacceptance; + } + + public String getPublisher() { + return publisher; + } + + public void setPublisher(String publisher) { + this.publisher = publisher; + } + + public List getPid() { + return pid; + } + + public void setPid(List pid) { + this.pid = pid; + } + + public String getCodeRepositoryUrl() { + return codeRepositoryUrl; + } + + public void setCodeRepositoryUrl(String codeRepositoryUrl) { + this.codeRepositoryUrl = codeRepositoryUrl; + } + + public Qualifier getResulttype() { + return resulttype; + } + + public void setResulttype(Qualifier resulttype) { + this.resulttype = resulttype; + } + + public List getCollectedfrom() { + return collectedfrom; + } + + public void setCollectedfrom(List collectedfrom) { + this.collectedfrom = collectedfrom; + } + + public List getInstances() { + return instances; + } + + public void setInstances(List instances) { + this.instances = instances; + } + + public String getOfficialname() { + return officialname; + } + + public void setOfficialname(String officialname) { + this.officialname = officialname; + } + + public Qualifier getDatasourcetype() { + return datasourcetype; + } + + public void setDatasourcetype(Qualifier datasourcetype) { + this.datasourcetype = datasourcetype; + } + + public Qualifier getDatasourcetypeui() { + return datasourcetypeui; + } + + public void setDatasourcetypeui(Qualifier datasourcetypeui) { + this.datasourcetypeui = datasourcetypeui; + } + + public Qualifier getOpenairecompatibility() { + return 
openairecompatibility; + } + + public void setOpenairecompatibility(Qualifier openairecompatibility) { + this.openairecompatibility = openairecompatibility; + } + + public String getLegalname() { + return legalname; + } + + public void setLegalname(String legalname) { + this.legalname = legalname; + } + + public String getLegalshortname() { + return legalshortname; + } + + public void setLegalshortname(String legalshortname) { + this.legalshortname = legalshortname; + } + + public Qualifier getCountry() { + return country; + } + + public void setCountry(Qualifier country) { + this.country = country; + } + + public String getProjectTitle() { + return projectTitle; + } + + public void setProjectTitle(String projectTitle) { + this.projectTitle = projectTitle; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public String getAcronym() { + return acronym; + } + + public void setAcronym(String acronym) { + this.acronym = acronym; + } + + public Qualifier getContracttype() { + return contracttype; + } + + public void setContracttype(Qualifier contracttype) { + this.contracttype = contracttype; + } + + public List getFundingtree() { + return fundingtree; + } + + public void setFundingtree(List fundingtree) { + this.fundingtree = fundingtree; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RelatedEntity that = (RelatedEntity) o; + return Objects.equal(id, that.id) && + Objects.equal(type, that.type) && + Objects.equal(title, that.title) && + Objects.equal(websiteurl, that.websiteurl) && + Objects.equal(dateofacceptance, that.dateofacceptance) && + Objects.equal(publisher, that.publisher) && + Objects.equal(pid, that.pid) && + Objects.equal(codeRepositoryUrl, that.codeRepositoryUrl) && + Objects.equal(resulttype, that.resulttype) && + Objects.equal(collectedfrom, that.collectedfrom) && + Objects.equal(instances, that.instances) && + Objects.equal(officialname, that.officialname) && + Objects.equal(datasourcetype, that.datasourcetype) && + Objects.equal(datasourcetypeui, that.datasourcetypeui) && + Objects.equal(openairecompatibility, that.openairecompatibility) && + Objects.equal(legalname, that.legalname) && + Objects.equal(legalshortname, that.legalshortname) && + Objects.equal(country, that.country) && + Objects.equal(projectTitle, that.projectTitle) && + Objects.equal(code, that.code) && + Objects.equal(acronym, that.acronym) && + Objects.equal(contracttype, that.contracttype) && + Objects.equal(fundingtree, that.fundingtree); + } + + @Override + public int hashCode() { + return Objects.hashCode(id, type, title, websiteurl, dateofacceptance, publisher, pid, codeRepositoryUrl, resulttype, collectedfrom, instances, officialname, datasourcetype, datasourcetypeui, openairecompatibility, legalname, legalshortname, country, projectTitle, code, acronym, contracttype, fundingtree); + } } \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java new file mode 100644 index 000000000..b294a6633 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java @@ -0,0 +1,35 @@ +package eu.dnetlib.dhp.oa.provision.model; + +import com.google.common.collect.ComparisonChain; +import 
com.google.common.collect.Maps; +import eu.dnetlib.dhp.schema.oaf.Relation; + +import java.io.Serializable; +import java.util.Map; + +public class SortableRelation extends Relation implements Comparable, Serializable { + + private final static Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("publicationDataset", 2); + weights.put("relationship", 3); + weights.put("similarity", 4); + weights.put("affiliation", 5); + + weights.put("provision", 6); + weights.put("participation", 7); + weights.put("dedup", 8); + } + + @Override + public int compareTo(Relation o) { + return ComparisonChain.start() + .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType())) + .compare(getSource(), o.getSource()) + .compare(getTarget(), o.getTarget()) + .result(); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java deleted file mode 100644 index 8169e57e0..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java +++ /dev/null @@ -1,99 +0,0 @@ -package eu.dnetlib.dhp.oa.provision.model; - -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Maps; - -import java.io.Serializable; -import java.util.Map; - -/** - * Allows to sort relationships according to the priority defined in weights map. - */ -public class SortableRelationKey implements Comparable, Serializable { - - private String sourceId; - private String targetId; - - private String relType; - private String subRelType; - private String relClass; - - private final static Map weights = Maps.newHashMap(); - - static { - weights.put("outcome", 0); - weights.put("supplement", 1); - weights.put("publicationDataset", 2); - weights.put("relationship", 3); - weights.put("similarity", 4); - weights.put("affiliation", 5); - - weights.put("provision", 6); - weights.put("participation", 7); - weights.put("dedup", 8); - } - - public static SortableRelationKey from(final EntityRelEntity e) { - return new SortableRelationKey() - .setSourceId(e.getRelation().getSourceId()) - .setTargetId(e.getRelation().getTargetId()) - .setRelType(e.getRelation().getRelType()) - .setSubRelType(e.getRelation().getSubRelType()) - .setRelClass(e.getRelation().getRelClass()); - } - - public String getSourceId() { - return sourceId; - } - - public SortableRelationKey setSourceId(String sourceId) { - this.sourceId = sourceId; - return this; - } - - public String getTargetId() { - return targetId; - } - - public SortableRelationKey setTargetId(String targetId) { - this.targetId = targetId; - return this; - } - - public String getRelType() { - return relType; - } - - public SortableRelationKey setRelType(String relType) { - this.relType = relType; - return this; - } - - public String getSubRelType() { - return subRelType; - } - - public SortableRelationKey setSubRelType(String subRelType) { - this.subRelType = subRelType; - return this; - } - - public String getRelClass() { - return relClass; - } - - public SortableRelationKey setRelClass(String relClass) { - this.relClass = relClass; - return this; - } - - @Override - public int compareTo(SortableRelationKey o) { - return ComparisonChain.start() - .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType())) - .compare(getSourceId(), o.getSourceId()) - .compare(getTargetId(), 
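A self-contained sketch of the ordering logic in SortableRelation: compare by the weight assigned to the subRelType, then by source and target, using Guava's ComparisonChain. The Rel class and the trimmed-down weights map are illustrative stand-ins for the real class and its full table.

import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;

import java.util.*;

public class RelationOrderingExample {

    private static final Map<String, Integer> WEIGHTS = Maps.newHashMap();

    static {
        WEIGHTS.put("outcome", 0);
        WEIGHTS.put("supplement", 1);
        WEIGHTS.put("relationship", 3);
        WEIGHTS.put("dedup", 8);
    }

    // simplified stand-in for SortableRelation
    static class Rel implements Comparable<Rel> {
        final String subRelType, source, target;

        Rel(String subRelType, String source, String target) {
            this.subRelType = subRelType;
            this.source = source;
            this.target = target;
        }

        @Override
        public int compareTo(Rel o) {
            // note: a subRelType missing from the weights map would fail here with an NPE
            return ComparisonChain.start()
                    .compare(WEIGHTS.get(subRelType), WEIGHTS.get(o.subRelType))
                    .compare(source, o.source)
                    .compare(target, o.target)
                    .result();
        }

        @Override
        public String toString() {
            return subRelType + ":" + source + "->" + target;
        }
    }

    public static void main(String[] args) {
        List<Rel> rels = new ArrayList<>(Arrays.asList(
                new Rel("dedup", "A", "B"),
                new Rel("outcome", "A", "C"),
                new Rel("relationship", "A", "B")));

        Collections.sort(rels); // outcome first, dedup last
        rels.forEach(System.out::println);
    }
}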
o.getTargetId()) - .result(); - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java index ded976eea..942acaea1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/Tuple2.java @@ -2,27 +2,50 @@ package eu.dnetlib.dhp.oa.provision.model; import eu.dnetlib.dhp.schema.oaf.Relation; -public class Tuple2 { +import java.io.Serializable; +import java.util.Objects; + +public class Tuple2 implements Serializable { private Relation relation; private RelatedEntity relatedEntity; + public Tuple2() { + } + + public Tuple2(Relation relation, RelatedEntity relatedEntity) { + this.relation = relation; + this.relatedEntity = relatedEntity; + } + public Relation getRelation() { return relation; } - public Tuple2 setRelation(Relation relation) { + public void setRelation(Relation relation) { this.relation = relation; - return this; } public RelatedEntity getRelatedEntity() { return relatedEntity; } - public Tuple2 setRelatedEntity(RelatedEntity relatedEntity) { + public void setRelatedEntity(RelatedEntity relatedEntity) { this.relatedEntity = relatedEntity; - return this; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Tuple2 t2 = (Tuple2) o; + return getRelation().equals(t2.getRelation()); + } + + @Override + public int hashCode() { + return Objects.hash(getRelation().hashCode()); + } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java index e275fd9da..54f34802f 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java @@ -1,92 +1,61 @@ package eu.dnetlib.dhp.oa.provision.model; +import com.google.common.base.Objects; + import java.io.Serializable; public class TypedRow implements Serializable { - private String sourceId; - - private String targetId; + private String id; private Boolean deleted; private String type; - private String relType; - private String subRelType; - private String relClass; - private String oaf; - public String getSourceId() { - return sourceId; + public String getId() { + return id; } - public TypedRow setSourceId(String sourceId) { - this.sourceId = sourceId; - return this; - } - - public String getTargetId() { - return targetId; - } - - public TypedRow setTargetId(String targetId) { - this.targetId = targetId; - return this; + public void setId(String id) { + this.id = id; } public Boolean getDeleted() { return deleted; } - public TypedRow setDeleted(Boolean deleted) { + public void setDeleted(Boolean deleted) { this.deleted = deleted; - return this; } public String getType() { return type; } - public TypedRow setType(String type) { + public void setType(String type) { this.type = type; - return this; - } - - public String getRelType() { - return relType; - } - - public TypedRow setRelType(String relType) { - this.relType = relType; - return this; - } - - public String getSubRelType() { - return subRelType; - } - - public TypedRow setSubRelType(String subRelType) { - this.subRelType = 
subRelType; - return this; - } - - public String getRelClass() { - return relClass; - } - - public TypedRow setRelClass(String relClass) { - this.relClass = relClass; - return this; } public String getOaf() { return oaf; } - public TypedRow setOaf(String oaf) { + public void setOaf(String oaf) { this.oaf = oaf; - return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TypedRow typedRow2 = (TypedRow) o; + return Objects.equal(id, typedRow2.id); + } + + @Override + public int hashCode() { + return Objects.hashCode(id); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java index a48c812fc..398a272e2 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/GraphMappingUtils.java @@ -1,33 +1,47 @@ package eu.dnetlib.dhp.oa.provision.utils; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; -import eu.dnetlib.dhp.oa.provision.model.TypedRow; +import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.oaf.*; -import net.minidev.json.JSONArray; -import org.apache.commons.lang3.StringUtils; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; -import static org.apache.commons.lang3.StringUtils.*; +import static org.apache.commons.lang3.StringUtils.substringAfter; public class GraphMappingUtils { public static final String SEPARATOR = "_"; + public final static Map entityTypes = Maps.newHashMap(); + + static { + entityTypes.put(EntityType.datasource, Datasource.class); + entityTypes.put(EntityType.organization, Organization.class); + entityTypes.put(EntityType.project, Project.class); + entityTypes.put(EntityType.dataset, Dataset.class); + entityTypes.put(EntityType.otherresearchproduct, OtherResearchProduct.class); + entityTypes.put(EntityType.software, Software.class); + entityTypes.put(EntityType.publication, Publication.class); + } + public enum EntityType { - publication, dataset, otherresearchproduct, software, datasource, organization, project + publication, dataset, otherresearchproduct, software, datasource, organization, project; + + public static EntityType fromClass(Class clazz) { + switch (clazz.getName()) { + case "eu.dnetlib.dhp.schema.oaf.Publication" : return publication; + case "eu.dnetlib.dhp.schema.oaf.Dataset" : return dataset; + case "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct" : return otherresearchproduct; + case "eu.dnetlib.dhp.schema.oaf.Software" : return software; + case "eu.dnetlib.dhp.schema.oaf.Datasource" : return datasource; + case "eu.dnetlib.dhp.schema.oaf.Organization" : return organization; + case "eu.dnetlib.dhp.schema.oaf.Project" : return project; + default: throw new IllegalArgumentException("Unknown 
OafEntity class: " + clazz.getName()); + } + } } public enum MainEntityType { @@ -36,8 +50,6 @@ public class GraphMappingUtils { public static Set authorPidTypes = Sets.newHashSet("orcid", "magidentifier"); - public static Set instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance", "distributionlocation"); - private static final String schemeTemplate = "dnet:%s_%s_relations"; private static Map entityMapping = Maps.newHashMap(); @@ -52,169 +64,127 @@ public class GraphMappingUtils { entityMapping.put(EntityType.project, MainEntityType.project); } + public static Class[] getKryoClasses() { + return new Class[]{ + Author.class, + Context.class, + Country.class, + DataInfo.class, + eu.dnetlib.dhp.schema.oaf.Dataset.class, + Datasource.class, + ExternalReference.class, + ExtraInfo.class, + Field.class, + GeoLocation.class, + Instance.class, + Journal.class, + KeyValue.class, + Oaf.class, + OafEntity.class, + OAIProvenance.class, + Organization.class, + OriginDescription.class, + OtherResearchProduct.class, + Project.class, + Publication.class, + Qualifier.class, + Relation.class, + SortableRelation.class, //SUPPORT + Result.class, + Software.class, + StructuredProperty.class + }; + } + public static String getScheme(final String sourceType, final String targetType) { return String.format(schemeTemplate, entityMapping.get(EntityType.valueOf(sourceType)).name(), entityMapping.get(EntityType.valueOf(targetType)).name()); } - public static String getMainType(final String type) { - return entityMapping.get(EntityType.valueOf(type)).name(); + public static String getMainType(final EntityType type) { + return entityMapping.get(type).name(); } - public static boolean isResult(String type) { + public static boolean isResult(EntityType type) { return MainEntityType.result.name().equals(getMainType(type)); } - public static Predicate instanceFilter = s -> instanceFieldFilter.contains(s); + public static RelatedEntity asRelatedEntity(E entity, Class clazz) { - public static EntityRelEntity asRelatedEntity(EntityRelEntity e) { + final RelatedEntity re = new RelatedEntity(); + re.setId(entity.getId()); + re.setType(EntityType.fromClass(clazz).name()); - final DocumentContext j = JsonPath.parse(e.getSource().getOaf()); - final RelatedEntity re = new RelatedEntity().setId(j.read("$.id")).setType(e.getSource().getType()); + re.setPid(entity.getPid()); + re.setCollectedfrom(entity.getCollectedfrom()); - switch (EntityType.valueOf(e.getSource().getType())) { + switch (GraphMappingUtils.EntityType.fromClass(clazz)) { case publication: case dataset: case otherresearchproduct: case software: - mapTitle(j, re); - re.setDateofacceptance(j.read("$.dateofacceptance.value")); - re.setPublisher(j.read("$.publisher.value")); - JSONArray pids = j.read("$.pid"); - re.setPid(pids.stream() - .map(p -> asStructuredProperty((LinkedHashMap) p)) - .collect(Collectors.toList())); + Result result = (Result) entity; - re.setResulttype(asQualifier(j.read("$.resulttype"))); + if (result.getTitle() == null && !result.getTitle().isEmpty()) { + re.setTitle(result.getTitle().stream().findFirst().get()); + } - JSONArray collfrom = j.read("$.collectedfrom"); - re.setCollectedfrom(collfrom.stream() - .map(c -> asKV((LinkedHashMap) c)) - .collect(Collectors.toList())); - - // will throw exception when the instance is not found - JSONArray instances = j.read("$.instance"); - re.setInstances(instances.stream() - .map(i -> { - final LinkedHashMap p = (LinkedHashMap) i; - 
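A hedged sketch of how getKryoClasses() above is meant to be consumed: register the model classes on the SparkConf together with the Kryo serializer before the session is created. The app name and local master below are only for illustration.

import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class KryoRegistrationExample {

    public static void main(String[] args) {
        // register the provision model classes with Kryo; getKryoClasses() is the helper added above
        SparkConf conf = new SparkConf()
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(GraphMappingUtils.getKryoClasses());

        SparkSession spark = SparkSession.builder()
                .appName(KryoRegistrationExample.class.getSimpleName())
                .master("local[*]") // local run, for illustration only
                .config(conf)
                .getOrCreate();

        // ... run the provision steps against spark ...

        spark.stop();
    }
}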
final Field license = new Field(); - license.setValue((String) ((LinkedHashMap) p.get("license")).get("value")); - final Instance instance = new Instance(); - instance.setLicense(license); - instance.setAccessright(asQualifier((LinkedHashMap) p.get("accessright"))); - instance.setInstancetype(asQualifier((LinkedHashMap) p.get("instancetype"))); - instance.setHostedby(asKV((LinkedHashMap) p.get("hostedby"))); - //TODO mapping of distributionlocation - instance.setCollectedfrom(asKV((LinkedHashMap) p.get("collectedfrom"))); - - Field dateofacceptance = new Field(); - dateofacceptance.setValue((String) ((LinkedHashMap) p.get("dateofacceptance")).get("value")); - instance.setDateofacceptance(dateofacceptance); - return instance; - }).collect(Collectors.toList())); + re.setDateofacceptance(getValue(result.getDateofacceptance())); + re.setPublisher(getValue(result.getPublisher())); + re.setResulttype(result.getResulttype()); + re.setInstances(result.getInstance()); //TODO still to be mapped //re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); break; case datasource: - re.setOfficialname(j.read("$.officialname.value")); - re.setWebsiteurl(j.read("$.websiteurl.value")); - re.setDatasourcetype(asQualifier(j.read("$.datasourcetype"))); - re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility"))); + Datasource d = (Datasource) entity; + + re.setOfficialname(getValue(d.getOfficialname())); + re.setWebsiteurl(getValue(d.getWebsiteurl())); + re.setDatasourcetype(d.getDatasourcetype()); + re.setOpenairecompatibility(d.getOpenairecompatibility()); break; case organization: - re.setLegalname(j.read("$.legalname.value")); - re.setLegalshortname(j.read("$.legalshortname.value")); - re.setCountry(asQualifier(j.read("$.country"))); - re.setWebsiteurl(j.read("$.websiteurl.value")); + Organization o = (Organization) entity; + + re.setLegalname(getValue(o.getLegalname())); + re.setLegalshortname(getValue(o.getLegalshortname())); + re.setCountry(o.getCountry()); + re.setWebsiteurl(getValue(o.getWebsiteurl())); break; case project: - re.setProjectTitle(j.read("$.title.value")); - re.setCode(j.read("$.code.value")); - re.setAcronym(j.read("$.acronym.value")); - re.setContracttype(asQualifier(j.read("$.contracttype"))); + Project p = (Project) entity; - JSONArray f = j.read("$.fundingtree"); + re.setProjectTitle(getValue(p.getTitle())); + re.setCode(getValue(p.getCode())); + re.setAcronym(getValue(p.getAcronym())); + re.setContracttype(p.getContracttype()); + + List> f = p.getFundingtree(); if (!f.isEmpty()) { re.setFundingtree(f.stream() - .map(s -> ((LinkedHashMap) s).get("value")) + .map(s -> s.getValue()) .collect(Collectors.toList())); } - break; } - return new EntityRelEntity().setSource( - new TypedRow() - .setSourceId(e.getSource().getSourceId()) - .setDeleted(e.getSource().getDeleted()) - .setType(e.getSource().getType()) - .setOaf(serialize(re))); + return re; } - private static KeyValue asKV(LinkedHashMap j) { - final KeyValue kv = new KeyValue(); - kv.setKey((String) j.get("key")); - kv.setValue((String) j.get("value")); - return kv; + private static String getValue(Field field) { + return getFieldValueWithDefault(field, ""); } - private static void mapTitle(DocumentContext j, RelatedEntity re) { - final JSONArray a = j.read("$.title"); - if (!a.isEmpty()) { - final StructuredProperty sp = asStructuredProperty((LinkedHashMap) a.get(0)); - if (StringUtils.isNotBlank(sp.getValue())) { - re.setTitle(sp); - } - } - } - - private static StructuredProperty 
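A small, self-contained sketch of the null-safe extraction pattern used when mapping entities to RelatedEntity: scalar fields go through an Optional-based default, as in getFieldValueWithDefault, and the first title is taken only when the list is present and non-empty. Field and Title are simplified stand-ins for the oaf schema wrappers.

import java.util.*;

public class NullSafeMappingExample {

    // simplified stand-ins for the oaf Field/StructuredProperty wrappers
    static class Field<T> { T value; T getValue() { return value; } }
    static class Title { String text; Title(String text) { this.text = text; } }

    static <T> T fieldValue(Field<T> f, T defaultValue) {
        return Optional.ofNullable(f).map(x -> x.getValue()).orElse(defaultValue);
    }

    public static void main(String[] args) {
        Field<String> publisher = null; // missing field in the source record

        // null-safe scalar extraction, analogous to getFieldValueWithDefault
        System.out.println("publisher: '" + fieldValue(publisher, "") + "'");

        // null-safe "first title" extraction
        List<Title> titles = Arrays.asList(new Title("Main title"), new Title("Alternative title"));
        Optional.ofNullable(titles)
                .filter(t -> !t.isEmpty())
                .flatMap(t -> t.stream().findFirst())
                .ifPresent(t -> System.out.println("title: " + t.text));
    }
}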
asStructuredProperty(LinkedHashMap j) { - final StructuredProperty sp = new StructuredProperty(); - final String value = (String) j.get("value"); - if (StringUtils.isNotBlank(value)) { - sp.setValue((String) j.get("value")); - sp.setQualifier(asQualifier((LinkedHashMap) j.get("qualifier"))); - } - return sp; - } - - public static Qualifier asQualifier(LinkedHashMap j) { - final Qualifier q = new Qualifier(); - - final String classid = j.get("classid"); - if (StringUtils.isNotBlank(classid)) { - q.setClassid(classid); - } - - final String classname = j.get("classname"); - if (StringUtils.isNotBlank(classname)) { - q.setClassname(classname); - } - - final String schemeid = j.get("schemeid"); - if (StringUtils.isNotBlank(schemeid)) { - q.setSchemeid(schemeid); - } - - final String schemename = j.get("schemename"); - if (StringUtils.isNotBlank(schemename)) { - q.setSchemename(schemename); - } - return q; - } - - public static String serialize(final Object o) { - try { - return new ObjectMapper() - .setSerializationInclusion(JsonInclude.Include.NON_NULL) - .writeValueAsString(o); - } catch (JsonProcessingException e) { - throw new IllegalArgumentException("unable to serialize: " + o.toString(), e); - } + private static T getFieldValueWithDefault(Field f, T defaultValue) { + return Optional.ofNullable(f) + .filter(Objects::nonNull) + .map(x -> x.getValue()) + .orElse(defaultValue); } public static String removePrefix(final String s) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java index 9714830d3..c8e7a2429 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java @@ -1,6 +1,6 @@ package eu.dnetlib.dhp.oa.provision.utils; -import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; +import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import org.apache.spark.Partitioner; import org.apache.spark.util.Utils; @@ -23,7 +23,7 @@ public class RelationPartitioner extends Partitioner { @Override public int getPartition(Object key) { - return Utils.nonNegativeMod(((SortableRelationKey) key).getSourceId().hashCode(), numPartitions()); + return Utils.nonNegativeMod(((SortableRelation) key).getSource().hashCode(), numPartitions()); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index ffbe54904..5cf881f00 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.oa.provision.utils; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Lists; @@ -7,9 +8,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.mycila.xmltool.XMLDoc; import com.mycila.xmltool.XMLTag; -import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; -import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; -import eu.dnetlib.dhp.oa.provision.model.Tuple2; 
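A minimal sketch of the bucketing idea behind RelationPartitioner.getPartition: a non-negative modulus of the relation source's hashCode picks the partition, so relations sharing a source always land in the same partition. The nonNegativeMod helper below mirrors what org.apache.spark.util.Utils provides; the identifiers are made up.

public class SourcePartitionExample {

    // same idea as Utils.nonNegativeMod: a hash-based, always non-negative bucket index
    static int nonNegativeMod(int x, int mod) {
        int raw = x % mod;
        return raw < 0 ? raw + mod : raw;
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        String[] sources = {
                "50|doi_________::0001",
                "50|doi_________::0001", // same source, same partition
                "20|openorgs____::abcd"
        };
        for (String source : sources) {
            System.out.println(source + " -> partition " + nonNegativeMod(source.hashCode(), numPartitions));
        }
    }
}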
+import eu.dnetlib.dhp.oa.provision.model.*; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.lang3.StringUtils; @@ -50,6 +49,8 @@ public class XmlRecordFactory implements Serializable { private boolean indent = false; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public XmlRecordFactory( final ContextMapper contextMapper, final boolean indent, final String schemaLocation, final String otherDatasourceTypesUForUI) { @@ -74,22 +75,24 @@ public class XmlRecordFactory implements Serializable { final Set contexts = Sets.newHashSet(); - final OafEntity entity = je.getEntity(); + final OafEntity entity = toOafEntity(je.getEntity()); TemplateFactory templateFactory = new TemplateFactory(); try { - final List metadata = metadata(je.getType(), entity, contexts); + final EntityType type = GraphMappingUtils.EntityType.valueOf(je.getEntity().getType()); + final List metadata = metadata(type, entity, contexts); // rels has to be processed before the contexts because they enrich the contextMap with the funding info. final List relations = listRelations(je, templateFactory, contexts); - metadata.addAll(buildContexts(getMainType(je.getType()), contexts)); + final String mainType = getMainType(type); + metadata.addAll(buildContexts(mainType, contexts)); metadata.add(XmlSerializationUtils.parseDataInfo(entity.getDataInfo())); final String body = templateFactory.buildBody( - getMainType(je.getType()), + mainType, metadata, relations, - listChildren(je, templateFactory), listExtraInfo(je)); + listChildren(entity, je.getEntity().getType(), templateFactory), listExtraInfo(entity)); return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent); } catch (final Throwable e) { @@ -97,6 +100,35 @@ public class XmlRecordFactory implements Serializable { } } + private static OafEntity toOafEntity(TypedRow typedRow) { + return parseOaf(typedRow.getOaf(), typedRow.getType()); + } + + private static OafEntity parseOaf(final String json, final String type) { + try { + switch (GraphMappingUtils.EntityType.valueOf(type)) { + case publication: + return OBJECT_MAPPER.readValue(json, Publication.class); + case dataset: + return OBJECT_MAPPER.readValue(json, Dataset.class); + case otherresearchproduct: + return OBJECT_MAPPER.readValue(json, OtherResearchProduct.class); + case software: + return OBJECT_MAPPER.readValue(json, Software.class); + case datasource: + return OBJECT_MAPPER.readValue(json, Datasource.class); + case organization: + return OBJECT_MAPPER.readValue(json, Organization.class); + case project: + return OBJECT_MAPPER.readValue(json, Project.class); + default: + throw new IllegalArgumentException("invalid type: " + type); + } + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + private String printXML(String xml, boolean indent) { try { final Document doc = new SAXReader().read(new StringReader(xml)); @@ -112,7 +144,7 @@ public class XmlRecordFactory implements Serializable { } } - private List metadata(final String type, final OafEntity entity, final Set contexts) { + private List metadata(final EntityType type, final OafEntity entity, final Set contexts) { final List metadata = Lists.newArrayList(); @@ -260,11 +292,9 @@ public class XmlRecordFactory implements Serializable { if (r.getResourcetype() != null) { metadata.add(XmlSerializationUtils.mapQualifier("resourcetype", r.getResourcetype())); } - - metadata.add(XmlSerializationUtils.mapQualifier("bestaccessright", 
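A self-contained sketch of the parseOaf pattern introduced below: a Jackson ObjectMapper deserializes the serialized payload into the class selected by the accompanying type string. Publication and Datasource here are tiny stand-ins rather than the eu.dnetlib.dhp.schema.oaf classes, and only a couple of branches are shown.

import com.fasterxml.jackson.databind.ObjectMapper;

public class TypedJsonParseExample {

    // simplified stand-ins for the OafEntity subclasses
    public static class Publication { public String id; }
    public static class Datasource { public String id; public String officialname; }

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static Object parse(String json, String type) throws Exception {
        switch (type) {
            case "publication":
                return OBJECT_MAPPER.readValue(json, Publication.class);
            case "datasource":
                return OBJECT_MAPPER.readValue(json, Datasource.class);
            default:
                throw new IllegalArgumentException("invalid type: " + type);
        }
    }

    public static void main(String[] args) throws Exception {
        Object pub = parse("{\"id\":\"50|doi_________::0001\"}", "publication");
        System.out.println(pub.getClass().getSimpleName()); // prints Publication
    }
}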
getBestAccessright(r))); } - switch (EntityType.valueOf(type)) { + switch (type) { case publication: final Publication pub = (Publication) entity; @@ -662,7 +692,7 @@ public class XmlRecordFactory implements Serializable { if (isNotBlank(re.getCodeRepositoryUrl())) { metadata.add(XmlSerializationUtils.asXmlElement("coderepositoryurl", re.getCodeRepositoryUrl())); } - if (re.getResulttype() != null & !re.getResulttype().isBlank()) { + if (re.getResulttype() != null & re.getResulttype().isBlank()) { metadata.add(XmlSerializationUtils.mapQualifier("resulttype", re.getResulttype())); } if (re.getCollectedfrom() != null) { @@ -748,14 +778,14 @@ public class XmlRecordFactory implements Serializable { return rels; } - private List listChildren(final JoinedEntity je, TemplateFactory templateFactory) { + private List listChildren(final OafEntity entity, String type, TemplateFactory templateFactory) { final List children = Lists.newArrayList(); - - if (MainEntityType.result.toString().equals(getMainType(je.getType()))) { - final List instances = ((Result) je.getEntity()).getInstance(); + EntityType entityType = EntityType.valueOf(type); + if (MainEntityType.result.toString().equals(getMainType(entityType))) { + final List instances = ((Result) entity).getInstance(); if (instances != null) { - for (final Instance instance : ((Result) je.getEntity()).getInstance()) { + for (final Instance instance : ((Result) entity).getInstance()) { final List fields = Lists.newArrayList(); @@ -790,9 +820,9 @@ public class XmlRecordFactory implements Serializable { children.add(templateFactory.getInstance(instance.getHostedby().getKey(), fields, instance.getUrl())); } } - final List ext = ((Result) je.getEntity()).getExternalReference(); + final List ext = ((Result) entity).getExternalReference(); if (ext != null) { - for (final ExternalReference er : ((Result) je.getEntity()).getExternalReference()) { + for (final ExternalReference er : ((Result) entity).getExternalReference()) { final List fields = Lists.newArrayList(); @@ -826,8 +856,8 @@ public class XmlRecordFactory implements Serializable { return children; } - private List listExtraInfo(JoinedEntity je) { - final List extraInfo = je.getEntity().getExtraInfo(); + private List listExtraInfo(OafEntity entity) { + final List extraInfo = entity.getExtraInfo(); return extraInfo != null ? 
extraInfo .stream() .map(e -> XmlSerializationUtils.mapExtraInfo(e)) diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json index a5d20a55f..e57df9b09 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json @@ -1,7 +1,14 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, - {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true}, - {"paramName":"t", "paramLongName":"otherDsTypeId", "paramDescription": "list of datasource types to populate field datasourcetypeui", "paramRequired": true} + { + "paramName": "in", + "paramLongName": "inputPath", + "paramDescription": "the path of the sequence file to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json new file mode 100644 index 000000000..bfb248d01 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "irp", + "paramLongName": "inputRelationsPath", + "paramDescription": "path to input relations prepare", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "root output location for prepared relations", + "paramRequired": true + }, + { + "paramName": "rp", + "paramLongName": "relPartitions", + "paramDescription": "number or partitions for the relations Dataset", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json new file mode 100644 index 000000000..0090716d6 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "irp", + "paramLongName": "inputRelationsPath", + "paramDescription": "path to input relations from the graph", + 
"paramRequired": true + }, + { + "paramName": "iep", + "paramLongName": "inputEntityPath", + "paramDescription": "path to input entity from the graph", + "paramRequired": true + }, + { + "paramName": "clazz", + "paramLongName": "graphTableClassName", + "paramDescription": "class name associated to the input entity path", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "root output location for prepared relations", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json new file mode 100644 index 000000000..2727f153b --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "irp", + "paramLongName": "inputRelatedEntitiesPath", + "paramDescription": "path to input relations from the graph", + "paramRequired": true + }, + { + "paramName": "iep", + "paramLongName": "inputGraphRootPath", + "paramDescription": "root graph path", + "paramRequired": true + }, + { + "paramName": "op", + "paramLongName": "outputPath", + "paramDescription": "root output location for prepared relations", + "paramRequired": true + }, + { + "paramName": "np", + "paramLongName": "numPartitions", + "paramDescription": "number of partitions to use for the output", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json index 0d45e9e29..3396020e0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json @@ -1,7 +1,26 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true}, - {"paramName":"f", "paramLongName":"format", "paramDescription": "MDFormat name found in the IS profile", "paramRequired": true}, - {"paramName":"b", "paramLongName":"batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false} + { + "paramName": "is", + "paramLongName": "isLookupUrl", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "inputPath", + "paramDescription": "the path of the sequence file to read the XML records", + "paramRequired": true + }, + { + "paramName": "f", + "paramLongName": "format", + "paramDescription": "MDFormat name found in the IS profile", + "paramRequired": true + }, + { + "paramName": "b", + "paramLongName": "batchSize", + "paramDescription": "size of the batch of documents sent to solr", + 
"paramRequired": false + } ] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json new file mode 100644 index 000000000..32720514e --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "in", + "paramLongName": "inputPath", + "paramDescription": "the path of the sequence file to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + { + "paramName": "ilu", + "paramLongName": "isLookupUrl", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true + }, + { + "paramName": "odt", + "paramLongName": "otherDsTypeId", + "paramDescription": "list of datasource types to populate field datasourcetypeui", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml index 624d3ea76..c070d8338 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml @@ -19,16 +19,24 @@ hive_metastore_uris thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - hive_db_name - openaire - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088 + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 spark2EventLogDir - /user/spark/applicationHistory + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + + oozieActionShareLibForSpark2 + spark2 \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index a28174cce..e6587ef5e 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -2,21 +2,52 @@ - hive_db_name - the target hive database name + inputGraphRootPath + root location of input materialized graph - sparkDriverMemory + isLookupUrl + URL for the isLookup service + + + + sparkDriverMemoryForJoining memory for driver process - sparkExecutorMemory + sparkExecutorMemoryForJoining memory for individual executor - sparkExecutorCores + sparkExecutorCoresForJoining number of cores used by single executor + + sparkDriverMemoryForIndexing + memory for driver process + + + sparkExecutorMemoryForIndexing + memory for individual executor + + + sparkExecutorCoresForIndexing + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + 
com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + spark2YarnHistoryServerAddress spark 2.* yarn history server address @@ -32,12 +63,8 @@ ${nameNode} - mapreduce.job.queuename - ${queueName} - - - oozie.launcher.mapred.job.queue.name - ${oozieLauncherQueueName} + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} @@ -46,9 +73,9 @@ - ${wf:conf('reuseRecords') eq false} + ${wf:conf('reuseRecords') eq false} ${wf:conf('reuseRecords') eq true} - + @@ -56,31 +83,306 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + - - - - yarn cluster - build_adjacency_lists - eu.dnetlib.dhp.oa.provision.SparkXmlRecordBuilderJob + PrepareRelations + eu.dnetlib.dhp.oa.provision.PrepareRelationsJob dhp-graph-provision-${projectVersion}.jar - --executor-cores ${sparkExecutorCoresForJoining} - --executor-memory ${sparkExecutorMemoryForJoining} + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" - --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -mt yarn - -is ${isLookupUrl} - -t ${otherDsTypeId} - --sourcePath${sourcePath} - --outputPath${outputPath} + --inputRelationsPath${inputGraphRootPath}/relation + --outputPath${workingDir}/relation + --relPartitions3000 + + + + + + + + + + + + + + + + + + yarn + cluster + Join[relation.target = publication.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication + --outputPath${workingDir}/join_partial + + + + + + + + yarn + cluster + Join[relation.target = dataset.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --outputPath${workingDir}/join_partial + + + 
+ + + + + yarn + cluster + Join[relation.target = otherresearchproduct.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --outputPath${workingDir}/join_partial + + + + + + + + yarn + cluster + Join[relation.target = software.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --outputPath${workingDir}/join_partial + + + + + + + + yarn + cluster + Join[relation.target = datasource.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/datasource + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --outputPath${workingDir}/join_partial + + + + + + + + yarn + cluster + Join[relation.target = organization.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/organization + 
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization + --outputPath${workingDir}/join_partial + + + + + + + + yarn + cluster + Join[relation.target = project.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/project + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project + --outputPath${workingDir}/join_partial + + + + + + + + + + + yarn + cluster + Join[entities.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputGraphRootPath${inputGraphRootPath} + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities + --numPartitions12000 + + + + + + + + yarn + cluster + build_adjacency_lists + eu.dnetlib.dhp.oa.provision.AdjacencyListBuilderJob + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath ${workingDir}/join_entities + --outputPath${workingDir}/joined + + + + + + + + yarn + cluster + convert_to_xml + eu.dnetlib.dhp.oa.provision.XmlConverterJob + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputPath${workingDir}/joined + --outputPath${workingDir}/xml + --isLookupUrl${isLookupUrl} + --otherDsTypeId${otherDsTypeId} @@ -91,21 +393,20 @@ yarn cluster to_solr_index - eu.dnetlib.dhp.oa.provision.SparkXmlIndexingJob + eu.dnetlib.dhp.oa.provision.XmlIndexingJob 
dhp-graph-provision-${projectVersion}.jar - --executor-cores ${sparkExecutorCoresForIndexing} - --executor-memory ${sparkExecutorMemoryForIndexing} + --executor-memory=${sparkExecutorMemoryForIndexing} --driver-memory=${sparkDriverMemoryForIndexing} + --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing} - --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" - --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -mt yarn - -is ${isLookupUrl} - --sourcePath${outputPath}/xml + --inputPath${workingDir}/xml + --isLookupUrl ${isLookupUrl} --format${format} --batchSize${batchSize}
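
Usage note on the XmlRecordFactory change above: the new parseOaf(...) switch deserializes the serialized entity carried by a TypedRow into the concrete OafEntity subclass via a shared Jackson ObjectMapper. The sketch below shows, assuming Jackson and the dhp-schemas classes are on the classpath, how a single entity payload round-trips through that kind of mapping; the JSON string is illustrative only.

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Publication;

public class ParseOafSketch {

    // one reusable mapper instance, mirroring the static OBJECT_MAPPER introduced above
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // hypothetical payload: a minimal serialized publication as it could appear in a TypedRow
        final String json = "{\"id\":\"50|doi_________::abc\"}";
        final String type = "publication";

        // dispatch on the type discriminator, as the new parseOaf(...) does for all entity types
        final OafEntity entity;
        switch (type) {
            case "publication":
                entity = OBJECT_MAPPER.readValue(json, Publication.class);
                break;
            default:
                throw new IllegalArgumentException("invalid type: " + type);
        }

        System.out.println(entity.getId()); // 50|doi_________::abc
    }
}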
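On the resulttype guard above: with the non-short-circuit '&' operator both operands are always evaluated, so calling isBlank() on a null qualifier throws; '&&' combined with the negated isBlank() check is the safe form. A self-contained illustration with a hypothetical qualifier-like stand-in (not the project's Qualifier class):

public class ShortCircuitDemo {

    // hypothetical stand-in for a qualifier-like bean exposing an isBlank() check
    static class Q {
        boolean isBlank() { return true; }
    }

    public static void main(String[] args) {
        final Q q = null;

        // '&&' short-circuits: the right-hand side is never evaluated when q is null
        System.out.println(q != null && !q.isBlank()); // false, no exception

        try {
            // '&' evaluates both sides, so isBlank() is invoked on null and throws
            System.out.println(q != null & !q.isBlank());
        } catch (NullPointerException e) {
            System.out.println("NPE with non-short-circuit '&'");
        }
    }
}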
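The phase1 join actions in the workflow pass the entity table's class via --graphTableClassName (e.g. eu.dnetlib.dhp.schema.oaf.Publication). A minimal sketch, assuming the job resolves that name reflectively; the helper name and surrounding plumbing are illustrative, not the job's actual implementation:

import eu.dnetlib.dhp.schema.oaf.OafEntity;

public class GraphTableClassSketch {

    @SuppressWarnings("unchecked")
    static Class<? extends OafEntity> resolve(String graphTableClassName) throws ClassNotFoundException {
        // turn the workflow's --graphTableClassName argument into a typed Class object
        return (Class<? extends OafEntity>) Class.forName(graphTableClassName);
    }

    public static void main(String[] args) throws Exception {
        // the same value the workflow passes for the publication join step
        Class<? extends OafEntity> clazz = resolve("eu.dnetlib.dhp.schema.oaf.Publication");
        System.out.println(clazz.getSimpleName()); // Publication
    }
}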
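PrepareRelations is invoked with --relPartitions 3000, described in its parameter file as the number of partitions for the relations Dataset. A sketch of what enforcing that partition count can look like with stock Spark SQL APIs; the input/output paths and the parquet format are assumptions, not the job's actual layout:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RelPartitionsSketch {

    public static void main(String[] args) {
        final SparkSession spark = SparkSession.builder()
                .appName("rel-partitions-sketch")
                .master("local[*]")
                .getOrCreate();

        final int relPartitions = 3000; // same value the workflow passes via --relPartitions

        // hypothetical location of the graph's relation table
        final Dataset<Row> relations = spark.read().json("/tmp/graph/relation");

        // enforce a fixed partition count before writing the prepared relations
        relations.repartition(relPartitions)
                .write()
                .mode("overwrite")
                .parquet("/tmp/working_dir/relation");

        spark.stop();
    }
}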