Compare commits

..

37 Commits

Author SHA1 Message Date
Andrew Ayer
b649b399e4 Do not run actions on pull requests
It's a security minefield.  Thanks to caching of the build environment,
not even read-only actions are safe.
2025-06-23 23:20:54 -04:00
Andrew Ayer
aecfa745ca Add GitHub Actions for test and lint 2025-06-23 23:10:11 -04:00
Andrew Ayer
f5779c283c Add staticcheck configuration 2025-06-23 23:10:05 -04:00
Andrew Ayer
3e811e86d7 Decapitalize some error messages 2025-06-23 22:33:57 -04:00
Andrew Ayer
a4048f47f8 Send helpful User-Agent string with all requests 2025-06-23 16:32:35 -04:00
Daniel Peukert
187aed078c
Fix fmt typos 2025-06-23 19:27:39 +02:00
Andrew Ayer
8ab03b4cf8 Release v0.20.1 2025-06-19 18:30:03 -04:00
Andrew Ayer
bcbd4e62d9 Improve error handling of hooks and sendmail 2025-06-17 14:03:45 -04:00
Andrew Ayer
a2a1fb1dab Set WaitDelay when executing sendmail and hooks 2025-06-17 14:03:31 -04:00
Andrew Ayer
5430f737b0 Enforce a timeout when running sendmail
postfix's sendmail command sometimes retries forever instead of terminating on error (see #100)
2025-06-17 13:59:59 -04:00
Andrew Ayer
f0e8b18d9a Improve code clarity 2025-06-17 11:04:02 -04:00
Andrew Ayer
756782e964 Improve some comments 2025-06-17 11:01:15 -04:00
Andrew Ayer
53029c2a09 Imrove some comments 2025-06-17 10:52:32 -04:00
Andrew Ayer
b05a66f634 Only calculate root hash when needed to verify an STH 2025-06-17 10:45:56 -04:00
Andrew Ayer
b87b33a41b Upgrade dependencies 2025-06-16 23:33:51 -04:00
Andrew Ayer
3279459be2 Add Compare to LogID and merkletree.Hash 2025-06-16 14:24:26 -04:00
Andrew Ayer
d5bc1ef75b Simplify certspotterVersion
The old code is unnecessary now that go derives a version from the VCS info.
2025-06-13 16:26:10 -04:00
Andrew Ayer
38bcd36d98 Release v0.20.0 2025-06-13 12:24:17 -04:00
Andrew Ayer
ca7b11ca96 Print a friendlier error message if -batch_size specified 2025-06-13 12:22:23 -04:00
Andrew Ayer
26439b4deb Remove unused code 2025-05-30 17:09:02 -04:00
Andrew Ayer
9544d8ab50 Imprve comment 2025-05-21 14:59:56 -04:00
Andrew Ayer
694eb276a6 Also check timestamp when comparing STHs
otherwise we might fail to delete unverified_sths if they have a different timestamp
2025-05-21 14:33:58 -04:00
Andrew Ayer
90ead642b0 Simplify context cancellation checks 2025-05-21 14:31:24 -04:00
Andrew Ayer
56af38ca70 Rewrite STH pipeline to avoid prematurely deleting STHs 2025-05-21 14:08:12 -04:00
Andrew Ayer
0c22448e5f Avoid spurious file not found errors loading STH dir if an STH is concurrently deleted 2025-05-20 15:29:23 -04:00
Andrew Ayer
61b037a708 Improve docs for -verbose 2025-05-19 13:47:04 -04:00
Andrew Ayer
15e35abdaa Only print log errors to stderr if -verbose specified
Log errors are so frequent that they are drowning out fatal errors. This commit will reserve stderr for fatal errors by default. See #104 for background.

This means that operators will need to enable -verbose if they want to get details about why a health check failed.  This seems better than making stderr noisy by default. The long-term solution is #106.
2025-05-19 13:46:16 -04:00
Andrew Ayer
ce80beb1d4 Document the directories used by certspotter in the man page
Closes: #103
2025-05-19 13:35:47 -04:00
Andrew Ayer
b06aecc56c Improve man pages 2025-05-19 13:35:43 -04:00
Andrew Ayer
46c8fc64fd Improve verbose logging 2025-05-19 13:24:51 -04:00
Andrew Ayer
b89afef32a In verbose mode, print a message when exiting due to signal 2025-05-19 13:13:18 -04:00
Andrew Ayer
e50476620c sequencer: improve Godocs 2025-05-14 18:44:25 -04:00
Andrew Ayer
63845b370d sequencer: add Reserve method 2025-05-14 18:44:16 -04:00
Andrew Ayer
bdc589762a Improve http.Client configuration
Ensure HTTP/2 can be used.

Set IdleConnTimeout to the net/http default.

Remove MaxIdleConns limit so that connections are more likely to be reused.
2025-05-14 18:43:47 -04:00
Andrew Ayer
0ba3b07bd9 Remove -batch_size option
It's obsolete due to the new parallel downloading system.
2025-05-08 08:39:32 -04:00
Andrew Ayer
996068385f Fail health check for logs have never been contacted 2025-05-07 21:31:43 -04:00
Andrew Ayer
37531001bc Improve formatting of an error message 2025-05-07 18:26:18 -04:00
26 changed files with 421 additions and 225 deletions

35
.github/workflows/test.yml vendored Normal file
View File

@ -0,0 +1,35 @@
name: Test and lint Go Code
on:
push:
schedule:
- cron: '42 9 * * *' # Runs daily at 09:42 UTC
workflow_dispatch: # Allows manual triggering
permissions:
contents: read
jobs:
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
- name: Run tests
run: CGO_ENABLED=1 go test -race ./...
- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@latest
- name: Run staticcheck
run: staticcheck ./...

View File

@ -1,5 +1,19 @@
# Change Log
## v0.20.1 (2025-06-19)
- Add resilience against sendmail hanging indefinitely.
- Add resilience against hooks which fork and keep stderr open.
- Upgrade dependencies to latest versions.
- Minor improvements to error handling, code quality, and efficiency.
## v0.20.0 (2025-06-13)
- Remove -batch_size option, which is obsolete due to new parallel download system.
- Only print log errors to stderr if -verbose is specified.
- Fix bug that could cause unverified STHs to be deleted prematurely.
- Fail health check if log has never been successfully contacted.
- Improve -verbose.
- Improve documentation.
## v0.19.1 (2025-05-07)
- Fix panic when retrying failed log requests.
- Properly log failed log requests.

