1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
use std::io::{self, BufRead, BufReader, Write};
use std::net::{Shutdown, TcpStream};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{self, Sender, Receiver};
use std::thread;

use encoding::{DecoderTrap, EncodingRef, EncoderTrap};
use std::time::Duration;

use message::{Message, ParseError};

/// This is the comprehensive set of events that can occur.
///
/// Events are produced by the background reader thread and delivered
/// through the `Reader` returned by `connect`.
#[derive(Debug)]
pub enum Event {
    /// Connection was closed for good. The string is the reason.
    ///
    /// Besides a manual close, this is also sent when reconnection is
    /// disabled or when the maximum number of attempts has been reached.
    /// The reader thread stops after sending this event.
    Closed(&'static str),
    /// Connection has dropped.
    Disconnected,
    /// Message from the IRC server.
    Message(Message),
    /// Error parsing a message from the server.
    ///
    /// This can probably be ignored, and it shouldn't ever happen, really.
    /// If you catch this you should probably open an issue on GitHub.
    ParseError(ParseError),
    /// Connection was successfully restored.
    Reconnected,
    /// Attempting to restore connection.
    ///
    /// Sent before every reconnection attempt.
    Reconnecting,
    /// An error occurred trying to restore the connection.
    ///
    /// This is normal in poor network conditions. It might take
    /// a few attempts before the connection can be restored.
    ReconnectionError(io::Error),
}

/// This is the receiving end of a `mpsc` channel carrying `Event`s.
///
/// If it is closed/dropped, the connection will also be dropped,
/// as there isn't anyone listening to the events anymore.
pub type Reader = Receiver<Event>;

/// Errors produced by the Writer.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum Error {
    /// Connection is already closed.
    ///
    /// Returned by `Writer::close` when called on a closed connection.
    AlreadyClosed,
    /// Connection is already disconnected.
    ///
    /// Returned by `Writer::disconnect` when there is no live connection.
    AlreadyDisconnected,
    /// Connection was manually closed.
    ///
    /// Returned by `Writer::disconnect` and `Writer::raw`.
    Closed,
    /// Connection was dropped.
    ///
    /// A reconnection might be in process. Returned by `Writer::raw`, either
    /// because the connection was already down or because the write failed.
    Disconnected,
}

// Connection state shared between the `Writer` clones and the reader thread.
enum StreamStatus {
    // The stream was closed manually (or given up on); this state is final
    // from the point of view of the public `Writer` API.
    Closed,
    // The stream is connected; holds the handle used for writing.
    Connected(TcpStream),
    // The stream is disconnected; depending on the reconnection settings,
    // an attempt to reconnect may be made.
    Disconnected,
}

/// Used to send messages to the IRC server.
///
/// This object is thread safe. You can clone it and send the clones to other
/// threads. You can write from multiple threads without any issue. Internally,
/// it uses `Arc` and `Mutex`.
#[derive(Clone)]
pub struct Writer {
    // Shared connection state; every clone points at the same status.
    stream: Arc<Mutex<StreamStatus>>,
    // Encoding used to turn outgoing text into bytes.
    encoding: EncodingRef,
}

impl Writer {

    fn new(stream: TcpStream, encoding: EncodingRef) -> Writer {
        Writer {
            stream: Arc::new(Mutex::new(StreamStatus::Connected(stream))),
            encoding: encoding,
        }
    }

    fn set_connected(&self, stream: TcpStream) {
        *self.stream.lock().unwrap() = StreamStatus::Connected(stream);
    }

    fn set_disconnected(&self) {
        *self.stream.lock().unwrap() = StreamStatus::Disconnected;
    }

    /// Drop the connection and trigger the reconnection process.
    ///
    /// There might be a reconnection attempt, based on your settings.
    /// This should be used if you want the connection to be re-created.
    /// This is not the preferred way of shutting down the connection
    /// for good. Use `close` for this.
    pub fn disconnect(&self) -> Result<(), Error> {
        let mut status = self.stream.lock().unwrap();

        match *status {
            StreamStatus::Closed => {
                return Err(Error::Closed);
            }
            StreamStatus::Connected(ref mut stream) => {
                let _ = stream.shutdown(Shutdown::Both);
            }
            StreamStatus::Disconnected => {
                return Err(Error::AlreadyDisconnected);
            }
        }

        *status = StreamStatus::Disconnected;
        Ok(())
    }

    /// Check if the connection was manually closed.
    pub fn is_closed(&self) -> bool {
        match *self.stream.lock().unwrap() {
            StreamStatus::Closed => true,
            _ => false,
        }
    }

