OpenDoar reports

This commit is contained in:
Michele Artini 2024-02-28 10:51:13 +01:00
parent f8cf7ffbcb
commit 3d14bef381
6 changed files with 274 additions and 68 deletions

View File

@@ -2,8 +2,10 @@
package eu.dnetlib.dhp.collection.plugin.base;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
@@ -12,7 +14,9 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -23,6 +27,7 @@ import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@@ -35,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
public class BaseAnalyzerJob {
@@ -68,78 +74,181 @@ public class BaseAnalyzerJob {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String opendoarPath = parser.get("opendoarPath");
log.info("opendoarPath: {}", opendoarPath);
final int fromStep = Integer.parseInt(parser.get("fromStep"));
log.info("fromStep: {}", fromStep);
final String dbUrl = parser.get("postgresUrl");
log.info("postgresUrl: {}", dbUrl);
final String dbUser = parser.get("postgresUser");
log.info("postgresUser: {}", dbUser);
final String dbPassword = parser.get("postgresPassword");
log.info("postgresPassword: {}", dbPassword);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
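// The three steps below are resumable: fromStep <= 0 reloads the BASE dump into a
// SequenceFile, fromStep <= 1 rebuilds the BASE report, fromStep <= 2 rebuilds the
// OpenDOAR report. For example, fromStep=2 re-runs only the OpenDOAR reporting step.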
if (fromStep <= 0) {
log.info("\n**************************************\n* EXECUTING STEP 0: LoadRecords\n**************************************");
loadRecords(inputPath, dataPath);
log.info("\n**************************************\n* EXECUTING STEP 0: DONE\n**************************************");
}
if (fromStep <= 1) {
log.info("\n**************************************\n* EXECUTING STEP 1: Base Report\n**************************************");
generateReport(spark, dataPath, outputPath);
log.info("\n**************************************\n* EXECUTING STEP 1: DONE\n**************************************");
}
if (fromStep <= 2) {
log.info("\n**************************************\n* EXECUTING STEP 2: OpenDOAR Report\n**************************************");
generateOpenDoarReport(spark, outputPath, opendoarPath, loadOpenDoarStats(dbUrl, dbUser, dbPassword));
log.info("\n**************************************\n* EXECUTING STEP 2: DONE\n**************************************");
}
});
}
private static void generateOpenDoarReport(final SparkSession spark,
final String reportPath,
final String opendoarPath,
final List<OpenDoarRepoStatus> repos) {
final Dataset<OpenDoarRepoStatus> fromDB = spark.createDataset(repos, Encoders.bean(OpenDoarRepoStatus.class));
final Dataset<OpenDoarRepoStatus> fromBASE = spark
.read()
.parquet(reportPath)
.selectExpr("explode(collections) as collection")
.where("isnotnull(collection.opendoarId) and character_length(collection.opendoarId)>0")
.selectExpr("concat('opendoar____::',collection.opendoarId) as id")
.groupBy(col("id"))
.agg(count(col("id")))
.map(row -> {
final OpenDoarRepoStatus repo = new OpenDoarRepoStatus();
repo.setId(row.getString(0));
repo.getAggregations().put("BASE_DUMP", row.getLong(1));
return repo;
}, Encoders.bean(OpenDoarRepoStatus.class));
fromDB
.joinWith(fromBASE, fromDB.col("id").equalTo(fromBASE.col("id")), "full_outer")
.map(t -> merge(t._1, t._2), Encoders.bean(OpenDoarRepoStatus.class))
.write()
.mode(SaveMode.Overwrite)
.format("parquet")
.save(opendoarPath);
}
private static OpenDoarRepoStatus merge(final OpenDoarRepoStatus r1, final OpenDoarRepoStatus r2) {
if (r1 == null) {
return r2;
}
if (r2 == null) {
return r1;
}
final OpenDoarRepoStatus r = new OpenDoarRepoStatus();
r.setId(ObjectUtils.firstNonNull(r1.getId(), r2.getId()));
r.setJurisdiction(ObjectUtils.firstNonNull(r1.getJurisdiction(), r2.getJurisdiction()));
r.getAggregations().putAll(r1.getAggregations());
r.getAggregations().putAll(r2.getAggregations());
return r;
}
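As a reading aid (not part of the commit), the sketch below mimics what the full outer join plus merge() produce for one repository: the per-API totals loaded from postgres and the synthetic BASE_DUMP total computed from the dump end up side by side in the same aggregations map. The API key and the counts are invented placeholders.

import java.util.LinkedHashMap;
import java.util.Map;

public class OpenDoarMergeSketch {

	public static void main(final String[] args) {
		// counts coming from the OpenDOAR database (loadOpenDoarStats), one entry per API
		final Map<String, Long> fromDb = new LinkedHashMap<>();
		fromDb.put("api-placeholder (openaire2.0)", 150L); // invented key and value

		// count computed from the BASE dump (the groupBy/count in generateOpenDoarReport)
		final Map<String, Long> fromBase = new LinkedHashMap<>();
		fromBase.put("BASE_DUMP", 180L); // invented value

		// same putAll semantics as merge(): the two sources are simply unioned per key
		final Map<String, Long> merged = new LinkedHashMap<>();
		merged.putAll(fromDb);
		merged.putAll(fromBase);

		merged.forEach((api, count) -> System.out.println(api + " -> " + count));
	}
}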
private static List<OpenDoarRepoStatus> loadOpenDoarStats(final String dbUrl,
final String dbUser,
final String dbPassword) throws Exception {
final List<OpenDoarRepoStatus> repos = new ArrayList<>();
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
final String sql = IOUtils
.toString(
BaseAnalyzerJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql"));
dbClient.processResults(sql, row -> {
try {
final OpenDoarRepoStatus repo = new OpenDoarRepoStatus();
repo.setId(row.getString("id"));
repo.setJurisdiction(row.getString("jurisdiction"));
for (final String s : (String[]) row.getArray("aggregations").getArray()) {
final String api = StringUtils.substringBefore(s, "@@@");
final long count = NumberUtils.toLong(StringUtils.substringAfter(s, "@@@"), 0);
repo.getAggregations().put(api, count);
}
repos.add(repo);
log.info("# FOUND OPENDOAR (DB): " + repo.getId());
} catch (final SQLException e) {
log.error("Error in SQL", e);
throw new RuntimeException("Error in SQL", e);
}
});
}
return repos;
}
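For clarity (not in the commit), each element of the aggregations array returned by opendoar-aggregation-status.sql is a string of the form "api id (compatibility)@@@last collection total"; the small sketch below shows how the split above reads it. The concrete id and count are invented.

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;

public class AggregationStringSketch {

	public static void main(final String[] args) {
		// invented example element, following the format produced by the SQL
		final String s = "api-placeholder (openaire4.0)@@@250";
		final String api = StringUtils.substringBefore(s, "@@@"); // map key: api id + compatibility
		final long count = NumberUtils.toLong(StringUtils.substringAfter(s, "@@@"), 0); // 250
		System.out.println(api + " -> " + count);
	}
}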
private static void loadRecords(final String inputPath, final String outputPath) throws Exception {
try (final FileSystem fs = FileSystem.get(new Configuration());
final AggregatorReport report = new AggregatorReport()) {
final AtomicLong recordsCounter = new AtomicLong(0);
// fs.delete(new Path(outputPath), true);
final LongWritable key = new LongWritable();
final Text value = new Text();
try (final SequenceFile.Writer writer = SequenceFile
.createWriter(
fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
.keyClass(LongWritable.class),
SequenceFile.Writer
.valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final BaseCollectorIterator iterator = new BaseCollectorIterator(fs, new Path(inputPath), report);
while (iterator.hasNext()) {
final String record = iterator.next();
final long i = recordsCounter.incrementAndGet();
if ((i % 10000) == 0) {
log.info("# Loaded records: " + i);
}
key.set(i);
value.set(record);
try {
writer.append(key, value);
} catch (final Throwable e1) {
throw new RuntimeException(e1);
}
}
log.info("# COMPLETED - Loaded records: " + recordsCounter.get());
}
}
}
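A minimal sketch (not part of the commit) of how the SequenceFile written by loadRecords can be read back, for instance to spot-check the dump; the path is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class BaseDumpReaderSketch {

	public static void main(final String[] args) throws Exception {
		final Configuration conf = new Configuration();
		// placeholder: the same location the job uses as dataPath
		final Path dump = new Path("/tmp/base-records.seq");
		try (final SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(dump))) {
			final LongWritable key = new LongWritable();
			final Text value = new Text();
			while (reader.next(key, value)) {
				// key: progressive record number, value: the raw BASE record
				System.out.println(key.get() + " -> " + value.getLength() + " bytes");
			}
		}
	}
}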
private static void generateReport(final SparkSession spark,
final String inputPath,
final String targetPath) throws Exception {

View File

@@ -33,7 +33,7 @@ public class BaseCollectorIterator implements Iterator<String> {
private String nextElement;
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
private static final Logger log = LoggerFactory.getLogger(BaseCollectorIterator.class);

View File

@@ -0,0 +1,42 @@
package eu.dnetlib.dhp.collection.plugin.base;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public class OpenDoarRepoStatus implements Serializable {
private static final long serialVersionUID = 4832658700366871160L;
private String id;
private String jurisdiction;
private Map<String, Long> aggregations = new HashMap<>();
public String getId() {
return this.id;
}
public void setId(final String id) {
this.id = id;
}
public String getJurisdiction() {
return this.jurisdiction;
}
public void setJurisdiction(final String jurisdiction) {
this.jurisdiction = jurisdiction;
}
public Map<String, Long> getAggregations() {
return this.aggregations;
}
public void setAggregations(final Map<String, Long> aggregations) {
this.aggregations = aggregations;
}
}

View File

@@ -14,13 +14,37 @@
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the generated report",
"paramRequired": true
},
{
"paramName": "od",
"paramLongName": "opendoarPath",
"paramDescription": "the path of the generated OpenDOAR report",
"paramRequired": true
},
{
"paramName": "f",
"paramLongName": "fromStep",
"paramDescription": "the initial step (numeric, 0 for ALL STEPS)",
"paramRequired": true
},
{
"paramName": "pgurl",
"paramLongName": "postgresUrl",
"paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
"paramRequired": true
},
{
"paramName": "pguser",
"paramLongName": "postgresUser",
"paramDescription": "postgres user",
"paramRequired": false
},
{
"paramName": "pgpasswd",
"paramLongName": "postgresPassword",
"paramDescription": "postgres password",
"paramRequired": false
}
]

View File

@@ -13,8 +13,24 @@
<description>path where to store the reports</description>
</property>
<property>
<name>baseOpenDoarReportsPath</name>
<description>path where to store the OpenDOAR reports</description>
</property>
<property>
<name>postgresURL</name>
<description>the postgres URL to access the database</description>
</property>
<property>
<name>postgresUser</name>
<description>the postgres user</description>
</property>
<property>
<name>postgresPassword</name>
<description>the postgres password</description>
</property>
<property>
<name>baseFromStep</name>
<description>the initial step (numeric, 0 for ALL STEPS)</description>
</property>
</parameters>
@@ -44,7 +60,11 @@
<arg>--inputPath</arg><arg>${baseInputPath}</arg>
<arg>--dataPath</arg><arg>${baseDataPath}</arg>
<arg>--outputPath</arg><arg>${baseReportsPath}</arg>
<arg>--opendoarPath</arg><arg>${baseOpenDoarReportsPath}</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--fromStep</arg><arg>${baseFromStep}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@@ -0,0 +1,11 @@
select
s.id as id,
s.jurisdiction as jurisdiction,
array_remove(array_agg(a.id || ' (' || coalesce(a.compatibility_override, a.compatibility, 'UNKNOWN') || ')@@@' || coalesce(a.last_collection_total, 0)), NULL) as aggregations
from
dsm_services s
join dsm_api a on (s.id = a.service)
where
collectedfrom = 'openaire____::opendoar'
group by
s.id;
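Each element of the aggregations array therefore has the form "api id (compatibility)@@@last collection total", which is the "@@@"-separated format that loadOpenDoarStats splits when it fills the aggregations map.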