afex / hystrix-go
Netflix's Hystrix latency and fault tolerance library, for Go
License: MIT License
output:
fallback failed with '{"id":"user.srv","code":500,"detail":"error","status":"Internal Server Error"}'. run error was 'hystrix: timeout'
How can I get the original error?
Here is the issue when I do this:
func FetchReq() (ids []string, err error) {
	err = hystrix.Do("FetchA", func() error {
		for _, id := range fetchRequest() {
			ids = append(ids, fmt.Sprintf("%d", id))
		}
		return nil
	}, nil)
	if len(ids) == 0 {
		return nil, errors.New("No response")
	}
	return ids, nil
}
If there is a timeout in fetchRequest, we get a panic in the for block saying that ids is nil.
It is similar to this problem:
https://play.golang.org/p/5LEjvISGuE4
I think there is another way to handle this error on timeout, by exiting when the timeout fires:
https://play.golang.org/p/PRsgLmUMSsQ
The error can be traced to this line:
https://github.com/afex/hystrix-go/blob/master/hystrix/hystrix.go#L205
We are seeing a pattern that indicates a likely goroutine leak. When there are downstream errors and the circuit is opened for any given command, after a while a lot of goroutines get created and never closed (pprof output attached). A quick glance at the code suggests that they are created for every command's CircuitBreaker. But we ran a few tests where only a single command is used, and the goroutine count created by metricExchange.Monitor and poolMetrics.Monitor still seemed to be increasing according to the pprof output.
Test environment:
We ran around 100 RPM with no failures (i.e. the circuit was never opened). The expectation was a constant number of goroutines created by the hystrix-go lib in metricExchange.Monitor and poolMetrics.Monitor. But as can be seen in the output, the number increased.
I'm considering this for use in a production application. We use Glide for package management, and it would be very helpful if this repository had a semantic version tag so that our builds are reproducible. Without that, I'm hesitant.
I do not want to limit MaxConcurrentRequests.
Should I just set a huge number like 100000000?
Or is there a config option for it?
What if I have N database servers and have to establish a connection to the first one that works? How should I write a command that tries the first server, then the second, and so on?
In theory, when ErrorPercentThreshold is bigger than 100, the circuit should always stay closed. But there are some exceptions. If you run the code below, you will find that the circuit may still open.
package main
import (
"fmt"
"github.com/afex/hystrix-go/hystrix"
"log"
"math/rand"
"sync"
"time"
)
func main() {
f := time.Millisecond * 200
cun := 1000
num := cun * 3
hystrix.SetLogger(log.Default())
hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
Timeout: int(f.Milliseconds()),
MaxConcurrentRequests: cun,
RequestVolumeThreshold: 30,
SleepWindow: 30,
ErrorPercentThreshold: 120,
})
var lock sync.Mutex
var wg sync.WaitGroup
rand.Seed(int64(time.Now().Nanosecond()))
mm := make(map[string]int)
mark := 0
now := time.Now()
for i := 0; i < num; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := hystrix.Do("my_command", func() error {
a := rand.Intn(5)
if a < 1 {
time.Sleep(time.Millisecond * 10)
return fmt.Errorf("internal error")
} else if a >= 4 {
time.Sleep(time.Millisecond * 300)
return nil
} else {
time.Sleep(time.Millisecond * 9)
lock.Lock()
if _, found := mm["success"]; found {
mm["success"]++
} else {
mm["success"] = 1
}
lock.Unlock()
return nil
}
}, func(err error) error {
if err != nil {
lock.Lock()
if err.Error() == "hystrix: circuit open" && mark == 0 {
log.Println("circuit on at: ", time.Now().Sub(now).Milliseconds())
mark = 1
}
if _, found := mm[err.Error()]; found {
mm[err.Error()]++
} else {
mm[err.Error()] = 1
}
lock.Unlock()
} else {
panic("err is nil")
}
return nil
})
if err != nil {
fmt.Println("err: ", err)
}
}()
}
wg.Wait()
log.Println("end at: ", time.Now().Sub(now).Milliseconds())
count := 0
for key, value := range mm {
count += value
fmt.Printf("%s: %f\n", key, float32(value) / float32(num))
}
if count != num {
panic("don't match")
}
}
In https://github.com/afex/hystrix-go/blob/fa1af6a1f4f56e0e50d427fe901cd604d8c6fb8a/hystrix/metrics.go#L138, the value of errs may be larger than that of reqs because the code calculates errs later than reqs.
One simple way to fix the problem is to treat the circuit as always healthy when ErrorPercentThreshold is bigger than 100, like:
func (m *metricExchange) IsHealthy(now time.Time) bool {
errRate := getSettings(m.Name).ErrorPercentThreshold
if errRate > 100 {
return true
}
return m.ErrorPercent(now) < errRate
}
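To make the arithmetic concrete, here is a small standalone sketch. The functions errorPercent and isHealthy are stand-ins written for illustration, not the library's code, and the request and error counts are made up. It shows how an error count sampled slightly after the request count can push the computed percentage past 100, and how the guard above avoids tripping the circuit.

```go
package main

import "fmt"

// errorPercent is a stand-in for a rolling-window calculation: errors divided
// by requests, rounded to the nearest integer percent.
func errorPercent(reqs, errs float64) int {
	if reqs <= 0 {
		return 0
	}
	return int(errs/reqs*100 + 0.5)
}

// isHealthy applies the proposed guard: a threshold above 100 can never be
// legitimately reached, so the circuit is always treated as healthy.
func isHealthy(threshold int, reqs, errs float64) bool {
	if threshold > 100 {
		return true
	}
	return errorPercent(reqs, errs) < threshold
}

func main() {
	// Hypothetical sample: errs was summed after reqs and saw three extra failures.
	reqs, errs := 10.0, 13.0

	fmt.Println(errorPercent(reqs, errs))       // 130
	fmt.Println(errorPercent(reqs, errs) < 120) // false: the unguarded check trips
	fmt.Println(isHealthy(120, reqs, errs))     // true: the guarded check does not
}
```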
Hi!
If a function times out, ala:
https://github.com/afex/hystrix-go/blob/master/hystrix/hystrix.go#L160-L165
Then this would block, causing a leaked goroutine:
https://github.com/afex/hystrix-go/blob/master/hystrix/hystrix.go#L97
Yeah?
Currently, when running hystrix.Do or hystrix.Go, there is no built-in context cancellation during a timeout. For example, when making a dynamo query:
err := hystrix.Do(HystrixCommand, func() error {
results, err := this.ddb.QueryWithContext(ctx, query)
return err
}, nil)
If hystrix times out here, the dynamo query will not be cancelled (since the context is not cancelled) and may stay open forever. I had a case recently, where I reached my dynamo read capacity and this query stayed open for > 80 seconds.
It is possible to add my own context cancellation:
ctx, cancel := context.WithCancel(ctx)
err := hystrix.Do(HystrixCommand, func() error {
results, err := this.ddb.QueryWithContext(ctx, query)
return err
}, func(err error) error {
cancel()
return nil
})
But I think it should be built into the library:
err := hystrix.Do(ctx, HystrixCommand, func(ctx context.Context) error {
results, err := this.ddb.QueryWithContext(ctx, query)
return err
}, nil)
Where the context passed in to hystrix.Do is wrapped with context.WithCancel(ctx), passed into the runFunc, and then the cancel func is called when there is an error/timeout.
I have a requirement to execute some functionality, such as reconnecting to a back-end system, as soon as the circuit opens.
I looked at the code to see whether any such functionality is available, but unfortunately there is nothing like a callback on state change.
Do you have any plans to introduce a callback on state change, other than the circuit state change logs?
I will be happy to raise a PR adding the callback functionality.
I'm using hystrix-go in go-kit, as one of the circuit breaker implementations. Thank you for your good work!
On a recent CI run, I noticed an error. I've reproduced the unit test as a single executable. Consider this program. When I checkout revision be7b59e, I get this output:
ugh ~/tmp/hystr rm -rf $GOPATH/pkg/darwin_amd64/github.com/afex/hystrix-go ; go run example.go
priming with 40 successful requests
switching to errors...
now the next few requests should give us our error: kaboom
got expected error 1
got expected error 2
the circuit should have opened by now
hystrix-go: opening circuit my-endpoint
got expected error: hystrix: circuit open
got expected error: hystrix: circuit open
got expected error: hystrix: circuit open
got expected error: hystrix: circuit open
got expected error: hystrix: circuit open
everything works as expected
When I checkout revision 5b79165, I get this output:
ugh ~/tmp/hystr rm -rf $GOPATH/pkg/darwin_amd64/github.com/afex/hystrix-go ; go run example.go
priming with 40 successful requests
switching to errors...
now the next few requests should give us our error: kaboom
got expected error 1
got expected error 2
the circuit should have opened by now
got unexpected error at request 1: kaboom
exit status 1
If there's a bug in my unit test, I suspect it's to do with the shouldPass statement. Can you find a problem, or is this a regression?
Feature request: Need a separate treatment of errors returned by the hystrix command due to illegal arguments or non-system failures, similar to HystrixBadRequestException in the Java version of hystrix
Workaround:
- [...] badRequests before engaging hystrix.Go
- [...] badRequests instead of returning them
- [...] badRequests in addition to those returned by hystrix.Go
Proposal:
- [...] badRequest in hystrix
- [...] error to a type that implements both error and badRequest interfaces so that the commands will return it instead of the original error
- badRequest errors neither count against the failure metrics nor trigger fallback logic
- [...] hystrix.Go execution, user's code can pull all errors from the same channel and if necessary, can treat badRequest errors separately
Are you planning on implementing this? If not, I'm happy to take a look at it. Thanks.
I'd like to start using the project, but it seems that it is no longer being maintained.
Does anyone have information about that, or know of a fork that people are contributing to?
When looking at logs it would be nice to note what package created the error.
CircuitBreaker.IsOpen() is not read-only, so calling it before hystrix.Go or hystrix.Do may change the circuit status. There is some smelly code in this function, for example:
if !circuit.metrics.IsHealthy(time.Now()) {
// too many failures, open the circuit
circuit.setOpen()
return true
}
I am new to hystrix, but shouldn't there be a hard timeout limit? I read the code: when time is up it just returns a timeout error, but the real job keeps running in another goroutine and may never be stopped. Am I right about this? Should it be solved?
hystrix.Go("test_name", func() error {
for{
log.Println("hello hystrix")
time.Sleep(time.Microsecond * 100)
}
return errors.New("test return err")
}, func(e error) error {
log.Println("ops~ got err!", e)
return errors.New("second error")
})
Is this project still maintained? Although there are several passing pull requests, there has not been a commit in 9 months.
hystrix.ConfigureCommand(key, apiConfig.HystrixConfig) sets the config at init. How can it be updated at runtime?
Hi,
Sample code:
payload io.Reader) (bool, int) {
//build request
request, err := http.NewRequest(method, url, payload)
if err != nil {
log.Printf("failed in request build %s %s \n", url, err.Error())
return false, 0
}
//make request
response, err := h.HttpExecute(request)
if err != nil {
log.Println("HttpCommand=Error URL=", url, " Error=", err)
return false, 0
} else {
log.Println("HttpCommand=Success response=", response, " error=", err)
io.Copy(ioutil.Discard, response.Body)
defer response.Body.Close()
return true, response.StatusCode
}
}
func (h *HTTPSink) HttpExecute(input *http.Request) (response *http.Response, err error){
if err := hystrix.Do("http", func() (err error) {
response, err = h.client.Do(input)
return err
}, nil); err != nil {
return nil, err
}
return response, nil
}
In the above code, response, err := h.HttpExecute(request) behaves non-deterministically. While running in debug mode, err comes back as nil although the destination server failed (4xx), but when the same line is executed a second time we get the correct error.
First invocation result
Second invocation result
Is it supposed to use so much memory? Are there any ways to cut it down?
We have a need to check if a circuit is open. Is there an easy way to do that? Also, we want to send some status to our metrics server and the only way I can think of is to hack the HTTP streamer. An example here (to stdout):
// This is used to output the Hystrix stream to stdout and only used for debugging
// circuit stats
type outputResponseStdout struct {
HeaderMap http.Header
}
func (o *outputResponseStdout) Header() http.Header {
m := o.HeaderMap
if m == nil {
m = make(http.Header)
o.HeaderMap = m
}
return m
}
func (o *outputResponseStdout) Write(buf []byte) (int, error) {
return os.Stdout.Write(buf)
}
func (o *outputResponseStdout) WriteHeader(c int) {
fmt.Println("HTTP code: ", c)
}
func OutputHystrixEvents() {
s := hystrix.NewStreamHandler()
s.Start()
rh := &outputResponseStdout{}
req, err := http.NewRequest("GET", "", nil)
if err != nil {
return
}
s.ServeHTTP(rh, req)
}
With the current "Build and test" instructions in the readme:
- [...] vagrant up because the https://github.com/smartystreets/assertions package now needs Go 1.13 (for errors.Is) but the VM provides Go 1.9
- [...] go test ./... instructions don't work because /go is root:root but the VM guest user is vagrant:vagrant, so the go tool can't download the dependencies
- [...] such chown -R vagrant:vagrant /go the go tool downloads successfully, but the build fails because of the https://github.com/smartystreets/assertions dependency on Go 1.13 again.
Given the following code, hystrix will permanently hang a goroutine:
errors := hystrix.Go("foo", func() error {
fmt.Println("Just checking if success/failure")
return nil
}, nil)
err := <- errors
return err
hystrix-go/hystrix/pool_metrics.go
Line 45 in fa1af6a
My question is : why lock here?
From my point of view, there is only one goroutine that executes this block of code. Also, the element in channel won't be changed once pushed to channel.
There is currently a race condition caused by closing the chan error returned by hystrix.Go. Despite hystrix.Go having no errors, receives on the error channel can succeed, hiding receives on output channels.
Below is a simple example to illustrate:
out := make(chan bool, 1)
errs := hystrix.Go("command", func() error {
out <- true
return nil
}, nil)
select {
case x := <-out:
// this sometimes won't execute
case err := <-errs:
// this can execute, despite there being no errors
}
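The underlying Go behavior can be shown without the library: a receive on a closed channel succeeds immediately with the zero value. The sketch below, with a hypothetical helper firstResult, shows one way a caller might guard against that by checking the second receive value and disabling the case once the channel is closed. It assumes an output value eventually arrives; if none does, the loop blocks.

```go
package main

import "fmt"

// firstResult waits for either an output value or a real error. When errs is
// found closed, the case is disabled by setting the local channel to nil,
// since a nil channel is never ready in a select.
func firstResult(out <-chan bool, errs <-chan error) (bool, error) {
	for {
		select {
		case x := <-out:
			return x, nil
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			return false, err
		}
	}
}

func main() {
	out := make(chan bool, 1)
	errs := make(chan error)
	out <- true
	close(errs) // closed without any error having been sent

	err, ok := <-errs
	fmt.Println(err, ok) // <nil> false

	fmt.Println(firstResult(out, errs)) // true <nil>
}
```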
Question,
newbie to hystrix. will look in greater detail.
Is this a complete port? How far along is it?
Thanks.
package hystrix
import (
"sync"
"time"
)
var (
// DefaultTimeout is how long to wait for command to complete, in milliseconds
DefaultTimeout = 2000
// DefaultMaxConcurrent is how many commands of the same type can run at the same time
DefaultMaxConcurrent = 100
// DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health
DefaultVolumeThreshold = 20
// DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery
DefaultSleepWindow = 5000
// DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests
DefaultErrorPercentThreshold = 50
)
type Settings struct {
Timeout time.Duration
MaxConcurrentRequests int
RequestVolumeThreshold uint64
SleepWindow time.Duration
ErrorPercentThreshold int
}
// CommandConfig is used to tune circuit settings at runtime
type CommandConfig struct {
Timeout int `json:"timeout"`
MaxConcurrentRequests int `json:"max_concurrent_requests"`
RequestVolumeThreshold int `json:"request_volume_threshold"`
SleepWindow int `json:"sleep_window"`
ErrorPercentThreshold int `json:"error_percent_threshold"`
}
var circuitSettings map[string]*Settings
var settingsMutex *sync.RWMutex
var globalSettings = &Settings{
Timeout: time.Duration(DefaultTimeout) * time.Millisecond,
MaxConcurrentRequests: DefaultMaxConcurrent,
RequestVolumeThreshold: uint64(DefaultVolumeThreshold),
SleepWindow: time.Duration(DefaultSleepWindow) * time.Millisecond,
ErrorPercentThreshold: DefaultErrorPercentThreshold,
}
func ConfigureGlobal(config *CommandConfig) {
settingsMutex.Lock()
defer settingsMutex.Unlock()
if config.Timeout != 0 {
globalSettings.Timeout = time.Duration(config.Timeout) * time.Millisecond
}
if config.MaxConcurrentRequests != 0 {
globalSettings.MaxConcurrentRequests = config.MaxConcurrentRequests
}
if config.RequestVolumeThreshold != 0 {
globalSettings.RequestVolumeThreshold = uint64(config.RequestVolumeThreshold)
}
if config.SleepWindow != 0 {
globalSettings.SleepWindow = time.Duration(config.SleepWindow) * time.Millisecond
}
if config.ErrorPercentThreshold != 0 {
globalSettings.ErrorPercentThreshold = config.ErrorPercentThreshold
}
}
func init() {
circuitSettings = make(map[string]*Settings)
settingsMutex = &sync.RWMutex{}
}
// Configure applies settings for a set of circuits
func Configure(cmds map[string]CommandConfig) {
for k, v := range cmds {
ConfigureCommand(k, v)
}
}
// ConfigureCommand applies settings for a circuit
func ConfigureCommand(name string, config CommandConfig) {
settingsMutex.Lock()
defer settingsMutex.Unlock()
timeout := globalSettings.Timeout
if config.Timeout != 0 {
timeout = time.Duration(config.Timeout) * time.Millisecond
}
max := globalSettings.MaxConcurrentRequests
if config.MaxConcurrentRequests != 0 {
max = config.MaxConcurrentRequests
}
volume := globalSettings.RequestVolumeThreshold
if config.RequestVolumeThreshold != 0 {
volume = uint64(config.RequestVolumeThreshold)
}
sleep := globalSettings.SleepWindow
if config.SleepWindow != 0 {
sleep = time.Duration(config.SleepWindow) * time.Millisecond
}
errorPercent := globalSettings.ErrorPercentThreshold
if config.ErrorPercentThreshold != 0 {
errorPercent = config.ErrorPercentThreshold
}
circuitSettings[name] = &Settings{
Timeout: timeout,
MaxConcurrentRequests: max,
RequestVolumeThreshold: volume,
SleepWindow: sleep,
ErrorPercentThreshold: errorPercent,
}
}
func getSettings(name string) *Settings {
settingsMutex.RLock()
s, exists := circuitSettings[name]
settingsMutex.RUnlock()
if !exists {
ConfigureCommand(name, CommandConfig{})
s = getSettings(name)
}
return s
}
func GetCircuitSettings() map[string]*Settings {
copy := make(map[string]*Settings)
settingsMutex.RLock()
for key, val := range circuitSettings {
copy[key] = val
}
settingsMutex.RUnlock()
return copy
}
I found an error when Travis runs gimme 1.6.0; it works well with 1.6.1.
I tried gimme 1.6.0 on my own laptop. It didn't install Golang 1.6.0, but other versions were okay.
I want to check whether there are any plans to support Prometheus metrics export. If no one is working on it, I can try to raise a PR for it.
If the callback returns an error (without having a fallback) an error is sent here: https://github.com/afex/hystrix-go/blob/master/hystrix/hystrix.go#L96.
If a timeout in timer.C occurs, an error will be sent here: https://github.com/afex/hystrix-go/blob/master/hystrix/hystrix.go#L125
Both errors can be triggered on the same call (i.e. if a timeout occurs and later the callback returns an error without a fallback function). If the error channel is closed after the caller receives the first error, the second send hits a closed channel and panics. If this is expected behavior, should we document it here: https://github.com/afex/hystrix-go#waiting-for-output ? The workaround is not to close the channel and to let the GC collect the second error.
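Both halves of this can be reproduced with plain channels. The sketch below uses hypothetical helpers (sendAfterClose, collect) and no hystrix-go code: the first shows that a send on a closed channel panics, the second shows that a channel with room for both errors, left unclosed, accepts both sends without blocking.

```go
package main

import (
	"errors"
	"fmt"
)

// sendAfterClose reports whether sending on a closed channel panics.
func sendAfterClose() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	errs := make(chan error, 1)
	close(errs)
	errs <- errors.New("late error") // panics: send on closed channel
	return false
}

// collect models one call that reports both a timeout and a late run error.
// The channel has room for both and is never closed, so neither send blocks
// or panics; the unread value is reclaimed together with the channel.
func collect() error {
	errs := make(chan error, 2)
	errs <- errors.New("timeout")
	errs <- errors.New("run failed")
	return <-errs
}

func main() {
	fmt.Println(sendAfterClose()) // true
	fmt.Println(collect())        // timeout
}
```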
This is not a bug but more of a query. I couldn't find any other channel, so I am posting it here.
Can you please explain the difference between DefaultVolumeThreshold and DefaultErrorPercentThreshold? What is the time window used for both?
Let's say I have set DefaultVolumeThreshold to 10, DefaultErrorPercentThreshold to 5, and a rolling time window of 1 second. If all 6 requests in that second fail, will the circuit open because it crosses DefaultErrorPercentThreshold? What is the behavior the other way around, if I set DefaultVolumeThreshold to 5 and DefaultErrorPercentThreshold to 10 and all 6 requests fail? Will the circuit open in this case?
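The two scenarios can be worked through with a small model. The function shouldOpen below is an assumption based on how the settings are documented on this page (the volume threshold is the minimum number of requests before a circuit can be tripped, and a circuit is healthy while the error percentage stays below the error threshold); it is not the library's code and it ignores the rolling window itself.

```go
package main

import "fmt"

// shouldOpen is an assumed model of the decision: the circuit can only trip
// once the window holds at least volumeThreshold requests, and it trips when
// the error percentage is not below errorPercentThreshold.
func shouldOpen(requests, failures, volumeThreshold, errorPercentThreshold int) bool {
	if requests == 0 || requests < volumeThreshold {
		return false // not enough traffic to judge health
	}
	errorPercent := failures * 100 / requests
	return errorPercent >= errorPercentThreshold
}

func main() {
	// Volume threshold 10, error threshold 5%, 6 of 6 requests failed.
	fmt.Println(shouldOpen(6, 6, 10, 5)) // false: only 6 requests, below the volume threshold

	// Volume threshold 5, error threshold 10%, 6 of 6 requests failed.
	fmt.Println(shouldOpen(6, 6, 5, 10)) // true: enough requests and 100% errors
}
```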
When I register a metric collector, I want to know which command name the metric data in the callback belongs to, but the Update func carries no name info.
I am trying to figure out whether, using hystrix-go, I can serve the hystrix event stream on the same port as the app server. Currently, if I give the same port value to both, the server fails to start.
One way I can think of is to create an API on the server with the route /hystrix.stream and run it as a normal API. What I am not sure about is how to handle the request. I got this idea from some Java projects, but the hystrix library for Java provides functions for handling that request.
Has anyone figured this out before? Thanks.
In order to calibrate our circuits and roll them out safely, we'd like to run circuits in a way that does not actually block requests, but just logs circuit metrics like request times, number of concurrent requests, and error rate (basically any metric that could trigger timeouts or the circuit opening). The idea would be to look at these metrics and set reasonable thresholds.
AFAICT, it's fairly difficult to get these metrics and log them. The two options I see are:
- plugins, e.g. write/register something very similar to DefaultMetricCollector and log metrics like error rate. This isn't a complete solution, though, since the number of concurrent requests lives in the executorPool, which AFAICT is independent of the metric collectors.
- Serve up the SSE port, and in the same app set up a client to the SSE port that logs circuit metrics. This would provide all of the metrics that could trigger circuits opening, but seems needlessly complex...
Is there any other option? If not, is there any change to hystrix-go that you'd be open to accepting to accommodate the use case we have in mind? Thanks!
https://github.com/afex/hystrix-go/blob/master/hystrix/metrics.go#L57
Not sure that this was intended, but it looks like a fallback-success event would result in the error incrementing.
I am currently running into an issue where I want the run function for a hystrix command to be returned from another function so I can use it multiple times within the package. I created a named type, type fooFunc func() error, to supply to the hystrix Do and Go commands, but my program will not compile, complaining: cannot use f (type fooFunc) as type hystrix.runFunc in argument to hystrix.Do.
If runFunc were exported, I would be able to use it as a return type in my package and reuse my functions as necessary.
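A possible workaround that does not require exporting the type, shown with local stand-ins: Go refuses to assign one named function type to another, but a value of the unnamed type func() error is assignable to either. Converting explicitly, or having the helper return func() error in the first place, should therefore compile. Here runFunc and do are stand-ins for the library's unexported type and for hystrix.Do; newFoo is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// runFunc is a local stand-in for the library's unexported run function type.
type runFunc func() error

// fooFunc is the caller's own named function type.
type fooFunc func() error

// do stands in for a library function that accepts a runFunc.
func do(run runFunc) error { return run() }

// newFoo returns a reusable run function as the named type fooFunc.
func newFoo(msg string) fooFunc {
	return func() error { return errors.New(msg) }
}

func main() {
	f := newFoo("kaboom")

	// do(f) does not compile: fooFunc and runFunc are distinct named types.
	// Converting to the unnamed type func() error makes the value assignable.
	err := do((func() error)(f))
	fmt.Println(err) // kaboom
}
```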