View File

@ -46,7 +46,7 @@ func decodeASN1String(value *asn1.RawValue) (string, error) {
if value.Tag == 12 {
// UTF8String
if !utf8.Valid(value.Bytes) {
return "", errors.New("Malformed UTF8String")
return "", errors.New("malformed UTF8String")
}
return string(value.Bytes), nil
} else if value.Tag == 19 || value.Tag == 22 || value.Tag == 20 || value.Tag == 26 {
@ -74,5 +74,5 @@ func decodeASN1String(value *asn1.RawValue) (string, error) {
return stringFromUint32Slice(runes), nil
}
}
return "", errors.New("Not a string")
return "", errors.New("not a string")
}

View File

@ -253,5 +253,5 @@ func decodeASN1Time(value *asn1.RawValue) (time.Time, error) {
return parseGeneralizedTime(value.Bytes)
}
}
return time.Time{}, errors.New("Not a time value")
return time.Time{}, errors.New("not a time value")
}

View File

@ -25,6 +25,7 @@ import (
"syscall"
"time"
"software.sslmate.com/src/certspotter/ctclient"
"software.sslmate.com/src/certspotter/loglist"
"software.sslmate.com/src/certspotter/monitor"
)
@ -37,31 +38,11 @@ const defaultLogList = "https://loglist.certspotter.org/monitor.json"
func certspotterVersion() string {
if Version != "" {
return Version + "?"
}
info, ok := debug.ReadBuildInfo()
if !ok {
} else if info, ok := debug.ReadBuildInfo(); ok && strings.HasPrefix(info.Main.Version, "v") {
return info.Main.Version
} else {
return "unknown"
}
if strings.HasPrefix(info.Main.Version, "v") {
return info.Main.Version
}
var vcs, vcsRevision, vcsModified string
for _, s := range info.Settings {
switch s.Key {
case "vcs":
vcs = s.Value
case "vcs.revision":
vcsRevision = s.Value
case "vcs.modified":
vcsModified = s.Value
}
}
if vcs == "git" && vcsRevision != "" && vcsModified == "true" {
return vcsRevision + "+"
} else if vcs == "git" && vcsRevision != "" {
return vcsRevision
}
return "unknown"
}
func fileExists(filename string) bool {
@ -159,9 +140,10 @@ func appendFunc(slice *[]string) func(string) error {
func main() {
loglist.UserAgent = fmt.Sprintf("certspotter/%s (%s; %s; %s)", certspotterVersion(), runtime.Version(), runtime.GOOS, runtime.GOARCH)
ctclient.UserAgent = fmt.Sprintf("certspotter/%s (+https://github.com/SSLMate/certspotter)", certspotterVersion())
var flags struct {
batchSize int // TODO-4: respect this option
batchSize bool
email []string
healthcheck time.Duration
logs string
@ -174,7 +156,7 @@ func main() {
version bool
watchlist string
}
flag.IntVar(&flags.batchSize, "batch_size", 1000, "Max number of entries to request per call to get-entries (advanced)")
flag.Func("batch_size", "Obsolete; do not use", func(string) error { flags.batchSize = true; return nil }) // TODO: remove in 0.21.0
flag.Func("email", "Email address to contact when matching certificate is discovered (repeatable)", appendFunc(&flags.email))
flag.DurationVar(&flags.healthcheck, "healthcheck", 24*time.Hour, "How frequently to perform a health check")
flag.StringVar(&flags.logs, "logs", defaultLogList, "File path or URL of JSON list of logs to monitor")
@ -183,11 +165,15 @@ func main() {
flag.BoolVar(&flags.startAtEnd, "start_at_end", false, "Start monitoring new logs from the end rather than the beginning (saves considerable bandwidth)")
flag.StringVar(&flags.stateDir, "state_dir", defaultStateDir(), "Directory for storing log position and discovered certificates")
flag.BoolVar(&flags.stdout, "stdout", false, "Write matching certificates to stdout")
flag.BoolVar(&flags.verbose, "verbose", false, "Be verbose")
flag.BoolVar(&flags.verbose, "verbose", false, "Print detailed information about certspotter's operation to stderr")
flag.BoolVar(&flags.version, "version", false, "Print version and exit")
flag.StringVar(&flags.watchlist, "watchlist", defaultWatchListPathIfExists(), "File containing domain names to watch")
flag.Parse()
if flags.batchSize {
fmt.Fprintf(os.Stderr, "%s: -batch_size is obsolete; please remove it from your command line\n", programName)
os.Exit(2)
}
if flags.version {
fmt.Fprintf(os.Stdout, "certspotter version %s\n", certspotterVersion())
os.Exit(0)
@ -205,6 +191,7 @@ func main() {
ScriptDir: defaultScriptDir(),
Email: flags.email,
Stdout: flags.stdout,
Quiet: !flags.verbose,
}
config := &monitor.Config{
LogListSource: flags.logs,
@ -253,7 +240,12 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
if err := monitor.Run(ctx, config); err != nil && !errors.Is(err, context.Canceled) {
if err := monitor.Run(ctx, config); ctx.Err() == context.Canceled && errors.Is(err, context.Canceled) {
if flags.verbose {
fmt.Fprintf(os.Stderr, "%s: exiting due to SIGINT or SIGTERM\n", programName)
}
os.Exit(0)
} else {
fmt.Fprintf(os.Stderr, "%s: %s\n", programName, err)
os.Exit(1)
}

View File

@ -11,6 +11,7 @@
package ctclient
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
@ -23,6 +24,8 @@ import (
"time"
)
var UserAgent = "software.sslmate.com/src/certspotter"
// Create an HTTP client suitable for communicating with CT logs. dialContext, if non-nil, is used for dialing.
func NewHTTPClient(dialContext func(context.Context, string, string) (net.Conn, error)) *http.Client {
return &http.Client{
@ -31,9 +34,7 @@ func NewHTTPClient(dialContext func(context.Context, string, string) (net.Conn,
TLSHandshakeTimeout: 15 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
MaxIdleConnsPerHost: 10,
DisableKeepAlives: false,
MaxIdleConns: 100,
IdleConnTimeout: 15 * time.Second,
IdleConnTimeout: 90 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: &tls.Config{
// We have to disable TLS certificate validation because because several logs
@ -45,7 +46,8 @@ func NewHTTPClient(dialContext func(context.Context, string, string) (net.Conn,
// updating should a log ever change to a different CA.)
InsecureSkipVerify: true,
},
DialContext: dialContext,
DialContext: dialContext,
ForceAttemptHTTP2: true,
},
CheckRedirect: func(*http.Request, []*http.Request) error {
return errors.New("redirects not followed")
@ -61,7 +63,7 @@ func get(ctx context.Context, httpClient *http.Client, fullURL string) ([]byte,
if err != nil {
return nil, err
}
request.Header.Set("User-Agent", "") // Don't send a User-Agent to make life harder for malicious logs
request.Header.Set("User-Agent", UserAgent)
if httpClient == nil {
httpClient = defaultHTTPClient
@ -79,7 +81,7 @@ func get(ctx context.Context, httpClient *http.Client, fullURL string) ([]byte,
}
if response.StatusCode != 200 {
return nil, fmt.Errorf("Get %q: %s (%q)", fullURL, response.Status, string(responseBody))
return nil, fmt.Errorf("Get %q: %s (%q)", fullURL, response.Status, bytes.TrimSpace(responseBody))
}
return responseBody, nil

View File

@ -10,6 +10,7 @@
package cttypes
import (
"bytes"
"encoding/base64"
"fmt"
"golang.org/x/crypto/cryptobyte"
@ -17,6 +18,10 @@ import (
type LogID [32]byte
func (id LogID) Compare(other LogID) int {
return bytes.Compare(id[:], other[:])
}
func (v *LogID) Unmarshal(s *cryptobyte.String) bool {
return s.CopyBytes((*v)[:])
}

View File

@ -31,3 +31,7 @@ type GossipedSignedTreeHead struct {
func (sth *SignedTreeHead) TimestampTime() time.Time {
return time.UnixMilli(int64(sth.Timestamp))
}
func (sth *SignedTreeHead) Same(other *SignedTreeHead) bool {
return sth.TreeSize == other.TreeSize && sth.Timestamp == other.Timestamp && sth.RootHash == other.RootHash
}

10
go.mod
View File

@ -1,13 +1,13 @@
module software.sslmate.com/src/certspotter
go 1.24
go 1.24.4
require (
golang.org/x/crypto v0.37.0
golang.org/x/net v0.39.0
golang.org/x/sync v0.13.0
golang.org/x/crypto v0.39.0
golang.org/x/net v0.41.0
golang.org/x/sync v0.15.0
)
require golang.org/x/text v0.24.0 // indirect
require golang.org/x/text v0.26.0 // indirect
retract v0.19.0 // Contains serious bugs.

16
go.sum
View File

@ -1,8 +1,8 @@
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=

View File

@ -262,21 +262,6 @@ func (ids *Identifiers) AddIPAddress(value net.IP) {
ids.appendIPAddress(value)
}
func (ids *Identifiers) dnsNamesString(sep string) string {
return strings.Join(ids.DNSNames, sep)
}
func (ids *Identifiers) ipAddrsString(sep string) string {
str := ""
for _, ipAddr := range ids.IPAddrs {
if str != "" {
str += sep
}
str += ipAddr.String()
}
return str
}
func (cert *CertInfo) ParseIdentifiers() (*Identifiers, error) {
ids := NewIdentifiers()

View File

@ -21,7 +21,7 @@ import (
"time"
)
var UserAgent = "certspotter"
var UserAgent = "software.sslmate.com/src/certspotter"
type ModificationToken struct {
etag string
@ -112,7 +112,7 @@ func Unmarshal(jsonBytes []byte) (*List, error) {
return nil, err
}
if err := list.Validate(); err != nil {
return nil, fmt.Errorf("Invalid log list: %s", err)
return nil, fmt.Errorf("invalid log list: %s", err)
}
return list, nil
}

View File

@ -65,7 +65,7 @@ The following environment variables are set for `discovered_cert` events:
`CERT_SHA256`
: The hex-encoded SHA-256 digest (sometimes called fingerprint) of the certificate.
: The hex-encoded SHA-256 digest (sometimes called fingerprint) of the certificate or precertificate.
The digest is computed over the ASN.1 DER encoding.
`PUBKEY_SHA256`

View File

@ -30,11 +30,6 @@ You can use Cert Spotter to detect:
# OPTIONS
-batch_size *NUMBER*
: Maximum number of entries to request per call to get-entries.
You should not generally need to change this. Defaults to 1000.
-email *ADDRESS*
: Email address to contact when a matching certificate is discovered, or
@ -95,7 +90,7 @@ You can use Cert Spotter to detect:
-verbose
: Be verbose.
: Print detailed information about certspotter's operation (such as errors contacting logs) to stderr.
-version
@ -141,10 +136,10 @@ the script interface, see certspotter-script(8).
# OPERATION
certspotter continuously monitors all browser-recognized Certificate
Transparency logs looking for certificates which are valid for any domain
on your watch list. When certspotter detects a matching certificate, it
emails you, executes a script, and/or writes a report to standard out,
as described above.
Transparency logs looking for certificates (including precertificates)
which are valid for any domain on your watch list. When certspotter
detects a matching certificate, it emails you, executes a script, and/or
writes a report to standard out, as described above.
certspotter also saves a copy of matching certificates in
`$CERTSPOTTER_STATE_DIR/certs` ("~/.certspotter/certs" by default)
@ -178,7 +173,7 @@ to write a file or execute a script), it prints a message to stderr and
exits with a non-zero status.
When certspotter encounters a problem monitoring a log, it prints a message
to stderr and continues running. It will try monitoring the log again later;
to stderr if `-verbose` is specified and continues running. It will try monitoring the log again later;
most log errors are transient.
Every 24 hours (unless overridden by `-healthcheck`), certspotter performs the
@ -195,7 +190,7 @@ standard out, as described above.
Health check failures should be rare, and you should take them seriously because it means
certspotter might not detect all certificates. It might also be an indication
of CT log misbehavior. Consult certspotter's stderr output for details, and if
of CT log misbehavior. Enable the `-verbose` flag and consult stderr for details, and if
you need help, file an issue at <https://github.com/SSLMate/certspotter>.
# EXIT STATUS
@ -229,6 +224,20 @@ and non-zero when a serious error occurs.
: Path to the sendmail binary used for sending emails. Defaults to `/usr/sbin/sendmail`.
# DIRECTORIES
Config directory
: Stores configuration, such as the watch list. The location is: (1) the `CERTSPOTTER_CONFIG_DIR` environment variable, if set, or (2) the default location `~/.certspotter`. certspotter does not write to this directory.
State directory
: Stores state, such as the position of each log and a store of discovered certificates. The location is: (1) the `-state_dir` command line flag, if provided, (2) the `CERTSPOTTER_STATE_DIR` environment variable, if set, or (3) the default location `~/.certspotter`. certspotter creates this directory if necessary.
Cache directory
: Stores cached data. The location is `$XDG_CACHE_HOME/certspotter` (which on Linux is `~/.cache/certspotter` by default). You can delete this directory without without impacting functionality, but certspotter may need to perform additional computation or network requests.
# SEE ALSO
certspotter-script(8)

View File

@ -10,6 +10,7 @@
package merkletree
import (
"bytes"
"crypto/sha256"
"encoding/base64"
"encoding/json"
@ -20,6 +21,10 @@ const HashLen = 32
type Hash [HashLen]byte
func (h Hash) Compare(other Hash) int {
return bytes.Compare(h[:], other[:])
}
func (h Hash) Base64String() string {
return base64.StdEncoding.EncodeToString(h[:])
}

View File

@ -75,7 +75,7 @@ func (daemon *daemon) startTask(ctx context.Context, ctlog *loglist.Log) task {
defer cancel()
err := monitorLogContinously(ctx, daemon.config, ctlog)
if daemon.config.Verbose {
log.Printf("task for log %s stopped with error %s", ctlog.GetMonitoringURL(), err)
log.Printf("%s: task stopped with error: %s", ctlog.GetMonitoringURL(), err)
}
if ctx.Err() == context.Canceled && errors.Is(err, context.Canceled) {
return nil
@ -137,9 +137,10 @@ func (daemon *daemon) run(ctx context.Context) error {
healthCheckTicker := time.NewTicker(daemon.config.HealthCheckInterval)
defer healthCheckTicker.Stop()
for ctx.Err() == nil {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-reloadLogListTicker.C:
if err := daemon.loadLogList(ctx); err != nil {
daemon.logListError = err.Error()
@ -153,7 +154,6 @@ func (daemon *daemon) run(ctx context.Context) error {
}
}
}
return ctx.Err()
}
func Run(ctx context.Context, config *Config) error {

View File

@ -34,6 +34,7 @@ type FilesystemState struct {
ScriptDir string
Email []string
Stdout bool
Quiet bool
}
func (s *FilesystemState) logStateDir(logID LogID) string {
@ -85,7 +86,7 @@ func (s *FilesystemState) StoreLogState(ctx context.Context, logID LogID, state
return writeJSONFile(filePath, state, 0666)
}
func (s *FilesystemState) StoreSTH(ctx context.Context, logID LogID, sth *cttypes.SignedTreeHead) error {
func (s *FilesystemState) StoreSTH(ctx context.Context, logID LogID, sth *cttypes.SignedTreeHead) (*StoredSTH, error) {
sthsDirPath := filepath.Join(s.logStateDir(logID), "unverified_sths")
return storeSTHInDir(sthsDirPath, sth)
}
@ -248,10 +249,12 @@ func (s *FilesystemState) NotifyHealthCheckFailure(ctx context.Context, ctlog *l
}
func (s *FilesystemState) NotifyError(ctx context.Context, ctlog *loglist.Log, err error) error {
if ctlog == nil {
log.Print(err)
} else {
log.Print(ctlog.GetMonitoringURL(), ": ", err)
if !s.Quiet {
if ctlog == nil {
log.Print(err)
} else {
log.Print(ctlog.GetMonitoringURL(), ": ", err)
}
}
return nil
}

View File

@ -24,15 +24,23 @@ func healthCheckFilename() string {
}
func healthCheckLog(ctx context.Context, config *Config, ctlog *loglist.Log) error {
state, err := config.State.LoadLogState(ctx, ctlog.LogID)
if err != nil {
return fmt.Errorf("error loading log state: %w", err)
} else if state == nil {
return nil
}
var (
position uint64
lastSuccess time.Time
verifiedSTH *cttypes.SignedTreeHead
)
if time.Since(state.LastSuccess) < config.HealthCheckInterval {
return nil
if state, err := config.State.LoadLogState(ctx, ctlog.LogID); err != nil {
return fmt.Errorf("error loading log state: %w", err)
} else if state != nil {
if time.Since(state.LastSuccess) < config.HealthCheckInterval {
// log is healthy
return nil
}
position = state.DownloadPosition.Size()
lastSuccess = state.LastSuccess
verifiedSTH = state.VerifiedSTH
}
sths, err := config.State.LoadSTHs(ctx, ctlog.LogID)
@ -43,8 +51,8 @@ func healthCheckLog(ctx context.Context, config *Config, ctlog *loglist.Log) err
if len(sths) == 0 {
info := &StaleSTHInfo{
Log: ctlog,
LastSuccess: state.LastSuccess,
LatestSTH: state.VerifiedSTH,
LastSuccess: lastSuccess,
LatestSTH: verifiedSTH,
}
if err := config.State.NotifyHealthCheckFailure(ctx, ctlog, info); err != nil {
return fmt.Errorf("error notifying about stale STH: %w", err)
@ -53,7 +61,7 @@ func healthCheckLog(ctx context.Context, config *Config, ctlog *loglist.Log) err
info := &BacklogInfo{
Log: ctlog,
LatestSTH: sths[len(sths)-1],
Position: state.DownloadPosition.Size(),
Position: position,
}
if err := config.State.NotifyHealthCheckFailure(ctx, ctlog, info); err != nil {
return fmt.Errorf("error notifying about backlog: %w", err)
@ -70,7 +78,7 @@ type HealthCheckFailure interface {
type StaleSTHInfo struct {
Log *loglist.Log
LastSuccess time.Time
LastSuccess time.Time // may be zero
LatestSTH *cttypes.SignedTreeHead // may be nil
}
@ -87,12 +95,19 @@ type StaleLogListInfo struct {
LastErrorTime time.Time
}
func (e *StaleSTHInfo) LastSuccessString() string {
if e.LastSuccess.IsZero() {
return "never"
} else {
return e.LastSuccess.String()
}
}
func (e *BacklogInfo) Backlog() uint64 {
return e.LatestSTH.TreeSize - e.Position
}
func (e *StaleSTHInfo) Summary() string {
return fmt.Sprintf("Unable to contact %s since %s", e.Log.GetMonitoringURL(), e.LastSuccess)
return fmt.Sprintf("Unable to contact %s since %s", e.Log.GetMonitoringURL(), e.LastSuccessString())
}
func (e *BacklogInfo) Summary() string {
return fmt.Sprintf("Backlog of size %d from %s", e.Backlog(), e.Log.GetMonitoringURL())
@ -103,9 +118,9 @@ func (e *StaleLogListInfo) Summary() string {
func (e *StaleSTHInfo) Text() string {
text := new(strings.Builder)
fmt.Fprintf(text, "certspotter has been unable to contact %s since %s. Consequentially, certspotter may fail to notify you about certificates in this log.\n", e.Log.GetMonitoringURL(), e.LastSuccess)
fmt.Fprintf(text, "certspotter has been unable to contact %s since %s. Consequentially, certspotter may fail to notify you about certificates in this log.\n", e.Log.GetMonitoringURL(), e.LastSuccessString())
fmt.Fprintf(text, "\n")
fmt.Fprintf(text, "For details, see certspotter's stderr output.\n")
fmt.Fprintf(text, "For details, enable -verbose and see certspotter's stderr output.\n")
fmt.Fprintf(text, "\n")
if e.LatestSTH != nil {
fmt.Fprintf(text, "Latest known log size = %d\n", e.LatestSTH.TreeSize)
@ -118,7 +133,7 @@ func (e *BacklogInfo) Text() string {
text := new(strings.Builder)
fmt.Fprintf(text, "certspotter has been unable to download entries from %s in a timely manner. Consequentially, certspotter may be slow to notify you about certificates in this log.\n", e.Log.GetMonitoringURL())
fmt.Fprintf(text, "\n")
fmt.Fprintf(text, "For more details, see certspotter's stderr output.\n")
fmt.Fprintf(text, "For details, enable -verbose and see certspotter's stderr output.\n")
fmt.Fprintf(text, "\n")
fmt.Fprintf(text, "Current log size = %d (as of %s)\n", e.LatestSTH.TreeSize, e.LatestSTH.StoredAt)
fmt.Fprintf(text, "Current position = %d\n", e.Position)

View File

@ -29,7 +29,8 @@ import (
)
const (
getSTHInterval = 5 * time.Minute
getSTHInterval = 5 * time.Minute
maxPartialTileAge = 5 * time.Minute
)
func downloadJobSize(ctlog *loglist.Log) uint64 {
@ -205,7 +206,7 @@ func newLogClient(config *Config, ctlog *loglist.Log) (ctclient.Log, ctclient.Is
logGetter: client,
}, nil
default:
return nil, nil, fmt.Errorf("log uses unknown protocol")
return nil, nil, errors.New("log uses unknown protocol")
}
}
@ -246,17 +247,18 @@ func monitorLogContinously(ctx context.Context, config *Config, ctlog *loglist.L
}
}
if config.Verbose {
log.Printf("brand new log %s (starting from %d)", ctlog.GetMonitoringURL(), state.DownloadPosition.Size())
log.Printf("%s: monitoring brand new log starting from position %d", ctlog.GetMonitoringURL(), state.DownloadPosition.Size())
}
if err := config.State.StoreLogState(ctx, ctlog.LogID, state); err != nil {
return fmt.Errorf("error storing log state: %w", err)
}
} else {
if config.Verbose {
log.Printf("%s: resuming monitoring from position %d", ctlog.GetMonitoringURL(), state.DownloadPosition.Size())
}
}
defer func() {
if config.Verbose {
log.Printf("saving state in defer for %s", ctlog.GetMonitoringURL())
}
storeCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := config.State.StoreLogState(storeCtx, ctlog.LogID, state); err != nil && returnedErr == nil {
@ -267,14 +269,21 @@ func monitorLogContinously(ctx context.Context, config *Config, ctlog *loglist.L
retry:
position := state.DownloadPosition.Size()
// generateBatchesWorker ==> downloadWorker ==> processWorker ==> saveStateWorker
// logs are monitored using the following pipeline of workers, with each worker sending results to the next worker:
// 1 getSTHWorker ==> 1 generateBatchesWorker ==> multiple downloadWorkers ==> multiple processWorkers ==> 1 saveStateWorker
// getSTHWorker - periodically download STHs from the log
// generateBatchesWorker - generate batches of work
// downloadWorkers - download the entries in each batch
// processWorkers - process the certificates (store/notify if matches watch list) in each batch
// saveStateWorker - builds the Merkle Tree and compares against STHs
sths := make(chan *cttypes.SignedTreeHead, 1)
batches := make(chan *batch, downloadWorkers(ctlog))
processedBatches := sequencer.New[batch](0, uint64(downloadWorkers(ctlog))*10)
group, gctx := errgroup.WithContext(ctx)
group.Go(func() error { return getSTHWorker(gctx, config, ctlog, client) })
group.Go(func() error { return generateBatchesWorker(gctx, config, ctlog, position, batches) })
group.Go(func() error { return getSTHWorker(gctx, config, ctlog, client, sths) })
group.Go(func() error { return generateBatchesWorker(gctx, config, ctlog, position, sths, batches) })
for range downloadWorkers(ctlog) {
downloadedBatches := make(chan *batch, 1)
group.Go(func() error { return downloadWorker(gctx, config, ctlog, client, batches, downloadedBatches) })
@ -299,47 +308,18 @@ retry:
return err
}
func getSTHWorker(ctx context.Context, config *Config, ctlog *loglist.Log, client ctclient.Log) error {
for ctx.Err() == nil {
func getSTHWorker(ctx context.Context, config *Config, ctlog *loglist.Log, client ctclient.Log, sthsOut chan<- *cttypes.SignedTreeHead) error {
ticker := time.NewTicker(getSTHInterval)
defer ticker.Stop()
for {
sth, _, err := client.GetSTH(ctx)
if err != nil {
return err
}
if err := config.State.StoreSTH(ctx, ctlog.LogID, sth); err != nil {
return fmt.Errorf("error storing STH: %w", err)
}
if err := sleep(ctx, getSTHInterval); err != nil {
return err
}
}
return ctx.Err()
}
type batch struct {
number uint64
begin, end uint64
sths []*StoredSTH // STHs with sizes in range [begin,end], sorted by TreeSize
entries []ctclient.Entry // in range [begin,end)
}
func generateBatchesWorker(ctx context.Context, config *Config, ctlog *loglist.Log, position uint64, batches chan<- *batch) error {
ticker := time.NewTicker(15 * time.Second)
var number uint64
for ctx.Err() == nil {
sths, err := config.State.LoadSTHs(ctx, ctlog.LogID)
if err != nil {
return fmt.Errorf("error loading STHs: %w", err)
}
for len(sths) > 0 && sths[0].TreeSize < position {
// TODO-4: audit sths[0] against log's verified STH
if err := config.State.RemoveSTH(ctx, ctlog.LogID, &sths[0].SignedTreeHead); err != nil {
return fmt.Errorf("error removing STH: %w", err)
}
sths = sths[1:]
}
position, number, err = generateBatches(ctx, ctlog, position, number, sths, batches)
if err != nil {
return err
select {
case <-ctx.Done():
return ctx.Err()
case sthsOut <- sth:
}
select {
case <-ctx.Done():
@ -347,77 +327,132 @@ func generateBatchesWorker(ctx context.Context, config *Config, ctlog *loglist.L
case <-ticker.C:
}
}
return ctx.Err()
}
// return the time at which the right-most tile indicated by sths was discovered
func tileDiscoveryTime(sths []*StoredSTH) time.Time {
largestSTH, sths := sths[len(sths)-1], sths[:len(sths)-1]
tileNumber := largestSTH.TreeSize / ctclient.StaticTileWidth
storedAt := largestSTH.StoredAt
for _, sth := range slices.Backward(sths) {
if sth.TreeSize/ctclient.StaticTileWidth != tileNumber {
type batch struct {
number uint64
begin, end uint64
discoveredAt time.Time // time at which we became aware of the log having entries in range [begin,end)
sths []*StoredSTH // STHs with sizes in range [begin,end], sorted by TreeSize
entries []ctclient.Entry // in range [begin,end)
}
// Create a batch starting from begin, based on sths (which must be non-empty, sorted by TreeSize, and contain only STHs with TreeSize >= begin). Returns the batch, plus the remaining STHs.
func newBatch(number uint64, begin uint64, sths []*StoredSTH, downloadJobSize uint64) (*batch, []*StoredSTH) {
batch := &batch{
number: number,
begin: begin,
discoveredAt: sths[0].StoredAt,
}
maxEnd := (begin/downloadJobSize + 1) * downloadJobSize
for _, sth := range sths {
if sth.StoredAt.Before(batch.discoveredAt) {
batch.discoveredAt = sth.StoredAt
}
if sth.TreeSize <= maxEnd {
batch.end = sth.TreeSize
batch.sths = append(batch.sths, sth)
} else {
batch.end = maxEnd
break
}
if sth.StoredAt.Before(storedAt) {
storedAt = sth.StoredAt
}
}
return storedAt
return batch, sths[len(batch.sths):]
}
func generateBatches(ctx context.Context, ctlog *loglist.Log, position uint64, number uint64, sths []*StoredSTH, batches chan<- *batch) (uint64, uint64, error) {
// insert sth into sths, which is sorted by TreeSize, and return a new, still-sorted slice.
// if an equivalent STH is already in sths, it is returned unchanged.
func insertSTH(sths []*StoredSTH, sth *StoredSTH) []*StoredSTH {
i := len(sths)
for i > 0 {
if sths[i-1].Same(&sth.SignedTreeHead) {
return sths
}
if sths[i-1].TreeSize < sth.TreeSize {
break
}
i--
}
return slices.Insert(sths, i, sth)
}
func generateBatchesWorker(ctx context.Context, config *Config, ctlog *loglist.Log, position uint64, sthsIn <-chan *cttypes.SignedTreeHead, batchesOut chan<- *batch) error {
downloadJobSize := downloadJobSize(ctlog)
if len(sths) == 0 {
return position, number, nil
sths, err := config.State.LoadSTHs(ctx, ctlog.LogID)
if err != nil {
return fmt.Errorf("error loading STHs: %w", err)
}
largestSTH := sths[len(sths)-1]
treeSize := largestSTH.TreeSize
if ctlog.IsStaticCTAPI() && time.Since(tileDiscoveryTime(sths)) < 5*time.Minute {
// Round down to the tile boundary to avoid downloading a partial tile that was recently discovered
// In a future invocation of this function, either enough time will have passed that this code path will be skipped, or the log will have grown and treeSize will be rounded to a larger tile boundary
treeSize -= treeSize % ctclient.StaticTileWidth
if treeSize < position {
// This can arise with a brand new log when config.StartAtEnd is true
return position, number, nil
// sths is sorted by TreeSize but may contain STHs with TreeSize < position; get rid of them
for len(sths) > 0 && sths[0].TreeSize < position {
// TODO-4: audit sths[0] against log's verified STH
if err := config.State.RemoveSTH(ctx, ctlog.LogID, &sths[0].SignedTreeHead); err != nil {
return fmt.Errorf("error removing STH: %w", err)
}
sths = sths[1:]
}
// from this point, sths is sorted by TreeSize and contains only STHs with TreeSize >= position
handleSTH := func(sth *cttypes.SignedTreeHead) error {
if sth.TreeSize < position {
// TODO-4: audit against log's verified STH
} else {
storedSTH, err := config.State.StoreSTH(ctx, ctlog.LogID, sth)
if err != nil {
return fmt.Errorf("error storing STH: %w", err)
}
sths = insertSTH(sths, storedSTH)
}
return nil
}
var number uint64
for {
batch := &batch{
number: number,
begin: position,
end: min(treeSize, (position/downloadJobSize+1)*downloadJobSize),
for len(sths) == 0 {
select {
case <-ctx.Done():
return ctx.Err()
case sth := <-sthsIn:
if err := handleSTH(sth); err != nil {
return err
}
}
}
for len(sths) > 0 && sths[0].TreeSize <= batch.end {
batch.sths = append(batch.sths, sths[0])
sths = sths[1:]
batch, remainingSTHs := newBatch(number, position, sths, downloadJobSize)
if ctlog.IsStaticCTAPI() && batch.end%downloadJobSize != 0 {
// Wait to download this partial tile until it's old enough
if age := time.Since(batch.discoveredAt); age < maxPartialTileAge {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(maxPartialTileAge - age):
case sth := <-sthsIn:
if err := handleSTH(sth); err != nil {
return err
}
continue
}
}
}
select {
case <-ctx.Done():
return position, number, ctx.Err()
default:
return ctx.Err()
case sth := <-sthsIn:
if err := handleSTH(sth); err != nil {
return err
}
case batchesOut <- batch:
number = batch.number + 1
position = batch.end
sths = remainingSTHs
}
select {
case <-ctx.Done():
return position, number, ctx.Err()
case batches <- batch:
}
number++
if position == batch.end {
break
}
position = batch.end
}
return position, number, nil
}
func downloadWorker(ctx context.Context, config *Config, ctlog *loglist.Log, client ctclient.Log, batchesIn <-chan *batch, batchesOut chan<- *batch) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var batch *batch
select {
case <-ctx.Done():
@ -431,11 +466,6 @@ func downloadWorker(ctx context.Context, config *Config, ctlog *loglist.Log, cli
}
batch.entries = entries
select {
case <-ctx.Done():
return ctx.Err()
default:
}
select {
case <-ctx.Done():
return ctx.Err()
@ -446,11 +476,6 @@ func downloadWorker(ctx context.Context, config *Config, ctlog *loglist.Log, cli
func processWorker(ctx context.Context, config *Config, ctlog *loglist.Log, issuerGetter ctclient.IssuerGetter, batchesIn <-chan *batch, batchesOut *sequencer.Channel[batch]) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var batch *batch
select {
case <-ctx.Done():
@ -482,12 +507,11 @@ func saveStateWorker(ctx context.Context, config *Config, ctlog *loglist.Log, st
if batch.begin != state.DownloadPosition.Size() {
panic(fmt.Errorf("saveStateWorker: expected batch to start at %d but got %d instead", state.DownloadPosition.Size(), batch.begin))
}
rootHash := state.DownloadPosition.CalculateRoot()
for {
for len(batch.sths) > 0 && batch.sths[0].TreeSize == state.DownloadPosition.Size() {
sth := batch.sths[0]
batch.sths = batch.sths[1:]
if sth.RootHash != rootHash {
if rootHash := state.DownloadPosition.CalculateRoot(); sth.RootHash != rootHash {
return &verifyEntriesError{
sth: &sth.SignedTreeHead,
entriesRootHash: rootHash,
@ -503,6 +527,9 @@ func saveStateWorker(ctx context.Context, config *Config, ctlog *loglist.Log, st
if err := config.State.RemoveSTH(ctx, ctlog.LogID, &sth.SignedTreeHead); err != nil {
return fmt.Errorf("error removing verified STH: %w", err)
}
if config.Verbose {
log.Printf("%s: verified position is now %d", ctlog.GetMonitoringURL(), sth.SignedTreeHead.TreeSize)
}
}
if len(batch.entries) == 0 {
break
@ -511,7 +538,6 @@ func saveStateWorker(ctx context.Context, config *Config, ctlog *loglist.Log, st
batch.entries = batch.entries[1:]
leafHash := merkletree.HashLeaf(entry.LeafInput())
state.DownloadPosition.Add(leafHash)
rootHash = state.DownloadPosition.CalculateRoot()
}
if err := config.State.StoreLogState(ctx, ctlog.LogID, state); err != nil {

View File

@ -89,18 +89,26 @@ func sendEmail(ctx context.Context, to []string, notif *notification) error {
args = append(args, "--")
args = append(args, to...)
sendmail := exec.CommandContext(ctx, sendmailPath(), args...)
sendmailCtx, cancel := context.WithDeadline(ctx, time.Now().Add(2*time.Minute))
defer cancel()
sendmail := exec.CommandContext(sendmailCtx, sendmailPath(), args...)
sendmail.Stdin = stdin
sendmail.Stderr = stderr
sendmail.WaitDelay = 5 * time.Second
if err := sendmail.Run(); err == nil {
if err := sendmail.Run(); err == nil || err == exec.ErrWaitDelay {
return nil
} else if sendmailCtx.Err() != nil && ctx.Err() == nil {
return fmt.Errorf("error sending email to %v: sendmail command timed out", to)
} else if ctx.Err() != nil {
// if the context was canceled, we can't be sure that the error is the fault of sendmail, so ignore it
return ctx.Err()
} else if exitErr, isExitError := err.(*exec.ExitError); isExitError && exitErr.Exited() {
return fmt.Errorf("error sending email to %v: sendmail failed with exit code %d and error %q", to, exitErr.ExitCode(), strings.TrimSpace(stderr.String()))
} else if isExitError {
return fmt.Errorf("error sending email to %v: sendmail terminated by signal with error %q", to, strings.TrimSpace(stderr.String()))
} else {
return fmt.Errorf("error sending email to %v: %w", to, err)
return fmt.Errorf("error sending email to %v: error running sendmail command: %w", to, err)
}
}
@ -111,10 +119,12 @@ func execScript(ctx context.Context, scriptName string, notif *notification) err
cmd.Env = os.Environ()
cmd.Env = append(cmd.Env, notif.environ...)
cmd.Stderr = stderr
cmd.WaitDelay = 5 * time.Second
if err := cmd.Run(); err == nil {
if err := cmd.Run(); err == nil || err == exec.ErrWaitDelay {
return nil
} else if ctx.Err() != nil {
// if the context was canceled, we can't be sure that the error is the fault of the script, so ignore it
return ctx.Err()
} else if exitErr, isExitError := err.(*exec.ExitError); isExitError && exitErr.Exited() {
return fmt.Errorf("script %q exited with code %d and error %q", scriptName, exitErr.ExitCode(), strings.TrimSpace(stderr.String()))

View File

@ -107,8 +107,15 @@ func processCertificate(ctx context.Context, config *Config, entry *LogEntry, ce
}
chain, chainErr := getChain(ctx)
if errors.Is(chainErr, context.Canceled) {
return chainErr
if chainErr != nil {
if ctx.Err() != nil {
// Getting chain failed, but it was probably because our context
// has been canceled, so just act like we never called getChain.
return ctx.Err()
}
// Although getting the chain failed, we still want to notify
// the user about the matching certificate. We'll include chainErr in the
// notification so the user knows why the chain is missing or incorrect.
}
cert := &DiscoveredCert{

View File

@ -54,7 +54,7 @@ type StateProvider interface {
// Store STH for retrieval by LoadSTHs. If an STH with the same
// timestamp and root hash is already stored, this STH can be ignored.
StoreSTH(context.Context, LogID, *cttypes.SignedTreeHead) error
StoreSTH(context.Context, LogID, *cttypes.SignedTreeHead) (*StoredSTH, error)
// Load all STHs for this log previously stored with StoreSTH.
// The returned slice must be sorted by tree size.

View File

@ -47,7 +47,9 @@ func loadSTHsFromDir(dirPath string) ([]*StoredSTH, error) {
continue
}
sth, err := readSTHFile(filepath.Join(dirPath, filename))
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
continue
} else if err != nil {
return nil, err
}
sths = append(sths, sth)
@ -81,14 +83,26 @@ func readSTHFile(filePath string) (*StoredSTH, error) {
return sth, nil
}
func storeSTHInDir(dirPath string, sth *cttypes.SignedTreeHead) error {
func storeSTHInDir(dirPath string, sth *cttypes.SignedTreeHead) (*StoredSTH, error) {
filePath := filepath.Join(dirPath, sthFilename(sth))
if fileExists(filePath) {
// If the file already exists, we don't want its mtime to change
// because StoredSTH.StoredAt needs to be the time the STH was *first* stored.
return nil
if info, err := os.Lstat(filePath); err == nil {
return &StoredSTH{
SignedTreeHead: *sth,
StoredAt: info.ModTime(),
}, nil
} else if !errors.Is(err, fs.ErrNotExist) {
return nil, err
}
return writeJSONFile(filePath, sth, 0666)
if err := writeJSONFile(filePath, sth, 0666); err != nil {
return nil, err
}
return &StoredSTH{
SignedTreeHead: *sth,
StoredAt: time.Now(), // not the exact modtime of the file, but close enough for our purposes
}, nil
}
func removeSTHFromDir(dirPath string, sth *cttypes.SignedTreeHead) error {

View File

@ -22,6 +22,8 @@ type seqWriter struct {
// Channel[T] is a multi-producer, single-consumer channel of items with monotonicaly-increasing sequence numbers.
// Items can be sent in any order, but they are always received in order of their sequence number.
// It is unsafe to call Next concurrently with itself, or to call Add/Reserve concurrently with another Add/Reserve
// call for the same sequence number. Otherwise, methods are safe to call concurrently.
type Channel[T any] struct {
mu sync.Mutex
next uint64
@ -72,6 +74,27 @@ func (seq *Channel[T]) index(seqNbr uint64) int {
return int(seqNbr % seq.Cap())
}
// Wait until the channel has capacity for an item with the given sequence number.
// After this function returns nil, calling Add with the same sequence number will not block.
func (seq *Channel[T]) Reserve(ctx context.Context, sequenceNumber uint64) error {
seq.mu.Lock()
if sequenceNumber >= seq.next+seq.Cap() {
ready := seq.parkWriter(sequenceNumber)
seq.mu.Unlock()
select {
case <-ctx.Done():
seq.mu.Lock()
seq.forgetWriter(sequenceNumber)
seq.mu.Unlock()
return ctx.Err()
case <-ready:
}
} else {
seq.mu.Unlock()
}
return nil
}
// Send an item with the given sequence number. Blocks if the channel does not have capacity for the item.
// It is undefined behavior to send a sequence number that has previously been sent.
func (seq *Channel[T]) Add(ctx context.Context, sequenceNumber uint64, item *T) error {

View File

@ -147,3 +147,49 @@ func TestSequencerOutOfOrder(t *testing.T) {
//t.Logf("seq.Next %d", i)
}
}
func TestSequencerOutOfOrderReserve(t *testing.T) {
ctx := context.Background()
seq := New[uint64](0, 10)
ch := make(chan uint64)
go func() {
for i := range uint64(10_000) {
ch <- i
}
}()
ch2 := make(chan uint64)
for job := range 4 {
go func() {
for i := range ch {
time.Sleep(mathrand.N(11 * time.Duration(job+1) * time.Millisecond))
err := seq.Reserve(ctx, i)
if err != nil {
panic(fmt.Sprintf("%d: seq.Reserve returned unexpected error %v", i, err))
}
ch2 <- i
}
}()
}
for range 4 {
go func() {
for i := range ch2 {
time.Sleep(mathrand.N(7 * time.Millisecond))
t.Logf("seq.Add %d", i)
err := seq.Add(ctx, i, &i)
if err != nil {
panic(fmt.Sprintf("%d: seq.Add returned unexpected error %v", i, err))
}
}
}()
}
for i := range uint64(10_000) {
next, err := seq.Next(ctx)
if err != nil {
t.Fatalf("%d: seq.Next returned unexpected error %v", i, err)
}
if *next != i {
t.Fatalf("%d: got unexpected value %d", i, *next)
}
t.Logf("seq.Next %d", i)
}
}

1
staticcheck.conf Normal file
View File

@ -0,0 +1 @@
checks = ["inherit", "-ST1005", "-S1002"]