[Dump subset] change the class to read from the db and added needed parameters in the workflow
This commit is contained in:
parent
0bb97fead7
commit
67d48763fa
|
@ -2,6 +2,7 @@
|
|||
package eu.dnetlib.dhp.oa.graph.dump.subset;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.mongodb.DBCursor;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.DbClient;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
|
@ -23,13 +24,9 @@ import java.sql.ResultSet;
|
|||
import java.sql.SQLException;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class ReadMasterDuplicateFromDB implements Closeable {
|
||||
public class ReadMasterDuplicateFromDB {
|
||||
|
||||
private final DbClient dbClient;
|
||||
private static final Log log = LogFactory.getLog(ReadMasterDuplicateFromDB.class);
|
||||
|
||||
private final BufferedWriter writer;
|
||||
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static final String QUERY = "SELECT id as master, duplicate FROM dsm_dedup_services; ";
|
||||
|
||||
|
@ -49,31 +46,37 @@ public class ReadMasterDuplicateFromDB implements Closeable {
|
|||
final String hdfsPath = parser.get("hdfsPath");
|
||||
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||
|
||||
try (
|
||||
final ReadMasterDuplicateFromDB rmd = new ReadMasterDuplicateFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser,
|
||||
dbPassword)) {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("fs.defaultFS", hdfsNameNode);
|
||||
|
||||
log.info("Processing datasources...");
|
||||
rmd.execute(QUERY, rmd::datasourceMasterMap);
|
||||
FileSystem fileSystem = FileSystem.get(conf);
|
||||
Path hdfsWritePath = new Path(hdfsPath);
|
||||
FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
||||
|
||||
execute(dbUrl, dbUser, dbPassword, fsDataOutputStream);
|
||||
|
||||
}
|
||||
|
||||
private static void execute(String dbUrl, String dbUser, String dbPassword, FSDataOutputStream fos) {
|
||||
try(DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)){
|
||||
try(BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))){
|
||||
dbClient.processResults(QUERY, rs -> writeMap(datasourceMasterMap(rs), writer));
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void execute(final String sql, final Function<ResultSet, MasterDuplicate> producer) {
|
||||
|
||||
dbClient.processResults(sql, rs -> writeMap(producer.apply(rs)));
|
||||
}
|
||||
|
||||
public MasterDuplicate datasourceMasterMap(ResultSet rs) {
|
||||
public static MasterDuplicate datasourceMasterMap(ResultSet rs) {
|
||||
try {
|
||||
MasterDuplicate dm = new MasterDuplicate();
|
||||
String duplicate = rs.getString("duplicate");
|
||||
dm.setDuplicate(duplicate);
|
||||
dm.setDuplicate(OafMapperUtils.createOpenaireId(10, duplicate, true));
|
||||
String master = rs.getString("master");
|
||||
if (StringUtils.isNotBlank(master))
|
||||
dm.setMaster(OafMapperUtils.createOpenaireId(10, master, true));
|
||||
else
|
||||
dm.setMaster(OafMapperUtils.createOpenaireId(10, duplicate, true));
|
||||
|
||||
return dm;
|
||||
|
||||
} catch (final SQLException e) {
|
||||
|
@ -81,34 +84,8 @@ public class ReadMasterDuplicateFromDB implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
dbClient.close();
|
||||
writer.close();
|
||||
}
|
||||
|
||||
public ReadMasterDuplicateFromDB(
|
||||
final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword)
|
||||
throws IOException {
|
||||
|
||||
this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("fs.defaultFS", hdfsNameNode);
|
||||
|
||||
FileSystem fileSystem = FileSystem.get(conf);
|
||||
Path hdfsWritePath = new Path(hdfsPath);
|
||||
FSDataOutputStream fsDataOutputStream = null;
|
||||
if (fileSystem.exists(hdfsWritePath)) {
|
||||
fsDataOutputStream = fileSystem.append(hdfsWritePath);
|
||||
} else {
|
||||
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
||||
}
|
||||
|
||||
this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
protected void writeMap(final MasterDuplicate dm) {
|
||||
protected static void writeMap(final MasterDuplicate dm, BufferedWriter writer) {
|
||||
try {
|
||||
writer.write(OBJECT_MAPPER.writeValueAsString(dm));
|
||||
writer.newLine();
|
||||
|
|
|
@ -8,11 +8,18 @@
|
|||
<name>outputPath</name>
|
||||
<description>the output path</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>organizationCommunityMap</name>
|
||||
<description>the organization community map</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>pathMap</name>
|
||||
<description>the path where to find the elements involved in the constraints within the json</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>selectionCriteria</name>
|
||||
<description>the selection criteria used to select the results</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hiveDbName</name>
|
||||
|
|
Loading…
Reference in New Issue