wip: tidy up threaded indexing

Author: Marc Cataford, 2020-09-28 11:12:56 -04:00
Parent: eaf669aa85
Commit: e82bc7490b
4 changed files with 34 additions and 33 deletions


@@ -5,5 +5,6 @@ SETTINGS_KEYS = [
     "EXCLUDES",
     "FILE_TYPES",
     "SIGNIFICANCE_THRESHOLD",
+    "INDEXING_PROCESSES",
 ]
 QUERY_STRING_LENGTH = 1024
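Presumably, SETTINGS_KEYS whitelists which configuration keys the loader will honor, which is why the new INDEXING_PROCESSES key is registered here in addition to receiving a default below. A minimal sketch of how such a whitelist is typically applied; the load_user_settings helper is hypothetical, not part of this repository:

# Hypothetical sketch: merge user overrides onto defaults,
# keeping only whitelisted keys and dropping everything else.
def load_user_settings(user_config: dict, defaults: dict, allowed_keys: list) -> dict:
    merged = dict(defaults)
    for key, value in user_config.items():
        if key in allowed_keys:
            merged[key] = value
    return merged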


@@ -3,12 +3,13 @@ from pathlib import Path
 from typing import Dict, List
 import re
 from time import perf_counter
-from multiprocessing import Pool, Process, Manager
+from multiprocessing import Pool
 import attr
 from settings import settings
+from process_utils import chunkify_content
 from document_models import Corpus
 from trigram_index import TrigramIndex
 from line_index import LineIndex
@@ -54,7 +55,7 @@ class Indexer(IndexerBase):
         logger.info(f"[Discovery] Discovered {len(discovered)} files.")
         self._preload(discovered)
-        self._async_process(self.corpus.collect_unprocessed_documents())
+        self._populate_indices(self.corpus.collect_unprocessed_documents())
         end_time = perf_counter()
         logger.info(
@@ -131,44 +132,27 @@ class Indexer(IndexerBase):
             logger.exception(e)
             logger.error(f"Could not read {discovered_file}, skipping.")
 
-    # TODO: Tidy up.
-    def _async_process(self, uids):
-        processes = 4  # Settings?
-        chunk_size = int(len(uids) / processes)  # Extract into process utils
-        chunk_boundary = 0
+    def _populate_indices(self, uids):
+        processes = settings.INDEXING_PROCESSES
         pool = Pool(processes=processes)
-        chunks = []
-        for i in range(processes):
-            if i == processes - 1:
-                chunks.append(uids[chunk_boundary:])
-            else:
-                chunks.append(uids[chunk_boundary : chunk_boundary + chunk_size])
-                chunk_boundary += chunk_size
-        r = pool.map(self._bulk_process, chunks)  # is pool the best way?
-        for rs in r:
-            self._trigram_index._trigrams.update(rs[0])
-            self._line_index._lines.update(rs[1])
+        chunks = chunkify_content(uids, processes)
+        results = pool.map(self._bulk_process, chunks)
+        # TODO: Refactor indices to populate cleanly.
+        for result in results:
+            self._trigram_index._trigrams.update(result[0])
+            self._line_index._lines.update(result[1])
 
     def _bulk_process(self, uids: List[str]):
         for uid in uids:
-            self._process(uid)
-        return (self._trigram_index._trigrams, self._line_index._lines)
-
-    def _process(self, uid: str):
-        document = self.corpus.get_document(uid=uid)
-        path = document.key
-        content = document.content
-        self._trigram_index.index(path, content)
-        self._line_index.index(path, content)
-        logger.info(f"[Indexing] Processed {path}")
+            document = self.corpus.get_document(uid=uid)
+            path = document.key
+            content = document.content
+            self._trigram_index.index(path, content)
+            self._line_index.index(path, content)
+            logger.info(f"[Indexing] Processed {path}")
+        return (self._trigram_index._trigrams, self._line_index._lines)
 
     def _find_closest_line(self, path, index):
         content = self._line_index.query(path)
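Worth noting for readers of this hunk: pool workers run in separate processes, so index writes made inside _bulk_process never reach the parent's TrigramIndex and LineIndex. That is why _bulk_process returns its trigram and line maps, and _populate_indices merges them with dict.update. A standalone sketch of the same fan-out/merge pattern, using a toy build_partial_index worker in place of the real indexer:

from multiprocessing import Pool


def build_partial_index(paths):
    # Each worker returns its partial result explicitly: state
    # mutated in a child process dies with that process.
    return {path: len(path) for path in paths}  # stand-in for real indexing


if __name__ == "__main__":
    paths = [f"src/file_{i}.py" for i in range(10)]
    # Split the work into roughly equal chunks, one map task each.
    chunks = [paths[i : i + 3] for i in range(0, len(paths), 3)]
    with Pool(processes=4) as pool:
        partials = pool.map(build_partial_index, chunks)
    # Parent-side merge, mirroring _populate_indices.
    merged = {}
    for partial in partials:
        merged.update(partial)
    print(len(merged))  # 10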

src/process_utils.py (new file, 15 additions)

@@ -0,0 +1,15 @@
+def chunkify_content(content, chunk_count, chunk_size=None):
+    if chunk_size is None:
+        chunk_size = int(len(content) / chunk_count)
+
+    chunks = []
+    last_boundary = 0
+
+    for i in range(chunk_count):
+        if i == chunk_count - 1:
+            chunks.append(content[last_boundary:])
+        else:
+            chunks.append(content[last_boundary : last_boundary + chunk_size])
+            last_boundary += chunk_size
+
+    return chunks
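For a quick sense of chunkify_content's behavior: it cuts the input into chunk_count pieces of size len(content) // chunk_count, and the last chunk absorbs the remainder when the length does not divide evenly (example values assumed):

>>> from process_utils import chunkify_content
>>> chunkify_content(list(range(10)), 4)
[[0, 1], [2, 3], [4, 5], [6, 7, 8, 9]]

One edge case to be aware of: if the input is shorter than chunk_count, the computed chunk_size rounds down to zero, every chunk but the last comes back empty, and the final chunk receives the entire input.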


@@ -14,6 +14,7 @@ default_settings = {
     "FILE_TYPES": [],
     "SIGNIFICANCE_THRESHOLD": 0,
     "WATCHED": [],
+    "INDEXING_PROCESSES": 4,
 }
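The default of 4 worker processes is a fixed guess; a common alternative (just a sketch, not something this commit does) is to derive the default from the host's core count:

import os

# Hypothetical alternative default: one indexing process per core,
# falling back to 4 because os.cpu_count() can return None.
indexing_processes = os.cpu_count() or 4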