forked from D-Net/dnet-hadoop
usage-raw-data workflow - refactoring
This commit is contained in:
parent
6ede4143a3
commit
ba33cd61d7
|
|
@ -16,7 +16,6 @@ import javax.xml.transform.*;
|
|||
import javax.xml.transform.dom.DOMSource;
|
||||
import javax.xml.transform.stream.StreamResult;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import org.dom4j.Document;
|
||||
|
|
@ -43,6 +42,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
|
|||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||
|
||||
public class XmlRecordFactory implements Serializable {
|
||||
|
||||
|
|
|
|||
|
|
@ -7,8 +7,6 @@ import java.io.IOException;
|
|||
import java.io.StringReader;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.DocumentException;
|
||||
|
|
@ -23,8 +21,10 @@ import com.google.common.collect.Lists;
|
|||
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
|
||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
|
||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
|
||||
import eu.dnetlib.dhp.oa.provision.utils.ContextDef;
|
||||
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
|
||||
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
|
@ -137,17 +137,18 @@ public class XmlRecordFactoryTest {
|
|||
@Test
|
||||
public void testEnermapsRecord() throws IOException, DocumentException {
|
||||
|
||||
String contextmap = "<entries><entry id=\"enermaps\" label=\"Energy Research\" name=\"context\" type=\"community\"/>" +
|
||||
"<entry id=\"enermaps::selection\" label=\"Featured dataset\" name=\"category\"/>"+
|
||||
"<entry id=\"enermaps::selection::tgs00004\" label=\"Dataset title\" name=\"concept\"/>"+
|
||||
"</entries>";
|
||||
String contextmap = "<entries><entry id=\"enermaps\" label=\"Energy Research\" name=\"context\" type=\"community\"/>"
|
||||
+
|
||||
"<entry id=\"enermaps::selection\" label=\"Featured dataset\" name=\"category\"/>" +
|
||||
"<entry id=\"enermaps::selection::tgs00004\" label=\"Dataset title\" name=\"concept\"/>" +
|
||||
"</entries>";
|
||||
|
||||
ContextMapper contextMapper = ContextMapper.fromXml(contextmap);
|
||||
XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, XmlConverterJob.schemaLocation,
|
||||
otherDsTypeId);
|
||||
otherDsTypeId);
|
||||
|
||||
Dataset d = OBJECT_MAPPER
|
||||
.readValue(IOUtils.toString(getClass().getResourceAsStream("enermaps.json")), Dataset.class);
|
||||
.readValue(IOUtils.toString(getClass().getResourceAsStream("enermaps.json")), Dataset.class);
|
||||
|
||||
JoinedEntity je = new JoinedEntity<>(d);
|
||||
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
<parent>
|
||||
<artifactId>dhp-workflows</artifactId>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<version>1.2.4-SNAPSHOT</version>
|
||||
<version>1.2.5-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>dhp-usage-raw-data-update</artifactId>
|
||||
|
|
@ -39,20 +39,18 @@
|
|||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<cdh.hive.version>0.13.1-cdh5.2.1</cdh.hive.version>
|
||||
<cdh.hadoop.version>2.5.0-cdh5.2.1</cdh.hadoop.version>
|
||||
<cdh.hive.version>1.1.0-cdh5.16.2</cdh.hive.version>
|
||||
<cdh.hadoop.version>2.6.0-cdh5.16.2</cdh.hadoop.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_2.11</artifactId>
|
||||
<version>2.2.0</version>
|
||||
<artifactId>spark-core_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_2.11</artifactId>
|
||||
<version>2.4.5</version>
|
||||
<artifactId>spark-sql_${scala.binary.version}</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.googlecode.json-simple</groupId>
|
||||
|
|
@ -74,7 +72,13 @@
|
|||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<version>${cdh.hadoop.version}</version>
|
||||
</dependency>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>jdk.tools</groupId>
|
||||
<artifactId>jdk.tools</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-common</artifactId>
|
||||
|
|
|
|||
|
|
@ -7,16 +7,8 @@
|
|||
package eu.dnetlib.oa.graph.usagerawdata.export;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
/**
|
||||
* @author D. Pierrakos, S. Zoupanos
|
||||
*/
|
||||
/**
|
||||
* @author D. Pierrakos, S. Zoupanos
|
||||
*/
|
||||
|
|
@ -31,7 +23,9 @@ public abstract class ConnectDB {
|
|||
private static String dbImpalaUrl;
|
||||
private static String usageStatsDBSchema;
|
||||
private static String statsDBSchema;
|
||||
private final static Logger log = Logger.getLogger(ConnectDB.class);
|
||||
|
||||
private ConnectDB() {
|
||||
}
|
||||
|
||||
static void init() throws ClassNotFoundException {
|
||||
|
||||
|
|
@ -72,10 +66,6 @@ public abstract class ConnectDB {
|
|||
}
|
||||
|
||||
private static Connection connectHive() throws SQLException {
|
||||
/*
|
||||
* Connection connection = DriverManager.getConnection(dbHiveUrl); Statement stmt =
|
||||
* connection.createStatement(); log.debug("Opened database successfully"); return connection;
|
||||
*/
|
||||
ComboPooledDataSource cpds = new ComboPooledDataSource();
|
||||
cpds.setJdbcUrl(dbHiveUrl);
|
||||
cpds.setAcquireIncrement(1);
|
||||
|
|
@ -97,10 +87,6 @@ public abstract class ConnectDB {
|
|||
}
|
||||
|
||||
private static Connection connectImpala() throws SQLException {
|
||||
/*
|
||||
* Connection connection = DriverManager.getConnection(dbImpalaUrl); Statement stmt =
|
||||
* connection.createStatement(); log.debug("Opened database successfully"); return connection;
|
||||
*/
|
||||
ComboPooledDataSource cpds = new ComboPooledDataSource();
|
||||
cpds.setJdbcUrl(dbImpalaUrl);
|
||||
cpds.setAcquireIncrement(1);
|
||||
|
|
|
|||
|
|
@ -70,11 +70,11 @@ public class ExecuteWorkflow {
|
|||
BasicConfigurator.configure();
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
UsageStatsExporter.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json")));
|
||||
IOUtils
|
||||
.toString(
|
||||
UsageStatsExporter.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/oa/graph/usagerawdata/export/usagerawdata_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
// Setting up the initial parameters
|
||||
|
|
|
|||
|
|
@ -49,9 +49,9 @@ public class IrusStats {
|
|||
logger.info("Creating sushilog");
|
||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sushilog(source STRING, "
|
||||
+ "repository STRING, rid STRING, date STRING, metric_type STRING, count INT) clustered by (source, "
|
||||
+ "repository, rid, date, metric_type) into 100 buckets stored as orc tblproperties('transactional'='true')";
|
||||
+ ".sushilog(source STRING, "
|
||||
+ "repository STRING, rid STRING, date STRING, metric_type STRING, count INT) clustered by (source, "
|
||||
+ "repository, rid, date, metric_type) into 100 buckets stored as orc tblproperties('transactional'='true')";
|
||||
stmt.executeUpdate(sqlCreateTableSushiLog);
|
||||
logger.info("Created sushilog");
|
||||
|
||||
|
|
@ -74,70 +74,70 @@ public class IrusStats {
|
|||
|
||||
logger.info("Dropping sushilogtmp_json table");
|
||||
String dropSushilogtmpJson = "DROP TABLE IF EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sushilogtmp_json";
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sushilogtmp_json";
|
||||
stmt.executeUpdate(dropSushilogtmpJson);
|
||||
logger.info("Dropped sushilogtmp_json table");
|
||||
|
||||
logger.info("Creating irus_sushilogtmp_json table");
|
||||
String createSushilogtmpJson = "CREATE EXTERNAL TABLE IF NOT EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json(\n"
|
||||
+ " `ItemIdentifier` ARRAY<\n"
|
||||
+ " struct<\n"
|
||||
+ " Type: STRING,\n"
|
||||
+ " Value: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >,\n"
|
||||
+ " `ItemPerformance` ARRAY<\n"
|
||||
+ " struct<\n"
|
||||
+ " `Period`: struct<\n"
|
||||
+ " `Begin`: STRING,\n"
|
||||
+ " `End`: STRING\n"
|
||||
+ " >,\n"
|
||||
+ " `Instance`: struct<\n"
|
||||
+ " `Count`: STRING,\n"
|
||||
+ " `MetricType`: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >\n"
|
||||
+ " >\n"
|
||||
+ ")\n"
|
||||
+ "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
|
||||
+ "LOCATION '" + ExecuteWorkflow.irusUKReportPath + "'\n"
|
||||
+ "TBLPROPERTIES (\"transactional\"=\"false\")";
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json(\n"
|
||||
+ " `ItemIdentifier` ARRAY<\n"
|
||||
+ " struct<\n"
|
||||
+ " Type: STRING,\n"
|
||||
+ " Value: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >,\n"
|
||||
+ " `ItemPerformance` ARRAY<\n"
|
||||
+ " struct<\n"
|
||||
+ " `Period`: struct<\n"
|
||||
+ " `Begin`: STRING,\n"
|
||||
+ " `End`: STRING\n"
|
||||
+ " >,\n"
|
||||
+ " `Instance`: struct<\n"
|
||||
+ " `Count`: STRING,\n"
|
||||
+ " `MetricType`: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >\n"
|
||||
+ " >\n"
|
||||
+ ")\n"
|
||||
+ "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
|
||||
+ "LOCATION '" + ExecuteWorkflow.irusUKReportPath + "'\n"
|
||||
+ "TBLPROPERTIES (\"transactional\"=\"false\")";
|
||||
stmt.executeUpdate(createSushilogtmpJson);
|
||||
logger.info("Created irus_sushilogtmp_json table");
|
||||
|
||||
logger.info("Dropping irus_sushilogtmp table");
|
||||
String dropSushilogtmp = "DROP TABLE IF EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".irus_sushilogtmp";
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".irus_sushilogtmp";
|
||||
stmt.executeUpdate(dropSushilogtmp);
|
||||
logger.info("Dropped irus_sushilogtmp table");
|
||||
|
||||
logger.info("Creating irus_sushilogtmp table");
|
||||
String createSushilogtmp = "CREATE TABLE " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".irus_sushilogtmp(source STRING, repository STRING, "
|
||||
+ "rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
|
||||
+ "tblproperties('transactional'='true')";
|
||||
+ ".irus_sushilogtmp(source STRING, repository STRING, "
|
||||
+ "rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
|
||||
+ "tblproperties('transactional'='true')";
|
||||
stmt.executeUpdate(createSushilogtmp);
|
||||
logger.info("Created irus_sushilogtmp table");
|
||||
|
||||
logger.info("Inserting to irus_sushilogtmp table");
|
||||
String insertSushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp "
|
||||
+ "SELECT 'IRUS-UK', CONCAT('opendoar____::', split(split(INPUT__FILE__NAME,'IrusIRReport_')[1],'_')[0]), "
|
||||
+ "`ItemIdent`.`Value`, `ItemPerf`.`Period`.`Begin`, "
|
||||
+ "`ItemPerf`.`Instance`.`MetricType`, `ItemPerf`.`Instance`.`Count` "
|
||||
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json "
|
||||
+ "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent "
|
||||
+ "LATERAL VIEW posexplode(ItemPerformance) ItemPerformanceTable AS seqp, ItemPerf "
|
||||
+ "WHERE `ItemIdent`.`Type`= 'OAI'";
|
||||
+ "SELECT 'IRUS-UK', CONCAT('opendoar____::', split(split(INPUT__FILE__NAME,'IrusIRReport_')[1],'_')[0]), "
|
||||
+ "`ItemIdent`.`Value`, `ItemPerf`.`Period`.`Begin`, "
|
||||
+ "`ItemPerf`.`Instance`.`MetricType`, `ItemPerf`.`Instance`.`Count` "
|
||||
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".irus_sushilogtmp_json "
|
||||
+ "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent "
|
||||
+ "LATERAL VIEW posexplode(ItemPerformance) ItemPerformanceTable AS seqp, ItemPerf "
|
||||
+ "WHERE `ItemIdent`.`Type`= 'OAI'";
|
||||
stmt.executeUpdate(insertSushilogtmp);
|
||||
logger.info("Inserted to irus_sushilogtmp table");
|
||||
|
||||
logger.info("Inserting to sushilog table");
|
||||
String insertToShushilog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sushilog SELECT * FROM "
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".irus_sushilogtmp";
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".irus_sushilogtmp";
|
||||
stmt.executeUpdate(insertToShushilog);
|
||||
logger.info("Inserted to sushilog table");
|
||||
|
||||
|
|
@ -155,8 +155,8 @@ public class IrusStats {
|
|||
logger.info("(getIrusRRReport) Ending period for log download: {}", YYYY_MM_FORMAT.format(end));
|
||||
|
||||
String reportUrl = irusUKURL + "GetReport/?Report=RR1&Release=4&RequestorID=OpenAIRE&BeginDate="
|
||||
+ YYYY_MM_FORMAT.format(start) + "&EndDate=" + YYYY_MM_FORMAT.format(end)
|
||||
+ "&RepositoryIdentifier=&ItemDataType=&NewJiscBand=&Granularity=Monthly&Callback=";
|
||||
+ YYYY_MM_FORMAT.format(start) + "&EndDate=" + YYYY_MM_FORMAT.format(end)
|
||||
+ "&RepositoryIdentifier=&ItemDataType=&NewJiscBand=&Granularity=Monthly&Callback=";
|
||||
logger.info("(getIrusRRReport) Getting report: {}", reportUrl);
|
||||
|
||||
String text = getJson(reportUrl, "", "");
|
||||
|
|
@ -188,7 +188,7 @@ public class IrusStats {
|
|||
logger.info("(getIrusRRReport) Found the following opendoars for download: {}", opendoarsToVisit);
|
||||
|
||||
if (ExecuteWorkflow.irusNumberOfOpendoarsToDownload > 0
|
||||
&& ExecuteWorkflow.irusNumberOfOpendoarsToDownload <= opendoarsToVisit.size()) {
|
||||
&& ExecuteWorkflow.irusNumberOfOpendoarsToDownload <= opendoarsToVisit.size()) {
|
||||
logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.irusNumberOfOpendoarsToDownload);
|
||||
opendoarsToVisit = opendoarsToVisit.subList(0, ExecuteWorkflow.irusNumberOfOpendoarsToDownload);
|
||||
}
|
||||
|
|
@ -217,9 +217,9 @@ public class IrusStats {
|
|||
logger.info("(getIrusIRReport) Ending period for log download: {}", YYYY_MM_FORMAT.format(end));
|
||||
|
||||
PreparedStatement st = ConnectDB
|
||||
.getHiveConnection()
|
||||
.prepareStatement(
|
||||
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
|
||||
.getHiveConnection()
|
||||
.prepareStatement(
|
||||
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
|
||||
st.setString(1, "opendoar____::" + opendoar);
|
||||
ResultSet rs_date = st.executeQuery();
|
||||
|
||||
|
|
@ -240,16 +240,16 @@ public class IrusStats {
|
|||
start = start.plusMonths(1);
|
||||
while (start.isBefore(end)) {
|
||||
String reportUrl = this.irusUKURL + "GetReport/?Report=IR1&Release=4&RequestorID=OpenAIRE&BeginDate="
|
||||
+ YYYY_MM_FORMAT.format(start) + "&EndDate=" + YYYY_MM_FORMAT.format(start)
|
||||
+ "&RepositoryIdentifier=opendoar%3A" + opendoar
|
||||
+ "&ItemIdentifier=&ItemDataType=&hasDOI=&Granularity=Monthly&Callback=";
|
||||
+ YYYY_MM_FORMAT.format(start) + "&EndDate=" + YYYY_MM_FORMAT.format(start)
|
||||
+ "&RepositoryIdentifier=opendoar%3A" + opendoar
|
||||
+ "&ItemIdentifier=&ItemDataType=&hasDOI=&Granularity=Monthly&Callback=";
|
||||
start = start.plusMonths(1);
|
||||
String text = getJson(reportUrl, "", "");
|
||||
if (text == null)
|
||||
continue;
|
||||
FileSystem fs = FileSystem.get(new Configuration());
|
||||
String filePath = irusUKReportPath + "/" + "IrusIRReport_" + opendoar + "_" + YYYY_MM_FORMAT.format(start)
|
||||
+ ".json";
|
||||
+ ".json";
|
||||
logger.info("Storing to file: {}", filePath);
|
||||
|
||||
FSDataOutputStream fin = fs.create(new Path(filePath), true);
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
|
||||
package eu.dnetlib.oa.graph.usagerawdata.export;
|
||||
|
||||
import static eu.dnetlib.oa.graph.usagerawdata.export.ExecuteWorkflow.numberOfDaysToProcess;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
|
|
@ -23,8 +25,6 @@ import org.json.simple.parser.JSONParser;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static eu.dnetlib.oa.graph.usagerawdata.export.ExecuteWorkflow.numberOfDaysToProcess;
|
||||
|
||||
public class LaReferenciaDownloadLogs {
|
||||
|
||||
private final String piwikUrl;
|
||||
|
|
@ -65,11 +65,11 @@ public class LaReferenciaDownloadLogs {
|
|||
|
||||
logger.info("Creating LaReferencia tables");
|
||||
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, "
|
||||
+ "source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, "
|
||||
+ "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
|
||||
+ "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets "
|
||||
+ "stored as orc tblproperties('transactional'='true')";
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, "
|
||||
+ "source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, "
|
||||
+ "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
|
||||
+ "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets "
|
||||
+ "stored as orc tblproperties('transactional'='true')";
|
||||
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
|
||||
logger.info("Created LaReferencia tables");
|
||||
|
||||
|
|
@ -127,7 +127,7 @@ public class LaReferenciaDownloadLogs {
|
|||
logger.info("Found the following siteIds for download: " + siteIdsToVisit);
|
||||
|
||||
if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0
|
||||
&& ExecuteWorkflow.numberOfPiwikIdsToDownload <= siteIdsToVisit.size()) {
|
||||
&& ExecuteWorkflow.numberOfPiwikIdsToDownload <= siteIdsToVisit.size()) {
|
||||
logger.info("Trimming siteIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload);
|
||||
siteIdsToVisit = siteIdsToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
|
||||
}
|
||||
|
|
@ -145,10 +145,10 @@ public class LaReferenciaDownloadLogs {
|
|||
|
||||
// Get the latest timestamp from the logs
|
||||
PreparedStatement st = ConnectDB
|
||||
.getHiveConnection()
|
||||
.prepareStatement(
|
||||
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".lareferencialog WHERE matomoid=?");
|
||||
.getHiveConnection()
|
||||
.prepareStatement(
|
||||
"SELECT max(timestamp) FROM " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".lareferencialog WHERE matomoid=?");
|
||||
st.setInt(1, laReferencialMatomoID);
|
||||
ResultSet rs_date = st.executeQuery();
|
||||
|
||||
|
|
@ -168,27 +168,27 @@ public class LaReferenciaDownloadLogs {
|
|||
for (LocalDate currentDay = start; !currentDay.isAfter(end); currentDay = currentDay.plusDays(1)) {
|
||||
if (dateMax != null && !currentDay.isAfter(dateMax)) {
|
||||
logger
|
||||
.info(
|
||||
"Date found in logs " + dateMax + " and not downloading Matomo logs for "
|
||||
+ laReferencialMatomoID);
|
||||
.info(
|
||||
"Date found in logs " + dateMax + " and not downloading Matomo logs for "
|
||||
+ laReferencialMatomoID);
|
||||
continue;
|
||||
}
|
||||
|
||||
logger
|
||||
.info(
|
||||
"Downloading logs for LaReferencia repoid {} and for {}", laReferencialMatomoID,
|
||||
currentDay.format(YYYY_MM_DD_FORMAT));
|
||||
.info(
|
||||
"Downloading logs for LaReferencia repoid {} and for {}", laReferencialMatomoID,
|
||||
currentDay.format(YYYY_MM_DD_FORMAT));
|
||||
|
||||
String period = "&period=day&date=" + currentDay.format(YYYY_MM_DD_FORMAT);
|
||||
String outFolder = repoLogsPath;
|
||||
|
||||
FileSystem fs = FileSystem.get(new Configuration());
|
||||
String filename = outFolder + "/" + laReferencialMatomoID + "_LaRefPiwiklog"
|
||||
+ currentDay.format(YYYY_MM_DD_FORMAT) + ".json";
|
||||
+ currentDay.format(YYYY_MM_DD_FORMAT) + ".json";
|
||||
FSDataOutputStream fin = fs.create(new Path(filename), true);
|
||||
|
||||
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + laReferencialMatomoID + period + format
|
||||
+ "&expanded=5&filter_limit=10&token_auth=" + tokenAuth;
|
||||
+ "&expanded=5&filter_limit=10&token_auth=" + tokenAuth;
|
||||
|
||||
int i = 0;
|
||||
String content;
|
||||
|
|
@ -213,9 +213,9 @@ public class LaReferenciaDownloadLogs {
|
|||
}
|
||||
|
||||
logger
|
||||
.info(
|
||||
"Downloaded part " + i + " of logs for LaReferencia repoid " + laReferencialMatomoID
|
||||
+ " and for " + currentDay.format(YYYY_MM_DD_FORMAT));
|
||||
.info(
|
||||
"Downloaded part " + i + " of logs for LaReferencia repoid " + laReferencialMatomoID
|
||||
+ " and for " + currentDay.format(YYYY_MM_DD_FORMAT));
|
||||
i++;
|
||||
} while (true);
|
||||
|
||||
|
|
@ -241,9 +241,9 @@ public class LaReferenciaDownloadLogs {
|
|||
|
||||
logger.info("Starting period for log download: {}", YYYY_MM_DD_FORMAT.format(start));
|
||||
logger
|
||||
.info(
|
||||
"Ending period for log download ({} days or yesterday): {}", numberOfDaysToProcess,
|
||||
YYYY_MM_DD_FORMAT.format(end));
|
||||
.info(
|
||||
"Ending period for log download ({} days or yesterday): {}", numberOfDaysToProcess,
|
||||
YYYY_MM_DD_FORMAT.format(end));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,9 @@ import org.json.simple.parser.JSONParser;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author D. Pierrakos, S. Zoupanos
|
||||
*/
|
||||
public class LaReferenciaStats {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LaReferenciaStats.class);
|
||||
|
|
@ -38,8 +41,13 @@ public class LaReferenciaStats {
|
|||
public LaReferenciaStats(String logRepoPath) throws Exception {
|
||||
this.logRepoPath = logRepoPath;
|
||||
this.createTables();
|
||||
// this.createTmpTables();
|
||||
}
|
||||
|
||||
/*
|
||||
* private void connectDB() throws Exception { try { ConnectDB connectDB = new ConnectDB(); } catch (Exception e) {
|
||||
* log.error("Connect to db failed: " + e); throw new Exception("Failed to connect to db: " + e.toString(), e); } }
|
||||
*/
|
||||
private void createTables() throws Exception {
|
||||
try {
|
||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||
|
|
@ -61,6 +69,7 @@ public class LaReferenciaStats {
|
|||
} catch (Exception e) {
|
||||
logger.error("Failed to create tables: " + e);
|
||||
throw new Exception("Failed to create tables: " + e.toString(), e);
|
||||
// System.exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -204,7 +213,7 @@ public class LaReferenciaStats {
|
|||
// conn.close();
|
||||
}
|
||||
|
||||
private void updateProdTables() throws SQLException {
|
||||
private void updateProdTables() throws SQLException, Exception {
|
||||
|
||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
||||
|
|
@ -269,6 +278,8 @@ public class LaReferenciaStats {
|
|||
if (result.equals("")) {
|
||||
result = "[]";
|
||||
}
|
||||
|
||||
// fs.close();
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage());
|
||||
throw new Exception(e);
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
|
||||
package eu.dnetlib.oa.graph.usagerawdata.export;
|
||||
|
||||
import static eu.dnetlib.oa.graph.usagerawdata.export.ExecuteWorkflow.numberOfDaysToProcess;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
|
|
@ -27,8 +29,6 @@ import org.json.simple.parser.JSONParser;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static eu.dnetlib.oa.graph.usagerawdata.export.ExecuteWorkflow.numberOfDaysToProcess;
|
||||
|
||||
public class PiwikDownloadLogs {
|
||||
|
||||
private final String piwikUrl;
|
||||
|
|
@ -42,7 +42,7 @@ public class PiwikDownloadLogs {
|
|||
Map<Integer, List<LocalDateTime>> siteIdsWithNoDataDates = new HashMap<>();
|
||||
|
||||
private static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS_FORMAT = DateTimeFormatter
|
||||
.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
private static final DateTimeFormatter YYYY_MM_DD_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
|
||||
|
||||
// Get start/end period
|
||||
|
|
@ -59,11 +59,13 @@ public class PiwikDownloadLogs {
|
|||
}
|
||||
|
||||
public void getOpenAIRELogs(String repoLogsPath, String portalLogPath, String portalMatomoID) throws Exception {
|
||||
logger.info("### numberOfDaysToProcess: {}", numberOfDaysToProcess);
|
||||
|
||||
Statement statement = ConnectDB.getHiveConnection().createStatement();
|
||||
ResultSet rs = statement
|
||||
.executeQuery(
|
||||
"SELECT distinct piwik_id from " + ConnectDB.getStatsDBSchema()
|
||||
+ ".datasource where piwik_id is not null and piwik_id <> 0 order by piwik_id");
|
||||
.executeQuery(
|
||||
"SELECT distinct piwik_id from " + ConnectDB.getStatsDBSchema()
|
||||
+ ".datasource where piwik_id is not null and piwik_id <> 0 order by piwik_id");
|
||||
|
||||
List<Integer> piwikIdToVisit = new ArrayList<>();
|
||||
while (rs.next()) {
|
||||
|
|
@ -72,7 +74,7 @@ public class PiwikDownloadLogs {
|
|||
logger.info("Found the following piwikIds for download: {}", piwikIdToVisit);
|
||||
|
||||
if (ExecuteWorkflow.numberOfPiwikIdsToDownload > 0
|
||||
&& ExecuteWorkflow.numberOfPiwikIdsToDownload <= piwikIdToVisit.size()) {
|
||||
&& ExecuteWorkflow.numberOfPiwikIdsToDownload <= piwikIdToVisit.size()) {
|
||||
logger.info("Trimming piwikIds list to the size of: " + ExecuteWorkflow.numberOfPiwikIdsToDownload);
|
||||
piwikIdToVisit = piwikIdToVisit.subList(0, ExecuteWorkflow.numberOfPiwikIdsToDownload);
|
||||
}
|
||||
|
|
@ -95,9 +97,9 @@ public class PiwikDownloadLogs {
|
|||
initializeDateRange(dateMax);
|
||||
for (LocalDate currDay = start; currDay.isBefore(end); currDay = currDay.plusDays(1)) {
|
||||
logger
|
||||
.info(
|
||||
"### (2nd loop) - Downloading Matomo logs - siteId: {}, date: {}", siteId,
|
||||
currDay.format(YYYY_MM_DD_DATE_FORMAT));
|
||||
.info(
|
||||
"### (2nd loop) - Downloading Matomo logs - siteId: {}, date: {}", siteId,
|
||||
currDay.format(YYYY_MM_DD_DATE_FORMAT));
|
||||
getOpenAIRELogsForDate(currDay, siteId, repoLogsPath, portalLogPath, portalMatomoID);
|
||||
} // end 2nd loop
|
||||
} // end 1st loop
|
||||
|
|
@ -123,13 +125,13 @@ public class PiwikDownloadLogs {
|
|||
|
||||
logger.info("Starting period for log download: {}", YYYY_MM_DD_DATE_FORMAT.format(start));
|
||||
logger
|
||||
.info(
|
||||
"Ending period for log download ({} days or yesterday): {}", numberOfDaysToProcess,
|
||||
YYYY_MM_DD_DATE_FORMAT.format(end));
|
||||
.info(
|
||||
"Ending period for log download ({} days or yesterday): {}", numberOfDaysToProcess,
|
||||
YYYY_MM_DD_DATE_FORMAT.format(end));
|
||||
}
|
||||
|
||||
private void getOpenAIRELogsForDate(LocalDate currDay, int siteId, String repoLogsPath, String portalLogPath,
|
||||
String portalMatomoID) throws Exception {
|
||||
String portalMatomoID) throws Exception {
|
||||
logger.info("### Downloading logs - siteId: {}, date: {}", siteId, currDay.format(YYYY_MM_DD_DATE_FORMAT));
|
||||
|
||||
String period = "&period=day&date=" + currDay.format(YYYY_MM_DD_DATE_FORMAT);
|
||||
|
|
@ -142,7 +144,7 @@ public class PiwikDownloadLogs {
|
|||
}
|
||||
|
||||
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format
|
||||
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
|
||||
+ "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
|
||||
String content;
|
||||
|
||||
int i = 0;
|
||||
|
|
@ -165,9 +167,9 @@ public class PiwikDownloadLogs {
|
|||
if (i == 0) {
|
||||
LocalDateTime missingDateTime = currDay.atStartOfDay();
|
||||
logger
|
||||
.info(
|
||||
"### no content for siteId: {} for date: {} (store in siteIdsWithNoDataDates)", siteId,
|
||||
missingDateTime.format(YYYY_MM_DD_DATE_FORMAT));
|
||||
.info(
|
||||
"### no content for siteId: {} for date: {} (store in siteIdsWithNoDataDates)", siteId,
|
||||
missingDateTime.format(YYYY_MM_DD_DATE_FORMAT));
|
||||
siteIdsWithNoDataDates.computeIfAbsent(siteId, k -> new ArrayList<>()).add(missingDateTime);
|
||||
}
|
||||
|
||||
|
|
@ -177,7 +179,7 @@ public class PiwikDownloadLogs {
|
|||
JSONArray jsonArray = (JSONArray) parser.parse(content);
|
||||
|
||||
String pathString = outFolder + "/" + siteId + "_Piwiklog" + currDay.format(YYYY_MM_DD_DATE_FORMAT)
|
||||
+ "_offset_" + i + ".json";
|
||||
+ "_offset_" + i + ".json";
|
||||
FSDataOutputStream fin = fs.create(new Path(pathString), true);
|
||||
logger.info("Writing data - Path with filename: {}", pathString);
|
||||
|
||||
|
|
@ -230,9 +232,9 @@ public class PiwikDownloadLogs {
|
|||
Map<Integer, LocalDateTime> resultMap = new HashMap<>();
|
||||
|
||||
PreparedStatement st = ConnectDB.DB_HIVE_CONNECTION
|
||||
.prepareStatement(
|
||||
"SELECT source, MAX(`timestamp`) from " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".piwiklog GROUP BY source");
|
||||
.prepareStatement(
|
||||
"SELECT source, MAX(`timestamp`) from " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".piwiklog GROUP BY source");
|
||||
ResultSet rs = st.executeQuery();
|
||||
|
||||
while (rs.next()) {
|
||||
|
|
@ -247,9 +249,9 @@ public class PiwikDownloadLogs {
|
|||
}
|
||||
|
||||
resultMap
|
||||
.forEach(
|
||||
(siteId, timestamp) -> logger
|
||||
.info("[getMaxTimestamps] - siteId: {}, Max Timestamp: {}", siteId, timestamp));
|
||||
.forEach(
|
||||
(siteId, timestamp) -> logger
|
||||
.info("[getMaxTimestamps] - siteId: {}, Max Timestamp: {}", siteId, timestamp));
|
||||
return resultMap;
|
||||
}
|
||||
|
||||
|
|
@ -270,9 +272,9 @@ public class PiwikDownloadLogs {
|
|||
|
||||
for (Map.Entry<Integer, LocalDateTime> entry : maxDatesWithNoData.entrySet()) {
|
||||
logger
|
||||
.info(
|
||||
"### findMaxDateForEachSite - SiteId: {} -> Max Missing Date: {}", entry.getKey(),
|
||||
entry.getValue().format(YYYY_MM_DD_HH_MM_SS_FORMAT));
|
||||
.info(
|
||||
"### findMaxDateForEachSite - SiteId: {} -> Max Missing Date: {}", entry.getKey(),
|
||||
entry.getValue().format(YYYY_MM_DD_HH_MM_SS_FORMAT));
|
||||
}
|
||||
|
||||
dummyInsertPiwiklog(maxDatesWithNoData);
|
||||
|
|
@ -282,7 +284,7 @@ public class PiwikDownloadLogs {
|
|||
logger.info("### insert Piwiklog MaxTimestamp");
|
||||
|
||||
String insertQuery = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".piwiklog (source, timestamp, action) VALUES (?, ?, ?)";
|
||||
+ ".piwiklog (source, timestamp, action) VALUES (?, ?, ?)";
|
||||
|
||||
try (PreparedStatement st = ConnectDB.getHiveConnection().prepareStatement(insertQuery)) {
|
||||
for (Map.Entry<Integer, LocalDateTime> entry : maxDatesWithNoData.entrySet()) {
|
||||
|
|
|
|||
|
|
@ -17,6 +17,9 @@ import org.apache.hadoop.fs.RemoteIterator;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author D. Pierrakos, S. Zoupanos
|
||||
*/
|
||||
public class PiwikStatsDB {
|
||||
|
||||
private String logPath;
|
||||
|
|
@ -30,7 +33,7 @@ public class PiwikStatsDB {
|
|||
private String CounterRobotsURL;
|
||||
private ArrayList robotsList;
|
||||
|
||||
public PiwikStatsDB(String logRepoPath, String logPortalPath) {
|
||||
public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception {
|
||||
this.logRepoPath = logRepoPath;
|
||||
this.logPortalPath = logPortalPath;
|
||||
|
||||
|
|
@ -55,6 +58,9 @@ public class PiwikStatsDB {
|
|||
public void recreateDBAndTables() throws Exception {
|
||||
this.createDatabase();
|
||||
this.createTables();
|
||||
// The piwiklog table is not needed since it is built
|
||||
// on top of JSON files
|
||||
//////////// this.createTmpTables();
|
||||
}
|
||||
|
||||
public ArrayList getRobotsList() {
|
||||
|
|
@ -112,6 +118,12 @@ public class PiwikStatsDB {
|
|||
+ "into 100 buckets stored as orc tblproperties('transactional'='true')";
|
||||
stmt.executeUpdate(sqlCreateTablePiwikLog);
|
||||
|
||||
// String dropT = "TRUNCATE TABLE "
|
||||
// + ConnectDB.getUsageStatsDBSchema()
|
||||
// + ".piwiklog ";
|
||||
// stmt.executeUpdate(dropT);
|
||||
// logger.info("truncated piwiklog");
|
||||
|
||||
/////////////////////////////////////////
|
||||
// Rule for duplicate inserts @ piwiklog
|
||||
/////////////////////////////////////////
|
||||
|
|
@ -250,7 +262,7 @@ public class PiwikStatsDB {
|
|||
|
||||
public void removeDoubleClicks() throws Exception {
|
||||
Statement stmt = ConnectDB.getHiveConnection().createStatement();
|
||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
||||
// ConnectDB.getHiveConnection().setAutoCommit(false);
|
||||
|
||||
logger.info("Cleaning download double clicks");
|
||||
// clean download double clicks
|
||||
|
|
|
|||
|
|
@ -30,8 +30,6 @@ import org.json.simple.parser.ParseException;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static eu.dnetlib.oa.graph.usagerawdata.export.ExecuteWorkflow.numberOfDaysToProcess;
|
||||
|
||||
public class SarcStats {
|
||||
|
||||
private Statement stmtHive = null;
|
||||
|
|
@ -46,6 +44,8 @@ public class SarcStats {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SarcStats.class);
|
||||
|
||||
private static final int NUM_OF_DAYS = 15; // number of days to process since the start date
|
||||
|
||||
public SarcStats() {
|
||||
}
|
||||
|
||||
|
|
@ -56,11 +56,11 @@ public class SarcStats {
|
|||
stmtHive.executeUpdate(sqlCreateTableSushiLog);
|
||||
|
||||
String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
||||
+ " ON INSERT TO sushilog "
|
||||
+ " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
|
||||
+ "sushilog.rid, sushilog.date "
|
||||
+ "FROM sushilog "
|
||||
+ "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
|
||||
+ " ON INSERT TO sushilog "
|
||||
+ " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
|
||||
+ "sushilog.rid, sushilog.date "
|
||||
+ "FROM sushilog "
|
||||
+ "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
|
||||
stmtHive.executeUpdate(sqlcreateRuleSushiLog);
|
||||
String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);";
|
||||
stmtHive.executeUpdate(createSushiIndex);
|
||||
|
|
@ -100,92 +100,92 @@ public class SarcStats {
|
|||
|
||||
logger.info("Dropping sarc_sushilogtmp_json_array table");
|
||||
String drop_sarc_sushilogtmp_json_array = "DROP TABLE IF EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array";
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array";
|
||||
stmt.executeUpdate(drop_sarc_sushilogtmp_json_array);
|
||||
logger.info("Dropped sarc_sushilogtmp_json_array table");
|
||||
|
||||
logger.info("Creating sarc_sushilogtmp_json_array table");
|
||||
String create_sarc_sushilogtmp_json_array = "CREATE EXTERNAL TABLE IF NOT EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array(\n"
|
||||
+ " `ItemIdentifier` ARRAY<\n"
|
||||
+ " struct<\n"
|
||||
+ " `Type`: STRING,\n"
|
||||
+ " `Value`: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >,\n"
|
||||
+ " `ItemPerformance` struct<\n"
|
||||
+ " `Period`: struct<\n"
|
||||
+ " `Begin`: STRING,\n"
|
||||
+ " `End`: STRING\n"
|
||||
+ " >,\n"
|
||||
+ " `Instance`: struct<\n"
|
||||
+ " `Count`: STRING,\n"
|
||||
+ " `MetricType`: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >\n"
|
||||
+ ")"
|
||||
+ "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
|
||||
+ "LOCATION '" + sarcsReportPathArray + "/'\n"
|
||||
+ "TBLPROPERTIES (\"transactional\"=\"false\")";
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array(\n"
|
||||
+ " `ItemIdentifier` ARRAY<\n"
|
||||
+ " struct<\n"
|
||||
+ " `Type`: STRING,\n"
|
||||
+ " `Value`: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >,\n"
|
||||
+ " `ItemPerformance` struct<\n"
|
||||
+ " `Period`: struct<\n"
|
||||
+ " `Begin`: STRING,\n"
|
||||
+ " `End`: STRING\n"
|
||||
+ " >,\n"
|
||||
+ " `Instance`: struct<\n"
|
||||
+ " `Count`: STRING,\n"
|
||||
+ " `MetricType`: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >\n"
|
||||
+ ")"
|
||||
+ "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
|
||||
+ "LOCATION '" + sarcsReportPathArray + "/'\n"
|
||||
+ "TBLPROPERTIES (\"transactional\"=\"false\")";
|
||||
stmt.executeUpdate(create_sarc_sushilogtmp_json_array);
|
||||
logger.info("Created sarc_sushilogtmp_json_array table");
|
||||
|
||||
logger.info("Dropping sarc_sushilogtmp_json_non_array table");
|
||||
String drop_sarc_sushilogtmp_json_non_array = "DROP TABLE IF EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sarc_sushilogtmp_json_non_array";
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sarc_sushilogtmp_json_non_array";
|
||||
stmt.executeUpdate(drop_sarc_sushilogtmp_json_non_array);
|
||||
logger.info("Dropped sarc_sushilogtmp_json_non_array table");
|
||||
|
||||
logger.info("Creating sarc_sushilogtmp_json_non_array table");
|
||||
String create_sarc_sushilogtmp_json_non_array = "CREATE EXTERNAL TABLE IF NOT EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array (\n"
|
||||
+ " `ItemIdentifier` struct<\n"
|
||||
+ " `Type`: STRING,\n"
|
||||
+ " `Value`: STRING\n"
|
||||
+ " >,\n"
|
||||
+ " `ItemPerformance` struct<\n"
|
||||
+ " `Period`: struct<\n"
|
||||
+ " `Begin`: STRING,\n"
|
||||
+ " `End`: STRING\n"
|
||||
+ " >,\n"
|
||||
+ " `Instance`: struct<\n"
|
||||
+ " `Count`: STRING,\n"
|
||||
+ " `MetricType`: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >"
|
||||
+ ")"
|
||||
+ "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
|
||||
+ "LOCATION '" + sarcsReportPathNonArray + "/'\n"
|
||||
+ "TBLPROPERTIES (\"transactional\"=\"false\")";
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array (\n"
|
||||
+ " `ItemIdentifier` struct<\n"
|
||||
+ " `Type`: STRING,\n"
|
||||
+ " `Value`: STRING\n"
|
||||
+ " >,\n"
|
||||
+ " `ItemPerformance` struct<\n"
|
||||
+ " `Period`: struct<\n"
|
||||
+ " `Begin`: STRING,\n"
|
||||
+ " `End`: STRING\n"
|
||||
+ " >,\n"
|
||||
+ " `Instance`: struct<\n"
|
||||
+ " `Count`: STRING,\n"
|
||||
+ " `MetricType`: STRING\n"
|
||||
+ " >\n"
|
||||
+ " >"
|
||||
+ ")"
|
||||
+ "ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'\n"
|
||||
+ "LOCATION '" + sarcsReportPathNonArray + "/'\n"
|
||||
+ "TBLPROPERTIES (\"transactional\"=\"false\")";
|
||||
stmt.executeUpdate(create_sarc_sushilogtmp_json_non_array);
|
||||
logger.info("Created sarc_sushilogtmp_json_non_array table");
|
||||
|
||||
logger.info("Creating sarc_sushilogtmp table");
|
||||
String create_sarc_sushilogtmp = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sarc_sushilogtmp(source STRING, repository STRING, "
|
||||
+ "rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
|
||||
+ "tblproperties('transactional'='true')";
|
||||
+ ".sarc_sushilogtmp(source STRING, repository STRING, "
|
||||
+ "rid STRING, date STRING, metric_type STRING, count INT) clustered by (source) into 100 buckets stored as orc "
|
||||
+ "tblproperties('transactional'='true')";
|
||||
stmt.executeUpdate(create_sarc_sushilogtmp);
|
||||
logger.info("Created sarc_sushilogtmp table");
|
||||
|
||||
logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)");
|
||||
String insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp "
|
||||
+ "SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], "
|
||||
+ " `ItemIdent`.`Value`, `ItemPerformance`.`Period`.`Begin`, "
|
||||
+ "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` "
|
||||
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array "
|
||||
+ "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent "
|
||||
+ "WHERE `ItemIdent`.`Type`='DOI'";
|
||||
+ "SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], "
|
||||
+ " `ItemIdent`.`Value`, `ItemPerformance`.`Period`.`Begin`, "
|
||||
+ "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` "
|
||||
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_array "
|
||||
+ "LATERAL VIEW posexplode(ItemIdentifier) ItemIdentifierTable AS seqi, ItemIdent "
|
||||
+ "WHERE `ItemIdent`.`Type`='DOI'";
|
||||
stmt.executeUpdate(insert_sarc_sushilogtmp);
|
||||
logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_array)");
|
||||
|
||||
logger.info("Inserting to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)");
|
||||
insert_sarc_sushilogtmp = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp "
|
||||
+ "SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], "
|
||||
+ "`ItemIdentifier`.`Value`, `ItemPerformance`.`Period`.`Begin`, "
|
||||
+ "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` "
|
||||
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array";
|
||||
+ "SELECT 'SARC-OJS', split(split(INPUT__FILE__NAME,'SarcsARReport_')[1],'_')[0], "
|
||||
+ "`ItemIdentifier`.`Value`, `ItemPerformance`.`Period`.`Begin`, "
|
||||
+ "`ItemPerformance`.`Instance`.`MetricType`, `ItemPerformance`.`Instance`.`Count` "
|
||||
+ "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp_json_non_array";
|
||||
stmt.executeUpdate(insert_sarc_sushilogtmp);
|
||||
logger.info("Inserted to sarc_sushilogtmp table (sarc_sushilogtmp_json_non_array)");
|
||||
|
||||
|
|
@ -199,20 +199,20 @@ public class SarcStats {
|
|||
|
||||
logger.info("Creating sushilog table");
|
||||
String createSushilog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sushilog "
|
||||
+ "(`source` string, "
|
||||
+ "`repository` string, "
|
||||
+ "`rid` string, "
|
||||
+ "`date` string, "
|
||||
+ "`metric_type` string, "
|
||||
+ "`count` int)";
|
||||
+ ".sushilog "
|
||||
+ "(`source` string, "
|
||||
+ "`repository` string, "
|
||||
+ "`rid` string, "
|
||||
+ "`date` string, "
|
||||
+ "`metric_type` string, "
|
||||
+ "`count` int)";
|
||||
stmt.executeUpdate(createSushilog);
|
||||
logger.info("Created sushilog table");
|
||||
|
||||
logger.info("Dropping sarc_sushilogtmp table");
|
||||
String drop_sarc_sushilogtmp = "DROP TABLE IF EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sarc_sushilogtmp";
|
||||
+ ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sarc_sushilogtmp";
|
||||
stmt.executeUpdate(drop_sarc_sushilogtmp);
|
||||
logger.info("Dropped sarc_sushilogtmp table");
|
||||
ConnectDB.getHiveConnection().close();
|
||||
|
|
@ -271,7 +271,7 @@ public class SarcStats {
|
|||
// });
|
||||
|
||||
if (ExecuteWorkflow.sarcNumberOfIssnToDownload > 0
|
||||
&& ExecuteWorkflow.sarcNumberOfIssnToDownload <= issnAndUrls.size()) {
|
||||
&& ExecuteWorkflow.sarcNumberOfIssnToDownload <= issnAndUrls.size()) {
|
||||
logger.info("Trimming siteIds list to the size of: {}", ExecuteWorkflow.sarcNumberOfIssnToDownload);
|
||||
issnAndUrls = issnAndUrls.subList(0, ExecuteWorkflow.sarcNumberOfIssnToDownload);
|
||||
}
|
||||
|
|
@ -293,7 +293,7 @@ public class SarcStats {
|
|||
// Insert into sushilog
|
||||
logger.info("Inserting into sushilog");
|
||||
String insertSushiLog = "INSERT INTO " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sushilog SELECT * " + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp";
|
||||
+ ".sushilog SELECT * " + "FROM " + ConnectDB.getUsageStatsDBSchema() + ".sarc_sushilogtmp";
|
||||
stmtHive.executeUpdate(insertSushiLog);
|
||||
logger.info("Inserted into sushilog");
|
||||
|
||||
|
|
@ -302,7 +302,7 @@ public class SarcStats {
|
|||
}
|
||||
|
||||
public void getARReport(String sarcsReportPathArray, String sarcsReportPathNonArray,
|
||||
String url, String issn) throws Exception {
|
||||
String url, String issn) throws Exception {
|
||||
logger.info("Processing SARC! issn: " + issn + " with url: " + url);
|
||||
ConnectDB.getHiveConnection().setAutoCommit(false);
|
||||
|
||||
|
|
@ -315,9 +315,9 @@ public class SarcStats {
|
|||
// logger.info("(getARReport) Ending period for log download: " + end.format(YYYY_MM));
|
||||
|
||||
PreparedStatement st = ConnectDB
|
||||
.getHiveConnection()
|
||||
.prepareStatement(
|
||||
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
|
||||
.getHiveConnection()
|
||||
.prepareStatement(
|
||||
"SELECT max(date) FROM " + ConnectDB.getUsageStatsDBSchema() + ".sushilog WHERE repository=?");
|
||||
st.setString(1, issn);
|
||||
ResultSet rs_date = st.executeQuery();
|
||||
|
||||
|
|
@ -337,13 +337,13 @@ public class SarcStats {
|
|||
config.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
|
||||
config.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
|
||||
config
|
||||
.set(
|
||||
"fs.hdfs.impl",
|
||||
DistributedFileSystem.class.getName());
|
||||
.set(
|
||||
"fs.hdfs.impl",
|
||||
DistributedFileSystem.class.getName());
|
||||
config
|
||||
.set(
|
||||
"fs.file.impl",
|
||||
LocalFileSystem.class.getName());
|
||||
.set(
|
||||
"fs.file.impl",
|
||||
LocalFileSystem.class.getName());
|
||||
FileSystem dfs = FileSystem.get(config);
|
||||
|
||||
if (dateMax != null && !end.isAfter(dateMax)) {
|
||||
|
|
@ -351,7 +351,7 @@ public class SarcStats {
|
|||
} else {
|
||||
while (start.isBefore(end) || start.equals(end)) {
|
||||
String reportUrl = url + "GetReport/?Report=AR1&Format=json&BeginDate=" + YYYY_MM.format(start)
|
||||
+ "&EndDate=" + YYYY_MM.format(start.plusMonths(1));
|
||||
+ "&EndDate=" + YYYY_MM.format(start.plusMonths(1));
|
||||
|
||||
logger.info("(getARReport) Getting report: {}", reportUrl);
|
||||
String text = getJson(reportUrl);
|
||||
|
|
@ -390,13 +390,13 @@ public class SarcStats {
|
|||
|
||||
// Creating the file in the filesystem for the ItemIdentifier as array object
|
||||
String filePathArray = sarcsReportPathArray + "/SarcsARReport_" + issn + "_"
|
||||
+ YYYY_MM.format(start) + ".json";
|
||||
+ YYYY_MM.format(start) + ".json";
|
||||
logger.info("Storing to file: " + filePathArray);
|
||||
FSDataOutputStream finArray = dfs.create(new Path(filePathArray), true);
|
||||
|
||||
// Creating the file in the filesystem for the ItemIdentifier as array object
|
||||
String filePathNonArray = sarcsReportPathNonArray + "/SarcsARReport_" + issn + "_"
|
||||
+ YYYY_MM.format(start) + ".json";
|
||||
+ YYYY_MM.format(start) + ".json";
|
||||
logger.info("Storing to file: " + filePathNonArray);
|
||||
FSDataOutputStream finNonArray = dfs.create(new Path(filePathNonArray), true);
|
||||
|
||||
|
|
@ -441,7 +441,7 @@ public class SarcStats {
|
|||
}
|
||||
|
||||
// Add number of days
|
||||
end = start.plusDays(numberOfDaysToProcess); // number of days to process since the start date
|
||||
end = start.plusDays(NUM_OF_DAYS);
|
||||
|
||||
// Ensure end date is not after yesterday
|
||||
LocalDate yesterday = LocalDate.now().minusDays(1);
|
||||
|
|
@ -450,7 +450,7 @@ public class SarcStats {
|
|||
}
|
||||
|
||||
logger.info("Starting period for log download: {}", YYYY_MM.format(start));
|
||||
logger.info("Ending period for log download ({} days or yesterday): {}", numberOfDaysToProcess, YYYY_MM.format(end));
|
||||
logger.info("Ending period for log download ({} days or yesterday): {}", NUM_OF_DAYS, YYYY_MM.format(end));
|
||||
}
|
||||
|
||||
private void renameKeysRecursively(String delimiter, JSONArray givenJsonObj) throws Exception {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,11 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Main class for downloading and processing Usage statistics
|
||||
*
|
||||
* @author D. Pierrakos, S. Zoupanos
|
||||
*/
|
||||
public class UsageStatsExporter {
|
||||
|
||||
public UsageStatsExporter() {
|
||||
|
|
@ -65,9 +70,9 @@ public class UsageStatsExporter {
|
|||
if (ExecuteWorkflow.downloadPiwikLogs) {
|
||||
logger.info("Downloading piwik logs");
|
||||
piwd
|
||||
.getOpenAIRELogs(
|
||||
ExecuteWorkflow.repoLogPath,
|
||||
ExecuteWorkflow.portalLogPath, ExecuteWorkflow.portalMatomoID);
|
||||
.getOpenAIRELogs(
|
||||
ExecuteWorkflow.repoLogPath,
|
||||
ExecuteWorkflow.portalLogPath, ExecuteWorkflow.portalMatomoID);
|
||||
}
|
||||
logger.info("Downloaded piwik logs");
|
||||
|
||||
|
|
@ -82,7 +87,7 @@ public class UsageStatsExporter {
|
|||
|
||||
logger.info("Creating LaReferencia tables");
|
||||
LaReferenciaDownloadLogs lrf = new LaReferenciaDownloadLogs(ExecuteWorkflow.lareferenciaBaseURL,
|
||||
ExecuteWorkflow.lareferenciaAuthToken);
|
||||
ExecuteWorkflow.lareferenciaAuthToken);
|
||||
|
||||
if (ExecuteWorkflow.laReferenciaEmptyDirs) {
|
||||
logger.info("Recreating LaReferencia log directories");
|
||||
|
|
@ -159,36 +164,36 @@ public class UsageStatsExporter {
|
|||
|
||||
logger.info("Creating LaReferencia tables");
|
||||
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS "
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, "
|
||||
+ "source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, "
|
||||
+ "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
|
||||
+ "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets "
|
||||
+ "stored as orc tblproperties('transactional'='true')";
|
||||
+ ConnectDB.getUsageStatsDBSchema() + ".lareferencialog(matomoid INT, "
|
||||
+ "source STRING, id_visit STRING, country STRING, action STRING, url STRING, entity_id STRING, "
|
||||
+ "source_item_type STRING, timestamp STRING, referrer_name STRING, agent STRING) "
|
||||
+ "clustered by (source, id_visit, action, timestamp, entity_id) into 100 buckets "
|
||||
+ "stored as orc tblproperties('transactional'='true')";
|
||||
stmt.executeUpdate(sqlCreateTableLareferenciaLog);
|
||||
logger.info("Created LaReferencia tables");
|
||||
|
||||
logger.info("Creating sushilog");
|
||||
|
||||
String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sushilog(source STRING, "
|
||||
+ "repository STRING, rid STRING, date STRING, metric_type STRING, count INT) clustered by (source, "
|
||||
+ "repository, rid, date, metric_type) into 100 buckets stored as orc tblproperties('transactional'='true')";
|
||||
+ ".sushilog(source STRING, "
|
||||
+ "repository STRING, rid STRING, date STRING, metric_type STRING, count INT) clustered by (source, "
|
||||
+ "repository, rid, date, metric_type) into 100 buckets stored as orc tblproperties('transactional'='true')";
|
||||
stmt.executeUpdate(sqlCreateTableSushiLog);
|
||||
logger.info("Created sushilog");
|
||||
|
||||
logger.info("Updating piwiklog");
|
||||
String sql = "insert into " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".piwiklog select * from openaire_prod_usage_raw.piwiklog";
|
||||
+ ".piwiklog select * from openaire_prod_usage_raw.piwiklog";
|
||||
stmt.executeUpdate(sql);
|
||||
|
||||
logger.info("Updating lareferencialog");
|
||||
sql = "insert into " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".lareferencialog select * from openaire_prod_usage_raw.lareferencialog";
|
||||
+ ".lareferencialog select * from openaire_prod_usage_raw.lareferencialog";
|
||||
stmt.executeUpdate(sql);
|
||||
|
||||
logger.info("Updating sushilog");
|
||||
sql = "insert into " + ConnectDB.getUsageStatsDBSchema()
|
||||
+ ".sushilog select * from openaire_prod_usage_raw.sushilog";
|
||||
+ ".sushilog select * from openaire_prod_usage_raw.sushilog";
|
||||
stmt.executeUpdate(sql);
|
||||
|
||||
stmt.close();
|
||||
|
|
|
|||
|
|
@ -215,5 +215,11 @@
|
|||
"paramLongName": "numberOfDownloadThreads",
|
||||
"paramDescription": "Number of download threads",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "nodtp",
|
||||
"paramLongName": "numberOfDaysToProcess",
|
||||
"paramDescription": "Number Of Days To Process",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
||||
|
|
|
|||
|
|
@ -30,6 +30,10 @@
|
|||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>mapred.child.java.opts</name>
|
||||
<value>-Xmx16g</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
|
|
@ -78,6 +82,7 @@
|
|||
<arg>--sarcNumberOfIssnToDownload</arg><arg>${sarcNumberOfIssnToDownload}</arg>
|
||||
<arg>--finalizeStats</arg><arg>${finalizeStats}</arg>
|
||||
<arg>--numberOfDownloadThreads</arg><arg>${numberOfDownloadThreads}</arg>
|
||||
<arg>--numberOfDaysToProcess</arg><arg>${numberOfDaysToProcess}</arg>
|
||||
<capture-output/>
|
||||
</java>
|
||||
<ok to="End" />
|
||||
|
|
|
|||
Loading…
Reference in New Issue