Skip to content

AI DocGen

Optional LLM-based docstring generation, available via the lcp[ai] install extra. See the AI DocGen guide and architecture for the bigger picture.

lcp.ai.agent

DocGenAgent - orchestrator for AI documentation generation.

DocGenAgent

Agent that generates docstrings for undocumented Python symbols.

Parameters:

Name Type Description Default
provider LLMProvider

LLM provider to use for generating docstrings.

required
config DocGenConfig | None

Configuration for the generation run.

None
Source code in src/lcp/ai/agent.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
class DocGenAgent:
    """Agent that generates docstrings for undocumented Python symbols.

    Args:
        provider: LLM provider to use for generating docstrings.
        config: Configuration for the generation run.
    """

    def __init__(
        self,
        provider: LLMProvider,
        config: DocGenConfig | None = None,
    ) -> None:
        self._provider = provider
        self._config = config or DocGenConfig()

    def run(self, coverage_input: str | dict) -> DocGenResult:
        """Generate docstrings sequentially, one source file at a time.

        Symbols are grouped by their ``source_file`` so that all docstrings
        for one file are injected in a single batch. Symbols that carry no
        source file are reported as skipped.

        Args:
            coverage_input: Path to coverage JSON file or parsed dict.

        Returns:
            DocGenResult with statistics and per-symbol results, ordered
            like the filtered symbol list.
        """
        report = self._load_coverage(coverage_input)
        symbols = self._filter_symbols(report.get("undocumented", []))

        if not symbols:
            return DocGenResult(
                symbols_processed=0,
                symbols_updated=0,
                symbols_skipped=0,
                symbols_failed=0,
                total_usage=TokenUsage(),
                results=[],
            )

        # One slot per symbol, filled by index so results keep input order.
        all_results: list[SymbolResult] = [None] * len(symbols)  # type: ignore[list-item]
        grouped: dict[str, list[tuple[int, dict]]] = defaultdict(list)

        for position, sym in enumerate(symbols):
            path = sym.get("source_file")
            if path:
                grouped[path].append((position, sym))
                continue
            # Without a file there is nothing to read or to write.
            all_results[position] = SymbolResult(
                symbol_id=f"{sym.get('module', '')}:{sym.get('entity', '')}",
                kind=sym.get("kind", ""),
                source_file=None,
                status="skipped",
                error="No source file available",
            )

        total_usage = TokenUsage()
        for path, members in grouped.items():
            payload: list[tuple[str, str, str]] = []
            targets: list[SymbolResult] = []

            for position, sym in members:
                outcome = self._process_symbol(sym, path)
                all_results[position] = outcome
                if outcome.usage:
                    total_usage = total_usage + outcome.usage

                if outcome.status in ("updated", "dry_run") and outcome.docstring:
                    payload.append(
                        (sym.get("kind", ""), sym.get("entity", ""), outcome.docstring)
                    )
                    targets.append(outcome)

            if not payload or self._config.dry_run:
                continue

            # Single batched write per file; a failed write downgrades the
            # corresponding result from "updated" to "skipped".
            written = inject_docstrings_batch(path, payload)
            for slot, ok in enumerate(written):
                target = targets[slot]
                if not ok:
                    target.status = "skipped"

        # Count results per final status (after any write downgrades).
        tally = {"updated": 0, "skipped": 0, "failed": 0, "dry_run": 0}
        for outcome in all_results:
            if outcome.status in tally:
                tally[outcome.status] += 1

        return DocGenResult(
            symbols_processed=len(symbols),
            symbols_updated=tally["updated"] + tally["dry_run"],
            symbols_skipped=tally["skipped"],
            symbols_failed=tally["failed"],
            total_usage=total_usage,
            results=all_results,
        )

    def _load_coverage(self, coverage_input: str | dict) -> dict:
        """Load coverage data from a file path or dict."""
        if isinstance(coverage_input, dict):
            return coverage_input

        path = Path(coverage_input)
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _filter_symbols(self, undocumented: list[dict]) -> list[dict]:
        """Filter symbols based on config.kinds."""
        if not self._config.kinds:
            return undocumented

        return [s for s in undocumented if s.get("kind") in self._config.kinds]

    def _read_source_context(
        self, source_file: str, kind: str, entity: str
    ) -> str:
        """Read the source code context for a symbol.

        Args:
            source_file: Path to the source file.
            kind: Symbol kind.
            entity: Entity name.

        Returns:
            Source code string for the symbol.
        """
        try:
            source = Path(source_file).read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            return ""

        try:
            tree = ast.parse(source)
        except SyntaxError:
            return ""

        source_lines = source.splitlines()

        if kind == "module":
            # Return first ~30 lines for module context
            return "\n".join(source_lines[:30])

        # Find the node
        node = self._find_source_node(tree, kind, entity)
        if node is None:
            return ""

        start = node.lineno - 1
        end = getattr(node, "end_lineno", start + 1)
        # Limit to 50 lines of context
        end = min(end, start + 50)

        return "\n".join(source_lines[start:end])

    def _find_source_node(
        self, tree: ast.Module, kind: str, entity: str
    ) -> ast.AST | None:
        """Find an AST node by kind and entity name."""
        if "#" in entity:
            class_name, method_name = entity.split("#", 1)
            for node in ast.walk(tree):
                if isinstance(node, ast.ClassDef) and node.name == class_name:
                    for item in node.body:
                        if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                            if item.name == method_name:
                                return item
            return None

        for node in ast.iter_child_nodes(tree):
            if kind == "class" and isinstance(node, ast.ClassDef) and node.name == entity:
                return node
            if kind == "function" and isinstance(
                node, (ast.FunctionDef, ast.AsyncFunctionDef)
            ):
                if node.name == entity:
                    return node

        return None

    def _generate_docstring(
        self, symbol: dict, source_context: str
    ) -> tuple[str, TokenUsage]:
        """Ask the provider for a docstring for one symbol.

        Args:
            symbol: Symbol dict from coverage data; its ``kind``, ``module``
                and ``entity`` entries (empty string when missing) go into
                the user prompt.
            source_context: Source code context.

        Returns:
            Tuple of (whitespace-stripped docstring text, token usage).
        """
        system_prompt = build_system_prompt(
            docstring_style=self._config.docstring_style,
            description=self._config.description,
        )

        identity = {key: symbol.get(key, "") for key in ("kind", "module", "entity")}
        user_prompt = build_user_prompt(source_context=source_context, **identity)

        reply = self._provider.generate(system_prompt, user_prompt)
        text = reply.content.strip()
        return text, reply.usage

    def _process_symbol(self, symbol: dict, source_file: str) -> SymbolResult:
        """Produce the result for one symbol without writing anything.

        Reads the symbol's source context and asks the LLM for a docstring.
        Any exception raised along the way is converted into a ``failed``
        result carrying the exception text.

        Args:
            symbol: Symbol dict from coverage data.
            source_file: Path to source file.

        Returns:
            SymbolResult for this symbol.
        """
        symbol_id = f"{symbol.get('module', '')}:{symbol.get('entity', '')}"
        kind = symbol.get("kind", "")

        def outcome(status: str, **fields) -> SymbolResult:
            # All results of this call share the same identifying fields.
            return SymbolResult(
                symbol_id=symbol_id,
                kind=kind,
                source_file=source_file,
                status=status,
                **fields,
            )

        try:
            context = self._read_source_context(
                source_file, kind, symbol.get("entity", "")
            )
            if not context:
                return outcome("skipped", error="Could not read source context")

            text, usage = self._generate_docstring(symbol, context)
            if not text:
                # Tokens were still spent, so the usage is reported.
                return outcome(
                    "skipped", usage=usage, error="LLM returned empty docstring"
                )

            if self._config.dry_run:
                return outcome("dry_run", docstring=text, usage=usage)
            return outcome("updated", docstring=text, usage=usage)

        except Exception as exc:
            return outcome("failed", error=str(exc))

    # ------------------------------------------------------------------
    # Hierarchical async engine
    # ------------------------------------------------------------------

    def run_sync(self, coverage_input: str | dict) -> DocGenResult:
        """Blocking entry point that picks the engine from the config type.

        With a HierarchicalConfig the async hierarchical engine is driven
        to completion via ``asyncio.run``; with any other config the basic
        sequential ``run`` is used.

        Args:
            coverage_input: Path to coverage JSON file or parsed dict.

        Returns:
            DocGenResult with statistics and per-symbol results.
        """
        if isinstance(self._config, HierarchicalConfig):
            return asyncio.run(self.run_async(coverage_input))
        return self.run(coverage_input)

    async def run_async(self, coverage_input: str | dict) -> DocGenResult:
        """Execute hierarchical bottom-up documentation generation.

        Processes symbols level-by-level (leaves first, then classes,
        then modules), using asyncio.gather with a semaphore for
        concurrency control. After every level, parents whose children
        failed too often are marked as skipped and are never sent to the
        provider.

        Args:
            coverage_input: Path to coverage JSON file or parsed dict.

        Returns:
            DocGenResult with statistics and per-symbol results.

        Raises:
            TypeError: If there are symbols to process but the agent was
                not configured with a HierarchicalConfig.
        """
        coverage_data = self._load_coverage(coverage_input)
        undocumented = coverage_data.get("undocumented", [])

        # Filter by kinds if configured
        symbols = self._filter_symbols(undocumented)

        if not symbols:
            return DocGenResult(
                symbols_processed=0,
                symbols_updated=0,
                symbols_skipped=0,
                symbols_failed=0,
                total_usage=TokenUsage(),
                results=[],
            )

        # Build hierarchy
        trees = build_hierarchy(symbols)

        config = self._config
        if not isinstance(config, HierarchicalConfig):
            # Explicit check rather than ``assert`` so it survives ``python -O``.
            raise TypeError("run_async requires a HierarchicalConfig")
        sem = asyncio.Semaphore(config.max_workers)

        all_results: list[SymbolResult] = []
        total_usage = TokenUsage()

        # Process level by level: 0 (leaves) -> 1 (classes) -> 2 (modules)
        for level in (0, 1, 2):
            # Collect all pending nodes at this level from all trees
            level_nodes: list[tuple[SymbolNode, ModuleTree]] = [
                (node, tree)
                for tree in trees.values()
                for node in tree.levels.get(level, [])
                if node.status == "pending"
            ]

            if level_nodes:
                # Process all nodes at this level concurrently
                outcomes = await asyncio.gather(
                    *(self._process_node(node, tree, sem) for node, tree in level_nodes),
                    return_exceptions=True,
                )

                # Collect results and update node status/docstring
                for (node, _tree), outcome in zip(level_nodes, outcomes):
                    # gather() can hand back BaseException subclasses such as
                    # CancelledError, so Exception alone is too narrow here.
                    if isinstance(outcome, BaseException):
                        sym_result = SymbolResult(
                            symbol_id=node.symbol_id,
                            kind=node.kind,
                            source_file=node.symbol.get("source_file"),
                            status="failed",
                            error=str(outcome),
                        )
                        node.status = "failed"
                    else:
                        sym_result = outcome
                        node.status = sym_result.status
                        if sym_result.docstring:
                            node.docstring = sym_result.docstring

                    all_results.append(sym_result)
                    if sym_result.usage:
                        total_usage = total_usage + sym_result.usage

            # Propagate failures to the next level even when this level had
            # nothing pending; otherwise parents two levels up (e.g. a module
            # containing only functions) would never be checked.
            self._propagate_failures(trees, level, config.failure_threshold)

        # Collect skipped nodes from propagation (nodes that were pending
        # but got marked as skipped by _propagate_failures). The id set is
        # built once and kept in sync instead of being rebuilt per node.
        recorded_ids = {r.symbol_id for r in all_results}
        for tree in trees.values():
            for level_nodes_list in tree.levels.values():
                for node in level_nodes_list:
                    if node.status != "skipped" or node.symbol_id in recorded_ids:
                        continue
                    recorded_ids.add(node.symbol_id)
                    all_results.append(SymbolResult(
                        symbol_id=node.symbol_id,
                        kind=node.kind,
                        source_file=node.symbol.get("source_file"),
                        status="skipped",
                        error="Skipped due to child failure propagation",
                    ))

        # Batch write results if not dry_run
        if not config.dry_run:
            self._write_results(all_results)

        # Aggregate stats (after writing, since failed writes change status)
        updated = sum(1 for r in all_results if r.status == "updated")
        skipped = sum(1 for r in all_results if r.status == "skipped")
        failed = sum(1 for r in all_results if r.status == "failed")
        dry_run_count = sum(1 for r in all_results if r.status == "dry_run")

        return DocGenResult(
            symbols_processed=len(all_results),
            symbols_updated=updated + dry_run_count,
            symbols_skipped=skipped,
            symbols_failed=failed,
            total_usage=total_usage,
            results=all_results,
        )

    async def _process_node(
        self,
        node: SymbolNode,
        tree: ModuleTree,
        semaphore: asyncio.Semaphore,
    ) -> SymbolResult:
        """Generate a docstring for one node while holding the semaphore.

        Args:
            node: The symbol node to process.
            tree: The module tree containing this node.
            semaphore: Semaphore for concurrency control.

        Returns:
            SymbolResult for this node: ``skipped`` when no context could
            be built or the provider returned only whitespace, otherwise
            ``dry_run`` or ``updated`` depending on the configuration.
        """

        def outcome(status: str, **fields) -> SymbolResult:
            # Every result of this call shares the node's identifying fields.
            return SymbolResult(
                symbol_id=node.symbol_id,
                kind=node.kind,
                source_file=node.symbol.get("source_file"),
                status=status,
                **fields,
            )

        async with semaphore:
            context = build_context(node, tree)
            if not context:
                return outcome("skipped", error="Could not build context")

            system_prompt = build_system_prompt(
                docstring_style=self._config.docstring_style,
                description=self._config.description,
            )
            user_prompt = build_user_prompt_hierarchical(node, context)
            reply = await self._provider.agenerate(system_prompt, user_prompt)

            text = reply.content.strip()
            if not text:
                # Tokens were still spent, so the usage is reported.
                return outcome(
                    "skipped",
                    usage=reply.usage,
                    error="LLM returned empty docstring",
                )

            if self._config.dry_run:
                return outcome("dry_run", docstring=text, usage=reply.usage)
            return outcome("updated", docstring=text, usage=reply.usage)

    def _propagate_failures(
        self,
        trees: dict[str, ModuleTree],
        completed_level: int,
        threshold: float,
    ) -> None:
        """Mark parent nodes as skipped if too many children failed.

        After processing a level, check the next level's nodes. If a
        parent's children have a failure ratio >= threshold, skip it.

        Args:
            trees: All module trees.
            completed_level: The level that was just processed.
            threshold: Failure ratio threshold (0.0 to 1.0).
        """
        next_level = completed_level + 1
        for tree in trees.values():
            for parent in tree.levels.get(next_level, []):
                if parent.status != "pending":
                    continue
                children = parent.children
                if not children:
                    continue
                failed = sum(1 for c in children if c.status == "failed")
                if failed / len(children) >= threshold:
                    parent.status = "skipped"

    def _write_results(self, results: list[SymbolResult]) -> None:
        """Batch write all generated docstrings grouped by file.

        Args:
            results: List of SymbolResult from the async run.
        """
        by_file: dict[str, list[SymbolResult]] = defaultdict(list)
        for r in results:
            if r.status == "updated" and r.docstring and r.source_file:
                by_file[r.source_file].append(r)

        for source_file, file_results in by_file.items():
            injections: list[tuple[str, str, str]] = []
            for r in file_results:
                # Extract entity from symbol_id (format: "module:entity")
                entity = r.symbol_id.split(":", 1)[-1]
                injections.append((r.kind, entity, r.docstring))

            write_results = inject_docstrings_batch(source_file, injections)

            for i, success in enumerate(write_results):
                if not success:
                    file_results[i].status = "skipped"

