forked from D-Net/dnet-hadoop
bug fixing
This commit is contained in:
parent
5fc09b179c
commit
06c2fd6df9
|
@ -71,12 +71,13 @@ public class AbstractMigrationExecutor implements Closeable {
|
|||
value.set(objectMapper.writeValueAsString(oaf));
|
||||
writer.append(key, value);
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Calls {@code hflush()} on the writer and then closes it.
 *
 * @throws IOException presumably propagated from the writer calls below
 *         (the writer's type is not visible in this hunk - confirm)
 */
@Override
|
||||
public void close() throws IOException {
|
||||
// NOTE(review): no try/finally here - if hflush() throws, writer.close() is never reached.
writer.hflush();
|
||||
writer.close();
|
||||
}
|
||||
|
||||
|
@ -216,4 +217,7 @@ public class AbstractMigrationExecutor implements Closeable {
|
|||
|
||||
}
|
||||
|
||||
/**
 * Null-safe string conversion.
 *
 * Callers in this change wrap {@code rs.getDate(...)} results with it where a
 * direct {@code .toString()} call was used before.
 *
 * @param o the object to convert; may be {@code null}
 * @return the empty string when {@code o} is {@code null}, otherwise {@code o.toString()}
 */
public static String asString(final Object o) {
|
||||
return o == null ? "" : o.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,8 @@ public class DbClient implements Closeable {
|
|||
public void processResults(final String sql, final Consumer<ResultSet> consumer) {
|
||||
|
||||
try (final Statement stmt = connection.createStatement()) {
|
||||
stmt.setFetchSize(100);
|
||||
|
||||
try (final ResultSet rs = stmt.executeQuery(sql)) {
|
||||
while (rs.next()) {
|
||||
consumer.accept(rs);
|
||||
|
|
|
@ -54,11 +54,22 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
final String hdfsUser = parser.get("hdfsUser");
|
||||
|
||||
try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, hdfsNameNode, hdfsUser, dbUrl, dbUser, dbPassword)) {
|
||||
log.info("Processing datasources...");
|
||||
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
|
||||
|
||||
log.info("Processing projects...");
|
||||
smdbe.execute("queryProjects.sql", smdbe::processProject);
|
||||
|
||||
log.info("Processing orgs...");
|
||||
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
|
||||
|
||||
log.info("Processing relations ds <-> orgs ...");
|
||||
smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
|
||||
|
||||
log.info("Processing projects <-> orgs ...");
|
||||
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
|
||||
|
||||
log.info("All done.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -75,6 +86,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
}
|
||||
|
||||
public void processDatasource(final ResultSet rs) {
|
||||
|
||||
try {
|
||||
|
||||
final DataInfo info = prepareDataInfo(rs);
|
||||
|
@ -85,7 +97,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
ds.setOriginalId(Arrays.asList(rs.getString("datasourceid")));
|
||||
ds.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname")));
|
||||
ds.setPid(new ArrayList<>());
|
||||
ds.setDateofcollection(rs.getDate("dateofcollection").toString());
|
||||
ds.setDateofcollection(asString(rs.getDate("dateofcollection")));
|
||||
ds.setDateoftransformation(null); // Value not returned by the SQL query
|
||||
ds.setExtraInfo(new ArrayList<>()); // Values not present in the DB
|
||||
ds.setOaiprovenance(null); // Values not present in the DB
|
||||
|
@ -99,17 +111,17 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
ds.setNamespaceprefix(field(rs.getString("namespaceprefix"), info));
|
||||
ds.setLatitude(field(Double.toString(rs.getDouble("latitude")), info));
|
||||
ds.setLongitude(field(Double.toString(rs.getDouble("longitude")), info));
|
||||
ds.setDateofvalidation(field(rs.getDate("dateofvalidation").toString(), info));
|
||||
ds.setDateofvalidation(field(asString(rs.getDate("dateofvalidation")), info));
|
||||
ds.setDescription(field(rs.getString("description"), info));
|
||||
ds.setSubjects(prepareListOfStructProps(rs.getArray("subjects"), info));
|
||||
ds.setOdnumberofitems(field(Double.toString(rs.getInt("odnumberofitems")), info));
|
||||
ds.setOdnumberofitemsdate(field(rs.getDate("odnumberofitemsdate").toString(), info));
|
||||
ds.setOdnumberofitemsdate(field(asString(rs.getDate("odnumberofitemsdate")), info));
|
||||
ds.setOdpolicies(field(rs.getString("odpolicies"), info));
|
||||
ds.setOdlanguages(prepareListFields(rs.getArray("odlanguages"), info));
|
||||
ds.setOdcontenttypes(prepareListFields(rs.getArray("odcontenttypes"), info));
|
||||
ds.setAccessinfopackage(prepareListFields(rs.getArray("accessinfopackage"), info));
|
||||
ds.setReleasestartdate(field(rs.getDate("releasestartdate").toString(), info));
|
||||
ds.setReleaseenddate(field(rs.getDate("releaseenddate").toString(), info));
|
||||
ds.setReleasestartdate(field(asString(rs.getDate("releasestartdate")), info));
|
||||
ds.setReleaseenddate(field(asString(rs.getDate("releaseenddate")), info));
|
||||
ds.setMissionstatementurl(field(rs.getString("missionstatementurl"), info));
|
||||
ds.setDataprovider(field(rs.getBoolean("dataprovider"), info));
|
||||
ds.setServiceprovider(field(rs.getBoolean("serviceprovider"), info));
|
||||
|
@ -192,16 +204,16 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
p.setOriginalId(Arrays.asList(rs.getString("projectid")));
|
||||
p.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname")));
|
||||
p.setPid(new ArrayList<>());
|
||||
p.setDateofcollection(rs.getDate("dateofcollection").toString());
|
||||
p.setDateoftransformation(rs.getDate("dateoftransformation").toString());
|
||||
p.setDateofcollection(asString(rs.getDate("dateofcollection")));
|
||||
p.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
|
||||
p.setExtraInfo(new ArrayList<>()); // Values not present in the DB
|
||||
p.setOaiprovenance(null); // Values not present in the DB
|
||||
p.setWebsiteurl(field(rs.getString("websiteurl"), info));
|
||||
p.setCode(field(rs.getString("code"), info));
|
||||
p.setAcronym(field(rs.getString("acronym"), info));
|
||||
p.setTitle(field(rs.getString("title"), info));
|
||||
p.setStartdate(field(rs.getDate("startdate").toString(), info));
|
||||
p.setEnddate(field(rs.getDate("enddate").toString(), info));
|
||||
p.setStartdate(field(asString(rs.getDate("startdate")), info));
|
||||
p.setEnddate(field(asString(rs.getDate("enddate")), info));
|
||||
p.setCallidentifier(field(rs.getString("callidentifier"), info));
|
||||
p.setKeywords(field(rs.getString("keywords"), info));
|
||||
p.setDuration(field(Integer.toString(rs.getInt("duration")), info));
|
||||
|
@ -271,6 +283,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
}
|
||||
|
||||
public void processOrganization(final ResultSet rs) {
|
||||
|
||||
try {
|
||||
|
||||
final DataInfo info = prepareDataInfo(rs);
|
||||
|
@ -281,8 +294,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
o.setOriginalId(Arrays.asList(rs.getString("organizationid")));
|
||||
o.setCollectedfrom(listKeyValues(rs.getString("collectedfromid"), rs.getString("collectedfromname")));
|
||||
o.setPid(new ArrayList<>());
|
||||
o.setDateofcollection(rs.getDate("dateofcollection").toString());
|
||||
o.setDateoftransformation(rs.getDate("dateoftransformation").toString());
|
||||
o.setDateofcollection(asString(rs.getDate("dateofcollection")));
|
||||
o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
|
||||
o.setExtraInfo(new ArrayList<>()); // Values not present in the DB
|
||||
o.setOaiprovenance(null); // Values not present in the DB
|
||||
o.setLegalshortname(field("legalshortname", info));
|
||||
|
@ -387,6 +400,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
}
|
||||
|
||||
public void processProjectOrganization(final ResultSet rs) {
|
||||
|
||||
try {
|
||||
final DataInfo info = prepareDataInfo(rs);
|
||||
final String orgId = createOpenaireId(20, rs.getString("resporganization"));
|
||||
|
|
|
@ -31,7 +31,7 @@ SELECT
|
|||
p.fundedamount AS fundedamount,
|
||||
dc.id AS collectedfromid,
|
||||
dc.officialname AS collectedfromname,
|
||||
p.contracttype || '@@@' || p.contracttypename || '@@@' || p.contracttypescheme || '@@@' || p.contracttypescheme AS contracttype,
|
||||
ctc.code || '@@@' || ctc.name || '@@@' || cts.code || '@@@' || cts.name AS contracttype,
|
||||
pac.code || '@@@' || pac.name || '@@@' || pas.code || '@@@' || pas.name AS provenanceaction,
|
||||
array_agg(DISTINCT i.pid || '###' || i.issuertype) AS pid,
|
||||
array_agg(DISTINCT s.name || '###' || sc.code || '@@@' || sc.name || '@@@' || ss.code || '@@@' || ss.name) AS subjects,
|
||||
|
@ -54,6 +54,9 @@ SELECT
|
|||
LEFT OUTER JOIN class sc ON (sc.code = s.semanticclass)
|
||||
LEFT OUTER JOIN scheme ss ON (ss.code = s.semanticscheme)
|
||||
|
||||
LEFT OUTER JOIN class ctc ON (ctc.code = p.contracttypeclass)
|
||||
LEFT OUTER JOIN scheme cts ON (cts.code = p.contracttypescheme)
|
||||
|
||||
GROUP BY
|
||||
p.id,
|
||||
p.code,
|
||||
|
@ -77,11 +80,11 @@ SELECT
|
|||
p.contactfax,
|
||||
p.contactphone,
|
||||
p.contactemail,
|
||||
p.contracttype,
|
||||
p.summary,
|
||||
p.currency,
|
||||
p.totalcost,
|
||||
p.fundedamount,
|
||||
dc.id,
|
||||
dc.officialname,
|
||||
pac.code, pac.name, pas.code, pas.name;
|
||||
pac.code, pac.name, pas.code, pas.name,
|
||||
ctc.code, ctc.name, cts.code, cts.name;
|
|
@ -0,0 +1,9 @@
|
|||
# Set root logger level to INFO and its only appender to A1.
|
||||
log4j.rootLogger=INFO, A1
|
||||
|
||||
# A1 is set to be a ConsoleAppender.
|
||||
log4j.appender.A1=org.apache.log4j.ConsoleAppender
|
||||
|
||||
# A1 uses PatternLayout.
|
||||
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
|
Loading…
Reference in New Issue