Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
timvaillancourt committed Aug 15, 2024
2 parents 653a951 + 48cb9ab commit 598c2f0
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 25 deletions.
4 changes: 0 additions & 4 deletions doc/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,6 @@ List of metrics and threshold values; topping the threshold of any will cause th

Typically `gh-ost` is used to migrate tables on a master. If you wish to only perform the migration in full on a replica, connect `gh-ost` to said replica and pass `--migrate-on-replica`. `gh-ost` will briefly connect to the master but otherwise will make no changes on the master. Migration will be fully executed on the replica, while making sure to maintain a small replication lag.

### mysql-wait-timeout

If set to a value greater than zero, causes `gh-ost` to set a provided [MySQL `wait_timeout`](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_wait_timeout) for MySQL sessions opened by `gh-ost`, specified in seconds.

### postpone-cut-over-flag-file

Indicate a file name, such that the final [cut-over](cut-over.md) step does not take place as long as the file exists.
Expand Down
1 change: 1 addition & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type MigrationContext struct {
Hostname string
AssumeMasterHostname string
ApplierTimeZone string
ApplierWaitTimeout int64
TableEngine string
RowsEstimate int64
RowsDeltaEstimate int64
Expand Down
1 change: 0 additions & 1 deletion go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func main() {
flag.StringVar(&migrationContext.AssumeMasterHostname, "assume-master-host", "", "(optional) explicitly tell gh-ost the identity of the master. Format: some.host.com[:port] This is useful in master-master setups where you wish to pick an explicit master, or in a tungsten-replicator where gh-ost is unable to determine the master")
flag.IntVar(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)")
flag.Float64Var(&migrationContext.InspectorConnectionConfig.Timeout, "mysql-timeout", 0.0, "Connect, read and write timeout for MySQL")
flag.Float64Var(&migrationContext.InspectorConnectionConfig.WaitTimeout, "mysql-wait-timeout", 0.0, "wait_timeout for MySQL sessions")
flag.StringVar(&migrationContext.CliUser, "user", "", "MySQL user")
flag.StringVar(&migrationContext.CliPassword, "password", "", "MySQL password")
flag.StringVar(&migrationContext.CliMasterUser, "master-user", "", "MySQL user on master, if different from that on replica. Requires --assume-master-host")
Expand Down
40 changes: 35 additions & 5 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (this *Applier) InitDBConnections() (err error) {
return err
}
this.migrationContext.ApplierMySQLVersion = version
if err := this.validateAndReadTimeZone(); err != nil {
if err := this.validateAndReadGlobalVariables(); err != nil {
return err
}
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
Expand All @@ -106,10 +106,13 @@ func (this *Applier) InitDBConnections() (err error) {
return nil
}

// validateAndReadTimeZone potentially reads server time-zone
func (this *Applier) validateAndReadTimeZone() error {
query := `select /* gh-ost */ @@global.time_zone`
if err := this.db.QueryRow(query).Scan(&this.migrationContext.ApplierTimeZone); err != nil {
// validateAndReadGlobalVariables potentially reads server global variables, such as the time_zone and wait_timeout.
func (this *Applier) validateAndReadGlobalVariables() error {
query := `select /* gh-ost */ @@global.time_zone, @@global.wait_timeout`
if err := this.db.QueryRow(query).Scan(
&this.migrationContext.ApplierTimeZone,
&this.migrationContext.ApplierWaitTimeout,
); err != nil {
return err
}

Expand Down Expand Up @@ -934,6 +937,27 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
return nil
}

// InitAtomicCutOverWaitTimeout sets the cut-over session wait_timeout in order to reduce the
// time an unresponsive (but still connected) gh-ost process can hold the cut-over lock.
func (this *Applier) InitAtomicCutOverWaitTimeout(tx *gosql.Tx) error {
cutOverWaitTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 3
this.migrationContext.Log.Infof("Setting cut-over idle timeout as %d seconds", cutOverWaitTimeoutSeconds)
query := fmt.Sprintf(`set /* gh-ost */ session wait_timeout:=%d`, cutOverWaitTimeoutSeconds)
_, err := tx.Exec(query)
return err
}

// RevertAtomicCutOverWaitTimeout restores the original wait_timeout for the applier session post-cut-over.
func (this *Applier) RevertAtomicCutOverWaitTimeout() {
this.migrationContext.Log.Infof("Reverting cut-over idle timeout to %d seconds", this.migrationContext.ApplierWaitTimeout)
query := fmt.Sprintf(`set /* gh-ost */ session wait_timeout:=%d`, this.migrationContext.ApplierWaitTimeout)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
this.migrationContext.Log.Errorf("Failed to restore applier wait_timeout to %d seconds: %v",
this.migrationContext.ApplierWaitTimeout, err,
)
}
}

// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
tx, err := this.db.Begin()
Expand Down Expand Up @@ -979,6 +1003,12 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
return err
}

if err := this.InitAtomicCutOverWaitTimeout(tx); err != nil {
tableLocked <- err
return err
}
defer this.RevertAtomicCutOverWaitTimeout()

query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
Expand Down
14 changes: 9 additions & 5 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,16 @@ func (this *Migrator) Migrate() (err error) {
// In MySQL 8.0 (and possibly earlier) some DDL statements can be applied instantly.
// Attempt to do this if AttemptInstantDDL is set.
if this.migrationContext.AttemptInstantDDL {
this.migrationContext.Log.Infof("Attempting to execute alter with ALGORITHM=INSTANT")
if err := this.applier.AttemptInstantDDL(); err == nil {
this.migrationContext.Log.Infof("Success! table %s.%s migrated instantly", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
if this.migrationContext.Noop {
this.migrationContext.Log.Debugf("Noop operation; not really attempting instant DDL")
} else {
this.migrationContext.Log.Infof("ALGORITHM=INSTANT not supported for this operation, proceeding with original algorithm: %s", err)
this.migrationContext.Log.Infof("Attempting to execute alter with ALGORITHM=INSTANT")
if err := this.applier.AttemptInstantDDL(); err == nil {
this.migrationContext.Log.Infof("Success! table %s.%s migrated instantly", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
} else {
this.migrationContext.Log.Infof("ALGORITHM=INSTANT not supported for this operation, proceeding with original algorithm: %s", err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/logic/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
return setThrottle(false, "", base.NoThrottleReasonHint)
}

// initiateThrottlerMetrics initiates the various processes that collect measurements
// initiateThrottlerCollection initiates the various processes that collect measurements
// that may affect throttling. There are several components, all running independently,
// that collect such metrics.
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type BinlogCoordinates struct {
EventSize int64
}

// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
// ParseBinlogCoordinates will parse an InstanceKey from a string representation such as 127.0.0.1:3306
func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) {
tokens := strings.SplitN(logFileLogPos, ":", 2)
if len(tokens) != 2 {
Expand Down
5 changes: 0 additions & 5 deletions go/mysql/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type ConnectionConfig struct {
Timeout float64
TransactionIsolation string
Charset string
WaitTimeout float64
}

func NewConnectionConfig() *ConnectionConfig {
Expand All @@ -52,7 +51,6 @@ func (this *ConnectionConfig) DuplicateCredentials(key InstanceKey) *ConnectionC
Timeout: this.Timeout,
TransactionIsolation: this.TransactionIsolation,
Charset: this.Charset,
WaitTimeout: this.WaitTimeout,
}
config.ImpliedKey = &config.Key
return config
Expand Down Expand Up @@ -141,9 +139,6 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string {
fmt.Sprintf("readTimeout=%fs", this.Timeout),
fmt.Sprintf("writeTimeout=%fs", this.Timeout),
}
if this.WaitTimeout > 0 {
connectionParams = append(connectionParams, fmt.Sprintf("wait_timeout=%fs", this.WaitTimeout))
}

return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?%s", this.User, this.Password, hostname, this.Key.Port, databaseName, strings.Join(connectionParams, "&"))
}
4 changes: 1 addition & 3 deletions go/mysql/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func TestGetDBUri(t *testing.T) {
c.User = "gromit"
c.Password = "penguin"
c.Timeout = 1.2345
c.WaitTimeout = 0 // should be ignored
c.TransactionIsolation = transactionIsolation
c.Charset = "utf8mb4,utf8,latin1"

Expand All @@ -96,11 +95,10 @@ func TestGetDBUriWithTLSSetup(t *testing.T) {
c.User = "gromit"
c.Password = "penguin"
c.Timeout = 1.2345
c.WaitTimeout = 60
c.tlsConfig = &tls.Config{}
c.TransactionIsolation = transactionIsolation
c.Charset = "utf8mb4_general_ci,utf8_general_ci,latin1"

uri := c.GetDBUri("test")
test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4_general_ci,utf8_general_ci,latin1&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s&wait_timeout=60.000000s`)
test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&interpolateParams=true&charset=utf8mb4_general_ci,utf8_general_ci,latin1&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`)
}

0 comments on commit 598c2f0

Please sign in to comment.