source: SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java @ 6145

Last change on this file since 6145 was 6145, checked in by emanuel.dima@uni-tuebingen.de, 9 years ago
  1. beta-36: debugging js object, misc
File size: 11.3 KB
Line 
1package eu.clarin.sru.fcs.aggregator.app;
2
3import com.fasterxml.jackson.databind.ObjectMapper;
4import com.optimaize.langdetect.LanguageDetector;
5import com.optimaize.langdetect.LanguageDetectorBuilder;
6import com.optimaize.langdetect.ngram.NgramExtractors;
7import com.optimaize.langdetect.profiles.LanguageProfile;
8import com.optimaize.langdetect.profiles.LanguageProfileReader;
9import com.optimaize.langdetect.text.*;
10import eu.clarin.sru.client.SRUThreadedClient;
11import eu.clarin.sru.fcs.aggregator.search.Search;
12import eu.clarin.sru.fcs.aggregator.scan.ScanCrawlTask;
13import eu.clarin.sru.fcs.aggregator.scan.Corpora;
14import eu.clarin.sru.client.SRUVersion;
15import eu.clarin.sru.client.fcs.ClarinFCSClientBuilder;
16import eu.clarin.sru.client.fcs.ClarinFCSEndpointDescriptionParser;
17import eu.clarin.sru.fcs.aggregator.client.MaxConcurrentRequestsCallback;
18import eu.clarin.sru.fcs.aggregator.client.ThrottledClient;
19import eu.clarin.sru.fcs.aggregator.scan.Corpus;
20import eu.clarin.sru.fcs.aggregator.rest.RestService;
21import eu.clarin.sru.fcs.aggregator.scan.Statistics;
22import eu.clarin.sru.fcs.aggregator.util.LanguagesISO693;
23import io.dropwizard.Application;
24import io.dropwizard.assets.AssetsBundle;
25import io.dropwizard.setup.Bootstrap;
26import io.dropwizard.setup.Environment;
27import java.io.File;
28import java.io.IOException;
29import java.net.URI;
30import java.util.ArrayList;
31import java.util.Collections;
32import java.util.HashMap;
33import java.util.List;
34import java.util.Map;
35import java.util.concurrent.ExecutorService;
36import java.util.concurrent.Executors;
37import java.util.concurrent.ScheduledExecutorService;
38import java.util.concurrent.atomic.AtomicReference;
39import org.eclipse.jetty.server.session.SessionHandler;
40import org.slf4j.LoggerFactory;
41
42/**
43 * Main component of the Aggregator application intended to provide users access
44 * to CLARIN-FCS resources.
45 *
46 * The webapp base URL corresponds to the default behavior of displaying the
47 * main aggregator page, where the user can enter query, select the resources of
48 * CQL endpoints (as specified in the Clarin center registry), and search in
49 * these resources. The endpoints/resources selection is optional, by default
50 * all the endpoints root resources are selected.
51 *
52 * If invoked with 'x-aggregation-context' and 'query' parameter, the aggregator
53 * will pre-select provided resources and fill in the query field. This
54 * mechanism is currently used by VLO.
55 *
56 * Example: POST
57 * http://weblicht.sfs.uni-tuebingen.de/Aggregator HTTP/1.1
58 * query = bellen & x-aggregation-context =
59 * {"http://fedora.clarin-d.uni-saarland.de/sru/":["hdl:11858/00-246C-0000-0008-5F2A-0"]}
60 *
61 * If the Aggregator web page has the URL query string parameter 'mode'
62 * set to the string 'search', and the 'query' parameter is set,
63 * then the aggregator search results for this query are immediately displayed
64 * (i.e. users don't need to click 'search' in the aggregator page).
65 * This feature has been requested initially by Martin Wynne from CLARIN ERIC
66 *
67 * @author Yana Panchenko
68 * @author edima
69 *
70 * TODO: update comments everywhere
71 *
72 * TODO: add some basic docs
73 *
74 * TODO: Export search results to personal workspace using oauth
75 *
76 * TODO: websockets
77 *
78 * TODO: show multiple hits on the same result in multiple rows, linked visually
79 *
80 * TODO: optimise page load
81 *
82 * TODO: test it in IE, other browsers
83 *
84 * TODO: expand parents of x-aggregator-context selected corpus
85 *
86 */
87public class Aggregator extends Application<AggregatorConfiguration> {
88
89        private static final org.slf4j.Logger log = LoggerFactory.getLogger(Aggregator.class);
90
91        final int SEARCHES_SIZE_GC_THRESHOLD = 1000;
92        final int SEARCHES_AGE_GC_THRESHOLD = 60;
93
94        private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
95        private static Aggregator instance;
96        private AggregatorConfiguration.Params params;
97
98        private AtomicReference<Corpora> scanCacheAtom = new AtomicReference<Corpora>(new Corpora());
99        private AtomicReference<Statistics> scanStatsAtom = new AtomicReference<Statistics>(new Statistics());
100        private AtomicReference<Statistics> searchStatsAtom = new AtomicReference<Statistics>(new Statistics());
101
102        private LanguageDetector languageDetector;
103        private TextObjectFactory textObjectFactory;
104
105        private ThrottledClient sruClient = null;
106        public MaxConcurrentRequestsCallback maxScanConcurrentRequestsCallback;
107        public MaxConcurrentRequestsCallback maxSearchConcurrentRequestsCallback;
108        private Map<Long, Search> activeSearches = Collections.synchronizedMap(new HashMap<Long, Search>());
109
110        public static void main(String[] args) throws Exception {
111                new Aggregator().run(args);
112        }
113
114        @Override
115        public String getName() {
116                return "FCS Aggregator";
117        }
118
119        @Override
120        public void initialize(Bootstrap<AggregatorConfiguration> bootstrap) {
121                bootstrap.addBundle(new AssetsBundle("/assets", "/", "index.html", "static"));
122        }
123
124        @Override
125        public void run(AggregatorConfiguration config, Environment environment) throws Exception {
126                params = config.aggregatorParams;
127                instance = this;
128
129                List<String> wll = new ArrayList<String>();
130                for (String l : config.aggregatorParams.weblichtConfig.getAcceptedTcfLanguages()) {
131                        wll.add(LanguagesISO693.getInstance().code_3ForCode(l));
132                }
133                config.aggregatorParams.weblichtConfig.acceptedTcfLanguages = wll;
134
135                System.out.println("Using parameters: ");
136                try {
137                        System.out.println(new ObjectMapper().writerWithDefaultPrettyPrinter().
138                                        writeValueAsString(config.aggregatorParams));
139                } catch (IOException xc) {
140                }
141
142                environment.getApplicationContext().setSessionHandler(new SessionHandler());
143                environment.getApplicationContext().setErrorHandler(new ErrorHandler());
144                environment.jersey().setUrlPattern("/rest/*");
145                environment.jersey().register(new RestService());
146
147                try {
148                        init();
149                } catch (Exception ex) {
150                        log.error("INIT EXCEPTION", ex);
151                        throw ex; // force exit
152                }
153        }
154
155        public static Aggregator getInstance() {
156                return instance;
157        }
158
159        public AggregatorConfiguration.Params getParams() {
160                return params;
161        }
162
163        public Corpora getCorpora() {
164                return scanCacheAtom.get();
165        }
166
167        public Statistics getScanStatistics() {
168                return scanStatsAtom.get();
169        }
170
171        public Statistics getSearchStatistics() {
172                return searchStatsAtom.get();
173        }
174
175        public void init() throws IOException {
176                log.info("Aggregator initialization started.");
177
178                SRUThreadedClient sruScanClient
179                                = new ClarinFCSClientBuilder()
180                                .setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
181                                .setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS)
182                                .addDefaultDataViewParsers()
183                                .registerExtraResponseDataParser(
184                                                new ClarinFCSEndpointDescriptionParser())
185                                .enableLegacySupport()
186                                .buildThreadedClient();
187
188                SRUThreadedClient sruSearchClient
189                                = new ClarinFCSClientBuilder()
190                                .setConnectTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
191                                .setSocketTimeout(params.ENDPOINTS_SEARCH_TIMEOUT_MS)
192                                .addDefaultDataViewParsers()
193                                .registerExtraResponseDataParser(
194                                                new ClarinFCSEndpointDescriptionParser())
195                                .enableLegacySupport()
196                                .buildThreadedClient();
197
198                maxScanConcurrentRequestsCallback = new MaxConcurrentRequestsCallback() {
199                        @Override
200                        public int getMaxConcurrentRequest(URI baseURI) {
201                                return params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT;
202                        }
203                };
204
205                maxSearchConcurrentRequestsCallback = new MaxConcurrentRequestsCallback() {
206                        @Override
207                        public int getMaxConcurrentRequest(URI baseURI) {
208                                return params.slowEndpoints.contains(baseURI)
209                                                ? params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_SLOW_ENDPOINT
210                                                : params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT;
211                        }
212                };
213
214                sruClient = new ThrottledClient(
215                                sruScanClient, maxScanConcurrentRequestsCallback,
216                                sruSearchClient, maxSearchConcurrentRequestsCallback
217                );
218
219                File corporaCacheFile = new File(params.AGGREGATOR_FILE_PATH);
220                File corporaOldCacheFile = new File(params.AGGREGATOR_FILE_PATH_BACKUP);
221
222                // init corpora from file
223                {
224                        Corpora corpora = null;
225                        try {
226                                corpora = new ObjectMapper().readValue(corporaCacheFile, Corpora.class);
227                        } catch (Exception xc) {
228                                log.error("Failed to load cached corpora from primary file:", xc);
229                        }
230                        if (corpora == null) {
231                                try {
232                                        corpora = new ObjectMapper().readValue(corporaOldCacheFile, Corpora.class);
233                                } catch (Exception e) {
234                                        log.error("Failed to load cached corpora from backup file:", e);
235                                }
236                        }
237                        if (corpora != null) {
238                                scanCacheAtom.set(corpora);
239                                log.info("corpus list read from file; number of root corpora: " + scanCacheAtom.get().getCorpora().size());
240                        }
241                }
242
243                LanguagesISO693.getInstance(); // force init
244                initLanguageDetector();
245
246                ScanCrawlTask task = new ScanCrawlTask(sruClient,
247                                params.CENTER_REGISTRY_URL, params.SCAN_MAX_DEPTH,
248                                params.additionalCQLEndpoints,
249                                null, scanCacheAtom, corporaCacheFile, corporaOldCacheFile,
250                                scanStatsAtom, searchStatsAtom);
251                scheduler.scheduleAtFixedRate(task, params.SCAN_TASK_INITIAL_DELAY,
252                                params.SCAN_TASK_INTERVAL, params.getScanTaskTimeUnit());
253
254                log.info("Aggregator initialization finished.");
255        }
256
257        public void shutdown(AggregatorConfiguration config) {
258                log.info("Aggregator is shutting down.");
259                for (Search search : activeSearches.values()) {
260                        search.shutdown();
261                }
262                shutdownAndAwaitTermination(config.aggregatorParams, sruClient, scheduler);
263                log.info("Aggregator shutdown complete.");
264        }
265
266        // this function should be thread-safe
267        public Search startSearch(SRUVersion version, List<Corpus> corpora,
268                        String searchString, String searchLang,
269                        int firstRecord, int maxRecords) throws Exception {
270                if (corpora.isEmpty()) {
271                        // No corpora
272                        return null;
273                } else if (searchString.isEmpty()) {
274                        // No query
275                        return null;
276                } else {
277                        Search sr = new Search(sruClient,
278                                        version, searchStatsAtom.get(),
279                                        corpora, searchString, searchLang, maxRecords);
280                        if (activeSearches.size() > SEARCHES_SIZE_GC_THRESHOLD) {
281                                List<Long> toBeRemoved = new ArrayList<Long>();
282                                long t0 = System.currentTimeMillis();
283                                for (Map.Entry<Long, Search> e : activeSearches.entrySet()) {
284                                        long dtmin = (t0 - e.getValue().getCreatedAt()) / 1000 / 60;
285                                        if (dtmin > SEARCHES_AGE_GC_THRESHOLD) {
286                                                log.info("removing search " + e.getKey() + ": " + dtmin + " minutes old");
287                                                toBeRemoved.add(e.getKey());
288                                        }
289                                }
290                                for (Long l : toBeRemoved) {
291                                        activeSearches.remove(l);
292                                }
293                        }
294                        activeSearches.put(sr.getId(), sr);
295                        return sr;
296                }
297        }
298
299        public Search getSearchById(Long id) {
300                return activeSearches.get(id);
301        }
302
303        private static void shutdownAndAwaitTermination(AggregatorConfiguration.Params params,
304                        ThrottledClient sruClient, ExecutorService scheduler) {
305                try {
306                        sruClient.shutdown();
307                        scheduler.shutdown();
308                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
309                        sruClient.shutdownNow();
310                        scheduler.shutdownNow();
311                        Thread.sleep(params.EXECUTOR_SHUTDOWN_TIMEOUT_MS);
312                } catch (InterruptedException ie) {
313                        sruClient.shutdownNow();
314                        scheduler.shutdownNow();
315                        Thread.currentThread().interrupt();
316                }
317        }
318
319        public void initLanguageDetector() throws IOException {
320                List<LanguageProfile> languageProfiles = new LanguageProfileReader().readAll();
321                languageDetector = LanguageDetectorBuilder
322                                .create(NgramExtractors.standard())
323                                .withProfiles(languageProfiles)
324                                .build();
325
326                textObjectFactory = CommonTextObjectFactories.forDetectingOnLargeText();
327        }
328
329        public String detectLanguage(String text) {
330                return languageDetector.detect(textObjectFactory.forText(text)).orNull();
331        }
332}
Note: See TracBrowser for help on using the repository browser.