run

run(coverage_input: str | dict) -> DocGenResult

Execute the documentation generation.

Parameters:

Name Type Description Default
coverage_input str | dict

Path to coverage JSON file or parsed dict.

required

Returns:

Type Description
DocGenResult

DocGenResult with statistics and per-symbol results.

Source code in src/lcp/ai/agent.py
def run(self, coverage_input: str | dict) -> DocGenResult:
    """Generate docstrings sequentially, one source file at a time.

    Symbols are grouped by their ``source_file`` so that all docstrings
    for one file are injected in a single batch. Symbols that carry no
    source file are reported as skipped.

    Args:
        coverage_input: Path to coverage JSON file or parsed dict.

    Returns:
        DocGenResult with statistics and per-symbol results, ordered
        like the filtered symbol list.
    """
    report = self._load_coverage(coverage_input)
    symbols = self._filter_symbols(report.get("undocumented", []))

    if not symbols:
        return DocGenResult(
            symbols_processed=0,
            symbols_updated=0,
            symbols_skipped=0,
            symbols_failed=0,
            total_usage=TokenUsage(),
            results=[],
        )

    # One slot per symbol, filled by index so results keep input order.
    all_results: list[SymbolResult] = [None] * len(symbols)  # type: ignore[list-item]
    grouped: dict[str, list[tuple[int, dict]]] = defaultdict(list)

    for position, sym in enumerate(symbols):
        path = sym.get("source_file")
        if path:
            grouped[path].append((position, sym))
            continue
        # Without a file there is nothing to read or to write.
        all_results[position] = SymbolResult(
            symbol_id=f"{sym.get('module', '')}:{sym.get('entity', '')}",
            kind=sym.get("kind", ""),
            source_file=None,
            status="skipped",
            error="No source file available",
        )

    total_usage = TokenUsage()
    for path, members in grouped.items():
        payload: list[tuple[str, str, str]] = []
        targets: list[SymbolResult] = []

        for position, sym in members:
            outcome = self._process_symbol(sym, path)
            all_results[position] = outcome
            if outcome.usage:
                total_usage = total_usage + outcome.usage

            if outcome.status in ("updated", "dry_run") and outcome.docstring:
                payload.append(
                    (sym.get("kind", ""), sym.get("entity", ""), outcome.docstring)
                )
                targets.append(outcome)

        if not payload or self._config.dry_run:
            continue

        # Single batched write per file; a failed write downgrades the
        # corresponding result from "updated" to "skipped".
        written = inject_docstrings_batch(path, payload)
        for slot, ok in enumerate(written):
            target = targets[slot]
            if not ok:
                target.status = "skipped"

    # Count results per final status (after any write downgrades).
    tally = {"updated": 0, "skipped": 0, "failed": 0, "dry_run": 0}
    for outcome in all_results:
        if outcome.status in tally:
            tally[outcome.status] += 1

    return DocGenResult(
        symbols_processed=len(symbols),
        symbols_updated=tally["updated"] + tally["dry_run"],
        symbols_skipped=tally["skipped"],
        symbols_failed=tally["failed"],
        total_usage=total_usage,
        results=all_results,
    )

