Dask-SSH cluster: adding a private key

I am trying to instantiate a Dask-SSH cluster from a Jupyter Notebook. From the CLI, the following command works:

dask-ssh 10.67.22.208 10.67.22.102 10.67.22.178 10.67.22.117 10.67.22.85 --ssh-private-key ~/.ssh/my_key.pem --ssh-username ubuntu

All the nodes can indeed communicate without a password: every node is reachable using only the private key.

However, I cannot figure out the equivalent command in Python:

from dask.distributed import Client, SSHCluster

cluster = SSHCluster(["10.67.22.208", "10.67.22.102", "10.67.22.178", "10.67.22.117", "10.67.22.85"],
    connect_options={"known_hosts": "~/.ssh/known_hosts", "client_host_keys": "~/.ssh/my_key.pem", "username": "ubuntu"},
    scheduler_options={"port": ":8786", "dashboard_address": ":8787"})
client = Client(cluster)

This raises the following error:

---------------------------------------------------------------------------
HostKeyNotVerifiable                      Traceback (most recent call last)
<ipython-input-16-e82e9592ecd5> in <module>
      7 
      8 ###### INITIALIZE THE SSH CLUSTER
----> 9 cluster = SSHCluster(["10.67.22.208", "10.67.22.102", "10.67.22.178", "10.67.22.117", "10.67.22.85"],
     10     connect_options={"known_hosts": "~/.ssh/known_hosts", "client_host_keys": ".ssh/my_key.pem", "username":"ubuntu"},
     11     scheduler_options={"port": ":8786", "dashboard_address": ":8787"})

~/.local/lib/python3.8/site-packages/distributed/deploy/ssh.py in SSHCluster(hosts, connect_options, worker_options, scheduler_options, worker_module, remote_python, **kwargs)
    369         for i, host in enumerate(hosts[1:])
    370     }
--> 371     return SpecCluster(workers, scheduler, name="SSHCluster", **kwargs)

~/.local/lib/python3.8/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close)
    281         if not self.asynchronous:
    282             self._loop_runner.start()
--> 283             self.sync(self._start)
    284             self.sync(self._correct_state)
    285 

~/.local/lib/python3.8/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    190             return future
    191         else:
--> 192             return sync(self.loop, func, *args, **kwargs)
    193 
    194     def _log(self, log):

~/.local/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    352     if error[0]:
    353         typ, exc, tb = error[0]
--> 354         raise exc.with_traceback(tb)
    355     else:
    356         return result[0]

~/.local/lib/python3.8/site-packages/distributed/utils.py in f()
    335             if callback_timeout is not None:
    336                 future = asyncio.wait_for(future, callback_timeout)
--> 337             result[0] = yield future
    338         except Exception as exc:
    339             error[0] = sys.exc_info()

~/.local/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/.local/lib/python3.8/site-packages/distributed/deploy/spec.py in _start(self)
    310                 cls = import_term(cls)
    311             self.scheduler = cls(**self.scheduler_spec.get("options", {}))
--> 312             self.scheduler = await self.scheduler
    313         self.scheduler_comm = rpc(
    314             getattr(self.scheduler, "external_address", None) or self.scheduler.address,

~/.local/lib/python3.8/site-packages/distributed/deploy/spec.py in _()
     70             async with self.lock:
     71                 if self.status == Status.created:
---> 72                     await self.start()
     73                     assert self.status == Status.running
     74             return self

~/.local/lib/python3.8/site-packages/distributed/deploy/ssh.py in start(self)
    172         logger.debug("Created Scheduler Connection")
    173 
--> 174         self.connection = await asyncssh.connect(self.address, **self.connect_options)
    175 
    176         result = await self.connection.run("uname")

~/.local/lib/python3.8/site-packages/asyncssh/connection.py in connect(host, port, tunnel, family, flags, local_addr, config, options, **kwargs)
   6801                                          **kwargs)
   6802 
-> 6803     return await _connect(options, loop, flags, conn_factory,
   6804                           'Opening SSH connection to')
   6805 

~/.local/lib/python3.8/site-packages/asyncssh/connection.py in _connect(options, loop, flags, conn_factory, msg)
    301     # pylint: disable=broad-except
    302     try:
--> 303         await conn.wait_established()
    304         free_conn = False
    305 

~/.local/lib/python3.8/site-packages/asyncssh/connection.py in wait_established(self)
   2241         """Wait for connection to be established"""
   2242 
-> 2243         await self._waiter
   2244 
   2245     async def wait_closed(self):

~/.local/lib/python3.8/site-packages/asyncssh/connection.py in data_received(self, data, datatype)
   1045         # pylint: disable=broad-except
   1046         try:
-> 1047             while self._inpbuf and self._recv_handler():
   1048                 pass
   1049         except DisconnectError as exc:

~/.local/lib/python3.8/site-packages/asyncssh/connection.py in _recv_packet(self)
   1285         if not skip_reason:
   1286             try:
-> 1287                 processed = handler.process_packet(pkttype, seq, packet)
   1288             except PacketDecodeError as exc:
   1289                 raise ProtocolError(str(exc)) from None

~/.local/lib/python3.8/site-packages/asyncssh/packet.py in process_packet(self, pkttype, pktid, packet)
    213 
    214         if pkttype in self._packet_handlers:
--> 215             self._packet_handlers[pkttype](self, pkttype, pktid, packet)
    216             return True
    217         else:

~/.local/lib/python3.8/site-packages/asyncssh/kex_dh.py in _process_reply(self, _pkttype, _pktid, packet)
    250         packet.check_end()
    251 
--> 252         host_key = self._conn.validate_server_host_key(host_key_data)
    253         self._verify_reply(host_key, host_key_data, sig)
    254 

~/.local/lib/python3.8/site-packages/asyncssh/connection.py in validate_server_host_key(self, key_data)
   2856                 self._peer_addr, self._port, key_data)
   2857         except ValueError as exc:
-> 2858             raise HostKeyNotVerifiable(str(exc)) from None
   2859 
   2860         self._server_host_key = host_key

HostKeyNotVerifiable: Host key is not trusted


Solution 1 [1]

In case someone still needs this: the hosts you are connecting to must be listed in your known_hosts file. Once they are, SSHCluster will connect to them from Python without further prompting. The following commands do this (they were run on Ubuntu 20.04):
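SSHCluster opens its connections through asyncssh (visible in the traceback above, in validate_server_host_key), and asyncssh rejects any host whose key is not in known_hosts. To see which of the cluster hosts are still missing, before or after running the commands that follow, a small check like this can help. It is a sketch under stated assumptions: it reads an OpenSSH-format known_hosts file, handles plain and hashed (`|1|salt|hash`) entries, and ignores wildcard patterns and `@cert-authority`/`@revoked` markers. The helper names `host_is_known` and `missing_hosts` are hypothetical.

```python
import base64
import hashlib
import hmac
from pathlib import Path
from typing import List, Sequence


def _field_matches(field: str, host: str) -> bool:
    """Check the host field of one known_hosts line against `host`."""
    if field.startswith("|1|"):
        # Hashed entry (HashKnownHosts): |1|base64(salt)|base64(HMAC-SHA1(salt, host))
        try:
            _, _, salt_b64, hash_b64 = field.split("|", 3)
            salt = base64.b64decode(salt_b64)
            expected = base64.b64decode(hash_b64)
        except ValueError:  # malformed entry (binascii.Error is a ValueError)
            return False
        digest = hmac.new(salt, host.encode(), hashlib.sha1).digest()
        return hmac.compare_digest(digest, expected)
    # Plain entry: comma-separated names/addresses (wildcards are not handled)
    return host in field.split(",")


def host_is_known(host: str, known_hosts: str = "~/.ssh/known_hosts", port: int = 22) -> bool:
    """Return True if `host` has an entry in the given known_hosts file."""
    # OpenSSH records non-default ports as "[host]:port"
    name = host if port == 22 else f"[{host}]:{port}"
    path = Path(known_hosts).expanduser()
    if not path.exists():
        return False
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        fields = line.split()
        if fields[0].startswith("@"):  # marker lines are skipped in this sketch
            continue
        if _field_matches(fields[0], name):
            return True
    return False


def missing_hosts(hosts: Sequence[str], known_hosts: str = "~/.ssh/known_hosts") -> List[str]:
    """List the hosts that would still fail host key verification."""
    return [h for h in hosts if not host_is_known(h, known_hosts)]
```

For example, `missing_hosts(["10.67.22.208", "10.67.22.102", "10.67.22.178", "10.67.22.117", "10.67.22.85"])` should return an empty list once every host has been added.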

ssh-keygen   # only needs to be done once

This generates a private/public key pair in the default location (under ~/.ssh/). When asked for a passphrase, leave it blank. I have not tried this with a passphrase, so I cannot comment on how that behaves.

ssh-copy-id <worker hostname or ip>   # repeat for each worker

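This copies the generated public key to the host. You will need to log in once with a password, after which the public key is added to the worker's authorized keys. On that first connection, ssh also asks you to confirm the host's fingerprint; answering yes adds the host to ~/.ssh/known_hosts, which is what resolves the HostKeyNotVerifiable error. You can then test by SSHing to each host; it should no longer ask for a password.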
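Because ssh-copy-id has to be run once per worker, a short Python loop over the same host list can save some typing. This is a hedged sketch: it only shells out to the standard `ssh-copy-id` tool, `copy_id_commands` and `run_all` are hypothetical helper names, and the `ubuntu` user in the usage line is taken from the question's CLI command. Each call may still prompt interactively for the password and for the host-key confirmation.

```python
import os
import subprocess
from typing import List, Optional, Sequence


def copy_id_commands(hosts: Sequence[str], user: Optional[str] = None,
                     identity: Optional[str] = None) -> List[List[str]]:
    """Build one ssh-copy-id command per host (nothing is executed here)."""
    commands = []
    for host in hosts:
        cmd = ["ssh-copy-id"]
        if identity is not None:
            # -i selects the key to install; ssh-copy-id appends ".pub" if it is missing
            cmd += ["-i", os.path.expanduser(identity)]
        cmd.append(f"{user}@{host}" if user else host)
        commands.append(cmd)
    return commands


def run_all(commands: Sequence[Sequence[str]], dry_run: bool = True) -> None:
    """Print each command and, unless dry_run is set, execute it."""
    for cmd in commands:
        print(" ".join(cmd))
        if not dry_run:
            # check=True stops at the first host that fails
            subprocess.run(list(cmd), check=True)
```

Usage: `run_all(copy_id_commands(["10.67.22.208", "10.67.22.102"], user="ubuntu"), dry_run=False)`. Leaving `identity` unset lets ssh-copy-id pick the default key created by ssh-keygen.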

ssh <worker hostname or ip>   # should not ask for a password
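To check every worker without being prompted, ssh can be run non-interactively with `-o BatchMode=yes`, which makes it fail instead of asking for a password or for confirmation of an unknown host key. That is close to how asyncssh behaves inside SSHCluster, so a host that passes this check is a good sign, though not a guarantee, that it will not raise HostKeyNotVerifiable. A minimal sketch; the helper names and the 5-second `ConnectTimeout` are arbitrary choices, not part of the original answer.

```python
import os
import subprocess
from typing import Dict, List, Optional, Sequence


def batch_ssh_command(host: str, user: Optional[str] = None,
                      identity: Optional[str] = None) -> List[str]:
    """Build an ssh command that runs `true` remotely and never prompts."""
    cmd = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5"]
    if identity is not None:
        cmd += ["-i", os.path.expanduser(identity)]
    cmd += [f"{user}@{host}" if user else host, "true"]
    return cmd


def check_hosts(hosts: Sequence[str], user: Optional[str] = None,
                identity: Optional[str] = None) -> Dict[str, bool]:
    """Map each host to True if a non-interactive login succeeded."""
    results = {}
    for host in hosts:
        proc = subprocess.run(batch_ssh_command(host, user, identity),
                              stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        results[host] = proc.returncode == 0
    return results
```

For example, `check_hosts(["10.67.22.208", "10.67.22.102"], user="ubuntu")` returns a dict in which any `False` entry marks a host that still needs the steps above.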

Your Dask code in Python should now be able to connect to every host you have done this for:

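from dask.distributed import Client, SSHCluster

# The first host runs the scheduler, the remaining hosts run workers.
# With no connect_options, asyncssh falls back to its defaults (keys from ~/.ssh or ssh-agent, local username).
cluster = SSHCluster(["10.67.22.208", "10.67.22.102", "10.67.22.178", "10.67.22.117", "10.67.22.85"])
client = Client(cluster)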

The above code should work now.
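If the key pair from the question (~/.ssh/my_key.pem, user ubuntu) should be used instead of the default key, the CLI flags have to be translated into asyncssh connection options, since SSHCluster passes `connect_options` straight to `asyncssh.connect` (see the traceback). As far as I can tell, the relevant asyncssh options are `username` and `client_keys` (a list of private key paths); `client_host_keys` is meant for host-based authentication, not for the user's key. Also, `port` in `scheduler_options` is probably expected to be an integer rather than the string ":8786". A sketch under those assumptions; `cli_to_connect_options` is a hypothetical helper, and the SSHCluster call is left commented out because it needs the live hosts.

```python
import os
from typing import Any, Dict, Optional


def cli_to_connect_options(ssh_private_key: Optional[str] = None,
                           ssh_username: Optional[str] = None,
                           known_hosts: str = "~/.ssh/known_hosts") -> Dict[str, Any]:
    """Translate dask-ssh style flags into asyncssh.connect() keyword arguments."""
    options: Dict[str, Any] = {"known_hosts": os.path.expanduser(known_hosts)}
    if ssh_username is not None:
        options["username"] = ssh_username  # --ssh-username
    if ssh_private_key is not None:
        # --ssh-private-key: asyncssh expects a list of client keys
        options["client_keys"] = [os.path.expanduser(ssh_private_key)]
    return options


HOSTS = ["10.67.22.208", "10.67.22.102", "10.67.22.178", "10.67.22.117", "10.67.22.85"]
CONNECT_OPTIONS = cli_to_connect_options("~/.ssh/my_key.pem", "ubuntu")
SCHEDULER_OPTIONS = {"port": 8786, "dashboard_address": ":8787"}

# Once the hosts are in known_hosts, the cluster could be started with:
# from dask.distributed import Client, SSHCluster
# cluster = SSHCluster(HOSTS, connect_options=CONNECT_OPTIONS,
#                      scheduler_options=SCHEDULER_OPTIONS)
# client = Client(cluster)
```

Setting `known_hosts` to None would make asyncssh skip host key validation entirely, but that removes protection against man-in-the-middle attacks, so adding the hosts to known_hosts as described above seems the safer fix.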

The same setup is described here: https://towardsdatascience.com/set-up-a-dask-cluster-for-distributed-machine-learning-31f587b1b553

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

[1] Solution 1. Source: Stack Overflow