    /// Close the connection and stop listening for messages.
    ///
    /// There will not be any reconnection attempt.
    /// An error will be returned if the connection is already closed.
    pub fn close(&self) -> Result<(), Error> {
        let mut status = self.stream.lock().unwrap();

        match *status {
            StreamStatus::Closed => {
                return Err(Error::AlreadyClosed);
            }
            StreamStatus::Connected(ref mut stream) => {
                let _ = stream.shutdown(Shutdown::Both);
            }
            _ => {}
        }

        *status = StreamStatus::Closed;
        Ok(())
    }

    /// Send a raw string to the IRC server.
    ///
    /// A new line will be not be added, so make sure that you include it.
    /// An error will be returned if the client is disconnected.
    pub fn raw<S: AsRef<str>>(&self, data: S) -> Result<(), Error> {
        let mut status = self.stream.lock().unwrap();
        let mut failed = false;

        match *status {
            StreamStatus::Closed => {
                return Err(Error::Closed);
            }
            StreamStatus::Connected(ref mut stream) => {
                // Try to write to the stream.
                let bytes = self.encoding.encode(data.as_ref(), EncoderTrap::Ignore).unwrap();
                if stream.write(&bytes).is_err() {
                    // The write failed, shutdown the connection.
                    let _ = stream.shutdown(Shutdown::Both);
                    failed = true;
                }
            }
            StreamStatus::Disconnected => {
                return Err(Error::Disconnected);
            }
        }

        if failed {
            // The write failed, change the status.
            *status = StreamStatus::Disconnected;
            Err(Error::Disconnected)
        } else {
            // The write did not fail.
            Ok(())
        }
    }

}

/// Turn the outcome of parsing a server line into the matching event.
///
/// A successfully parsed message becomes `Event::Message`, a failure becomes
/// `Event::ParseError`. This is implemented as `From` rather than `Into`:
/// the standard blanket implementation still provides
/// `Into<Event> for Result<Message, ParseError>`, so `.into()` keeps working.
impl From<Result<Message, ParseError>> for Event {

    fn from(result: Result<Message, ParseError>) -> Event {
        match result {
            Ok(msg) => Event::Message(msg),
            Err(err) => Event::ParseError(err),
        }
    }

}

/// These settings tell the reconnection process how to behave.
///
/// Default is implemented for this type, with fairly sensible settings.
/// See the Default trait implementation.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ReconnectionSettings {
    /// Don't try to reconnect after failure.
    ///
    /// The connection is closed as soon as it drops.
    DoNotReconnect,
    /// Try to reconnect after the connection has dropped.
    Reconnect {
        /// After trying this amount of times, it will stop trying
        /// and the connection will be closed.
        ///
        /// A value of 0 means infinite attempts.
        max_attempts: u32,
        /// Wait time between two attempts to reconnect.
        ///
        /// Note that if the computer's network is still unavailable, the connect
        /// call might block for about a minute until it fails. Sometimes, it fails
        /// instantly because it cannot resolve the hostname. You should probably
        /// leave at least a second of delay, so that it doesn't loop really fast
        /// while getting hostname resolution errors. You can watch the stream of
        /// errors via the ReconnectionError event.
        delay_between_attempts: Duration,
        /// Wait time after disconnection, before trying to reconnect.
        delay_after_disconnect: Duration,
    }
}

/// The default is to reconnect, with the following values:
///
/// `max_attempts` = 10
///
/// `delay_between_attempts` = 5 seconds
///
/// `delay_after_disconnect` = 60 seconds
impl Default for ReconnectionSettings {

    fn default() -> ReconnectionSettings {
        // Pause between two consecutive attempts.
        let retry_pause = Duration::from_secs(5);
        // Pause between losing the connection and the first attempt.
        let initial_pause = Duration::from_secs(60);

        ReconnectionSettings::Reconnect {
            max_attempts: 10,
            delay_between_attempts: retry_pause,
            delay_after_disconnect: initial_pause,
        }
    }

}

fn reconnect(address: &str, handle: &Writer) -> io::Result<(BufReader<TcpStream>)> {
    let stream = try!(TcpStream::connect(address));
    let reader = BufReader::new(try!(stream.try_clone()));
    handle.set_connected(stream);
    Ok((reader))
}

