wip: (dirty) split indexing in processes
parent 4db9d43b02
commit eaf669aa85

1 changed file with 26 additions and 1 deletion

@@ -3,6 +3,7 @@ from pathlib import Path
 from typing import Dict, List
 import re
 from time import perf_counter
+from multiprocessing import Pool, Process, Manager
 
 import attr
 
@@ -53,7 +54,7 @@ class Indexer(IndexerBase):
         logger.info(f"[Discovery] Discovered {len(discovered)} files.")
 
         self._preload(discovered)
-        self._bulk_process(self.corpus.collect_unprocessed_documents())
+        self._async_process(self.corpus.collect_unprocessed_documents())
         end_time = perf_counter()
 
         logger.info(
@@ -130,10 +131,34 @@ class Indexer(IndexerBase):
             logger.exception(e)
             logger.error(f"Could not read {discovered_file}, skipping.")
 
+    # TODO: Tidy up.
+    def _async_process(self, uids):
+        processes = 4 # Settings?
+        chunk_size = int(len(uids) / processes) # Extract into process utils
+        chunk_boundary = 0
+        pool = Pool(processes=processes)
+
+        chunks = []
+
+        for i in range(processes):
+            if i == processes - 1:
+                chunks.append(uids[chunk_boundary:])
+            else:
+                chunks.append(uids[chunk_boundary : chunk_boundary + chunk_size])
+
+            chunk_boundary += chunk_size
+
+        r = pool.map(self._bulk_process, chunks) # is pool the best way?
+        for rs in r:
+            self._trigram_index._trigrams.update(rs[0])
+            self._line_index._lines.update(rs[1])
+
     def _bulk_process(self, uids: List[str]):
         for uid in uids:
             self._process(uid)
+
+        return (self._trigram_index._trigrams, self._line_index._lines)
 
     def _process(self, uid: str):
         document = self.corpus.get_document(uid=uid)
         path = document.key
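
The new _async_process splits the unprocessed document UIDs into one chunk per worker and hands the chunks to a multiprocessing Pool. Because each worker operates on its own copy of the indexer, _bulk_process now returns its partial trigram and line indexes so the parent process can merge them back with dict.update. Below is a minimal, self-contained sketch of that chunk-and-merge pattern; split_into_chunks and build_partial_index are illustrative stand-ins for the "process utils" TODO and for _bulk_process, not names from this codebase.

# Sketch of the chunk-and-merge pattern used by _async_process.
# split_into_chunks and build_partial_index are illustrative names only.
from multiprocessing import Pool
from typing import Dict, List, Tuple


def split_into_chunks(items: List[str], n: int) -> List[List[str]]:
    """Split items into n chunks; the last chunk absorbs any remainder."""
    chunk_size = max(1, len(items) // n)
    chunks = [items[i * chunk_size:(i + 1) * chunk_size] for i in range(n - 1)]
    chunks.append(items[(n - 1) * chunk_size:])
    return chunks


def build_partial_index(uids: List[str]) -> Tuple[Dict[str, str], Dict[str, str]]:
    # Stand-in for _bulk_process: each worker builds and returns partial indexes.
    trigrams = {uid: f"trigrams for {uid}" for uid in uids}
    lines = {uid: f"lines for {uid}" for uid in uids}
    return trigrams, lines


if __name__ == "__main__":
    uids = [f"doc-{i}" for i in range(10)]
    processes = 4

    # map() sends one chunk to each worker and pickles the returned tuples
    # back into the parent process.
    with Pool(processes=processes) as pool:
        results = pool.map(build_partial_index, split_into_chunks(uids, processes))

    # Merge the per-worker results, mirroring the update() calls in _async_process.
    merged_trigrams: Dict[str, str] = {}
    merged_lines: Dict[str, str] = {}
    for partial_trigrams, partial_lines in results:
        merged_trigrams.update(partial_trigrams)
        merged_lines.update(partial_lines)

    print(len(merged_trigrams), len(merged_lines))  # 10 10

The merge step matters because any updates a worker makes to its copy of the indexer never reach the parent on their own; the returned tuples are what carry the results back across the process boundary, which appears to be why _bulk_process now ends with a return.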