diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java index 5627a720b..775e5e7d8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java @@ -53,18 +53,20 @@ public class GenerateEntitiesApplication { final String dbUser = parser.get("postgresUser"); final String dbPassword = parser.get("postgresPassword"); - final SparkSession spark = SparkSession + final Map code2name = loadClassNames(dbUrl, dbUser, dbPassword); + + try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { + final List existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList()); + generateEntities(sc, code2name, existingSourcePaths, targetPath); + } + } + + private static SparkSession newSparkSession(final ArgumentApplicationParser parser) { + return SparkSession .builder() .appName(GenerateEntitiesApplication.class.getSimpleName()) .master(parser.get("master")) .getOrCreate(); - - final Map code2name = loadClassNames(dbUrl, dbUser, dbPassword); - - try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { - final List existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList()); - generateEntities(sc, code2name, existingSourcePaths, targetPath); - } } private static void generateEntities(final JavaSparkContext sc, @@ -96,7 +98,7 @@ public class GenerateEntitiesApplication { case "native_oaf": return new OafToOafMapper(code2name).processMdRecord(s); case "native_odf": - return new OafToOafMapper(code2name).processMdRecord(s); + return new OdfToOafMapper(code2name).processMdRecord(s); case "datasource": return Arrays.asList(convertFromJson(s, Datasource.class)); case "organization": diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java index 222828a49..4f10068e7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java @@ -28,13 +28,7 @@ public class DispatchEntitiesApplication { .getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json"))); parser.parseArgument(args); - final SparkSession spark = SparkSession - .builder() - .appName(DispatchEntitiesApplication.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - - try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { + try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { final String sourcePath = parser.get("sourcePath"); final String targetPath = parser.get("graphRawPath"); @@ -50,6 +44,14 @@ public class DispatchEntitiesApplication { } } + private static SparkSession newSparkSession(final ArgumentApplicationParser parser) { + return SparkSession + .builder() + .appName(DispatchEntitiesApplication.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + } + private static void processEntity(final JavaSparkContext sc, final Class clazz, final String sourcePath, final String targetPath) { final String type = clazz.getSimpleName().toLowerCase(); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java index 43270e452..8e9784346 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java @@ -28,8 +28,8 @@ public class DbClient implements Closeable { StringUtils.isNoneBlank(login, password) ? DriverManager.getConnection(address, login, password) : DriverManager.getConnection(address); this.connection.setAutoCommit(false); } catch (final Exception e) { - log.error(e.getClass().getName() + ": " + e.getMessage()); - throw new RuntimeException(e); + log.error("Connection to postgresDB failed"); + throw new RuntimeException("Connection to postgresDB failed", e); } log.info("Opened database successfully"); } @@ -44,10 +44,12 @@ public class DbClient implements Closeable { consumer.accept(rs); } } catch (final SQLException e) { - throw new RuntimeException(e); + log.error("Error executing sql query: " + sql, e); + throw new RuntimeException("Error executing sql query", e); } } catch (final SQLException e1) { - throw new RuntimeException(e1); + log.error("Error preparing sql statement", e1); + throw new RuntimeException("Error preparing sql statement", e1); } }