Changeset 6131 for SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java
- Timestamp:
- 04/07/15 20:38:40 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java
r6123 r6131 8 8 import com.optimaize.langdetect.profiles.LanguageProfileReader; 9 9 import com.optimaize.langdetect.text.*; 10 import eu.clarin.sru.client.SRUThreadedClient; 10 11 import eu.clarin.sru.fcs.aggregator.search.Search; 11 12 import eu.clarin.sru.fcs.aggregator.scan.ScanCrawlTask; … … 14 15 import eu.clarin.sru.client.fcs.ClarinFCSClientBuilder; 15 16 import eu.clarin.sru.client.fcs.ClarinFCSEndpointDescriptionParser; 17 import eu.clarin.sru.fcs.aggregator.client.MaxConcurrentRequestsCallback; 16 18 import eu.clarin.sru.fcs.aggregator.client.ThrottledClient; 17 19 import eu.clarin.sru.fcs.aggregator.scan.Corpus; … … 25 27 import java.io.File; 26 28 import java.io.IOException; 29 import java.net.URI; 27 30 import java.util.ArrayList; 28 31 import java.util.Collections; … … 65 68 * @author edima 66 69 * 67 * TODO: improve help page text68 *69 70 * TODO: update comments everywhere 70 71 * … … 102 103 private TextObjectFactory textObjectFactory; 103 104 104 private ThrottledClient sruScanClient = null; 105 private ThrottledClient sruSearchClient = null; 105 private ThrottledClient sruClient = null; 106 public MaxConcurrentRequestsCallback maxScanConcurrentRequestsCallback; 107 public MaxConcurrentRequestsCallback maxSearchConcurrentRequestsCallback; 106 108 private Map<Long, Search> activeSearches = Collections.synchronizedMap(new HashMap<Long, Search>()); 107 109 … … 173 175 public void init() throws IOException { 174 176 log.info("Aggregator initialization started."); 175 sruScanClient = new ThrottledClient( 176 new ClarinFCSClientBuilder() 177 178 SRUThreadedClient sruScanClient 179 = new ClarinFCSClientBuilder() 177 180 .setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS) 178 181 .setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS) … … 181 184 new ClarinFCSEndpointDescriptionParser()) 182 185 .enableLegacySupport() 183 .buildThreadedClient(), 184 params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT 185 ); 186 sruSearchClient = new ThrottledClient( 187 new ClarinFCSClientBuilder() 186 .buildThreadedClient(); 187 188 SRUThreadedClient sruSearchClient 189 = new ClarinFCSClientBuilder() 188 190 .setConnectTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS) 189 191 .setSocketTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS) 190 192 .addDefaultDataViewParsers() 193 .registerExtraResponseDataParser( 194 new ClarinFCSEndpointDescriptionParser()) 191 195 .enableLegacySupport() 192 .buildThreadedClient(), 193 params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT 196 .buildThreadedClient(); 197 198 maxScanConcurrentRequestsCallback = new MaxConcurrentRequestsCallback() { 199 @Override 200 public int getMaxConcurrentRequest(URI baseURI) { 201 return params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT; 202 } 203 }; 204 205 maxSearchConcurrentRequestsCallback = new MaxConcurrentRequestsCallback() { 206 @Override 207 public int getMaxConcurrentRequest(URI baseURI) { 208 return params.slowEndpoints.contains(baseURI) 209 ? params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_SLOW_ENDPOINT 210 : params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT; 211 } 212 }; 213 214 sruClient = new ThrottledClient( 215 sruScanClient, maxScanConcurrentRequestsCallback, 216 sruSearchClient, maxSearchConcurrentRequestsCallback 194 217 ); 195 218 … … 221 244 initLanguageDetector(); 222 245 223 ScanCrawlTask task = new ScanCrawlTask(sru ScanClient,246 ScanCrawlTask task = new ScanCrawlTask(sruClient, 224 247 params.CENTER_REGISTRY_URL, params.SCAN_MAX_DEPTH, 225 248 params.additionalCQLEndpoints, … … 237 260 search.shutdown(); 238 261 } 239 shutdownAndAwaitTermination(config.aggregatorParams, sruScanClient, scheduler); 240 shutdownAndAwaitTermination(config.aggregatorParams, sruSearchClient, scheduler); 262 shutdownAndAwaitTermination(config.aggregatorParams, sruClient, scheduler); 241 263 log.info("Aggregator shutdown complete."); 242 264 } … … 253 275 return null; 254 276 } else { 255 Search sr = new Search(sruSearchClient, version, searchStatsAtom.get(), 277 Search sr = new Search(sruClient, 278 version, searchStatsAtom.get(), 256 279 corpora, searchString, searchLang, maxRecords); 257 280 if (activeSearches.size() > SEARCHES_SIZE_GC_THRESHOLD) {
Note: See TracChangeset
for help on using the changeset viewer.