dhp-graph-dump/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java

116 lines
3.8 KiB
Java

package eu.dnetlib.dhp.oa.graph.dump.eosc;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.eosc.model.Result;
import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
/**
 * Spark job that selects, from one result table of the graph, the results relevant for EOSC and
 * writes them out as gzip-compressed JSON after mapping them with {@link ResultMapper}.
 *
 * <p>A result is selected when it is neither deleted by inference nor invisible, and it either
 * carries a context whose id is {@code eosc} or has been collected from a source whose value is
 * {@code B2FIND} (case-insensitive).
 *
 * @author miriam.baglioni
 * @Date 27/07/22
 */
public class SelectEoscResultsJobStep1 implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(SelectEoscResultsJobStep1.class);

	/** Classpath location of the JSON file describing the job arguments. */
	private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/graph/dump/eosc_select_result_parameters.json";

	/** Context id marking a result as belonging to EOSC. */
	private static final String EOSC_CONTEXT_ID = "eosc";

	/** Value of the collectedfrom entry identifying B2FIND (compared ignoring case). */
	private static final String B2FIND_NAME = "B2FIND";

	/**
	 * Job entry point: parses the arguments, resolves the input result class and runs the selection
	 * inside a Spark session.
	 *
	 * @param args command line arguments, as described by the parameters resource
	 * @throws Exception if the parameters cannot be read or parsed, if {@code resultTableName}
	 *                   does not name a loadable subclass of the graph {@code Result}, or if the
	 *                   Spark job fails
	 */
	public static void main(String[] args) throws Exception {
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(readParameters());
		parser.parseArgument(args);

		// When the argument is absent the session is considered managed by this job.
		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String eoscDatasourceIdsPath = parser.get("eoscDatasourceIdsPath");
		log.info("eoscDatasourceIdsPath: {}", eoscDatasourceIdsPath);

		final String communityMapPath = parser.get("communityMapPath");
		log.info("communityMapPath: {}", communityMapPath);

		final String resultClassName = parser.get("resultTableName");
		log.info("resultTableName: {}", resultClassName);

		// asSubclass verifies at runtime that the configured class really is a graph Result,
		// failing fast with a ClassCastException instead of relying on an unchecked cast.
		final Class<? extends eu.dnetlib.dhp.schema.oaf.Result> inputClazz = Class
			.forName(resultClassName)
			.asSubclass(eu.dnetlib.dhp.schema.oaf.Result.class);

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				selectEoscResults(spark, inputPath, outputPath, inputClazz, communityMapPath, eoscDatasourceIdsPath);
			});
	}

	/**
	 * Loads the JSON description of the job arguments from the classpath.
	 *
	 * @return the content of the parameters resource, decoded as UTF-8
	 * @throws java.io.IOException   if the resource cannot be read
	 * @throws IllegalStateException if the resource is not on the classpath
	 */
	private static String readParameters() throws java.io.IOException {
		// try-with-resources guarantees the stream is closed even if reading fails
		try (java.io.InputStream is = SelectEoscResultsJobStep1.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
			if (is == null) {
				throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
			}
			return IOUtils.toString(is, java.nio.charset.StandardCharsets.UTF_8);
		}
	}

	/**
	 * Reads the results of type {@code inputClazz} from {@code inputPath}, keeps those relevant
	 * for EOSC (see {@link #isEoscResult}), maps them to the dump model and writes them to
	 * {@code outputPath} as gzip-compressed JSON, overwriting any existing content.
	 *
	 * @param spark                 the active Spark session
	 * @param inputPath             path of the result table to read
	 * @param outputPath            path the selected results are written to
	 * @param inputClazz            class of the results stored under {@code inputPath}
	 * @param communityMapPath      path the community map is loaded from
	 * @param eoscDatasourceIdsPath path of the {@code MasterDuplicate} records passed to the mapper
	 */
	private static <R extends eu.dnetlib.dhp.schema.oaf.Result> void selectEoscResults(SparkSession spark,
		String inputPath, String outputPath,
		Class<R> inputClazz, String communityMapPath, String eoscDatasourceIdsPath) {

		// Collected on the driver: the list is captured by the map closure below.
		final List<MasterDuplicate> masterDuplicates = Utils
			.readPath(spark, eoscDatasourceIdsPath, MasterDuplicate.class)
			.collectAsList();
		log.info("number of rows ************: {}", masterDuplicates.size());

		final CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);

		Utils
			.readPath(spark, inputPath, inputClazz)
			.filter((FilterFunction<R>) r -> isEoscResult(r))
			.map(
				(MapFunction<R, Result>) r -> (Result) ResultMapper
					.map(r, communityMap, masterDuplicates),
				Encoders.bean(Result.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath);
	}

	/**
	 * Tells whether a graph result has to be included in the EOSC selection.
	 *
	 * <p>Null-safe: a missing {@code dataInfo} (or a null flag in it) is treated as "not deleted by
	 * inference / not invisible", and missing or null-containing {@code context} /
	 * {@code collectedfrom} lists simply do not match.
	 *
	 * @param r the graph result to test
	 * @return {@code true} if the result is visible, not deleted by inference, and either belongs
	 *         to the {@code eosc} context or has been collected from B2FIND
	 */
	private static boolean isEoscResult(eu.dnetlib.dhp.schema.oaf.Result r) {
		if (r.getDataInfo() != null
			&& (Boolean.TRUE.equals(r.getDataInfo().getDeletedbyinference())
				|| Boolean.TRUE.equals(r.getDataInfo().getInvisible()))) {
			return false;
		}

		final boolean inEoscContext = r.getContext() != null
			&& r.getContext().stream().anyMatch(c -> c != null && EOSC_CONTEXT_ID.equals(c.getId()));
		if (inEoscContext) {
			return true;
		}

		return r.getCollectedfrom() != null
			&& r
				.getCollectedfrom()
				.stream()
				.anyMatch(cf -> cf != null && B2FIND_NAME.equalsIgnoreCase(cf.getValue()));
	}
}