Core Module API
The ign_lidar.core
module provides the main processing classes for the IGN LiDAR HD v2.0+ unified pipeline.
Overview​
The core module contains:
LiDARProcessor
- Main processing orchestratorTileStitcher
- Multi-tile stitching functionalityBoundaryHandler
- Boundary-aware feature computationPipelineManager
- Pipeline state and configuration management
LiDARProcessor​
Main class for processing LiDAR tiles through the unified pipeline.
Class Definition​
from ign_lidar.core.processor import LiDARProcessor
from omegaconf import DictConfig
class LiDARProcessor:
"""
Unified LiDAR processing pipeline.
Handles RAW LAZ → Enriched LAZ → Patches in a single workflow.
"""
def __init__(
self,
config: DictConfig,
gpu_enabled: bool = False,
verbose: bool = True
):
"""
Initialize processor with configuration.
Args:
config: Hydra configuration object
gpu_enabled: Enable GPU acceleration
verbose: Print progress information
"""
Methods​
process_tile()
​
Process a single LAZ tile through the complete pipeline.
def process_tile(
self,
input_path: str,
output_path: str,
save_enriched: bool = False,
enriched_path: Optional[str] = None
) -> ProcessingResult:
"""
Process single tile: RAW LAZ → Patches.
Args:
input_path: Path to input RAW LAZ file
output_path: Path for output patches LAZ
save_enriched: Whether to save enriched LAZ
enriched_path: Path for enriched LAZ (if save_enriched=True)
Returns:
ProcessingResult with statistics
Raises:
FileNotFoundError: Input file doesn't exist
ValueError: Invalid LAZ format
RuntimeError: Processing error
"""
Example:
from ign_lidar.core.processor import LiDARProcessor
from omegaconf import OmegaConf
# Load configuration
config = OmegaConf.load("config.yaml")
# Create processor
processor = LiDARProcessor(config, gpu_enabled=True)
# Process tile
result = processor.process_tile(
input_path="data/raw/tile_001.laz",
output_path="data/output/tile_001.laz"
)
print(f"Processed {result.num_points:,} points")
print(f"Extracted {result.num_patches} patches")
print(f"Time: {result.elapsed_time:.2f}s")
process_directory()
​
Process all LAZ files in a directory.
def process_directory(
self,
input_dir: str,
output_dir: str,
pattern: str = "*.laz",
parallel: bool = True,
max_workers: int = 4
) -> List[ProcessingResult]:
"""
Process all LAZ files in directory.
Args:
input_dir: Directory containing RAW LAZ files
output_dir: Output directory for patches
pattern: File pattern (glob) to match
parallel: Process files in parallel
max_workers: Number of parallel workers
Returns:
List of ProcessingResult objects
"""
Example:
# Process entire directory
results = processor.process_directory(
input_dir="data/raw",
output_dir="data/output",
pattern="*.laz",
parallel=True,
max_workers=4
)
# Summary
total_points = sum(r.num_points for r in results)
total_time = sum(r.elapsed_time for r in results)
print(f"Processed {len(results)} tiles")
print(f"Total points: {total_points:,}")
print(f"Total time: {total_time:.2f}s")
process_with_stitching()
​
Process multiple tiles with stitching.
def process_with_stitching(
self,
input_paths: List[str],
output_path: str,
stitch_config: Optional[DictConfig] = None
) -> ProcessingResult:
"""
Process and stitch multiple tiles.
Args:
input_paths: List of input LAZ file paths
output_path: Output path for stitched result
stitch_config: Optional stitching configuration
Returns:
ProcessingResult for stitched output
"""
Example:
# Stitch 3x3 grid of tiles
tile_paths = [
"data/raw/tile_001.laz",
"data/raw/tile_002.laz",
"data/raw/tile_003.laz",
# ... more tiles
]
result = processor.process_with_stitching(
input_paths=tile_paths,
output_path="data/output/stitched.laz"
)
Properties​
@property
def config(self) -> DictConfig:
"""Get current configuration."""
@property
def gpu_available(self) -> bool:
"""Check if GPU is available."""
@property
def stats(self) -> Dict[str, Any]:
"""Get processing statistics."""
ProcessingResult​
Result object returned by processing methods.
Class Definition​
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class ProcessingResult:
"""Results from processing operation."""
input_path: str # Input file path
output_path: str # Output file path
num_points: int # Number of points processed
num_patches: int # Number of patches extracted
elapsed_time: float # Processing time (seconds)
memory_peak: float # Peak memory usage (MB)
gpu_used: bool # Whether GPU was used
features_computed: List[str] # List of computed features
metadata: Dict[str, Any] # Additional metadata
success: bool # Whether processing succeeded
error: Optional[str] # Error message (if failed)
Example Usage​
result = processor.process_tile(
input_path="tile.laz",
output_path="output.laz"
)
if result.success:
print(f"✓ Success: {result.num_patches} patches")
print(f" Time: {result.elapsed_time:.2f}s")
print(f" Memory: {result.memory_peak:.1f} MB")
print(f" GPU: {'Yes' if result.gpu_used else 'No'}")
else:
print(f"✗ Failed: {result.error}")
TileStitcher​
Handle stitching of multiple LiDAR tiles.
Class Definition​
from ign_lidar.core.stitcher import TileStitcher
class TileStitcher:
"""
Stitch multiple LiDAR tiles into combined output.
"""
def __init__(
self,
config: DictConfig,
buffer_size: float = 10.0,
remove_duplicates: bool = True
):
"""
Initialize tile stitcher.
Args:
config: Hydra configuration
buffer_size: Buffer overlap size (meters)
remove_duplicates: Remove duplicate points in overlap
"""
Methods​
stitch_tiles()
​
def stitch_tiles(
self,
tile_paths: List[str],
output_path: str,
pattern: Optional[str] = None
) -> StitchingResult:
"""
Stitch multiple tiles together.
Args:
tile_paths: List of LAZ file paths to stitch
output_path: Output path for stitched result
pattern: Optional grid pattern (e.g., "3x3")
Returns:
StitchingResult with statistics
"""
Example:
from ign_lidar.core.stitcher import TileStitcher
stitcher = TileStitcher(
config=config,
buffer_size=10.0,
remove_duplicates=True
)
result = stitcher.stitch_tiles(
tile_paths=["tile_001.laz", "tile_002.laz"],
output_path="stitched.laz",
pattern="2x1"
)
print(f"Stitched {result.num_tiles} tiles")
print(f"Total points: {result.total_points:,}")
print(f"Duplicates removed: {result.duplicates_removed:,}")
detect_neighbors()
​
def detect_neighbors(
self,
tile_path: str,
all_tiles: List[str],
max_distance: float = 1.0
) -> List[str]:
"""
Detect neighboring tiles for boundary processing.
Args:
tile_path: Path to reference tile
all_tiles: List of all available tiles
max_distance: Maximum distance to consider as neighbor (meters)
Returns:
List of neighboring tile paths
"""
BoundaryHandler​
Handle boundary-aware feature computation across tile edges.
Class Definition​
from ign_lidar.core.boundary import BoundaryHandler
class BoundaryHandler:
"""
Handle boundary-aware feature computation.
"""
def __init__(
self,
buffer_size: float = 10.0,
search_radius: float = 5.0
):
"""
Initialize boundary handler.
Args:
buffer_size: Buffer zone size at tile edges (meters)
search_radius: Radius for neighbor search (meters)
"""
Methods​
extract_boundary_buffer()
​
def extract_boundary_buffer(
self,
points: np.ndarray,
bounds: Tuple[float, float, float, float]
) -> Tuple[np.ndarray, np.ndarray]:
"""
Extract points in boundary buffer zone.
Args:
points: Point cloud array (N, 3+)
bounds: Tile bounds (xmin, ymin, xmax, ymax)
Returns:
Tuple of (interior_points, boundary_points)
"""
compute_with_neighbors()
​
def compute_with_neighbors(
self,
tile_points: np.ndarray,
neighbor_points: List[np.ndarray],
feature_func: Callable
) -> np.ndarray:
"""
Compute features using cross-tile neighbors.
Args:
tile_points: Points from current tile
neighbor_points: Points from neighboring tiles
feature_func: Function to compute features
Returns:
Feature values for tile_points
"""
Example:
from ign_lidar.core.boundary import BoundaryHandler
from ign_lidar.features.geometric import compute_planarity
handler = BoundaryHandler(
buffer_size=10.0,
search_radius=5.0
)
# Load tile and neighbors
tile_pc = load_point_cloud("tile_001.laz")
neighbors = [
load_point_cloud("tile_002.laz"),
load_point_cloud("tile_003.laz")
]
# Compute features with boundary handling
features = handler.compute_with_neighbors(
tile_points=tile_pc,
neighbor_points=neighbors,
feature_func=compute_planarity
)
PipelineManager​
Manage pipeline state and configuration.
Class Definition​
from ign_lidar.core.pipeline import PipelineManager
class PipelineManager:
"""
Manage pipeline execution and state.
"""
def __init__(self, config: DictConfig):
"""Initialize pipeline manager."""
Methods​
create_processor()
​
def create_processor(
self,
gpu_enabled: bool = False
) -> LiDARProcessor:
"""
Create configured processor instance.
Args:
gpu_enabled: Enable GPU acceleration
Returns:
LiDARProcessor instance
"""
validate_config()
​
def validate_config(self) -> bool:
"""
Validate pipeline configuration.
Returns:
True if configuration is valid
Raises:
ValueError: Invalid configuration
"""
Complete Example​
Basic Processing Workflow​
from ign_lidar.core.processor import LiDARProcessor
from omegaconf import OmegaConf
from pathlib import Path
# 1. Load configuration
config = OmegaConf.load("config.yaml")
# 2. Create processor
processor = LiDARProcessor(
config=config,
gpu_enabled=True,
verbose=True
)
# 3. Process directory
input_dir = Path("data/raw")
output_dir = Path("data/output")
output_dir.mkdir(exist_ok=True)
results = processor.process_directory(
input_dir=str(input_dir),
output_dir=str(output_dir),
parallel=True,
max_workers=4
)
# 4. Report results
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
print(f"\n{'='*50}")
print(f"Processing Complete")
print(f"{'='*50}")
print(f"Total files: {len(results)}")
print(f"Successful: {len(successful)}")
print(f"Failed: {len(failed)}")
if successful:
total_points = sum(r.num_points for r in successful)
total_patches = sum(r.num_patches for r in successful)
total_time = sum(r.elapsed_time for r in successful)
print(f"\nStatistics:")
print(f" Total points: {total_points:,}")
print(f" Total patches: {total_patches:,}")
print(f" Total time: {total_time:.2f}s")
print(f" Avg time/tile: {total_time/len(successful):.2f}s")
Advanced: Custom Pipeline​
from ign_lidar.core.processor import LiDARProcessor
from ign_lidar.core.boundary import BoundaryHandler
from ign_lidar.preprocessing.enrichment import enrich_point_cloud
from ign_lidar.features.geometric import compute_features
# Custom processing with boundary handling
processor = LiDARProcessor(config)
boundary_handler = BoundaryHandler(buffer_size=10.0)
# Load tiles
tile = load_tile("tile_001.laz")
neighbors = load_neighbors(["tile_002.laz", "tile_003.laz"])
# Enrich with boundary awareness
enriched = enrich_point_cloud(tile, config.preprocessing)
# Compute features using neighbors
features = boundary_handler.compute_with_neighbors(
tile_points=enriched,
neighbor_points=neighbors,
feature_func=lambda pts: compute_features(pts, config.features)
)
# Extract patches
patches = processor.extract_patches(enriched, features)
# Save output
save_patches(patches, "output.laz")
Related Documentation​
- Unified Pipeline Guide - Workflow overview
- Configuration API - Configuration options
- Preprocessing Guide - Preprocessing functions
- Features Overview - Feature computation
See Also​
- Hydra CLI - Command-line usage
- Tile Stitching - Multi-tile processing
- Boundary-Aware Features - Edge handling
- GPU Acceleration - GPU usage