Detecting file changes

Now we need to focus on our file listener. You may recall this is the part of the application that will accept client connections from our web server and our backup application and announce any changes to files.

The basic flow of this part is as follows:

  1. Listen for changes to files in a goroutine.
  2. Accept connections and add to the pool in a goroutine.
  3. If any changes are detected, announce them to the entire pool.

All three happen concurrently, and the first and the third can happen without any connections in the pool, although we assume there will be a connection that is always on with both our web server and our backup application.

Another critical role the file listener will fulfill is analyzing the directory on first load and reconciling it with our data store in Couchbase. Since the Go Couchbase library handles the get, update, and add operations, we won't need any custom views. In the following code, we'll examine the file listener process and show how we listen on a folder for changes:

package main

import (
  "crypto/md5"
  "encoding/hex"
  "encoding/json"
  "fmt"
  "io"
  "io/ioutil"
  "net"
  "strings"
  "time"

  "github.com/couchbaselabs/go-couchbase"
  "github.com/howeyc/fsnotify"
)

var listenFolder = "mnt/sharedir"

type Client struct {
  ID int
  Connection *net.Conn  
}

Here, we've declared our shared folder as well as a connecting Client struct. In this application, Client is either a web listener or a backup listener, and we'll pass messages in one direction using the following JSON-encoded structure:

type File struct {
  Hash             string `json:"hash"`
  Name             string `json:"file_name"`
  Created          int64  `json:"created"`
  CreatedUser      int    `json:"created_user"`
  LastModified     int64  `json:"last_modified"`
  LastModifiedUser int    `json:"last_modified_user"`
  Revisions        int    `json:"revisions"`
  Version          int    `json:"version"`
}

If this looks familiar, it could be because it's also the example document format we set up initially.

Note

If you're not familiar with the syntactical sugar expressed earlier, these are known as struct tags. A tag is just a piece of additional metadata that can be applied to a struct field for key/value lookups via the reflect package. In this case, they're used to map our struct fields to JSON fields.
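To see a tag at work, here is a small sketch that reads one through the reflect package and then lets encoding/json apply it. The sample struct and the jsonKey and encode helpers are made up for this illustration. Note that encoding/json only honors tags written in the key:"value" form, conventionally inside backquotes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// sample is a hypothetical struct used only for this illustration.
type sample struct {
	Name string `json:"file_name"`
}

// jsonKey returns the json tag on the named field of sample,
// or an empty string if the field does not exist.
func jsonKey(field string) string {
	f, ok := reflect.TypeOf(sample{}).FieldByName(field)
	if !ok {
		return ""
	}
	return f.Tag.Get("json")
}

// encode marshals a sample and returns the JSON text.
func encode(s sample) string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(jsonKey("Name"))                   // file_name
	fmt.Println(encode(sample{Name: "notes.txt"})) // {"file_name":"notes.txt"}
}
```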

Let's first look at our overall Message struct:

type Message struct {
  Hash     string `json:"hash"`
  Action   string `json:"action"`
  Location string `json:"location"`
  Name     string `json:"name"`
  Version  int    `json:"version"`
}

We compartmentalize our file into a message, which alerts our other two processes of changes. Each message refers to its file by a hash, which we generate from the filename:

func generateHash(name string) string {

  hash := md5.New()
  io.WriteString(hash,name)
  hashString := hex.EncodeToString(hash.Sum(nil))

  return hashString
}

This is a somewhat unreliable method to generate a hash reference to a file and will fail if a filename changes. However, it allows us to keep track of files that are created, deleted, or modified.
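The trade-off is easy to demonstrate. In this sketch, nameHash is a hypothetical copy of the same name-only MD5 approach, and the filenames are invented: the key is stable for as long as the name is, and a rename yields an unrelated key.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
)

// nameHash hashes only the filename, never the file's contents.
func nameHash(name string) string {
	h := md5.New()
	io.WriteString(h, name)
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	before := nameHash("report.txt")
	after := nameHash("report-final.txt")

	fmt.Println(before == nameHash("report.txt")) // true: same name, same key
	fmt.Println(before == after)                  // false: a rename loses the old key
}
```

Because the contents never enter the hash, editing a file leaves its key unchanged, which is what lets a modification be matched to the existing record.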

Sending changes to clients

Here is the broadcast message that goes to all existing connections. We pass along our JSON-encoded Message struct with the current version, the current location, and the hash for reference. Our other servers will then react accordingly:

func alertServers(hash string, name string, action string, location string, version int) {

  msg := Message{Hash: hash, Action: action, Location: location, Name: name, Version: version}
  msgJSON, err := json.Marshal(msg)
  if err != nil {
    fmt.Println("Could not encode message", err)
    return
  }

  fmt.Println(string(msgJSON))

  // Send one JSON message per line to every connected client.
  for i := range Clients {
    fmt.Println("Sending to clients")
    fmt.Fprintln(*Clients[i].Connection, string(msgJSON))
  }
}

Our backup server will create a copy of that file with the .[VERSION] extension in the backup folder.

Our web server will simply alert the user via our web interface that the file has changed. Before either can receive anything, the listener has to accept their connections:

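func startServer(listener net.Listener) {
  for {
    conn, err := listener.Accept()
    if err != nil {
      // Skip failed attempts rather than registering an unusable connection.
      fmt.Println("Could not accept connection", err)
      continue
    }
    // Number clients in the order they connect.
    currentClient := Client{ID: len(Clients) + 1, Connection: &conn}
    Clients = append(Clients, currentClient)
    for i := range Clients {
      fmt.Println("Client", Clients[i].ID)
    }
  }
}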

Does this code look familiar? We've taken almost our exact chat server Client handler and brought it over here nearly intact. Next is the function that handles deleted files:

func removeFile(name string, bucket *couchbase.Bucket) {
  bucket.Delete(generateHash(name))
}

The removeFile function does one thing only and that's removing the file from our Couchbase data store. As it's reactive, we don't need to do anything on the file-server side because the file is already deleted. Also, there's no need to delete any backups, as this allows us to recover. Next, let's look at our function that updates an existing file:

func updateExistingFile(name string, bucket *couchbase.Bucket) int {
  fmt.Println(name,"updated")
  hashString := generateHash(name)
  
  thisFile := Files[hashString]
  thisFile.Hash = hashString
  thisFile.Name = name
  thisFile.Version = thisFile.Version + 1
  thisFile.LastModified = time.Now().Unix()
  Files[hashString] = thisFile
  bucket.Set(hashString,0,Files[hashString])
  return thisFile.Version
}
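The function reads the File out of the map, changes the copy, and assigns it back. Here is a minimal sketch of why that last assignment matters, using a hypothetical trimmed-down record type and bump helper: a struct stored in a map is returned by value, so changes to the copy are lost unless it is written back.

```go
package main

import "fmt"

// record is a hypothetical, trimmed-down File.
type record struct {
	Name    string
	Version int
}

// bump increments the version stored under key and returns the new value.
func bump(files map[string]record, key string) int {
	r := files[key] // r is a copy; a missing key yields the zero value
	r.Version++
	files[key] = r // write the copy back so the map sees the change
	return r.Version
}

func main() {
	files := map[string]record{"abc": {Name: "a.txt", Version: 1}}

	fmt.Println(bump(files, "abc"))     // 2
	fmt.Println(files["abc"].Version)   // 2
	fmt.Println(bump(files, "missing")) // 1
}
```

The last call shows a side effect worth knowing: a key that was never stored starts from the zero value, so its first bump reports version 1. updateExistingFile follows the same pattern before saving the result with bucket.Set.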

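This function essentially overwrites any values in Couchbase with new ones, copying an existing File struct, incrementing its Version, and changing the LastModified date. Both removeFile and updateExistingFile are called from evalFile, which inspects each filesystem event: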

func evalFile(event *fsnotify.FileEvent, bucket *couchbase.Bucket) {
  fmt.Println(event.Name, "changed")
  // Strip the directory portion; this assumes Windows-style "\" separators.
  fileComponents := strings.Split(event.Name, "\\")
  trueFileName := fileComponents[len(fileComponents)-1]
  hashString := generateHash(trueFileName)

  if event.IsCreate() {
    updateFile(trueFileName, bucket)
    alertServers(hashString, event.Name, "CREATE", event.Name, 0)
  }
  if event.IsDelete() {
    removeFile(trueFileName, bucket)
    alertServers(hashString, event.Name, "DELETE", event.Name, 0)
  }
  if event.IsModify() {
    newVersion := updateExistingFile(trueFileName, bucket)
    fmt.Println(newVersion)
    alertServers(hashString, trueFileName, "MODIFY", event.Name, newVersion)
  }
  if event.IsRename() {
    // Renames are not handled yet.
  }
}
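evalFile only needs the last component of the event's path, and which separator that path uses depends on the platform. As a sketch, a hypothetical baseName helper (not part of the chapter's code) could accept either separator using nothing beyond the strings package:

```go
package main

import (
	"fmt"
	"strings"
)

// baseName is a hypothetical helper: it returns the part of path after the
// last "/" or "\", or the whole path when neither separator is present.
func baseName(path string) string {
	return path[strings.LastIndexAny(path, `/\`)+1:]
}

func main() {
	fmt.Println(baseName(`C:\share\notes.txt`))     // notes.txt
	fmt.Println(baseName("mnt/sharedir/notes.txt")) // notes.txt
	fmt.Println(baseName("notes.txt"))              // notes.txt
}
```

strings.LastIndexAny returns -1 when no separator is found, so adding 1 leaves the whole string in place. evalFile could call a helper like this to obtain the name it hashes.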

In evalFile, we react to any changes to the filesystem in our watched directory. We aren't reacting to renames, but you can handle those as well. Here's how we'd approach the general updateFile function:

func updateFile(name string, bucket *couchbase.Bucket) {
  thisFile := File{}
  hashString := generateHash(name)
  
  thisFile.Hash = hashString
  thisFile.Name = name
  thisFile.Created = time.Now().Unix()
  thisFile.CreatedUser = 0
  thisFile.LastModified = time.Now().Unix()
  thisFile.LastModifiedUser = 0
  thisFile.Revisions = 0
  thisFile.Version = 1

  Files[hashString] = thisFile

  checkFile := File{}
  err := bucket.Get(hashString,&checkFile)
  if err != nil {
    fmt.Println("New File Added",name)
    bucket.Set(hashString,0,thisFile)
  }
}

Checking records against Couchbase

When it comes to checking for existing records against Couchbase, we check whether a hash exists in our Couchbase bucket. If it doesn't, we create it. If it does, we do nothing. To handle shutdowns more robustly, we should also ingest existing records into our application. On startup, main() runs every file in the watched folder through this check:

var Clients []Client
var Files map[string]File

func main() {
  Files = make(map[string]File)
  endScript := make(chan bool)

  couchbaseClient, err := couchbase.Connect("http://localhost:8091/")
  if err != nil {
    fmt.Println("Error connecting to Couchbase", err)
    return
  }
  pool, err := couchbaseClient.GetPool("default")
  if err != nil {
    fmt.Println("Error getting pool", err)
    return
  }
  bucket, err := pool.GetBucket("file_manager")
  if err != nil {
    fmt.Println("Error getting bucket", err)
    return
  }

  // Reconcile the files already in the folder with Couchbase.
  files, err := ioutil.ReadDir(listenFolder)
  if err != nil {
    fmt.Println("Error reading folder", err)
  }
  for _, file := range files {
    updateFile(file.Name(), bucket)
  }

  dirSpy, err := fsnotify.NewWatcher()
  if err != nil {
    fmt.Println("Could not create watcher", err)
    return
  }
  defer dirSpy.Close()

  listener, err := net.Listen("tcp", ":9000")
  if err != nil {
    fmt.Println("Could not start server!", err)
    return
  }

  // Watch for filesystem events in one goroutine.
  go func() {
    for {
      select {
      case ev := <-dirSpy.Event:
        evalFile(ev, bucket)
      case err := <-dirSpy.Error:
        fmt.Println("error:", err)
      }
    }
  }()
  if err = dirSpy.Watch(listenFolder); err != nil {
    fmt.Println("Could not watch folder", err)
  }

  // Accept client connections in another goroutine.
  go startServer(listener)

  <-endScript
}

Finally, main() handles setting up our connections and goroutines, including a file watcher, our TCP server, and connecting to Couchbase.
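The check-then-create rule from the start of this section, together with the suggestion to ingest records that already exist, can be sketched against an in-memory stand-in for the bucket. Everything here is hypothetical: the store interface is a simplified subset invented for the example (the real bucket's Set also takes an expiry argument), and memStore and reconcile are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
)

// File is trimmed to the fields this sketch needs.
type File struct {
	Hash    string
	Name    string
	Version int
}

// store is a hypothetical, simplified view of the bucket operations.
type store interface {
	Get(key string, out *File) error
	Set(key string, f File) error
}

// memStore is an in-memory stand-in for the Couchbase bucket.
type memStore map[string]File

var errNotFound = errors.New("not found")

func (m memStore) Get(key string, out *File) error {
	f, ok := m[key]
	if !ok {
		return errNotFound
	}
	*out = f
	return nil
}

func (m memStore) Set(key string, f File) error {
	m[key] = f
	return nil
}

// reconcile returns the stored record when one exists and creates a
// version 1 record otherwise. The bool reports whether it was created.
func reconcile(s store, hash, name string) (File, bool, error) {
	var existing File
	if err := s.Get(hash, &existing); err == nil {
		return existing, false, nil
	}
	// As in updateFile, any lookup error is treated as "missing".
	fresh := File{Hash: hash, Name: name, Version: 1}
	return fresh, true, s.Set(hash, fresh)
}

func main() {
	s := memStore{"h1": {Hash: "h1", Name: "a.txt", Version: 7}}

	f, created, _ := reconcile(s, "h1", "a.txt")
	fmt.Println(f.Version, created) // 7 false

	f, created, _ = reconcile(s, "h2", "b.txt")
	fmt.Println(f.Version, created) // 1 true
}
```

Returning the stored record rather than discarding it is what would let the application pick up a file's existing version number after a restart instead of starting again from 1.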

Now, let's look at another step in the whole process where we will automatically create backups of our modified files.