run_sync

run_sync(coverage_input: str | dict) -> DocGenResult

Execute documentation generation synchronously.

Uses hierarchical async engine if config is HierarchicalConfig, otherwise falls back to basic sequential processing.

Parameters:

Name Type Description Default
coverage_input str | dict

Path to coverage JSON file or parsed dict.

required

Returns:

Type Description
DocGenResult

DocGenResult with statistics and per-symbol results.

Source code in src/lcp/ai/agent.py
def run_sync(self, coverage_input: str | dict) -> DocGenResult:
    """Blocking entry point that picks the engine from the config type.

    With a HierarchicalConfig the async hierarchical engine is driven
    to completion via ``asyncio.run``; with any other config the basic
    sequential ``run`` is used.

    Args:
        coverage_input: Path to coverage JSON file or parsed dict.

    Returns:
        DocGenResult with statistics and per-symbol results.
    """
    if isinstance(self._config, HierarchicalConfig):
        return asyncio.run(self.run_async(coverage_input))
    return self.run(coverage_input)

run_async async

run_async(coverage_input: str | dict) -> DocGenResult

Execute hierarchical bottom-up documentation generation.

Processes symbols level-by-level (leaves first, then classes, then modules), using asyncio.gather with a semaphore for concurrency control.

Parameters:

