selection of the new plugin
parent 9506d80ddc
commit db6f774394

CollectorWorker.java
@@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory;
 import eu.dnetlib.dhp.aggregation.common.ReporterCallback;
 import eu.dnetlib.dhp.aggregation.common.ReportingJob;
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
@@ -57,7 +58,7 @@ public class CollectorWorker extends ReportingJob {

 	public void collect() throws UnknownCollectorPluginException, CollectorException, IOException {

-		final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
+		final String outputPath = this.mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
 		log.info("outputPath path is {}", outputPath);

 		final CollectorPlugin plugin = getCollectorPlugin();
@@ -67,36 +68,36 @@ public class CollectorWorker extends ReportingJob {
 		try (SequenceFile.Writer writer = SequenceFile
 			.createWriter(
-				fileSystem.getConf(),
-				SequenceFile.Writer.file(new Path(outputPath)),
-				SequenceFile.Writer.keyClass(IntWritable.class),
-				SequenceFile.Writer.valueClass(Text.class),
+				this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+					.keyClass(IntWritable.class),
+				SequenceFile.Writer
+					.valueClass(Text.class),
 				SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
 			final IntWritable key = new IntWritable(counter.get());
 			final Text value = new Text();
 			plugin
-				.collect(api, report)
-				.forEach(
-					content -> {
-						key.set(counter.getAndIncrement());
-						value.set(content);
-						try {
-							writer.append(key, value);
-						} catch (Throwable e) {
-							throw new RuntimeException(e);
-						}
-					});
-		} catch (Throwable e) {
-			report.put(e.getClass().getName(), e.getMessage());
+				.collect(this.api, this.report)
+				.forEach(content -> {
+					key.set(counter.getAndIncrement());
+					value.set(content);
+					try {
+						writer.append(key, value);
+					} catch (final Throwable e) {
+						throw new RuntimeException(e);
+					}
+				});
+		} catch (final Throwable e) {
+			this.report.put(e.getClass().getName(), e.getMessage());
 			throw new CollectorException(e);
 		} finally {
 			shutdown();
-			report.ongoing(counter.longValue(), counter.longValue());
+			this.report.ongoing(counter.longValue(), counter.longValue());
 		}
 	}

-	private void scheduleReport(AtomicInteger counter) {
+	private void scheduleReport(final AtomicInteger counter) {
 		schedule(new ReporterCallback() {

 			@Override
 			public Long getCurrent() {
 				return counter.longValue();
@@ -111,31 +112,33 @@ public class CollectorWorker extends ReportingJob {

 	private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {

-		switch (CollectorPlugin.NAME.valueOf(api.getProtocol())) {
+		switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
 			case oai:
-				return new OaiCollectorPlugin(clientParams);
+				return new OaiCollectorPlugin(this.clientParams);
 			case rest_json2xml:
-				return new RestCollectorPlugin(clientParams);
+				return new RestCollectorPlugin(this.clientParams);
 			case file:
-				return new FileCollectorPlugin(fileSystem);
+				return new FileCollectorPlugin(this.fileSystem);
 			case fileGzip:
-				return new FileGZipCollectorPlugin(fileSystem);
+				return new FileGZipCollectorPlugin(this.fileSystem);
+			case baseDump:
+				return new BaseCollectorPlugin(this.fileSystem);
 			case other:
 				final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
-					.ofNullable(api.getParams().get("other_plugin_type"))
+					.ofNullable(this.api.getParams().get("other_plugin_type"))
 					.map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
 					.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));

 				switch (plugin) {
 					case mdstore_mongodb_dump:
-						return new MongoDbDumpCollectorPlugin(fileSystem);
+						return new MongoDbDumpCollectorPlugin(this.fileSystem);
 					case mdstore_mongodb:
 						return new MDStoreCollectorPlugin();
 					default:
 						throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
 				}
 			default:
-				throw new UnknownCollectorPluginException("protocol is not managed: " + api.getProtocol());
+				throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
 		}
 	}

CollectorPlugin.java
@@ -10,7 +10,8 @@ import eu.dnetlib.dhp.common.collection.CollectorException;
 public interface CollectorPlugin {

 	enum NAME {
-		oai, other, rest_json2xml, file, fileGzip;
+
+		oai, other, rest_json2xml, file, fileGzip, baseDump;

 		public enum OTHER_NAME {
 			mdstore_mongodb_dump, mdstore_mongodb
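For context, a minimal sketch (not part of this commit) of how the new baseDump constant resolves from a protocol string: Enum.valueOf accepts only the names listed in CollectorPlugin.NAME, and CollectorWorker.getCollectorPlugin() (first file above) maps the resolved value to the new plugin.

import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;

public class ProtocolResolutionSketch {

	public static void main(final String[] args) {
		// "baseDump" is the protocol constant introduced by this commit.
		final CollectorPlugin.NAME protocol = CollectorPlugin.NAME.valueOf("baseDump");
		System.out.println("resolved protocol: " + protocol);

		// Any string not listed in CollectorPlugin.NAME fails fast:
		// CollectorPlugin.NAME.valueOf("ftp") -> IllegalArgumentException
		// CollectorWorker then selects the plugin:
		// case baseDump: return new BaseCollectorPlugin(this.fileSystem);
	}
}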

BaseAnalyzerJob.java
@@ -52,17 +52,18 @@ public class BaseAnalyzerJob {
 	public static void main(final String[] args) throws Exception {

 		final String jsonConfiguration = IOUtils
-			.toString(BaseAnalyzerJob.class
-				.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
+			.toString(
+				BaseAnalyzerJob.class
+					.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));

 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);

 		parser.parseArgument(args);

 		final Boolean isSparkSessionManaged = Optional
-				.ofNullable(parser.get("isSparkSessionManaged"))
-				.map(Boolean::valueOf)
-				.orElse(Boolean.TRUE);
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);

 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

@@ -98,106 +99,119 @@ public class BaseAnalyzerJob {
 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
 			if (fromStep <= 0) {
 				log
-					.info("\n**************************************\n* EXECUTING STEP 0: LoadRecords\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 0: LoadRecords\n**************************************");
 				loadRecords(inputPath, dataPath);
 				log
-					.info("\n**************************************\n* EXECUTING STEP 0: DONE\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 0: DONE\n**************************************");
 			}

 			if (fromStep <= 1) {
 				log
-					.info("\n**************************************\n* EXECUTING STEP 1: Base Report\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 1: Base Report\n**************************************");
 				generateReport(spark, dataPath, outputPath);
 				log
-					.info("\n**************************************\n* EXECUTING STEP 1: DONE\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 1: DONE\n**************************************");
 			}

 			if (fromStep <= 2) {
 				log
-					.info("\n**************************************\n* EXECUTING STEP 2: OpenDOAR Report\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 2: OpenDOAR Report\n**************************************");
 				generateOpenDoarReport(spark, outputPath, opendoarPath, loadOpenDoarStats(dbUrl, dbUser, dbPassword));
 				log
-					.info("\n**************************************\n* EXECUTING STEP 2: DONE\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 2: DONE\n**************************************");
 			}

 			if (fromStep <= 3) {
 				log
-					.info("\n**************************************\n* EXECUTING STEP 3: Type Vocabulary Report\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 3: Type Vocabulary Report\n**************************************");
 				generateVocTypeReport(spark, outputPath, typesReportPath);
 				log
-					.info("\n**************************************\n* EXECUTING STEP 3: DONE\n**************************************");
+					.info(
+						"\n**************************************\n* EXECUTING STEP 3: DONE\n**************************************");
 			}
 		});

 	}

 	private static void generateVocTypeReport(final SparkSession spark,
-			final String reportPath,
-			final String typesReportPath) {
+		final String reportPath,
+		final String typesReportPath) {
 		spark
-				.read()
-				.parquet(reportPath)
-				.as(Encoders.bean(BaseRecordInfo.class))
-				.flatMap(rec -> {
-					final List<Tuple2<String, String>> list = new ArrayList<>();
-					for (final String t1 : rec.getTypes()) {
-						if (t1.startsWith("TYPE_NORM:")) {
-							for (final String t2 : rec.getTypes()) {
-								if (t2.startsWith("TYPE:")) {
-									list
-										.add(new Tuple2<>(StringUtils.substringAfter(t1, "TYPE_NORM:").trim(),
-											StringUtils.substringAfter(t2, "TYPE:").trim()));
-								}
-							}
-						}
-					}
-					return list.iterator();
-				}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
-				.distinct()
-				.write()
-				.mode(SaveMode.Overwrite)
-				.format("parquet")
-				.save(typesReportPath);
-	}
+			.read()
+			.parquet(reportPath)
+			.as(Encoders.bean(BaseRecordInfo.class))
+			.flatMap(rec -> {
+				final List<Tuple2<String, String>> list = new ArrayList<>();
+				for (final String t1 : rec.getTypes()) {
+					if (t1.startsWith("TYPE_NORM:")) {
+						for (final String t2 : rec.getTypes()) {
+							if (t2.startsWith("TYPE:")) {
+								list
+									.add(
+										new Tuple2<>(StringUtils.substringAfter(t1, "TYPE_NORM:").trim(),
+											StringUtils.substringAfter(t2, "TYPE:").trim()));
+							}
+						}
+					}
+				}
+				return list.iterator();
+			}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+			.distinct()
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(typesReportPath);
+
+	}

 	private static void generateOpenDoarReport(final SparkSession spark,
-			final String reportPath,
-			final String opendoarPath,
-			final List<OpenDoarRepoStatus> repos) {
+		final String reportPath,
+		final String opendoarPath,
+		final List<OpenDoarRepoStatus> repos) {

 		final Dataset<OpenDoarRepoStatus> fromDB = spark.createDataset(repos, Encoders.bean(OpenDoarRepoStatus.class));

 		final Dataset<OpenDoarRepoStatus> fromBASE = spark
-				.read()
-				.parquet(reportPath)
-				.selectExpr("explode(collections) as collection")
-				.where("isnotnull(collection.opendoarId) and character_length(collection.opendoarId)>0")
-				.selectExpr("concat('opendoar____::',collection.opendoarId) as id")
-				.groupBy(col("id"))
-				.agg(count(col("id")))
-				.map(row -> {
-					final OpenDoarRepoStatus repo = new OpenDoarRepoStatus();
-					repo.setId(row.getString(0));
-					repo.getAggregations().put(BASE_DUMP, row.getLong(1));
-					repo.setBaseCount(row.getLong(1));
-					repo.setOpenaireCount(0);
-					repo.setHighCompliance(false);
-					return repo;
-				}, Encoders.bean(OpenDoarRepoStatus.class));
+			.read()
+			.parquet(reportPath)
+			.selectExpr("explode(collections) as collection")
+			.where("isnotnull(collection.opendoarId) and character_length(collection.opendoarId)>0")
+			.selectExpr("concat('opendoar____::',collection.opendoarId) as id")
+			.groupBy(col("id"))
+			.agg(count(col("id")))
+			.map(row -> {
+				final OpenDoarRepoStatus repo = new OpenDoarRepoStatus();
+				repo.setId(row.getString(0));
+				repo.getAggregations().put(BASE_DUMP, row.getLong(1));
+				repo.setBaseCount(row.getLong(1));
+				repo.setOpenaireCount(0);
+				repo.setHighCompliance(false);
+				return repo;
+			}, Encoders.bean(OpenDoarRepoStatus.class));

 		fromDB
-				.joinWith(fromBASE, fromDB.col("id").equalTo(fromBASE.col("id")), "full_outer")
-				.map(t -> merge(t._1, t._2), Encoders.bean(OpenDoarRepoStatus.class))
-				.write()
-				.mode(SaveMode.Overwrite)
-				.format("parquet")
-				.save(opendoarPath);
+			.joinWith(fromBASE, fromDB.col("id").equalTo(fromBASE.col("id")), "full_outer")
+			.map(t -> merge(t._1, t._2), Encoders.bean(OpenDoarRepoStatus.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(opendoarPath);
 	}

 	private static OpenDoarRepoStatus merge(final OpenDoarRepoStatus r1, final OpenDoarRepoStatus r2) {
-		if (r1 == null) { return r2; }
-		if (r2 == null) { return r1; }
+		if (r1 == null) {
+			return r2;
+		}
+		if (r2 == null) {
+			return r1;
+		}

 		final OpenDoarRepoStatus r = new OpenDoarRepoStatus();
 		r.setId(ObjectUtils.firstNonNull(r1.getId(), r2.getId()));
@@ -212,15 +226,17 @@ public class BaseAnalyzerJob {
 	}

 	private static List<OpenDoarRepoStatus> loadOpenDoarStats(final String dbUrl,
-			final String dbUser,
-			final String dbPassword) throws Exception {
+		final String dbUser,
+		final String dbPassword) throws Exception {
 		final List<OpenDoarRepoStatus> repos = new ArrayList<>();

 		try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {

 			final String sql = IOUtils
-				.toString(BaseAnalyzerJob.class
-					.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql"));
+				.toString(
+					BaseAnalyzerJob.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql"));

 			dbClient.processResults(sql, row -> {
 				try {
@@ -256,7 +272,7 @@ public class BaseAnalyzerJob {

 	private static void loadRecords(final String inputPath, final String outputPath) throws Exception {
 		try (final FileSystem fs = FileSystem.get(new Configuration());
-				final AggregatorReport report = new AggregatorReport()) {
+			final AggregatorReport report = new AggregatorReport()) {

 			final AtomicLong recordsCounter = new AtomicLong(0);

@@ -264,9 +280,12 @@ public class BaseAnalyzerJob {
 			final Text value = new Text();

 			try (final SequenceFile.Writer writer = SequenceFile
-				.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-					.keyClass(LongWritable.class), SequenceFile.Writer
-					.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+				.createWriter(
+					fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+						.keyClass(LongWritable.class),
+					SequenceFile.Writer
+						.valueClass(Text.class),
+					SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {

 				final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);

@@ -293,21 +312,21 @@ public class BaseAnalyzerJob {
 	}

 	private static void generateReport(final SparkSession spark,
-			final String inputPath,
-			final String targetPath) throws Exception {
+		final String inputPath,
+		final String targetPath) throws Exception {

 		final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext
-				.fromSparkContext(spark.sparkContext())
-				.sequenceFile(inputPath, LongWritable.class, Text.class)
-				.map(s -> s._2.toString())
-				.map(BaseAnalyzerJob::extractInfo);
+			.fromSparkContext(spark.sparkContext())
+			.sequenceFile(inputPath, LongWritable.class, Text.class)
+			.map(s -> s._2.toString())
+			.map(BaseAnalyzerJob::extractInfo);

 		spark
-				.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
-				.write()
-				.mode(SaveMode.Overwrite)
-				.format("parquet")
-				.save(targetPath);
+			.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(targetPath);
 	}

 	protected static BaseRecordInfo extractInfo(final String s) {

BaseCollectorPlugin.java
@@ -45,9 +45,9 @@ public class BaseCollectorPlugin implements CollectorPlugin {
 	public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
 		// get path to file
 		final Path filePath = Optional
-				.ofNullable(api.getBaseUrl())
-				.map(Path::new)
-				.orElseThrow(() -> new CollectorException("missing baseUrl"));
+			.ofNullable(api.getBaseUrl())
+			.map(Path::new)
+			.orElseThrow(() -> new CollectorException("missing baseUrl"));

 		final String dbUrl = api.getParams().get("dbUrl");
 		final String dbUser = api.getParams().get("dbUser");
@@ -59,7 +59,9 @@ public class BaseCollectorPlugin implements CollectorPlugin {
 		log.info("dbPassword: {}", "***");

 		try {
-			if (!this.fs.exists(filePath)) { throw new CollectorException("path does not exist: " + filePath); }
+			if (!this.fs.exists(filePath)) {
+				throw new CollectorException("path does not exist: " + filePath);
+			}
 		} catch (final Throwable e) {
 			throw new CollectorException(e);
 		}
@@ -69,18 +71,20 @@ public class BaseCollectorPlugin implements CollectorPlugin {
 		final Iterator<String> iterator = new BaseCollectorIterator(this.fs, filePath, report);
 		final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
 		return StreamSupport
-				.stream(spliterator, false)
-				.filter(doc -> filterXml(doc, acceptedOpendoarIds, report));
+			.stream(spliterator, false)
+			.filter(doc -> filterXml(doc, acceptedOpendoarIds, report));
 	}

-	private Set<String> findAcceptedOpendoarIds(final String dbUrl, final String dbUser, final String dbPassword) throws CollectorException {
+	private Set<String> findAcceptedOpendoarIds(final String dbUrl, final String dbUser, final String dbPassword)
+		throws CollectorException {
 		final Set<String> accepted = new HashSet<>();

 		try (final DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {

 			final String sql = IOUtils
-				.toString(BaseAnalyzerJob.class
-					.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-accepted.sql"));
+				.toString(
+					BaseAnalyzerJob.class
+						.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-accepted.sql"));

 			dbClient.processResults(sql, row -> {
 				try {
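To round out the picture, a hedged usage sketch (not part of this commit) of the inputs BaseCollectorPlugin.collect() reads, based only on the getters visible in the hunks above; the ApiDescriptor and AggregatorReport package names and the bean-style setters are assumptions, and all paths and credentials are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import eu.dnetlib.dhp.collection.ApiDescriptor; // package assumed
import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; // package assumed

public class BaseDumpCollectionSketch {

	public static void main(final String[] args) throws Exception {
		try (FileSystem fs = FileSystem.get(new Configuration());
			AggregatorReport report = new AggregatorReport()) {

			final ApiDescriptor api = new ApiDescriptor(); // setters assumed to mirror the getters used above
			api.setProtocol("baseDump"); // routes CollectorWorker.getCollectorPlugin() to BaseCollectorPlugin
			api.setBaseUrl("hdfs:///data/base/dump.tar"); // placeholder; read via api.getBaseUrl() -> new Path(...)
			api.getParams().put("dbUrl", "jdbc:postgresql://host/db"); // placeholders; used by findAcceptedOpendoarIds
			api.getParams().put("dbUser", "dnetuser");
			api.getParams().put("dbPassword", "***");

			// The plugin filters the dump against the accepted OpenDOAR ids and streams the record XML.
			new BaseCollectorPlugin(fs)
				.collect(api, report)
				.forEach(xml -> System.out.println(xml.length()));
		}
	}
}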