diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
index c35fbb497..7e07dd838 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
@@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory;
 import eu.dnetlib.dhp.aggregation.common.ReporterCallback;
 import eu.dnetlib.dhp.aggregation.common.ReportingJob;
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
@@ -57,7 +58,7 @@ public class CollectorWorker extends ReportingJob {
 
 	public void collect() throws UnknownCollectorPluginException, CollectorException, IOException {
 
-		final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
+		final String outputPath = this.mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
 		log.info("outputPath path is {}", outputPath);
 
 		final CollectorPlugin plugin = getCollectorPlugin();
@@ -67,36 +68,36 @@ public class CollectorWorker extends ReportingJob {
 
 		try (SequenceFile.Writer writer = SequenceFile
 			.createWriter(
-				fileSystem.getConf(),
-				SequenceFile.Writer.file(new Path(outputPath)),
-				SequenceFile.Writer.keyClass(IntWritable.class),
-				SequenceFile.Writer.valueClass(Text.class),
+				this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+					.keyClass(IntWritable.class),
+				SequenceFile.Writer
+					.valueClass(Text.class),
 				SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
 			final IntWritable key = new IntWritable(counter.get());
 			final Text value = new Text();
 			plugin
-				.collect(api, report)
-				.forEach(
-					content -> {
-						key.set(counter.getAndIncrement());
-						value.set(content);
-						try {
-							writer.append(key, value);
-						} catch (Throwable e) {
-							throw new RuntimeException(e);
-						}
-					});
-		} catch (Throwable e) {
-			report.put(e.getClass().getName(), e.getMessage());
+				.collect(this.api, this.report)
+				.forEach(content -> {
+					key.set(counter.getAndIncrement());
+					value.set(content);
+					try {
+						writer.append(key, value);
+					} catch (final Throwable e) {
+						throw new RuntimeException(e);
+					}
+				});
+		} catch (final Throwable e) {
+			this.report.put(e.getClass().getName(), e.getMessage());
 			throw new CollectorException(e);
 		} finally {
 			shutdown();
-			report.ongoing(counter.longValue(), counter.longValue());
+			this.report.ongoing(counter.longValue(), counter.longValue());
 		}
 	}
 
-	private void scheduleReport(AtomicInteger counter) {
+	private void scheduleReport(final AtomicInteger counter) {
 		schedule(new ReporterCallback() {
+
 			@Override
 			public Long getCurrent() {
 				return counter.longValue();
@@ -111,31 +112,33 @@ public class CollectorWorker extends ReportingJob {
 
 	private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
 
-		switch (CollectorPlugin.NAME.valueOf(api.getProtocol())) {
+		switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
 			case oai:
-				return new OaiCollectorPlugin(clientParams);
+				return new OaiCollectorPlugin(this.clientParams);
 			case rest_json2xml:
-				return new RestCollectorPlugin(clientParams);
+				return new RestCollectorPlugin(this.clientParams);
 			case file:
-				return new FileCollectorPlugin(fileSystem);
+				return new FileCollectorPlugin(this.fileSystem);
 			case fileGzip:
-				return new FileGZipCollectorPlugin(fileSystem);
+				return new FileGZipCollectorPlugin(this.fileSystem);
+			case baseDump:
+				return new BaseCollectorPlugin(this.fileSystem);
 			case other:
 				final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
-					.ofNullable(api.getParams().get("other_plugin_type"))
+					.ofNullable(this.api.getParams().get("other_plugin_type"))
 					.map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
 					.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
 
 				switch (plugin) {
 					case mdstore_mongodb_dump:
-						return new MongoDbDumpCollectorPlugin(fileSystem);
+						return new MongoDbDumpCollectorPlugin(this.fileSystem);
 					case mdstore_mongodb:
 						return new MDStoreCollectorPlugin();
 					default:
						throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
 				}
 			default:
-				throw new UnknownCollectorPluginException("protocol is not managed: " + api.getProtocol());
+				throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
 		}
 	}
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
index a19ca5c68..97d2d2585 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
@@ -10,7 +10,8 @@ import eu.dnetlib.dhp.common.collection.CollectorException;
 public interface CollectorPlugin {
 
 	enum NAME {
-		oai, other, rest_json2xml, file, fileGzip;
+
+		oai, other, rest_json2xml, file, fileGzip, baseDump;
 
 		public enum OTHER_NAME {
 			mdstore_mongodb_dump, mdstore_mongodb
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
index 717c0f4a8..1a2003991 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java
@@ -52,17 +52,18 @@ public class BaseAnalyzerJob {
 
 	public static void main(final String[] args) throws Exception {
 		final String jsonConfiguration = IOUtils
-			.toString(BaseAnalyzerJob.class
-				.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
+			.toString(
+				BaseAnalyzerJob.class
+					.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 
 		parser.parseArgument(args);
 
 		final Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
 
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
@@ -98,106 +99,119 @@ public class BaseAnalyzerJob {
 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
 			if (fromStep <= 0) {
 				log
-					.info("\n**************************************\n* EXECUTING STEP 0: LoadRecords\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 0: LoadRecords\n**************************************");
 				loadRecords(inputPath, dataPath);
 				log
-					.info("\n**************************************\n* EXECUTING STEP 0: DONE\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 0: DONE\n**************************************");
 			}
 
 			if (fromStep <= 1) {
 				log
-					.info("\n**************************************\n* EXECUTING STEP 1: Base Report\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 1: Base Report\n**************************************");
 				generateReport(spark, dataPath, outputPath);
 				log
-					.info("\n**************************************\n* EXECUTING STEP 1: DONE\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 1: DONE\n**************************************");
 			}
 
 			if (fromStep <= 2) {
 				log
-					.info("\n**************************************\n* EXECUTING STEP 2: OpenDOAR Report\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 2: OpenDOAR Report\n**************************************");
 				generateOpenDoarReport(spark, outputPath, opendoarPath, loadOpenDoarStats(dbUrl, dbUser, dbPassword));
 				log
-					.info("\n**************************************\n* EXECUTING STEP 2: DONE\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 2: DONE\n**************************************");
 			}
 
 			if (fromStep <= 3) {
 				log
-					.info("\n**************************************\n* EXECUTING STEP 3: Type Vocabulary Report\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 3: Type Vocabulary Report\n**************************************");
 				generateVocTypeReport(spark, outputPath, typesReportPath);
 				log
-					.info("\n**************************************\n* EXECUTING STEP 3: DONE\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 3: DONE\n**************************************");
 			}
 		});
 	}
 
 	private static void generateVocTypeReport(final SparkSession spark,
-		final String reportPath,
-		final String typesReportPath) {
+		final String reportPath,
+		final String typesReportPath) {
 		spark
-			.read()
-			.parquet(reportPath)
-			.as(Encoders.bean(BaseRecordInfo.class))
-			.flatMap(rec -> {
-				final List<Tuple2<String, String>> list = new ArrayList<>();
-				for (final String t1 : rec.getTypes()) {
-					if (t1.startsWith("TYPE_NORM:")) {
-						for (final String t2 : rec.getTypes()) {
-							if (t2.startsWith("TYPE:")) {
-								list
-									.add(new Tuple2<>(StringUtils.substringAfter(t1, "TYPE_NORM:").trim(),
-										StringUtils.substringAfter(t2, "TYPE:").trim()));
-							}
+			.read()
+			.parquet(reportPath)
+			.as(Encoders.bean(BaseRecordInfo.class))
+			.flatMap(rec -> {
+				final List<Tuple2<String, String>> list = new ArrayList<>();
+				for (final String t1 : rec.getTypes()) {
+					if (t1.startsWith("TYPE_NORM:")) {
+						for (final String t2 : rec.getTypes()) {
+							if (t2.startsWith("TYPE:")) {
+								list
+									.add(
+										new Tuple2<>(StringUtils.substringAfter(t1, "TYPE_NORM:").trim(),
+											StringUtils.substringAfter(t2, "TYPE:").trim()));
 							}
 						}
 					}
-				return list.iterator();
-			}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
-			.distinct()
-			.write()
-			.mode(SaveMode.Overwrite)
-			.format("parquet")
-			.save(typesReportPath);
+				}
+				return list.iterator();
+			}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+			.distinct()
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(typesReportPath);
 	}
 
 	private static void generateOpenDoarReport(final SparkSession spark,
-		final String reportPath,
-		final String opendoarPath,
-		final List<OpenDoarRepoStatus> repos) {
+		final String reportPath,
+		final String opendoarPath,
+		final List<OpenDoarRepoStatus> repos) {
 
 		final Dataset<OpenDoarRepoStatus> fromDB = spark.createDataset(repos, Encoders.bean(OpenDoarRepoStatus.class));
 
 		final Dataset<OpenDoarRepoStatus> fromBASE = spark
-			.read()
-			.parquet(reportPath)
-			.selectExpr("explode(collections) as collection")
-			.where("isnotnull(collection.opendoarId) and character_length(collection.opendoarId)>0")
-			.selectExpr("concat('opendoar____::',collection.opendoarId) as id")
-			.groupBy(col("id"))
-			.agg(count(col("id")))
-			.map(row -> {
-				final OpenDoarRepoStatus repo = new OpenDoarRepoStatus();
-				repo.setId(row.getString(0));
-				repo.getAggregations().put(BASE_DUMP, row.getLong(1));
-				repo.setBaseCount(row.getLong(1));
-				repo.setOpenaireCount(0);
-				repo.setHighCompliance(false);
-				return repo;
-			}, Encoders.bean(OpenDoarRepoStatus.class));
+			.read()
+			.parquet(reportPath)
+			.selectExpr("explode(collections) as collection")
+			.where("isnotnull(collection.opendoarId) and character_length(collection.opendoarId)>0")
+			.selectExpr("concat('opendoar____::',collection.opendoarId) as id")
+			.groupBy(col("id"))
+			.agg(count(col("id")))
+			.map(row -> {
+				final OpenDoarRepoStatus repo = new OpenDoarRepoStatus();
+				repo.setId(row.getString(0));
+				repo.getAggregations().put(BASE_DUMP, row.getLong(1));
+				repo.setBaseCount(row.getLong(1));
+				repo.setOpenaireCount(0);
+				repo.setHighCompliance(false);
+				return repo;
+			}, Encoders.bean(OpenDoarRepoStatus.class));
 
 		fromDB
-			.joinWith(fromBASE, fromDB.col("id").equalTo(fromBASE.col("id")), "full_outer")
-			.map(t -> merge(t._1, t._2), Encoders.bean(OpenDoarRepoStatus.class))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.format("parquet")
-			.save(opendoarPath);
+			.joinWith(fromBASE, fromDB.col("id").equalTo(fromBASE.col("id")), "full_outer")
+			.map(t -> merge(t._1, t._2), Encoders.bean(OpenDoarRepoStatus.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(opendoarPath);
 	}
 
 	private static OpenDoarRepoStatus merge(final OpenDoarRepoStatus r1, final OpenDoarRepoStatus r2) {
-		if (r1 == null) { return r2; }
-		if (r2 == null) { return r1; }
+		if (r1 == null) {
+			return r2;
+		}
+		if (r2 == null) {
+			return r1;
+		}
 
 		final OpenDoarRepoStatus r = new OpenDoarRepoStatus();
 		r.setId(ObjectUtils.firstNonNull(r1.getId(), r2.getId()));
@@ -212,15 +226,17 @@
 	}
 
 	private static List<OpenDoarRepoStatus> loadOpenDoarStats(final String dbUrl,
-		final String dbUser,
-		final String dbPassword) throws Exception {
+		final String dbUser,
+		final String dbPassword) throws Exception {
 
 		final List<OpenDoarRepoStatus> repos = new ArrayList<>();
 
 		try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
 			final String sql = IOUtils
-				.toString(BaseAnalyzerJob.class
-					.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql"));
+				.toString(
+					BaseAnalyzerJob.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql"));
 
 			dbClient.processResults(sql, row -> {
 				try {
@@ -256,7 +272,7 @@
 
 	private static void loadRecords(final String inputPath, final String outputPath) throws Exception {
 		try (final FileSystem fs = FileSystem.get(new Configuration());
-			final AggregatorReport report = new AggregatorReport()) {
+				final AggregatorReport report = new AggregatorReport()) {
 
 			final AtomicLong recordsCounter = new AtomicLong(0);
 
@@ -264,9 +280,12 @@
 			final Text value = new Text();
 
 			try (final SequenceFile.Writer writer = SequenceFile
-				.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-					.keyClass(LongWritable.class), SequenceFile.Writer
-					.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+				.createWriter(
+					fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+						.keyClass(LongWritable.class),
+					SequenceFile.Writer
+						.valueClass(Text.class),
+					SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
 
 				final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
 
@@ -293,21 +312,21 @@
 	}
 
 	private static void generateReport(final SparkSession spark,
-		final String inputPath,
-		final String targetPath) throws Exception {
+		final String inputPath,
+		final String targetPath) throws Exception {
 
 		final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext
-			.fromSparkContext(spark.sparkContext())
-			.sequenceFile(inputPath, LongWritable.class, Text.class)
-			.map(s -> s._2.toString())
-			.map(BaseAnalyzerJob::extractInfo);
+			.fromSparkContext(spark.sparkContext())
+			.sequenceFile(inputPath, LongWritable.class, Text.class)
+			.map(s -> s._2.toString())
+			.map(BaseAnalyzerJob::extractInfo);
 
 		spark
-			.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.format("parquet")
-			.save(targetPath);
+			.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(targetPath);
 	}
 
 	protected static BaseRecordInfo extractInfo(final String s) {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
index bc7bdab97..05a332fd8 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
@@ -45,9 +45,9 @@ public class BaseCollectorPlugin implements CollectorPlugin {
 	public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
 		// get path to file
 		final Path filePath = Optional
-			.ofNullable(api.getBaseUrl())
-			.map(Path::new)
-			.orElseThrow(() -> new CollectorException("missing baseUrl"));
+			.ofNullable(api.getBaseUrl())
+			.map(Path::new)
+			.orElseThrow(() -> new CollectorException("missing baseUrl"));
 
 		final String dbUrl = api.getParams().get("dbUrl");
 		final String dbUser = api.getParams().get("dbUser");
@@ -59,7 +59,9 @@ public class BaseCollectorPlugin implements CollectorPlugin {
 		log.info("dbPassword: {}", "***");
 
 		try {
-			if (!this.fs.exists(filePath)) { throw new CollectorException("path does not exist: " + filePath); }
+			if (!this.fs.exists(filePath)) {
+				throw new CollectorException("path does not exist: " + filePath);
+			}
 		} catch (final Throwable e) {
 			throw new CollectorException(e);
 		}
@@ -69,18 +71,20 @@ public class BaseCollectorPlugin implements CollectorPlugin {
 		final Iterator<String> iterator = new BaseCollectorIterator(this.fs, filePath, report);
 		final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
 		return StreamSupport
-			.stream(spliterator, false)
-			.filter(doc -> filterXml(doc, acceptedOpendoarIds, report));
+			.stream(spliterator, false)
+			.filter(doc -> filterXml(doc, acceptedOpendoarIds, report));
 	}
 
-	private Set<String> findAcceptedOpendoarIds(final String dbUrl, final String dbUser, final String dbPassword) throws CollectorException {
+	private Set<String> findAcceptedOpendoarIds(final String dbUrl, final String dbUser, final String dbPassword)
+		throws CollectorException {
 		final Set<String> accepted = new HashSet<>();
 
 		try (final DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
 
 			final String sql = IOUtils
-				.toString(BaseAnalyzerJob.class
-					.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-accepted.sql"));
+				.toString(
+					BaseAnalyzerJob.class
+						.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-accepted.sql"));
 
 			dbClient.processResults(sql, row -> {
 				try {