Changeset 5901 for SRUAggregator


Ignore:
Timestamp:
12/11/14 15:58:31 (9 years ago)
Author:
emanuel.dima@uni-tuebingen.de
Message:
  1. alpha10: throttled client bug fix, more todos
Location:
SRUAggregator/trunk
Files:
11 edited

Legend:

Unmodified
Added
Removed
  • SRUAggregator/trunk/aggregator_development.yml

    r5900 r5901  
    11aggregatorParams:
    2   CENTER_REGISTRY_URL: http://centerregistry-clarin.esc.rzg.mpg.de/restxml/
     2  CENTER_REGISTRY_URL: http://centres.clarin.eu/restxml/
    33  WEBLICHT_URL: https://weblicht.sfs.uni-tuebingen.de/WebLicht-4/?input=
    44
    55  AGGREGATOR_FILE_PATH: /Users/edima/fcsAggregatorCorpora.json
    6   SCAN_MAX_DEPTH: 1
     6  SCAN_MAX_DEPTH: 3
    77  SCAN_TASK_INITIAL_DELAY: 0
    8   SCAN_TASK_INTERVAL: 6
     8  SCAN_TASK_INTERVAL: 24
    99  SCAN_TASK_TIME_UNIT: HOURS
    1010
    11   ENDPOINTS_SCAN_TIMEOUT_MS: 60000
    12   ENDPOINTS_SEARCH_TIMEOUT_MS: 5000
     11  SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT: 4  # @depth2: 1=361s; 2=225s; 4=207s
     12  SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT: 4
     13
     14
     15  ENDPOINTS_SCAN_TIMEOUT_MS: 10000
     16  ENDPOINTS_SEARCH_TIMEOUT_MS: 10000
    1317  EXECUTOR_SHUTDOWN_TIMEOUT_MS: 100
    1418
  • SRUAggregator/trunk/pom.xml

    r5900 r5901  
    88        <groupId>eu.clarin.sru.fcs</groupId>
    99        <artifactId>Aggregator2</artifactId>
    10         <version>2.0.0-alpha-9</version>
     10        <version>2.0.0-alpha-10</version>
    1111        <name>FCS Aggregator</name>
    1212
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java

    r5900 r5901  
    8080 * @author edima
    8181 *
     82 * TODO: version page: credits, open source, see vcr/version page
     83 *
     84 * TODO: change the order in the GUI: selected collections after the
     85 * language/layer
     86 *
     87 * TODO: condensed list of corpora
     88 *
     89 * TODO: group the list of corpora by institution?
     90 *
     91 * TODO: fix ordering of corpora in corpora view
     92 *
     93 * TODO: corpora search should not indicate the ones that don't match
     94 *
     95 * TODO: try to refine by language using a language library, with UI element
     96 *
     97 * TODO: helpdesk: switch to english (parameter of the form)
     98 *
     99 * TODO: label: "phonetic transcriptions" (ask the BAS guys)
     100 *
     101 * TODO: number of results control: make the buttons larger
     102 *
     103 * TODO: Collections view: home link (make a single consistent text for it)
     104 *
    82105 * TODO: push footer down
    83106 *
     
    107130 *
    108131 * TODO: show multiple hits on the same result in multiple rows, linked visually
     132 *
     133 * TODO: optimise page load
    109134 *
    110135 * TODO: improve help page text
     
    190215                log.info("Aggregator initialization started.");
    191216                sruScanClient = new ThrottledClient(
    192                                 new ClarinFCSClientBuilder()
     217                        new ClarinFCSClientBuilder()
    193218                                .setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
    194219                                .setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
    195220                                .addDefaultDataViewParsers()
    196221                                .enableLegacySupport()
    197                                 .buildThreadedClient());
     222                                .buildThreadedClient(),
     223                                params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT,
     224                                params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
     225                );
    198226                sruSearchClient = new ThrottledClient(
    199227                                new ClarinFCSClientBuilder()
     
    202230                                .addDefaultDataViewParsers()
    203231                                .enableLegacySupport()
    204                                 .buildThreadedClient());
     232                                .buildThreadedClient(),
     233                                params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT,
     234                                params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
     235                );
    205236
    206237                File corporaCacheFile = new File(params.AGGREGATOR_FILE_PATH);
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/AggregatorConfiguration.java

    r5900 r5901  
    4242                @JsonProperty
    4343                @Range
     44                int SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT;
     45
     46                @JsonProperty
     47                @Range
     48                int SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT;
     49
     50                @JsonProperty
     51                @Range
    4452                int ENDPOINTS_SCAN_TIMEOUT_MS;
    4553
     
    6674                        return ENDPOINTS_SEARCH_TIMEOUT_MS;
    6775                }
     76
     77                @JsonIgnore
     78                public int getSCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT() {
     79                        return SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT;
     80                }
     81
     82                @JsonIgnore
     83                public int getSEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT() {
     84                        return SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT;
     85                }
    6886        }
    69 
    7087        public Params aggregatorParams = new Params();
    7188}
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/client/GenericClient.java

    r5893 r5901  
    1414class GenericClient {
    1515        private final SRUThreadedClient sruClient;
     16        public final int maxConcurrentRequests;
    1617        // queue of operations waiting for execution
    17         private final Map<URI, Queue<Operation>> endpoint2exec = new HashMap<URI, Queue<Operation>>();
    18         private final int nowExecuting = 0;
     18
     19        static class ExecQueue {
     20
     21                int nowExecuting = 0;
     22                Queue<Operation> queue = new ArrayDeque<>();
     23        }
     24        private final Map<URI, ExecQueue> endpointMap = new HashMap<URI, ExecQueue>();
    1925        private final Object lock = new Object();
    2026
    21         GenericClient(SRUThreadedClient sruClient) {
     27        GenericClient(SRUThreadedClient sruClient, int maxConcurrentRequests) {
    2228                this.sruClient = sruClient;
     29                this.maxConcurrentRequests = maxConcurrentRequests;
    2330        }
    2431
     
    2633                op.setClient(this);
    2734                synchronized (lock) {
    28                         if (!endpoint2exec.containsKey(endpoint)) {
    29                                 endpoint2exec.put(endpoint, new ArrayDeque<Operation>());
     35                        ExecQueue eq = endpointMap.get(endpoint);
     36                        if (eq == null) {
     37                                eq = new ExecQueue();
     38                                endpointMap.put(endpoint, eq);
    3039                        }
    3140                        op.stats().enqueuedTime = System.currentTimeMillis();
    32                         endpoint2exec.get(endpoint).add(op);
     41                        eq.queue.add(op);
     42                        eq.nowExecuting++; // counter the following decrement in executeNext
    3343                        executeNextOperationOfEndpoint(endpoint);
    3444                }
     
    3747        void executeNextOperationOfEndpoint(URI endpoint) {
    3848                synchronized (lock) {
    39                         Queue<Operation> queue = endpoint2exec.get(endpoint);
    40                         if (queue == null || queue.isEmpty()) {
     49                        ExecQueue eq = endpointMap.get(endpoint);
     50                        eq.nowExecuting--; // assume an operation just finished
     51                        if (eq.queue.isEmpty()) {
    4152                                return;
    4253                        }
    43                         if (nowExecuting >= ThrottledClient.MAX_CONCURRENT_REQUESTS) {
     54                        if (eq.nowExecuting >= maxConcurrentRequests) {
    4455                                return;
    4556                        }
    46                         Operation op = queue.poll();
     57                        eq.nowExecuting++;
     58                        Operation op = eq.queue.poll();
    4759                        op.stats().startedTime = System.currentTimeMillis();
    4860                        op.execute(sruClient);
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/client/ScanOperation.java

    r5894 r5901  
    3333                        sruClient.scan(request, this);
    3434                } catch (SRUClientException xc) {
    35                         stats.finishedTime = System.currentTimeMillis();
    36                         callback.onError(request, xc, stats);
     35                        onError(request, xc);
    3736                }
    3837        }
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/client/ThrottledClient.java

    r5893 r5901  
    1212 */
    1313public class ThrottledClient {
    14 
    15         public final static int MAX_CONCURRENT_REQUESTS = 8;
    1614
    1715        public interface Stats {
     
    4038        GenericClient searchClient;
    4139
    42         public ThrottledClient(SRUThreadedClient sruClient) {
     40        public ThrottledClient(SRUThreadedClient sruClient, int maxConcurrentScanRequests, int maxConcurrentSearchRequests) {
    4341                this.sruClient = sruClient;
    44                 this.scanClient = new GenericClient(sruClient);
    45                 this.searchClient = new GenericClient(sruClient);
     42                this.scanClient = new GenericClient(sruClient, maxConcurrentScanRequests);
     43                this.searchClient = new GenericClient(sruClient, maxConcurrentSearchRequests);
    4644        }
    4745
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/rest/RestService.java

    r5900 r5901  
    7575                                put("lastScanStats", new HashMap<String, Object>() {
    7676                                        {
    77                                                 put("maxConcurrentRequestsPerEndpoint", ThrottledClient.MAX_CONCURRENT_REQUESTS);
     77                                                put("maxConcurrentScanRequestsPerEndpoint",
     78                                                                Aggregator.getInstance().getParams().getSCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT());
    7879                                                put("timeout", params.getENDPOINTS_SCAN_TIMEOUT_MS() / 1000.);
    7980                                                put("institutions", scan.getInstitutions());
     
    8283                                put("searchStats", new HashMap<String, Object>() {
    8384                                        {
    84                                                 put("maxConcurrentRequestsPerEndpoint", ThrottledClient.MAX_CONCURRENT_REQUESTS);
     85                                                put("maxConcurrentSearchRequestsPerEndpoint",
     86                                                                Aggregator.getInstance().getParams().getSEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT());
    8587                                                put("timeout", params.getENDPOINTS_SEARCH_TIMEOUT_MS() / 1000.);
    8688                                                put("institutions", search.getInstitutions());
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/scan/ScanCrawler.java

    r5900 r5901  
    8989                }
    9090
    91                 log.info("{} Start scan: {}", latch.get(), st.endpointUrl);
     91                log.info("{} Start scan: {}#{}", latch.get(), st.endpointUrl, normalizeHandle(st.parentCorpus));
    9292                latch.increment();
    9393                sruClient.scan(scanRequest, st);
     
    144144                                }
    145145
    146                                 log.info("{} Finished scan: {}", latch.get(), endpointUrl);
     146                                log.info("{} Finished scan: {}#{}", latch.get(), endpointUrl, normalizeHandle(parentCorpus));
    147147                        } catch (Exception xc) {
    148                                 log.error("{} Exception in callback {}", latch.get(), endpointUrl);
     148                                log.error("{} Exception in callback {}#{}", latch.get(), endpointUrl, normalizeHandle(parentCorpus));
    149149                                log.error("--> ", xc);
    150150                        } finally {
     
    155155                @Override
    156156                public void onError(SRUScanRequest request, SRUClientException error, ThrottledClient.Stats stats) {
    157                         latch.decrement();
    158                         log.error("{} Error while scanning {}: {}", latch.get(), endpointUrl, error.getMessage());
    159                         statistics.addEndpointDatapoint(institution, endpointUrl, stats.getQueueTime(), stats.getExecutionTime());
    160                         statistics.addErrorDatapoint(institution, endpointUrl, error);
    161                         if (Throw.isCausedBy(error, SocketTimeoutException.class)) {
    162                                 return;
    163                         }
    164                         log.error("--> " + request.getBaseURI() + "?" + request.getScanClause() + " --> ", error);
     157                        try {
     158                                log.error("{} Error while scanning {}#{}: {}", latch.get(), endpointUrl, normalizeHandle(parentCorpus), error.getMessage());
     159                                statistics.addEndpointDatapoint(institution, endpointUrl, stats.getQueueTime(), stats.getExecutionTime());
     160                                statistics.addErrorDatapoint(institution, endpointUrl, error);
     161                                if (Throw.isCausedBy(error, SocketTimeoutException.class)) {
     162                                        return;
     163                                }
     164                                log.error("--> " + request.getBaseURI() + "?" + request.getScanClause() + " --> ", error);
     165                        } finally {
     166                                latch.decrement();
     167                        }
    165168                }
    166169        }
  • SRUAggregator/trunk/src/main/resources/assets/index.html

    r5900 r5901  
    2424                                <a title="about" id="aboutlink">
    2525                                        <span class="glyphicon glyphicon-info-sign"></span>
    26                                         <span>VERSION 2.0.0.α9</span>
     26                                        <span>VERSION 2.0.0.α10</span>
    2727                                </a>
    2828                        </div>
  • SRUAggregator/trunk/src/test/java/eu/clarin/sru/fcs/aggregator/app/ScanCrawlerTest.java

    r5893 r5901  
    3030                                new ClarinFCSClientBuilder()
    3131                                .addDefaultDataViewParsers()
    32                                 .buildThreadedClient());
     32                                .buildThreadedClient(), 2, 2);
    3333
    3434                try {
Note: See TracChangeset for help on using the changeset viewer.