1
0
Fork 0

Utility classes moved in dhp-common and dhp-schemas

This commit is contained in:
Claudio Atzori 2020-04-07 11:56:22 +02:00
parent c57cf679ca
commit d74e128aa6
29 changed files with 177 additions and 655 deletions

View File

@ -14,12 +14,6 @@
<dependencies> <dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId> <artifactId>hadoop-common</artifactId>

View File

@ -1,36 +0,0 @@
package eu.dnetlib.dhp.common;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ModelSupportTest {
@Nested
class IsSubClass {
@Test
public void shouldReturnFalseWhenSubClassDoesNotExtendSuperClass() {
// when
Boolean result = ModelSupport.isSubClass(Relation.class, OafEntity.class);
// then
assertFalse(result);
}
@Test
public void shouldReturnTrueWhenSubClassExtendsSuperClass() {
// when
Boolean result = ModelSupport.isSubClass(Result.class, OafEntity.class);
// then
assertTrue(result);
}
}
}

View File

@ -0,0 +1,23 @@
package eu.dnetlib.dhp.schema.common;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
/**
* Actual entity types in the Graph
*/
public enum EntityType {
publication, dataset, otherresearchproduct, software, datasource, organization, project;
/**
* Resolves the EntityType, given the relative class name
* @param clazz the given class name
* @param <T> actual OafEntity subclass
* @return the EntityType associated to the given class
*/
public static <T extends OafEntity> EntityType fromClass(Class<T> clazz) {
return EntityType.valueOf(clazz.getSimpleName().toLowerCase());
}
}

View File

@ -0,0 +1,9 @@
package eu.dnetlib.dhp.schema.common;
/**
* Main entity types in the Graph
*/
public enum MainEntityType {
result, datasource, organization, project
}

View File