Name Type Description Default
coverage_input str | dict

Path to coverage JSON file or parsed dict.

required

Returns:

Type Description
DocGenResult

DocGenResult with statistics and per-symbol results.

Source code in src/lcp/ai/agent.py
async def run_async(self, coverage_input: str | dict) -> DocGenResult:
    """Execute hierarchical bottom-up documentation generation.

    Processes symbols level-by-level (leaves first, then classes,
    then modules), using asyncio.gather with a semaphore for
    concurrency control. After every level, parents whose children
    failed too often are marked as skipped and are never sent to the
    provider.

    Args:
        coverage_input: Path to coverage JSON file or parsed dict.

    Returns:
        DocGenResult with statistics and per-symbol results.

    Raises:
        TypeError: If there are symbols to process but the agent was
            not configured with a HierarchicalConfig.
    """
    coverage_data = self._load_coverage(coverage_input)
    undocumented = coverage_data.get("undocumented", [])

    # Filter by kinds if configured
    symbols = self._filter_symbols(undocumented)

    if not symbols:
        return DocGenResult(
            symbols_processed=0,
            symbols_updated=0,
            symbols_skipped=0,
            symbols_failed=0,
            total_usage=TokenUsage(),
            results=[],
        )

    # Build hierarchy
    trees = build_hierarchy(symbols)

    config = self._config
    if not isinstance(config, HierarchicalConfig):
        # Explicit check rather than ``assert`` so it survives ``python -O``.
        raise TypeError("run_async requires a HierarchicalConfig")
    sem = asyncio.Semaphore(config.max_workers)

    all_results: list[SymbolResult] = []
    total_usage = TokenUsage()

    # Process level by level: 0 (leaves) -> 1 (classes) -> 2 (modules)
    for level in (0, 1, 2):
        # Collect all pending nodes at this level from all trees
        level_nodes: list[tuple[SymbolNode, ModuleTree]] = [
            (node, tree)
            for tree in trees.values()
            for node in tree.levels.get(level, [])
            if node.status == "pending"
        ]

        if level_nodes:
            # Process all nodes at this level concurrently
            outcomes = await asyncio.gather(
                *(self._process_node(node, tree, sem) for node, tree in level_nodes),
                return_exceptions=True,
            )

            # Collect results and update node status/docstring
            for (node, _tree), outcome in zip(level_nodes, outcomes):
                # gather() can hand back BaseException subclasses such as
                # CancelledError, so Exception alone is too narrow here.
                if isinstance(outcome, BaseException):
                    sym_result = SymbolResult(
                        symbol_id=node.symbol_id,
                        kind=node.kind,
                        source_file=node.symbol.get("source_file"),
                        status="failed",
                        error=str(outcome),
                    )
                    node.status = "failed"
                else:
                    sym_result = outcome
                    node.status = sym_result.status
                    if sym_result.docstring:
                        node.docstring = sym_result.docstring

                all_results.append(sym_result)
                if sym_result.usage:
                    total_usage = total_usage + sym_result.usage

        # Propagate failures to the next level even when this level had
        # nothing pending; otherwise parents two levels up (e.g. a module
        # containing only functions) would never be checked.
        self._propagate_failures(trees, level, config.failure_threshold)

    # Collect skipped nodes from propagation (nodes that were pending
    # but got marked as skipped by _propagate_failures). The id set is
    # built once and kept in sync instead of being rebuilt per node.
    recorded_ids = {r.symbol_id for r in all_results}
    for tree in trees.values():
        for level_nodes_list in tree.levels.values():
            for node in level_nodes_list:
                if node.status != "skipped" or node.symbol_id in recorded_ids:
                    continue
                recorded_ids.add(node.symbol_id)
                all_results.append(SymbolResult(
                    symbol_id=node.symbol_id,
                    kind=node.kind,
                    source_file=node.symbol.get("source_file"),
                    status="skipped",
                    error="Skipped due to child failure propagation",
                ))

    # Batch write results if not dry_run
    if not config.dry_run:
        self._write_results(all_results)

    # Aggregate stats (after writing, since failed writes change status)
    updated = sum(1 for r in all_results if r.status == "updated")
    skipped = sum(1 for r in all_results if r.status == "skipped")
    failed = sum(1 for r in all_results if r.status == "failed")
    dry_run_count = sum(1 for r in all_results if r.status == "dry_run")

    return DocGenResult(
        symbols_processed=len(all_results),
        symbols_updated=updated + dry_run_count,
        symbols_skipped=skipped,
        symbols_failed=failed,
        total_usage=total_usage,
        results=all_results,
    )

