From f9f7350bb9b51379c7bc1ed57bbde50c30b83551 Mon Sep 17 00:00:00 2001 From: pjacewicz Date: Wed, 1 Apr 2020 18:39:26 +0200 Subject: [PATCH] [dhp-actionmanager] common package added with utility classes supporting hadoop and spark envs --- .../common/FunctionalInterfaceSupport.java | 55 +++++++++++++ .../dhp/actionmanager/common/HdfsSupport.java | 57 ++++++++++++++ .../actionmanager/common/ModelSupport.java | 51 ++++++++++++ .../common/SparkSessionSupport.java | 57 ++++++++++++++ .../actionmanager/common/ThrowingSupport.java | 76 ++++++++++++++++++ .../actionmanager/common/HdfsSupportTest.java | 78 +++++++++++++++++++ .../common/ModelSupportTest.java | 35 +++++++++ .../common/SparkSessionSupportTest.java | 54 +++++++++++++ 8 files changed, 463 insertions(+) create mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/FunctionalInterfaceSupport.java create mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/HdfsSupport.java create mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/ModelSupport.java create mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/SparkSessionSupport.java create mode 100644 dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/ThrowingSupport.java create mode 100644 dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/HdfsSupportTest.java create mode 100644 dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/ModelSupportTest.java create mode 100644 dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/SparkSessionSupportTest.java diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/FunctionalInterfaceSupport.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/FunctionalInterfaceSupport.java new file mode 100644 index 0000000000..0f962428f4 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/FunctionalInterfaceSupport.java @@ -0,0 +1,55 @@ +package eu.dnetlib.dhp.actionmanager.common; + +import java.io.Serializable; +import java.util.function.Supplier; + +/** + * Provides serializable and throwing extensions to standard functional interfaces. + */ +public class FunctionalInterfaceSupport { + + private FunctionalInterfaceSupport() { + } + + /** + * Serializable supplier of any kind of objects. To be used withing spark processing pipelines when supplying + * functions externally. + * + * @param + */ + @FunctionalInterface + public interface SerializableSupplier extends Supplier, Serializable { + } + + /** + * Extension of consumer accepting functions throwing an exception. + * + * @param + * @param + */ + @FunctionalInterface + public interface ThrowingConsumer { + void accept(T t) throws E; + } + + /** + * Extension of supplier accepting functions throwing an exception. + * + * @param + * @param + */ + @FunctionalInterface + public interface ThrowingSupplier { + T get() throws E; + } + + /** + * Extension of runnable accepting functions throwing an exception. + * + * @param + */ + @FunctionalInterface + public interface ThrowingRunnable { + void run() throws E; + } +} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/HdfsSupport.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/HdfsSupport.java new file mode 100644 index 0000000000..70af7e2a58 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/HdfsSupport.java @@ -0,0 +1,57 @@ +package eu.dnetlib.dhp.actionmanager.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static eu.dnetlib.dhp.actionmanager.common.ThrowingSupport.rethrowAsRuntimeException; + +/** + * HDFS utility methods. + */ +public class HdfsSupport { + private static final Logger logger = LoggerFactory.getLogger(HdfsSupport.class); + + private HdfsSupport() { + } + + /** + * Removes a path (file or dir) from HDFS. + * + * @param path Path to be removed + * @param configuration Configuration of hadoop env + */ + public static void remove(String path, Configuration configuration) { + logger.info("Removing path: {}", path); + rethrowAsRuntimeException(() -> { + Path f = new Path(path); + FileSystem fileSystem = FileSystem.get(configuration); + if (fileSystem.exists(f)) { + fileSystem.delete(f, true); + } + }); + } + + /** + * Lists hadoop files located below path or alternatively lists subdirs under path. + * + * @param path Path to be listed for hadoop files + * @param configuration Configuration of hadoop env + * @return List with string locations of hadoop files + */ + public static List listFiles(String path, Configuration configuration) { + logger.info("Listing files in path: {}", path); + return rethrowAsRuntimeException(() -> Arrays + .stream(FileSystem.get(configuration).listStatus(new Path(path))) + .filter(FileStatus::isDirectory) + .map(x -> x.getPath().toString()) + .collect(Collectors.toList())); + } +} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/ModelSupport.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/ModelSupport.java new file mode 100644 index 0000000000..62cb8a5426 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/ModelSupport.java @@ -0,0 +1,51 @@ +package eu.dnetlib.dhp.actionmanager.common; + +import eu.dnetlib.dhp.schema.oaf.Oaf; + +/** + * Inheritance utility methods. + */ +public class ModelSupport { + + private ModelSupport() { + } + + /** + * Checks subclass-superclass relationship. + * + * @param subClazzObject Subclass object instance + * @param superClazzObject Superclass object instance + * @param Subclass type + * @param Superclass type + * @return True if X is a subclass of Y + */ + public static Boolean isSubClass(X subClazzObject, Y superClazzObject) { + return isSubClass(subClazzObject.getClass(), superClazzObject.getClass()); + } + + /** + * Checks subclass-superclass relationship. + * + * @param subClazzObject Subclass object instance + * @param superClazz Superclass class + * @param Subclass type + * @param Superclass type + * @return True if X is a subclass of Y + */ + public static Boolean isSubClass(X subClazzObject, Class superClazz) { + return isSubClass(subClazzObject.getClass(), superClazz); + } + + /** + * Checks subclass-superclass relationship. + * + * @param subClazz Subclass class + * @param superClazz Superclass class + * @param Subclass type + * @param Superclass type + * @return True if X is a subclass of Y + */ + public static Boolean isSubClass(Class subClazz, Class superClazz) { + return superClazz.isAssignableFrom(subClazz); + } +} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/SparkSessionSupport.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/SparkSessionSupport.java new file mode 100644 index 0000000000..96b29fd22b --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/SparkSessionSupport.java @@ -0,0 +1,57 @@ +package eu.dnetlib.dhp.actionmanager.common; + +import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingConsumer; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; + +import java.util.Objects; +import java.util.function.Function; + +/** + * SparkSession utility methods. + */ +public class SparkSessionSupport { + + private SparkSessionSupport() { + } + + /** + * Runs a given function using SparkSession created using default builder and supplied SparkConf. Stops SparkSession + * when SparkSession is managed. Allows to reuse SparkSession created externally. + * + * @param conf SparkConf instance + * @param isSparkSessionManaged When true will stop SparkSession + * @param fn Consumer to be applied to constructed SparkSession + */ + public static void runWithSparkSession(SparkConf conf, + Boolean isSparkSessionManaged, + ThrowingConsumer fn) { + runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn); + } + + /** + * Runs a given function using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession + * when SparkSession is managed. Allows to reuse SparkSession created externally. + * + * @param sparkSessionBuilder Builder of SparkSession + * @param conf SparkConf instance + * @param isSparkSessionManaged When true will stop SparkSession + * @param fn Consumer to be applied to constructed SparkSession + */ + public static void runWithSparkSession(Function sparkSessionBuilder, + SparkConf conf, + Boolean isSparkSessionManaged, + ThrowingConsumer fn) { + SparkSession spark = null; + try { + spark = sparkSessionBuilder.apply(conf); + fn.accept(spark); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (Objects.nonNull(spark) && isSparkSessionManaged) { + spark.stop(); + } + } + } +} diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/ThrowingSupport.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/ThrowingSupport.java new file mode 100644 index 0000000000..a384728d06 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/common/ThrowingSupport.java @@ -0,0 +1,76 @@ +package eu.dnetlib.dhp.actionmanager.common; + +import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingRunnable; +import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingSupplier; + +/** + * Exception handling utility methods. + */ +public class ThrowingSupport { + + private ThrowingSupport() { + } + + /** + * Executes given runnable and rethrows any exceptions as RuntimeException. + * + * @param fn Runnable to be executed + * @param Type of exception thrown + */ + public static void rethrowAsRuntimeException(ThrowingRunnable fn) { + try { + fn.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Executes given runnable and rethrows any exceptions as RuntimeException with custom message. + * + * @param fn Runnable to be executed + * @param msg Message to be set for rethrown exception + * @param Type of exception thrown + */ + public static void rethrowAsRuntimeException(ThrowingRunnable fn, String msg) { + try { + fn.run(); + } catch (Exception e) { + throw new RuntimeException(msg, e); + } + } + + /** + * Executes given supplier and rethrows any exceptions as RuntimeException. + * + * @param fn Supplier to be executed + * @param Type of returned value + * @param Type of exception thrown + * @return Result of supplier execution + */ + public static T rethrowAsRuntimeException(ThrowingSupplier fn) { + try { + return fn.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Executes given supplier and rethrows any exceptions as RuntimeException with custom message. + * + * @param fn Supplier to be executed + * @param msg Message to be set for rethrown exception + * @param Type of returned value + * @param Type of exception thrown + * @return Result of supplier execution + */ + public static T rethrowAsRuntimeException(ThrowingSupplier fn, String msg) { + try { + return fn.get(); + } catch (Exception e) { + throw new RuntimeException(msg, e); + } + } + +} diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/HdfsSupportTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/HdfsSupportTest.java new file mode 100644 index 0000000000..e352a94459 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/HdfsSupportTest.java @@ -0,0 +1,78 @@ +package eu.dnetlib.dhp.actionmanager.common; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.*; + +public class HdfsSupportTest { + + @Nested + class Remove { + + @Test + public void shouldThrowARuntimeExceptionOnError() { + // when + assertThrows(RuntimeException.class, () -> + HdfsSupport.remove(null, new Configuration())); + } + + @Test + public void shouldRemoveADirFromHDFS(@TempDir Path tempDir) { + // when + HdfsSupport.remove(tempDir.toString(), new Configuration()); + + // then + assertFalse(Files.exists(tempDir)); + } + + @Test + public void shouldRemoveAFileFromHDFS(@TempDir Path tempDir) throws IOException { + // given + Path file = Files.createTempFile(tempDir, "p", "s"); + + // when + HdfsSupport.remove(file.toString(), new Configuration()); + + // then + assertFalse(Files.exists(file)); + } + } + + @Nested + class ListFiles { + + @Test + public void shouldThrowARuntimeExceptionOnError() { + // when + assertThrows(RuntimeException.class, () -> + HdfsSupport.listFiles(null, new Configuration())); + } + + @Test + public void shouldListFilesLocatedInPath(@TempDir Path tempDir) throws IOException { + Path subDir1 = Files.createTempDirectory(tempDir, "list_me"); + Path subDir2 = Files.createTempDirectory(tempDir, "list_me"); + + // when + List paths = HdfsSupport.listFiles(tempDir.toString(), new Configuration()); + + // then + assertEquals(2, paths.size()); + List expecteds = Arrays.stream(new String[]{subDir1.toString(), subDir2.toString()}) + .sorted().collect(Collectors.toList()); + List actuals = paths.stream().sorted().collect(Collectors.toList()); + assertTrue(actuals.get(0).contains(expecteds.get(0))); + assertTrue(actuals.get(1).contains(expecteds.get(1))); + } + } +} diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/ModelSupportTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/ModelSupportTest.java new file mode 100644 index 0000000000..ea8082fe14 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/ModelSupportTest.java @@ -0,0 +1,35 @@ +package eu.dnetlib.dhp.actionmanager.common; + +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ModelSupportTest { + + @Nested + class IsSubClass { + + @Test + public void shouldReturnFalseWhenSubClassDoesNotExtendSuperClass() { + // when + Boolean result = ModelSupport.isSubClass(Relation.class, OafEntity.class); + + // then + assertFalse(result); + } + + @Test + public void shouldReturnTrueWhenSubClassExtendsSuperClass() { + // when + Boolean result = ModelSupport.isSubClass(Result.class, OafEntity.class); + + // then + assertTrue(result); + } + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/SparkSessionSupportTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/SparkSessionSupportTest.java new file mode 100644 index 0000000000..93debe6178 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/common/SparkSessionSupportTest.java @@ -0,0 +1,54 @@ +package eu.dnetlib.dhp.actionmanager.common; + +import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingConsumer; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.function.Function; + +import static org.mockito.Mockito.*; + +public class SparkSessionSupportTest { + + @Nested + class RunWithSparkSession { + + @Test + public void shouldExecuteFunctionAndNotStopSparkSessionWhenSparkSessionIsNotManaged() throws Exception { + // given + SparkSession spark = mock(SparkSession.class); + SparkConf conf = mock(SparkConf.class); + Function sparkSessionBuilder = mock(Function.class); + when(sparkSessionBuilder.apply(conf)).thenReturn(spark); + ThrowingConsumer fn = mock(ThrowingConsumer.class); + + // when + SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, false, fn); + + // then + verify(sparkSessionBuilder).apply(conf); + verify(fn).accept(spark); + verify(spark, never()).stop(); + } + + @Test + public void shouldExecuteFunctionAndStopSparkSessionWhenSparkSessionIsManaged() throws Exception { + // given + SparkSession spark = mock(SparkSession.class); + SparkConf conf = mock(SparkConf.class); + Function sparkSessionBuilder = mock(Function.class); + when(sparkSessionBuilder.apply(conf)).thenReturn(spark); + ThrowingConsumer fn = mock(ThrowingConsumer.class); + + // when + SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, true, fn); + + // then + verify(sparkSessionBuilder).apply(conf); + verify(fn).accept(spark); + verify(spark, times(1)).stop(); + } + } +} \ No newline at end of file