Compare commits

...

89 Commits

Author SHA1 Message Date
Spyros Zoupanos cfea668999 Duplicate elimination from usage_stats, views_stats and downloads_stats tables 2020-10-25 13:07:05 +02:00
Spyros Zoupanos 6ff5436991 Changes for better connection management. Now we can either keep the connection alive all the time or create a new connection each time the connection object is requested 2020-10-25 11:27:37 +02:00
Spyros Zoupanos 8b591937e1 Small correction for views stats generation 2020-10-23 15:24:33 +03:00
Spyros Zoupanos 6b5b6796b7 Small corrections for problems that Dimitris found and flag for the number of threads 2020-10-20 20:30:26 +03:00
Spyros Zoupanos 1ca74ce830 Better file re-organisation for piwiklogs 2020-10-20 18:05:03 +03:00
Spyros Zoupanos 7fdf994eb6 Multithreaded download for piwiklogs 2020-10-19 22:40:02 +03:00
Spyros Zoupanos 6cc58e2720 Minor corrections to avoid empty replies exceptions 2020-10-15 22:08:36 +03:00
Spyros Zoupanos a1dc930486 More control on directory cleanup 2020-10-15 17:41:36 +03:00
Spyros Zoupanos 8f24a6388e Corrections on final steps 2020-10-10 14:25:20 +03:00
Spyros Zoupanos 8826684130 Corrections for Sarc stats. Heavy queries run on Impala. 2020-10-10 12:06:31 +03:00
Spyros Zoupanos d9cc70d334 Minor corrections to Impala queries 2020-10-10 10:59:05 +03:00
Spyros Zoupanos a852dd3a0d Making tables visible to Impala 2020-10-09 22:28:45 +03:00
Spyros Zoupanos dd9df4ae58 Modifications to SARC stats 2020-10-09 21:48:52 +03:00
Spyros Zoupanos 8b08f35dfe Corrections for Irus stats after discussing with Dimitris 2020-10-08 19:15:01 +03:00
Spyros Zoupanos 715bbd487d Flags & limiters for Sarc stats 2020-10-08 00:24:58 +03:00
Spyros Zoupanos 2de17e7f32 Corrections for irus stats 2020-10-07 23:28:09 +03:00
Spyros Zoupanos 8da64d8f54 Adding flags and time limits to Irus stats 2020-10-06 23:48:31 +03:00
Spyros Zoupanos 9a1512004f Corrections for LaReferencia ending period 2020-10-05 19:09:31 +03:00
Spyros Zoupanos 915b3287a8 Minor corrections to LaReferencia stats 2020-10-04 20:01:40 +03:00
Spyros Zoupanos 806a450465 Flags for La Referencia logs 2020-10-04 19:16:42 +03:00
Spyros Zoupanos 48d6bf28eb More control parameters and limits on the lareferencia download 2020-10-04 17:03:01 +03:00
Spyros Zoupanos 2b330dd84c Corrections for proper download of OpenAIRE logs - date limits 2020-10-04 10:19:44 +03:00
Spyros Zoupanos 7b7075cfdd Changes for proper log downloading (limits on starting and ending period) + loggers to STDOUT 2020-10-04 00:24:55 +03:00
Spyros Zoupanos e2748fea95 Download directory automatic deletion & creation 2020-10-03 19:47:57 +03:00
Spyros Zoupanos 2e2e2b8b29 Passing via properties the DB parameters 2020-10-02 19:20:31 +03:00
Spyros Zoupanos 07e750939f Adding correct logger everywhere, cleaning code, removing sysouts 2020-10-02 16:25:21 +03:00
Spyros Zoupanos c6bbe215e1 Code cleaning and re-organisation 2020-10-02 14:45:19 +03:00
Spyros Zoupanos c1257ac7c5 Correct parameter parsing 2020-10-01 23:24:40 +03:00
Spyros Zoupanos dc6114a24e Introducing impala connections and using the correct connection string 2020-09-27 13:19:45 +03:00
Spyros Zoupanos 73656f7f31 More corrections on the portalStats queries 2020-09-26 11:18:03 +03:00
Spyros Zoupanos a497b19b21 More corrections on the portalStats queries 2020-09-26 10:47:08 +03:00
Spyros Zoupanos bc5cf28375 Changing portalStats queries to faster ones 2020-09-23 22:22:43 +03:00
Spyros Zoupanos 69640f5fc4 Finished LaReferencia updateProdTables. WF rewriting finished. Needs optimization 2020-09-21 23:26:55 +03:00
Spyros Zoupanos 1ceb363cb2 lareferencia viewsStats & downloadsStats finished 2020-09-21 23:16:10 +03:00
Spyros Zoupanos 65acece7c4 lareferencia removeDoubleClicks done 2020-09-21 22:27:15 +03:00
Spyros Zoupanos 0369f36776 processlaReferenciaLog finished 2020-09-20 21:16:01 +03:00
Spyros Zoupanos 3c11acde0c More progress on processlaReferenciaLog 2020-09-20 15:07:09 +03:00
Spyros Zoupanos a2d64b4644 Added lareferencialogtmp_json table creation 2020-09-20 14:03:16 +03:00
Spyros Zoupanos 373f4fdbd8 Better organisation of downloaded logs 2020-09-20 12:56:04 +03:00
Spyros Zoupanos 8a39ec44e0 More logging messages - Code needs cleaning. Duplicate code for LaReferencia table creation; it should live in one place. The same goes for the various methods used for JSON downloading in several classes, like getJson 2020-09-20 11:27:27 +03:00
Spyros Zoupanos 2b2bac9b28 More progress on LaReferencia logs 2020-09-20 00:59:33 +03:00
Spyros Zoupanos 053588c365 Adding lareferencia initial files 2020-09-20 00:00:59 +03:00
Spyros Zoupanos 2e701c547d Finished Sarc Stats 2020-09-19 23:43:26 +03:00
Spyros Zoupanos b3d51a954a Deleting the not needed commented out code 2020-09-19 22:11:40 +03:00
Spyros Zoupanos ed4e9f46d9 Changes to create properly directories for downloaded files 2020-09-19 22:04:42 +03:00
Spyros Zoupanos 03fb2b9e01 Sarc stats almost finished. Have to look at sarcStats() method - INSERT INTO downloads_stats 2020-09-17 22:19:10 +03:00
Spyros Zoupanos 2ae67cfdba Creation of Sarc JSON tables 2020-09-16 21:51:50 +03:00
Spyros Zoupanos 958fb1a343 Creation of Sarc JSON tables 2020-09-16 21:46:32 +03:00
Spyros Zoupanos 1dcb197f02 Renamings on Sarcs and deletion of empty files (to be double checked) 2020-09-16 21:28:05 +03:00
Spyros Zoupanos 17f2748eb4 Merge branch 'usage-stats-export-wf' of code-repo.d4science.org:spyros/dnet-hadoop into usage-stats-export-wf 2020-09-16 20:34:14 +03:00
Spyros Zoupanos 17acbb7fc6 Schema separation on sarc stats that are downloaded 2020-09-16 20:30:36 +03:00
Spyros Zoupanos 49de94c4b1 Removing prefix c: from json 2020-09-15 22:32:14 +03:00
Spyros Zoupanos 015f6e88df Removing prefix c: from json 2020-09-15 21:41:28 +03:00
Spyros Zoupanos 8bb00add0d Minor changes to print the Sarc keys 2020-09-15 18:08:42 +03:00
Spyros Zoupanos ba33df29b4 Working on Sarc stats 2020-09-14 20:10:53 +03:00
Spyros Zoupanos 55222a2516 processIrusStats done 2020-09-13 16:01:08 +03:00
Spyros Zoupanos 08a102a76c processIrusStats done 2020-09-13 16:00:40 +03:00
Spyros Zoupanos 3d5904fb41 More proper naming to methods 2020-09-13 15:01:29 +03:00
Spyros Zoupanos 95fee808fd Downloading Irus Reports works correctly. Need to add a limit on downloaded files for testing purposes; for now we rely on breakpoints 2020-09-13 14:51:45 +03:00
Spyros Zoupanos 196946cd6b Moving JSON Serde jar addition to a better place 2020-09-13 13:01:39 +03:00
Spyros Zoupanos f8e91cdc5c processLogs.updateProdTables. I need feedback on processLogs.portalStats to see why they never end 2020-09-13 12:23:03 +03:00
Spyros Zoupanos 9caac3e3e3 portalStats finished - Needs testing. Working on updateProdTables 2020-09-12 21:24:31 +03:00
Spyros Zoupanos 8ddf1dcc15 processPortalLog finished 2020-09-12 20:13:33 +03:00
Spyros Zoupanos 968d53f119 Finished downloadsStats 2020-09-11 20:10:37 +03:00
Spyros Zoupanos f78b5d3f86 More progress on viewsStats 2020-09-10 22:37:48 +03:00
Spyros Zoupanos 2d2d1b9694 More progress on viewsStats 2020-09-10 22:27:19 +03:00
Spyros Zoupanos 1d9f8f79a8 Finished cleanOAI 2020-09-09 21:59:04 +03:00
Spyros Zoupanos 398f1f6f15 More progress. Cleaning view double clicks 2020-09-07 21:57:45 +03:00
Spyros Zoupanos 81102dd791 Removing not needed jar by reflection 2020-09-07 20:54:47 +03:00
Spyros Zoupanos 719f9e3cd9 Adding sysout messages (should be transformed to log messages) 2020-09-07 20:44:01 +03:00
Spyros Zoupanos e2c70f64ed More progress on loading JSON Serde jar 2020-09-07 00:01:05 +03:00
Spyros Zoupanos 5af2abbea5 Moving variable declarations to a more appropriate place, adding drop table code 2020-09-04 19:49:07 +03:00
Spyros Zoupanos cf7b9c6db3 More progress on adding queries to the code. Initial database and table creation seems OK. Downloading logs from available piwik_ids 2020-09-02 21:02:56 +03:00
Spyros Zoupanos 637e61bb0f Getting the right piwik_ids from (graph) stats db 2020-09-01 22:06:16 +03:00
Spyros Zoupanos d770d7043d Adding a better .gitignore 2020-09-01 19:42:09 +03:00
Spyros Zoupanos 293d6accd4 More progress on adding piwiklogtmp to the code 2020-09-01 19:05:38 +03:00
Spyros Zoupanos f3dda9858c More progress - Adding queries to code 2020-08-31 23:19:15 +03:00
Spyros Zoupanos 8db9a7ccdc Changes to download Sarc stats 2020-07-25 13:17:47 +03:00
Spyros Zoupanos c035fa7648 Changes to download Irus Stats 2020-07-22 19:22:04 +03:00
Spyros Zoupanos 4c00343bbd More progress 2020-06-05 20:39:51 +03:00
Spyros Zoupanos b213da51c4 Modifying JSON saving procedure to make the files usable by HIVE JsonSerDe 2020-05-21 21:49:33 +03:00
Spyros Zoupanos bf820a98b4 Removing the not needed download code that ignores SSL certificates and uses username/password for authentication. Repository ids are provided manually for the moment until the Hive stats DB provides the correct piwik_id 2020-05-19 18:45:28 +03:00
Spyros Zoupanos 9cdea87c7a More progress on download jsons. All certificates are ignored & authentication is done with username & pass 2020-05-16 13:16:16 +03:00
Spyros Zoupanos 66c7ddfc5e More progress on SQL statements and parameters 2020-05-14 22:27:18 +03:00
Spyros Zoupanos 98ba2d0282 The workflow starts 2020-05-12 20:38:31 +03:00
Spyros Zoupanos 0b6f302652 Adding also an update example with the appropriate table definition 2020-05-11 19:53:41 +03:00
Spyros Zoupanos c0b509abfb Simple Java action added.
Simple Java connection to Hive DB + basic statements added
2020-05-09 15:51:22 +03:00
Spyros Zoupanos cabe92d155 Changes to make it compile successfully 2020-05-07 21:46:14 +03:00
Spyros Zoupanos af62b14f91 Adding the main java files, the directory structure and main workflow file 2020-05-07 19:00:03 +03:00
38 changed files with 4787 additions and 14 deletions

.gitignore vendored
@@ -1,4 +1,5 @@
.DS_Store
._*
.idea
*.iws
*.ipr

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.model;
import java.util.Map;
@@ -20,9 +21,11 @@ public class Event {
private Map<String, Object> map;
public Event() {}
public Event() {
}
public Event(final String producerId, final String eventId, final String topic, final String payload, final Long creationDate, final Long expiryDate,
public Event(final String producerId, final String eventId, final String topic, final String payload,
final Long creationDate, final Long expiryDate,
final boolean instantMessage,
final Map<String, Object> map) {
this.producerId = producerId;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.model;
import java.text.ParseException;
@@ -38,8 +39,8 @@ public class EventFactory {
final String payload = createPayload(target, updateInfo);
final String eventId =
calculateEventId(updateInfo.getTopic(), target.getOriginalId().get(0), updateInfo.getHighlightValueAsString());
final String eventId = calculateEventId(
updateInfo.getTopic(), target.getOriginalId().get(0), updateInfo.getHighlightValueAsString());
res.setEventId(eventId);
res.setProducerId(PRODUCER_ID);
@@ -61,7 +62,8 @@ public class EventFactory {
return payload.toJSON();
}
private static Map<String, Object> createMapFromResult(final Result oaf, final Result source, final UpdateInfo<?> updateInfo) {
private static Map<String, Object> createMapFromResult(final Result oaf, final Result source,
final UpdateInfo<?> updateInfo) {
final Map<String, Object> map = new HashMap<>();
final List<KeyValue> collectedFrom = oaf.getCollectedfrom();
@@ -87,12 +89,18 @@ public class EventFactory {
final List<StructuredProperty> subjects = oaf.getSubject();
if (subjects.size() > 0) {
map.put("target_publication_subject_list", subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
map
.put(
"target_publication_subject_list",
subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
}
final List<Author> authors = oaf.getAuthor();
if (authors.size() > 0) {
map.put("target_publication_author_list", authors.stream().map(Author::getFullname).collect(Collectors.toList()));
map
.put(
"target_publication_author_list",
authors.stream().map(Author::getFullname).collect(Collectors.toList()));
}
// PROVENANCE INFO
@@ -119,7 +127,9 @@
}
private static long parseDateTolong(final String date) {
if (StringUtils.isBlank(date)) { return -1; }
if (StringUtils.isBlank(date)) {
return -1;
}
try {
return DateUtils.parseDate(date, DATE_PATTERNS).getTime();
} catch (final ParseException e) {

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@@ -40,8 +41,10 @@ public class GenerateEventsApplication {
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils.toString(GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
IOUtils
.toString(
GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@@ -78,9 +81,12 @@ public class GenerateEventsApplication {
for (final Result source : children) {
for (final Result target : children) {
if (source != target) {
list.addAll(findUpdates(source, target).stream()
.map(info -> EventFactory.newBrokerEvent(source, target, info))
.collect(Collectors.toList()));
list
.addAll(
findUpdates(source, target)
.stream()
.map(info -> EventFactory.newBrokerEvent(source, target, info))
.collect(Collectors.toList()));
}
}
}

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
@@ -25,7 +26,8 @@ public class EnrichMissingProject extends UpdateInfo<Project> {
@Override
public String getHighlightValueAsString() {
return getHighlightValue().getFunder() + "::" + getHighlightValue().getFundingProgram() + getHighlightValue().getCode();
return getHighlightValue().getFunder() + "::" + getHighlightValue().getFundingProgram()
+ getHighlightValue().getCode();
}
}

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.broker.oa.util;
import eu.dnetlib.broker.objects.OpenAireEventPayload;

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<groupId>eu.dnetlib</groupId>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-java-action-dbconnection</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<cdh.hive.version>0.13.1-cdh5.2.1</cdh.hive.version>
<cdh.hadoop.version>2.5.0-cdh5.2.1</cdh.hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${cdh.hive.version}</version>
<!-- <version>3.1.2</version> -->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,116 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
public class Simple {
public static void main(final String[] args) throws Exception {
System.out.println("=============================================");
System.out.println("Staring");
System.out.println("=============================================");
// Connection string that works OK
// String connectionUrl =
// "jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/stats_wf_db_galexiou_oozie_beta;UID=spyros;PWD=XXXXXX;UseNativeQuery=1";
// The following connection string also works
String connectionUrl = "jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1";
String jdbcDriverName = "org.apache.hive.jdbc.HiveDriver";
System.out.println("\n=============================================");
System.out.println("Using Connection URL: " + connectionUrl);
System.out.println("USing JDBC Driver " + jdbcDriverName);
Connection con = null;
try {
System.out.println("Step 1");
Class.forName(jdbcDriverName);
System.out.println("Step 2");
con = DriverManager.getConnection(connectionUrl);
System.out.println("Step 3");
Statement stmt = con.createStatement();
System.out.println("Step 4");
ResultSet rs = stmt.executeQuery("select personid, lastname from test_db_spyros.persons");
System.out.println("\n== Begin Query Results ======================");
// print the results to the console
while (rs.next()) {
// the example query returns two string columns
System.out.println(rs.getString(1) + " | " + rs.getString(2));
}
System.out.println("== End Query Results =======================\n\n");
System.out.println("==> Drop if exists");
// Drop table if exists
stmt.execute("DROP TABLE IF EXISTS test_db_spyros.Persons2");
System.out.println("==> Create table");
// Create table
stmt
.execute(
"CREATE TABLE test_db_spyros.Persons2 (personid int, lastname varchar(255), firstname varchar(255)) clustered by (personid) into 100 buckets stored as orc tblproperties('transactional'='true')");
System.out.println("==> Insert");
// Insert
stmt
.execute(
"INSERT INTO test_db_spyros.persons2 (personid, lastname, firstname) VALUES ('1', 'ZoupZoup', 'Spyros')");
System.out.println("==> Select 1 on persons2");
// Select the inserted values
rs = stmt.executeQuery("select personid, lastname, firstname from test_db_spyros.persons2");
while (rs.next()) {
// the example query returns three string columns
System.out.println(rs.getString(1) + " | " + rs.getString(2) + " | " + rs.getString(3));
}
System.out.println("==> Update");
// Update
stmt
.execute(
"UPDATE test_db_spyros.persons2 SET lastname='ZoupZoup2', firstname='Spyros2' WHERE personid=1");
System.out.println("==> Select 2 on persons2");
// Select the updated values
rs = stmt.executeQuery("select personid, lastname, firstname from test_db_spyros.persons2");
while (rs.next()) {
// the example query returns three string columns
System.out.println(rs.getString(1) + " | " + rs.getString(2) + " | " + rs.getString(3));
}
} catch (SQLException e) {
System.out.println("Ex 1");
e.printStackTrace();
System.out.println(e.getMessage());
} catch (Exception e) {
System.out.println("Ex 2");
e.printStackTrace();
System.out.println(e.getMessage());
} finally {
try {
con.close();
} catch (Exception e) {
// log the failure and continue; nothing more can be done while closing
System.out.println("Ex 3");
e.printStackTrace();
System.out.println(e.getMessage());
}
}
System.out.println("=============================================");
System.out.println("Ending");
System.out.println("=============================================");
}
}
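
As an aside, the open/query/close sequence above can also be written with try-with-resources, which closes the result set, statement and connection even when a query throws. A minimal sketch, assuming the same driver and connection URL as above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SimpleTryWithResources {
	public static void main(final String[] args) throws Exception {
		Class.forName("org.apache.hive.jdbc.HiveDriver");
		String connectionUrl = "jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1";
		// Connection, Statement and ResultSet are all AutoCloseable and are
		// closed in reverse order of declaration when the block exits
		try (Connection con = DriverManager.getConnection(connectionUrl);
			Statement stmt = con.createStatement();
			ResultSet rs = stmt.executeQuery("select personid, lastname from test_db_spyros.persons")) {
			while (rs.next()) {
				System.out.println(rs.getString(1) + " | " + rs.getString(2));
			}
		}
	}
}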

@@ -0,0 +1,34 @@
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>oozie.wf.workflow.notification.url</name>
<value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
</configuration>

@@ -0,0 +1,48 @@
<workflow-app name="java_action_dbconnection" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hiveMetastoreUris}</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="Step1"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name='Step1'>
<java>
<main-class>eu.dnetlib.oa.graph.usagestats.export.Simple</main-class>
</java>
<ok to="End" />
<error to="Kill" />
</action>
<end name="End"/>
</workflow-app>

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<groupId>eu.dnetlib</groupId>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-java-action-simple</artifactId>
</project>

@@ -0,0 +1,212 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.PasswordAuthentication;
import java.net.URL;
import java.net.URLConnection;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
public class Simple {
public static void main(final String[] args) throws Exception {
System.out.println("=============================================");
System.out.println("Hello world");
System.out.println("=============================================");
String url = "https://analytics.openaire.eu/?module=API&method=Live.getLastVisitsDetails&idSite=13&period=day&date=2016-03-15&format=json&expanded=5&filter_limit=1000&token_auth=703bd17d845acdaf795e01bb1e0895b9";
// String url = "https://www.ntua.gr/media/djmediatools/cache/images/home-showcase/1170x460-crop-100-1_averof.jpg";
// String url = "https://www.ntua.gr";
// simpleConnection(url);
// ignoreSslConnection(url);
// getJson(url);
getJson2(url);
}
public static void simpleConnection(String givenUrl) throws IOException {
Authenticator.setDefault(new Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication("spyros", "RU78N9sqQndnH3SQ".toCharArray());
}
});
URL url = new URL(givenUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(conn.getOutputStream());
// wr.writeBytes(params);
wr.flush();
wr.close();
BufferedReader br = new BufferedReader(new InputStreamReader(
conn.getInputStream()));
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
}
public static String getJson(String url) throws Exception {
try {
Authenticator.setDefault(new Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication("spyros", "RU78N9sqQndnH3SQ".toCharArray());
}
});
System.out.println("===> Connecting to: " + url);
URL website = new URL(url);
System.out.println("Connection url -----> " + url);
URLConnection connection = website.openConnection();
// connection.setRequestProperty ("Authorization", "Basic "+encoded);
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
response.append("\n");
}
}
return response.toString();
} catch (Exception e) {
System.out.println("Failed to get URL: " + e);
throw new Exception("Failed to get URL: " + e.toString(), e);
}
}
public static void ignoreSslConnection(String givenUrl)
throws IOException, NoSuchAlgorithmException, KeyManagementException {
/* Start of Fix */
TrustManager[] trustAllCerts = new TrustManager[] {
new X509TrustManager() {
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}
public void checkClientTrusted(X509Certificate[] certs, String authType) {
}
public void checkServerTrusted(X509Certificate[] certs, String authType) {
}
}
};
Authenticator.setDefault(new Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication("spyros", "RU78N9sqQndnH3SQ".toCharArray());
}
});
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, new java.security.SecureRandom());
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
// Create all-trusting host name verifier
HostnameVerifier allHostsValid = new HostnameVerifier() {
public boolean verify(String hostname, SSLSession session) {
return true;
}
};
// Install the all-trusting host verifier
HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
/* End of the fix */
URL url = new URL(givenUrl);
URLConnection con = url.openConnection();
Reader reader = new InputStreamReader(con.getInputStream());
while (true) {
int ch = reader.read();
if (ch == -1)
break;
System.out.print((char) ch);
}
}
public static String getJson2(String url) throws Exception {
try {
// Trust all certificates
TrustManager[] trustAllCerts = new TrustManager[] {
new X509TrustManager() {
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}
public void checkClientTrusted(X509Certificate[] certs, String authType) {
}
public void checkServerTrusted(X509Certificate[] certs, String authType) {
}
}
};
// Provide username & password until there is IP authentication
Authenticator.setDefault(new Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication("spyros", "RU78N9sqQndnH3SQ".toCharArray());
}
});
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, new java.security.SecureRandom());
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
// Create all-trusting host name verifier
HostnameVerifier allHostsValid = new HostnameVerifier() {
public boolean verify(String hostname, SSLSession session) {
return true;
}
};
// Install the all-trusting host verifier
HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
/* End of the fix */
System.out.println("===> Connecting to: " + url);
URL website = new URL(url);
System.out.println("Connection url -----> " + url);
URLConnection connection = website.openConnection();
// connection.setRequestProperty ("Authorization", "Basic "+encoded);
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
response.append("\n");
}
}
return response.toString();
} catch (Exception e) {
System.out.println("Failed to get URL: " + e);
throw new Exception("Failed to get URL: " + e.toString(), e);
}
}
}
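
The trust-all managers and the always-true hostname verifier above disable TLS validation process-wide, which the surrounding commits treat as a stopgap until IP-based authentication is available. For reference, a hedged sketch of the standard alternative: trusting a specific certificate chain through a dedicated truststore instead of trusting everything (the truststore path and password here are hypothetical):

import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class TrustStoreSetup {
	public static void configureTls() throws Exception {
		// Load a truststore that contains only the certificates we accept
		KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
		try (InputStream in = new FileInputStream("/etc/ssl/analytics-truststore.jks")) { // hypothetical path
			trustStore.load(in, "changeit".toCharArray()); // hypothetical password
		}
		TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
		tmf.init(trustStore);
		SSLContext sc = SSLContext.getInstance("TLS");
		sc.init(null, tmf.getTrustManagers(), null);
		HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
		// Hostname verification is left at its default (strict) behaviour
	}
}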

@@ -0,0 +1,34 @@
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>oozie.wf.workflow.notification.url</name>
<value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
</configuration>

@@ -0,0 +1,48 @@
<workflow-app name="java_action_simple" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hiveMetastoreUris}</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="Step1"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name='Step1'>
<java>
<main-class>eu.dnetlib.oa.graph.usagestats.export.Simple</main-class>
</java>
<ok to="End" />
<error to="Kill" />
</action>
<end name="End"/>
</workflow-app>

@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- <parent>
<artifactId>dhp-workflows</artifactId >
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<groupId>eu.dnetlib</groupId> -->
<!-- <parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-usage-stats-update</artifactId> -->
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.1.7-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-usage-stats-update</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<cdh.hive.version>0.13.1-cdh5.2.1</cdh.hive.version>
<cdh.hadoop.version>2.5.0-cdh5.2.1</cdh.hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180130</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${cdh.hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,108 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package eu.dnetlib.oa.graph.usagestats.export;
/**
* @author D. Pierrakos, S. Zoupanos
*/
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.log4j.Logger;
public abstract class ConnectDB {
public static Connection DB_HIVE_CONNECTION;
public static Connection DB_IMPALA_CONNECTION;
private static String dbHiveUrl;
private static String dbImpalaUrl;
private static String usageStatsDBSchema;
private static String statsDBSchema;
private final static Logger log = Logger.getLogger(ConnectDB.class);
private final static boolean createAlwaysNewConnection = true;
static void init() throws ClassNotFoundException {
dbHiveUrl = ExecuteWorkflow.dbHiveUrl;
dbImpalaUrl = ExecuteWorkflow.dbImpalaUrl;
usageStatsDBSchema = ExecuteWorkflow.usageStatsDBSchema;
statsDBSchema = ExecuteWorkflow.statsDBSchema;
Class.forName("org.apache.hive.jdbc.HiveDriver");
}
public static Connection getHiveConnection() throws SQLException {
if (!createAlwaysNewConnection) {
if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed()) {
return DB_HIVE_CONNECTION;
} else {
DB_HIVE_CONNECTION = connectHive();
return DB_HIVE_CONNECTION;
}
} else {
if (DB_HIVE_CONNECTION != null && !DB_HIVE_CONNECTION.isClosed())
DB_HIVE_CONNECTION.close();
DB_HIVE_CONNECTION = connectHive();
return DB_HIVE_CONNECTION;
}
}
public static Connection getImpalaConnection() throws SQLException {
if (!createAlwaysNewConnection) {
if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed()) {
return DB_IMPALA_CONNECTION;
} else {
DB_IMPALA_CONNECTION = connectImpala();
return DB_IMPALA_CONNECTION;
}
} else {
if (DB_IMPALA_CONNECTION != null && !DB_IMPALA_CONNECTION.isClosed())
DB_IMPALA_CONNECTION.close();
DB_IMPALA_CONNECTION = connectImpala();
return DB_IMPALA_CONNECTION;
}
}
public static String getUsageStatsDBSchema() {
return ConnectDB.usageStatsDBSchema;
}
public static String getStatsDBSchema() {
return ConnectDB.statsDBSchema;
}
private static Connection connectHive() throws SQLException {
Connection connection = DriverManager.getConnection(dbHiveUrl);
Statement stmt = connection.createStatement();
log.debug("Opened database successfully");
return connection;
}
private static Connection connectImpala() throws SQLException {
Connection connection = DriverManager.getConnection(dbImpalaUrl);
Statement stmt = connection.createStatement();
log.debug("Opened database successfully");
return connection;
}
}
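
For context, a minimal sketch of how ConnectDB is meant to be used (it assumes ExecuteWorkflow has already populated the connection URLs and schema names; the queried table name is hypothetical):

import java.sql.ResultSet;
import java.sql.Statement;

public class ConnectDBUsage {
	public static void main(final String[] args) throws Exception {
		ConnectDB.init(); // loads the Hive JDBC driver and copies the URLs from ExecuteWorkflow
		Statement stmt = ConnectDB.getHiveConnection().createStatement();
		// piwiklogtmp is used only as an example table name here
		ResultSet rs = stmt
			.executeQuery("SELECT count(*) FROM " + ConnectDB.getUsageStatsDBSchema() + ".piwiklogtmp");
		while (rs.next()) {
			System.out.println("rows: " + rs.getLong(1));
		}
		stmt.close();
	}
}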

@@ -0,0 +1,202 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package eu.dnetlib.oa.graph.usagestats.export;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.BasicConfigurator;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class ExecuteWorkflow {
static String matomoAuthToken;
static String matomoBaseURL;
static String repoLogPath;
static String portalLogPath;
static String portalMatomoID;
static String irusUKBaseURL;
static String irusUKReportPath;
static String sarcsReportPathArray;
static String sarcsReportPathNonArray;
static String lareferenciaLogPath;
static String lareferenciaBaseURL;
static String lareferenciaAuthToken;
static String dbHiveUrl;
static String dbImpalaUrl;
static String usageStatsDBSchema;
static String statsDBSchema;
static boolean recreateDbAndTables;
static boolean piwikEmptyDirs;
static boolean downloadPiwikLogs;
static boolean processPiwikLogs;
static Calendar startingLogPeriod;
static Calendar endingLogPeriod;
static int numberOfPiwikIdsToDownload;
static int numberOfSiteIdsToDownload;
static boolean laReferenciaEmptyDirs;
static boolean downloadLaReferenciaLogs;
static boolean processLaReferenciaLogs;
static boolean irusCreateTablesEmptyDirs;
static boolean irusDownloadReports;
static boolean irusProcessStats;
static int irusNumberOfOpendoarsToDownload;
static boolean sarcCreateTablesEmptyDirs;
static boolean sarcDownloadReports;
static boolean sarcProcessStats;
static int sarcNumberOfIssnToDownload;
static boolean finalizeStats;
static boolean cleanDuplicates;
static boolean finalTablesVisibleToImpala;
static int numberOfDownloadThreads;
public static void main(String args[]) throws Exception {
// Sending the logs to the console
BasicConfigurator.configure();
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
UsageStatsExporter.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/usagestats/export/usagestats_parameters.json")));
parser.parseArgument(args);
// Setting up the initial parameters
matomoAuthToken = parser.get("matomoAuthToken");
matomoBaseURL = parser.get("matomoBaseURL");
repoLogPath = parser.get("repoLogPath");
portalLogPath = parser.get("portalLogPath");
portalMatomoID = parser.get("portalMatomoID");
irusUKBaseURL = parser.get("irusUKBaseURL");
irusUKReportPath = parser.get("irusUKReportPath");
sarcsReportPathArray = parser.get("sarcsReportPathArray");
sarcsReportPathNonArray = parser.get("sarcsReportPathNonArray");
lareferenciaLogPath = parser.get("lareferenciaLogPath");
lareferenciaBaseURL = parser.get("lareferenciaBaseURL");
lareferenciaAuthToken = parser.get("lareferenciaAuthToken");
dbHiveUrl = parser.get("dbHiveUrl");
dbImpalaUrl = parser.get("dbImpalaUrl");
usageStatsDBSchema = parser.get("usageStatsDBSchema");
statsDBSchema = parser.get("statsDBSchema");
if (parser.get("recreateDbAndTables").toLowerCase().equals("true"))
recreateDbAndTables = true;
else
recreateDbAndTables = false;
if (parser.get("piwikEmptyDirs").toLowerCase().equals("true"))
piwikEmptyDirs = true;
else
piwikEmptyDirs = false;
if (parser.get("downloadPiwikLogs").toLowerCase().equals("true"))
downloadPiwikLogs = true;
else
downloadPiwikLogs = false;
if (parser.get("processPiwikLogs").toLowerCase().equals("true"))
processPiwikLogs = true;
else
processPiwikLogs = false;
String startingLogPeriodStr = parser.get("startingLogPeriod");
Date startingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(startingLogPeriodStr);
startingLogPeriod = startingLogPeriodStr(startingLogPeriodDate);
String endingLogPeriodStr = parser.get("endingLogPeriod");
Date endingLogPeriodDate = new SimpleDateFormat("MM/yyyy").parse(endingLogPeriodStr);
endingLogPeriod = startingLogPeriodStr(endingLogPeriodDate);
numberOfPiwikIdsToDownload = Integer.parseInt(parser.get("numberOfPiwikIdsToDownload"));
numberOfSiteIdsToDownload = Integer.parseInt(parser.get("numberOfSiteIdsToDownload"));
if (parser.get("laReferenciaEmptyDirs").toLowerCase().equals("true"))
laReferenciaEmptyDirs = true;
else
laReferenciaEmptyDirs = false;
if (parser.get("downloadLaReferenciaLogs").toLowerCase().equals("true"))
downloadLaReferenciaLogs = true;
else
downloadLaReferenciaLogs = false;
if (parser.get("processLaReferenciaLogs").toLowerCase().equals("true"))
processLaReferenciaLogs = true;
else
processLaReferenciaLogs = false;
if (parser.get("irusCreateTablesEmptyDirs").toLowerCase().equals("true"))
irusCreateTablesEmptyDirs = true;
else
irusCreateTablesEmptyDirs = false;
if (parser.get("irusDownloadReports").toLowerCase().equals("true"))
irusDownloadReports = true;
else
irusDownloadReports = false;
if (parser.get("irusProcessStats").toLowerCase().equals("true"))
irusProcessStats = true;
else
irusProcessStats = false;
irusNumberOfOpendoarsToDownload = Integer.parseInt(parser.get("irusNumberOfOpendoarsToDownload"));
if (parser.get("sarcCreateTablesEmptyDirs").toLowerCase().equals("true"))
sarcCreateTablesEmptyDirs = true;
else
sarcCreateTablesEmptyDirs = false;
if (parser.get("sarcDownloadReports").toLowerCase().equals("true"))
sarcDownloadReports = true;
else
sarcDownloadReports = false;
if (parser.get("sarcProcessStats").toLowerCase().equals("true"))
sarcProcessStats = true;
else
sarcProcessStats = false;
sarcNumberOfIssnToDownload = Integer.parseInt(parser.get("sarcNumberOfIssnToDownload"));
if (parser.get("finalizeStats").toLowerCase().equals("true"))
finalizeStats = true;
else
finalizeStats = false;
if (parser.get("cleanDuplicates").toLowerCase().equals("true"))
cleanDuplicates = true;
else
cleanDuplicates = false;
if (parser.get("finalTablesVisibleToImpala").toLowerCase().equals("true"))
finalTablesVisibleToImpala = true;
else
finalTablesVisibleToImpala = false;
numberOfDownloadThreads = Integer.parseInt(parser.get("numberOfDownloadThreads"));
UsageStatsExporter usagestatsExport = new UsageStatsExporter();
usagestatsExport.export();
}
private static Calendar startingLogPeriodStr(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
return calendar;
}
}
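
Each of the if/else blocks above reduces to a single call, since Boolean.parseBoolean already performs the same case-insensitive comparison against "true". A minimal sketch of a helper (hypothetical, not part of this commit) that would shrink the parsing section:

// Hypothetical helper, shown only as a compact alternative to the if/else blocks above
private static boolean flag(final ArgumentApplicationParser parser, final String name) {
	// Boolean.parseBoolean("TRUE"), ("true") and ("True") all yield true; anything else yields false
	return Boolean.parseBoolean(parser.get(name));
}

// Usage: recreateDbAndTables = flag(parser, "recreateDbAndTables"); and so on for the other flags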

@@ -0,0 +1,414 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class IrusStats {
private String irusUKURL;
private static final Logger logger = LoggerFactory.getLogger(IrusStats.class);
public IrusStats(String irusUKURL) throws Exception {
this.irusUKURL = irusUKURL;
// The following may not be needed - It will be created when JSON tables are created
// createTmpTables();
}
public void reCreateLogDirs() throws Exception {
FileSystem dfs = FileSystem.get(new Configuration());
logger.info("Deleting irusUKReport directory: " + ExecuteWorkflow.irusUKReportPath);
dfs.delete(new Path(ExecuteWorkflow.irusUKReportPath), true);
logger.info("Creating irusUKReport directory: " + ExecuteWorkflow.irusUKReportPath);
dfs.mkdirs(new Path(ExecuteWorkflow.irusUKReportPath));
}
public void createTables() throws Exception {
try {
logger.info("Creating sushilog");
Statement stmt = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog(source STRING, " +
"repository STRING, rid STRING, date STRING, metric_type STRING, count INT) clustered by (source, " +
"repository, rid, date, metric_type) into 100 buckets stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(sqlCreateTableSushiLog);
logger.info("Created sushilog");
// To see how to apply the ignore-duplicates rules and indexes
// stmt.executeUpdate(sqlCreateTableSushiLog);
// String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO sushilog "
// + " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
// + "sushilog.rid, sushilog.date "
// + "FROM sushilog "
// + "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlcreateRuleSushiLog);
// String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);";
// stmt.executeUpdate(createSushiIndex);
stmt.close();
ConnectDB.getHiveConnection().close();
logger.info("Sushi Tables Created");
} catch (Exception e) {
logger.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
}
}
// // The following may not be needed - It will be created when JSON tables are created
// private void createTmpTables() throws Exception {
// try {
//
// Statement stmt = ConnectDB.getConnection().createStatement();
// String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilogtmp(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
// stmt.executeUpdate(sqlCreateTableSushiLog);
//
// // stmt.executeUpdate("CREATE TABLE IF NOT EXISTS public.sushilog AS TABLE sushilog;");
// // String sqlCopyPublicSushiLog = "INSERT INTO sushilog SELECT * FROM public.sushilog;";
// // stmt.executeUpdate(sqlCopyPublicSushiLog);
// String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO sushilogtmp "
// + " WHERE (EXISTS ( SELECT sushilogtmp.source, sushilogtmp.repository,"
// + "sushilogtmp.rid, sushilogtmp.date "
// + "FROM sushilogtmp "
// + "WHERE sushilogtmp.source = new.source AND sushilogtmp.repository = new.repository AND sushilogtmp.rid = new.rid AND sushilogtmp.date = new.date AND sushilogtmp.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlcreateRuleSushiLog);
//
// stmt.close();
// ConnectDB.getConnection().close();
// log.info("Sushi Tmp Tables Created");
// } catch (Exception e) {
// log.error("Failed to create tables: " + e);
// throw new Exception("Failed to create tables: " + e.toString(), e);
// }
// }
public void processIrusStats() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
logger.info("Added JSON Serde jar");
logger.info("Dropping sushilogtmp_json table");
String dropSushilogtmpJson = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sushilogtmp_json";
stmt.executeUpdate(dropSushilogtmpJson);
logger.info("Dropped sushilogtmp_json table");
logger.info("Creating irus_sushilogtmp_json table");
String createSushilogtmpJson = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json(\n" +
" `ItemIdentifier` ARRAY<\n" +
" struct<\n" +
" Type: STRING,\n" +
" Value: STRING\n" +
" >\n" +
" >,\n" +
" `ItemPerformance` ARRAY<\n" +
" struct<\n" +
" `Period`: struct<\n" +
" `Begin`: STRING,\n" +
" `End`: STRING\n" +
" >,\n" +
" `Instance`: struct<\n" +
" `Count`: STRING,\n" +
" `MetricType`: STRING\n" +
" >\n" +
" >\n" +
" >\n" +
")\n" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + ExecuteWorkflow.irusUKReportPath + "'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
stmt.executeUpdate(createSushilogtmpJson);
logger.info("Created irus_sushilogtmp_json table");
logger.info("Dropping irus_sushilogtmp table");
String dropSushilogtmp = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".irus_sushilogtmp";
stmt.executeUpdate(dropSushilogtmp);
logger.info("Dropped irus_sushilogtmp table");
logger.info("Creating irus_sushilogtmp table");
String createSushilogtmp = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema()
+ ".irus_sushilogtmp(source STRING, repository STRING, " +
"rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
+
"tblproperties('transactional'='true')";
stmt.executeUpdate(createSushilogtmp);
logger.info("Created irus_sushilogtmp table");
logger.info("Inserting to irus_sushilogtmp table");
String insertSushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp " +
"SELECT 'IRUS-UK', CONCAT('opendoar____::', split(split(INPUT__FILE__NAME,'IrusIRReport_')[1],'_')[0]), " +
"`ItemIdent`.`Value`, `ItemPerf`.`Period`.`Begin`, " +
"`ItemPerf`.`Instance`.`MetricType`, `ItemPerf`.`Instance`.`Count` " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json " +
"LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " +
"LATERAL VIEW posexplode(ItemPerformance) ItemPerformanceTable AS seqp, ItemPerf " +
"WHERE `ItemIdent`.`Type`= 'OAI'";
stmt.executeUpdate(insertSushilogtmp);
logger.info("Inserted to irus_sushilogtmp table");
logger.info("Creating downloads_stats table");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats " +
"(`source` string, " +
"`repository_id` string, " +
"`result_id` string, " +
"`date` string, " +
"`count` bigint, " +
"`openaire` bigint)";
stmt.executeUpdate(createDownloadsStats);
logger.info("Created downloads_stats table");
logger.info("Inserting into downloads_stats");
String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
"SELECT s.source, d.id AS repository_id, " +
"ro.id as result_id, CONCAT(YEAR(date), '/', LPAD(MONTH(date), 2, '0')) as date, s.count, '0' " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp s, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " +
ConnectDB.getStatsDBSchema() + ".result_oids ro " +
"WHERE s.repository=d.oid AND s.rid=ro.oid AND metric_type='ft_total' AND s.source='IRUS-UK'";
stmt.executeUpdate(insertDStats);
logger.info("Inserted into downloads_stats");
logger.info("Creating sushilog table");
String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog " +
"(`source` string, " +
"`repository_id` string, " +
"`rid` string, " +
"`date` string, " +
"`metric_type` string, " +
"`count` int)";
stmt.executeUpdate(createSushilog);
logger.info("Created sushilog table");
logger.info("Inserting to sushilog table");
String insertToShushilog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sushilog SELECT * FROM " +
ConnectDB.getUsageStatsDBSchema()
+ ".irus_sushilogtmp";
stmt.executeUpdate(insertToShushilog);
logger.info("Inserted to sushilog table");
ConnectDB.getHiveConnection().close();
}
public void getIrusRRReport(String irusUKReportPath) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM");
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("(getIrusRRReport) Starting period for log download: " + sdf.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("(getIrusRRReport) Ending period for log download: " + sdf.format(end.getTime()));
String reportUrl = irusUKURL + "GetReport/?Report=RR1&Release=4&RequestorID=OpenAIRE&BeginDate=" +
sdf.format(start.getTime()) + "&EndDate=" + sdf.format(end.getTime()) +
"&RepositoryIdentifier=&ItemDataType=&NewJiscBand=&Granularity=Monthly&Callback=";
logger.info("(getIrusRRReport) Getting report: " + reportUrl);
String text = getJson(reportUrl, "", "");
List<String> opendoarsToVisit = new ArrayList<String>();
JSONParser parser = new JSONParser();
JSONObject jsonObject = (JSONObject) parser.parse(text);
jsonObject = (JSONObject) jsonObject.get("ReportResponse");
jsonObject = (JSONObject) jsonObject.get("Report");
jsonObject = (JSONObject) jsonObject.get("Report");
jsonObject = (JSONObject) jsonObject.get("Customer");
JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems");
int i = 0;
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
JSONArray itemIdentifier = (JSONArray) jsonObjectRow.get("ItemIdentifier");
for (Object identifier : itemIdentifier) {
JSONObject opendoar = (JSONObject) identifier;
if (opendoar.get("Type").toString().equals("OpenDOAR")) {
i++;
opendoarsToVisit.add(opendoar.get("Value").toString());
break;
}
}
// break;
}
logger.info("(getIrusRRReport) Found the following opendoars for download: " + opendoarsToVisit);
if (ExecuteWorkflow.irusNumberOfOpendoarsToDownload > 0 &&
ExecuteWorkflow.irusNumberOfOpendoarsToDownload <= opendoarsToVisit.size()) {
logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.irusNumberOfOpendoarsToDownload);
opendoarsToVisit = opendoarsToVisit.subList(0, ExecuteWorkflow.irusNumberOfOpendoarsToDownload);
}
logger.info("(getIrusRRReport) Downloading the followins opendoars: " + opendoarsToVisit);
for (String opendoar : opendoarsToVisit) {
logger.info("Now working on piwikId: " + opendoar);
this.getIrusIRReport(opendoar, irusUKReportPath);
}
logger.info("(getIrusRRReport) Finished with report: " + reportUrl);
}
private void getIrusIRReport(String opendoar, String irusUKReportPath) throws Exception {
logger.info("(getIrusIRReport) Getting report(s) with opendoar: " + opendoar);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM");
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("(getIrusIRReport) Starting period for log download: " + simpleDateFormat.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("(getIrusIRReport) Ending period for log download: " + simpleDateFormat.format(end.getTime()));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
PreparedStatement st = ConnectDB
.getHiveConnection()
.prepareStatement(
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
st.setString(1, "opendoar____::" + opendoar);
ResultSet rs_date = st.executeQuery();
while (rs_date.next()) {
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
&& !rs_date.getString(1).equals("")) {
start.setTime(sdf.parse(rs_date.getString(1)));
}
}
rs_date.close();
int batch_size = 0;
while (start.before(end)) {
// log.info("date: " + simpleDateFormat.format(start.getTime()));
// Format the report month before advancing the calendar, so the saved
// file name matches the period that was actually downloaded
String reportDate = simpleDateFormat.format(start.getTime());
String reportUrl = this.irusUKURL + "GetReport/?Report=IR1&Release=4&RequestorID=OpenAIRE&BeginDate="
+ reportDate + "&EndDate=" + reportDate
+ "&RepositoryIdentifier=opendoar%3A" + opendoar
+ "&ItemIdentifier=&ItemDataType=&hasDOI=&Granularity=Monthly&Callback=";
start.add(Calendar.MONTH, 1);
logger.info("Downloading file: " + reportUrl);
String text = getJson(reportUrl, "", "");
if (text == null) {
continue;
}
FileSystem fs = FileSystem.get(new Configuration());
String filePath = irusUKReportPath + "/" + "IrusIRReport_" +
opendoar + "_" + reportDate + ".json";
logger.info("Storing to file: " + filePath);
FSDataOutputStream fin = fs.create(new Path(filePath), true);
JSONParser parser = new JSONParser();
JSONObject jsonObject = (JSONObject) parser.parse(text);
jsonObject = (JSONObject) jsonObject.get("ReportResponse");
jsonObject = (JSONObject) jsonObject.get("Report");
jsonObject = (JSONObject) jsonObject.get("Report");
jsonObject = (JSONObject) jsonObject.get("Customer");
JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems");
if (jsonArray == null) {
continue;
}
String oai = "";
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
fin.write(jsonObjectRow.toJSONString().getBytes());
fin.writeChar('\n');
}
fin.close();
}
ConnectDB.getHiveConnection().close();
logger.info("(getIrusIRReport) Finished downloading report(s) with opendoar: " + opendoar);
}
private String getJson(String url) throws Exception {
try {
System.out.println("===> Connecting to: " + url);
URL website = new URL(url);
System.out.println("Connection url -----> " + url);
URLConnection connection = website.openConnection();
// connection.setRequestProperty ("Authorization", "Basic "+encoded);
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
// response.append("\n");
}
}
System.out.println("response ====> " + response.toString());
return response.toString();
} catch (Exception e) {
logger.error("Failed to get URL: " + e);
System.out.println("Failed to get URL: " + e);
throw new Exception("Failed to get URL: " + e.toString(), e);
}
}
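/**
* Variant of getJson with credentials. Basic authentication is currently disabled
* (the Authorization header is commented out), so username and password are unused;
* on failure this overload logs the error and returns null instead of throwing.
*/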
private String getJson(String url, String username, String password) throws Exception {
// String cred=username+":"+password;
// String encoded = new sun.misc.BASE64Encoder().encode (cred.getBytes());
try {
URL website = new URL(url);
URLConnection connection = website.openConnection();
// connection.setRequestProperty ("Authorization", "Basic "+encoded);
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
response.append("\n");
}
}
return response.toString();
} catch (Exception e) {
logger.error("Failed to get URL", e);
return null;
}
}
}

View File

@@ -0,0 +1,261 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class LaReferenciaDownloadLogs {
private final String piwikUrl;
private Date startDate;
private final String tokenAuth;
/*
* The Piwik's API method
*/
private final String APImethod = "?module=API&method=Live.getLastVisitsDetails";
private final String format = "&format=json";
private final String ApimethodGetAllSites = "?module=API&method=SitesManager.getSitesWithViewAccess";
private static final Logger logger = LoggerFactory.getLogger(LaReferenciaDownloadLogs.class);
public LaReferenciaDownloadLogs(String piwikUrl, String tokenAuth) throws Exception {
this.piwikUrl = piwikUrl;
this.tokenAuth = tokenAuth;
this.createTables();
// this.createTmpTables();
}
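/**
* Deletes and recreates the HDFS directory that receives the LaReferencia logs, so
* that every run starts from a clean download area.
*/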
public void reCreateLogDirs() throws IllegalArgumentException, IOException {
FileSystem dfs = FileSystem.get(new Configuration());
logger.info("Deleting lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
dfs.delete(new Path(ExecuteWorkflow.lareferenciaLogPath), true);
logger.info("Creating lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
dfs.mkdirs(new Path(ExecuteWorkflow.lareferenciaLogPath));
}
private void createTables() throws Exception {
try {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Creating LaReferencia tables");
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, " +
"source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " +
"source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
"clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " +
"stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
logger.info("Created LaReferencia tables");
// String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialog "
// + " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
// + "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
// + "FROM lareferencialog "
// + "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
// stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
// stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
stmt.close();
ConnectDB.getHiveConnection().close();
logger.info("Lareferencia Tables Created");
} catch (Exception e) {
logger.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
// System.exit(0);
}
}
// private void createTmpTables() throws Exception {
//
// try {
// Statement stmt = ConnectDB.getConnection().createStatement();
// String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
// String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialogtmp "
// + " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
// + "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
// + "FROM lareferencialogtmp "
// + "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
// stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
//
// stmt.close();
// log.info("Lareferencia Tmp Tables Created");
//
// } catch (Exception e) {
// log.error("Failed to create tmptables: " + e);
// throw new Exception("Failed to create tmp tables: " + e.toString(), e);
// // System.exit(0);
// }
// }
private String getPiwikLogUrl() {
return piwikUrl + "/";
}
private String getJson(String url) throws Exception {
try {
URL website = new URL(url);
URLConnection connection = website.openConnection();
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
// response.append("\n");
}
}
return response.toString();
} catch (Exception e) {
logger.error("Failed to get URL: " + e);
throw new Exception("Failed to get URL: " + e.toString(), e);
}
}
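/**
* Retrieves from the LaReferencia Matomo instance all site ids visible to this
* token (SitesManager.getSitesWithViewAccess), optionally trims the list to the
* configured maximum, and downloads the logs of every remaining site.
*/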
public void GetLaReferenciaRepos(String repoLogsPath) throws Exception {
String baseApiUrl = getPiwikLogUrl() + ApimethodGetAllSites + format + "&token_auth=" + this.tokenAuth;
String content = "";
List<Integer> siteIdsToVisit = new ArrayList<Integer>();
// Getting all the siteIds in a list for logging reasons & limiting the list
// to the max number of siteIds
content = getJson(baseApiUrl);
JSONParser parser = new JSONParser();
JSONArray jsonArray = (JSONArray) parser.parse(content);
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
siteIdsToVisit.add(Integer.parseInt(jsonObjectRow.get("idsite").toString()));
}
logger.info("Found the following siteIds for download: " + siteIdsToVisit);
if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0 &&
ExecuteWorkflow.numberOfPiwikIdsToDownload <= siteIdsToVisit.size()) {
logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload);
siteIdsToVisit = siteIdsToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
}
logger.info("Downloading from repos with the followins siteIds: " + siteIdsToVisit);
for (int siteId : siteIdsToVisit) {
logger.info("Now working on piwikId: " + siteId);
this.GetLaReFerenciaLogs(repoLogsPath, siteId);
}
}
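/**
* Downloads the Matomo logs of a single LaReferencia repository, day by day and in
* pages of 1000 visits (filter_offset), resuming from the latest timestamp already
* present in the lareferencialog table.
*/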
public void GetLaReFerenciaLogs(String repoLogsPath,
int laReferencialMatomoID) throws Exception {
logger.info("Downloading logs for LaReferencia repoid " + laReferencialMatomoID);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("Starting period for log download: " + sdf.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("Ending period for log download: " + sdf.format(end.getTime()));
PreparedStatement st = ConnectDB
.getHiveConnection()
.prepareStatement(
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema() +
".lareferencialog WHERE matomoid=? GROUP BY timestamp HAVING max(timestamp) is not null");
st.setInt(1, laReferencialMatomoID);
ResultSet rs_date = st.executeQuery();
while (rs_date.next()) {
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
&& !rs_date.getString(1).equals("")) {
start.setTime(sdf.parse(rs_date.getString(1)));
}
}
rs_date.close();
for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) {
Date date = currDay.getTime();
logger
.info(
"Downloading logs for LaReferencia repoid " + laReferencialMatomoID + " and for "
+ sdf.format(date));
String period = "&period=day&date=" + sdf.format(date);
String outFolder = "";
outFolder = repoLogsPath;
FileSystem fs = FileSystem.get(new Configuration());
FSDataOutputStream fin = fs
.create(
new Path(outFolder + "/" + laReferencialMatomoID + "_LaRefPiwiklog" + sdf.format((date)) + ".json"),
true);
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + laReferencialMatomoID + period + format
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
String content = "";
int i = 0;
JSONParser parser = new JSONParser();
do {
String apiUrl = baseApiUrl;
if (i > 0) {
apiUrl += "&filter_offset=" + (i * 1000);
}
content = getJson(apiUrl);
if (content.length() == 0 || content.equals("[]"))
break;
JSONArray jsonArray = (JSONArray) parser.parse(content);
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRaw = (JSONObject) aJsonArray;
fin.write(jsonObjectRaw.toJSONString().getBytes());
fin.writeChar('\n');
}
logger
.info(
"Downloaded part " + i + " of logs for LaReferencia repoid " + laReferencialMatomoID
+ " and for "
+ sdf.format(date));
i++;
} while (true);
fin.close();
}
}
}

View File

@@ -0,0 +1,418 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
import java.net.URLDecoder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class LaReferenciaStats {
private static final Logger logger = LoggerFactory.getLogger(LaReferenciaStats.class);
private String logRepoPath;
private Statement stmt = null;
private String CounterRobotsURL;
private ArrayList<String> robotsList;
public LaReferenciaStats(String logRepoPath) throws Exception {
this.logRepoPath = logRepoPath;
this.createTables();
// this.createTmpTables();
}
/*
* private void connectDB() throws Exception { try { ConnectDB connectDB = new ConnectDB(); } catch (Exception e) {
* log.error("Connect to db failed: " + e); throw new Exception("Failed to connect to db: " + e.toString(), e); } }
*/
private void createTables() throws Exception {
try {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Creating LaReferencia tables");
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, " +
"source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " +
"source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
"clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " +
"stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
logger.info("Created LaReferencia tables");
// String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialog "
// + " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
// + "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
// + "FROM lareferencialog "
// + "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
// stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
// stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
stmt.close();
ConnectDB.getHiveConnection().close();
logger.info("Lareferencia Tables Created");
} catch (Exception e) {
logger.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
// System.exit(0);
}
}
// private void createTmpTables() throws Exception {
//
// try {
// Statement stmt = ConnectDB.getConnection().createStatement();
// String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
// String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
// + " ON INSERT TO lareferencialogtmp "
// + " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
// + "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
// + "FROM lareferencialogtmp "
// + "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
// stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
// stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
//
// stmt.close();
// log.info("Lareferencia Tmp Tables Created");
//
// } catch (Exception e) {
// log.error("Failed to create tmptables: " + e);
// throw new Exception("Failed to create tmp tables: " + e.toString(), e);
// // System.exit(0);
// }
// }
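/**
* Full LaReferencia processing pipeline: load the raw JSON logs into Hive, remove
* double clicks, build the views and downloads statistics, and append the results
* to the production tables.
*/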
public void processLogs() throws Exception {
try {
logger.info("Processing LaReferencia repository logs");
processlaReferenciaLog();
logger.info("LaReferencia repository logs process done");
logger.info("LaReferencia removing double clicks");
removeDoubleClicks();
logger.info("LaReferencia removed double clicks");
logger.info("LaReferencia creating viewsStats");
viewsStats();
logger.info("LaReferencia created viewsStats");
logger.info("LaReferencia creating downloadsStats");
downloadsStats();
logger.info("LaReferencia created downloadsStats");
logger.info("LaReferencia updating Production Tables");
updateProdTables();
logger.info("LaReferencia updated Production Tables");
} catch (Exception e) {
logger.error("Failed to process logs: " + e);
throw new Exception("Failed to process logs: " + e.toString(), e);
}
}
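/**
* Maps the downloaded JSON logs onto a typed ORC table: an external JsonSerDe table
* is (re)created over the HDFS log directory and every action of every visit is
* exploded into one row of lareferencialogtmp.
*/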
public void processlaReferenciaLog() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
logger.info("Added JSON Serde jar");
logger.info("Dropping lareferencialogtmp_json table");
String drop_lareferencialogtmp_json = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".lareferencialogtmp_json";
stmt.executeUpdate(drop_lareferencialogtmp_json);
logger.info("Dropped lareferencialogtmp_json table");
logger.info("Creating lareferencialogtmp_json");
String create_lareferencialogtmp_json = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".lareferencialogtmp_json(\n" +
" `idSite` STRING,\n" +
" `idVisit` STRING,\n" +
" `country` STRING,\n" +
" `referrerName` STRING,\n" +
" `browser` STRING,\n" +
" `repItem` STRING,\n" +
" `actionDetails` ARRAY<\n" +
" struct<\n" +
" timestamp: STRING,\n" +
" type: STRING,\n" +
" url: STRING,\n" +
" `customVariables`: struct<\n" +
" `1`: struct<\n" +
" `customVariablePageValue1`: STRING\n" +
" >,\n" +
" `2`: struct<\n" +
" `customVariablePageValue2`: STRING\n" +
" >\n" +
" >\n" +
" >\n" +
" >" +
")\n" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + ExecuteWorkflow.lareferenciaLogPath + "'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
stmt.executeUpdate(create_lareferencialogtmp_json);
logger.info("Created lareferencialogtmp_json");
logger.info("Dropping lareferencialogtmp table");
String drop_lareferencialogtmp = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".lareferencialogtmp";
stmt.executeUpdate(drop_lareferencialogtmp);
logger.info("Dropped lareferencialogtmp table");
logger.info("Creating lareferencialogtmp");
String create_lareferencialogtmp = "CREATE TABLE " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp(matomoid INT, " +
"source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, " +
"source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) " +
"clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets " +
"stored as orc tblproperties('transactional'='true')";
stmt.executeUpdate(create_lareferencialogtmp);
logger.info("Created lareferencialogtmp");
logger.info("Inserting into lareferencialogtmp");
String insert_lareferencialogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp " +
"SELECT DISTINCT cast(idSite as INT) as matomoid, CONCAT('opendoar____::', " +
"actiondetail.customVariables.`2`.customVariablePageValue2) as source, idVisit as id_Visit, country, " +
"actiondetail.type as action, actiondetail.url as url, " +
"actiondetail.customVariables.`1`.`customVariablePageValue1` as entity_id, " +
"'repItem' as source_item_type, from_unixtime(cast(actiondetail.timestamp as BIGINT)) as timestamp, " +
"referrerName as referrer_name, browser as agent " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp_json " +
"LATERAL VIEW explode(actiondetails) actiondetailsTable AS actiondetail";
stmt.executeUpdate(insert_lareferencialogtmp);
logger.info("Inserted into lareferencialogtmp");
stmt.close();
}
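/**
* Removes double clicks: when the same visit hits the same entity with the same
* action more than once within the window (30 minutes for downloads, 10 minutes for
* views), only the latest event is kept.
*/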
public void removeDoubleClicks() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Cleaning download double clicks");
// clean download double clicks
String sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp WHERE EXISTS (" +
"SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp p1, " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp p2 " +
"WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id " +
"AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp " +
"AND p1.timestamp<p2.timestamp AND ((unix_timestamp(p2.timestamp)-unix_timestamp(p1.timestamp))/60)<30 " +
"AND lareferencialogtmp.source=p1.source AND lareferencialogtmp.id_visit=p1.id_visit " +
"AND lareferencialogtmp.action=p1.action AND lareferencialogtmp.entity_id=p1.entity_id " +
"AND lareferencialogtmp.timestamp=p1.timestamp)";
stmt.executeUpdate(sql);
stmt.close();
logger.info("Cleaned download double clicks");
stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Cleaning action double clicks");
// clean view double clicks
sql = "DELETE from " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp WHERE EXISTS (" +
"SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp p1, " +
ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp p2 " +
"WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id " +
"AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp " +
"AND p1.timestamp<p2.timestamp AND ((unix_timestamp(p2.timestamp)-unix_timestamp(p1.timestamp))/60)<10 " +
"AND lareferencialogtmp.source=p1.source AND lareferencialogtmp.id_visit=p1.id_visit " +
"AND lareferencialogtmp.action=p1.action AND lareferencialogtmp.entity_id=p1.entity_id " +
"AND lareferencialogtmp.timestamp=p1.timestamp)";
stmt.executeUpdate(sql);
stmt.close();
logger.info("Cleaned action double clicks");
// conn.close();
}
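/**
* Aggregates views per entity, month and repository into la_views_stats_tmp,
* joining the log against the datasource and result OID mappings of the stats DB.
*/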
public void viewsStats() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Creating la_result_views_monthly_tmp view");
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp AS "
+
"SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' " +
"THEN 1 ELSE 0 END) AS openaire_referrer, " +
"CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp where action='action' and " +
"(source_item_type='oaItem' or source_item_type='repItem') " +
"GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
"source ORDER BY source, entity_id";
stmt.executeUpdate(sql);
logger.info("Created la_result_views_monthly_tmp view");
logger.info("Dropping la_views_stats_tmp table");
sql = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".la_views_stats_tmp";
stmt.executeUpdate(sql);
logger.info("Dropped la_views_stats_tmp table");
logger.info("Creating la_views_stats_tmp table");
sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp " +
"AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " +
"max(views) AS count, max(openaire_referrer) AS openaire " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_views_monthly_tmp p, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
"WHERE p.source=d.oid AND p.id=ro.oid " +
"GROUP BY d.id, ro.id, month " +
"ORDER BY d.id, ro.id, month";
stmt.executeUpdate(sql);
logger.info("Created la_views_stats_tmp table");
stmt.close();
ConnectDB.getHiveConnection().close();
}
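/**
* Same aggregation as viewsStats, but for download actions, producing
* la_downloads_stats_tmp.
*/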
private void downloadsStats() throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Creating la_result_downloads_monthly_tmp view");
String sql = "CREATE OR REPLACE VIEW " + ConnectDB.getUsageStatsDBSchema()
+ ".la_result_downloads_monthly_tmp AS " +
"SELECT entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' " +
"THEN 1 ELSE 0 END) AS openaire_referrer, " +
"CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')) AS month, source " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp where action='download' and " +
"(source_item_type='oaItem' or source_item_type='repItem') " +
"GROUP BY entity_id, CONCAT(YEAR(timestamp), '/', LPAD(MONTH(timestamp), 2, '0')), " +
"source ORDER BY source, entity_id";
stmt.executeUpdate(sql);
logger.info("Created la_result_downloads_monthly_tmp view");
logger.info("Dropping la_downloads_stats_tmp table");
sql = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".la_downloads_stats_tmp";
stmt.executeUpdate(sql);
logger.info("Dropped la_downloads_stats_tmp table");
logger.info("Creating la_downloads_stats_tmp table");
sql = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".la_downloads_stats_tmp " +
"AS SELECT 'LaReferencia' as source, d.id as repository_id, ro.id as result_id, month as date, " +
"max(downloads) AS count, max(openaire_referrer) AS openaire " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".la_result_downloads_monthly_tmp p, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " + ConnectDB.getStatsDBSchema() + ".result_oids ro " +
"WHERE p.source=d.oid AND p.id=ro.oid " +
"GROUP BY d.id, ro.id, month " +
"ORDER BY d.id, ro.id, month";
stmt.executeUpdate(sql);
logger.info("Created la_downloads_stats_tmp table");
stmt.close();
ConnectDB.getHiveConnection().close();
}
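/**
* Appends the temporary LaReferencia tables to the production tables
* (lareferencialog, views_stats, downloads_stats).
*/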
private void updateProdTables() throws SQLException, Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Updating lareferencialog");
String sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialog " +
"select * from " + ConnectDB.getUsageStatsDBSchema() + ".lareferencialogtmp";
stmt.executeUpdate(sql);
logger.info("Updating views_stats");
sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " +
"select * from " + ConnectDB.getUsageStatsDBSchema() + ".la_views_stats_tmp";
stmt.executeUpdate(sql);
// sql = "insert into public.views_stats select * from la_views_stats_tmp;";
// stmt.executeUpdate(sql);
logger.info("Updating downloads_stats");
sql = "insert into " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
"select * from " + ConnectDB.getUsageStatsDBSchema() + ".la_downloads_stats_tmp";
stmt.executeUpdate(sql);
// sql = "insert into public.downloads_stats select * from la_downloads_stats_tmp;";
// stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getHiveConnection().close();
}
private ArrayList<String> listHdfsDir(String dir) throws Exception {
FileSystem hdfs = FileSystem.get(new Configuration());
RemoteIterator<LocatedFileStatus> files;
ArrayList<String> fileNames = new ArrayList<>();
try {
Path exportPath = new Path(hdfs.getUri() + dir);
files = hdfs.listFiles(exportPath, false);
while (files.hasNext()) {
String fileName = files.next().getPath().toString();
// log.info("Found hdfs file " + fileName);
fileNames.add(fileName);
}
// hdfs.close();
} catch (Exception e) {
logger.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logRepoPath));
throw new Exception("HDFS file path with exported data does not exist : " + logRepoPath, e);
}
return fileNames;
}
private String readHDFSFile(String filename) throws Exception {
String result;
try {
FileSystem fs = FileSystem.get(new Configuration());
// log.info("reading file : " + filename);
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
StringBuilder sb = new StringBuilder();
String line = br.readLine();
while (line != null) {
if (!line.equals("[]")) {
sb.append(line);
}
// sb.append(line);
line = br.readLine();
}
result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
if (result.equals("")) {
result = "[]";
}
// fs.close();
} catch (Exception e) {
logger.error(e.getMessage());
throw new Exception(e);
}
return result;
}
}

View File

@@ -0,0 +1,247 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
import java.net.Authenticator;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class PiwikDownloadLogs {
private final String piwikUrl;
private Date startDate;
private final String tokenAuth;
/*
* The Piwik's API method
*/
private final String APImethod = "?module=API&method=Live.getLastVisitsDetails";
private final String format = "&format=json";
private static final Logger logger = LoggerFactory.getLogger(PiwikDownloadLogs.class);
public PiwikDownloadLogs(String piwikUrl, String tokenAuth) {
this.piwikUrl = piwikUrl;
this.tokenAuth = tokenAuth;
}
private String getPiwikLogUrl() {
return "https://" + piwikUrl + "/";
}
private String getJson(String url) throws Exception {
try {
logger.debug("Connecting to download the JSON: " + url);
URL website = new URL(url);
URLConnection connection = website.openConnection();
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
}
}
return response.toString();
} catch (Exception e) {
logger.error("Failed to get URL: " + url + " Exception: " + e);
throw new Exception("Failed to get URL: " + url + " Exception: " + e.toString(), e);
}
}
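/**
* Each worker downloads the logs of a single (siteId, day) pair, so that days and
* repositories can be fetched in parallel by the thread pool in GetOpenAIRELogs.
*/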
class WorkerThread implements Runnable {
private Calendar currDay;
private int siteId;
private String repoLogsPath;
private String portalLogPath;
private String portalMatomoID;
public WorkerThread(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath,
String portalMatomoID) throws IOException {
this.currDay = (Calendar) currDay.clone();
this.siteId = siteId;
this.repoLogsPath = repoLogsPath;
this.portalLogPath = portalLogPath;
this.portalMatomoID = portalMatomoID;
}
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
logger
.info(
Thread.currentThread().getName() + " (Start) Thread for "
+ "parameters: currDay=" + sdf.format(currDay.getTime()) + ", siteId=" + siteId +
", repoLogsPath=" + repoLogsPath + ", portalLogPath=" + portalLogPath +
", portalMatomoID=" + portalMatomoID);
try {
GetOpenAIRELogsForDate(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
} catch (Exception e) {
logger.error("Failed to download logs for siteId " + siteId + " and day "
+ sdf.format(currDay.getTime()), e);
}
logger
.info(
Thread.currentThread().getName() + " (End) Thread for "
+ "parameters: currDay=" + sdf.format(currDay.getTime()) + ", siteId=" + siteId +
", repoLogsPath=" + repoLogsPath + ", portalLogPath=" + portalLogPath +
", portalMatomoID=" + portalMatomoID);
}
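/**
* Downloads one day of logs for one Piwik site, paging through the Live API with
* filter_offset in steps of 1000 visits and writing one JSON file per page to HDFS.
*/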
public void GetOpenAIRELogsForDate(Calendar currDay, int siteId, String repoLogsPath, String portalLogPath,
String portalMatomoID) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date date = currDay.getTime();
logger.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date));
String period = "&period=day&date=" + sdf.format(date);
String outFolder = "";
if (siteId == Integer.parseInt(portalMatomoID)) {
outFolder = portalLogPath;
} else {
outFolder = repoLogsPath;
}
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
String content = "";
int i = 0;
JSONParser parser = new JSONParser();
FileSystem fs = FileSystem.get(new Configuration());
do {
int writtenBytes = 0;
String apiUrl = baseApiUrl;
if (i > 0) {
apiUrl += "&filter_offset=" + (i * 1000);
}
content = getJson(apiUrl);
if (content.length() == 0 || content.equals("[]"))
break;
FSDataOutputStream fin = fs
.create(
new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i
+ ".json"),
true);
JSONArray jsonArray = (JSONArray) parser.parse(content);
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRaw = (JSONObject) aJsonArray;
byte[] jsonObjectRawBytes = jsonObjectRaw.toJSONString().getBytes();
fin.write(jsonObjectRawBytes);
fin.writeChar('\n');
writtenBytes += jsonObjectRawBytes.length + 1;
}
fin.close();
logger
.info(
Thread.currentThread().getName() + " (Finished writing) Wrote " + writtenBytes
+ " bytes. Filename: " + siteId + "_Piwiklog" + sdf.format((date)) + "_offset_" + i
+ ".json");
i++;
} while (true);
fs.close();
}
}
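/**
* Collects the Piwik ids of all datasources from the stats DB, then schedules one
* WorkerThread per (siteId, day) on a fixed-size thread pool, resuming every site
* from the latest timestamp already present in the piwiklog table.
*/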
public void GetOpenAIRELogs(String repoLogsPath, String portalLogPath, String portalMatomoID) throws Exception {
Statement statement = ConnectDB.getHiveConnection().createStatement();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
ResultSet rs = statement
.executeQuery(
"SELECT distinct piwik_id from " + ConnectDB.getStatsDBSchema()
+ ".datasource where piwik_id is not null and piwik_id <> 0 order by piwik_id");
// Getting all the piwikids in a list for logging reasons & limiting the list
// to the max number of piwikids
List<Integer> piwikIdToVisit = new ArrayList<Integer>();
while (rs.next())
piwikIdToVisit.add(rs.getInt(1));
logger.info("Found the following piwikIds for download: " + piwikIdToVisit);
if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0 &&
ExecuteWorkflow.numberOfPiwikIdsToDownload <= piwikIdToVisit.size()) {
logger.info("Trimming piwikIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload);
piwikIdToVisit = piwikIdToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
}
logger.info("Downloading from repos with the following piwikIds: " + piwikIdToVisit);
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("Starting period for log download: " + sdf.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("Ending period for log download: " + sdf.format(end.getTime()));
ExecutorService executor = Executors.newFixedThreadPool(ExecuteWorkflow.numberOfDownloadThreads);
for (int siteId : piwikIdToVisit) {
logger.info("Now working on piwikId: " + siteId);
// Re-clone the configured starting period for every site, so that the resume
// date of one repository does not leak into the next one
start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION
.prepareStatement(
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema()
+ ".piwiklog WHERE source=? GROUP BY timestamp HAVING max(timestamp) is not null");
st.setInt(1, siteId);
ResultSet rs_date = st.executeQuery();
while (rs_date.next()) {
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
&& !rs_date.getString(1).equals("")) {
start.setTime(sdf.parse(rs_date.getString(1)));
}
}
rs_date.close();
for (Calendar currDay = (Calendar) start.clone(); currDay.before(end); currDay.add(Calendar.DATE, 1)) {
Runnable worker = new WorkerThread(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
executor.execute(worker);// calling execute method of ExecutorService
}
}
executor.shutdown();
// Wait for all workers to finish without busy-spinning
while (!executor.isTerminated()) {
Thread.sleep(100);
}
logger.info("Finished all threads");
}
}

View File

@@ -0,0 +1,54 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package eu.dnetlib.oa.graph.usagestats.export;
/**
 * @author D. Pierrakos, S. Zoupanos
 */
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import org.json.JSONException;
import org.json.simple.JSONArray;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
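/**
* Downloads the COUNTER list of robot/crawler user-agent patterns from the given
* URL and exposes it as a list of regex patterns; backslashes are doubled,
* presumably so the patterns can later be embedded in SQL/regex strings.
*/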
public class ReadCounterRobotsList {
private ArrayList<String> robotsPatterns = new ArrayList<>();
private String COUNTER_ROBOTS_URL;
public ReadCounterRobotsList(String url) throws IOException, JSONException, ParseException {
COUNTER_ROBOTS_URL = url;
robotsPatterns = readRobotsPatterns(COUNTER_ROBOTS_URL);
}
private ArrayList<String> readRobotsPatterns(String url) throws MalformedURLException, IOException, ParseException {
InputStream is = new URL(url).openStream();
JSONParser parser = new JSONParser();
BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("ISO-8859-1")));
JSONArray jsonArray = (JSONArray) parser.parse(reader);
for (Object aJsonArray : jsonArray) {
org.json.simple.JSONObject jsonObjectRow = (org.json.simple.JSONObject) aJsonArray;
robotsPatterns.add(jsonObjectRow.get("pattern").toString().replace("\\", "\\\\"));
}
return robotsPatterns;
}
public ArrayList<String> getRobotsPatterns() {
return robotsPatterns;
}
}

View File

@@ -0,0 +1,567 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.*;
// import java.io.BufferedReader;
// import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author D. Pierrakos, S. Zoupanos
*/
public class SarcStats {
private Statement stmtHive = null;
private Statement stmtImpala = null;
private static final Logger logger = LoggerFactory.getLogger(SarcStats.class);
public SarcStats() throws Exception {
// createTables();
}
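// Legacy table creation kept from the PostgreSQL implementation: it is never
// invoked (see the commented-out call in the constructor) and its RULE/index
// syntax would not run on Hive.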
private void createTables() throws Exception {
try {
stmtHive = ConnectDB.getHiveConnection().createStatement();
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilog(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
stmtHive.executeUpdate(sqlCreateTableSushiLog);
// String sqlCopyPublicSushiLog="INSERT INTO sushilog SELECT * FROM public.sushilog;";
// stmt.executeUpdate(sqlCopyPublicSushiLog);
String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
+ " ON INSERT TO sushilog "
+ " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
+ "sushilog.rid, sushilog.date "
+ "FROM sushilog "
+ "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
stmtHive.executeUpdate(sqlcreateRuleSushiLog);
String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);";
stmtHive.executeUpdate(createSushiIndex);
stmtHive.close();
ConnectDB.getHiveConnection().close();
logger.info("Sushi Tables Created");
} catch (Exception e) {
logger.error("Failed to create tables: " + e);
throw new Exception("Failed to create tables: " + e.toString(), e);
}
}
public void reCreateLogDirs() throws IOException {
FileSystem dfs = FileSystem.get(new Configuration());
logger.info("Deleting sarcsReport (Array) directory: " + ExecuteWorkflow.sarcsReportPathArray);
dfs.delete(new Path(ExecuteWorkflow.sarcsReportPathArray), true);
logger.info("Deleting sarcsReport (NonArray) directory: " + ExecuteWorkflow.sarcsReportPathNonArray);
dfs.delete(new Path(ExecuteWorkflow.sarcsReportPathNonArray), true);
logger.info("Creating sarcsReport (Array) directory: " + ExecuteWorkflow.sarcsReportPathArray);
dfs.mkdirs(new Path(ExecuteWorkflow.sarcsReportPathArray));
logger.info("Creating sarcsReport (NonArray) directory: " + ExecuteWorkflow.sarcsReportPathNonArray);
dfs.mkdirs(new Path(ExecuteWorkflow.sarcsReportPathNonArray));
}
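/**
* Loads the downloaded SARC reports into Hive. Two external JsonSerDe tables are
* needed because ItemIdentifier is sometimes a JSON array and sometimes a single
* object; both variants are normalised into rows of sarc_sushilogtmp.
*/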
public void processSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Adding JSON Serde jar");
stmt.executeUpdate("add jar /usr/share/cmf/common_jars/hive-hcatalog-core-1.1.0-cdh5.14.0.jar");
logger.info("Added JSON Serde jar");
logger.info("Dropping sarc_sushilogtmp_json_array table");
String drop_sarc_sushilogtmp_json_array = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array";
stmt.executeUpdate(drop_sarc_sushilogtmp_json_array);
logger.info("Dropped sarc_sushilogtmp_json_array table");
logger.info("Creating sarc_sushilogtmp_json_array table");
String create_sarc_sushilogtmp_json_array = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array(\n" +
" `ItemIdentifier` ARRAY<\n" +
" struct<\n" +
" `Type`: STRING,\n" +
" `Value`: STRING\n" +
" >\n" +
" >,\n" +
" `ItemPerformance` struct<\n" +
" `Period`: struct<\n" +
" `Begin`: STRING,\n" +
" `End`: STRING\n" +
" >,\n" +
" `Instance`: struct<\n" +
" `Count`: STRING,\n" +
" `MetricType`: STRING\n" +
" >\n" +
" >\n" +
")" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + sarcsReportPathArray + "/'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
stmt.executeUpdate(create_sarc_sushilogtmp_json_array);
logger.info("Created sarc_sushilogtmp_json_array table");
logger.info("Dropping sarc_sushilogtmp_json_non_array table");
String drop_sarc_sushilogtmp_json_non_array = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sarc_sushilogtmp_json_non_array";
stmt.executeUpdate(drop_sarc_sushilogtmp_json_non_array);
logger.info("Dropped sarc_sushilogtmp_json_non_array table");
logger.info("Creating sarc_sushilogtmp_json_non_array table");
String create_sarc_sushilogtmp_json_non_array = "CREATE EXTERNAL TABLE IF NOT EXISTS " +
ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array (\n" +
" `ItemIdentifier` struct<\n" +
" `Type`: STRING,\n" +
" `Value`: STRING\n" +
" >,\n" +
" `ItemPerformance` struct<\n" +
" `Period`: struct<\n" +
" `Begin`: STRING,\n" +
" `End`: STRING\n" +
" >,\n" +
" `Instance`: struct<\n" +
" `Count`: STRING,\n" +
" `MetricType`: STRING\n" +
" >\n" +
" >" +
")" +
"ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n" +
"LOCATION '" + sarcsReportPathNonArray + "/'\n" +
"TBLPROPERTIES (\"transactional\"=\"false\")";
stmt.executeUpdate(create_sarc_sushilogtmp_json_non_array);
logger.info("Created sarc_sushilogtmp_json_non_array table");
logger.info("Creating sarc_sushilogtmp table");
String create_sarc_sushilogtmp = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sarc_sushilogtmp(source STRING, repository STRING, " +
"rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
+
"tblproperties('transactional'='true')";
stmt.executeUpdate(create_sarc_sushilogtmp);
logger.info("Created sarc_sushilogtmp table");
logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)");
String insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " +
"SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " +
" `ItemIdent`.`Value`, `ItemPerformance`.`Period`.`Begin`, " +
"`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array " +
"LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent " +
"WHERE `ItemIdent`.`Type`='DOI'";
stmt.executeUpdate(insert_sarc_sushilogtmp);
logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)");
logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)");
insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp " +
"SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], " +
"`ItemIdentifier`.`Value`, `ItemPerformance`.`Period`.`Begin`, " +
"`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array";
stmt.executeUpdate(insert_sarc_sushilogtmp);
logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)");
ConnectDB.getHiveConnection().close();
}
public void getAndProcessSarc(String sarcsReportPathArray, String sarcsReportPathNonArray) throws Exception {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
logger.info("Creating sushilog table");
String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog " +
"(`source` string, " +
"`repository` string, " +
"`rid` string, " +
"`date` string, " +
"`metric_type` string, " +
"`count` int)";
stmt.executeUpdate(createSushilog);
logger.info("Created sushilog table");
logger.info("Dropping sarc_sushilogtmp table");
String drop_sarc_sushilogtmp = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sarc_sushilogtmp";
stmt.executeUpdate(drop_sarc_sushilogtmp);
logger.info("Dropped sarc_sushilogtmp table");
ConnectDB.getHiveConnection().close();
List<String[]> issnAndUrls = new ArrayList<String[]>();
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/motricidade/sushiLite/v1_7/", "1646-107X"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/antropologicas/sushiLite/v1_7/", "0873-819X"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/interaccoes/sushiLite/v1_7/", "1646-2335"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/cct/sushiLite/v1_7/", "2182-3030"
});
issnAndUrls.add(new String[] {
"https://actapediatrica.spp.pt/sushiLite/v1_7/", "0873-9781"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/sociologiapp/sushiLite/v1_7/", "0873-6529"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/finisterra/sushiLite/v1_7/", "0430-5027"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/sisyphus/sushiLite/v1_7/", "2182-8474"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/anestesiologia/sushiLite/v1_7/", "0871-6099"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/rpe/sushiLite/v1_7/", "0871-9187"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/psilogos/sushiLite/v1_7/", "1646-091X"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/juridica/sushiLite/v1_7/", "2183-5799"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/ecr/sushiLite/v1_7/", "1647-2098"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/nascercrescer/sushiLite/v1_7/", "0872-0754"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/cea/sushiLite/v1_7/", "1645-3794"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/proelium/sushiLite/v1_7/", "1645-8826"
});
issnAndUrls.add(new String[] {
"https://revistas.rcaap.pt/millenium/sushiLite/v1_7/", "0873-3015"
});
if (ExecuteWorkflow.sarcNumberOfIssnToDownload > 0 &&
ExecuteWorkflow.sarcNumberOfIssnToDownload <= issnAndUrls.size()) {
logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.sarcNumberOfIssnToDownload);
issnAndUrls = issnAndUrls.subList(0, ExecuteWorkflow.sarcNumberOfIssnToDownload);
}
logger.info("(getAndProcessSarc) Downloading the followins opendoars: " + issnAndUrls);
for (String[] issnAndUrl : issnAndUrls) {
logger.info("Now working on ISSN: " + issnAndUrl[1]);
getARReport(sarcsReportPathArray, sarcsReportPathNonArray, issnAndUrl[0], issnAndUrl[1]);
}
}
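/**
* Produces the final downloads_stats rows for SARC. The heavy join against the
* stats DB runs on Impala, over a Parquet copy of sarc_sushilogtmp, and the result
* is appended to downloads_stats and sushilog through Hive.
*/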
public void finalizeSarcStats() throws Exception {
stmtHive = ConnectDB.getHiveConnection().createStatement();
stmtImpala = ConnectDB.getImpalaConnection().createStatement();
logger.info("Creating downloads_stats table");
String createDownloadsStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats " +
"(`source` string, " +
"`repository_id` string, " +
"`result_id` string, " +
"`date` string, " +
"`count` bigint, " +
"`openaire` bigint)";
stmtHive.executeUpdate(createDownloadsStats);
logger.info("Created downloads_stats table");
logger.info("Dropping sarc_sushilogtmp_impala table");
String drop_sarc_sushilogtmp_impala = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".sarc_sushilogtmp_impala";
stmtHive.executeUpdate(drop_sarc_sushilogtmp_impala);
logger.info("Dropped sarc_sushilogtmp_impala table");
logger.info("Creating sarc_sushilogtmp_impala, a table readable by impala");
String createSarcSushilogtmpImpala = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sarc_sushilogtmp_impala " +
"STORED AS PARQUET AS SELECT * FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp";
stmtHive.executeUpdate(createSarcSushilogtmpImpala);
logger.info("Created sarc_sushilogtmp_impala");
logger.info("Making sarc_sushilogtmp visible to impala");
String invalidateMetadata = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema()
+ ".sarc_sushilogtmp_impala;";
stmtImpala.executeUpdate(invalidateMetadata);
logger.info("Dropping downloads_stats_impala table");
String drop_downloads_stats_impala = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".downloads_stats_impala";
stmtHive.executeUpdate(drop_downloads_stats_impala);
logger.info("Dropped downloads_stats_impala table");
logger.info("Making downloads_stats_impala deletion visible to impala");
try {
String invalidateMetadataDownloadsStatsImpala = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats_impala;";
stmtImpala.executeUpdate(invalidateMetadataDownloadsStatsImpala);
} catch (SQLException sqle) {
// The table may not exist yet (e.g. on a first run); log and continue
logger.warn("INVALIDATE METADATA for downloads_stats_impala failed: " + sqle.getMessage());
}
// We run the following query in Impala because it is faster
logger.info("Creating downloads_stats_impala");
String createDownloadsStatsImpala = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats_impala AS " +
"SELECT s.source, d.id AS repository_id, " +
"ro.id as result_id, CONCAT(CAST(YEAR(`date`) AS STRING), '/', " +
"LPAD(CAST(MONTH(`date`) AS STRING), 2, '0')) AS `date`, s.count, '0' " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_impala s, " +
ConnectDB.getStatsDBSchema() + ".datasource_oids d, " +
ConnectDB.getStatsDBSchema() + ".datasource_results dr, " +
ConnectDB.getStatsDBSchema() + ".result_pids ro " +
"WHERE d.oid LIKE CONCAT('%', s.repository, '%') AND dr.id=d.id AND dr.result=ro.id AND " +
"s.rid=ro.pid AND ro.type='Digital Object Identifier' AND metric_type='ft_total' AND s.source='SARC-OJS'";
stmtImpala.executeUpdate(createDownloadsStatsImpala);
logger.info("Creating downloads_stats_impala");
// Insert into downloads_stats
logger.info("Inserting data from downloads_stats_impala into downloads_stats");
String insertDStats = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
+ ".downloads_stats SELECT * " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_impala";
stmtHive.executeUpdate(insertDStats);
logger.info("Inserted into downloads_stats");
logger.info("Creating sushilog table");
String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog " +
"(`source` string, " +
"`repository_id` string, " +
"`rid` string, " +
"`date` string, " +
"`metric_type` string, " +
"`count` int)";
stmtHive.executeUpdate(createSushilog);
logger.info("Created sushilog table");
// Insert into sushilog
logger.info("Inserting into sushilog");
String insertSushiLog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
+ ".sushilog SELECT * " + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp";
stmtHive.executeUpdate(insertSushiLog);
logger.info("Inserted into sushilog");
stmtHive.close();
ConnectDB.getHiveConnection().close();
}
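/**
* Downloads the AR1 SUSHI report of one journal (issn), month by month, splits the
* report items by the shape of their ItemIdentifier field and stores them under the
* array / non-array report paths on HDFS.
*/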
public void getARReport(String sarcsReportPathArray, String sarcsReportPathNonArray,
String url, String issn) throws Exception {
logger.info("Processing SARC! issn: " + issn + " with url: " + url);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM");
// Setting the starting period
Calendar start = (Calendar) ExecuteWorkflow.startingLogPeriod.clone();
logger.info("(getARReport) Starting period for log download: " + simpleDateFormat.format(start.getTime()));
// Setting the ending period (last day of the month)
Calendar end = (Calendar) ExecuteWorkflow.endingLogPeriod.clone();
end.add(Calendar.MONTH, +1);
end.add(Calendar.DAY_OF_MONTH, -1);
logger.info("(getARReport) Ending period for log download: " + simpleDateFormat.format(end.getTime()));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
PreparedStatement st = ConnectDB
.getHiveConnection()
.prepareStatement(
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
st.setString(1, issn);
ResultSet rs_date = st.executeQuery();
while (rs_date.next()) {
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null")
&& !rs_date.getString(1).equals("")) {
start.setTime(sdf.parse(rs_date.getString(1)));
}
}
rs_date.close();
// Creating the needed configuration for the correct storing of data
Configuration config = new Configuration();
config.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
config.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
config
.set(
"fs.hdfs.impl",
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
config
.set(
"fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem dfs = FileSystem.get(config);
while (start.before(end)) {
// Format the report period before advancing the calendar, so the URL and the
// stored filenames refer to the same month
String reportDate = simpleDateFormat.format(start.getTime());
String reportUrl = url + "GetReport/?Report=AR1&Format=json&BeginDate="
+ reportDate + "&EndDate=" + reportDate;
start.add(Calendar.MONTH, 1);
logger.info("(getARReport) Getting report: " + reportUrl);
String text = getJson(reportUrl);
if (text == null) {
continue;
}
JSONParser parser = new JSONParser();
JSONObject jsonObject = null;
try {
jsonObject = (JSONObject) parser.parse(text);
}
// if there is a parsing error continue with the next url
catch (ParseException pe) {
continue;
}
jsonObject = (JSONObject) jsonObject.get("sc:ReportResponse");
jsonObject = (JSONObject) jsonObject.get("sc:Report");
if (jsonObject == null) {
continue;
}
jsonObject = (JSONObject) jsonObject.get("c:Report");
jsonObject = (JSONObject) jsonObject.get("c:Customer");
Object obj = jsonObject.get("c:ReportItems");
JSONArray jsonArray = new JSONArray();
if (obj instanceof JSONObject) {
jsonArray.add(obj);
} else {
jsonArray = (JSONArray) obj;
// jsonArray = (JSONArray) jsonObject.get("c:ReportItems");
}
if (jsonArray == null) {
continue;
}
// Creating the file in the filesystem for the ItemIdentifier as an array object
String filePathArray = sarcsReportPathArray + "/SarcsARReport_" + issn + "_" +
reportDate + ".json";
logger.info("Storing to file: " + filePathArray);
FSDataOutputStream finArray = dfs.create(new Path(filePathArray), true);
// Creating the file in the filesystem for the ItemIdentifier as a non-array object
String filePathNonArray = sarcsReportPathNonArray + "/SarcsARReport_" + issn + "_" +
reportDate + ".json";
logger.info("Storing to file: " + filePathNonArray);
FSDataOutputStream finNonArray = dfs.create(new Path(filePathNonArray), true);
for (Object aJsonArray : jsonArray) {
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
renameKeysRecursively(":", jsonObjectRow);
if (jsonObjectRow.get("ItemIdentifier") instanceof JSONObject) {
finNonArray.write(jsonObjectRow.toJSONString().getBytes());
finNonArray.writeChar('\n');
} else {
finArray.write(jsonObjectRow.toJSONString().getBytes());
finArray.writeChar('\n');
}
}
finArray.close();
finNonArray.close();
// Delete reports that came back empty; the check goes through the HDFS
// FileSystem handle, since java.io.File cannot see HDFS paths
if (dfs.getFileStatus(new Path(filePathArray)).getLen() == 0)
dfs.delete(new Path(filePathArray), false);
if (dfs.getFileStatus(new Path(filePathNonArray)).getLen() == 0)
dfs.delete(new Path(filePathNonArray), false);
}
dfs.close();
ConnectDB.getHiveConnection().close();
}
private void renameKeysRecursively(String delimiter, JSONArray givenJsonObj) throws Exception {
for (Object jjval : givenJsonObj) {
if (jjval instanceof JSONArray)
renameKeysRecursively(delimiter, (JSONArray) jjval);
else if (jjval instanceof JSONObject)
renameKeysRecursively(delimiter, (JSONObject) jjval);
// primitive values carry no keys, so there is nothing to rename
}
}
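/**
 * Strips everything up to and including the last delimiter from every key,
 * recursively; e.g. with delimiter ":" the key "c:ItemIdentifier" becomes
 * "ItemIdentifier". Note that the delimiter is treated as a regular
 * expression by String.split().
 */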
private void renameKeysRecursively(String delimiter, JSONObject givenJsonObj) throws Exception {
Set<String> jkeys = new HashSet<String>(givenJsonObj.keySet());
for (String jkey : jkeys) {
String[] splitArray = jkey.split(delimiter);
String newJkey = splitArray[splitArray.length - 1];
Object jval = givenJsonObj.get(jkey);
givenJsonObj.remove(jkey);
givenJsonObj.put(newJkey, jval);
if (jval instanceof JSONObject)
renameKeysRecursively(delimiter, (JSONObject) jval);
if (jval instanceof JSONArray) {
renameKeysRecursively(delimiter, (JSONArray) jval);
}
}
}
private String getJson(String url) throws Exception {
try {
URL website = new URL(url);
URLConnection connection = website.openConnection();
StringBuilder response;
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
response.append("\n");
}
}
return response.toString();
} catch (Exception e) {
// Log the failure and fall through to an empty reply; callers treat ""
// as a failed download and skip the report
logger.error("Failed to get URL " + url + ": " + e);
return "";
}
}
}
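A minimal, standalone sketch (not part of the patch) of the key flattening that
renameKeysRecursively performs, assuming the same json-simple types used above;
the class and method names here are illustrative:

    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;

    public class RenameKeysDemo {
        public static void main(String[] args) throws Exception {
            JSONObject o = (JSONObject) new JSONParser()
                .parse("{\"sc:Report\":{\"c:Customer\":\"X\"}}");
            flatten(o);
            System.out.println(o.toJSONString()); // {"Report":{"Customer":"X"}}
        }

        // Same idea as renameKeysRecursively(":", obj): keep only the part of
        // each key after the last ":" and recurse into nested objects
        @SuppressWarnings("unchecked")
        static void flatten(JSONObject obj) {
            for (Object k : new java.util.HashSet<Object>(obj.keySet())) {
                String key = (String) k;
                String[] parts = key.split(":");
                Object val = obj.remove(key);
                obj.put(parts[parts.length - 1], val);
                if (val instanceof JSONObject)
                    flatten((JSONObject) val);
            }
        }
    }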


@ -0,0 +1,248 @@
package eu.dnetlib.oa.graph.usagestats.export;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Main class for downloading and processing Usage statistics
*
* @author D. Pierrakos, S. Zoupanos
*/
public class UsageStatsExporter {
public UsageStatsExporter() {
}
private static final Logger logger = LoggerFactory.getLogger(UsageStatsExporter.class);
private void reCreateLogDirs() throws IllegalArgumentException, IOException {
FileSystem dfs = FileSystem.get(new Configuration());
logger.info("Deleting repoLog directory: " + ExecuteWorkflow.repoLogPath);
dfs.delete(new Path(ExecuteWorkflow.repoLogPath), true);
logger.info("Deleting portalLog directory: " + ExecuteWorkflow.portalLogPath);
dfs.delete(new Path(ExecuteWorkflow.portalLogPath), true);
logger.info("Deleting lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
dfs.delete(new Path(ExecuteWorkflow.lareferenciaLogPath), true);
logger.info("Creating repoLog directory: " + ExecuteWorkflow.repoLogPath);
dfs.mkdirs(new Path(ExecuteWorkflow.repoLogPath));
logger.info("Creating portalLog directory: " + ExecuteWorkflow.portalLogPath);
dfs.mkdirs(new Path(ExecuteWorkflow.portalLogPath));
logger.info("Creating lareferenciaLog directory: " + ExecuteWorkflow.lareferenciaLogPath);
dfs.mkdirs(new Path(ExecuteWorkflow.lareferenciaLogPath));
}
public void export() throws Exception {
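// Pipeline: recreate DB/tables, download & process Piwik, LaReferencia,
// IRUS-UK and SARC data, then finalize, deduplicate and expose the tables
// to Impala; every stage is guarded by its ExecuteWorkflow flag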
logger.info("Initialising DB properties");
ConnectDB.init();
PiwikStatsDB piwikstatsdb = new PiwikStatsDB(ExecuteWorkflow.repoLogPath, ExecuteWorkflow.portalLogPath);
logger.info("Re-creating database and tables");
if (ExecuteWorkflow.recreateDbAndTables)
piwikstatsdb.recreateDBAndTables();
logger.info("Initializing the download logs module");
PiwikDownloadLogs piwd = new PiwikDownloadLogs(ExecuteWorkflow.matomoBaseURL, ExecuteWorkflow.matomoAuthToken);
if (ExecuteWorkflow.piwikEmptyDirs) {
logger.info("Recreating Piwik log directories");
piwikstatsdb.reCreateLogDirs();
}
// Downloading piwik logs (also managing directory creation)
if (ExecuteWorkflow.downloadPiwikLogs) {
logger.info("Downloading piwik logs");
piwd
.GetOpenAIRELogs(
ExecuteWorkflow.repoLogPath,
ExecuteWorkflow.portalLogPath, ExecuteWorkflow.portalMatomoID);
logger.info("Downloaded piwik logs");
}
// Create DB tables, insert/update statistics
String cRobotsUrl = "https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots_list.json";
piwikstatsdb.setCounterRobotsURL(cRobotsUrl);
if (ExecuteWorkflow.processPiwikLogs) {
logger.info("Processing logs");
piwikstatsdb.processLogs();
}
logger.info("Creating LaReferencia tables");
LaReferenciaDownloadLogs lrf = new LaReferenciaDownloadLogs(ExecuteWorkflow.lareferenciaBaseURL,
ExecuteWorkflow.lareferenciaAuthToken);
if (ExecuteWorkflow.laReferenciaEmptyDirs) {
logger.info("Recreating LaReferencia log directories");
lrf.reCreateLogDirs();
}
if (ExecuteWorkflow.downloadLaReferenciaLogs) {
logger.info("Downloading LaReferencia logs");
lrf.GetLaReferenciaRepos(ExecuteWorkflow.lareferenciaLogPath);
logger.info("Downloaded LaReferencia logs");
}
LaReferenciaStats lastats = new LaReferenciaStats(ExecuteWorkflow.lareferenciaLogPath);
if (ExecuteWorkflow.processLaReferenciaLogs) {
logger.info("Processing LaReferencia logs");
lastats.processLogs();
logger.info("LaReferencia logs done");
}
IrusStats irusstats = new IrusStats(ExecuteWorkflow.irusUKBaseURL);
if (ExecuteWorkflow.irusCreateTablesEmptyDirs) {
logger.info("Creating Irus Stats tables");
irusstats.createTables();
logger.info("Created Irus Stats tables");
logger.info("Re-create log dirs");
irusstats.reCreateLogDirs();
logger.info("Re-created log dirs");
}
if (ExecuteWorkflow.irusDownloadReports) {
irusstats.getIrusRRReport(ExecuteWorkflow.irusUKReportPath);
}
if (ExecuteWorkflow.irusProcessStats) {
irusstats.processIrusStats();
logger.info("Irus done");
}
SarcStats sarcStats = new SarcStats();
if (ExecuteWorkflow.sarcCreateTablesEmptyDirs) {
sarcStats.reCreateLogDirs();
}
if (ExecuteWorkflow.sarcDownloadReports) {
sarcStats.getAndProcessSarc(ExecuteWorkflow.sarcsReportPathArray, ExecuteWorkflow.sarcsReportPathNonArray);
}
if (ExecuteWorkflow.sarcProcessStats) {
sarcStats.processSarc(ExecuteWorkflow.sarcsReportPathArray, ExecuteWorkflow.sarcsReportPathNonArray);
sarcStats.finalizeSarcStats();
logger.info("Sarc done");
}
// finalize usagestats
if (ExecuteWorkflow.finalizeStats) {
piwikstatsdb.finalizeStats();
logger.info("Finalized stats");
}
// Clean duplicates from main tables
if (ExecuteWorkflow.cleanDuplicates) {
cleanDuplicates();
logger.info("Cleaned duplicates");
}
// Make the tables available to Impala
if (ExecuteWorkflow.finalTablesVisibleToImpala) {
logger.info("Making tables visible to Impala");
invalidateMetadata();
}
logger.info("End");
}
private void cleanDuplicates() throws SQLException {
Statement stmt = ConnectDB.getHiveConnection().createStatement();
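// Dedup pattern used three times below: move the table aside, then rebuild it
// with CREATE TABLE ... AS SELECT DISTINCT; the *_with_duplicates copy is
// dropped at the start of the next run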
logger.info("Dropping downloads_stats_with_duplicates table");
String dropOldDStatsTmpTable = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".downloads_stats_with_duplicates";
stmt.executeUpdate(dropOldDStatsTmpTable);
// Give to downloads_stats a temporary name
logger.info("Renaming downloads_stats to downloads_stats_with_duplicates");
String renameDStats = "ALTER TABLE " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats RENAME TO " +
ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_with_duplicates";
stmt.executeUpdate(renameDStats);
// Create a new table free from duplicates
logger.info("Creating a downloads_stats table free from duplicates");
String cleanDStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats " +
"AS SELECT DISTINCT * " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats_with_duplicates";
stmt.executeUpdate(cleanDStats);
logger.info("Dropping views_stats_with_duplicates table");
String dropOldVStatsTmpTable = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".views_stats_with_duplicates";
stmt.executeUpdate(dropOldVStatsTmpTable);
// Give to views_stats a temporary name
logger.info("Renaming views_stats to views_stats_with_duplicates");
String renameVStats = "ALTER TABLE " + ConnectDB.getUsageStatsDBSchema() + ".views_stats RENAME TO " +
ConnectDB.getUsageStatsDBSchema() + ".views_stats_with_duplicates";
stmt.executeUpdate(renameVStats);
// Create a new table free from duplicates
logger.info("Creating a views_stats table free from duplicates");
String cleanVStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".views_stats " +
"AS SELECT DISTINCT * " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".views_stats_with_duplicates";
stmt.executeUpdate(cleanVStats);
logger.info("Dropping usage_stats_with_duplicates table");
String dropOldUStatsTmpTable = "DROP TABLE IF EXISTS " +
ConnectDB.getUsageStatsDBSchema() +
".usage_stats_with_duplicates";
stmt.executeUpdate(dropOldUStatsTmpTable);
// Give to usage_stats a temporary name
logger.info("Renaming usage_stats to usage_stats_with_duplicates");
String renameUStats = "ALTER TABLE " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats RENAME TO " +
ConnectDB.getUsageStatsDBSchema() + ".usage_stats_with_duplicates";
stmt.executeUpdate(renameUStats);
// Create a new table free from duplicates
logger.info("Creating a usage_stats table free from duplicates");
String cleanUStats = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats " +
"AS SELECT DISTINCT * " +
"FROM " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats_with_duplicates";
stmt.executeUpdate(cleanUStats);
stmt.close();
ConnectDB.getHiveConnection().close();
}
private void invalidateMetadata() throws SQLException {
Statement stmt = ConnectDB.getImpalaConnection().createStatement();
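// Tables written through Hive are not visible to Impala until their
// metadata is invalidated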
String sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".downloads_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".views_stats";
stmt.executeUpdate(sql);
sql = "INVALIDATE METADATA " + ConnectDB.getUsageStatsDBSchema() + ".usage_stats";
stmt.executeUpdate(sql);
stmt.close();
ConnectDB.getImpalaConnection().close();
}
}


@ -0,0 +1,236 @@
[
{
"paramName": "mat",
"paramLongName": "matomoAuthToken",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "mbu",
"paramLongName": "matomoBaseURL",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName": "rlp",
"paramLongName": "repoLogPath",
"paramDescription": "nameNode of the source cluster",
"paramRequired": true
},
{
"paramName": "plp",
"paramLongName": "portalLogPath",
"paramDescription": "namoNode of the target cluster",
"paramRequired": true
},
{
"paramName": "pmi",
"paramLongName": "portalMatomoID",
"paramDescription": "namoNode of the target cluster",
"paramRequired": true
},
{
"paramName": "iukbuw",
"paramLongName": "irusUKBaseURL",
"paramDescription": "working directory",
"paramRequired": true
},
{
"paramName": "iukrp",
"paramLongName": "irusUKReportPath",
"paramDescription": "maximum number of map tasks used in the distcp process",
"paramRequired": true
},
{
"paramName": "srpa",
"paramLongName": "sarcsReportPathArray",
"paramDescription": "memory for distcp action copying actionsets from remote cluster",
"paramRequired": true
},
{
"paramName": "srpna",
"paramLongName": "sarcsReportPathNonArray",
"paramDescription": "timeout for distcp copying actions from remote cluster",
"paramRequired": true
},
{
"paramName": "llp",
"paramLongName": "lareferenciaLogPath",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "lbu",
"paramLongName": "lareferenciaBaseURL",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "lat",
"paramLongName": "lareferenciaAuthToken",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "dbhu",
"paramLongName": "dbHiveUrl",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "dbiu",
"paramLongName": "dbImpalaUrl",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "usdbs",
"paramLongName": "usageStatsDBSchema",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "sdbs",
"paramLongName": "statsDBSchema",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
},
{
"paramName": "rdbt",
"paramLongName": "recreateDbAndTables",
"paramDescription": "Re-create database and initial tables?",
"paramRequired": true
},
{
"paramName": "pwed",
"paramLongName": "piwikEmptyDirs",
"paramDescription": "Empty piwik directories?",
"paramRequired": true
},
{
"paramName": "ppwl",
"paramLongName": "processPiwikLogs",
"paramDescription": "Process the piwiklogs (create & fill in the needed tables and process the data) based on the downloaded data",
"paramRequired": true
},
{
"paramName": "dpwl",
"paramLongName": "downloadPiwikLogs",
"paramDescription": "download piwik logs?",
"paramRequired": true
},
{
"paramName": "slp",
"paramLongName": "startingLogPeriod",
"paramDescription": "Starting log period",
"paramRequired": true
},
{
"paramName": "elp",
"paramLongName": "endingLogPeriod",
"paramDescription": "Ending log period",
"paramRequired": true
},
{
"paramName": "npidd",
"paramLongName": "numberOfPiwikIdsToDownload",
"paramDescription": "Limit the number of the downloaded piwikids to the first numberOfPiwikIdsToDownload",
"paramRequired": true
},
{
"paramName": "nsidd",
"paramLongName": "numberOfSiteIdsToDownload",
"paramDescription": "Limit the number of the downloaded siteids (La Referencia logs) to the first numberOfSiteIdsToDownload",
"paramRequired": true
},
{
"paramName": "lerd",
"paramLongName": "laReferenciaEmptyDirs",
"paramDescription": "Empty LaReferencia directories?",
"paramRequired": true
},
{
"paramName": "plrl",
"paramLongName": "processLaReferenciaLogs",
"paramDescription": "Process the La Referencia logs (create & fill in the needed tables and process the data) based on the downloaded data",
"paramRequired": true
},
{
"paramName": "dlrl",
"paramLongName": "downloadLaReferenciaLogs",
"paramDescription": "download La Referencia logs?",
"paramRequired": true
},
{
"paramName": "icted",
"paramLongName": "irusCreateTablesEmptyDirs",
"paramDescription": "Irus section: Create tables and empty JSON directories?",
"paramRequired": true
},
{
"paramName": "idr",
"paramLongName": "irusDownloadReports",
"paramDescription": "Irus section: Download reports?",
"paramRequired": true
},
{
"paramName": "ipr",
"paramLongName": "irusProcessStats",
"paramDescription": "Irus section: Process stats?",
"paramRequired": true
},
{
"paramName": "inod",
"paramLongName": "irusNumberOfOpendoarsToDownload",
"paramDescription": "Limit the number of the downloaded Opendoars (Irus) to the first irusNumberOfOpendoarsToDownload",
"paramRequired": true
},
{
"paramName": "icted",
"paramLongName": "sarcCreateTablesEmptyDirs",
"paramDescription": "Sarc section: Create tables and empty JSON directories?",
"paramRequired": true
},
{
"paramName": "idr",
"paramLongName": "sarcDownloadReports",
"paramDescription": "Sarc section: Download reports?",
"paramRequired": true
},
{
"paramName": "ipr",
"paramLongName": "sarcProcessStats",
"paramDescription": "Sarc section: Process stats?",
"paramRequired": true
},
{
"paramName": "inod",
"paramLongName": "sarcNumberOfIssnToDownload",
"paramDescription": "Limit the number of the downloaded ISSN (Sarc) to the first sarcNumberOfIssnToDownload?",
"paramRequired": true
},
{
"paramName": "fs",
"paramLongName": "finalizeStats",
"paramDescription": "Create the usage_stats table?",
"paramRequired": true
},
{
"paramName": "cd",
"paramLongName": "cleanDuplicates",
"paramDescription": "Clean duplicates from main Stats tables (usage_stats, views_stats and downloads_stats)?",
"paramRequired": true
},
{
"paramName": "ftvi",
"paramLongName": "finalTablesVisibleToImpala",
"paramDescription": "Make the usage_stats, views_stats and downloads_stats tables visible to Impala?",
"paramRequired": true
},
{
"paramName": "nodt",
"paramLongName": "numberOfDownloadThreads",
"paramDescription": "Number of download threads",
"paramRequired": true
}
]


@ -0,0 +1,38 @@
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000/;UseNativeQuery=1</value>
</property>
<property>
<name>impalaJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-gw.ocean.icm.edu.pl:21050/;auth=noSasl;</value>
</property>
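<!-- Impala is reached through the HiveServer2 JDBC driver (port 21050) with SASL disabled -->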
<property>
<name>oozie.wf.workflow.notification.url</name>
<value>{serviceUrl}/v1/oozieNotification/jobUpdate?jobId=$jobId%26status=$status</value>
</property>
</configuration>


@ -0,0 +1,91 @@
<workflow-app name="Usage Graph Stats" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hiveMetastoreUris</name>
<description>Hive server metastore URIs</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>Hive server jdbc url</description>
</property>
<property>
<name>impalaJdbcUrl</name>
<description>Impala server jdbc url</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>${hiveMetastoreUris}</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="Step1"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
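<!-- Single java action that runs the whole usage-stats pipeline; each stage is toggled by the matching workflow property passed below -->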
<action name='Step1'>
<java>
<main-class>eu.dnetlib.oa.graph.usagestats.export.ExecuteWorkflow</main-class>
<arg>--matomoAuthToken</arg><arg>${matomoAuthToken}</arg>
<arg>--matomoBaseURL</arg><arg>${matomoBaseURL}</arg>
<arg>--repoLogPath</arg><arg>${repoLogPath}</arg>
<arg>--portalLogPath</arg><arg>${portalLogPath}</arg>
<arg>--portalMatomoID</arg><arg>${portalMatomoID}</arg>
<arg>--irusUKBaseURL</arg><arg>${irusUKBaseURL}</arg>
<arg>--irusUKReportPath</arg><arg>${irusUKReportPath}</arg>
<arg>--sarcsReportPathArray</arg><arg>${sarcsReportPathArray}</arg>
<arg>--sarcsReportPathNonArray</arg><arg>${sarcsReportPathNonArray}</arg>
<arg>--lareferenciaLogPath</arg><arg>${lareferenciaLogPath}</arg>
<arg>--lareferenciaBaseURL</arg><arg>${lareferenciaBaseURL}</arg>
<arg>--lareferenciaAuthToken</arg><arg>${lareferenciaAuthToken}</arg>
<arg>--dbHiveUrl</arg><arg>${hiveJdbcUrl}</arg>
<arg>--dbImpalaUrl</arg><arg>${impalaJdbcUrl}</arg>
<arg>--usageStatsDBSchema</arg><arg>${usageStatsDBSchema}</arg>
<arg>--statsDBSchema</arg><arg>${statsDBSchema}</arg>
<arg>--recreateDbAndTables</arg><arg>${recreateDbAndTables}</arg>
<arg>--piwikEmptyDirs</arg><arg>${piwikEmptyDirs}</arg>
<arg>--downloadPiwikLogs</arg><arg>${downloadPiwikLogs}</arg>
<arg>--processPiwikLogs</arg><arg>${processPiwikLogs}</arg>
<arg>--startingLogPeriod</arg><arg>${startingLogPeriod}</arg>
<arg>--endingLogPeriod</arg><arg>${endingLogPeriod}</arg>
<arg>--numberOfPiwikIdsToDownload</arg><arg>${numberOfPiwikIdsToDownload}</arg>
<arg>--numberOfSiteIdsToDownload</arg><arg>${numberOfSiteIdsToDownload}</arg>
<arg>--laReferenciaEmptyDirs</arg><arg>${laReferenciaEmptyDirs}</arg>
<arg>--downloadLaReferenciaLogs</arg><arg>${downloadLaReferenciaLogs}</arg>
<arg>--processLaReferenciaLogs</arg><arg>${processLaReferenciaLogs}</arg>
<arg>--irusCreateTablesEmptyDirs</arg><arg>${irusCreateTablesEmptyDirs}</arg>
<arg>--irusDownloadReports</arg><arg>${irusDownloadReports}</arg>
<arg>--irusProcessStats</arg><arg>${irusProcessStats}</arg>
<arg>--irusNumberOfOpendoarsToDownload</arg><arg>${irusNumberOfOpendoarsToDownload}</arg>
<arg>--sarcCreateTablesEmptyDirs</arg><arg>${sarcCreateTablesEmptyDirs}</arg>
<arg>--sarcDownloadReports</arg><arg>${sarcDownloadReports}</arg>
<arg>--sarcProcessStats</arg><arg>${sarcProcessStats}</arg>
<arg>--sarcNumberOfIssnToDownload</arg><arg>${sarcNumberOfIssnToDownload}</arg>
<arg>--finalizeStats</arg><arg>${finalizeStats}</arg>
<arg>--cleanDuplicates</arg><arg>${cleanDuplicates}</arg>
<arg>--finalTablesVisibleToImpala</arg><arg>${finalTablesVisibleToImpala}</arg>
<arg>--numberOfDownloadThreads</arg><arg>${numberOfDownloadThreads}</arg>
<capture-output/>
</java>
<ok to="End" />
<error to="Kill" />
</action>
<end name="End"/>
</workflow-app>


@ -26,6 +26,9 @@
<module>dhp-dedup-scholexplorer</module>
<module>dhp-graph-provision-scholexplorer</module>
<module>dhp-stats-update</module>
<module>dhp-usage-stats-update</module>
<module>dhp-java-action-simple</module>
<module>dhp-java-action-dbconnection</module>
<module>dhp-broker-events</module>
</modules>