RAG

org.llm4s.rag.RAG
See the RAG companion object.
final class RAG extends Closeable

High-level RAG (Retrieval-Augmented Generation) pipeline.

Provides a unified interface for:

  • Document ingestion (from files, directories, or raw text)
  • Semantic search with hybrid fusion
  • Answer generation with retrieved context
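
This page does not say which fusion method the hybrid search uses. The sketch below assumes Reciprocal Rank Fusion (RRF) purely for illustration. RRF is a common way to merge a vector-similarity ranking with a keyword ranking: each chunk scores the sum of 1 / (k + rank) over every list it appears in, with k = 60 as a conventional default. The object and method names are hypothetical and are not part of llm4s.

```scala
// Hypothetical sketch: Reciprocal Rank Fusion (RRF). The fusion method used by
// RAG is not stated on this page; RRF is an assumption for illustration.
object HybridFusionSketch {
  /** Merges rankings (best first) into one list of (chunkId, fusedScore),
    * scoring each id by the sum of 1.0 / (k + rank), with rank starting at 1.
    */
  def reciprocalRankFusion(rankings: Seq[Seq[String]], k: Int = 60): Seq[(String, Double)] = {
    val scores = rankings.flatMap(_.zipWithIndex).foldLeft(Map.empty[String, Double]) {
      case (acc, (id, index)) => acc.updated(id, acc.getOrElse(id, 0.0) + 1.0 / (k + index + 1))
    }
    // Highest fused score first; ties broken by id so the order is deterministic.
    scores.toSeq.sortWith { case ((idA, scoreA), (idB, scoreB)) =>
      if (scoreA != scoreB) scoreA > scoreB else idA < idB
    }
  }
}
```

With a vector ranking of a, b, c and a keyword ranking of c, a, d, the fused order is a, c, b, d. Chunk a wins because it sits near the top of both lists, even though c is first in one of them.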

Attributes

Example
// Create a pipeline (.toOption.get throws if build() returned an error;
// handle the error case in production code)
val rag = RAG.builder()
  .withEmbeddings(EmbeddingProvider.OpenAI)
  .withChunking(ChunkerFactory.Strategy.Sentence, 800, 150)
  .build()
  .toOption.get

// Ingest documents
rag.ingest("./docs")

// Search
val results = rag.query("What is X?")

// With answer generation (requires an LLM client)
val ragWithLLM = RAG.builder()
  .withEmbeddings(EmbeddingProvider.OpenAI)
  .withLLM(llmClient)
  .build()
  .toOption.get
val answer = ragWithLLM.queryWithAnswer("What is X?")

// Release resources when done (RAG is Closeable)
rag.close()
ragWithLLM.close()
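
The withChunking call above passes 800 and 150. This page does not name those parameters, so reading them as a target chunk size and an overlap is an assumption. The hypothetical sketch below shows the size-plus-overlap idea using plain character windows. The real Sentence strategy also aligns chunks to sentence boundaries, which this simplification ignores.

```scala
// Hypothetical sketch: character-window chunking with overlap. Not the llm4s
// Sentence chunker, which additionally respects sentence boundaries.
object ChunkingSketch {
  /** Splits text into windows of at most `size` characters, where each window
    * repeats the last `overlap` characters of the previous one.
    */
  def chunk(text: String, size: Int, overlap: Int): Seq[String] = {
    require(size > 0 && overlap >= 0 && overlap < size, "need size > 0 and 0 <= overlap < size")
    val step = size - overlap
    // A window starting at s is needed only if the previous window
    // (which ended at s + overlap) did not already reach the end of the text.
    val starts = (0 until text.length by step).takeWhile(s => s == 0 || s + overlap < text.length)
    starts.map(s => text.substring(s, math.min(s + size, text.length)))
  }
}
```

For example, chunk("abcdefghij", 4, 1) yields abcd, defg, ghij. Each window starts 3 characters after the previous one, so neighbouring chunks share 1 character.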
Companion
object
Supertypes
trait Closeable
trait AutoCloseable
class Object
trait Matchable
class Any

Members list

Value members

Concrete methods

def chunkCount: Int

Number of chunks indexed

def clear(): Result[Unit]

Clear all indexed data.

override def close(): Unit

Close resources.

Attributes

Definition Classes
Closeable -> AutoCloseable

def deleteDocument(docId: String): Result[Unit]

Delete a specific document and its chunks.

Value parameters

docId

Document ID to delete

Attributes

Returns

Unit on success

def deleteFromCollection(collectionPath: CollectionPath, documentId: String): Result[Long]

Delete a document from a collection.

Value parameters

collectionPath

The collection containing the document

documentId

The document identifier

Attributes

Returns

Number of chunks deleted

def documentCount: Int

Number of documents ingested

def hasPermissions: Boolean

Check if permission-based search is available

def ingest(path: String, metadata: Map[String, String]): Result[Int]

Ingest documents from a file or directory path.

Supports: .txt, .md, .pdf, .docx and other text-like formats.

Value parameters

metadata

Additional metadata to attach to all chunks

path

Path to file or directory

Attributes

Returns

Number of chunks created

def ingest(path: Path): Result[Int]

Ingest a document from a Path.

Attributes

Ingest documents from a DocumentLoader.

Value parameters

loader

The document loader to ingest from

Attributes

Returns

Loading statistics with success/failure counts

def ingestAsync(loader: DocumentLoader)(implicit ec: ExecutionContext): Future[Result[LoadStats]]

Async version of ingest with parallel document processing.

Processes documents in batches with configurable parallelism. Uses the parallelism and batchSize settings from LoadingConfig.

Value parameters

ec

Execution context for async operations

loader

The document loader to ingest from

Attributes

Returns

Future with loading statistics
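
Here is a minimal sketch of the batching behaviour described for ingestAsync. It assumes the documents within a batch run concurrently and that batches run one after another. LoadStatsSketch and the process function are hypothetical stand-ins for LoadStats and per-document ingestion. Unlike the real method, this sketch has no separate parallelism setting, so concurrency within a batch is bounded only by batchSize and the ExecutionContext.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch of batched parallel ingestion; these types are
// stand-ins, not llm4s classes.
object BatchedIngestSketch {
  final case class LoadStatsSketch(successful: Int, failed: Int)

  /** Processes `docs` in groups of `batchSize`: documents within a batch run
    * concurrently, and the next batch starts only after the current one ends.
    */
  def ingestInBatches[A](docs: Iterator[A], batchSize: Int)(process: A => Either[String, Int])(
      implicit ec: ExecutionContext
  ): Future[LoadStatsSketch] = {
    require(batchSize > 0, "batchSize must be positive")
    docs.grouped(batchSize).foldLeft(Future.successful(LoadStatsSketch(0, 0))) { (statsF, batch) =>
      statsF.flatMap { stats =>
        // Each document in the batch gets its own Future; results keep input order.
        Future.traverse(batch)(doc => Future(process(doc))).map { results =>
          val ok = results.count(_.isRight)
          LoadStatsSketch(stats.successful + ok, stats.failed + (results.size - ok))
        }
      }
    }
  }
}
```

Chaining each batch onto the previous one with flatMap keeps at most one batch in flight. This is a simple way to bound load on an embedding provider.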

def ingestBytes(content: Array[Byte], filename: String, documentId: String, metadata: Map[String, String]): Result[Int]

Ingest a document from raw bytes.

This enables source-agnostic ingestion: the same extraction logic works for documents from S3, HTTP responses, databases, and other sources.

Supported formats: PDF, DOCX, plain text, and anything Apache Tika can handle.

Value parameters

content

Raw document bytes

documentId

Unique identifier for this document

filename

Filename for format detection (e.g., "report.pdf")

metadata

Additional metadata

Attributes

Returns

Number of chunks created

def ingestBytesMultiple(documents: Iterator[(Array[Byte], String, String, Map[String, String])]): Result[LoadStats]

Ingest multiple documents from raw bytes.

Value parameters

documents

Iterator of (content, filename, documentId, metadata) tuples

Attributes

Returns

Loading statistics with success/failure counts
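
ingestBytesMultiple takes an iterator of (content, filename, documentId, metadata) tuples. The hypothetical helper below builds that iterator from the files in a local directory using only java.nio. The documentId scheme and the "source" metadata key are illustrative choices, not llm4s conventions. Because the result is an Iterator, each file's bytes are read only when the consumer reaches it.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Hypothetical helper: builds ingestBytesMultiple-shaped input from a directory.
object BytesInputSketch {
  /** Returns (content, filename, documentId, metadata) tuples for every regular
    * file directly inside `dir`, in path order.
    */
  def bytesFromDirectory(dir: Path): Iterator[(Array[Byte], String, String, Map[String, String])] = {
    val stream = Files.list(dir)
    // List the files eagerly so the directory stream can be closed right away.
    val files =
      try stream.iterator().asScala.filter(Files.isRegularFile(_)).toList.sortBy(_.toString)
      finally stream.close()
    files.iterator.map { file =>
      val name = file.getFileName.toString
      // Bytes are read lazily, one file at a time, as the iterator is consumed.
      (Files.readAllBytes(file), name, s"file:$name", Map("source" -> file.toString))
    }
  }
}
```

Assuming an existing rag instance, the result could be passed as rag.ingestBytesMultiple(BytesInputSketch.bytesFromDirectory(Paths.get("./docs"))).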

def ingestChunks(documentId: String, chunks: Seq[String], metadata: Map[String, String]): Result[Int]

Ingest content that has already been split into chunks, bypassing the configured chunking strategy (for advanced use cases).

def ingestPath(path: Path, metadata: Map[String, String]): Result[Int]

Ingest a document from a Path with metadata.

def ingestText(content: String, documentId: String, metadata: Map[String, String]): Result[Int]

Ingest raw text content.

Value parameters

content

The text content to ingest

documentId

Unique identifier for this document

metadata

Additional metadata

Attributes

Returns

Number of chunks created

def ingestWithPermissions(collectionPath: CollectionPath, documentId: String, content: String, metadata: Map[String, String], readableBy: Set[PrincipalId]): Result[Int]

Ingest a document into a specific collection with permission control.

Requires a SearchIndex to be configured via .withSearchIndex().

Value parameters

collectionPath

Target collection (must be a leaf collection)

content

Document text content

documentId

Unique document identifier

metadata

Additional document metadata

readableBy

Principal IDs that can read this document (an empty set inherits the collection's permissions)

Attributes

Returns

Number of chunks indexed

def needsUpdate(doc: Document): Result[Boolean]

Check if a document needs updating based on version.

Value parameters

doc

Document to check

Attributes

Returns

true if the document is new or its version has changed

def query(query: String, topK: Option[Int]): Result[Seq[RAGSearchResult]]

Search for relevant chunks.

Value parameters

query

The search query

topK

Override default topK (optional)

Attributes

Returns

Ranked search results

def queryWithAnswer(question: String, topK: Option[Int]): Result[RAGAnswerResult]

Search and generate an answer using an LLM.

Requires an LLM client to be configured via .withLLM().

Value parameters

question

The question to answer

topK

Override default topK (optional)

Attributes

Returns

Answer with supporting contexts
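
This page does not document the prompt that queryWithAnswer sends to the LLM. The sketch below is a hypothetical illustration of the retrieve-then-generate step: it numbers the retrieved chunks and places them ahead of the question. RetrievedChunk and the instruction wording are assumptions, not the llm4s template.

```scala
// Hypothetical sketch of assembling a grounded prompt from retrieved chunks.
object AnswerPromptSketch {
  final case class RetrievedChunk(text: String, score: Double)

  /** Builds a prompt: an instruction, numbered context passages, then the question. */
  def buildPrompt(question: String, contexts: Seq[RetrievedChunk]): String = {
    val numbered = contexts.zipWithIndex.map { case (chunk, i) => s"[${i + 1}] ${chunk.text}" }
    val header = Seq("Answer the question using only the context below.", "", "Context:")
    (header ++ numbered ++ Seq("", s"Question: $question")).mkString("\n")
  }
}
```

Numbering the passages gives the model a simple way to refer to the specific contexts that support its answer.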

def queryWithPermissions(auth: UserAuthorization, collectionPattern: CollectionPattern, queryText: String, topK: Option[Int]): Result[Seq[RAGSearchResult]]

Search with permission filtering.

Requires a SearchIndex to be configured via .withSearchIndex(). Results are filtered based on user authorization:

  • Collection-level: Only search collections the user can access
  • Document-level: Only return documents the user can read

Value parameters

auth

User authorization context with principal IDs

collectionPattern

Pattern to filter collections (e.g., "confluence" for all descendants)

queryText

The search query

topK

Maximum number of results (defaults to config.topK)

Attributes

Returns

Permission-filtered search results
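
The two filtering levels described for queryWithPermissions can be sketched as follows. This is a hypothetical model, not the SearchIndex implementation. Principals are plain strings, collection-pattern matching is omitted, and a document with an empty readableBy set inherits its collection's readers, as the ingestWithPermissions parameter description states.

```scala
// Hypothetical model of collection-level plus document-level filtering.
object PermissionFilterSketch {
  final case class Collection(path: String, readers: Set[String])
  final case class Doc(id: String, collection: Collection, readableBy: Set[String])

  /** Collection level: the user must share at least one principal with the collection. */
  def canAccess(user: Set[String], c: Collection): Boolean = c.readers.exists(user.contains)

  /** Document level: an empty readableBy falls back to the collection's readers. */
  def canRead(user: Set[String], d: Doc): Boolean = {
    val effective = if (d.readableBy.isEmpty) d.collection.readers else d.readableBy
    effective.exists(user.contains)
  }

  /** Keeps only hits that pass both checks, preserving ranking order. */
  def filter(user: Set[String], hits: Seq[Doc]): Seq[Doc] =
    hits.filter(d => canAccess(user, d.collection) && canRead(user, d))
}
```

In this model, a member of group:eng sees every document in an eng collection that inherits its permissions. Only the principals listed on a restricted document see that document.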

def queryWithPermissionsAndAnswer(auth: UserAuthorization, collectionPattern: CollectionPattern, question: String, topK: Option[Int]): Result[RAGAnswerResult]

Search with permissions and generate an answer using an LLM.

Requires both a SearchIndex and an LLM client to be configured.

Value parameters

auth

User authorization context

collectionPattern

Pattern to filter collections

question

The question to answer

topK

Maximum number of results

Attributes

Returns

Answer with permission-filtered supporting contexts

Full refresh: re-process all documents.

Clears the registry and re-ingests every document from the loader. Use this when you need a clean slate.

Value parameters

loader

The document loader to refresh from

Attributes

Returns

Sync statistics (every document is reported as "added")

def refreshAsync(loader: DocumentLoader)(implicit ec: ExecutionContext): Future[Result[SyncStats]]

Async full refresh.

Clears all data and re-ingests from the loader with parallel processing.

Value parameters

ec

Execution context for async operations

loader

The document loader to refresh from

Attributes

Returns

Future with sync statistics

def searchIndex: Option[SearchIndex]

Access the permission-based search index (if configured)

Attributes

Get store statistics

Attributes

Sync with a loader: only process changed documents.

Compares document versions to detect:

  • New documents (added)
  • Changed documents (updated - old chunks removed, new chunks added)
  • Deleted documents (removed from source)
  • Unchanged documents (skipped)

Value parameters

loader

The document loader to sync with

Attributes

Returns

Sync statistics
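
Here is a minimal sketch of the version comparison that sync performs. It assumes both the index and the source can report a version string (for example, a content hash) for each document ID. The map-based inputs and SyncStatsSketch are hypothetical. The real method also removes the stale chunks of updated and deleted documents, which this sketch only counts.

```scala
// Hypothetical sketch of version-based change detection for sync.
object SyncDiffSketch {
  final case class SyncStatsSketch(added: Int, updated: Int, deleted: Int, unchanged: Int)

  /** Both maps go from document ID to version string. */
  def diff(indexed: Map[String, String], source: Map[String, String]): SyncStatsSketch = {
    val added = source.keySet.diff(indexed.keySet).size   // new in the source
    val deleted = indexed.keySet.diff(source.keySet).size // gone from the source
    val common = source.keySet.intersect(indexed.keySet)
    val updated = common.count(id => source(id) != indexed(id)) // version changed
    SyncStatsSketch(added, updated, deleted, common.size - updated)
  }
}
```

If a, b, and c are indexed at v1 and the source reports a at v1, b at v2, and d at v1, the result is one added (d), one updated (b), one deleted (c), and one unchanged (a).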

def syncAsync(loader: DocumentLoader)(implicit ec: ExecutionContext): Future[Result[SyncStats]]

Async sync with parallel change detection.

Performs change detection in parallel, but applies updates sequentially to avoid conflicts in the vector store.

Value parameters

ec

Execution context for async operations

loader

The document loader to sync with

Attributes

Returns

Future with sync statistics
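
The split described for syncAsync (parallel change detection followed by sequential writes) can be sketched with plain Futures. Everything here is hypothetical. Documents are (id, version) pairs, indexedVersion stands in for a registry lookup, and applyChange stands in for the vector-store write. Deletion handling is omitted for brevity.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch: detect changes in parallel, then apply them one by one.
object SyncAsyncSketch {
  sealed trait Change
  final case class Upsert(id: String) extends Change
  final case class Skip(id: String) extends Change

  def syncAsync(
      docs: Seq[(String, String)],              // (documentId, version) reported by the source
      indexedVersion: String => Option[String], // version currently indexed, if any
      applyChange: Change => Unit               // store write; never called concurrently here
  )(implicit ec: ExecutionContext): Future[Seq[Change]] =
    Future
      .traverse(docs) { case (id, version) =>
        // Each comparison runs as its own Future, so detection is parallel.
        Future[Change](if (indexedVersion(id).contains(version)) Skip(id) else Upsert(id))
      }
      .map { changes =>
        // Writes run inside one callback, in source order, after detection completes.
        changes.foreach(applyChange)
        changes
      }
}
```

Serializing the writes avoids concurrent modifications to the same store. Change detection is usually the expensive, read-only part, so it is the part that benefits from parallelism.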

Concrete fields