parent
33df46f6f5
commit
952bf7c035
|
@ -1,12 +1,14 @@
|
|||
plugins {
|
||||
id 'org.springframework.boot' version '2.7.12'
|
||||
id 'org.springframework.boot' version '2.7.13'
|
||||
id 'io.spring.dependency-management' version '1.1.0'
|
||||
id 'java'
|
||||
}
|
||||
|
||||
java {
|
||||
group = 'eu.openaire.urls_worker'
|
||||
version = '2.1.0-SNAPSHOT'
|
||||
sourceCompatibility = '1.8'
|
||||
}
|
||||
|
||||
repositories {
|
||||
mavenCentral()
|
||||
|
@ -37,7 +39,7 @@ dependencies {
|
|||
exclude group: 'io.minio' // This is not used in the Worker, since it's the Controller which uploads the full-texts to S3. It also includes an older "commons-compress" version which causes problems.
|
||||
}
|
||||
|
||||
implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
|
||||
implementation group: 'com.google.guava', name: 'guava', version: '32.1.1-jre'
|
||||
|
||||
// https://mvnrepository.com/artifact/com.google.code.gson/gson
|
||||
implementation 'com.google.code.gson:gson:2.10.1'
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
distributionBase=GRADLE_USER_HOME
|
||||
distributionPath=wrapper/dists
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip
|
||||
networkTimeout=10000
|
||||
validateDistributionUrl=true
|
||||
zipStoreBase=GRADLE_USER_HOME
|
||||
zipStorePath=wrapper/dists
|
||||
|
|
|
@ -22,7 +22,7 @@ elif [[ $# -gt 2 ]]; then
|
|||
echo -e "Wrong number of arguments given: ${#} (more than 2)\nPlease execute it like: script.sh <justRun: 0 | 1> <avoidReInstallingPublicationsRetriever: 0 | 1>"; exit 2
|
||||
fi
|
||||
|
||||
gradleVersion="8.1.1"
|
||||
gradleVersion="8.2"
|
||||
|
||||
shouldBeCarefulWithMaxHeap=0 # This is NOT a cmd-arg.
|
||||
|
||||
|
|
|
@ -195,7 +195,7 @@ public class AssignmentsHandler {
|
|||
|
||||
// Every time we reach a "limit" of handled id-url clear some data-structures of the underlying "PublicationsRetriever" program.
|
||||
// This helps with reducing the memory consumption over the period of weeks or months, and also give a 2nd chance to some domains which may be blocked due to a connectivity issues, but after a month they may be fine.
|
||||
long idUrlPairsHandled = (numHandledAssignmentsBatches * this.maxAssignmentsLimitPerBatch);
|
||||
long idUrlPairsHandled = (numHandledAssignmentsBatches * maxAssignmentsLimitPerBatch);
|
||||
|
||||
if ( idUrlPairsHandled >= ((timesClearingDuplicateUrlsData +1) * idUrlsToHandleBeforeClearingDuplicateUrlsData) ) {
|
||||
UrlUtils.duplicateUrls.clear();
|
||||
|
@ -212,11 +212,11 @@ public class AssignmentsHandler {
|
|||
}
|
||||
|
||||
if ( GeneralController.shouldShutdownWorker
|
||||
|| (numHandledAssignmentsBatches == this.maxAssignmentsBatchesToHandleBeforeShutdown) )
|
||||
|| (numHandledAssignmentsBatches == maxAssignmentsBatchesToHandleBeforeShutdown) )
|
||||
{
|
||||
logger.info("The worker will shutdown, after the full-texts are delivered to the Controller, as " + (GeneralController.shouldShutdownWorker
|
||||
? "it received a \"shutdownWorker\" request!"
|
||||
: "the maximum assignments-batches (" + this.maxAssignmentsBatchesToHandleBeforeShutdown + ") to be handled was reached!"));
|
||||
: "the maximum assignments-batches (" + maxAssignmentsBatchesToHandleBeforeShutdown + ") to be handled was reached!"));
|
||||
|
||||
// Here, just specify that we do not want to request for more assignments. A scheduling job will check if the fulltexts were delivered to the Controller and then shutdown the Worker.
|
||||
shouldNotRequestMore = true;
|
||||
|
@ -302,13 +302,12 @@ public class AssignmentsHandler {
|
|||
}
|
||||
|
||||
if ( sb != null ) {
|
||||
logger.debug("Before change:\n" + sb.toString()); // DEBUG!
|
||||
logger.debug("Before change:\n" + sb); // DEBUG!
|
||||
sb.setLength(0); // Reset it without re-sizing it.
|
||||
}
|
||||
|
||||
List<String> domains = new ArrayList<>(domainsWithAssignments.keySet());
|
||||
int domainsSize = domains.size();
|
||||
|
||||
Integer domainsCounter = -1;
|
||||
|
||||
for ( int i = 0; i < assignmentsSize; ++i )
|
||||
|
@ -324,7 +323,7 @@ public class AssignmentsHandler {
|
|||
}
|
||||
|
||||
if ( sb != null )
|
||||
logger.debug("After change:\n" + sb.toString());
|
||||
logger.debug("After change:\n" + sb);
|
||||
|
||||
return spacedOutAssignments;
|
||||
}
|
||||
|
@ -337,13 +336,9 @@ public class AssignmentsHandler {
|
|||
* */
|
||||
public static HashMap<Object, Integer> getFirstAvailableObjectForSpacedOutDomains(List<String> domainsList, Integer domainsCounter, HashMultimap<String, ?> domainsWithAssignments, int domainsSize, StringBuilder sb)
|
||||
{
|
||||
HashMap<Object, Integer> result = new HashMap<>();
|
||||
Object nextAssignment = null;
|
||||
|
||||
// Normally, this method does not need a recursion-break-safety, as the initial-caller method should call this method exactly N times, where N is the number of all the values of "domainsWithAssignments".
|
||||
// Although, for extra-safety and re-usability, let's have this check here.
|
||||
Set<String> domainsSet = domainsWithAssignments.keySet();
|
||||
if ( domainsSet.isEmpty() )
|
||||
if ( domainsWithAssignments.keySet().isEmpty() )
|
||||
return null; // Break recursion when the domains run-out.
|
||||
|
||||
if ( domainsCounter < (domainsSize -1) )
|
||||
|
@ -353,22 +348,21 @@ public class AssignmentsHandler {
|
|||
|
||||
String currentDomain = domainsList.get(domainsCounter);
|
||||
Set<?> assignmentsOfCurrentDomain = domainsWithAssignments.get(currentDomain);
|
||||
if ( assignmentsOfCurrentDomain.isEmpty() ) {
|
||||
// This domain is out of assignments, check the next available one.
|
||||
result = getFirstAvailableObjectForSpacedOutDomains(domainsList, domainsCounter, domainsWithAssignments, domainsSize, sb);
|
||||
} else {
|
||||
nextAssignment = assignmentsOfCurrentDomain.toArray()[0];
|
||||
if ( assignmentsOfCurrentDomain.isEmpty() ) // This domain is out of assignments, check the next available one.
|
||||
return getFirstAvailableObjectForSpacedOutDomains(domainsList, domainsCounter, domainsWithAssignments, domainsSize, sb);
|
||||
|
||||
Object nextAssignment = assignmentsOfCurrentDomain.toArray()[0];
|
||||
HashMap<Object, Integer> result = new HashMap<>();
|
||||
result.put(nextAssignment, domainsCounter);
|
||||
domainsWithAssignments.remove(currentDomain, nextAssignment);
|
||||
if ( sb != null )
|
||||
sb.append(currentDomain).append("\n"); // DEBUG!
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
Lock fileWriteLock = new ReentrantLock(true);
|
||||
private static final Lock fileWriteLock = new ReentrantLock(true);
|
||||
|
||||
public String writeToFile(String fileFullPath, String stringToWrite, boolean shouldLockThreads)
|
||||
{
|
||||
|
|
|
@ -7,9 +7,7 @@ import eu.openaire.publications_retriever.util.http.HttpConnUtils;
|
|||
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
|
||||
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
|
||||
import eu.openaire.publications_retriever.util.url.UrlUtils;
|
||||
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
||||
import eu.openaire.urls_worker.components.AssignmentsHandler;
|
||||
import eu.openaire.urls_worker.components.ConnWithController;
|
||||
import eu.openaire.urls_worker.controllers.GeneralController;
|
||||
import eu.openaire.urls_worker.models.Assignment;
|
||||
import eu.openaire.urls_worker.models.Error;
|
||||
|
@ -18,13 +16,11 @@ import eu.openaire.urls_worker.models.UrlReport;
|
|||
import eu.openaire.urls_worker.services.FileStorageService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.CookieStore;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -39,10 +35,6 @@ public class PublicationsRetrieverPlugin {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PublicationsRetrieverPlugin.class);
|
||||
|
||||
@Autowired
|
||||
private ConnWithController connWithController;
|
||||
|
||||
|
||||
public static String assignmentsBasePath;
|
||||
|
||||
private static String workerId;
|
||||
|
@ -179,6 +171,9 @@ public class PublicationsRetrieverPlugin {
|
|||
private static final int lengthOfAlreadyDownloadedFromSourceUrlContinuedMessage = ConnSupportUtils.alreadyDownloadedFromSourceUrlContinuedMessage.length();
|
||||
private static final int lengthOfAlreadyDownloadedFromIDMessage = ConnSupportUtils.alreadyDownloadedFromIDMessage.length();
|
||||
|
||||
private static final String provenance = "crawl:PublicationsRetriever";
|
||||
|
||||
|
||||
public static void addUrlReportsToWorkerReport(Collection<Assignment> assignments)
|
||||
{
|
||||
if ( FileUtils.dataToBeLoggedList.size() != assignments.size() ) {
|
||||
|
@ -246,6 +241,7 @@ public class PublicationsRetrieverPlugin {
|
|||
if ( !foundAlreadyDownloadedFullText ) {
|
||||
String addErrorMessage = ((!foundIDUrlInWorkerReport) ? " | That ID-sourceUrl was not found inside the WorkerReport!" : " | The file was not downloaded!");
|
||||
error = new Error(Error.ErrorType.couldRetry, comment + addErrorMessage); // We can still try to download it from the found docUrl, in the future.
|
||||
// The "fileLocation" is null.
|
||||
}
|
||||
}
|
||||
else if ( ! comment.startsWith(HttpConnUtils.docFileNotRetrievedMessage, 0) ) { // If it was downloaded without an error.
|
||||
|
@ -279,7 +275,7 @@ public class PublicationsRetrieverPlugin {
|
|||
// Each record will have the urlID, the datasourceID and possibly one filename, which may contain a different urlID.
|
||||
// The Controller will select the correct datasourceID for before adding it inside the S3-ObjectStore filename.
|
||||
|
||||
Payload payload = new Payload(urlId, data.getSourceUrl(), docOrDatasetUrl, timestamp, mimeType, size, hash, fileLocation, "crawl:PublicationsRetriever", datasourceId);
|
||||
Payload payload = new Payload(urlId, data.getSourceUrl(), docOrDatasetUrl, timestamp, mimeType, size, hash, fileLocation, provenance, datasourceId);
|
||||
// TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
|
||||
|
||||
AssignmentsHandler.urlReports.add(new UrlReport(status, payload, error));
|
||||
|
|
Loading…
Reference in New Issue