@ -1,12 +1,60 @@
package eu.dnetlib.dhp.schema.common; package eu.dnetlib.dhp.schema.common;
import eu.dnetlib.dhp.schema.oaf.Oaf; import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.Map;
/** /**
* Inheritance utility methods. * Oaf model utility methods.
*/ */
public class ModelSupport { public class ModelSupport {
/**
* Defines the mapping between the actual entity type and the main entity type
*/
private static Map<EntityType, MainEntityType> entityMapping = Maps.newHashMap();
static {
entityMapping.put(EntityType.publication, MainEntityType.result);
entityMapping.put(EntityType.dataset, MainEntityType.result);
entityMapping.put(EntityType.otherresearchproduct, MainEntityType.result);
entityMapping.put(EntityType.software, MainEntityType.result);
entityMapping.put(EntityType.datasource, MainEntityType.datasource);
entityMapping.put(EntityType.organization, MainEntityType.organization);
entityMapping.put(EntityType.project, MainEntityType.project);
}
/**
* Defines the mapping between the actual entity types and the relative classes implementing them
*/
public final static Map<EntityType, Class> entityTypes = Maps.newHashMap();
static {
entityTypes.put(EntityType.datasource, Datasource.class);
entityTypes.put(EntityType.organization, Organization.class);
entityTypes.put(EntityType.project, Project.class);
entityTypes.put(EntityType.dataset, Dataset.class);
entityTypes.put(EntityType.otherresearchproduct, OtherResearchProduct.class);
entityTypes.put(EntityType.software, Software.class);
entityTypes.put(EntityType.publication, Publication.class);
}
public final static Map<String, Class> oafTypes = Maps.newHashMap();
static {
oafTypes.put("datasource", Datasource.class);
oafTypes.put("organization", Organization.class);
oafTypes.put("project", Project.class);
oafTypes.put("dataset", Dataset.class);
oafTypes.put("otherresearchproduct", OtherResearchProduct.class);
oafTypes.put("software", Software.class);
oafTypes.put("publication", Publication.class);
oafTypes.put("relation", Relation.class);
}
private static final String schemeTemplate = "dnet:%s_%s_relations";
private ModelSupport() { private ModelSupport() {
} }
@ -48,4 +96,56 @@ public class ModelSupport {
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(Class<X> subClazz, Class<Y> superClazz) { public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(Class<X> subClazz, Class<Y> superClazz) {
return superClazz.isAssignableFrom(subClazz); return superClazz.isAssignableFrom(subClazz);
} }
/**
* Lists all the OAF model classes
*
* @param <T>
* @return
*/
public static <T extends Oaf> Class<T>[] getOafModelClasses() {
return new Class[]{
Author.class,
Context.class,
Country.class,
DataInfo.class,
Dataset.class,
Datasource.class,
ExternalReference.class,
ExtraInfo.class,
Field.class,
GeoLocation.class,
Instance.class,
Journal.class,
KeyValue.class,
Oaf.class,
OafEntity.class,
OAIProvenance.class,
Organization.class,
OriginDescription.class,
OtherResearchProduct.class,
Project.class,
Publication.class,
Qualifier.class,
Relation.class,
Result.class,
Software.class,
StructuredProperty.class
};
}
public static String getMainType(final EntityType type) {
return entityMapping.get(type).name();
}
public static boolean isResult(EntityType type) {
return MainEntityType.result.name().equals(getMainType(type));
}
public static String getScheme(final String sourceType, final String targetType) {
return String.format(schemeTemplate,
entityMapping.get(EntityType.valueOf(sourceType)).name(),
entityMapping.get(EntityType.valueOf(targetType)).name());
}
} }

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.actionmanager.common; package eu.dnetlib.dhp.schema.common;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;

View File

@ -52,5 +52,6 @@
<artifactId>dhp-schemas</artifactId> <artifactId>dhp-schemas</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,55 +0,0 @@
package eu.dnetlib.dhp.actionmanager.common;
import java.io.Serializable;
import java.util.function.Supplier;
/**
* Provides serializable and throwing extensions to standard functional interfaces.
*/
public class FunctionalInterfaceSupport {
private FunctionalInterfaceSupport() {
}
/**
* Serializable supplier of any kind of objects. To be used withing spark processing pipelines when supplying
* functions externally.
*
* @param <T>
*/
@FunctionalInterface
public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
}
/**
* Extension of consumer accepting functions throwing an exception.
*
* @param <T>
* @param <E>
*/
@FunctionalInterface
public interface ThrowingConsumer<T, E extends Exception> {
void accept(T t) throws E;
}
/**
* Extension of supplier accepting functions throwing an exception.
*
* @param <T>
* @param <E>
*/
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
T get() throws E;
}
/**
* Extension of runnable accepting functions throwing an exception.
*
* @param <E>
*/
@FunctionalInterface
public interface ThrowingRunnable<E extends Exception> {
void run() throws E;
}
}

View File

@ -1,57 +0,0 @@
package eu.dnetlib.dhp.actionmanager.common;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.actionmanager.common.ThrowingSupport.rethrowAsRuntimeException;
/**
* HDFS utility methods.
*/
public class HdfsSupport {
private static final Logger logger = LoggerFactory.getLogger(HdfsSupport.class);
private HdfsSupport() {
}
/**
* Removes a path (file or dir) from HDFS.
*
* @param path Path to be removed
* @param configuration Configuration of hadoop env
*/
public static void remove(String path, Configuration configuration) {
logger.info("Removing path: {}", path);
rethrowAsRuntimeException(() -> {
Path f = new Path(path);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(f)) {
fileSystem.delete(f, true);
}
});
}
/**
* Lists hadoop files located below path or alternatively lists subdirs under path.
*
* @param path Path to be listed for hadoop files
* @param configuration Configuration of hadoop env
* @return List with string locations of hadoop files
*/
public static List<String> listFiles(String path, Configuration configuration) {
logger.info("Listing files in path: {}", path);
return rethrowAsRuntimeException(() -> Arrays
.stream(FileSystem.get(configuration).listStatus(new Path(path)))
.filter(FileStatus::isDirectory)
.map(x -> x.getPath().toString())
.collect(Collectors.toList()));
}
}

