forked from antonis.lempesis/dnet-hadoop
GroupEntities and DispatchEntites are now merged in GroupEntitiesSparkJob
This commit is contained in:
parent
488d9a1cea
commit
6cc7d8ca7b
|
@ -1,98 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.merge;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
|
||||||
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
|
||||||
import org.apache.spark.sql.*;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
|
||||||
|
|
||||||
public class DispatchEntitiesSparkJob {
|
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
|
|
||||||
String jsonConfiguration = IOUtils
|
|
||||||
.toString(
|
|
||||||
Objects
|
|
||||||
.requireNonNull(
|
|
||||||
DispatchEntitiesSparkJob.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json")));
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = Optional
|
|
||||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
|
||||||
.map(Boolean::valueOf)
|
|
||||||
.orElse(Boolean.TRUE);
|
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
|
||||||
|
|
||||||
String inputPath = parser.get("inputPath");
|
|
||||||
log.info("inputPath: {}", inputPath);
|
|
||||||
|
|
||||||
String outputPath = parser.get("outputPath");
|
|
||||||
log.info("outputPath: {}", outputPath);
|
|
||||||
|
|
||||||
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
|
|
||||||
log.info("filterInvisible: {}", filterInvisible);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
|
||||||
runWithSparkSession(
|
|
||||||
conf,
|
|
||||||
isSparkSessionManaged,
|
|
||||||
spark -> {
|
|
||||||
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
|
||||||
dispatchEntities(spark, inputPath, outputPath, filterInvisible);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void dispatchEntities(
|
|
||||||
SparkSession spark,
|
|
||||||
String inputPath,
|
|
||||||
String outputPath,
|
|
||||||
boolean filterInvisible) {
|
|
||||||
|
|
||||||
Dataset<String> df = spark.read().textFile(inputPath);
|
|
||||||
|
|
||||||
ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> {
|
|
||||||
String entityType = entry.getKey();
|
|
||||||
Class<?> clazz = entry.getValue();
|
|
||||||
|
|
||||||
if (!entityType.equalsIgnoreCase("relation")) {
|
|
||||||
Dataset<Row> entityDF = spark
|
|
||||||
.read()
|
|
||||||
.schema(Encoders.bean(clazz).schema())
|
|
||||||
.json(
|
|
||||||
df
|
|
||||||
.filter((FilterFunction<String>) s -> s.startsWith(clazz.getName()))
|
|
||||||
.map(
|
|
||||||
(MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"),
|
|
||||||
Encoders.STRING()));
|
|
||||||
|
|
||||||
if (filterInvisible) {
|
|
||||||
entityDF = entityDF.filter("dataInfo.invisible != true");
|
|
||||||
}
|
|
||||||
|
|
||||||
entityDF
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.option("compression", "gzip")
|
|
||||||
.json(outputPath + "/" + entityType);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -2,36 +2,28 @@
|
||||||
package eu.dnetlib.dhp.oa.merge;
|
package eu.dnetlib.dhp.oa.merge;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
|
import static org.apache.spark.sql.functions.col;
|
||||||
|
import static org.apache.spark.sql.functions.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.util.Map;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.api.java.function.ReduceFunction;
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
import org.apache.spark.sql.expressions.Aggregator;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.jayway.jsonpath.Configuration;
|
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
|
||||||
import com.jayway.jsonpath.JsonPath;
|
|
||||||
import com.jayway.jsonpath.Option;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
@ -39,13 +31,9 @@ import scala.Tuple2;
|
||||||
* Groups the graph content by entity identifier to ensure ID uniqueness
|
* Groups the graph content by entity identifier to ensure ID uniqueness
|
||||||
*/
|
*/
|
||||||
public class GroupEntitiesSparkJob {
|
public class GroupEntitiesSparkJob {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
|
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
|
||||||
|
|
||||||
private static final String ID_JPATH = "$.id";
|
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
|
||||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
@ -66,9 +54,15 @@ public class GroupEntitiesSparkJob {
|
||||||
String graphInputPath = parser.get("graphInputPath");
|
String graphInputPath = parser.get("graphInputPath");
|
||||||
log.info("graphInputPath: {}", graphInputPath);
|
log.info("graphInputPath: {}", graphInputPath);
|
||||||
|
|
||||||
|
String checkpointPath = parser.get("checkpointPath");
|
||||||
|
log.info("checkpointPath: {}", checkpointPath);
|
||||||
|
|
||||||
String outputPath = parser.get("outputPath");
|
String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath: {}", outputPath);
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
|
||||||
|
log.info("filterInvisible: {}", filterInvisible);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||||
|
@ -78,126 +72,95 @@ public class GroupEntitiesSparkJob {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
||||||
groupEntities(spark, graphInputPath, outputPath);
|
groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void groupEntities(
|
private static void groupEntities(
|
||||||
SparkSession spark,
|
SparkSession spark,
|
||||||
String inputPath,
|
String inputPath,
|
||||||
String outputPath) {
|
String checkpointPath,
|
||||||
|
String outputPath,
|
||||||
|
boolean filterInvisible) {
|
||||||
|
|
||||||
final TypedColumn<OafEntity, OafEntity> aggregator = new GroupingAggregator().toColumn();
|
Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
|
||||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
||||||
spark
|
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
|
||||||
.read()
|
String entity = e.getKey().name();
|
||||||
.textFile(toSeq(listEntityPaths(inputPath, sc)))
|
Class<? extends OafEntity> entityClass = e.getValue();
|
||||||
.map((MapFunction<String, OafEntity>) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class))
|
String entityInputPath = inputPath + "/" + entity;
|
||||||
.filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e)))
|
|
||||||
.groupByKey((MapFunction<OafEntity, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
|
if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
|
||||||
.agg(aggregator)
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
allEntities = allEntities
|
||||||
|
.union(
|
||||||
|
((Dataset<OafEntity>) spark
|
||||||
|
.read()
|
||||||
|
.schema(Encoders.bean(entityClass).schema())
|
||||||
|
.json(entityInputPath)
|
||||||
|
.filter("length(id) > 0")
|
||||||
|
.as(Encoders.bean(entityClass)))
|
||||||
|
.map((MapFunction<OafEntity, OafEntity>) r -> r, OAFENTITY_KRYO_ENC));
|
||||||
|
}
|
||||||
|
|
||||||
|
Dataset<?> groupedEntities = allEntities
|
||||||
|
.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
|
||||||
|
.reduceGroups((ReduceFunction<OafEntity>) (b, a) -> OafMapperUtils.mergeEntities(b, a))
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<Tuple2<String, OafEntity>, String>) t -> t._2().getClass().getName() +
|
(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2(
|
||||||
"|" + OBJECT_MAPPER.writeValueAsString(t._2()),
|
t._2().getClass().getName(), t._2()),
|
||||||
Encoders.STRING())
|
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
|
||||||
|
|
||||||
|
// pivot on "_1" (classname of the entity)
|
||||||
|
// created columns containing only entities of the same class
|
||||||
|
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
|
||||||
|
String entity = e.getKey().name();
|
||||||
|
Class<? extends OafEntity> entityClass = e.getValue();
|
||||||
|
|
||||||
|
groupedEntities = groupedEntities
|
||||||
|
.withColumn(
|
||||||
|
entity,
|
||||||
|
when(col("_1").equalTo(entityClass.getName()), col("_2")));
|
||||||
|
}
|
||||||
|
|
||||||
|
groupedEntities
|
||||||
|
.drop("_1", "_2")
|
||||||
.write()
|
.write()
|
||||||
.option("compression", "gzip")
|
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.text(outputPath);
|
.option("compression", "gzip")
|
||||||
}
|
.save(checkpointPath);
|
||||||
|
|
||||||
public static class GroupingAggregator extends Aggregator<OafEntity, OafEntity, OafEntity> {
|
ForkJoinPool parPool = new ForkJoinPool(ModelSupport.entityTypes.size());
|
||||||
|
|
||||||
@Override
|
ModelSupport.entityTypes
|
||||||
public OafEntity zero() {
|
.entrySet()
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public OafEntity reduce(OafEntity b, OafEntity a) {
|
|
||||||
return mergeAndGet(b, a);
|
|
||||||
}
|
|
||||||
|
|
||||||
private OafEntity mergeAndGet(OafEntity b, OafEntity a) {
|
|
||||||
if (Objects.nonNull(a) && Objects.nonNull(b)) {
|
|
||||||
return OafMapperUtils.mergeEntities(b, a);
|
|
||||||
}
|
|
||||||
return Objects.isNull(a) ? b : a;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public OafEntity merge(OafEntity b, OafEntity a) {
|
|
||||||
return mergeAndGet(b, a);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public OafEntity finish(OafEntity j) {
|
|
||||||
return j;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Encoder<OafEntity> bufferEncoder() {
|
|
||||||
return Encoders.kryo(OafEntity.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Encoder<OafEntity> outputEncoder() {
|
|
||||||
return Encoders.kryo(OafEntity.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private static OafEntity parseOaf(String s) {
|
|
||||||
|
|
||||||
DocumentContext dc = JsonPath
|
|
||||||
.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
|
|
||||||
final String id = dc.read(ID_JPATH);
|
|
||||||
if (StringUtils.isNotBlank(id)) {
|
|
||||||
|
|
||||||
String prefix = StringUtils.substringBefore(id, "|");
|
|
||||||
switch (prefix) {
|
|
||||||
case "10":
|
|
||||||
return parse(s, Datasource.class);
|
|
||||||
case "20":
|
|
||||||
return parse(s, Organization.class);
|
|
||||||
case "40":
|
|
||||||
return parse(s, Project.class);
|
|
||||||
case "50":
|
|
||||||
String resultType = dc.read("$.resulttype.classid");
|
|
||||||
switch (resultType) {
|
|
||||||
case "publication":
|
|
||||||
return parse(s, Publication.class);
|
|
||||||
case "dataset":
|
|
||||||
return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
|
||||||
case "software":
|
|
||||||
return parse(s, Software.class);
|
|
||||||
case "other":
|
|
||||||
return parse(s, OtherResearchProduct.class);
|
|
||||||
default:
|
|
||||||
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static <T extends OafEntity> OafEntity parse(String s, Class<T> clazz) {
|
|
||||||
try {
|
|
||||||
return OBJECT_MAPPER.readValue(s, clazz);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new IllegalArgumentException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) {
|
|
||||||
return HdfsSupport
|
|
||||||
.listFiles(inputPath, sc.hadoopConfiguration())
|
|
||||||
.stream()
|
.stream()
|
||||||
.filter(f -> !f.toLowerCase().contains("relation"))
|
.map(e -> parPool.submit(() -> {
|
||||||
.collect(Collectors.toList());
|
String entity = e.getKey().name();
|
||||||
}
|
Class<? extends OafEntity> entityClass = e.getValue();
|
||||||
|
|
||||||
|
spark
|
||||||
|
.read()
|
||||||
|
.load(checkpointPath)
|
||||||
|
.select(col(entity).as("value"))
|
||||||
|
.filter("value IS NOT NULL")
|
||||||
|
.as(OAFENTITY_KRYO_ENC)
|
||||||
|
.map((MapFunction<OafEntity, OafEntity>) r -> r, (Encoder<OafEntity>) Encoders.bean(entityClass))
|
||||||
|
.filter(filterInvisible ? "dataInfo.invisible != TRUE" : "TRUE")
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath + "/" + entity);
|
||||||
|
}))
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
.forEach(t -> {
|
||||||
|
try {
|
||||||
|
t.get();
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,26 +0,0 @@
|
||||||
[
|
|
||||||
{
|
|
||||||
"paramName": "issm",
|
|
||||||
"paramLongName": "isSparkSessionManaged",
|
|
||||||
"paramDescription": "when true will stop SparkSession after job execution",
|
|
||||||
"paramRequired": false
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "i",
|
|
||||||
"paramLongName": "inputPath",
|
|
||||||
"paramDescription": "the source path",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "o",
|
|
||||||
"paramLongName": "outputPath",
|
|
||||||
"paramDescription": "path of the output graph",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "fi",
|
|
||||||
"paramLongName": "filterInvisible",
|
|
||||||
"paramDescription": "if true filters out invisible entities",
|
|
||||||
"paramRequired": true
|
|
||||||
}
|
|
||||||
]
|
|
|
@ -8,13 +8,25 @@
|
||||||
{
|
{
|
||||||
"paramName": "gin",
|
"paramName": "gin",
|
||||||
"paramLongName": "graphInputPath",
|
"paramLongName": "graphInputPath",
|
||||||
"paramDescription": "the graph root path",
|
"paramDescription": "the input graph root path",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "cp",
|
||||||
|
"paramLongName": "checkpointPath",
|
||||||
|
"paramDescription": "checkpoint directory",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "out",
|
"paramName": "out",
|
||||||
"paramLongName": "outputPath",
|
"paramLongName": "outputPath",
|
||||||
"paramDescription": "the output merged graph root path",
|
"paramDescription": "the output graph root path",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "fi",
|
||||||
|
"paramLongName": "filterInvisible",
|
||||||
|
"paramDescription": "if true filters out invisible entities",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
}
|
}
|
||||||
]
|
]
|
|
@ -152,31 +152,7 @@
|
||||||
--conf spark.sql.shuffle.partitions=15000
|
--conf spark.sql.shuffle.partitions=15000
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
|
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
<arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||||
</spark>
|
|
||||||
<ok to="dispatch_entities"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="dispatch_entities">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch grouped entitities</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryOverhead}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${graphOutputPath}</arg>
|
<arg>--outputPath</arg><arg>${graphOutputPath}</arg>
|
||||||
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
|
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
|
|
@ -96,30 +96,7 @@
|
||||||
--conf spark.sql.shuffle.partitions=15000
|
--conf spark.sql.shuffle.partitions=15000
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
|
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
<arg>--checkpointPath</arg><arg>${workingPath}/grouped_entities</arg>
|
||||||
</spark>
|
|
||||||
<ok to="dispatch_entities"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="dispatch_entities">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch grouped entities</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${graphOutputPath}</arg>
|
<arg>--outputPath</arg><arg>${graphOutputPath}</arg>
|
||||||
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
|
<arg>--filterInvisible</arg><arg>${filterInvisible}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
|
|
@ -1,16 +1,15 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.group;
|
package eu.dnetlib.dhp.oa.graph.group;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import java.io.IOException;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
import java.net.URISyntaxException;
|
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
|
||||||
import java.nio.file.Files;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import java.nio.file.Path;
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
import java.nio.file.Paths;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
@ -19,118 +18,108 @@ import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.junit.jupiter.api.*;
|
import org.junit.jupiter.api.*;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
import java.io.IOException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import java.net.URISyntaxException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
|
|
||||||
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
|
||||||
|
|
||||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||||
public class GroupEntitiesSparkJobTest {
|
public class GroupEntitiesSparkJobTest {
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
private static ObjectMapper mapper = new ObjectMapper()
|
private static ObjectMapper mapper = new ObjectMapper()
|
||||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
|
||||||
private static Path workingDir;
|
private static Path workingDir;
|
||||||
private Path dataInputPath;
|
private Path dataInputPath;
|
||||||
|
|
||||||
private Path groupEntityPath;
|
private Path checkpointPath;
|
||||||
private Path dispatchEntityPath;
|
|
||||||
|
|
||||||
@BeforeAll
|
private Path outputPath;
|
||||||
public static void beforeAll() throws IOException {
|
|
||||||
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
@BeforeAll
|
||||||
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
|
public static void beforeAll() throws IOException {
|
||||||
conf.setMaster("local");
|
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
|
||||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
|
||||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
|
||||||
spark = SparkSession.builder().config(conf).getOrCreate();
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeEach
|
SparkConf conf = new SparkConf();
|
||||||
public void beforeEach() throws IOException, URISyntaxException {
|
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
|
||||||
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
|
conf.setMaster("local");
|
||||||
groupEntityPath = workingDir.resolve("grouped_entity");
|
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||||
dispatchEntityPath = workingDir.resolve("dispatched_entity");
|
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
||||||
}
|
spark = SparkSession.builder().config(conf).getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
@AfterAll
|
@BeforeEach
|
||||||
public static void afterAll() throws IOException {
|
public void beforeEach() throws IOException, URISyntaxException {
|
||||||
spark.stop();
|
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
|
||||||
FileUtils.deleteDirectory(workingDir.toFile());
|
checkpointPath = workingDir.resolve("grouped_entity");
|
||||||
}
|
outputPath = workingDir.resolve("dispatched_entity");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@AfterAll
|
||||||
@Order(1)
|
public static void afterAll() throws IOException {
|
||||||
void testGroupEntities() throws Exception {
|
spark.stop();
|
||||||
GroupEntitiesSparkJob.main(new String[] {
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
"-isSparkSessionManaged",
|
}
|
||||||
Boolean.FALSE.toString(),
|
|
||||||
"-graphInputPath",
|
|
||||||
dataInputPath.toString(),
|
|
||||||
"-outputPath",
|
|
||||||
groupEntityPath.toString()
|
|
||||||
});
|
|
||||||
|
|
||||||
Dataset<Result> output = spark
|
@Test
|
||||||
.read()
|
@Order(1)
|
||||||
.textFile(groupEntityPath.toString())
|
void testGroupEntities() throws Exception {
|
||||||
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
|
GroupEntitiesSparkJob.main(new String[]{
|
||||||
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
|
"-isSparkSessionManaged",
|
||||||
|
Boolean.FALSE.toString(),
|
||||||
|
"-graphInputPath",
|
||||||
|
dataInputPath.toString(),
|
||||||
|
"-checkpointPath",
|
||||||
|
checkpointPath.toString(),
|
||||||
|
"-outputPath",
|
||||||
|
outputPath.toString(),
|
||||||
|
"-filterInvisible",
|
||||||
|
Boolean.FALSE.toString()
|
||||||
|
});
|
||||||
|
|
||||||
assertEquals(
|
Dataset<OafEntity> checkpointTable = spark
|
||||||
1,
|
.read()
|
||||||
output
|
.load(checkpointPath.toString())
|
||||||
.filter(
|
.selectExpr("COALESCE(*)")
|
||||||
(FilterFunction<Result>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
|
.as(Encoders.kryo(OafEntity.class));
|
||||||
.equals(r.getId()) &&
|
|
||||||
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
|
|
||||||
.count());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
@Order(2)
|
|
||||||
void testDispatchEntities() throws Exception {
|
|
||||||
DispatchEntitiesSparkJob.main(new String[] {
|
|
||||||
"-isSparkSessionManaged",
|
|
||||||
Boolean.FALSE.toString(),
|
|
||||||
"-inputPath",
|
|
||||||
groupEntityPath.toString(),
|
|
||||||
"-outputPath",
|
|
||||||
dispatchEntityPath.resolve(".").toString(),
|
|
||||||
"-filterInvisible",
|
|
||||||
Boolean.TRUE.toString()
|
|
||||||
});
|
|
||||||
|
|
||||||
Dataset<Result> output = spark
|
assertEquals(
|
||||||
.read()
|
1,
|
||||||
.textFile(
|
checkpointTable
|
||||||
DHPUtils
|
.filter(
|
||||||
.toSeq(
|
(FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
|
||||||
HdfsSupport
|
.equals(r.getId()) &&
|
||||||
.listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration())))
|
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
|
||||||
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
|
.count());
|
||||||
|
|
||||||
assertEquals(3, output.count());
|
|
||||||
assertEquals(
|
Dataset<Result> output = spark
|
||||||
2,
|
.read()
|
||||||
output
|
.textFile(
|
||||||
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
|
DHPUtils
|
||||||
.filter((FilterFunction<String>) s -> s.equals("publication"))
|
.toSeq(
|
||||||
.count());
|
HdfsSupport
|
||||||
assertEquals(
|
.listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
|
||||||
1,
|
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
|
||||||
output
|
|
||||||
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
|
assertEquals(3, output.count());
|
||||||
.filter((FilterFunction<String>) s -> s.equals("dataset"))
|
assertEquals(
|
||||||
.count());
|
2,
|
||||||
}
|
output
|
||||||
}
|
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
|
||||||
|
.filter((FilterFunction<String>) s -> s.equals("publication"))
|
||||||
|
.count());
|
||||||
|
assertEquals(
|
||||||
|
1,
|
||||||
|
output
|
||||||
|
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
|
||||||
|
.filter((FilterFunction<String>) s -> s.equals("dataset"))
|
||||||
|
.count());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue