| 1 | // Copyright (C) 2024 The Qt Company Ltd. |
| 2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
| 3 | |
| 4 | #include "qgst_bus_observer_p.h" |
| 5 | |
| 6 | QT_BEGIN_NAMESPACE |
| 7 | |
| 8 | QGstBusObserver::QGstBusObserver(QGstBusHandle bus) |
| 9 | : QGstBusHandle{ |
| 10 | std::move(bus), |
| 11 | } |
| 12 | { |
| 13 | if (!get()) |
| 14 | return; |
| 15 | |
| 16 | GPollFD pollFd{}; |
| 17 | gst_bus_get_pollfd(bus: get(), fd: &pollFd); |
| 18 | Q_ASSERT(pollFd.fd); |
| 19 | |
| 20 | #ifndef Q_OS_WIN |
| 21 | m_socketNotifier.setSocket(pollFd.fd); |
| 22 | |
| 23 | QObject::connect(sender: &m_socketNotifier, signal: &QSocketNotifier::activated, context: &m_socketNotifier, |
| 24 | slot: [this](QSocketDescriptor, QSocketNotifier::Type) { |
| 25 | this->processAllPendingMessages(); |
| 26 | }); |
| 27 | |
| 28 | m_socketNotifier.setEnabled(true); |
| 29 | #else |
| 30 | m_socketNotifier.setHandle(reinterpret_cast<Qt::HANDLE>(pollFd.fd)); |
| 31 | |
| 32 | QObject::connect(&m_socketNotifier, &QWinEventNotifier::activated, &m_socketNotifier, |
| 33 | [this](QWinEventNotifier::HANDLE) { |
| 34 | this->processAllPendingMessages(); |
| 35 | }); |
| 36 | m_socketNotifier.setEnabled(true); |
| 37 | #endif |
| 38 | |
| 39 | } |
| 40 | |
| 41 | QGstBusObserver::~QGstBusObserver() |
| 42 | { |
| 43 | close(); |
| 44 | } |
| 45 | |
| 46 | void QGstBusObserver::close() |
| 47 | { |
| 48 | if (!get()) |
| 49 | return; |
| 50 | |
| 51 | QGstBusHandle::reset(); |
| 52 | } |
| 53 | |
| 54 | void QGstBusObserver::installMessageFilter(QGstreamerBusMessageFilter *filter) |
| 55 | { |
| 56 | Q_ASSERT(filter); |
| 57 | if (!busFilters.contains(t: filter)) |
| 58 | busFilters.append(t: filter); |
| 59 | } |
| 60 | |
| 61 | void QGstBusObserver::removeMessageFilter(QGstreamerBusMessageFilter *filter) |
| 62 | { |
| 63 | Q_ASSERT(filter); |
| 64 | busFilters.removeAll(t: filter); |
| 65 | } |
| 66 | |
| 67 | bool QGstBusObserver::processNextPendingMessage(GstMessageType type, |
| 68 | std::optional<std::chrono::nanoseconds> timeout) |
| 69 | { |
| 70 | if (!get()) |
| 71 | return false; |
| 72 | |
| 73 | GstClockTime gstTimeout = [&]() -> GstClockTime { |
| 74 | if (!timeout) |
| 75 | return GST_CLOCK_TIME_NONE; // block forever |
| 76 | return timeout->count(); |
| 77 | }(); |
| 78 | |
| 79 | QGstreamerMessage message{ |
| 80 | gst_bus_timed_pop_filtered(bus: get(), timeout: gstTimeout, types: type), |
| 81 | QGstreamerMessage::HasRef, |
| 82 | }; |
| 83 | if (!message) |
| 84 | return false; |
| 85 | |
| 86 | for (QGstreamerBusMessageFilter *filter : std::as_const(t&: busFilters)) { |
| 87 | if (filter->processBusMessage(message)) |
| 88 | break; |
| 89 | } |
| 90 | |
| 91 | return true; |
| 92 | } |
| 93 | |
| 94 | bool QGstBusObserver::currentThreadIsNotifierThread() const |
| 95 | { |
| 96 | return m_socketNotifier.thread()->isCurrentThread(); |
| 97 | } |
| 98 | |
| 99 | void QGstBusObserver::processAllPendingMessages() |
| 100 | { |
| 101 | for (;;) { |
| 102 | bool messageHandled = processNextPendingMessage(type: GST_MESSAGE_ANY, timeout: std::chrono::nanoseconds{ 0 }); |
| 103 | |
| 104 | if (!messageHandled) |
| 105 | return; |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | QT_END_NAMESPACE |
| 110 | |