2026-04-02 10:57:36 -04:00
package datastore
import (
"context"
"database/sql"
"time"
2026-04-29 11:04:14 -04:00
domain2 "code.riskrancher.com/RiskRancher/core/pkg/domain"
2026-04-02 10:57:36 -04:00
)
func ( s * SQLiteStore ) IngestTickets ( ctx context . Context , tickets [ ] domain2 . Ticket ) error {
tx , err := s . DB . BeginTx ( ctx , nil )
if err != nil {
return err
}
defer tx . Rollback ( )
_ , err = tx . ExecContext ( ctx , `
CREATE TEMP TABLE IF NOT EXISTS staging_tickets (
domain TEXT, source TEXT, asset_identifier TEXT, title TEXT,
description TEXT, recommended_remediation TEXT, severity TEXT,
status TEXT, dedupe_hash TEXT
)
` )
if err != nil {
return err
}
tx . ExecContext ( ctx , ` DELETE FROM staging_tickets ` )
stmt , err := tx . PrepareContext ( ctx , `
INSERT INTO staging_tickets (domain, source, asset_identifier, title, description, recommended_remediation, severity, status, dedupe_hash)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
` )
if err != nil {
return err
}
for _ , t := range tickets {
status := t . Status
if status == "" {
status = "Waiting to be Triaged"
}
domain := t . Domain
if domain == "" {
domain = "Vulnerability"
}
source := t . Source
if source == "" {
source = "Manual"
}
_ , err = stmt . ExecContext ( ctx , domain , source , t . AssetIdentifier , t . Title , t . Description , t . RecommendedRemediation , t . Severity , status , t . DedupeHash )
if err != nil {
stmt . Close ( )
return err
}
}
stmt . Close ( )
_ , err = tx . ExecContext ( ctx , `
INSERT INTO tickets (domain, source, asset_identifier, title, description, recommended_remediation, severity, status, dedupe_hash)
SELECT domain, source, asset_identifier, title, description, recommended_remediation, severity, status, dedupe_hash
FROM staging_tickets
WHERE true -- Prevents SQLite from mistaking 'ON CONFLICT' for a JOIN condition
ON CONFLICT(dedupe_hash) DO UPDATE SET
description = excluded.description,
updated_at = CURRENT_TIMESTAMP
` )
if err != nil {
return err
}
tx . ExecContext ( ctx , ` DROP TABLE staging_tickets ` )
return tx . Commit ( )
}
func ( s * SQLiteStore ) GetAdapters ( ctx context . Context ) ( [ ] domain2 . Adapter , error ) {
rows , err := s . DB . QueryContext ( ctx , "SELECT id, name, source_name, findings_path, mapping_title, mapping_asset, mapping_severity, mapping_description, mapping_remediation FROM data_adapters" )
if err != nil {
return nil , err
}
defer rows . Close ( )
var adapters [ ] domain2 . Adapter
for rows . Next ( ) {
var a domain2 . Adapter
rows . Scan ( & a . ID , & a . Name , & a . SourceName , & a . FindingsPath , & a . MappingTitle , & a . MappingAsset , & a . MappingSeverity , & a . MappingDescription , & a . MappingRemediation )
adapters = append ( adapters , a )
}
return adapters , nil
}
func ( s * SQLiteStore ) SaveAdapter ( ctx context . Context , a domain2 . Adapter ) error {
_ , err := s . DB . ExecContext ( ctx , `
INSERT INTO data_adapters (name, source_name, findings_path, mapping_title, mapping_asset, mapping_severity, mapping_description, mapping_remediation)
VALUES (?, ?, ?, ?, ?, ?, ?, ?) ` ,
a . Name , a . SourceName , a . FindingsPath , a . MappingTitle , a . MappingAsset , a . MappingSeverity , a . MappingDescription , a . MappingRemediation )
return err
}
func ( s * SQLiteStore ) GetAdapterByID ( ctx context . Context , id int ) ( domain2 . Adapter , error ) {
var a domain2 . Adapter
query := `
SELECT
id, name, source_name, findings_path,
mapping_title, mapping_asset, mapping_severity,
IFNULL(mapping_description, ''), IFNULL(mapping_remediation, ''),
created_at, updated_at
FROM data_adapters
WHERE id = ? `
err := s . DB . QueryRowContext ( ctx , query , id ) . Scan (
& a . ID , & a . Name , & a . SourceName , & a . FindingsPath ,
& a . MappingTitle , & a . MappingAsset , & a . MappingSeverity ,
& a . MappingDescription , & a . MappingRemediation ,
& a . CreatedAt , & a . UpdatedAt ,
)
return a , err
}
func ( s * SQLiteStore ) DeleteAdapter ( ctx context . Context , id int ) error {
_ , err := s . DB . ExecContext ( ctx , "DELETE FROM data_adapters WHERE id = ?" , id )
return err
}
func ( s * SQLiteStore ) GetAdapterByName ( ctx context . Context , name string ) ( domain2 . Adapter , error ) {
var a domain2 . Adapter
query := `
SELECT
id, name, source_name, findings_path,
mapping_title, mapping_asset, mapping_severity,
IFNULL(mapping_description, ''), IFNULL(mapping_remediation, '')
FROM data_adapters
WHERE name = ? `
err := s . DB . QueryRowContext ( ctx , query , name ) . Scan (
& a . ID , & a . Name , & a . SourceName , & a . FindingsPath ,
& a . MappingTitle , & a . MappingAsset , & a . MappingSeverity ,
& a . MappingDescription , & a . MappingRemediation ,
)
return a , err
}
func ( s * SQLiteStore ) ProcessIngestionBatch ( ctx context . Context , source , asset string , incoming [ ] domain2 . Ticket ) error {
slaMap , _ := s . buildSLAMap ( ctx )
tx , err := s . DB . BeginTx ( ctx , nil )
if err != nil {
return err
}
defer tx . Rollback ( )
for i := range incoming {
if incoming [ i ] . Domain == "" {
incoming [ i ] . Domain = "Vulnerability"
}
if incoming [ i ] . Status == "" {
incoming [ i ] . Status = "Waiting to be Triaged"
}
}
inserts , reopens , updates , closes , err := s . calculateDiffState ( ctx , tx , source , asset , incoming )
if err != nil {
return err
}
if err := s . executeBatchMutations ( ctx , tx , source , asset , slaMap , inserts , reopens , updates , closes ) ; err != nil {
return err
}
return tx . Commit ( )
}
func ( s * SQLiteStore ) calculateDiffState ( ctx context . Context , tx * sql . Tx , source , asset string , incoming [ ] domain2 . Ticket ) ( inserts , reopens , descUpdates [ ] domain2 . Ticket , autocloses [ ] string , err error ) {
rows , err := tx . QueryContext ( ctx , ` SELECT dedupe_hash, status, COALESCE(description, '') FROM tickets WHERE source = ? AND asset_identifier = ? ` , source , asset )
if err != nil {
return nil , nil , nil , nil , err
}
defer rows . Close ( )
type existingRecord struct { status , description string }
existingMap := make ( map [ string ] existingRecord )
for rows . Next ( ) {
var hash , status , desc string
if err := rows . Scan ( & hash , & status , & desc ) ; err == nil {
existingMap [ hash ] = existingRecord { status : status , description : desc }
}
}
incomingMap := make ( map [ string ] bool )
for _ , ticket := range incoming {
incomingMap [ ticket . DedupeHash ] = true
existing , exists := existingMap [ ticket . DedupeHash ]
if ! exists {
inserts = append ( inserts , ticket )
} else {
if existing . status == "Patched" {
reopens = append ( reopens , ticket )
}
if ticket . Description != "" && ticket . Description != existing . description && existing . status != "Patched" && existing . status != "Risk Accepted" && existing . status != "False Positive" {
descUpdates = append ( descUpdates , ticket )
}
}
}
for hash , record := range existingMap {
if ! incomingMap [ hash ] && record . status != "Patched" && record . status != "Risk Accepted" && record . status != "False Positive" {
autocloses = append ( autocloses , hash )
}
}
return inserts , reopens , descUpdates , autocloses , nil
}
func ( s * SQLiteStore ) executeBatchMutations ( ctx context . Context , tx * sql . Tx , source , asset string , slaMap map [ string ] map [ string ] domain2 . SLAPolicy , inserts , reopens , descUpdates [ ] domain2 . Ticket , autocloses [ ] string ) error {
now := time . Now ( )
// A. Inserts
if len ( inserts ) > 0 {
insertStmt , err := tx . PrepareContext ( ctx , ` INSERT INTO tickets (source, asset_identifier, title, severity, description, status, dedupe_hash, domain, triage_due_date, remediation_due_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` )
if err != nil {
return err
}
defer insertStmt . Close ( )
for _ , t := range inserts {
daysToTriage , daysToRemediate := 3 , 30
if dMap , ok := slaMap [ t . Domain ] ; ok {
if policy , ok := dMap [ t . Severity ] ; ok {
daysToTriage , daysToRemediate = policy . DaysToTriage , policy . DaysToRemediate
}
}
_ , err := insertStmt . ExecContext ( ctx , source , asset , t . Title , t . Severity , t . Description , t . Status , t . DedupeHash , t . Domain , now . AddDate ( 0 , 0 , daysToTriage ) , now . AddDate ( 0 , 0 , daysToRemediate ) )
if err != nil {
return err
}
}
}
if len ( reopens ) > 0 {
updateStmt , _ := tx . PrepareContext ( ctx , ` UPDATE tickets SET status = 'Waiting to be Triaged', patched_at = NULL, triage_due_date = ?, remediation_due_date = ? WHERE dedupe_hash = ? ` )
defer updateStmt . Close ( )
for _ , t := range reopens {
updateStmt . ExecContext ( ctx , now . AddDate ( 0 , 0 , 3 ) , now . AddDate ( 0 , 0 , 30 ) , t . DedupeHash ) // Using default SLAs for fallback
}
}
if len ( descUpdates ) > 0 {
descStmt , _ := tx . PrepareContext ( ctx , ` UPDATE tickets SET description = ? WHERE dedupe_hash = ? ` )
defer descStmt . Close ( )
for _ , t := range descUpdates {
descStmt . ExecContext ( ctx , t . Description , t . DedupeHash )
}
}
if len ( autocloses ) > 0 {
closeStmt , _ := tx . PrepareContext ( ctx , ` UPDATE tickets SET status = 'Patched', patched_at = CURRENT_TIMESTAMP WHERE dedupe_hash = ? ` )
defer closeStmt . Close ( )
for _ , hash := range autocloses {
closeStmt . ExecContext ( ctx , hash )
}
}
return nil
}
func ( s * SQLiteStore ) LogSync ( ctx context . Context , source , status string , records int , errMsg string ) error {
_ , err := s . DB . ExecContext ( ctx , ` INSERT INTO sync_logs (source, status, records_processed, error_message) VALUES (?, ?, ?, ?) ` , source , status , records , errMsg )
return err
}
func ( s * SQLiteStore ) GetRecentSyncLogs ( ctx context . Context , limit int ) ( [ ] domain2 . SyncLog , error ) {
rows , err := s . DB . QueryContext ( ctx , ` SELECT id, source, status, records_processed, IFNULL(error_message, ''), created_at FROM sync_logs ORDER BY id DESC LIMIT ? ` , limit )
if err != nil {
return nil , err
}
defer rows . Close ( )
var logs [ ] domain2 . SyncLog
for rows . Next ( ) {
var l domain2 . SyncLog
rows . Scan ( & l . ID , & l . Source , & l . Status , & l . RecordsProcessed , & l . ErrorMessage , & l . CreatedAt )
logs = append ( logs , l )
}
return logs , nil
}