lcp.ai.models

Data models for the AI documentation generation module.

TokenUsage dataclass

Token usage statistics from an LLM call.

Source code in src/lcp/ai/models.py
@dataclass
class TokenUsage:
    """Token usage statistics from an LLM call.

    All counters default to zero, so ``TokenUsage()`` is the neutral
    element for ``+``. Adding two instances sums every counter
    field-by-field and returns a new instance; neither operand is
    modified.
    """

    input_tokens: int = 0
    output_tokens: int = 0
    cache_tokens: int = 0
    reasoning_tokens: int = 0

    def __add__(self, other: TokenUsage) -> TokenUsage:
        """Return a new TokenUsage holding the per-field sums.

        Returns ``NotImplemented`` for operands that are not TokenUsage
        instances, so Python reports an ordinary ``TypeError`` for an
        unsupported ``+`` instead of an ``AttributeError`` from a missing
        counter attribute.
        """
        if not isinstance(other, TokenUsage):
            return NotImplemented
        return TokenUsage(
            input_tokens=self.input_tokens + other.input_tokens,
            output_tokens=self.output_tokens + other.output_tokens,
            cache_tokens=self.cache_tokens + other.cache_tokens,
            reasoning_tokens=self.reasoning_tokens + other.reasoning_tokens,
        )

LLMResponse dataclass

Response from an LLM provider.

Source code in src/lcp/ai/models.py
@dataclass
class LLMResponse:
    """Response from an LLM provider.

    Pairs the returned text with the token accounting for the same call.
    """

    # Text returned by the provider.
    content: str
    # Token counters reported for this call.
    usage: TokenUsage

DocGenConfig dataclass

Configuration for documentation generation.

Source code in src/lcp/ai/models.py
@dataclass
class DocGenConfig:
    """Configuration for documentation generation.

    All fields have defaults, so ``DocGenConfig()`` is a valid configuration.
    """

    # Presumably restricts processing to these symbol kinds, with None meaning
    # no restriction -- TODO confirm against the agent.
    kinds: list[str] | None = None
    # NOTE(review): free-form text; how it is consumed is not visible here.
    description: str | None = None
    # Docstring convention name; defaults to "google".
    docstring_style: str = "google"
    # When True the agent does not write its results back.
    dry_run: bool = False

HierarchicalConfig dataclass

Bases: DocGenConfig

Configuration for hierarchical documentation generation.

Extends DocGenConfig with parameters controlling the hierarchical bottom-up processing mode and async parallelism.

Source code in src/lcp/ai/models.py
@dataclass
class HierarchicalConfig(DocGenConfig):
    """Configuration for hierarchical documentation generation.

    Extends DocGenConfig with parameters controlling the hierarchical
    bottom-up processing mode and async parallelism.
    """

    # Presumably the upper bound on concurrent workers -- TODO confirm.
    max_workers: int = 4
    # Presumably the share of failed children at which a parent is skipped
    # -- TODO confirm the exact comparison used by the agent.
    failure_threshold: float = 0.5

