Changeset 5034


Ignore:
Timestamp:
04/24/14 09:59:17 (10 years ago)
Author:
yana.panchenko@uni-tuebingen.de
Message:

automated scan data caching - first working version

Make the scan cache crawl automatically at a fixed update interval.
Store the crawled scan data in the web app context and on the file system.
Specify the location where the scan data is stored - this still has to be improved.

Location:
SRUAggregator/trunk
Files:
57 added
9 edited

Legend:

Unmodified
Added
Removed
  • SRUAggregator/trunk/pom.xml

    r4106 r5034  
    1212    </properties>
    1313    <packaging>war</packaging>
    14     <name>Aggregator</name>
     14    <name>Aggregator-g</name>
    1515    <description>The Aggregator Project</description>
    1616    <organization>
     
    3232        <repository>
    3333            <id>sardine-google-svn-repo</id>
    34             <snapshots> <enabled>true</enabled> </snapshots>
     34            <snapshots>
     35                <enabled>true</enabled>
     36            </snapshots>
    3537            <name>Sardine maven repo at Google Code</name>
    3638            <url>http://sardine.googlecode.com/svn/maven/</url>
     
    7173            <groupId>com.sun.jersey</groupId>
    7274            <artifactId>jersey-client</artifactId>
     75            <version>1.17.1</version>
     76        </dependency>
     77        <dependency>
     78            <groupId>com.sun.jersey</groupId>
     79            <artifactId>jersey-servlet</artifactId>
    7380            <version>1.17.1</version>
    7481        </dependency>
     
    119126            <version>2.2</version>
    120127        </dependency>
     128       
     129        <dependency>
     130            <groupId>junit</groupId>
     131            <artifactId>junit</artifactId>
     132            <version>4.10</version>
     133            <scope>test</scope>
     134            <type>jar</type>
     135        </dependency>
     136       
     137        <dependency> 
     138            <groupId>org.mockito</groupId> 
     139            <artifactId>mockito-all</artifactId> 
     140            <version>1.9.5</version>
     141            <scope>test</scope>
     142        </dependency> 
     143       
    121144    </dependencies>
    122145   
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/SearchOptions.java

    r4069 r5034  
    77import eu.clarin.sru.fcs.aggregator.sopt.CenterRegistryForTesting;
    88import eu.clarin.sru.fcs.aggregator.sopt.CenterRegistryI;
    9 import eu.clarin.sru.fcs.aggregator.sopt.CorporaScanCache;
    109import eu.clarin.sru.fcs.aggregator.sopt.Corpus;
    1110import eu.clarin.sru.fcs.aggregator.sopt.CorpusByInstitutionComparator;
     
    1312import eu.clarin.sru.fcs.aggregator.sopt.CorpusByNameComparator;
    1413import eu.clarin.sru.fcs.aggregator.sopt.CorpusByNameDComparator;
    15 import eu.clarin.sru.fcs.aggregator.sopt.CorpusCache;
    1614import eu.clarin.sru.fcs.aggregator.sopt.CorpusModelCached;
    1715import eu.clarin.sru.fcs.aggregator.sopt.CorpusModelI;
     
    2119import eu.clarin.sru.fcs.aggregator.sopt.CorpusRendererLive;
    2220import eu.clarin.sru.fcs.aggregator.sopt.Languages;
     21import eu.clarin.sru.fcs.aggregator.cache.ScanCacheI;
    2322import java.lang.reflect.Type;
    2423import java.util.ArrayList;
    25 import java.util.Collection;
    2624import java.util.Collections;
    2725import java.util.HashSet;
     
    8078    private SRUVersion version = SRUVersion.VERSION_1_2;
    8179
    82     private CorporaScanCache cache;
     80    private ScanCacheI cache;
    8381   
    8482    @Override
     
    8785        setUpSRUVersion();
    8886        setUpAggerationContext();
    89         cache = (CorporaScanCache) Executions.getCurrent().getDesktop().getWebApp().getAttribute(WebAppListener.CORPUS_CACHE);
     87        cache = (ScanCacheI) Executions.getCurrent().getDesktop().getWebApp().getAttribute(WebAppListener.CORPUS_CACHE);
    9088        //if (cache.isEmpty()) {
    9189        //    liveMode = true;
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/WebAppListener.java

    r4069 r5034  
    11package eu.clarin.sru.fcs.aggregator.app;
    22
    3 import eu.clarin.sru.client.SRUClientException;
     3import eu.clarin.sru.fcs.aggregator.cache.ScanCrawlerRunnable;
     4import eu.clarin.sru.fcs.aggregator.cache.ScanCrawler;
     5import eu.clarin.sru.fcs.aggregator.cache.ScanCacheFiled;
     6import eu.clarin.sru.fcs.aggregator.cache.ScanCache;
    47import eu.clarin.sru.client.SRUThreadedClient;
    58import eu.clarin.sru.client.fcs.ClarinFCSRecordParser;
    6 import eu.clarin.sru.fcs.aggregator.sopt.CorporaScanCache;
    7 import eu.clarin.sru.fcs.aggregator.sopt.CorpusCache;
     9import eu.clarin.sru.fcs.aggregator.sopt.CenterRegistryI;
     10import eu.clarin.sru.fcs.aggregator.sopt.CenterRegistryLive;
    811import eu.clarin.sru.fcs.aggregator.sopt.Languages;
     12import eu.clarin.sru.fcs.aggregator.cache.ScanCacheI;
     13import java.io.File;
    914import java.io.IOException;
    1015import java.io.InputStream;
     
    1318import java.util.Set;
    1419import java.util.Timer;
     20import java.util.concurrent.ExecutorService;
     21import java.util.concurrent.Executors;
     22import java.util.concurrent.ScheduledExecutorService;
     23import java.util.concurrent.TimeUnit;
    1524import java.util.logging.Level;
    1625import java.util.logging.Logger;
     
    2231
    2332/**
    24  * Application initialization and clean up: only one SRU threaded client is used 
     33 * Application initialization and clean up: only one SRU threaded client is used
    2534 * in the application, it has to be shut down when the application stops. One
    2635 * Languages object instance is used within the application.
     
    3443    public static final String LANGUAGES = "LANG";
    3544    public static final String CORPUS_CACHE = "CORPUS_CACHE";
    36    
    3745    private static final Logger LOGGER = Logger.getLogger(WebAppListener.class.getName());
    38     private static final int HOURS_BETWEEN_CACHE_UPDATE = 3;
    39    
    40     private Timer cacheTimer;
    41    
     46    //private static final int HOURS_BETWEEN_CACHE_UPDATE = 3;
     47    //private Timer cacheTimer;
    4248    public static final String DE_TOK_MODEL = "/tokenizer/de-tuebadz-8.0-token.bin";
     49    private static final String AGGREGATOR_DIR_NAME = "aggregator";
     50    private static final String SCAN_DIR_NAME = "scan";
     51    private static final TimeUnit CACHE_UPDATE_INTERVAL_UNIT = TimeUnit.HOURS;
     52    private static final int CACHE_UPDATE_INTERVAL = 5;
     53    private static final int CACHE_MAX_DEPTH = 3;
     54    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
     55    public static final String CORPUS_CRAWLER = "CORPUS_CRAWLER";
    4356
    4457    @Override
    4558    public void init(WebApp webapp) throws Exception {
     59
    4660        LOGGER.info("Aggregator is starting.");
     61       
    4762        Set<SearchResults> activeControllers = new HashSet<SearchResults>();
    4863        webapp.setAttribute(ACTIVE_SEARCH_CONTROLLERS, activeControllers);
    49         SRUThreadedClient searchClient = new SRUThreadedClient();
    50         searchClient.registerRecordParser(new ClarinFCSRecordParser());
    51         webapp.setAttribute(WebAppListener.SHARED_SRU_CLIENT, searchClient);
    5264       
     65        SRUThreadedClient sruClient = new SRUThreadedClient();
     66        sruClient.registerRecordParser(new ClarinFCSRecordParser());
     67        webapp.setAttribute(WebAppListener.SHARED_SRU_CLIENT, sruClient);
     68
    5369        Languages languages = new Languages();
    5470        webapp.setAttribute(LANGUAGES, languages);
    5571       
    56         // set up timer to run the cache corpora scan info task
    57         //cacheTimer = new Timer();
    58         //CorpusCache cache = new CorpusCache();
    59         //webapp.setAttribute(CORPUS_CACHE, cache);
     72        setUpScanCache(webapp);
    6073       
    61         //DateTime date = new DateTime();
    62         //date = date.withHourOfDay(1);
    63         //date = date.withMinuteOfHour(0);
    64         //date = date.withSecondOfMinute(0);
    65         //if (date.isBeforeNow()) {
    66         //    date = date.plusDays(1);
     74        setUpTokenizers(webapp);
     75       
     76    }
     77
     78    @Override
     79    public void cleanup(WebApp webapp) throws Exception {
     80        LOGGER.info("Aggregator is shutting down.");
     81        Set<SearchResults> activeControllers = (Set<SearchResults>) webapp.getAttribute(ACTIVE_SEARCH_CONTROLLERS);
     82        for (SearchResults activeController : activeControllers) {
     83            activeController.shutdown();
     84        }
     85        SRUThreadedClient searchClient = (SRUThreadedClient) webapp.getAttribute(WebAppListener.SHARED_SRU_CLIENT);
     86        shutdownAndAwaitTermination(searchClient);
     87        shutdownAndAwaitTermination(scheduler);
     88        //cacheTimer.cancel();
     89    }
     90
     91    private String getScanDirectory() {
     92        //File aggregatorDir = new File(System.getProperty("user.home"), "/." + AGGREGATOR_DIR_NAME);
     93        File aggregatorDir = new File("/var/www", "/." + AGGREGATOR_DIR_NAME);
     94       
     95        if (!aggregatorDir.exists()) {
     96            aggregatorDir.mkdir();
     97        }
     98        File scanDir = new File(aggregatorDir, SCAN_DIR_NAME);
     99        if (!scanDir.exists()) {
     100            scanDir.mkdir();
     101        }
     102        String scanPath = scanDir.getAbsolutePath();
     103        LOGGER.info("Scan location: " + scanPath);
     104        return scanPath;
     105    }
     106
     107    private void setUpScanCache(WebApp webapp) {
     108
     109        ScanCacheFiled scanCacheFiled = new ScanCacheFiled(getScanDirectory());
     110        CenterRegistryI centerRegistry = new CenterRegistryLive();
     111        SRUThreadedClient sruScanClient = (SRUThreadedClient) webapp.getAttribute(WebAppListener.SHARED_SRU_CLIENT);
     112        //EndpointUrlFilter filter = new EndpointUrlFilter();
     113        //filter.urlShouldContainAnyOf("leipzig", ".mpi.nl");
     114        //filter.urlShouldContainAnyOf("uni-tuebingen.de", ".mpi.nl");
     115        //filter.urlShouldContainAnyOf("dspin.dwds.de", "lindat.");
     116        //ScanCrawler scanCrawler = new ScanCrawler(centerRegistry, sruScanClient, filter, maxDepth);
     117        ScanCrawler scanCrawler = new ScanCrawler(centerRegistry, sruScanClient, null, CACHE_MAX_DEPTH);
     118        ScanCacheI scanCache;
     119
     120        //synchronized (scanCrawler) {
     121            LOGGER.info("Start cache read");
     122            try {
     123                scanCache = scanCacheFiled.read();
     124                LOGGER.info("Finished cache read, number of root corpora: " + scanCache.getRootCorpora().size());
     125            } catch (Exception e) {
     126                LOGGER.log(Level.SEVERE, "Error while reading the scan cache!", e);
     127                scanCache = new ScanCache();
     128            }
    67129        //}
    68         //LOGGER.info(date.toLocalTime().toString() + " " + date.toLocalTime().toString());
    69         //cacheTimer.scheduleAtFixedRate(new CacheCorporaScanTask(cache, searchClient), date.toDate(), HOURS_BETWEEN_CACHE_UPDATE * 3600000);
    70        
    71         // read cache from file
    72         CorporaScanCache cache = new CorporaScanCache(webapp.getRealPath("scan") + "/");
    73         webapp.setAttribute(CORPUS_CACHE, cache);
    74        
     130        webapp.setAttribute(CORPUS_CACHE, scanCache);
     131        webapp.setAttribute(CORPUS_CRAWLER, scanCrawler);
     132
     133        scheduler.scheduleAtFixedRate(
     134                new ScanCrawlerRunnable(scanCrawler, scanCacheFiled, webapp),
     135                0, CACHE_UPDATE_INTERVAL, CACHE_UPDATE_INTERVAL_UNIT);
     136
     137    }
     138
     139    private void shutdownAndAwaitTermination(SRUThreadedClient sruClient) {
     140        // with shutdown() there are memory leaks when web app stops even if all requests have been processed;
     141        // with shutdownNow() there are memory leaks when web app stops only if not all requests have been processed
     142        //searchClient.shutdown();
     143        //searchClient.shutdownNow();
     144        try {
     145            sruClient.shutdown(); // Disable new tasks from being submitted
     146            // Wait 10 secs for existing tasks to terminate
     147            // replace with awaitTermination if ever provided in SRUClient API
     148            Thread.sleep(10000);
     149            sruClient.shutdownNow(); // Cancel currently executing tasks
     150            // Wait 10 secs for tasks to respond to being cancelled
     151            // replace with awaitTermination if ever provided in SRUClient API
     152            Thread.sleep(10000);
     153        } catch (InterruptedException ie) {
     154            // (Re-)Cancel if current thread also interrupted
     155            sruClient.shutdownNow();
     156            // Preserve interrupt status
     157            Thread.currentThread().interrupt();
     158        }
     159    }
     160
     161    private void shutdownAndAwaitTermination(ExecutorService pool) {
     162        pool.shutdown(); // Disable new tasks from being submitted
     163        try {
     164            // Wait a while for existing tasks to terminate
     165            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     166                pool.shutdownNow(); // Cancel currently executing tasks
     167                // Wait a while for tasks to respond to being cancelled
     168                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     169                    LOGGER.info("Pool did not terminate");
     170                }
     171            }
     172        } catch (InterruptedException ie) {
     173            // (Re-)Cancel if current thread also interrupted
     174            pool.shutdownNow();
     175            // Preserve interrupt status
     176            Thread.currentThread().interrupt();
     177        }
     178    }
     179
     180    private void setUpTokenizers(WebApp webapp) {
    75181        TokenizerModel model = null;
    76182        try {
     
    83189        webapp.setAttribute(DE_TOK_MODEL, model);
    84190    }
    85 
    86     @Override
    87     public void cleanup(WebApp webapp) throws Exception {
    88         LOGGER.info("Aggregator is shutting down.");
    89         Set<SearchResults> activeControllers = (Set<SearchResults>) webapp.getAttribute(ACTIVE_SEARCH_CONTROLLERS);
    90         for (SearchResults activeController : activeControllers) {
    91             activeController.shutdown();
    92         }
    93         SRUThreadedClient searchClient = (SRUThreadedClient) webapp.getAttribute(WebAppListener.SHARED_SRU_CLIENT);
    94         // with shutdown() there are memory leaks when web app stops even if all requests have been processed;
    95         // with shutdownNow() there are memory leaks when web app stops only if not all requests have been processed
    96         searchClient.shutdownNow();
    97         //cacheTimer.cancel();
    98     }
    99191}
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/CorporaScanCache.java

    r3059 r5034  
    11package eu.clarin.sru.fcs.aggregator.sopt;
    22
     3import eu.clarin.sru.fcs.aggregator.cache.ScanCacheI;
    34import eu.clarin.sru.fcs.aggregator.app.CacheCorporaScanIntoFileTask;
    45import java.io.BufferedInputStream;
     
    2526 * @author Yana Panchenko
    2627 */
    27 public class CorporaScanCache {
     28@Deprecated
     29public class CorporaScanCache implements ScanCacheI {
    2830
    2931    private Map<String, List<Corpus>> enpUrlToRootCorpora = new LinkedHashMap<String, List<Corpus>>(30);
     
    4547
    4648    public CorporaScanCache(String corporaScanDir) {
    47         createCache(corporaScanDir);
     49        readCache(corporaScanDir);
    4850    }
    4951
     
    6365        return rootCorpora;
    6466    }
    65 
    66     public List<Corpus> getRootCorpora(String lang) {
     67   
     68   
     69
     70    @Override
     71    public List<Corpus> getRootCorporaOfEndpoint(String enpointUrl) {
     72        return this.enpUrlToRootCorpora.get(enpointUrl);
     73    }
     74
     75    public List<Corpus> getRootCorporaForLang(String lang) {
    6776        List<Corpus> rootCorpora = new ArrayList<Corpus>(enpUrlToRootCorpora.size());
    6877        for (List<Corpus> corpora : this.enpUrlToRootCorpora.values()) {
     
    7685    }
    7786
     87    @Override
    7888    public Set<String> getLanguages() {
    7989        Set<String> languages = new HashSet<String>(this.langToCorpora.size());
     
    8292    }
    8393
     94    @Override
    8495    public List<Corpus> getChildren(Corpus corpus) {
    8596        List<Corpus> corpora = this.corpusToChildren.get(corpus);
     
    98109    }
    99110
    100     private void createCache(String corporaScanDir) {
     111    private void readCache(String corporaScanDir) {
    101112        File sruInstitutionsFile = new File(corporaScanDir + "inst.txt");
    102113        BufferedReader reader = null;
     
    292303        System.out.println(cache);
    293304    }
     305
     306    @Override
     307    public Map<String, Set<Corpus>> getRootCorporaForLang() {
     308        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
     309    }
     310
     311    @Override
     312    public Map<String, Set<Corpus>> getTopUniqueLangToCorpora() {
     313        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
     314    }
     315
     316    @Override
     317    public List<Corpus> getTopUniqueLanguageCorpora(String lang) {
     318        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
     319    }
     320
    294321}
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/CorpusModelCached.java

    r3059 r5034  
    11package eu.clarin.sru.fcs.aggregator.sopt;
    22
     3import eu.clarin.sru.fcs.aggregator.cache.ScanCacheI;
    34import java.util.ArrayList;
    45import java.util.Arrays;
     
    2122    private static final Logger logger = Logger.getLogger(CorpusModelCached.class.getName());
    2223    private Map<String, Set<Corpus>> selectedCorpora = new HashMap<String, Set<Corpus>>();
    23     private CorporaScanCache cache;
     24    private ScanCacheI cache;
    2425
    25     public CorpusModelCached(CorporaScanCache cache) {
     26    public CorpusModelCached(ScanCacheI cache) {
    2627        super(new DefaultTreeNode(new Corpus(), new ArrayList<DefaultTreeNode<Corpus>>()));
    2728        this.cache = cache;
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/Endpoint.java

    r3044 r5034  
    3636        return url;
    3737    }
     38
     39    @Override
     40    public int hashCode() {
     41        int hash = 7;
     42        hash = 59 * hash + (this.url != null ? this.url.hashCode() : 0);
     43        return hash;
     44    }
     45
     46    @Override
     47    public boolean equals(Object obj) {
     48        if (obj == null) {
     49            return false;
     50        }
     51        if (getClass() != obj.getClass()) {
     52            return false;
     53        }
     54        final Endpoint other = (Endpoint) obj;
     55        if ((this.url == null) ? (other.url != null) : !this.url.equals(other.url)) {
     56            return false;
     57        }
     58        return true;
     59    }
     60
     61   
     62   
     63   
    3864}
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/Institution.java

    r3059 r5034  
    22
    33import java.util.*;
    4 import java.util.logging.Logger;
    54
    65/**
     
    2322    @Override
    2423    public Endpoint add(String endpointUrl) {
    25         Endpoint ep = new Endpoint(endpointUrl, this);
    26         endpoints.add(ep);
     24        Endpoint ep = getEndpoint(endpointUrl);
     25        if (ep == null) {
     26            ep = new Endpoint(endpointUrl, this);
     27            endpoints.add(ep);
     28        }
    2729        return ep;
    2830    }
     
    5355   
    5456    @Override
     57    public Endpoint getEndpoint(String endpointUrl) {
     58        for (Endpoint ep : endpoints) {
     59            if (ep.getUrl().equals(endpointUrl)) {
     60                return ep;
     61            }
     62        }
     63        return null;
     64    }
     65   
     66    @Override
    5567    public String toString() {
    5668        if (name != null && name.length() > 0) {
     
    6072       }
    6173    }
     74   
     75   
    6276}
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/sopt/InstitutionI.java

    r3059 r5034  
    2020   
    2121    public Endpoint getEndpoint(int index);
     22   
     23    public Endpoint getEndpoint(String endpointUrl);
    2224}
  • SRUAggregator/trunk/src/main/webapp/WEB-INF/web.xml

    r2502 r5034  
    4040        <url-pattern>/zkau/*</url-pattern>
    4141    </servlet-mapping>
     42   
     43    <servlet>
     44        <servlet-name>ServletAdaptor</servlet-name>
     45        <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
     46         <init-param>
     47         <param-name>javax.ws.rs.Application</param-name>
     48        <param-value>eu.clarin.sru.fcs.aggregator.rest.AggregatorService</param-value>
     49        </init-param>
     50        <load-on-startup>1</load-on-startup>
     51    </servlet>
     52    <servlet-mapping>
     53        <servlet-name>ServletAdaptor</servlet-name>
     54        <url-pattern>/service/*</url-pattern>
     55    </servlet-mapping>
     56   
    4257    <welcome-file-list>
    4358        <welcome-file>index.zul</welcome-file>
Note: See TracChangeset for help on using the changeset viewer.