From a5d7007005e80d363d74cf7fe9f6e564f1901f0e Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 29 Apr 2020 12:05:41 +0200 Subject: [PATCH] Fix relations in migration Fix pom.xml in dhp-stats-update --- .../raw/MigrateDbEntitiesApplication.java | 1017 ++++++++--------- 1 file changed, 506 insertions(+), 511 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 7ba1f2576..f048698c8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -1,4 +1,3 @@ - package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString; @@ -11,23 +10,6 @@ import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues; import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier; import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty; -import java.io.Closeable; -import java.io.IOException; -import java.sql.Array; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Function; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.oa.graph.raw.common.DbClient; @@ -49,499 +31,512 @@ import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import java.io.Closeable; +import java.io.IOException; +import java.sql.Array; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class MigrateDbEntitiesApplication extends AbstractMigrationApplication - implements Closeable { - - private static final Log log = LogFactory.getLog(MigrateDbEntitiesApplication.class); - - private final DbClient dbClient; - - private final long lastUpdateTimestamp; - - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - MigrateDbEntitiesApplication.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json"))); - - parser.parseArgument(args); - - final String dbUrl = parser.get("postgresUrl"); - final String dbUser = parser.get("postgresUser"); - final String dbPassword = parser.get("postgresPassword"); - - final String hdfsPath = parser.get("hdfsPath"); - - final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims"); - - try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser, - dbPassword)) { - if (processClaims) { - log.info("Processing claims..."); - smdbe.execute("queryClaims.sql", smdbe::processClaims); - } else { - log.info("Processing datasources..."); - smdbe.execute("queryDatasources.sql", smdbe::processDatasource); - - log.info("Processing projects..."); - smdbe.execute("queryProjects.sql", smdbe::processProject); - - log.info("Processing orgs..."); - smdbe.execute("queryOrganizations.sql", smdbe::processOrganization); - - log.info("Processing relations ds <-> orgs ..."); - smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization); - - log.info("Processing projects <-> orgs ..."); - smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization); - } - log.info("All done."); - } - } - - protected MigrateDbEntitiesApplication() { // ONLY FOR UNIT TEST - super(); - this.dbClient = null; - this.lastUpdateTimestamp = new Date().getTime(); - } - - public MigrateDbEntitiesApplication( - final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword) - throws Exception { - super(hdfsPath); - this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); - this.lastUpdateTimestamp = new Date().getTime(); - } - - public void execute(final String sqlFile, final Function> producer) - throws Exception { - final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile)); - - final Consumer consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf)); - - dbClient.processResults(sql, consumer); - } - - public List processDatasource(final ResultSet rs) { - - try { - - final DataInfo info = prepareDataInfo(rs); - - final Datasource ds = new Datasource(); - - ds.setId(createOpenaireId(10, rs.getString("datasourceid"), true)); - ds.setOriginalId(Arrays.asList(rs.getString("datasourceid"))); - ds - .setCollectedfrom( - listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), - rs.getString("collectedfromname"))); - ds.setPid(new ArrayList<>()); - ds.setDateofcollection(asString(rs.getDate("dateofcollection"))); - ds.setDateoftransformation(null); // Value not returned by the SQL query - ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB - ds.setOaiprovenance(null); // Values not present in the DB - ds.setDatasourcetype(prepareQualifierSplitting(rs.getString("datasourcetype"))); - ds.setOpenairecompatibility(prepareQualifierSplitting(rs.getString("openairecompatibility"))); - ds.setOfficialname(field(rs.getString("officialname"), info)); - ds.setEnglishname(field(rs.getString("englishname"), info)); - ds.setWebsiteurl(field(rs.getString("websiteurl"), info)); - ds.setLogourl(field(rs.getString("logourl"), info)); - ds.setContactemail(field(rs.getString("contactemail"), info)); - ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info)); - ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info)); - ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info)); - ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info)); - ds.setDescription(field(rs.getString("description"), info)); - ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info)); - ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info)); - ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info)); - ds.setOdpolicies(field(rs.getString("odpolicies"), info)); - ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info)); - ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info)); - ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info)); - ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info)); - ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info)); - ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info)); - ds.setDataprovider(field(rs.getBoolean("dataprovider"), info)); - ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info)); - ds.setDatabaseaccesstype(field(rs.getString("databaseaccesstype"), info)); - ds.setDatauploadtype(field(rs.getString("datauploadtype"), info)); - ds.setDatabaseaccessrestriction(field(rs.getString("databaseaccessrestriction"), info)); - ds.setDatauploadrestriction(field(rs.getString("datauploadrestriction"), info)); - ds.setVersioning(field(rs.getBoolean("versioning"), info)); - ds.setCitationguidelineurl(field(rs.getString("citationguidelineurl"), info)); - ds.setQualitymanagementkind(field(rs.getString("qualitymanagementkind"), info)); - ds.setPidsystems(field(rs.getString("pidsystems"), info)); - ds.setCertificates(field(rs.getString("certificates"), info)); - ds.setPolicies(new ArrayList<>()); // The sql query returns an empty array - ds - .setJournal( - prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal - ds.setDataInfo(info); - ds.setLastupdatetimestamp(lastUpdateTimestamp); - - return Arrays.asList(ds); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - public List processProject(final ResultSet rs) { - try { - - final DataInfo info = prepareDataInfo(rs); - - final Project p = new Project(); - - p.setId(createOpenaireId(40, rs.getString("projectid"), true)); - p.setOriginalId(Arrays.asList(rs.getString("projectid"))); - p - .setCollectedfrom( - listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), - rs.getString("collectedfromname"))); - p.setPid(new ArrayList<>()); - p.setDateofcollection(asString(rs.getDate("dateofcollection"))); - p.setDateoftransformation(asString(rs.getDate("dateoftransformation"))); - p.setExtraInfo(new ArrayList<>()); // Values not present in the DB - p.setOaiprovenance(null); // Values not present in the DB - p.setWebsiteurl(field(rs.getString("websiteurl"), info)); - p.setCode(field(rs.getString("code"), info)); - p.setAcronym(field(rs.getString("acronym"), info)); - p.setTitle(field(rs.getString("title"), info)); - p.setStartdate(field(asString(rs.getDate("startdate")), info)); - p.setEnddate(field(asString(rs.getDate("enddate")), info)); - p.setCallidentifier(field(rs.getString("callidentifier"), info)); - p.setKeywords(field(rs.getString("keywords"), info)); - p.setDuration(field(Integer.toString(rs.getInt("duration")), info)); - p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info)); - p - .setOamandatepublications( - field(Boolean.toString(rs.getBoolean("oamandatepublications")), info)); - p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info)); - p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info)); - p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info)); - p.setContracttype(prepareQualifierSplitting(rs.getString("contracttype"))); - p.setOptional1(field(rs.getString("optional1"), info)); - p.setOptional2(field(rs.getString("optional2"), info)); - p.setJsonextrainfo(field(rs.getString("jsonextrainfo"), info)); - p.setContactfullname(field(rs.getString("contactfullname"), info)); - p.setContactfax(field(rs.getString("contactfax"), info)); - p.setContactphone(field(rs.getString("contactphone"), info)); - p.setContactemail(field(rs.getString("contactemail"), info)); - p.setSummary(field(rs.getString("summary"), info)); - p.setCurrency(field(rs.getString("currency"), info)); - p.setTotalcost(new Float(rs.getDouble("totalcost"))); - p.setFundedamount(new Float(rs.getDouble("fundedamount"))); - p.setDataInfo(info); - p.setLastupdatetimestamp(lastUpdateTimestamp); - - return Arrays.asList(p); - - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - public List processOrganization(final ResultSet rs) { - - try { - - final DataInfo info = prepareDataInfo(rs); - - final Organization o = new Organization(); - - o.setId(createOpenaireId(20, rs.getString("organizationid"), true)); - o.setOriginalId(Arrays.asList(rs.getString("organizationid"))); - o - .setCollectedfrom( - listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), - rs.getString("collectedfromname"))); - o.setPid(new ArrayList<>()); - o.setDateofcollection(asString(rs.getDate("dateofcollection"))); - o.setDateoftransformation(asString(rs.getDate("dateoftransformation"))); - o.setExtraInfo(new ArrayList<>()); // Values not present in the DB - o.setOaiprovenance(null); // Values not present in the DB - o.setLegalshortname(field(rs.getString("legalshortname"), info)); - o.setLegalname(field(rs.getString("legalname"), info)); - o.setAlternativeNames(new ArrayList<>()); // Values not returned by the SQL query - o.setWebsiteurl(field(rs.getString("websiteurl"), info)); - o.setLogourl(field(rs.getString("logourl"), info)); - o.setEclegalbody(field(Boolean.toString(rs.getBoolean("eclegalbody")), info)); - o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info)); - o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info)); - o - .setEcresearchorganization( - field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info)); - o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info)); - o - .setEcinternationalorganizationeurinterests( - field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info)); - o - .setEcinternationalorganization( - field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info)); - o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info)); - o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info)); - o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info)); - o.setCountry(prepareQualifierSplitting(rs.getString("country"))); - o.setDataInfo(info); - o.setLastupdatetimestamp(lastUpdateTimestamp); - - return Arrays.asList(o); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - public List processDatasourceOrganization(final ResultSet rs) { - try { - final DataInfo info = prepareDataInfo(rs); - final String orgId = createOpenaireId(20, rs.getString("organization"), true); - final String dsId = createOpenaireId(10, rs.getString("datasource"), true); - final List collectedFrom = listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), - rs.getString("collectedfromname")); - - final Relation r1 = new Relation(); - r1.setRelType("datasourceOrganization"); - r1.setSubRelType("provision"); - r1.setRelClass("isProvidedBy"); - r1.setSource(dsId); - r1.setTarget(orgId); - r1.setCollectedfrom(collectedFrom); - r1.setDataInfo(info); - r1.setLastupdatetimestamp(lastUpdateTimestamp); - - final Relation r2 = new Relation(); - r2.setRelType("datasourceOrganization"); - r2.setSubRelType("provision"); - r2.setRelClass("provides"); - r2.setSource(orgId); - r2.setTarget(dsId); - r2.setCollectedfrom(collectedFrom); - r2.setDataInfo(info); - r2.setLastupdatetimestamp(lastUpdateTimestamp); - - return Arrays.asList(r1, r2); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - public List processProjectOrganization(final ResultSet rs) { - try { - final DataInfo info = prepareDataInfo(rs); - final String orgId = createOpenaireId(20, rs.getString("resporganization"), true); - final String projectId = createOpenaireId(40, rs.getString("project"), true); - final List collectedFrom = listKeyValues( - createOpenaireId(10, rs.getString("collectedfromid"), true), - rs.getString("collectedfromname")); - - final Relation r1 = new Relation(); - r1.setRelType("projectOrganization"); - r1.setSubRelType("participation"); - r1.setRelClass("isParticipant"); - r1.setSource(projectId); - r1.setTarget(orgId); - r1.setCollectedfrom(collectedFrom); - r1.setDataInfo(info); - r1.setLastupdatetimestamp(lastUpdateTimestamp); - - final Relation r2 = new Relation(); - r2.setRelType("projectOrganization"); - r2.setSubRelType("participation"); - r2.setRelClass("hasParticipant"); - r2.setSource(orgId); - r2.setTarget(projectId); - r2.setCollectedfrom(collectedFrom); - r2.setDataInfo(info); - r2.setLastupdatetimestamp(lastUpdateTimestamp); - - return Arrays.asList(r1, r2); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - public List processClaims(final ResultSet rs) { - - final DataInfo info = dataInfo( - false, - null, - false, - false, - qualifier( - "user:claim", "user:claim", "dnet:provenanceActions", "dnet:provenanceActions"), - "0.9"); - - final List collectedFrom = listKeyValues( - createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE"); - - try { - - if (rs.getString("source_type").equals("context")) { - final Result r; - - if (rs.getString("target_type").equals("dataset")) { - r = new Dataset(); - r.setResulttype(MigrationConstants.DATASET_RESULTTYPE_QUALIFIER); - } else if (rs.getString("target_type").equals("software")) { - r = new Software(); - r.setResulttype(MigrationConstants.SOFTWARE_RESULTTYPE_QUALIFIER); - } else if (rs.getString("target_type").equals("other")) { - r = new OtherResearchProduct(); - r.setResulttype(MigrationConstants.OTHER_RESULTTYPE_QUALIFIER); - } else { - r = new Publication(); - r.setResulttype(MigrationConstants.PUBLICATION_RESULTTYPE_QUALIFIER); - } - r.setId(createOpenaireId(50, rs.getString("target_id"), false)); - r.setLastupdatetimestamp(lastUpdateTimestamp); - r.setContext(prepareContext(rs.getString("source_id"), info)); - r.setDataInfo(info); - r.setCollectedfrom(collectedFrom); - - return Arrays.asList(r); - } else { - final String sourceId = createOpenaireId(rs.getString("source_type"), rs.getString("source_id"), false); - final String targetId = createOpenaireId(rs.getString("target_type"), rs.getString("target_id"), false); - - final Relation r1 = new Relation(); - final Relation r2 = new Relation(); - - if (rs.getString("source_type").equals("project")) { - r1.setCollectedfrom(collectedFrom); - r1.setRelType("resultProject"); - r1.setSubRelType("outcome"); - r1.setRelClass("produces"); - - r2.setCollectedfrom(collectedFrom); - r2.setRelType("resultProject"); - r2.setSubRelType("outcome"); - r2.setRelClass("isProducedBy"); - } else { - r1.setCollectedfrom(collectedFrom); - r1.setRelType("resultResult"); - r1.setSubRelType("relationship"); - r1.setRelClass("isRelatedTo"); - - r2.setCollectedfrom(collectedFrom); - r2.setRelType("resultResult"); - r2.setSubRelType("relationship"); - r2.setRelClass("isRelatedTo"); - } - - r1.setSource(sourceId); - r1.setTarget(targetId); - r1.setDataInfo(info); - r1.setLastupdatetimestamp(lastUpdateTimestamp); - - r2.setSource(targetId); - r2.setTarget(sourceId); - r2.setDataInfo(info); - r2.setLastupdatetimestamp(lastUpdateTimestamp); - - return Arrays.asList(r1, r2); - } - - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - private List prepareContext(final String id, final DataInfo dataInfo) { - final Context context = new Context(); - context.setId(id); - context.setDataInfo(Arrays.asList(dataInfo)); - return Arrays.asList(context); - } - - private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException { - final Boolean deletedbyinference = rs.getBoolean("deletedbyinference"); - final String inferenceprovenance = rs.getString("inferenceprovenance"); - final Boolean inferred = rs.getBoolean("inferred"); - final String trust = rs.getString("trust"); - return dataInfo( - deletedbyinference, - inferenceprovenance, - inferred, - false, - MigrationConstants.ENTITYREGISTRY_PROVENANCE_ACTION, - trust); - } - - private Qualifier prepareQualifierSplitting(final String s) { - if (StringUtils.isBlank(s)) { - return null; - } - final String[] arr = s.split("@@@"); - return arr.length == 4 ? qualifier(arr[0], arr[1], arr[2], arr[3]) : null; - } - - private List> prepareListFields(final Array array, final DataInfo info) { - try { - return array != null ? listFields(info, (String[]) array.getArray()) : new ArrayList<>(); - } catch (final SQLException e) { - throw new RuntimeException("Invalid SQL array", e); - } - } - - private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) { - if (StringUtils.isBlank(s)) { - return null; - } - final String[] parts = s.split("###"); - if (parts.length == 2) { - final String value = parts[0]; - final String[] arr = parts[1].split("@@@"); - if (arr.length == 4) { - return structuredProperty(value, arr[0], arr[1], arr[2], arr[3], dataInfo); - } - } - return null; - } - - private List prepareListOfStructProps( - final Array array, final DataInfo dataInfo) throws SQLException { - final List res = new ArrayList<>(); - if (array != null) { - for (final String s : (String[]) array.getArray()) { - final StructuredProperty sp = prepareStructProp(s, dataInfo); - if (sp != null) { - res.add(sp); - } - } - } - - return res; - } - - private Journal prepareJournal(final String name, final String sj, final DataInfo info) { - if (StringUtils.isNotBlank(sj)) { - final String[] arr = sj.split("@@@"); - if (arr.length == 3) { - final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0] : null; - final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1] : null; - ; - final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2] : null; - ; - if (issn != null || eissn != null || lissn != null) { - return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info); - } - } - } - return null; - } - - @Override - public void close() throws IOException { - super.close(); - dbClient.close(); - } + implements Closeable { + + private static final Log log = LogFactory.getLog(MigrateDbEntitiesApplication.class); + + private final DbClient dbClient; + + private final long lastUpdateTimestamp; + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = + new ArgumentApplicationParser( + IOUtils.toString( + MigrateDbEntitiesApplication.class.getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/migrate_db_entities_parameters.json"))); + + parser.parseArgument(args); + + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); + + final String hdfsPath = parser.get("hdfsPath"); + + final boolean processClaims = + parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims"); + + try (final MigrateDbEntitiesApplication smdbe = + new MigrateDbEntitiesApplication(hdfsPath, dbUrl, dbUser, dbPassword)) { + if (processClaims) { + log.info("Processing claims..."); + smdbe.execute("queryClaims.sql", smdbe::processClaims); + } else { + log.info("Processing datasources..."); + smdbe.execute("queryDatasources.sql", smdbe::processDatasource); + + log.info("Processing projects..."); + smdbe.execute("queryProjects.sql", smdbe::processProject); + + log.info("Processing orgs..."); + smdbe.execute("queryOrganizations.sql", smdbe::processOrganization); + + log.info("Processing relations ds <-> orgs ..."); + smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization); + + log.info("Processing projects <-> orgs ..."); + smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization); + } + log.info("All done."); + } + } + + protected MigrateDbEntitiesApplication() { // ONLY FOR UNIT TEST + super(); + this.dbClient = null; + this.lastUpdateTimestamp = new Date().getTime(); + } + + public MigrateDbEntitiesApplication( + final String hdfsPath, final String dbUrl, final String dbUser, final String dbPassword) + throws Exception { + super(hdfsPath); + this.dbClient = new DbClient(dbUrl, dbUser, dbPassword); + this.lastUpdateTimestamp = new Date().getTime(); + } + + public void execute(final String sqlFile, final Function> producer) + throws Exception { + final String sql = + IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile)); + + final Consumer consumer = rs -> producer.apply(rs).forEach(oaf -> emitOaf(oaf)); + + dbClient.processResults(sql, consumer); + } + + public List processDatasource(final ResultSet rs) { + + try { + + final DataInfo info = prepareDataInfo(rs); + + final Datasource ds = new Datasource(); + + ds.setId(createOpenaireId(10, rs.getString("datasourceid"), true)); + ds.setOriginalId(Arrays.asList(rs.getString("datasourceid"))); + ds.setCollectedfrom( + listKeyValues( + createOpenaireId(10, rs.getString("collectedfromid"), true), + rs.getString("collectedfromname"))); + ds.setPid(new ArrayList<>()); + ds.setDateofcollection(asString(rs.getDate("dateofcollection"))); + ds.setDateoftransformation(null); // Value not returned by the SQL query + ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB + ds.setOaiprovenance(null); // Values not present in the DB + ds.setDatasourcetype(prepareQualifierSplitting(rs.getString("datasourcetype"))); + ds.setOpenairecompatibility(prepareQualifierSplitting(rs.getString("openairecompatibility"))); + ds.setOfficialname(field(rs.getString("officialname"), info)); + ds.setEnglishname(field(rs.getString("englishname"), info)); + ds.setWebsiteurl(field(rs.getString("websiteurl"), info)); + ds.setLogourl(field(rs.getString("logourl"), info)); + ds.setContactemail(field(rs.getString("contactemail"), info)); + ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info)); + ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info)); + ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info)); + ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info)); + ds.setDescription(field(rs.getString("description"), info)); + ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info)); + ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info)); + ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info)); + ds.setOdpolicies(field(rs.getString("odpolicies"), info)); + ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info)); + ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info)); + ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info)); + ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info)); + ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info)); + ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info)); + ds.setDataprovider(field(rs.getBoolean("dataprovider"), info)); + ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info)); + ds.setDatabaseaccesstype(field(rs.getString("databaseaccesstype"), info)); + ds.setDatauploadtype(field(rs.getString("datauploadtype"), info)); + ds.setDatabaseaccessrestriction(field(rs.getString("databaseaccessrestriction"), info)); + ds.setDatauploadrestriction(field(rs.getString("datauploadrestriction"), info)); + ds.setVersioning(field(rs.getBoolean("versioning"), info)); + ds.setCitationguidelineurl(field(rs.getString("citationguidelineurl"), info)); + ds.setQualitymanagementkind(field(rs.getString("qualitymanagementkind"), info)); + ds.setPidsystems(field(rs.getString("pidsystems"), info)); + ds.setCertificates(field(rs.getString("certificates"), info)); + ds.setPolicies(new ArrayList<>()); // The sql query returns an empty array + ds.setJournal( + prepareJournal(rs.getString("officialname"), rs.getString("journal"), info)); // Journal + ds.setDataInfo(info); + ds.setLastupdatetimestamp(lastUpdateTimestamp); + + return Arrays.asList(ds); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public List processProject(final ResultSet rs) { + try { + + final DataInfo info = prepareDataInfo(rs); + + final Project p = new Project(); + + p.setId(createOpenaireId(40, rs.getString("projectid"), true)); + p.setOriginalId(Arrays.asList(rs.getString("projectid"))); + p.setCollectedfrom( + listKeyValues( + createOpenaireId(10, rs.getString("collectedfromid"), true), + rs.getString("collectedfromname"))); + p.setPid(new ArrayList<>()); + p.setDateofcollection(asString(rs.getDate("dateofcollection"))); + p.setDateoftransformation(asString(rs.getDate("dateoftransformation"))); + p.setExtraInfo(new ArrayList<>()); // Values not present in the DB + p.setOaiprovenance(null); // Values not present in the DB + p.setWebsiteurl(field(rs.getString("websiteurl"), info)); + p.setCode(field(rs.getString("code"), info)); + p.setAcronym(field(rs.getString("acronym"), info)); + p.setTitle(field(rs.getString("title"), info)); + p.setStartdate(field(asString(rs.getDate("startdate")), info)); + p.setEnddate(field(asString(rs.getDate("enddate")), info)); + p.setCallidentifier(field(rs.getString("callidentifier"), info)); + p.setKeywords(field(rs.getString("keywords"), info)); + p.setDuration(field(Integer.toString(rs.getInt("duration")), info)); + p.setEcsc39(field(Boolean.toString(rs.getBoolean("ecsc39")), info)); + p.setOamandatepublications( + field(Boolean.toString(rs.getBoolean("oamandatepublications")), info)); + p.setEcarticle29_3(field(Boolean.toString(rs.getBoolean("ecarticle29_3")), info)); + p.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info)); + p.setFundingtree(prepareListFields(rs.getArray("fundingtree"), info)); + p.setContracttype(prepareQualifierSplitting(rs.getString("contracttype"))); + p.setOptional1(field(rs.getString("optional1"), info)); + p.setOptional2(field(rs.getString("optional2"), info)); + p.setJsonextrainfo(field(rs.getString("jsonextrainfo"), info)); + p.setContactfullname(field(rs.getString("contactfullname"), info)); + p.setContactfax(field(rs.getString("contactfax"), info)); + p.setContactphone(field(rs.getString("contactphone"), info)); + p.setContactemail(field(rs.getString("contactemail"), info)); + p.setSummary(field(rs.getString("summary"), info)); + p.setCurrency(field(rs.getString("currency"), info)); + p.setTotalcost(new Float(rs.getDouble("totalcost"))); + p.setFundedamount(new Float(rs.getDouble("fundedamount"))); + p.setDataInfo(info); + p.setLastupdatetimestamp(lastUpdateTimestamp); + + return Arrays.asList(p); + + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public List processOrganization(final ResultSet rs) { + + try { + + final DataInfo info = prepareDataInfo(rs); + + final Organization o = new Organization(); + + o.setId(createOpenaireId(20, rs.getString("organizationid"), true)); + o.setOriginalId(Arrays.asList(rs.getString("organizationid"))); + o.setCollectedfrom( + listKeyValues( + createOpenaireId(10, rs.getString("collectedfromid"), true), + rs.getString("collectedfromname"))); + o.setPid(new ArrayList<>()); + o.setDateofcollection(asString(rs.getDate("dateofcollection"))); + o.setDateoftransformation(asString(rs.getDate("dateoftransformation"))); + o.setExtraInfo(new ArrayList<>()); // Values not present in the DB + o.setOaiprovenance(null); // Values not present in the DB + o.setLegalshortname(field(rs.getString("legalshortname"), info)); + o.setLegalname(field(rs.getString("legalname"), info)); + o.setAlternativeNames(new ArrayList<>()); // Values not returned by the SQL query + o.setWebsiteurl(field(rs.getString("websiteurl"), info)); + o.setLogourl(field(rs.getString("logourl"), info)); + o.setEclegalbody(field(Boolean.toString(rs.getBoolean("eclegalbody")), info)); + o.setEclegalperson(field(Boolean.toString(rs.getBoolean("eclegalperson")), info)); + o.setEcnonprofit(field(Boolean.toString(rs.getBoolean("ecnonprofit")), info)); + o.setEcresearchorganization( + field(Boolean.toString(rs.getBoolean("ecresearchorganization")), info)); + o.setEchighereducation(field(Boolean.toString(rs.getBoolean("echighereducation")), info)); + o.setEcinternationalorganizationeurinterests( + field(Boolean.toString(rs.getBoolean("ecinternationalorganizationeurinterests")), info)); + o.setEcinternationalorganization( + field(Boolean.toString(rs.getBoolean("ecinternationalorganization")), info)); + o.setEcenterprise(field(Boolean.toString(rs.getBoolean("ecenterprise")), info)); + o.setEcsmevalidated(field(Boolean.toString(rs.getBoolean("ecsmevalidated")), info)); + o.setEcnutscode(field(Boolean.toString(rs.getBoolean("ecnutscode")), info)); + o.setCountry(prepareQualifierSplitting(rs.getString("country"))); + o.setDataInfo(info); + o.setLastupdatetimestamp(lastUpdateTimestamp); + + return Arrays.asList(o); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public List processDatasourceOrganization(final ResultSet rs) { + try { + final DataInfo info = prepareDataInfo(rs); + final String orgId = createOpenaireId(20, rs.getString("organization"), true); + final String dsId = createOpenaireId(10, rs.getString("datasource"), true); + final List collectedFrom = + listKeyValues( + createOpenaireId(10, rs.getString("collectedfromid"), true), + rs.getString("collectedfromname")); + + final Relation r1 = new Relation(); + r1.setRelType("datasourceOrganization"); + r1.setSubRelType("provision"); + r1.setRelClass("isProvidedBy"); + r1.setSource(dsId); + r1.setTarget(orgId); + r1.setCollectedfrom(collectedFrom); + r1.setDataInfo(info); + r1.setLastupdatetimestamp(lastUpdateTimestamp); + + final Relation r2 = new Relation(); + r2.setRelType("datasourceOrganization"); + r2.setSubRelType("provision"); + r2.setRelClass("provides"); + r2.setSource(orgId); + r2.setTarget(dsId); + r2.setCollectedfrom(collectedFrom); + r2.setDataInfo(info); + r2.setLastupdatetimestamp(lastUpdateTimestamp); + + return Arrays.asList(r1, r2); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public List processProjectOrganization(final ResultSet rs) { + try { + final DataInfo info = prepareDataInfo(rs); + final String orgId = createOpenaireId(20, rs.getString("resporganization"), true); + final String projectId = createOpenaireId(40, rs.getString("project"), true); + final List collectedFrom = + listKeyValues( + createOpenaireId(10, rs.getString("collectedfromid"), true), + rs.getString("collectedfromname")); + + final Relation r1 = new Relation(); + r1.setRelType("projectOrganization"); + r1.setSubRelType("participation"); + r1.setRelClass("hasParticipant"); + r1.setSource(projectId); + r1.setTarget(orgId); + r1.setCollectedfrom(collectedFrom); + r1.setDataInfo(info); + r1.setLastupdatetimestamp(lastUpdateTimestamp); + + final Relation r2 = new Relation(); + r2.setRelType("projectOrganization"); + r2.setSubRelType("participation"); + r2.setRelClass("isParticipant"); + r2.setSource(orgId); + r2.setTarget(projectId); + r2.setCollectedfrom(collectedFrom); + r2.setDataInfo(info); + r2.setLastupdatetimestamp(lastUpdateTimestamp); + + return Arrays.asList(r1, r2); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public List processClaims(final ResultSet rs) { + + final DataInfo info = + dataInfo( + false, + null, + false, + false, + qualifier( + "user:claim", "user:claim", "dnet:provenanceActions", "dnet:provenanceActions"), + "0.9"); + + final List collectedFrom = + listKeyValues(createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE"); + + try { + + if (rs.getString("source_type").equals("context")) { + final Result r; + + if (rs.getString("target_type").equals("dataset")) { + r = new Dataset(); + r.setResulttype(MigrationConstants.DATASET_RESULTTYPE_QUALIFIER); + } else if (rs.getString("target_type").equals("software")) { + r = new Software(); + r.setResulttype(MigrationConstants.SOFTWARE_RESULTTYPE_QUALIFIER); + } else if (rs.getString("target_type").equals("other")) { + r = new OtherResearchProduct(); + r.setResulttype(MigrationConstants.OTHER_RESULTTYPE_QUALIFIER); + } else { + r = new Publication(); + r.setResulttype(MigrationConstants.PUBLICATION_RESULTTYPE_QUALIFIER); + } + r.setId(createOpenaireId(50, rs.getString("target_id"), false)); + r.setLastupdatetimestamp(lastUpdateTimestamp); + r.setContext(prepareContext(rs.getString("source_id"), info)); + r.setDataInfo(info); + r.setCollectedfrom(collectedFrom); + + return Arrays.asList(r); + } else { + final String sourceId = + createOpenaireId(rs.getString("source_type"), rs.getString("source_id"), false); + final String targetId = + createOpenaireId(rs.getString("target_type"), rs.getString("target_id"), false); + + final Relation r1 = new Relation(); + final Relation r2 = new Relation(); + + if (rs.getString("source_type").equals("project")) { + r1.setCollectedfrom(collectedFrom); + r1.setRelType("resultProject"); + r1.setSubRelType("outcome"); + r1.setRelClass("produces"); + + r2.setCollectedfrom(collectedFrom); + r2.setRelType("resultProject"); + r2.setSubRelType("outcome"); + r2.setRelClass("isProducedBy"); + } else { + r1.setCollectedfrom(collectedFrom); + r1.setRelType("resultResult"); + r1.setSubRelType("relationship"); + r1.setRelClass("isRelatedTo"); + + r2.setCollectedfrom(collectedFrom); + r2.setRelType("resultResult"); + r2.setSubRelType("relationship"); + r2.setRelClass("isRelatedTo"); + } + + r1.setSource(sourceId); + r1.setTarget(targetId); + r1.setDataInfo(info); + r1.setLastupdatetimestamp(lastUpdateTimestamp); + + r2.setSource(targetId); + r2.setTarget(sourceId); + r2.setDataInfo(info); + r2.setLastupdatetimestamp(lastUpdateTimestamp); + + return Arrays.asList(r1, r2); + } + + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private List prepareContext(final String id, final DataInfo dataInfo) { + final Context context = new Context(); + context.setId(id); + context.setDataInfo(Arrays.asList(dataInfo)); + return Arrays.asList(context); + } + + private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException { + final Boolean deletedbyinference = rs.getBoolean("deletedbyinference"); + final String inferenceprovenance = rs.getString("inferenceprovenance"); + final Boolean inferred = rs.getBoolean("inferred"); + final String trust = rs.getString("trust"); + return dataInfo( + deletedbyinference, + inferenceprovenance, + inferred, + false, + MigrationConstants.ENTITYREGISTRY_PROVENANCE_ACTION, + trust); + } + + private Qualifier prepareQualifierSplitting(final String s) { + if (StringUtils.isBlank(s)) { + return null; + } + final String[] arr = s.split("@@@"); + return arr.length == 4 ? qualifier(arr[0], arr[1], arr[2], arr[3]) : null; + } + + private List> prepareListFields(final Array array, final DataInfo info) { + try { + return array != null ? listFields(info, (String[]) array.getArray()) : new ArrayList<>(); + } catch (final SQLException e) { + throw new RuntimeException("Invalid SQL array", e); + } + } + + private StructuredProperty prepareStructProp(final String s, final DataInfo dataInfo) { + if (StringUtils.isBlank(s)) { + return null; + } + final String[] parts = s.split("###"); + if (parts.length == 2) { + final String value = parts[0]; + final String[] arr = parts[1].split("@@@"); + if (arr.length == 4) { + return structuredProperty(value, arr[0], arr[1], arr[2], arr[3], dataInfo); + } + } + return null; + } + + private List prepareListOfStructProps( + final Array array, final DataInfo dataInfo) throws SQLException { + final List res = new ArrayList<>(); + if (array != null) { + for (final String s : (String[]) array.getArray()) { + final StructuredProperty sp = prepareStructProp(s, dataInfo); + if (sp != null) { + res.add(sp); + } + } + } + + return res; + } + + private Journal prepareJournal(final String name, final String sj, final DataInfo info) { + if (StringUtils.isNotBlank(sj)) { + final String[] arr = sj.split("@@@"); + if (arr.length == 3) { + final String issn = StringUtils.isNotBlank(arr[0]) ? arr[0] : null; + final String eissn = StringUtils.isNotBlank(arr[1]) ? arr[1] : null; + ; + final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2] : null; + ; + if (issn != null || eissn != null || lissn != null) { + return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info); + } + } + } + return null; + } + + @Override + public void close() throws IOException { + super.close(); + dbClient.close(); + } }