Fork 0

[dhp-actionmanager] common package added with utility classes supporting hadoop and spark envs

This commit is contained in:
Przemysław Jacewicz 2020-04-01 18:39:26 +02:00 committed by przemek
parent ad70c23b2e
commit f9f7350bb9
8 changed files with 463 additions and 0 deletions

View File

@ -0,0 +1,55 @@
package eu.dnetlib.dhp.actionmanager.common;
import java.io.Serializable;
import java.util.function.Supplier;
* Provides serializable and throwing extensions to standard functional interfaces.
public class FunctionalInterfaceSupport {
private FunctionalInterfaceSupport() {
* Serializable supplier of any kind of objects. To be used withing spark processing pipelines when supplying
* functions externally.
* @param <T>
public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
* Extension of consumer accepting functions throwing an exception.
* @param <T>
* @param <E>
public interface ThrowingConsumer<T, E extends Exception> {
void accept(T t) throws E;
* Extension of supplier accepting functions throwing an exception.
* @param <T>
* @param <E>
public interface ThrowingSupplier<T, E extends Exception> {
T get() throws E;
* Extension of runnable accepting functions throwing an exception.
* @param <E>
public interface ThrowingRunnable<E extends Exception> {
void run() throws E;

View File

@ -0,0 +1,57 @@
package eu.dnetlib.dhp.actionmanager.common;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.actionmanager.common.ThrowingSupport.rethrowAsRuntimeException;
* HDFS utility methods.
public class HdfsSupport {
private static final Logger logger = LoggerFactory.getLogger(HdfsSupport.class);
private HdfsSupport() {
* Removes a path (file or dir) from HDFS.
* @param path Path to be removed
* @param configuration Configuration of hadoop env
public static void remove(String path, Configuration configuration) {
logger.info("Removing path: {}", path);
rethrowAsRuntimeException(() -> {
Path f = new Path(path);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(f)) {
fileSystem.delete(f, true);
* Lists hadoop files located below path or alternatively lists subdirs under path.
* @param path Path to be listed for hadoop files
* @param configuration Configuration of hadoop env
* @return List with string locations of hadoop files
public static List<String> listFiles(String path, Configuration configuration) {
logger.info("Listing files in path: {}", path);
return rethrowAsRuntimeException(() -> Arrays
.stream(FileSystem.get(configuration).listStatus(new Path(path)))
.map(x -> x.getPath().toString())

View File

@ -0,0 +1,51 @@
package eu.dnetlib.dhp.actionmanager.common;
import eu.dnetlib.dhp.schema.oaf.Oaf;
* Inheritance utility methods.
public class ModelSupport {
private ModelSupport() {
* Checks subclass-superclass relationship.
* @param subClazzObject Subclass object instance
* @param superClazzObject Superclass object instance
* @param <X> Subclass type
* @param <Y> Superclass type
* @return True if X is a subclass of Y
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(X subClazzObject, Y superClazzObject) {
return isSubClass(subClazzObject.getClass(), superClazzObject.getClass());
* Checks subclass-superclass relationship.
* @param subClazzObject Subclass object instance
* @param superClazz Superclass class
* @param <X> Subclass type
* @param <Y> Superclass type
* @return True if X is a subclass of Y
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(X subClazzObject, Class<Y> superClazz) {
return isSubClass(subClazzObject.getClass(), superClazz);
* Checks subclass-superclass relationship.
* @param subClazz Subclass class
* @param superClazz Superclass class
* @param <X> Subclass type
* @param <Y> Superclass type
* @return True if X is a subclass of Y
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(Class<X> subClazz, Class<Y> superClazz) {
return superClazz.isAssignableFrom(subClazz);

View File

@ -0,0 +1,57 @@
package eu.dnetlib.dhp.actionmanager.common;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingConsumer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import java.util.Objects;
import java.util.function.Function;
* SparkSession utility methods.
public class SparkSessionSupport {
private SparkSessionSupport() {
* Runs a given function using SparkSession created using default builder and supplied SparkConf. Stops SparkSession
* when SparkSession is managed. Allows to reuse SparkSession created externally.
* @param conf SparkConf instance
* @param isSparkSessionManaged When true will stop SparkSession
* @param fn Consumer to be applied to constructed SparkSession
public static void runWithSparkSession(SparkConf conf,
Boolean isSparkSessionManaged,
ThrowingConsumer<SparkSession, Exception> fn) {
runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn);
* Runs a given function using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession
* when SparkSession is managed. Allows to reuse SparkSession created externally.
* @param sparkSessionBuilder Builder of SparkSession
* @param conf SparkConf instance
* @param isSparkSessionManaged When true will stop SparkSession
* @param fn Consumer to be applied to constructed SparkSession
public static void runWithSparkSession(Function<SparkConf, SparkSession> sparkSessionBuilder,
SparkConf conf,
Boolean isSparkSessionManaged,
ThrowingConsumer<SparkSession, Exception> fn) {
SparkSession spark = null;
try {
spark = sparkSessionBuilder.apply(conf);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (Objects.nonNull(spark) && isSparkSessionManaged) {

View File

@ -0,0 +1,76 @@
package eu.dnetlib.dhp.actionmanager.common;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingRunnable;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingSupplier;
* Exception handling utility methods.
public class ThrowingSupport {
private ThrowingSupport() {
* Executes given runnable and rethrows any exceptions as RuntimeException.
* @param fn Runnable to be executed
* @param <E> Type of exception thrown
public static <E extends Exception> void rethrowAsRuntimeException(ThrowingRunnable<E> fn) {
try {
} catch (Exception e) {
throw new RuntimeException(e);
* Executes given runnable and rethrows any exceptions as RuntimeException with custom message.
* @param fn Runnable to be executed
* @param msg Message to be set for rethrown exception
* @param <E> Type of exception thrown
public static <E extends Exception> void rethrowAsRuntimeException(ThrowingRunnable<E> fn, String msg) {
try {
} catch (Exception e) {
throw new RuntimeException(msg, e);
* Executes given supplier and rethrows any exceptions as RuntimeException.
* @param fn Supplier to be executed
* @param <T> Type of returned value
* @param <E> Type of exception thrown
* @return Result of supplier execution
public static <T, E extends Exception> T rethrowAsRuntimeException(ThrowingSupplier<T, E> fn) {
try {
return fn.get();
} catch (Exception e) {
throw new RuntimeException(e);
* Executes given supplier and rethrows any exceptions as RuntimeException with custom message.
* @param fn Supplier to be executed
* @param msg Message to be set for rethrown exception
* @param <T> Type of returned value
* @param <E> Type of exception thrown
* @return Result of supplier execution
public static <T, E extends Exception> T rethrowAsRuntimeException(ThrowingSupplier<T, E> fn, String msg) {
try {
return fn.get();
} catch (Exception e) {
throw new RuntimeException(msg, e);

View File

@ -0,0 +1,78 @@
package eu.dnetlib.dhp.actionmanager.common;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.*;
public class HdfsSupportTest {
class Remove {
public void shouldThrowARuntimeExceptionOnError() {
// when
assertThrows(RuntimeException.class, () ->
HdfsSupport.remove(null, new Configuration()));
public void shouldRemoveADirFromHDFS(@TempDir Path tempDir) {
// when
HdfsSupport.remove(tempDir.toString(), new Configuration());
// then
public void shouldRemoveAFileFromHDFS(@TempDir Path tempDir) throws IOException {
// given
Path file = Files.createTempFile(tempDir, "p", "s");
// when
HdfsSupport.remove(file.toString(), new Configuration());
// then
class ListFiles {
public void shouldThrowARuntimeExceptionOnError() {
// when
assertThrows(RuntimeException.class, () ->
HdfsSupport.listFiles(null, new Configuration()));
public void shouldListFilesLocatedInPath(@TempDir Path tempDir) throws IOException {
Path subDir1 = Files.createTempDirectory(tempDir, "list_me");
Path subDir2 = Files.createTempDirectory(tempDir, "list_me");
// when
List<String> paths = HdfsSupport.listFiles(tempDir.toString(), new Configuration());
// then
assertEquals(2, paths.size());
List<String> expecteds = Arrays.stream(new String[]{subDir1.toString(), subDir2.toString()})
List<String> actuals = paths.stream().sorted().collect(Collectors.toList());

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.actionmanager.common;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ModelSupportTest {
class IsSubClass {
public void shouldReturnFalseWhenSubClassDoesNotExtendSuperClass() {
// when
Boolean result = ModelSupport.isSubClass(Relation.class, OafEntity.class);
// then
public void shouldReturnTrueWhenSubClassExtendsSuperClass() {
// when
Boolean result = ModelSupport.isSubClass(Result.class, OafEntity.class);
// then

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.actionmanager.common;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingConsumer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.util.function.Function;
import static org.mockito.Mockito.*;
public class SparkSessionSupportTest {
class RunWithSparkSession {
public void shouldExecuteFunctionAndNotStopSparkSessionWhenSparkSessionIsNotManaged() throws Exception {
// given
SparkSession spark = mock(SparkSession.class);
SparkConf conf = mock(SparkConf.class);
Function<SparkConf, SparkSession> sparkSessionBuilder = mock(Function.class);
ThrowingConsumer<SparkSession, Exception> fn = mock(ThrowingConsumer.class);
// when
SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, false, fn);
// then
verify(spark, never()).stop();
public void shouldExecuteFunctionAndStopSparkSessionWhenSparkSessionIsManaged() throws Exception {
// given
SparkSession spark = mock(SparkSession.class);
SparkConf conf = mock(SparkConf.class);
Function<SparkConf, SparkSession> sparkSessionBuilder = mock(Function.class);
ThrowingConsumer<SparkSession, Exception> fn = mock(ThrowingConsumer.class);
// when
SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, true, fn);
// then
verify(spark, times(1)).stop();