/// Body of the background thread that reads from the server.
///
/// Every line read from `reader` is decoded with `encoding`, parsed and sent
/// through `event_sender` as an `Event`. When the stream fails or ends, the
/// thread either stops (manual close, or reconnection disabled) or goes
/// through the reconnection process described by `reco_settings`, using
/// `address` to open the new connection and `handle` to update the status
/// shared with the `Writer`.
///
/// The thread stops as soon as an event cannot be sent, which means that the
/// receiving end of the channel was dropped. A manual close is also honored
/// while waiting to reconnect: it is checked before every attempt.
fn reader_thread(address: String, mut reader: BufReader<TcpStream>,
                                   event_sender: Sender<Event>, handle: Writer,
                                   reco_settings: ReconnectionSettings,
                                   encoding: EncodingRef) {
    // A single buffer is reused for every line.
    let mut buff = Vec::new();

    'read: loop {
        buff.clear();

        // An error and a zero length read are handled the same way: a size
        // of 0 means that the socket was shutdown.
        let line_received = match reader.read_until(b'\n', &mut buff) {
            Ok(size) => size > 0,
            Err(_) => false,
        };

        if line_received {
            // Decode and parse the message, send the result in the channel.
            let line = encoding.decode(&buff, DecoderTrap::Ignore).unwrap();
            if event_sender.send(Message::parse(&line).into()).is_err() {
                break 'read;
            }
            continue 'read;
        }

        // If the stream has the closed status, the stream was manually closed.
        if handle.is_closed() {
            let _ = event_sender.send(Event::Closed("manually closed"));
            break 'read;
        }

        // The stream was not closed manually: set the disconnected status on
        // the writer and see what we should do.
        handle.set_disconnected();

        if event_sender.send(Event::Disconnected).is_err() {
            break 'read;
        }

        // Grab the reconnection settings or stop if no reconnection is desired.
        let (max_attempts, delay_between_attempts, delay_after_disconnect) = match reco_settings {
            ReconnectionSettings::DoNotReconnect => {
                let _ = handle.close();
                let _ = event_sender.send(Event::Closed("do not reconnect"));
                break 'read;
            }
            ReconnectionSettings::Reconnect{ max_attempts,
                                             delay_between_attempts,
                                             delay_after_disconnect } => {
                (max_attempts, delay_between_attempts, delay_after_disconnect)
            }
        };

        thread::sleep(delay_after_disconnect);

        let mut attempts = 0u32;

        // Loop until reconnection is successful.
        'reconnect: loop {

            // The connection may have been closed manually while this thread
            // was sleeping; reconnecting now would undo that close.
            if handle.is_closed() {
                let _ = event_sender.send(Event::Closed("manually closed"));
                break 'read;
            }

            // If max_attempts is zero, it means an infinite amount of attempts.
            if max_attempts > 0 {
                attempts += 1;
                if attempts > max_attempts {
                    let _ = handle.close();
                    let _ = event_sender.send(Event::Closed("max attempts reached"));
                    break 'read;
                }
            }

            if event_sender.send(Event::Reconnecting).is_err() {
                break 'read;
            }

            // Try to reconnect.
            match reconnect(&address, &handle) {
                // Success, update reader and send event.
                Ok(new_reader) => {
                    reader = new_reader;
                    if event_sender.send(Event::Reconnected).is_err() {
                        break 'read;
                    }

                    break 'reconnect;
                }
                // Error, send event.
                Err(err) => {
                    if event_sender.send(Event::ReconnectionError(err)).is_err() {
                        break 'read;
                    }
                }
            }

            // Sleep until we try to reconnect again.
            thread::sleep(delay_between_attempts);
        }
    }

    // If we exited because an event could not be sent through the channel, the
    // stream might not have been closed cleanly. Do so if necessary.
    if !handle.is_closed() {
        let _ = handle.close();
    }
}

/// Create a connection to the given address.
///
/// A `Writer`/`Reader` pair is returned. If the connection fails,
/// an error is returned.
///
/// If you don't want to reconnect, use `ReconnectionSettings::DoNotReconnect`.
pub fn connect<A: AsRef<str>>(address: A, reco_settings: ReconnectionSettings, encoding: EncodingRef) -> io::Result<(Writer, Reader)> {
    let stream = try!(TcpStream::connect(address.as_ref()));
    let reader = BufReader::new(try!(stream.try_clone()));

    let (event_sender, event_reader) = mpsc::channel::<Event>();

    let writer = Writer::new(stream, encoding);
    // The reader thread needs a handle to modify the status.
    let reader_handle = writer.clone();

    let address_clone = address.as_ref().into();
    thread::spawn(move || {
        reader_thread(address_clone, reader, event_sender, reader_handle, reco_settings, encoding);
    });

    Ok((writer, event_reader))
}