View File

@ -1,51 +0,0 @@
package eu.dnetlib.dhp.actionmanager.common;
import eu.dnetlib.dhp.schema.oaf.Oaf;
/**
* Inheritance utility methods.
*/
public class ModelSupport {
private ModelSupport() {
}
/**
* Checks subclass-superclass relationship.
*
* @param subClazzObject Subclass object instance
* @param superClazzObject Superclass object instance
* @param <X> Subclass type
* @param <Y> Superclass type
* @return True if X is a subclass of Y
*/
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(X subClazzObject, Y superClazzObject) {
return isSubClass(subClazzObject.getClass(), superClazzObject.getClass());
}
/**
* Checks subclass-superclass relationship.
*
* @param subClazzObject Subclass object instance
* @param superClazz Superclass class
* @param <X> Subclass type
* @param <Y> Superclass type
* @return True if X is a subclass of Y
*/
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(X subClazzObject, Class<Y> superClazz) {
return isSubClass(subClazzObject.getClass(), superClazz);
}
/**
* Checks subclass-superclass relationship.
*
* @param subClazz Subclass class
* @param superClazz Superclass class
* @param <X> Subclass type
* @param <Y> Superclass type
* @return True if X is a subclass of Y
*/
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(Class<X> subClazz, Class<Y> superClazz) {
return superClazz.isAssignableFrom(subClazz);
}
}

View File

@ -1,57 +0,0 @@
package eu.dnetlib.dhp.actionmanager.common;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingConsumer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import java.util.Objects;
import java.util.function.Function;
/**
* SparkSession utility methods.
*/
public class SparkSessionSupport {
private SparkSessionSupport() {
}
/**
* Runs a given function using SparkSession created using default builder and supplied SparkConf. Stops SparkSession
* when SparkSession is managed. Allows to reuse SparkSession created externally.
*
* @param conf SparkConf instance
* @param isSparkSessionManaged When true will stop SparkSession
* @param fn Consumer to be applied to constructed SparkSession
*/
public static void runWithSparkSession(SparkConf conf,
Boolean isSparkSessionManaged,
ThrowingConsumer<SparkSession, Exception> fn) {
runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn);
}
/**
* Runs a given function using SparkSession created using supplied builder and supplied SparkConf. Stops SparkSession
* when SparkSession is managed. Allows to reuse SparkSession created externally.
*
* @param sparkSessionBuilder Builder of SparkSession
* @param conf SparkConf instance
* @param isSparkSessionManaged When true will stop SparkSession
* @param fn Consumer to be applied to constructed SparkSession
*/
public static void runWithSparkSession(Function<SparkConf, SparkSession> sparkSessionBuilder,
SparkConf conf,
Boolean isSparkSessionManaged,
ThrowingConsumer<SparkSession, Exception> fn) {
SparkSession spark = null;
try {
spark = sparkSessionBuilder.apply(conf);
fn.accept(spark);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (Objects.nonNull(spark) && isSparkSessionManaged) {
spark.stop();
}
}
}
}

View File

