Ознакомьтесь с нашей политикой обработки персональных данных
  • ↓
  • ↑
  • ⇑
 
Записи с темой: multithreding (список заголовков)
12:22 

Сказ про Plink, Expect и PowerShell. Часть II: Нам нужен multithreading (продолжение)

Саша это я, да
Сказ про Plink, Expect и PowerShell. Часть II: Нам нужен multithreading (Начало)

Метод Close теперь должен выглядеть следующим образом:

  1. public void Close() {
  2. inProgress = false;
  3. //Ждем завершения треда чтения из потока вывода
  4. readPsErr.Join();
  5. //Ждем завершения треда чтения из потока ошибок
  6. readPsOut.Join();
  7. ps.StandardOutput.Close();
  8. ps.StandardInput.Close();
  9. ps.StandardError.Close();
  10. if (!ps.HasExited) {
  11. ps.Kill();
  12. }
  13. }

Теперь всё должно работать правильно. Можно потестировать класс:

  1. public void Main(string args[]) {
  2. ZhExpect plink = new ZhExpect();
  3. plink.Spawn("plink", "-ssh -l root -pw qwerty 10.1.10.1");
  4. plink.Interact("***");
  5. plink.Close();
  6. }

Следующий этап - реализация метода Expect. Его назначение - обнаруживать в выводе процесса определенную последовательность символов. Если последовательность обнаружена, метод возвращает true, если по истечении заданного таймаута ожидаемой строки не поступило, возвращается false. Прежде чем, описывать его, определимся с местом, в котором будут храниться считанные данные. Для этой цели введем переменную типа string receivedText. Как несложно догадаться, запись в нее должна вестись в тредах считывания вывода. Поскольку теперь наши треды работают с общим полем, мы должны обеспечить их синхронизацию. Можно использовать самый простой способ - заключение опасных участков кода в блок lock. Соответственно, нам потребуется добавить в класс поле объекта блокировки - locker.

  1. public class ZhExpect {
  2. private Process ps;
  3. private bool inProgress;
  4. private Thread readPsOut;
  5. private Thread readPsErr;
  6. private string receivedText = "";
  7. private object locker = new object();
  8. //...
  9. private void ReadProcess (object bs) {
  10. //...
  11. while (inProgress) {
  12. if (readResult.IsCompleted) {
  13. count = bstream.EndRead(readResult);
  14. string currentReceivedText = System.Text.Encoding.UTF8.GetString(buffer, 0, count);
  15. lock (locker) { receivedText = receivedText + currentReceivedText; }
  16. Console.Write(currentReceivedText);
  17. readResult = bstream.BeginRead(buffer, 0, buffer.Length, null, null);
  18. }
  19. Thread.Sleep(10);
  20. }
  21. }
  22. //...
  23. }

Опишем метод Expect. Чтобы отслеживать таймаут, в каждой итерации будем запрашивать системное время, а разницу между ним и моментом вызова метода будем сравнивать с параметром timeOut. Этот аргумент будет задан в секундах.

Если мы не хотим, чтобы последующие вызовы метода копались в уже проанализированной информации, должна быть реализована возможность очистки буфера receivedText. Иногда, с другой стороны, может возникнуть прямо противоположная ситуация: требуется проверить наличие искомой строки даже в том случае, если предудыщий вызов Expect что-то нашел. Для удовлетворения этим требованиям в метод Expect введем аргумент cleanBuffer. Если его значение true, то после обнаружения искомой строки буфер принятых данных будет очищен, если - false, то принятые данные сохранятся для последующих вызовов Expect.

Очистку строки принятых данных нужно производить строго внутри блока lock метода Expect, т.к. мы должны быть уверены, что данные не будут потеряны. Так, например, если мы создадим отдельный метод для очистки receivedText и будем вызывать его сразу после Expect, то возникнет вероятность потери некоторых данных, т.к. в промежутке между вызовом Expect и очисткой данных треды чтения получают возможность записать в receivedText новую информацию.

  1. public bool Expect(string ExpectingString, int timeOut, bool cleanBuffer) {
  2. //Запоминаем время запуска метода
  3. System.DateTime startTime = System.DateTime.Now;
  4. do {
  5. //Как только мы входим в блок lock, блокируются все остальные lock-секции для объекта locker
  6. lock(locker) {
  7. if (receivedText.Contains(ExpectingString)) {
  8. if (cleanBuffer) receivedText = "";
  9. return true;
  10. }
  11. }
  12. //Снизим нагрузку на процессор
  13. Thread.Sleep(10);
  14. } while ((System.DateTime.Now - startTime).Seconds < timeOut);
  15. return false;
  16. }

