diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index 9a2d0ffa0..308d78715 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -76,6 +76,41 @@
+
+
+
+
+ org.eclipse.m2e
+ lifecycle-mapping
+ 1.0.0
+
+
+
+
+
+
+ org.apache.maven.plugins
+
+
+ maven-plugin-plugin
+
+
+ [3.2,)
+
+
+ descriptor
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index a1db4ad2e..78831073f 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -25,6 +25,12 @@
dhp-common
${project.version}
+
+
+ eu.dnetlib.dhp
+ dhp-schemas
+ ${project.version}
+
net.sf.saxon
@@ -44,6 +50,17 @@
jaxen
jaxen
+
+
+ org.mongodb
+ mongo-java-driver
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.10
+
org.mockito
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java
new file mode 100644
index 000000000..e91a53045
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java
@@ -0,0 +1,236 @@
+package eu.dnetlib.dhp.migration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.Journal;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OriginDescription;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.utils.DHPUtils;
+
+public class AbstractMigrationExecutor implements Closeable {
+
+ private final AtomicInteger counter = new AtomicInteger(0);
+
+ private final Text key = new Text();
+
+ private final Text value = new Text();
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ private final SequenceFile.Writer writer;
+
+ private static final Log log = LogFactory.getLog(AbstractMigrationExecutor.class);
+
+ public AbstractMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception {
+
+ log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser));
+
+ this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer
+ .keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+ }
+
+ private Configuration getConf(final String hdfsNameNode, final String hdfsUser) throws IOException {
+ final Configuration conf = new Configuration();
+ conf.set("fs.defaultFS", hdfsNameNode);
+ conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+ conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+ System.setProperty("HADOOP_USER_NAME", hdfsUser);
+ System.setProperty("hadoop.home.dir", "/");
+ FileSystem.get(URI.create(hdfsNameNode), conf);
+ return conf;
+ }
+
+ protected void emitOaf(final Oaf oaf) {
+ try {
+ key.set(counter.getAndIncrement() + ":" + oaf.getClass().getSimpleName().toLowerCase());
+ value.set(objectMapper.writeValueAsString(oaf));
+ writer.append(key, value);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.hflush();
+ writer.close();
+ }
+
+ public static KeyValue keyValue(final String k, final String v) {
+ final KeyValue kv = new KeyValue();
+ kv.setKey(k);
+ kv.setValue(v);
+ return kv;
+ }
+
+ public static List listKeyValues(final String... s) {
+ if (s.length % 2 > 0) { throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)"); }
+
+ final List list = new ArrayList<>();
+ for (int i = 0; i < s.length; i += 2) {
+ list.add(keyValue(s[i], s[i + 1]));
+ }
+ return list;
+ }
+
+ public static Field field(final T value, final DataInfo info) {
+ if (value == null || StringUtils.isBlank(value.toString())) { return null; }
+
+ final Field field = new Field<>();
+ field.setValue(value);
+ field.setDataInfo(info);
+ return field;
+ }
+
+ public static List> listFields(final DataInfo info, final String... values) {
+ return Arrays.stream(values).map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList());
+ }
+
+ public static List> listFields(final DataInfo info, final List values) {
+ return values.stream().map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList());
+ }
+
+ public static Qualifier qualifier(final String classid, final String classname, final String schemeid, final String schemename) {
+ final Qualifier q = new Qualifier();
+ q.setClassid(classid);
+ q.setClassname(classname);
+ q.setSchemeid(schemeid);
+ q.setSchemename(schemename);
+ return q;
+ }
+
+ public static StructuredProperty structuredProperty(final String value,
+ final String classid,
+ final String classname,
+ final String schemeid,
+ final String schemename,
+ final DataInfo dataInfo) {
+
+ return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo);
+ }
+
+ public static StructuredProperty structuredProperty(final String value, final Qualifier qualifier, final DataInfo dataInfo) {
+ if (value == null) { return null; }
+ final StructuredProperty sp = new StructuredProperty();
+ sp.setValue(value);
+ sp.setQualifier(qualifier);
+ sp.setDataInfo(dataInfo);
+ return sp;
+ }
+
+ public static ExtraInfo extraInfo(final String name, final String value, final String typology, final String provenance, final String trust) {
+ final ExtraInfo info = new ExtraInfo();
+ info.setName(name);
+ info.setValue(value);
+ info.setTypology(typology);
+ info.setProvenance(provenance);
+ info.setTrust(trust);
+ return info;
+ }
+
+ public static OAIProvenance oaiIProvenance(final String identifier,
+ final String baseURL,
+ final String metadataNamespace,
+ final Boolean altered,
+ final String datestamp,
+ final String harvestDate) {
+
+ final OriginDescription desc = new OriginDescription();
+ desc.setIdentifier(identifier);
+ desc.setBaseURL(baseURL);
+ desc.setMetadataNamespace(metadataNamespace);
+ desc.setAltered(altered);
+ desc.setDatestamp(datestamp);
+ desc.setHarvestDate(harvestDate);
+
+ final OAIProvenance p = new OAIProvenance();
+ p.setOriginDescription(desc);
+
+ return p;
+ }
+
+ public static Journal journal(final String name,
+ final String issnPrinted,
+ final String issnOnline,
+ final String issnLinking,
+ final String ep,
+ final String iss,
+ final String sp,
+ final String vol,
+ final String edition,
+ final String conferenceplace,
+ final String conferencedate,
+ final DataInfo dataInfo) {
+
+ if (StringUtils.isNotBlank(name) || StringUtils.isNotBlank(issnPrinted) || StringUtils.isNotBlank(issnOnline) || StringUtils.isNotBlank(issnLinking)) {
+ final Journal j = new Journal();
+ j.setName(name);
+ j.setIssnPrinted(issnPrinted);
+ j.setIssnOnline(issnOnline);
+ j.setIssnLinking(issnLinking);
+ j.setEp(ep);
+ j.setIss(iss);
+ j.setSp(sp);
+ j.setVol(vol);
+ j.setEdition(edition);
+ j.setConferenceplace(conferenceplace);
+ j.setConferencedate(conferencedate);
+ j.setDataInfo(dataInfo);
+ return j;
+ } else {
+ return null;
+ }
+ }
+
+ public static DataInfo dataInfo(final Boolean deletedbyinference,
+ final String inferenceprovenance,
+ final Boolean inferred,
+ final Boolean invisible,
+ final Qualifier provenanceaction,
+ final String trust) {
+ final DataInfo d = new DataInfo();
+ d.setDeletedbyinference(deletedbyinference);
+ d.setInferenceprovenance(inferenceprovenance);
+ d.setInferred(inferred);
+ d.setInvisible(invisible);
+ d.setProvenanceaction(provenanceaction);
+ d.setTrust(trust);
+ return d;
+ }
+
+ public static String createOpenaireId(final int prefix, final String originalId) {
+ final String nsPrefix = StringUtils.substringBefore(originalId, "::");
+ final String rest = StringUtils.substringAfter(originalId, "::");
+ return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
+
+ }
+
+ public static String asString(final Object o) {
+ return o == null ? "" : o.toString();
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java
new file mode 100644
index 000000000..00d1aa60d
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java
@@ -0,0 +1,439 @@
+package eu.dnetlib.dhp.migration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.DocumentFactory;
+import org.dom4j.DocumentHelper;
+import org.dom4j.Node;
+
+import eu.dnetlib.dhp.schema.oaf.Author;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.GeoLocation;
+import eu.dnetlib.dhp.schema.oaf.Instance;
+import eu.dnetlib.dhp.schema.oaf.Journal;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.Software;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+
+public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor {
+
+ protected final Map code2name = new HashMap<>();
+
+ protected final MdstoreClient mdstoreClient;
+
+ protected static final Qualifier MAIN_TITLE_QUALIFIER = qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
+
+ protected static final Qualifier PUBLICATION_RESULTTYPE_QUALIFIER =
+ qualifier("publication", "publication", "dnet:result_typologies", "dnet:result_typologies");
+ protected static final Qualifier DATASET_RESULTTYPE_QUALIFIER = qualifier("dataset", "dataset", "dnet:result_typologies", "dnet:result_typologies");
+ protected static final Qualifier SOFTWARE_RESULTTYPE_QUALIFIER = qualifier("software", "software", "dnet:result_typologies", "dnet:result_typologies");
+ protected static final Qualifier OTHER_RESULTTYPE_QUALIFIER = qualifier("other", "other", "dnet:result_typologies", "dnet:result_typologies");
+
+ private static final Log log = LogFactory.getLog(AbstractMongoExecutor.class);
+
+ public AbstractMongoExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl,
+ final String mongoDb, final String dbUrl, final String dbUser,
+ final String dbPassword) throws Exception {
+
+ super(hdfsPath, hdfsNameNode, hdfsUser);
+
+ this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb);
+ loadClassNames(dbUrl, dbUser, dbPassword);
+
+ final Map nsContext = new HashMap<>();
+
+ registerNamespaces(nsContext);
+
+ DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
+ }
+
+ private void loadClassNames(final String dbUrl, final String dbUser, final String dbPassword) throws IOException {
+
+ log.info("Loading vocabulary terms from db...");
+
+ try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
+ code2name.clear();
+ dbClient.processResults("select code, name from class", rs -> {
+ try {
+ code2name.put(rs.getString("code"), rs.getString("name"));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ log.info("Found " + code2name.size() + " terms.");
+
+ }
+
+ public void processMdRecords(final String mdFormat, final String mdLayout, final String mdInterpretation) throws DocumentException {
+
+ log.info(String.format("Searching mdstores (format: %s, layout: %s, interpretation: %s)", mdFormat, mdLayout, mdInterpretation));
+
+ final Map colls = mdstoreClient.validCollections(mdFormat, mdLayout, mdInterpretation);
+ log.info("Found " + colls.size() + " mdstores");
+
+ for (final Entry entry : colls.entrySet()) {
+ log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")");
+ final String currentColl = entry.getValue();
+
+ for (final String xml : mdstoreClient.listRecords(currentColl)) {
+ final Document doc = DocumentHelper.parseText(xml);
+
+ final String type = doc.valueOf("//dr:CobjCategory/@type");
+ final KeyValue collectedFrom = keyValue(doc.valueOf("//oaf:collectedFrom/@id"), doc.valueOf("//oaf:collectedFrom/@name"));
+ final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) ? collectedFrom
+ : keyValue(doc.valueOf("//oaf:hostedBy/@id"), doc.valueOf("//oaf:hostedBy/@name"));
+
+ final DataInfo info = prepareDataInfo(doc);
+ final long lastUpdateTimestamp = new Date().getTime();
+
+ for (final Oaf oaf : createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp)) {
+ emitOaf(oaf);
+ }
+ }
+ }
+ log.info("All Done.");
+ }
+
+ protected void registerNamespaces(final Map nsContext) {
+ nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr");
+ nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri");
+ nsContext.put("oaf", "http://namespace.openaire.eu/oaf");
+ nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/");
+ nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance");
+ }
+
+ protected List createOafs(final Document doc,
+ final String type,
+ final KeyValue collectedFrom,
+ final KeyValue hostedBy,
+ final DataInfo info,
+ final long lastUpdateTimestamp) {
+
+ final List oafs = new ArrayList<>();
+
+ switch (type.toLowerCase()) {
+ case "":
+ case "publication":
+ final Publication p = new Publication();
+ populateResultFields(p, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+ p.setResulttype(PUBLICATION_RESULTTYPE_QUALIFIER);
+ p.setJournal(prepareJournal(doc, info));
+ oafs.add(p);
+ break;
+ case "dataset":
+ final Dataset d = new Dataset();
+ populateResultFields(d, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+ d.setResulttype(DATASET_RESULTTYPE_QUALIFIER);
+ d.setStoragedate(prepareDatasetStorageDate(doc, info));
+ d.setDevice(prepareDatasetDevice(doc, info));
+ d.setSize(prepareDatasetSize(doc, info));
+ d.setVersion(prepareDatasetVersion(doc, info));
+ d.setLastmetadataupdate(prepareDatasetLastMetadataUpdate(doc, info));
+ d.setMetadataversionnumber(prepareDatasetMetadataVersionNumber(doc, info));
+ d.setGeolocation(prepareDatasetGeoLocations(doc, info));
+ oafs.add(d);
+ break;
+ case "software":
+ final Software s = new Software();
+ populateResultFields(s, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+ s.setResulttype(SOFTWARE_RESULTTYPE_QUALIFIER);
+ s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
+ s.setLicense(prepareSoftwareLicenses(doc, info));
+ s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
+ s.setProgrammingLanguage(prepareSoftwareProgrammingLanguage(doc, info));
+ oafs.add(s);
+ break;
+ case "otherresearchproducts":
+ default:
+ final OtherResearchProduct o = new OtherResearchProduct();
+ populateResultFields(o, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp);
+ o.setResulttype(OTHER_RESULTTYPE_QUALIFIER);
+ o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
+ o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
+ o.setTool(prepareOtherResearchProductTools(doc, info));
+ oafs.add(o);
+ break;
+ }
+
+ if (!oafs.isEmpty()) {
+ oafs.addAll(addProjectRels(doc, collectedFrom, info, lastUpdateTimestamp));
+ oafs.addAll(addOtherResultRels(doc, collectedFrom, info, lastUpdateTimestamp));
+ }
+
+ return oafs;
+ }
+
+ private List addProjectRels(final Document doc,
+ final KeyValue collectedFrom,
+ final DataInfo info,
+ final long lastUpdateTimestamp) {
+
+ final List res = new ArrayList<>();
+
+ final String docId = createOpenaireId(50, doc.valueOf("//dri:objIdentifier"));
+
+ for (final Object o : doc.selectNodes("//oaf:projectid")) {
+ final String projectId = createOpenaireId(40, ((Node) o).getText());
+
+ final Relation r1 = new Relation();
+ r1.setRelType("resultProject");
+ r1.setSubRelType("outcome");
+ r1.setRelClass("isProducedBy");
+ r1.setSource(docId);
+ r1.setTarget(projectId);
+ r1.setCollectedFrom(Arrays.asList(collectedFrom));
+ r1.setDataInfo(info);
+ r1.setLastupdatetimestamp(lastUpdateTimestamp);
+ res.add(r1);
+
+ final Relation r2 = new Relation();
+ r2.setRelType("resultProject");
+ r2.setSubRelType("outcome");
+ r2.setRelClass("produces");
+ r2.setSource(projectId);
+ r2.setTarget(docId);
+ r2.setCollectedFrom(Arrays.asList(collectedFrom));
+ r2.setDataInfo(info);
+ r2.setLastupdatetimestamp(lastUpdateTimestamp);
+ res.add(r2);
+ }
+
+ return res;
+ }
+
+ protected abstract List addOtherResultRels(final Document doc,
+ final KeyValue collectedFrom,
+ final DataInfo info,
+ final long lastUpdateTimestamp);
+
+ private void populateResultFields(final Result r,
+ final Document doc,
+ final KeyValue collectedFrom,
+ final KeyValue hostedBy,
+ final DataInfo info,
+ final long lastUpdateTimestamp) {
+ r.setDataInfo(info);
+ r.setLastupdatetimestamp(lastUpdateTimestamp);
+ r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier")));
+ r.setOriginalId(Arrays.asList(doc.valueOf("//dri:objIdentifier")));
+ r.setCollectedfrom(Arrays.asList(collectedFrom));
+ r.setPid(prepareListStructProps(doc, "//oaf:identifier", "@identifierType", "dnet:pid_types", "dnet:pid_types", info));
+ r.setDateofcollection(doc.valueOf("//dr:dateOfCollection"));
+ r.setDateoftransformation(doc.valueOf("//dr:dateOfTransformation"));
+ r.setExtraInfo(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+ r.setOaiprovenance(prepareOAIprovenance(doc));
+ r.setAuthor(prepareAuthors(doc, info));
+ r.setLanguage(prepareLanguages(doc));
+ r.setCountry(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+ r.setSubject(prepareSubjects(doc, info));
+ r.setTitle(prepareTitles(doc, info));
+ r.setRelevantdate(prepareRelevantDates(doc, info));
+ r.setDescription(prepareDescriptions(doc, info));
+ r.setDateofacceptance(prepareField(doc, "//oaf:dateAccepted", info));
+ r.setPublisher(preparePublisher(doc, info));
+ r.setEmbargoenddate(prepareField(doc, "//oaf:embargoenddate", info));
+ r.setSource(prepareSources(doc, info));
+ r.setFulltext(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+ r.setFormat(prepareFormats(doc, info));
+ r.setContributor(prepareContributors(doc, info));
+ r.setResourcetype(prepareResourceType(doc, info));
+ r.setCoverage(prepareCoverages(doc, info));
+ r.setContext(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+ r.setExternalReference(new ArrayList<>()); // NOT PRESENT IN MDSTORES
+ r.setInstance(prepareInstances(doc, info, collectedFrom, hostedBy));
+ }
+
+ protected abstract Qualifier prepareResourceType(Document doc, DataInfo info);
+
+ protected abstract List prepareInstances(Document doc, DataInfo info, KeyValue collectedfrom, KeyValue hostedby);
+
+ protected abstract List> prepareSources(Document doc, DataInfo info);
+
+ protected abstract List prepareRelevantDates(Document doc, DataInfo info);
+
+ protected abstract List> prepareCoverages(Document doc, DataInfo info);
+
+ protected abstract List> prepareContributors(Document doc, DataInfo info);
+
+ protected abstract List> prepareFormats(Document doc, DataInfo info);
+
+ protected abstract Field preparePublisher(Document doc, DataInfo info);
+
+ protected abstract List> prepareDescriptions(Document doc, DataInfo info);
+
+ protected abstract List prepareTitles(Document doc, DataInfo info);
+
+ protected abstract List prepareSubjects(Document doc, DataInfo info);
+
+ protected abstract Qualifier prepareLanguages(Document doc);
+
+ protected abstract List prepareAuthors(Document doc, DataInfo info);
+
+ protected abstract List> prepareOtherResearchProductTools(Document doc, DataInfo info);
+
+ protected abstract List> prepareOtherResearchProductContactGroups(Document doc, DataInfo info);
+
+ protected abstract List> prepareOtherResearchProductContactPersons(Document doc, DataInfo info);
+
+ protected abstract Qualifier prepareSoftwareProgrammingLanguage(Document doc, DataInfo info);
+
+ protected abstract Field prepareSoftwareCodeRepositoryUrl(Document doc, DataInfo info);
+
+ protected abstract List prepareSoftwareLicenses(Document doc, DataInfo info);
+
+ protected abstract List> prepareSoftwareDocumentationUrls(Document doc, DataInfo info);
+
+ protected abstract List prepareDatasetGeoLocations(Document doc, DataInfo info);
+
+ protected abstract Field prepareDatasetMetadataVersionNumber(Document doc, DataInfo info);
+
+ protected abstract Field prepareDatasetLastMetadataUpdate(Document doc, DataInfo info);
+
+ protected abstract Field prepareDatasetVersion(Document doc, DataInfo info);
+
+ protected abstract Field prepareDatasetSize(Document doc, DataInfo info);
+
+ protected abstract Field prepareDatasetDevice(Document doc, DataInfo info);
+
+ protected abstract Field prepareDatasetStorageDate(Document doc, DataInfo info);
+
+ private Journal prepareJournal(final Document doc, final DataInfo info) {
+ final Node n = doc.selectSingleNode("//oaf:journal");
+ if (n != null) {
+ final String name = n.getText();
+ final String issnPrinted = n.valueOf("@issn");
+ final String issnOnline = n.valueOf("@eissn");
+ final String issnLinking = n.valueOf("@lissn");
+ final String ep = n.valueOf("@ep");
+ final String iss = n.valueOf("@iss");
+ final String sp = n.valueOf("@sp");
+ final String vol = n.valueOf("@vol");
+ final String edition = n.valueOf("@edition");
+ if (StringUtils.isNotBlank(name)) { return journal(name, issnPrinted, issnOnline, issnLinking, ep, iss, sp, vol, edition, null, null, info); }
+ }
+ return null;
+ }
+
+ protected Qualifier prepareQualifier(final Node node, final String xpath, final String schemeId, final String schemeName) {
+ final String classId = node.valueOf(xpath);
+ final String className = code2name.get(classId);
+ return qualifier(classId, className, schemeId, schemeName);
+ }
+
+ protected List prepareListStructProps(final Node node,
+ final String xpath,
+ final String xpathClassId,
+ final String schemeId,
+ final String schemeName,
+ final DataInfo info) {
+ final List res = new ArrayList<>();
+ for (final Object o : node.selectNodes(xpath)) {
+ final Node n = (Node) o;
+ final String classId = n.valueOf(xpathClassId);
+ final String className = code2name.get(classId);
+ res.add(structuredProperty(n.getText(), classId, className, schemeId, schemeName, info));
+ }
+ return res;
+ }
+
+ protected List prepareListStructProps(final Node node, final String xpath, final Qualifier qualifier, final DataInfo info) {
+ final List res = new ArrayList<>();
+ for (final Object o : node.selectNodes(xpath)) {
+ final Node n = (Node) o;
+ res.add(structuredProperty(n.getText(), qualifier, info));
+ }
+ return res;
+ }
+
+ protected List prepareListStructProps(final Node node, final String xpath, final DataInfo info) {
+ final List res = new ArrayList<>();
+ for (final Object o : node.selectNodes(xpath)) {
+ final Node n = (Node) o;
+ res.add(structuredProperty(n.getText(), n.valueOf("@classid"), n.valueOf("@classname"), n.valueOf("@schemeid"), n
+ .valueOf("@schemename"), info));
+ }
+ return res;
+ }
+
+ protected OAIProvenance prepareOAIprovenance(final Document doc) {
+ final Node n = doc.selectSingleNode("//*[local-name()='provenance']/*[local-name()='originDescription']");
+
+ if (n == null) { return null; }
+
+ final String identifier = n.valueOf("./*[local-name()='identifier']");
+ final String baseURL = n.valueOf("./*[local-name()='baseURL']");;
+ final String metadataNamespace = n.valueOf("./*[local-name()='metadataNamespace']");;
+ final boolean altered = n.valueOf("@altered").equalsIgnoreCase("true");
+ final String datestamp = n.valueOf("./*[local-name()='datestamp']");;
+ final String harvestDate = n.valueOf("@harvestDate");;
+
+ return oaiIProvenance(identifier, baseURL, metadataNamespace, altered, datestamp, harvestDate);
+
+ }
+
+ protected DataInfo prepareDataInfo(final Document doc) {
+ final Node n = doc.selectSingleNode("//oaf:datainfo");
+
+ final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");
+ final String paClassName = n.valueOf("./oaf:provenanceaction/@classname");
+ final String paSchemeId = n.valueOf("./oaf:provenanceaction/@schemeid");
+ final String paSchemeName = n.valueOf("./oaf:provenanceaction/@schemename");
+
+ final boolean deletedbyinference = Boolean.parseBoolean(n.valueOf("./oaf:deletedbyinference"));
+ final String inferenceprovenance = n.valueOf("./oaf:inferenceprovenance");
+ final Boolean inferred = Boolean.parseBoolean(n.valueOf("./oaf:inferred"));
+ final String trust = n.valueOf("./oaf:trust");
+
+ return dataInfo(deletedbyinference, inferenceprovenance, inferred, false, qualifier(paClassId, paClassName, paSchemeId, paSchemeName), trust);
+ }
+
+ protected Field prepareField(final Node node, final String xpath, final DataInfo info) {
+ return field(node.valueOf(xpath), info);
+ }
+
+ protected List> prepareListFields(final Node node, final String xpath, final DataInfo info) {
+ return listFields(info, prepareListString(node, xpath));
+ }
+
+ protected List prepareListString(final Node node, final String xpath) {
+ final List res = new ArrayList<>();
+ for (final Object o : node.selectNodes(xpath)) {
+ final String s = ((Node) o).getText().trim();
+ if (StringUtils.isNotBlank(s)) {
+ res.add(s);
+ }
+ }
+ return res;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ mdstoreClient.close();
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java
new file mode 100644
index 000000000..9ac0089d2
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java
@@ -0,0 +1,63 @@
+package eu.dnetlib.dhp.migration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.function.Consumer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DbClient implements Closeable {
+
+ private static final Log log = LogFactory.getLog(DbClient.class);
+
+ private Connection connection;
+
+ public DbClient(final String address, final String login, final String password) {
+
+ try {
+ Class.forName("org.postgresql.Driver");
+
+ this.connection =
+ StringUtils.isNoneBlank(login, password) ? DriverManager.getConnection(address, login, password) : DriverManager.getConnection(address);
+ this.connection.setAutoCommit(false);
+ } catch (final Exception e) {
+ log.error(e.getClass().getName() + ": " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ log.info("Opened database successfully");
+ }
+
+ public void processResults(final String sql, final Consumer consumer) {
+
+ try (final Statement stmt = connection.createStatement()) {
+ stmt.setFetchSize(100);
+
+ try (final ResultSet rs = stmt.executeQuery(sql)) {
+ while (rs.next()) {
+ consumer.accept(rs);
+ }
+ } catch (final SQLException e) {
+ throw new RuntimeException(e);
+ }
+ } catch (final SQLException e1) {
+ throw new RuntimeException(e1);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ connection.close();
+ } catch (final SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MdstoreClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MdstoreClient.java
new file mode 100644
index 000000000..87dadfc7a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MdstoreClient.java
@@ -0,0 +1,94 @@
+package eu.dnetlib.dhp.migration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.bson.Document;
+
+import com.google.common.collect.Iterables;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+public class MdstoreClient implements Closeable {
+
+ private final MongoClient client;
+ private final MongoDatabase db;
+
+ private static final String COLL_METADATA = "metadata";
+ private static final String COLL_METADATA_MANAGER = "metadataManager";
+
+ private static final Log log = LogFactory.getLog(MdstoreClient.class);
+
+ public MdstoreClient(final String baseUrl, final String dbName) {
+ this.client = new MongoClient(new MongoClientURI(baseUrl));
+ this.db = getDb(client, dbName);
+ }
+
+ public Map validCollections(final String mdFormat, final String mdLayout, final String mdInterpretation) {
+
+ final Map transactions = new HashMap<>();
+ for (final Document entry : getColl(db, COLL_METADATA_MANAGER, true).find()) {
+ final String mdId = entry.getString("mdId");
+ final String currentId = entry.getString("currentId");
+ if (StringUtils.isNoneBlank(mdId, currentId)) {
+ transactions.put(mdId, currentId);
+ }
+ }
+
+ final Map res = new HashMap<>();
+ for (final Document entry : getColl(db, COLL_METADATA, true).find()) {
+ if (entry.getString("format").equals(mdFormat) && entry.getString("layout").equals(mdLayout)
+ && entry.getString("interpretation").equals(mdInterpretation) && transactions.containsKey(entry.getString("mdId"))) {
+ res.put(entry.getString("mdId"), transactions.get(entry.getString("mdId")));
+ }
+ }
+
+ return res;
+ }
+
+ private MongoDatabase getDb(final MongoClient client, final String dbName) {
+ if (!Iterables.contains(client.listDatabaseNames(), dbName)) {
+ final String err = String.format("Database '%s' not found in %s", dbName, client.getAddress());
+ log.warn(err);
+ throw new RuntimeException(err);
+ }
+ return client.getDatabase(dbName);
+ }
+
+ private MongoCollection getColl(final MongoDatabase db, final String collName, final boolean abortIfMissing) {
+ if (!Iterables.contains(db.listCollectionNames(), collName)) {
+ final String err = String.format(String.format("Missing collection '%s' in database '%s'", collName, db.getName()));
+ log.warn(err);
+ if (abortIfMissing) {
+ throw new RuntimeException(err);
+ } else {
+ return null;
+ }
+ }
+ return db.getCollection(collName);
+ }
+
+ public Iterable listRecords(final String collName) {
+ final MongoCollection coll = getColl(db, collName, false);
+ return coll == null ? new ArrayList<>()
+ : () -> StreamSupport.stream(coll.find().spliterator(), false)
+ .filter(e -> e.containsKey("body"))
+ .map(e -> e.getString("body"))
+ .iterator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ client.close();
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java
new file mode 100644
index 000000000..d22e8e5b3
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java
@@ -0,0 +1,520 @@
+package eu.dnetlib.dhp.migration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Array;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.Journal;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+
+public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor implements Closeable {
+
+ private static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION =
+ qualifier("sysimport:crosswalk:entityregistry", "sysimport:crosswalk:entityregistry", "dnet:provenance_actions", "dnet:provenance_actions");
+
+ private static final Log log = LogFactory.getLog(MigrateDbEntitiesApplication.class);
+
+ private final DbClient dbClient;
+
+ private final long lastUpdateTimestamp;
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils.toString(MigrateDbEntitiesApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json")));
+
+ parser.parseArgument(args);
+
+ final String dbUrl = parser.get("postgresUrl");
+ final String dbUser = parser.get("postgresUser");
+ final String dbPassword = parser.get("postgresPassword");
+
+ final String hdfsPath = parser.get("hdfsPath");
+ final String hdfsNameNode = parser.get("namenode");
+ final String hdfsUser = parser.get("hdfsUser");
+
+ try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, hdfsNameNode, hdfsUser, dbUrl, dbUser, dbPassword)) {
+ log.info("Processing datasources...");
+ smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
+
+ log.info("Processing projects...");
+ smdbe.execute("queryProjects.sql", smdbe::processProject);
+
+ log.info("Processing orgs...");
+ smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
+
+ log.info("Processing relations ds <-> orgs ...");
+ smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
+
+ log.info("Processing projects <-> orgs ...");
+ smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
+
+ log.info("All done.");
+ }
+ }
+
+ public MigrateDbEntitiesApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String dbUrl, final String dbUser,
+ final String dbPassword) throws Exception {
+ super(hdfsPath, hdfsNameNode, hdfsUser);
+ this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
+ this.lastUpdateTimestamp = new Date().getTime();
+ }
+
+ public void execute(final String sqlFile, final Consumer consumer) throws Exception {
+ final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/migration/sql/" + sqlFile));
+ dbClient.processResults(sql, consumer);
+ }
+
+ public void processDatasource(final ResultSet rs) {
+
+ try {
+
+ final DataInfo info = prepareDataInfo(rs);
+
+ final Datasource ds = new Datasource();
+
+ ds.setId(createOpenaireId(10, rs.getString("datasourceid")));
+ ds.setOriginalId(Arrays.asList(rs.getString("datasourceid")));
+ ds.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname")));
+ ds.setPid(new ArrayList<>());
+ ds.setDateofcollection(asString(rs.getDate("dateofcollection")));
+ ds.setDateoftransformation(null); // Value not returned by the SQL query
+ ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB
+ ds.setOaiprovenance(null); // Values not present in the DB
+ ds.setDatasourcetype(prepareQualifierSplitting(rs.getString("datasourcetype")));
+ ds.setOpenairecompatibility(prepareQualifierSplitting(rs.getString("openairecompatibility")));
+ ds.setOfficialname(field(rs.getString("officialname"), info));
+ ds.setEnglishname(field(rs.getString("englishname"), info));
+ ds.setWebsiteurl(field(rs.getString("websiteurl"), info));
+ ds.setLogourl(field(rs.getString("logourl"), info));
+ ds.setContactemail(field(rs.getString("contactemail"), info));
+ ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info));
+ ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info));
+ ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info));
+ ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info));
+ ds.setDescription(field(rs.getString("description"), info));
+ ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
+ ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info));
+ ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info));
+ ds.setOdpolicies(field(rs.getString("odpolicies"), info));
+ ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info));
+ ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info));
+ ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info));
+ ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info));
+ ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info));
+ ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info));
+ ds.setDataprovider(field(rs.getBoolean("dataprovider"), info));
+ ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info));
+ ds.setDatabaseaccesstype(field(rs.getString("databaseaccesstype"), info));
+ ds.setDatauploadtype(field(rs.getString("datauploadtype"), info));
+ ds.setDatabaseaccessrestriction(field(rs.getString("databaseaccessrestriction"), info));
+ ds.setDatauploadrestriction(field(rs.getString("datauploadrestriction"), info));
+ ds.setVersioning(field(rs.getBoolean("versioning"), info));
+ ds.setCitationguidelineurl(field(rs.getString("citationguidelineurl"), info));
+ ds.setQualitymanagementkind(field(rs.getString("qualitymanagementkind"), info));
+ ds.setPidsystems(field(rs.getString("pidsystems"), info));
+ ds.setCertificates(field(rs.getString("certificates"), info));
+ ds.setPolicies(new ArrayList<>()); // The sql query returns an empty array
+ ds.setJournal(prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal
+ ds.setDataInfo(info);
+ ds.setLastupdatetimestamp(lastUpdateTimestamp);
+
+ // rs.getString("datasourceid");
+ // rs.getArray("identities");
+ // rs.getString("officialname");
+ // rs.getString("englishname");
+ // rs.getString("contactemail");
+ // rs.getString("openairecompatibility"); // COMPLEX ...@@@...
+ // rs.getString("websiteurl");
+ // rs.getString("logourl");
+ // rs.getArray("accessinfopackage");
+ // rs.getDouble("latitude");
+ // rs.getDouble("longitude");
+ // rs.getString("namespaceprefix");
+ // rs.getInt("odnumberofitems"); // NULL
+ // rs.getDate("odnumberofitemsdate"); // NULL
+ // rs.getArray("subjects");
+ // rs.getString("description");
+ // rs.getString("odpolicies"); // NULL
+ // rs.getArray("odlanguages");
+ // rs.getArray("odcontenttypes");
+ // rs.getBoolean("inferred"); // false
+ // rs.getBoolean("deletedbyinference");// false
+ // rs.getDouble("trust"); // 0.9
+ // rs.getString("inferenceprovenance"); // NULL
+ // rs.getDate("dateofcollection");
+ // rs.getDate("dateofvalidation");
+ // rs.getDate("releasestartdate");
+ // rs.getDate("releaseenddate");
+ // rs.getString("missionstatementurl");
+ // rs.getBoolean("dataprovider");
+ // rs.getBoolean("serviceprovider");
+ // rs.getString("databaseaccesstype");
+ // rs.getString("datauploadtype");
+ // rs.getString("databaseaccessrestriction");
+ // rs.getString("datauploadrestriction");
+ // rs.getBoolean("versioning");
+ // rs.getString("citationguidelineurl");
+ // rs.getString("qualitymanagementkind");
+ // rs.getString("pidsystems");
+ // rs.getString("certificates");
+ // rs.getArray("policies");
+ // rs.getString("collectedfromid");
+ // rs.getString("collectedfromname");
+ // rs.getString("datasourcetype"); // COMPLEX
+ // rs.getString("provenanceaction"); //
+ // 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions'
+ // AS provenanceaction,
+ // rs.getString("journal"); // CONCAT(d.issn, '@@@', d.eissn, '@@@', d.lissn) AS journal
+
+ emitOaf(ds);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void processProject(final ResultSet rs) {
+ try {
+
+ final DataInfo info = prepareDataInfo(rs);
+
+ final Project p = new Project();
+
+ p.setId(createOpenaireId(40, rs.getString("projectid")));
+ p.setOriginalId(Arrays.asList(rs.getString("projectid")));
+ p.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname")));
+ p.setPid(new ArrayList<>());
+ p.setDateofcollection(asString(rs.getDate("dateofcollection")));
+ p.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
+ p.setExtraInfo(new ArrayList<>()); // Values not present in the DB
+ p.setOaiprovenance(null); // Values not present in the DB
+ p.setWebsiteurl(field(rs.getString("websiteurl"), info));
+ p.setCode(field(rs.getString("code"), info));
+ p.setAcronym(field(rs.getString("acronym"), info));
+ p.setTitle(field(rs.getString("title"), info));
+ p.setStartdate(field(asString(rs.getDate("startdate")), info));
+ p.setEnddate(field(asString(rs.getDate("enddate")), info));
+ p.setCallidentifier(field(rs.getString("callidentifier"), info));
+ p.setKeywords(field(rs.getString("keywords"), info));
+ p.setDuration(field(Integer.toString(rs.getInt("duration")), info));
+ p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info));
+ p.setOamandatepublications(field(Boolean.toString(rs.getBoolean("oamandatepublications")), info));
+ p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info));
+ p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
+ p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info));
+ p.setContracttype(prepareQualifierSplitting(rs.getString("contracttype")));
+ p.setOptional1(field(rs.getString("optional1"), info));
+ p.setOptional2(field(rs.getString("optional2"), info));
+ p.setJsonextrainfo(field(rs.getString("jsonextrainfo"), info));
+ p.setContactfullname(field(rs.getString("contactfullname"), info));
+ p.setContactfax(field(rs.getString("contactfax"), info));
+ p.setContactphone(field(rs.getString("contactphone"), info));
+ p.setContactemail(field(rs.getString("contactemail"), info));
+ p.setSummary(field(rs.getString("summary"), info));
+ p.setCurrency(field(rs.getString("currency"), info));
+ p.setTotalcost(new Float(rs.getDouble("totalcost")));
+ p.setFundedamount(new Float(rs.getDouble("fundedamount")));
+ p.setDataInfo(info);
+ p.setLastupdatetimestamp(lastUpdateTimestamp);
+
+ // rs.getString("projectid");
+ // rs.getString("code");
+ // rs.getString("websiteurl");
+ // rs.getString("acronym");
+ // rs.getString("title");
+ // rs.getDate("startdate");
+ // rs.getDate("enddate");
+ // rs.getString("callidentifier");
+ // rs.getString("keywords");
+ // rs.getInt("duration");
+ // rs.getBoolean("ecsc39");
+ // rs.getBoolean("oamandatepublications");
+ // rs.getBoolean("ecarticle29_3");
+ // rs.getDate("dateofcollection");
+ // rs.getDate("dateoftransformation");
+ // rs.getBoolean("inferred");
+ // rs.getBoolean("deletedbyinference");
+ // rs.getDouble("trust");
+ // rs.getString("inferenceprovenance");
+ // rs.getString("optional1");
+ // rs.getString("optional2");
+ // rs.getString("jsonextrainfo");
+ // rs.getString("contactfullname");
+ // rs.getString("contactfax");
+ // rs.getString("contactphone");
+ // rs.getString("contactemail");
+ // rs.getString("summary");
+ // rs.getString("currency");
+ // rs.getDouble("totalcost");
+ // rs.getDouble("fundedamount");
+ // rs.getString("collectedfromid");
+ // rs.getString("collectedfromname");
+ // rs.getString("contracttype"); // COMPLEX
+ // rs.getString("provenanceaction"); // COMPLEX
+ // rs.getArray("pid");
+ // rs.getArray("subjects");
+ // rs.getArray("fundingtree");
+
+ emitOaf(p);
+
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void processOrganization(final ResultSet rs) {
+
+ try {
+
+ final DataInfo info = prepareDataInfo(rs);
+
+ final Organization o = new Organization();
+
+ o.setId(createOpenaireId(20, rs.getString("organizationid")));
+ o.setOriginalId(Arrays.asList(rs.getString("organizationid")));
+ o.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname")));
+ o.setPid(new ArrayList<>());
+ o.setDateofcollection(asString(rs.getDate("dateofcollection")));
+ o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
+ o.setExtraInfo(new ArrayList<>()); // Values not present in the DB
+ o.setOaiprovenance(null); // Values not present in the DB
+ o.setLegalshortname(field("legalshortname", info));
+ o.setLegalname(field("legalname", info));
+ o.setAlternativeNames(new ArrayList<>()); // Values not returned by the SQL query
+ o.setWebsiteurl(field("websiteurl", info));
+ o.setLogourl(field("logourl", info));
+ o.setEclegalbody(field(Boolean.toString(rs.getBoolean("eclegalbody")), info));
+ o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info));
+ o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info));
+ o.setEcresearchorganization(field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info));
+ o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info));
+ o.setEcinternationalorganizationeurinterests(field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info));
+ o.setEcinternationalorganization(field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info));
+ o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info));
+ o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info));
+ o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info));
+ o.setCountry(prepareQualifierSplitting(rs.getString("country")));
+ o.setDataInfo(info);
+ o.setLastupdatetimestamp(lastUpdateTimestamp);
+
+ // rs.getString("organizationid");
+ // rs.getString("legalshortname");
+ // rs.getString("legalname");
+ // rs.getString("websiteurl");
+ // rs.getString("logourl");
+ // rs.getBoolean("eclegalbody");
+ // rs.getBoolean("eclegalperson");
+ // rs.getBoolean("ecnonprofit");
+ // rs.getBoolean("ecresearchorganization");
+ // rs.getBoolean("echighereducation");
+ // rs.getBoolean("ecinternationalorganizationeurinterests");
+ // rs.getBoolean("ecinternationalorganization");
+ // rs.getBoolean("ecenterprise");
+ // rs.getBoolean("ecsmevalidated");
+ // rs.getBoolean("ecnutscode");
+ // rs.getDate("dateofcollection");
+ // rs.getDate("dateoftransformation");
+ // rs.getBoolean("inferred");
+ // rs.getBoolean("deletedbyinference");
+ // rs.getDouble("trust");
+ // rs.getString("inferenceprovenance");
+ // rs.getString("collectedfromid");
+ // rs.getString("collectedfromname");
+ // rs.getString("country");
+ // rs.getString("provenanceaction");
+ // rs.getArray("pid");
+
+ emitOaf(o);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void processDatasourceOrganization(final ResultSet rs) {
+
+ try {
+ final DataInfo info = prepareDataInfo(rs);
+ final String orgId = createOpenaireId(20, rs.getString("organization"));
+ final String dsId = createOpenaireId(10, rs.getString("datasource"));
+ final List collectedFrom = listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname"));
+
+ final Relation r1 = new Relation();
+ r1.setRelType("datasourceOrganization");
+ r1.setSubRelType("provision");
+ r1.setRelClass("isProvidedBy");
+ r1.setSource(dsId);
+ r1.setTarget(orgId);
+ r1.setCollectedFrom(collectedFrom);
+ r1.setDataInfo(info);
+ r1.setLastupdatetimestamp(lastUpdateTimestamp);
+ emitOaf(r1);
+
+ final Relation r2 = new Relation();
+ r2.setRelType("datasourceOrganization");
+ r2.setSubRelType("provision");
+ r2.setRelClass("provides");
+ r2.setSource(orgId);
+ r2.setTarget(dsId);
+ r2.setCollectedFrom(collectedFrom);
+ r2.setDataInfo(info);
+ r1.setLastupdatetimestamp(lastUpdateTimestamp);
+ emitOaf(r2);
+
+ // rs.getString("datasource");
+ // rs.getString("organization");
+ // rs.getDate("startdate"); // NULL
+ // rs.getDate("enddate"); // NULL
+ // rs.getBoolean("inferred"); // false
+ // rs.getBoolean("deletedbyinference"); // false
+ // rs.getDouble("trust"); // 0.9
+ // rs.getString("inferenceprovenance"); // NULL
+ // rs.getString("semantics"); // 'providedBy@@@provided
+ // by@@@dnet:datasources_organizations_typologies@@@dnet:datasources_organizations_typologies' AS
+ // semantics,
+ // rs.getString("provenanceaction"); // d.provenanceaction || '@@@' || d.provenanceaction ||
+ // '@@@dnet:provenanceActions@@@dnet:provenanceActions' AS provenanceaction
+
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void processProjectOrganization(final ResultSet rs) {
+
+ try {
+ final DataInfo info = prepareDataInfo(rs);
+ final String orgId = createOpenaireId(20, rs.getString("resporganization"));
+ final String projectId = createOpenaireId(40, rs.getString("project"));
+ final List collectedFrom = listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname"));
+
+ final Relation r1 = new Relation();
+ r1.setRelType("projectOrganization");
+ r1.setSubRelType("participation");
+ r1.setRelClass("isParticipant");
+ r1.setSource(projectId);
+ r1.setTarget(orgId);
+ r1.setCollectedFrom(collectedFrom);
+ r1.setDataInfo(info);
+ r1.setLastupdatetimestamp(lastUpdateTimestamp);
+ emitOaf(r1);
+
+ final Relation r2 = new Relation();
+ r2.setRelType("projectOrganization");
+ r2.setSubRelType("participation");
+ r2.setRelClass("hasParticipant");
+ r2.setSource(orgId);
+ r2.setTarget(projectId);
+ r2.setCollectedFrom(collectedFrom);
+ r2.setDataInfo(info);
+ r1.setLastupdatetimestamp(lastUpdateTimestamp);
+ emitOaf(r2);
+
+ // rs.getString("project");
+ // rs.getString("resporganization");
+ // rs.getInt("participantnumber");
+ // rs.getDouble("contribution");
+ // rs.getDate("startdate");// null
+ // rs.getDate("enddate");// null
+ // rs.getBoolean("inferred");// false
+ // rs.getBoolean("deletedbyinference"); // false
+ // rs.getDouble("trust");
+ // rs.getString("inferenceprovenance"); // NULL
+ // rs.getString("semantics"); // po.semanticclass || '@@@' || po.semanticclass ||
+ // '@@@dnet:project_organization_relations@@@dnet:project_organization_relations' AS semantics,
+ // rs.getString("provenanceaction"); //
+ // 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions'
+ // AS provenanceaction
+
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException {
+ final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
+ final String inferenceprovenance = rs.getString("inferenceprovenance");
+ final Boolean inferred = rs.getBoolean("inferred");
+ final String trust = rs.getString("trust");
+ return dataInfo(deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust);
+ }
+
+ private Qualifier prepareQualifierSplitting(final String s) {
+ if (StringUtils.isBlank(s)) { return null; }
+ final String[] arr = s.split("@@@");
+ return arr.length == 4 ? qualifier(arr[0], arr[1], arr[2], arr[3]) : null;
+ }
+
+ private List> prepareListFields(final Array array, final DataInfo info) {
+ try {
+ return listFields(info, (String[]) array.getArray());
+ } catch (final SQLException e) {
+ throw new RuntimeException("Invalid SQL array", e);
+ }
+ }
+
+ private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) {
+ if (StringUtils.isBlank(s)) { return null; }
+ final String[] parts = s.split("###");
+ if (parts.length == 2) {
+ final String value = parts[0];
+ final String[] arr = parts[1].split("@@@");
+ if (arr.length == 4) { return structuredProperty(value, arr[0], arr[1], arr[2], arr[3], dataInfo); }
+ }
+ return null;
+ }
+
+ private List prepareListOfStructProps(final Array array, final DataInfo dataInfo) throws SQLException {
+ final List res = new ArrayList<>();
+ if (array != null) {
+ for (final String s : (String[]) array.getArray()) {
+ final StructuredProperty sp = prepareStructProp(s, dataInfo);
+ if (sp != null) {
+ res.add(sp);
+ }
+ }
+ }
+
+ return res;
+ }
+
+ private Journal prepareJournal(final String name, final String sj, final DataInfo info) {
+ if (StringUtils.isNotBlank(sj)) {
+ final String[] arr = sj.split("@@@");
+ if (arr.length == 3) {
+ final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0] : null;
+ final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1] : null;;
+ final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2] : null;;
+ if (issn != null || eissn != null
+ || lissn != null) { return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info); }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ dbClient.close();
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java
new file mode 100644
index 000000000..359fe7596
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java
@@ -0,0 +1,45 @@
+package eu.dnetlib.dhp.migration;
+
+import org.apache.commons.io.IOUtils;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class MigrateMongoMdstoresApplication {
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json")));
+ parser.parseArgument(args);
+
+ final String mongoBaseUrl = parser.get("mongoBaseUrl");
+ final String mongoDb = parser.get("mongoDb");
+
+ final String mdFormat = parser.get("mdFormat");
+ final String mdLayout = parser.get("mdLayout");
+ final String mdInterpretation = parser.get("mdInterpretation");
+
+ final String hdfsPath = parser.get("hdfsPath");
+ final String hdfsNameNode = parser.get("namenode");
+ final String hdfsUser = parser.get("hdfsUser");
+
+ final String dbUrl = parser.get("postgresUrl");
+ final String dbUser = parser.get("postgresUser");
+ final String dbPassword = parser.get("postgresPassword");
+
+ if (mdFormat.equalsIgnoreCase("oaf")) {
+ try (final OafMigrationExecutor mig =
+ new OafMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) {
+ mig.processMdRecords(mdFormat, mdLayout, mdInterpretation);
+ }
+ } else if (mdFormat.equalsIgnoreCase("odf")) {
+ try (final OdfMigrationExecutor mig =
+ new OdfMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) {
+ mig.processMdRecords(mdFormat, mdLayout, mdInterpretation);
+ }
+ } else {
+ throw new RuntimeException("Format not supported: " + mdFormat);
+ }
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OafMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OafMigrationExecutor.java
new file mode 100644
index 000000000..c32568290
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OafMigrationExecutor.java
@@ -0,0 +1,251 @@
+package eu.dnetlib.dhp.migration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.dom4j.Document;
+import org.dom4j.Node;
+
+import eu.dnetlib.dhp.migration.pace.PacePerson;
+import eu.dnetlib.dhp.schema.oaf.Author;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.GeoLocation;
+import eu.dnetlib.dhp.schema.oaf.Instance;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+
+public class OafMigrationExecutor extends AbstractMongoExecutor {
+
+ private static final Log log = LogFactory.getLog(OafMigrationExecutor.class);
+
+ public OafMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb,
+ final String dbUrl, final String dbUser,
+ final String dbPassword) throws Exception {
+ super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword);
+ }
+
+ @Override
+ protected void registerNamespaces(final Map nsContext) {
+ super.registerNamespaces(nsContext);
+ nsContext.put("dc", "http://purl.org/dc/elements/1.1/");
+ }
+
+ @Override
+ protected List prepareAuthors(final Document doc, final DataInfo info) {
+ final List res = new ArrayList<>();
+ int pos = 1;
+ for (final Object o : doc.selectNodes("//dc:creator")) {
+ final Node n = (Node) o;
+ final Author author = new Author();
+ author.setFullname(n.getText());
+ author.setRank(pos++);
+ final PacePerson p = new PacePerson(n.getText(), false);
+ if (p.isAccurate()) {
+ author.setName(p.getNormalisedFirstName());
+ author.setSurname(p.getNormalisedSurname());
+ }
+ res.add(author);
+ }
+ return res;
+ }
+
+ @Override
+ protected Qualifier prepareLanguages(final Document doc) {
+ return prepareQualifier(doc, "//dc:language", "dnet:languages", "dnet:languages");
+ }
+
+ @Override
+ protected List prepareSubjects(final Document doc, final DataInfo info) {
+ return prepareListStructProps(doc, "//dc:subject", info);
+ }
+
+ @Override
+ protected List prepareTitles(final Document doc, final DataInfo info) {
+ return prepareListStructProps(doc, "//dc:title", MAIN_TITLE_QUALIFIER, info);
+ }
+
+ @Override
+ protected List> prepareDescriptions(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:description", info);
+ }
+
+ @Override
+ protected Field preparePublisher(final Document doc, final DataInfo info) {
+ return prepareField(doc, "//dc:publisher", info);
+ }
+
+ @Override
+ protected List> prepareFormats(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:format", info);
+ }
+
+ @Override
+ protected List> prepareContributors(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:contributor", info);
+ }
+
+ @Override
+ protected List> prepareCoverages(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:coverage", info);
+ }
+
+ @Override
+ protected List prepareInstances(final Document doc, final DataInfo info, final KeyValue collectedfrom, final KeyValue hostedby) {
+ final List res = new ArrayList<>();
+ for (final Object o : doc.selectNodes("//dc:identifier")) {
+ final String url = ((Node) o).getText().trim();
+ if (url.startsWith("http")) {
+ final Instance instance = new Instance();
+ instance.setUrl(Arrays.asList(url));
+ instance.setInstancetype(prepareQualifier(doc, "//dr:CobjCategory", "dnet:publication_resource", "dnet:publication_resource"));
+ instance.setCollectedfrom(collectedfrom);
+ instance.setHostedby(hostedby);
+ instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
+ instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation"));
+ instance.setAccessright(prepareQualifier(doc, "//oaf:accessrights", "dnet:access_modes", "dnet:access_modes"));
+ instance.setLicense(field(doc.valueOf("//oaf:license"), info));
+ instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info));
+ instance.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info));
+ instance.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
+ res.add(instance);
+ }
+ }
+ return res;
+ }
+
+ @Override
+ protected List> prepareSources(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:source", info);
+ }
+
+ @Override
+ protected List prepareRelevantDates(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // NOT PRESENT IN OAF
+ }
+
+ // SOFTWARES
+
+ @Override
+ protected Qualifier prepareSoftwareProgrammingLanguage(final Document doc, final DataInfo info) {
+ return null; // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected Field prepareSoftwareCodeRepositoryUrl(final Document doc, final DataInfo info) {
+ return null; // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected List prepareSoftwareLicenses(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected List> prepareSoftwareDocumentationUrls(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // NOT PRESENT IN OAF
+ }
+
+ // DATASETS
+ @Override
+ protected List prepareDatasetGeoLocations(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected Field prepareDatasetMetadataVersionNumber(final Document doc, final DataInfo info) {
+ return null; // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected Field prepareDatasetLastMetadataUpdate(final Document doc, final DataInfo info) {
+ return null; // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected Field prepareDatasetVersion(final Document doc, final DataInfo info) {
+ return null; // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected Field prepareDatasetSize(final Document doc, final DataInfo info) {
+ return null; // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected Field prepareDatasetDevice(final Document doc, final DataInfo info) {
+ return null; // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected Field prepareDatasetStorageDate(final Document doc, final DataInfo info) {
+ return null; // NOT PRESENT IN OAF
+ }
+
+ // OTHER PRODUCTS
+
+ @Override
+ protected List> prepareOtherResearchProductTools(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected List> prepareOtherResearchProductContactGroups(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected List> prepareOtherResearchProductContactPersons(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // NOT PRESENT IN OAF
+ }
+
+ @Override
+ protected List addOtherResultRels(final Document doc,
+ final KeyValue collectedFrom,
+ final DataInfo info,
+ final long lastUpdateTimestamp) {
+ final String docId = createOpenaireId(50, doc.valueOf("//dri:objIdentifier"));
+
+ final List res = new ArrayList<>();
+
+ for (final Object o : doc.selectNodes("//*[local-name()='relatedDataset']")) {
+ final String otherId = createOpenaireId(50, ((Node) o).getText());
+
+ final Relation r1 = new Relation();
+ r1.setRelType("resultResult");
+ r1.setSubRelType("publicationDataset");
+ r1.setRelClass("isRelatedTo");
+ r1.setSource(docId);
+ r1.setTarget(otherId);
+ r1.setCollectedFrom(Arrays.asList(collectedFrom));
+ r1.setDataInfo(info);
+ r1.setLastupdatetimestamp(lastUpdateTimestamp);
+ res.add(r1);
+
+ final Relation r2 = new Relation();
+ r2.setRelType("resultResult");
+ r2.setSubRelType("publicationDataset");
+ r2.setRelClass("isRelatedTo");
+ r2.setSource(otherId);
+ r2.setTarget(docId);
+ r2.setCollectedFrom(Arrays.asList(collectedFrom));
+ r2.setDataInfo(info);
+ r2.setLastupdatetimestamp(lastUpdateTimestamp);
+ res.add(r2);
+ }
+ return res;
+ }
+
+ @Override
+ protected Qualifier prepareResourceType(final Document doc, final DataInfo info) {
+ return null; // NOT PRESENT IN OAF
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OdfMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OdfMigrationExecutor.java
new file mode 100644
index 000000000..457534085
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OdfMigrationExecutor.java
@@ -0,0 +1,273 @@
+package eu.dnetlib.dhp.migration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.dom4j.Document;
+import org.dom4j.Node;
+
+import eu.dnetlib.dhp.schema.oaf.Author;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.GeoLocation;
+import eu.dnetlib.dhp.schema.oaf.Instance;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+
+public class OdfMigrationExecutor extends AbstractMongoExecutor {
+
+ private static final Log log = LogFactory.getLog(OdfMigrationExecutor.class);
+
+ public OdfMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb,
+ final String dbUrl, final String dbUser,
+ final String dbPassword) throws Exception {
+ super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword);
+ }
+
+ @Override
+ protected void registerNamespaces(final Map nsContext) {
+ super.registerNamespaces(nsContext);
+ nsContext.put("dc", "http://datacite.org/schema/kernel-3");
+ }
+
+ @Override
+ protected List prepareTitles(final Document doc, final DataInfo info) {
+ return prepareListStructProps(doc, "//dc:title", MAIN_TITLE_QUALIFIER, info);
+ }
+
+ @Override
+ protected List prepareAuthors(final Document doc, final DataInfo info) {
+ final List res = new ArrayList<>();
+ int pos = 1;
+ for (final Object o : doc.selectNodes("//dc:creator")) {
+ final Node n = (Node) o;
+ final Author author = new Author();
+ author.setFullname(n.valueOf("./dc:creatorName"));
+ author.setName(n.valueOf("./dc:givenName"));
+ author.setSurname(n.valueOf("./dc:familyName"));
+ author.setAffiliation(prepareListFields(doc, "./dc:affiliation", info));
+ author.setPid(preparePids(doc, info));
+ author.setRank(pos++);
+ res.add(author);
+ }
+ return res;
+ }
+
+ private List preparePids(final Document doc, final DataInfo info) {
+ final List res = new ArrayList<>();
+ for (final Object o : doc.selectNodes("./dc:nameIdentifier")) {
+ res.add(structuredProperty(((Node) o).getText(), prepareQualifier((Node) o, "./@nameIdentifierScheme", "dnet:pid_types", "dnet:pid_types"), info));
+ }
+ return res;
+ }
+
+ @Override
+ protected List prepareInstances(final Document doc, final DataInfo info, final KeyValue collectedfrom, final KeyValue hostedby) {
+ final List res = new ArrayList<>();
+ for (final Object o : doc.selectNodes("//dc:alternateIdentifier[@alternateIdentifierType='URL']")) {
+ final Instance instance = new Instance();
+ instance.setUrl(Arrays.asList(((Node) o).getText().trim()));
+ instance.setInstancetype(prepareQualifier(doc, "//dr:CobjCategory", "dnet:publication_resource", "dnet:publication_resource"));
+ instance.setCollectedfrom(collectedfrom);
+ instance.setHostedby(hostedby);
+ instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
+ instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation"));
+ instance.setAccessright(prepareQualifier(doc, "//oaf:accessrights", "dnet:access_modes", "dnet:access_modes"));
+ instance.setLicense(field(doc.valueOf("//oaf:license"), info));
+ instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info));
+ instance.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info));
+ instance.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
+ res.add(instance);
+ }
+ return res;
+ }
+
+ @Override
+ protected List> prepareSources(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // Not present in ODF ???
+ }
+
+ @Override
+ protected List prepareRelevantDates(final Document doc, final DataInfo info) {
+ final List res = new ArrayList<>();
+ for (final Object o : doc.selectNodes("//dc:date")) {
+ final String dateType = ((Node) o).valueOf("@dateType");
+ if (StringUtils.isBlank(dateType) && !dateType.equalsIgnoreCase("Accepted") && !dateType.equalsIgnoreCase("Issued")
+ && !dateType.equalsIgnoreCase("Updated") && !dateType.equalsIgnoreCase("Available")) {
+ res.add(structuredProperty(((Node) o).getText(), "UNKNOWN", "UNKNOWN", "dnet:dataCite_date", "dnet:dataCite_date", info));
+ }
+ }
+ return res;
+ }
+
+ @Override
+ protected List> prepareCoverages(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // Not present in ODF ???
+ }
+
+ @Override
+ protected List> prepareContributors(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:contributorName", info);
+ }
+
+ @Override
+ protected List> prepareFormats(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:format", info);
+ }
+
+ @Override
+ protected Field preparePublisher(final Document doc, final DataInfo info) {
+ return prepareField(doc, "//dc:publisher", info);
+ }
+
+ @Override
+ protected List> prepareDescriptions(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:description[@descriptionType='Abstract']", info);
+ }
+
+ @Override
+ protected List prepareSubjects(final Document doc, final DataInfo info) {
+ return prepareListStructProps(doc, "//dc:subject", info);
+ }
+
+ @Override
+ protected Qualifier prepareLanguages(final Document doc) {
+ return prepareQualifier(doc, "//dc:language", "dnet:languages", "dnet:languages");
+ }
+
+ @Override
+ protected List> prepareOtherResearchProductTools(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // Not present in ODF ???
+ }
+
+ @Override
+ protected List> prepareOtherResearchProductContactGroups(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:contributor[@contributorType='ContactGroup']/dc:contributorName", info);
+ }
+
+ @Override
+ protected List> prepareOtherResearchProductContactPersons(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:contributor[@contributorType='ContactPerson']/dc:contributorName", info);
+ }
+
+ @Override
+ protected Qualifier prepareSoftwareProgrammingLanguage(final Document doc, final DataInfo info) {
+ return prepareQualifier(doc, "//dc:format", "dnet:programming_languages", "dnet:programming_languages");
+ }
+
+ @Override
+ protected Field prepareSoftwareCodeRepositoryUrl(final Document doc, final DataInfo info) {
+ return null; // Not present in ODF ???
+ }
+
+ @Override
+ protected List prepareSoftwareLicenses(final Document doc, final DataInfo info) {
+ return new ArrayList<>(); // Not present in ODF ???
+ }
+
+ @Override
+ protected List> prepareSoftwareDocumentationUrls(final Document doc, final DataInfo info) {
+ return prepareListFields(doc, "//dc:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info);
+ }
+
+ // DATASETS
+
+ @Override
+ protected List prepareDatasetGeoLocations(final Document doc, final DataInfo info) {
+ final List res = new ArrayList<>();
+
+ for (final Object o : doc.selectNodes("//dc:geoLocation")) {
+ final GeoLocation loc = new GeoLocation();
+ loc.setBox(((Node) o).valueOf("./dc:geoLocationBox"));
+ loc.setPlace(((Node) o).valueOf("./dc:geoLocationPlace"));
+ loc.setPoint(((Node) o).valueOf("./dc:geoLocationPoint"));
+ res.add(loc);
+ }
+ return res;
+ }
+
+ @Override
+ protected Field prepareDatasetMetadataVersionNumber(final Document doc, final DataInfo info) {
+ return null; // Not present in ODF ???
+ }
+
+ @Override
+ protected Field prepareDatasetLastMetadataUpdate(final Document doc, final DataInfo info) {
+ return prepareField(doc, "//dc:date[@dateType='Updated']", info);
+ }
+
+ @Override
+ protected Field prepareDatasetVersion(final Document doc, final DataInfo info) {
+ return prepareField(doc, "//dc:version", info);
+ }
+
+ @Override
+ protected Field prepareDatasetSize(final Document doc, final DataInfo info) {
+ return prepareField(doc, "//dc:size", info);
+ }
+
+ @Override
+ protected Field prepareDatasetDevice(final Document doc, final DataInfo info) {
+ return null; // Not present in ODF ???
+ }
+
+ @Override
+ protected Field prepareDatasetStorageDate(final Document doc, final DataInfo info) {
+ return prepareField(doc, "//dc:date[@dateType='Issued']", info);
+ }
+
+ @Override
+ protected List addOtherResultRels(final Document doc, final KeyValue collectedFrom, final DataInfo info, final long lastUpdateTimestamp) {
+
+ final String docId = createOpenaireId(50, doc.valueOf("//dri:objIdentifier"));
+
+ final List res = new ArrayList<>();
+
+ for (final Object o : doc.selectNodes("//*[local-name() = 'resource']//*[local-name()='relatedIdentifier' and ./@relatedIdentifierType='OPENAIRE']")) {
+ final String otherId = createOpenaireId(50, ((Node) o).getText());
+ final String type = ((Node) o).valueOf("@relationType");
+
+ if (type.equals("IsSupplementTo")) {
+ res.add(prepareOtherResultRel(collectedFrom, info, lastUpdateTimestamp, docId, otherId, "supplement", "isSupplementTo"));
+ res.add(prepareOtherResultRel(collectedFrom, info, lastUpdateTimestamp, otherId, docId, "supplement", "isSupplementedBy"));
+ } else if (type.equals("IsPartOf")) {
+ res.add(prepareOtherResultRel(collectedFrom, info, lastUpdateTimestamp, docId, otherId, "part", "IsPartOf"));
+ res.add(prepareOtherResultRel(collectedFrom, info, lastUpdateTimestamp, otherId, docId, "part", "HasParts"));
+ } else {}
+ }
+ return res;
+ }
+
+ private Relation prepareOtherResultRel(final KeyValue collectedFrom,
+ final DataInfo info,
+ final long lastUpdateTimestamp,
+ final String source,
+ final String target,
+ final String subRelType,
+ final String relClass) {
+ final Relation r = new Relation();
+ r.setRelType("resultResult");
+ r.setSubRelType(subRelType);
+ r.setRelClass(relClass);
+ r.setSource(source);
+ r.setTarget(target);
+ r.setCollectedFrom(Arrays.asList(collectedFrom));
+ r.setDataInfo(info);
+ r.setLastupdatetimestamp(lastUpdateTimestamp);
+ return r;
+ }
+
+ @Override
+ protected Qualifier prepareResourceType(final Document doc, final DataInfo info) {
+ return prepareQualifier(doc, "//*[local-name() = 'resource']//*[local-name() = 'resourceType']", "dnet:dataCite_resource", "dnet:dataCite_resource");
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/pace/PacePerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/pace/PacePerson.java
new file mode 100644
index 000000000..927f5641b
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/pace/PacePerson.java
@@ -0,0 +1,176 @@
+package eu.dnetlib.dhp.migration.pace;
+
+import java.nio.charset.Charset;
+import java.text.Normalizer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.text.WordUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
+public class PacePerson {
+
+ private static final String UTF8 = "UTF-8";
+ private List name = Lists.newArrayList();
+ private List surname = Lists.newArrayList();
+ private List fullname = Lists.newArrayList();
+ private final String original;
+
+ private static Set particles = null;
+
+ public static final String capitalize(final String s) {
+ return WordUtils.capitalize(s.toLowerCase(), ' ', '-');
+ }
+
+ public static final String dotAbbreviations(final String s) {
+ return s.length() == 1 ? s + "." : s;
+ }
+
+ public static Set loadFromClasspath(final String classpath) {
+ final Set h = new HashSet<>();
+ try {
+ for (final String s : IOUtils.readLines(PacePerson.class.getResourceAsStream(classpath))) {
+ h.add(s);
+ }
+ } catch (final Throwable e) {
+ return new HashSet<>();
+ }
+ return h;
+ }
+
+ public PacePerson(String s, final boolean aggressive) {
+ original = s;
+ s = Normalizer.normalize(s, Normalizer.Form.NFD);
+ s = s.replaceAll("\\(.+\\)", "");
+ s = s.replaceAll("\\[.+\\]", "");
+ s = s.replaceAll("\\{.+\\}", "");
+ s = s.replaceAll("\\s+-\\s+", "-");
+ s = s.replaceAll("[\\p{Punct}&&[^,-]]", " ");
+ s = s.replaceAll("\\d", " ");
+ s = s.replaceAll("\\n", " ");
+ s = s.replaceAll("\\.", " ");
+ s = s.replaceAll("\\s+", " ");
+
+ if (aggressive) {
+ s = s.replaceAll("[\\p{InCombiningDiacriticalMarks}&&[^,-]]", "");
+ // s = s.replaceAll("[\\W&&[^,-]]", "");
+ }
+
+ if (s.contains(",")) {
+ final String[] arr = s.split(",");
+ if (arr.length == 1) {
+ fullname = splitTerms(arr[0]);
+ } else if (arr.length > 1) {
+ surname = splitTerms(arr[0]);
+ name = splitTerms(arr[1]);
+ fullname.addAll(surname);
+ fullname.addAll(name);
+ }
+ } else {
+ fullname = splitTerms(s);
+
+ int lastInitialPosition = fullname.size();
+ boolean hasSurnameInUpperCase = false;
+
+ for (int i = 0; i < fullname.size(); i++) {
+ final String term = fullname.get(i);
+ if (term.length() == 1) {
+ lastInitialPosition = i;
+ } else if (term.equals(term.toUpperCase())) {
+ hasSurnameInUpperCase = true;
+ }
+ }
+
+ if (lastInitialPosition < fullname.size() - 1) { // Case: Michele G. Artini
+ name = fullname.subList(0, lastInitialPosition + 1);
+ surname = fullname.subList(lastInitialPosition + 1, fullname.size());
+ } else if (hasSurnameInUpperCase) { // Case: Michele ARTINI
+ for (final String term : fullname) {
+ if (term.length() > 1 && term.equals(term.toUpperCase())) {
+ surname.add(term);
+ } else {
+ name.add(term);
+ }
+ }
+ }
+ }
+ }
+
+ private List splitTerms(final String s) {
+ if (particles == null) {
+ particles = loadFromClasspath("/eu/dnetlib/dhp/migration/pace/name_particles.txt");
+ }
+
+ final List list = Lists.newArrayList();
+ for (final String part : Splitter.on(" ").omitEmptyStrings().split(s)) {
+ if (!particles.contains(part.toLowerCase())) {
+ list.add(part);
+ }
+ }
+ return list;
+ }
+
+ public List getName() {
+ return name;
+ }
+
+ public String getNameString() {
+ return Joiner.on(" ").join(getName());
+ }
+
+ public List getSurname() {
+ return surname;
+ }
+
+ public List getFullname() {
+ return fullname;
+ }
+
+ public String getOriginal() {
+ return original;
+ }
+
+ public String hash() {
+ return Hashing.murmur3_128().hashString(getNormalisedFullname(), Charset.forName(UTF8)).toString();
+ }
+
+ public String getNormalisedFirstName() {
+ return Joiner.on(" ").join(getCapitalFirstnames());
+ }
+
+ public String getNormalisedSurname() {
+ return Joiner.on(" ").join(getCapitalSurname());
+ }
+
+ public String getSurnameString() {
+ return Joiner.on(" ").join(getSurname());
+ }
+
+ public String getNormalisedFullname() {
+ return isAccurate() ? getNormalisedSurname() + ", " + getNormalisedFirstName() : Joiner.on(" ").join(fullname);
+ }
+
+ public List getCapitalFirstnames() {
+ return Lists.newArrayList(Iterables.transform(getNameWithAbbreviations(), PacePerson::capitalize));
+ }
+
+ public List getCapitalSurname() {
+ return Lists.newArrayList(Iterables.transform(surname, PacePerson::capitalize));
+ }
+
+ public List getNameWithAbbreviations() {
+ return Lists.newArrayList(Iterables.transform(name, PacePerson::dotAbbreviations));
+ }
+
+ public boolean isAccurate() {
+ return name != null && surname != null && !name.isEmpty() && !surname.isEmpty();
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json
new file mode 100644
index 000000000..5e9f378f5
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json
@@ -0,0 +1,38 @@
+[
+ {
+ "paramName": "p",
+ "paramLongName": "hdfsPath",
+ "paramDescription": "the path where storing the sequential file",
+ "paramRequired": true
+ },
+ {
+ "paramName": "n",
+ "paramLongName": "namenode",
+ "paramDescription": "the Name Node URI",
+ "paramRequired": true
+ },
+ {
+ "paramName": "u",
+ "paramLongName": "hdfsUser",
+ "paramDescription": "the user wich create the hdfs seq file",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dburl",
+ "paramLongName": "postgresUrl",
+ "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dbuser",
+ "paramLongName": "postgresUser",
+ "paramDescription": "postgres user",
+ "paramRequired": false
+ },
+ {
+ "paramName": "dbpasswd",
+ "paramLongName": "postgresPassword",
+ "paramDescription": "postgres password",
+ "paramRequired": false
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json
new file mode 100644
index 000000000..5738daa76
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json
@@ -0,0 +1,68 @@
+[
+ {
+ "paramName": "p",
+ "paramLongName": "hdfsPath",
+ "paramDescription": "the path where storing the sequential file",
+ "paramRequired": true
+ },
+ {
+ "paramName": "n",
+ "paramLongName": "namenode",
+ "paramDescription": "the Name Node URI",
+ "paramRequired": true
+ },
+ {
+ "paramName": "u",
+ "paramLongName": "hdfsUser",
+ "paramDescription": "the user wich create the hdfs seq file",
+ "paramRequired": true
+ },
+ {
+ "paramName": "mongourl",
+ "paramLongName": "mongoBaseUrl",
+ "paramDescription": "mongoDB url, example: mongodb://[username:password@]host[:port]",
+ "paramRequired": true
+ },
+ {
+ "paramName": "db",
+ "paramLongName": "mongoDb",
+ "paramDescription": "mongo database",
+ "paramRequired": true
+ },
+ {
+ "paramName": "f",
+ "paramLongName": "mdFormat",
+ "paramDescription": "metadata format",
+ "paramRequired": true
+ },
+ {
+ "paramName": "l",
+ "paramLongName": "mdLayout",
+ "paramDescription": "metadata layout",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "mdInterpretation",
+ "paramDescription": "metadata interpretation",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pgurl",
+ "paramLongName": "postgresUrl",
+ "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pguser",
+ "paramLongName": "postgresUser",
+ "paramDescription": "postgres user",
+ "paramRequired": false
+ },
+ {
+ "paramName": "pgpasswd",
+ "paramLongName": "postgresPassword",
+ "paramDescription": "postgres password",
+ "paramRequired": false
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/pace/name_particles.txt b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/pace/name_particles.txt
new file mode 100644
index 000000000..dae37c9dc
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/pace/name_particles.txt
@@ -0,0 +1,7 @@
+van
+der
+de
+dell
+sig
+mr
+mrs
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasourceOrganization.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasourceOrganization.sql
new file mode 100644
index 000000000..745f83971
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasourceOrganization.sql
@@ -0,0 +1,17 @@
+SELECT
+ dor.datasource AS datasource,
+ dor.organization AS organization,
+ NULL AS startdate,
+ NULL AS enddate,
+ false AS inferred,
+ false AS deletedbyinference,
+ 0.9 AS trust,
+ NULL AS inferenceprovenance,
+ dc.id AS collectedfromid,
+ dc.officialname AS collectedfromname,
+ 'providedBy@@@provided by@@@dnet:datasources_organizations_typologies@@@dnet:datasources_organizations_typologies' AS semantics,
+ d.provenanceaction || '@@@' || d.provenanceaction || '@@@dnet:provenanceActions@@@dnet:provenanceActions' AS provenanceaction
+
+FROM dsm_datasource_organization dor
+ LEFT OUTER JOIN dsm_datasources d ON (dor.datasource = d.id)
+ LEFT OUTER JOIN dsm_datasources dc ON (dc.id = d.collectedfrom)
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasources.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasources.sql
new file mode 100644
index 000000000..8c587f34e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryDatasources.sql
@@ -0,0 +1,147 @@
+SELECT
+ d.id AS datasourceid,
+ d.id || array_agg(distinct di.pid) AS identities,
+ d.officialname AS officialname,
+ d.englishname AS englishname,
+ d.contactemail AS contactemail,
+ CASE
+ WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire-cris_1.1'])
+ THEN
+ 'openaire-cris_1.1@@@OpenAIRE CRIS v1.1@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['driver', 'openaire2.0'])
+ THEN
+ 'driver-openaire2.0@@@OpenAIRE 2.0+ (DRIVER OA, EC funding)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['driver'])
+ THEN
+ 'driver@@@OpenAIRE Basic (DRIVER OA)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire2.0'])
+ THEN
+ 'openaire2.0@@@OpenAIRE 2.0 (EC funding)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire3.0'])
+ THEN
+ 'openaire3.0@@@OpenAIRE 3.0 (OA, funding)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['openaire2.0_data'])
+ THEN
+ 'openaire2.0_data@@@OpenAIRE Data (funded, referenced datasets)@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['native'])
+ THEN
+ 'native@@@proprietary@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['hostedBy'])
+ THEN
+ 'hostedBy@@@collected from a compatible aggregator@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility) :: TEXT) @> ARRAY ['notCompatible'])
+ THEN
+ 'notCompatible@@@under validation@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ ELSE
+ 'UNKNOWN@@@not available@@@dnet:datasourceCompatibilityLevel@@@dnet:datasourceCompatibilityLevel'
+ END AS openairecompatibility,
+ d.websiteurl AS websiteurl,
+ d.logourl AS logourl,
+ array_agg(DISTINCT CASE WHEN a.protocol = 'oai' and last_aggregation_date is not null THEN a.baseurl ELSE NULL END) AS accessinfopackage,
+ d.latitude AS latitude,
+ d.longitude AS longitude,
+ d.namespaceprefix AS namespaceprefix,
+ NULL AS odnumberofitems,
+ NULL AS odnumberofitemsdate,
+
+ (SELECT array_agg(s|| '###keywords@@@keywords@@@dnet:subject_classification_typologies@@@dnet:subject_classification_typologies')
+ FROM UNNEST(
+ ARRAY(
+ SELECT trim(s)
+ FROM unnest(string_to_array(d.subjects, '@@')) AS s)) AS s) AS subjects,
+
+ d.description AS description,
+ NULL AS odpolicies,
+ ARRAY(SELECT trim(s)
+ FROM unnest(string_to_array(d.languages, ',')) AS s) AS odlanguages,
+ ARRAY(SELECT trim(s)
+ FROM unnest(string_to_array(d.od_contenttypes, '-')) AS s) AS odcontenttypes,
+ false AS inferred,
+ false AS deletedbyinference,
+ 0.9 AS trust,
+ NULL AS inferenceprovenance,
+ d.dateofcollection AS dateofcollection,
+ d.dateofvalidation AS dateofvalidation,
+ -- re3data fields
+ d.releasestartdate AS releasestartdate,
+ d.releaseenddate AS releaseenddate,
+ d.missionstatementurl AS missionstatementurl,
+ d.dataprovider AS dataprovider,
+ d.serviceprovider AS serviceprovider,
+ d.databaseaccesstype AS databaseaccesstype,
+ d.datauploadtype AS datauploadtype,
+ d.databaseaccessrestriction AS databaseaccessrestriction,
+ d.datauploadrestriction AS datauploadrestriction,
+ d.versioning AS versioning,
+ d.citationguidelineurl AS citationguidelineurl,
+ d.qualitymanagementkind AS qualitymanagementkind,
+ d.pidsystems AS pidsystems,
+ d.certificates AS certificates,
+ ARRAY[]::text[] AS policies,
+ dc.id AS collectedfromid,
+ dc.officialname AS collectedfromname,
+ d.typology || '@@@' || CASE
+ WHEN (d.typology = 'crissystem') THEN 'CRIS System'
+ WHEN (d.typology = 'datarepository::unknown') THEN 'Data Repository'
+ WHEN (d.typology = 'aggregator::datarepository') THEN 'Data Repository Aggregator'
+ WHEN (d.typology = 'infospace') THEN 'Information Space'
+ WHEN (d.typology = 'pubsrepository::institutional') THEN 'Institutional Repository'
+ WHEN (d.typology = 'aggregator::pubsrepository::institutional') THEN 'Institutional Repository Aggregator'
+ WHEN (d.typology = 'pubsrepository::journal') THEN 'Journal'
+ WHEN (d.typology = 'aggregator::pubsrepository::journals') THEN 'Journal Aggregator/Publisher'
+ WHEN (d.typology = 'pubsrepository::mock') THEN 'Other'
+ WHEN (d.typology = 'pubscatalogue::unknown') THEN 'Publication Catalogue'
+ WHEN (d.typology = 'pubsrepository::unknown') THEN 'Publication Repository'
+ WHEN (d.typology = 'aggregator::pubsrepository::unknown') THEN 'Publication Repository Aggregator'
+ WHEN (d.typology = 'entityregistry') THEN 'Registry'
+ WHEN (d.typology = 'scholarcomminfra') THEN 'Scholarly Comm. Infrastructure'
+ WHEN (d.typology = 'pubsrepository::thematic') THEN 'Thematic Repository'
+ WHEN (d.typology = 'websource') THEN 'Web Source'
+ WHEN (d.typology = 'entityregistry::projects') THEN 'Funder database'
+ WHEN (d.typology = 'entityregistry::repositories') THEN 'Registry of repositories'
+ WHEN (d.typology = 'softwarerepository') THEN 'Software Repository'
+ WHEN (d.typology = 'aggregator::softwarerepository') THEN 'Software Repository Aggregator'
+ WHEN (d.typology = 'orprepository') THEN 'Repository'
+ ELSE 'Other'
+ END || '@@@dnet:datasource_typologies@@@dnet:datasource_typologies' AS datasourcetype,
+ 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction,
+ CONCAT(d.issn, '@@@', d.eissn, '@@@', d.lissn) AS journal
+
+FROM dsm_datasources d
+
+LEFT OUTER JOIN dsm_datasources dc on (d.collectedfrom = dc.id)
+LEFT OUTER JOIN dsm_api a ON (d.id = a.datasource)
+LEFT OUTER JOIN dsm_datasourcepids di ON (d.id = di.datasource)
+
+GROUP BY
+ d.id,
+ d.officialname,
+ d.englishname,
+ d.websiteurl,
+ d.logourl,
+ d.contactemail,
+ d.namespaceprefix,
+ d.description,
+ d.latitude,
+ d.longitude,
+ d.dateofcollection,
+ d.dateofvalidation,
+ d.releasestartdate,
+ d.releaseenddate,
+ d.missionstatementurl,
+ d.dataprovider,
+ d.serviceprovider,
+ d.databaseaccesstype,
+ d.datauploadtype,
+ d.databaseaccessrestriction,
+ d.datauploadrestriction,
+ d.versioning,
+ d.citationguidelineurl,
+ d.qualitymanagementkind,
+ d.pidsystems,
+ d.certificates,
+ dc.id,
+ dc.officialname,
+ d.issn,
+ d.eissn,
+ d.lissn
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizations.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizations.sql
new file mode 100644
index 000000000..682ca3596
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizations.sql
@@ -0,0 +1,36 @@
+SELECT
+ o.id AS organizationid,
+ o.legalshortname AS legalshortname,
+ o.legalname AS legalname,
+ o.websiteurl AS websiteurl,
+ o.logourl AS logourl,
+ o.ec_legalbody AS eclegalbody,
+ o.ec_legalperson AS eclegalperson,
+ o.ec_nonprofit AS ecnonprofit,
+ o.ec_researchorganization AS ecresearchorganization,
+ o.ec_highereducation AS echighereducation,
+ o.ec_internationalorganizationeurinterests AS ecinternationalorganizationeurinterests,
+ o.ec_internationalorganization AS ecinternationalorganization,
+ o.ec_enterprise AS ecenterprise,
+ o.ec_smevalidated AS ecsmevalidated,
+ o.ec_nutscode AS ecnutscode,
+ o.dateofcollection AS dateofcollection,
+ o.lastupdate AS dateoftransformation,
+ false AS inferred,
+ false AS deletedbyinference,
+ o.trust AS trust,
+ '' AS inferenceprovenance,
+ d.id AS collectedfromid,
+ d.officialname AS collectedfromname,
+
+ o.country || '@@@dnet:countries' AS country,
+ 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction,
+
+ ARRAY[]::text[] AS pid
+FROM dsm_organizations o
+ LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)
+
+
+
+
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizationsFromOpenOrgsDB.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizationsFromOpenOrgsDB.sql
new file mode 100644
index 000000000..dc9550883
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryOrganizationsFromOpenOrgsDB.sql
@@ -0,0 +1,53 @@
+SELECT
+ o.id AS organizationid,
+ coalesce((array_agg(a.acronym))[1], o.name) AS legalshortname,
+ o.name AS legalname,
+ array_agg(DISTINCT n.name) AS "alternativeNames",
+ (array_agg(u.url))[1] AS websiteurl,
+ o.modification_date AS dateoftransformation,
+ false AS inferred,
+ false AS deletedbyinference,
+ 0.95 AS trust,
+ '' AS inferenceprovenance,
+ 'openaire____::openorgs' AS collectedfromid,
+ 'OpenOrgs Database' AS collectedfromname,
+ o.country || '@@@dnet:countries' AS country,
+ 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction,
+ array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
+FROM organizations o
+ LEFT OUTER JOIN acronyms a ON (a.id = o.id)
+ LEFT OUTER JOIN urls u ON (u.id = o.id)
+ LEFT OUTER JOIN other_ids i ON (i.id = o.id)
+ LEFT OUTER JOIN other_names n ON (n.id = o.id)
+GROUP BY
+ o.id,
+ o.name,
+ o.modification_date,
+ o.country
+
+UNION ALL
+
+SELECT
+ 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS organizationid,
+ n.name AS legalshortname,
+ n.name AS legalname,
+ ARRAY[]::text[] AS "alternativeNames",
+ (array_agg(u.url))[1] AS websiteurl,
+ o.modification_date AS dateoftransformation,
+ false AS inferred,
+ false AS deletedbyinference,
+ 0.88 AS trust,
+ '' AS inferenceprovenance,
+ 'openaire____::openorgs' AS collectedfromid,
+ 'OpenOrgs Database' AS collectedfromname,
+ o.country || '@@@dnet:countries' AS country,
+ 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction,
+ array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid
+FROM other_names n
+ LEFT OUTER JOIN organizations o ON (n.id = o.id)
+ LEFT OUTER JOIN urls u ON (u.id = o.id)
+ LEFT OUTER JOIN other_ids i ON (i.id = o.id)
+GROUP BY
+ o.id, o.modification_date, o.country, n.name
+
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjectOrganization.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjectOrganization.sql
new file mode 100644
index 000000000..4c06ca5b9
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjectOrganization.sql
@@ -0,0 +1,19 @@
+SELECT
+ po.project AS project,
+ po.resporganization AS resporganization,
+ po.participantnumber AS participantnumber,
+ po.contribution AS contribution,
+ NULL AS startdate,
+ NULL AS enddate,
+ false AS inferred,
+ false AS deletedbyinference,
+ po.trust AS trust,
+ NULL AS inferenceprovenance,
+ dc.id AS collectedfromid,
+ dc.officialname AS collectedfromname,
+ po.semanticclass || '@@@' || po.semanticclass || '@@@dnet:project_organization_relations@@@dnet:project_organization_relations' AS semantics,
+ 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction
+
+FROM project_organization po
+ LEFT OUTER JOIN projects p ON (p.id = po.project)
+ LEFT OUTER JOIN dsm_datasources dc ON (dc.id = p.collectedfrom)
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql
new file mode 100644
index 000000000..6cff18875
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql
@@ -0,0 +1,90 @@
+SELECT
+ p.id AS projectid,
+ p.code AS code,
+ p.websiteurl AS websiteurl,
+ p.acronym AS acronym,
+ p.title AS title,
+ p.startdate AS startdate,
+ p.enddate AS enddate,
+ p.call_identifier AS callidentifier,
+ p.keywords AS keywords,
+ p.duration AS duration,
+ p.ec_sc39 AS ecsc39,
+ p.oa_mandate_for_publications AS oamandatepublications,
+ p.ec_article29_3 AS ecarticle29_3,
+ p.dateofcollection AS dateofcollection,
+ p.lastupdate AS dateoftransformation,
+ p.inferred AS inferred,
+ p.deletedbyinference AS deletedbyinference,
+ p.trust AS trust,
+ p.inferenceprovenance AS inferenceprovenance,
+ p.optional1 AS optional1,
+ p.optional2 AS optional2,
+ p.jsonextrainfo AS jsonextrainfo,
+ p.contactfullname AS contactfullname,
+ p.contactfax AS contactfax,
+ p.contactphone AS contactphone,
+ p.contactemail AS contactemail,
+ p.summary AS summary,
+ p.currency AS currency,
+ p.totalcost AS totalcost,
+ p.fundedamount AS fundedamount,
+ dc.id AS collectedfromid,
+ dc.officialname AS collectedfromname,
+ ctc.code || '@@@' || ctc.name || '@@@' || cts.code || '@@@' || cts.name AS contracttype,
+ pac.code || '@@@' || pac.name || '@@@' || pas.code || '@@@' || pas.name AS provenanceaction,
+ array_agg(DISTINCT i.pid || '###' || i.issuertype) AS pid,
+ array_agg(DISTINCT s.name || '###' || sc.code || '@@@' || sc.name || '@@@' || ss.code || '@@@' || ss.name) AS subjects,
+ array_agg(DISTINCT fp.path) AS fundingtree
+ FROM projects p
+ LEFT OUTER JOIN class pac ON (pac.code = p.provenanceactionclass)
+ LEFT OUTER JOIN scheme pas ON (pas.code = p.provenanceactionscheme)
+
+ LEFT OUTER JOIN projectpids pp ON (pp.project = p.id)
+ LEFT OUTER JOIN dsm_identities i ON (i.pid = pp.pid)
+
+ LEFT OUTER JOIN dsm_datasources dc ON (dc.id = p.collectedfrom)
+
+ LEFT OUTER JOIN project_fundingpath pf ON (pf.project = p.id)
+ LEFT OUTER JOIN fundingpaths fp ON (fp.id = pf.funding)
+
+ LEFT OUTER JOIN project_subject ps ON (ps.project = p.id)
+ LEFT OUTER JOIN subjects s ON (s.id = ps.subject)
+
+ LEFT OUTER JOIN class sc ON (sc.code = s.semanticclass)
+ LEFT OUTER JOIN scheme ss ON (ss.code = s.semanticscheme)
+
+ LEFT OUTER JOIN class ctc ON (ctc.code = p.contracttypeclass)
+ LEFT OUTER JOIN scheme cts ON (cts.code = p.contracttypescheme)
+
+ GROUP BY
+ p.id,
+ p.code,
+ p.websiteurl,
+ p.acronym,
+ p.title,
+ p.startdate,
+ p.enddate,
+ p.call_identifier,
+ p.keywords,
+ p.duration,
+ p.ec_sc39,
+ p.oa_mandate_for_publications,
+ p.ec_article29_3,
+ p.dateofcollection,
+ p.inferred,
+ p.deletedbyinference,
+ p.trust,
+ p.inferenceprovenance,
+ p.contactfullname,
+ p.contactfax,
+ p.contactphone,
+ p.contactemail,
+ p.summary,
+ p.currency,
+ p.totalcost,
+ p.fundedamount,
+ dc.id,
+ dc.officialname,
+ pac.code, pac.name, pas.code, pas.name,
+ ctc.code, ctc.name, cts.code, cts.name;
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/querySimilarityFromOpenOrgsDB.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/querySimilarityFromOpenOrgsDB.sql
new file mode 100644
index 000000000..4407559c6
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/querySimilarityFromOpenOrgsDB.sql
@@ -0,0 +1,17 @@
+SELECT local_id AS id1, oa_original_id AS id2 FROM openaire_simrels WHERE reltype = 'is_similar'
+
+UNION ALL
+
+SELECT
+ o.id AS id1,
+ 'openorgsmesh'||substring(o.id, 13)||'-'||md5(a.acronym) AS id2
+FROM acronyms a
+ LEFT OUTER JOIN organizations o ON (a.id = o.id)
+
+UNION ALL
+
+SELECT
+ o.id AS id1,
+ 'openorgsmesh'||substring(o.id, 13)||'-'||md5(n.name) AS id2
+FROM other_names n
+ LEFT OUTER JOIN organizations o ON (n.id = o.id)
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties b/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties
new file mode 100644
index 000000000..63cba917e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/dhp-workflows/dhp-dedup/pom.xml b/dhp-workflows/dhp-dedup/pom.xml
index 6aef8f313..0721af25d 100644
--- a/dhp-workflows/dhp-dedup/pom.xml
+++ b/dhp-workflows/dhp-dedup/pom.xml
@@ -8,6 +8,37 @@
4.0.0
dhp-dedup
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 4.0.1
+
+
+ scala-compile-first
+ initialize
+
+ add-source
+ compile
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+ ${scala.version}
+
+
+
+
+
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
index ab19ff2b5..0291be47e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
@@ -1,23 +1,31 @@
package eu.dnetlib.dhp.graph;
-import com.google.common.collect.Maps;
-import eu.dnetlib.dhp.schema.oaf.*;
-
import java.util.Map;
+import com.google.common.collect.Maps;
+
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Software;
+
public class GraphMappingUtils {
- public final static Map types = Maps.newHashMap();
+ public final static Map types = Maps.newHashMap();
- static {
- types.put("datasource", Datasource.class);
- types.put("organization", Organization.class);
- types.put("project", Project.class);
- types.put("dataset", Dataset.class);
- types.put("otherresearchproduct", OtherResearchProduct.class);
- types.put("software", Software.class);
- types.put("publication", Publication.class);
- types.put("relation", Relation.class);
- }
+ static {
+ types.put("datasource", Datasource.class);
+ types.put("organization", Organization.class);
+ types.put("project", Project.class);
+ types.put("dataset", Dataset.class);
+ types.put("otherresearchproduct", OtherResearchProduct.class);
+ types.put("software", Software.class);
+ types.put("publication", Publication.class);
+ types.put("relation", Relation.class);
+ }
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
index a6a4e9291..463bffae9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java
@@ -1,7 +1,5 @@
package eu.dnetlib.dhp.graph;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
@@ -9,42 +7,47 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import scala.Tuple2;
public class SparkGraphImporterJob {
- public static void main(String[] args) throws Exception {
+ public static void main(final String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
- parser.parseArgument(args);
- final SparkSession spark = SparkSession
- .builder()
- .appName(SparkGraphImporterJob.class.getSimpleName())
- .master(parser.get("master"))
- .config("hive.metastore.uris", parser.get("hive_metastore_uris"))
- .enableHiveSupport()
- .getOrCreate();
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils.toString(SparkGraphImporterJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json")));
+ parser.parseArgument(args);
+ final SparkSession spark = SparkSession
+ .builder()
+ .appName(SparkGraphImporterJob.class.getSimpleName())
+ .master(parser.get("master"))
+ .config("hive.metastore.uris", parser.get("hive_metastore_uris"))
+ .enableHiveSupport()
+ .getOrCreate();
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
- final String inputPath = parser.get("sourcePath");
- final String hiveDbName = parser.get("hive_db_name");
+ final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ final String inputPath = parser.get("sourcePath");
+ final String hiveDbName = parser.get("hive_db_name");
- spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
+ spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", hiveDbName));
- // Read the input file and convert it into RDD of serializable object
- GraphMappingUtils.types.forEach((name, clazz) -> {
- final JavaRDD> inputRDD = sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
- .map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
+ // Read the input file and convert it into RDD of serializable object
+ GraphMappingUtils.types.forEach((name, clazz) -> {
+ final JavaRDD> inputRDD = sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class)
+ .map(item -> new Tuple2<>(item._1.toString(), item._2.toString()));
- spark.createDataset(inputRDD
- .filter(s -> s._1().equals(clazz.getName()))
- .map(Tuple2::_2)
- .map(s -> new ObjectMapper().readValue(s, clazz))
- .rdd(), Encoders.bean(clazz))
- .write()
- .mode(SaveMode.Overwrite)
- .saveAsTable(hiveDbName + "." + name);
- });
+ spark.createDataset(inputRDD
+ .filter(s -> s._1().equals(clazz.getName()))
+ .map(Tuple2::_2)
+ .map(s -> new ObjectMapper().readValue(s, clazz))
+ .rdd(), Encoders.bean(clazz))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .saveAsTable(hiveDbName + "." + name);
+ });
- }
+ }
}
diff --git a/pom.xml b/pom.xml
index 300af6a61..6f85886c0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,426 +1,411 @@
-
+
- 4.0.0
- eu.dnetlib.dhp
- dhp
- 1.1.6-SNAPSHOT
- pom
+ 4.0.0
+ eu.dnetlib.dhp
+ dhp
+ 1.1.6-SNAPSHOT
+ pom
- http://www.d-net.research-infrastructures.eu
+ http://www.d-net.research-infrastructures.eu
-
-
- The Apache Software License, Version 2.0
- http://www.apache.org/licenses/LICENSE-2.0.txt
- repo
- A business-friendly OSS license
-
-
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
-
- dhp-build
- dhp-schemas
- dhp-common
- dhp-workflows
-
+
+ dhp-build
+ dhp-schemas
+ dhp-common
+ dhp-workflows
+
-
- Redmine
- https://issue.openaire.research-infrastructures.eu/projects/openaire
-
+
+ Redmine
+ https://issue.openaire.research-infrastructures.eu/projects/openaire
+
-
- jenkins
- https://jenkins-dnet.d4science.org/
-
+
+ jenkins
+ https://jenkins-dnet.d4science.org/
+
-
- scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git
- scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git
- https://code-repo.d4science.org/D-Net/dnet-hadoop/
- HEAD
-
+
+ scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git
+ scm:git:gitea@code-repo.d4science.org:D-Net/dnet-hadoop.git
+ https://code-repo.d4science.org/D-Net/dnet-hadoop/
+ HEAD
+
-
-
+
+
-
-
- dnet45-releases
- D-Net 45 releases
- http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases
- default
-
- false
-
-
- true
-
-
-
- cloudera
- Cloudera Repository
- https://repository.cloudera.com/artifactory/cloudera-repos
-
- true
-
-
- false
-
-
-
+
+
+ dnet45-releases
+ D-Net 45 releases
+ http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases
+ default
+
+ false
+
+
+ true
+
+
+
+ cloudera
+ Cloudera Repository
+ https://repository.cloudera.com/artifactory/cloudera-repos
+
+ true
+
+
+ false
+
+
+
-
-
- junit
- junit
- 4.12
- test
-
+
+
+ junit
+ junit
+ 4.12
+ test
+
-
- org.mockito
- mockito-core
- 2.7.22
- test
-
+
+ org.mockito
+ mockito-core
+ 2.7.22
+ test
+
-
+
-
-
-
- org.apache.hadoop
- hadoop-hdfs
- ${dhp.hadoop.version}
- provided
-
-
- org.apache.hadoop
- hadoop-client
- ${dhp.hadoop.version}
- provided
-
-
- org.apache.spark
- spark-core_2.11
- ${dhp.spark.version}
- provided
-
-
- org.apache.spark
- spark-sql_2.11
- ${dhp.spark.version}
- provided
-
-
- org.apache.spark
- spark-graphx_2.11
- ${dhp.spark.version}
- provided
-
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ ${dhp.hadoop.version}
+ provided
+
+
+ org.apache.hadoop
+ hadoop-client
+ ${dhp.hadoop.version}
+ provided
+
+
+ org.apache.spark
+ spark-core_2.11
+ ${dhp.spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-sql_2.11
+ ${dhp.spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-graphx_2.11
+ ${dhp.spark.version}
+ provided
+
-
- org.apache.commons
- commons-lang3
- ${dhp.commons.lang.version}
-
+
+ org.apache.commons
+ commons-lang3
+ ${dhp.commons.lang.version}
+
-
- commons-codec
- commons-codec
- 1.9
-
+
+ commons-codec
+ commons-codec
+ 1.9
+
-
- commons-io
- commons-io
- 2.4
-
+
+ commons-io
+ commons-io
+ 2.4
+
-
- commons-cli
- commons-cli
- 1.2
- provided
-
+
+ commons-cli
+ commons-cli
+ 1.2
+ provided
+
-
- net.sf.saxon
- Saxon-HE
- 9.5.1-5
-
+
+ net.sf.saxon
+ Saxon-HE
+ 9.5.1-5
+
-
- dom4j
- dom4j
- 1.6.1
-
+
+ dom4j
+ dom4j
+ 1.6.1
+
-
- xml-apis
- xml-apis
- 1.4.01
-
+
+ xml-apis
+ xml-apis
+ 1.4.01
+
-
- jaxen
- jaxen
- 1.1.6
-
+
+ jaxen
+ jaxen
+ 1.1.6
+
-
- net.schmizz
- sshj
- 0.10.0
- test
-
+
+ net.schmizz
+ sshj
+ 0.10.0
+ test
+
-
- com.fasterxml.jackson.core
- jackson-core
- ${dhp.jackson.version}
- provided
-
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${dhp.jackson.version}
+ provided
+
-
- com.fasterxml.jackson.core
- jackson-annotations
- ${dhp.jackson.version}
- provided
-
-
- com.fasterxml.jackson.core
- jackson-databind
- ${dhp.jackson.version}
- provided
-
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ ${dhp.jackson.version}
+ provided
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${dhp.jackson.version}
+ provided
+
-
- eu.dnetlib
- dnet-pace-core
- 4.0.0
-
+
+ eu.dnetlib
+ dnet-pace-core
+ 4.0.0-SNAPSHOT
+
-
- javax.persistence
- javax.persistence-api
- 2.2
- provided
-
+
+ javax.persistence
+ javax.persistence-api
+ 2.2
+ provided
+
-
- com.rabbitmq
- amqp-client
- 5.6.0
-
-
- com.jayway.jsonpath
- json-path
- 2.4.0
-
-
- com.arakelian
- java-jq
- 0.10.1
-
-
- edu.cmu
- secondstring
- 1.0.0
-
+
+ com.rabbitmq
+ amqp-client
+ 5.6.0
+
+
+ com.jayway.jsonpath
+ json-path
+ 2.4.0
+
+
+ com.arakelian
+ java-jq
+ 0.10.1
+
+
+ edu.cmu
+ secondstring
+ 1.0.0
+
-
- org.apache.oozie
- oozie-client
- ${dhp.oozie.version}
- provided
-
-
-
- slf4j-simple
- org.slf4j
-
-
-
-
-
+
+ org.mongodb
+ mongo-java-driver
+ ${mongodb.driver.version}
+
-
- target
- target/classes
- ${project.artifactId}-${project.version}
- target/test-classes
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- ${maven.compiler.plugin.version}
-
-
- 1.8
- ${project.build.sourceEncoding}
-
-
+
+ org.apache.oozie
+ oozie-client
+ ${dhp.oozie.version}
+ provided
+
+
+
+ slf4j-simple
+ org.slf4j
+
+
+
+
+
-
- org.apache.maven.plugins
- maven-jar-plugin
- 3.0.2
-
+
+ target
+ target/classes
+ ${project.artifactId}-${project.version}
+ target/test-classes
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ ${maven.compiler.plugin.version}
+
+
+ 1.8
+ ${project.build.sourceEncoding}
+
+
-
- org.apache.maven.plugins
- maven-source-plugin
- 3.0.1
-
-
- attach-sources
- verify
-
- jar-no-fork
-
-
-
-
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.0.2
+
-
- org.apache.maven.plugins
- maven-surefire-plugin
- 2.19.1
-
- true
-
-
-
- org.apache.maven.plugins
- maven-javadoc-plugin
- 2.10.4
-
- true
-
-
-
- org.apache.maven.plugins
- maven-dependency-plugin
- 3.0.0
-
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.0.1
+
+
+ attach-sources
+ verify
+
+ jar-no-fork
+
+
+
+
-
- org.codehaus.mojo
- build-helper-maven-plugin
- 1.12
-
-
-
-
-
- org.apache.maven.plugins
- maven-release-plugin
- 2.5.3
-
-
- org.jacoco
- jacoco-maven-plugin
- 0.7.9
-
-
- **/schemas/*
- **/com/cloudera/**/*
- **/org/apache/avro/io/**/*
-
-
-
-
- default-prepare-agent
-
- prepare-agent
-
-
-
- default-report
- prepare-package
-
- report
-
-
-
-
-
- net.alchim31.maven
- scala-maven-plugin
- 4.0.1
-
-
- scala-compile-first
- initialize
-
- add-source
- compile
-
-
-
- scala-test-compile
- process-test-resources
-
- testCompile
-
-
-
-
- ${scala.version}
-
-
-
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19.1
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 2.10.4
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 3.0.0
+
-
-
- org.apache.maven.wagon
- wagon-ssh
- 2.10
-
-
-
-
-
- dnet45-snapshots
- DNet45 Snapshots
- http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots
- default
-
-
- dnet45-releases
- http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases
-
-
-
-
-
- org.apache.maven.plugins
- maven-javadoc-plugin
- 2.10.4
-
- true
-
-
-
-
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 1.12
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-release-plugin
+ 2.5.3
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.7.9
+
+
+ **/schemas/*
+ **/com/cloudera/**/*
+ **/org/apache/avro/io/**/*
+
+
+
+
+ default-prepare-agent
+
+ prepare-agent
+
+
+
+ default-report
+ prepare-package
+
+ report
+
+
+
+
-
- UTF-8
- UTF-8
- 3.6.0
- 2.22.2
- cdh5.9.2
- 2.6.0-${dhp.cdh.version}
- 4.1.0-${dhp.cdh.version}
- 2.4.0.cloudera2
- 2.9.6
- 3.5
- 2.11.12
-
+
+
+
+
+ org.apache.maven.wagon
+ wagon-ssh
+ 2.10
+
+
+
+
+
+ dnet45-snapshots
+ DNet45 Snapshots
+ http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots
+ default
+
+
+ dnet45-releases
+ http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 2.10.4
+
+ true
+
+
+
+
+
+
+ UTF-8
+ UTF-8
+ 3.6.0
+ 2.22.2
+ cdh5.9.2
+ 2.6.0-${dhp.cdh.version}
+ 4.1.0-${dhp.cdh.version}
+ 2.4.0.cloudera2
+ 2.9.6
+ 3.5
+ 2.11.12
+ 3.4.2
+