diff --git a/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/ScrapingJob.java b/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/ScrapingJob.java index 0a1f897f0..5a8c11a5b 100644 --- a/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/ScrapingJob.java +++ b/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/ScrapingJob.java @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.bmuse.bioschema; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -87,7 +88,7 @@ public class ScrapingJob { final Text value = new Text(nquads); writer.append(key, value); } catch (Throwable t) { - logger.error(u.text() + " " + t.getMessage()); + logger.error(u.text() + " -> ", t); } }); } diff --git a/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/config-default.xml b/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/config-default.xml index bca528314..c5d960eb1 100644 --- a/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/config-default.xml @@ -7,6 +7,10 @@ nameNode hdfs://hadoop-rm1.garr-pa1.d4science.org:8020 + + oozie.launcher.mapreduce.user.classpath.first + true + oozie.use.system.libpath true diff --git a/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/workflow.xml b/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/workflow.xml index 285801ea8..636babf07 100644 --- a/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/workflow.xml @@ -2,12 +2,12 @@ workingPath - /data/bioschema/ped/ + /data/bioschema/mobidb/ the working path sitemapUrl - https://proteinensemble.org/sitemap2.xml.gz + https://mobidb.org/sitemap2.xml.gz sitemapURLKey @@ -20,7 +20,7 @@ maxScrapedPages - 10 + 5 max number of pages that will be scraped, default: no limit @@ -71,6 +71,7 @@ --sitemapUrl${sitemapUrl} --sitemapURLKey${sitemapURLKey} --dynamic${dynamic} + --maxScrapedPages${maxScrapedPages} diff --git a/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/bioschema/SparkRdfToDatacite.java b/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/bioschema/SparkRdfToDatacite.java index d3acd029c..0231932e5 100644 --- a/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/bioschema/SparkRdfToDatacite.java +++ b/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/bioschema/SparkRdfToDatacite.java @@ -19,7 +19,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.rdfconverter.utils.CompressorUtil; import eu.dnetlib.dhp.rdfconverter.utils.RDFConverter; +import ucar.nc2.stream.NcStreamProto; public class SparkRdfToDatacite { @@ -49,22 +51,30 @@ public class SparkRdfToDatacite { isSparkSessionManaged, spark -> { JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - String rdfNquadsRecordsPath = workingPath.concat(rdfNquadsRecords); - JavaPairRDD rdfNquadsRecordsRDD = sc - .sequenceFile(rdfNquadsRecordsPath, Text.class, Text.class); - logger.info("Rdf nquads records retrieved: {}", rdfNquadsRecordsRDD.count()); + String base64GzippedNquadsPath = workingPath.concat(rdfNquadsRecords); + JavaRDD base64GzippedNquadsRDD = sc + .textFile(base64GzippedNquadsPath); + logger.info("Rdf nquads records retrieved: {}", base64GzippedNquadsRDD.count()); - JavaRDD proteins = rdfNquadsRecordsRDD.flatMap(nquads -> { - RDFConverter converter = new RDFConverter(); - ArrayList jsonlds = null; - try { - jsonlds = converter.nQuadsFile2DataciteJson(nquads._2().toString(), profile); - } catch (Exception e) { - logger.error(nquads._1().toString(), e); - return Arrays.asList(new String()).iterator(); - } - return jsonlds.iterator(); - }).filter(Objects::nonNull).filter(jsonld -> !jsonld.isEmpty()).map(jsonld -> new Text(jsonld)); + JavaRDD proteins2 = base64GzippedNquadsRDD + .flatMap(nquads -> { + RDFConverter converter = new RDFConverter(); + ArrayList jsonlds = null; + try { + jsonlds = converter + .nQuadsFile2DataciteJson(CompressorUtil.decompressValue(nquads), profile); + } catch (Exception e) { + logger.error("converting: " + nquads, e); + return Arrays.asList(new String()).iterator(); + } + return jsonlds.iterator(); + }); + logger.info("json datacite non filtered: {}", proteins2.count()); + JavaRDD proteins = proteins2 + .filter(Objects::nonNull) + .filter(jsonld -> !jsonld.isEmpty()) + .distinct() + .map(jsonld -> new Text(jsonld)); logger.info("json datacite generated: {}", proteins.count()); proteins.saveAsTextFile(workingPath.concat(output), GzipCodec.class); }); diff --git a/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/utils/CompressorUtil.java b/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/utils/CompressorUtil.java new file mode 100644 index 000000000..198e92927 --- /dev/null +++ b/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/utils/CompressorUtil.java @@ -0,0 +1,35 @@ + +package eu.dnetlib.dhp.rdfconverter.utils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.StringWriter; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.IOUtils; + +public class CompressorUtil { + + public static String decompressValue(final String abstractCompressed) { + try { + byte[] byteArray = Base64.decodeBase64(abstractCompressed.getBytes()); + GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(byteArray)); + final StringWriter stringWriter = new StringWriter(); + IOUtils.copy(gis, stringWriter); + return stringWriter.toString(); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + public static String compressValue(final String value) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + GZIPOutputStream gzip = new GZIPOutputStream(out); + gzip.write(value.getBytes()); + gzip.close(); + return java.util.Base64.getEncoder().encodeToString(out.toByteArray()); + } +} diff --git a/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/utils/RDFConverter.java b/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/utils/RDFConverter.java index 86b9ffd86..3822648a3 100644 --- a/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/utils/RDFConverter.java +++ b/dhp-workflows/dhp-rdfconverter/src/main/java/eu/dnetlib/dhp/rdfconverter/utils/RDFConverter.java @@ -270,6 +270,9 @@ public class RDFConverter { } results.add(writer.toString()); }); + if (dataciteProteins.isEmpty()) { + log.error("No Protein data found: " + nquads); + } return results; } diff --git a/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/workflow.xml b/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/workflow.xml index 42ef603bc..821fbf9bd 100644 --- a/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/workflow.xml @@ -7,7 +7,7 @@ rdfInput - nquads.seq + base64_gzipped_nquads.txt rdf output of scraping workflow diff --git a/dhp-workflows/dhp-rdfconverter/src/test/java/eu/dnetlib/dhp/rdfconverter/bioschema/ConverterTest.java b/dhp-workflows/dhp-rdfconverter/src/test/java/eu/dnetlib/dhp/rdfconverter/bioschema/ConverterTest.java index 0be32d5cb..7d4324225 100644 --- a/dhp-workflows/dhp-rdfconverter/src/test/java/eu/dnetlib/dhp/rdfconverter/bioschema/ConverterTest.java +++ b/dhp-workflows/dhp-rdfconverter/src/test/java/eu/dnetlib/dhp/rdfconverter/bioschema/ConverterTest.java @@ -9,6 +9,7 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import eu.dnetlib.dhp.rdfconverter.utils.CompressorUtil; import eu.dnetlib.dhp.rdfconverter.utils.RDFConverter; public class ConverterTest { @@ -47,4 +48,17 @@ public class ConverterTest { logger.info("JSON DATACITE >> " + r); }); } + + @Test + public void decompressTest() throws Exception { + InputStream is = ConverterTest.class + .getResourceAsStream("/eu/dnetlib/dhp/rdfconverter/bioschema/base64_gzipped_nquads.txt"); + String base64_gzipped_nquads = IOUtils.toString(is); + String nq = CompressorUtil.decompressValue(base64_gzipped_nquads); + RDFConverter converter = new RDFConverter(); + ArrayList results = converter.nQuadsFile2DataciteJson(nq, "Protein"); + results.stream().forEach(r -> { + logger.info("JSON DATACITE >> " + r); + }); + } } diff --git a/dhp-workflows/dhp-rdfconverter/src/test/resources/eu/dnetlib/dhp/rdfconverter/bioschema/base64_gzipped_nquads.txt b/dhp-workflows/dhp-rdfconverter/src/test/resources/eu/dnetlib/dhp/rdfconverter/bioschema/base64_gzipped_nquads.txt new file mode 100644 index 000000000..a099575d2 --- /dev/null +++ b/dhp-workflows/dhp-rdfconverter/src/test/resources/eu/dnetlib/dhp/rdfconverter/bioschema/base64_gzipped_nquads.txt @@ -0,0 +1 @@ +H4sIAAAAAAAAANVY23KjOBB9369wOc+2uNhJ7EqlipiLVQgGkBbGL7OFQbZVayMv4Hj89yOMk2GSSc3O+FLJG0Kt0+d0S92Cu0VZroshAFPGi2RBV3HR5fkcJHm8XYJHGaz4lKVT4Em9a1UGiqQoUk9WgXTfuquWipXrTb7cr1nHjyCnZc7oI03NnK8ONhV8DbM3q6HuW92/7s7l/FN232pX5h2p15FVIitDRR325faXL08Lt9ttd6vulyqSJIPPDsJ7CldpXFLCVvQcDJOcCvQ0YuWiEZy5GG6m3YSvwDjq4Igi8OD8jQ0haEnjghagjOdA6va7yg+kGkH1ZUW9bXhtyJMHgwGQFCCikaezTrHLyvhrJyuuyt2aNljU+uoU5bykLGtM/r7+/8fzOTppAkqarwqQ8GzGxQPhb7tf53zGliIuB6IiNrLcCQxkaNg4N+sXsVrExQPjHl/uVjTH9L8NzRIR1rbj6tD2EYpcy0BuiEIMdQ+byLEt7AUG1EIUwCAUj2SCQnfieAhGrg1tYYNN3UNEx4Y9MSPTGLs28aAXuRMvtEJk6iOkWTbWiEkINg3XD209RMQUgGMUOpaOkecRCD2oYWwTXzNsATP2I9sPRyZEriVea9DBCIeuoQVi1sIhJIFr+roJDYgFD4xx4AVjKGhBn+CAYIuYAcS+QMYOsULXI8jWoWHpEOmhbVjQtVw9dEI48i0B4kVjB6GJjibYRoGQZJloAgkKPWhblm4LbgZqXzZZLKVZyWaM5iJDT/PfX9aea6xhjXNpglmy3KQ0hZkel+Lolw2eDYgr4UXqSIMLs8viqiy2tU3J14t4vmut6/PX6l+YSCGIaMWLMrLJWMVnb3B4fg1yCXaivvKMr1gSxNmcHuH8ddsEfeXmWr3t9W5OpekPSuvR/WVUtUH2SCOe/3smHaeI7dE6dTpjGU2JaGzvWOarEtSgjasKdErn8uCmr6rXN/3bjxORX1abw4HfgX5/oAyUD6rsZ73wSdnwQyt7vmhO+ZJN8zjf7W3EELijB0gqkf98NIXVdXnE06oh19RP1YPPT13k4kVmhE+elXzJ54fUHAZ1frTPn9z3d7ZelbJT9ovjCu/Zpf78Ylglq7UvGe9oL/6K+/fNeHC45nkZv7klmbgbPW/KM1wsn36PvP2FXLJyuf/CFIv0hyMj/Q0PBaE5ARIAAA== \ No newline at end of file