Files
core/pkg/datastore/sqlite_ingest.go

285 lines
9.3 KiB
Go
Raw Normal View History

2026-04-02 10:57:36 -04:00
package datastore
import (
"context"
"database/sql"
"time"
domain2 "epigas.gitea.cloud/RiskRancher/core/pkg/domain"
)
func (s *SQLiteStore) IngestTickets(ctx context.Context, tickets []domain2.Ticket) error {
tx, err := s.DB.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
_, err = tx.ExecContext(ctx, `
CREATE TEMP TABLE IF NOT EXISTS staging_tickets (
domain TEXT, source TEXT, asset_identifier TEXT, title TEXT,
description TEXT, recommended_remediation TEXT, severity TEXT,
status TEXT, dedupe_hash TEXT
)
`)
if err != nil {
return err
}
tx.ExecContext(ctx, `DELETE FROM staging_tickets`)
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO staging_tickets (domain, source, asset_identifier, title, description, recommended_remediation, severity, status, dedupe_hash)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
}
for _, t := range tickets {
status := t.Status
if status == "" {
status = "Waiting to be Triaged"
}
domain := t.Domain
if domain == "" {
domain = "Vulnerability"
}
source := t.Source
if source == "" {
source = "Manual"
}
_, err = stmt.ExecContext(ctx, domain, source, t.AssetIdentifier, t.Title, t.Description, t.RecommendedRemediation, t.Severity, status, t.DedupeHash)
if err != nil {
stmt.Close()
return err
}
}
stmt.Close()
_, err = tx.ExecContext(ctx, `
INSERT INTO tickets (domain, source, asset_identifier, title, description, recommended_remediation, severity, status, dedupe_hash)
SELECT domain, source, asset_identifier, title, description, recommended_remediation, severity, status, dedupe_hash
FROM staging_tickets
WHERE true -- Prevents SQLite from mistaking 'ON CONFLICT' for a JOIN condition
ON CONFLICT(dedupe_hash) DO UPDATE SET
description = excluded.description,
updated_at = CURRENT_TIMESTAMP
`)
if err != nil {
return err
}
tx.ExecContext(ctx, `DROP TABLE staging_tickets`)
return tx.Commit()
}
func (s *SQLiteStore) GetAdapters(ctx context.Context) ([]domain2.Adapter, error) {
rows, err := s.DB.QueryContext(ctx, "SELECT id, name, source_name, findings_path, mapping_title, mapping_asset, mapping_severity, mapping_description, mapping_remediation FROM data_adapters")
if err != nil {
return nil, err
}
defer rows.Close()
var adapters []domain2.Adapter
for rows.Next() {
var a domain2.Adapter
rows.Scan(&a.ID, &a.Name, &a.SourceName, &a.FindingsPath, &a.MappingTitle, &a.MappingAsset, &a.MappingSeverity, &a.MappingDescription, &a.MappingRemediation)
adapters = append(adapters, a)
}
return adapters, nil
}
func (s *SQLiteStore) SaveAdapter(ctx context.Context, a domain2.Adapter) error {
_, err := s.DB.ExecContext(ctx, `
INSERT INTO data_adapters (name, source_name, findings_path, mapping_title, mapping_asset, mapping_severity, mapping_description, mapping_remediation)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
a.Name, a.SourceName, a.FindingsPath, a.MappingTitle, a.MappingAsset, a.MappingSeverity, a.MappingDescription, a.MappingRemediation)
return err
}
func (s *SQLiteStore) GetAdapterByID(ctx context.Context, id int) (domain2.Adapter, error) {
var a domain2.Adapter
query := `
SELECT
id, name, source_name, findings_path,
mapping_title, mapping_asset, mapping_severity,
IFNULL(mapping_description, ''), IFNULL(mapping_remediation, ''),
created_at, updated_at
FROM data_adapters
WHERE id = ?`
err := s.DB.QueryRowContext(ctx, query, id).Scan(
&a.ID, &a.Name, &a.SourceName, &a.FindingsPath,
&a.MappingTitle, &a.MappingAsset, &a.MappingSeverity,
&a.MappingDescription, &a.MappingRemediation,
&a.CreatedAt, &a.UpdatedAt,
)
return a, err
}
func (s *SQLiteStore) DeleteAdapter(ctx context.Context, id int) error {
_, err := s.DB.ExecContext(ctx, "DELETE FROM data_adapters WHERE id = ?", id)
return err
}
func (s *SQLiteStore) GetAdapterByName(ctx context.Context, name string) (domain2.Adapter, error) {
var a domain2.Adapter
query := `
SELECT
id, name, source_name, findings_path,
mapping_title, mapping_asset, mapping_severity,
IFNULL(mapping_description, ''), IFNULL(mapping_remediation, '')
FROM data_adapters
WHERE name = ?`
err := s.DB.QueryRowContext(ctx, query, name).Scan(
&a.ID, &a.Name, &a.SourceName, &a.FindingsPath,
&a.MappingTitle, &a.MappingAsset, &a.MappingSeverity,
&a.MappingDescription, &a.MappingRemediation,
)
return a, err
}
func (s *SQLiteStore) ProcessIngestionBatch(ctx context.Context, source, asset string, incoming []domain2.Ticket) error {
slaMap, _ := s.buildSLAMap(ctx)
tx, err := s.DB.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
for i := range incoming {
if incoming[i].Domain == "" {
incoming[i].Domain = "Vulnerability"
}
if incoming[i].Status == "" {
incoming[i].Status = "Waiting to be Triaged"
}
}
inserts, reopens, updates, closes, err := s.calculateDiffState(ctx, tx, source, asset, incoming)
if err != nil {
return err
}
if err := s.executeBatchMutations(ctx, tx, source, asset, slaMap, inserts, reopens, updates, closes); err != nil {
return err
}
return tx.Commit()
}
func (s *SQLiteStore) calculateDiffState(ctx context.Context, tx *sql.Tx, source, asset string, incoming []domain2.Ticket) (inserts, reopens, descUpdates []domain2.Ticket, autocloses []string, err error) {
rows, err := tx.QueryContext(ctx, `SELECT dedupe_hash, status, COALESCE(description, '') FROM tickets WHERE source = ? AND asset_identifier = ?`, source, asset)
if err != nil {
return nil, nil, nil, nil, err
}
defer rows.Close()
type existingRecord struct{ status, description string }
existingMap := make(map[string]existingRecord)
for rows.Next() {
var hash, status, desc string
if err := rows.Scan(&hash, &status, &desc); err == nil {
existingMap[hash] = existingRecord{status: status, description: desc}
}
}
incomingMap := make(map[string]bool)
for _, ticket := range incoming {
incomingMap[ticket.DedupeHash] = true
existing, exists := existingMap[ticket.DedupeHash]
if !exists {
inserts = append(inserts, ticket)
} else {
if existing.status == "Patched" {
reopens = append(reopens, ticket)
}
if ticket.Description != "" && ticket.Description != existing.description && existing.status != "Patched" && existing.status != "Risk Accepted" && existing.status != "False Positive" {
descUpdates = append(descUpdates, ticket)
}
}
}
for hash, record := range existingMap {
if !incomingMap[hash] && record.status != "Patched" && record.status != "Risk Accepted" && record.status != "False Positive" {
autocloses = append(autocloses, hash)
}
}
return inserts, reopens, descUpdates, autocloses, nil
}
func (s *SQLiteStore) executeBatchMutations(ctx context.Context, tx *sql.Tx, source, asset string, slaMap map[string]map[string]domain2.SLAPolicy, inserts, reopens, descUpdates []domain2.Ticket, autocloses []string) error {
now := time.Now()
// A. Inserts
if len(inserts) > 0 {
insertStmt, err := tx.PrepareContext(ctx, `INSERT INTO tickets (source, asset_identifier, title, severity, description, status, dedupe_hash, domain, triage_due_date, remediation_due_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
defer insertStmt.Close()
for _, t := range inserts {
daysToTriage, daysToRemediate := 3, 30
if dMap, ok := slaMap[t.Domain]; ok {
if policy, ok := dMap[t.Severity]; ok {
daysToTriage, daysToRemediate = policy.DaysToTriage, policy.DaysToRemediate
}
}
_, err := insertStmt.ExecContext(ctx, source, asset, t.Title, t.Severity, t.Description, t.Status, t.DedupeHash, t.Domain, now.AddDate(0, 0, daysToTriage), now.AddDate(0, 0, daysToRemediate))
if err != nil {
return err
}
}
}
if len(reopens) > 0 {
updateStmt, _ := tx.PrepareContext(ctx, `UPDATE tickets SET status = 'Waiting to be Triaged', patched_at = NULL, triage_due_date = ?, remediation_due_date = ? WHERE dedupe_hash = ?`)
defer updateStmt.Close()
for _, t := range reopens {
updateStmt.ExecContext(ctx, now.AddDate(0, 0, 3), now.AddDate(0, 0, 30), t.DedupeHash) // Using default SLAs for fallback
}
}
if len(descUpdates) > 0 {
descStmt, _ := tx.PrepareContext(ctx, `UPDATE tickets SET description = ? WHERE dedupe_hash = ?`)
defer descStmt.Close()
for _, t := range descUpdates {
descStmt.ExecContext(ctx, t.Description, t.DedupeHash)
}
}
if len(autocloses) > 0 {
closeStmt, _ := tx.PrepareContext(ctx, `UPDATE tickets SET status = 'Patched', patched_at = CURRENT_TIMESTAMP WHERE dedupe_hash = ?`)
defer closeStmt.Close()
for _, hash := range autocloses {
closeStmt.ExecContext(ctx, hash)
}
}
return nil
}
func (s *SQLiteStore) LogSync(ctx context.Context, source, status string, records int, errMsg string) error {
_, err := s.DB.ExecContext(ctx, `INSERT INTO sync_logs (source, status, records_processed, error_message) VALUES (?, ?, ?, ?)`, source, status, records, errMsg)
return err
}
func (s *SQLiteStore) GetRecentSyncLogs(ctx context.Context, limit int) ([]domain2.SyncLog, error) {
rows, err := s.DB.QueryContext(ctx, `SELECT id, source, status, records_processed, IFNULL(error_message, ''), created_at FROM sync_logs ORDER BY id DESC LIMIT ?`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var logs []domain2.SyncLog
for rows.Next() {
var l domain2.SyncLog
rows.Scan(&l.ID, &l.Source, &l.Status, &l.RecordsProcessed, &l.ErrorMessage, &l.CreatedAt)
logs = append(logs, l)
}
return logs, nil
}