package eu.dnetlib.dhp.broker.oa.util;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.common.HdfsSupport;

public class ClusterUtils {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	// Creates the directory at the given path if it does not already exist.
	public static void createDirIfMissing(final SparkSession spark, final String path) {
		try {
			FileSystem
				.get(spark.sparkContext().hadoopConfiguration())
				.mkdirs(new Path(path));
		} catch (final IOException e) {
			throw new RuntimeException("Error creating directory: " + path, e);
		}
	}

	// Recursively removes the directory at the given path, if present.
	public static void removeDir(final SparkSession spark, final String path) {
		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
	}

	// Reads a text file of newline-delimited JSON records and deserializes
	// each line into a bean of the given class.
	public static <R> Dataset<R> readPath(
		final SparkSession spark,
		final String inputPath,
		final Class<R> clazz) {
		return spark
			.read()
			.textFile(inputPath)
			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
	}

	public static boolean isDedupRoot(final String id) {
		return id.contains("dedup_wf_");
	}

	public static boolean isValidResultResultClass(final String s) {
		return s.equals("isReferencedBy")
			|| s.equals("isRelatedTo")
			|| s.equals("references")
			|| s.equals("isSupplementedBy")
			|| s.equals("isSupplementedTo");
	}

	// Increments the accumulator (when one is provided) and returns the object
	// unchanged, so it can be used inside a map without altering the dataset.
	public static <T> T incrementAccumulator(final T o, final LongAccumulator acc) {
		if (acc != null) {
			acc.add(1);
		}
		return o;
	}

	// Writes the dataset as JSON, overwriting any existing output, and counts
	// the written records in the accumulator.
	public static <T> void save(
		final Dataset<T> dataset,
		final String path,
		final Class<T> clazz,
		final LongAccumulator acc) {
		dataset
			.map((MapFunction<T, T>) o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
			.write()
			.mode(SaveMode.Overwrite)
			.json(path);
	}
}
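
/*
 * Usage sketch (illustrative only): a minimal driver that reads a dataset of
 * JSON-serialized beans with readPath, counts the records it writes back via a
 * Spark accumulator, and overwrites the output path with save. The bean class
 * "Event", the paths, and the accumulator name are hypothetical placeholders,
 * not part of this utility.
 *
 *     final SparkSession spark = SparkSession.builder().appName("broker-demo").getOrCreate();
 *     final LongAccumulator total = spark.sparkContext().longAccumulator("total_events");
 *
 *     // clear any previous output before rewriting it
 *     ClusterUtils.removeDir(spark, "/tmp/broker/events_out");
 *
 *     final Dataset<Event> events = ClusterUtils.readPath(spark, "/tmp/broker/events_in", Event.class);
 *     ClusterUtils.save(events, "/tmp/broker/events_out", Event.class, total);
 *
 *     System.out.println("records written: " + total.value());
 */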