diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java index c35fbb497f..98caa17416 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java @@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.aggregation.common.ReporterCallback; import eu.dnetlib.dhp.aggregation.common.ReportingJob; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin; @@ -120,6 +121,8 @@ public class CollectorWorker extends ReportingJob { return new FileCollectorPlugin(fileSystem); case fileGzip: return new FileGZipCollectorPlugin(fileSystem); + case baseDump: + return new BaseCollectorPlugin(this.fileSystem); case other: final CollectorPlugin.NAME.OTHER_NAME plugin = Optional .ofNullable(api.getParams().get("other_plugin_type")) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java index a19ca5c685..97d2d25854 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java @@ -10,7 +10,8 @@ import eu.dnetlib.dhp.common.collection.CollectorException; public interface CollectorPlugin { enum NAME { - oai, other, rest_json2xml, file, fileGzip; + + oai, other, rest_json2xml, file, fileGzip, baseDump; public enum OTHER_NAME { 
mdstore_mongodb_dump, mdstore_mongodb
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java
new file mode 100644
index 0000000000..c41cc8f0f6
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java
@@ -0,0 +1,171 @@
+
+package eu.dnetlib.dhp.collection.plugin.base;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLEventWriter;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.events.EndElement;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+
+/**
+ * Iterates over the {@code record} elements found in the bzip2-compressed {@code ListRecords} entries of a TAR
+ * archive.
+ * <p>
+ * A background thread parses the archive and pushes each record (serialized as an XML string) into a bounded
+ * queue; this iterator consumes the queue and stops when it meets the {@link #END_ELEM} marker.
+ */
+public class BaseCollectorIterator implements Iterator<String> {
+
+	/** Element that the next call to {@link #next()} will return, or {@link #END_ELEM} once exhausted. */
+	private String nextElement;
+
+	/** Hand-off buffer between the parsing thread and the consumer; bounded to 100 pending records. */
+	private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
+
+	private static final Logger log = LoggerFactory.getLogger(BaseCollectorIterator.class);
+
+	/** Sentinel enqueued by the parsing thread to signal that no more records will follow. */
+	private static final String END_ELEM = "__END__";
+
+	/**
+	 * Starts reading the TAR archive stored at {@code filePath} on the given file system.
+	 */
+	public BaseCollectorIterator(final FileSystem fs, final Path filePath, final
AggregatorReport report) {
+		// The archive is parsed by a separate thread; the constructor only waits for the first
+		// element (or the end marker) to become available.
+		new Thread(() -> importHadoopFile(fs, filePath, report)).start();
+		this.nextElement = takeNext();
+	}
+
+	/**
+	 * Same as the public constructor, but the TAR archive is loaded as a classpath resource
+	 * (resolved relative to this class).
+	 */
+	protected BaseCollectorIterator(final String resourcePath, final AggregatorReport report) {
+		new Thread(() -> importTestFile(resourcePath, report)).start();
+		this.nextElement = takeNext();
+	}
+
+	/** Failure raised by the parsing thread, if any; written before the end marker is enqueued. */
+	private volatile Throwable importFailure;
+
+	/**
+	 * @return true while a record is available
+	 * @throws RuntimeException if the parsing thread terminated with an error, so that a truncated
+	 *         import is never mistaken for a complete one
+	 */
+	@Override
+	public synchronized boolean hasNext() {
+		final boolean ended = END_ELEM.equals(this.nextElement);
+		if (ended && (this.importFailure != null)) {
+			throw new RuntimeException("Error processing BASE records", this.importFailure);
+		}
+		return (this.nextElement != null) && !ended;
+	}
+
+	/**
+	 * @return the current record, or null when the end marker has already been reached
+	 */
+	@Override
+	public synchronized String next() {
+		if (END_ELEM.equals(this.nextElement)) {
+			// Nothing will ever be enqueued after the end marker: taking again would block forever.
+			return null;
+		}
+		final String current = this.nextElement;
+		this.nextElement = takeNext();
+		return current;
+	}
+
+	/** Blocks until the parsing thread provides the next element, preserving the interrupt status. */
+	private String takeNext() {
+		try {
+			return this.queue.take();
+		} catch (final InterruptedException e) {
+			Thread.currentThread().interrupt();
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Records the failure and enqueues the end marker, so that a consumer blocked on the queue is
+	 * released instead of waiting forever for a producer that has died.
+	 */
+	private void abortImport(final Throwable cause) {
+		this.importFailure = cause;
+		try {
+			this.queue.put(END_ELEM);
+		} catch (final InterruptedException ie) {
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/** Producer entry point: opens the TAR archive on the Hadoop file system and parses it. */
+	private void importHadoopFile(final FileSystem fs, final Path filePath, final AggregatorReport report) {
+		log.info("I start to read the TAR stream");
+
+		try (InputStream origInputStream = fs.open(filePath);
+			final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) {
+			importTarStream(tarInputStream, report);
+		} catch (final Throwable e) {
+			abortImport(e);
+			throw new RuntimeException("Error processing BASE records", e);
+		}
+	}
+
+	/** Producer entry point used by tests: opens the TAR archive from the classpath and parses it. */
+	private void importTestFile(final String resourcePath, final AggregatorReport report) {
+		try (final InputStream origInputStream = BaseCollectorIterator.class.getResourceAsStream(resourcePath);
+			final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) {
+			importTarStream(tarInputStream, report);
+		} catch (final Throwable e) {
+			abortImport(e);
+			throw new RuntimeException("Error processing BASE records", e);
+		}
+	}
+
+	private void importTarStream(final TarArchiveInputStream tarInputStream, final AggregatorReport report) {
+		long count = 0;
+
+		final XMLInputFactory
xmlInputFactory = XMLInputFactory.newInstance();
+		// The dump aggregates XML produced by third-party repositories: never resolve DTDs or
+		// external entities while parsing it.
+		xmlInputFactory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+		xmlInputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+		final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newInstance();
+
+		try {
+			TarArchiveEntry entry;
+			while ((entry = (TarArchiveEntry) tarInputStream.getNextEntry()) != null) {
+				final String name = entry.getName();
+
+				// Only the bzip2-compressed ListRecords files of the archive contain records.
+				if (!entry.isDirectory() && name.contains("ListRecords") && name.endsWith(".bz2")) {
+
+					log.info("Processing file (BZIP): {}", name);
+
+					// The entry is buffered in memory so that closing the decompressor below does not
+					// close the enclosing TAR stream. Fail clearly instead of truncating the size.
+					final long entrySize = entry.getSize();
+					if (entrySize > Integer.MAX_VALUE) {
+						throw new IllegalStateException(
+							"TAR entry too large to be buffered in memory: " + name + " (" + entrySize + " bytes)");
+					}
+					final byte[] bzipData = new byte[(int) entrySize];
+					IOUtils.readFully(tarInputStream, bzipData);
+
+					try (InputStream bzipIs = new ByteArrayInputStream(bzipData);
+						final BufferedInputStream bzipBis = new BufferedInputStream(bzipIs);
+						final CompressorInputStream bzipInput = new CompressorStreamFactory()
+							.createCompressorInputStream(bzipBis)) {
+
+						final XMLEventReader reader = xmlInputFactory.createXMLEventReader(bzipInput);
+
+						// Non-null only while the parser is inside a <record> element.
+						XMLEventWriter eventWriter = null;
+						StringWriter xmlWriter = null;
+
+						while (reader.hasNext()) {
+							final XMLEvent nextEvent = reader.nextEvent();
+
+							if (nextEvent.isStartElement()) {
+								final StartElement startElement = nextEvent.asStartElement();
+								if ("record".equals(startElement.getName().getLocalPart())) {
+									xmlWriter = new StringWriter();
+									eventWriter = xmlOutputFactory.createXMLEventWriter(xmlWriter);
+								}
+							}
+
+							// Copy every event (including the opening and closing tags) of the current record.
+							if (eventWriter != null) {
+								eventWriter.add(nextEvent);
+							}
+
+							if (nextEvent.isEndElement()) {
+								final EndElement endElement = nextEvent.asEndElement();
+								// The null check protects against a closing tag met outside of a record.
+								if ((eventWriter != null) && "record".equals(endElement.getName().getLocalPart())) {
+									eventWriter.flush();
+									eventWriter.close();
+
+									// Blocks when the consumer is slower than the parser (bounded queue).
+									this.queue.put(xmlWriter.toString());
+
+									eventWriter = null;
+									xmlWriter = null;
+									count++;
+								}
+							}
+
+						}
+					}
+				}
+			}
+
+			this.queue.put(END_ELEM); // TO INDICATE THE END OF THE QUEUE
+		} catch (final Throwable e) {
+			log.error("Error processing BASE records", e);
+			report.put(e.getClass().getName(), e.getMessage());
+			throw new RuntimeException("Error processing BASE records", e);
+		} finally {
+			log.info("Total records (written in
queue): " + count);
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
new file mode 100644
index 0000000000..0cd68b12a5
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java
@@ -0,0 +1,152 @@
+
+package eu.dnetlib.dhp.collection.plugin.base;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.DocumentHelper;
+import org.dom4j.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.file.AbstractSplittedRecordPlugin;
+import eu.dnetlib.dhp.common.DbClient;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+
+/**
+ * Collector plugin for the {@code baseDump} protocol: it streams the records of a BASE TAR dump and keeps
+ * only those belonging to an accepted OpenDOAR datasource and, optionally, to an accepted normalized type.
+ */
+public class BaseCollectorPlugin implements CollectorPlugin {
+
+	private final FileSystem fs;
+
+	// Bound to this class (not to AbstractSplittedRecordPlugin) so that log lines are attributed correctly.
+	private static final Logger log = LoggerFactory.getLogger(BaseCollectorPlugin.class);
+
+	// MAPPING AND FILTERING ARE DEFINED HERE:
+	// https://docs.google.com/document/d/1Aj-ZAV11b44MCrAAUCPiS2TUlXb6PnJEu1utCMAcCOU/edit
+
+	public BaseCollectorPlugin(final FileSystem fs) {
+		this.fs = fs;
+	}
+
+	/**
+	 * Collects the records of the dump located at {@code api.getBaseUrl()}.
+	 * <p>
+	 * Parameters read from the API: {@code dbUrl}, {@code dbUser}, {@code dbPassword} (database used to find the
+	 * accepted OpenDOAR datasources) and {@code acceptedNormTypes} (comma separated; blank means "any type").
+	 *
+	 * @return a sequential stream of the XML records that pass the filter
+	 * @throws CollectorException if the base URL is missing, the path does not exist or cannot be checked, or
+	 *         the accepted datasources cannot be loaded
+	 */
+	@Override
+	public Stream<String> collect(final ApiDescriptor api, final AggregatorReport
report) throws CollectorException {
+		// get path to file
+		final Path filePath = Optional
+			.ofNullable(api.getBaseUrl())
+			.map(Path::new)
+			.orElseThrow(() -> new CollectorException("missing baseUrl"));
+
+		final String dbUrl = api.getParams().get("dbUrl");
+		final String dbUser = api.getParams().get("dbUser");
+		final String dbPassword = api.getParams().get("dbPassword");
+		final String acceptedNormTypesString = api.getParams().get("acceptedNormTypes");
+
+		log.info("baseUrl: {}", filePath);
+		log.info("dbUrl: {}", dbUrl);
+		log.info("dbUser: {}", dbUser);
+		log.info("dbPassword: {}", "***");
+		log.info("acceptedNormTypes: {}", acceptedNormTypesString);
+
+		// The existence check is kept apart from the "does not exist" error, otherwise that
+		// CollectorException would be caught below and wrapped into a second CollectorException.
+		final boolean pathExists;
+		try {
+			pathExists = this.fs.exists(filePath);
+		} catch (final Throwable e) {
+			throw new CollectorException(e);
+		}
+		if (!pathExists) {
+			throw new CollectorException("path does not exist: " + filePath);
+		}
+
+		final Set<String> acceptedOpendoarIds = findAcceptedOpendoarIds(dbUrl, dbUser, dbPassword);
+
+		// An empty set means that records are not filtered by normalized type.
+		final Set<String> acceptedNormTypes = new HashSet<>();
+		if (StringUtils.isNotBlank(acceptedNormTypesString)) {
+			for (final String s : StringUtils.split(acceptedNormTypesString, ",")) {
+				if (StringUtils.isNotBlank(s)) {
+					acceptedNormTypes.add(s.trim());
+				}
+			}
+		}
+
+		final Iterator<String> iterator = new BaseCollectorIterator(this.fs, filePath, report);
+		final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
+		return StreamSupport
+			.stream(spliterator, false)
+			.filter(doc -> filterXml(doc, acceptedOpendoarIds, acceptedNormTypes));
+	}
+
+	/**
+	 * Runs the bundled {@code opendoar-accepted.sql} query and returns the value of the {@code id} column of
+	 * every row.
+	 *
+	 * @throws CollectorException if the SQL resource cannot be read
+	 */
+	private Set<String> findAcceptedOpendoarIds(final String dbUrl, final String dbUser, final String dbPassword)
+		throws CollectorException {
+		final Set<String> accepted = new HashSet<>();
+
+		try (final DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
+
+			// Explicit charset: the single-argument overload depends on the platform default encoding.
+			final String sql = IOUtils
+				.toString(
+					getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-accepted.sql"),
+					java.nio.charset.StandardCharsets.UTF_8);
+
+			dbClient.processResults(sql, row -> {
+				try {
+					final String dsId =
row.getString("id");
+					log.info("Accepted Datasource: {}", dsId);
+					accepted.add(dsId);
+				} catch (final SQLException e) {
+					log.error("Error in SQL", e);
+					throw new RuntimeException("Error in SQL", e);
+				}
+			});
+
+		} catch (final IOException e) {
+			log.error("Error accessing SQL", e);
+			throw new CollectorException("Error accessing SQL", e);
+		}
+
+		log.info("Accepted Datasources (TOTAL): {}", accepted.size());
+
+		return accepted;
+	}
+
+	/**
+	 * Decides whether a record must be kept.
+	 * <p>
+	 * A record is accepted when the {@code opendoar_id} attribute of its {@code collection} element, prefixed
+	 * with {@code opendoar____::}, is contained in {@code acceptedOpendoarIds} and, if {@code acceptedNormTypes}
+	 * is not empty, at least one of its {@code typenorm} elements is contained in that set.
+	 *
+	 * @param xml the record, as XML text
+	 * @param acceptedOpendoarIds identifiers of the accepted datasources
+	 * @param acceptedNormTypes accepted normalized types; an empty set disables the check on types
+	 * @return true if the record passes the filter
+	 * @throws RuntimeException if the record is not well-formed XML
+	 */
+	protected static boolean filterXml(final String xml,
+		final Set<String> acceptedOpendoarIds,
+		final Set<String> acceptedNormTypes) {
+		try {
+
+			final Document doc = DocumentHelper.parseText(xml);
+
+			final String id = doc.valueOf("//*[local-name()='collection']/@opendoar_id").trim();
+
+			if (StringUtils.isBlank(id) || !acceptedOpendoarIds.contains("opendoar____::" + id)) {
+				return false;
+			}
+
+			if (acceptedNormTypes.isEmpty()) {
+				return true;
+			}
+
+			for (final Object s : doc.selectNodes("//*[local-name()='typenorm']")) {
+				if (acceptedNormTypes.contains(((Node) s).getText().trim())) {
+					return true;
+				}
+			}
+
+			return false;
+		} catch (final DocumentException e) {
+			log.error("Error parsing document", e);
+			throw new RuntimeException("Error parsing document", e);
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/sql/base.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/sql/base.sql
new file mode 100644
index 0000000000..b9300f6a86
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/sql/base.sql
@@ -0,0 +1,114 @@
+BEGIN;
+
+INSERT INTO dsm_services(
+	_dnet_resource_identifier_,
+	id,
+	officialname,
+	englishname,
+	namespaceprefix,
+	websiteurl,
+	logourl,
+	platform,
+	contactemail,
+	collectedfrom,
+	provenanceaction,
+	_typology_to_remove_,
+	eosc_type,
+	eosc_datasource_type,
+	research_entity_types,
+	thematic
+) VALUES (
+	'openaire____::base_search',
+
'openaire____::base_search', + 'Bielefeld Academic Search Engine (BASE)', + 'Bielefeld Academic Search Engine (BASE)', + 'base_search_', + 'https://www.base-search.net', + 'https://www.base-search.net/about/download/logo_224x57_white.gif', + 'BASE', + 'openaire-helpdesk@uni-bielefeld.de', + 'infrastruct_::openaire', + 'user:insert', + 'aggregator::pubsrepository::unknown', + 'Data Source', + 'Aggregator', + ARRAY['Research Products'], + false +); + +INSERT INTO dsm_service_organization( + _dnet_resource_identifier_, + organization, + service +) VALUES ( + 'fairsharing_::org::214@@openaire____::base_search', + 'fairsharing_::org::214', + 'openaire____::base_search' +); + +INSERT INTO dsm_api( + _dnet_resource_identifier_, + id, + service, + protocol, + baseurl, + metadata_identifier_path +) VALUES ( + 'api_________::openaire____::base_search::dump', + 'api_________::openaire____::base_search::dump', + 'openaire____::base_search', + 'baseDump', + '/user/michele.artini/base-import/base_oaipmh_dump-current.tar', + '//*[local-name()=''header'']/*[local-name()=''identifier'']' +); + + +INSERT INTO dsm_apiparams( + _dnet_resource_identifier_, + api, + param, + value +) VALUES ( + 'api_________::openaire____::base_search::dump@@dbUrl', + 'api_________::openaire____::base_search::dump', + 'dbUrl', + 'jdbc:postgresql://postgresql.services.openaire.eu:5432/dnet_openaireplus' +); + +INSERT INTO dsm_apiparams( + _dnet_resource_identifier_, + api, + param, + value +) VALUES ( + 'api_________::openaire____::base_search::dump@@dbUser', + 'api_________::openaire____::base_search::dump', + 'dbUser', + 'dnet' +); + +INSERT INTO dsm_apiparams( + _dnet_resource_identifier_, + api, + param, + value +) VALUES ( + 'api_________::openaire____::base_search::dump@@dbPassword', + 'api_________::openaire____::base_search::dump', + 'dbPassword', + '***' +); + +INSERT INTO dsm_apiparams( + _dnet_resource_identifier_, + api, + param, + value +) VALUES ( + 
'api_________::openaire____::base_search::dump@@acceptedNormTypes',
+	'api_________::openaire____::base_search::dump',
+	'acceptedNormTypes',
+	''
+);
+
+COMMIT;
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-accepted.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-accepted.sql
new file mode 100644
index 0000000000..b7dd835ee0
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-accepted.sql
@@ -0,0 +1,7 @@
+-- Institutional OpenDOAR datasources having no API that is OpenAIRE-compatible or already collected.
+-- NOT EXISTS is used instead of NOT IN: with NOT IN a single NULL "service" would empty the result.
+select s.id as id
+from dsm_services s
+where s.collectedfrom = 'openaire____::opendoar'
+and s.jurisdiction = 'Institutional'
+and not exists (
+	select 1
+	from dsm_api a
+	where a.service = s.id
+	and (coalesce(a.compatibility_override, a.compatibility) like '%openaire%' or a.last_collection_total > 0)
+);
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql
new file mode 100644
index 0000000000..7aeead8f33
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/sql/opendoar-aggregation-status.sql
@@ -0,0 +1,11 @@
+-- For every OpenDOAR datasource: its APIs with their compliance level and last collection size.
+select
+	s.id as id,
+	s.jurisdiction as jurisdiction,
+	array_remove(array_agg(a.id || ' (compliance: ' || coalesce(a.compatibility_override, a.compatibility, 'UNKNOWN') || ')@@@' || coalesce(a.last_collection_total, 0)), NULL) as aggregations
+from
+	dsm_services s
+	join dsm_api a on (s.id = a.service)
+where
+	s.collectedfrom = 'openaire____::opendoar'
+group by
+	s.id;
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/xml/base-types.vocabulary.xml
b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/xml/base-types.vocabulary.xml new file mode 100644 index 0000000000..51f4ee6ca9 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/xml/base-types.vocabulary.xml @@ -0,0 +1,180 @@ + +
+ + + + + +
+ + + base:normalized_types + base:normalized_types + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + String + +
+ + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/xml/base2oaf.transformationRule.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/xml/base2oaf.transformationRule.xml new file mode 100644 index 0000000000..7a09fec2ea --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/xml/base2oaf.transformationRule.xml @@ -0,0 +1,298 @@ + +
+ + + + + +
+ + + + + + + + + + +
\ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/xml/base2odf.transformationRule.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/xml/base2odf.transformationRule.xml new file mode 100644 index 0000000000..917a351b3e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/plugin/base/xml/base2odf.transformationRule.xml @@ -0,0 +1,322 @@ + +
+ + + + + +
+ + + + + + + + + + +
\ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectionInfo.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectionInfo.java new file mode 100644 index 0000000000..06dfe45e23 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectionInfo.java @@ -0,0 +1,38 @@ + +package eu.dnetlib.dhp.collection.plugin.base; + +import java.io.Serializable; + +public class BaseCollectionInfo implements Serializable { + + private static final long serialVersionUID = 5766333937429419647L; + + private String id; + private String opendoarId; + private String rorId; + + public String getId() { + return this.id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getOpendoarId() { + return this.opendoarId; + } + + public void setOpendoarId(final String opendoarId) { + this.opendoarId = opendoarId; + } + + public String getRorId() { + return this.rorId; + } + + public void setRorId(final String rorId) { + this.rorId = rorId; + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java new file mode 100644 index 0000000000..7ffe4bb871 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java @@ -0,0 +1,184 @@ + +package eu.dnetlib.dhp.collection.plugin.base; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import 
org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.Attribute;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.DocumentHelper;
+import org.dom4j.Element;
+import org.dom4j.Node;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+
+/**
+ * Exploratory tests (disabled) that read a sample BASE dump and print statistics about its content.
+ */
+@Disabled
+public class BaseCollectorIteratorTest {
+
+	/**
+	 * Iterates over the sample archive, collecting the attributes of each distinct collection, the number of
+	 * occurrences of each XML path and the distinct values of the "type" elements.
+	 */
+	@Test
+	void testImportFile() throws Exception {
+
+		long count = 0;
+
+		final BaseCollectorIterator iterator = new BaseCollectorIterator("base-sample.tar", new AggregatorReport());
+
+		// collection name -> attributes of the first "collection" element found with that name
+		final Map<String, Map<String, String>> collections = new HashMap<>();
+		// XML path -> number of occurrences
+		final Map<String, AtomicInteger> fields = new HashMap<>();
+		final Set<String> types = new HashSet<>();
+
+		while (iterator.hasNext()) {
+
+			final Document record = DocumentHelper.parseText(iterator.next());
+
+			count++;
+
+			if ((count % 1000) == 0) {
+				System.out.println("# Read records: " + count);
+			}
+
+			// System.out.println(record.asXML());
+
+			for (final Object o : record.selectNodes("//*|//@*")) {
+				final String path = ((Node) o).getPath();
+
+				fields.computeIfAbsent(path, k -> new AtomicInteger()).incrementAndGet();
+
+				if (o instanceof Element) {
+					final Element n = (Element) o;
+
+					if ("collection".equals(n.getName())) {
+						final String collName = n.getText().trim();
+						if (StringUtils.isNotBlank(collName) && !collections.containsKey(collName)) {
+							final Map<String, String> collAttrs = new HashMap<>();
+							for (final Object ao : n.attributes()) {
+								collAttrs.put(((Attribute) ao).getName(), ((Attribute) ao).getValue());
+							}
+							collections.put(collName, collAttrs);
+						}
+					} else if ("type".equals(n.getName())) {
+
types.add(n.getText().trim());
+					}
+
+				}
+			}
+
+		}
+
+		final ObjectMapper mapper = new ObjectMapper();
+		for (final Entry<String, Map<String, String>> e : collections.entrySet()) {
+			System.out.println(e.getKey() + ": " + mapper.writeValueAsString(e.getValue()));
+
+		}
+
+		for (final Entry<String, AtomicInteger> e : fields.entrySet()) {
+			System.out.println(e.getKey() + ": " + e.getValue().get());
+
+		}
+
+		System.out.println("TYPES: ");
+		for (final String s : types) {
+			System.out.println(s);
+
+		}
+
+		assertEquals(30000, count);
+	}
+
+	/**
+	 * Builds a Spark dataset of {@link BaseRecordInfo} from ten copies of the sample record and prints its
+	 * schema and content.
+	 */
+	@Test
+	public void testParquet() throws Exception {
+
+		final String xml = IOUtils.toString(getClass().getResourceAsStream("record.xml"));
+
+		final SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
+
+		final List<BaseRecordInfo> ls = new ArrayList<>();
+
+		for (int i = 0; i < 10; i++) {
+			ls.add(extractInfo(xml));
+		}
+
+		final JavaRDD<BaseRecordInfo> rdd = JavaSparkContext
+			.fromSparkContext(spark.sparkContext())
+			.parallelize(ls);
+
+		final Dataset<BaseRecordInfo> df = spark
+			.createDataset(rdd.rdd(), Encoders.bean(BaseRecordInfo.class));
+
+		df.printSchema();
+
+		df.show(false);
+	}
+
+	/**
+	 * Extracts identifier, XML paths, types and collections from a record given as XML text.
+	 */
+	private BaseRecordInfo extractInfo(final String s) {
+		try {
+			final Document record = DocumentHelper.parseText(s);
+
+			final BaseRecordInfo info = new BaseRecordInfo();
+
+			// Linked sets: duplicates are dropped and the order of first appearance is kept.
+			final Set<String> paths = new LinkedHashSet<>();
+			final Set<String> types = new LinkedHashSet<>();
+			final List<BaseCollectionInfo> colls = new ArrayList<>();
+
+			for (final Object o : record.selectNodes("//*|//@*")) {
+				paths.add(((Node) o).getPath());
+
+				if (o instanceof Element) {
+					final Element n = (Element) o;
+
+					final String nodeName = n.getName();
+
+					if ("collection".equals(nodeName)) {
+						final String collName = n.getText().trim();
+
+						if (StringUtils.isNotBlank(collName)) {
+							final BaseCollectionInfo coll = new BaseCollectionInfo();
+							coll.setId(collName);
+							coll.setOpendoarId(n.valueOf("@opendoar_id").trim());
+							coll.setRorId(n.valueOf("@ror_id").trim());
+							colls.add(coll);
+						}
+					} else if ("type".equals(nodeName)) {
+						types.add("TYPE: " + n.getText().trim());
+					} else if ("typenorm".equals(nodeName)) {
+						types.add("TYPE_NORM: " + n.getText().trim());
+					}
+				}
+			}
+
+			info.setId(record.valueOf("//*[local-name() = 'header']/*[local-name() = 'identifier']").trim());
+			info.getTypes().addAll(types);
+			info.getPaths().addAll(paths);
+			info.setCollections(colls);
+
+			return info;
+		} catch (final DocumentException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPluginTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPluginTest.java
new file mode 100644
index 0000000000..d3b08f212f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPluginTest.java
@@ -0,0 +1,32 @@
+
+package eu.dnetlib.dhp.collection.plugin.base;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.Test;
+
+class BaseCollectorPluginTest {
+
+	/**
+	 * Checks {@link BaseCollectorPlugin#filterXml} against the sample record: accepted with matching ids and
+	 * types (or no type restriction), rejected with no accepted ids or with non-matching types.
+	 */
+	@Test
+	void testFilterXml() throws Exception {
+		final String xml = IOUtils.toString(getClass().getResourceAsStream("record.xml"));
+
+		final Set<String> validIds = new HashSet<>(Arrays.asList("opendoar____::1234", "opendoar____::4567"));
+		final Set<String> validTypes = new HashSet<>(Arrays.asList("1", "121"));
+		final Set<String> validTypes2 = new HashSet<>(Arrays.asList("1", "11"));
+
+		assertTrue(BaseCollectorPlugin.filterXml(xml, validIds, validTypes));
+		assertTrue(BaseCollectorPlugin.filterXml(xml, validIds, new HashSet<>()));
+
+		assertFalse(BaseCollectorPlugin.filterXml(xml, new HashSet<>(), validTypes));
+		assertFalse(BaseCollectorPlugin.filterXml(xml, validIds, validTypes2));
+
+	}
+
+}
diff --git
a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java
new file mode 100644
index 0000000000..0fe6175a74
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseRecordInfo.java
@@ -0,0 +1,49 @@
+
+package eu.dnetlib.dhp.collection.plugin.base;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Bean summarizing a BASE record: identifier, collections, XML paths and types found in it.
+ * The list properties declare their element type so that bean-based encoders can resolve it.
+ */
+public class BaseRecordInfo implements Serializable {
+
+	private static final long serialVersionUID = -8848232018350074593L;
+
+	private String id;
+	private List<BaseCollectionInfo> collections = new ArrayList<>();
+	private List<String> paths = new ArrayList<>();
+	private List<String> types = new ArrayList<>();
+
+	public String getId() {
+		return this.id;
+	}
+
+	public void setId(final String id) {
+		this.id = id;
+	}
+
+	public List<String> getPaths() {
+		return this.paths;
+	}
+
+	public void setPaths(final List<String> paths) {
+		this.paths = paths;
+	}
+
+	public List<String> getTypes() {
+		return this.types;
+	}
+
+	public void setTypes(final List<String> types) {
+		this.types = types;
+	}
+
+	public List<BaseCollectionInfo> getCollections() {
+		return this.collections;
+	}
+
+	public void setCollections(final List<BaseCollectionInfo> collections) {
+		this.collections = collections;
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseTransfomationTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseTransfomationTest.java
new file mode 100644
index 0000000000..f4539014d9
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseTransfomationTest.java
@@ -0,0 +1,78 @@
+
+package eu.dnetlib.dhp.collection.plugin.base;
+
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.util.LongAccumulator;
+import
org.dom4j.io.SAXReader; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest; +import eu.dnetlib.dhp.aggregation.common.AggregationCounter; +import eu.dnetlib.dhp.schema.mdstore.MetadataRecord; +import eu.dnetlib.dhp.schema.mdstore.Provenance; +import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; + +@Disabled +@ExtendWith(MockitoExtension.class) +public class BaseTransfomationTest extends AbstractVocabularyTest { + + private SparkConf sparkConf; + + @BeforeEach + public void setUp() throws IOException, ISLookUpException { + setUpVocabulary(); + + this.sparkConf = new SparkConf(); + this.sparkConf.setMaster("local[*]"); + this.sparkConf.set("spark.driver.host", "localhost"); + this.sparkConf.set("spark.ui.enabled", "false"); + } + + @Test + void testBase2ODF() throws Exception { + + final MetadataRecord mr = new MetadataRecord(); + mr.setProvenance(new Provenance("DSID", "DSNAME", "PREFIX")); + mr.setBody(IOUtils.toString(getClass().getResourceAsStream("record.xml"))); + + final XSLTTransformationFunction tr = loadTransformationRule("xml/base2odf.transformationRule.xml"); + + final MetadataRecord result = tr.call(mr); + + System.out.println(result.getBody()); + } + + @Test + void testBase2OAF() throws Exception { + + final MetadataRecord mr = new MetadataRecord(); + mr.setProvenance(new Provenance("DSID", "DSNAME", "PREFIX")); + mr.setBody(IOUtils.toString(getClass().getResourceAsStream("record.xml"))); + + final XSLTTransformationFunction tr = loadTransformationRule("xml/base2oaf.transformationRule.xml"); + + final MetadataRecord result = tr.call(mr); + + System.out.println(result.getBody()); + } + + private XSLTTransformationFunction loadTransformationRule(final 
String path) throws Exception { + final String xslt = new SAXReader() + .read(this.getClass().getResourceAsStream(path)) + .selectSingleNode("//CODE/*") + .asXML(); + + final LongAccumulator la = new LongAccumulator(); + + return new XSLTTransformationFunction(new AggregationCounter(la, la, la), xslt, 0, this.vocabularies); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/base-sample.tar b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/base-sample.tar new file mode 100644 index 0000000000..c575fe1471 Binary files /dev/null and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/base-sample.tar differ diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/record.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/record.xml new file mode 100644 index 0000000000..9c58941fdb --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/plugin/base/record.xml @@ -0,0 +1,58 @@ + +
+ ftdoajarticles:oai:doaj.org/article:e2d5b5126b2d4e479933cc7f9a9ae0c1 + 2022-12-31T11:48:55Z +
+ + + ftdoajarticles:oai:doaj.org/article:e2d5b5126b2d4e479933cc7f9a9ae0c1 + cww + org + ftdoajarticles + TEST REPO + Assessment of cultural heritage: the legislative and methodological framework of Russian Federation + ALBU, Svetlana + LEȘAN, Anna + architectural heritage + evaluation of architectural heritage + types of values + experience of russian federation + Social Sciences + H + Architectural heritage is the real estate inheritance by population of a country becoming an extremely valuable and specific category, preserving and capitalizing on those assets requires considerable effort. The state does not have sufficient means to maintain and preserve cultural heritage, as a result it is included in the civil circuit. The transfer of property right or of some partial rights over the architectural patrimony is accompanied by the necessity to estimate the value of goods. In this article, the authors examine the experience of Russian Federation (one of the largest countries with a huge architectural heritage) on the legislative framework of architectural and methodological heritage of architectural heritage assessment. The particularities of cultural assets valuation compared to other categories of real estate are examined, as well as the methodological aspects (types of values, methods applied in valuation, approaches according to the purpose of valuation) regarding the valuation of real estate with architectural value in Russian Federation. 
+ Technical University of Moldova + 2020-09-01T00:00:00Z + 2020 + article + 121 + https://doi.org/10.5281/zenodo.3971988 + https://doaj.org/article/e2d5b5126b2d4e479933cc7f9a9ae0c1 + https://doi.org/10.5281/zenodo.3971988 + Journal of Social Sciences, Vol 3, Iss 3, Pp 134-143 (2020) + EN + FR + RO + http://ibn.idsi.md/sites/default/files/imag_file/JSS-3-2020_134-143.pdf + https://doaj.org/toc/2587-3490 + https://doaj.org/toc/2587-3504 + doi:10.5281/zenodo.3971988 + 2587-3490 + 2587-3504 + https://doaj.org/article/e2d5b5126b2d4e479933cc7f9a9ae0c1 + 720 + + ALBU, Svetlana + https://orcid.org/0000-0002-8648-950X + + + LEȘAN, Anna + https://orcid.org/0000-0003-3284-0525 + + https://doi.org/10.5281/zenodo.3971988 + 1 + eng + fre + rum + + +
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/synonyms.txt b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/synonyms.txt index 74a75e46a7..6b7979a6b4 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/synonyms.txt +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/synonyms.txt @@ -1496,4 +1496,30 @@ cnr:institutes @=@ __CDS131__ @=@ IBE - Istituto per la BioEconomia cnr:institutes @=@ https://ror.org/0263zy895 @=@ CDS132 cnr:institutes @=@ https://ror.org/0263zy895 @=@ SCITEC - Istituto di Scienze e Tecnologie Chimiche \"Giulio Natta\" cnr:institutes @=@ __CDS133__ @=@ CDS133 -cnr:institutes @=@ __CDS133__ @=@ STEMS - Istituto di Scienze e Tecnologie per l'Energia e la Mobilità Sostenibili \ No newline at end of file +cnr:institutes @=@ __CDS133__ @=@ STEMS - Istituto di Scienze e Tecnologie per l'Energia e la Mobilità Sostenibili +base:normalized_types @=@ Text @=@ 1 +base:normalized_types @=@ Book @=@ 11 +base:normalized_types @=@ Book part @=@ 111 +base:normalized_types @=@ Journal/Newspaper @=@ 12 +base:normalized_types @=@ Article contribution @=@ 121 +base:normalized_types @=@ Other non-article @=@ 122 +base:normalized_types @=@ Conference object @=@ 13 +base:normalized_types @=@ Report @=@ 14 +base:normalized_types @=@ Review @=@ 15 +base:normalized_types @=@ Course material @=@ 16 +base:normalized_types @=@ Lecture @=@ 17 +base:normalized_types @=@ Thesis @=@ 18 +base:normalized_types @=@ Bachelor's thesis @=@ 181 +base:normalized_types @=@ Master's thesis @=@ 182 +base:normalized_types @=@ Doctoral and postdoctoral thesis @=@ 183 +base:normalized_types @=@ Manuscript @=@ 19 +base:normalized_types @=@ Patent @=@ 1A +base:normalized_types @=@ Musical notation @=@ 2 +base:normalized_types @=@ Map @=@ 3 +base:normalized_types @=@ Audio @=@ 4 +base:normalized_types @=@ Image/Video @=@ 5 +base:normalized_types @=@ Still image @=@ 
51 +base:normalized_types @=@ Moving image/Video @=@ 52 +base:normalized_types @=@ Software @=@ 6 +base:normalized_types @=@ Dataset @=@ 7 +base:normalized_types @=@ Unknown @=@ F diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/terms.txt b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/terms.txt index 0f0ebaad42..29e42ffd9a 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/terms.txt +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/terms.txt @@ -1210,4 +1210,29 @@ cnr:institutes @=@ cnr:institutes @=@ __CDS130__ @=@ __CDS130__ cnr:institutes @=@ cnr:institutes @=@ __CDS131__ @=@ __CDS131__ cnr:institutes @=@ cnr:institutes @=@ https://ror.org/0263zy895 @=@ https://ror.org/0263zy895 cnr:institutes @=@ cnr:institutes @=@ __CDS133__ @=@ __CDS133__ - +base:normalized_types @=@ base:normalized_types @=@ Text @=@ Text +base:normalized_types @=@ base:normalized_types @=@ Book @=@ Book +base:normalized_types @=@ base:normalized_types @=@ Book part @=@ Book part +base:normalized_types @=@ base:normalized_types @=@ Journal/Newspaper @=@ Journal/Newspaper +base:normalized_types @=@ base:normalized_types @=@ Article contribution @=@ Article contribution +base:normalized_types @=@ base:normalized_types @=@ Other non-article @=@ Other non-article +base:normalized_types @=@ base:normalized_types @=@ Conference object @=@ Conference object +base:normalized_types @=@ base:normalized_types @=@ Report @=@ Report +base:normalized_types @=@ base:normalized_types @=@ Review @=@ Review +base:normalized_types @=@ base:normalized_types @=@ Course material @=@ Course material +base:normalized_types @=@ base:normalized_types @=@ Lecture @=@ Lecture +base:normalized_types @=@ base:normalized_types @=@ Thesis @=@ Thesis +base:normalized_types @=@ base:normalized_types @=@ Bachelor's thesis @=@ Bachelor's thesis +base:normalized_types @=@ base:normalized_types @=@ 
Master's thesis @=@ Master's thesis +base:normalized_types @=@ base:normalized_types @=@ Doctoral and postdoctoral thesis @=@ Doctoral and postdoctoral thesis +base:normalized_types @=@ base:normalized_types @=@ Manuscript @=@ Manuscript +base:normalized_types @=@ base:normalized_types @=@ Patent @=@ Patent +base:normalized_types @=@ base:normalized_types @=@ Musical notation @=@ Musical notation +base:normalized_types @=@ base:normalized_types @=@ Map @=@ Map +base:normalized_types @=@ base:normalized_types @=@ Audio @=@ Audio +base:normalized_types @=@ base:normalized_types @=@ Image/Video @=@ Image/Video +base:normalized_types @=@ base:normalized_types @=@ Still image @=@ Still image +base:normalized_types @=@ base:normalized_types @=@ Moving image/Video @=@ Moving image/Video +base:normalized_types @=@ base:normalized_types @=@ Software @=@ Software +base:normalized_types @=@ base:normalized_types @=@ Dataset @=@ Dataset +base:normalized_types @=@ base:normalized_types @=@ Unknown @=@ Unknown \ No newline at end of file