First release of open core
This commit is contained in:
284
pkg/datastore/sqlite_ingest.go
Normal file
284
pkg/datastore/sqlite_ingest.go
Normal file
@@ -0,0 +1,284 @@
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
domain2 "epigas.gitea.cloud/RiskRancher/core/pkg/domain"
|
||||
)
|
||||
|
||||
func (s *SQLiteStore) IngestTickets(ctx context.Context, tickets []domain2.Ticket) error {
|
||||
tx, err := s.DB.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
CREATE TEMP TABLE IF NOT EXISTS staging_tickets (
|
||||
domain TEXT, source TEXT, asset_identifier TEXT, title TEXT,
|
||||
description TEXT, recommended_remediation TEXT, severity TEXT,
|
||||
status TEXT, dedupe_hash TEXT
|
||||
)
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tx.ExecContext(ctx, `DELETE FROM staging_tickets`)
|
||||
|
||||
stmt, err := tx.PrepareContext(ctx, `
|
||||
INSERT INTO staging_tickets (domain, source, asset_identifier, title, description, recommended_remediation, severity, status, dedupe_hash)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, t := range tickets {
|
||||
status := t.Status
|
||||
if status == "" {
|
||||
status = "Waiting to be Triaged"
|
||||
}
|
||||
domain := t.Domain
|
||||
if domain == "" {
|
||||
domain = "Vulnerability"
|
||||
}
|
||||
source := t.Source
|
||||
if source == "" {
|
||||
source = "Manual"
|
||||
}
|
||||
|
||||
_, err = stmt.ExecContext(ctx, domain, source, t.AssetIdentifier, t.Title, t.Description, t.RecommendedRemediation, t.Severity, status, t.DedupeHash)
|
||||
if err != nil {
|
||||
stmt.Close()
|
||||
return err
|
||||
}
|
||||
}
|
||||
stmt.Close()
|
||||
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
INSERT INTO tickets (domain, source, asset_identifier, title, description, recommended_remediation, severity, status, dedupe_hash)
|
||||
SELECT domain, source, asset_identifier, title, description, recommended_remediation, severity, status, dedupe_hash
|
||||
FROM staging_tickets
|
||||
WHERE true -- Prevents SQLite from mistaking 'ON CONFLICT' for a JOIN condition
|
||||
ON CONFLICT(dedupe_hash) DO UPDATE SET
|
||||
description = excluded.description,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.ExecContext(ctx, `DROP TABLE staging_tickets`)
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) GetAdapters(ctx context.Context) ([]domain2.Adapter, error) {
|
||||
rows, err := s.DB.QueryContext(ctx, "SELECT id, name, source_name, findings_path, mapping_title, mapping_asset, mapping_severity, mapping_description, mapping_remediation FROM data_adapters")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var adapters []domain2.Adapter
|
||||
for rows.Next() {
|
||||
var a domain2.Adapter
|
||||
rows.Scan(&a.ID, &a.Name, &a.SourceName, &a.FindingsPath, &a.MappingTitle, &a.MappingAsset, &a.MappingSeverity, &a.MappingDescription, &a.MappingRemediation)
|
||||
adapters = append(adapters, a)
|
||||
}
|
||||
return adapters, nil
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) SaveAdapter(ctx context.Context, a domain2.Adapter) error {
|
||||
_, err := s.DB.ExecContext(ctx, `
|
||||
INSERT INTO data_adapters (name, source_name, findings_path, mapping_title, mapping_asset, mapping_severity, mapping_description, mapping_remediation)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
a.Name, a.SourceName, a.FindingsPath, a.MappingTitle, a.MappingAsset, a.MappingSeverity, a.MappingDescription, a.MappingRemediation)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) GetAdapterByID(ctx context.Context, id int) (domain2.Adapter, error) {
|
||||
var a domain2.Adapter
|
||||
query := `
|
||||
SELECT
|
||||
id, name, source_name, findings_path,
|
||||
mapping_title, mapping_asset, mapping_severity,
|
||||
IFNULL(mapping_description, ''), IFNULL(mapping_remediation, ''),
|
||||
created_at, updated_at
|
||||
FROM data_adapters
|
||||
WHERE id = ?`
|
||||
|
||||
err := s.DB.QueryRowContext(ctx, query, id).Scan(
|
||||
&a.ID, &a.Name, &a.SourceName, &a.FindingsPath,
|
||||
&a.MappingTitle, &a.MappingAsset, &a.MappingSeverity,
|
||||
&a.MappingDescription, &a.MappingRemediation,
|
||||
&a.CreatedAt, &a.UpdatedAt,
|
||||
)
|
||||
return a, err
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) DeleteAdapter(ctx context.Context, id int) error {
|
||||
_, err := s.DB.ExecContext(ctx, "DELETE FROM data_adapters WHERE id = ?", id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) GetAdapterByName(ctx context.Context, name string) (domain2.Adapter, error) {
|
||||
var a domain2.Adapter
|
||||
query := `
|
||||
SELECT
|
||||
id, name, source_name, findings_path,
|
||||
mapping_title, mapping_asset, mapping_severity,
|
||||
IFNULL(mapping_description, ''), IFNULL(mapping_remediation, '')
|
||||
FROM data_adapters
|
||||
WHERE name = ?`
|
||||
|
||||
err := s.DB.QueryRowContext(ctx, query, name).Scan(
|
||||
&a.ID, &a.Name, &a.SourceName, &a.FindingsPath,
|
||||
&a.MappingTitle, &a.MappingAsset, &a.MappingSeverity,
|
||||
&a.MappingDescription, &a.MappingRemediation,
|
||||
)
|
||||
return a, err
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) ProcessIngestionBatch(ctx context.Context, source, asset string, incoming []domain2.Ticket) error {
|
||||
slaMap, _ := s.buildSLAMap(ctx)
|
||||
|
||||
tx, err := s.DB.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
for i := range incoming {
|
||||
if incoming[i].Domain == "" {
|
||||
incoming[i].Domain = "Vulnerability"
|
||||
}
|
||||
if incoming[i].Status == "" {
|
||||
incoming[i].Status = "Waiting to be Triaged"
|
||||
}
|
||||
}
|
||||
|
||||
inserts, reopens, updates, closes, err := s.calculateDiffState(ctx, tx, source, asset, incoming)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.executeBatchMutations(ctx, tx, source, asset, slaMap, inserts, reopens, updates, closes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) calculateDiffState(ctx context.Context, tx *sql.Tx, source, asset string, incoming []domain2.Ticket) (inserts, reopens, descUpdates []domain2.Ticket, autocloses []string, err error) {
|
||||
rows, err := tx.QueryContext(ctx, `SELECT dedupe_hash, status, COALESCE(description, '') FROM tickets WHERE source = ? AND asset_identifier = ?`, source, asset)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type existingRecord struct{ status, description string }
|
||||
existingMap := make(map[string]existingRecord)
|
||||
for rows.Next() {
|
||||
var hash, status, desc string
|
||||
if err := rows.Scan(&hash, &status, &desc); err == nil {
|
||||
existingMap[hash] = existingRecord{status: status, description: desc}
|
||||
}
|
||||
}
|
||||
|
||||
incomingMap := make(map[string]bool)
|
||||
for _, ticket := range incoming {
|
||||
incomingMap[ticket.DedupeHash] = true
|
||||
existing, exists := existingMap[ticket.DedupeHash]
|
||||
if !exists {
|
||||
inserts = append(inserts, ticket)
|
||||
} else {
|
||||
if existing.status == "Patched" {
|
||||
reopens = append(reopens, ticket)
|
||||
}
|
||||
if ticket.Description != "" && ticket.Description != existing.description && existing.status != "Patched" && existing.status != "Risk Accepted" && existing.status != "False Positive" {
|
||||
descUpdates = append(descUpdates, ticket)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for hash, record := range existingMap {
|
||||
if !incomingMap[hash] && record.status != "Patched" && record.status != "Risk Accepted" && record.status != "False Positive" {
|
||||
autocloses = append(autocloses, hash)
|
||||
}
|
||||
}
|
||||
return inserts, reopens, descUpdates, autocloses, nil
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) executeBatchMutations(ctx context.Context, tx *sql.Tx, source, asset string, slaMap map[string]map[string]domain2.SLAPolicy, inserts, reopens, descUpdates []domain2.Ticket, autocloses []string) error {
|
||||
now := time.Now()
|
||||
|
||||
// A. Inserts
|
||||
if len(inserts) > 0 {
|
||||
insertStmt, err := tx.PrepareContext(ctx, `INSERT INTO tickets (source, asset_identifier, title, severity, description, status, dedupe_hash, domain, triage_due_date, remediation_due_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer insertStmt.Close()
|
||||
|
||||
for _, t := range inserts {
|
||||
daysToTriage, daysToRemediate := 3, 30
|
||||
if dMap, ok := slaMap[t.Domain]; ok {
|
||||
if policy, ok := dMap[t.Severity]; ok {
|
||||
daysToTriage, daysToRemediate = policy.DaysToTriage, policy.DaysToRemediate
|
||||
}
|
||||
}
|
||||
_, err := insertStmt.ExecContext(ctx, source, asset, t.Title, t.Severity, t.Description, t.Status, t.DedupeHash, t.Domain, now.AddDate(0, 0, daysToTriage), now.AddDate(0, 0, daysToRemediate))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(reopens) > 0 {
|
||||
updateStmt, _ := tx.PrepareContext(ctx, `UPDATE tickets SET status = 'Waiting to be Triaged', patched_at = NULL, triage_due_date = ?, remediation_due_date = ? WHERE dedupe_hash = ?`)
|
||||
defer updateStmt.Close()
|
||||
for _, t := range reopens {
|
||||
updateStmt.ExecContext(ctx, now.AddDate(0, 0, 3), now.AddDate(0, 0, 30), t.DedupeHash) // Using default SLAs for fallback
|
||||
}
|
||||
}
|
||||
|
||||
if len(descUpdates) > 0 {
|
||||
descStmt, _ := tx.PrepareContext(ctx, `UPDATE tickets SET description = ? WHERE dedupe_hash = ?`)
|
||||
defer descStmt.Close()
|
||||
for _, t := range descUpdates {
|
||||
descStmt.ExecContext(ctx, t.Description, t.DedupeHash)
|
||||
}
|
||||
}
|
||||
|
||||
if len(autocloses) > 0 {
|
||||
closeStmt, _ := tx.PrepareContext(ctx, `UPDATE tickets SET status = 'Patched', patched_at = CURRENT_TIMESTAMP WHERE dedupe_hash = ?`)
|
||||
defer closeStmt.Close()
|
||||
for _, hash := range autocloses {
|
||||
closeStmt.ExecContext(ctx, hash)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) LogSync(ctx context.Context, source, status string, records int, errMsg string) error {
|
||||
_, err := s.DB.ExecContext(ctx, `INSERT INTO sync_logs (source, status, records_processed, error_message) VALUES (?, ?, ?, ?)`, source, status, records, errMsg)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQLiteStore) GetRecentSyncLogs(ctx context.Context, limit int) ([]domain2.SyncLog, error) {
|
||||
rows, err := s.DB.QueryContext(ctx, `SELECT id, source, status, records_processed, IFNULL(error_message, ''), created_at FROM sync_logs ORDER BY id DESC LIMIT ?`, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var logs []domain2.SyncLog
|
||||
for rows.Next() {
|
||||
var l domain2.SyncLog
|
||||
rows.Scan(&l.ID, &l.Source, &l.Status, &l.RecordsProcessed, &l.ErrorMessage, &l.CreatedAt)
|
||||
logs = append(logs, l)
|
||||
}
|
||||
return logs, nil
|
||||
}
|
||||
Reference in New Issue
Block a user