dnet-hadoop/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java

91 lines
2.4 KiB
Java
Raw Normal View History

2020-06-22 08:51:31 +02:00
package eu.dnetlib.dhp.broker.oa.util;
2020-07-07 15:37:13 +02:00
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
2020-06-22 08:51:31 +02:00
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
2020-06-30 16:17:09 +02:00
import org.apache.spark.sql.SaveMode;
2020-06-22 08:51:31 +02:00
import org.apache.spark.sql.SparkSession;
2020-06-30 16:17:09 +02:00
import org.apache.spark.util.LongAccumulator;
2020-06-22 08:51:31 +02:00
import com.fasterxml.jackson.databind.ObjectMapper;
2020-07-07 15:37:13 +02:00
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
2020-06-22 08:51:31 +02:00
import eu.dnetlib.dhp.common.HdfsSupport;
public class ClusterUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
2020-06-22 11:45:14 +02:00
public static void createDirIfMissing(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
2020-06-22 08:51:31 +02:00
public static void removeDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static <R> Dataset<R> readPath(
final SparkSession spark,
final String inputPath,
final Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
2020-06-23 08:37:35 +02:00
public static boolean isDedupRoot(final String id) {
return id.contains("dedup_wf_");
}
2020-06-23 10:24:15 +02:00
public static final boolean isValidResultResultClass(final String s) {
return s.equals("isReferencedBy")
|| s.equals("isRelatedTo")
|| s.equals("references")
|| s.equals("isSupplementedBy")
|| s.equals("isSupplementedTo");
}
2020-06-30 16:17:09 +02:00
public static <T> T incrementAccumulator(final T o, final LongAccumulator acc) {
if (acc != null) {
acc.add(1);
}
return o;
}
2020-07-07 15:37:13 +02:00
public static <T> void save(final Dataset<T> dataset,
final String path,
final Class<T> clazz,
2020-06-30 16:17:09 +02:00
final LongAccumulator acc) {
dataset
.map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
2020-12-10 11:59:28 +01:00
.option("compression", "gzip")
2020-06-30 16:17:09 +02:00
.json(path);
}
2020-07-07 15:37:13 +02:00
public static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
final String s = parser.get(key).trim();
final Set<String> res = new HashSet<>();
if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
Arrays
.stream(s.split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.forEach(res::add);
}
return res;
}
2020-06-22 08:51:31 +02:00
}