1
0
Fork 0

code formatting

This commit is contained in:
Claudio Atzori 2024-09-30 15:13:23 +02:00
parent 3854fcc5e0
commit 6e0b6a886f
7 changed files with 135 additions and 125 deletions

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.schema.oaf.utils; package eu.dnetlib.dhp.schema.oaf.utils;
import java.util.*; import java.util.*;

View File

@ -100,7 +100,7 @@ public class PrepareAffiliationRelations implements Serializable {
String openapcInputPath, String dataciteInputPath, String webcrawlInputPath, String publisherlInputPath, String openapcInputPath, String dataciteInputPath, String webcrawlInputPath, String publisherlInputPath,
String outputPath) { String outputPath) {
List<KeyValue> collectedfromOpenAIRE = OafMapperUtils List<KeyValue> collectedfromOpenAIRE = OafMapperUtils
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel( JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel(
spark, crossrefInputPath, collectedfromOpenAIRE); spark, crossrefInputPath, collectedfromOpenAIRE);
@ -130,7 +130,8 @@ public class PrepareAffiliationRelations implements Serializable {
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
} }
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisherNewModel(SparkSession spark, String inputPath, private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisherNewModel(SparkSession spark,
String inputPath,
List<KeyValue> collectedfrom) { List<KeyValue> collectedfrom) {
Dataset<Row> df = spark Dataset<Row> df = spark
@ -145,29 +146,28 @@ public class PrepareAffiliationRelations implements Serializable {
} }
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath, private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
List<KeyValue> collectedfrom) { List<KeyValue> collectedfrom) {
Dataset<Row> df = spark Dataset<Row> df = spark
.read() .read()
.schema("`DOI` STRING, `Organizations` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>") .schema("`DOI` STRING, `Organizations` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>")
.json(inputPath) .json(inputPath)
.where("DOI is not null"); .where("DOI is not null");
return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings")); return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"));
} }
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark, private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
String inputPath, String inputPath,
List<KeyValue> collectedfrom) { List<KeyValue> collectedfrom) {
// load and parse affiliation relations from HDFS // load and parse affiliation relations from HDFS
Dataset<Row> df = spark Dataset<Row> df = spark
.read() .read()
.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>") .schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>")
.json(inputPath) .json(inputPath)
.where("DOI is not null"); .where("DOI is not null");
return getTextTextJavaPairRDD(collectedfrom, df); return getTextTextJavaPairRDD(collectedfrom, df);
} }
@ -189,49 +189,49 @@ public class PrepareAffiliationRelations implements Serializable {
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(List<KeyValue> collectedfrom, Dataset<Row> df) { private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(List<KeyValue> collectedfrom, Dataset<Row> df) {
// unroll nested arrays // unroll nested arrays
df = df df = df
.withColumn("matching", functions.explode(new Column("Matchings"))) .withColumn("matching", functions.explode(new Column("Matchings")))
.select( .select(
new Column("DOI").as("doi"), new Column("DOI").as("doi"),
new Column("matching.RORid").as("rorid"), new Column("matching.RORid").as("rorid"),
new Column("matching.Confidence").as("confidence")); new Column("matching.Confidence").as("confidence"));
// prepare action sets for affiliation relations // prepare action sets for affiliation relations
return df return df
.toJavaRDD() .toJavaRDD()
.flatMap((FlatMapFunction<Row, Relation>) row -> { .flatMap((FlatMapFunction<Row, Relation>) row -> {
// DOI to OpenAIRE id // DOI to OpenAIRE id
final String paperId = ID_PREFIX final String paperId = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", removePrefix(row.getAs("doi")))); + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", removePrefix(row.getAs("doi"))));
// ROR id to OpenAIRE id // ROR id to OpenAIRE id
final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid")); final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid"));
Qualifier qualifier = OafMapperUtils Qualifier qualifier = OafMapperUtils
.qualifier( .qualifier(
BIP_AFFILIATIONS_CLASSID, BIP_AFFILIATIONS_CLASSID,
BIP_AFFILIATIONS_CLASSNAME, BIP_AFFILIATIONS_CLASSNAME,
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS); ModelConstants.DNET_PROVENANCE_ACTIONS);
// format data info; setting `confidence` into relation's `trust` // format data info; setting `confidence` into relation's `trust`
DataInfo dataInfo = OafMapperUtils DataInfo dataInfo = OafMapperUtils
.dataInfo( .dataInfo(
false, false,
BIP_INFERENCE_PROVENANCE, BIP_INFERENCE_PROVENANCE,
true, true,
false, false,
qualifier, qualifier,
Double.toString(row.getAs("confidence"))); Double.toString(row.getAs("confidence")));
// return bi-directional relations // return bi-directional relations
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator(); return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
}) })
.map(p -> new AtomicAction(Relation.class, p)) .map(p -> new AtomicAction(Relation.class, p))
.mapToPair( .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa)))); new Text(OBJECT_MAPPER.writeValueAsString(aa))));
} }
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDDNew(List<KeyValue> collectedfrom, Dataset<Row> df) { private static JavaPairRDD<Text, Text> getTextTextJavaPairRDDNew(List<KeyValue> collectedfrom, Dataset<Row> df) {
@ -292,7 +292,7 @@ public class PrepareAffiliationRelations implements Serializable {
} }
private static String removePrefix(String doi) { private static String removePrefix(String doi) {
if(doi.startsWith(DOI_URL_PREFIX)) if (doi.startsWith(DOI_URL_PREFIX))
return doi.substring(DOI_URL_PREFIX_LENGTH); return doi.substring(DOI_URL_PREFIX_LENGTH);
return doi; return doi;
} }

View File

@ -46,11 +46,11 @@ public class CollectorWorker extends ReportingJob {
private final HttpClientParams clientParams; private final HttpClientParams clientParams;
public CollectorWorker( public CollectorWorker(
final ApiDescriptor api, final ApiDescriptor api,
final FileSystem fileSystem, final FileSystem fileSystem,
final MDStoreVersion mdStoreVersion, final MDStoreVersion mdStoreVersion,
final HttpClientParams clientParams, final HttpClientParams clientParams,
final AggregatorReport report) { final AggregatorReport report) {
super(report); super(report);
this.api = api; this.api = api;
this.fileSystem = fileSystem; this.fileSystem = fileSystem;
@ -69,22 +69,25 @@ public class CollectorWorker extends ReportingJob {
scheduleReport(counter); scheduleReport(counter);
try (SequenceFile.Writer writer = SequenceFile try (SequenceFile.Writer writer = SequenceFile
.createWriter(this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer .createWriter(
.keyClass(IntWritable.class), SequenceFile.Writer this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) { .keyClass(IntWritable.class),
SequenceFile.Writer
.valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final IntWritable key = new IntWritable(counter.get()); final IntWritable key = new IntWritable(counter.get());
final Text value = new Text(); final Text value = new Text();
plugin plugin
.collect(this.api, this.report) .collect(this.api, this.report)
.forEach(content -> { .forEach(content -> {
key.set(counter.getAndIncrement()); key.set(counter.getAndIncrement());
value.set(content); value.set(content);
try { try {
writer.append(key, value); writer.append(key, value);
} catch (final Throwable e) { } catch (final Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
} catch (final Throwable e) { } catch (final Throwable e) {
this.report.put(e.getClass().getName(), e.getMessage()); this.report.put(e.getClass().getName(), e.getMessage());
throw new CollectorException(e); throw new CollectorException(e);
@ -112,36 +115,36 @@ public class CollectorWorker extends ReportingJob {
private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException { private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) { switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
case oai: case oai:
return new OaiCollectorPlugin(this.clientParams); return new OaiCollectorPlugin(this.clientParams);
case rest_json2xml: case rest_json2xml:
return new RestCollectorPlugin(this.clientParams); return new RestCollectorPlugin(this.clientParams);
case file: case file:
return new FileCollectorPlugin(this.fileSystem); return new FileCollectorPlugin(this.fileSystem);
case fileGzip: case fileGzip:
return new FileGZipCollectorPlugin(this.fileSystem); return new FileGZipCollectorPlugin(this.fileSystem);
case baseDump: case baseDump:
return new BaseCollectorPlugin(this.fileSystem); return new BaseCollectorPlugin(this.fileSystem);
case gtr2Publications: case gtr2Publications:
return new Gtr2PublicationsCollectorPlugin(this.clientParams); return new Gtr2PublicationsCollectorPlugin(this.clientParams);
case osfPreprints: case osfPreprints:
return new OsfPreprintsCollectorPlugin(this.clientParams); return new OsfPreprintsCollectorPlugin(this.clientParams);
case other: case other:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
.ofNullable(this.api.getParams().get("other_plugin_type")) .ofNullable(this.api.getParams().get("other_plugin_type"))
.map(CollectorPlugin.NAME.OTHER_NAME::valueOf) .map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type")); .orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
switch (plugin) { switch (plugin) {
case mdstore_mongodb_dump: case mdstore_mongodb_dump:
return new MongoDbDumpCollectorPlugin(this.fileSystem); return new MongoDbDumpCollectorPlugin(this.fileSystem);
case mdstore_mongodb: case mdstore_mongodb:
return new MDStoreCollectorPlugin(); return new MDStoreCollectorPlugin();
default:
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
}
default: default:
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin); throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
}
default:
throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
} }
} }

View File

@ -31,17 +31,19 @@ public class OsfPreprintsCollectorPlugin implements CollectorPlugin {
final String baseUrl = api.getBaseUrl(); final String baseUrl = api.getBaseUrl();
final int pageSize = Optional final int pageSize = Optional
.ofNullable(api.getParams().get("pageSize")) .ofNullable(api.getParams().get("pageSize"))
.filter(StringUtils::isNotBlank) .filter(StringUtils::isNotBlank)
.map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT)) .map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
.orElse(PAGE_SIZE_VALUE_DEFAULT); .orElse(PAGE_SIZE_VALUE_DEFAULT);
if (StringUtils.isBlank(baseUrl)) { throw new CollectorException("Param 'baseUrl' is null or empty"); } if (StringUtils.isBlank(baseUrl)) {
throw new CollectorException("Param 'baseUrl' is null or empty");
}
final OsfPreprintsIterator it = new OsfPreprintsIterator(baseUrl, pageSize, getClientParams()); final OsfPreprintsIterator it = new OsfPreprintsIterator(baseUrl, pageSize, getClientParams());
return StreamSupport return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false); .stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
} }
public HttpClientParams getClientParams() { public HttpClientParams getClientParams() {

View File

@ -34,9 +34,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
private final Queue<String> recordQueue = new PriorityBlockingQueue<>(); private final Queue<String> recordQueue = new PriorityBlockingQueue<>();
public OsfPreprintsIterator( public OsfPreprintsIterator(
final String baseUrl, final String baseUrl,
final int pageSize, final int pageSize,
final HttpClientParams clientParams) { final HttpClientParams clientParams) {
this.clientParams = clientParams; this.clientParams = clientParams;
this.baseUrl = baseUrl; this.baseUrl = baseUrl;
@ -54,7 +54,8 @@ public class OsfPreprintsIterator implements Iterator<String> {
@Override @Override
public boolean hasNext() { public boolean hasNext() {
synchronized (this.recordQueue) { synchronized (this.recordQueue) {
while (this.recordQueue.isEmpty() && StringUtils.isNotBlank(this.currentUrl) && this.currentUrl.startsWith("http")) { while (this.recordQueue.isEmpty() && StringUtils.isNotBlank(this.currentUrl)
&& this.currentUrl.startsWith("http")) {
try { try {
this.currentUrl = downloadPage(this.currentUrl); this.currentUrl = downloadPage(this.currentUrl);
} catch (final CollectorException e) { } catch (final CollectorException e) {
@ -63,7 +64,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
} }
} }
if (!this.recordQueue.isEmpty()) { return true; } if (!this.recordQueue.isEmpty()) {
return true;
}
return false; return false;
} }
@ -112,7 +115,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
} }
private Document downloadUrl(final String url, final int attempt) throws CollectorException { private Document downloadUrl(final String url, final int attempt) throws CollectorException {
if (attempt > MAX_ATTEMPTS) { throw new CollectorException("Max Number of attempts reached, url:" + url); } if (attempt > MAX_ATTEMPTS) {
throw new CollectorException("Max Number of attempts reached, url:" + url);
}
if (attempt > 0) { if (attempt > 0) {
final int delay = (attempt * 5000); final int delay = (attempt * 5000);

View File

@ -79,16 +79,16 @@ public class PrepareAffiliationRelationsTest {
.getPath(); .getPath();
String crossrefAffiliationRelationPath = getClass() String crossrefAffiliationRelationPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror_old.json") .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror_old.json")
.getPath(); .getPath();
String publisherAffiliationRelationPath = getClass() String publisherAffiliationRelationPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/publishers") .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/publishers")
.getPath(); .getPath();
String publisherAffiliationRelationOldPath = getClass() String publisherAffiliationRelationOldPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/publichers_old") .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/publichers_old")
.getPath(); .getPath();
String outputPath = workingDir.toString() + "/actionSet"; String outputPath = workingDir.toString() + "/actionSet";
@ -112,9 +112,8 @@ public class PrepareAffiliationRelationsTest {
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
// count the number of relations // count the number of relations
assertEquals(150, tmp.count());// 18 + 24 *3 + 30 * 2 = assertEquals(150, tmp.count());// 18 + 24 *3 + 30 * 2 =
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
dataset.createOrReplaceTempView("result"); dataset.createOrReplaceTempView("result");
@ -173,18 +172,16 @@ public class PrepareAffiliationRelationsTest {
+ IdentifierFactory.md5("https://ror.org/03265fv13") + "'") + IdentifierFactory.md5("https://ror.org/03265fv13") + "'")
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
3, execVerification 3, execVerification
.filter( .filter(
"source = '" + ID_PREFIX "source = '" + ID_PREFIX
+ IdentifierFactory + IdentifierFactory
.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/3-540-47984-8_14")) .md5(CleaningFunctions.normalizePidValue("doi", "10.1007/3-540-47984-8_14"))
+ "' and target = '" + "20|ror_________::" + "' and target = '" + "20|ror_________::"
+ IdentifierFactory.md5("https://ror.org/00a0n9e72") + "'") + IdentifierFactory.md5("https://ror.org/00a0n9e72") + "'")
.count()); .count());
} }
} }

View File

@ -50,9 +50,10 @@ public class OsfPreprintsCollectorPluginTest {
@Test @Test
@Disabled @Disabled
void test_one() throws CollectorException { void test_one() throws CollectorException {
this.plugin.collect(this.api, new AggregatorReport()) this.plugin
.limit(1) .collect(this.api, new AggregatorReport())
.forEach(log::info); .limit(1)
.forEach(log::info);
} }
@Test @Test
@ -95,7 +96,8 @@ public class OsfPreprintsCollectorPluginTest {
final HttpConnector2 connector = new HttpConnector2(); final HttpConnector2 connector = new HttpConnector2();
try { try {
final String res = connector.getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json"); final String res = connector
.getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
System.out.println(res); System.out.println(res);
fail(); fail();
} catch (final Throwable e) { } catch (final Throwable e) {