Последний штрих - простенький метод посылки данных в поток ввода процесса plink:

  1. public void Send(string SendString) {
  2. ps.StandardInput.Write(SendString);
  3. }

На этом описание нашего класса завершается, у нас есть все, что требуется. Можно потестить чего-нибудь:

  1. public static void Main(string[] args) {
  2. ZhExpect plink = new ZhExpect();
  3. for (int i = 23; i < 100; i++) {
  4. plink.Spawn("plink", "-ssh -l Admin -pw BestPassEver 10.0.0." + i);
  5. if (plink.Expect("/>", 5, true)) {
  6. plink.Send("sh ip confr");
  7. }
  8. plink.Interact("___");
  9. plink.Close();
  10. }
  11. }

Окончательный исходный код класса.


Note
  1. Метод Close в текущем исполнении, вообще говоря, не гарантирует корректного разъединения с сетевым устройством. Дело в том, что когда мы вручную убиваем процесс plink, пользователь может остаться залогиненым на устройстве. В некоторых случаях это может оказаться критичным. Однако если завершать работу plink посылкой ctrl-c, то сессия будет завершена корректно. Поэтому имеет смысл добавить в код Close следующую строку:
    1. //Эта строка должна быть расположена до вызова ps.StandardInput.Close()
    2. ps.StandardInput.Write("\x03");

  2. Используемая нами версия метода Expect работает далеко не идеально. Представим, что устройство в ответ на определенную команду может выдать один или несколько различных последовательностей, принадлежащих некому ограниченному множеству. если мы хотим отработать каждый возможный вариант, то нам необходимо сравнивать принимаемые данные с неким массивом строк. Причем проверка оставшегося времени для ожидания должна происходить после достижения конца массива. Таким образом, аргументом метода Expect должна являться не строка, а массив строк, а возвращаемое значение - массив типа bool.

  3. Весьма полезной может оказаться возможность логирования сессии. Я лично для этих целей использую еще одно дополнительное поле в классе Expect, в которе копируется весь вывод процесса. Это поле не обнуляется вплоть до вызова метода Close. В целом можно использовать и уже существующий receivedText.

В следующей части мы перейдем непосредственно к написанию скриптов на PowerShell.


Сказ про Plink, Expect и PowerShell. Предисловие
Сказ про Plink, Expect и PowerShell. Часть I: Простое решение
Сказ про Plink, Expect и PowerShell. Часть II: Нам нужен multithreading (Начало)
Сказ про Plink, Expect и PowerShell. Часть III: PowerShell!

@темы: синхронизация потоков, multithreding, Windows, Plink, Interact, Expect, C#, .NET Framework

12:20 

Сказ про Plink, Expect и PowerShell. Часть II: Нам нужен multithreading (Начало)

Саша это я, да
Результаты, полученные в первой части, вполне пригодны для работы и кому-нибудь их действительно хватит, однако многие, я в том числе, хотели бы получить расширенные возможности, нечто более похожее на привычный, удобный Expect. Вообще говоря, Expect предполагает анализ принимаемых от конкретного устройства данных. Например, мы должны отправить определенную команду, только если получим от конфигуируемого узла промпт определенного вида. Что нам нужно сделать, чтобы получить доступ к оправляемым устройством данным? Мы должны перенаправить поток вывода plink и использовать команды чтения из потока.

Всвязи с вышеозначенным напомню поставленную задачу

Синхронное и асинхронное чтение

Свойство StandardOutput класса Process предоставляет доступ к объекту класса System.IO.StreamReader. Рассмотрим его методы, которые могут считывать с данные с потока.
  1. Read() - синхронно считывает один символ из потока. Возвращает целочисленный код символа.
  2. Read(Char[], Int32, Int32) - синхронно считывает максимальное доступное количество символов в буфер. Возвращает количество символов, которые были считаны.
  3. ReadBlock() - блокирующая версия метода Read.
  4. ReadLine() - считывает строку символов из потока и возвращает ее.
  5. ReadToEnd() - считывает данные до конца потока, которые потом возвращает в виде строки. Нам не подходит, так как наш поток закончится только в момент завершения сессии.

