RSS in action

Let's take the concept of Rich Site Summary / Really Simple Syndication (RSS) and inject some real potential delays to identify where we can best utilize goroutines in an effort to speed up execution and prevent blocking code. One of the most common ways to bring real-life, potentially blocking elements into your code is to involve network transmission.

This is also a great place to look at timeouts and closing channels to ensure that our program doesn't fall apart if something takes too long.

To accomplish both these requirements, we'll build a very basic RSS reader that will simply parse through and grab the contents of three RSS feeds. We'll read each feed as well as the links it provides, and then we'll generate an SVG report of the process, available via HTTP.

Note

This is obviously an application best suited for a background task—you'll notice that each request can take a long time. However, for graphically representing a real-life process working with and without concurrency, it will work, especially with a single end user. We'll also log our steps to standard output, so be sure to take a look at your console as well.

For this example, we'll again use a third-party library, although it's entirely possible to parse RSS using Go's built-in XML package. Given the open-ended nature of XML and the specificity of RSS, we'll bypass it and use go-pkg-rss by Jim Teeuwen, available via the following go get command:

go get github.com/jteeuwen/go-pkg-rss

While this package is specifically intended as a replacement for the Google Reader product, which means that it does interval-based polling for new content within a set collection of sources, it also has a fairly neat and tidy RSS reading implementation. There are a few other RSS parsing libraries out there, though, so feel free to experiment.
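Before we wire the package into our larger program, it may help to see its fetch-and-callback flow in isolation. The following is a minimal, standalone sketch rather than part of the chapter's application; the five-minute cache timeout is an arbitrary choice, and the handler bodies are stubs we'll flesh out shortly:

package main

import (
  "fmt"
  "os"

  rss "github.com/jteeuwen/go-pkg-rss"
)

// rss.New requires both a channel handler and an item handler;
// here they simply log what arrives.
func chanHandler(feed *rss.Feed, newchannels []*rss.Channel) {
  fmt.Println("Found", len(newchannels), "channels in", feed.Url)
}

func itemHandler(feed *rss.Feed, ch *rss.Channel, newitems []*rss.Item) {
  fmt.Println("Found", len(newitems), "items in", feed.Url)
}

func main() {
  // 5 is the cache timeout in minutes; true enforces the cache limit.
  feed := rss.New(5, true, chanHandler, itemHandler)
  if err := feed.Fetch("https://groups.google.com/forum/feed/golang-nuts/msgs/rss_v2_0.xml?num=50", nil); err != nil {
    fmt.Fprintln(os.Stderr, "fetch error:", err)
  }
}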

An RSS reader with self-diagnostics

Let's take a look at what we've learned so far, and use it to fetch and parse a set of RSS feeds concurrently while returning some visual feedback about the process in an internal web browser, as shown in the following code:

package main

import (
  "fmt"
  "log"
  "net/http"
  "os"
  "runtime"
  "strconv"
  "sync"
  "time"

  "github.com/ajstarks/svgo"
  rss "github.com/jteeuwen/go-pkg-rss"
)

type Feed struct {
  url string
  status int
  itemCount int
  complete bool
  itemsComplete bool
  index int
}

Here is the basis of our feed's overall structure: we have a url field that represents the feed's location, a status field that indicates whether retrieval has started, and a complete Boolean that indicates it has finished. The next piece is an individual FeedItem; here's how it can be laid out:

type FeedItem struct {
  feedIndex int
  complete bool
  url string
}

Meanwhile, we won't do much with individual items; at this point, we simply maintain each item's URL, whether it has been read, and the index of its parent Feed struct.

var feeds []Feed
var height int
var width int
var colors []string
var startTime int64
var timeout int
var feedSpace int

var wg sync.WaitGroup

func grabFeed(feed *Feed, feedChan chan bool, osvg *svg.SVG) {

  startGrab := time.Now().Unix()
  startGrabSeconds := startGrab - startTime

  fmt.Println("Grabbing feed", feed.url, "at", startGrabSeconds, "second mark")

  if feed.status == 0 {
    fmt.Println("Feed not yet read")
    feed.status = 1

    startX := int(startGrabSeconds * 33)
    startY := feedSpace * (feed.index)

    fmt.Println(startY)
    wg.Add(1)

    rssFeed := rss.New(timeout, true, channelHandler, itemsHandler)

    if err := rssFeed.Fetch(feed.url, nil); err != nil {
      fmt.Fprintf(os.Stderr, "[e] %s: %s", feed.url, err)
      return
    } else {
      endSec := time.Now().Unix()
      endX := int(endSec - startGrab)
      if endX == 0 {
        endX = 1
      }
      fmt.Println("Read feed in", endX, "seconds")
      osvg.Rect(startX, startY, endX, feedSpace, "fill:#000000;opacity:.4")
      wg.Wait()

      endGrab := time.Now().Unix()
      endGrabSeconds := endGrab - startTime
      feedEndX := int(endGrabSeconds * 33)

      osvg.Rect(feedEndX, startY, 1, feedSpace, "fill:#ff0000;opacity:.9")

      feedChan <- true
    }

  } else if feed.status == 1 {
    fmt.Println("Feed already in progress")
  }

}

The grabFeed() function directly controls the flow of grabbing any individual feed. Its status check prevents the same feed from being fetched twice, while the WaitGroup struct holds it until every item has been handled. Next, let's check out the channelHandler and itemsHandler functions:

// channelHandler is required by rss.New, but we have no
// per-channel work to do here, so it remains empty.
func channelHandler(feed *rss.Feed, newchannels []*rss.Channel) {

}

func itemsHandler(feed *rss.Feed, ch *rss.Channel, newitems []*rss.Item) {

  fmt.Println("Found", len(newitems), "items in", feed.Url)

  for i := range newitems {
    url := *newitems[i].Guid
    fmt.Println(url)
  }

  // Signal that this feed's items have all been handled.
  wg.Done()
}

The itemsHandler function doesn't do much at this point, other than logging the item count and printing each item's GUID URL; in the real world, we'd take this as the next step and retrieve the contents of the items themselves. Our next step is to look at the process that grabs individual feeds and marks the time taken for each one, as follows:

func getRSS(rw http.ResponseWriter, req *http.Request) {
  startTime = time.Now().Unix()
  rw.Header().Set("Content-Type", "image/svg+xml")
  outputSVG := svg.New(rw)
  outputSVG.Start(width, height)

  feedSpace = (height - 20) / len(feeds)

  // Draw the timeline: second labels every 1000 ticks, with dots
  // and vertical gridlines in between.
  for i := 0; i < 30000; i++ {
    timeText := strconv.FormatInt(int64(i/10), 10)
    if i%1000 == 0 {
      outputSVG.Text(i/30, 390, timeText, "text-anchor:middle;font-size:10px;fill:#000000")
    } else if i%4 == 0 {
      outputSVG.Circle(i, 377, 1, "fill:#cccccc;stroke:none")
    }

    if i%10 == 0 {
      outputSVG.Rect(i, 0, 1, 400, "fill:#dddddd")
    }
    if i%50 == 0 {
      outputSVG.Rect(i, 0, 1, 400, "fill:#cccccc")
    }
  }

  feedChan := make(chan bool, 3)

  for i := range feeds {
    outputSVG.Rect(0, (i * feedSpace), width, feedSpace, "fill:"+colors[i]+";stroke:none;")
    feeds[i].status = 0
    go grabFeed(&feeds[i], feedChan, outputSVG)
    <-feedChan
  }

  outputSVG.End()
}

Here, we retrieve the RSS feed and mark points on our SVG with the status of our retrieval and read events. Our main() function will primarily handle the setup of feeds, as follows:

func main() {

  runtime.GOMAXPROCS(2)

  timeout = 1000

  width = 1000
  height = 400

  feeds = append(feeds, Feed{
    index:         0,
    url:           "https://groups.google.com/forum/feed/golang-nuts/msgs/rss_v2_0.xml?num=50",
    status:        0,
    itemCount:     0,
    complete:      false,
    itemsComplete: false,
  })
  feeds = append(feeds, Feed{
    index:         1,
    url:           "http://www.reddit.com/r/golang/.rss",
    status:        0,
    itemCount:     0,
    complete:      false,
    itemsComplete: false,
  })
  feeds = append(feeds, Feed{
    index:         2,
    url:           "https://groups.google.com/forum/feed/golang-dev/msgs/rss_v2_0.xml?num=50",
    status:        0,
    itemCount:     0,
    complete:      false,
    itemsComplete: false,
  })

Next, we populate our slice of colors, one per feed:

  colors = append(colors,"#ff9999")
  colors = append(colors,"#99ff99")
  colors = append(colors,"#9999ff")  

In the print version, these colors may not be particularly useful, but testing on your system will allow you to distinguish between feeds inside the application. We'll need an HTTP route to act as an endpoint; here's how we'll set that up:

  http.Handle("/getrss", http.HandlerFunc(getRSS))
    err := http.ListenAndServe(":1900", nil)
    if err != nil {
        log.Fatal("ListenAndServe:", err)
    }  
}

When run, you should see the start and duration of the RSS feed retrieval and parsing, followed by a thin line indicating that the feed has been parsed and all items read.

Each of the three blocks expresses the full time to process each feed, demonstrating the nonconcurrent execution of this version, as shown in the following screenshot:

[Figure: An RSS reader with self-diagnostics—the SVG timeline rendered in the browser]

Note that we don't do anything interesting with the feed items; we simply read the URL. The next step would be to grab the items via HTTP, as shown in the following code snippet:

  url := *newitems[i].Guid
  response, err := http.Get(url)
  if err != nil {
    // handle the failed request
  }
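Here is a hedged sketch of how that next step might look inside itemsHandler, assuming each GUID holds a fetchable URL and that net/http and io/ioutil are added to our imports; the nil check guards against items that carry no GUID at all:

  for i := range newitems {
    if newitems[i].Guid == nil {
      continue
    }
    url := *newitems[i].Guid

    response, err := http.Get(url)
    if err != nil {
      fmt.Fprintf(os.Stderr, "[e] %s: %s\n", url, err)
      continue
    }

    // Read and discard the body for now; a real reader would
    // parse or store it.
    body, err := ioutil.ReadAll(response.Body)
    response.Body.Close()
    if err != nil {
      fmt.Fprintf(os.Stderr, "[e] %s: %s\n", url, err)
      continue
    }
    fmt.Println("Read", len(body), "bytes from", url)
  }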

With this example, we stop at every step to provide some sort of feedback to the SVG that some event has occurred. Our channel here is buffered with a capacity of three, so up to three sends can complete without a matching receive, and our loop performs one receive per feed before moving on, as shown in the following code snippet:

  feedChan := make(chan bool, 3)

  for i := range feeds {
    outputSVG.Rect(0, (i * feedSpace), width, feedSpace, "fill:"+colors[i]+";stroke:none;")
    feeds[i].status = 0
    go grabFeed(&feeds[i], feedChan, outputSVG)
    <-feedChan
  }

  outputSVG.End()

By giving 3 as the second parameter in our channel invocation, we give the channel room for three responses—one per feed—so no send has to wait on a receive. You should use caution with this, though, particularly in setting things explicitly as we have done here. What if one of the goroutines never sent a Boolean across the channel? The corresponding receive would block forever, hanging the handler.
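One defensive pattern—sketched here as an assumption rather than this chapter's final solution—is to pair each receive with a deadline via select and time.After, so a silent goroutine costs us at most a fixed wait:

  for i := range feeds {
    go grabFeed(&feeds[i], feedChan, outputSVG)

    select {
    case <-feedChan:
      // The goroutine reported back; move on to the next feed.
    case <-time.After(60 * time.Second):
      // Hypothetical cutoff: skip this feed instead of blocking
      // the handler forever.
      fmt.Println("Timed out waiting on feed", feeds[i].url)
    }
  }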

Note that we also increased our timeline here, from 800ms to 60 seconds, to allow for retrieval of all feeds. Keep in mind that if our script exceeds 60 seconds, all actions beyond that time will occur outside of this visual timeline representation.

By implementing the WaitGroup struct while reading feeds, we impose some serialization and synchronization on the application. The second feed will not start until the first feed has completed retrieving all URLs. You can probably see where this might introduce some errors going forward:

    wg.Add(1)
    rssFeed := rss.New(timeout, true, channelHandler, itemsHandler)
    …
    wg.Wait()

This tells our application to yield until we call Done() from the itemsHandler() function.
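If the Add(), Done(), and Wait() relationship is new to you, the following standalone sketch—separate from our RSS code—shows the same pattern at its smallest:

package main

import (
  "fmt"
  "sync"
)

func main() {
  var wg sync.WaitGroup

  wg.Add(1) // declare one unit of outstanding work
  go func() {
    fmt.Println("items handled")
    wg.Done() // mark that unit as finished
  }()

  wg.Wait() // block until the counter returns to zero
  fmt.Println("feed complete")
}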

So what happens if we remove WaitGroups entirely? Given that the calls to grab the feed items are asynchronous, we may not see the status of all of our RSS calls; instead, we might see just one or two feeds or no feed at all.

Imposing a timeout

So what happens if nothing runs within our timeline? As you might expect, we'll get three bars with no activity in them. It's important to consider how to kill processes that aren't doing what we expect them to. In this case, the best method is a timeout. The Get method in the http package does not natively support a timeout, so you'll have to roll your own rssFeed.Fetch (and underlying http.Get()) implementation if you want to prevent these requests from running in perpetuity and killing your application. We'll dig into this a bit later; in the meantime, take a look at the Transport struct, available in the core http package at http://golang.org/pkg/net/http/#Transport.
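As a preview, here is one way such a timeout could be assembled from the Transport struct referenced above; the five-second limit is an arbitrary figure for illustration, and note that it bounds only the connection attempt, not the full request:

package main

import (
  "fmt"
  "net"
  "net/http"
  "time"
)

func main() {
  // A Transport whose Dial function gives up if a connection
  // cannot be established within five seconds.
  transport := &http.Transport{
    Dial: func(network, addr string) (net.Conn, error) {
      return net.DialTimeout(network, addr, 5*time.Second)
    },
  }
  client := &http.Client{Transport: transport}

  response, err := client.Get("http://www.reddit.com/r/golang/.rss")
  if err != nil {
    fmt.Println("request failed or timed out:", err)
    return
  }
  defer response.Body.Close()
  fmt.Println("status:", response.Status)
}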
