BrBETA_dnet-hadoop/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java

103 lines
3.0 KiB
Java
Raw Normal View History

2020-06-22 08:51:31 +02:00
package eu.dnetlib.dhp.broker.oa.util;
2020-07-07 15:37:13 +02:00
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import eu.dnetlib.dhp.schema.common.ModelConstants;
2020-07-07 15:37:13 +02:00
import org.apache.commons.lang3.StringUtils;
2020-06-22 08:51:31 +02:00
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
2020-06-30 16:17:09 +02:00
import org.apache.spark.sql.SaveMode;
2020-06-22 08:51:31 +02:00
import org.apache.spark.sql.SparkSession;
2020-06-30 16:17:09 +02:00
import org.apache.spark.util.LongAccumulator;
2020-06-22 08:51:31 +02:00
import com.fasterxml.jackson.databind.ObjectMapper;
2020-07-07 15:37:13 +02:00
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
2020-06-22 08:51:31 +02:00
import eu.dnetlib.dhp.common.HdfsSupport;
2020-12-15 08:30:26 +01:00
import eu.dnetlib.dhp.schema.oaf.Relation;
2020-06-22 08:51:31 +02:00
public class ClusterUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
2020-06-22 11:45:14 +02:00
public static void createDirIfMissing(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
2020-06-22 08:51:31 +02:00
public static void removeDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
2020-12-15 08:30:26 +01:00
public static Dataset<Relation> loadRelations(final String graphPath, final SparkSession spark) {
return ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
2021-05-14 10:58:12 +02:00
.map((MapFunction<Relation, Relation>) r -> {
2020-12-15 08:30:26 +01:00
r.setSource(ConversionUtils.cleanOpenaireId(r.getSource()));
r.setTarget(ConversionUtils.cleanOpenaireId(r.getTarget()));
return r;
}, Encoders.bean(Relation.class));
}
2020-06-22 08:51:31 +02:00
public static <R> Dataset<R> readPath(
final SparkSession spark,
final String inputPath,
final Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
2020-06-23 08:37:35 +02:00
public static boolean isDedupRoot(final String id) {
return id.contains("dedup");
2020-06-23 08:37:35 +02:00
}
2020-06-23 10:24:15 +02:00
public static final boolean isValidResultResultClass(final String s) {
return s.equals(ModelConstants.IS_REFERENCED_BY)
|| s.equals(ModelConstants.IS_RELATED_TO)
|| s.equals(ModelConstants.REFERENCES)
|| s.equals(ModelConstants.IS_SUPPLEMENTED_BY)
|| s.equals(ModelConstants.IS_SUPPLEMENT_TO);
2020-06-23 10:24:15 +02:00
}
2020-06-30 16:17:09 +02:00
public static <T> T incrementAccumulator(final T o, final LongAccumulator acc) {
if (acc != null) {
acc.add(1);
}
return o;
}
2020-07-07 15:37:13 +02:00
public static <T> void save(final Dataset<T> dataset,
final String path,
final Class<T> clazz,
2020-06-30 16:17:09 +02:00
final LongAccumulator acc) {
dataset
2021-05-14 10:58:12 +02:00
.map((MapFunction<T, T>) o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
2020-06-30 16:17:09 +02:00
.write()
.mode(SaveMode.Overwrite)
2020-12-10 11:59:28 +01:00
.option("compression", "gzip")
2020-06-30 16:17:09 +02:00
.json(path);
}
2020-07-07 15:37:13 +02:00
public static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
final String s = parser.get(key).trim();
final Set<String> res = new HashSet<>();
if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
Arrays
.stream(s.split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.forEach(res::add);
}
return res;
}
2020-06-22 08:51:31 +02:00
}