[SKG-IF-EOSC] fixed issue in selecting relevant eosc results. Applied static mapping from old eoscDsId and new eoscDsId
This commit is contained in:
parent
4c7e24df81
commit
059b275a06
|
@ -85,13 +85,7 @@ public class FilterEntities implements Serializable {
|
|||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(workingDir + e.name());
|
||||
// result
|
||||
// .joinWith(filterIds, result.col("id").equalTo(filterIds.col("id")))
|
||||
// .map((MapFunction<Tuple2<R, Row>, R>) t2 -> t2._1(), Encoders.bean(resultClazz))
|
||||
// .write()
|
||||
// .mode(SaveMode.Overwrite)
|
||||
// .option("compression", "gzip")
|
||||
// .json(workingDir + e.name());
|
||||
|
||||
}
|
||||
|
||||
});
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -108,7 +109,8 @@ public class ReadDatasourceMasterDuplicateFromDB {
|
|||
String masterId = rs.getString("masterId");
|
||||
String masterName = rs.getString("masterName");
|
||||
if (duplicateId.startsWith("eosc")) {
|
||||
md.setEoscId(duplicateId.substring(duplicateId.lastIndexOf("::") + 2));
|
||||
final String eoscDsId = getEoscDsId(duplicateId);
|
||||
md.setEoscId(eoscDsId);
|
||||
md.setGraphId(OafMapperUtils.createOpenaireId(10, masterId, true));
|
||||
md.setGraphName(masterName);
|
||||
return md;
|
||||
|
@ -119,6 +121,195 @@ public class ReadDatasourceMasterDuplicateFromDB {
|
|||
}
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static String getEoscDsId(String duplicateId) {
|
||||
String eoscDsId = duplicateId.substring(duplicateId.lastIndexOf("::") + 2);
|
||||
switch (eoscDsId) {
|
||||
case "eosc.blue-cloud.44fa8dba8ad3ed19445227940032f31c":
|
||||
eoscDsId = "eosc.blue-cloud.grsf";
|
||||
break;
|
||||
case "eosc.ror-org.24ef0000cfbf3ce7f3a40ba6b87e76ce":
|
||||
eoscDsId = "eosc.ror-org.ror";
|
||||
break;
|
||||
case "eosc.clarin-eric.2aad8ade139792a49b130b539e1bb144":
|
||||
eoscDsId = "eosc.clarin-eric.virtual_language_observatory";
|
||||
break;
|
||||
case "eosc.embl-ebi.e29a4e098afa05818957179f05d8e21d":
|
||||
eoscDsId = "eosc.embl-ebi.icr";
|
||||
break;
|
||||
case "eosc.cyfronet.b59c2171d05ed9fb9e70a86d544f42a3":
|
||||
eoscDsId = "eosc.cyfronet.rodbuk";
|
||||
break;
|
||||
case "eosc.eudat.9168f179ffab97584bf99a2729837545":
|
||||
eoscDsId = "eosc.eudat.b2safe";
|
||||
break;
|
||||
case "eosc.oxford_e-research_centre.21697de1a5b10b8eb5fad857edecf5c9":
|
||||
eoscDsId = "eosc.oxford_e-research_centre.fairsharing";
|
||||
break;
|
||||
case "eosc.inria.5923d0f31f0acda46cf4b592972284a2":
|
||||
eoscDsId = "eosc.inria.software_heritage_archive";
|
||||
break;
|
||||
case "eosc.rli.661cdfdc74561b8eb69583b8137799d2":
|
||||
eoscDsId = "eosc.rli.open_energy_platform";
|
||||
break;
|
||||
case "eosc.bbmri-eric.314cee7546a7489c2cc3ab79d34e2640":
|
||||
eoscDsId = "eosc.bbmri-eric.bbmri-eric_directory";
|
||||
break;
|
||||
case "eosc.ku_leuven.68bf19ae7ee1bc7e3872255e96550c04":
|
||||
eoscDsId = "eosc.ku_leuven.lirias";
|
||||
break;
|
||||
case "eosc.wenmr.d288225c333b07fc9d001da5c5392741":
|
||||
eoscDsId = "eosc.wenmr.madomsi3sobm";
|
||||
break;
|
||||
case "eosc.zpid.b96341f00ca4c3a314abcc07fc0084b2":
|
||||
eoscDsId = "eosc.zpid.psycharchives";
|
||||
break;
|
||||
case "eosc.vamdc.c967f669aa354e584e6786ee1d0c823e":
|
||||
eoscDsId = "eosc.vamdc.vamdc_portal";
|
||||
break;
|
||||
case "eosc.openaire.2bb8710e1870170a175110615698e677":
|
||||
eoscDsId = "eosc.openaire.openaire_scholexplorer";
|
||||
break;
|
||||
case "eosc.elixir-uk.5126ffcc8e23f65bbbe219d36128f2c8":
|
||||
eoscDsId = "eosc.elixir-uk.workflowhub";
|
||||
break;
|
||||
case "eosc.vliz.61c6dae33d794d477e6a68ed43f52eb3":
|
||||
eoscDsId = "eosc.vliz.worms";
|
||||
break;
|
||||
case "eosc.cern.8025243fa3c887159fc9b3930ae147c2":
|
||||
eoscDsId = "eosc.cern.cod";
|
||||
break;
|
||||
case "eosc.hits.901e9baaa76d72017ebd7dfd93436caf":
|
||||
eoscDsId = "eosc.hits.fairdomhub";
|
||||
break;
|
||||
case "eosc.bbmri-eric.8206c9aa93eb9513383218704570feb2":
|
||||
eoscDsId = "eosc.bbmri-eric.bbmri-eric_crc-cohort";
|
||||
break;
|
||||
case "eosc.hn.02e4d980399d7142506e8aadb2b8e865":
|
||||
eoscDsId = "eosc.hn.isidore";
|
||||
break;
|
||||
case "eosc.obsparis.9e98089baaf6af32fab3154873dfdfeb":
|
||||
eoscDsId = "eosc.obsparis::eosc.obsparis.padc";
|
||||
break;
|
||||
case "eosc.esrf.ecc74ab09791c52aa238ee77ae988874":
|
||||
eoscDsId = "eosc.esrf::eosc.esrf.tesrfdp";
|
||||
break;
|
||||
case "eosc.cessda-eric.7e17e8817404ce7a8013be373723b2be":
|
||||
eoscDsId = "eosc.cessda-eric.cdc";
|
||||
break;
|
||||
case "eosc.psi.f1a79f572f95bc2fbea5cdc40ef4eb22":
|
||||
eoscDsId = "eosc.psi.psi_public_data_repository";
|
||||
break;
|
||||
case "eosc.uniwersytet_opolski.19b44a96f7a776774de3939d9820d00c":
|
||||
eoscDsId = "eosc.uniwersytet_opolski.bk_uniopole";
|
||||
break;
|
||||
case "eosc.lindatclariah-cz.6dc98fcb5294282acf3d92f3ab3376b2":
|
||||
eoscDsId = "eosc.lindatclariah-cz.lindatclariah-cz_repository";
|
||||
break;
|
||||
|
||||
case "eosc.eudat.17bb7bb8ef1af0f9bdb55a7db30cfa8a":
|
||||
eoscDsId = "eosc.eudat.b2share";
|
||||
break;
|
||||
|
||||
case "eosc.acdh-ch.3b0149bee976d6db7eef053159e97a87":
|
||||
eoscDsId = "eosc.acdh-ch.arche";
|
||||
break;
|
||||
|
||||
case "eosc.uit.49e8d4cef23bda3b66dd417e6675727d":
|
||||
eoscDsId = "eosc.uit.trolling";
|
||||
break;
|
||||
|
||||
case "eosc.csuc.135887d3dea4b6723095d13c28dd52a3":
|
||||
eoscDsId = "eosc.csuc.corardr";
|
||||
break;
|
||||
|
||||
case "eosc.ccsd.06cdd3ff4700bb4c8e7bf22c14f23f5b":
|
||||
eoscDsId = "eosc.ccsd.episciences";
|
||||
break;
|
||||
|
||||
case "eosc.gbif.14ac40283813a624bd74ae82605ded23":
|
||||
eoscDsId = "eosc.gbif.gbif_species_occurrence_data";
|
||||
break;
|
||||
|
||||
case "eosc.gdansk_tech.1434de11c83986b5be5592677f28d171":
|
||||
eoscDsId = "eosc.gdansk_tech.most";
|
||||
break;
|
||||
|
||||
case "eosc.gwdg.d6521479ffa922bbccc839606b8ec7c5":
|
||||
eoscDsId = "eosc.gwdg.textgrid_repository";
|
||||
break;
|
||||
|
||||
case "eosc.unipd.12d35bb1f56d4b91bb4644faf76d9486":
|
||||
eoscDsId = "eosc.unipd.rdu";
|
||||
break;
|
||||
|
||||
case "eosc.unibi-ub.a61d9ea844bdf43e6feabd6b14dfe3c5":
|
||||
eoscDsId = "eosc.unibi-ub.pub";
|
||||
break;
|
||||
|
||||
case "eosc.scipedia.0063745e5964b19c3e9ceeb2bd6632f5":
|
||||
eoscDsId = "eosc.scipedia.spaosp";
|
||||
break;
|
||||
|
||||
case "eosc.psnc.6f0470e3bb9203ec3a7553f3a72a7a1f":
|
||||
eoscDsId = "eosc.psnc.rohub";
|
||||
break;
|
||||
|
||||
case "eosc.ill.d422cba59746f39d10bdfea5c9cf8511":
|
||||
eoscDsId = "eosc.ill.ill_data_portal";
|
||||
break;
|
||||
|
||||
case "eosc.ceric-eric.e9354332fd75190b935b80c1ba30b837":
|
||||
eoscDsId = "eosc.ceric-eric.ceric-data-portal";
|
||||
break;
|
||||
|
||||
case "eosc.cnr_-_isti.dbe89d2b83f3e29caab7923a51c1d151":
|
||||
eoscDsId = "eosc.cnr_-_isti.isti_open_portal";
|
||||
break;
|
||||
|
||||
case "eosc.lapp.ef0bb7d889d0cced364444495f7a1e67":
|
||||
eoscDsId = "eosc.lapp.ossr";
|
||||
break;
|
||||
|
||||
case "eosc.lida.26c1ee137e7510fd1d7e44eb87cdb4af":
|
||||
eoscDsId = "eosc.lida.lida_survey_data";
|
||||
break;
|
||||
|
||||
case "eosc.awi_bremerhaven.2882af227241cb956c28fe321a70dfb2":
|
||||
eoscDsId = "eosc.awi_bremerhaven.pangaea";
|
||||
break;
|
||||
|
||||
case "eosc.riga_stradins_university.4ea61809e753e65a459bbe4a492c773b":
|
||||
eoscDsId = "eosc.riga_stradins_university.rsu_dataverse";
|
||||
break;
|
||||
|
||||
case "eosc.ku_leuven.1cb0937dc41e70d8126d7b259ad470af":
|
||||
eoscDsId = "eosc.ku_leuven.ku_leuven_rdr";
|
||||
break;
|
||||
|
||||
case "eosc.dkrz.9ffffb05aaf22e7f9138dca4560a8c8b":
|
||||
eoscDsId = "eosc.dkrz.wdcc";
|
||||
break;
|
||||
|
||||
case "eosc.openaire.0a02f13310296033694acead588a773b":
|
||||
eoscDsId = "eosc.openaire.zenodo";
|
||||
break;
|
||||
|
||||
case "eosc.vilnius-university.1ec069c1620d49d460e4cbcec0af57f6":
|
||||
eoscDsId = "eosc.vilnius-university.tnoarda";
|
||||
break;
|
||||
|
||||
case "eosc.icos_eric.25c5f3f0674fb287e05e697263e211e2":
|
||||
eoscDsId = "eosc.icos_eric.data_discovery_and_access_portal";
|
||||
break;
|
||||
|
||||
case "eosc.fris.8f42bfccf70de38b01763b704300f882":
|
||||
eoscDsId = "eosc.fris.fris";
|
||||
break;
|
||||
}
|
||||
return eoscDsId;
|
||||
}
|
||||
|
||||
private static void writeMap(MasterDuplicate dm, BufferedWriter writer) {
|
||||
if (dm == null)
|
||||
return;
|
||||
|
|
|
@ -8,6 +8,7 @@ import java.util.Optional;
|
|||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.functions.*;
|
||||
|
@ -67,27 +68,27 @@ public class SelectEOSCEntities implements Serializable {
|
|||
private static <R extends Result> void selectEntities(SparkSession spark, String inputPath, String filterPath) {
|
||||
ModelSupport.entityTypes.keySet().forEach(e -> {
|
||||
if (ModelSupport.isResult(e)) {
|
||||
// Utils
|
||||
// .readPath(spark, inputPath + e.name(), ModelSupport.entityTypes.get(e))
|
||||
// .filter(
|
||||
// (FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference()
|
||||
// && !r.getDataInfo().getInvisible()
|
||||
// && (r.getContext().stream().anyMatch(c -> c.getId().equals("eosc")) ||
|
||||
// r
|
||||
// .getCollectedfrom()
|
||||
// .stream()
|
||||
// .anyMatch(cf -> cf.getValue().equalsIgnoreCase("B2FIND"))))
|
||||
// .map((MapFunction<R, String>) r -> r.getId(), Encoders.STRING())
|
||||
|
||||
spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Result.class).schema())
|
||||
.json(inputPath + e.name())
|
||||
.where("datainfo.deletedbyinference != true and datainfo.invisible != true")
|
||||
.select("id", "context", "collectedfrom")
|
||||
.withColumn(
|
||||
"contexts",
|
||||
org.apache.spark.sql.functions
|
||||
.explode(
|
||||
org.apache.spark.sql.functions.col("context")))
|
||||
.selectExpr("id", "contexts.id as context", "collectedfrom")
|
||||
.where("context == 'eosc'")
|
||||
.drop("context")
|
||||
.withColumn(
|
||||
"collectedfroms", org.apache.spark.sql.functions
|
||||
.explode(
|
||||
org.apache.spark.sql.functions.col("collectedfrom")))
|
||||
.selectExpr("id", "collectedfroms.value as collectedfrom")
|
||||
.where("collectedfrom == 'B2FIND'")
|
||||
.drop("collectedfrom")
|
||||
.where("array_contains(context.id,'eosc') or array_contains(collectedfrom.value,'B2FIND')")
|
||||
.drop("context", "collectedfrom")
|
||||
.distinct()
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
|
|
|
@ -70,8 +70,6 @@ public class EmitFromEntities implements Serializable {
|
|||
|
||||
});
|
||||
}
|
||||
|
||||
// per ogni result emetto id + journal se esiste + istanza + hosted by dell'istanza
|
||||
public static <R extends Result> void emitFromResult(SparkSession spark, String inputPath, String outputPath,
|
||||
String workingDir) {
|
||||
|
||||
|
@ -80,9 +78,7 @@ public class EmitFromEntities implements Serializable {
|
|||
emitDatasourcePublisher(spark, inputPath, workingDir);
|
||||
|
||||
}
|
||||
|
||||
//the publisher is at the level of the result as well as the information for the journal. We do not know which instance
|
||||
// hostedby.key is the one for the journal
|
||||
|
||||
private static void emitDatasourcePublisher(SparkSession spark, String inputPath, String workingDir) {
|
||||
Dataset<Row> journalIds = spark
|
||||
.read()
|
||||
|
|
|
@ -62,7 +62,24 @@
|
|||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
<start to="dump_research_product"/>
|
||||
<start to="resumeFrom"/>
|
||||
|
||||
|
||||
<decision name="resumeFrom">
|
||||
<switch>
|
||||
<case to="get_ds_master_duplicate">${wf:conf('resumeFrom') eq "MapEoscDsIds"}</case>
|
||||
<default to="select_subset"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
||||
<decision name="select_subset">
|
||||
<switch>
|
||||
<!-- This one is if I hve to select the results as we do now for the eosc futere portal-->
|
||||
<case to="select_eosc_results">${wf:conf('filter') eq true}</case>
|
||||
<!-- This one takes the identifier of the results matching different criteria and computed outside this code-->
|
||||
<default to="filter"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
|
@ -72,7 +89,7 @@
|
|||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Selecting subset of results</name>
|
||||
<name>Selecting graph results ids relevant for EOSC</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.dump.filterentities.SelectEOSCEntities</class>
|
||||
<jar>dump-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
|
|
Loading…
Reference in New Issue