package eu.dnetlib.dhp.bmuse.bioschema;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.bmuse.utils.ArgumentApplicationParser;
import eu.dnetlib.dhp.bmuse.utils.BMUSEScraper;
import eu.dnetlib.dhp.bmuse.utils.UrlParser;

/**
 * Command-line job that scrapes the pages listed in a sitemap and stores the result of each
 * scrape in a Hadoop {@link SequenceFile}.
 *
 * <p>For every element returned by {@code UrlParser.getSitemapList}, the element text is used as
 * the page URL, passed to {@code BMUSEScraper.scrapeUrl}, and the returned string is appended to
 * the sequence file as a {@code (Text url, Text scraped-content)} record. The output file is
 * located at {@code nameNode + workingPath + rdfOutput} and is written with block-level gzip
 * compression.
 */
public class ScrapingJob {

    static final Logger logger = LoggerFactory.getLogger(ScrapingJob.class);

    /** Classpath location of the JSON description of the accepted command-line arguments. */
    private static final String ARGUMENTS_RESOURCE = "/eu/dnetlib/dhp/bmuse/bioschema/generate_dataset.json";

    /**
     * Runs the scraping job.
     *
     * <p>Arguments read from the parser: {@code nameNode}, {@code workingPath}, {@code rdfOutput},
     * {@code sitemapUrl}, {@code sitemapURLKey}, {@code dynamic} (optional, defaults to
     * {@code true}) and {@code maxScrapedPages} (optional, no limit when absent).
     *
     * @param args command-line arguments, interpreted by {@code ArgumentApplicationParser}
     * @throws Exception if the arguments cannot be parsed, the sitemap cannot be read or the
     *         output file cannot be created; failures on individual pages are logged and skipped
     */
    public static void main(String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(readResource(ARGUMENTS_RESOURCE));
        parser.parseArgument(args);

        final String nameNode = parser.get("nameNode");
        final String workingPath = parser.get("workingPath");
        final String rdfOutput = parser.get("rdfOutput");
        final String sitemapUrl = parser.get("sitemapUrl");
        final String sitemapURLKey = parser.get("sitemapURLKey");
        final String dynamic = parser.get("dynamic");
        final String maxScrapedPages = parser.get("maxScrapedPages");

        // When "dynamic" is not supplied the scrape mode defaults to true.
        final boolean scrapingType = Objects.isNull(dynamic) || Boolean.parseBoolean(dynamic);

        logger
            .info(
                "*************************** STARTING_SCRAPE");

        final BMUSEScraper scraper = new BMUSEScraper();

        // Locale.ROOT keeps the lowercasing independent of the JVM default locale.
        // NOTE(review): lowercasing the whole URL also lowercases its path and query, which may
        // be case-sensitive on the target server - confirm this is intended.
        final String url = sitemapUrl.toLowerCase(Locale.ROOT);
        final Elements urls = UrlParser.getSitemapList(url, sitemapURLKey);

        final Path output = new Path(
            nameNode
                .concat(workingPath)
                .concat(rdfOutput));
        final Configuration conf = getHadoopConfiguration(nameNode);

        try (SequenceFile.Writer writer = SequenceFile
            .createWriter(
                conf,
                SequenceFile.Writer.file(output),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) {

            Stream<Element> urlStream = urls.stream();
            if (Objects.nonNull(maxScrapedPages)) {
                urlStream = urlStream.limit(Long.parseLong(maxScrapedPages));
            }
            final List<Element> sites = urlStream.collect(Collectors.toList());
            logger.info("Pages available for scraping: {}", sites.size());

            for (final Element u : sites) {
                final String site = u.text();
                final Text key = new Text(site);
                try {
                    logger.debug("{} > parsing", site);
                    final String nquads = scraper.scrapeUrl(site, scrapingType);
                    writer.append(key, new Text(nquads));
                } catch (Throwable t) {
                    // Best effort: a failure on one page is logged (with its stack trace) and
                    // the remaining pages are still processed.
                    logger.error("{} {}", site, t.getMessage(), t);
                }
            }
        }

        logger
            .info(
                "*************************** ENDING_SCRAPE: ");
    }

    /**
     * Builds the Hadoop configuration used to write the output file.
     *
     * <p>Besides populating the returned object, this method sets the JVM-wide system property
     * {@code hadoop.home.dir} to {@code "/"}.
     *
     * @param nameNode value stored under {@code fs.defaultFS}
     * @return a new configuration with the default file system and the {@code hdfs}/{@code file}
     *         file system implementations set explicitly
     */
    public static Configuration getHadoopConfiguration(String nameNode) {
        // ====== Init HDFS File System Object
        Configuration conf = new Configuration();
        // Set FileSystem URI
        conf.set("fs.defaultFS", nameNode);
        // Because of Maven
        // NOTE(review): presumably works around file system implementations not being
        // discovered in the packaged jar - confirm before removing.
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

        System.setProperty("hadoop.home.dir", "/");
        return conf;
    }

    /** Reads a classpath resource fully as UTF-8 text, closing the stream afterwards. */
    private static String readResource(String resourcePath) throws IOException {
        try (InputStream in = ScrapingJob.class.getResourceAsStream(resourcePath)) {
            if (in == null) {
                throw new IllegalStateException("Classpath resource not found: " + resourcePath);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }
}