Consuming Server Sent Events in C#

Hey Guys!

I made a little example of what consuming events from the API might look like in C#. My C# is a little rusty, so I’m sure there are better ways to do this, and this example won’t do things like reconnect when the connection ends, etc.

Thanks,
David

4 Likes

Great, just what I needed :smile:

I am currently trying to get it working with JSON data, but that’s still a work in progress (It is starting to work tho :smiley:)

However, I have 1 problem. This command gives me an error:

((HttpWebRequest)request.AllowReadStreamBuffering) = false;

According to C# AllowReadStreamBuffering is missing a reference eventhough I am using .Net 4.5 (in which AllowReadStreamBuffering is included). I currently commented this out but this gives me all previously posted lines again if a new line is posted (which adds up quickly). Does anybody know how to fix this? (google isn’t helping :confused: )

Regards,
Indra

Hmm, It might want this instead (note the moved parens ()'s)

((HttpWebRequest)request).AllowReadStreamBuffering = false;
1 Like

Yes this gets accepted by c# but it doesn’t solve the reposting issue;
To demonstrate what I mean:

In this picture is the SSE stream of my spark core. I made exactly 3 adjustments, first I made R 255, then G en then B. As you can see, every time a new SSE comes in and gets written in the console, the previous lines are added again. Setting AllowReadStreamBuffering to false doesn’t solve this somehow.

Where are these previous values stored so I can delete them?

Hi @TheHawk1337,

Can you post your code? I think the code I posted would have different log output, so I’m guessing you’ve modified it a bit, so it’d hard for me to know what’s running exactly.

Thanks!
David

Yes ofcourse, please note that I used Json to send my data and I deserialize it in c#

using System;
using System.IO;
using System.Text;
using System.Collections.Generic;
using Newtonsoft.Json;
using System.Net;

public class RawSparkData
{
    public string data { get; set; }
    public string ttl { get; set; }
    public string published_at { get; set; }
    public string coreid { get; set; }
}

public class ParsedSparkData
{
    public int R { get; set; }
    public int G { get; set; }
    public int B { get; set; }
}


public class SSEvent
{
    public string Name { get; set; }
    public string Data { get; set; }
}



public class Application
{
    public static List<string> Queue = new List<string>(1024);


    public static void Main()
    {
        Console.Write("Hello World\n");
        Console.Write("Attempting to open stream\n");

        var response = Application.OpenSSEStream("https://api.spark.io/v1/devices/DEVICE ID/events?access_token=ACCES TOKEN");
        Console.Write("Success! \n");
    }


    public static Stream OpenSSEStream(string url)
    {
        /*
            Optionally ignore certificate errors
            ServicePointManager.ServerCertificateValidationCallback =
             new System.Net.Security.RemoteCertificateValidationCallback(AcceptAllCertifications);
        */


        var request = WebRequest.Create(new Uri(url));
        ((HttpWebRequest)request).AllowReadStreamBuffering = false;
        var response = request.GetResponse();
        var stream = response.GetResponseStream();

        Application.ReadStreamForever(stream);

        return stream;
    }

    public static void ReadStreamForever(Stream stream)
    {
        var encoder = new UTF8Encoding();
        var buffer = new byte[2048];
        while (true)
        {
            //TODO: Better evented handling of the response stream

            if (stream.CanRead)
            {
                int len = stream.Read(buffer, 0, 2048);
                if (len > 0)
                {
                    var text = encoder.GetString(buffer, 0, len);
                    Application.Push(text);
                }
            }
            //System.Threading.Thread.Sleep(250);
        }
    }

    public static void Push(string text)
    {
        if (String.IsNullOrWhiteSpace(text))
        {
            return;
        }

        var lines = text.Trim().Split('\n');
        Application.Queue.AddRange(lines);

        if (text.Contains("data:"))
        {
            Application.ProcessLines();
        }
    }

    public static void ProcessLines()
    {
        var lines = Application.Queue;

        SSEvent lastEvent = null;
        int index = 0;

        for (int i = 0; i < lines.Count; i++)
        {
            var line = lines[i];
            if (String.IsNullOrWhiteSpace(line))
            {
                continue;
            }
            line = line.Trim();

            if (line.StartsWith("event:"))
            {
                lastEvent = new SSEvent()
                {
                    Name = line.Replace("event:", String.Empty)
                };
            }
            else if (line.StartsWith("data:"))
            {
                if (lastEvent == null)
                {
                    continue;
                }


                lastEvent.Data = line.Replace("data:", String.Empty);

                var rawData = JsonConvert.DeserializeObject(lastEvent.Data);

                string json = Convert.ToString(rawData);

                //Console.WriteLine(json);

                RawSparkData rawdata = JsonConvert.DeserializeObject<RawSparkData>(json);


                //Console.WriteLine(rawdata.data);

                ParsedSparkData parseddata = JsonConvert.DeserializeObject<ParsedSparkData>(rawdata.data);

                int kleurR = parseddata.R;
                int kleurG = parseddata.G;
                int kleurB = parseddata.B;

                Console.WriteLine("R = " + kleurR + ", G = " + kleurG + ", B = " + kleurB);

                


                //Console.WriteLine("Found event: " + index);
                //Console.WriteLine("Name was: " + lastEvent.Name);
                //Console.WriteLine("Data was: " + lastEvent.Data);

                index++;
            }
        }
    }

    /*
        Optionally ignore certificate errors
 
    */
    public bool AcceptALlCertifications(object sender,
        System.Security.Cryptography.X509Certificates.X509Certificate cert,
        System.Security.Cryptography.X509Certificates.X509Chain chain,
        System.Net.Security.SslPolicyErrors errors)
    {
        return true;
    }
}
1 Like

Oh okay, I think I see my bug. I updated the gist with some logic around “lastEventIdx”. NOTE! I haven’t tested this locally, but the general idea is keeping track of the end of the last event data you found, and then trimming that off the queue so it doesn’t get output again and again.

I hope this helps!
Thanks,
David

2 Likes

Works amazing now, you’re the best David :smile:

Thanks a lot! :smile:

2 Likes