From 83d5e165a724fba8f0f5654b03fb1127dec5a0f1 Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Fri, 3 Dec 2021 15:44:39 +0100
Subject: [PATCH] modified wf configuration; added bmuse scraping on a spark action

---
 .../dhp/bmuse/bioschema/ScrapingJob.java      |   2 +
 .../dhp/bmuse/bioschema/SparkScraper.java     | 111 ++++++++++++++++++
 .../dnetlib/dhp/bmuse/utils/BMUSEScraper.java |   2 +-
 .../bioschema/oozie_app/config-default.xml    |  35 +++---
 .../bmuse/bioschema/oozie_app/workflow.xml    |  49 ++++++--
 .../bioschema/oozie_app/config-default.xml    |   5 +-
 .../bioschema/oozie_app/workflow.xml          |   5 -
 7 files changed, 169 insertions(+), 40 deletions(-)
 create mode 100644 dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/SparkScraper.java

diff --git a/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/ScrapingJob.java b/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/ScrapingJob.java
index 28ad1ee0a..3fd3c005b 100644
--- a/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/ScrapingJob.java
+++ b/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/ScrapingJob.java
@@ -70,6 +70,8 @@ public class ScrapingJob {
             Elements urls = UrlParser.getSitemapList(url, sitemapURLKey);
             long total = urls.size();
 
+            System.setProperty("webdriver.chrome.whitelistedIps", "");
+
             Path output = new Path(
                 nameNode
                     .concat(workingPath)
diff --git a/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/SparkScraper.java b/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/SparkScraper.java
new file mode 100644
index 000000000..6ac83b796
--- /dev/null
+++ b/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/bioschema/SparkScraper.java
@@ -0,0 +1,111 @@
+
+package eu.dnetlib.dhp.bmuse.bioschema;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
+import org.jsoup.nodes.Element;
+import org.jsoup.select.Elements;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.bmuse.utils.BMUSEScraper;
+import eu.dnetlib.dhp.bmuse.utils.UrlParser;
+
+public class SparkScraper {
+
+    static Logger logger = LoggerFactory.getLogger(SparkScraper.class);
+
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    SparkScraper.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/bmuse/bioschema/generate_dataset.json")));
+        parser.parseArgument(args);
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+
+        final String nameNode = parser.get("nameNode");
+        final String workingPath = parser.get("workingPath");
+        final String rdfOutput = parser.get("rdfOutput");
+        final String sitemapUrl = parser.get("sitemapUrl");
+        final String sitemapURLKey = parser.get("sitemapURLKey");
+        final String dynamic = parser.get("dynamic");
+        final String maxScrapedPages = parser.get("maxScrapedPages");
+        Boolean dynamicValue = true;
+        if (Objects.nonNull(dynamic)) {
+            dynamicValue = Boolean.parseBoolean(dynamic);
+        }
+        final boolean scrapingType = dynamicValue.booleanValue();
+
+        SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                final LongAccumulator scraped = spark.sparkContext().longAccumulator("scraped");
+                final LongAccumulator errors = spark.sparkContext().longAccumulator("errors");
+
+                JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+                System.setProperty("webdriver.chrome.whitelistedIps", "");
+
+                BMUSEScraper scraper = new BMUSEScraper();
+                String url = sitemapUrl.toLowerCase();
+                Elements urls = UrlParser.getSitemapList(url, sitemapURLKey);
+                long total = urls.size();
+
+                Path output = new Path(
+                    nameNode
+                        .concat(workingPath)
+                        .concat(rdfOutput));
+                try (SequenceFile.Writer writer = SequenceFile
+                    .createWriter(
+                        sc.hadoopConfiguration(),
+                        SequenceFile.Writer.file(output),
+                        SequenceFile.Writer.keyClass(Text.class),
+                        SequenceFile.Writer.valueClass(Text.class),
+                        SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) {
+                    Stream<Element> urlStream = null;
+                    if (Objects.nonNull(maxScrapedPages)) {
+                        urlStream = urls.stream().limit(Long.parseLong(maxScrapedPages));
+                    } else {
+                        urlStream = urls.stream();
+                    }
+                    urlStream.forEach(u -> {
+                        try {
+                            final Text key = new Text(u.text());
+                            final Text value = new Text(scraper.scrapeUrl(u.text(), scrapingType));
+                            writer.append(key, value);
+                            scraped.add(1l);
+                        } catch (Exception e) {
+                            logger.error(u.text(), e);
+                            errors.add(1l);
+                        }
+                    });
+                }
+
+                logger
+                    .info(
+                        "Total pages to scrape: " + total + " Scraped: " + scraped.value() +
+                            " Errors: " + errors.value());
+            });
+    }
+}
diff --git a/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/utils/BMUSEScraper.java b/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/utils/BMUSEScraper.java
index 1e58503fa..6ec07f90c 100644
--- a/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/utils/BMUSEScraper.java
+++ b/dhp-workflows/dhp-bmuse/src/main/java/eu/dnetlib/dhp/bmuse/utils/BMUSEScraper.java
@@ -53,7 +53,7 @@ public class BMUSEScraper extends ScraperFilteredCore {
             logger.error(e.toString());
             return e.getMessage();
         }
-
+        logger.info("HTML: " + html);
         DocumentSource source = new StringDocumentSource(html, url);
         IRI sourceIRI = SimpleValueFactory.getInstance().createIRI(source.getDocumentIRI());
diff --git a/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/config-default.xml b/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/config-default.xml
index 7b13aab55..ada9d88b0 100644
--- a/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/config-default.xml
@@ -2,22 +2,22 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -38,13 +38,10 @@
         <name>spark2YarnHistoryServerAddress</name>
         <value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>
     </property>
-
-
     <property>
         <name>oozie.launcher.mapreduce.user.classpath.first</name>
         <value>true</value>
     </property>
-
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
diff --git a/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/workflow.xml b/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/workflow.xml
index 705396653..3739fbda1 100644
--- a/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-bmuse/src/main/resources/eu/dnetlib/dhp/bmuse/bioschema/oozie_app/workflow.xml
@@ -2,17 +2,12 @@
     <parameters>
         <property>
             <name>workingPath</name>
-            <value>/data/bioschema/disprot/</value>
+            <value>/data/bioschema/mobidb/</value>
             <description>the working path</description>
         </property>
-        <property>
-            <name>rdfOutput</name>
-            <value>nquads.seq</value>
-            <description>rdf output of scraping step</description>
-        </property>
         <property>
             <name>sitemapUrl</name>
-            <value>https://disprot.org/sitemap2.xml.gz</value>
+            <value>https://mobidb.org/sitemap2.xml.gz</value>
         </property>
         <property>
             <name>sitemapURLKey</name>
@@ -28,13 +23,18 @@
             <value>100</value>
             <description>max number of pages that will be scraped, default: no limit</description>
         </property>
+        <property>
+            <name>rdfOutput</name>
+            <value>nquads.seq</value>
+            <description>rdf output of scraping step</description>
+        </property>
         <property>
             <name>oozie.launcher.mapreduce.map.java.opts</name>
             <value>-Xmx4g</value>
         </property>
         <property>
-            <name>spark2RdfConversionMaxExecutors</name>
-            <value>50</value>
+            <name>spark2MaxExecutors</name>
+            <value>1</value>
         </property>
         <property>
             <name>sparkDriverMemory</name>
@@ -43,7 +43,7 @@
         </property>
         <property>
             <name>sparkExecutorMemory</name>
-            <value>2G</value>
+            <value>4G</value>
             <description>memory for individual executor</description>
         </property>
         <property>
@@ -75,7 +75,7 @@
-
+
@@ -94,5 +94,32 @@
     </action>
 
+    <action name="bmuseScrapingSpark">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>bmuseScrapingSpark</name>
+            <class>eu.dnetlib.dhp.bmuse.bioschema.SparkScraper</class>
+            <jar>dhp-bmuse-${projectVersion}.jar</jar>
+            <spark-opts>
+                --conf spark.dynamicAllocation.enabled=true
+                --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--nameNode</arg><arg>${nameNode}</arg>
+            <arg>--workingPath</arg><arg>${workingPath}</arg>
+            <arg>--rdfOutput</arg><arg>${rdfOutput}</arg>
+            <arg>--sitemapUrl</arg><arg>${sitemapUrl}</arg>
+            <arg>--sitemapURLKey</arg><arg>${sitemapURLKey}</arg>
+            <arg>--dynamic</arg><arg>${dynamic}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
     <end name="End"/>
 </workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/config-default.xml b/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/config-default.xml
index 7b13aab55..95d48af07 100644
--- a/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/config-default.xml
@@ -24,7 +24,7 @@
     </property>
     <property>
         <name>jobTracker</name>
-        <value>yarn</value>
+        <value>hadoop-rm3.garr-pa1.d4science.org:8032</value>
     </property>
     <property>
         <name>nameNode</name>
@@ -38,13 +38,10 @@
         <name>spark2YarnHistoryServerAddress</name>
         <value>http://hadoop-rm2.garr-pa1.d4science.org:19888</value>
     </property>
-
-
    <property>
        <name>oozie.launcher.mapreduce.user.classpath.first</name>
        <value>true</value>
    </property>
-
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
diff --git a/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/workflow.xml b/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/workflow.xml
index 1a0a4f450..04bb6c3d8 100644
--- a/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-rdfconverter/src/main/resources/eu/dnetlib/dhp/rdfconverter/bioschema/oozie_app/workflow.xml
@@ -52,11 +52,6 @@
     </parameters>
 
-    <global>
-        <job-tracker>${jobTracker}</job-tracker>
-        <name-node>${nameNode}</name-node>
-    </global>
-
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
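
Note (not part of the patch): below is a minimal, hypothetical sketch showing how the scraping-to-SequenceFile loop introduced by this commit in SparkScraper could be exercised locally, without Oozie, YARN or a Spark session. The class name LocalScraperSketch, the example sitemap URL, the "loc" sitemap key, the file:///tmp output path and the limit of 10 pages are assumptions made for illustration; UrlParser.getSitemapList, BMUSEScraper.scrapeUrl and the SequenceFile.Writer options mirror the calls added in this patch.

    package eu.dnetlib.dhp.bmuse.bioschema;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.jsoup.select.Elements;

    import eu.dnetlib.dhp.bmuse.utils.BMUSEScraper;
    import eu.dnetlib.dhp.bmuse.utils.UrlParser;

    // Hypothetical local driver, not part of the commit.
    public class LocalScraperSketch {

        public static void main(String[] args) throws Exception {
            // Assumptions: placeholder sitemap URL, sitemap tag and local output path.
            String sitemapUrl = "https://example.org/sitemap2.xml.gz";
            String sitemapURLKey = "loc";
            Path output = new Path("file:///tmp/nquads.seq");

            // Same system property the patch sets before the Chrome driver is started.
            System.setProperty("webdriver.chrome.whitelistedIps", "");

            BMUSEScraper scraper = new BMUSEScraper();
            Elements urls = UrlParser.getSitemapList(sitemapUrl.toLowerCase(), sitemapURLKey);

            Configuration conf = new Configuration();
            try (SequenceFile.Writer writer = SequenceFile
                .createWriter(
                    conf,
                    SequenceFile.Writer.file(output),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(Text.class),
                    SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) {
                // key = page URL, value = scraped RDF, same layout as the Spark job writes.
                urls.stream().limit(10).forEach(u -> {
                    try {
                        writer.append(new Text(u.text()), new Text(scraper.scrapeUrl(u.text(), true)));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }

Keeping the key as the page URL and the value as the scraped output preserves the SequenceFile layout produced by the bmuseScrapingSpark action, so anything written by this sketch has the same shape as the job's nquads.seq output.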