"join "+ImpalaConnector.databaseName+".publication_urls pu on pu.id=p.id\n"+
"join "+ImpalaConnector.databaseName+".datasource d on d.id=p.datasourceid\n"+
"left outer join (select count(a.id) as counts, a.id from "+ImpalaConnector.databaseName+".attempt a group by a.id) as attempts\n"+
"on attempts.id=p.id\n"+
"left outer join (select a.id, a.original_url from "+ImpalaConnector.databaseName+".assignment a\n"+
"union all\n"+
"select pl.id, pl.original_url from "+ImpalaConnector.databaseName+".payload pl) as existing\n"+
"on existing.id=p.id and existing.original_url=pu.url\n"+
"where d.allow_harvest=true and existing.id is null and coalesce(attempts.counts, 0) <= "+maxAttemptsPerRecordAtomic.get()+
"\nand not exists (select 1 from "+ImpalaConnector.databaseName+".attempt a where a.id=p.id and a.error_class = 'noRetry' limit 1)\n"+
"and pu.url != '' and pu.url is not null\n"+// Some IDs have empty-string urls, there are no "null" urls, but keep the relevant check for future-proofing.
"limit "+(assignmentsLimit*10)+
")\nas non_distinct_results\n"+
"order by coalesce(attempt_count, 0), reverse(pubid), url\n"+
errorMsg="No results retrieved from the \"findAssignmentsQuery\" for worker with id: "+workerId+". Will increase the \"maxAttempts\" to "+maxAttemptsPerRecordAtomic.incrementAndGet()+" for the next requests.";
logger.error(errorMsg);
String tmpErrMsg = dropCurrentAssignmentTable();
ImpalaConnector.databaseLock.unlock();
if (tmpErrMsg != null) {
errorMsg += "\n" + tmpErrMsg;// The additional error-msg is already logged.
logger.warn("The retrieved results were fewer ("+assignmentsSize+") than the \"assignmentsLimit\" ("+assignmentsLimit+"), for worker with id: "+workerId+". Will increase the \"maxAttempts\" to "+maxAttemptsPerRecordAtomic.incrementAndGet()+" for the next requests.");
}
logger.debug("Finished gathering "+assignmentsSize+" assignments for worker with id \""+workerId+"\". Going to insert them into the \"assignment\" table and then return them to the worker.");
// Write the Assignment details to the assignment-table.
String insertAssignmentsQuery = "insert into "+ImpalaConnector.databaseName+".assignment \n select pub_data.pubid, pub_data.url, '"+workerId+"', "+timestampMillis+"\n"
+"from (\n select pubid, url from "+ImpalaConnector.databaseName+".current_assignment) as pub_data";
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
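// Illustrative only: a minimal sketch, using a hypothetical "Payload" type with a "getLocation()" getter
// and a hypothetical "collectedPayloads" list (not the actual classes of this project), of keeping just
// the payloads which have a non-empty S3 location, since only those should reach the "payload" table:
List<Payload> payloadsToPersist = new ArrayList<>();
for (Payload payload : collectedPayloads) {
String s3Location = payload.getLocation();// hypothetical getter for the S3-Object-Store file-location
if ((s3Location != null) && !s3Location.isEmpty())
payloadsToPersist.add(payload);
}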
}
else
logger.debug("Finished uploading the full-texts from batch-assignments_"+curReportAssignments);
logger.debug("Going to write the results in the parquet files, then upload them to HDFS, and then load them into the database's tables. For batch-assignments_"+curReportAssignments);
thrownewRuntimeException("All of the parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database, for batch-assignments_"+curReportAssignments);
else{
if(hasAttemptParquetFileProblem)
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_"+curReportAssignments);
elseif(hasPayloadParquetFileProblem)
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload\", for batch-assignments_"+curReportAssignments);
else
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files.");
}
// Load all the parquet files of each type into its table.
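// Illustrative only: a minimal sketch of one such "load"-request, assuming a plain java.sql "statement"
// and a hypothetical "parquetHdfsDir" base-path (the real requests of this class are built elsewhere and may differ).
// Impala's "load data inpath" moves the uploaded parquet files from HDFS into the target table:
String loadAttemptsQuery = "load data inpath '" + parquetHdfsDir + "attempts/' into table " + ImpalaConnector.databaseName + ".attempt";
statement.execute(loadAttemptsQuery);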
thrownewRuntimeException("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_"+curReportAssignments);
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload\" table, for batch-assignments_"+curReportAssignments);
else
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload\" tables, for batch-assignments_"+curReportAssignments);
} catch (InterruptedException ie) {// In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish inserting into the tables: "+ie.getMessage());
// This is a very rare case. At the moment, we just move on with table-merging.
String tmpErrMsg = dropCurrentAssignmentTable();// The table may be partially created, e.g. in case of an "out of memory" error in the database-server, during the creation, resulting in an empty table (yes it has happened).
// This will delete the rows of the "assignment" table which refer to the "curWorkerId". As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER workerIDs get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info anymore, since the "findAssignmentsQuery" checks the payload table for previously handled tasks.
return fileUtils.mergeParquetFiles("assignment", " WHERE workerid != ", curWorkerId);
}
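// Illustrative only: a rough sketch of the "merge"-style delete which "mergeParquetFiles()" performs for
// non-KUDU Impala tables, assuming a plain java.sql "statement" (the statements of the real implementation may differ).
// The rows of the OTHER workers are copied into a temp-table, the original table is dropped, the temp-table
// is renamed back and its stats are recomputed:
String db = ImpalaConnector.databaseName;
statement.execute("create table " + db + ".assignment_tmp stored as parquet as select * from " + db + ".assignment where workerid != '" + curWorkerId + "'");
statement.execute("drop table " + db + ".assignment purge");
statement.execute("alter table " + db + ".assignment_tmp rename to " + db + ".assignment");
statement.execute("compute stats " + db + ".assignment");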
// The "batchExecute" does not work in this Impala-Database, so this is a "giant-query" solution.
// Note: this causes an "Out of memory"-ERROR in the current version of the Impala JDBC driver. If a later version is provided, then this code should be tested.
StringBuilder sb = new StringBuilder(baseInsertQuery.length() + (dataSize * 6 * numParamsPerRow));// TODO - Make this a global Thread-Local var. And then "clear" (reset) it after each use.
sb.append(baseInsertQuery);
for (int i = 1; i <= dataSize; ++i) {
sb.append("(");
for (int j = 1; j <= numParamsPerRow; ++j) {
sb.append("?");
if (j < numParamsPerRow)
sb.append(",");
}
sb.append(")");
if (i < dataSize)
sb.append(",");
}
PreparedStatement preparedInsertStatement;
try {// We use a "PreparedStatement" to do insertions, for security reasons.
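// A minimal sketch of how such a giant insert-statement could be prepared and its parameters bound,
// assuming a java.sql "con" Connection and a flat, hypothetical "params" list holding each row's values
// in placeholder order (the actual binding code of this class may differ):
preparedInsertStatement = con.prepareStatement(sb.toString());
int paramIndex = 0;
for (Object param : params)
preparedInsertStatement.setObject(++paramIndex, param);
preparedInsertStatement.executeUpdate();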