From 317a4a56ef9019bd4cc3280673ec34cc8c5b58d0 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 21 Jul 2022 17:37:48 +0200
Subject: [PATCH] [EOSC context TAG] first implementation of the logic to tag
 results imported from datasources registered in the EOSC

---
 .../dhp/bulktag/eosc/DatasourceMaster.java    |  28 ++++
 .../eosc/ReadMasterDatasourceFromDB.java      | 134 ++++++++++++++++
 .../dhp/bulktag/eosc/SparkEoscBulkTag.java    | 145 ++++++++++++++++++
 pom.xml                                       |   2 +-
 4 files changed, 308 insertions(+), 1 deletion(-)
 create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/DatasourceMaster.java
 create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/ReadMasterDatasourceFromDB.java
 create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java

diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/DatasourceMaster.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/DatasourceMaster.java
new file mode 100644
index 000000000..537356586
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/DatasourceMaster.java
@@ -0,0 +1,28 @@
+package eu.dnetlib.dhp.bulktag.eosc;
+
+import java.io.Serializable;
+
+/**
+ * @author miriam.baglioni
+ * @Date 21/07/22
+ */
+public class DatasourceMaster implements Serializable {
+	private String datasource;
+	private String master;
+
+	public String getDatasource() {
+		return datasource;
+	}
+
+	public void setDatasource(String datasource) {
+		this.datasource = datasource;
+	}
+
+	public String getMaster() {
+		return master;
+	}
+
+	public void setMaster(String master) {
+		this.master = master;
+	}
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/ReadMasterDatasourceFromDB.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/ReadMasterDatasourceFromDB.java
new file mode 100644
index 000000000..7d6964707
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/ReadMasterDatasourceFromDB.java
@@ -0,0 +1,134 @@
+
+package eu.dnetlib.dhp.bulktag.eosc;
+
+/**
+ * @author miriam.baglioni
+ * @Date 21/07/22
+ */
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.function.Function;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.DbClient;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+
+public class ReadMasterDatasourceFromDB implements Closeable {
+
+	private final DbClient dbClient;
+	private static final Log log = LogFactory.getLog(ReadMasterDatasourceFromDB.class);
+
+	private final BufferedWriter writer;
+	private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
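+
+	// The query pairs each datasource registered in the EOSC (service ids starting with 'eosc') with its
+	// deduplicated master record. The FULL JOIN also keeps EOSC datasources that were never deduplicated:
+	// they yield a row with a null master, resolved in datasourceMasterMap below. As an illustration, assuming
+	// the hypothetical dedup row (id = 'openaire____::ab12', duplicate = 'eosc________::cd34'), the query
+	// emits the pair (datasource = 'eosc________::cd34', master = 'openaire____::ab12').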
+	private static final String QUERY = "SELECT dso.id datasource, d.id master FROM " +
+		"(SELECT id FROM dsm_services WHERE id like 'eosc%') dso " +
+		"FULL JOIN " +
+		"(SELECT id, duplicate FROM dsm_dedup_services WHERE duplicate like 'eosc%') d " +
+		"ON dso.id = d.duplicate";
+
+	public static void main(final String[] args) throws Exception {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					ReadMasterDatasourceFromDB.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/blacklist/blacklist_parameters.json")));
+
+		parser.parseArgument(args);
+
+		final String dbUrl = parser.get("postgresUrl");
+		final String dbUser = parser.get("postgresUser");
+		final String dbPassword = parser.get("postgresPassword");
+		final String hdfsPath = parser.get("hdfsPath") + "/datasourceMasters";
+		final String hdfsNameNode = parser.get("hdfsNameNode");
+
+		try (final ReadMasterDatasourceFromDB rmd = new ReadMasterDatasourceFromDB(hdfsPath, hdfsNameNode, dbUrl,
+			dbUser, dbPassword)) {
+
+			log.info("Processing datasources...");
+			rmd.execute(QUERY, rmd::datasourceMasterMap);
+
+		}
+	}
+
+	public void execute(final String sql, final Function<ResultSet, DatasourceMaster> producer) {
+
+		dbClient.processResults(sql, rs -> writeMap(producer.apply(rs)));
+	}
+
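+	// Maps one row of the query above to a DatasourceMaster. When no master exists, the datasource id doubles
+	// as its own master; in both cases the id is normalised to an OpenAIRE datasource id (entity-type prefix
+	// 10) via OafMapperUtils.createOpenaireId, e.g. the hypothetical raw id 'eosc________::cd34' becomes
+	// '10|eosc________::' followed by the md5 of the part after the '::'.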
+	public DatasourceMaster datasourceMasterMap(ResultSet rs) {
+		try {
+			DatasourceMaster dm = new DatasourceMaster();
+			String datasource = rs.getString("datasource");
+			dm.setDatasource(datasource);
+			String master = rs.getString("master");
+			if (StringUtils.isNotBlank(master))
+				dm.setMaster(OafMapperUtils.createOpenaireId(10, master, true));
+			else
+				dm.setMaster(OafMapperUtils.createOpenaireId(10, datasource, true));
+			return dm;
+
+		} catch (final SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		dbClient.close();
+		writer.close();
+	}
+
+	public ReadMasterDatasourceFromDB(
+		final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser,
+		final String dbPassword) throws IOException {
+
+		this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
+
+		Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsNameNode);
+
+		FileSystem fileSystem = FileSystem.get(conf);
+		Path hdfsWritePath = new Path(hdfsPath);
+		FSDataOutputStream fsDataOutputStream = null;
+		if (fileSystem.exists(hdfsWritePath)) {
+			fsDataOutputStream = fileSystem.append(hdfsWritePath);
+		} else {
+			fsDataOutputStream = fileSystem.create(hdfsWritePath);
+		}
+
+		this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+	}
+
+	protected void writeMap(final DatasourceMaster dm) {
+		try {
+			writer.write(OBJECT_MAPPER.writeValueAsString(dm));
+			writer.newLine();
+		} catch (final IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java
new file mode 100644
index 000000000..aa709cb29
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java
@@ -0,0 +1,145 @@
+package eu.dnetlib.dhp.bulktag.eosc;
+
+import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
+import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
+
+import java.io.Serializable;
+import java.util.*;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Context;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+
+/**
+ * @author miriam.baglioni
+ * @Date 21/07/22
+ */
+public class SparkEoscBulkTag implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(SparkEoscBulkTag.class);
+	public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				SparkEoscBulkTag.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/bulktag/input_bulkTag_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		String datasourceMapPath = parser.get("datasourceMapPath");
+		log.info("datasourceMapPath: {}", datasourceMapPath);
+
+		final String resultClassName = parser.get("resultTableName");
+		log.info("resultTableName: {}", resultClassName);
+
+		Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				removeOutputDir(spark, outputPath);
+				execBulkTag(spark, inputPath, outputPath, datasourceMapPath, resultClazz);
+			});
+	}
+
+	private static <R extends Result> void execBulkTag(
+		SparkSession spark,
+		String inputPath,
+		String outputPath,
+		String datasourceMapPath,
+		Class<R> resultClazz) {
+
+		// materialises on the driver the (small) list of masters of the datasources registered in the EOSC
+		final List<String> hostedByList = readPath(spark, datasourceMapPath, DatasourceMaster.class)
+			.map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING())
+			.collectAsList();
+
+		readPath(spark, inputPath, resultClazz)
+			.map(patchResult(), Encoders.bean(resultClazz))
+			.filter(Objects::nonNull)
+			.map(
+				(MapFunction<R, R>) value -> enrich(value, hostedByList),
+				Encoders.bean(resultClazz))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
+	}
+
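+	// Tags a result with the EOSC context (id "eosc") when at least one of its instances is hosted by a
+	// datasource registered in the EOSC, or when it already carries EOSC Interoperability Framework
+	// guidelines; the attached dataInfo records the bulktagging-by-datasource provenance.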
+	private static <R extends Result> R enrich(R value, List<String> hostedByList) {
+		if (value.getInstance().stream().anyMatch(i -> hostedByList.contains(i.getHostedby().getKey())) ||
+			(value.getEoscifguidelines() != null && !value.getEoscifguidelines().isEmpty())) {
+			Context context = new Context();
+			context.setId("eosc");
+			context
+				.setDataInfo(
+					Arrays
+						.asList(
+							OafMapperUtils
+								.dataInfo(
+									false, BULKTAG_DATA_INFO_TYPE, true, false,
+									OafMapperUtils
+										.qualifier(
+											CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE,
+											DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS),
+									TAGGING_TRUST)));
+			value.getContext().add(context);
+		}
+		return value;
+	}
+
+	public static <R> Dataset<R> readPath(
+		SparkSession spark, String inputPath, Class<R> clazz) {
+		return spark
+			.read()
+			.textFile(inputPath)
+			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+	}
+
+	// TODO remove this hack as soon as the values fixed by this method are provided as non-null
+	private static <R extends Result> MapFunction<R, R> patchResult() {
+		return r -> {
+			if (r.getDataInfo().getDeletedbyinference() == null) {
+				r.getDataInfo().setDeletedbyinference(false);
+			}
+			if (r.getContext() == null) {
+				r.setContext(new ArrayList<>());
+			}
+			return r;
+		};
+	}
+
+}
diff --git a/pom.xml b/pom.xml
index 54070f654..821ce3124 100644
--- a/pom.xml
+++ b/pom.xml
@@ -801,7 +801,7 @@
 		<mockito-core.version>3.3.3</mockito-core.version>
 		<mongodb.driver.version>3.4.2</mongodb.driver.version>
 		<vtd.version>[2.12,3.0)</vtd.version>
-		<dhp-schemas.version>[2.12.0]</dhp-schemas.version>
+		<dhp-schemas.version>[2.12.1]</dhp-schemas.version>
 		<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
 		<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
 		<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
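
Usage sketch (illustrative, not part of the commit): the two jobs are meant to run in sequence. ReadMasterDatasourceFromDB first materialises the datasource/master map under <hdfsPath>/datasourceMasters, then SparkEoscBulkTag tags one result table at a time against that map. The parameter names below come from the parser.get(...) calls above and assume the --paramName convention used with ArgumentApplicationParser elsewhere in dnet-hadoop; all paths, DB coordinates and the namenode address are hypothetical placeholders.

	// hypothetical driver showing the expected invocation order
	ReadMasterDatasourceFromDB.main(new String[] {
		"--postgresUrl", "jdbc:postgresql://example.host:5432/dnet_openaireplus", // placeholder
		"--postgresUser", "dnet", // placeholder
		"--postgresPassword", "***",
		"--hdfsNameNode", "hdfs://nn.example.org:8020", // placeholder
		"--hdfsPath", "/user/dnet/eosc/workingDir" // the map lands in <hdfsPath>/datasourceMasters
	});

	SparkEoscBulkTag.main(new String[] {
		"--isSparkSessionManaged", "false",
		"--sourcePath", "/tmp/graph/publication", // placeholder
		"--outputPath", "/tmp/graph_eosc/publication", // placeholder
		"--datasourceMapPath", "/user/dnet/eosc/workingDir/datasourceMasters",
		"--resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication"
	});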