forked from D-Net/dnet-hadoop
bug fixing
This commit is contained in:
parent
95740767e0
commit
5fc09b179c
|
@ -55,6 +55,12 @@
|
||||||
<groupId>org.mongodb</groupId>
|
<groupId>org.mongodb</groupId>
|
||||||
<artifactId>mongo-java-driver</artifactId>
|
<artifactId>mongo-java-driver</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.postgresql</groupId>
|
||||||
|
<artifactId>postgresql</artifactId>
|
||||||
|
<version>42.2.10</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
|
|
|
@ -10,6 +10,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -42,7 +44,12 @@ public class AbstractMigrationExecutor implements Closeable {
|
||||||
|
|
||||||
private final SequenceFile.Writer writer;
|
private final SequenceFile.Writer writer;
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(AbstractMigrationExecutor.class);
|
||||||
|
|
||||||
public AbstractMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception {
|
public AbstractMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception {
|
||||||
|
|
||||||
|
log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser));
|
||||||
|
|
||||||
this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer
|
this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer
|
||||||
.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class));
|
.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class));
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
@ -22,7 +23,9 @@ public class DbClient implements Closeable {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Class.forName("org.postgresql.Driver");
|
Class.forName("org.postgresql.Driver");
|
||||||
this.connection = DriverManager.getConnection(address, login, password);
|
|
||||||
|
this.connection =
|
||||||
|
StringUtils.isNoneBlank(login, password) ? DriverManager.getConnection(address, login, password) : DriverManager.getConnection(address);
|
||||||
this.connection.setAutoCommit(false);
|
this.connection.setAutoCommit(false);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
log.error(e.getClass().getName() + ": " + e.getMessage());
|
log.error(e.getClass().getName() + ": " + e.getMessage());
|
||||||
|
@ -34,7 +37,7 @@ public class DbClient implements Closeable {
|
||||||
public void processResults(final String sql, final Consumer<ResultSet> consumer) {
|
public void processResults(final String sql, final Consumer<ResultSet> consumer) {
|
||||||
|
|
||||||
try (final Statement stmt = connection.createStatement()) {
|
try (final Statement stmt = connection.createStatement()) {
|
||||||
try (final ResultSet rs = stmt.executeQuery("SELECT * FROM COMPANY;")) {
|
try (final ResultSet rs = stmt.executeQuery(sql)) {
|
||||||
while (rs.next()) {
|
while (rs.next()) {
|
||||||
consumer.accept(rs);
|
consumer.accept(rs);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,12 +27,12 @@
|
||||||
"paramName": "dbuser",
|
"paramName": "dbuser",
|
||||||
"paramLongName": "postgresUser",
|
"paramLongName": "postgresUser",
|
||||||
"paramDescription": "postgres user",
|
"paramDescription": "postgres user",
|
||||||
"paramRequired": true
|
"paramRequired": false
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "dbpasswd",
|
"paramName": "dbpasswd",
|
||||||
"paramLongName": "postgresPassword",
|
"paramLongName": "postgresPassword",
|
||||||
"paramDescription": "postgres password",
|
"paramDescription": "postgres password",
|
||||||
"paramRequired": true
|
"paramRequired": false
|
||||||
}
|
}
|
||||||
]
|
]
|
Loading…
Reference in New Issue