Compare commits

...

32 Commits

Author SHA1 Message Date
Lampros Smyrnaios 39c36f9e66 - Resolve a concurrency issue by enforcing synchronization on the "BulkImportReport.getJsonReport()" method.
- Increase the number of stacktrace-lines to 20 for bulkImport-segment-failures.
- Improve "GenericUtils.getSelectiveStackTrace()".
2024-05-01 01:29:25 +03:00
Lampros Smyrnaios 8e14d4dbe0 - Fix not counting the files from a bulkImport-segment which failed due to an exception.
- Write segment-exception-messages to the bulkImport-report.
2024-04-30 23:43:52 +03:00
Lampros Smyrnaios 0d117743c2 - Code-optimization.
- Upload the updated gradle-wrapper and set the "installAndRun.sh" script to use the latest Gradle version.
2024-04-30 02:13:08 +03:00
Lampros Smyrnaios 64a1b7d4f0 - Comment-out the bucket-deletion process in order to avoid any accidental deletion, even though it has to be explicitly allowed in the config.
- Update dependencies.
2024-04-26 12:54:00 +03:00
Lampros Smyrnaios e2d43a9af0 Upgrade the "processBulkImportedFilesSegment" code:
1) Pre-calculate the file-hashes for all files of the segment and perform a single "getHashLocationsQuery", instead of thousands of single-hash queries.
2) Write some important events to the bulkImportReport file, as soon as they are added to the list.
2024-04-02 14:35:19 +03:00
Lampros Smyrnaios bd323ad69a Avoid a very rare case where we might get an "IllegalArgumentException" from "Lists.partition()", in case the "sizeOfUrlReports" is <= 3. 2024-03-29 18:12:52 +02:00
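For context, Guava's `Lists.partition()` throws an `IllegalArgumentException` when the requested partition size is not positive, which is exactly what happens when a small list is divided by a larger sub-list count via integer division. A minimal, hedged sketch of the guard this commit describes (the sub-list count and method name are illustrative, not the project's actual code):

```java
import com.google.common.collect.Lists;
import java.util.List;

public class PartitionGuardSketch {

    private static final int NUM_OF_SUBLISTS = 6; // Hypothetical; the real count is decided elsewhere.

    public static <T> List<List<T>> splitSafely(List<T> urlReports) {
        // Integer division yields 0 when urlReports.size() < NUM_OF_SUBLISTS (e.g. a list of <= 3 items),
        // and Lists.partition() throws an IllegalArgumentException for a non-positive partition size.
        int sizeOfEachSubList = Math.max(1, urlReports.size() / NUM_OF_SUBLISTS);
        return Lists.partition(urlReports, sizeOfEachSubList);
    }
}
```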
Lampros Smyrnaios 08de530f03 Various improvements:
- Handle the case when "fileUtils.constructS3FilenameAndUploadToS3()" returns "null", in "processBulkImportedFile()".
- Avoid an "IllegalArgumentException" in "Lists.partition()" when the number of files to bulkImport are fewer than the number of threads available to handle them.
- Include the last directory's "/" divider in the fileDIR group of "FILEPATH_ID_EXTENSION" regex (renamed from "FILENAME_ID_EXTENSION").
- Fix an incomplete log-message.
- Provide the "fileLocation" argument in the "DocFileData" constructor, in "processBulkImportedFile()", even though it is not used afterwards.
2024-03-29 17:23:01 +02:00
Lampros Smyrnaios 1d821ed803 - Prepare version for next release.
- Fix a typo-bug of not using the "OpenAireID" in the S3 location of bulkImported files; instead, the "fileNameID" was used, which in aggregation equals the OpenAireID, but not in bulk-import.
- Update dependencies.
- Code polishing.
2024-03-28 06:09:28 +02:00
Lampros Smyrnaios 8bc5cc35e2 - Optimize writing to the Bulk-import-report file.
- Show the IP of the worker which posts a "workerShutdownReport".
- Code polishing.
2024-03-22 17:50:55 +02:00
Lampros Smyrnaios b9b29dd51c Move some code from "FileUtils.getAndUploadFullTexts()" to two separate methods. 2024-03-20 16:53:03 +02:00
Lampros Smyrnaios 56d233d38e - Move the "FileUtils.mergeParquetFiles()" method to "ParquetFileUtils.mergeParquetFilesOfTable()".
- Fix a typo.
2024-03-20 15:25:19 +02:00
Lampros Smyrnaios 724eae1514 - Optimize the placement of "DatabaseConnector.databaseLock.unlock()" statements.
- Rename a maven-repository.
2024-03-20 15:08:01 +02:00
Lampros Smyrnaios 785204419d Update/cleanup the repositories in "build.gradle". 2024-03-15 12:22:13 +02:00
Lampros Smyrnaios 9b0818b535 - Add handling for additional/specific exceptions, when checking the "futures".
- Move common "ExecutionException" handling-code into its own method: "GenericUtils.getSelectedStackTraceForCausedException()".
- Avoid a double log.
- Code polishing.
2024-03-14 13:59:23 +02:00
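The helper "GenericUtils.getSelectedStackTraceForCausedException()" mentioned above is called later in this diff but its body is not shown. The following is only a hedged reconstruction of what such a pair of helpers typically looks like (signatures inferred from the call sites further down; the bodies are assumptions, not the repository's code):

```java
public final class GenericUtilsSketch {

    // Unwrap the cause of an ExecutionException, since the wrapper's stacktrace points at the
    // code that called "future.get()", not at the code which ran inside the background-task.
    public static String getSelectedStackTraceForCausedException(Exception e, String firstMessage,
                                                                  String additionalMessage, int numOfLines) {
        Throwable cause = e.getCause();
        if (cause == null)
            cause = e; // Fall back to the wrapper itself.
        String extra = (additionalMessage != null) ? additionalMessage : "";
        return getSelectiveStackTrace(cause, firstMessage + cause.getMessage() + extra, numOfLines);
    }

    // Keep only the first "numOfLines" stacktrace frames, to avoid flooding the logs.
    public static String getSelectiveStackTrace(Throwable thr, String initialMessage, int numOfLines) {
        StackTraceElement[] stels = thr.getStackTrace();
        StringBuilder sb = new StringBuilder(numOfLines * 100);
        if (initialMessage != null)
            sb.append(initialMessage).append(" Stacktrace:").append(System.lineSeparator());
        for (int i = 0; (i < stels.length) && (i < numOfLines); ++i)
            sb.append("\tat ").append(stels[i]).append(System.lineSeparator());
        return sb.toString();
    }
}
```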
Lampros Smyrnaios b34417dc45 Optimize the test-DB creation process:
- Use views of the "initialDatabase" views and tables to a) reduce the amount of space used by test-DBs and b) improve test-DB creation performance.
- Avoid possible failures from outdated metadata.
2024-03-14 13:10:54 +02:00
Lampros Smyrnaios f61cae41a1 - Try to get the cause of the exception of the callable-tasks which handle the bulk-import of fileSegments.
- Fix not counting the failedSegments when an exception was thrown.
- Code polishing.
2024-03-13 12:15:59 +02:00
Lampros Smyrnaios 8f9786de09 Upgrade the algorithm for finding the previously-found full-texts, based on their md5hash:
- Use a single query with a list of the fileHashes, instead of thousands of single-md5hash-check queries (run at most 6 in parallel), which require a lot of I/O.
- Avoid checking the same fileHash multiple times, in case it is related to multiple payloads.
- In case of a database-error, avoid completely losing the full-texts of that worker; instead, continue processing the full-texts.
2024-03-13 11:28:37 +02:00
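A hedged sketch of the batched lookup this commit describes: one `IN`-list query over the `payload` view (the `hash` and `location` columns appear in the table definitions further down in this diff), instead of one query per md5 hash. The class, method and query text here are illustrative; the real logic lives in "FileUtils.getHashLocationMap()".

```java
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import java.util.HashMap;
import java.util.Set;

public class HashLocationLookupSketch {

    public static HashMap<String, String> getHashLocationMap(NamedParameterJdbcTemplate jdbcTemplate,
                                                              String databaseName, Set<String> fileHashes) {
        // Spring expands the collection parameter into the IN-list, so the whole batch is one round-trip.
        String query = "SELECT `hash`, `location` FROM " + databaseName + ".payload WHERE `hash` IN (:hashes)";
        HashMap<String, String> hashWithLocation = new HashMap<>(fileHashes.size());
        RowCallbackHandler handler = rs -> hashWithLocation.put(rs.getString("hash"), rs.getString("location"));
        jdbcTemplate.query(query, new MapSqlParameterSource("hashes", fileHashes), handler);
        return hashWithLocation;
    }
}
```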
Lampros Smyrnaios e4540e7f3c Handle the case when a urlReports-sublist does not have any payloads inside. 2024-03-12 14:25:00 +02:00
Lampros Smyrnaios e20c5d2146 - Add error-handling for the case when no payloads could be associated with a specific url which should have been in the hashMultiMap in "addUrlReportsByMatchingRecordsFromBacklog".
- Fix not cloning the payload before changing it and adding it to the "prefilledPayloads"-list; instead, an object-reference was used.
2024-03-11 19:48:04 +02:00
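The reference-vs-clone problem described in this commit is easy to reproduce; the sketch below uses a simplified stand-in for the real `Payload` model (whose actual `clone()` method appears later in this diff) only to illustrate why mutating a shared reference corrupts already-collected list entries:

```java
import java.util.ArrayList;
import java.util.List;

public class CloneVsReferenceSketch {

    // Simplified stand-in for the real Payload model.
    static class Payload {
        String id;
        String originalUrl;
        Payload(String id, String originalUrl) { this.id = id; this.originalUrl = originalUrl; }
        Payload copy() { return new Payload(this.id, this.originalUrl); }
    }

    public static void main(String[] args) {
        Payload template = new Payload("openaireId_1", "http://example.org/1");
        List<Payload> prefilledPayloads = new ArrayList<>();

        // Buggy pattern: adding the object-reference and mutating it afterwards
        // also changes the entry that is already inside the list.
        prefilledPayloads.add(template);
        template.id = "openaireId_2"; // prefilledPayloads.get(0).id is now "openaireId_2" as well!

        // Fixed pattern: clone first, then modify only the copy.
        Payload clone = template.copy();
        clone.id = "openaireId_3";
        prefilledPayloads.add(clone); // The first entry is no longer affected by later changes.
    }
}
```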
Lampros Smyrnaios 1048463ca0 - Improve error-handling in "S3ObjectStore.emptyBucket()".
- Change some log-levels.
- Code polishing.
2024-03-11 16:17:32 +02:00
Lampros Smyrnaios 8f18008001 Avoid performing payload-related operations in case no fulltext was received from the worker, due to an error. 2024-03-11 14:57:13 +02:00
Lampros Smyrnaios ce3e149a95 Improve the "emptying/deleting" process of the S3-bucket. 2024-03-11 13:34:38 +02:00
Lampros Smyrnaios dd394f18a0 - Optimize the JOIN-order in the "findAssignmentsQuery".
- Optimize the "DOC_URL_FILTER"-regex.
- Update dependencies.
2024-03-11 11:35:38 +02:00
Lampros Smyrnaios 43ea64758d - Improve handling of the case when no fulltexts have been found or none of the found ones were requested from the worker, as they were already retrieved in the past.
- Show the number of files with problematic locations (if any of them exist).
- Code polishing.
2024-02-23 12:39:28 +02:00
Lampros Smyrnaios 749172edd8 Add the Jenkins build-status badge to the README. 2024-02-08 19:49:58 +02:00
Lampros Smyrnaios b72996c9a9 - Configure the destination of the logs in the "application.properties" file.
- Add some gradle files to be used by Jenkins.
2024-02-08 19:47:34 +02:00
Lampros Smyrnaios 3563fd6e2a - Try to get the cause of the exception of the callable-tasks which handle parquet-files.
- Update License.
- Update dependencies.
2024-02-07 18:34:28 +02:00
Lampros Smyrnaios 34d7a143e7 Add/improve documentation. 2024-02-01 14:37:29 +02:00
Lampros Smyrnaios 5dadb8ad2f - Optimize the "DOC_URL_FILTER"-regex, by using a non-capturing group.
- Remove an extra "File.separator" from the fulltexts-fullFilePath.
2024-01-19 15:46:23 +02:00
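For context on the non-capturing-group change: `(?: … )` groups alternatives exactly like `( … )` does, but skips recording the match, which avoids needless bookkeeping for a regex that is only used as a filter. The pattern below is purely illustrative and is not the actual "DOC_URL_FILTER" regex:

```java
import java.util.regex.Pattern;

public class NonCapturingGroupExample {

    // Illustrative filter only: "(?:...)" groups the alternatives without creating a capture group.
    private static final Pattern DOC_URL_FILTER_EXAMPLE =
            Pattern.compile(".+(?:pdf|download|fulltext)[^/]*$", Pattern.CASE_INSENSITIVE);

    public static boolean looksLikeDocUrl(String url) {
        return DOC_URL_FILTER_EXAMPLE.matcher(url).matches();
    }
}
```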
Lampros Smyrnaios bdc61c2cda When at least one worker is still active and the service has to wait before shutting down, show a log-message to inform the user, including that worker's IP. 2024-01-15 13:35:22 +02:00
Lampros Smyrnaios 3a70b57146 Prioritize the full-text urls over the landing-page ones. 2024-01-15 12:59:50 +02:00
Lampros Smyrnaios ee1ca8966b - Avoid continuing to request workerReport-batches when, from the 1st batch, the base-directory of that assignments-counter is not found.
- Update dependencies.
2024-01-15 12:57:33 +02:00
24 changed files with 1002 additions and 487 deletions

View File

@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2021-2023 Athena Research and Innovation Center (ARC)
Copyright 2021-2024 OpenAIRE AMKE
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@ -1,13 +1,26 @@
# UrlsController
## [![Jenkins build status](https://jenkins-dnet.d4science.org/buildStatus/icon?job=UrlsController)](https://jenkins-dnet.d4science.org/job/UrlsController/)
The Controller's Application receives requests coming from the [**Workers**](https://code-repo.d4science.org/lsmyrnaios/UrlsWorker) (deployed on the cloud), constructs an assignments-list with data received from a database and returns the list to the workers.<br>
Then, it receives the "WorkerReports", requests the full-texts from the workers in batches, and uploads them to the S3 Object-Store. Finally, it writes the related reports, along with the updated file-locations, into the database.<br>
<br>
It can also process **Bulk-Import** requests from compatible data sources, in which case it receives the full-text files immediately, without offloading crawling jobs to Workers.<br>
<br>
For interacting with the database we use [**Impala**](https://impala.apache.org/).<br>
For managing and generating data, we use [**Impala**](https://impala.apache.org/) JDBC and WebHDFS.<br>
<br>
**To install and run the application**:
- Run ```git clone``` and then ```cd UrlsController```.
- Set the preferred values inside the [__application.yml__](https://code-repo.d4science.org/lsmyrnaios/UrlsController/src/branch/master/src/main/resources/application.yml) file. Specifically, for tests, set the ***services.pdfaggregation.controller.isTestEnvironment*** property to "**true**" and make sure the "***services.pdfaggregation.controller.db.testDatabaseName***" property is set to a test-database.
- Execute the ```installAndRun.sh``` script which builds and runs the app.<br>
If you want to just run the app, then run the script with the argument "1": ```./installAndRun.sh 1```.<br>
If you want to build and run the app on a **Docker Container**, then run the script with the argument "0" followed by the argument "1": ```./installAndRun.sh 0 1```.<br>
Additionally, if you want to test/visualize the exposed metrics on Prometheus and Grafana, you can deploy their instances on docker containers,
by enabling the "runPrometheusAndGrafanaContainers" switch, inside the "./installAndRun.sh" script.<br>
<br>
**BulkImport API**:
- "**bulkImportFullTexts**" endpoint: **http://\<IP\>:\<PORT\>/api/bulkImportFullTexts?provenance=\<provenance\>&bulkImportDir=\<bulkImportDir\>&shouldDeleteFilesOnFinish={true|false}** <br>
This endpoint loads the right configuration with the help of the "provenance" parameter, delegates the processing to a background thread and immediately returns a message with useful information, including the "reportFileID", which can be used at any moment to request a report about the progress of the bulk-import procedure.<br>
@ -15,6 +28,12 @@ For interacting with the database we use [**Impala**](https://impala.apache.org/
- "**getBulkImportReport**" endpoint: **http://\<IP\>:\<PORT\>/api/getBulkImportReport?id=\<reportFileID\>** <br>
This endpoint returns the bulkImport report, which corresponds to the given reportFileID, in JSON format.
<br>
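A hedged usage sketch of the two endpoints above, written as a plain HTTP client (the host, port, provenance, directory and report-ID values are placeholders, and the HTTP method is assumed from the URL format shown above):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Collectors;

public class BulkImportClientSketch {

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:1880/api"; // Placeholder host and port.

        // Trigger a bulk-import; "provenance" and "bulkImportDir" must match a configured bulkImportSource.
        String trigger = base + "/bulkImportFullTexts?provenance=myProvenance"
                + "&bulkImportDir=/mnt/bulk_import/batch_01/&shouldDeleteFilesOnFinish=false";
        System.out.println(get(trigger)); // The response is expected to contain the "reportFileID".

        // Later, poll the progress-report with that ID (placeholder value shown here).
        System.out.println(get(base + "/getBulkImportReport?id=myProvenance/batch_01_report"));
    }

    private static String get(String url) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            return br.lines().collect(Collectors.joining("\n"));
        }
    }
}
```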
**How to add a bulk-import datasource**:
- Open the [__application.yml__](https://code-repo.d4science.org/lsmyrnaios/UrlsController/src/branch/master/src/main/resources/application.yml) file.
- Add a new object under the "bulk-import.bulkImportSources" property.
- Read the comments written in the end of the "bulk-import" property and make sure all requirements are met.
<br>
<br>
**Statistics API**:
@ -60,16 +79,6 @@ Note: The Shutdown Service API is accessible by the Controller's host machine.
<br>
<br>
**To install and run the application**:
- Run ```git clone``` and then ```cd UrlsController```.
- Set the preferable values inside the [__application.yml__](https://code-repo.d4science.org/lsmyrnaios/UrlsController/src/branch/master/src/main/resources/application.yml) file.
- Execute the ```installAndRun.sh``` script which builds and runs the app.<br>
If you want to just run the app, then run the script with the argument "1": ```./installAndRun.sh 1```.<br>
If you want to build and run the app on a **Docker Container**, then run the script with the argument "0" followed by the argument "1": ```./installAndRun.sh 0 1```.<br>
Additionally, if you want to test/visualize the exposed metrics on Prometheus and Grafana, you can deploy their instances on docker containers,
by enabling the "runPrometheusAndGrafanaContainers" switch, inside the "./installAndRun.sh" script.<br>
<br>
Implementation notes:
- For transferring the full-text files, we use Facebook's [**Zstandard**](https://facebook.github.io/zstd/) compression algorithm, which brings significant benefits in compression ratio and speed.
- The uploaded full-text files follow this naming-scheme: "**datasourceID/recordID::fileHash.pdf**"
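As a concrete illustration of that naming-scheme (the IDs and hash below are made-up values, not real records):

```java
public class S3FileNameExample {

    public static void main(String[] args) {
        String datasourceId = "opendoar____::1234";                           // made-up datasource ID
        String openAireId = "od______1234::0123456789abcdef0123456789abcdef"; // made-up record ID
        String fileHash = "d41d8cd98f00b204e9800998ecf8427e";                 // md5 hash of the full-text file
        String s3FileName = datasourceId + "/" + openAireId + "::" + fileHash + ".pdf";
        System.out.println(s3FileName);
    }
}
```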

View File

@ -13,15 +13,15 @@ java {
repositories {
mavenCentral()
maven {
name "omtd"
url "https://repo.openminted.eu/content/repositories/releases/"
}
maven {
name "pentaho-repo"
url "https://public.nexus.pentaho.org/content/groups/omni/"
name "madgik"
url "https://repo.madgik.di.uoa.gr/content/repositories/thirdparty/"
}
}
ext {
hadoopVersion = '3.4.0'
}
dependencies {
runtimeOnly "org.springframework.boot:spring-boot-devtools"
@ -43,18 +43,18 @@ dependencies {
//implementation group: 'jakarta.validation', name: 'jakarta.validation-api', version: '3.0.2'
// https://mvnrepository.com/artifact/com.google.guava/guava
implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'
implementation group: 'com.google.guava', name: 'guava', version: '33.1.0-jre'
// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.14.0'
// https://mvnrepository.com/artifact/org.apache.commons/commons-compress
implementation("org.apache.commons:commons-compress:1.25.0") {
implementation("org.apache.commons:commons-compress:1.26.1") {
exclude group: 'com.github.luben', module: 'zstd-jni'
}
implementation 'com.github.luben:zstd-jni:1.5.5-11' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often.
implementation 'com.github.luben:zstd-jni:1.5.6-3' // Even though this is part of the above dependency, the Apache commons rarely updates it, while the zstd team makes improvements very often.
implementation 'io.minio:minio:8.5.7'
implementation 'io.minio:minio:8.5.9'
// https://mvnrepository.com/artifact/com.cloudera.impala/jdbc
implementation("com.cloudera.impala:jdbc:2.5.31") {
@ -80,7 +80,7 @@ dependencies {
implementation('org.apache.parquet:parquet-avro:1.13.1')
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common
implementation('org.apache.hadoop:hadoop-common:3.3.6') {
implementation("org.apache.hadoop:hadoop-common:$hadoopVersion") {
exclude group: 'org.apache.parquet', module: 'parquet-avro'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-api'
@ -96,7 +96,7 @@ dependencies {
}
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core
implementation('org.apache.hadoop:hadoop-mapreduce-client-core:3.3.6') {
implementation("org.apache.hadoop:hadoop-mapreduce-client-core:$hadoopVersion") {
exclude group: 'org.apache.parquet', module: 'parquet-avro'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-api'
@ -110,17 +110,16 @@ dependencies {
// Add back some updated version of the needed dependencies.
implementation 'org.apache.thrift:libthrift:0.17.0' // Newer versions (>=0.18.X) are not compatible with JAVA 8.
implementation 'com.fasterxml.woodstox:woodstox-core:6.5.1'
implementation 'com.fasterxml.woodstox:woodstox-core:6.6.2'
// https://mvnrepository.com/artifact/org.json/json
implementation 'org.json:json:20231013'
implementation 'org.json:json:20240303' // This is used only in "ParquetFileUtils.createRemoteParquetDirectories()". TODO - Replace it with "gson".
// https://mvnrepository.com/artifact/com.google.code.gson/gson
implementation 'com.google.code.gson:gson:2.10.1'
// https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus
runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.12.1'
runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.12.5'
testImplementation 'org.springframework.security:spring-security-test'
testImplementation "org.springframework.boot:spring-boot-starter-test"

BIN
gradle/wrapper/gradle-wrapper.jar vendored Normal file

Binary file not shown.

View File

@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME

249
gradlew vendored Executable file
View File

@ -0,0 +1,249 @@
#!/bin/sh
#
# Copyright © 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
#
# Gradle start up script for POSIX generated by Gradle.
#
# Important for running:
#
# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
# noncompliant, but you have some other compliant shell such as ksh or
# bash, then to run this script, type that shell name before the whole
# command line, like:
#
# ksh Gradle
#
# Busybox and similar reduced shells will NOT work, because this script
# requires all of these POSIX shell features:
# * functions;
# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
# * compound commands having a testable exit status, especially «case»;
# * various built-in commands including «command», «set», and «ulimit».
#
# Important for patching:
#
# (2) This script targets any POSIX shell, so it avoids extensions provided
# by Bash, Ksh, etc; in particular arrays are avoided.
#
# The "traditional" practice of packing multiple parameters into a
# space-separated string is a well documented source of bugs and security
# problems, so this is (mostly) avoided, by progressively accumulating
# options in "$@", and eventually passing that to Java.
#
# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
# see the in-line comments for details.
#
# There are tweaks for specific operating systems such as AIX, CygWin,
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
#
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
app_path=$0
# Need this for daisy-chained symlinks.
while
APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
[ -h "$app_path" ]
do
ls=$( ls -ld "$app_path" )
link=${ls#*' -> '}
case $link in #(
/*) app_path=$link ;; #(
*) app_path=$APP_HOME$link ;;
esac
done
# This is normally unused
# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
warn () {
echo "$*"
} >&2
die () {
echo
echo "$*"
echo
exit 1
} >&2
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "$( uname )" in #(
CYGWIN* ) cygwin=true ;; #(
Darwin* ) darwin=true ;; #(
MSYS* | MINGW* ) msys=true ;; #(
NONSTOP* ) nonstop=true ;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD=$JAVA_HOME/jre/sh/java
else
JAVACMD=$JAVA_HOME/bin/java
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD=java
if ! command -v java >/dev/null 2>&1
then
die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
fi
# Increase the maximum file descriptors if we can.
if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
# In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC2039,SC3045
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
# In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC2039,SC3045
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
fi
# Collect all arguments for the java command, stacking in reverse order:
# * args from the command line
# * the main class name
# * -classpath
# * -D...appname settings
# * --module-path (only if needed)
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
# For Cygwin or MSYS, switch paths to Windows format before running java
if "$cygwin" || "$msys" ; then
APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
JAVACMD=$( cygpath --unix "$JAVACMD" )
# Now convert the arguments - kludge to limit ourselves to /bin/sh
for arg do
if
case $arg in #(
-*) false ;; # don't mess with options #(
/?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
[ -e "$t" ] ;; #(
*) false ;;
esac
then
arg=$( cygpath --path --ignore --mixed "$arg" )
fi
# Roll the args list around exactly as many times as the number of
# args, so each arg winds up back in the position where it started, but
# possibly modified.
#
# NB: a `for` loop captures its iteration list before it begins, so
# changing the positional parameters here affects neither the number of
# iterations, nor the values presented in `arg`.
shift # remove old arg
set -- "$@" "$arg" # push replacement arg
done
fi
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line.
set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \
org.gradle.wrapper.GradleWrapperMain \
"$@"
# Stop when "xargs" is not available.
if ! command -v xargs >/dev/null 2>&1
then
die "xargs is not available"
fi
# Use "xargs" to parse quoted args.
#
# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
#
# In Bash we could simply go:
#
# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
# set -- "${ARGS[@]}" "$@"
#
# but POSIX shell has neither arrays nor command substitution, so instead we
# post-process each arg (as a line of input to sed) to backslash-escape any
# character that might be a shell metacharacter, then use eval to reverse
# that process (while maintaining the separation between arguments), and wrap
# the whole thing up as a single "set" statement.
#
# This will of course break if any of these variables contains a newline or
# an unmatched quote.
#
eval "set -- $(
printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
xargs -n1 |
sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
tr '\n' ' '
)" '"$@"'
exec "$JAVACMD" "$@"

92
gradlew.bat vendored Normal file
View File

@ -0,0 +1,92 @@
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%"=="" set DIRNAME=.
@rem This is normally unused
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if %ERRORLEVEL% equ 0 goto execute
echo. 1>&2
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2
echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. 1>&2
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute
echo. 1>&2
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2
echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. 1>&2
goto fail
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
if %ERRORLEVEL% equ 0 goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
set EXIT_CODE=%ERRORLEVEL%
if %EXIT_CODE% equ 0 set EXIT_CODE=1
if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
exit /b %EXIT_CODE%
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega

View File

@ -26,7 +26,7 @@ if [[ justRun -eq 1 && shouldRunInDocker -eq 1 ]]; then
justRun=0
fi
gradleVersion="8.5"
gradleVersion="8.7"
if [[ justRun -eq 0 ]]; then
@ -46,7 +46,7 @@ if [[ justRun -eq 0 ]]; then
#gradle tasks # For debugging installation
#gradle -v # For debugging installation
gradle clean build
gradle clean build # --refresh-dependencies # --info
if [[ shouldRunInDocker -eq 1 ]]; then

View File

@ -8,6 +8,7 @@ import eu.openaire.urls_controller.controllers.BulkImportController;
import eu.openaire.urls_controller.controllers.ShutdownController;
import eu.openaire.urls_controller.controllers.StatsController;
import eu.openaire.urls_controller.controllers.UrlsController;
import eu.openaire.urls_controller.models.WorkerInfo;
import eu.openaire.urls_controller.payloads.requests.WorkerReport;
import eu.openaire.urls_controller.services.UrlsServiceImpl;
import eu.openaire.urls_controller.util.FileUtils;
@ -111,22 +112,16 @@ public class ScheduledTasks {
if ( ! future.get() ) // Get and see if an exception is thrown. This blocks the current thread, until the task of the future has finished.
numFailedTasks ++;
} catch (ExecutionException ee) { // These can be serious errors like an "out of memory exception" (Java HEAP).
// The stacktrace of the "ExecutionException" is the one of this code and not the code which ran inside the background-task. Try to get the cause.
Throwable throwable = ee.getCause();
if ( throwable == null ) {
logger.warn("No cause was retrieved for the \"ExecutionException\"!");
throwable = ee;
}
logger.error(GenericUtils.getSelectiveStackTrace(throwable, "Background task_" + i + " failed with: " + throwable.getMessage(), 30));
logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Background_task_" + i + " failed with: ", null, 30));
numFailedTasks ++;
} catch (CancellationException ce) {
logger.error("Background task_" + i + " was cancelled: " + ce.getMessage());
logger.error("Background_task_" + i + " was cancelled: " + ce.getMessage());
numFailedTasks ++;
} catch (InterruptedException ie) {
logger.error("Background task_" + i + " was interrupted: " + ie.getMessage());
logger.error("Background_task_" + i + " was interrupted: " + ie.getMessage());
numFailedTasks ++;
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for background task_" + i + " in the futures-list! " + ioobe.getMessage());
logger.error("IOOBE for background_task_" + i + " in the futures-list! " + ioobe.getMessage());
// Only here, the "future" will be null.
} finally {
if ( future != null ) // It may be null in case we have a IOBE.
@ -173,14 +168,19 @@ public class ScheduledTasks {
// Check whether the workers have not shutdown yet, which means that they either crawl assignments or/and they are waiting for the Controller to process the WorkerReport and then shutdown.
Set<String> workerIds = UrlsController.workersInfoMap.keySet();
if ( workerIds.size() > 0 ) {
for ( String workerId : workerIds )
if ( ! UrlsController.workersInfoMap.get(workerId).getHasShutdown() ) // The workerId is certainly inside the map and has a workerInfo value.
for ( String workerId : workerIds ) {
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); // The workerId is certainly inside the map and has a workerInfo value.
if ( ! workerInfo.getHasShutdown() ) {
logger.debug("At least one worker (with IP: " + workerInfo.getWorkerIP() + ") is still active. Waiting for all workers to shutdown, before shutting down the service.");
return; // If at least 1 worker is still active, then do not shut down the Controller.
}
}
logger.info("All workers have already shutdown. Shutting down the Controller..");
} else
logger.info("No workers have participated in the service yet, so the Controller will shut-down immediately.");
logger.info("No workers have participated in the service yet. The Controller will shut-down now.");
// If one worker has crashed, then it will have not informed the Controller. So the controller will think that it is still running and will not shut down.
// IMPORTANT: If one worker has crashed, then it will have not informed the Controller. So the controller will think that it is still running and will not shut down..!
// In this case, we have to manually shut-down the service, from Docker cli.
// Any left-over worker-reports are kept to be retried next time the Controller starts.
@ -198,7 +198,7 @@ public class ScheduledTasks {
{
// We make sure an initial delay of some minutes is in place before this is executed, since we have to make sure all workers are up and running in order for them to be able to answer the full-texts-requests.
if ( UrlsController.numOfWorkers.get() == 0 ) {
if ( UrlsController.numOfActiveWorkers.get() == 0 ) {
long timeToWait = (isTestEnvironment ? 1_200_000 : 43_200_000); // 10 mins | 12 hours
logger.warn("None of the workers is participating in the service, at the moment. Will wait " + ((timeToWait /1000) /60) + " minutes and try again..");
try {
@ -206,8 +206,8 @@ public class ScheduledTasks {
} catch (InterruptedException ie) {
logger.warn("The wait-period was interrupted! Will try either way.");
}
if ( UrlsController.numOfWorkers.get() == 0 ) {
logger.error("None of the workers have participated in the service yet again. Will not process any leftover workerReports!");
if ( UrlsController.numOfActiveWorkers.get() == 0 ) {
logger.error("None of the workers is participating in the service, yet again. Will not process any leftover workerReports!");
return;
}
}
@ -243,7 +243,6 @@ public class ScheduledTasks {
}
@Scheduled(initialDelay = 259_200_000, fixedDelay = 259_200_000) // Run every 3 days. 3 days after startup.
//@Scheduled(initialDelay = 1_200_000, fixedDelay = 1_200_000) // Just for testing (every 1200 secs).
public void checkAndDeleteUnhandledAssignments()
@ -255,7 +254,7 @@ public class ScheduledTasks {
// The assignments just remain in the table, and the urls cannot be rechecked.
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DAY_OF_MONTH, - 3); // Subtract 3 from current Date.
calendar.add(Calendar.DAY_OF_MONTH, -3); // Subtract 3 from current Date.
DatabaseConnector.databaseLock.lock();
urlsService.deleteAssignmentsWithOlderDate(calendar.getTimeInMillis()); // Any error-log is written inside.
@ -302,7 +301,7 @@ public class ScheduledTasks {
} // Any error is already logged.
// TODO - Export more complex data; <numOfAllPayloadsPerDatasource>, <numOfAllPayloadsPerYear>,
// TODO - Export more complex data; <numOfAllPayloadsPerDatasource>, <numOfAllPayloadsPerPublicationYear>,
// <numOfAggregatedPayloadsPerDatasource>, ..., <numOfBulkImportedPayloadsPerDatasource>, ...
}

View File

@ -58,23 +58,29 @@ public class DatabaseConnector {
logger.info("Going to create (if not exist) the TEST-database \"" + testDatabaseName + "\" and its tables. Also will fill some tables with data from the initial-database \"" + initialDatabaseName + "\".");
jdbcTemplate.execute("CREATE DATABASE IF NOT EXISTS " + testDatabaseName);
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication stored as parquet as select * from " + initialDatabaseName + ".publication");
jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".publication");
try { // Metastore takes some time to recognize the DB has been created, in order to use it later..
Thread.sleep(1000);
} catch (InterruptedException ignore) {}
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication_pids stored as parquet as select * from " + initialDatabaseName + ".publication_pids");
jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".publication_pids");
jdbcTemplate.update("INVALIDATE METADATA");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication_urls stored as parquet as select * from " + initialDatabaseName + ".publication_urls");
jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".publication_urls");
try { // Metastore takes some time to recognize the DB has been created, in order to use it later..
Thread.sleep(1000);
} catch (InterruptedException ignore) {}
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".publication_boost stored as parquet as select * from " + initialDatabaseName + ".publication_boost");
jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".publication_boost");
// Create VIEWs of the original data. We just READ from it, so it's safe for our testing environment..
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".datasource stored as parquet as select * from " + initialDatabaseName + ".datasource");
jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".datasource");
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication as select * from " + initialDatabaseName + ".publication");
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + testDatabaseName + ".payload_legacy stored as parquet as select * from " + initialDatabaseName + ".payload_legacy");
jdbcTemplate.execute("COMPUTE STATS " + testDatabaseName + ".payload_legacy");
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication_pids as select * from " + initialDatabaseName + ".publication_pids");
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication_urls as select * from " + initialDatabaseName + ".publication_urls");
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".publication_boost as select * from " + initialDatabaseName + ".publication_boost");
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".datasource as select * from " + initialDatabaseName + ".datasource");
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + testDatabaseName + ".payload_legacy as select * from " + initialDatabaseName + ".payload_legacy");
databaseName = testDatabaseName;
} else {
@ -107,9 +113,21 @@ public class DatabaseConnector {
jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + databaseName + ".payload_bulk_import (id string, original_url string, actual_url string, `date` bigint, mimetype string, size string, `hash` string, `location` string, provenance string) stored as parquet");
jdbcTemplate.execute("COMPUTE STATS " + databaseName + ".payload_bulk_import");
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + databaseName + ".payload " +
"AS SELECT * from " + databaseName + ".payload_legacy " +
"UNION ALL SELECT * FROM " + databaseName +".payload_aggregated " +
try { // Metastore takes some time to recognize the tables have been created, in order to use them in the view.
Thread.sleep(1000);
} catch (InterruptedException ignore) {}
jdbcTemplate.update("INVALIDATE METADATA");
try { // Metastore takes some time to recognize the tables have been created, in order to use them in the view.
Thread.sleep(1000);
} catch (InterruptedException ignore) {}
jdbcTemplate.execute("CREATE VIEW IF NOT EXISTS " + databaseName + ".payload\n" +
"AS SELECT * from " + databaseName + ".payload_legacy\n" +
"UNION ALL SELECT * FROM " + databaseName +".payload_aggregated\n" +
"UNION ALL SELECT * FROM " + databaseName + ".payload_bulk_import");
// We do not do the "compute stats" for the view, since we get the following error: "COMPUTE STATS not supported for view: pdfaggregationdatabase_payloads_view.payload".

View File

@ -136,7 +136,8 @@ public class ShutdownController {
@PostMapping("workerShutdownReport")
public ResponseEntity<?> workerShutdownReport(@RequestParam String workerId, HttpServletRequest request)
{
String initMsg = "Received a \"workerShutdownReport\" from worker: \"" + workerId + "\".";
String remoteAddr = GenericUtils.getRequestorAddress(request);
String initMsg = "Received a \"workerShutdownReport\" from worker: \"" + workerId + "\" [IP: " + remoteAddr + "].";
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
if ( workerInfo == null ) {
String errorMsg = "The worker with id \"" + workerId + "\" has not participated in the PDF-Aggregation-Service!";
@ -144,16 +145,15 @@ public class ShutdownController {
return ResponseEntity.badRequest().body(errorMsg);
}
String remoteAddr = GenericUtils.getRequestorAddress(request);
if ( ! remoteAddr.equals(workerInfo.getWorkerIP()) ) {
logger.error(initMsg + " The request came from another IP: " + remoteAddr + " | while this worker was registered with this IP: " + workerInfo.getWorkerIP());
logger.error(initMsg + " The request came from an IP different from the one this worker was registered with: " + workerInfo.getWorkerIP());
return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
}
logger.info(initMsg);
workerInfo.setHasShutdown(true); // This will update the map.
UrlsController.numOfWorkers.decrementAndGet();
UrlsController.numOfActiveWorkers.decrementAndGet();
// Return "HTTP-OK" to this worker. If this was part of a shutdown-service request, then wait for the scheduler to check and shutdown the service.
return ResponseEntity.ok().build();

View File

@ -45,8 +45,7 @@ public class UrlsController {
public static final ConcurrentHashMap<String, WorkerInfo> workersInfoMap = new ConcurrentHashMap<>(6);
public static final AtomicInteger numOfWorkers = new AtomicInteger(0);
public static final AtomicInteger numOfActiveWorkers = new AtomicInteger(0);
public static ExecutorService backgroundExecutor;
@ -121,12 +120,12 @@ public class UrlsController {
if ( workerInfo.getHasShutdown() ) {
logger.info("The worker with id \"" + workerId + "\" was restarted.");
workerInfo.setHasShutdown(false);
numOfWorkers.incrementAndGet();
numOfActiveWorkers.incrementAndGet();
}
} else {
logger.info("The worker \"" + workerId + "\" is requesting assignments for the first time. Going to store its IP [" + remoteAddr + "] in memory.");
workersInfoMap.put(workerId, new WorkerInfo(remoteAddr, false));
numOfWorkers.incrementAndGet();
numOfActiveWorkers.incrementAndGet();
}
return urlsService.getAssignments(workerId, assignmentsLimit);

View File

@ -43,10 +43,13 @@ public class BulkImportReport {
public void addEvent(String event) {
eventsMultimap.put(GenericUtils.getReadableCurrentTimeAndZone(), event);
eventsMultimap.put(GenericUtils.getReadableCurrentTimeAndZone(), event); // This is synchronized.
}
public String getJsonReport()
/**
* Synchronize it to avoid concurrency issues when concurrent calls are made to the same bulkImport-Report object.
* */
public synchronized String getJsonReport()
{
//Convert the LinkedHashMultiMap<String, String> to Map<String, Collection<String>>, since Gson cannot serialize Multimaps.
eventsMap = eventsMultimap.asMap();
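For reference, a minimal, self-contained sketch of the thread-safety pattern this change relies on: a synchronized Guava multimap for the event writes, plus a synchronized serialization method so that the Gson dump never iterates the map while another thread is mutating it. Field and class names are simplified and the snapshot-copy step is an assumption, not the repository's exact implementation:

```java
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import java.util.Collection;
import java.util.Map;

public class ConcurrentReportSketch {

    private final SetMultimap<String, String> events =
            Multimaps.synchronizedSetMultimap(LinkedHashMultimap.create());

    private static final Gson gson = new Gson();

    public void addEvent(String timestamp, String event) {
        events.put(timestamp, event); // Individual writes are synchronized by the wrapper.
    }

    // Synchronized so that serialization never overlaps with a concurrent call on the same report object.
    public synchronized String toJson() {
        Map<String, Collection<String>> snapshot;
        synchronized (events) { // Guava requires manual locking while iterating a synchronized multimap.
            snapshot = LinkedHashMultimap.create(events).asMap(); // Copy first, then serialize the copy.
        }
        return gson.toJson(snapshot);
    }
}
```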

View File

@ -18,9 +18,9 @@ public class FileLocationData {
public FileLocationData(String fileLocation) throws RuntimeException {
// Extract and set LocationData.
Matcher matcher = FileUtils.FILENAME_ID_EXTENSION.matcher(fileLocation);
Matcher matcher = FileUtils.FILEPATH_ID_EXTENSION.matcher(fileLocation);
if ( !matcher.matches() )
throw new RuntimeException("Failed to match the \"" + fileLocation + "\" with the regex: " + FileUtils.FILENAME_ID_EXTENSION);
throw new RuntimeException("Failed to match the \"" + fileLocation + "\" with the regex: " + FileUtils.FILEPATH_ID_EXTENSION);
fileDir = matcher.group(1);
if ( (fileDir == null) || fileDir.isEmpty() )
throw new RuntimeException("Failed to extract the \"fileDir\" from \"" + fileLocation + "\".");

View File

@ -68,6 +68,14 @@ public class Payload {
this.datasourceId = datasourceId;
}
public Payload clone()
{
// Return a new object with the same values. Clone whatever objects are not immutable.!
return new Payload(this.id, this.original_url, this.actual_url, (Timestamp) this.timestamp_acquired.clone(), this.mime_type, this.size, this.hash,
this.location, this.provenance, this.datasourceId);
}
public String getId() {
return id;
}

View File

@ -15,8 +15,6 @@ import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.xml.bind.DatatypeConverter;
@ -27,10 +25,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@ -44,18 +39,12 @@ public class BulkImportServiceImpl implements BulkImportService {
private static final Logger logger = LoggerFactory.getLogger(BulkImportServiceImpl.class);
@Autowired
private FileUtils fileUtils;
@Autowired
private ParquetFileUtils parquetFileUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* Given a directory with full-text-files, this method imports the full-texts files in the PDF Aggregation Service.
@ -78,7 +67,7 @@ public class BulkImportServiceImpl implements BulkImportService {
String errorMsg = "The payloadsSchema could not be parsed!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
@ -88,7 +77,7 @@ public class BulkImportServiceImpl implements BulkImportService {
String errorMsg = "Could not retrieve the files for bulk-import!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
@ -98,7 +87,7 @@ public class BulkImportServiceImpl implements BulkImportService {
String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName;
logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
@ -113,18 +102,18 @@ public class BulkImportServiceImpl implements BulkImportService {
String errorMsg = "Could not create the local parquet-directory: " + localParquetDir;
logger.error(errorMsg + additionalLoggingMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
// Create a new directory on HDFS, with this bulkImportDir name. So, that there will not be any "load data" operation to fail because another thread has loaded that base-dir right before.
String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // No-op if it already exists. It is very quick.
if ( ! parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // No-op if it already exists. It is very quick.
String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir;
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
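The "applyHDFSOperation()" helper is not shown in this diff; for orientation, the WebHDFS call it wraps for directory creation is a single REST request (`op=MKDIRS`), which is a no-op when the directory already exists. The sketch below is an assumption of how such a call can be made; the URL, user and permission values are placeholders:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsMkdirsSketch {

    public static boolean mkdirs(String webHdfsBaseUrl, String hdfsDir) throws Exception {
        // WebHDFS MKDIRS: PUT <base>/<path>?op=MKDIRS — returns {"boolean":true} on success.
        String requestUrl = webHdfsBaseUrl + hdfsDir + "?op=MKDIRS&permission=777&user.name=hdfs"; // placeholder params
        HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection();
        conn.setRequestMethod("PUT");
        if (conn.getResponseCode() != 200)
            return false;
        try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String body = br.readLine(); // e.g. {"boolean":true}
            return (body != null) && body.contains("true");
        }
    }
}
```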
@ -133,13 +122,15 @@ public class BulkImportServiceImpl implements BulkImportService {
List<Callable<Integer>> callableTasksForFileSegments = new ArrayList<>(numOfFiles);
int sizeOfEachSegment = (numOfFiles / BulkImportController.numOfThreadsForBulkImportProcedures);
if ( sizeOfEachSegment == 0 ) // In case the numFiles are fewer than the numOfThreads to handle them.
sizeOfEachSegment = 1;
List<List<String>> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list to "numOfThreadsForBulkImportProcedures" subLists. The last one may have marginally fewer files.
int subListsSize = subLists.size();
msg = "Going to bulk-import the " + numOfFiles + " files in parallel, after dividing them in " + subListsSize + " segments.";
logger.debug(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
@ -163,11 +154,21 @@ public class BulkImportServiceImpl implements BulkImportService {
numFailedSegments++;
// In case all the files failed to be bulk-imported, then we will detect it in the "numSuccessfulSegments"-check later.
// The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory.
} catch (ExecutionException ee) {
String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + additionalLoggingMsg + GenericUtils.endOfLine + stackTraceMessage);
} catch (ExecutionException ee) { // These can be serious errors like an "out of memory exception" (Java HEAP).
numFailedSegments ++;
numAllFailedFiles += subLists.get(i).size(); // We assume all files of this segment failed, as all are passed through the same parts of code, so any serious exception should arise from the 1st files being processed and the rest of the files will be skipped.
logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Task_" + i + " failed with: ", additionalLoggingMsg, 20));
bulkImportReport.addEvent("Segment_" + i + " failed with: " + ee.getCause().getMessage());
} catch (CancellationException ce) {
logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage() + additionalLoggingMsg);
numFailedSegments ++;
numAllFailedFiles += subLists.get(i).size(); // We assume all files have failed.
logger.error("Task_" + i + " was cancelled: " + ce.getMessage() + additionalLoggingMsg);
bulkImportReport.addEvent("Segment_" + i + " failed with: " + ce.getMessage());
} catch (InterruptedException ie) {
numFailedSegments ++;
numAllFailedFiles += (subLists.get(i).size() / 3); // In this case, only some of the files will have failed (do not know how many).
logger.error("Task_" + i + " was interrupted: " + ie.getMessage());
bulkImportReport.addEvent("Segment_" + i + " failed with: " + ie.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage() + additionalLoggingMsg);
}
@ -176,7 +177,7 @@ public class BulkImportServiceImpl implements BulkImportService {
String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName;
logger.error(errorMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
} finally {
@ -189,7 +190,7 @@ public class BulkImportServiceImpl implements BulkImportService {
String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!";
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
} else if ( numAllFailedFiles > 0 ) { // Some failed, but not all.
@ -200,24 +201,23 @@ public class BulkImportServiceImpl implements BulkImportService {
logger.info(msg);
}
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
// Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations.
DatabaseConnector.databaseLock.lock();
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); // msg is already logged
String mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("payload_bulk_import", "", null); // msg is already logged
DatabaseConnector.databaseLock.unlock();
if ( mergeErrorMsg != null ) { // the message in already logged
DatabaseConnector.databaseLock.unlock();
bulkImportReport.addEvent(mergeErrorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
DatabaseConnector.databaseLock.unlock();
String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName;
logger.info(successMsg);
bulkImportReport.addEvent(successMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
// The report-file will be overwritten every now and then, instead of appended, since we want to add an update new JSON report-object each time.
// Also, we do not want to write the object in the end (in its final form), since we want the user to have the ability to request the report at any time,
// after submitting the bulk-import request, to see its progress (since the number of file may be very large and the processing may take many hours).
@ -237,6 +237,44 @@ public class BulkImportServiceImpl implements BulkImportService {
String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter;
logger.debug(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
HashMap<String, DocFileData> fileLocationsWithData = new HashMap<>(numOfFilesInSegment); // Map the fileLocations with hashes in order to quickly retrieve them later, when constructing the s3-file-location.
Set<String> fileHashes = new HashSet<>(numOfFilesInSegment);
// Get file-data for this segment's files.
for ( String fileLocation : fileLocationsSegment ) {
DocFileData docFileData = new DocFileData(new File(fileLocation), null, null, fileLocation);
docFileData.calculateAndSetHashAndSize();
String fileHash = docFileData.getHash();
if ( fileHash != null ) { // We will not work with files for which we cannot produce the hash, since the s3-file-location requires it.
fileHashes.add(fileHash);
fileLocationsWithData.put(fileLocation, docFileData);
}
}
int fileHashesSetSize = fileHashes.size();
if ( fileHashesSetSize == 0 ) {
msg = "No fileHashes were retrieved for segment_" + segmentCounter + ", no files will be processed from it.";
logger.warn(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return numOfFilesInSegment; // All files of this segment failed.
}
// Get the "alreadyFound" fileLocations for the fileHashes which exist in the DB.
HashMap<String, String> hashWithExistingLocationMap = fileUtils.getHashLocationMap(fileHashes, fileHashesSetSize, segmentCounter, "segment");
int numAlreadyRetrievedFiles = hashWithExistingLocationMap.size();
if ( numAlreadyRetrievedFiles > 0 ) {
msg = numAlreadyRetrievedFiles + " files from segment_" + segmentCounter + ", have been already retrieved in the past.";
logger.warn(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
// Generate the payload-records to be inserted in the DB.
// The OpenAireID which is synthesised from the fileName, may be different from the ID related to the exact same file which was retrieved in the past.
// The same file may be related with multiple publications from different datasources. The deduplication is out of the scope of this service.
// All different OpenAIRE IDs should be saved, independently of the fact that their urls or files may be related with other publications as well.
List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);
@ -247,33 +285,40 @@ public class BulkImportServiceImpl implements BulkImportService {
// Upload files to S3 and collect payloadRecords.
for ( int i=0; i < numOfFilesInSegment; ++i ) {
String fileLocation = fileLocationsSegment.get(i);
GenericData.Record record = null;
try {
record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
} catch (Exception e) {
String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading any file for this segment..";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
for ( int j=i; j < numOfFilesInSegment; ++j )
failedFiles.add(fileLocationsSegment.get(j)); // The rest of the files are considered "failed".
break;
}
if ( record != null )
payloadRecords.add(record);
else {
bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
DocFileData docFileData = fileLocationsWithData.get(fileLocation);
if ( docFileData == null ) {
failedFiles.add(fileLocation);
} else {
String alreadyRetrievedFileLocation = hashWithExistingLocationMap.get(docFileData.getHash());
GenericData.Record record = null;
try {
record = processBulkImportedFile(docFileData, alreadyRetrievedFileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
} catch (Exception e) {
String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment.. " + e.getMessage();
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
for ( int j=i; j < numOfFilesInSegment; ++j )
failedFiles.add(fileLocationsSegment.get(j)); // The rest of the files are considered "failed".
break;
}
if ( record != null )
payloadRecords.add(record);
else {
bulkImportReport.addEvent("An error caused the file: " + fileLocation + " to not be imported!");
failedFiles.add(fileLocation);
}
}
if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment.
if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment and write it to the file.
msg = "Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.";
if ( logger.isTraceEnabled() )
logger.trace(msg + additionalLoggingMsg);
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
}
}
} // End of processing files for this segment.
int numOfPayloadRecords = payloadRecords.size();
if ( numOfPayloadRecords == 0 ) {
@@ -325,8 +370,9 @@ public class BulkImportServiceImpl implements BulkImportService {
logger.trace("Going to load the data of parquet-file: \"" + parquetFileName + "\" to the database-table: \"payload_bulk_import\"." + additionalLoggingMsg); // DEBUG!
DatabaseConnector.databaseLock.lock();
if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
DatabaseConnector.databaseLock.unlock();
boolean parquetDataLoaded = parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import");
DatabaseConnector.databaseLock.unlock();
if ( !parquetDataLoaded ) {
errorMsg = "Could not load the payload-records to the database, for segment-" + segmentCounter + "!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
@@ -334,7 +380,6 @@ public class BulkImportServiceImpl implements BulkImportService {
// None of the files of this segment will be deleted, in any case.
return numOfFilesInSegment; // All files of this segment have failed.
}
DatabaseConnector.databaseLock.unlock();
String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + ", for segment-" + segmentCounter + ".";
logger.info(segmentSuccessMsg + additionalLoggingMsg);
@@ -347,77 +392,54 @@ public class BulkImportServiceImpl implements BulkImportService {
// Delete all files except the ones in the "failedFiles" set.
for ( String fileLocation : fileLocationsSegment ) {
if ( !failedFiles.contains(fileLocation) )
if ( !fileUtils.deleteFile(fileLocation) )
if ( !fileUtils.deleteFile(fileLocation) ) // The "error-log-message" is shown inside.
bulkImportReport.addEvent("The file " + fileLocation + " could not be deleted! Please make sure you have provided the WRITE-permission.");
}
}
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
return (numOfFilesInSegment - numOfPayloadRecords); // Return the numOfFailedFiles.
}
private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
private GenericData.Record processBulkImportedFile(DocFileData docFileData, String alreadyRetrievedFileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
throws ConnectException, UnknownHostException
{
File fullTextFile = new File(fileLocation);
DocFileData docFileData = new DocFileData(fullTextFile, null, null, null);
docFileData.calculateAndSetHashAndSize();
// Check if this file is already found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded.
// Also, it may be the case that this file was downloaded by another datasource.
FileLocationData fileLocationData;
try {
fileLocationData = new FileLocationData(fileLocation);
fileLocationData = new FileLocationData(docFileData.getLocation());
} catch (RuntimeException re) {
logger.error(re.getMessage() + additionalLoggingMsg);
return null;
}
String fileHash = docFileData.getHash();
if ( fileHash == null )
return null; // No check of past found full-text can be made nor the S3-fileName can be created.
String datasourceId = bulkImportSource.getDatasourceID();
String datasourcePrefix = bulkImportSource.getDatasourcePrefix();
String fileNameID = fileLocationData.getFileNameID();
String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation works with urls of arXiv. A different construction may be needed for other datasources.
String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload where `hash` = ? limit 1";
final int[] hashArgType = new int[] {Types.VARCHAR};
String alreadyFoundFileLocation = null;
DatabaseConnector.databaseLock.lock();
try {
alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[]{fileHash}, hashArgType, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!" + additionalLoggingMsg, e);
// Continue with bulk-importing the file and uploading it to S3.
} finally {
DatabaseConnector.databaseLock.unlock();
}
String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.getIsAuthoritative());
String openAireId = generateOpenaireId(fileNameID, bulkImportSource.getDatasourcePrefix(), bulkImportSource.getIsAuthoritative());
if ( openAireId == null )
return null;
String s3Url = null;
String fileHash = docFileData.getHash(); // It's guaranteed to NOT be null at this point.
if ( alreadyFoundFileLocation != null ) // If the full-text of this record is already-found and uploaded.
String s3Url = null;
if ( alreadyRetrievedFileLocation != null ) // If the full-text of this record is already-found and uploaded.
{
// This full-text was found to already be in the database.
// If it has the same datasourceID, then it likely was crawled before from an ID belonging to this datasource.
// If also has the same ID, then the exact same record from that datasource was retrieved previously.
// If it also has the same ID, then the exact same record from that datasource was retrieved previously.
// Else, the file was downloaded by another record of this datasource.
// Else, if the datasourceID is not the same, then the same file was retrieved from another datasource.
// The above analysis is educational; it does not need to take place and is not currently used.
s3Url = alreadyRetrievedFileLocation;
} else {
s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), bulkImportSource.getDatasourceID(), fileHash);
if ( s3Url == null )
return null;
}
s3Url = alreadyFoundFileLocation;
} else
s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), fileNameID, fileLocationData.getDotFileExtension(), datasourceId, fileHash);
// TODO - If another url-schema is introduced for other datasources, have a "switch"-statement and perform the right "actualUrl"-creation based on current schema.
String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation works with urls of arXiv. A different construction may be needed for other datasources.
String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
return parquetFileUtils.getPayloadParquetRecord(openAireId, originalUrl, actualUrl, timeMillis, bulkImportSource.getMimeType(),
docFileData.getSize(), fileHash, s3Url, provenance, true); // It may return null.

View File

@@ -19,7 +19,7 @@ public class StatsServiceImpl implements StatsService {
private JdbcTemplate jdbcTemplate;
// No DB-lock is required for these READ-operations.
// BUT! The is an issue.. these queries may run while a "table-merging" operation is in progress.. thus resulting in "no table reference" and "no file found (fieName.parquet)"
// BUT! There is an issue.. these queries may run while a "table-merging" operation is in progress.. thus resulting in "no table reference" and "no file found (fieName.parquet)"
// Thus, we need to have an "error-detection-and-retry" mechanism, in order to avoid returning errors that we know will appear at certain times and that we can overcome.
// The final time-to-return of the results-retrieval methods may be somewhat large, but the alternatives of returning predictable errors or of locking the DB and slowing down the aggregation system are even worse.

View File

@@ -67,6 +67,11 @@ public class UrlsServiceImpl implements UrlsService {
private static String excludedDatasourceIDsStringList = null;
private static final String DOC_URL_FILTER = ".+(?:pdf|download|/doc|document|(?:/|[?]|&)file|/fulltext|attachment|/paper|view(?:file|doc)|/get|cgi/viewcontent.cgi\\?|t[ée]l[ée]charger|descargar).*";
// "DOC_URL_FILTER" works for lowerCase Strings (we use the "ignore-case" indicator in the "regexp_like()" method).
public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
// TODO - Unify this ExecutorService with the hash-matching executorService, since one will ALWAYS be called after the other. So why have two ExecServices to handle?
@@ -95,15 +100,8 @@ public class UrlsServiceImpl implements UrlsService {
// Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern:
// ("ID_1", "ID_2", ...)
final StringBuilder sb = new StringBuilder((exclusionListSize * 46) + (exclusionListSize -1) +2 );
sb.append("(");
for ( int i=0; i < exclusionListSize; ++i ) {
sb.append("\"").append(excludedIDs.get(i)).append("\"");
if ( i < (exclusionListSize -1) )
sb.append(", ");
}
sb.append(")");
excludedDatasourceIDsStringList = sb.toString();
int stringBuilderCapacity = ((exclusionListSize * 46) + (exclusionListSize -1) +2);
excludedDatasourceIDsStringList = FileUtils.getQueryListString(excludedIDs, exclusionListSize, stringBuilderCapacity);
logger.info("The following bulkImport data-sources will be excluded from crawling: " + excludedDatasourceIDsStringList);
}
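The body of "FileUtils.getQueryListString()" is not shown in this diff; judging from the inline code it replaces above, a sketch of such a helper (signature assumed from the call-site) would be:
// Sketch reconstructed from the removed inline code above; the real helper lives in FileUtils.
public static String getQueryListString(java.util.List<String> items, int itemsSize, int stringBuilderCapacity) {
    final StringBuilder sb = new StringBuilder(stringBuilderCapacity);
    sb.append("(");
    for ( int i = 0; i < itemsSize; ++i ) {
        sb.append("\"").append(items.get(i)).append("\"");
        if ( i < (itemsSize - 1) )
            sb.append(", ");
    }
    sb.append(")");
    return sb.toString();   // e.g. ("ID_1", "ID_2", ...)
}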
@@ -117,11 +115,11 @@ public class UrlsServiceImpl implements UrlsService {
String findAssignmentsQuery =
"select pubid, url, datasourceid, datasourcename\n" + // Select the final sorted data with "assignmentsLimit".
"from (select distinct p.id as pubid, pu.url as url, pb.level as level, attempts.counts as attempt_count, p.year as pub_year, d.id as datasourceid, d.name as datasourcename\n" + // Select the distinct id-url data. Beware that this will return duplicate id-url pairs, wince one pair may be associated with multiple datasources.
" from " + DatabaseConnector.databaseName + ".publication p\n" +
" from " + DatabaseConnector.databaseName + ".publication_urls pu\n" +
" join " + DatabaseConnector.databaseName + ".publication p on p.id=pu.id\n" +
" join " + DatabaseConnector.databaseName + ".datasource d on d.id=p.datasourceid and d.allow_harvest=true"+
((excludedDatasourceIDsStringList != null) ? // If we have an exclusion-list, use it below.
(" and d.id not in " + excludedDatasourceIDsStringList + GenericUtils.endOfLine) : "") +
" join " + DatabaseConnector.databaseName + ".publication_urls pu on pu.id=p.id\n" +
" left anti join (select a.original_url from " + DatabaseConnector.databaseName + ".assignment a\n" +
" union all\n" +
" select pl.original_url from " + DatabaseConnector.databaseName + ".payload pl\n" + // Here we access the payload-VIEW which includes the three payload-tables.
@@ -135,7 +133,7 @@ public class UrlsServiceImpl implements UrlsService {
" and not exists (select 1 from " + DatabaseConnector.databaseName + ".attempt a where a.original_url=pu.url and a.error_class = 'noRetry' limit 1)\n" +
" and (p.year <= " + currentYear + " or p.year > " + (currentYear + 5) + ")\n" + // Exclude the pubs which will be published in the next 5 years. They don't provide full-texts now. (We don't exclude all future pubs, since, some have invalid year, like "9999").
") as distinct_results\n" +
"order by coalesce(attempt_count, 0), coalesce(level, 0) desc, coalesce(pub_year, 0) desc, reverse(pubid), url\n" + // We also order by reverse "pubid" and "url", in order to get the exactly same records for consecutive runs, all things being equal.
"order by coalesce(attempt_count, 0), coalesce(level, 0) desc, coalesce(pub_year, 0) desc, (case when regexp_like(url, '" + DOC_URL_FILTER + "', 'i') then 1 else 0 end) desc, reverse(pubid), url\n" + // We also order by reverse "pubid" and "url", in order to get the exactly same records for consecutive runs, all things being equal.
"limit " + assignmentsLimit;
// The datasourceID and datasourceName are currently not used during the processing in the Worker. They may be used by the Worker, in the future to apply a datasource-specific aggregation plugin to take the full-texts quickly, instead of using the general crawling one.
@@ -276,26 +274,27 @@ public class UrlsServiceImpl implements UrlsService {
logger.info("Initializing the addition of the worker's (" + curWorkerId + ") report for assignments_" + curReportAssignmentsCounter);
boolean hasFulltexts = true;
// Before continuing with inserts, take and upload the fullTexts from the Worker. Also, update the file-"location".
FileUtils.UploadFullTextsResponse uploadFullTextsResponse = fileUtils.getAndUploadFullTexts(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, curWorkerId);
if ( uploadFullTextsResponse == null ) {
// Nothing to post to the Worker, since we do not have the worker's info.
// Rename the worker-report-file to indicate its "failure", so that the scheduler can pick it up and retry processing it.
String workerReportBaseName = this.workerReportsDirPath + File.separator + curWorkerId + File.separator + curWorkerId + "_assignments_" + curReportAssignmentsCounter + "_report";
getRenamedWorkerReport(workerReportBaseName, new File(workerReportBaseName + ".json"), "No info was found for worker: " + curWorkerId); // It may return null.
renameAndGetWorkerReportFile(workerReportBaseName, new File(workerReportBaseName + ".json"), "No info was found for worker: " + curWorkerId); // It may return null.
return false;
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.databaseError ) {
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Problem with the Impala-database!");
return false;
}
else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter);
// The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available.
fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false);
fileUtils.removeUnretrievedFullTextsFromUrlReports(urlReports, false);
// We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store.
// We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later.
// The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later.
} else
hasFulltexts = false;
} else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.successful_without_fulltexts )
hasFulltexts = false;
else
logger.debug("Finished uploading the full-texts from batch-assignments_" + curReportAssignmentsCounter);
String localParquetPath = parquetFileUtils.parquetBaseLocalDirectoryPath + "assignments_" + curReportAssignmentsCounter + File.separator;
@@ -312,8 +311,8 @@ public class UrlsServiceImpl implements UrlsService {
// Create HDFS subDirs for these assignments. Other background threads handling other assignments will not interfere with loading of parquetFiles to the DB tables.
String endingMkDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.mkDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams) )
if ( !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingMkDirAndParams)
|| (hasFulltexts && !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingMkDirAndParams)) )
{
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating the HDFS sub-directories for assignments_" + curReportAssignmentsCounter);
return false;
@@ -324,7 +323,7 @@ public class UrlsServiceImpl implements UrlsService {
DatabaseConnector.databaseLock.lock();
// Lock the DB here so the prefilled-Payloads which will be generated inside the "getTasksForCreatingAndUploadingParquetFiles()" method (using a dedicated query)
// will be synchronized with the insert of all attempt and payload records to the DB.
// will be synchronized with the insert of all attempt and payload records to the DB. This action is NOT a callable-task, so it runs during the execution of this method.
// This is important in order to avoid having workers take these records as assignments, when we know that payloads are ready to be inserted for them.
List<Callable<ParquetReport>> callableTasks = parquetFileUtils.getTasksForCreatingAndUploadingParquetFiles(urlReports, sizeOfUrlReports, curReportAssignmentsCounter, localParquetPath, uploadFullTextsResponse);
@@ -334,9 +333,9 @@ public class UrlsServiceImpl implements UrlsService {
SumParquetSuccess sumParquetSuccess = parquetFileUtils.checkParquetFilesSuccess(futures);
ResponseEntity<?> errorResponseEntity = sumParquetSuccess.getResponseEntity();
if ( errorResponseEntity != null ) { // The related log is already shown.
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!");
if ( errorResponseEntity != null ) { // The related log is already shown in this case.
DatabaseConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, "Error when creating or uploading the parquet files!");
return false;
}
hasAttemptParquetFileProblem = sumParquetSuccess.isAttemptParquetFileProblem();
@@ -349,17 +348,16 @@ public class UrlsServiceImpl implements UrlsService {
if ( hasAttemptParquetFileProblem )
logger.error("All of the attempt-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"attempt\", for batch-assignments_" + curReportAssignmentsCounter);
else if ( hasPayloadParquetFileProblem )
logger.error("The single payload-parquet-file failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
logger.error("All of the payload-parquet files failed to be created or uploaded! Will avoid to execute load-requests into the database-table \"payload_aggregated\", for batch-assignments_" + curReportAssignmentsCounter);
else
logger.debug("Going to execute \"load\"-requests on the database, for the uploaded parquet-files, for batch-assignments_" + curReportAssignmentsCounter);
}
// Load all the parquet files of each type into its table.
if ( ! hasAttemptParquetFileProblem )
hasAttemptParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathAttempts + curReportAssignmentsCounter + "/", "attempt");
if ( ! hasPayloadParquetFileProblem )
if ( hasFulltexts && ! hasPayloadParquetFileProblem )
hasPayloadParquetFileProblem = ! parquetFileUtils.loadParquetDataIntoTable(parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + curReportAssignmentsCounter + "/", "payload_aggregated");
DatabaseConnector.databaseLock.unlock();
@@ -369,7 +367,7 @@ public class UrlsServiceImpl implements UrlsService {
else if ( hasAttemptParquetFileProblem || hasPayloadParquetFileProblem )
logger.error("The data from the HDFS parquet sub-directories COULD NOT be loaded into the \"attempt\" or the \"payload_aggregated\" table, for batch-assignments_" + curReportAssignmentsCounter);
else
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" and the \"payload_aggregated\" tables, for batch-assignments_" + curReportAssignmentsCounter);
logger.debug("The data from the HDFS parquet sub-directories was loaded into the \"attempt\" " + (hasFulltexts ? "and the \"payload_aggregated\" tables" : "table") + ", for batch-assignments_" + curReportAssignmentsCounter);
} catch (InterruptedException ie) { // Thrown by "insertsExecutor.invokeAll()". In this case, any unfinished tasks are cancelled.
DatabaseConnector.databaseLock.unlock();
@@ -392,8 +390,8 @@ public class UrlsServiceImpl implements UrlsService {
fileUtils.deleteDirectory(new File(localParquetPath));
// Delete the HDFS subDirs for this Report.
String endingRmDirAndParams = curReportAssignmentsCounter + "/" + parquetFileUtils.rmDirsAndParams;
if ( !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
|| !parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams) )
if ( !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathAttempts + endingRmDirAndParams)
|| (hasFulltexts && !parquetFileUtils.applyHDFSOperation(parquetFileUtils.webHDFSBaseUrl + parquetFileUtils.parquetHDFSDirectoryPathPayloadsAggregated + endingRmDirAndParams)) )
{
logger.error("Error when deleting the HDFS sub-directories for assignments_" + curReportAssignmentsCounter); // A directory-specific log has already appeared.
// The failure to delete the assignments_subDirs is not that of a problem and should not erase the whole process. So all goes as planned (the worker deletes any remaining files).
@@ -412,8 +410,8 @@ public class UrlsServiceImpl implements UrlsService {
}
// For every "numOfWorkers" assignment-batches that get processed, we merge the "attempts" and "payload_aggregated" tables.
if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 ) // The workersNum will not be zero!
if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) )
if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfActiveWorkers.get()) == 0 ) // The workersNum will not be zero!
if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, hasFulltexts) )
// The "postReportResultToWorker()" was called inside.
return false;
@@ -468,7 +466,7 @@ public class UrlsServiceImpl implements UrlsService {
}
private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem)
private boolean mergeWorkerRelatedTables(String curWorkerId, long curReportAssignmentsCounter, boolean hasAttemptParquetFileProblem, boolean hasPayloadParquetFileProblem, boolean hasFulltexts)
{
logger.debug("Going to merge the parquet files for the tables which were altered, for batch-assignments_" + curReportAssignmentsCounter);
// When the uploaded parquet files are "loaded" into the tables, they are actually moved into the directory which contains the data of the table.
@@ -479,7 +477,7 @@ public class UrlsServiceImpl implements UrlsService {
DatabaseConnector.databaseLock.lock();
if ( ! hasAttemptParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("attempt", "", null);
mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("attempt", "", null);
if ( mergeErrorMsg != null ) {
DatabaseConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
@@ -487,8 +485,8 @@ public class UrlsServiceImpl implements UrlsService {
}
}
if ( ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = fileUtils.mergeParquetFiles("payload_aggregated", "", null);
if ( hasFulltexts && ! hasPayloadParquetFileProblem ) {
mergeErrorMsg = parquetFileUtils.mergeParquetFilesOfTable("payload_aggregated", "", null);
if ( mergeErrorMsg != null ) {
DatabaseConnector.databaseLock.unlock();
postReportResultToWorker(curWorkerId, curReportAssignmentsCounter, mergeErrorMsg);
@@ -508,7 +506,7 @@ public class UrlsServiceImpl implements UrlsService {
// As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to OTHER "givenAssignmentsBatchCounter" get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
return fileUtils.mergeParquetFiles("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter);
return parquetFileUtils.mergeParquetFilesOfTable("assignment", " WHERE assignments_batch_counter != ", givenAssignmentsBatchCounter);
}
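For reference, the delete-via-merge that "mergeParquetFilesOfTable()" performs here boils down to a query sequence like the following (a sketch only; "db" stands for "DatabaseConnector.databaseName" and the batch-counter value is a placeholder):
// Sketch of the merge-based "delete" for a non-KUDU Impala table (compare with the removed "mergeParquetFiles()" further below).
jdbcTemplate.execute("CREATE TABLE db.assignment_tmp stored as parquet AS SELECT * FROM db.assignment WHERE assignments_batch_counter != 123");  // keep only the rows of the other batches
jdbcTemplate.execute("DROP TABLE db.assignment PURGE");                         // drop the original table
jdbcTemplate.execute("ALTER TABLE db.assignment_tmp RENAME TO db.assignment");  // the temp-table becomes the "assignment" table
jdbcTemplate.execute("COMPUTE STATS db.assignment");                            // refresh table-stats for query-optimization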
@@ -518,7 +516,7 @@ public class UrlsServiceImpl implements UrlsService {
// As we have non-KUDU Impala tables, the Delete operation can only succeed through a "merge" operation of the rest of the data.
// Only the rows referring to NEWER than "givenDate" get stored in a temp-table, while the "assignment" table gets deleted. Then, the temp_table becomes the "assignment" table.
// We don't need to keep the assignment-info forever, as the "findAssignmentsQuery" checks the "payload" table for previously handled tasks.
return fileUtils.mergeParquetFiles("assignment", " WHERE `date` >= ", givenDate);
return parquetFileUtils.mergeParquetFilesOfTable("assignment", " WHERE `date` >= ", givenDate);
}
@@ -529,7 +527,7 @@ public class UrlsServiceImpl implements UrlsService {
// Rename the worker-report to indicate success or failure.
String workerReportBaseName = this.workerReportsDirPath + File.separator + workerId + File.separator + workerId + "_assignments_" + assignmentRequestCounter + "_report";
File workerReport = new File(workerReportBaseName + ".json");
File renamedWorkerReport = getRenamedWorkerReport(workerReportBaseName, workerReport, errorMsg); // It may return null.
File renamedWorkerReport = renameAndGetWorkerReportFile(workerReportBaseName, workerReport, errorMsg); // It may return null.
// Get the IP of this worker.
WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId);
@@ -571,7 +569,7 @@ public class UrlsServiceImpl implements UrlsService {
}
private static File getRenamedWorkerReport(String workerReportBaseName, File workerReport, String errorMsg)
private static File renameAndGetWorkerReportFile(String workerReportBaseName, File workerReport, String errorMsg)
{
File renamedWorkerReport = null;
try {

View File

@@ -1,7 +1,6 @@
package eu.openaire.urls_controller.util;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.configuration.DatabaseConnector;
import eu.openaire.urls_controller.controllers.UrlsController;
@@ -29,17 +28,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.sql.Types;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
@@ -66,7 +58,7 @@ public class FileUtils {
public enum UploadFullTextsResponse {successful, unsuccessful, databaseError}
public enum UploadFullTextsResponse {successful, successful_without_fulltexts, unsuccessful}
public String baseFilesLocation;
@@ -88,117 +80,11 @@ public class FileUtils {
}
/**
* In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so we have to stick with it and merge those files..
* This method, creates a clone of the original table in order to have only one parquet file in the end. Drops the original table.
* Renames the clone to the original's name.
* Returns the errorMsg, if an error appears, otherwise it returns "null".
*/
public String mergeParquetFiles(String tableName, String whereClause, Object parameter) {
String errorMsg;
if ( (tableName == null) || tableName.isEmpty() ) {
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
logger.error(errorMsg);
return errorMsg; // Return the error-msg to indicate that something went wrong and pass it down to the Worker.
}
// Make sure the following are empty strings.
whereClause = (whereClause != null) ? (whereClause + " ") : "";
if ( parameter == null )
parameter = "";
else if ( parameter instanceof String )
parameter = "'" + parameter + "'"; // This will be a "string-check", thus the single-quotes.
// Else it is a "long", it will be used as is.
// Create a temp-table as a copy of the initial table.
try {
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + " " + whereClause + parameter);
} catch (Exception e) {
errorMsg = "Problem when copying the contents of \"" + tableName + "\" table to a newly created \"" + tableName + "_tmp\" table, when merging the parquet-files!\n";
logger.error(errorMsg, e);
try { // Make sure we delete the possibly half-created temp-table.
jdbcTemplate.execute("DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
// We cannot move on with merging, but no harm happened, since the "table_tmp" name is still reserved for future use (after it was dropped immediately)..
} catch (Exception e1) {
logger.error("Failed to drop the \"" + tableName + "_tmp\" table!", e1);
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
return errorMsg; // We return only the initial error to the Worker, which is easily distinguished inside the "merge-queries".
}
// Drop the initial table.
try {
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + " PURGE");
} catch (Exception e) {
errorMsg = "Problem when dropping the initial \"" + tableName + "\" table, when merging the parquet-files!\n";
logger.error(errorMsg, e);
// The original table could not be dropped, so the temp-table cannot be renamed to the original..!
try { // Make sure we delete the already created temp-table, in order to be able to use it in the future. The merging has failed nevertheless.
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
} catch (Exception e1) {
logger.error((errorMsg += "Failed to drop the \"" + tableName + "_tmp\" table!"), e1); // Add this error to the original, both are very important.
}
// Here, the original table is created.
return errorMsg;
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
// Rename the temp-table to have the initial-table's name.
try {
jdbcTemplate.execute("ALTER TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp RENAME TO " + DatabaseConnector.databaseName + "." + tableName);
} catch (Exception e) {
errorMsg = "Problem in renaming the \"" + tableName + "_tmp\" table to \"" + tableName + "\", when merging the parquet-files!\n";
logger.error(errorMsg, e);
// At this point we only have a "temp-table", the original is already deleted..
// Try to create the original, as a copy of the temp-table. If that succeeds, then try to delete the temp-table.
try {
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + " stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + "_tmp");
} catch (Exception e1) {
errorMsg = "Problem when copying the contents of \"" + tableName + "_tmp\" table to a newly created \"" + tableName + "\" table, when merging the parquet-files!\n";
logger.error(errorMsg, e1);
// If the original table was not created, then we have to intervene manually, if it was created but without any data, then we can safely move on handling other assignments and workerReports, but the data will be lost! So this workerReport failed to be handled.
try { // The below query normally returns a list, as it takes a "regex-pattern" as an input. BUT, we give just the table name, without wildcards. So the result is either the tableName itself or none (not any other table).
jdbcTemplate.queryForObject("SHOW TABLES IN " + DatabaseConnector.databaseName + " LIKE '" + tableName + "'", List.class);
} catch (EmptyResultDataAccessException erdae) {
// The table does not exist, so it was not even half-created by the previous query.
// Not having the original table anymore is a serious error. A manual action is needed!
logger.error((errorMsg += "The original table \"" + tableName + "\" must be created manually! Serious problems may appear otherwise!"));
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and fix it immediately to avoid other errors in the Service)..!
}
// Here, the original-table exists in the DB, BUT without any data inside! This workerReport failed to be handled! (some of its data could not be loaded to the database, and all previous data was lost).
return errorMsg;
}
// The creation of the original table was successful. Try to delete the temp-table.
try {
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
} catch (Exception e2) {
logger.error((errorMsg += "Problem when dropping the \"" + tableName + "_tmp\" table, when merging the parquet-files!\n"), e2);
// Manual deletion should be performed!
return errorMsg; // Return both errors here, as the second is so important that if it did not happen then we could move on with this workerReport.
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
// Here the original table exists and the temp-table is deleted. We eventually have the same state as if the "ALTER TABLE" succeeded.
}
// Gather information to be used for queries-optimization.
try {
jdbcTemplate.execute("COMPUTE STATS " + DatabaseConnector.databaseName + "." + tableName);
} catch (Exception e) {
logger.error("Problem when gathering information from table \"" + tableName + "\" to be used for queries-optimization.", e);
// In this case the error is not so important to the whole operation.. It's only that the performance of this specific table will be less optimal, only temporarily, unless every "COMPUTE STATS" query fails for future workerReports too.
}
return null; // No errorMsg, everything is fine.
}
public static final DecimalFormat df = new DecimalFormat("0.00");
// The following regex might be useful in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension".
// Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf"
public static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:([^.()]+)/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$");
public static final Pattern FILEPATH_ID_EXTENSION = Pattern.compile("([^.()]+/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$");
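To make the capturing-groups of the above pattern concrete, here is an illustrative match against one of the example filenames from the comment:
// Illustration only: matching FILEPATH_ID_EXTENSION against "path1/path2/ID(12).pdf".
Matcher m = FILEPATH_ID_EXTENSION.matcher("path1/path2/ID(12).pdf");
if ( m.matches() ) {
    String fileDir = m.group(1);                   // "path1/path2/" (keeps the trailing "/" divider)
    String fileNameWithExtension = m.group(2);     // "ID(12).pdf"
    String fileNameWithoutExtension = m.group(3);  // "ID(12)"
    String fileNameID = m.group(4);                // "ID"  (the duplication-suffix "(12)" is dropped)
    String dotFileExtension = m.group(5);          // ".pdf"
}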
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
@@ -220,114 +106,86 @@ public class FileUtils {
workerIp = workerInfo.getWorkerIP(); // This won't be null.
// Get the file-locations.
final AtomicInteger numPayloadsToBeHandled = new AtomicInteger(0);
final AtomicInteger numFullTextsFound = new AtomicInteger(0);
final AtomicInteger numFilesFoundFromPreviousAssignmentsBatches = new AtomicInteger(0);
int numValidFullTextsFound = 0;
int numFilesFoundFromPreviousAssignmentsBatches = 0;
int numFullTextsWithProblematicLocations = 0;
SetMultimap<String, Payload> allFileNamesWithPayloads = Multimaps.synchronizedSetMultimap(HashMultimap.create((sizeOfUrlReports / 5), 3)); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
HashMultimap<String, Payload> hashesWithPayloads = getHashesWithPayloads(urlReports, sizeOfUrlReports); // Holds multiple payloads for the same fileHash.
Set<String> fileHashes = hashesWithPayloads.keySet();
int fileHashesSetSize = fileHashes.size(); // Get the size of the keysSet, instead of the whole multimap.
if ( fileHashesSetSize == 0 ) {
logger.warn("No fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\".");
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
final String getFileLocationForHashQuery = "select `location` from " + DatabaseConnector.databaseName + ".payload" + (isTestEnvironment ? "_aggregated" : "") + " where `hash` = ? limit 1";
final int[] hashArgType = new int[] {Types.VARCHAR};
HashMap<String, String> hashLocationMap = getHashLocationMap(fileHashes, fileHashesSetSize, assignmentsBatchCounter, "assignments");
HashMultimap<String, Payload> allFileNamesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple values for any key, if a fileName(key) has many IDs(values) associated with it.
final List<Callable<Void>> callableTasks = new ArrayList<>(6);
for ( UrlReport urlReport : urlReports )
for ( String fileHash : fileHashes )
{
callableTasks.add(() -> {
Payload payload = urlReport.getPayload();
if ( payload == null )
return null;
String fileLocation = payload.getLocation();
if ( fileLocation == null )
return null; // The full-text was not retrieved, go to the next UrlReport.
numPayloadsToBeHandled.incrementAndGet();
// Query the payload-table FOR EACH RECORD to get the fileLocation of A PREVIOUS RECORD WITH THE SAME FILE-HASH.
// If no result is returned, then this record is not previously found, so go ahead and add it in the list of files to request from the worker.
// If a file-location IS returned (for this hash), then this file is already uploaded to the S3. Update the record to point to that file-location and do not request that file from the Worker.
String fileHash = payload.getHash();
if ( fileHash != null ) {
String alreadyFoundFileLocation = null;
try {
alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[] {fileHash}, hashArgType, String.class);
} catch (EmptyResultDataAccessException erdae) {
// No fileLocation is found, it's ok. It will be null by default.
} catch (Exception e) {
logger.error("Error when executing or acquiring data from the the \"getFileLocationForHashQuery\"!\n", e);
// TODO - SHOULD WE RETURN A "UploadFullTextsResponse.databaseError" AND force the caller to not even insert the payloads to the database??
// TODO - The idea is that since the database will have problems.. there is no point in trying to insert the payloads to Impala (we will handle it like: we tried to insert and got an error).
// Unless we do what it is said above, do not continue to the next UrlReport, this query-exception should not disrupt the normal full-text processing.
for ( Payload payload : hashesWithPayloads.get(fileHash) )
{
String alreadyFoundFileLocation = hashLocationMap.get(fileHash); // Only one location has been retrieved per fileHash.
if ( alreadyFoundFileLocation != null ) {
// Fill the payloads with locations from the "previously-found-hashes."
payload.setLocation(alreadyFoundFileLocation);
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches ++;
numValidFullTextsFound ++; // We trust the location being valid..
}
else { // This file has not been found before..
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
String fileLocation = payload.getLocation();
Matcher matcher = FILEPATH_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILEPATH_ID_EXTENSION);
numFullTextsWithProblematicLocations ++;
continue;
}
if ( alreadyFoundFileLocation != null ) { // If the full-text of this record is already-found and uploaded.
payload.setLocation(alreadyFoundFileLocation); // Set the location to the older identical file, which was uploaded to S3. The other file-data is identical.
if ( logger.isTraceEnabled() )
logger.trace("The record with ID \"" + payload.getId() + "\" has an \"alreadyRetrieved\" file, with hash \"" + fileHash + "\" and location \"" + alreadyFoundFileLocation + "\"."); // DEBUG!
numFilesFoundFromPreviousAssignmentsBatches.incrementAndGet();
numFullTextsFound.incrementAndGet();
return null; // Do not request the file from the worker, it's already uploaded. Move on. The "location" will be filled by the "setFullTextForMultiplePayloads()" method, later.
}
}
// Extract the "fileNameWithExtension" to be added in the HashMultimap.
Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
if ( ! matcher.matches() ) {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
return null;
}
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
return null;
}
numFullTextsFound.incrementAndGet();
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
return null;
});
}// end-for
DatabaseConnector.databaseLock.lock(); // The execution uses the database.
try { // Invoke all the tasks and wait for them to finish before moving to the next batch.
List<Future<Void>> futures = hashMatchingExecutor.invokeAll(callableTasks);
for ( Future<Void> future : futures ) {
try {
Void result = future.get(); // The result is always "null" as we have a "Void" type.
} catch (Exception e) {
logger.error("", e);
numValidFullTextsFound ++;
allFileNamesWithPayloads.put(fileNameWithExtension, payload); // The keys and the values are not duplicate.
// Task with ID-1 might have an "ID-1.pdf" file, while a task with ID-2 can also have an "ID-1.pdf" file, as the pdf-url-2 might be the same with pdf-url-1, thus, the ID-2 file was not downloaded again.
}
}
} catch (InterruptedException ie) { // In this case, any unfinished tasks are cancelled.
logger.warn("The current thread was interrupted when waiting for the worker-threads to finish checking for already-found file-hashes: " + ie.getMessage());
// This is a very rare case. At the moment, we just move on with what we have so far.
} catch (Exception e) {
logger.error("Unexpected error when checking for already-found file-hashes in parallel!", e);
return UploadFullTextsResponse.unsuccessful;
} finally {
DatabaseConnector.databaseLock.unlock(); // The remaining work of this function does not use the database.
}
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet());
int numAllFullTexts = allFileNames.size();
if ( numAllFullTexts == 0 ) {
if ( numFullTextsWithProblematicLocations > 0 )
logger.warn(numFullTextsWithProblematicLocations + " files had problematic names.");
if ( numValidFullTextsFound == 0 ) {
logger.warn("No full-text files were retrieved for assignments_" + assignmentsBatchCounter + " | from worker: " + workerId);
return UploadFullTextsResponse.successful; // It was handled, no error.
return UploadFullTextsResponse.successful_without_fulltexts; // It's not what we want, but it's not an error either.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numFullTextsFound.get() + " (out of " + sizeOfUrlReports + " | about " + df.format(numFullTextsFound.get() * 100.0 / sizeOfUrlReports) + "%).");
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches.get());
ArrayList<String> allFileNames = new ArrayList<>(allFileNamesWithPayloads.keySet()); // The number of fulltexts may be lower than the number of payloads, since multiple payloads may lead to the same file.
int numFullTextsToBeRequested = allFileNames.size();
if ( numFullTextsToBeRequested == 0 ) {
logger.info(numValidFullTextsFound + " fulltexts were retrieved for assignments_" + assignmentsBatchCounter + ", from worker: \"" + workerId + "\", but all of them have been retrieved before.");
return UploadFullTextsResponse.successful_without_fulltexts; // It was handled, no error.
}
logger.info("NumFullTextsFound by assignments_" + assignmentsBatchCounter + " = " + numValidFullTextsFound + " (out of " + sizeOfUrlReports + " | about " + df.format(numValidFullTextsFound * 100.0 / sizeOfUrlReports) + "%).");
// TODO - Have a prometheus GAUGE to hold the value of the above percentage, so that we can track the success-rates over time..
logger.debug("NumFilesFoundFromPreviousAssignmentsBatches = " + numFilesFoundFromPreviousAssignmentsBatches);
// Request the full-texts in batches, compressed in a zstd tar file.
int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
int remainingFiles = (numAllFullTexts % numOfFullTextsPerBatch);
int numOfBatches = (numFullTextsToBeRequested / numOfFullTextsPerBatch);
int remainingFiles = (numFullTextsToBeRequested % numOfFullTextsPerBatch);
if ( remainingFiles > 0 ) { // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches++;
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each, except for the final batch, which will have " + remainingFiles + " files).");
} else
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " distinct non-already-uploaded fullTexts (total is: " + numFullTextsFound.get() + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numFullTextsToBeRequested + " distinct, non-already-uploaded fullTexts (total is: " + numValidFullTextsFound + "). Going to request them from the Worker \"" + workerId + "\", in " + numOfBatches + " batches (" + numOfFullTextsPerBatch + " files each).");
// Check if one full text is left out because of the division. Put it in the last batch.
String baseUrl = "http://" + workerIp + ":" + workerPort + "/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";
@@ -364,7 +222,7 @@ public class FileUtils {
continue;
}
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, batchCounter);
List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numFullTextsToBeRequested, batchCounter);
String zstdFileFullPath = targetDirectory + "fullTexts_" + assignmentsBatchCounter + "_" + batchCounter + ".tar.zstd";
try {
if ( ! getAndSaveFullTextBatch(fileNamesForCurBatch, baseUrl, assignmentsBatchCounter, batchCounter, numOfBatches, zstdFileFullPath, workerId) ) {
@@ -383,7 +241,7 @@ public class FileUtils {
if ( failedBatches == numOfBatches )
logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId);
updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
removeUnretrievedFullTextsFromUrlReports(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed).
deleteDirectory(new File(curAssignmentsBaseLocation));
// Check and warn about the number of failed payloads.
@@ -392,7 +250,7 @@ public class FileUtils {
long finalPayloadsCounter = urlReports.parallelStream()
.map(UrlReport::getPayload).filter(payload -> ((payload != null) && (payload.getLocation() != null)))
.count();
int numInitialPayloads = numPayloadsToBeHandled.get();
int numInitialPayloads = (numValidFullTextsFound + numFullTextsWithProblematicLocations);
long numFailedPayloads = (numInitialPayloads - finalPayloadsCounter);
if ( numFailedPayloads == numInitialPayloads ) {
// This will also be the case if there was no DB failure, but all the batches have failed.
@@ -405,6 +263,76 @@ public class FileUtils {
}
public HashMultimap<String, Payload> getHashesWithPayloads(List<UrlReport> urlReports, int sizeOfUrlReports)
{
HashMultimap<String, Payload> hashesWithPayloads = HashMultimap.create((sizeOfUrlReports / 5), 3); // Holds multiple payloads for the same fileHash.
// The "Hash" part of the multimap helps with avoiding duplicate fileHashes.
for ( UrlReport urlReport : urlReports )
{
Payload payload = urlReport.getPayload();
if ( payload == null )
continue;
String fileLocation = payload.getLocation();
if ( fileLocation == null )
continue; // The full-text was not retrieved for this UrlReport.
// The fileHashes collected here are checked later against the payload-table, in a single batched query (see "getHashLocationMap()").
// If no location is returned for a hash, then its file was not previously found, so it will be requested from the worker.
// If a file-location IS returned (for a hash), then that file is already uploaded to the S3. The related records will be updated to point to that file-location and the file will not be requested from the Worker.
String fileHash = payload.getHash();
if ( fileHash != null )
{
hashesWithPayloads.put(fileHash, payload); // Hold multiple payloads per fileHash.
// There are 2 cases, which contribute to that:
// 1) Different publication-IDs end up giving the same full-text-url, resulting in the same file. Those duplicates are not saved, but instead, the location, hash and size of the file is copied to the other payload.
// 2) Different publication-IDs end up giving different full-text-urls which point to the same file. Although very rare, in this case, the file is downloaded again by the Worker and has a different name.
// In either case, the duplicate file will not be transferred to the Controller, but in the 2nd one it takes up extra space, at least for some time.
// TODO - Implement a fileHash-check algorithm in the Worker's side ("PublicationsRetriever"), to avoid keeping those files in storage.
} else // This should never happen..
logger.error("Payload: " + payload + " has a null fileHash!");
}// end-for
return hashesWithPayloads;
}
public HashMap<String, String> getHashLocationMap(Set<String> fileHashes, int fileHashesSetSize, long batchCounter, String groupType)
{
// Prepare the "fileHashListString" to be used inside the "getHashLocationsQuery". Create the following string-pattern:
// ("HASH_1", "HASH_2", ...)
int stringBuilderCapacity = ((fileHashesSetSize * 32) + (fileHashesSetSize -1) +2);
String getHashLocationsQuery = "select distinct `hash`, `location` from " + DatabaseConnector.databaseName + ".payload where `hash` in "
+ getQueryListString(new ArrayList<>(fileHashes), fileHashesSetSize, stringBuilderCapacity);
HashMap<String, String> hashLocationMap = new HashMap<>(fileHashesSetSize/2); // No multimap is needed since only one location is returned for each fileHash.
DatabaseConnector.databaseLock.lock(); // The execution uses the database.
try {
jdbcTemplate.query(getHashLocationsQuery, rs -> {
try { // For each of the 2 columns returned, do the following. The column-indexing starts from 1.
hashLocationMap.put(rs.getString(1), rs.getString(2));
} catch (SQLException sqle) {
logger.error("No value was able to be retrieved from one of the columns of row_" + rs.getRow(), sqle);
}
});
} catch (EmptyResultDataAccessException erdae) {
logger.warn("No previously-found hash-locations where found for " + groupType + "_" + batchCounter);
} catch (Exception e) {
logger.error("Unexpected error when checking for already-found file-hashes, for " + groupType + "_" + batchCounter, e);
// We will continue with storing the files, we do not want to lose them.
} finally {
DatabaseConnector.databaseLock.unlock();
}
return hashLocationMap;
}
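A hedged sketch of how the collected hashes can drive the deduplication described above (the surrounding variable names, the "assignments" group-type and the "setLocation()" setter are assumptions for illustration): any hash which already has a location in the payload-table keeps that S3 location, so only the remaining files are requested from the Worker.

Set<String> fileHashes = hashesWithPayloads.keySet();
HashMap<String, String> hashLocationMap = getHashLocationMap(fileHashes, fileHashes.size(), batchCounter, "assignments");
for ( String fileHash : fileHashes ) {
    String alreadyUploadedLocation = hashLocationMap.get(fileHash);
    if ( alreadyUploadedLocation == null )
        continue; // Not uploaded to S3 yet; this file has to be requested from the Worker.
    for ( Payload payload : hashesWithPayloads.get(fileHash) )
        payload.setLocation(alreadyUploadedLocation); // Point to the already-uploaded file and do not request it again.
}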
private boolean getAndSaveFullTextBatch(List<String> fileNamesForCurBatch, String baseUrl, long assignmentsBatchCounter, int batchCounter, int numOfBatches,
String zstdFileFullPath, String workerId) throws RuntimeException
{
@ -437,7 +365,7 @@ public class FileUtils {
// We do NOT have to find and cross-reference the missing files with the urlReports, in order to set their locations to <null>,
// since, in the end of each assignments-batch, an iteration will be made and for all the non-retrieved and non-uploaded full-texts, the app will set them to null.
}
uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads);
uploadFullTexts(extractedFileNames, targetDirectory, allFileNamesWithPayloads, batchCounter);
return true;
} catch (Exception e) {
logger.error("Could not extract and upload the full-texts for batch_" + batchCounter + " of assignments_" + assignmentsBatchCounter + GenericUtils.endOfLine + e.getMessage(), e); // It shows the response body (after Spring v.2.5.6).
@ -483,8 +411,10 @@ public class FileUtils {
logger.warn("Problem when getting the \"status-code\" for url: " + requestUrl);
throw new RuntimeException(); // Avoid any other batches.
} else if ( statusCode != 200 ) {
logger.warn("HTTP-" + statusCode + ": " + getMessageFromResponseBody(conn, true) + "\n\nProblem when requesting the ZstdFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
if ( (statusCode >= 500) && (statusCode <= 599) )
String errMsg = getMessageFromResponseBody(conn, true);
logger.warn("HTTP-" + statusCode + ": " + errMsg + "\n\nProblem when requesting the ZstdFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
if ( ((statusCode >= 500) && (statusCode <= 599))
|| ((statusCode == 400) && ((errMsg != null) && errMsg.contains("The base directory for assignments_" + assignmentsBatchCounter + " was not found"))) )
throw new RuntimeException(); // Throw an exception to indicate that the Worker has problems and all remaining batches will fail as well.
return null;
} else
@ -503,11 +433,11 @@ public class FileUtils {
}
private void uploadFullTexts(String[] fileNames, String targetDirectory, SetMultimap<String, Payload> allFileNamesWithPayloads)
private void uploadFullTexts(String[] fileNames, String targetDirectory, SetMultimap<String, Payload> allFileNamesWithPayloads, int batchCounter)
{
// Iterate over the files and upload them to S3.
//int numUploadedFiles = 0;
for( String fileName : fileNames )
for ( String fileName : fileNames )
{
if ( fileName.contains(".tar") ) // Exclude the tar-files from uploading (".tar" and ".tar.zstd").
continue;
@ -527,14 +457,14 @@ public class FileUtils {
// BUT, since the filename contains a specific urlID, the datasourceId should be the one related to that specific urlID.
// So, we extract this urlID, search the payload inside the "fileRelatedPayloads" and get the related datasourceID (instead of taking the first or a random datasourceID).
Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileName);
Matcher matcher = FILEPATH_ID_EXTENSION.matcher(fileName);
if ( !matcher.matches() ) {
logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILENAME_ID_EXTENSION);
logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILEPATH_ID_EXTENSION);
continue;
}
// The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used.
// Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
String fileNameID = matcher.group(4);
String fileNameID = matcher.group(4); // The "fileNameID" is the OpenAIRE_ID for this file.
if ( (fileNameID == null) || fileNameID.isEmpty() ) {
logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\".");
continue;
@ -578,7 +508,7 @@ public class FileUtils {
//numUploadedFiles ++;
}
} catch (Exception e) {
logger.error("Avoid uploading the rest of the files of batch..");
logger.error("Avoid uploading the rest of the files of batch_" + batchCounter + " | " + e.getMessage());
break;
}
// Else, the record will have its file-data set to "null", in the end of the caller method (as it will not have an s3Url as its location).
@ -589,14 +519,14 @@ public class FileUtils {
}
public String constructS3FilenameAndUploadToS3(String targetDirectory, String fileName, String fileNameID,
public String constructS3FilenameAndUploadToS3(String targetDirectory, String fileName, String openAireId,
String dotFileExtension, String datasourceId, String hash) throws ConnectException, UnknownHostException
{
String filenameForS3 = constructS3FileName(fileName, fileNameID, dotFileExtension, datasourceId, hash); // This name is for the uploaded file, in the S3 Object Store.
String filenameForS3 = constructS3FileName(fileName, openAireId, dotFileExtension, datasourceId, hash); // This name is for the uploaded file, in the S3 Object Store.
if ( filenameForS3 == null ) // The error is logged inside.
return null;
String fileFullPath = targetDirectory + File.separator + fileName; // The fullPath to the local file (which has the previous name).
String fileFullPath = targetDirectory + fileName; // The fullPath to the local file (which has the previous name).
String s3Url = null;
try {
s3Url = s3ObjectStore.uploadToS3(filenameForS3, fileFullPath);
@ -685,7 +615,7 @@ public class FileUtils {
public static final int twentyFiveKb = 25_600; // 25 Kb
public static final int halfMb = 524_288; // 0.5 Mb = 512 Kb
public static final int halfMb = 524_288; // 0.5 Mb = 512 Kb = 524_288 bytes
public static final int tenMb = (10 * 1_048_576);
public boolean saveArchive(HttpURLConnection conn, File zstdFile)
@ -712,7 +642,7 @@ public class FileUtils {
* @param urlReports
* @param shouldCheckAndKeepS3UploadedFiles
*/
public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles)
public void removeUnretrievedFullTextsFromUrlReports(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles)
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport.getPayload();
@ -766,22 +696,17 @@ public class FileUtils {
// Prepare the "urlsToRetrieveRelatedIDs" to be used inside the "getDataForPayloadPrefillQuery". Create the following string-pattern: ("URL_1", "URL_2", ...)
int urlsToRetrieveRelatedIDsSize = urlsToRetrieveRelatedIDs.size();
StringBuilder relatedIDsStringBuilder = new StringBuilder(urlsToRetrieveRelatedIDsSize * 100);
relatedIDsStringBuilder.append("(");
for ( int i=0; i < urlsToRetrieveRelatedIDsSize; ++i ) {
relatedIDsStringBuilder.append("\"").append(urlsToRetrieveRelatedIDs.get(i)).append("\"");
if ( i < (urlsToRetrieveRelatedIDsSize -1) )
relatedIDsStringBuilder.append(", ");
}
relatedIDsStringBuilder.append(")");
int stringBuilderCapacity = (urlsToRetrieveRelatedIDsSize * 100);
// Get the id and url of any publication-url pair which has not been processed yet.
String getDataForPayloadPrefillQuery = "select distinct pu.id, pu.url\n" +
"from " + DatabaseConnector.databaseName + ".publication_urls pu\n" +
// Exclude the "already-processed" pairs.
"left anti join " + DatabaseConnector.databaseName + ".attempt a on a.id=pu.id and a.original_url=pu.url\n" +
"left anti join " + DatabaseConnector.databaseName + ".payload p on p.id=pu.id and p.original_url=pu.url\n" +
"left anti join " + DatabaseConnector.databaseName + ".assignment asgn on asgn.id=pu.id and asgn.original_url=pu.url\n" +
"where pu.url in " + relatedIDsStringBuilder;
// Limit the urls to the ones matching to the payload-urls found for the current assignments.
"where pu.url in " + getQueryListString(urlsToRetrieveRelatedIDs, urlsToRetrieveRelatedIDsSize, stringBuilderCapacity);
//logger.trace("getDataForPayloadPrefillQuery:\n" + getDataForPayloadPrefillQuery);
@ -798,6 +723,10 @@ public class FileUtils {
return; // Move to the next payload.
}
Set<Payload> foundPayloads = urlToPayloadsMultimap.get(original_url);
if ( foundPayloads == null ) {
logger.error("No payloads associated with \"original_url\" = \"" + original_url + "\"!");
return;
}
// Select a random "foundPayload" to use its data to fill the "prefilledPayload" (in a "Set" the first element is pseudo-random).
Optional<Payload> optPayload = foundPayloads.stream().findFirst();
@ -805,7 +734,7 @@ public class FileUtils {
logger.error("Could not retrieve any payload for the \"original_url\": " + original_url);
return; // Move to the next payload.
}
Payload prefilledPayload = optPayload.get(); // We change just the id and the original_url.
Payload prefilledPayload = optPayload.get().clone(); // We take a clone of the original payload and change just the id and the original_url.
prefilledPayload.setId(id);
prefilledPayload.setOriginal_url(original_url);
prefilledPayloads.add(prefilledPayload);
@ -887,6 +816,9 @@ public class FileUtils {
if ( shouldLockThreads ) // In case multiple threads write to the same file. for ex. during the bulk-import procedure.
fileAccessLock.lock();
// TODO - Make this method synchronized per specific file, not in general.
// TODO - NOW: Multiple bulkImport procedures (with different directories) are blocked while writing to DIFFERENT files..
try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), halfMb) )
{
bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter.
@ -901,4 +833,17 @@ public class FileUtils {
return null;
}
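One way the per-file TODO above could be addressed is a lock-registry keyed by the file path, so that concurrent bulk-imports writing to DIFFERENT report files no longer block each other. A minimal sketch, assuming the full file path is a stable key (this is not the current implementation; it needs "java.util.concurrent.ConcurrentHashMap" and "java.util.concurrent.locks.ReentrantLock"):

private static final ConcurrentHashMap<String, ReentrantLock> fileLocks = new ConcurrentHashMap<>();

public static boolean writeToFileLocked(String fileFullPath, String stringToWrite)
{
    ReentrantLock lock = fileLocks.computeIfAbsent(fileFullPath, k -> new ReentrantLock()); // One lock per file.
    lock.lock();
    try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), halfMb) )
    {
        bufferedWriter.write(stringToWrite); // Threads writing to other files are not blocked.
        return true;
    } catch (Exception e) {
        logger.error("Failed to write to file: " + fileFullPath, e);
        return false;
    } finally {
        lock.unlock();
    }
}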
public static String getQueryListString(List<String> list, int fileHashesListSize, int stringBuilderCapacity) {
StringBuilder sb = new StringBuilder(stringBuilderCapacity);
sb.append("(");
for ( int i=0; i < fileHashesListSize; ++i ) {
sb.append("\"").append(list.get(i)).append("\"");
if ( i < (fileHashesListSize -1) )
sb.append(", ");
}
sb.append(")");
return sb.toString();
}
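For example, a call like the following (with illustrative values) produces the string-pattern used by the "in"-clauses above:

String listString = getQueryListString(Arrays.asList("hashA", "hashB", "hashC"), 3, 64);
// listString now holds: ("hashA", "hashB", "hashC")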
}

View File

@ -1,13 +1,19 @@
package eu.openaire.urls_controller.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import java.text.SimpleDateFormat;
import java.util.Date;
public class GenericUtils {
private static final Logger logger = LoggerFactory.getLogger(GenericUtils.class);
public static final String endOfLine = "\n";
public static final String tab = "\t";
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z");
@ -22,15 +28,30 @@ public class GenericUtils {
}
public static String getSelectedStackTraceForCausedException(Throwable thr, String firstMessage, String additionalMessage, int numOfLines)
{
// The stacktrace of the "ExecutionException" refers to the current code, not to the code which ran inside the background-task, so try to get the cause.
Throwable causedThrowable = thr.getCause();
if ( causedThrowable == null ) {
logger.warn("No cause was retrieved for the \"ExecutionException\"!");
causedThrowable = thr;
}
String initialMessage = firstMessage + causedThrowable.getMessage() + ((additionalMessage != null) ? additionalMessage : "");
return getSelectiveStackTrace(causedThrowable, initialMessage, numOfLines);
}
public static String getSelectiveStackTrace(Throwable thr, String initialMessage, int numOfLines)
{
StackTraceElement[] stels = thr.getStackTrace();
StringBuilder sb = new StringBuilder(numOfLines *100);
StringBuilder sb = new StringBuilder(numOfLines *100); // This StringBuilder is thread-safe as a local-variable.
if ( initialMessage != null )
sb.append(initialMessage).append(" | Stacktrace:").append(GenericUtils.endOfLine); // This StringBuilder is thread-safe as a local-variable.
for ( int i = 0; (i < stels.length) && (i <= numOfLines); ++i ) {
sb.append(stels[i]);
if (i < numOfLines) sb.append(GenericUtils.endOfLine);
sb.append(initialMessage).append(endOfLine);
sb.append("Stacktrace:").append(endOfLine);
for ( int i = 0; (i < stels.length) && (i < numOfLines); ++i ) {
sb.append(tab).append(stels[i]);
if (i < (numOfLines -1)) sb.append(endOfLine);
}
return sb.toString();
}
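A typical call-site for the above helper, when checking a background-task's "future" (a sketch; callers pass their own message and number of lines):

try {
    future.get();
} catch (ExecutionException ee) { // The real error happened inside the background-task, so report its cause.
    logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Task failed with: ", null, 15));
}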

View File

@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
@ -43,6 +44,8 @@ import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@ -177,7 +180,7 @@ public class ParquetFileUtils {
int numInitialPayloads = initialPayloads.size();
if ( numInitialPayloads > 0 ) // If at least 1 payload was created by the processed records..
{ // (it's ok to have no payloads, if there were no full-texts available)
// At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved.
// At this point we know there was no problem with the full-texts, but we do not know if at least one full-text was retrieved FROM the worker.
if ( (payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((payloadsSchema = parseSchema(payloadSchemaFilePath)) == null ) ) {
logger.error("Nothing can be done without the payloadsSchema! Exiting.."); // The cause is already logged inside the above method.
@ -189,11 +192,14 @@ public class ParquetFileUtils {
// In case the above method returns an error, nothing happens. We just have only the initial payloads to insert to the DB.
int sizeOfEachSubList = (int)(sizeOfUrlReports * 0.33); // We want 3 sub-lists for the payloads.
if ( sizeOfEachSubList == 0 ) // If the "sizeOfUrlReports" is <= 3.
sizeOfEachSubList = 1;
// There may be 1 more sub-list with very few elements, due to how the splitting works. Unfortunately, we cannot set the number of splits, only the size of most splits.
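// For example, partitioning 10 urlReports with "sizeOfEachSubList" = 3 gives sub-lists of sizes 3, 3, 3 and 1;
// the size of each split (except, possibly, the last one) can be chosen, but the resulting number of splits cannot.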
if ( sizeOfEachSubList > 10 ) {
List<List<UrlReport>> finalSubLists = Lists.partition(urlReports, sizeOfEachSubList); // This needs the "sizeOfEachSubList" to be above < 0 >.
int numSubListsPayload = finalSubLists.size();
// We will run <numSubListsPayload> tasks for the payloads.
// Since the payloads are not evenly distributed in the urlReports, there may be some sub-lists of urlReports without any payloads inside.
for ( int i = 0; i < numSubListsPayload; ++i ) {
int finalI = i;
callableTasks.add(() -> { // Handle inserts to the "payload" table. Around 20% of the total amount.
@ -268,7 +274,7 @@ public class ParquetFileUtils {
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No attempts are available to be inserted to the database!");
logger.error("No attempt-parquet-records could be created in order to be inserted to the database!");
return false;
}
@ -294,6 +300,7 @@ public class ParquetFileUtils {
{
List<GenericData.Record> recordList = new ArrayList<>((int) (urlReports.size() * 0.2));
GenericData.Record record;
int numPayloadsInsideUrlReports = 0;
for ( UrlReport urlReport : urlReports )
{
@ -307,6 +314,8 @@ public class ParquetFileUtils {
if ( fileLocation == null ) // We want only the records with uploaded full-texts in the "payload" table.
continue;
numPayloadsInsideUrlReports ++;
Timestamp timestamp = payload.getTimestamp_acquired();
record = getPayloadParquetRecord(payload.getId(), payload.getOriginal_url(), payload.getActual_url(),
(timestamp != null) ? timestamp.getTime() : System.currentTimeMillis(),
@ -316,9 +325,12 @@ public class ParquetFileUtils {
recordList.add(record);
}
if ( numPayloadsInsideUrlReports == 0 )
return true; // This urlReports-sublist does not have any payloads inside to use. That's fine.
int recordsSize = recordList.size();
if ( recordsSize == 0 ) {
logger.warn("No payloads are available to be inserted to the database!");
logger.error("No payload-parquet-records could be created in order to be inserted to the database!");
return false;
}
@ -558,9 +570,9 @@ public class ParquetFileUtils {
if ( statusCode == 404 ) {
logger.info(initErrMsg + "\"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
attemptCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
payloadAggregatedCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
payloadBulkImportCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
return (attemptCreationSuccessful && payloadAggregatedCreationSuccessful && payloadBulkImportCreationSuccessful);
// We need all directories to be created in order for the app to function properly!
}
@ -617,19 +629,19 @@ public class ParquetFileUtils {
// For each missing subdirectories, run the mkDirs-request.
if ( !foundAttemptsDir ) {
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
attemptCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
} else
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
if ( !foundPayloadsAggregatedDir ) {
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
payloadAggregatedCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
} else
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");
if ( !foundPayloadsBulkImportDir ) {
logger.debug(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
payloadBulkImportCreationSuccessful = applyHDFSOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
} else
logger.info(initErrMsg + "\"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");
@ -642,7 +654,7 @@ public class ParquetFileUtils {
}
public boolean applyHDFOperation(String hdfsOperationUrl)
public boolean applyHDFSOperation(String hdfsOperationUrl)
{
try {
URL url = new URL(hdfsOperationUrl);
@ -679,11 +691,11 @@ public class ParquetFileUtils {
int numOfAllPayloadParquetFileCreations = 0;
int numOfFailedPayloadParquetFileCreations = 0;
for ( Future<ParquetReport> future : futures )
int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i )
{
ParquetReport parquetReport = null;
try {
parquetReport = future.get();
ParquetReport parquetReport = futures.get(i).get();
boolean hasProblems = (! parquetReport.isSuccessful());
ParquetReport.ParquetType parquetType = parquetReport.getParquetType();
if ( parquetType.equals(ParquetReport.ParquetType.attempt) ) {
@ -699,11 +711,17 @@ public class ParquetFileUtils {
logger.error(errMsg);
return new SumParquetSuccess(false, false, ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errMsg));
}
} catch (Exception e) {
logger.error("", e);
// We do not know if the failed "future" refers to a "payload" or to a "attempt".
// So we cannot increase a specific counter. That's ok, the only drawback if that we may try to "load" the non-existent data and get an exception.
} catch (ExecutionException ee) { // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error(GenericUtils.getSelectedStackTraceForCausedException(ee, "Parquet_task_" + i + " failed with: ", null, 15));
} catch (CancellationException ce) {
logger.error("Parquet_task_" + i + " was cancelled: " + ce.getMessage());
} catch (InterruptedException ie) {
logger.error("Parquet_task_" + i + " was interrupted: " + ie.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for parquet_task_" + i + " in the futures-list! " + ioobe.getMessage());
}
// We do not know if the failed "future" refers to a "payload" or to an "attempt".
// So we cannot increase a specific counter. That's ok, the only drawback is that we may try to "load" the non-existent data and get an exception.
} // End-for
boolean hasAttemptParquetFileProblem = (numOfFailedAttemptParquetFileCreations == numOfAllAttemptParquetFileCreations);
@ -712,4 +730,110 @@ public class ParquetFileUtils {
return new SumParquetSuccess(hasAttemptParquetFileProblem, hasPayloadParquetFileProblem, null);
}
/**
* In each insertion, a new parquet-file is created, so we end up with millions of files. Parquet is great for fast-select, so we have to stick with it and merge those files.
* This method creates a clone of the original table, in order to end up with only one parquet file. It then drops the original table and renames the clone to the original's name.
* It returns the errorMsg if an error appears, otherwise it returns "null".
*/
public String mergeParquetFilesOfTable(String tableName, String whereClause, Object parameter) {
String errorMsg;
if ( (tableName == null) || tableName.isEmpty() ) {
errorMsg = "No tableName was given. Do not know the tableName for which we should merger the underlying files for!";
logger.error(errorMsg);
return errorMsg; // Return the error-msg to indicate that something went wrong and pass it down to the Worker.
}
// Make sure the following are (at least) empty strings, never null.
whereClause = (whereClause != null) ? (whereClause + " ") : "";
if ( parameter == null )
parameter = "";
else if ( parameter instanceof String )
parameter = "'" + parameter + "'"; // This will be a "string-check", thus the single-quotes.
// Else it is a "long", it will be used as is.
// Create a temp-table as a copy of the initial table.
try {
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + " " + whereClause + parameter);
} catch (Exception e) {
errorMsg = "Problem when copying the contents of \"" + tableName + "\" table to a newly created \"" + tableName + "_tmp\" table, when merging the parquet-files!\n";
logger.error(errorMsg, e);
try { // Make sure we delete the possibly half-created temp-table.
jdbcTemplate.execute("DROP TABLE IF EXISTS " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
// We cannot move on with merging, but no harm was done, since the temp-table was dropped right away and its name is free to be used in the future.
} catch (Exception e1) {
logger.error("Failed to drop the \"" + tableName + "_tmp\" table!", e1);
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
return errorMsg; // We return only the initial error to the Worker, which is easily distinguished inside the "merge-queries".
}
// Drop the initial table.
try {
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + " PURGE");
} catch (Exception e) {
errorMsg = "Problem when dropping the initial \"" + tableName + "\" table, when merging the parquet-files!\n";
logger.error(errorMsg, e);
// The original table could not be dropped, so the temp-table cannot be renamed to the original..!
try { // Make sure we delete the already created temp-table, in order to be able to use it in the future. The merging has failed nevertheless.
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
} catch (Exception e1) {
logger.error((errorMsg += "Failed to drop the \"" + tableName + "_tmp\" table!"), e1); // Add this error to the original, both are very important.
}
// At this point, the original table still exists.
return errorMsg;
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
// Rename the temp-table to have the initial-table's name.
try {
jdbcTemplate.execute("ALTER TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp RENAME TO " + DatabaseConnector.databaseName + "." + tableName);
} catch (Exception e) {
errorMsg = "Problem in renaming the \"" + tableName + "_tmp\" table to \"" + tableName + "\", when merging the parquet-files!\n";
logger.error(errorMsg, e);
// At this point we only have a "temp-table", the original is already deleted..
// Try to create the original, as a copy of the temp-table. If that succeeds, then try to delete the temp-table.
try {
jdbcTemplate.execute("CREATE TABLE " + DatabaseConnector.databaseName + "." + tableName + " stored as parquet AS SELECT * FROM " + DatabaseConnector.databaseName + "." + tableName + "_tmp");
} catch (Exception e1) {
errorMsg = "Problem when copying the contents of \"" + tableName + "_tmp\" table to a newly created \"" + tableName + "\" table, when merging the parquet-files!\n";
logger.error(errorMsg, e1);
// If the original table was not created, then we have to intervene manually; if it was created but without any data, then we can safely move on handling other assignments and workerReports, but the data will be lost! So this workerReport failed to be handled.
try { // The below query normally returns a list, as it takes a "regex-pattern" as an input. BUT, we give just the table name, without wildcards. So the result is either the tableName itself or none (not any other table).
jdbcTemplate.queryForObject("SHOW TABLES IN " + DatabaseConnector.databaseName + " LIKE '" + tableName + "'", List.class);
} catch (EmptyResultDataAccessException erdae) {
// The table does not exist, so it was not even half-created by the previous query.
// Not having the original table anymore is a serious error. A manual action is needed!
logger.error((errorMsg += "The original table \"" + tableName + "\" must be created manually! Serious problems may appear otherwise!"));
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and fix it immediately to avoid other errors in the Service)..!
}
// Here, the original-table exists in the DB, BUT without any data inside! This workerReport failed to be handled! (some of its data could not be loaded to the database, and all previous data was lost).
return errorMsg;
}
// The creation of the original table was successful. Try to delete the temp-table.
try {
jdbcTemplate.execute("DROP TABLE " + DatabaseConnector.databaseName + "." + tableName + "_tmp PURGE");
} catch (Exception e2) {
logger.error((errorMsg += "Problem when dropping the \"" + tableName + "_tmp\" table, when merging the parquet-files!\n"), e2);
// Manual deletion should be performed!
return errorMsg; // Return both errors here; the second one is so important that, had it not happened, we could have moved on with this workerReport.
// TODO - Should we shutdown the service? It is highly unlikely that anyone will observe this error live (and act immediately)..!
}
// Here the original table exists and the temp-table is deleted. We eventually have the same state as if the "ALTER TABLE" succeeded.
}
// Gather information to be used for queries-optimization.
try {
jdbcTemplate.execute("COMPUTE STATS " + DatabaseConnector.databaseName + "." + tableName);
} catch (Exception e) {
logger.error("Problem when gathering information from table \"" + tableName + "\" to be used for queries-optimization.", e);
// In this case the error is not so important to the whole operation.. It's only that the performance of this specific table will be less optimal, only temporarily, unless every "COMPUTE STATS" query fails for future workerReports too.
}
return null; // No errorMsg, everything is fine.
}
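A hedged usage sketch (the table names, where-clause and column are illustrative, not the actual call-sites):

String mergeErrorMsg = mergeParquetFilesOfTable("attempt", "", null); // Merge the files of the whole table.
// Or merge only the rows matching a condition, e.g.: mergeParquetFilesOfTable("assignment", "WHERE `workerid` = ", workerId);
if ( mergeErrorMsg != null )
    logger.error(mergeErrorMsg); // The message describes the state the table was left in.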
}

View File

@ -48,8 +48,8 @@ public class S3ObjectStore {
boolean bucketExists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
// Keep this commented-out to avoid objects-deletion by accident. The code is open-sourced, so it's easy to enable this ability if we really want it (e.g. for testing).
if ( shouldEmptyBucket && isTestEnvironment && bucketExists )
emptyBucket(bucketName, false);
/*if ( shouldEmptyBucket && isTestEnvironment && bucketExists )
emptyBucket(bucketName, false);*/
// Make the bucket, if not exist.
if ( !bucketExists ) {
@ -117,30 +117,57 @@ public class S3ObjectStore {
}
public void emptyBucket(String bucketName, boolean shouldDeleteBucket) throws Exception {
public void emptyBucket(String bucketName, boolean shouldDeleteBucket)
{
logger.warn("Going to " + (shouldDeleteBucket ? "delete" : "empty") + " bucket \"" + bucketName + "\"!");
// First list the objects of the bucket.
Iterable<Result<Item>> results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
Iterable<Result<Item>> results;
try {
results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
} catch (Exception e) {
logger.error("Could not retrieve the list of objects of bucket \"" + bucketName + "\"!");
return;
}
int countDeletedFiles = 0;
int countFilesNotDeleted = 0;
long totalSize = 0;
Item item;
// Then, delete the objects.
for ( Result<Item> resultItem : results ) {
try {
if ( !deleteFile(resultItem.get().objectName(), bucketName) ) {
logger.error("Cannot proceed with bucket deletion, since only an empty bucket can be removed!");
return;
}
item = resultItem.get();
} catch (Exception e) {
logger.warn("Could not remove " + resultItem.get().objectName());
logger.error("Could not get the item-object of one of the S3-Objects returned from the bucket!", e);
countFilesNotDeleted ++;
continue;
}
totalSize += item.size();
if ( !deleteFile(item.objectName(), bucketName) ) { // The reason and for what object, is already logged.
logger.error("Cannot proceed with bucket deletion, since only an empty bucket can be removed!");
countFilesNotDeleted ++;
} else
countDeletedFiles ++;
}
if ( shouldDeleteBucket ) {
// Lastly, delete the empty bucket.
minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
}
if ( countFilesNotDeleted == 0 ) {
// Lastly, delete the empty bucket. We need to do this last, as in case it's not empty, we get an error!
try {
minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
logger.info("Bucket \"" + bucketName + "\" was deleted!");
} catch (Exception e) {
logger.error("Bucket \"" + bucketName + "\" could not be deleted!", e);
}
} else
logger.error("Cannot execute the \"removeBucket\" command for bucket \"" + bucketName + "\", as " + countFilesNotDeleted + " files failed to be deleted!");
} else
logger.info("Bucket \"" + bucketName + "\" was emptied!");
logger.info("Bucket " + bucketName + " was " + (shouldDeleteBucket ? "deleted!" : "emptied!"));
logger.info(countDeletedFiles + " files were deleted, amounting to " + ((totalSize/1024)/1024) + " MB.");
}

View File

@ -15,7 +15,7 @@ services:
db:
initialDatabaseName: pdfaggregation_i
testDatabaseName: pdfaggregationdatabase_payloads_view_test
testDatabaseName: pdfaggregationdatabase_test_new
assignmentLimit: 10000
maxAttemptsPerRecord: 3
@ -32,7 +32,7 @@ services:
secretKey: XA
region: XA
bucketName: XA
shouldEmptyBucket: false
shouldEmptyBucket: false # For data-security, in order to use this ability, the relevant code has to be uncommented in "util.S3ObjectStore.java"
shouldShowAllS3Buckets: true
worker:
port: 1881
@ -58,7 +58,7 @@ bulk-import:
# For "authoritative" sources, a special prefix is selected, from: https://graph.openaire.eu/docs/data-model/pids-and-identifiers/#identifiers-in-the-graph
# For the rest, the "datasource_prefix" is selected, using this query:
# select datasource.namespaceprefix.value
# from openaire_prod_20230414.datasource -- Here use the latest production-table.
# from openaire_prod_<PROD_DATE>.datasource -- Here use the production-table with the latest date.
# where officialname.value = 'datasourceOfficialName';
@ -116,6 +116,8 @@ management:
application: ${spring.application.name}
logging:
file:
path: logs
level:
root: INFO
eu:

View File

@ -1,10 +1,10 @@
<configuration debug="false">
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/UrlsController.log</file>
<file>${LOG_PATH}/UrlsController.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>logs/UrlsController.%i.log.zip</fileNamePattern>
<fileNamePattern>${LOG_PATH}/UrlsController.%i.log.zip</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>10</maxIndex>
</rollingPolicy>
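Note: Spring Boot exposes the "logging.file.path" property (set to "logs" in the application.yml above) to Logback as the "LOG_PATH" variable, so the appender above keeps writing under the "logs" directory unless that property is changed.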