@ -1,76 +0,0 @@
package eu.dnetlib.dhp.actionmanager.common;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingRunnable;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingSupplier;
/**
* Exception handling utility methods.
*/
public class ThrowingSupport {
private ThrowingSupport() {
}
/**
* Executes given runnable and rethrows any exceptions as RuntimeException.
*
* @param fn Runnable to be executed
* @param <E> Type of exception thrown
*/
public static <E extends Exception> void rethrowAsRuntimeException(ThrowingRunnable<E> fn) {
try {
fn.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Executes given runnable and rethrows any exceptions as RuntimeException with custom message.
*
* @param fn Runnable to be executed
* @param msg Message to be set for rethrown exception
* @param <E> Type of exception thrown
*/
public static <E extends Exception> void rethrowAsRuntimeException(ThrowingRunnable<E> fn, String msg) {
try {
fn.run();
} catch (Exception e) {
throw new RuntimeException(msg, e);
}
}
/**
* Executes given supplier and rethrows any exceptions as RuntimeException.
*
* @param fn Supplier to be executed
* @param <T> Type of returned value
* @param <E> Type of exception thrown
* @return Result of supplier execution
*/
public static <T, E extends Exception> T rethrowAsRuntimeException(ThrowingSupplier<T, E> fn) {
try {
return fn.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Executes given supplier and rethrows any exceptions as RuntimeException with custom message.
*
* @param fn Supplier to be executed
* @param msg Message to be set for rethrown exception
* @param <T> Type of returned value
* @param <E> Type of exception thrown
* @return Result of supplier execution
*/
public static <T, E extends Exception> T rethrowAsRuntimeException(ThrowingSupplier<T, E> fn, String msg) {
try {
return fn.get();
} catch (Exception e) {
throw new RuntimeException(msg, e);
}
}
}

View File

@ -1,6 +1,6 @@
package eu.dnetlib.dhp.actionmanager.partition; package eu.dnetlib.dhp.actionmanager.partition;
import eu.dnetlib.dhp.actionmanager.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob; import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -18,7 +18,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import static eu.dnetlib.dhp.actionmanager.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.*; import static org.apache.spark.sql.functions.*;
/** /**

View File

@ -1,13 +1,13 @@
package eu.dnetlib.dhp.actionmanager.promote; package eu.dnetlib.dhp.actionmanager.promote;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.SerializableSupplier; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import static eu.dnetlib.dhp.actionmanager.common.ModelSupport.isSubClass; import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
/** /**
* OAF model merging support. * OAF model merging support.

View File

@ -1,9 +1,10 @@
package eu.dnetlib.dhp.actionmanager.promote; package eu.dnetlib.dhp.actionmanager.promote;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.SerializableSupplier; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.actionmanager.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -20,8 +21,8 @@ import java.util.Optional;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import static eu.dnetlib.dhp.actionmanager.common.ModelSupport.isSubClass; import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import static eu.dnetlib.dhp.actionmanager.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/** /**
* Applies a given action payload file to graph table of compatible type. * Applies a given action payload file to graph table of compatible type.
@ -69,34 +70,7 @@ public class PromoteActionPayloadForGraphTableJob {
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[]{ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
Author.class,
Context.class,
Country.class,
DataInfo.class,
eu.dnetlib.dhp.schema.oaf.Dataset.class,
Datasource.class,
ExternalReference.class,
ExtraInfo.class,
Field.class,
GeoLocation.class,
Instance.class,
Journal.class,
KeyValue.class,
Oaf.class,
OafEntity.class,
OAIProvenance.class,
Organization.class,
OriginDescription.class,
OtherResearchProduct.class,
Project.class,
Publication.class,
Qualifier.class,
Relation.class,
Result.class,
Software.class,
StructuredProperty.class
});
runWithSparkSession(conf, isSparkSessionManaged, runWithSparkSession(conf, isSparkSessionManaged,
spark -> { spark -> {

View File

@ -1,6 +1,6 @@
package eu.dnetlib.dhp.actionmanager.promote; package eu.dnetlib.dhp.actionmanager.promote;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.SerializableSupplier; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -16,7 +16,7 @@ import java.util.Optional;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import static eu.dnetlib.dhp.actionmanager.common.ModelSupport.isSubClass; import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
/** /**
* Promote action payload functions. * Promote action payload functions.

View File

@ -1,78 +0,0 @@
package eu.dnetlib.dhp.actionmanager.common;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.*;
public class HdfsSupportTest {
@Nested
class Remove {
@Test
public void shouldThrowARuntimeExceptionOnError() {
// when
assertThrows(RuntimeException.class, () ->
HdfsSupport.remove(null, new Configuration()));
}
@Test
public void shouldRemoveADirFromHDFS(@TempDir Path tempDir) {
// when
HdfsSupport.remove(tempDir.toString(), new Configuration());
// then
assertFalse(Files.exists(tempDir));
}
@Test
public void shouldRemoveAFileFromHDFS(@TempDir Path tempDir) throws IOException {
// given
Path file = Files.createTempFile(tempDir, "p", "s");
// when
HdfsSupport.remove(file.toString(), new Configuration());
// then
assertFalse(Files.exists(file));
}
}
@Nested
class ListFiles {
@Test
public void shouldThrowARuntimeExceptionOnError() {
// when
assertThrows(RuntimeException.class, () ->
HdfsSupport.listFiles(null, new Configuration()));
}
@Test
public void shouldListFilesLocatedInPath(@TempDir Path tempDir) throws IOException {
Path subDir1 = Files.createTempDirectory(tempDir, "list_me");
Path subDir2 = Files.createTempDirectory(tempDir, "list_me");
// when
List<String> paths = HdfsSupport.listFiles(tempDir.toString(), new Configuration());
// then
assertEquals(2, paths.size());
List<String> expecteds = Arrays.stream(new String[]{subDir1.toString(), subDir2.toString()})
.sorted().collect(Collectors.toList());
List<String> actuals = paths.stream().sorted().collect(Collectors.toList());
assertTrue(actuals.get(0).contains(expecteds.get(0)));
assertTrue(actuals.get(1).contains(expecteds.get(1)));
}
}
}

View File

@ -1,54 +0,0 @@
package eu.dnetlib.dhp.actionmanager.common;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.ThrowingConsumer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.util.function.Function;
import static org.mockito.Mockito.*;
public class SparkSessionSupportTest {
@Nested
class RunWithSparkSession {
@Test
public void shouldExecuteFunctionAndNotStopSparkSessionWhenSparkSessionIsNotManaged() throws Exception {
// given
SparkSession spark = mock(SparkSession.class);
SparkConf conf = mock(SparkConf.class);
Function<SparkConf, SparkSession> sparkSessionBuilder = mock(Function.class);
when(sparkSessionBuilder.apply(conf)).thenReturn(spark);
ThrowingConsumer<SparkSession, Exception> fn = mock(ThrowingConsumer.class);
// when
SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, false, fn);
// then
verify(sparkSessionBuilder).apply(conf);
verify(fn).accept(spark);
verify(spark, never()).stop();
}
@Test
public void shouldExecuteFunctionAndStopSparkSessionWhenSparkSessionIsManaged() throws Exception {
// given
SparkSession spark = mock(SparkSession.class);
SparkConf conf = mock(SparkConf.class);
Function<SparkConf, SparkSession> sparkSessionBuilder = mock(Function.class);
when(sparkSessionBuilder.apply(conf)).thenReturn(spark);
ThrowingConsumer<SparkSession, Exception> fn = mock(ThrowingConsumer.class);
// when
SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, true, fn);
// then
verify(sparkSessionBuilder).apply(conf);
verify(fn).accept(spark);
verify(spark, times(1)).stop();
}
}
}

View File

@ -26,7 +26,7 @@ import java.nio.file.Paths;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static eu.dnetlib.dhp.actionmanager.common.ThrowingSupport.rethrowAsRuntimeException; import static eu.dnetlib.dhp.common.ThrowingSupport.rethrowAsRuntimeException;
import static org.apache.spark.sql.functions.*; import static org.apache.spark.sql.functions.*;
import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static scala.collection.JavaConversions.mutableSeqAsJavaList; import static scala.collection.JavaConversions.mutableSeqAsJavaList;

View File

@ -1,6 +1,6 @@
package eu.dnetlib.dhp.actionmanager.promote; package eu.dnetlib.dhp.actionmanager.promote;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.SerializableSupplier; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;

View File

@ -1,6 +1,6 @@
package eu.dnetlib.dhp.actionmanager.promote; package eu.dnetlib.dhp.actionmanager.promote;
import eu.dnetlib.dhp.actionmanager.common.FunctionalInterfaceSupport.SerializableSupplier; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.Oaf;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;

View File

@ -1,31 +0,0 @@
package eu.dnetlib.dhp.oa.graph;
import java.util.Map;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
public class GraphMappingUtils {
public final static Map<String, Class> types = Maps.newHashMap();
static {
types.put("datasource", Datasource.class);
types.put("organization", Organization.class);
types.put("project", Project.class);
types.put("dataset", Dataset.class);
types.put("otherresearchproduct", OtherResearchProduct.class);
types.put("software", Software.class);
types.put("publication", Publication.class);
types.put("relation", Relation.class);
}
}

View File

@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.graph;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
@ -39,7 +40,7 @@ public class SparkGraphImporterJob {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// Read the input file and convert it into RDD of serializable object // Read the input file and convert it into RDD of serializable object
GraphMappingUtils.types.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name) ModelSupport.oafTypes.forEach((name, clazz) -> spark.createDataset(sc.textFile(inputPath + "/" + name)
.map(s -> new ObjectMapper().readValue(s, clazz)) .map(s -> new ObjectMapper().readValue(s, clazz))
.rdd(), Encoders.bean(clazz)) .rdd(), Encoders.bean(clazz))
.write() .write()

View File

@ -1,11 +1,11 @@
package eu.dnetlib.dhp.oa.provision; package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.Tuple2; import eu.dnetlib.dhp.oa.provision.model.Tuple2;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -21,7 +21,6 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getKryoClasses;
/** /**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
@ -82,7 +81,7 @@ public class AdjacencyListBuilderJob {
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(getKryoClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(conf, isSparkSessionManaged, runWithSparkSession(conf, isSparkSessionManaged,
spark -> { spark -> {

View File

@ -6,6 +6,8 @@ import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -91,7 +93,7 @@ public class CreateRelatedEntitiesJob_phase1 {
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(getKryoClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(conf, isSparkSessionManaged, runWithSparkSession(conf, isSparkSessionManaged,
spark -> { spark -> {

View File

@ -6,6 +6,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.TypedRow; import eu.dnetlib.dhp.oa.provision.model.TypedRow;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -26,7 +27,6 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.getKryoClasses;
/** /**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
@ -93,7 +93,7 @@ public class CreateRelatedEntitiesJob_phase2 {
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(getKryoClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(conf, isSparkSessionManaged, runWithSparkSession(conf, isSparkSessionManaged,
spark -> { spark -> {

View File

@ -120,6 +120,7 @@ public class PrepareRelationsJob {
.map((MapFunction<String, SortableRelation>) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), Encoders.bean(SortableRelation.class)); .map((MapFunction<String, SortableRelation>) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), Encoders.bean(SortableRelation.class));
} }
//TODO work in progress
private static void prepareRelationsRDDFromPaths(SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) { private static void prepareRelationsRDDFromPaths(SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) {
JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath) JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath)
.repartition(numPartitions); .repartition(numPartitions);

View File

@ -1,12 +1,14 @@
package eu.dnetlib.dhp.oa.provision.utils; package eu.dnetlib.dhp.oa.provision.utils;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import java.util.*; import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.substringAfter; import static org.apache.commons.lang3.StringUtils.substringAfter;
@ -15,101 +17,8 @@ public class GraphMappingUtils {
public static final String SEPARATOR = "_"; public static final String SEPARATOR = "_";
public final static Map<EntityType, Class> entityTypes = Maps.newHashMap();
static {
entityTypes.put(EntityType.datasource, Datasource.class);
entityTypes.put(EntityType.organization, Organization.class);
entityTypes.put(EntityType.project, Project.class);
entityTypes.put(EntityType.dataset, Dataset.class);
entityTypes.put(EntityType.otherresearchproduct, OtherResearchProduct.class);
entityTypes.put(EntityType.software, Software.class);
entityTypes.put(EntityType.publication, Publication.class);
}
public enum EntityType {
publication, dataset, otherresearchproduct, software, datasource, organization, project;
public static <T extends OafEntity> EntityType fromClass(Class<T> clazz) {
switch (clazz.getName()) {
case "eu.dnetlib.dhp.schema.oaf.Publication" : return publication;
case "eu.dnetlib.dhp.schema.oaf.Dataset" : return dataset;
case "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct" : return otherresearchproduct;
case "eu.dnetlib.dhp.schema.oaf.Software" : return software;
case "eu.dnetlib.dhp.schema.oaf.Datasource" : return datasource;
case "eu.dnetlib.dhp.schema.oaf.Organization" : return organization;
case "eu.dnetlib.dhp.schema.oaf.Project" : return project;
default: throw new IllegalArgumentException("Unknown OafEntity class: " + clazz.getName());
}
}
}
public enum MainEntityType {
result, datasource, organization, project
}
public static Set<String> authorPidTypes = Sets.newHashSet("orcid", "magidentifier"); public static Set<String> authorPidTypes = Sets.newHashSet("orcid", "magidentifier");
private static final String schemeTemplate = "dnet:%s_%s_relations";
private static Map<EntityType, MainEntityType> entityMapping = Maps.newHashMap();
static {
entityMapping.put(EntityType.publication, MainEntityType.result);
entityMapping.put(EntityType.dataset, MainEntityType.result);
entityMapping.put(EntityType.otherresearchproduct, MainEntityType.result);
entityMapping.put(EntityType.software, MainEntityType.result);
entityMapping.put(EntityType.datasource, MainEntityType.datasource);
entityMapping.put(EntityType.organization, MainEntityType.organization);
entityMapping.put(EntityType.project, MainEntityType.project);
}
public static Class[] getKryoClasses() {
return new Class[]{
Author.class,
Context.class,
Country.class,
DataInfo.class,
eu.dnetlib.dhp.schema.oaf.Dataset.class,
Datasource.class,
ExternalReference.class,
ExtraInfo.class,
Field.class,
GeoLocation.class,
Instance.class,
Journal.class,
KeyValue.class,
Oaf.class,
OafEntity.class,
OAIProvenance.class,
Organization.class,
OriginDescription.class,
OtherResearchProduct.class,
Project.class,
Publication.class,
Qualifier.class,
Relation.class,
SortableRelation.class, //SUPPORT
Result.class,
Software.class,
StructuredProperty.class
};
}
public static String getScheme(final String sourceType, final String targetType) {
return String.format(schemeTemplate,
entityMapping.get(EntityType.valueOf(sourceType)).name(),
entityMapping.get(EntityType.valueOf(targetType)).name());
}
public static String getMainType(final EntityType type) {
return entityMapping.get(type).name();
}
public static boolean isResult(EntityType type) {
return MainEntityType.result.name().equals(getMainType(type));
}
public static <E extends OafEntity> RelatedEntity asRelatedEntity(E entity, Class<E> clazz) { public static <E extends OafEntity> RelatedEntity asRelatedEntity(E entity, Class<E> clazz) {
final RelatedEntity re = new RelatedEntity(); final RelatedEntity re = new RelatedEntity();
@ -119,7 +28,7 @@ public class GraphMappingUtils {
re.setPid(entity.getPid()); re.setPid(entity.getPid());
re.setCollectedfrom(entity.getCollectedfrom()); re.setCollectedfrom(entity.getCollectedfrom());
switch (GraphMappingUtils.EntityType.fromClass(clazz)) { switch (EntityType.fromClass(clazz)) {
case publication: case publication:
case dataset: case dataset:
case otherresearchproduct: case otherresearchproduct:

View File

@ -9,10 +9,14 @@ import com.google.common.collect.Sets;
import com.mycila.xmltool.XMLDoc; import com.mycila.xmltool.XMLDoc;
import com.mycila.xmltool.XMLTag; import com.mycila.xmltool.XMLTag;
import eu.dnetlib.dhp.oa.provision.model.*; import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.MainEntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.codehaus.janino.Mod;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
@ -78,13 +82,13 @@ public class XmlRecordFactory implements Serializable {
final OafEntity entity = toOafEntity(je.getEntity()); final OafEntity entity = toOafEntity(je.getEntity());
TemplateFactory templateFactory = new TemplateFactory(); TemplateFactory templateFactory = new TemplateFactory();
try { try {
final EntityType type = GraphMappingUtils.EntityType.valueOf(je.getEntity().getType()); final EntityType type = EntityType.valueOf(je.getEntity().getType());
final List<String> metadata = metadata(type, entity, contexts); final List<String> metadata = metadata(type, entity, contexts);
// rels has to be processed before the contexts because they enrich the contextMap with the funding info. // rels has to be processed before the contexts because they enrich the contextMap with the funding info.
final List<String> relations = listRelations(je, templateFactory, contexts); final List<String> relations = listRelations(je, templateFactory, contexts);
final String mainType = getMainType(type); final String mainType = ModelSupport.getMainType(type);
metadata.addAll(buildContexts(mainType, contexts)); metadata.addAll(buildContexts(mainType, contexts));
metadata.add(XmlSerializationUtils.parseDataInfo(entity.getDataInfo())); metadata.add(XmlSerializationUtils.parseDataInfo(entity.getDataInfo()));
@ -106,7 +110,7 @@ public class XmlRecordFactory implements Serializable {
private static OafEntity parseOaf(final String json, final String type) { private static OafEntity parseOaf(final String json, final String type) {
try { try {
switch (GraphMappingUtils.EntityType.valueOf(type)) { switch (EntityType.valueOf(type)) {
case publication: case publication:
return OBJECT_MAPPER.readValue(json, Publication.class); return OBJECT_MAPPER.readValue(json, Publication.class);
case dataset: case dataset:
@ -168,7 +172,7 @@ public class XmlRecordFactory implements Serializable {
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
if (GraphMappingUtils.isResult(type)) { if (ModelSupport.isResult(type)) {
final Result r = (Result) entity; final Result r = (Result) entity;
if (r.getContext() != null) { if (r.getContext() != null) {
@ -756,7 +760,7 @@ public class XmlRecordFactory implements Serializable {
} }
final DataInfo info = rel.getDataInfo(); final DataInfo info = rel.getDataInfo();
final String scheme = getScheme(re.getType(), targetType); final String scheme = ModelSupport.getScheme(re.getType(), targetType);
if (StringUtils.isBlank(scheme)) { if (StringUtils.isBlank(scheme)) {
throw new IllegalArgumentException(String.format("missing scheme for: <%s - %s>", re.getType(), targetType)); throw new IllegalArgumentException(String.format("missing scheme for: <%s - %s>", re.getType(), targetType));
@ -782,7 +786,7 @@ public class XmlRecordFactory implements Serializable {
final List<String> children = Lists.newArrayList(); final List<String> children = Lists.newArrayList();
EntityType entityType = EntityType.valueOf(type); EntityType entityType = EntityType.valueOf(type);
if (MainEntityType.result.toString().equals(getMainType(entityType))) { if (MainEntityType.result.toString().equals(ModelSupport.getMainType(entityType))) {
final List<Instance> instances = ((Result) entity).getInstance(); final List<Instance> instances = ((Result) entity).getInstance();
if (instances != null) { if (instances != null) {
for (final Instance instance : ((Result) entity).getInstance()) { for (final Instance instance : ((Result) entity).getInstance()) {