more fixes
This commit is contained in:
parent
9ac10fc4b3
commit
e9bede5c45
|
@ -19,13 +19,16 @@ public final class ImpalaConnector {
|
||||||
@Autowired
|
@Autowired
|
||||||
private JdbcTemplate jdbcTemplate;
|
private JdbcTemplate jdbcTemplate;
|
||||||
|
|
||||||
@Value("${services.pdfaggregation.controller.db.oldDatabaseName}")
|
private final String oldDatabaseName;
|
||||||
public static String oldDatabaseName;
|
private final String databaseName;
|
||||||
@Value("${services.pdfaggregation.controller.db.databaseName}")
|
|
||||||
public static String databaseName;
|
|
||||||
|
|
||||||
public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database.
|
public static final Lock databaseLock = new ReentrantLock(true); // This lock is locking the threads trying to execute queries in the database.
|
||||||
|
|
||||||
|
public ImpalaConnector(@Value("${services.pdfaggregation.controller.db.oldDatabaseName}") String oldDatabaseName,
|
||||||
|
@Value("${services.pdfaggregation.controller.db.databaseName}") String databaseName) {
|
||||||
|
this.oldDatabaseName = oldDatabaseName;
|
||||||
|
this.databaseName = databaseName;
|
||||||
|
}
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes.");
|
logger.info("Max available memory to the Controller: " + Runtime.getRuntime().maxMemory() + " bytes.");
|
||||||
|
@ -60,7 +63,7 @@ public final class ImpalaConnector {
|
||||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
|
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".assignment (id string, original_url string, workerid string, `date` timestamp) stored as parquet");
|
||||||
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment");
|
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".assignment");
|
||||||
|
|
||||||
jdbcTemplate.execute("DROP TABLE IF EXISTS " + ImpalaConnector.databaseName + ".current_assignment PURGE");
|
jdbcTemplate.execute("DROP TABLE IF EXISTS " + databaseName + ".current_assignment PURGE");
|
||||||
|
|
||||||
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet");
|
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".attempt (id string, original_url string, `date` timestamp, status string, error_class string, error_message string) stored as parquet");
|
||||||
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt");
|
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".attempt");
|
||||||
|
|
|
@ -44,6 +44,9 @@ public class UrlController {
|
||||||
@Value("${services.pdfaggregation.controller.assignmentLimit}")
|
@Value("${services.pdfaggregation.controller.assignmentLimit}")
|
||||||
private int assignmentLimit;
|
private int assignmentLimit;
|
||||||
|
|
||||||
|
@Value("${services.pdfaggregation.controller.db.databaseName}")
|
||||||
|
private String databaseName;
|
||||||
|
|
||||||
@GetMapping("")
|
@GetMapping("")
|
||||||
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
|
public ResponseEntity<?> getUrls(@RequestParam String workerId, @RequestParam int workerAssignmentsLimit) {
|
||||||
|
|
||||||
|
@ -72,24 +75,24 @@ public class UrlController {
|
||||||
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
|
String findAssignmentsQuery = "select pubid, url, datasourceid, datasourcetype\n" +
|
||||||
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
|
"from (select distinct pubid, url, datasourceid, datasourcetype, attempt_count from (\n" +
|
||||||
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
|
"select p.id as pubid, pu.url as url, d.id as datasourceid, d.type as datasourcetype, attempts.counts as attempt_count\n" +
|
||||||
"from " + ImpalaConnector.databaseName + ".publication p\n" +
|
"from " + databaseName + ".publication p\n" +
|
||||||
"join " + ImpalaConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
|
"join " + databaseName + ".publication_urls pu on pu.id=p.id\n" +
|
||||||
"join " + ImpalaConnector.databaseName + ".datasource d on d.id=p.datasourceid\n" +
|
"join " + databaseName + ".datasource d on d.id=p.datasourceid\n" +
|
||||||
"left outer join (select count(a.id) as counts, a.id from " + ImpalaConnector.databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
|
"left outer join (select count(a.id) as counts, a.id from " + databaseName + ".attempt a group by a.id) as attempts on attempts.id=p.id\n" +
|
||||||
"left outer join (\n" +
|
"left outer join (\n" +
|
||||||
" select a.id, a.original_url from " + ImpalaConnector.databaseName + ".assignment a\n" +
|
" select a.id, a.original_url from " + databaseName + ".assignment a\n" +
|
||||||
" union all\n" +
|
" union all\n" +
|
||||||
" select pl.id, pl.original_url from " + ImpalaConnector.databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
|
" select pl.id, pl.original_url from " + databaseName + ".payload pl) as existing on existing.id=p.id and existing.original_url=pu.url\n" +
|
||||||
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + ImpalaConnector.databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
|
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= " + maxAttemptsPerRecord + " and not exists (select 1 from " + databaseName + ".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n" +
|
||||||
"limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
|
"limit " + (assignmentsLimit * 10) + ") as non_distinct_results\n" +
|
||||||
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
|
"order by coalesce(attempt_count, 0), reverse(pubid), url\n" +
|
||||||
"limit " + assignmentsLimit + ") as findAssignmentsQuery";
|
"limit " + assignmentsLimit + ") as findAssignmentsQuery";
|
||||||
|
|
||||||
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
|
// The "order by" in the end makes sure the older attempted records will be re-attempted after a long time.
|
||||||
|
|
||||||
String createAssignmentsQuery = "create table " + ImpalaConnector.databaseName + ".current_assignment as \n" + findAssignmentsQuery;
|
String createAssignmentsQuery = "create table " + databaseName + ".current_assignment as \n" + findAssignmentsQuery;
|
||||||
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + ImpalaConnector.databaseName + ".current_assignment";
|
String computeCurrentAssignmentsStatsQuery = "COMPUTE STATS " + databaseName + ".current_assignment";
|
||||||
String getAssignmentsQuery = "select * from " + ImpalaConnector.databaseName + ".current_assignment";
|
String getAssignmentsQuery = "select * from " + databaseName + ".current_assignment";
|
||||||
|
|
||||||
List<Assignment> assignments = new ArrayList<>();
|
List<Assignment> assignments = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -171,8 +174,8 @@ public class UrlController {
|
||||||
// Write the Assignment details to the assignment-table.
|
// Write the Assignment details to the assignment-table.
|
||||||
|
|
||||||
// The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party.
|
// The "timestamp" is generated from the Java-code, so it's in no way provided by a 3rd party.
|
||||||
String insertAssignmentsQuery = "insert into " + ImpalaConnector.databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
|
String insertAssignmentsQuery = "insert into " + databaseName + ".assignment \n select pub_data.pubid, pub_data.url, '" + workerId + "', cast('" + timestamp + "' as timestamp)\n"
|
||||||
+ "from (\n select pubid, url from " + ImpalaConnector.databaseName + ".current_assignment) as pub_data";
|
+ "from (\n select pubid, url from " + databaseName + ".current_assignment) as pub_data";
|
||||||
|
|
||||||
try {
|
try {
|
||||||
jdbcTemplate.execute(insertAssignmentsQuery);
|
jdbcTemplate.execute(insertAssignmentsQuery);
|
||||||
|
@ -252,8 +255,8 @@ public class UrlController {
|
||||||
ImpalaConnector.databaseLock.lock();
|
ImpalaConnector.databaseLock.lock();
|
||||||
|
|
||||||
// Store the workerReport into the database.
|
// Store the workerReport into the database.
|
||||||
String insertIntoPayloadBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
String insertIntoPayloadBaseQuery = "INSERT INTO " + databaseName + ".payload (id, original_url, actual_url, date, mimetype, size, hash, location, provenance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
|
||||||
String insertIntoAttemptBaseQuery = "INSERT INTO " + ImpalaConnector.databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
|
String insertIntoAttemptBaseQuery = "INSERT INTO " + databaseName + ".attempt (id, original_url, date, status, error_class, error_message) VALUES (?, ?, ?, ?, ?, ?)";
|
||||||
|
|
||||||
String payloadErrorMsg = null;
|
String payloadErrorMsg = null;
|
||||||
int failedCount = 0;
|
int failedCount = 0;
|
||||||
|
@ -372,7 +375,7 @@ public class UrlController {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String dropCurrentAssignmentTable() {
|
private String dropCurrentAssignmentTable() {
|
||||||
String dropCurrentAssignmentsQuery = "DROP TABLE " + ImpalaConnector.databaseName + ".current_assignment PURGE";
|
String dropCurrentAssignmentsQuery = "DROP TABLE " + databaseName + ".current_assignment PURGE";
|
||||||
|
|
||||||
try {
|
try {
|
||||||
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
|
jdbcTemplate.execute(dropCurrentAssignmentsQuery);
|
||||||
|
|
|
@ -40,6 +40,10 @@ public class FileUtils {
|
||||||
@Autowired
|
@Autowired
|
||||||
private FileUnZipper fileUnZipper;
|
private FileUnZipper fileUnZipper;
|
||||||
|
|
||||||
|
@Value("${services.pdfaggregation.controller.db.databaseName}")
|
||||||
|
private String databaseName;
|
||||||
|
|
||||||
|
|
||||||
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
|
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
|
||||||
|
|
||||||
public FileUtils() throws RuntimeException {
|
public FileUtils() throws RuntimeException {
|
||||||
|
@ -69,10 +73,10 @@ public class FileUtils {
|
||||||
parameter = " '" + parameter + "'"; // This will be a "string-check".
|
parameter = " '" + parameter + "'"; // This will be a "string-check".
|
||||||
|
|
||||||
try {
|
try {
|
||||||
jdbcTemplate.execute("CREATE TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName + " " + whereClause + parameter);
|
jdbcTemplate.execute("CREATE TABLE " + databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + databaseName + "." + tableName + " " + whereClause + parameter);
|
||||||
jdbcTemplate.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
|
jdbcTemplate.execute("DROP TABLE " + databaseName + "." + tableName + " PURGE");
|
||||||
jdbcTemplate.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
|
jdbcTemplate.execute("ALTER TABLE " + databaseName + "." + tableName + "_tmp RENAME TO " + databaseName + "." + tableName);
|
||||||
jdbcTemplate.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
|
jdbcTemplate.execute("COMPUTE STATS " + databaseName + "." + tableName);
|
||||||
} catch (DataAccessException e) {
|
} catch (DataAccessException e) {
|
||||||
errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
|
errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
|
||||||
logger.error(errorMsg, e);
|
logger.error(errorMsg, e);
|
||||||
|
@ -104,7 +108,7 @@ public class FileUtils {
|
||||||
|
|
||||||
ImpalaConnector.databaseLock.lock();
|
ImpalaConnector.databaseLock.lock();
|
||||||
|
|
||||||
String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ?" ;
|
String getFileLocationForHashQuery = "select `location` from " + databaseName + ".payload where `hash` = ?" ;
|
||||||
|
|
||||||
// Get the file-locations.
|
// Get the file-locations.
|
||||||
int numFullTextUrlsFound = 0;
|
int numFullTextUrlsFound = 0;
|
||||||
|
|
Loading…
Reference in New Issue