В нашем распоряжении, таким образом, пока есть только 3 синхронных метода чтения. Сразу хочу заметить, что подойдёт нам, фактически, только один.ReadLine() и Read(Char[], Int32, Int32) считывают вводимые данные "пачками". Напомню, что сейчас речь идет о синхронном чтении из потока. Это означает, что обсуждаемые методы будут блокировать поток выполнения до тех пор, пока не решат, что ввод данных завершен. Проблема же кроется в том, как эти методы определяют момент завершения ввода. ReadLine() будет ожидать, пока в потоке ввода не обнаружится символ завершения строки (посылка которого инициируется нажатием enter). Например, промпт, передаваемый устройством, редко содержит символ \r. То есть он не будет выведен на консоль, пока мы сами не пошлем устройству символ возврата каретки. Проблемы будут и с выводом на экран локального ввода — если режим "echo" отключен (ReadKey(false)), то введенные нами символы будут будут отражены в окне консоли только после нажатия клавиши enter (что эквивалентно посылке \r). Если же режим "echo" включен ReadKey(true), то мы сразу увидим в консоли печатаемые нами символы, но после нажатия enter устройство отошлет нам наш же ввод, в результате чего он будет отображен повторно. Метод Read(Char[], Int32, Int32) работает иначе - он блокирует выполнение до тех пор, пока не заполнит свой буфер. При этом нет гарантии, что посланный устройством промпт заполнит буфер аккурат до краёв, и выполнение будет разблокировано. Отсюда аналогичные проблемы.

Асинхронные методы, в противовес, позволяют избежать блокировки потока. .NET Framework, начиная с версии 4.5, предусматривает в составе класса System.IO.StreamReader несколько методов асинхронного чтения (ReadAsync, ReadBlockAsync, ReadLineAsync, ReadToEndAsync), но мы не будем их рассматривать, поскольку в XP, который не совместим с версией 4.5, их использовать не получиться. Но не отчаивайтесь, есть и другой способ асинхронного чтения.

Чтобы считывать из потока асинхронно, нужно сначала получить с помощью свойства BaseStream объекта StreamReader объект типа Stream и позже использовать уже его методы BeginRead и EndRead. Причем после каждого BeginRead должен быть обязательно вызван EndRead. Подробности рассмотрим чуть позже.

Переход на C#

Следует заметить, что обойтись одним только PowerShell'ом у нас не получится. Давайте попробуем разобраться, почему организовать адекватную работу с перенаправленными потоками средствами PoSh весьма сложно.
  1. Чтение и запись могут выполняться только в едином потоке, поэтому методы синхронного чтения всегда намертво будут блокировать выполнение основного потока, пока не получат данные. Следовательно, перед тем, как начать выполнять метод синхронного чтения, нужно удостовериться, что на выходе процесса есть какие-то данные, иначе выполнение заблокируется навсегда. Хорошего способа реализовать такую проверку я не нашел. Например, рекомендуемый для этой цели Peek() сам может заблокировать выполнение.
  2. Методы асинхронного чтения без проблем начнут операцию чтения и сразу после этого вернут управление. Однако операция завершения асинхронного чтения и получение считанных данных возможны только в том случае, если опять-таки было считано хоть что-то. Вообще говоря, для асинхронного чтения есть эффективный способ проверки этого условия, но даже при его использовании добиться адекватного вывода на консоль довольно сложно (если это вообще возможно - у меня так и не получилось).
Кроме того, заметим, что свои служебные сообщения plink передает в стандартный поток ошибок. В качестве примера можно привести запрос на добавление сетевого узла в список доверенных при подключении через ssh. Т.е. поток ошибок тоже желательно перенаправить, что влечет за собой дополнительные трудности.

Самый очевидный выход из ситуации - это использование multithreading'а (многопоточности). Гораздо проще написать библиотеку на C#, средствами которой каждый поток ввода/вывода будет существовать в отдельном потоке выполнения (чтобы избежать путаницы далее вместо "потоки выполнения" я буду употреблять "треды";). Что касается выбора между асинхронным и синхронным чтением, то это вопрос личных предпочтений. Как показала практика асинхронный метод оказался более сложным в исполнении, но также и несколько более эффективным, чем синхронное чтение каждого символа по отдельности.

