diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index 9a2d0ffa0..308d78715 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -76,6 +76,41 @@
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-plugin-plugin</artifactId>
+										<versionRange>[3.2,)</versionRange>
+										<goals>
+											<goal>descriptor</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore />
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index a1db4ad2e..78831073f 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -25,6 +25,12 @@
 			<artifactId>dhp-common</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-schemas</artifactId>
+			<version>${project.version}</version>
+		</dependency>

 		<dependency>
 			<groupId>net.sf.saxon</groupId>
@@ -44,6 +50,17 @@
 			<groupId>jaxen</groupId>
 			<artifactId>jaxen</artifactId>
 		</dependency>
+
+		<dependency>
+			<groupId>org.mongodb</groupId>
+			<artifactId>mongo-java-driver</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.postgresql</groupId>
+			<artifactId>postgresql</artifactId>
+			<version>42.2.10</version>
+		</dependency>

 		<dependency>
 			<groupId>org.mockito</groupId>
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java
new file mode 100644
index 000000000..e91a53045
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java
@@ -0,0 +1,236 @@
+package eu.dnetlib.dhp.migration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.Journal;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OriginDescription;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.utils.DHPUtils;
+
+public class AbstractMigrationExecutor implements Closeable {
+
+	private final AtomicInteger counter = new AtomicInteger(0);
+
+	private final Text key = new Text();
+
+	private final Text value = new Text();
+
+	private final ObjectMapper objectMapper = new ObjectMapper();
+
+	private final SequenceFile.Writer writer;
+
+	private static final Log log = LogFactory.getLog(AbstractMigrationExecutor.class);
+
+	public AbstractMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception {
+
+		log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser));
+
+		this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)),
+				SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+	}
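+	// NOTE: getConf() below only configures the client side: it pins the HDFS/local
+	// FileSystem implementations and impersonates the given user by setting the
+	// HADOOP_USER_NAME system property, which is a process-wide side effect. A minimal
+	// usage sketch for a subclass (namenode URL and paths are hypothetical):
+	//
+	//   try (MigrateDbEntitiesApplication app = new MigrateDbEntitiesApplication(
+	//           "/tmp/migration/db_entities", "hdfs://nn1:8020", "dnet", dbUrl, dbUser, dbPassword)) {
+	//       app.execute("queryDatasources.sql", app::processDatasource);
+	//   }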
+	private Configuration getConf(final String hdfsNameNode, final String hdfsUser) throws IOException {
+		final Configuration conf = new Configuration();
+		conf.set("fs.defaultFS", hdfsNameNode);
+		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+		System.setProperty("HADOOP_USER_NAME", hdfsUser);
+		System.setProperty("hadoop.home.dir", "/");
+		FileSystem.get(URI.create(hdfsNameNode), conf);
+		return conf;
+	}
+
+	protected void emitOaf(final Oaf oaf) {
+		try {
+			key.set(counter.getAndIncrement() + ":" + oaf.getClass().getSimpleName().toLowerCase());
+			value.set(objectMapper.writeValueAsString(oaf));
+			writer.append(key, value);
+		} catch (final Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		writer.hflush();
+		writer.close();
+	}
+
+	public static KeyValue keyValue(final String k, final String v) {
+		final KeyValue kv = new KeyValue();
+		kv.setKey(k);
+		kv.setValue(v);
+		return kv;
+	}
+
+	public static List<KeyValue> listKeyValues(final String... s) {
+		if (s.length % 2 > 0) { throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)"); }
+
+		final List<KeyValue> list = new ArrayList<>();
+		for (int i = 0; i < s.length; i += 2) {
+			list.add(keyValue(s[i], s[i + 1]));
+		}
+		return list;
+	}
+
+	public static <T> Field<T> field(final T value, final DataInfo info) {
+		if (value == null || StringUtils.isBlank(value.toString())) { return null; }
+
+		final Field<T> field = new Field<>();
+		field.setValue(value);
+		field.setDataInfo(info);
+		return field;
+	}
+
+	public static List<Field<String>> listFields(final DataInfo info, final String... values) {
+		return Arrays.stream(values).map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList());
+	}
+
+	public static List<Field<String>> listFields(final DataInfo info, final List<String> values) {
+		return values.stream().map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList());
+	}
+
+	public static Qualifier qualifier(final String classid, final String classname, final String schemeid, final String schemename) {
+		final Qualifier q = new Qualifier();
+		q.setClassid(classid);
+		q.setClassname(classname);
+		q.setSchemeid(schemeid);
+		q.setSchemename(schemename);
+		return q;
+	}
+
+	public static StructuredProperty structuredProperty(final String value,
+			final String classid,
+			final String classname,
+			final String schemeid,
+			final String schemename,
+			final DataInfo dataInfo) {
+
+		return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
+	}
+
+	public static StructuredProperty structuredProperty(final String value, final Qualifier qualifier, final DataInfo dataInfo) {
+		if (value == null) { return null; }
+		final StructuredProperty sp = new StructuredProperty();
+		sp.setValue(value);
+		sp.setQualifier(qualifier);
+		sp.setDataInfo(dataInfo);
+		return sp;
+	}
+
+	public static ExtraInfo extraInfo(final String name, final String value, final String typology, final String provenance, final String trust) {
+		final ExtraInfo info = new ExtraInfo();
+		info.setName(name);
+		info.setValue(value);
+		info.setTypology(typology);
+		info.setProvenance(provenance);
+		info.setTrust(trust);
+		return info;
+	}
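+	// A hypothetical example of how the factory helpers above compose (values invented):
+	//
+	//   final Qualifier q = qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
+	//   final StructuredProperty title = structuredProperty("Some article title", q, info);
+	//   final List<Field<String>> descriptions = listFields(info, "first abstract", "second abstract");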
+	public static OAIProvenance oaiIProvenance(final String identifier,
+			final String baseURL,
+			final String metadataNamespace,
+			final Boolean altered,
+			final String datestamp,
+			final String harvestDate) {
+
+		final OriginDescription desc = new OriginDescription();
+		desc.setIdentifier(identifier);
+		desc.setBaseURL(baseURL);
+		desc.setMetadataNamespace(metadataNamespace);
+		desc.setAltered(altered);
+		desc.setDatestamp(datestamp);
+		desc.setHarvestDate(harvestDate);
+
+		final OAIProvenance p = new OAIProvenance();
+		p.setOriginDescription(desc);
+
+		return p;
+	}
+
+	public static Journal journal(final String name,
+			final String issnPrinted,
+			final String issnOnline,
+			final String issnLinking,
+			final String ep,
+			final String iss,
+			final String sp,
+			final String vol,
+			final String edition,
+			final String conferenceplace,
+			final String conferencedate,
+			final DataInfo dataInfo) {
+
+		if (StringUtils.isNotBlank(name) || StringUtils.isNotBlank(issnPrinted) || StringUtils.isNotBlank(issnOnline) || StringUtils.isNotBlank(issnLinking)) {
+			final Journal j = new Journal();
+			j.setName(name);
+			j.setIssnPrinted(issnPrinted);
+			j.setIssnOnline(issnOnline);
+			j.setIssnLinking(issnLinking);
+			j.setEp(ep);
+			j.setIss(iss);
+			j.setSp(sp);
+			j.setVol(vol);
+			j.setEdition(edition);
+			j.setConferenceplace(conferenceplace);
+			j.setConferencedate(conferencedate);
+			j.setDataInfo(dataInfo);
+			return j;
+		} else {
+			return null;
+		}
+	}
+
+	public static DataInfo dataInfo(final Boolean deletedbyinference,
+			final String inferenceprovenance,
+			final Boolean inferred,
+			final Boolean invisible,
+			final Qualifier provenanceaction,
+			final String trust) {
+		final DataInfo d = new DataInfo();
+		d.setDeletedbyinference(deletedbyinference);
+		d.setInferenceprovenance(inferenceprovenance);
+		d.setInferred(inferred);
+		d.setInvisible(invisible);
+		d.setProvenanceaction(provenanceaction);
+		d.setTrust(trust);
+		return d;
+	}
+
+	public static String createOpenaireId(final int prefix, final String originalId) {
+		final String nsPrefix = StringUtils.substringBefore(originalId, "::");
+		final String rest = StringUtils.substringAfter(originalId, "::");
+		return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
+	}
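+	// NOTE: createOpenaireId() above turns a D-Net original id "nsPrefix::localId" into
+	// "<prefix>|<nsPrefix>::md5(localId)", where the callers in this patch use prefix 10
+	// for datasources, 20 for organizations, 40 for projects and 50 for results.
+	// Hypothetical example: createOpenaireId(50, "od______2294::123") -> "50|od______2294::<md5("123")>"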
"" : o.toString(); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java new file mode 100644 index 000000000..00d1aa60d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java @@ -0,0 +1,439 @@ +package eu.dnetlib.dhp.migration; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.DocumentFactory; +import org.dom4j.DocumentHelper; +import org.dom4j.Node; + +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.GeoLocation; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.Journal; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.OAIProvenance; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; + +public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor { + + protected final Map code2name = new HashMap<>(); + + protected final MdstoreClient mdstoreClient; + + protected static final Qualifier MAIN_TITLE_QUALIFIER = qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title"); + + protected static final Qualifier PUBLICATION_RESULTTYPE_QUALIFIER = + qualifier("publication", "publication", "dnet:result_typologies", "dnet:result_typologies"); + protected static final Qualifier DATASET_RESULTTYPE_QUALIFIER = qualifier("dataset", "dataset", "dnet:result_typologies", "dnet:result_typologies"); + protected static final Qualifier SOFTWARE_RESULTTYPE_QUALIFIER = qualifier("software", "software", "dnet:result_typologies", "dnet:result_typologies"); + protected static final Qualifier OTHER_RESULTTYPE_QUALIFIER = qualifier("other", "other", "dnet:result_typologies", "dnet:result_typologies"); + + private static final Log log = LogFactory.getLog(AbstractMongoExecutor.class); + + public AbstractMongoExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, + final String mongoDb, final String dbUrl, final String dbUser, + final String dbPassword) throws Exception { + + super(hdfsPath, hdfsNameNode, hdfsUser); + + this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); + loadClassNames(dbUrl, dbUser, dbPassword); + + final Map nsContext = new HashMap<>(); + + registerNamespaces(nsContext); + + DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); + } + + private void loadClassNames(final String dbUrl, final String dbUser, final String dbPassword) throws IOException { + + log.info("Loading vocabulary terms from db..."); + + try (DbClient dbClient = new 
+	private void loadClassNames(final String dbUrl, final String dbUser, final String dbPassword) throws IOException {
+
+		log.info("Loading vocabulary terms from db...");
+
+		try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
+			code2name.clear();
+			dbClient.processResults("select code, name from class", rs -> {
+				try {
+					code2name.put(rs.getString("code"), rs.getString("name"));
+				} catch (final SQLException e) {
+					e.printStackTrace();
+				}
+			});
+		}
+
+		log.info("Found " + code2name.size() + " terms.");
+	}
+
+	public void processMdRecords(final String mdFormat, final String mdLayout, final String mdInterpretation) throws DocumentException {
+
+		log.info(String.format("Searching mdstores (format: %s, layout: %s, interpretation: %s)", mdFormat, mdLayout, mdInterpretation));
+
+		final Map<String, String> colls = mdstoreClient.validCollections(mdFormat, mdLayout, mdInterpretation);
+		log.info("Found " + colls.size() + " mdstores");
+
+		for (final Entry<String, String> entry : colls.entrySet()) {
+			log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")");
+			final String currentColl = entry.getValue();
+
+			for (final String xml : mdstoreClient.listRecords(currentColl)) {
+				final Document doc = DocumentHelper.parseText(xml);
+
+				final String type = doc.valueOf("//dr:CobjCategory/@type");
+				final KeyValue collectedFrom = keyValue(doc.valueOf("//oaf:collectedFrom/@id"), doc.valueOf("//oaf:collectedFrom/@name"));
+				final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) ? collectedFrom
+						: keyValue(doc.valueOf("//oaf:hostedBy/@id"), doc.valueOf("//oaf:hostedBy/@name"));
+
+				final DataInfo info = prepareDataInfo(doc);
+				final long lastUpdateTimestamp = new Date().getTime();
+
+				for (final Oaf oaf : createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp)) {
+					emitOaf(oaf);
+				}
+			}
+		}
+		log.info("All Done.");
+	}
+
+	protected void registerNamespaces(final Map<String, String> nsContext) {
+		nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr");
+		nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri");
+		nsContext.put("oaf", "http://namespace.openaire.eu/oaf");
+		nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/");
+		nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance");
+	}
+
+	protected List<Oaf> createOafs(final Document doc,
+			final String type,
+			final KeyValue collectedFrom,
+			final KeyValue hostedBy,
+			final DataInfo info,
+			final long lastUpdateTimestamp) {
+
+		final List<Oaf> oafs = new ArrayList<>();
+
+		switch (type.toLowerCase()) {
+		case "":
+		case "publication":
+			final Publication p = new Publication();
+			populateResultFields(p, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+			p.setResulttype(PUBLICATION_RESULTTYPE_QUALIFIER);
+			p.setJournal(prepareJournal(doc, info));
+			oafs.add(p);
+			break;
+		case "dataset":
+			final Dataset d = new Dataset();
+			populateResultFields(d, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+			d.setResulttype(DATASET_RESULTTYPE_QUALIFIER);
+			d.setStoragedate(prepareDatasetStorageDate(doc, info));
+			d.setDevice(prepareDatasetDevice(doc, info));
+			d.setSize(prepareDatasetSize(doc, info));
+			d.setVersion(prepareDatasetVersion(doc, info));
+			d.setLastmetadataupdate(prepareDatasetLastMetadataUpdate(doc, info));
+			d.setMetadataversionnumber(prepareDatasetMetadataVersionNumber(doc, info));
+			d.setGeolocation(prepareDatasetGeoLocations(doc, info));
+			oafs.add(d);
+			break;
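+		// NOTE: an empty //dr:CobjCategory/@type falls back to the publication branch
+		// above, while any unrecognised type falls through to OtherResearchProduct via
+		// the default case below; the type-specific fields are delegated to the
+		// prepare* hooks implemented by the oaf/odf subclasses.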
+		case "software":
+			final Software s = new Software();
+			populateResultFields(s, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+			s.setResulttype(SOFTWARE_RESULTTYPE_QUALIFIER);
+			s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
+			s.setLicense(prepareSoftwareLicenses(doc, info));
+			s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
+			s.setProgrammingLanguage(prepareSoftwareProgrammingLanguage(doc, info));
+			oafs.add(s);
+			break;
+		case "otherresearchproducts":
+		default:
+			final OtherResearchProduct o = new OtherResearchProduct();
+			populateResultFields(o, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+			o.setResulttype(OTHER_RESULTTYPE_QUALIFIER);
+			o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
+			o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
+			o.setTool(prepareOtherResearchProductTools(doc, info));
+			oafs.add(o);
+			break;
+		}
+
+		if (!oafs.isEmpty()) {
+			oafs.addAll(addProjectRels(doc, collectedFrom, info, lastUpdateTimestamp));
+			oafs.addAll(addOtherResultRels(doc, collectedFrom, info, lastUpdateTimestamp));
+		}
+
+		return oafs;
+	}
+
+	private List<Oaf> addProjectRels(final Document doc,
+			final KeyValue collectedFrom,
+			final DataInfo info,
+			final long lastUpdateTimestamp) {
+
+		final List<Oaf> res = new ArrayList<>();
+
+		final String docId = createOpenaireId(50, doc.valueOf("//dri:objIdentifier"));
+
+		for (final Object o : doc.selectNodes("//oaf:projectid")) {
+			final String projectId = createOpenaireId(40, ((Node) o).getText());
+
+			final Relation r1 = new Relation();
+			r1.setRelType("resultProject");
+			r1.setSubRelType("outcome");
+			r1.setRelClass("isProducedBy");
+			r1.setSource(docId);
+			r1.setTarget(projectId);
+			r1.setCollectedFrom(Arrays.asList(collectedFrom));
+			r1.setDataInfo(info);
+			r1.setLastupdatetimestamp(lastUpdateTimestamp);
+			res.add(r1);
+
+			final Relation r2 = new Relation();
+			r2.setRelType("resultProject");
+			r2.setSubRelType("outcome");
+			r2.setRelClass("produces");
+			r2.setSource(projectId);
+			r2.setTarget(docId);
+			r2.setCollectedFrom(Arrays.asList(collectedFrom));
+			r2.setDataInfo(info);
+			r2.setLastupdatetimestamp(lastUpdateTimestamp);
+			res.add(r2);
+		}
+
+		return res;
+	}
+
+	protected abstract List<Oaf> addOtherResultRels(final Document doc,
+			final KeyValue collectedFrom,
+			final DataInfo info,
+			final long lastUpdateTimestamp);
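+	// NOTE: every result<->project link (and, via the addOtherResultRels() hook above,
+	// every result<->result link) is emitted twice, once per direction (e.g.
+	// isProducedBy / produces), so downstream consumers never have to invert relations
+	// themselves.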
+	private void populateResultFields(final Result r,
+			final Document doc,
+			final KeyValue collectedFrom,
+			final KeyValue hostedBy,
+			final DataInfo info,
+			final long lastUpdateTimestamp) {
+		r.setDataInfo(info);
+		r.setLastupdatetimestamp(lastUpdateTimestamp);
+		r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier")));
+		r.setOriginalId(Arrays.asList(doc.valueOf("//dri:objIdentifier")));
+		r.setCollectedfrom(Arrays.asList(collectedFrom));
+		r.setPid(prepareListStructProps(doc, "//oaf:identifier", "@identifierType", "dnet:pid_types", "dnet:pid_types", info));
+		r.setDateofcollection(doc.valueOf("//dr:dateOfCollection"));
+		r.setDateoftransformation(doc.valueOf("//dr:dateOfTransformation"));
+		r.setExtraInfo(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+		r.setOaiprovenance(prepareOAIprovenance(doc));
+		r.setAuthor(prepareAuthors(doc, info));
+		r.setLanguage(prepareLanguages(doc));
+		r.setCountry(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+		r.setSubject(prepareSubjects(doc, info));
+		r.setTitle(prepareTitles(doc, info));
+		r.setRelevantdate(prepareRelevantDates(doc, info));
+		r.setDescription(prepareDescriptions(doc, info));
+		r.setDateofacceptance(prepareField(doc, "//oaf:dateAccepted", info));
+		r.setPublisher(preparePublisher(doc, info));
+		r.setEmbargoenddate(prepareField(doc, "//oaf:embargoenddate", info));
+		r.setSource(prepareSources(doc, info));
+		r.setFulltext(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+		r.setFormat(prepareFormats(doc, info));
+		r.setContributor(prepareContributors(doc, info));
+		r.setResourcetype(prepareResourceType(doc, info));
+		r.setCoverage(prepareCoverages(doc, info));
+		r.setContext(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+		r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+		r.setInstance(prepareInstances(doc, info, collectedFrom, hostedBy));
+	}
+
+	protected abstract Qualifier prepareResourceType(Document doc, DataInfo info);
+
+	protected abstract List<Instance> prepareInstances(Document doc, DataInfo info, KeyValue collectedfrom, KeyValue hostedby);
+
+	protected abstract List<Field<String>> prepareSources(Document doc, DataInfo info);
+
+	protected abstract List<StructuredProperty> prepareRelevantDates(Document doc, DataInfo info);
+
+	protected abstract List<Field<String>> prepareCoverages(Document doc, DataInfo info);
+
+	protected abstract List<Field<String>> prepareContributors(Document doc, DataInfo info);
+
+	protected abstract List<Field<String>> prepareFormats(Document doc, DataInfo info);
+
+	protected abstract Field<String> preparePublisher(Document doc, DataInfo info);
+
+	protected abstract List<Field<String>> prepareDescriptions(Document doc, DataInfo info);
+
+	protected abstract List<StructuredProperty> prepareTitles(Document doc, DataInfo info);
+
+	protected abstract List<StructuredProperty> prepareSubjects(Document doc, DataInfo info);
+
+	protected abstract Qualifier prepareLanguages(Document doc);
+
+	protected abstract List<Author> prepareAuthors(Document doc, DataInfo info);
+
+	protected abstract List<Field<String>> prepareOtherResearchProductTools(Document doc, DataInfo info);
+
+	protected abstract List<Field<String>> prepareOtherResearchProductContactGroups(Document doc, DataInfo info);
+
+	protected abstract List<Field<String>> prepareOtherResearchProductContactPersons(Document doc, DataInfo info);
+
+	protected abstract Qualifier prepareSoftwareProgrammingLanguage(Document doc, DataInfo info);
+
+	protected abstract Field<String> prepareSoftwareCodeRepositoryUrl(Document doc, DataInfo info);
+
+	protected abstract List<StructuredProperty> prepareSoftwareLicenses(Document doc, DataInfo info);
+
+	protected abstract List<Field<String>> prepareSoftwareDocumentationUrls(Document doc, DataInfo info);
+
+	protected abstract List<GeoLocation> prepareDatasetGeoLocations(Document doc, DataInfo info);
+
+	protected abstract Field<String> prepareDatasetMetadataVersionNumber(Document doc, DataInfo info);
+
+	protected abstract Field<String> prepareDatasetLastMetadataUpdate(Document doc, DataInfo info);
+
+	protected abstract Field<String> prepareDatasetVersion(Document doc, DataInfo info);
+
+	protected abstract Field<String> prepareDatasetSize(Document doc, DataInfo info);
+
+	protected abstract Field<String> prepareDatasetDevice(Document doc, DataInfo info);
+
+	protected abstract Field<String> prepareDatasetStorageDate(Document doc, DataInfo info);
+
+	private Journal prepareJournal(final Document doc, final DataInfo info) {
+		final Node n = doc.selectSingleNode("//oaf:journal");
+		if (n != null) {
+			final String name = n.getText();
+			final String issnPrinted = n.valueOf("@issn");
+			final String issnOnline = n.valueOf("@eissn");
+			final String issnLinking = n.valueOf("@lissn");
+			final String ep = n.valueOf("@ep");
+			final String iss = n.valueOf("@iss");
+			final String sp = n.valueOf("@sp");
+			final String vol = n.valueOf("@vol");
+			final String edition = n.valueOf("@edition");
+			if (StringUtils.isNotBlank(name)) { return journal(name, issnPrinted, issnOnline, issnLinking, ep, iss, sp, vol, edition, null, null, info); }
+		}
+		return null;
+	}
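+	// Hypothetical input fragment handled by prepareJournal() above (values invented):
+	//
+	//   <oaf:journal issn="0378-5955" eissn="1872-8631" vol="12" iss="3" sp="1" ep="10">Hearing Research</oaf:journal>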
+	protected Qualifier prepareQualifier(final Node node, final String xpath, final String schemeId, final String schemeName) {
+		final String classId = node.valueOf(xpath);
+		final String className = code2name.get(classId);
+		return qualifier(classId, className, schemeId, schemeName);
+	}
+
+	protected List<StructuredProperty> prepareListStructProps(final Node node,
+			final String xpath,
+			final String xpathClassId,
+			final String schemeId,
+			final String schemeName,
+			final DataInfo info) {
+		final List<StructuredProperty> res = new ArrayList<>();
+		for (final Object o : node.selectNodes(xpath)) {
+			final Node n = (Node) o;
+			final String classId = n.valueOf(xpathClassId);
+			final String className = code2name.get(classId);
+			res.add(structuredProperty(n.getText(), classId, className, schemeId, schemeName, info));
+		}
+		return res;
+	}
+
+	protected List<StructuredProperty> prepareListStructProps(final Node node, final String xpath, final Qualifier qualifier, final DataInfo info) {
+		final List<StructuredProperty> res = new ArrayList<>();
+		for (final Object o : node.selectNodes(xpath)) {
+			final Node n = (Node) o;
+			res.add(structuredProperty(n.getText(), qualifier, info));
+		}
+		return res;
+	}
+
+	protected List<StructuredProperty> prepareListStructProps(final Node node, final String xpath, final DataInfo info) {
+		final List<StructuredProperty> res = new ArrayList<>();
+		for (final Object o : node.selectNodes(xpath)) {
+			final Node n = (Node) o;
+			res.add(structuredProperty(n.getText(), n.valueOf("@classid"), n.valueOf("@classname"), n.valueOf("@schemeid"),
+					n.valueOf("@schemename"), info));
+		}
+		return res;
+	}
+
+	protected OAIProvenance prepareOAIprovenance(final Document doc) {
+		final Node n = doc.selectSingleNode("//*[local-name()='provenance']/*[local-name()='originDescription']");
+
+		if (n == null) { return null; }
+
+		final String identifier = n.valueOf("./*[local-name()='identifier']");
+		final String baseURL = n.valueOf("./*[local-name()='baseURL']");
+		final String metadataNamespace = n.valueOf("./*[local-name()='metadataNamespace']");
+		final boolean altered = n.valueOf("@altered").equalsIgnoreCase("true");
+		final String datestamp = n.valueOf("./*[local-name()='datestamp']");
+		final String harvestDate = n.valueOf("@harvestDate");
+
+		return oaiIProvenance(identifier, baseURL, metadataNamespace, altered, datestamp, harvestDate);
+	}
+
+	protected DataInfo prepareDataInfo(final Document doc) {
+		final Node n = doc.selectSingleNode("//oaf:datainfo");
+
+		final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");
+		final String paClassName = n.valueOf("./oaf:provenanceaction/@classname");
+		final String paSchemeId = n.valueOf("./oaf:provenanceaction/@schemeid");
+		final String paSchemeName = n.valueOf("./oaf:provenanceaction/@schemename");
+
+		final boolean deletedbyinference = Boolean.parseBoolean(n.valueOf("./oaf:deletedbyinference"));
+		final String inferenceprovenance = n.valueOf("./oaf:inferenceprovenance");
+		final Boolean inferred = Boolean.parseBoolean(n.valueOf("./oaf:inferred"));
+		final String trust = n.valueOf("./oaf:trust");
+
+		return dataInfo(deletedbyinference, inferenceprovenance, inferred, false, qualifier(paClassId, paClassName, paSchemeId, paSchemeName), trust);
+	}
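+	// NOTE: prepareDataInfo() above always sets invisible=false; only the provenance
+	// action, the inference flags and the trust value are read from //oaf:datainfo.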
+	protected Field<String> prepareField(final Node node, final String xpath, final DataInfo info) {
+		return field(node.valueOf(xpath), info);
+	}
+
+	protected List<Field<String>> prepareListFields(final Node node, final String xpath, final DataInfo info) {
+		return listFields(info, prepareListString(node, xpath));
+	}
+
+	protected List<String> prepareListString(final Node node, final String xpath) {
+		final List<String> res = new ArrayList<>();
+		for (final Object o : node.selectNodes(xpath)) {
+			final String s = ((Node) o).getText().trim();
+			if (StringUtils.isNotBlank(s)) {
+				res.add(s);
+			}
+		}
+		return res;
+	}
+
+	@Override
+	public void close() throws IOException {
+		super.close();
+		mdstoreClient.close();
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java
new file mode 100644
index 000000000..9ac0089d2
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java
@@ -0,0 +1,63 @@
+package eu.dnetlib.dhp.migration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.function.Consumer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DbClient implements Closeable {
+
+	private static final Log log = LogFactory.getLog(DbClient.class);
+
+	private Connection connection;
+
+	public DbClient(final String address, final String login, final String password) {
+
+		try {
+			Class.forName("org.postgresql.Driver");
+
+			this.connection =
+					StringUtils.isNoneBlank(login, password) ? DriverManager.getConnection(address, login, password) : DriverManager.getConnection(address);
+			// autoCommit must stay off: with Postgres, setFetchSize() only streams rows
+			// through a cursor when the statement runs inside a transaction.
+			this.connection.setAutoCommit(false);
+		} catch (final Exception e) {
+			log.error(e.getClass().getName() + ": " + e.getMessage());
+			throw new RuntimeException(e);
+		}
+		log.info("Opened database successfully");
+	}
+
+	public void processResults(final String sql, final Consumer<ResultSet> consumer) {
+
+		try (final Statement stmt = connection.createStatement()) {
+			stmt.setFetchSize(100);
+
+			try (final ResultSet rs = stmt.executeQuery(sql)) {
+				while (rs.next()) {
+					consumer.accept(rs);
+				}
+			} catch (final SQLException e) {
+				throw new RuntimeException(e);
+			}
+		} catch (final SQLException e1) {
+			throw new RuntimeException(e1);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		try {
+			connection.close();
+		} catch (final SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java
new file mode 100644
index 000000000..f2d9caebf
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java
@@ -0,0 +1,56 @@
+package eu.dnetlib.dhp.migration;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ExtractEntitiesFromHDFSJob {
+
+	private static List<String> folderNames = Arrays.asList("db_entities", "oaf_entities", "odf_entities");
+
+	public static void main(String[] args) throws Exception {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+				IOUtils.toString(MigrateMongoMdstoresApplication.class
+						.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
+		parser.parseArgument(args);
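+		// NOTE: "master" comes from the job arguments (e.g. "local[*]" or "yarn"); the job
+		// scans the three sequence-file folders written by the migration applications and
+		// keeps only records whose key matches the requested entity type (see
+		// isEntityType() below and the "<counter>:<entity>" keys written by emitOaf()).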
+		final SparkSession spark = SparkSession
+				.builder()
+				.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
+				.master(parser.get("master"))
+				.getOrCreate();
+
+		final String sourcePath = parser.get("sourcePath");
+		final String targetPath = parser.get("graphRawPath");
+		final String entity = parser.get("entity");
+
+		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+		JavaRDD<String> inputRdd = sc.emptyRDD();
+
+		// RDDs are immutable: union() returns a new RDD, so its result must be
+		// reassigned (a forEach lambda cannot reassign the local variable).
+		for (final String p : folderNames) {
+			inputRdd = inputRdd.union(sc.sequenceFile(sourcePath + "/" + p, Text.class, Text.class)
+					.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
+					.filter(k -> isEntityType(k._1(), entity))
+					.map(Tuple2::_2));
+		}
+
+		inputRdd.saveAsTextFile(targetPath + "/" + entity);
+	}
+
+	private static boolean isEntityType(final String item, final String entity) {
+		return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity);
+	}
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MdstoreClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MdstoreClient.java
new file mode 100644
index 000000000..87dadfc7a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MdstoreClient.java
@@ -0,0 +1,94 @@
+package eu.dnetlib.dhp.migration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.bson.Document;
+
+import com.google.common.collect.Iterables;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+public class MdstoreClient implements Closeable {
+
+	private final MongoClient client;
+	private final MongoDatabase db;
+
+	private static final String COLL_METADATA = "metadata";
+	private static final String COLL_METADATA_MANAGER = "metadataManager";
+
+	private static final Log log = LogFactory.getLog(MdstoreClient.class);
+
+	public MdstoreClient(final String baseUrl, final String dbName) {
+		this.client = new MongoClient(new MongoClientURI(baseUrl));
+		this.db = getDb(client, dbName);
+	}
+
+	public Map<String, String> validCollections(final String mdFormat, final String mdLayout, final String mdInterpretation) {
+
+		final Map<String, String> transactions = new HashMap<>();
+		for (final Document entry : getColl(db, COLL_METADATA_MANAGER, true).find()) {
+			final String mdId = entry.getString("mdId");
+			final String currentId = entry.getString("currentId");
+			if (StringUtils.isNoneBlank(mdId, currentId)) {
+				transactions.put(mdId, currentId);
+			}
+		}
+
+		final Map<String, String> res = new HashMap<>();
+		for (final Document entry : getColl(db, COLL_METADATA, true).find()) {
+			if (entry.getString("format").equals(mdFormat) && entry.getString("layout").equals(mdLayout)
+					&& entry.getString("interpretation").equals(mdInterpretation) && transactions.containsKey(entry.getString("mdId"))) {
+				res.put(entry.getString("mdId"), transactions.get(entry.getString("mdId")));
+			}
+		}
+
+		return res;
+	}
+
+	private MongoDatabase getDb(final MongoClient client, final String dbName) {
+		if (!Iterables.contains(client.listDatabaseNames(), dbName)) {
+			final String err = String.format("Database '%s' not found in %s", dbName, client.getAddress());
+			log.warn(err);
+			throw new RuntimeException(err);
+		}
+		return client.getDatabase(dbName);
+	}
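+	// NOTE: validCollections() above returns mdId -> currentId pairs: an mdstore is kept
+	// only when its metadata entry matches the requested format/layout/interpretation
+	// and a metadataManager entry points to the mongo collection (currentId) that
+	// actually holds its records.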
+	private MongoCollection<Document> getColl(final MongoDatabase db, final String collName, final boolean abortIfMissing) {
+		if (!Iterables.contains(db.listCollectionNames(), collName)) {
+			final String err = String.format("Missing collection '%s' in database '%s'", collName, db.getName());
+			log.warn(err);
+			if (abortIfMissing) {
+				throw new RuntimeException(err);
+			} else {
+				return null;
+			}
+		}
+		return db.getCollection(collName);
+	}
+
+	public Iterable<String> listRecords(final String collName) {
+		final MongoCollection<Document> coll = getColl(db, collName, false);
+		return coll == null ? new ArrayList<>()
+				: () -> StreamSupport.stream(coll.find().spliterator(), false)
+						.filter(e -> e.containsKey("body"))
+						.map(e -> e.getString("body"))
+						.iterator();
+	}
+
+	@Override
+	public void close() throws IOException {
+		client.close();
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java
new file mode 100644
index 000000000..d22e8e5b3
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java
@@ -0,0 +1,520 @@
+package eu.dnetlib.dhp.migration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Array;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.Journal;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+
+public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor implements Closeable {
+
+	private static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION =
+			qualifier("sysimport:crosswalk:entityregistry", "sysimport:crosswalk:entityregistry", "dnet:provenance_actions", "dnet:provenance_actions");
+
+	private static final Log log = LogFactory.getLog(MigrateDbEntitiesApplication.class);
+
+	private final DbClient dbClient;
+
+	private final long lastUpdateTimestamp;
+
+	public static void main(final String[] args) throws Exception {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+				IOUtils.toString(MigrateDbEntitiesApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json")));
+
+		parser.parseArgument(args);
+
+		final String dbUrl = parser.get("postgresUrl");
+		final String dbUser = parser.get("postgresUser");
+		final String dbPassword = parser.get("postgresPassword");
+
+		final String hdfsPath = parser.get("hdfsPath");
+		final String hdfsNameNode = parser.get("namenode");
+		final String hdfsUser = parser.get("hdfsUser");
+
+		try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, hdfsNameNode, hdfsUser, dbUrl, dbUser, dbPassword)) {
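+			// Each execute() call below streams the rows of one SQL file from
+			// /eu/dnetlib/dhp/migration/sql/ and maps every row to Oaf entities (or to a
+			// pair of inverse Relations) appended to the HDFS sequence file.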
datasources..."); + smdbe.execute("queryDatasources.sql", smdbe::processDatasource); + + log.info("Processing projects..."); + smdbe.execute("queryProjects.sql", smdbe::processProject); + + log.info("Processing orgs..."); + smdbe.execute("queryOrganizations.sql", smdbe::processOrganization); + + log.info("Processing relations ds <-> orgs ..."); + smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization); + + log.info("Processing projects <-> orgs ..."); + smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization); + + log.info("All done."); + } + } + + public MigrateDbEntitiesApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String dbUrl, final String dbUser, + final String dbPassword) throws Exception { + super(hdfsPath, hdfsNameNode, hdfsUser); + this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); + this.lastUpdateTimestamp = new Date().getTime(); + } + + public void execute(final String sqlFile, final Consumer consumer) throws Exception { + final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/migration/sql/" + sqlFile)); + dbClient.processResults(sql, consumer); + } + + public void processDatasource(final ResultSet rs) { + + try { + + final DataInfo info = prepareDataInfo(rs); + + final Datasource ds = new Datasource(); + + ds.setId(createOpenaireId(10, rs.getString("datasourceid"))); + ds.setOriginalId(Arrays.asList(rs.getString("datasourceid"))); + ds.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname"))); + ds.setPid(new ArrayList<>()); + ds.setDateofcollection(asString(rs.getDate("dateofcollection"))); + ds.setDateoftransformation(null); // Value not returned by the SQL query + ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB + ds.setOaiprovenance(null); // Values not present in the DB + ds.setDatasourcetype(prepareQualifierSplitting(rs.getString("datasourcetype"))); + ds.setOpenairecompatibility(prepareQualifierSplitting(rs.getString("openairecompatibility"))); + ds.setOfficialname(field(rs.getString("officialname"), info)); + ds.setEnglishname(field(rs.getString("englishname"), info)); + ds.setWebsiteurl(field(rs.getString("websiteurl"), info)); + ds.setLogourl(field(rs.getString("logourl"), info)); + ds.setContactemail(field(rs.getString("contactemail"), info)); + ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info)); + ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info)); + ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info)); + ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info)); + ds.setDescription(field(rs.getString("description"), info)); + ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info)); + ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info)); + ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info)); + ds.setOdpolicies(field(rs.getString("odpolicies"), info)); + ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info)); + ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info)); + ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info)); + ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info)); + ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info)); + ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info)); + 
+			ds.setDataprovider(field(rs.getBoolean("dataprovider"), info));
+			ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info));
+			ds.setDatabaseaccesstype(field(rs.getString("databaseaccesstype"), info));
+			ds.setDatauploadtype(field(rs.getString("datauploadtype"), info));
+			ds.setDatabaseaccessrestriction(field(rs.getString("databaseaccessrestriction"), info));
+			ds.setDatauploadrestriction(field(rs.getString("datauploadrestriction"), info));
+			ds.setVersioning(field(rs.getBoolean("versioning"), info));
+			ds.setCitationguidelineurl(field(rs.getString("citationguidelineurl"), info));
+			ds.setQualitymanagementkind(field(rs.getString("qualitymanagementkind"), info));
+			ds.setPidsystems(field(rs.getString("pidsystems"), info));
+			ds.setCertificates(field(rs.getString("certificates"), info));
+			ds.setPolicies(new ArrayList<>()); // The sql query returns an empty array
+			ds.setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal
+			ds.setDataInfo(info);
+			ds.setLastupdatetimestamp(lastUpdateTimestamp);
+
+			// rs.getString("datasourceid");
+			// rs.getArray("identities");
+			// rs.getString("officialname");
+			// rs.getString("englishname");
+			// rs.getString("contactemail");
+			// rs.getString("openairecompatibility"); // COMPLEX ...@@@...
+			// rs.getString("websiteurl");
+			// rs.getString("logourl");
+			// rs.getArray("accessinfopackage");
+			// rs.getDouble("latitude");
+			// rs.getDouble("longitude");
+			// rs.getString("namespaceprefix");
+			// rs.getInt("odnumberofitems"); // NULL
+			// rs.getDate("odnumberofitemsdate"); // NULL
+			// rs.getArray("subjects");
+			// rs.getString("description");
+			// rs.getString("odpolicies"); // NULL
+			// rs.getArray("odlanguages");
+			// rs.getArray("odcontenttypes");
+			// rs.getBoolean("inferred"); // false
+			// rs.getBoolean("deletedbyinference");// false
+			// rs.getDouble("trust"); // 0.9
+			// rs.getString("inferenceprovenance"); // NULL
+			// rs.getDate("dateofcollection");
+			// rs.getDate("dateofvalidation");
+			// rs.getDate("releasestartdate");
+			// rs.getDate("releaseenddate");
+			// rs.getString("missionstatementurl");
+			// rs.getBoolean("dataprovider");
+			// rs.getBoolean("serviceprovider");
+			// rs.getString("databaseaccesstype");
+			// rs.getString("datauploadtype");
+			// rs.getString("databaseaccessrestriction");
+			// rs.getString("datauploadrestriction");
+			// rs.getBoolean("versioning");
+			// rs.getString("citationguidelineurl");
+			// rs.getString("qualitymanagementkind");
+			// rs.getString("pidsystems");
+			// rs.getString("certificates");
+			// rs.getArray("policies");
+			// rs.getString("collectedfromid");
+			// rs.getString("collectedfromname");
+			// rs.getString("datasourcetype"); // COMPLEX
+			// rs.getString("provenanceaction"); //
+			// 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions'
+			// AS provenanceaction,
+			// rs.getString("journal"); // CONCAT(d.issn, '@@@', d.eissn, '@@@', d.lissn) AS journal
+
+			emitOaf(ds);
+		} catch (final Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public void processProject(final ResultSet rs) {
+		try {
+
+			final DataInfo info = prepareDataInfo(rs);
+
+			final Project p = new Project();
+
+			p.setId(createOpenaireId(40, rs.getString("projectid")));
+			p.setOriginalId(Arrays.asList(rs.getString("projectid")));
+			p.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname")));
+			p.setPid(new ArrayList<>());
+			p.setDateofcollection(asString(rs.getDate("dateofcollection")));
+			p.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
+			p.setExtraInfo(new ArrayList<>()); // Values not present in the DB
+			p.setOaiprovenance(null); // Values not present in the DB
+			p.setWebsiteurl(field(rs.getString("websiteurl"), info));
+			p.setCode(field(rs.getString("code"), info));
+			p.setAcronym(field(rs.getString("acronym"), info));
+			p.setTitle(field(rs.getString("title"), info));
+			p.setStartdate(field(asString(rs.getDate("startdate")), info));
+			p.setEnddate(field(asString(rs.getDate("enddate")), info));
+			p.setCallidentifier(field(rs.getString("callidentifier"), info));
+			p.setKeywords(field(rs.getString("keywords"), info));
+			p.setDuration(field(Integer.toString(rs.getInt("duration")), info));
+			p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info));
+			p.setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info));
+			p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info));
+			p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
+			p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info));
+			p.setContracttype(prepareQualifierSplitting(rs.getString("contracttype")));
+			p.setOptional1(field(rs.getString("optional1"), info));
+			p.setOptional2(field(rs.getString("optional2"), info));
+			p.setJsonextrainfo(field(rs.getString("jsonextrainfo"), info));
+			p.setContactfullname(field(rs.getString("contactfullname"), info));
+			p.setContactfax(field(rs.getString("contactfax"), info));
+			p.setContactphone(field(rs.getString("contactphone"), info));
+			p.setContactemail(field(rs.getString("contactemail"), info));
+			p.setSummary(field(rs.getString("summary"), info));
+			p.setCurrency(field(rs.getString("currency"), info));
+			p.setTotalcost((float) rs.getDouble("totalcost"));
+			p.setFundedamount((float) rs.getDouble("fundedamount"));
+			p.setDataInfo(info);
+			p.setLastupdatetimestamp(lastUpdateTimestamp);
+
+			// rs.getString("projectid");
+			// rs.getString("code");
+			// rs.getString("websiteurl");
+			// rs.getString("acronym");
+			// rs.getString("title");
+			// rs.getDate("startdate");
+			// rs.getDate("enddate");
+			// rs.getString("callidentifier");
+			// rs.getString("keywords");
+			// rs.getInt("duration");
+			// rs.getBoolean("ecsc39");
+			// rs.getBoolean("oamandatepublications");
+			// rs.getBoolean("ecarticle29_3");
+			// rs.getDate("dateofcollection");
+			// rs.getDate("dateoftransformation");
+			// rs.getBoolean("inferred");
+			// rs.getBoolean("deletedbyinference");
+			// rs.getDouble("trust");
+			// rs.getString("inferenceprovenance");
+			// rs.getString("optional1");
+			// rs.getString("optional2");
+			// rs.getString("jsonextrainfo");
+			// rs.getString("contactfullname");
+			// rs.getString("contactfax");
+			// rs.getString("contactphone");
+			// rs.getString("contactemail");
+			// rs.getString("summary");
+			// rs.getString("currency");
+			// rs.getDouble("totalcost");
+			// rs.getDouble("fundedamount");
+			// rs.getString("collectedfromid");
+			// rs.getString("collectedfromname");
+			// rs.getString("contracttype"); // COMPLEX
+			// rs.getString("provenanceaction"); // COMPLEX
+			// rs.getArray("pid");
+			// rs.getArray("subjects");
+			// rs.getArray("fundingtree");
+
+			emitOaf(p);
+
+		} catch (final Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public void processOrganization(final ResultSet rs) {
+
+		try {
+
+			final DataInfo info = prepareDataInfo(rs);
+
+			final Organization o = new Organization();
+			o.setId(createOpenaireId(20, rs.getString("organizationid")));
+			o.setOriginalId(Arrays.asList(rs.getString("organizationid")));
+			o.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname")));
+			o.setPid(new ArrayList<>());
+			o.setDateofcollection(asString(rs.getDate("dateofcollection")));
+			o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
+			o.setExtraInfo(new ArrayList<>()); // Values not present in the DB
+			o.setOaiprovenance(null); // Values not present in the DB
+			o.setLegalshortname(field(rs.getString("legalshortname"), info));
+			o.setLegalname(field(rs.getString("legalname"), info));
+			o.setAlternativeNames(new ArrayList<>()); // Values not returned by the SQL query
+			o.setWebsiteurl(field(rs.getString("websiteurl"), info));
+			o.setLogourl(field(rs.getString("logourl"), info));
+			o.setEclegalbody(field(Boolean.toString(rs.getBoolean("eclegalbody")), info));
+			o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info));
+			o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info));
+			o.setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info));
+			o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info));
+			o.setEcinternationalorganizationeurinterests(field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info));
+			o.setEcinternationalorganization(field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info));
+			o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info));
+			o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info));
+			o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info));
+			o.setCountry(prepareQualifierSplitting(rs.getString("country")));
+			o.setDataInfo(info);
+			o.setLastupdatetimestamp(lastUpdateTimestamp);
+
+			// rs.getString("organizationid");
+			// rs.getString("legalshortname");
+			// rs.getString("legalname");
+			// rs.getString("websiteurl");
+			// rs.getString("logourl");
+			// rs.getBoolean("eclegalbody");
+			// rs.getBoolean("eclegalperson");
+			// rs.getBoolean("ecnonprofit");
+			// rs.getBoolean("ecresearchorganization");
+			// rs.getBoolean("echighereducation");
+			// rs.getBoolean("ecinternationalorganizationeurinterests");
+			// rs.getBoolean("ecinternationalorganization");
+			// rs.getBoolean("ecenterprise");
+			// rs.getBoolean("ecsmevalidated");
+			// rs.getBoolean("ecnutscode");
+			// rs.getDate("dateofcollection");
+			// rs.getDate("dateoftransformation");
+			// rs.getBoolean("inferred");
+			// rs.getBoolean("deletedbyinference");
+			// rs.getDouble("trust");
+			// rs.getString("inferenceprovenance");
+			// rs.getString("collectedfromid");
+			// rs.getString("collectedfromname");
+			// rs.getString("country");
+			// rs.getString("provenanceaction");
+			// rs.getArray("pid");
+
+			emitOaf(o);
+		} catch (final Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public void processDatasourceOrganization(final ResultSet rs) {
+
+		try {
+			final DataInfo info = prepareDataInfo(rs);
+			final String orgId = createOpenaireId(20, rs.getString("organization"));
+			final String dsId = createOpenaireId(10, rs.getString("datasource"));
+			final List<KeyValue> collectedFrom = listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname"));
+
+			final Relation r1 = new Relation();
+			r1.setRelType("datasourceOrganization");
+			r1.setSubRelType("provision");
+			r1.setRelClass("isProvidedBy");
+			r1.setSource(dsId);
+			r1.setTarget(orgId);
+			r1.setCollectedFrom(collectedFrom);
+			r1.setDataInfo(info);
+			r1.setLastupdatetimestamp(lastUpdateTimestamp);
+			emitOaf(r1);
+
+			final Relation r2 = new Relation();
+			r2.setRelType("datasourceOrganization");
+			r2.setSubRelType("provision");
+			r2.setRelClass("provides");
+			r2.setSource(orgId);
+			r2.setTarget(dsId);
+			r2.setCollectedFrom(collectedFrom);
+			r2.setDataInfo(info);
+			r2.setLastupdatetimestamp(lastUpdateTimestamp);
+			emitOaf(r2);
+
+			// rs.getString("datasource");
+			// rs.getString("organization");
+			// rs.getDate("startdate"); // NULL
+			// rs.getDate("enddate"); // NULL
+			// rs.getBoolean("inferred"); // false
+			// rs.getBoolean("deletedbyinference"); // false
+			// rs.getDouble("trust"); // 0.9
+			// rs.getString("inferenceprovenance"); // NULL
+			// rs.getString("semantics"); // 'providedBy@@@provided
+			// by@@@dnet:datasources_organizations_typologies@@@dnet:datasources_organizations_typologies' AS
+			// semantics,
+			// rs.getString("provenanceaction"); // d.provenanceaction || '@@@' || d.provenanceaction ||
+			// '@@@dnet:provenanceActions@@@dnet:provenanceActions' AS provenanceaction
+
+		} catch (final Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public void processProjectOrganization(final ResultSet rs) {
+
+		try {
+			final DataInfo info = prepareDataInfo(rs);
+			final String orgId = createOpenaireId(20, rs.getString("resporganization"));
+			final String projectId = createOpenaireId(40, rs.getString("project"));
+			final List<KeyValue> collectedFrom = listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname"));
+
+			final Relation r1 = new Relation();
+			r1.setRelType("projectOrganization");
+			r1.setSubRelType("participation");
+			r1.setRelClass("isParticipant");
+			r1.setSource(projectId);
+			r1.setTarget(orgId);
+			r1.setCollectedFrom(collectedFrom);
+			r1.setDataInfo(info);
+			r1.setLastupdatetimestamp(lastUpdateTimestamp);
+			emitOaf(r1);
+
+			final Relation r2 = new Relation();
+			r2.setRelType("projectOrganization");
+			r2.setSubRelType("participation");
+			r2.setRelClass("hasParticipant");
+			r2.setSource(orgId);
+			r2.setTarget(projectId);
+			r2.setCollectedFrom(collectedFrom);
+			r2.setDataInfo(info);
+			r2.setLastupdatetimestamp(lastUpdateTimestamp);
+			emitOaf(r2);
+
+			// rs.getString("project");
+			// rs.getString("resporganization");
+			// rs.getInt("participantnumber");
+			// rs.getDouble("contribution");
+			// rs.getDate("startdate");// null
+			// rs.getDate("enddate");// null
+			// rs.getBoolean("inferred");// false
+			// rs.getBoolean("deletedbyinference"); // false
+			// rs.getDouble("trust");
+			// rs.getString("inferenceprovenance"); // NULL
+			// rs.getString("semantics"); // po.semanticclass || '@@@' || po.semanticclass ||
+			// '@@@dnet:project_organization_relations@@@dnet:project_organization_relations' AS semantics,
+			// rs.getString("provenanceaction"); //
+			// 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions'
+			// AS provenanceaction
+
+		} catch (final Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException {
+		final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
+		final String inferenceprovenance = rs.getString("inferenceprovenance");
+		final Boolean inferred = rs.getBoolean("inferred");
+		final String trust = rs.getString("trust");
+		return dataInfo(deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust);
+	}
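+	// prepareQualifierSplitting() below parses the packed form produced by the SQL
+	// queries, e.g. "sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions"
+	// -> qualifier(classid, classname, schemeid, schemename); prepareStructProp()
+	// expects "value###classid@@@classname@@@schemeid@@@schemename".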
+	private Qualifier prepareQualifierSplitting(final String s) {
+		if (StringUtils.isBlank(s)) { return null; }
+		final String[] arr = s.split("@@@");
+		return arr.length == 4 ? qualifier(arr[0], arr[1], arr[2], arr[3]) : null;
+	}
+
+	private List<Field<String>> prepareListFields(final Array array, final DataInfo info) {
+		try {
+			return listFields(info, (String[]) array.getArray());
+		} catch (final SQLException e) {
+			throw new RuntimeException("Invalid SQL array", e);
+		}
+	}
+
+	private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) {
+		if (StringUtils.isBlank(s)) { return null; }
+		final String[] parts = s.split("###");
+		if (parts.length == 2) {
+			final String value = parts[0];
+			final String[] arr = parts[1].split("@@@");
+			if (arr.length == 4) { return structuredProperty(value, arr[0], arr[1], arr[2], arr[3], dataInfo); }
+		}
+		return null;
+	}
+
+	private List<StructuredProperty> prepareListOfStructProps(final Array array, final DataInfo dataInfo) throws SQLException {
+		final List<StructuredProperty> res = new ArrayList<>();
+		if (array != null) {
+			for (final String s : (String[]) array.getArray()) {
+				final StructuredProperty sp = prepareStructProp(s, dataInfo);
+				if (sp != null) {
+					res.add(sp);
+				}
+			}
+		}
+
+		return res;
+	}
+
+	private Journal prepareJournal(final String name, final String sj, final DataInfo info) {
+		if (StringUtils.isNotBlank(sj)) {
+			final String[] arr = sj.split("@@@");
+			if (arr.length == 3) {
+				final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0] : null;
+				final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1] : null;
+				final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2] : null;
+				if (issn != null || eissn != null
+						|| lissn != null) { return journal(name, issn, eissn, lissn, null, null, null, null, null, null, null, info); }
+			}
+		}
+		return null;
+	}
+
+	@Override
+	public void close() throws IOException {
+		super.close();
+		dbClient.close();
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java
new file mode 100644
index 000000000..359fe7596
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java
@@ -0,0 +1,45 @@
+package eu.dnetlib.dhp.migration;
+
+import org.apache.commons.io.IOUtils;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class MigrateMongoMdstoresApplication {
+
+	public static void main(final String[] args) throws Exception {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+				IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json")));
+		parser.parseArgument(args);
+
+		final String mongoBaseUrl = parser.get("mongoBaseUrl");
+		final String mongoDb = parser.get("mongoDb");
+
+		final String mdFormat = parser.get("mdFormat");
+		final String mdLayout = parser.get("mdLayout");
+		final String mdInterpretation = parser.get("mdInterpretation");
+
+		final String hdfsPath = parser.get("hdfsPath");
+		final String hdfsNameNode = parser.get("namenode");
+		final String hdfsUser = parser.get("hdfsUser");
+
+		final String dbUrl = parser.get("postgresUrl");
+		final String dbUser = parser.get("postgresUser");
+		final String dbPassword = parser.get("postgresPassword");
+
+		if (mdFormat.equalsIgnoreCase("oaf")) {
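+			// Both executors extend AbstractMongoExecutor and differ only in the XML
+			// dialect they map: OafMigrationExecutor reads dc:* fields, while (an
+			// assumption from its name) OdfMigrationExecutor handles the "odf"
+			// DataCite-style records.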
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java new file mode 100644 index 000000000..359fe7596 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java @@ -0,0 +1,45 @@ +package eu.dnetlib.dhp.migration; + +import org.apache.commons.io.IOUtils; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class MigrateMongoMdstoresApplication { + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json"))); + parser.parseArgument(args); + + final String mongoBaseUrl = parser.get("mongoBaseUrl"); + final String mongoDb = parser.get("mongoDb"); + + final String mdFormat = parser.get("mdFormat"); + final String mdLayout = parser.get("mdLayout"); + final String mdInterpretation = parser.get("mdInterpretation"); + + final String hdfsPath = parser.get("hdfsPath"); + final String hdfsNameNode = parser.get("namenode"); + final String hdfsUser = parser.get("hdfsUser"); + + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); + + if (mdFormat.equalsIgnoreCase("oaf")) { + try (final OafMigrationExecutor mig = + new OafMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) { + mig.processMdRecords(mdFormat, mdLayout, mdInterpretation); + } + } else if (mdFormat.equalsIgnoreCase("odf")) { + try (final OdfMigrationExecutor mig = + new OdfMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) { + mig.processMdRecords(mdFormat, mdLayout, mdInterpretation); + } + } else { + throw new RuntimeException("Format not supported: " + mdFormat); + } + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OafMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OafMigrationExecutor.java new file mode 100644 index 000000000..c32568290 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OafMigrationExecutor.java @@ -0,0 +1,251 @@ +package eu.dnetlib.dhp.migration; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.dom4j.Document; +import org.dom4j.Node; + +import eu.dnetlib.dhp.migration.pace.PacePerson; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.GeoLocation; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; + +public class OafMigrationExecutor extends AbstractMongoExecutor { + + private static final Log log = LogFactory.getLog(OafMigrationExecutor.class); + + public OafMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb, + final String dbUrl, final String dbUser, + final String dbPassword) throws Exception { + super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword); + } + + @Override + protected void registerNamespaces(final Map<String, String> nsContext) { + super.registerNamespaces(nsContext); + nsContext.put("dc", "http://purl.org/dc/elements/1.1/"); + } + + @Override + protected List<Author> prepareAuthors(final Document doc, final DataInfo info) { + final List<Author> res = new ArrayList<>(); + int pos = 1; + for (final Object o : doc.selectNodes("//dc:creator")) { + final Node n = (Node) o; + final Author author = new Author(); + author.setFullname(n.getText()); + author.setRank(pos++); + final PacePerson p = new PacePerson(n.getText(), false); + if (p.isAccurate()) { + author.setName(p.getNormalisedFirstName()); + author.setSurname(p.getNormalisedSurname()); + } + res.add(author); + } + return res; + } + + @Override + protected Qualifier prepareLanguages(final Document doc) { + return prepareQualifier(doc, "//dc:language", "dnet:languages", "dnet:languages"); + } + + @Override + protected List<StructuredProperty> prepareSubjects(final Document doc, final DataInfo info) { + return prepareListStructProps(doc, "//dc:subject", info); + } + + @Override + protected List<StructuredProperty> prepareTitles(final Document doc, final DataInfo info) { + return prepareListStructProps(doc, "//dc:title", MAIN_TITLE_QUALIFIER, info); + } + + @Override + protected List<Field<String>> prepareDescriptions(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:description", info); + } + + @Override + protected
Field preparePublisher(final Document doc, final DataInfo info) { + return prepareField(doc, "//dc:publisher", info); + } + + @Override + protected List> prepareFormats(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:format", info); + } + + @Override + protected List> prepareContributors(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:contributor", info); + } + + @Override + protected List> prepareCoverages(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:coverage", info); + } + + @Override + protected List prepareInstances(final Document doc, final DataInfo info, final KeyValue collectedfrom, final KeyValue hostedby) { + final List res = new ArrayList<>(); + for (final Object o : doc.selectNodes("//dc:identifier")) { + final String url = ((Node) o).getText().trim(); + if (url.startsWith("http")) { + final Instance instance = new Instance(); + instance.setUrl(Arrays.asList(url)); + instance.setInstancetype(prepareQualifier(doc, "//dr:CobjCategory", "dnet:publication_resource", "dnet:publication_resource")); + instance.setCollectedfrom(collectedfrom); + instance.setHostedby(hostedby); + instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info)); + instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation")); + instance.setAccessright(prepareQualifier(doc, "//oaf:accessrights", "dnet:access_modes", "dnet:access_modes")); + instance.setLicense(field(doc.valueOf("//oaf:license"), info)); + instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info)); + instance.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info)); + instance.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info)); + res.add(instance); + } + } + return res; + } + + @Override + protected List> prepareSources(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:source", info); + } + + @Override + protected List prepareRelevantDates(final Document doc, final DataInfo info) { + return new ArrayList<>(); // NOT PRESENT IN OAF + } + + // SOFTWARES + + @Override + protected Qualifier prepareSoftwareProgrammingLanguage(final Document doc, final DataInfo info) { + return null; // NOT PRESENT IN OAF + } + + @Override + protected Field prepareSoftwareCodeRepositoryUrl(final Document doc, final DataInfo info) { + return null; // NOT PRESENT IN OAF + } + + @Override + protected List prepareSoftwareLicenses(final Document doc, final DataInfo info) { + return new ArrayList<>(); // NOT PRESENT IN OAF + } + + @Override + protected List> prepareSoftwareDocumentationUrls(final Document doc, final DataInfo info) { + return new ArrayList<>(); // NOT PRESENT IN OAF + } + + // DATASETS + @Override + protected List prepareDatasetGeoLocations(final Document doc, final DataInfo info) { + return new ArrayList<>(); // NOT PRESENT IN OAF + } + + @Override + protected Field prepareDatasetMetadataVersionNumber(final Document doc, final DataInfo info) { + return null; // NOT PRESENT IN OAF + } + + @Override + protected Field prepareDatasetLastMetadataUpdate(final Document doc, final DataInfo info) { + return null; // NOT PRESENT IN OAF + } + + @Override + protected Field prepareDatasetVersion(final Document doc, final DataInfo info) { + return null; // NOT PRESENT IN OAF + } + + @Override + protected Field prepareDatasetSize(final Document doc, final DataInfo info) { + return null; // NOT PRESENT IN OAF + } + + @Override + 
protected Field prepareDatasetDevice(final Document doc, final DataInfo info) { + return null; // NOT PRESENT IN OAF + } + + @Override + protected Field prepareDatasetStorageDate(final Document doc, final DataInfo info) { + return null; // NOT PRESENT IN OAF + } + + // OTHER PRODUCTS + + @Override + protected List> prepareOtherResearchProductTools(final Document doc, final DataInfo info) { + return new ArrayList<>(); // NOT PRESENT IN OAF + } + + @Override + protected List> prepareOtherResearchProductContactGroups(final Document doc, final DataInfo info) { + return new ArrayList<>(); // NOT PRESENT IN OAF + } + + @Override + protected List> prepareOtherResearchProductContactPersons(final Document doc, final DataInfo info) { + return new ArrayList<>(); // NOT PRESENT IN OAF + } + + @Override + protected List addOtherResultRels(final Document doc, + final KeyValue collectedFrom, + final DataInfo info, + final long lastUpdateTimestamp) { + final String docId = createOpenaireId(50, doc.valueOf("//dri:objIdentifier")); + + final List res = new ArrayList<>(); + + for (final Object o : doc.selectNodes("//*[local-name()='relatedDataset']")) { + final String otherId = createOpenaireId(50, ((Node) o).getText()); + + final Relation r1 = new Relation(); + r1.setRelType("resultResult"); + r1.setSubRelType("publicationDataset"); + r1.setRelClass("isRelatedTo"); + r1.setSource(docId); + r1.setTarget(otherId); + r1.setCollectedFrom(Arrays.asList(collectedFrom)); + r1.setDataInfo(info); + r1.setLastupdatetimestamp(lastUpdateTimestamp); + res.add(r1); + + final Relation r2 = new Relation(); + r2.setRelType("resultResult"); + r2.setSubRelType("publicationDataset"); + r2.setRelClass("isRelatedTo"); + r2.setSource(otherId); + r2.setTarget(docId); + r2.setCollectedFrom(Arrays.asList(collectedFrom)); + r2.setDataInfo(info); + r2.setLastupdatetimestamp(lastUpdateTimestamp); + res.add(r2); + } + return res; + } + + @Override + protected Qualifier prepareResourceType(final Document doc, final DataInfo info) { + return null; // NOT PRESENT IN OAF + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OdfMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OdfMigrationExecutor.java new file mode 100644 index 000000000..457534085 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OdfMigrationExecutor.java @@ -0,0 +1,273 @@ +package eu.dnetlib.dhp.migration; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.dom4j.Document; +import org.dom4j.Node; + +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.GeoLocation; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; + +public class OdfMigrationExecutor extends AbstractMongoExecutor { + + private static final Log log = LogFactory.getLog(OdfMigrationExecutor.class); + + public OdfMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb, + final String dbUrl, 
final String dbUser, + final String dbPassword) throws Exception { + super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword); + } + + @Override + protected void registerNamespaces(final Map<String, String> nsContext) { + super.registerNamespaces(nsContext); + nsContext.put("dc", "http://datacite.org/schema/kernel-3"); + } + + @Override + protected List<StructuredProperty> prepareTitles(final Document doc, final DataInfo info) { + return prepareListStructProps(doc, "//dc:title", MAIN_TITLE_QUALIFIER, info); + } + + @Override + protected List<Author> prepareAuthors(final Document doc, final DataInfo info) { + final List<Author> res = new ArrayList<>(); + int pos = 1; + for (final Object o : doc.selectNodes("//dc:creator")) { + final Node n = (Node) o; + final Author author = new Author(); + author.setFullname(n.valueOf("./dc:creatorName")); + author.setName(n.valueOf("./dc:givenName")); + author.setSurname(n.valueOf("./dc:familyName")); + author.setAffiliation(prepareListFields(n, "./dc:affiliation", info)); + author.setPid(preparePids(n, info)); + author.setRank(pos++); + res.add(author); + } + return res; + } + + private List<StructuredProperty> preparePids(final Node n, final DataInfo info) { + final List<StructuredProperty> res = new ArrayList<>(); + for (final Object o : n.selectNodes("./dc:nameIdentifier")) { + res.add(structuredProperty(((Node) o).getText(), prepareQualifier((Node) o, "./@nameIdentifierScheme", "dnet:pid_types", "dnet:pid_types"), info)); + } + return res; + } + + @Override + protected List<Instance> prepareInstances(final Document doc, final DataInfo info, final KeyValue collectedfrom, final KeyValue hostedby) { + final List<Instance> res = new ArrayList<>(); + for (final Object o : doc.selectNodes("//dc:alternateIdentifier[@alternateIdentifierType='URL']")) { + final Instance instance = new Instance(); + instance.setUrl(Arrays.asList(((Node) o).getText().trim())); + instance.setInstancetype(prepareQualifier(doc, "//dr:CobjCategory", "dnet:publication_resource", "dnet:publication_resource")); + instance.setCollectedfrom(collectedfrom); + instance.setHostedby(hostedby); + instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info)); + instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation")); + instance.setAccessright(prepareQualifier(doc, "//oaf:accessrights", "dnet:access_modes", "dnet:access_modes")); + instance.setLicense(field(doc.valueOf("//oaf:license"), info)); + instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info)); + instance.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info)); + instance.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info)); + res.add(instance); + } + return res; + } + + @Override + protected List<Field<String>> prepareSources(final Document doc, final DataInfo info) { + return new ArrayList<>(); // Not present in ODF ??? + }
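prepareQualifier(...), called throughout both executors, is inherited from AbstractMongoExecutor and is not part of this diff. A plausible minimal sketch, assuming it reads the class id from the given XPath and reuses it as the class name (dom4j's Document implements Node, so both a whole document and a single creator node can be passed):

```java
// Hypothetical sketch; the real helper is outside this hunk.
protected Qualifier prepareQualifier(final Node node, final String xpath, final String schemeId, final String schemeName) {
	final String classId = node.valueOf(xpath).trim();
	return qualifier(classId, classId, schemeId, schemeName); // qualifier(...) is the factory in AbstractMigrationExecutor
}
```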
+ + @Override + protected List<StructuredProperty> prepareRelevantDates(final Document doc, final DataInfo info) { + final List<StructuredProperty> res = new ArrayList<>(); + for (final Object o : doc.selectNodes("//dc:date")) { + final String dateType = ((Node) o).valueOf("@dateType"); + if (StringUtils.isNotBlank(dateType) && !dateType.equalsIgnoreCase("Accepted") && !dateType.equalsIgnoreCase("Issued") + && !dateType.equalsIgnoreCase("Updated") && !dateType.equalsIgnoreCase("Available")) { + res.add(structuredProperty(((Node) o).getText(), "UNKNOWN", "UNKNOWN", "dnet:dataCite_date", "dnet:dataCite_date", info)); + } + } + return res; + } + + @Override + protected List<Field<String>> prepareCoverages(final Document doc, final DataInfo info) { + return new ArrayList<>(); // Not present in ODF ??? + } + + @Override + protected List<Field<String>> prepareContributors(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:contributorName", info); + } + + @Override + protected List<Field<String>> prepareFormats(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:format", info); + } + + @Override + protected Field<String> preparePublisher(final Document doc, final DataInfo info) { + return prepareField(doc, "//dc:publisher", info); + } + + @Override + protected List<Field<String>> prepareDescriptions(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:description[@descriptionType='Abstract']", info); + } + + @Override + protected List<StructuredProperty> prepareSubjects(final Document doc, final DataInfo info) { + return prepareListStructProps(doc, "//dc:subject", info); + } + + @Override + protected Qualifier prepareLanguages(final Document doc) { + return prepareQualifier(doc, "//dc:language", "dnet:languages", "dnet:languages"); + } + + @Override + protected List<Field<String>> prepareOtherResearchProductTools(final Document doc, final DataInfo info) { + return new ArrayList<>(); // Not present in ODF ??? + } + + @Override + protected List<Field<String>> prepareOtherResearchProductContactGroups(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:contributor[@contributorType='ContactGroup']/dc:contributorName", info); + } + + @Override + protected List<Field<String>> prepareOtherResearchProductContactPersons(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:contributor[@contributorType='ContactPerson']/dc:contributorName", info); + } + + @Override + protected Qualifier prepareSoftwareProgrammingLanguage(final Document doc, final DataInfo info) { + return prepareQualifier(doc, "//dc:format", "dnet:programming_languages", "dnet:programming_languages"); + } + + @Override + protected Field<String> prepareSoftwareCodeRepositoryUrl(final Document doc, final DataInfo info) { + return null; // Not present in ODF ??? + } + + @Override + protected List<StructuredProperty> prepareSoftwareLicenses(final Document doc, final DataInfo info) { + return new ArrayList<>(); // Not present in ODF ??? + }
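prepareField(...) and prepareListFields(...) are assumed inherited helpers as well; minimal sketches consistent with the call sites (the bodies are conjecture):

```java
// Hypothetical sketches; the real helpers live in AbstractMongoExecutor, outside this hunk.
protected Field<String> prepareField(final Node node, final String xpath, final DataInfo info) {
	return field(node.valueOf(xpath).trim(), info); // field(...) returns null on blank values
}

protected List<Field<String>> prepareListFields(final Node node, final String xpath, final DataInfo info) {
	final List<String> values = new ArrayList<>();
	for (final Object o : node.selectNodes(xpath)) {
		values.add(((Node) o).getText().trim());
	}
	return listFields(info, values); // listFields(...) is the factory in AbstractMigrationExecutor
}
```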
+ + @Override + protected List<Field<String>> prepareSoftwareDocumentationUrls(final Document doc, final DataInfo info) { + return prepareListFields(doc, "//dc:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info); + } + + // DATASETS + + @Override + protected List<GeoLocation> prepareDatasetGeoLocations(final Document doc, final DataInfo info) { + final List<GeoLocation> res = new ArrayList<>(); + + for (final Object o : doc.selectNodes("//dc:geoLocation")) { + final GeoLocation loc = new GeoLocation(); + loc.setBox(((Node) o).valueOf("./dc:geoLocationBox")); + loc.setPlace(((Node) o).valueOf("./dc:geoLocationPlace")); + loc.setPoint(((Node) o).valueOf("./dc:geoLocationPoint")); + res.add(loc); + } + return res; + } + + @Override + protected Field<String> prepareDatasetMetadataVersionNumber(final Document doc, final DataInfo info) { + return null; // Not present in ODF ??? + } + + @Override + protected Field<String> prepareDatasetLastMetadataUpdate(final Document doc, final DataInfo info) { + return prepareField(doc, "//dc:date[@dateType='Updated']", info); + } + + @Override + protected Field<String> prepareDatasetVersion(final Document doc, final DataInfo info) { + return prepareField(doc, "//dc:version", info); + } + + @Override + protected Field<String> prepareDatasetSize(final Document doc, final DataInfo info) { + return prepareField(doc, "//dc:size", info); + } + + @Override + protected Field<String> prepareDatasetDevice(final Document doc, final DataInfo info) { + return null; // Not present in ODF ??? + } + + @Override + protected Field<String> prepareDatasetStorageDate(final Document doc, final DataInfo info) { + return prepareField(doc, "//dc:date[@dateType='Issued']", info); + } + + @Override + protected List<Oaf> addOtherResultRels(final Document doc, final KeyValue collectedFrom, final DataInfo info, final long lastUpdateTimestamp) { + + final String docId = createOpenaireId(50, doc.valueOf("//dri:objIdentifier")); + + final List<Oaf> res = new ArrayList<>(); + + for (final Object o : doc.selectNodes("//*[local-name() = 'resource']//*[local-name()='relatedIdentifier' and ./@relatedIdentifierType='OPENAIRE']")) { + final String otherId = createOpenaireId(50, ((Node) o).getText()); + final String type = ((Node) o).valueOf("@relationType"); + + if (type.equals("IsSupplementTo")) { + res.add(prepareOtherResultRel(collectedFrom, info, lastUpdateTimestamp, docId, otherId, "supplement", "isSupplementTo")); + res.add(prepareOtherResultRel(collectedFrom, info, lastUpdateTimestamp, otherId, docId, "supplement", "isSupplementedBy")); + } else if (type.equals("IsPartOf")) { + res.add(prepareOtherResultRel(collectedFrom, info, lastUpdateTimestamp, docId, otherId, "part", "IsPartOf")); + res.add(prepareOtherResultRel(collectedFrom, info, lastUpdateTimestamp, otherId, docId, "part", "HasParts")); + } + } + return res; + } + + private Relation prepareOtherResultRel(final KeyValue collectedFrom, + final DataInfo info, + final long lastUpdateTimestamp, + final String source, + final String target, + final String subRelType, + final String relClass) { + final Relation r = new Relation(); + r.setRelType("resultResult"); + r.setSubRelType(subRelType); + r.setRelClass(relClass); + r.setSource(source); + r.setTarget(target); + r.setCollectedFrom(Arrays.asList(collectedFrom)); + r.setDataInfo(info); + r.setLastupdatetimestamp(lastUpdateTimestamp); + return r; + } + + @Override + protected Qualifier prepareResourceType(final Document doc, final DataInfo info) { + return prepareQualifier(doc, "//*[local-name() = 'resource']//*[local-name() = 'resourceType']", "dnet:dataCite_resource", "dnet:dataCite_resource"); + } + +}
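createOpenaireId(prefix, originalId), used for every identifier in this migration, is another helper outside this diff; the numeric prefix encodes the entity type (20 organization, 40 project, 50 result at the call sites above). A sketch of the presumed scheme, where the hashing of the local part is an assumption:

```java
// Hypothetical sketch; the prefix|namespace::md5(localId) layout is assumed, not confirmed by this diff.
public static String createOpenaireId(final int prefix, final String originalId) {
	final String nsPrefix = StringUtils.substringBefore(originalId, "::");
	final String rest = StringUtils.substringAfter(originalId, "::");
	return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest)); // DHPUtils is imported in AbstractMigrationExecutor
}
```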
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/pace/PacePerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/pace/PacePerson.java new file mode 100644 index 000000000..927f5641b --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/pace/PacePerson.java @@ -0,0 +1,176 @@ +package eu.dnetlib.dhp.migration.pace; + +import java.nio.charset.Charset; +import java.text.Normalizer; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.text.WordUtils; + +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; + +public class PacePerson { + + private static final String UTF8 = "UTF-8"; + private List<String> name = Lists.newArrayList(); + private List<String> surname = Lists.newArrayList(); + private List<String> fullname = Lists.newArrayList(); + private final String original; + + private static Set<String> particles = null; + + public static final String capitalize(final String s) { + return WordUtils.capitalize(s.toLowerCase(), ' ', '-'); + } + + public static final String dotAbbreviations(final String s) { + return s.length() == 1 ? s + "." : s; + } + + public static Set<String> loadFromClasspath(final String classpath) { + final Set<String> h = new HashSet<>(); + try { + for (final String s : IOUtils.readLines(PacePerson.class.getResourceAsStream(classpath))) { + h.add(s); + } + } catch (final Throwable e) { + return new HashSet<>(); + } + return h; + } + + public PacePerson(String s, final boolean aggressive) { + original = s; + s = Normalizer.normalize(s, Normalizer.Form.NFD); + s = s.replaceAll("\\(.+\\)", ""); + s = s.replaceAll("\\[.+\\]", ""); + s = s.replaceAll("\\{.+\\}", ""); + s = s.replaceAll("\\s+-\\s+", "-"); + s = s.replaceAll("[\\p{Punct}&&[^,-]]", " "); + s = s.replaceAll("\\d", " "); + s = s.replaceAll("\\n", " "); + s = s.replaceAll("\\.", " "); + s = s.replaceAll("\\s+", " "); + + if (aggressive) { + s = s.replaceAll("[\\p{InCombiningDiacriticalMarks}&&[^,-]]", ""); + // s = s.replaceAll("[\\W&&[^,-]]", ""); + } + + if (s.contains(",")) { + final String[] arr = s.split(","); + if (arr.length == 1) { + fullname = splitTerms(arr[0]); + } else if (arr.length > 1) { + surname = splitTerms(arr[0]); + name = splitTerms(arr[1]); + fullname.addAll(surname); + fullname.addAll(name); + } + } else { + fullname = splitTerms(s); + + int lastInitialPosition = fullname.size(); + boolean hasSurnameInUpperCase = false; + + for (int i = 0; i < fullname.size(); i++) { + final String term = fullname.get(i); + if (term.length() == 1) { + lastInitialPosition = i; + } else if (term.equals(term.toUpperCase())) { + hasSurnameInUpperCase = true; + } + } + + if (lastInitialPosition < fullname.size() - 1) { // Case: Michele G.
Artini + name = fullname.subList(0, lastInitialPosition + 1); + surname = fullname.subList(lastInitialPosition + 1, fullname.size()); + } else if (hasSurnameInUpperCase) { // Case: Michele ARTINI + for (final String term : fullname) { + if (term.length() > 1 && term.equals(term.toUpperCase())) { + surname.add(term); + } else { + name.add(term); + } + } + } + } + } + + private List<String> splitTerms(final String s) { + if (particles == null) { + particles = loadFromClasspath("/eu/dnetlib/dhp/migration/pace/name_particles.txt"); + } + + final List<String> list = Lists.newArrayList(); + for (final String part : Splitter.on(" ").omitEmptyStrings().split(s)) { + if (!particles.contains(part.toLowerCase())) { + list.add(part); + } + } + return list; + } + + public List<String> getName() { + return name; + } + + public String getNameString() { + return Joiner.on(" ").join(getName()); + } + + public List<String> getSurname() { + return surname; + } + + public List<String> getFullname() { + return fullname; + } + + public String getOriginal() { + return original; + } + + public String hash() { + return Hashing.murmur3_128().hashString(getNormalisedFullname(), Charset.forName(UTF8)).toString(); + } + + public String getNormalisedFirstName() { + return Joiner.on(" ").join(getCapitalFirstnames()); + } + + public String getNormalisedSurname() { + return Joiner.on(" ").join(getCapitalSurname()); + } + + public String getSurnameString() { + return Joiner.on(" ").join(getSurname()); + } + + public String getNormalisedFullname() { + return isAccurate() ? getNormalisedSurname() + ", " + getNormalisedFirstName() : Joiner.on(" ").join(fullname); + } + + public List<String> getCapitalFirstnames() { + return Lists.newArrayList(Iterables.transform(getNameWithAbbreviations(), PacePerson::capitalize)); + } + + public List<String> getCapitalSurname() { + return Lists.newArrayList(Iterables.transform(surname, PacePerson::capitalize)); + } + + public List<String> getNameWithAbbreviations() { + return Lists.newArrayList(Iterables.transform(name, PacePerson::dotAbbreviations)); + } + + public boolean isAccurate() { + return name != null && surname != null && !name.isEmpty() && !surname.isEmpty(); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json new file mode 100644 index 000000000..f179ee0f8 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "s", + "paramLongName": "sourcePath", + "paramDescription": "the HDFS source path that contains the sequence file", + "paramRequired": true + }, + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "g", + "paramLongName": "graphRawPath", + "paramDescription": "the path of the raw graph in HDFS", + "paramRequired": true + }, + { + "paramName": "e", + "paramLongName": "entity", + "paramDescription": "the entity to extract", + "paramRequired": true + } +] \ No newline at end of file
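These parameters are presumably consumed the same way as in MigrateMongoMdstoresApplication above; a usage sketch for the main method of ExtractEntitiesFromHDFSJob (which is referenced by the workflow below but not included in this diff):

```java
// Usage sketch, assuming ExtractEntitiesFromHDFSJob follows the same pattern as the main methods in this diff.
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
		IOUtils.toString(ExtractEntitiesFromHDFSJob.class
				.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
parser.parseArgument(args);
final String sourcePath = parser.get("sourcePath"); // -s
final String master = parser.get("master"); // -mt
final String graphRawPath = parser.get("graphRawPath"); // -g
final String entity = parser.get("entity"); // -e
```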
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json new file mode 100644 index 000000000..5e9f378f5 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json @@ -0,0 +1,38 @@ +[ + { + "paramName": "p", + "paramLongName": "hdfsPath", + "paramDescription": "the path where the sequence file is stored", + "paramRequired": true + }, + { + "paramName": "n", + "paramLongName": "namenode", + "paramDescription": "the Name Node URI", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "hdfsUser", + "paramDescription": "the user that creates the HDFS sequence file", + "paramRequired": true + }, + { + "paramName": "dburl", + "paramLongName": "postgresUrl", + "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb", + "paramRequired": true + }, + { + "paramName": "dbuser", + "paramLongName": "postgresUser", + "paramDescription": "postgres user", + "paramRequired": false + }, + { + "paramName": "dbpasswd", + "paramLongName": "postgresPassword", + "paramDescription": "postgres password", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json new file mode 100644 index 000000000..5738daa76 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json @@ -0,0 +1,68 @@ +[ + { + "paramName": "p", + "paramLongName": "hdfsPath", + "paramDescription": "the path where the sequence file is stored", + "paramRequired": true + }, + { + "paramName": "n", + "paramLongName": "namenode", + "paramDescription": "the Name Node URI", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "hdfsUser", + "paramDescription": "the user that creates the HDFS sequence file", + "paramRequired": true + }, + { + "paramName": "mongourl", + "paramLongName": "mongoBaseUrl", + "paramDescription": "mongoDB url, example: mongodb://[username:password@]host[:port]", + "paramRequired": true + }, + { + "paramName": "db", + "paramLongName": "mongoDb", + "paramDescription": "mongo database", + "paramRequired": true + }, + { + "paramName": "f", + "paramLongName": "mdFormat", + "paramDescription": "metadata format", + "paramRequired": true + }, + { + "paramName": "l", + "paramLongName": "mdLayout", + "paramDescription": "metadata layout", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "mdInterpretation", + "paramDescription": "metadata interpretation", + "paramRequired": true + }, + { + "paramName": "pgurl", + "paramLongName": "postgresUrl", + "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb", + "paramRequired": true + }, + { + "paramName": "pguser", + "paramLongName": "postgresUser", + "paramDescription": "postgres user", + "paramRequired": false + }, + { + "paramName": "pgpasswd", + "paramLongName": "postgresPassword", + "paramDescription": "postgres password", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/config-default.xml new file mode 100644 index 000000000..51e48d8f7 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + 
hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hdfsUser + dnet + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml new file mode 100644 index 000000000..309a6d90f --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml @@ -0,0 +1,282 @@ + + + + workingPath + the base path to store hdfs file + + + graphRawPath + the graph Raw base path + + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + mongourl + mongoDB url, example: mongodb://[username:password@]host[:port] + + + mongoDb + mongo database + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication + -p${workingPath}/db_entities + -n${nameNode} + -u${hdfsUser} + -dburl${postgresURL} + -dbuser${postgresUser} + -dbpasswd${postgresPassword} + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication + -p${workingPath}/odf_entities + -n${nameNode} + -u${hdfsUser} + -mongourl${mongourl} + -db${mongoDb} + -fODF + -lstore + -icleaned + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication + -p${workingPath}/oaf_entities + -n${nameNode} + -u${hdfsUser} + -mongourl${mongourl} + -db${mongoDb} + -fOAF + -lstore + -icleaned + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ExtractEntities: publication + eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + --sourcePath${workingPath} + -g${graphRawPath}/publication + -epublication + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ExtractEntities: dataset + eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + --sourcePath${workingPath} + -g${graphRawPath}/dataset + -edataset + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ExtractEntities: software + eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + dhp-aggregation-${projectVersion}.jar + --executor-memory 
${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + --sourcePath${workingPath} + -g${graphRawPath}/software + -esoftware + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ExtractEntities: otherresearchproduct + eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + --sourcePath${workingPath} + -g${graphRawPath}/otherresearchproduct + -eotherresearchproduct + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ExtractEntities: datasource + eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + --sourcePath${workingPath} + -g${graphRawPath}/datasource + -edatasource + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ExtractEntities: organization + eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + --sourcePath${workingPath} + -g${graphRawPath}/organization + -eorganization + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ExtractEntities: project + eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + --sourcePath${workingPath} + -g${graphRawPath}/project + -eproject + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + ExtractEntities: relation + eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + 
--sourcePath${workingPath} + -g${graphRawPath}/relation + -erelation + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/pace/name_particles.txt b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/pace/name_particles.txt new file mode 100644 index 000000000..dae37c9dc --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/pace/name_particles.txt @@ -0,0 +1,7 @@ +van +der +de +dell +sig +mr +mrs diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasourceOrganization.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasourceOrganization.sql new file mode 100644 index 000000000..745f83971 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasourceOrganization.sql @@ -0,0 +1,17 @@ +SELECT + dor.datasource AS datasource, + dor.organization AS organization, + NULL AS startdate, + NULL AS enddate, + false AS inferred, + false AS deletedbyinference, + 0.9 AS trust, + NULL AS inferenceprovenance, + dc.id AS collectedfromid, + dc.officialname AS collectedfromname, + 'providedBy@@@provided by@@@dnet:datasources_organizations_typologies@@@dnet:datasources_organizations_typologies' AS semantics, + d.provenanceaction || '@@@' || d.provenanceaction || '@@@dnet:provenanceActions@@@dnet:provenanceActions' AS provenanceaction + +FROM dsm_datasource_organization dor + LEFT OUTER JOIN dsm_datasources d ON (dor.datasource = d.id) + LEFT OUTER JOIN dsm_datasources dc ON (dc.id = d.collectedfrom) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasources.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasources.sql new file mode 100644 index 000000000..8c587f34e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasources.sql @@ -0,0 +1,147 @@ +SELECT + d.id AS datasourceid, + d.id || array_agg(distinct di.pid) AS identities, + d.officialname AS officialname, + d.englishname AS englishname, + d.contactemail AS contactemail, + CASE + WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire-cris_1.1']) + THEN + 'openaire-cris_1.1@@@OpenAIRE CRIS v1.1@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['driver', 'openaire2.0']) + THEN + 'driver-openaire2.0@@@OpenAIRE 2.0+ (DRIVER OA, EC funding)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['driver']) + THEN + 'driver@@@OpenAIRE Basic (DRIVER OA)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire2.0']) + THEN + 'openaire2.0@@@OpenAIRE 2.0 (EC funding)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire3.0']) + THEN + 'openaire3.0@@@OpenAIRE 3.0 (OA, funding)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, 
a.compatibility) :: TEXT) @> ARRAY ['openaire2.0_data']) + THEN + 'openaire2.0_data@@@OpenAIRE Data (funded, referenced datasets)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['native']) + THEN + 'native@@@proprietary@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['hostedBy']) + THEN + 'hostedBy@@@collected from a compatible aggregator@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['notCompatible']) + THEN + 'notCompatible@@@under validation@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + ELSE + 'UNKNOWN@@@not available@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel' + END AS openairecompatibility, + d.websiteurl AS websiteurl, + d.logourl AS logourl, + array_agg(DISTINCT CASE WHEN a.protocol = 'oai' and last_aggregation_date is not null THEN a.baseurl ELSE NULL END) AS accessinfopackage, + d.latitude AS latitude, + d.longitude AS longitude, + d.namespaceprefix AS namespaceprefix, + NULL AS odnumberofitems, + NULL AS odnumberofitemsdate, + + (SELECT array_agg(s|| '###keywords@@@keywords@@@dnet:subject_classification_typologies@@@dnet:subject_classification_typologies') + FROM UNNEST( + ARRAY( + SELECT trim(s) + FROM unnest(string_to_array(d.subjects, '@@')) AS s)) AS s) AS subjects, + + d.description AS description, + NULL AS odpolicies, + ARRAY(SELECT trim(s) + FROM unnest(string_to_array(d.languages, ',')) AS s) AS odlanguages, + ARRAY(SELECT trim(s) + FROM unnest(string_to_array(d.od_contenttypes, '-')) AS s) AS odcontenttypes, + false AS inferred, + false AS deletedbyinference, + 0.9 AS trust, + NULL AS inferenceprovenance, + d.dateofcollection AS dateofcollection, + d.dateofvalidation AS dateofvalidation, + -- re3data fields + d.releasestartdate AS releasestartdate, + d.releaseenddate AS releaseenddate, + d.missionstatementurl AS missionstatementurl, + d.dataprovider AS dataprovider, + d.serviceprovider AS serviceprovider, + d.databaseaccesstype AS databaseaccesstype, + d.datauploadtype AS datauploadtype, + d.databaseaccessrestriction AS databaseaccessrestriction, + d.datauploadrestriction AS datauploadrestriction, + d.versioning AS versioning, + d.citationguidelineurl AS citationguidelineurl, + d.qualitymanagementkind AS qualitymanagementkind, + d.pidsystems AS pidsystems, + d.certificates AS certificates, + ARRAY[]::text[] AS policies, + dc.id AS collectedfromid, + dc.officialname AS collectedfromname, + d.typology || '@@@' || CASE + WHEN (d.typology = 'crissystem') THEN 'CRIS System' + WHEN (d.typology = 'datarepository::unknown') THEN 'Data Repository' + WHEN (d.typology = 'aggregator::datarepository') THEN 'Data Repository Aggregator' + WHEN (d.typology = 'infospace') THEN 'Information Space' + WHEN (d.typology = 'pubsrepository::institutional') THEN 'Institutional Repository' + WHEN (d.typology = 'aggregator::pubsrepository::institutional') THEN 'Institutional Repository Aggregator' + WHEN (d.typology = 'pubsrepository::journal') THEN 'Journal' + WHEN (d.typology = 'aggregator::pubsrepository::journals') THEN 'Journal Aggregator/Publisher' + WHEN (d.typology = 'pubsrepository::mock') THEN 'Other' + WHEN (d.typology = 'pubscatalogue::unknown') THEN 
'Publication Catalogue' + WHEN (d.typology = 'pubsrepository::unknown') THEN 'Publication Repository' + WHEN (d.typology = 'aggregator::pubsrepository::unknown') THEN 'Publication Repository Aggregator' + WHEN (d.typology = 'entityregistry') THEN 'Registry' + WHEN (d.typology = 'scholarcomminfra') THEN 'Scholarly Comm. Infrastructure' + WHEN (d.typology = 'pubsrepository::thematic') THEN 'Thematic Repository' + WHEN (d.typology = 'websource') THEN 'Web Source' + WHEN (d.typology = 'entityregistry::projects') THEN 'Funder database' + WHEN (d.typology = 'entityregistry::repositories') THEN 'Registry of repositories' + WHEN (d.typology = 'softwarerepository') THEN 'Software Repository' + WHEN (d.typology = 'aggregator::softwarerepository') THEN 'Software Repository Aggregator' + WHEN (d.typology = 'orprepository') THEN 'Repository' + ELSE 'Other' + END || '@@@dnet:datasource_typologies@@@dnet:datasource_typologies' AS datasourcetype, + 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction, + CONCAT(d.issn, '@@@', d.eissn, '@@@', d.lissn) AS journal + +FROM dsm_datasources d + +LEFT OUTER JOIN dsm_datasources dc on (d.collectedfrom = dc.id) +LEFT OUTER JOIN dsm_api a ON (d.id = a.datasource) +LEFT OUTER JOIN dsm_datasourcepids di ON (d.id = di.datasource) + +GROUP BY + d.id, + d.officialname, + d.englishname, + d.websiteurl, + d.logourl, + d.contactemail, + d.namespaceprefix, + d.description, + d.latitude, + d.longitude, + d.dateofcollection, + d.dateofvalidation, + d.releasestartdate, + d.releaseenddate, + d.missionstatementurl, + d.dataprovider, + d.serviceprovider, + d.databaseaccesstype, + d.datauploadtype, + d.databaseaccessrestriction, + d.datauploadrestriction, + d.versioning, + d.citationguidelineurl, + d.qualitymanagementkind, + d.pidsystems, + d.certificates, + dc.id, + dc.officialname, + d.issn, + d.eissn, + d.lissn diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizations.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizations.sql new file mode 100644 index 000000000..682ca3596 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizations.sql @@ -0,0 +1,36 @@ +SELECT + o.id AS organizationid, + o.legalshortname AS legalshortname, + o.legalname AS legalname, + o.websiteurl AS websiteurl, + o.logourl AS logourl, + o.ec_legalbody AS eclegalbody, + o.ec_legalperson AS eclegalperson, + o.ec_nonprofit AS ecnonprofit, + o.ec_researchorganization AS ecresearchorganization, + o.ec_highereducation AS echighereducation, + o.ec_internationalorganizationeurinterests AS ecinternationalorganizationeurinterests, + o.ec_internationalorganization AS ecinternationalorganization, + o.ec_enterprise AS ecenterprise, + o.ec_smevalidated AS ecsmevalidated, + o.ec_nutscode AS ecnutscode, + o.dateofcollection AS dateofcollection, + o.lastupdate AS dateoftransformation, + false AS inferred, + false AS deletedbyinference, + o.trust AS trust, + '' AS inferenceprovenance, + d.id AS collectedfromid, + d.officialname AS collectedfromname, + + o.country || '@@@dnet:countries' AS country, + 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction, + + ARRAY[]::text[] AS pid +FROM dsm_organizations o + LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom) + + + + + diff --git 
a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizationsFromOpenOrgsDB.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizationsFromOpenOrgsDB.sql new file mode 100644 index 000000000..dc9550883 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizationsFromOpenOrgsDB.sql @@ -0,0 +1,53 @@ +SELECT + o.id AS organizationid, + coalesce((array_agg(a.acronym))[1], o.name) AS legalshortname, + o.name AS legalname, + array_agg(DISTINCT n.name) AS "alternativeNames", + (array_agg(u.url))[1] AS websiteurl, + o.modification_date AS dateoftransformation, + false AS inferred, + false AS deletedbyinference, + 0.95 AS trust, + '' AS inferenceprovenance, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + o.country || '@@@dnet:countries' AS country, + 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction, + array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid +FROM organizations o + LEFT OUTER JOIN acronyms a ON (a.id = o.id) + LEFT OUTER JOIN urls u ON (u.id = o.id) + LEFT OUTER JOIN other_ids i ON (i.id = o.id) + LEFT OUTER JOIN other_names n ON (n.id = o.id) +GROUP BY + o.id, + o.name, + o.modification_date, + o.country + +UNION ALL + +SELECT + 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid, + n.name AS legalshortname, + n.name AS legalname, + ARRAY[]::text[] AS "alternativeNames", + (array_agg(u.url))[1] AS websiteurl, + o.modification_date AS dateoftransformation, + false AS inferred, + false AS deletedbyinference, + 0.88 AS trust, + '' AS inferenceprovenance, + 'openaire____::openorgs' AS collectedfromid, + 'OpenOrgs Database' AS collectedfromname, + o.country || '@@@dnet:countries' AS country, + 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction, + array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid +FROM other_names n + LEFT OUTER JOIN organizations o ON (n.id = o.id) + LEFT OUTER JOIN urls u ON (u.id = o.id) + LEFT OUTER JOIN other_ids i ON (i.id = o.id) +GROUP BY + o.id, o.modification_date, o.country, n.name + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjectOrganization.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjectOrganization.sql new file mode 100644 index 000000000..4c06ca5b9 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjectOrganization.sql @@ -0,0 +1,19 @@ +SELECT + po.project AS project, + po.resporganization AS resporganization, + po.participantnumber AS participantnumber, + po.contribution AS contribution, + NULL AS startdate, + NULL AS enddate, + false AS inferred, + false AS deletedbyinference, + po.trust AS trust, + NULL AS inferenceprovenance, + dc.id AS collectedfromid, + dc.officialname AS collectedfromname, + po.semanticclass || '@@@' || po.semanticclass || '@@@dnet:project_organization_relations@@@dnet:project_organization_relations' AS semantics, + 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction + +FROM project_organization po + LEFT OUTER JOIN projects p ON (p.id = 
po.project) + LEFT OUTER JOIN dsm_datasources dc ON (dc.id = p.collectedfrom) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql new file mode 100644 index 000000000..6cff18875 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql @@ -0,0 +1,90 @@ +SELECT + p.id AS projectid, + p.code AS code, + p.websiteurl AS websiteurl, + p.acronym AS acronym, + p.title AS title, + p.startdate AS startdate, + p.enddate AS enddate, + p.call_identifier AS callidentifier, + p.keywords AS keywords, + p.duration AS duration, + p.ec_sc39 AS ecsc39, + p.oa_mandate_for_publications AS oamandatepublications, + p.ec_article29_3 AS ecarticle29_3, + p.dateofcollection AS dateofcollection, + p.lastupdate AS dateoftransformation, + p.inferred AS inferred, + p.deletedbyinference AS deletedbyinference, + p.trust AS trust, + p.inferenceprovenance AS inferenceprovenance, + p.optional1 AS optional1, + p.optional2 AS optional2, + p.jsonextrainfo AS jsonextrainfo, + p.contactfullname AS contactfullname, + p.contactfax AS contactfax, + p.contactphone AS contactphone, + p.contactemail AS contactemail, + p.summary AS summary, + p.currency AS currency, + p.totalcost AS totalcost, + p.fundedamount AS fundedamount, + dc.id AS collectedfromid, + dc.officialname AS collectedfromname, + ctc.code || '@@@' || ctc.name || '@@@' || cts.code || '@@@' || cts.name AS contracttype, + pac.code || '@@@' || pac.name || '@@@' || pas.code || '@@@' || pas.name AS provenanceaction, + array_agg(DISTINCT i.pid || '###' || i.issuertype) AS pid, + array_agg(DISTINCT s.name || '###' || sc.code || '@@@' || sc.name || '@@@' || ss.code || '@@@' || ss.name) AS subjects, + array_agg(DISTINCT fp.path) AS fundingtree + FROM projects p + LEFT OUTER JOIN class pac ON (pac.code = p.provenanceactionclass) + LEFT OUTER JOIN scheme pas ON (pas.code = p.provenanceactionscheme) + + LEFT OUTER JOIN projectpids pp ON (pp.project = p.id) + LEFT OUTER JOIN dsm_identities i ON (i.pid = pp.pid) + + LEFT OUTER JOIN dsm_datasources dc ON (dc.id = p.collectedfrom) + + LEFT OUTER JOIN project_fundingpath pf ON (pf.project = p.id) + LEFT OUTER JOIN fundingpaths fp ON (fp.id = pf.funding) + + LEFT OUTER JOIN project_subject ps ON (ps.project = p.id) + LEFT OUTER JOIN subjects s ON (s.id = ps.subject) + + LEFT OUTER JOIN class sc ON (sc.code = s.semanticclass) + LEFT OUTER JOIN scheme ss ON (ss.code = s.semanticscheme) + + LEFT OUTER JOIN class ctc ON (ctc.code = p.contracttypeclass) + LEFT OUTER JOIN scheme cts ON (cts.code = p.contracttypescheme) + + GROUP BY + p.id, + p.code, + p.websiteurl, + p.acronym, + p.title, + p.startdate, + p.enddate, + p.call_identifier, + p.keywords, + p.duration, + p.ec_sc39, + p.oa_mandate_for_publications, + p.ec_article29_3, + p.dateofcollection, + p.inferred, + p.deletedbyinference, + p.trust, + p.inferenceprovenance, + p.contactfullname, + p.contactfax, + p.contactphone, + p.contactemail, + p.summary, + p.currency, + p.totalcost, + p.fundedamount, + dc.id, + dc.officialname, + pac.code, pac.name, pas.code, pas.name, + ctc.code, ctc.name, cts.code, cts.name; \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/querySimilarityFromOpenOrgsDB.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/querySimilarityFromOpenOrgsDB.sql new file 
mode 100644 index 000000000..4407559c6 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/querySimilarityFromOpenOrgsDB.sql @@ -0,0 +1,17 @@ +SELECT local_id AS id1, oa_original_id AS id2 FROM openaire_simrels WHERE reltype = 'is_similar' + +UNION ALL + +SELECT + o.id AS id1, + 'openorgsmesh'||substring(o.id, 13)||'-'||md5(a.acronym) AS id2 +FROM acronyms a + LEFT OUTER JOIN organizations o ON (a.id = o.id) + +UNION ALL + +SELECT + o.id AS id1, + 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2 +FROM other_names n + LEFT OUTER JOIN organizations o ON (n.id = o.id) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties b/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties new file mode 100644 index 000000000..63cba917e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties @@ -0,0 +1,9 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=INFO, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/dhp-workflows/dhp-dedup/pom.xml b/dhp-workflows/dhp-dedup/pom.xml index 6aef8f313..0721af25d 100644 --- a/dhp-workflows/dhp-dedup/pom.xml +++ b/dhp-workflows/dhp-dedup/pom.xml @@ -8,6 +8,37 @@ 4.0.0 dhp-dedup + + + + + net.alchim31.maven + scala-maven-plugin + 4.0.1 + + + scala-compile-first + initialize + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + ${scala.version} + + + + + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java index ab19ff2b5..7c0967b2e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java @@ -1,23 +1,31 @@ package eu.dnetlib.dhp.graph; -import com.google.common.collect.Maps; -import eu.dnetlib.dhp.schema.oaf.*; - import java.util.Map; +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Software; + public class GraphMappingUtils { - public final static Map types = Maps.newHashMap(); + public final static Map types = Maps.newHashMap(); - static { - types.put("datasource", Datasource.class); - types.put("organization", Organization.class); - types.put("project", Project.class); - types.put("dataset", Dataset.class); - types.put("otherresearchproduct", OtherResearchProduct.class); - types.put("software", Software.class); - types.put("publication", Publication.class); - types.put("relation", Relation.class); - } + static { + types.put("datasource", Datasource.class); + types.put("organization", Organization.class); + types.put("project", Project.class); + types.put("dataset", Dataset.class); + types.put("otherresearchproduct", OtherResearchProduct.class); + types.put("software", Software.class); + types.put("publication", Publication.class); + 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
index 5401b71c1..c5223c1f6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
@@ -4,12 +4,10 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
-import scala.Tuple2;
 
 public class SparkGraphImporterJob {
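For orientation, a hypothetical sketch of the import pattern the job above supports: read a SequenceFile of `Text` pairs and persist one parquet dataset per entity type registered in `GraphMappingUtils.types`. The paths, the `local[*]` master and the key convention (a key ending in the type label) are assumptions of this sketch, not taken from the actual job:

```java
package eu.dnetlib.dhp.graph;

import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class GraphImportSketch {

	public static void main(final String[] args) {
		// Placeholder paths: a real job would take these from parsed arguments.
		final String inputPath = "/tmp/graph_raw";
		final String outputPath = "/tmp/graph_parquet";

		final SparkSession spark = SparkSession.builder()
				.appName(GraphImportSketch.class.getSimpleName())
				.master("local[*]")
				.getOrCreate();
		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

		// One parquet folder per entity type registered in GraphMappingUtils.types;
		// binding the JSON to the schema class itself is omitted for brevity.
		GraphMappingUtils.types.forEach((name, clazz) -> spark
				.createDataset(sc.sequenceFile(inputPath, Text.class, Text.class)
						.filter(t -> t._1().toString().endsWith(name)) // key assumed to end with the type label
						.map(t -> t._2().toString())                   // keep the JSON payload
						.rdd(), Encoders.STRING())
				.write()
				.mode(SaveMode.Overwrite)
				.parquet(outputPath + "/" + name));

		spark.stop();
	}
}
```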
diff --git a/dhp-workflows/docs/oozie-installer.markdown b/dhp-workflows/docs/oozie-installer.markdown
index b1953a54e..d2de80dcc 100644
--- a/dhp-workflows/docs/oozie-installer.markdown
+++ b/dhp-workflows/docs/oozie-installer.markdown
@@ -10,9 +10,8 @@ This module is automatically executed when running:
 
 on module having set:
 
-	<groupId>eu.dnetlib</groupId>
-	<artifactId>dhp-wf</artifactId>
-	<version>1.0.0-SNAPSHOT</version>
+	<groupId>eu.dnetlib.dhp</groupId>
+	<artifactId>dhp-workflows</artifactId>
 
 in `pom.xml` file. `oozie-package` profile initializes oozie workflow packaging, `workflow.source.dir` property points to a workflow (notice: this is not a relative path but a classpath to directory usually holding `oozie_app` subdirectory).
diff --git a/pom.xml b/pom.xml
index 8b01741d6..7f5e1e3dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,156 +1,158 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
-	<modelVersion>4.0.0</modelVersion>
-	<groupId>eu.dnetlib.dhp</groupId>
-	<artifactId>dhp</artifactId>
-	<version>1.1.6-SNAPSHOT</version>
-	<packaging>pom</packaging>
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>eu.dnetlib.dhp</groupId>
+	<artifactId>dhp</artifactId>
+	<version>1.1.6-SNAPSHOT</version>
+	<packaging>pom</packaging>
 
-	<url>http://www.d-net.research-infrastructures.eu</url>
+	<url>http://www.d-net.research-infrastructures.eu</url>
 
-	<licenses>
-		<license>
-			<name>The Apache Software License, Version 2.0</name>
-			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-			<distribution>repo</distribution>
-			<comments>A business-friendly OSS license</comments>
-		</license>
-	</licenses>
+	<licenses>
+		<license>
+			<name>The Apache Software License, Version 2.0</name>
+			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+			<distribution>repo</distribution>
+			<comments>A business-friendly OSS license</comments>
+		</license>
+	</licenses>
 
-	<modules>
-		<module>dhp-build</module>
-		<module>dhp-schemas</module>
-		<module>dhp-common</module>
-		<module>dhp-workflows</module>
-	</modules>
+	<modules>
+		<module>dhp-build</module>
+		<module>dhp-schemas</module>
+		<module>dhp-common</module>
+		<module>dhp-workflows</module>
+	</modules>
 
-	<issueManagement>
-		<system>Redmine</system>
-		<url>https://issue.openaire.research-infrastructures.eu/projects/openaire</url>
-	</issueManagement>
+	<issueManagement>
+		<system>Redmine</system>
+		<url>https://issue.openaire.research-infrastructures.eu/projects/openaire</url>
+	</issueManagement>
 
-	<ciManagement>
-		<system>jenkins</system>
-		<url>https://jenkins-dnet.d4science.org/</url>
-	</ciManagement>
+	<ciManagement>
+		<system>jenkins</system>
+		<url>https://jenkins-dnet.d4science.org/</url>
+	</ciManagement>
 
-	<scm>
-		<connection>scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git</connection>
-		<developerConnection>scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git</developerConnection>
-		<url>https://code-repo.d4science.org/D-Net/dnet-hadoop/</url>
-		<tag>HEAD</tag>
-	</scm>
+	<scm>
+		<connection>scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git</connection>
+		<developerConnection>scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git</developerConnection>
+		<url>https://code-repo.d4science.org/D-Net/dnet-hadoop/</url>
+		<tag>HEAD</tag>
+	</scm>
 
-	<pluginRepositories>
-	</pluginRepositories>
+	<pluginRepositories>
+	</pluginRepositories>
 
-	<repositories>
-		<repository>
-			<id>dnet45-releases</id>
-			<name>D-Net 45 releases</name>
-			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
-			<layout>default</layout>
-			<snapshots>
-				<enabled>false</enabled>
-			</snapshots>
-			<releases>
-				<enabled>true</enabled>
-			</releases>
-		</repository>
-		<repository>
-			<id>cloudera</id>
-			<name>Cloudera Repository</name>
-			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
-			<releases>
-				<enabled>true</enabled>
-			</releases>
-			<snapshots>
-				<enabled>false</enabled>
-			</snapshots>
-		</repository>
-	</repositories>
+	<repositories>
+		<repository>
+			<id>dnet45-releases</id>
+			<name>D-Net 45 releases</name>
+			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
+			<layout>default</layout>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+		</repository>
+		<repository>
+			<id>cloudera</id>
+			<name>Cloudera Repository</name>
+			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
 
-	<dependencies>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>4.12</version>
-			<scope>test</scope>
-		</dependency>
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.12</version>
+			<scope>test</scope>
+		</dependency>
 
-		<dependency>
-			<groupId>org.mockito</groupId>
-			<artifactId>mockito-core</artifactId>
-			<version>2.7.22</version>
-			<scope>test</scope>
-		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>2.7.22</version>
+			<scope>test</scope>
+		</dependency>
 
-	</dependencies>
+	</dependencies>
 
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-hdfs</artifactId>
-				<version>${dhp.hadoop.version}</version>
-				<scope>provided</scope>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.hadoop</groupId>
-				<artifactId>hadoop-common</artifactId>
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-hdfs</artifactId>
+				<version>${dhp.hadoop.version}</version>
+				<scope>provided</scope>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-common</artifactId>
 				<version>${dhp.hadoop.version}</version>
 				<scope>provided</scope>
 			</dependency>
 			<dependency>
 				<groupId>org.apache.hadoop</groupId>
 				<artifactId>hadoop-client</artifactId>
-				<version>${dhp.hadoop.version}</version>
-				<scope>provided</scope>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.spark</groupId>
-				<artifactId>spark-core_2.11</artifactId>
-				<version>${dhp.spark.version}</version>
-				<scope>provided</scope>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.spark</groupId>
-				<artifactId>spark-sql_2.11</artifactId>
-				<version>${dhp.spark.version}</version>
-				<scope>provided</scope>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.spark</groupId>
-				<artifactId>spark-graphx_2.11</artifactId>
-				<version>${dhp.spark.version}</version>
-				<scope>provided</scope>
-			</dependency>
+				<version>${dhp.hadoop.version}</version>
+				<scope>provided</scope>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.spark</groupId>
+				<artifactId>spark-core_2.11</artifactId>
+				<version>${dhp.spark.version}</version>
+				<scope>provided</scope>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.spark</groupId>
+				<artifactId>spark-sql_2.11</artifactId>
+				<version>${dhp.spark.version}</version>
+				<scope>provided</scope>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.spark</groupId>
+				<artifactId>spark-graphx_2.11</artifactId>
+				<version>${dhp.spark.version}</version>
+				<scope>provided</scope>
+			</dependency>
 
-			<dependency>
-				<groupId>org.apache.commons</groupId>
-				<artifactId>commons-lang3</artifactId>
-				<version>${dhp.commons.lang.version}</version>
-			</dependency>
+			<dependency>
+				<groupId>org.apache.commons</groupId>
+				<artifactId>commons-lang3</artifactId>
+				<version>${dhp.commons.lang.version}</version>
+			</dependency>
 
-			<dependency>
-				<groupId>commons-codec</groupId>
-				<artifactId>commons-codec</artifactId>
-				<version>1.9</version>
-			</dependency>
+			<dependency>
+				<groupId>commons-codec</groupId>
+				<artifactId>commons-codec</artifactId>
+				<version>1.9</version>
+			</dependency>
 
-			<dependency>
-				<groupId>commons-io</groupId>
-				<artifactId>commons-io</artifactId>
-				<version>2.4</version>
-			</dependency>
+			<dependency>
+				<groupId>commons-io</groupId>
+				<artifactId>commons-io</artifactId>
+				<version>2.4</version>
+			</dependency>
 
-			<dependency>
-				<groupId>commons-cli</groupId>
-				<artifactId>commons-cli</artifactId>
-				<version>1.2</version>
-				<scope>provided</scope>
-			</dependency>
+			<dependency>
+				<groupId>commons-cli</groupId>
+				<artifactId>commons-cli</artifactId>
+				<version>1.2</version>
+				<scope>provided</scope>
+			</dependency>
 
 			<dependency>
 				<groupId>net.sf.saxon</groupId>
 				<artifactId>Saxon-HE</artifactId>
 				<version>9.9.1-6</version>
@@ -158,25 +160,25 @@
 			</dependency>
 
-			<dependency>
-				<groupId>dom4j</groupId>
-				<artifactId>dom4j</artifactId>
-				<version>1.6.1</version>
-			</dependency>
+			<dependency>
+				<groupId>dom4j</groupId>
+				<artifactId>dom4j</artifactId>
+				<version>1.6.1</version>
+			</dependency>
 
-			<dependency>
-				<groupId>xml-apis</groupId>
-				<artifactId>xml-apis</artifactId>
-				<version>1.4.01</version>
-			</dependency>
+			<dependency>
+				<groupId>xml-apis</groupId>
+				<artifactId>xml-apis</artifactId>
+				<version>1.4.01</version>
+			</dependency>
 
-			<dependency>
-				<groupId>jaxen</groupId>
-				<artifactId>jaxen</artifactId>
-				<version>1.1.6</version>
-			</dependency>
+			<dependency>
+				<groupId>jaxen</groupId>
+				<artifactId>jaxen</artifactId>
+				<version>1.1.6</version>
+			</dependency>
 
 			<dependency>
 				<groupId>com.mycila.xmltool</groupId>
 				<artifactId>xmltool</artifactId>
 				<version>3.3</version>
@@ -227,37 +229,37 @@
 			</dependency>
 
-			<dependency>
-				<groupId>net.schmizz</groupId>
-				<artifactId>sshj</artifactId>
-				<version>0.10.0</version>
-				<scope>test</scope>
-			</dependency>
+			<dependency>
+				<groupId>net.schmizz</groupId>
+				<artifactId>sshj</artifactId>
+				<version>0.10.0</version>
+				<scope>test</scope>
+			</dependency>
 
-			<dependency>
-				<groupId>com.fasterxml.jackson.core</groupId>
-				<artifactId>jackson-core</artifactId>
-				<version>${dhp.jackson.version}</version>
-				<scope>provided</scope>
-			</dependency>
+			<dependency>
+				<groupId>com.fasterxml.jackson.core</groupId>
+				<artifactId>jackson-core</artifactId>
+				<version>${dhp.jackson.version}</version>
+				<scope>provided</scope>
+			</dependency>
 
-			<dependency>
-				<groupId>com.fasterxml.jackson.core</groupId>
-				<artifactId>jackson-annotations</artifactId>
-				<version>${dhp.jackson.version}</version>
-				<scope>provided</scope>
-			</dependency>
-			<dependency>
-				<groupId>com.fasterxml.jackson.core</groupId>
-				<artifactId>jackson-databind</artifactId>
-				<version>${dhp.jackson.version}</version>
-				<scope>provided</scope>
-			</dependency>
+			<dependency>
+				<groupId>com.fasterxml.jackson.core</groupId>
+				<artifactId>jackson-annotations</artifactId>
+				<version>${dhp.jackson.version}</version>
+				<scope>provided</scope>
+			</dependency>
+			<dependency>
+				<groupId>com.fasterxml.jackson.core</groupId>
+				<artifactId>jackson-databind</artifactId>
+				<version>${dhp.jackson.version}</version>
+				<scope>provided</scope>
+			</dependency>
 
-			<dependency>
-				<groupId>eu.dnetlib</groupId>
-				<artifactId>dnet-pace-core</artifactId>
-				<version>4.0.0</version>
-			</dependency>
+			<dependency>
+				<groupId>eu.dnetlib</groupId>
+				<artifactId>dnet-pace-core</artifactId>
+				<version>4.0.0</version>
+			</dependency>
 			<dependency>
 				<groupId>eu.dnetlib</groupId>
 				<artifactId>cnr-rmi-api</artifactId>
 				<version>[2.6.1]</version>
 			</dependency>
@@ -269,12 +271,12 @@
 			<dependency>
 				<groupId>org.apache.cxf</groupId>
 				<artifactId>cxf-rt-transports-http</artifactId>
 				<version>3.1.5</version>
 			</dependency>
-			<dependency>
-				<groupId>javax.persistence</groupId>
-				<artifactId>javax.persistence-api</artifactId>
-				<version>2.2</version>
-				<scope>provided</scope>
-			</dependency>
+			<dependency>
+				<groupId>javax.persistence</groupId>
+				<artifactId>javax.persistence-api</artifactId>
+				<version>2.2</version>
+				<scope>provided</scope>
+			</dependency>
 
 			<dependency>
 				<groupId>com.rabbitmq</groupId>
 				<artifactId>amqp-client</artifactId>
 				<version>5.6.0</version>
 			</dependency>
 			<dependency>
 				<groupId>com.jayway.jsonpath</groupId>
 				<artifactId>json-path</artifactId>
 				<version>2.4.0</version>
 			</dependency>
 			<dependency>
 				<groupId>com.arakelian</groupId>
 				<artifactId>java-jq</artifactId>
 				<version>0.10.1</version>
 			</dependency>
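The next hunk manages `org.mongodb:mongo-java-driver` at `${mongodb.driver.version}` (3.4.2, per the properties section further down). A minimal sketch of that driver's API, with placeholder host, database and collection names; real connection settings would come from workflow configuration:

```java
import org.bson.Document;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;

public class MongoReadSketch {

	public static void main(final String[] args) {
		// Placeholder connection details.
		final MongoClient client = new MongoClient("localhost", 27017);
		try {
			final MongoDatabase db = client.getDatabase("mdstore");
			final MongoCollection<Document> collection = db.getCollection("metadata");

			// Print the first few records as JSON.
			try (final MongoCursor<Document> cursor = collection.find().limit(5).iterator()) {
				while (cursor.hasNext()) {
					System.out.println(cursor.next().toJson());
				}
			}
		} finally {
			client.close();
		}
	}
}
```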
@@ -296,201 +298,183 @@
 			<dependency>
 				<groupId>edu.cmu</groupId>
 				<artifactId>secondstring</artifactId>
 				<version>1.0.0</version>
 			</dependency>
+			<dependency>
+				<groupId>org.mongodb</groupId>
+				<artifactId>mongo-java-driver</artifactId>
+				<version>${mongodb.driver.version}</version>
+			</dependency>
 
 			<dependency>
 				<groupId>org.antlr</groupId>
 				<artifactId>stringtemplate</artifactId>
 				<version>4.0</version>
 			</dependency>
 
-			<dependency>
-				<groupId>org.apache.oozie</groupId>
-				<artifactId>oozie-client</artifactId>
-				<version>${dhp.oozie.version}</version>
-				<scope>provided</scope>
-				<exclusions>
-					<exclusion>
-						<artifactId>slf4j-simple</artifactId>
-						<groupId>org.slf4j</groupId>
-					</exclusion>
-				</exclusions>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
+			<dependency>
+				<groupId>org.apache.oozie</groupId>
+				<artifactId>oozie-client</artifactId>
+				<version>${dhp.oozie.version}</version>
+				<scope>provided</scope>
+				<exclusions>
+					<exclusion>
+						<artifactId>slf4j-simple</artifactId>
+						<groupId>org.slf4j</groupId>
+					</exclusion>
+				</exclusions>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
 
-	<build>
-		<directory>target</directory>
-		<outputDirectory>target/classes</outputDirectory>
-		<finalName>${project.artifactId}-${project.version}</finalName>
-		<testOutputDirectory>target/test-classes</testOutputDirectory>
-		<pluginManagement>
-			<plugins>
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-compiler-plugin</artifactId>
-					<version>${maven.compiler.plugin.version}</version>
-					<configuration>
-						<source>1.8</source>
-						<target>1.8</target>
-						<encoding>${project.build.sourceEncoding}</encoding>
-					</configuration>
-				</plugin>
+	<build>
+		<directory>target</directory>
+		<outputDirectory>target/classes</outputDirectory>
+		<finalName>${project.artifactId}-${project.version}</finalName>
+		<testOutputDirectory>target/test-classes</testOutputDirectory>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-compiler-plugin</artifactId>
+					<version>${maven.compiler.plugin.version}</version>
+					<configuration>
+						<source>1.8</source>
+						<target>1.8</target>
+						<encoding>${project.build.sourceEncoding}</encoding>
+					</configuration>
+				</plugin>
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-jar-plugin</artifactId>
-					<version>3.0.2</version>
-				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-jar-plugin</artifactId>
+					<version>3.0.2</version>
+				</plugin>
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-source-plugin</artifactId>
-					<version>3.0.1</version>
-					<executions>
-						<execution>
-							<id>attach-sources</id>
-							<phase>verify</phase>
-							<goals>
-								<goal>jar-no-fork</goal>
-							</goals>
-						</execution>
-					</executions>
-				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-source-plugin</artifactId>
+					<version>3.0.1</version>
+					<executions>
+						<execution>
+							<id>attach-sources</id>
+							<phase>verify</phase>
+							<goals>
+								<goal>jar-no-fork</goal>
+							</goals>
+						</execution>
+					</executions>
+				</plugin>
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-surefire-plugin</artifactId>
-					<version>2.19.1</version>
-					<configuration>
-						<redirectTestOutputToFile>true</redirectTestOutputToFile>
-					</configuration>
-				</plugin>
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-javadoc-plugin</artifactId>
-					<version>2.10.4</version>
-					<configuration>
-						<detectLinks>true</detectLinks>
-					</configuration>
-				</plugin>
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-dependency-plugin</artifactId>
-					<version>3.0.0</version>
-				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-surefire-plugin</artifactId>
+					<version>2.19.1</version>
+					<configuration>
+						<redirectTestOutputToFile>true</redirectTestOutputToFile>
+					</configuration>
+				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-javadoc-plugin</artifactId>
+					<version>2.10.4</version>
+					<configuration>
+						<detectLinks>true</detectLinks>
+					</configuration>
+				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-dependency-plugin</artifactId>
+					<version>3.0.0</version>
+				</plugin>
 
-				<plugin>
-					<groupId>org.codehaus.mojo</groupId>
-					<artifactId>build-helper-maven-plugin</artifactId>
-					<version>1.12</version>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-		<plugins>
+				<plugin>
+					<groupId>org.codehaus.mojo</groupId>
+					<artifactId>build-helper-maven-plugin</artifactId>
+					<version>1.12</version>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+		<plugins>
 
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-release-plugin</artifactId>
-				<version>2.5.3</version>
-			</plugin>
-			<plugin>
-				<groupId>org.jacoco</groupId>
-				<artifactId>jacoco-maven-plugin</artifactId>
-				<version>0.7.9</version>
-				<configuration>
-					<excludes>
-						<exclude>**/schemas/*</exclude>
-						<exclude>**/com/cloudera/**/*</exclude>
-						<exclude>**/org/apache/avro/io/**/*</exclude>
-					</excludes>
-				</configuration>
-				<executions>
-					<execution>
-						<id>default-prepare-agent</id>
-						<goals>
-							<goal>prepare-agent</goal>
-						</goals>
-					</execution>
-					<execution>
-						<id>default-report</id>
-						<phase>prepare-package</phase>
-						<goals>
-							<goal>report</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-release-plugin</artifactId>
+				<version>2.5.3</version>
+			</plugin>
+			<plugin>
+				<groupId>org.jacoco</groupId>
+				<artifactId>jacoco-maven-plugin</artifactId>
+				<version>0.7.9</version>
+				<configuration>
+					<excludes>
+						<exclude>**/schemas/*</exclude>
+						<exclude>**/com/cloudera/**/*</exclude>
+						<exclude>**/org/apache/avro/io/**/*</exclude>
+					</excludes>
+				</configuration>
+				<executions>
+					<execution>
+						<id>default-prepare-agent</id>
+						<goals>
+							<goal>prepare-agent</goal>
+						</goals>
+					</execution>
+					<execution>
+						<id>default-report</id>
+						<phase>prepare-package</phase>
+						<goals>
+							<goal>report</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>4.0.1</version>
-				<executions>
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>initialize</phase>
-						<goals>
-							<goal>add-source</goal>
-							<goal>compile</goal>
-						</goals>
-					</execution>
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<scalaVersion>${scala.version}</scalaVersion>
-				</configuration>
-			</plugin>
 
-		</plugins>
-
-		<extensions>
-			<extension>
-				<groupId>org.apache.maven.wagon</groupId>
-				<artifactId>wagon-ssh</artifactId>
-				<version>2.10</version>
-			</extension>
-		</extensions>
-	</build>
-
-	<distributionManagement>
-		<snapshotRepository>
-			<id>dnet45-snapshots</id>
-			<name>DNet45 Snapshots</name>
-			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots</url>
-			<layout>default</layout>
-		</snapshotRepository>
-		<repository>
-			<id>dnet45-releases</id>
-			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
-		</repository>
-	</distributionManagement>
-
-	<reporting>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-javadoc-plugin</artifactId>
-				<version>2.10.4</version>
-				<configuration>
-					<detectLinks>true</detectLinks>
-				</configuration>
-			</plugin>
-		</plugins>
-	</reporting>
+		</plugins>
+
+		<extensions>
+			<extension>
+				<groupId>org.apache.maven.wagon</groupId>
+				<artifactId>wagon-ssh</artifactId>
+				<version>2.10</version>
+			</extension>
+		</extensions>
+	</build>
+
+	<distributionManagement>
+		<snapshotRepository>
+			<id>dnet45-snapshots</id>
+			<name>DNet45 Snapshots</name>
+			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots</url>
+			<layout>default</layout>
+		</snapshotRepository>
+		<repository>
+			<id>dnet45-releases</id>
+			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
+		</repository>
+	</distributionManagement>
+
+	<reporting>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-javadoc-plugin</artifactId>
+				<version>2.10.4</version>
+				<configuration>
+					<detectLinks>true</detectLinks>
+				</configuration>
+			</plugin>
+		</plugins>
+	</reporting>
 
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-		<maven.compiler.plugin.version>3.6.0</maven.compiler.plugin.version>
-		<maven.failsafe.plugin.version>2.22.2</maven.failsafe.plugin.version>
-		<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
-		<dhp.hadoop.version>2.6.0-${dhp.cdh.version}</dhp.hadoop.version>
-		<dhp.oozie.version>4.1.0-${dhp.cdh.version}</dhp.oozie.version>
-		<dhp.spark.version>2.4.0.cloudera2</dhp.spark.version>
-		<dhp.jackson.version>2.9.6</dhp.jackson.version>
-		<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
-		<scala.version>2.11.12</scala.version>
-	</properties>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+		<maven.compiler.plugin.version>3.6.0</maven.compiler.plugin.version>
+		<maven.failsafe.plugin.version>2.22.2</maven.failsafe.plugin.version>
+		<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
+		<dhp.hadoop.version>2.6.0-${dhp.cdh.version}</dhp.hadoop.version>
+		<dhp.oozie.version>4.1.0-${dhp.cdh.version}</dhp.oozie.version>
+		<dhp.spark.version>2.4.0.cloudera2</dhp.spark.version>
+		<dhp.jackson.version>2.9.6</dhp.jackson.version>
+		<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
+		<scala.version>2.11.12</scala.version>
+		<mongodb.driver.version>3.4.2</mongodb.driver.version>
+	</properties>