This commit is contained in:
Claudio Atzori 2020-03-04 10:57:06 +01:00
commit 6379f32466
3 changed files with 25 additions and 19 deletions

View File

@ -53,18 +53,20 @@ public class GenerateEntitiesApplication {
final String dbUser = parser.get("postgresUser"); final String dbUser = parser.get("postgresUser");
final String dbPassword = parser.get("postgresPassword"); final String dbPassword = parser.get("postgresPassword");
final SparkSession spark = SparkSession final Map<String, String> code2name = loadClassNames(dbUrl, dbUser, dbPassword);
try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
final List<String> existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
generateEntities(sc, code2name, existingSourcePaths, targetPath);
}
}
private static SparkSession newSparkSession(final ArgumentApplicationParser parser) {
return SparkSession
.builder() .builder()
.appName(GenerateEntitiesApplication.class.getSimpleName()) .appName(GenerateEntitiesApplication.class.getSimpleName())
.master(parser.get("master")) .master(parser.get("master"))
.getOrCreate(); .getOrCreate();
final Map<String, String> code2name = loadClassNames(dbUrl, dbUser, dbPassword);
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
final List<String> existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
generateEntities(sc, code2name, existingSourcePaths, targetPath);
}
} }
private static void generateEntities(final JavaSparkContext sc, private static void generateEntities(final JavaSparkContext sc,

View File

@ -28,13 +28,7 @@ public class DispatchEntitiesApplication {
.getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json"))); .getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
final SparkSession spark = SparkSession try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
.builder()
.appName(DispatchEntitiesApplication.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
final String sourcePath = parser.get("sourcePath"); final String sourcePath = parser.get("sourcePath");
final String targetPath = parser.get("graphRawPath"); final String targetPath = parser.get("graphRawPath");
@ -50,6 +44,14 @@ public class DispatchEntitiesApplication {
} }
} }
private static SparkSession newSparkSession(final ArgumentApplicationParser parser) {
return SparkSession
.builder()
.appName(DispatchEntitiesApplication.class.getSimpleName())
.master(parser.get("master"))
.getOrCreate();
}
private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final String sourcePath, final String targetPath) { private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final String sourcePath, final String targetPath) {
final String type = clazz.getSimpleName().toLowerCase(); final String type = clazz.getSimpleName().toLowerCase();

View File

@ -28,8 +28,8 @@ public class DbClient implements Closeable {
StringUtils.isNoneBlank(login, password) ? DriverManager.getConnection(address, login, password) : DriverManager.getConnection(address); StringUtils.isNoneBlank(login, password) ? DriverManager.getConnection(address, login, password) : DriverManager.getConnection(address);
this.connection.setAutoCommit(false); this.connection.setAutoCommit(false);
} catch (final Exception e) { } catch (final Exception e) {
log.error(e.getClass().getName() + ": " + e.getMessage()); log.error("Connection to postgresDB failed");
throw new RuntimeException(e); throw new RuntimeException("Connection to postgresDB failed", e);
} }
log.info("Opened database successfully"); log.info("Opened database successfully");
} }
@ -44,10 +44,12 @@ public class DbClient implements Closeable {
consumer.accept(rs); consumer.accept(rs);
} }
} catch (final SQLException e) { } catch (final SQLException e) {
throw new RuntimeException(e); log.error("Error executing sql query: " + sql, e);
throw new RuntimeException("Error executing sql query", e);
} }
} catch (final SQLException e1) { } catch (final SQLException e1) {
throw new RuntimeException(e1); log.error("Error preparing sql statement", e1);
throw new RuntimeException("Error preparing sql statement", e1);
} }
} }