SymbolResult dataclass

Result of processing a single symbol.

Source code in src/lcp/ai/models.py
@dataclass
class SymbolResult:
    """Result of processing a single symbol.

    Only the first four fields are required; the remaining ones default to
    None.
    """

    # Identifier of the processed symbol (the agent passes a node's symbol_id).
    symbol_id: str
    # Symbol kind as reported for the node.
    kind: str
    # Path of the file that defines the symbol, or None when it is unknown.
    source_file: str | None
    # Outcome of processing; the agent counts results by this value.
    status: str  # "updated", "skipped", "failed", "dry_run"
    # Presumably the generated docstring text, when one was produced.
    docstring: str | None = None
    # Token usage attributed to this symbol, if any was recorded.
    usage: TokenUsage | None = None
    # Human-readable reason; the agent also sets it for skipped symbols.
    error: str | None = None

DocGenResult dataclass

Result of a documentation generation run.

Source code in src/lcp/ai/models.py
@dataclass
class DocGenResult:
    """Result of a documentation generation run.

    Holds aggregate counters together with the per-symbol results they were
    derived from.
    """

    # Total number of per-symbol results (the agent passes len(results)).
    symbols_processed: int
    # The agent counts both "updated" and "dry_run" results here.
    symbols_updated: int
    # Number of results whose status is "skipped".
    symbols_skipped: int
    # Number of results whose status is "failed".
    symbols_failed: int
    # Token usage accumulated over the whole run.
    total_usage: TokenUsage
    # Per-symbol outcomes; a fresh empty list per instance by default.
    results: list[SymbolResult] = field(default_factory=list)

lcp.ai.hierarchy

Hierarchy builder for bottom-up documentation generation.

SymbolNode dataclass

A node in the hierarchical symbol tree.

Source code in src/lcp/ai/hierarchy.py
@dataclass
class SymbolNode:
    """One symbol placed in a module's hierarchical tree.

    Wraps the raw symbol dict and adds tree bookkeeping (level, children)
    plus the per-node generation state (docstring, status).
    """

    symbol: dict
    kind: str
    level: int
    children: list[SymbolNode] = field(default_factory=list)
    docstring: str | None = None
    status: str = "pending"

    @property
    def module(self) -> str:
        """Value of the symbol's ``module`` key, or ``""`` when absent."""
        return self.symbol.get("module", "")

    @property
    def entity(self) -> str:
        """Value of the symbol's ``entity`` key, or ``""`` when absent."""
        return self.symbol.get("entity", "")

    @property
    def symbol_id(self) -> str:
        """Identifier of the form ``"<module>:<entity>"``."""
        return f"{self.module}:{self.entity}"

ModuleTree dataclass

Hierarchical tree for a single module.

Source code in src/lcp/ai/hierarchy.py
@dataclass
class ModuleTree:
    """Hierarchical tree for a single module.

    Bundles the module's identity and source location with its symbol nodes.
    """

    # Name of the module this tree describes.
    module_name: str
    # Path of the module's source file.
    source_file: str
    # Top node of the tree (presumably the module-level node -- confirm).
    root: SymbolNode
    # Nodes keyed by an integer level (presumably SymbolNode.level -- confirm).
    levels: dict[int, list[SymbolNode]] = field(default_factory=dict)

build_hierarchy

build_hierarchy(undocumented: list[dict]) -> dict[str, ModuleTree]

Build a hierarchy of symbol trees from a flat list of undocumented symbols.

Groups symbols by module, identifies parent-child relationships, and organizes them into levels for bottom-up processing.

Parameters:

Name Type Description Default
undocumented list[dict]

List of symbol dicts from coverage JSON.

required

Returns:

Type Description
dict[str, ModuleTree]

Dict mapping module name to ModuleTree.

