package eu.dnetlib.dhp.rdfconverter.bioschema;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.rdfconverter.utils.CompressorUtil;
import eu.dnetlib.dhp.rdfconverter.utils.RDFConverter;

public class SparkRdfToDatacite {

	private static final Logger logger = LoggerFactory.getLogger(SparkRdfToDatacite.class);

	public static void main(String[] args) throws Exception {
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					SparkRdfToDatacite.class
						.getResourceAsStream(
							"/eu/dnetlib/dhp/rdfconverter/bioschema/generate_dataset.json")));
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);

		final String workingPath = parser.get("workingPath");
		final String rdfNquadsRecords = parser.get("rdfInput");
		final String output = parser.get("output");
		final String profile = parser.get("profile");

		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

				// Each input line is a base64-encoded, gzipped n-quads record.
				String base64GzippedNquadsPath = workingPath.concat(rdfNquadsRecords);
				JavaRDD<String> base64GzippedNquadsRDD = sc.textFile(base64GzippedNquadsPath);
				logger.info("Rdf nquads records retrieved: {}", base64GzippedNquadsRDD.count());

				// Decompress each record and convert it into zero or more DataCite JSON
				// documents; records that fail to convert are logged and skipped.
				JavaRDD<String> dataciteJsonRDD = base64GzippedNquadsRDD
					.flatMap(nquads -> {
						RDFConverter converter = new RDFConverter();
						try {
							ArrayList<String> jsonlds = converter
								.nQuadsFile2DataciteJson(CompressorUtil.decompressValue(nquads), profile);
							return jsonlds.iterator();
						} catch (Exception e) {
							logger.error("converting: " + nquads, e);
							return Collections.emptyIterator();
						}
					});
				logger.info("json datacite non filtered: {}", dataciteJsonRDD.count());

				// Drop nulls and empty results, deduplicate, and wrap each document as
				// Hadoop Text for the compressed text output. Note that each count() above
				// and below forces a full evaluation of the lineage; consider caching the
				// RDDs if these log statements are kept in production.
				JavaRDD<Text> dataciteTextRDD = dataciteJsonRDD
					.filter(Objects::nonNull)
					.filter(jsonld -> !jsonld.isEmpty())
					.distinct()
					.map(Text::new);
				logger.info("json datacite generated: {}", dataciteTextRDD.count());

				dataciteTextRDD.saveAsTextFile(workingPath.concat(output), GzipCodec.class);
			});
	}
}
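/*
 * For reference, a minimal sketch of the parameter descriptor this job loads from
 * /eu/dnetlib/dhp/rdfconverter/bioschema/generate_dataset.json. The field layout follows
 * the ArgumentApplicationParser convention used across dnet-hadoop; only the long names
 * are taken from the parser.get(...) calls above, while the short names and descriptions
 * below are assumptions for illustration.
 *
 * [
 *   { "paramName": "issm", "paramLongName": "isSparkSessionManaged", "paramDescription": "when true, the Spark session is managed by the framework", "paramRequired": false },
 *   { "paramName": "w", "paramLongName": "workingPath", "paramDescription": "the base working path", "paramRequired": true },
 *   { "paramName": "i", "paramLongName": "rdfInput", "paramDescription": "the input of base64-encoded, gzipped n-quads records, relative to workingPath", "paramRequired": true },
 *   { "paramName": "o", "paramLongName": "output", "paramDescription": "the output path, relative to workingPath", "paramRequired": true },
 *   { "paramName": "p", "paramLongName": "profile", "paramDescription": "the Bioschemas profile to convert against", "paramRequired": true }
 * ]
 */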