Ignore:
Timestamp:
04/07/15 20:38:40 (9 years ago)
Author:
emanuel.dima@uni-tuebingen.de
Message:
  1. beta-33: support for slow endpoints; POST external searches; UI errors for corner cases
File:
1 edited

Legend:

Unmodified
Added
Removed
  • SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java

    r6123 r6131  
    88import com.optimaize.langdetect.profiles.LanguageProfileReader;
    99import com.optimaize.langdetect.text.*;
     10import eu.clarin.sru.client.SRUThreadedClient;
    1011import eu.clarin.sru.fcs.aggregator.search.Search;
    1112import eu.clarin.sru.fcs.aggregator.scan.ScanCrawlTask;
     
    1415import eu.clarin.sru.client.fcs.ClarinFCSClientBuilder;
    1516import eu.clarin.sru.client.fcs.ClarinFCSEndpointDescriptionParser;
     17import eu.clarin.sru.fcs.aggregator.client.MaxConcurrentRequestsCallback;
    1618import eu.clarin.sru.fcs.aggregator.client.ThrottledClient;
    1719import eu.clarin.sru.fcs.aggregator.scan.Corpus;
     
    2527import java.io.File;
    2628import java.io.IOException;
     29import java.net.URI;
    2730import java.util.ArrayList;
    2831import java.util.Collections;
     
    6568 * @author edima
    6669 *
    67  * TODO: improve help page text
    68  *
    6970 * TODO: update comments everywhere
    7071 *
     
    102103        private TextObjectFactory textObjectFactory;
    103104
    104         private ThrottledClient sruScanClient = null;
    105         private ThrottledClient sruSearchClient = null;
     105        private ThrottledClient sruClient = null;
     106        public MaxConcurrentRequestsCallback maxScanConcurrentRequestsCallback;
     107        public MaxConcurrentRequestsCallback maxSearchConcurrentRequestsCallback;
    106108        private Map<Long, Search> activeSearches = Collections.synchronizedMap(new HashMap<Long, Search>());
    107109
     
    173175        public void init() throws IOException {
    174176                log.info("Aggregator initialization started.");
    175                 sruScanClient = new ThrottledClient(
    176                                 new ClarinFCSClientBuilder()
     177
     178                SRUThreadedClient sruScanClient
     179                                = new ClarinFCSClientBuilder()
    177180                                .setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
    178181                                .setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
     
    181184                                                new ClarinFCSEndpointDescriptionParser())
    182185                                .enableLegacySupport()
    183                                 .buildThreadedClient(),
    184                                 params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
    185                 );
    186                 sruSearchClient = new ThrottledClient(
    187                                 new ClarinFCSClientBuilder()
     186                                .buildThreadedClient();
     187
     188                SRUThreadedClient sruSearchClient
     189                                = new ClarinFCSClientBuilder()
    188190                                .setConnectTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
    189191                                .setSocketTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
    190192                                .addDefaultDataViewParsers()
     193                                .registerExtraResponseDataParser(
     194                                                new ClarinFCSEndpointDescriptionParser())
    191195                                .enableLegacySupport()
    192                                 .buildThreadedClient(),
    193                                 params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT
     196                                .buildThreadedClient();
     197
     198                maxScanConcurrentRequestsCallback = new MaxConcurrentRequestsCallback() {
     199                        @Override
     200                        public int getMaxConcurrentRequest(URI baseURI) {
     201                                return params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT;
     202                        }
     203                };
     204
     205                maxSearchConcurrentRequestsCallback = new MaxConcurrentRequestsCallback() {
     206                        @Override
     207                        public int getMaxConcurrentRequest(URI baseURI) {
     208                                return params.slowEndpoints.contains(baseURI)
     209                                                ? params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_SLOW_ENDPOINT
     210                                                : params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT;
     211                        }
     212                };
     213
     214                sruClient = new ThrottledClient(
     215                                sruScanClient, maxScanConcurrentRequestsCallback,
     216                                sruSearchClient, maxSearchConcurrentRequestsCallback
    194217                );
    195218
     
    221244                initLanguageDetector();
    222245
    223                 ScanCrawlTask task = new ScanCrawlTask(sruScanClient,
     246                ScanCrawlTask task = new ScanCrawlTask(sruClient,
    224247                                params.CENTER_REGISTRY_URL, params.SCAN_MAX_DEPTH,
    225248                                params.additionalCQLEndpoints,
     
    237260                        search.shutdown();
    238261                }
    239                 shutdownAndAwaitTermination(config.aggregatorParams, sruScanClient, scheduler);
    240                 shutdownAndAwaitTermination(config.aggregatorParams, sruSearchClient, scheduler);
     262                shutdownAndAwaitTermination(config.aggregatorParams, sruClient, scheduler);
    241263                log.info("Aggregator shutdown complete.");
    242264        }
     
    253275                        return null;
    254276                } else {
    255                         Search sr = new Search(sruSearchClient, version, searchStatsAtom.get(),
     277                        Search sr = new Search(sruClient,
     278                                        version, searchStatsAtom.get(),
    256279                                        corpora, searchString, searchLang, maxRecords);
    257280                        if (activeSearches.size() > SEARCHES_SIZE_GC_THRESHOLD) {
Note: See TracChangeset for help on using the changeset viewer.