2020-06-22 08:51:31 +02:00
|
|
|
|
|
|
|
package eu.dnetlib.dhp.broker.oa.util;
|
|
|
|
|
2020-07-07 15:37:13 +02:00
|
|
|
import java.util.Arrays;
|
|
|
|
import java.util.HashSet;
|
|
|
|
import java.util.Set;
|
|
|
|
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
2020-06-22 08:51:31 +02:00
|
|
|
import org.apache.spark.api.java.function.MapFunction;
|
|
|
|
import org.apache.spark.sql.Dataset;
|
|
|
|
import org.apache.spark.sql.Encoders;
|
2020-06-30 16:17:09 +02:00
|
|
|
import org.apache.spark.sql.SaveMode;
|
2020-06-22 08:51:31 +02:00
|
|
|
import org.apache.spark.sql.SparkSession;
|
2020-06-30 16:17:09 +02:00
|
|
|
import org.apache.spark.util.LongAccumulator;
|
2020-06-22 08:51:31 +02:00
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
|
2020-07-07 15:37:13 +02:00
|
|
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
2020-06-22 08:51:31 +02:00
|
|
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
|
|
|
|
|
|
|
public class ClusterUtils {
|
|
|
|
|
|
|
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
|
|
|
2020-06-22 11:45:14 +02:00
|
|
|
public static void createDirIfMissing(final SparkSession spark, final String path) {
|
|
|
|
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
|
|
|
}
|
|
|
|
|
2020-06-22 08:51:31 +02:00
|
|
|
public static void removeDir(final SparkSession spark, final String path) {
|
|
|
|
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
|
|
|
}
|
|
|
|
|
|
|
|
public static <R> Dataset<R> readPath(
|
|
|
|
final SparkSession spark,
|
|
|
|
final String inputPath,
|
|
|
|
final Class<R> clazz) {
|
|
|
|
return spark
|
|
|
|
.read()
|
|
|
|
.textFile(inputPath)
|
|
|
|
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
|
|
|
}
|
|
|
|
|
2020-06-23 08:37:35 +02:00
|
|
|
public static boolean isDedupRoot(final String id) {
|
|
|
|
return id.contains("dedup_wf_");
|
|
|
|
}
|
|
|
|
|
2020-06-23 10:24:15 +02:00
|
|
|
public static final boolean isValidResultResultClass(final String s) {
|
|
|
|
return s.equals("isReferencedBy")
|
|
|
|
|| s.equals("isRelatedTo")
|
|
|
|
|| s.equals("references")
|
|
|
|
|| s.equals("isSupplementedBy")
|
|
|
|
|| s.equals("isSupplementedTo");
|
|
|
|
}
|
|
|
|
|
2020-06-30 16:17:09 +02:00
|
|
|
public static <T> T incrementAccumulator(final T o, final LongAccumulator acc) {
|
|
|
|
if (acc != null) {
|
|
|
|
acc.add(1);
|
|
|
|
}
|
|
|
|
return o;
|
|
|
|
}
|
|
|
|
|
2020-07-07 15:37:13 +02:00
|
|
|
public static <T> void save(final Dataset<T> dataset,
|
|
|
|
final String path,
|
|
|
|
final Class<T> clazz,
|
2020-06-30 16:17:09 +02:00
|
|
|
final LongAccumulator acc) {
|
|
|
|
dataset
|
|
|
|
.map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
|
|
|
|
.write()
|
|
|
|
.mode(SaveMode.Overwrite)
|
2020-12-10 11:59:28 +01:00
|
|
|
.option("compression", "gzip")
|
2020-06-30 16:17:09 +02:00
|
|
|
.json(path);
|
|
|
|
}
|
|
|
|
|
2020-07-07 15:37:13 +02:00
|
|
|
public static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
|
|
|
|
final String s = parser.get(key).trim();
|
|
|
|
|
|
|
|
final Set<String> res = new HashSet<>();
|
|
|
|
|
|
|
|
if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
|
|
|
|
Arrays
|
|
|
|
.stream(s.split(","))
|
|
|
|
.map(String::trim)
|
|
|
|
.filter(StringUtils::isNotBlank)
|
|
|
|
.forEach(res::add);
|
|
|
|
}
|
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
2020-06-22 08:51:31 +02:00
|
|
|
}
|