diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java index bf877dcf3..3367399c6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java @@ -71,12 +71,13 @@ public class AbstractMigrationExecutor implements Closeable { value.set(objectMapper.writeValueAsString(oaf)); writer.append(key, value); } catch (final Exception e) { - e.printStackTrace(); + throw new RuntimeException(e); } } @Override public void close() throws IOException { + writer.hflush(); writer.close(); } @@ -216,4 +217,7 @@ public class AbstractMigrationExecutor implements Closeable { } + public static String asString(final Object o) { + return o == null ? "" : o.toString(); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java index 246dae474..9ac0089d2 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java @@ -37,6 +37,8 @@ public class DbClient implements Closeable { public void processResults(final String sql, final Consumer consumer) { try (final Statement stmt = connection.createStatement()) { + stmt.setFetchSize(100); + try (final ResultSet rs = stmt.executeQuery(sql)) { while (rs.next()) { consumer.accept(rs); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java index 12043709f..d22e8e5b3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java @@ -54,11 +54,22 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl final String hdfsUser = parser.get("hdfsUser"); try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, hdfsNameNode, hdfsUser, dbUrl, dbUser, dbPassword)) { + log.info("Processing datasources..."); smdbe.execute("queryDatasources.sql", smdbe::processDatasource); + + log.info("Processing projects..."); smdbe.execute("queryProjects.sql", smdbe::processProject); + + log.info("Processing orgs..."); smdbe.execute("queryOrganizations.sql", smdbe::processOrganization); + + log.info("Processing relations ds <-> orgs ..."); smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization); + + log.info("Processing projects <-> orgs ..."); smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization); + + log.info("All done."); } } @@ -75,6 +86,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl } public void processDatasource(final ResultSet rs) { + try { final DataInfo info = prepareDataInfo(rs); @@ -85,7 +97,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl ds.setOriginalId(Arrays.asList(rs.getString("datasourceid"))); ds.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname"))); ds.setPid(new ArrayList<>()); - ds.setDateofcollection(rs.getDate("dateofcollection").toString()); + ds.setDateofcollection(asString(rs.getDate("dateofcollection"))); ds.setDateoftransformation(null); // Value not returned by the SQL query ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB ds.setOaiprovenance(null); // Values not present in the DB @@ -99,17 +111,17 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info)); ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info)); ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info)); - ds.setDateofvalidation(field(rs.getDate("dateofvalidation").toString(), info)); + ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info)); ds.setDescription(field(rs.getString("description"), info)); ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info)); ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info)); - ds.setOdnumberofitemsdate(field(rs.getDate("odnumberofitemsdate").toString(), info)); + ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info)); ds.setOdpolicies(field(rs.getString("odpolicies"), info)); ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info)); ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info)); ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info)); - ds.setReleasestartdate(field(rs.getDate("releasestartdate").toString(), info)); - ds.setReleaseenddate(field(rs.getDate("releaseenddate").toString(), info)); + ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info)); + ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info)); ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info)); ds.setDataprovider(field(rs.getBoolean("dataprovider"), info)); ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info)); @@ -192,16 +204,16 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl p.setOriginalId(Arrays.asList(rs.getString("projectid"))); p.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname"))); p.setPid(new ArrayList<>()); - p.setDateofcollection(rs.getDate("dateofcollection").toString()); - p.setDateoftransformation(rs.getDate("dateoftransformation").toString()); + p.setDateofcollection(asString(rs.getDate("dateofcollection"))); + p.setDateoftransformation(asString(rs.getDate("dateoftransformation"))); p.setExtraInfo(new ArrayList<>()); // Values not present in the DB p.setOaiprovenance(null); // Values not present in the DB p.setWebsiteurl(field(rs.getString("websiteurl"), info)); p.setCode(field(rs.getString("code"), info)); p.setAcronym(field(rs.getString("acronym"), info)); p.setTitle(field(rs.getString("title"), info)); - p.setStartdate(field(rs.getDate("startdate").toString(), info)); - p.setEnddate(field(rs.getDate("enddate").toString(), info)); + p.setStartdate(field(asString(rs.getDate("startdate")), info)); + p.setEnddate(field(asString(rs.getDate("enddate")), info)); p.setCallidentifier(field(rs.getString("callidentifier"), info)); p.setKeywords(field(rs.getString("keywords"), info)); p.setDuration(field(Integer.toString(rs.getInt("duration")), info)); @@ -271,6 +283,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl } public void processOrganization(final ResultSet rs) { + try { final DataInfo info = prepareDataInfo(rs); @@ -281,8 +294,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl o.setOriginalId(Arrays.asList(rs.getString("organizationid"))); o.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname"))); o.setPid(new ArrayList<>()); - o.setDateofcollection(rs.getDate("dateofcollection").toString()); - o.setDateoftransformation(rs.getDate("dateoftransformation").toString()); + o.setDateofcollection(asString(rs.getDate("dateofcollection"))); + o.setDateoftransformation(asString(rs.getDate("dateoftransformation"))); o.setExtraInfo(new ArrayList<>()); // Values not present in the DB o.setOaiprovenance(null); // Values not present in the DB o.setLegalshortname(field("legalshortname", info)); @@ -387,6 +400,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl } public void processProjectOrganization(final ResultSet rs) { + try { final DataInfo info = prepareDataInfo(rs); final String orgId = createOpenaireId(20, rs.getString("resporganization")); diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql index f04f1f03b..6cff18875 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryProjects.sql @@ -31,7 +31,7 @@ SELECT p.fundedamount AS fundedamount, dc.id AS collectedfromid, dc.officialname AS collectedfromname, - p.contracttype || '@@@' || p.contracttypename || '@@@' || p.contracttypescheme || '@@@' || p.contracttypescheme AS contracttype, + ctc.code || '@@@' || ctc.name || '@@@' || cts.code || '@@@' || cts.name AS contracttype, pac.code || '@@@' || pac.name || '@@@' || pas.code || '@@@' || pas.name AS provenanceaction, array_agg(DISTINCT i.pid || '###' || i.issuertype) AS pid, array_agg(DISTINCT s.name || '###' || sc.code || '@@@' || sc.name || '@@@' || ss.code || '@@@' || ss.name) AS subjects, @@ -54,6 +54,9 @@ SELECT LEFT OUTER JOIN class sc ON (sc.code = s.semanticclass) LEFT OUTER JOIN scheme ss ON (ss.code = s.semanticscheme) + LEFT OUTER JOIN class ctc ON (ctc.code = p.contracttypeclass) + LEFT OUTER JOIN scheme cts ON (cts.code = p.contracttypescheme) + GROUP BY p.id, p.code, @@ -77,11 +80,11 @@ SELECT p.contactfax, p.contactphone, p.contactemail, - p.contracttype, p.summary, p.currency, p.totalcost, p.fundedamount, dc.id, dc.officialname, - pac.code, pac.name, pas.code, pas.name; \ No newline at end of file + pac.code, pac.name, pas.code, pas.name, + ctc.code, ctc.name, cts.code, cts.name; \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties b/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties new file mode 100644 index 000000000..63cba917e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties @@ -0,0 +1,9 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=INFO, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n