Source code in src/lcp/ai/hierarchy.py
def build_hierarchy(undocumented: list[dict]) -> dict[str, ModuleTree]:
    """Turn a flat list of undocumented symbols into per-module trees.

    Symbols without a truthy ``source_file`` are dropped. The remaining ones
    are bucketed by their ``module`` key and each bucket is handed to
    ``_build_module_tree``; modules for which that helper returns ``None``
    are left out of the result.

    Args:
        undocumented: List of symbol dicts from coverage JSON.

    Returns:
        Dict mapping module name to ModuleTree.
    """
    # Only symbols that can be located in a file take part.
    locatable = [entry for entry in undocumented if entry.get("source_file")]

    # Bucket the surviving symbols under their module name.
    grouped: dict[str, list[dict]] = defaultdict(list)
    for entry in locatable:
        grouped[entry["module"]].append(entry)

    # Build one tree per module, lazily, and keep only the successful ones.
    built = (
        (name, _build_module_tree(name, members))
        for name, members in grouped.items()
    )
    return {name: tree for name, tree in built if tree is not None}

build_context

build_context(node: SymbolNode, tree: ModuleTree) -> str

Build the LLM context for a symbol based on its level.

Level 0 (functions/methods): source code of the symbol; Level 1 (classes): class structure plus the full docstrings of its children; Level 2 (modules): top-of-file content plus the summary lines of its children.

Parameters:

Name Type Description Default
node SymbolNode

The symbol node to build context for.

required
tree ModuleTree

The module tree containing this node.

required

Returns:

Type Description
str

Context string to pass to the LLM prompt.

Source code in src/lcp/ai/hierarchy.py
def build_context(node: SymbolNode, tree: ModuleTree) -> str:
    """Assemble the LLM context string for one symbol, chosen by its level.

    Level 0 (functions/methods): source code of the symbol.
    Level 1 (classes): class structure + full docstrings of children.
    Level 2 (modules): top-of-file + summary lines of children.

    An empty string is returned when the module's source is empty or
    unavailable, when it cannot be parsed, or when the node's level matches
    none of the known levels.

    Args:
        node: The symbol node to build context for.
        tree: The module tree containing this node.

    Returns:
        Context string to pass to the LLM prompt.
    """
    text = _read_file_source(tree.source_file)
    # Parsing is only attempted when there is source text to parse.
    parsed = _parse_ast(text) if text else None
    if parsed is None:
        return ""

    lines = text.splitlines()

    # Checked in this order; the first matching level picks the builder.
    builders = (
        (LEVEL_LEAF, _build_leaf_context),
        (LEVEL_CLASS, _build_class_context),
        (LEVEL_MODULE, _build_module_context),
    )
    for level, builder in builders:
        if node.level == level:
            return builder(node, parsed, lines)

    return ""

lcp.ai.provider

Abstract base class for LLM providers.

LLMProvider

Bases: ABC

Abstract base class for LLM connectors.

Source code in src/lcp/ai/provider.py
class LLMProvider(ABC):
    """Abstract base class for LLM connectors.

    Concrete providers must implement a synchronous and an asynchronous
    generation method plus a ``name`` property.
    """

    @abstractmethod
    def generate(self, system: str, prompt: str) -> LLMResponse:
        """Generate text from the LLM (synchronous).

        Args:
            system: System-level text for the request (presumably the system
                prompt -- confirm against concrete providers).
            prompt: Prompt text for the request.

        Returns:
            An LLMResponse carrying the generated content and token usage.
        """

    @abstractmethod
    async def agenerate(self, system: str, prompt: str) -> LLMResponse:
        """Generate text from the LLM (async).

        Args:
            system: System-level text for the request (presumably the system
                prompt -- confirm against concrete providers).
            prompt: Prompt text for the request.

        Returns:
            An LLMResponse carrying the generated content and token usage.
        """

    @property
    @abstractmethod
    def name(self) -> str:
        """Name of the provider."""

name abstractmethod property

name: str

Name of the provider.

generate abstractmethod

generate(system: str, prompt: str) -> LLMResponse

Generate text from the LLM (synchronous).

Source code in src/lcp/ai/provider.py
@abstractmethod
def generate(self, system: str, prompt: str) -> LLMResponse:
    """Generate text from the LLM (synchronous).

    Args:
        system: System-level text for the request (presumably the system
            prompt -- confirm against concrete providers).
        prompt: Prompt text for the request.

    Returns:
        An LLMResponse carrying the generated content and token usage.
    """

agenerate abstractmethod async

agenerate(system: str, prompt: str) -> LLMResponse

Generate text from the LLM (async).

Source code in src/lcp/ai/provider.py
@abstractmethod
async def agenerate(self, system: str, prompt: str) -> LLMResponse:
    """Generate text from the LLM (async).

    Args:
        system: System-level text for the request (presumably the system
            prompt -- confirm against concrete providers).
        prompt: Prompt text for the request.

    Returns:
        An LLMResponse carrying the generated content and token usage.
    """