2 * Copyright (c) 2011, 2012, 2013, 2014 Spectra Logic Corporation
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions, and the following disclaimer,
10 * without modification.
11 * 2. Redistributions in binary form must reproduce at minimum a disclaimer
12 * substantially similar to the "NO WARRANTY" disclaimer below
13 * ("Disclaimer") and any redistribution must be conditioned upon
14 * including a substantially similar Disclaimer requirement for further
15 * binary redistribution.
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
27 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGES.
30 * Authors: Justin T. Gibbs (Spectra Logic Corporation)
37 #include <sys/cdefs.h>
39 #include <sys/socket.h>
56 #include "event_factory.h"
57 #include "exception.h"
61 __FBSDID("$FreeBSD$");
63 /*================================== Macros ==================================*/
64 #define NUM_ELEMENTS(x) (sizeof(x) / sizeof(*x))
66 /*============================ Namespace Control =============================*/
71 /*============================= Class Definitions ============================*/
72 /*----------------------------- DevdCtl::Consumer ----------------------------*/
73 //- Consumer Static Private Data -----------------------------------------------
74 const char Consumer::s_devdSockPath[] = "/var/run/devd.seqpacket.pipe";
76 //- Consumer Public Methods ----------------------------------------------------
77 Consumer::Consumer(Event::BuildMethod *defBuilder,
78 EventFactory::Record *regEntries,
81 m_eventFactory(defBuilder),
82 m_replayingEvents(false)
84 m_eventFactory.UpdateRegistry(regEntries, numEntries);
93 Consumer::ConnectToDevd()
95 struct sockaddr_un devdAddr;
99 if (m_devdSockFD != -1) {
100 /* Already connected. */
101 syslog(LOG_DEBUG, "%s: Already connected.", __func__);
104 syslog(LOG_INFO, "%s: Connecting to devd.", __func__);
106 memset(&devdAddr, 0, sizeof(devdAddr));
107 devdAddr.sun_family= AF_UNIX;
108 strlcpy(devdAddr.sun_path, s_devdSockPath, sizeof(devdAddr.sun_path));
109 sLen = SUN_LEN(&devdAddr);
111 m_devdSockFD = socket(AF_UNIX, SOCK_SEQPACKET, 0);
112 if (m_devdSockFD == -1)
113 err(1, "Unable to create socket");
114 if (fcntl(m_devdSockFD, F_SETFL, O_NONBLOCK) < 0)
116 result = connect(m_devdSockFD,
117 reinterpret_cast<sockaddr *>(&devdAddr),
120 syslog(LOG_INFO, "Unable to connect to devd");
121 DisconnectFromDevd();
125 syslog(LOG_INFO, "Connection to devd successful");
130 Consumer::DisconnectFromDevd()
132 if (m_devdSockFD != -1) {
133 syslog(LOG_INFO, "Disconnecting from devd.");
140 Consumer::ReadEvent()
142 char buf[MAX_EVENT_SIZE + 1];
145 len = ::recv(m_devdSockFD, buf, MAX_EVENT_SIZE, MSG_WAITALL);
147 return (std::string(""));
149 /* NULL-terminate the result */
151 return (std::string(buf));
156 Consumer::ReplayUnconsumedEvents(bool discardUnconsumed)
158 EventList::iterator event(m_unconsumedEvents.begin());
159 bool replayed_any = (event != m_unconsumedEvents.end());
161 m_replayingEvents = true;
163 syslog(LOG_INFO, "Started replaying unconsumed events");
164 while (event != m_unconsumedEvents.end()) {
165 bool consumed((*event)->Process());
166 if (consumed || discardUnconsumed) {
168 event = m_unconsumedEvents.erase(event);
174 syslog(LOG_INFO, "Finished replaying unconsumed events");
175 m_replayingEvents = false;
179 Consumer::SaveEvent(const Event &event)
181 if (m_replayingEvents)
183 m_unconsumedEvents.push_back(event.DeepCopy());
188 Consumer::NextEvent()
197 evString = ReadEvent();
198 if (! evString.empty()) {
199 Event::TimestampEventString(evString);
200 event = Event::CreateEvent(m_eventFactory, evString);
202 } catch (const Exception &exp) {
204 DisconnectFromDevd();
209 /* Capture and process buffered events. */
211 Consumer::ProcessEvents()
214 while ((event = NextEvent()) != NULL) {
215 if (event->Process())
222 Consumer::FlushEvents()
228 while (! s.empty()) ;
232 Consumer::EventsPending()
234 struct pollfd fds[1];
238 fds->fd = m_devdSockFD;
239 fds->events = POLLIN;
241 result = poll(fds, NUM_ELEMENTS(fds), /*timeout*/0);
242 } while (result == -1 && errno == EINTR);
245 err(1, "Polling for devd events failed");
247 if ((fds->revents & POLLERR) != 0)
248 throw Exception("Consumer::EventsPending(): "
249 "POLLERR detected on devd socket.");
251 if ((fds->revents & POLLHUP) != 0)
252 throw Exception("Consumer::EventsPending(): "
253 "POLLHUP detected on devd socket.");
255 return ((fds->revents & POLLIN) != 0);
258 } // namespace DevdCtl