forked from D-Net/dnet-hadoop
logs and closeable
This commit is contained in:
parent
9cf5ce2e66
commit
e7167b996a
|
@ -53,18 +53,20 @@ public class GenerateEntitiesApplication {
|
||||||
final String dbUser = parser.get("postgresUser");
|
final String dbUser = parser.get("postgresUser");
|
||||||
final String dbPassword = parser.get("postgresPassword");
|
final String dbPassword = parser.get("postgresPassword");
|
||||||
|
|
||||||
final SparkSession spark = SparkSession
|
final Map<String, String> code2name = loadClassNames(dbUrl, dbUser, dbPassword);
|
||||||
|
|
||||||
|
try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
|
||||||
|
final List<String> existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
|
||||||
|
generateEntities(sc, code2name, existingSourcePaths, targetPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SparkSession newSparkSession(final ArgumentApplicationParser parser) {
|
||||||
|
return SparkSession
|
||||||
.builder()
|
.builder()
|
||||||
.appName(GenerateEntitiesApplication.class.getSimpleName())
|
.appName(GenerateEntitiesApplication.class.getSimpleName())
|
||||||
.master(parser.get("master"))
|
.master(parser.get("master"))
|
||||||
.getOrCreate();
|
.getOrCreate();
|
||||||
|
|
||||||
final Map<String, String> code2name = loadClassNames(dbUrl, dbUser, dbPassword);
|
|
||||||
|
|
||||||
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
|
|
||||||
final List<String> existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList());
|
|
||||||
generateEntities(sc, code2name, existingSourcePaths, targetPath);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void generateEntities(final JavaSparkContext sc,
|
private static void generateEntities(final JavaSparkContext sc,
|
||||||
|
@ -96,7 +98,7 @@ public class GenerateEntitiesApplication {
|
||||||
case "native_oaf":
|
case "native_oaf":
|
||||||
return new OafToOafMapper(code2name).processMdRecord(s);
|
return new OafToOafMapper(code2name).processMdRecord(s);
|
||||||
case "native_odf":
|
case "native_odf":
|
||||||
return new OafToOafMapper(code2name).processMdRecord(s);
|
return new OdfToOafMapper(code2name).processMdRecord(s);
|
||||||
case "datasource":
|
case "datasource":
|
||||||
return Arrays.asList(convertFromJson(s, Datasource.class));
|
return Arrays.asList(convertFromJson(s, Datasource.class));
|
||||||
case "organization":
|
case "organization":
|
||||||
|
|
|
@ -28,13 +28,7 @@ public class DispatchEntitiesApplication {
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json")));
|
.getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
final SparkSession spark = SparkSession
|
try (final SparkSession spark = newSparkSession(parser); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
|
||||||
.builder()
|
|
||||||
.appName(DispatchEntitiesApplication.class.getSimpleName())
|
|
||||||
.master(parser.get("master"))
|
|
||||||
.getOrCreate();
|
|
||||||
|
|
||||||
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
|
|
||||||
|
|
||||||
final String sourcePath = parser.get("sourcePath");
|
final String sourcePath = parser.get("sourcePath");
|
||||||
final String targetPath = parser.get("graphRawPath");
|
final String targetPath = parser.get("graphRawPath");
|
||||||
|
@ -50,6 +44,14 @@ public class DispatchEntitiesApplication {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static SparkSession newSparkSession(final ArgumentApplicationParser parser) {
|
||||||
|
return SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(DispatchEntitiesApplication.class.getSimpleName())
|
||||||
|
.master(parser.get("master"))
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final String sourcePath, final String targetPath) {
|
private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final String sourcePath, final String targetPath) {
|
||||||
final String type = clazz.getSimpleName().toLowerCase();
|
final String type = clazz.getSimpleName().toLowerCase();
|
||||||
|
|
||||||
|
|
|
@ -28,8 +28,8 @@ public class DbClient implements Closeable {
|
||||||
StringUtils.isNoneBlank(login, password) ? DriverManager.getConnection(address, login, password) : DriverManager.getConnection(address);
|
StringUtils.isNoneBlank(login, password) ? DriverManager.getConnection(address, login, password) : DriverManager.getConnection(address);
|
||||||
this.connection.setAutoCommit(false);
|
this.connection.setAutoCommit(false);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
log.error(e.getClass().getName() + ": " + e.getMessage());
|
log.error("Connection to postgresDB failed");
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException("Connection to postgresDB failed", e);
|
||||||
}
|
}
|
||||||
log.info("Opened database successfully");
|
log.info("Opened database successfully");
|
||||||
}
|
}
|
||||||
|
@ -44,10 +44,12 @@ public class DbClient implements Closeable {
|
||||||
consumer.accept(rs);
|
consumer.accept(rs);
|
||||||
}
|
}
|
||||||
} catch (final SQLException e) {
|
} catch (final SQLException e) {
|
||||||
throw new RuntimeException(e);
|
log.error("Error executing sql query: " + sql, e);
|
||||||
|
throw new RuntimeException("Error executing sql query", e);
|
||||||
}
|
}
|
||||||
} catch (final SQLException e1) {
|
} catch (final SQLException e1) {
|
||||||
throw new RuntimeException(e1);
|
log.error("Error preparing sql statement", e1);
|
||||||
|
throw new RuntimeException("Error preparing sql statement", e1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue