dataset based provision WIP

This commit is contained in:
Claudio Atzori 2020-04-04 14:03:43 +02:00
parent 24b2c9012e
commit 3d1b637cab
31 changed files with 1739 additions and 620 deletions

View File

@ -13,6 +13,26 @@
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>

View File

@ -0,0 +1,56 @@
package eu.dnetlib.dhp.common;
import java.io.Serializable;
import java.util.function.Supplier;
/**
* Provides serializable and throwing extensions to standard functional interfaces.
*/
public class FunctionalInterfaceSupport {
private FunctionalInterfaceSupport() {
}
/**
* Serializable supplier of any kind of objects. To be used within spark processing pipelines when supplying
* functions externally.
*
* @param <T>
*/
@FunctionalInterface
public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
}
/**
* Extension of consumer accepting functions throwing an exception.
*
* @param <T>
* @param <E>
*/
@FunctionalInterface
public interface ThrowingConsumer<T, E extends Exception> {
void accept(T t) throws E;
}
/**
* Extension of supplier accepting functions throwing an exception.
*
* @param <T>
* @param <E>
*/
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
T get() throws E;
}
/**
* Extension of runnable accepting functions throwing an exception.
*
* @param <E>
*/
@FunctionalInterface
public interface ThrowingRunnable<E extends Exception> {
void run() throws E;
}
}
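
A minimal usage sketch for the interfaces above (illustrative only, not part of this commit; the example class, values and the checked exception are made up):

import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingConsumer;

public class FunctionalInterfaceSupportExample {

    public static void main(String[] args) throws Exception {
        // a supplier that can be captured inside a Spark closure without serialization issues
        SerializableSupplier<String> greeting = () -> "hello";

        // a consumer whose body throws a checked exception, no try/catch needed at the call site
        ThrowingConsumer<String, Exception> printer = s -> {
            if (s == null) {
                throw new Exception("null value");
            }
            System.out.println(s);
        };

        printer.accept(greeting.get());
    }
}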

View File

@ -0,0 +1,57 @@
package eu.dnetlib.dhp.common;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.ThrowingSupport.rethrowAsRuntimeException;
/**
* HDFS utility methods.
*/
public class HdfsSupport {
private static final Logger logger = LoggerFactory.getLogger(HdfsSupport.class);
private HdfsSupport() {
}
/**
* Removes a path (file or dir) from HDFS.
*
* @param path Path to be removed
* @param configuration Configuration of hadoop env
*/
public static void remove(String path, Configuration configuration) {
logger.info("Removing path: {}", path);
rethrowAsRuntimeException(() -> {
Path f = new Path(path);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(f)) {
fileSystem.delete(f, true);
}
});
}
/**
* Lists the hadoop files (in practice, the subdirectories) located under the given path.
*
* @param path Path to be listed for hadoop files
* @param configuration Configuration of hadoop env
* @return List with string locations of hadoop files
*/
public static List<String> listFiles(String path, Configuration configuration) {
logger.info("Listing files in path: {}", path);
return rethrowAsRuntimeException(() -> Arrays
.stream(FileSystem.get(configuration).listStatus(new Path(path)))
.filter(FileStatus::isDirectory)
.map(x -> x.getPath().toString())
.collect(Collectors.toList()));
}
}
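
A short usage sketch for HdfsSupport (the paths below are placeholders; in a Spark job the Configuration would typically come from spark.sparkContext().hadoopConfiguration()):

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import eu.dnetlib.dhp.common.HdfsSupport;

public class HdfsSupportExample {

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // remove the output of a previous run before writing (placeholder path)
        HdfsSupport.remove("/tmp/provision/output", conf);

        // list the locations found under a path (placeholder path)
        List<String> locations = HdfsSupport.listFiles("/tmp/provision/input", conf);
        locations.forEach(System.out::println);
    }
}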

View File

@ -0,0 +1,57 @@
package eu.dnetlib.dhp.common;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingConsumer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import java.util.Objects;
import java.util.function.Function;
/**
* SparkSession utility methods.
*/
public class SparkSessionSupport {
private SparkSessionSupport() {
}
/**
* Runs a given function using a SparkSession created with the default builder and the supplied SparkConf. Stops the
* SparkSession when it is managed. Allows reusing a SparkSession created externally.
*
* @param conf SparkConf instance
* @param isSparkSessionManaged When true will stop SparkSession
* @param fn Consumer to be applied to constructed SparkSession
*/
public static void runWithSparkSession(SparkConf conf,
Boolean isSparkSessionManaged,
ThrowingConsumer<SparkSession, Exception> fn) {
runWithSparkSession(c -> SparkSession.builder().config(c).getOrCreate(), conf, isSparkSessionManaged, fn);
}
/**
* Runs a given function using a SparkSession created with the supplied builder and SparkConf. Stops the
* SparkSession when it is managed. Allows reusing a SparkSession created externally.
*
* @param sparkSessionBuilder Builder of SparkSession
* @param conf SparkConf instance
* @param isSparkSessionManaged When true will stop SparkSession
* @param fn Consumer to be applied to constructed SparkSession
*/
public static void runWithSparkSession(Function<SparkConf, SparkSession> sparkSessionBuilder,
SparkConf conf,
Boolean isSparkSessionManaged,
ThrowingConsumer<SparkSession, Exception> fn) {
SparkSession spark = null;
try {
spark = sparkSessionBuilder.apply(conf);
fn.accept(spark);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (Objects.nonNull(spark) && isSparkSessionManaged) {
spark.stop();
}
}
}
}
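
A minimal sketch of how runWithSparkSession can be invoked (illustrative only; the master, app name and the consumer body are placeholders):

import org.apache.spark.SparkConf;

import eu.dnetlib.dhp.common.SparkSessionSupport;

public class SparkSessionSupportExample {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("runWithSparkSession-example");

        // isSparkSessionManaged = true: the session is stopped once the consumer completes
        SparkSessionSupport.runWithSparkSession(conf, true, spark ->
                System.out.println("spark version: " + spark.version()));
    }
}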

View File

@ -0,0 +1,76 @@
package eu.dnetlib.dhp.common;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingRunnable;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingSupplier;
/**
* Exception handling utility methods.
*/
public class ThrowingSupport {
private ThrowingSupport() {
}
/**
* Executes given runnable and rethrows any exceptions as RuntimeException.
*
* @param fn Runnable to be executed
* @param <E> Type of exception thrown
*/
public static <E extends Exception> void rethrowAsRuntimeException(ThrowingRunnable<E> fn) {
try {
fn.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Executes given runnable and rethrows any exceptions as RuntimeException with custom message.
*
* @param fn Runnable to be executed
* @param msg Message to be set for rethrown exception
* @param <E> Type of exception thrown
*/
public static <E extends Exception> void rethrowAsRuntimeException(ThrowingRunnable<E> fn, String msg) {
try {
fn.run();
} catch (Exception e) {
throw new RuntimeException(msg, e);
}
}
/**
* Executes given supplier and rethrows any exceptions as RuntimeException.
*
* @param fn Supplier to be executed
* @param <T> Type of returned value
* @param <E> Type of exception thrown
* @return Result of supplier execution
*/
public static <T, E extends Exception> T rethrowAsRuntimeException(ThrowingSupplier<T, E> fn) {
try {
return fn.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Executes given supplier and rethrows any exceptions as RuntimeException with custom message.
*
* @param fn Supplier to be executed
* @param msg Message to be set for rethrown exception
* @param <T> Type of returned value
* @param <E> Type of exception thrown
* @return Result of supplier execution
*/
public static <T, E extends Exception> T rethrowAsRuntimeException(ThrowingSupplier<T, E> fn, String msg) {
try {
return fn.get();
} catch (Exception e) {
throw new RuntimeException(msg, e);
}
}
}
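
A usage sketch covering both the supplier and the runnable overloads (the file paths and the error message are placeholders; Files.size and Files.createDirectories simply stand in for any code throwing a checked exception):

import java.nio.file.Files;
import java.nio.file.Paths;

import static eu.dnetlib.dhp.common.ThrowingSupport.rethrowAsRuntimeException;

public class ThrowingSupportExample {

    public static void main(String[] args) {
        // supplier variant: the checked IOException thrown by Files.size is rethrown as a RuntimeException
        long size = rethrowAsRuntimeException(() -> {
            return Files.size(Paths.get("pom.xml"));
        }, "unable to read the file size");
        System.out.println("size: " + size);

        // runnable variant: same conversion for void operations
        rethrowAsRuntimeException(() -> {
            Files.createDirectories(Paths.get("target/tmp"));
        });
    }
}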

View File

@ -0,0 +1,78 @@
package eu.dnetlib.dhp.common;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.*;
public class HdfsSupportTest {
@Nested
class Remove {
@Test
public void shouldThrowARuntimeExceptionOnError() {
// when
assertThrows(RuntimeException.class, () ->
HdfsSupport.remove(null, new Configuration()));
}
@Test
public void shouldRemoveADirFromHDFS(@TempDir Path tempDir) {
// when
HdfsSupport.remove(tempDir.toString(), new Configuration());
// then
assertFalse(Files.exists(tempDir));
}
@Test
public void shouldRemoveAFileFromHDFS(@TempDir Path tempDir) throws IOException {
// given
Path file = Files.createTempFile(tempDir, "p", "s");
// when
HdfsSupport.remove(file.toString(), new Configuration());
// then
assertFalse(Files.exists(file));
}
}
@Nested
class ListFiles {
@Test
public void shouldThrowARuntimeExceptionOnError() {
// when
assertThrows(RuntimeException.class, () ->
HdfsSupport.listFiles(null, new Configuration()));
}
@Test
public void shouldListFilesLocatedInPath(@TempDir Path tempDir) throws IOException {
Path subDir1 = Files.createTempDirectory(tempDir, "list_me");
Path subDir2 = Files.createTempDirectory(tempDir, "list_me");
// when
List<String> paths = HdfsSupport.listFiles(tempDir.toString(), new Configuration());
// then
assertEquals(2, paths.size());
List<String> expecteds = Arrays.stream(new String[]{subDir1.toString(), subDir2.toString()})
.sorted().collect(Collectors.toList());
List<String> actuals = paths.stream().sorted().collect(Collectors.toList());
assertTrue(actuals.get(0).contains(expecteds.get(0)));
assertTrue(actuals.get(1).contains(expecteds.get(1)));
}
}
}

View File

@ -0,0 +1,36 @@
package eu.dnetlib.dhp.common;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ModelSupportTest {
@Nested
class IsSubClass {
@Test
public void shouldReturnFalseWhenSubClassDoesNotExtendSuperClass() {
// when
Boolean result = ModelSupport.isSubClass(Relation.class, OafEntity.class);
// then
assertFalse(result);
}
@Test
public void shouldReturnTrueWhenSubClassExtendsSuperClass() {
// when
Boolean result = ModelSupport.isSubClass(Result.class, OafEntity.class);
// then
assertTrue(result);
}
}
}

View File

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.common;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.ThrowingConsumer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.util.function.Function;
import static org.mockito.Mockito.*;
public class SparkSessionSupportTest {
@Nested
class RunWithSparkSession {
@Test
public void shouldExecuteFunctionAndNotStopSparkSessionWhenSparkSessionIsNotManaged() throws Exception {
// given
SparkSession spark = mock(SparkSession.class);
SparkConf conf = mock(SparkConf.class);
Function<SparkConf, SparkSession> sparkSessionBuilder = mock(Function.class);
when(sparkSessionBuilder.apply(conf)).thenReturn(spark);
ThrowingConsumer<SparkSession, Exception> fn = mock(ThrowingConsumer.class);
// when
SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, false, fn);
// then
verify(sparkSessionBuilder).apply(conf);
verify(fn).accept(spark);
verify(spark, never()).stop();
}
@Test
public void shouldExecuteFunctionAndStopSparkSessionWhenSparkSessionIsManaged() throws Exception {
// given
SparkSession spark = mock(SparkSession.class);
SparkConf conf = mock(SparkConf.class);
Function<SparkConf, SparkSession> sparkSessionBuilder = mock(Function.class);
when(sparkSessionBuilder.apply(conf)).thenReturn(spark);
ThrowingConsumer<SparkSession, Exception> fn = mock(ThrowingConsumer.class);
// when
SparkSessionSupport.runWithSparkSession(sparkSessionBuilder, conf, true, fn);
// then
verify(sparkSessionBuilder).apply(conf);
verify(fn).accept(spark);
verify(spark, times(1)).stop();
}
}
}

View File

@ -0,0 +1,51 @@
package eu.dnetlib.dhp.schema.common;
import eu.dnetlib.dhp.schema.oaf.Oaf;
/**
* Inheritance utility methods.
*/
public class ModelSupport {
private ModelSupport() {
}
/**
* Checks subclass-superclass relationship.
*
* @param subClazzObject Subclass object instance
* @param superClazzObject Superclass object instance
* @param <X> Subclass type
* @param <Y> Superclass type
* @return True if X is a subclass of Y
*/
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(X subClazzObject, Y superClazzObject) {
return isSubClass(subClazzObject.getClass(), superClazzObject.getClass());
}
/**
* Checks subclass-superclass relationship.
*
* @param subClazzObject Subclass object instance
* @param superClazz Superclass class
* @param <X> Subclass type
* @param <Y> Superclass type
* @return True if X is a subclass of Y
*/
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(X subClazzObject, Class<Y> superClazz) {
return isSubClass(subClazzObject.getClass(), superClazz);
}
/**
* Checks subclass-superclass relationship.
*
* @param subClazz Subclass class
* @param superClazz Superclass class
* @param <X> Subclass type
* @param <Y> Superclass type
* @return True if X is a subclass of Y
*/
public static <X extends Oaf, Y extends Oaf> Boolean isSubClass(Class<X> subClazz, Class<Y> superClazz) {
return superClazz.isAssignableFrom(subClazz);
}
}
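
A short usage sketch mirroring the assertions in ModelSupportTest (Result and Relation come from the dhp-schemas module already referenced in this commit; the example class is made up):

import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;

public class ModelSupportExample {

    public static void main(String[] args) {
        // Result extends OafEntity -> true
        System.out.println(ModelSupport.isSubClass(Result.class, OafEntity.class));

        // Relation is not an OafEntity subclass -> false
        System.out.println(ModelSupport.isSubClass(Relation.class, OafEntity.class));
    }
}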

View File

@ -92,8 +92,7 @@ public class Relation extends Oaf {
subRelType.equals(relation.subRelType) &&
relClass.equals(relation.relClass) &&
source.equals(relation.source) &&
target.equals(relation.target) &&
Objects.equals(collectedFrom, relation.collectedFrom);
target.equals(relation.target);
}
@Override

View File

@ -0,0 +1,167 @@
package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.IOException;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
*
* The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
* by E, finally grouped by E.id;
*
* The workflow is organized in different parts aimed at reducing the complexity of the operation:
* 1) PrepareRelationsJob:
* only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
* can be linked at most to 100 other objects
*
* 2) JoinRelationEntityByTargetJob:
* prepare tuples [source entity - relation - target entity] (S - R - T):
* for each entity type E_i
* join (R.target = E_i.id),
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
* join (E_i.id = [R - T_i].source), where E_i becomes the source entity S
*
* 3) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
*
* 4) XmlConverterJob:
* convert the JoinedEntities into XML records
*/
public class AdjacencyListBuilderJob {
private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(
AdjacencyListBuilderJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(getKryoClasses());
runWithSparkSession(conf, isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
createAdjacencyLists(spark, inputPath, outputPath);
});
}
private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) {
RDD<JoinedEntity> joined = spark.read()
.load(inputPath)
.as(Encoders.kryo(EntityRelEntity.class))
.javaRDD()
.map(e -> getJoinedEntity(e))
.mapToPair(e -> new Tuple2<>(e.getEntity().getId(), e))
.reduceByKey((j1, j2) -> getJoinedEntity(j1, j2))
.map(Tuple2::_2)
.rdd();
spark.createDataset(joined, Encoders.bean(JoinedEntity.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
private static JoinedEntity getJoinedEntity(JoinedEntity j1, JoinedEntity j2) {
JoinedEntity je = new JoinedEntity();
je.setEntity(j1.getEntity());
je.setType(j1.getType());
Links links = new Links();
links.addAll(j1.getLinks());
links.addAll(j2.getLinks());
je.setLinks(links);
return je;
}
private static JoinedEntity getJoinedEntity(EntityRelEntity e) {
JoinedEntity j = new JoinedEntity();
j.setEntity(toOafEntity(e.getEntity()));
j.setType(EntityType.valueOf(e.getEntity().getType()));
Links links = new Links();
links.add(new eu.dnetlib.dhp.oa.provision.model.Tuple2(e.getRelation(), e.getTarget()));
j.setLinks(links);
return j;
}
private static OafEntity toOafEntity(TypedRow typedRow) {
return parseOaf(typedRow.getOaf(), typedRow.getType());
}
private static OafEntity parseOaf(final String json, final String type) {
try {
switch (GraphMappingUtils.EntityType.valueOf(type)) {
case publication:
return OBJECT_MAPPER.readValue(json, Publication.class);
case dataset:
return OBJECT_MAPPER.readValue(json, Dataset.class);
case otherresearchproduct:
return OBJECT_MAPPER.readValue(json, OtherResearchProduct.class);
case software:
return OBJECT_MAPPER.readValue(json, Software.class);
case datasource:
return OBJECT_MAPPER.readValue(json, Datasource.class);
case organization:
return OBJECT_MAPPER.readValue(json, Organization.class);
case project:
return OBJECT_MAPPER.readValue(json, Project.class);
default:
throw new IllegalArgumentException("invalid type: " + type);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -0,0 +1,157 @@
package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
*
* The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
* by E, finally grouped by E.id;
*
* The workflow is organized in different parts aimed at reducing the complexity of the operation:
* 1) PrepareRelationsJob:
* only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
* can be linked at most to 100 other objects
*
* 2) CreateRelatedEntitiesJob_phase1:
* prepare tuples [relation - target entity] (R - T):
* for each entity type E_i
* join (R.target = E_i.id),
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
* save the tuples [R - T_i] in append mode
*
* 3) CreateRelatedEntitiesJob_phase2:
* prepare tuples [source entity - relation - target entity] (S - R - T):
* create the union of each entity type, hashed by id (S)
* for each [R - T_i] produced in phase1
* join S.id = [R - T_i].source to produce (S_i - R - T_i)
* save in append mode
*
* 4) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
*
* 5) XmlConverterJob:
* convert the JoinedEntities into XML records
*/
public class CreateRelatedEntitiesJob_phase1 {
private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase1.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils.toString(
PrepareRelationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputRelationsPath = parser.get("inputRelationsPath");
log.info("inputRelationsPath: {}", inputRelationsPath);
String inputEntityPath = parser.get("inputEntityPath");
log.info("inputEntityPath: {}", inputEntityPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(getKryoClasses());
runWithSparkSession(conf, isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
joinRelationEntity(spark, inputRelationsPath, inputEntityPath, entityClazz, outputPath);
});
}
private static <E extends OafEntity> void joinRelationEntity(SparkSession spark, String inputRelationsPath, String inputEntityPath, Class<E> entityClazz, String outputPath) {
Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
.map((MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class)));
Dataset<Tuple2<String, E>> entities = readPathEntity(spark, inputEntityPath, entityClazz)
.map((MapFunction<E, Tuple2<String, E>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz)))
.cache();
relsByTarget
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
.filter((FilterFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, E>>>)
value -> value._2()._2().getDataInfo().getDeletedbyinference() == false)
.map((MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, E>>, EntityRelEntity>)
t -> new EntityRelEntity(t._1()._2(), GraphMappingUtils.asRelatedEntity(t._2()._2(), entityClazz)),
Encoders.bean(EntityRelEntity.class))
.write()
.mode(SaveMode.Append)
.parquet(outputPath);
}
private static <E extends OafEntity> Dataset<E> readPathEntity(SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
return spark
.read()
.textFile(inputEntityPath)
.map((MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz));
}
/**
* Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from the path produced by PrepareRelationsJob.
*
* @param spark the active SparkSession
* @param relationPath the source path storing the prepared relations
* @return the Dataset<SortableRelation> containing all the relationships
*/
private static Dataset<SortableRelation> readPathRelation(SparkSession spark, final String relationPath) {
log.info("Reading relations from: {}", relationPath);
return spark.read()
.load(relationPath)
.as(Encoders.bean(SortableRelation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}
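
A self-contained sketch, with placeholder data, of the Tuple2-keyed joinWith pattern used in joinRelationEntity: both sides are keyed by a string id exposed as column "_1", then joined by equality on that column. The class name and the sample values are made up:

import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class KeyedJoinSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("keyed-join").getOrCreate();

        // "relations" keyed by target id (placeholder payloads)
        Dataset<Tuple2<String, String>> relsByTarget = spark
                .createDataset(Arrays.asList("t1|cites", "t2|references"), Encoders.STRING())
                .map((MapFunction<String, Tuple2<String, String>>) s ->
                                new Tuple2<>(s.split("\\|")[0], s.split("\\|")[1]),
                        Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // "entities" keyed by id (placeholder payloads)
        Dataset<Tuple2<String, String>> entitiesById = spark
                .createDataset(Arrays.asList("t1|Entity One", "t2|Entity Two"), Encoders.STRING())
                .map((MapFunction<String, Tuple2<String, String>>) s ->
                                new Tuple2<>(s.split("\\|")[0], s.split("\\|")[1]),
                        Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // inner join on the key column "_1", as done above for relations and graph entities
        relsByTarget
                .joinWith(entitiesById, relsByTarget.col("_1").equalTo(entitiesById.col("_1")), "inner")
                .show(false);

        spark.stop();
    }
}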

View File

@ -0,0 +1,168 @@
package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.TypedRow;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.*;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
*
* The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
* by E, finally grouped by E.id;
*
* The workflow is organized in different parts aimed at reducing the complexity of the operation:
* 1) PrepareRelationsJob:
* only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
* can be linked at most to 100 other objects
*
* 2) CreateRelatedEntitiesJob_phase1:
* prepare tuples [relation - target entity] (R - T):
* for each entity type E_i
* join (R.target = E_i.id),
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
* save the tuples [R - T_i] in append mode
*
* 3) CreateRelatedEntitiesJob_phase2:
* prepare tuples [source entity - relation - target entity] (S - R - T):
* create the union of each entity type, hashed by id (S)
* for each [R - T_i] produced in phase1
* join S.id = [R - T_i].source to produce (S_i - R - T_i)
* save in append mode
*
* 4) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
*
* 5) XmlConverterJob:
* convert the JoinedEntities into XML records
*/
public class CreateRelatedEntitiesJob_phase2 {
private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase2.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils.toString(
PrepareRelationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath");
log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath);
String inputGraphPath = parser.get("inputGraphPath");
log.info("inputGraphPath: {}", inputGraphPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(getKryoClasses());
runWithSparkSession(conf, isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
joinAllEntities(spark, inputRelatedEntitiesPath, inputGraphPath, outputPath);
});
}
private static void joinAllEntities(SparkSession spark, String inputRelatedEntitiesPath, String inputGraphPath, String outputPath) {
Dataset<Tuple2<String, EntityRelEntity>> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath);
Dataset<Tuple2<String, TypedRow>> entities = readAllEntities(spark, inputGraphPath);
entities
.joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer")
.map((MapFunction<Tuple2<Tuple2<String, TypedRow>, Tuple2<String, EntityRelEntity>>, EntityRelEntity>) value -> {
EntityRelEntity re = new EntityRelEntity();
re.setEntity(value._1()._2());
Optional<EntityRelEntity> related = Optional.ofNullable(value._2()).map(Tuple2::_2);
if (related.isPresent()) {
re.setRelation(related.get().getRelation());
re.setTarget(related.get().getTarget());
}
return re;
}, Encoders.bean(EntityRelEntity.class))
.write()
.mode(SaveMode.Append)
.parquet(outputPath);
}
private static Dataset<Tuple2<String, TypedRow>> readAllEntities(SparkSession spark, String inputGraphPath) {
return GraphMappingUtils.entityTypes.entrySet()
.stream()
.map((Function<Map.Entry<GraphMappingUtils.EntityType, Class>, Dataset<TypedRow>>)
e -> readPathEntity(spark, inputGraphPath + "/" + e.getKey().name(), e.getValue())
.map((MapFunction<OafEntity, TypedRow>) entity -> {
TypedRow t = new TypedRow();
t.setType(e.getKey().name());
t.setDeleted(entity.getDataInfo().getDeletedbyinference());
t.setId(entity.getId());
t.setOaf(OBJECT_MAPPER.writeValueAsString(entity));
return t;
}, Encoders.bean(TypedRow.class)))
.reduce(spark.emptyDataset(Encoders.bean(TypedRow.class)), Dataset::union)
.map((MapFunction<TypedRow, Tuple2<String, TypedRow>>)
value -> new Tuple2<>(value.getId(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)));
}
private static Dataset<Tuple2<String, EntityRelEntity>> readRelatedEntities(SparkSession spark, String inputRelatedEntitiesPath) {
return spark.read()
.load(inputRelatedEntitiesPath)
.as(Encoders.kryo(EntityRelEntity.class))
.map((MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>)
value -> new Tuple2<>(value.getRelation().getSource(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)));
}
private static <E extends OafEntity> Dataset<E> readPathEntity(SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
return spark
.read()
.textFile(inputEntityPath)
.map((MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}
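
A minimal sketch, with placeholder data, of the union-by-reduce pattern used in readAllEntities: a stream of typed Datasets is folded into a single Dataset starting from an empty one. The class name and the sample lists are made up:

import java.util.Arrays;
import java.util.stream.Stream;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class UnionReduceSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("union-reduce").getOrCreate();

        // one Dataset per "entity type", folded into a single Dataset via Dataset::union
        Dataset<String> merged = Stream
                .of(Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d", "e"))
                .map(list -> spark.createDataset(list, Encoders.STRING()))
                .reduce(spark.emptyDataset(Encoders.STRING()), Dataset::union);

        System.out.println(merged.count()); // 5

        spark.stop();
    }
}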

View File

@ -1,346 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.oa.provision.utils.*;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.types.*;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import static org.apache.spark.sql.functions.*;
import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntity;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
*
* The operation is implemented by creating the union between the entity types (E), joined by the relationships (R), and again
* by E, finally grouped by E.id;
*
* Different manipulations of the E and R sets are introduced to reduce the complexity of the operation
* 1) treat the object payload as a string, extracting only the necessary information beforehand using json path,
* as deserializing it with jackson's object mapper seems to have a higher memory footprint.
*
* 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false)
* 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S
* and E_target = T. Objects in T are heavily pruned by all the unnecessary information
*
* 4) perform the join as (((T.id join R.target) union S) groupby S.id) yield S -> [ <T, R> ]
*/
public class GraphJoiner_v2 implements Serializable {
private Map<String, LongAccumulator> accumulators = Maps.newHashMap();
public static final int MAX_RELS = 100;
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
private SparkSession spark;
private ContextMapper contextMapper;
private String inputPath;
private String outPath;
private String otherDsTypeId;
public GraphJoiner_v2(SparkSession spark, ContextMapper contextMapper, String otherDsTypeId, String inputPath, String outPath) {
this.spark = spark;
this.contextMapper = contextMapper;
this.otherDsTypeId = otherDsTypeId;
this.inputPath = inputPath;
this.outPath = outPath;
final SparkContext sc = spark.sparkContext();
prepareAccumulators(sc);
}
public GraphJoiner_v2 adjacencyLists() throws IOException {
final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(getSpark().sparkContext());
// read each entity
Dataset<TypedRow> datasource = readPathEntity(jsc, getInputPath(), "datasource");
Dataset<TypedRow> organization = readPathEntity(jsc, getInputPath(), "organization");
Dataset<TypedRow> project = readPathEntity(jsc, getInputPath(), "project");
Dataset<TypedRow> dataset = readPathEntity(jsc, getInputPath(), "dataset");
Dataset<TypedRow> otherresearchproduct = readPathEntity(jsc, getInputPath(), "otherresearchproduct");
Dataset<TypedRow> software = readPathEntity(jsc, getInputPath(), "software");
Dataset<TypedRow> publication = readPathEntity(jsc, getInputPath(), "publication");
// create the union between all the entities
datasource
.union(organization)
.union(project)
.union(dataset)
.union(otherresearchproduct)
.union(software)
.union(publication)
.repartition(7000)
.write()
.partitionBy("id")
.parquet(getOutPath() + "/entities");
Dataset<Tuple2<String, TypedRow>> entities = getSpark()
.read()
.load(getOutPath() + "/entities")
.map((MapFunction<Row, Tuple2<String, TypedRow>>) r -> {
TypedRow t = new TypedRow();
t.setId(r.getAs("id"));
t.setDeleted(r.getAs("deleted"));
t.setType(r.getAs("type"));
t.setOaf(r.getAs("oaf"));
return new Tuple2<>(t.getId(), t);
}, Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
.cache();
System.out.println("Entities, number of partitions: " + entities.rdd().getNumPartitions());
System.out.println("Entities schema:");
entities.printSchema();
System.out.println("Entities count:" + entities.count());
// reads the relationships
readPathRelation(jsc, getInputPath())
.groupByKey((MapFunction<Relation, SortableRelationKey>) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class))
.flatMapGroups((FlatMapGroupsFunction<SortableRelationKey, Relation, Relation>) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.kryo(Relation.class))
.repartition(3000)
.write()
.partitionBy("source", "target")
.parquet(getOutPath() + "/relations");
Dataset<Relation> rels = getSpark()
.read()
.load(getOutPath() + "/relations")
.map((MapFunction<Row, Relation>) r -> {
Relation rel = new Relation();
rel.setSource(r.getAs("source"));
rel.setTarget(r.getAs("target"));
rel.setRelType(r.getAs("relType"));
rel.setSubRelType(r.getAs("subRelType"));
rel.setRelClass(r.getAs("relClass"));
rel.setDataInfo(r.getAs("dataInfo"));
rel.setCollectedFrom(r.getList(r.fieldIndex("collectedFrom")));
return rel;
}, Encoders.kryo(Relation.class))
.cache();
System.out.println("Relation schema:");
System.out.println("Relation, number of partitions: " + rels.rdd().getNumPartitions());
System.out.println("Relation schema:");
entities.printSchema();
System.out.println("Relation count:" + rels.count());
/*
Dataset<Tuple2<String, Relation>> relsByTarget = rels
.map((MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getTarget(), r), Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)));
relsByTarget
.joinWith(entities, relsByTarget.col("_1").equalTo(entities.col("_1")), "inner")
.filter((FilterFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, TypedRow>>>) value -> value._2()._2().getDeleted() == false)
.map((MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, TypedRow>>, EntityRelEntity>) t -> {
EntityRelEntity e = new EntityRelEntity();
e.setRelation(t._1()._2());
e.setTarget(asRelatedEntity(t._2()._2()));
return e;
}, Encoders.kryo(EntityRelEntity.class))
.repartition(20000)
.write()
.parquet(getOutPath() + "/bySource");
Dataset<Tuple2<String, EntityRelEntity>> bySource = getSpark()
.read()
.load(getOutPath() + "/bySource")
.map(new MapFunction<Row, EntityRelEntity>() {
@Override
public EntityRelEntity call(Row value) throws Exception {
return null;
}
}, Encoders.kryo(EntityRelEntity.class))
.map((MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>) e -> new Tuple2<>(e.getRelation().getSource(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)))
System.out.println("bySource schema");
bySource.printSchema();
Dataset<EntityRelEntity> joined = entities
.joinWith(bySource, entities.col("_1").equalTo(bySource.col("_1")), "left")
.map((MapFunction<Tuple2<Tuple2<String, TypedRow>, Tuple2<String, EntityRelEntity>>, EntityRelEntity>) value -> {
EntityRelEntity re = new EntityRelEntity();
re.setEntity(value._1()._2());
Optional<EntityRelEntity> related = Optional.ofNullable(value._2()).map(Tuple2::_2);
if (related.isPresent()) {
re.setRelation(related.get().getRelation());
re.setTarget(related.get().getTarget());
}
return re;
}, Encoders.kryo(EntityRelEntity.class));
System.out.println("joined schema");
joined.printSchema();
//joined.write().json(getOutPath() + "/joined");
final Dataset<JoinedEntity> grouped = joined
.groupByKey((MapFunction<EntityRelEntity, TypedRow>) e -> e.getEntity(), Encoders.kryo(TypedRow.class))
.mapGroups((MapGroupsFunction<TypedRow, EntityRelEntity, JoinedEntity>) (key, values) -> toJoinedEntity(key, values), Encoders.kryo(JoinedEntity.class));
System.out.println("grouped schema");
grouped.printSchema();
final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId);
grouped
.map((MapFunction<JoinedEntity, String>) value -> recordFactory.build(value), Encoders.STRING())
.javaRDD()
.mapToPair((PairFunction<Tuple2<String, String>, String, String>) t -> new Tuple2<>(t._1(), t._2()))
.saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
*/
return this;
}
public SparkSession getSpark() {
return spark;
}
public String getInputPath() {
return inputPath;
}
public String getOutPath() {
return outPath;
}
// HELPERS
private JoinedEntity toJoinedEntity(TypedRow key, Iterator<EntityRelEntity> values) {
final ObjectMapper mapper = getObjectMapper();
final JoinedEntity j = new JoinedEntity();
j.setType(key.getType());
j.setEntity(parseOaf(key.getOaf(), key.getType(), mapper));
final Links links = new Links();
values.forEachRemaining(rel -> links.add(
new eu.dnetlib.dhp.oa.provision.model.Tuple2(
rel.getRelation(),
rel.getTarget()
)));
j.setLinks(links);
return j;
}
private OafEntity parseOaf(final String json, final String type, final ObjectMapper mapper) {
try {
switch (GraphMappingUtils.EntityType.valueOf(type)) {
case publication:
return mapper.readValue(json, Publication.class);
case dataset:
return mapper.readValue(json, eu.dnetlib.dhp.schema.oaf.Dataset.class);
case otherresearchproduct:
return mapper.readValue(json, OtherResearchProduct.class);
case software:
return mapper.readValue(json, Software.class);
case datasource:
return mapper.readValue(json, Datasource.class);
case organization:
return mapper.readValue(json, Organization.class);
case project:
return mapper.readValue(json, Project.class);
default:
throw new IllegalArgumentException("invalid type: " + type);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
/**
* Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a new line delimited json file,
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
* @param sc
* @param inputPath
* @param type
* @return the JavaPairRDD<String, TypedRow> indexed by entity identifier
*/
private Dataset<TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
RDD<String> rdd = sc.textFile(inputPath + "/" + type)
.rdd();
return getSpark().createDataset(rdd, Encoders.STRING())
.map((MapFunction<String, TypedRow>) s -> {
final DocumentContext json = JsonPath.parse(s);
final TypedRow t = new TypedRow();
t.setId(json.read("$.id"));
t.setDeleted(json.read("$.dataInfo.deletedbyinference"));
t.setType(type);
t.setOaf(s);
return t;
}, Encoders.bean(TypedRow.class));
}
/**
* Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file <className, relation json serialization>,
* extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.model.TypedRow
* @param sc
* @param inputPath
* @return the JavaRDD<TypedRow> containing all the relationships
*/
private Dataset<Relation> readPathRelation(final JavaSparkContext sc, final String inputPath) {
final RDD<String> rdd = sc.textFile(inputPath + "/relation")
.rdd();
return getSpark().createDataset(rdd, Encoders.STRING())
.map((MapFunction<String, Relation>) s -> new ObjectMapper().readValue(s, Relation.class), Encoders.bean(Relation.class));
}
private ObjectMapper getObjectMapper() {
return new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
private void prepareAccumulators(SparkContext sc) {
accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments"));
accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments"));
accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo"));
accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy"));
accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn"));
accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges"));
accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo"));
accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo"));
accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy"));
accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces"));
accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf"));
accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution"));
accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant"));
accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant"));
accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn"));
accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces"));
accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy"));
accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides"));
}
}

View File

@ -0,0 +1,132 @@
package eu.dnetlib.dhp.oa.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization)
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
*
* The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
* by E, finally grouped by E.id;
*
* The workflow is organized in different parts aimed at reducing the complexity of the operation:
* 1) PrepareRelationsJob:
* only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false), each entity
* can be linked at most to 100 other objects
*
* 2) JoinRelationEntityByTargetJob:
* prepare tuples [source entity - relation - target entity] (S - R - T):
* for each entity type E_i
* join (R.target = E_i.id),
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
* join (E_i.id = [R - T_i].source), where E_i becomes the source entity S
*
* 3) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
*
* 4) XmlConverterJob:
* convert the JoinedEntities into XML records
*/
public class PrepareRelationsJob {
private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final int MAX_RELS = 100;
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils.toString(
PrepareRelationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputRelationsPath = parser.get("inputRelationsPath");
log.info("inputRelationsPath: {}", inputRelationsPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsFromPaths(spark, inputRelationsPath, outputPath);
});
}
private static void prepareRelationsFromPaths(SparkSession spark, String inputRelationsPath, String outputPath) {
RDD<SortableRelation> rels = readPathRelation(spark, inputRelationsPath)
.filter((FilterFunction<SortableRelation>) r -> r.getDataInfo().getDeletedbyinference() == false)
.javaRDD()
.mapToPair((PairFunction<SortableRelation, String, List<SortableRelation>>) rel -> new Tuple2<>(
rel.getSource(),
Lists.newArrayList(rel)))
.reduceByKey((v1, v2) -> {
v1.addAll(v2);
v1.sort(SortableRelation::compareTo);
if (v1.size() > MAX_RELS) {
return new ArrayList<>(v1.subList(0, MAX_RELS));
}
return v1;
})
.flatMap(r -> r._2().iterator())
.rdd();
spark.createDataset(rels, Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
/**
* Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text file.
*
* @param spark the active SparkSession
* @param inputPath the source path storing the relations as newline delimited json
* @return the Dataset<SortableRelation> containing all the relationships
*/
private static Dataset<SortableRelation> readPathRelation(SparkSession spark, final String inputPath) {
return spark.read()
.textFile(inputPath)
.map((MapFunction<String, SortableRelation>) s -> OBJECT_MAPPER.readValue(s, SortableRelation.class),
Encoders.bean(SortableRelation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}
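
A plain-Java sketch of the merge function used in the reduceByKey above: accumulate the per-source relations, sort them, and cap the list at MAX_RELS. Integers stand in for SortableRelation and the class name is made up:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RelationPruningSketch {

    static final int MAX_RELS = 100;

    // mirrors the reduceByKey merge function: accumulate, sort, cap at MAX_RELS
    static List<Integer> merge(List<Integer> v1, List<Integer> v2) {
        v1.addAll(v2);
        v1.sort(Comparator.naturalOrder());
        return v1.size() > MAX_RELS ? new ArrayList<>(v1.subList(0, MAX_RELS)) : v1;
    }

    public static void main(String[] args) {
        List<Integer> a = new ArrayList<>(Arrays.asList(5, 3));
        List<Integer> b = new ArrayList<>(Arrays.asList(4, 1, 2));
        System.out.println(merge(a, b)); // [1, 2, 3, 4, 5]
    }
}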

View File

@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.provision;
import com.lucidworks.spark.util.SolrSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
@ -18,6 +19,8 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
@ -28,14 +31,20 @@ import java.io.StringReader;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class SparkXmlIndexingJob {
private static final Log log = LogFactory.getLog(SparkXmlIndexingJob.class);
private static final Logger log = LoggerFactory.getLogger(SparkXmlIndexingJob.class);
private static final Integer DEFAULT_BATCH_SIZE = 1000;
private static final String LAYOUT = "index";
private static final String INTERPRETATION = "openaire";
private static final String SEPARATOR = "-";
public static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
public static void main(String[] args) throws Exception {
@ -45,48 +54,56 @@ public class SparkXmlIndexingJob {
"/eu/dnetlib/dhp/oa/provision/input_params_update_index.json")));
parser.parseArgument(args);
final String inputPath = parser.get("sourcePath");
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String format = parser.get("format");
log.info("format: {}", format);
final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE;
log.info("batchSize: {}", batchSize);
final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String fields = getLayoutSource(isLookup, format);
log.info("fields: {}", fields);
final String xslt = getLayoutTransformer(isLookup);
final String dsId = getDsId(format, isLookup);
log.info("dsId: {}", dsId);
final String zkHost = getZkHost(isLookup);
log.info("zkHost: {}", zkHost);
final String version = getRecordDatestamp();
final String indexRecordXslt = getLayoutTransformer(format, fields, xslt);
log.info("indexRecordTransformer {}", indexRecordXslt);
log.info("indexRecordTransformer: " + indexRecordXslt);
final SparkConf conf = new SparkConf();
final String master = parser.get("master");
final SparkConf conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
runWithSparkSession(conf, isSparkSessionManaged,
spark -> {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
try(SparkSession spark = getSession(conf, master)) {
RDD<SolrInputDocument> docs = sc.sequenceFile(inputPath, Text.class, Text.class)
.map(t -> t._2().toString())
.map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
.map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
.rdd();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
RDD<SolrInputDocument> docs = sc.sequenceFile(inputPath, Text.class, Text.class)
.map(t -> t._2().toString())
.map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
.map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
.rdd();
SolrSupport.indexDocs(zkHost, format + "-" + LAYOUT + "-openaire", batchSize, docs);
}
}
private static SparkSession getSession(SparkConf conf, String master) {
return SparkSession
.builder()
.config(conf)
.appName(SparkXmlIndexingJob.class.getSimpleName())
.master(master)
.getOrCreate();
final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
SolrSupport.indexDocs(zkHost, collection, batchSize, docs);
});
}
private static String toIndexRecord(Transformer tr, final String record) {
@ -95,7 +112,7 @@ public class SparkXmlIndexingJob {
tr.transform(new StreamSource(new StringReader(record)), res);
return res.getWriter().toString();
} catch (Throwable e) {
System.out.println("XPathException on record:\n" + record);
log.error("XPathException on record: \n {}", record, e);
throw new IllegalArgumentException(e);
}
}
@ -127,7 +144,7 @@ public class SparkXmlIndexingJob {
* @return the parsed date
*/
public static String getRecordDatestamp() {
return new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'").format(new Date());
return new SimpleDateFormat(DATE_FORMAT).format(new Date());
}
/**

View File

@ -1,81 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
public class SparkXmlRecordBuilderJob_v2 {
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(
SparkXmlRecordBuilderJob_v2.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
parser.parseArgument(args);
try(SparkSession spark = getSession(parser)) {
final String inputPath = parser.get("sourcePath");
final String outputPath = parser.get("outputPath");
final String isLookupUrl = parser.get("isLookupUrl");
final String otherDsTypeId = parser.get("otherDsTypeId");
new GraphJoiner_v2(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath)
.adjacencyLists();
}
}
private static SparkSession getSession(ArgumentApplicationParser parser) {
final SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.sql.shuffle.partitions", parser.get("sparkSqlShufflePartitions"));
conf.registerKryoClasses(new Class[]{
Author.class,
Context.class,
Country.class,
DataInfo.class,
eu.dnetlib.dhp.schema.oaf.Dataset.class,
Datasource.class,
ExternalReference.class,
ExtraInfo.class,
Field.class,
GeoLocation.class,
Instance.class,
Journal.class,
KeyValue.class,
Oaf.class,
OafEntity.class,
OAIProvenance.class,
Organization.class,
OriginDescription.class,
OtherResearchProduct.class,
Project.class,
Publication.class,
Qualifier.class,
Relation.class,
Result.class,
Software.class,
StructuredProperty.class,
TypedRow.class,
EntityRelEntity.class,
JoinedEntity.class,
SortableRelationKey.class,
Tuple2.class,
Links.class,
RelatedEntity.class
});
return SparkSession
.builder()
.config(conf)
.appName(SparkXmlRecordBuilderJob_v2.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
}
}
View File
@ -0,0 +1,149 @@
package eu.dnetlib.dhp.oa.provision;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.Map;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects.
* The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization),
* and all the possible relationships (similarity links produced by the Dedup process are excluded).
*
* The operation is implemented by sequentially joining one entity type at a time (E) with the relationships (R), and again
* by E, finally grouped by E.id;
*
* The workflow is organized in different parts aimed at reducing the complexity of the operation:
* 1) PrepareRelationsJob:
* only considers relationships that are not virtually deleted ($.dataInfo.deletedbyinference == false); each entity
* can be linked to at most 100 other objects
*
* 2) JoinRelationEntityByTargetJob:
* prepare tuples [source entity - relation - target entity] (S - R - T):
* for each entity type E_i
* join (R.target = E_i.id),
* map E_i as RelatedEntity T_i, extracting only the necessary information beforehand to produce [R - T_i]
* join (E_i.id = [R - T_i].source), where E_i becomes the source entity S
*
* 3) AdjacencyListBuilderJob:
* given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the result as JoinedEntity
*
* 4) XmlConverterJob:
* convert the JoinedEntities into XML records (an illustrative end-to-end sketch follows this class)
*/
public class XmlConverterJob {
private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class);
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(
XmlConverterJob.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_xml_converter.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
String otherDsTypeId = parser.get("otherDsTypeId");
log.info("otherDsTypeId: {}", otherDsTypeId);
SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl), otherDsTypeId);
});
}
private static void convertToXml(SparkSession spark, String inputPath, String outputPath, ContextMapper contextMapper, String otherDsTypeId) {
final XmlRecordFactory recordFactory = new XmlRecordFactory(prepareAccumulators(spark.sparkContext()), contextMapper, false, schemaLocation, otherDsTypeId);
spark.read()
.load(inputPath)
.as(Encoders.bean(JoinedEntity.class))
.map((MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(
je.getEntity().getId(),
recordFactory.build(je)
), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.javaRDD()
.mapToPair((PairFunction<Tuple2<String, String>, String, String>) t -> t)
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) {
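// counters exposed in the Spark UI, one per relation typology; the map is handed to the XmlRecordFactory above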
Map<String, LongAccumulator> accumulators = Maps.newHashMap();
accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments"));
accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments"));
accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo"));
accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy"));
accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn"));
accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges"));
accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo"));
accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo"));
accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy"));
accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces"));
accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf"));
accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution"));
accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant"));
accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant"));
accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn"));
accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces"));
accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy"));
accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides"));
return accumulators;
}
}
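The four jobs wired together in this commit (PrepareRelationsJob, CreateRelatedEntitiesJob_phase1/2, AdjacencyListBuilderJob, XmlConverterJob) can be pictured as a single Spark SQL chain. The sketch below is illustrative only: the AdjacencyListSketch class, the parquet format and the /tmp/graph/* paths are assumptions made for the example, not part of this module.

package eu.dnetlib.dhp.oa.provision.examples;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.struct;

/** Illustrative sketch of the prepare/join/group dataflow; not part of the provision jobs. */
public class AdjacencyListSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName(AdjacencyListSketch.class.getSimpleName())
                .master("local[*]")
                .getOrCreate();

        // 1) PrepareRelationsJob: keep only relations that are not virtually deleted
        Dataset<Row> relations = spark.read().parquet("/tmp/graph/relation") // hypothetical path/format
                .filter(col("dataInfo.deletedbyinference").equalTo(false));

        // 2) CreateRelatedEntitiesJob_phase1: resolve relation targets against one entity table (E_i)
        Dataset<Row> publications = spark.read().parquet("/tmp/graph/publication") // hypothetical path/format
                .select(col("id"), col("pid"), col("collectedfrom"));
        Dataset<Row> related = relations
                .join(publications, relations.col("target").equalTo(publications.col("id")));

        // 3) AdjacencyListBuilderJob: group the [S - R - T] tuples by the relation source
        Dataset<Row> adjacency = related
                .groupBy(col("source"))
                .agg(collect_list(struct(col("relType"), col("relClass"), col("target"))).as("links"));

        // 4) XmlConverterJob: each grouped record would then be serialized as an XML payload
        adjacency.show(false);

        spark.stop();
    }
}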
View File
@ -1,15 +1,26 @@
package eu.dnetlib.dhp.oa.provision.model;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.io.Serializable;
public class EntityRelEntity implements Serializable {
private TypedRow entity;
private Relation relation;
private SortableRelation relation;
private RelatedEntity target;
public EntityRelEntity() {
}
public EntityRelEntity(SortableRelation relation, RelatedEntity target) {
this(null, relation, target);
}
public EntityRelEntity(TypedRow entity, SortableRelation relation, RelatedEntity target) {
this.entity = entity;
this.relation = relation;
this.target = target;
}
public TypedRow getEntity() {
return entity;
}
@ -18,11 +29,11 @@ public class EntityRelEntity implements Serializable {
this.entity = entity;
}
public Relation getRelation() {
public SortableRelation getRelation() {
return relation;
}
public void setRelation(Relation relation) {
public void setRelation(SortableRelation relation) {
this.relation = relation;
}
View File
@ -1,22 +1,23 @@
package eu.dnetlib.dhp.oa.provision.model;
import eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import java.io.Serializable;
public class JoinedEntity implements Serializable {
private String type;
private GraphMappingUtils.EntityType type;
private OafEntity entity;
private Links links;
public String getType() {
public GraphMappingUtils.EntityType getType() {
return type;
}
public void setType(String type) {
public void setType(GraphMappingUtils.EntityType type) {
this.type = type;
}
View File
@ -1,6 +1,6 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.util.ArrayList;
import java.util.HashSet;
public class Links extends ArrayList<Tuple2> {
public class Links extends HashSet<Tuple2> {
}
View File
@ -0,0 +1,34 @@
package eu.dnetlib.dhp.oa.provision.model;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.util.Map;
public class SortableRelation extends Relation implements Comparable<Relation> {
private final static Map<String, Integer> weights = Maps.newHashMap();
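// lower weight = higher precedence when relations are sorted (e.g. when capping the number of links kept per entity)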
static {
weights.put("outcome", 0);
weights.put("supplement", 1);
weights.put("publicationDataset", 2);
weights.put("relationship", 3);
weights.put("similarity", 4);
weights.put("affiliation", 5);
weights.put("provision", 6);
weights.put("participation", 7);
weights.put("dedup", 8);
}
@Override
public int compareTo(Relation o) {
return ComparisonChain.start()
.compare(weights.get(getSubRelType()), weights.get(o.getSubRelType()))
.compare(getSource(), o.getSource())
.compare(getTarget(), o.getTarget())
.result();
}
}
View File
@ -2,7 +2,10 @@ package eu.dnetlib.dhp.oa.provision.model;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class Tuple2 {
import java.io.Serializable;
import java.util.Objects;
public class Tuple2 implements Serializable {
private Relation relation;
@ -28,4 +31,18 @@ public class Tuple2 {
public void setRelatedEntity(RelatedEntity relatedEntity) {
this.relatedEntity = relatedEntity;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Tuple2 t2 = (Tuple2) o;
return getRelation().equals(t2.getRelation());
}
@Override
public int hashCode() {
return Objects.hash(getRelation().hashCode());
}
}
View File
@ -1,30 +1,47 @@
package eu.dnetlib.dhp.oa.provision.utils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.schema.oaf.*;
import net.minidev.json.JSONArray;
import org.apache.commons.lang3.StringUtils;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.*;
import static org.apache.commons.lang3.StringUtils.substringAfter;
public class GraphMappingUtils {
public static final String SEPARATOR = "_";
public final static Map<EntityType, Class> entityTypes = Maps.newHashMap();
static {
entityTypes.put(EntityType.datasource, Datasource.class);
entityTypes.put(EntityType.organization, Organization.class);
entityTypes.put(EntityType.project, Project.class);
entityTypes.put(EntityType.dataset, Dataset.class);
entityTypes.put(EntityType.otherresearchproduct, OtherResearchProduct.class);
entityTypes.put(EntityType.software, Software.class);
entityTypes.put(EntityType.publication, Publication.class);
}
public enum EntityType {
publication, dataset, otherresearchproduct, software, datasource, organization, project
publication, dataset, otherresearchproduct, software, datasource, organization, project;
public static <T extends OafEntity> EntityType fromClass(Class<T> clazz) {
switch (clazz.getName()) {
case "eu.dnetlib.dhp.schema.oaf.Publication" : return publication;
case "eu.dnetlib.dhp.schema.oaf.Dataset" : return dataset;
case "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct" : return otherresearchproduct;
case "eu.dnetlib.dhp.schema.oaf.Software" : return software;
case "eu.dnetlib.dhp.schema.oaf.Datasource" : return datasource;
case "eu.dnetlib.dhp.schema.oaf.Organization" : return organization;
case "eu.dnetlib.dhp.schema.oaf.Project" : return project;
default: throw new IllegalArgumentException("Unknown OafEntity class: " + clazz.getName());
}
}
}
public enum MainEntityType {
@ -33,8 +50,6 @@ public class GraphMappingUtils {
public static Set<String> authorPidTypes = Sets.newHashSet("orcid", "magidentifier");
public static Set<String> instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance", "distributionlocation");
private static final String schemeTemplate = "dnet:%s_%s_relations";
private static Map<EntityType, MainEntityType> entityMapping = Maps.newHashMap();
@ -49,6 +64,38 @@ public class GraphMappingUtils {
entityMapping.put(EntityType.project, MainEntityType.project);
}
public static Class[] getKryoClasses() {
return new Class[]{
Author.class,
Context.class,
Country.class,
DataInfo.class,
eu.dnetlib.dhp.schema.oaf.Dataset.class,
Datasource.class,
ExternalReference.class,
ExtraInfo.class,
Field.class,
GeoLocation.class,
Instance.class,
Journal.class,
KeyValue.class,
Oaf.class,
OafEntity.class,
OAIProvenance.class,
Organization.class,
OriginDescription.class,
OtherResearchProduct.class,
Project.class,
Publication.class,
Qualifier.class,
Relation.class,
SortableRelation.class, //SUPPORT
Result.class,
Software.class,
StructuredProperty.class
};
}
public static String getScheme(final String sourceType, final String targetType) {
return String.format(schemeTemplate,
entityMapping.get(EntityType.valueOf(sourceType)).name(),
@ -63,152 +110,81 @@ public class GraphMappingUtils {
return MainEntityType.result.name().equals(getMainType(type));
}
public static RelatedEntity asRelatedEntity(TypedRow e) {
public static <E extends OafEntity> RelatedEntity asRelatedEntity(E entity, Class<E> clazz) {
final DocumentContext j = JsonPath.parse(e.getOaf());
final RelatedEntity re = new RelatedEntity();
re.setId(j.read("$.id"));
re.setType(e.getType());
re.setId(entity.getId());
re.setType(clazz.getName());
switch (EntityType.valueOf(e.getType())) {
re.setPid(entity.getPid());
re.setCollectedfrom(entity.getCollectedfrom());
switch (GraphMappingUtils.EntityType.fromClass(clazz)) {
case publication:
case dataset:
case otherresearchproduct:
case software:
mapTitle(j, re);
re.setDateofacceptance(j.read("$.dateofacceptance.value"));
re.setPublisher(j.read("$.publisher.value"));
JSONArray pids = j.read("$.pid");
re.setPid(pids.stream()
.map(p -> asStructuredProperty((LinkedHashMap<String, Object>) p))
.collect(Collectors.toList()));
Result r = (Result) entity;
re.setResulttype(asQualifier(j.read("$.resulttype")));
if (r.getTitle() != null && !r.getTitle().isEmpty()) {
re.setTitle(r.getTitle().stream().findFirst().get());
}
JSONArray collfrom = j.read("$.collectedfrom");
re.setCollectedfrom(collfrom.stream()
.map(c -> asKV((LinkedHashMap<String, Object>) c))
.collect(Collectors.toList()));
// will throw exception when the instance is not found
JSONArray instances = j.read("$.instance");
re.setInstances(instances.stream()
.map(i -> {
final LinkedHashMap<String, Object> p = (LinkedHashMap<String, Object>) i;
final Field<String> license = new Field<String>();
license.setValue((String) ((LinkedHashMap<String, Object>) p.get("license")).get("value"));
final Instance instance = new Instance();
instance.setLicense(license);
instance.setAccessright(asQualifier((LinkedHashMap<String, String>) p.get("accessright")));
instance.setInstancetype(asQualifier((LinkedHashMap<String, String>) p.get("instancetype")));
instance.setHostedby(asKV((LinkedHashMap<String, Object>) p.get("hostedby")));
//TODO mapping of distributionlocation
instance.setCollectedfrom(asKV((LinkedHashMap<String, Object>) p.get("collectedfrom")));
Field<String> dateofacceptance = new Field<String>();
dateofacceptance.setValue((String) ((LinkedHashMap<String, Object>) p.get("dateofacceptance")).get("value"));
instance.setDateofacceptance(dateofacceptance);
return instance;
}).collect(Collectors.toList()));
re.setDateofacceptance(getValue(r.getDateofacceptance()));
re.setPublisher(getValue(r.getPublisher()));
re.setResulttype(r.getResulttype());
re.setInstances(r.getInstance());
//TODO still to be mapped
//re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
break;
case datasource:
re.setOfficialname(j.read("$.officialname.value"));
re.setWebsiteurl(j.read("$.websiteurl.value"));
re.setDatasourcetype(asQualifier(j.read("$.datasourcetype")));
re.setOpenairecompatibility(asQualifier(j.read("$.openairecompatibility")));
Datasource d = (Datasource) entity;
re.setOfficialname(getValue(d.getOfficialname()));
re.setWebsiteurl(getValue(d.getWebsiteurl()));
re.setDatasourcetype(d.getDatasourcetype());
re.setOpenairecompatibility(d.getOpenairecompatibility());
break;
case organization:
re.setLegalname(j.read("$.legalname.value"));
re.setLegalshortname(j.read("$.legalshortname.value"));
re.setCountry(asQualifier(j.read("$.country")));
re.setWebsiteurl(j.read("$.websiteurl.value"));
Organization o = (Organization) entity;
re.setLegalname(getValue(o.getLegalname()));
re.setLegalshortname(getValue(o.getLegalshortname()));
re.setCountry(o.getCountry());
re.setWebsiteurl(getValue(o.getWebsiteurl()));
break;
case project:
re.setProjectTitle(j.read("$.title.value"));
re.setCode(j.read("$.code.value"));
re.setAcronym(j.read("$.acronym.value"));
re.setContracttype(asQualifier(j.read("$.contracttype")));
Project p = (Project) entity;
JSONArray f = j.read("$.fundingtree");
re.setProjectTitle(getValue(p.getTitle()));
re.setCode(getValue(p.getCode()));
re.setAcronym(getValue(p.getAcronym()));
re.setContracttype(p.getContracttype());
List<Field<String>> f = p.getFundingtree();
if (!f.isEmpty()) {
re.setFundingtree(f.stream()
.map(s -> ((LinkedHashMap<String, String>) s).get("value"))
.map(s -> s.getValue())
.collect(Collectors.toList()));
}
break;
}
return re;
}
private static KeyValue asKV(LinkedHashMap<String, Object> j) {
final KeyValue kv = new KeyValue();
kv.setKey((String) j.get("key"));
kv.setValue((String) j.get("value"));
return kv;
private static String getValue(Field<String> field) {
return getFieldValueWithDefault(field, "");
}
private static void mapTitle(DocumentContext j, RelatedEntity re) {
final JSONArray a = j.read("$.title");
if (!a.isEmpty()) {
final StructuredProperty sp = asStructuredProperty((LinkedHashMap<String, Object>) a.get(0));
if (StringUtils.isNotBlank(sp.getValue())) {
re.setTitle(sp);
}
}
}
private static StructuredProperty asStructuredProperty(LinkedHashMap<String, Object> j) {
final StructuredProperty sp = new StructuredProperty();
final String value = (String) j.get("value");
if (StringUtils.isNotBlank(value)) {
sp.setValue((String) j.get("value"));
sp.setQualifier(asQualifier((LinkedHashMap<String, String>) j.get("qualifier")));
}
return sp;
}
public static Qualifier asQualifier(LinkedHashMap<String, String> j) {
final Qualifier q = new Qualifier();
final String classid = j.get("classid");
if (StringUtils.isNotBlank(classid)) {
q.setClassid(classid);
}
final String classname = j.get("classname");
if (StringUtils.isNotBlank(classname)) {
q.setClassname(classname);
}
final String schemeid = j.get("schemeid");
if (StringUtils.isNotBlank(schemeid)) {
q.setSchemeid(schemeid);
}
final String schemename = j.get("schemename");
if (StringUtils.isNotBlank(schemename)) {
q.setSchemename(schemename);
}
return q;
}
public static String serialize(final Object o) {
try {
return new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.writeValueAsString(o);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("unable to serialize: " + o.toString(), e);
}
private static <T> T getFieldValueWithDefault(Field<T> f, T defaultValue) {
return Optional.ofNullable(f)
.filter(Objects::nonNull)
.map(x -> x.getValue())
.orElse(defaultValue);
}
public static String removePrefix(final String s) {
View File
@ -1,8 +1,14 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true},
{"paramName":"t", "paramLongName":"otherDsTypeId", "paramDescription": "list of datasource types to populate field datasourcetypeui", "paramRequired": true},
{"paramName":"sp", "paramLongName":"sparkSqlShufflePartitions", "paramDescription": "Configures the number of partitions to use when shuffling data for joins or aggregations", "paramRequired": true}
{
"paramName": "in",
"paramLongName": "inputPath",
"paramDescription": "the path of the sequence file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
}
]
View File
@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "irp",
"paramLongName": "inputRelationsPath",
"paramDescription": "path to input relations prepare",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "root output location for prepared relations",
"paramRequired": true
}
]
View File
@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "irp",
"paramLongName": "inputRelationsPath",
"paramDescription": "path to input relations from the graph",
"paramRequired": true
},
{
"paramName": "iep",
"paramLongName": "inputEntityPath",
"paramDescription": "path to input entity from the graph",
"paramRequired": true
},
{
"paramName": "clazz",
"paramLongName": "graphTableClassName",
"paramDescription": "class name associated to the input entity path",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "root output location for prepared relations",
"paramRequired": true
}
]
View File
@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "irp",
"paramLongName": "inputRelatedEntitiesPath",
"paramDescription": "path to input relations from the graph",
"paramRequired": true
},
{
"paramName": "iep",
"paramLongName": "inputGraphPath",
"paramDescription": "root graph path",
"paramRequired": true
},
{
"paramName": "op",
"paramLongName": "outputPath",
"paramDescription": "root output location for prepared relations",
"paramRequired": true
}
]
View File
@ -1,7 +1,7 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true},
{"paramName":"i", "paramLongName":"inputPath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true},
{"paramName":"f", "paramLongName":"format", "paramDescription": "MDFormat name found in the IS profile", "paramRequired": true},
{"paramName":"b", "paramLongName":"batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false}
]
View File
@ -0,0 +1,26 @@
[
{
"paramName": "in",
"paramLongName": "inputPath",
"paramDescription": "the path of the sequence file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ilu",
"paramLongName": "isLookupUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName": "odt",
"paramLongName": "otherDsTypeId",
"paramDescription": "list of datasource types to populate field datasourcetypeui",
"paramRequired": true
}
]
View File
@ -1,6 +1,11 @@
<workflow-app name="index_infospace_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputGraphRootPath</name>
<description>root location of input materialized graph</description>
</property>
<property>
<name>sparkDriverMemoryForJoining</name>
<description>memory for driver process</description>
@ -64,7 +69,7 @@
<decision name="reuse_records">
<switch>
<case to="adjancency_lists">${wf:conf('reuseRecords') eq false}</case>
<case to="prepare_relations">${wf:conf('reuseRecords') eq false}</case>
<case to="to_solr_index">${wf:conf('reuseRecords') eq true}</case>
<default to="adjancency_lists"/>
</switch>
@ -74,16 +79,12 @@
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="adjancency_lists">
<action name="prepare_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</prepare>
<master>yarn</master>
<mode>cluster</mode>
<name>build_adjacency_lists</name>
<class>eu.dnetlib.dhp.oa.provision.SparkXmlRecordBuilderJob_v2</class>
<name>PrepareRelations</name>
<class>eu.dnetlib.dhp.oa.provision.PrepareRelationsJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
@ -94,12 +95,135 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-mt</arg> <arg>yarn</arg>
<arg>-is</arg> <arg>${isLookupUrl}</arg>
<arg>-t</arg> <arg>${otherDsTypeId}</arg>
<arg>-s</arg><arg>${sourcePath}</arg>
<arg>-o</arg><arg>${outputPath}</arg>
<arg>-sp</arg><arg>${sparkSqlShufflePartitions}</arg>
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
</spark>
<ok to="prepare_publication_table"/>
<error to="Kill"/>
</action>
<action name="join_relation_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = publication.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation_dataset"/>
<error to="Kill"/>
</action>
<action name="join_relation_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = dataset.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_all_entities"/>
<error to="Kill"/>
</action>
<action name="join_all_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[entities.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities</arg>
</spark>
<ok to="adjancency_lists"/>
<error to="Kill"/>
</action>
<action name="adjancency_lists">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>build_adjacency_lists</name>
<class>eu.dnetlib.dhp.oa.provision.AdjacencyListBuilderJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/joined</arg>
</spark>
<ok to="convert_to_xml"/>
<error to="Kill"/>
</action>
<action name="convert_to_xml">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>convert_to_xml</name>
<class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/joined</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg>
</spark>
<ok to="to_solr_index"/>
<error to="Kill"/>
@ -122,9 +246,8 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-mt</arg> <arg>yarn</arg>
<arg>-is</arg> <arg>${isLookupUrl}</arg>
<arg>--sourcePath</arg><arg>${outputPath}/xml</arg>
<arg>--isLookupUrl</arg> <arg>${isLookupUrl}</arg>
<arg>--inputPath</arg><arg>${workingDir}/xml</arg>
<arg>--format</arg><arg>${format}</arg>
<arg>--batchSize</arg><arg>${batchSize}</arg>
</spark>