some fields in stats

Michele Artini 2024-02-29 10:17:31 +01:00
parent 5ddbef3a5b
commit 71204a8056
2 changed files with 80 additions and 104 deletions

BaseAnalyzerJob.java

@@ -9,7 +9,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -52,18 +51,17 @@ public class BaseAnalyzerJob {
public static void main(final String[] args) throws Exception {
final String jsonConfiguration = IOUtils
-.toString(
-BaseAnalyzerJob.class
-.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
+.toString(BaseAnalyzerJob.class
+.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
-.ofNullable(parser.get("isSparkSessionManaged"))
-.map(Boolean::valueOf)
-.orElse(Boolean.TRUE);
+.ofNullable(parser.get("isSparkSessionManaged"))
+.map(Boolean::valueOf)
+.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
@@ -96,132 +94,113 @@ public class BaseAnalyzerJob {
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
if (fromStep <= 0) {
log
-.info(
-"\n**************************************\n* EXECUTING STEP 0: LoadRecords\n**************************************");
+.info("\n**************************************\n* EXECUTING STEP 0: LoadRecords\n**************************************");
loadRecords(inputPath, dataPath);
log
-.info(
-"\n**************************************\n* EXECUTING STEP 0: DONE\n**************************************");
+.info("\n**************************************\n* EXECUTING STEP 0: DONE\n**************************************");
}
if (fromStep <= 1) {
log
-.info(
-"\n**************************************\n* EXECUTING STEP 1: Base Report\n**************************************");
+.info("\n**************************************\n* EXECUTING STEP 1: Base Report\n**************************************");
generateReport(spark, dataPath, outputPath);
log
-.info(
-"\n**************************************\n* EXECUTING STEP 1: DONE\n**************************************");
+.info("\n**************************************\n* EXECUTING STEP 1: DONE\n**************************************");
}
if (fromStep <= 2) {
log
-.info(
-"\n**************************************\n* EXECUTING STEP 2: OpenDOAR Report\n**************************************");
+.info("\n**************************************\n* EXECUTING STEP 2: OpenDOAR Report\n**************************************");
generateOpenDoarReport(spark, outputPath, opendoarPath, loadOpenDoarStats(dbUrl, dbUser, dbPassword));
log
-.info(
-"\n**************************************\n* EXECUTING STEP 2: DONE\n**************************************");
+.info("\n**************************************\n* EXECUTING STEP 2: DONE\n**************************************");
}
});
}
private static void generateOpenDoarReport(final SparkSession spark,
-final String reportPath,
-final String opendoarPath,
-final List<OpenDoarRepoStatus> repos) {
+final String reportPath,
+final String opendoarPath,
+final List<OpenDoarRepoStatus> repos) {
final Dataset<OpenDoarRepoStatus> fromDB = spark.createDataset(repos, Encoders.bean(OpenDoarRepoStatus.class));
final Dataset<OpenDoarRepoStatus> fromBASE = spark
-.read()
-.parquet(reportPath)
-.selectExpr("explode(collections) as collection")
-.where("isnotnull(collection.opendoarId) and character_length(collection.opendoarId)>0")
-.selectExpr("concat('opendoar____::',collection.opendoarId) as id")
-.groupBy(col("id"))
-.agg(count(col("id")))
-.map(row -> {
-final OpenDoarRepoStatus repo = new OpenDoarRepoStatus();
-repo.setId(row.getString(0));
-repo.getAggregations().put(BASE_DUMP, row.getLong(1));
-repo.setFromBase(true);
-repo.setBaseMAX(true);
-repo.setHighCompliance(false);
-return repo;
-}, Encoders.bean(OpenDoarRepoStatus.class));
+.read()
+.parquet(reportPath)
+.selectExpr("explode(collections) as collection")
+.where("isnotnull(collection.opendoarId) and character_length(collection.opendoarId)>0")
+.selectExpr("concat('opendoar____::',collection.opendoarId) as id")
+.groupBy(col("id"))
+.agg(count(col("id")))
+.map(row -> {
+final OpenDoarRepoStatus repo = new OpenDoarRepoStatus();
+repo.setId(row.getString(0));
+repo.getAggregations().put(BASE_DUMP, row.getLong(1));
+repo.setBaseCount(row.getLong(1));
+repo.setOpenaireCount(0);
+repo.setHighCompliance(false);
+return repo;
+}, Encoders.bean(OpenDoarRepoStatus.class));
fromDB
-.joinWith(fromBASE, fromDB.col("id").equalTo(fromBASE.col("id")), "full_outer")
-.map(t -> merge(t._1, t._2), Encoders.bean(OpenDoarRepoStatus.class))
-.write()
-.mode(SaveMode.Overwrite)
-.format("parquet")
-.save(opendoarPath);
+.joinWith(fromBASE, fromDB.col("id").equalTo(fromBASE.col("id")), "full_outer")
+.map(t -> merge(t._1, t._2), Encoders.bean(OpenDoarRepoStatus.class))
+.write()
+.mode(SaveMode.Overwrite)
+.format("parquet")
+.save(opendoarPath);
}
private static OpenDoarRepoStatus merge(final OpenDoarRepoStatus r1, final OpenDoarRepoStatus r2) {
-if (r1 == null) {
-return r2;
-}
-if (r2 == null) {
-return r1;
-}
+if (r1 == null) { return r2; }
+if (r2 == null) { return r1; }
final OpenDoarRepoStatus r = new OpenDoarRepoStatus();
r.setId(ObjectUtils.firstNonNull(r1.getId(), r2.getId()));
r.setJurisdiction(ObjectUtils.firstNonNull(r1.getJurisdiction(), r2.getJurisdiction()));
r.getAggregations().putAll(r1.getAggregations());
r.getAggregations().putAll(r2.getAggregations());
-r.setFromBase(r1.isFromBase() || r2.isFromBase());
r.setHighCompliance(r1.isHighCompliance() || r2.isHighCompliance());
-if (r.getAggregations().containsKey(BASE_DUMP)) {
-final long baseSize = r.getAggregations().get(BASE_DUMP);
-final long otherSize = r
-.getAggregations()
-.entrySet()
-.stream()
-.filter(e -> !BASE_DUMP.equals(e.getKey()))
-.mapToLong(Entry::getValue)
-.max()
-.orElse(0);
-r.setBaseMAX(baseSize > otherSize);
-} else {
-r.setBaseMAX(false);
-}
+r.setBaseCount(Math.max(r1.getBaseCount(), r2.getBaseCount()));
+r.setOpenaireCount(Math.max(r1.getOpenaireCount(), r2.getOpenaireCount()));
return r;
}
private static List<OpenDoarRepoStatus> loadOpenDoarStats(final String dbUrl,
-final String dbUser,
-final String dbPassword) throws Exception {
+final String dbUser,
+final String dbPassword) throws Exception {
final List<OpenDoarRepoStatus> repos = new ArrayList<>();
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
final String sql = IOUtils
-.toString(
-BaseAnalyzerJob.class
-.getResourceAsStream(
-"/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql"));
+.toString(BaseAnalyzerJob.class
+.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql"));
dbClient.processResults(sql, row -> {
try {
final OpenDoarRepoStatus repo = new OpenDoarRepoStatus();
repo.setId(row.getString("id"));
repo.setJurisdiction(row.getString("jurisdiction"));
+repo.setBaseCount(0);
+repo.setHighCompliance(false);
+long sum = 0;
for (final String s : (String[]) row.getArray("aggregations").getArray()) {
final String api = StringUtils.substringBefore(s, "@@@");
final long count = NumberUtils.toLong(StringUtils.substringAfter(s, "@@@"), 0);
+sum += count;
repo.getAggregations().put(api, count);
-repo.setFromBase(false);
-repo.setBaseMAX(false);
// This should recognize the HIGH Compliances: openaire*X.Y*
-repo.setHighCompliance(s.contains("compliance: openaire"));
+if (s.contains("compliance: openaire")) {
+repo.setHighCompliance(true);
+}
}
+repo.setOpenaireCount(sum);
repos.add(repo);
log.info("# FOUND OPENDOAR (DB): " + repo.getId());
} catch (final SQLException e) {
@@ -235,7 +214,7 @@ public class BaseAnalyzerJob {
private static void loadRecords(final String inputPath, final String outputPath) throws Exception {
try (final FileSystem fs = FileSystem.get(new Configuration());
-final AggregatorReport report = new AggregatorReport()) {
+final AggregatorReport report = new AggregatorReport()) {
final AtomicLong recordsCounter = new AtomicLong(0);
@@ -243,12 +222,9 @@ public class BaseAnalyzerJob {
final Text value = new Text();
try (final SequenceFile.Writer writer = SequenceFile
-.createWriter(
-fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-.keyClass(LongWritable.class),
-SequenceFile.Writer
-.valueClass(Text.class),
-SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+.keyClass(LongWritable.class), SequenceFile.Writer
+.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
@@ -275,21 +251,21 @@ public class BaseAnalyzerJob {
}
private static void generateReport(final SparkSession spark,
-final String inputPath,
-final String targetPath) throws Exception {
+final String inputPath,
+final String targetPath) throws Exception {
final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext
-.fromSparkContext(spark.sparkContext())
-.sequenceFile(inputPath, LongWritable.class, Text.class)
-.map(s -> s._2.toString())
-.map(BaseAnalyzerJob::extractInfo);
+.fromSparkContext(spark.sparkContext())
+.sequenceFile(inputPath, LongWritable.class, Text.class)
+.map(s -> s._2.toString())
+.map(BaseAnalyzerJob::extractInfo);
spark
-.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
-.write()
-.mode(SaveMode.Overwrite)
-.format("parquet")
-.save(targetPath);
+.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class))
+.write()
+.mode(SaveMode.Overwrite)
+.format("parquet")
+.save(targetPath);
}
protected static BaseRecordInfo extractInfo(final String s) {
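The reworked loadOpenDoarStats above derives two figures per repository from the aggregation rows: openaireCount, the sum of the per-API record counts (each array element is encoded as api@@@count), and highCompliance, switched on when any entry mentions an openaire compliance level. Below is a minimal, self-contained sketch of that parsing; the aggregation strings are made up for illustration, the real ones come from the opendoar-aggregation-status.sql query.

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;

public class AggregationParsingSketch {

	public static void main(final String[] args) {
		// Hypothetical values; the real strings are read from row.getArray("aggregations").
		final String[] aggregations = {
			"api_oai_pmh_1 (compliance: driver)@@@12345",
			"api_oai_pmh_2 (compliance: openaire4.0)@@@678"
		};

		long sum = 0;
		boolean highCompliance = false;

		for (final String s : aggregations) {
			// API identifier before the "@@@" separator, record count after it
			final String api = StringUtils.substringBefore(s, "@@@");
			final long count = NumberUtils.toLong(StringUtils.substringAfter(s, "@@@"), 0);
			sum += count;                             // feeds repo.setOpenaireCount(sum)
			if (s.contains("compliance: openaire")) { // feeds repo.setHighCompliance(true)
				highCompliance = true;
			}
			System.out.println(api + " -> " + count);
		}

		System.out.println("openaireCount=" + sum + ", highCompliance=" + highCompliance);
	}
}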

OpenDoarRepoStatus.java

@@ -13,11 +13,11 @@ public class OpenDoarRepoStatus implements Serializable {
private String jurisdiction;
-private boolean fromBase = false;
private boolean highCompliance = false;
-private boolean baseMAX = false;
+private long baseCount = 0;
+private long openaireCount = 0;
private Map<String, Long> aggregations = new HashMap<>();
@@ -53,19 +53,19 @@ public class OpenDoarRepoStatus implements Serializable {
this.highCompliance = highCompliance;
}
-public boolean isFromBase() {
-return this.fromBase;
+public long getOpenaireCount() {
+return this.openaireCount;
}
-public void setFromBase(final boolean fromBase) {
-this.fromBase = fromBase;
+public void setOpenaireCount(final long openaireCount) {
+this.openaireCount = openaireCount;
}
-public boolean isBaseMAX() {
-return this.baseMAX;
+public long getBaseCount() {
+return this.baseCount;
}
-public void setBaseMAX(final boolean baseMAX) {
-this.baseMAX = baseMAX;
+public void setBaseCount(final long baseCount) {
+this.baseCount = baseCount;
}
}
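With the fromBase/baseMAX flags gone, merge() now reconciles a repository found in both sources by unioning the aggregations maps and keeping the larger value of each counter. The following is only a hedged walkthrough of the expected result, with made-up numbers (merge itself is private to BaseAnalyzerJob, so it is not called directly here).

// Hypothetical example values, not real statistics.
final OpenDoarRepoStatus fromBase = new OpenDoarRepoStatus();
fromBase.setId("opendoar____::1234");
fromBase.setBaseCount(5000);   // records seen in the BASE dump
fromBase.setOpenaireCount(0);  // the BASE side knows nothing about OpenAIRE APIs

final OpenDoarRepoStatus fromDb = new OpenDoarRepoStatus();
fromDb.setId("opendoar____::1234");
fromDb.setJurisdiction("Institutional"); // made-up value
fromDb.setBaseCount(0);
fromDb.setOpenaireCount(4200); // sum of the per-API counts from the DB
fromDb.setHighCompliance(true);

// merge(fromBase, fromDb) would yield a status with:
//   id             = "opendoar____::1234"
//   jurisdiction   = "Institutional"   (firstNonNull of the two sides)
//   baseCount      = 5000              (Math.max of the two sides)
//   openaireCount  = 4200              (Math.max of the two sides)
//   highCompliance = true              (logical OR)
//   aggregations   = union of both maps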