Я буду описывать класс постепенно, добавляя в него необходимые элементы. Для начала создадим простой и бесполезный класс, который просто перенаправляет потоки ввода, вывода и ошибок процесса. Данные из потоков вывода и ошибок он будет выводить на экран, а ввод забирать с клавиатуры. Мы опишем в классе всего одно поле, четыре метода и конструктор.

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Threading;
  5. using System.IO;
  6. namespace Zh
  7. {
  8. public class ZhExpect
  9. {
  10. private Process ps;
  11. //В конструкторе создаем экземпляр класса Process и описываем StartInfo для него точно
  12. //так же, как мы делали это ранее (см.).
  13. public ZhExpect() {
  14. ps = new Process();
  15. ps.StartInfo.UseShellExecute = false;
  16. ps.StartInfo.RedirectStandardError = true;
  17. ps.StartInfo.RedirectStandardInput = true;
  18. ps.StartInfo.RedirectStandardOutput = true;
  19. }
  20. //Запуск процесса и создание двух дополнительных тредов будет осуществляться в методе Spawn.
  21. public void Spawn(String fileName, String arguments) {
  22. ps.StartInfo.Arguments = arguments;
  23. ps.StartInfo.FileName = fileName;
  24. ps.Start();
  25. new Thread(ReadProcess).Start(ps.StandardOutput.BaseStream);
  26. new Thread(ReadProcess).Start(ps.StandardError.BaseStream);
  27. }

ReadProcess - это метод, ссылка на который хранится в параметрезированном делегате, который указывается в конструкторе Thread. Именно код метода ReadProcess будет отрабатываться в каждом отдельном треде. Так как сигнатура делегата предполагает единственный параметр типа object, аргумент в ReadProcess должен иметь именно такой тип, а уже в процессе выполнения будет производиться приведение к типу Stream. В качестве параметра методу будет передаваться значение свойства BaseStream стандартных потоков вывода и ошибок, так как именно этот объект предоставляет средства для асинхронного считывания. Само асинхронное чтение осуществляется в два этапа: сначала для объекта Stream вызывается метод BeginRead. В качестве аргументов метод принимает буфер данных типа byte[], индекс начальной позиции для чтения и максимальное количество байтов, которые он может считать (есть еще два параметра, но их рассмотрение мы опустим и на их место будет передавать null). BeginRead возвращает объект System.IAsyncResult, который используется для определения состояния процесса асинхронного чтения. Саму проверку можно осуществлять с помощью его свойства IsCompleted. Если значение свойства true, то асинхронная операция выполнена, и мы можем завершить чтение вызовом метода Stream.EndRead, который возвращает число считанных байтов. Теперь эти данные становятся доступны через указанный ранее буфер. Прежде чем вывести их на экран, переконвертируем их в строковый тип методом System.Text.Encoding.UTF8.GetString(buffer, 0, count), где buffer - наш буфер с данными, count - число считанных байт. Вот так это выглядит в коде:

  1. private void ReadProcess (object bs) {
  2. //Приведение типа
  3. Stream bstream = (Stream) bs;
  4. //Буффер, в котором будут сохраняться считанные данные
  5. byte[] buffer = new byte[1000];
  6. //Начало асинхронного чтения
  7. System.IAsyncResult readResult = bstream.BeginRead(buffer, 0, buffer.Length, null, null);
  8. //Переменная, в которой будет хранится число считанных байт
  9. int count;
  10. //Рабочий цикл, существует пока процесс не завершится
  11. while (!ps.HasExited) {
  12. //Если данные считаны, завершаем чтение
  13. if (readResult.IsCompleted) {
  14. //Получение количество считанных байт
  15. count = bstream.EndRead(readResult);
  16. //Конвертация и вывод данных на экран
  17. Console.Write(System.Text.Encoding.UTF8.GetString(buffer, 0, count));
  18. //Повторный запуск асинхронного чтения
  19. readResult = bstream.BeginRead(buffer, 0, buffer.Length, null, null);
  20. }
  21. //Снизим нагрузку на процессор
  22. Thread.Sleep(10);
  23. }
  24. }

В языке Expect для перехода режим пользовательского управления используется функция Interact. Создадим метод с аналогичным назначением и названием. Он будет принимать один аргумент, представляющий собой строку текста, при введении которой будет произведен выход из режима Interact.

  1. public void Interact(string stopString) {
  2. StreamWriter psStreamWriter = ps.StandardInput;
  3. System.ConsoleKeyInfo key;
  4. String str = "";
  5. while(!ps.HasExited) {
  6. key = Console.ReadKey(true);
  7. //Посылаем спецсимволы, чтобы работали нажатия на клавиши со стрелками
  8. if (key.Key.Equals(System.ConsoleKey.UpArrow)) psStreamWriter.Write("x1b[A");
  9. else if (key.Key.Equals(System.ConsoleKey.DownArrow)) psStreamWriter.Write("x1b[B");
  10. else if (key.Key.Equals(System.ConsoleKey.LeftArrow)) psStreamWriter.Write("x1b[D");
  11. else if (key.Key.Equals(System.ConsoleKey.RightArrow)) psStreamWriter.Write("x1b[C");
  12. //В противном случае, если был введен любой другой символ, дописываем его в строку, с которой
  13. //будем сравнивать stopString
  14. else {
  15. str = str + key.KeyChar;
  16. //Записываем в поток ввода введенный символ
  17. psStreamWriter.Write(key.KeyChar);
  18. }
  19. //Как правило, строка, определяющая выход из режима Interact, представляет собой набор символов,
  20. //который не может быть интерпретирован оборудованием, поэтому при обнаружении этой последовательности
  21. //мы запишем в поток также специальные символы \b, количество которых равно длинне stopString.
  22. //Обычно это эквивалентно стиранию введенных символов.
  23. if (str.Contains(stopString)) {
  24. for (int i = 0; i < stopString.Length-1; i++) {
  25. psStreamWriter.Write("b");
  26. }
  27. break;
  28. }
  29. }
  30. }

За освобождение ресурсов после закрытия соединения отвечает метод Close. На него возложена задача закрытия потоков и освобождения ресурсов, занятых на чтение и запись. Вызывать этот метод желательно после каждой сессии - это обеспечит стабильность в работе.

  1. public void Close() {
  2. ps.StandardOutput.Close();
  3. ps.StandardInput.Close();
  4. ps.StandardError.Close();
  5. if (!ps.HasExited) {
  6. ps.Kill();
  7. }
  8. }

Существование двух тредов для чтения вывода требует от нас некоторой аккуратности. После того как мы осовбодим ресурсы методом Close, тредам больше не с чем будет работать(нельзя считывать с закрытых потоков). Например, BeginRead будет генерировать исключение "ObjectDisposedException: Cannot access a Closed file". Таким образом, мы должны завершить треды чтения до того, как потоки будут закрыты. Для этого введем в класс поле inProgress, которое после старта процесса будет устанавливаться в true, а сразу после вызова метода Close в false. Это поле теперь будет служить условием продолжения цикла в методе ReadProcess:

  1. while (inProgress) {
  2. if (readResult.IsCompleted) {
  3. count = bstream.EndRead(readResult);
  4. Console.Write(System.Text.Encoding.UTF8.GetString(buffer, 0, count));
  5. readResult = bstream.BeginRead(buffer, 0, buffer.Length, null, null);
  6. }
  7. Thread.Sleep(10);
  8. }

Прежде чем производить операции по освобождению ресурсов в методе Close, мы должны убедиться, что оба треда чтения из потоков успешно завершены. Для этой цели нам сгодится метод класса Thread - Join(), который остановит выполнение текущего потока и будет дожидаться завершения треда, для которого был вызван. Поскольку этот метод, что естественно, не статический, нам придется добавить еще два поля типа Thread - readPsOut и readPsErr. Способ запуска потоков в методе Spawn также должен измениться:

  1. public void Spawn(String fileName, String arguments) {
  2. ps.StartInfo.Arguments = arguments;
  3. ps.StartInfo.FileName = fileName;
  4. ps.Start();
  5. readPsOut = new Thread(ReadProcess);
  6. readPsErr = new Thread(ReadProcess);
  7. inProgress = true;
  8. readPsOut.Start(ps.StandardOutput.BaseStream);
  9. readPsErr.Start(ps.StandardError.BaseStream);
  10. }



Сказ про Plink, Expect и PowerShell. Предисловие
Сказ про Plink, Expect и PowerShell. Часть I: Простое решение
Сказ про Plink, Expect и PowerShell. Часть II: Нам нужен multithreading (Продолжение)
Сказ про Plink, Expect и PowerShell. Часть III: PowerShell!

@темы: синхронизация потоков, multithreding, Windows, Plink, Interact, Expect, C#, .NET Framework

Выдох Вэйдера

главная